diff --git a/db/src/db.rs b/db/src/db.rs index 5ab33df2..ffe46558 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -245,8 +245,9 @@ lazy_static! { r#"CREATE INDEX idx_idents_unique ON idents (e, a, v, value_type_tag)"#, r#"CREATE TABLE schema (e INTEGER NOT NULL, a SMALLINT NOT NULL, v BLOB NOT NULL, value_type_tag SMALLINT NOT NULL)"#, r#"CREATE INDEX idx_schema_unique ON schema (e, a, v, value_type_tag)"#, + // TODO: store entid instead of ident for partition name. - r#"CREATE TABLE parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, end INTEGER NOT NULL, idx INTEGER NOT NULL, allow_excision SMALLINT NOT NULL)"#, + r#"CREATE TABLE known_parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, end INTEGER NOT NULL, allow_excision SMALLINT NOT NULL)"#, ] }; } @@ -288,6 +289,35 @@ pub fn create_empty_current_version(conn: &mut rusqlite::Connection) -> Result<( Ok((tx, DB::new(bootstrap_partition_map, bootstrap_schema))) } +/// Creates a partition map view for the main timeline based on partitions +/// defined in 'known_parts'. +fn create_current_partition_view(conn: &rusqlite::Connection) -> Result<()> { + let mut stmt = conn.prepare("SELECT part, end FROM known_parts ORDER BY end ASC")?; + let known_parts: Result> = stmt.query_and_then(&[], |row| { + Ok(( + row.get_checked(0)?, + row.get_checked(1)?, + )) + })?.collect(); + + let mut case = vec![]; + for &(ref part, ref end) in known_parts?.iter() { + case.push(format!(r#"WHEN e <= {} THEN "{}""#, end, part)); + } + + let view_stmt = format!("CREATE VIEW parts AS + SELECT + CASE {} END AS part, + min(e) AS start, + max(e) + 1 AS idx + FROM timelined_transactions WHERE timeline = {} GROUP BY part", + case.join(" "), ::TIMELINE_MAIN + ); + + conn.execute(&view_stmt, &[])?; + Ok(()) +} + // TODO: rename "SQL" functions to align with "datoms" functions. pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { let (tx, mut db) = create_empty_current_version(conn)?; @@ -298,9 +328,11 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { // This is necessary: `transact` will only UPDATE parts, not INSERT them if they're missing. for (part, partition) in db.partition_map.iter() { // TODO: Convert "keyword" part to SQL using Value conversion. - tx.execute("INSERT INTO parts (part, start, end, idx, allow_excision) VALUES (?, ?, ?, ?, ?)", &[part, &partition.start, &partition.end, &partition.next_entid(), &partition.allow_excision])?; + tx.execute("INSERT INTO known_parts (part, start, end, allow_excision) VALUES (?, ?, ?, ?)", &[part, &partition.start, &partition.end, &partition.allow_excision])?; } + create_current_partition_view(&tx)?; + // TODO: return to transact_internal to self-manage the encompassing SQLite transaction. let bootstrap_schema_for_mutation = Schema::default(); // The bootstrap transaction will populate this schema. @@ -435,8 +467,37 @@ pub(crate) fn read_materialized_view(conn: &rusqlite::Connection, table: &str) - } /// Read the partition map materialized view from the given SQL store. -fn read_partition_map(conn: &rusqlite::Connection) -> Result { - let mut stmt: rusqlite::Statement = conn.prepare("SELECT part, start, end, idx, allow_excision FROM parts")?; +pub(crate) fn read_partition_map(conn: &rusqlite::Connection) -> Result { + // An obviously expensive query, but we only need to run it once. + // First part of the union sprinkles 'allow_excision' into the 'parts' view. + // Second part of the union takes care of partitions which are known + // but don't have any transactions. + let mut stmt: rusqlite::Statement = conn.prepare(" + SELECT + known_parts.part, + known_parts.start, + known_parts.end, + parts.idx, + known_parts.allow_excision + FROM + parts + INNER JOIN + known_parts + ON parts.part = known_parts.part + + UNION + + SELECT + part, + start, + end, + start, + allow_excision + FROM + known_parts + WHERE + part NOT IN (SELECT part FROM parts)" + )?; let m = stmt.query_and_then(&[], |row| -> Result<(String, Partition)> { Ok((row.get_checked(0)?, Partition::new(row.get_checked(1)?, row.get_checked(2)?, row.get_checked(3)?, row.get_checked(4)?))) })?.collect(); @@ -989,35 +1050,6 @@ impl MentatStoring for rusqlite::Connection { } } -/// Update the current partition map materialized view. -// TODO: only update changed partitions. -pub fn update_partition_map(conn: &rusqlite::Connection, partition_map: &PartitionMap) -> Result<()> { - let values_per_statement = 2; - let max_vars = conn.limit(Limit::SQLITE_LIMIT_VARIABLE_NUMBER) as usize; - let max_partitions = max_vars / values_per_statement; - if partition_map.len() > max_partitions { - bail!(DbErrorKind::NotYetImplemented(format!("No more than {} partitions are supported", max_partitions))); - } - - // Like "UPDATE parts SET idx = CASE WHEN part = ? THEN ? WHEN part = ? THEN ? ELSE idx END". - let s = format!("UPDATE parts SET idx = CASE {} ELSE idx END", - repeat("WHEN part = ? THEN ?").take(partition_map.len()).join(" ")); - - // Lifetimes of temporary values make this building a slice of references annoying if we're - // using partition.next_entid() getter; instead, we peek into partition directly. - let params: Vec<&ToSql> = partition_map.iter().flat_map(|(name, partition)| { - once(name as &ToSql) - .chain(once(&partition.next_entid_to_allocate as &ToSql)) - }).collect(); - - // TODO: only cache the latest of these statements. Changing the set of partitions isn't - // supported in the Clojure implementation at all, and might not be supported in Mentat soon, - // so this is very low priority. - let mut stmt = conn.prepare_cached(s.as_str())?; - stmt.execute(¶ms[..]).context(DbErrorKind::FailedToUpdatePartitionMap)?; - Ok(()) -} - /// Extract metadata-related [e a typed_value added] datoms committed in the given transaction. pub fn committed_metadata_assertions(conn: &rusqlite::Connection, tx_id: Entid) -> Result> { let sql_stmt = format!(r#" diff --git a/db/src/lib.rs b/db/src/lib.rs index 27cd0943..a798bb05 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -63,6 +63,8 @@ pub use bootstrap::{ USER0, }; +pub static TIMELINE_MAIN: i64 = 0; + pub use schema::{ AttributeBuilder, AttributeValidation, diff --git a/db/src/tx.rs b/db/src/tx.rs index 64988176..bda684c7 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -818,7 +818,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } - db::update_partition_map(self.store, &self.partition_map)?; self.watcher.done(&self.tx_id, self.schema)?; if tx_might_update_metadata {