Part 3: Use a view to derive parts table

Being able to derive partition map from partition definitions and current
state of the world (transactions), segmented by timelines, is useful
because it lets us not worry about keeping materialized partition maps
up-to-date - since there's no need for materialized partition maps at that point.

This comes in very handy when we start moving chunks of transactions off of our mainline.
Alternative to this work would look like materializing partition maps per timeline,
growing support for incremental "backwards update" of the materialized maps, etc.

Our core partitions are defined in 'known_parts' table during bootstrap,
and what used to be 'parts' table is a generated view that operates over
transactions to figure out partition index.

'parts' is defined for the main timeline. Querying parts for other timelines
or for particular timeline+tx combinations will look similar.
This commit is contained in:
Grisha Kruglov 2018-07-17 17:54:13 -07:00 committed by Grisha Kruglov
parent 3ca5255cde
commit 4ec780c87a
3 changed files with 67 additions and 34 deletions

View file

@ -245,8 +245,9 @@ lazy_static! {
r#"CREATE INDEX idx_idents_unique ON idents (e, a, v, value_type_tag)"#, r#"CREATE INDEX idx_idents_unique ON idents (e, a, v, value_type_tag)"#,
r#"CREATE TABLE schema (e INTEGER NOT NULL, a SMALLINT NOT NULL, v BLOB NOT NULL, value_type_tag SMALLINT NOT NULL)"#, r#"CREATE TABLE schema (e INTEGER NOT NULL, a SMALLINT NOT NULL, v BLOB NOT NULL, value_type_tag SMALLINT NOT NULL)"#,
r#"CREATE INDEX idx_schema_unique ON schema (e, a, v, value_type_tag)"#, r#"CREATE INDEX idx_schema_unique ON schema (e, a, v, value_type_tag)"#,
// TODO: store entid instead of ident for partition name. // TODO: store entid instead of ident for partition name.
r#"CREATE TABLE parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, end INTEGER NOT NULL, idx INTEGER NOT NULL, allow_excision SMALLINT NOT NULL)"#, r#"CREATE TABLE known_parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, end INTEGER NOT NULL, allow_excision SMALLINT NOT NULL)"#,
] ]
}; };
} }
@ -288,6 +289,35 @@ pub fn create_empty_current_version(conn: &mut rusqlite::Connection) -> Result<(
Ok((tx, DB::new(bootstrap_partition_map, bootstrap_schema))) Ok((tx, DB::new(bootstrap_partition_map, bootstrap_schema)))
} }
/// Creates a partition map view for the main timeline based on partitions
/// defined in 'known_parts'.
fn create_current_partition_view(conn: &rusqlite::Connection) -> Result<()> {
let mut stmt = conn.prepare("SELECT part, end FROM known_parts ORDER BY end ASC")?;
let known_parts: Result<Vec<(String, i64)>> = stmt.query_and_then(&[], |row| {
Ok((
row.get_checked(0)?,
row.get_checked(1)?,
))
})?.collect();
let mut case = vec![];
for &(ref part, ref end) in known_parts?.iter() {
case.push(format!(r#"WHEN e <= {} THEN "{}""#, end, part));
}
let view_stmt = format!("CREATE VIEW parts AS
SELECT
CASE {} END AS part,
min(e) AS start,
max(e) + 1 AS idx
FROM timelined_transactions WHERE timeline = {} GROUP BY part",
case.join(" "), ::TIMELINE_MAIN
);
conn.execute(&view_stmt, &[])?;
Ok(())
}
// TODO: rename "SQL" functions to align with "datoms" functions. // TODO: rename "SQL" functions to align with "datoms" functions.
pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<DB> { pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<DB> {
let (tx, mut db) = create_empty_current_version(conn)?; let (tx, mut db) = create_empty_current_version(conn)?;
@ -298,9 +328,11 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<DB> {
// This is necessary: `transact` will only UPDATE parts, not INSERT them if they're missing. // This is necessary: `transact` will only UPDATE parts, not INSERT them if they're missing.
for (part, partition) in db.partition_map.iter() { for (part, partition) in db.partition_map.iter() {
// TODO: Convert "keyword" part to SQL using Value conversion. // TODO: Convert "keyword" part to SQL using Value conversion.
tx.execute("INSERT INTO parts (part, start, end, idx, allow_excision) VALUES (?, ?, ?, ?, ?)", &[part, &partition.start, &partition.end, &partition.next_entid(), &partition.allow_excision])?; tx.execute("INSERT INTO known_parts (part, start, end, allow_excision) VALUES (?, ?, ?, ?)", &[part, &partition.start, &partition.end, &partition.allow_excision])?;
} }
create_current_partition_view(&tx)?;
// TODO: return to transact_internal to self-manage the encompassing SQLite transaction. // TODO: return to transact_internal to self-manage the encompassing SQLite transaction.
let bootstrap_schema_for_mutation = Schema::default(); // The bootstrap transaction will populate this schema. let bootstrap_schema_for_mutation = Schema::default(); // The bootstrap transaction will populate this schema.
@ -435,8 +467,37 @@ pub(crate) fn read_materialized_view(conn: &rusqlite::Connection, table: &str) -
} }
/// Read the partition map materialized view from the given SQL store. /// Read the partition map materialized view from the given SQL store.
fn read_partition_map(conn: &rusqlite::Connection) -> Result<PartitionMap> { pub(crate) fn read_partition_map(conn: &rusqlite::Connection) -> Result<PartitionMap> {
let mut stmt: rusqlite::Statement = conn.prepare("SELECT part, start, end, idx, allow_excision FROM parts")?; // An obviously expensive query, but we only need to run it once.
// First part of the union sprinkles 'allow_excision' into the 'parts' view.
// Second part of the union takes care of partitions which are known
// but don't have any transactions.
let mut stmt: rusqlite::Statement = conn.prepare("
SELECT
known_parts.part,
known_parts.start,
known_parts.end,
parts.idx,
known_parts.allow_excision
FROM
parts
INNER JOIN
known_parts
ON parts.part = known_parts.part
UNION
SELECT
part,
start,
end,
start,
allow_excision
FROM
known_parts
WHERE
part NOT IN (SELECT part FROM parts)"
)?;
let m = stmt.query_and_then(&[], |row| -> Result<(String, Partition)> { let m = stmt.query_and_then(&[], |row| -> Result<(String, Partition)> {
Ok((row.get_checked(0)?, Partition::new(row.get_checked(1)?, row.get_checked(2)?, row.get_checked(3)?, row.get_checked(4)?))) Ok((row.get_checked(0)?, Partition::new(row.get_checked(1)?, row.get_checked(2)?, row.get_checked(3)?, row.get_checked(4)?)))
})?.collect(); })?.collect();
@ -989,35 +1050,6 @@ impl MentatStoring for rusqlite::Connection {
} }
} }
/// Update the current partition map materialized view.
// TODO: only update changed partitions.
pub fn update_partition_map(conn: &rusqlite::Connection, partition_map: &PartitionMap) -> Result<()> {
let values_per_statement = 2;
let max_vars = conn.limit(Limit::SQLITE_LIMIT_VARIABLE_NUMBER) as usize;
let max_partitions = max_vars / values_per_statement;
if partition_map.len() > max_partitions {
bail!(DbErrorKind::NotYetImplemented(format!("No more than {} partitions are supported", max_partitions)));
}
// Like "UPDATE parts SET idx = CASE WHEN part = ? THEN ? WHEN part = ? THEN ? ELSE idx END".
let s = format!("UPDATE parts SET idx = CASE {} ELSE idx END",
repeat("WHEN part = ? THEN ?").take(partition_map.len()).join(" "));
// Lifetimes of temporary values make this building a slice of references annoying if we're
// using partition.next_entid() getter; instead, we peek into partition directly.
let params: Vec<&ToSql> = partition_map.iter().flat_map(|(name, partition)| {
once(name as &ToSql)
.chain(once(&partition.next_entid_to_allocate as &ToSql))
}).collect();
// TODO: only cache the latest of these statements. Changing the set of partitions isn't
// supported in the Clojure implementation at all, and might not be supported in Mentat soon,
// so this is very low priority.
let mut stmt = conn.prepare_cached(s.as_str())?;
stmt.execute(&params[..]).context(DbErrorKind::FailedToUpdatePartitionMap)?;
Ok(())
}
/// Extract metadata-related [e a typed_value added] datoms committed in the given transaction. /// Extract metadata-related [e a typed_value added] datoms committed in the given transaction.
pub fn committed_metadata_assertions(conn: &rusqlite::Connection, tx_id: Entid) -> Result<Vec<(Entid, Entid, TypedValue, bool)>> { pub fn committed_metadata_assertions(conn: &rusqlite::Connection, tx_id: Entid) -> Result<Vec<(Entid, Entid, TypedValue, bool)>> {
let sql_stmt = format!(r#" let sql_stmt = format!(r#"

View file

@ -63,6 +63,8 @@ pub use bootstrap::{
USER0, USER0,
}; };
pub static TIMELINE_MAIN: i64 = 0;
pub use schema::{ pub use schema::{
AttributeBuilder, AttributeBuilder,
AttributeValidation, AttributeValidation,

View file

@ -818,7 +818,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
} }
db::update_partition_map(self.store, &self.partition_map)?;
self.watcher.done(&self.tx_id, self.schema)?; self.watcher.done(&self.tx_id, self.schema)?;
if tx_might_update_metadata { if tx_might_update_metadata {