From 4ec780c87af95d0c08990957004c3eca1ea22b03 Mon Sep 17 00:00:00 2001 From: Grisha Kruglov Date: Tue, 17 Jul 2018 17:54:13 -0700 Subject: [PATCH] Part 3: Use a view to derive parts table Being able to derive partition map from partition definitions and current state of the world (transactions), segmented by timelines, is useful because it lets us not worry about keeping materialized partition maps up-to-date - since there's no need for materialized partition maps at that point. This comes in very handy when we start moving chunks of transactions off of our mainline. Alternative to this work would look like materializing partition maps per timeline, growing support for incremental "backwards update" of the materialized maps, etc. Our core partitions are defined in 'known_parts' table during bootstrap, and what used to be 'parts' table is a generated view that operates over transactions to figure out partition index. 'parts' is defined for the main timeline. Querying parts for other timelines or for particular timeline+tx combinations will look similar. --- db/src/db.rs | 98 ++++++++++++++++++++++++++++++++++----------------- db/src/lib.rs | 2 ++ db/src/tx.rs | 1 - 3 files changed, 67 insertions(+), 34 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index 5ab33df2..ffe46558 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -245,8 +245,9 @@ lazy_static! { r#"CREATE INDEX idx_idents_unique ON idents (e, a, v, value_type_tag)"#, r#"CREATE TABLE schema (e INTEGER NOT NULL, a SMALLINT NOT NULL, v BLOB NOT NULL, value_type_tag SMALLINT NOT NULL)"#, r#"CREATE INDEX idx_schema_unique ON schema (e, a, v, value_type_tag)"#, + // TODO: store entid instead of ident for partition name. - r#"CREATE TABLE parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, end INTEGER NOT NULL, idx INTEGER NOT NULL, allow_excision SMALLINT NOT NULL)"#, + r#"CREATE TABLE known_parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, end INTEGER NOT NULL, allow_excision SMALLINT NOT NULL)"#, ] }; } @@ -288,6 +289,35 @@ pub fn create_empty_current_version(conn: &mut rusqlite::Connection) -> Result<( Ok((tx, DB::new(bootstrap_partition_map, bootstrap_schema))) } +/// Creates a partition map view for the main timeline based on partitions +/// defined in 'known_parts'. +fn create_current_partition_view(conn: &rusqlite::Connection) -> Result<()> { + let mut stmt = conn.prepare("SELECT part, end FROM known_parts ORDER BY end ASC")?; + let known_parts: Result> = stmt.query_and_then(&[], |row| { + Ok(( + row.get_checked(0)?, + row.get_checked(1)?, + )) + })?.collect(); + + let mut case = vec![]; + for &(ref part, ref end) in known_parts?.iter() { + case.push(format!(r#"WHEN e <= {} THEN "{}""#, end, part)); + } + + let view_stmt = format!("CREATE VIEW parts AS + SELECT + CASE {} END AS part, + min(e) AS start, + max(e) + 1 AS idx + FROM timelined_transactions WHERE timeline = {} GROUP BY part", + case.join(" "), ::TIMELINE_MAIN + ); + + conn.execute(&view_stmt, &[])?; + Ok(()) +} + // TODO: rename "SQL" functions to align with "datoms" functions. pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { let (tx, mut db) = create_empty_current_version(conn)?; @@ -298,9 +328,11 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { // This is necessary: `transact` will only UPDATE parts, not INSERT them if they're missing. for (part, partition) in db.partition_map.iter() { // TODO: Convert "keyword" part to SQL using Value conversion. - tx.execute("INSERT INTO parts (part, start, end, idx, allow_excision) VALUES (?, ?, ?, ?, ?)", &[part, &partition.start, &partition.end, &partition.next_entid(), &partition.allow_excision])?; + tx.execute("INSERT INTO known_parts (part, start, end, allow_excision) VALUES (?, ?, ?, ?)", &[part, &partition.start, &partition.end, &partition.allow_excision])?; } + create_current_partition_view(&tx)?; + // TODO: return to transact_internal to self-manage the encompassing SQLite transaction. let bootstrap_schema_for_mutation = Schema::default(); // The bootstrap transaction will populate this schema. @@ -435,8 +467,37 @@ pub(crate) fn read_materialized_view(conn: &rusqlite::Connection, table: &str) - } /// Read the partition map materialized view from the given SQL store. -fn read_partition_map(conn: &rusqlite::Connection) -> Result { - let mut stmt: rusqlite::Statement = conn.prepare("SELECT part, start, end, idx, allow_excision FROM parts")?; +pub(crate) fn read_partition_map(conn: &rusqlite::Connection) -> Result { + // An obviously expensive query, but we only need to run it once. + // First part of the union sprinkles 'allow_excision' into the 'parts' view. + // Second part of the union takes care of partitions which are known + // but don't have any transactions. + let mut stmt: rusqlite::Statement = conn.prepare(" + SELECT + known_parts.part, + known_parts.start, + known_parts.end, + parts.idx, + known_parts.allow_excision + FROM + parts + INNER JOIN + known_parts + ON parts.part = known_parts.part + + UNION + + SELECT + part, + start, + end, + start, + allow_excision + FROM + known_parts + WHERE + part NOT IN (SELECT part FROM parts)" + )?; let m = stmt.query_and_then(&[], |row| -> Result<(String, Partition)> { Ok((row.get_checked(0)?, Partition::new(row.get_checked(1)?, row.get_checked(2)?, row.get_checked(3)?, row.get_checked(4)?))) })?.collect(); @@ -989,35 +1050,6 @@ impl MentatStoring for rusqlite::Connection { } } -/// Update the current partition map materialized view. -// TODO: only update changed partitions. -pub fn update_partition_map(conn: &rusqlite::Connection, partition_map: &PartitionMap) -> Result<()> { - let values_per_statement = 2; - let max_vars = conn.limit(Limit::SQLITE_LIMIT_VARIABLE_NUMBER) as usize; - let max_partitions = max_vars / values_per_statement; - if partition_map.len() > max_partitions { - bail!(DbErrorKind::NotYetImplemented(format!("No more than {} partitions are supported", max_partitions))); - } - - // Like "UPDATE parts SET idx = CASE WHEN part = ? THEN ? WHEN part = ? THEN ? ELSE idx END". - let s = format!("UPDATE parts SET idx = CASE {} ELSE idx END", - repeat("WHEN part = ? THEN ?").take(partition_map.len()).join(" ")); - - // Lifetimes of temporary values make this building a slice of references annoying if we're - // using partition.next_entid() getter; instead, we peek into partition directly. - let params: Vec<&ToSql> = partition_map.iter().flat_map(|(name, partition)| { - once(name as &ToSql) - .chain(once(&partition.next_entid_to_allocate as &ToSql)) - }).collect(); - - // TODO: only cache the latest of these statements. Changing the set of partitions isn't - // supported in the Clojure implementation at all, and might not be supported in Mentat soon, - // so this is very low priority. - let mut stmt = conn.prepare_cached(s.as_str())?; - stmt.execute(¶ms[..]).context(DbErrorKind::FailedToUpdatePartitionMap)?; - Ok(()) -} - /// Extract metadata-related [e a typed_value added] datoms committed in the given transaction. pub fn committed_metadata_assertions(conn: &rusqlite::Connection, tx_id: Entid) -> Result> { let sql_stmt = format!(r#" diff --git a/db/src/lib.rs b/db/src/lib.rs index 27cd0943..a798bb05 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -63,6 +63,8 @@ pub use bootstrap::{ USER0, }; +pub static TIMELINE_MAIN: i64 = 0; + pub use schema::{ AttributeBuilder, AttributeValidation, diff --git a/db/src/tx.rs b/db/src/tx.rs index 64988176..bda684c7 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -818,7 +818,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } - db::update_partition_map(self.store, &self.partition_map)?; self.watcher.done(&self.tx_id, self.schema)?; if tx_might_update_metadata {