Part 1: Allow specifying transactor's commit behaviour

Normally we want to both materialize our changes (into 'datoms')
as well as commit source transactions into 'transactions' table.

However, when moving transactions from timeline to timeline
we don't want to persist artifacts (rewind assertions), just their
materializations.

This patch expands the 'db' interface to allow for this split,
and changes transactor's functions to take a crate-private 'action'
which defines desired behaviour.
This commit is contained in:
Grisha Kruglov 2018-07-17 12:18:53 -07:00 committed by Grisha Kruglov
parent 5a29efa336
commit 0974108a52
2 changed files with 138 additions and 29 deletions

View file

@ -426,14 +426,10 @@ impl TypedSQLValue for TypedValue {
/// store. /// store.
pub(crate) fn read_materialized_view(conn: &rusqlite::Connection, table: &str) -> Result<Vec<(Entid, Entid, TypedValue)>> { pub(crate) fn read_materialized_view(conn: &rusqlite::Connection, table: &str) -> Result<Vec<(Entid, Entid, TypedValue)>> {
let mut stmt: rusqlite::Statement = conn.prepare(format!("SELECT e, a, v, value_type_tag FROM {}", table).as_str())?; let mut stmt: rusqlite::Statement = conn.prepare(format!("SELECT e, a, v, value_type_tag FROM {}", table).as_str())?;
let m: Result<Vec<(Entid, Entid, TypedValue)>> = stmt.query_and_then(&[], |row| { let m: Result<Vec<_>> = stmt.query_and_then(
let e: Entid = row.get_checked(0)?; &[],
let a: Entid = row.get_checked(1)?; row_to_datom_assertion
let v: rusqlite::types::Value = row.get_checked(2)?; )?.collect();
let value_type_tag: i32 = row.get_checked(3)?;
let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?;
Ok((e, a, typed_value))
})?.collect();
m m
} }
@ -515,14 +511,20 @@ pub trait MentatStoring {
fn insert_non_fts_searches<'a>(&self, entities: &'a [ReducedEntity], search_type: SearchType) -> Result<()>; fn insert_non_fts_searches<'a>(&self, entities: &'a [ReducedEntity], search_type: SearchType) -> Result<()>;
fn insert_fts_searches<'a>(&self, entities: &'a [ReducedEntity], search_type: SearchType) -> Result<()>; fn insert_fts_searches<'a>(&self, entities: &'a [ReducedEntity], search_type: SearchType) -> Result<()>;
/// Finalize the underlying storage layer after a Mentat transaction. /// Prepare the underlying storage layer for finalization after a Mentat transaction.
/// ///
/// Use this to finalize temporary tables, complete indices, revert pragmas, etc, after the /// Use this to finalize temporary tables, complete indices, revert pragmas, etc, after the
/// final `insert_non_fts_searches` invocation. /// final `insert_non_fts_searches` invocation.
fn commit_transaction(&self, tx_id: Entid) -> Result<()>; fn materialize_mentat_transaction(&self, tx_id: Entid) -> Result<()>;
/// Extract metadata-related [e a typed_value added] datoms committed in the given transaction. /// Finalize the underlying storage layer after a Mentat transaction.
fn committed_metadata_assertions(&self, tx_id: Entid) -> Result<Vec<(Entid, Entid, TypedValue, bool)>>; ///
/// This is a final step in performing a transaction.
fn commit_mentat_transaction(&self, tx_id: Entid) -> Result<()>;
/// Extract metadata-related [e a typed_value added] datoms resolved in the last
/// materialized transaction.
fn resolved_metadata_assertions(&self) -> Result<Vec<(Entid, Entid, TypedValue, bool)>>;
} }
/// Take search rows and complete `temp.search_results`. /// Take search rows and complete `temp.search_results`.
@ -945,23 +947,43 @@ impl MentatStoring for rusqlite::Connection {
results.map(|_| ()) results.map(|_| ())
} }
fn commit_transaction(&self, tx_id: Entid) -> Result<()> { fn commit_mentat_transaction(&self, tx_id: Entid) -> Result<()> {
search(&self)?;
insert_transaction(&self, tx_id)?; insert_transaction(&self, tx_id)?;
Ok(())
}
fn materialize_mentat_transaction(&self, tx_id: Entid) -> Result<()> {
search(&self)?;
update_datoms(&self, tx_id)?; update_datoms(&self, tx_id)?;
Ok(()) Ok(())
} }
fn committed_metadata_assertions(&self, tx_id: Entid) -> Result<Vec<(Entid, Entid, TypedValue, bool)>> { fn resolved_metadata_assertions(&self) -> Result<Vec<(Entid, Entid, TypedValue, bool)>> {
// TODO: use concat! to avoid creating String instances. let sql_stmt = format!(r#"
let mut stmt = self.prepare_cached(format!("SELECT e, a, v, value_type_tag, added FROM transactions WHERE tx = ? AND a IN {} ORDER BY e, a, v, value_type_tag, added", entids::METADATA_SQL_LIST.as_str()).as_str())?; SELECT e, a, v, value_type_tag, added FROM
let params = [&tx_id as &ToSql]; (
let m: Result<Vec<_>> = stmt.query_and_then(&params[..], |row| -> Result<(Entid, Entid, TypedValue, bool)> { SELECT e0 as e, a0 as a, v0 as v, value_type_tag0 as value_type_tag, 1 as added
Ok((row.get_checked(0)?, FROM temp.search_results
row.get_checked(1)?, WHERE a0 IN {} AND added0 IS 1 AND ((rid IS NULL) OR
TypedValue::from_sql_value_pair(row.get_checked(2)?, row.get_checked(3)?)?, ((rid IS NOT NULL) AND (v0 IS NOT v)))
row.get_checked(4)?))
})?.collect(); UNION
SELECT e0 as e, a0 as a, v, value_type_tag0 as value_type_tag, 0 as added
FROM temp.search_results
WHERE a0 in {} AND rid IS NOT NULL AND
((added0 IS 0) OR
(added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v))
) ORDER BY e, a, v, value_type_tag, added"#,
entids::METADATA_SQL_LIST.as_str(), entids::METADATA_SQL_LIST.as_str()
);
let mut stmt = self.prepare_cached(&sql_stmt)?;
let m: Result<Vec<_>> = stmt.query_and_then(
&[],
row_to_transaction_assertion
)?.collect();
m m
} }
} }
@ -995,6 +1017,43 @@ pub fn update_partition_map(conn: &rusqlite::Connection, partition_map: &Partiti
Ok(()) Ok(())
} }
/// Extract metadata-related [e a typed_value added] datoms committed in the given transaction.
pub fn committed_metadata_assertions(conn: &rusqlite::Connection, tx_id: Entid) -> Result<Vec<(Entid, Entid, TypedValue, bool)>> {
let sql_stmt = format!(r#"
SELECT e, a, v, value_type_tag, added
FROM transactions
WHERE tx = ? AND a IN {}
ORDER BY e, a, v, value_type_tag, added"#,
entids::METADATA_SQL_LIST.as_str()
);
let mut stmt = conn.prepare_cached(&sql_stmt)?;
let m: Result<Vec<_>> = stmt.query_and_then(
&[&tx_id as &ToSql],
row_to_transaction_assertion
)?.collect();
m
}
/// Takes a row, produces a transaction quadruple.
fn row_to_transaction_assertion(row: &rusqlite::Row) -> Result<(Entid, Entid, TypedValue, bool)> {
Ok((
row.get_checked(0)?,
row.get_checked(1)?,
TypedValue::from_sql_value_pair(row.get_checked(2)?, row.get_checked(3)?)?,
row.get_checked(4)?
))
}
/// Takes a row, produces a datom quadruple.
fn row_to_datom_assertion(row: &rusqlite::Row) -> Result<(Entid, Entid, TypedValue)> {
Ok((
row.get_checked(0)?,
row.get_checked(1)?,
TypedValue::from_sql_value_pair(row.get_checked(2)?, row.get_checked(3)?)?
))
}
/// Update the metadata materialized views based on the given metadata report. /// Update the metadata materialized views based on the given metadata report.
/// ///
/// This updates the "entids", "idents", and "schema" materialized views, copying directly from the /// This updates the "entids", "idents", and "schema" materialized views, copying directly from the

