diff --git a/db/src/db.rs b/db/src/db.rs index 3374ce11..68ae59d3 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -426,14 +426,10 @@ impl TypedSQLValue for TypedValue { /// store. pub(crate) fn read_materialized_view(conn: &rusqlite::Connection, table: &str) -> Result> { let mut stmt: rusqlite::Statement = conn.prepare(format!("SELECT e, a, v, value_type_tag FROM {}", table).as_str())?; - let m: Result> = stmt.query_and_then(&[], |row| { - let e: Entid = row.get_checked(0)?; - let a: Entid = row.get_checked(1)?; - let v: rusqlite::types::Value = row.get_checked(2)?; - let value_type_tag: i32 = row.get_checked(3)?; - let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?; - Ok((e, a, typed_value)) - })?.collect(); + let m: Result> = stmt.query_and_then( + &[], + row_to_datom_assertion + )?.collect(); m } @@ -515,14 +511,20 @@ pub trait MentatStoring { fn insert_non_fts_searches<'a>(&self, entities: &'a [ReducedEntity], search_type: SearchType) -> Result<()>; fn insert_fts_searches<'a>(&self, entities: &'a [ReducedEntity], search_type: SearchType) -> Result<()>; - /// Finalize the underlying storage layer after a Mentat transaction. + /// Prepare the underlying storage layer for finalization after a Mentat transaction. /// /// Use this to finalize temporary tables, complete indices, revert pragmas, etc, after the /// final `insert_non_fts_searches` invocation. - fn commit_transaction(&self, tx_id: Entid) -> Result<()>; + fn materialize_mentat_transaction(&self, tx_id: Entid) -> Result<()>; - /// Extract metadata-related [e a typed_value added] datoms committed in the given transaction. - fn committed_metadata_assertions(&self, tx_id: Entid) -> Result>; + /// Finalize the underlying storage layer after a Mentat transaction. + /// + /// This is a final step in performing a transaction. + fn commit_mentat_transaction(&self, tx_id: Entid) -> Result<()>; + + /// Extract metadata-related [e a typed_value added] datoms resolved in the last + /// materialized transaction. + fn resolved_metadata_assertions(&self) -> Result>; } /// Take search rows and complete `temp.search_results`. @@ -945,23 +947,43 @@ impl MentatStoring for rusqlite::Connection { results.map(|_| ()) } - fn commit_transaction(&self, tx_id: Entid) -> Result<()> { - search(&self)?; + fn commit_mentat_transaction(&self, tx_id: Entid) -> Result<()> { insert_transaction(&self, tx_id)?; + Ok(()) + } + + fn materialize_mentat_transaction(&self, tx_id: Entid) -> Result<()> { + search(&self)?; update_datoms(&self, tx_id)?; Ok(()) } - fn committed_metadata_assertions(&self, tx_id: Entid) -> Result> { - // TODO: use concat! to avoid creating String instances. - let mut stmt = self.prepare_cached(format!("SELECT e, a, v, value_type_tag, added FROM transactions WHERE tx = ? AND a IN {} ORDER BY e, a, v, value_type_tag, added", entids::METADATA_SQL_LIST.as_str()).as_str())?; - let params = [&tx_id as &ToSql]; - let m: Result> = stmt.query_and_then(¶ms[..], |row| -> Result<(Entid, Entid, TypedValue, bool)> { - Ok((row.get_checked(0)?, - row.get_checked(1)?, - TypedValue::from_sql_value_pair(row.get_checked(2)?, row.get_checked(3)?)?, - row.get_checked(4)?)) - })?.collect(); + fn resolved_metadata_assertions(&self) -> Result> { + let sql_stmt = format!(r#" + SELECT e, a, v, value_type_tag, added FROM + ( + SELECT e0 as e, a0 as a, v0 as v, value_type_tag0 as value_type_tag, 1 as added + FROM temp.search_results + WHERE a0 IN {} AND added0 IS 1 AND ((rid IS NULL) OR + ((rid IS NOT NULL) AND (v0 IS NOT v))) + + UNION + + SELECT e0 as e, a0 as a, v, value_type_tag0 as value_type_tag, 0 as added + FROM temp.search_results + WHERE a0 in {} AND rid IS NOT NULL AND + ((added0 IS 0) OR + (added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v)) + + ) ORDER BY e, a, v, value_type_tag, added"#, + entids::METADATA_SQL_LIST.as_str(), entids::METADATA_SQL_LIST.as_str() + ); + + let mut stmt = self.prepare_cached(&sql_stmt)?; + let m: Result> = stmt.query_and_then( + &[], + row_to_transaction_assertion + )?.collect(); m } } @@ -995,6 +1017,43 @@ pub fn update_partition_map(conn: &rusqlite::Connection, partition_map: &Partiti Ok(()) } +/// Extract metadata-related [e a typed_value added] datoms committed in the given transaction. +pub fn committed_metadata_assertions(conn: &rusqlite::Connection, tx_id: Entid) -> Result> { + let sql_stmt = format!(r#" + SELECT e, a, v, value_type_tag, added + FROM transactions + WHERE tx = ? AND a IN {} + ORDER BY e, a, v, value_type_tag, added"#, + entids::METADATA_SQL_LIST.as_str() + ); + + let mut stmt = conn.prepare_cached(&sql_stmt)?; + let m: Result> = stmt.query_and_then( + &[&tx_id as &ToSql], + row_to_transaction_assertion + )?.collect(); + m +} + +/// Takes a row, produces a transaction quadruple. +fn row_to_transaction_assertion(row: &rusqlite::Row) -> Result<(Entid, Entid, TypedValue, bool)> { + Ok(( + row.get_checked(0)?, + row.get_checked(1)?, + TypedValue::from_sql_value_pair(row.get_checked(2)?, row.get_checked(3)?)?, + row.get_checked(4)? + )) +} + +/// Takes a row, produces a datom quadruple. +fn row_to_datom_assertion(row: &rusqlite::Row) -> Result<(Entid, Entid, TypedValue)> { + Ok(( + row.get_checked(0)?, + row.get_checked(1)?, + TypedValue::from_sql_value_pair(row.get_checked(2)?, row.get_checked(3)?)? + )) +} + /// Update the metadata materialized views based on the given metadata report. /// /// This updates the "entids", "idents", and "schema" materialized views, copying directly from the diff --git a/db/src/tx.rs b/db/src/tx.rs index 85010f4b..64988176 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -130,6 +130,21 @@ use watcher::{ TransactWatcher, }; +/// Defines transactor's high level behaviour. +pub(crate) enum TransactorAction { + /// Materialize transaction into 'datoms' and metadata + /// views, but do not commit it into 'transactions' table. + /// Use this if you need transaction's "side-effects", but + /// don't want its by-products to end-up in the transaction log, + /// e.g. when rewinding. + Materialize, + + /// Materialize transaction into 'datoms' and metadata + /// views, and also commit it into the 'transactions' table. + /// Use this for regular transactions. + MaterializeAndCommit, +} + /// A transaction on its way to being applied. #[derive(Debug)] pub struct Tx<'conn, 'a, W> where W: TransactWatcher { @@ -618,10 +633,15 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { let terms_with_temp_ids = self.resolve_lookup_refs(&lookup_ref_map, terms_with_temp_ids_and_lookup_refs)?; - self.transact_simple_terms(terms_with_temp_ids, tempid_set) + self.transact_simple_terms_with_action(terms_with_temp_ids, tempid_set, TransactorAction::MaterializeAndCommit) } pub fn transact_simple_terms(&mut self, terms: I, tempid_set: InternSet) -> Result + where I: IntoIterator { + self.transact_simple_terms_with_action(terms, tempid_set, TransactorAction::MaterializeAndCommit) + } + + fn transact_simple_terms_with_action(&mut self, terms: I, tempid_set: InternSet, action: TransactorAction) -> Result where I: IntoIterator { // TODO: push these into an internal transaction report? let mut tempids: BTreeMap = BTreeMap::default(); @@ -786,16 +806,29 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { self.store.insert_fts_searches(&fts_many[..], db::SearchType::Exact)?; } - self.store.commit_transaction(self.tx_id)?; + match action { + TransactorAction::Materialize => { + self.store.materialize_mentat_transaction(self.tx_id)?; + }, + TransactorAction::MaterializeAndCommit => { + self.store.materialize_mentat_transaction(self.tx_id)?; + self.store.commit_mentat_transaction(self.tx_id)?; + } + } + } db::update_partition_map(self.store, &self.partition_map)?; self.watcher.done(&self.tx_id, self.schema)?; if tx_might_update_metadata { + println!("might update schema!"); // Extract changes to metadata from the store. - let metadata_assertions = self.store.committed_metadata_assertions(self.tx_id)?; - + let metadata_assertions = match action { + TransactorAction::Materialize => self.store.resolved_metadata_assertions()?, + TransactorAction::MaterializeAndCommit => db::committed_metadata_assertions(self.store, self.tx_id)? + }; + println!("assertions: {:?}", metadata_assertions); let mut new_schema = (*self.schema_for_mutation).clone(); // Clone the underlying Schema for modification. let metadata_report = metadata::update_schema_from_entid_quadruples(&mut new_schema, metadata_assertions)?; @@ -874,8 +907,25 @@ pub fn transact_terms<'conn, 'a, I, W>(conn: &'conn rusqlite::Connection, where I: IntoIterator, W: TransactWatcher { + transact_terms_with_action( + conn, partition_map, schema_for_mutation, schema, watcher, terms, tempid_set, + TransactorAction::MaterializeAndCommit + ) +} + +pub(crate) fn transact_terms_with_action<'conn, 'a, I, W>(conn: &'conn rusqlite::Connection, + partition_map: PartitionMap, + schema_for_mutation: &'a Schema, + schema: &'a Schema, + watcher: W, + terms: I, + tempid_set: InternSet, + action: TransactorAction) -> Result<(TxReport, PartitionMap, Option, W)> + where I: IntoIterator, + W: TransactWatcher { + let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema, watcher)?; - let report = tx.transact_simple_terms(terms, tempid_set)?; + let report = tx.transact_simple_terms_with_action(terms, tempid_set, action)?; conclude_tx(tx, report) }