mentat/tolstoy/src/tx_processor.rs

165 lines
5.1 KiB
Rust

// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::iter::Peekable;
use mentat_db::TypedSQLValue;
use core_traits::{Entid, TypedValue};
use public_traits::errors::Result;
use crate::types::TxPart;
/// Sink for the transactions walked by `Processor::process`.
///
/// The type parameter `RR` is the "receiver report": the value an implementor
/// hands back from `done` once the processor has finished.
pub trait TxReceiver<RR> {
    /// Invoked once per transaction with that transaction's id and an
    /// iterator over its datoms.
    ///
    /// # Errors
    ///
    /// Any error returned here is propagated by `Processor::process`.
    fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
    where
        T: Iterator<Item = TxPart>;

    /// Invoked after the processor has finished; consumes the receiver and
    /// produces its report.
    fn done(self) -> RR;
}
/// Stateless entry point for walking stored transactions.
///
/// The type carries no data; its functionality is exposed through the
/// associated function `Processor::process`.
pub struct Processor {}
/// Iterator over the datoms that belong to a single transaction.
///
/// It hands out a clone of `first` and then continues with rows taken from
/// the shared `rows` cursor for as long as they carry the same `tx` as
/// `first`.
pub struct DatomsIterator<'dbtx, 't, T: Iterator<Item = Result<TxPart>>> {
    /// The datom that opened the transaction; its `tx` identifies which
    /// subsequent rows belong to this iterator.
    first: &'dbtx TxPart,
    /// Peekable row cursor borrowed from the caller, so rows of later
    /// transactions remain available once this iterator is done.
    rows: &'t mut Peekable<T>,
    /// `true` until `first` has been yielded.
    at_first: bool,
    /// `true` once iteration has finished; every later call yields `None`.
    at_last: bool,
}
impl<'dbtx, 't, T> DatomsIterator<'dbtx, 't, T>
where
    T: Iterator<Item = Result<TxPart>> + 't,
{
    /// Builds an iterator positioned before `first`.
    ///
    /// `first` is the datom that opened the transaction; `rows` is the shared
    /// cursor from which the remaining datoms of that transaction are pulled.
    fn new(first: &'dbtx TxPart, rows: &'t mut Peekable<T>) -> Self {
        Self {
            first,
            rows,
            // Nothing has been yielded yet, so `first` is still pending.
            at_first: true,
            at_last: false,
        }
    }
}
impl<'dbtx, 't, T> Iterator for DatomsIterator<'dbtx, 't, T>
where
    T: Sized + Iterator<Item = Result<TxPart>> + 't,
{
    type Item = TxPart;

    /// Yields the next datom of the current transaction.
    ///
    /// The first call returns a clone of `first`. Later calls take rows from
    /// the shared cursor while the upcoming row decoded successfully and has
    /// the same `tx` as `first`; each such row is returned with `partitions`
    /// set to `None`.
    ///
    /// Iteration ends (and stays ended) when the cursor is empty, the upcoming
    /// row belongs to a different `tx`, or the upcoming row is an error. In
    /// the latter two cases the row is left unconsumed in `rows`.
    fn next(&mut self) -> Option<Self::Item> {
        if self.at_last {
            return None;
        }
        if self.at_first {
            self.at_first = false;
            return Some(self.first.clone());
        }

        // Look ahead without consuming: a row of another transaction, or an
        // error, must stay in the cursor for whoever reads from it next.
        let same_tx = match self.rows.peek() {
            Some(Ok(upcoming)) => upcoming.tx == self.first.tx,
            _ => false,
        };
        if !same_tx {
            self.at_last = true;
            return None;
        }

        match self.rows.next() {
            // The row is owned here, so its fields are moved rather than
            // cloned; only `partitions` is deliberately reset.
            Some(Ok(datom)) => Some(TxPart {
                partitions: None,
                e: datom.e,
                a: datom.a,
                v: datom.v,
                tx: datom.tx,
                added: datom.added,
            }),
            // Not expected after the successful peek above; if it ever
            // happens, end the iteration instead of yielding anything.
            _ => {
                self.at_last = true;
                None
            }
        }
    }
}
/// Decodes one query result row into a `TxPart`.
///
/// Columns are read by position, in the order selected by the query in
/// `Processor::process`: `e`, `a`, `v`, `value_type_tag`, `tx`, `added`.
/// `partitions` is always `None` for rows decoded here.
///
/// # Errors
///
/// Fails if a column cannot be read, or if the value/type-tag pair is
/// rejected by `TypedValue::from_sql_value_pair`.
fn to_tx_part(row: &rusqlite::Row<'_>) -> Result<TxPart> {
    let e = row.get(0)?;
    let a = row.get(1)?;
    // The stored value and its type tag are two separate columns that are
    // combined into a single typed value.
    let raw_value = row.get(2)?;
    let value_type_tag = row.get(3)?;
    let v = TypedValue::from_sql_value_pair(raw_value, value_type_tag)?;
    let tx = row.get(4)?;
    let added = row.get(5)?;

    Ok(TxPart {
        partitions: None,
        e,
        a,
        v,
        tx,
        added,
    })
}
impl Processor {
    /// Walks stored transactions and feeds them to `receiver`, one
    /// `TxReceiver::tx` call per transaction.
    ///
    /// Rows are read from `timelined_transactions` restricted to
    /// `timeline = 0` and ordered by `tx`. When `from_tx` is `Some(tx)`, only
    /// rows with a `tx` strictly greater than it are visited.
    ///
    /// On success, returns the report produced by `receiver.done()`.
    ///
    /// # Errors
    ///
    /// Propagates failures to prepare or run the query, rows that fail to
    /// decode, and any error returned by `receiver.tx`.
    pub fn process<RR, R: TxReceiver<RR>>(
        sqlite: &rusqlite::Transaction<'_>,
        from_tx: Option<Entid>,
        mut receiver: R,
    ) -> Result<RR> {
        let tx_filter = from_tx.map_or_else(
            || "WHERE timeline = 0".to_string(),
            |tx| format!(" WHERE timeline = 0 AND tx > {} ", tx),
        );
        let select_query = format!(
            "SELECT e, a, v, value_type_tag, tx, added FROM timelined_transactions {} ORDER BY tx",
            tx_filter
        );

        let mut stmt = sqlite.prepare(&select_query)?;
        let mut rows = stmt
            .query_and_then(rusqlite::params![], to_tx_part)?
            .peekable();

        // Rows arrive sorted by "tx" (see ORDER BY above), so a change in
        // "tx" marks the start of a new transaction. The row that reveals the
        // change becomes the head of a datoms iterator given to the receiver.
        let mut last_tx: Option<Entid> = None;
        while let Some(row) = rows.next() {
            let head = row?;
            if last_tx == Some(head.tx) {
                // A leftover row of a transaction that was already handed to
                // the receiver (its iterator was not drained): skip it.
                continue;
            }
            last_tx = Some(head.tx);
            receiver.tx(head.tx, &mut DatomsIterator::new(&head, &mut rows))?;
        }

        // Consume the receiver so it can produce its report of type `RR`.
        Ok(receiver.done())
    }
}