View file

@ -130,6 +130,21 @@ use watcher::{
TransactWatcher, TransactWatcher,
}; };
/// Defines transactor's high level behaviour.
pub(crate) enum TransactorAction {
/// Materialize transaction into 'datoms' and metadata
/// views, but do not commit it into 'transactions' table.
/// Use this if you need transaction's "side-effects", but
/// don't want its by-products to end-up in the transaction log,
/// e.g. when rewinding.
Materialize,
/// Materialize transaction into 'datoms' and metadata
/// views, and also commit it into the 'transactions' table.
/// Use this for regular transactions.
MaterializeAndCommit,
}
/// A transaction on its way to being applied. /// A transaction on its way to being applied.
#[derive(Debug)] #[derive(Debug)]
pub struct Tx<'conn, 'a, W> where W: TransactWatcher { pub struct Tx<'conn, 'a, W> where W: TransactWatcher {
@ -618,10 +633,15 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
let terms_with_temp_ids = self.resolve_lookup_refs(&lookup_ref_map, terms_with_temp_ids_and_lookup_refs)?; let terms_with_temp_ids = self.resolve_lookup_refs(&lookup_ref_map, terms_with_temp_ids_and_lookup_refs)?;
self.transact_simple_terms(terms_with_temp_ids, tempid_set) self.transact_simple_terms_with_action(terms_with_temp_ids, tempid_set, TransactorAction::MaterializeAndCommit)
} }
pub fn transact_simple_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport> pub fn transact_simple_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport>
where I: IntoIterator<Item=TermWithTempIds> {
self.transact_simple_terms_with_action(terms, tempid_set, TransactorAction::MaterializeAndCommit)
}
fn transact_simple_terms_with_action<I>(&mut self, terms: I, tempid_set: InternSet<TempId>, action: TransactorAction) -> Result<TxReport>
where I: IntoIterator<Item=TermWithTempIds> { where I: IntoIterator<Item=TermWithTempIds> {
// TODO: push these into an internal transaction report? // TODO: push these into an internal transaction report?
let mut tempids: BTreeMap<TempId, KnownEntid> = BTreeMap::default(); let mut tempids: BTreeMap<TempId, KnownEntid> = BTreeMap::default();
@ -786,16 +806,29 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
self.store.insert_fts_searches(&fts_many[..], db::SearchType::Exact)?; self.store.insert_fts_searches(&fts_many[..], db::SearchType::Exact)?;
} }
self.store.commit_transaction(self.tx_id)?; match action {
TransactorAction::Materialize => {
self.store.materialize_mentat_transaction(self.tx_id)?;
},
TransactorAction::MaterializeAndCommit => {
self.store.materialize_mentat_transaction(self.tx_id)?;
self.store.commit_mentat_transaction(self.tx_id)?;
}
}
} }
db::update_partition_map(self.store, &self.partition_map)?; db::update_partition_map(self.store, &self.partition_map)?;
self.watcher.done(&self.tx_id, self.schema)?; self.watcher.done(&self.tx_id, self.schema)?;
if tx_might_update_metadata { if tx_might_update_metadata {
println!("might update schema!");
// Extract changes to metadata from the store. // Extract changes to metadata from the store.
let metadata_assertions = self.store.committed_metadata_assertions(self.tx_id)?; let metadata_assertions = match action {
TransactorAction::Materialize => self.store.resolved_metadata_assertions()?,
TransactorAction::MaterializeAndCommit => db::committed_metadata_assertions(self.store, self.tx_id)?
};
println!("assertions: {:?}", metadata_assertions);
let mut new_schema = (*self.schema_for_mutation).clone(); // Clone the underlying Schema for modification. let mut new_schema = (*self.schema_for_mutation).clone(); // Clone the underlying Schema for modification.
let metadata_report = metadata::update_schema_from_entid_quadruples(&mut new_schema, metadata_assertions)?; let metadata_report = metadata::update_schema_from_entid_quadruples(&mut new_schema, metadata_assertions)?;
@ -874,8 +907,25 @@ pub fn transact_terms<'conn, 'a, I, W>(conn: &'conn rusqlite::Connection,
where I: IntoIterator<Item=TermWithTempIds>, where I: IntoIterator<Item=TermWithTempIds>,
W: TransactWatcher { W: TransactWatcher {
transact_terms_with_action(
conn, partition_map, schema_for_mutation, schema, watcher, terms, tempid_set,
TransactorAction::MaterializeAndCommit
)
}
pub(crate) fn transact_terms_with_action<'conn, 'a, I, W>(conn: &'conn rusqlite::Connection,
partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema,
watcher: W,
terms: I,
tempid_set: InternSet<TempId>,
action: TransactorAction) -> Result<(TxReport, PartitionMap, Option<Schema>, W)>
where I: IntoIterator<Item=TermWithTempIds>,
W: TransactWatcher {
let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema, watcher)?; let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema, watcher)?;
let report = tx.transact_simple_terms(terms, tempid_set)?; let report = tx.transact_simple_terms_with_action(terms, tempid_set, action)?;
conclude_tx(tx, report) conclude_tx(tx, report)
} }