Atomic multi-tx (#489). r=emily,nalexander

* Pre: rename begin_transaction to begin_tx_application.

* Take an EXCLUSIVE transaction when bootstrapping, and an IMMEDIATE transaction when writing.

This avoids the remote possibility of another write sneaking in the door
while we're preparing to write, avoids us needing to upgrade locks, etc.

  After a BEGIN IMMEDIATE, no other database connection will be able to write
  to the database or do a BEGIN IMMEDIATE or BEGIN EXCLUSIVE. Other processes
  can continue to read from the database, however.

  An exclusive transaction causes EXCLUSIVE locks to be acquired on all
  databases. After a BEGIN EXCLUSIVE, no other database connection except for
  read_uncommitted connections will be able to read the database and no other
  connection without exception will be able to write the database until the
  transaction is complete.

* Hacky implementation of atomic multi-tx.

* Hold the last report, returning the InProgress from each operation.

* Rewrite transact in terms of InProgress.

* Test rollback.

* Remove unused imports.

* Don't use Rc for transaction reports.

* Pre: break out USER0 as a part boundary constant.

* Export TX0 and USER0 from mentat_db. This is for testing.

* Review comments: commenting.

* Test tempid allocation and rollback.
This commit is contained in:
Richard Newman 2017-12-05 07:58:24 -08:00 committed by GitHub
parent 1b72f5bbb6
commit 95b9c7f7f5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 229 additions and 42 deletions

View file

@ -60,6 +60,9 @@ path = "query-sql"
[dependencies.mentat_query_translator]
path = "query-translator"
[dependencies.mentat_tx]
path = "tx"
[dependencies.mentat_tx_parser]
path = "tx-parser"

View file

@ -32,6 +32,9 @@ use types::{Partition, PartitionMap};
/// This is the start of the :db.part/tx partition.
pub const TX0: i64 = 0x10000000;
/// This is the start of the :db.part/user partition.
pub const USER0: i64 = 0x10000;
lazy_static! {
static ref V1_IDENTS: Vec<(symbols::NamespacedKeyword, i64)> = {
vec![(ns_keyword!("db", "ident"), entids::DB_IDENT),
@ -78,7 +81,7 @@ lazy_static! {
static ref V1_PARTS: Vec<(symbols::NamespacedKeyword, i64, i64)> = {
vec![(ns_keyword!("db.part", "db"), 0, (1 + V1_IDENTS.len()) as i64),
(ns_keyword!("db.part", "user"), 0x10000, 0x10000),
(ns_keyword!("db.part", "user"), USER0, USER0),
(ns_keyword!("db.part", "tx"), TX0, TX0),
]
};

View file

@ -21,8 +21,9 @@ use std::rc::Rc;
use itertools;
use itertools::Itertools;
use rusqlite;
use rusqlite::types::{ToSql, ToSqlOutput};
use rusqlite::TransactionBehavior;
use rusqlite::limits::Limit;
use rusqlite::types::{ToSql, ToSqlOutput};
use ::{repeat_values, to_namespaced_keyword};
use bootstrap;
@ -208,7 +209,7 @@ fn get_user_version(conn: &rusqlite::Connection) -> Result<i32> {
// TODO: rename "SQL" functions to align with "datoms" functions.
pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<DB> {
let tx = conn.transaction()?;
let tx = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?;
for statement in (&V1_STATEMENTS).iter() {
tx.execute(statement, &[])?;
@ -518,7 +519,7 @@ pub trait MentatStoring {
///
/// Use this to create temporary tables, prepare indices, set pragmas, etc, before the initial
/// `insert_non_fts_searches` invocation.
fn begin_transaction(&self) -> Result<()>;
fn begin_tx_application(&self) -> Result<()>;
// TODO: this is not a reasonable abstraction, but I don't want to really consider non-SQL storage just yet.
fn insert_non_fts_searches<'a>(&self, entities: &'a [ReducedEntity], search_type: SearchType) -> Result<()>;
@ -710,7 +711,7 @@ impl MentatStoring for rusqlite::Connection {
}
/// Create empty temporary tables for search parameters and search results.
fn begin_transaction(&self) -> Result<()> {
fn begin_tx_application(&self) -> Result<()> {
// We can't do this in one shot, since we can't prepare a batch statement.
let statements = [
r#"DROP TABLE IF EXISTS temp.exact_searches"#,
@ -1163,7 +1164,8 @@ mod tests {
let details = {
// The block scopes the borrow of self.sqlite.
let tx = self.sqlite.transaction()?;
// We're about to write, so go straight ahead and get an IMMEDIATE transaction.
let tx = self.sqlite.transaction_with_behavior(TransactionBehavior::Immediate)?;
// Applying the transaction can fail, so we don't unwrap.
let details = transact(&tx, self.partition_map.clone(), &self.schema, &self.schema, entities)?;
tx.commit()?;

View file

@ -48,6 +48,13 @@ mod internal_types;
mod upsert_resolution;
mod tx;
// Export these for reference from tests. cfg(test) should work, but doesn't.
// #[cfg(test)]
pub use bootstrap::{
TX0,
USER0,
};
pub use db::{
TypedSQLValue,
new_connection,

View file

@ -700,7 +700,7 @@ pub fn transact<'conn, 'a, I>(
let tx_instant = ::now(); // Label the transaction with the timestamp when we first see it: leading edge.
let tx_id = partition_map.allocate_entid(":db.part/tx");
conn.begin_transaction()?;
conn.begin_tx_application()?;
let mut tx = Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id, tx_instant);

View file

@ -27,11 +27,9 @@ use rusqlite::{
use mentat_core::{
SQLValueType,
SQLValueTypeSet,
TypedValue,
ValueType,
ValueTypeTag,
ValueTypeSet,
};
use mentat_db::{
@ -73,7 +71,7 @@ error_chain! {
}
}
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub enum QueryResults {
Scalar(Option<TypedValue>),
Tuple(Option<Vec<TypedValue>>),

View file

@ -10,7 +10,6 @@
use mentat_core::{
SQLValueType,
SQLValueTypeSet,
TypedValue,
ValueType,
};

View file

@ -13,6 +13,9 @@
use std::sync::{Arc, Mutex};
use rusqlite;
use rusqlite::{
TransactionBehavior,
};
use edn;
@ -27,6 +30,8 @@ use mentat_db::{
TxReport,
};
use mentat_tx;
use mentat_tx_parser;
use errors::*;
@ -78,6 +83,83 @@ pub struct Conn {
// the schema changes. #315.
}
/// Represents an in-progress, not yet committed, set of changes to the store.
/// Call `commit` to commit your changes, or `rollback` to discard them.
/// A transaction is held open until you do so.
/// Your changes will be implicitly dropped along with this struct.
pub struct InProgress<'a, 'c> {
transaction: rusqlite::Transaction<'c>,
mutex: &'a Mutex<Metadata>,
generation: u64,
partition_map: PartitionMap,
schema: Schema,
last_report: Option<TxReport>, // For now we track only the last, but we could accumulate all.
}
impl<'a, 'c> InProgress<'a, 'c> {
pub fn transact_entities<I>(mut self, entities: I) -> Result<InProgress<'a, 'c>> where I: IntoIterator<Item=mentat_tx::entities::Entity> {
let (report, next_partition_map, next_schema) = transact(&self.transaction, self.partition_map, &self.schema, &self.schema, entities)?;
self.partition_map = next_partition_map;
if let Some(schema) = next_schema {
self.schema = schema;
}
self.last_report = Some(report);
Ok(self)
}
/// Query the Mentat store, using the given connection and the current metadata.
pub fn q_once<T>(&self,
query: &str,
inputs: T) -> Result<QueryResults>
where T: Into<Option<QueryInputs>>
{
q_once(&*(self.transaction),
&self.schema,
query,
inputs)
}
pub fn transact(self, transaction: &str) -> Result<InProgress<'a, 'c>> {
let assertion_vector = edn::parse::value(transaction)?;
let entities = mentat_tx_parser::Tx::parse(&assertion_vector)?;
self.transact_entities(entities)
}
pub fn last_report(&self) -> Option<&TxReport> {
self.last_report.as_ref()
}
pub fn rollback(mut self) -> Result<()> {
self.last_report = None;
self.transaction.rollback().map_err(|e| e.into())
}
pub fn commit(self) -> Result<Option<TxReport>> {
// The mutex is taken during this entire method.
let mut metadata = self.mutex.lock().unwrap();
if self.generation != metadata.generation {
// Somebody else wrote!
// Retrying is tracked by https://github.com/mozilla/mentat/issues/357.
// This should not occur -- an attempt to take a competing IMMEDIATE transaction
// will fail with `SQLITE_BUSY`, causing this function to abort.
bail!("Lost the transact() race!");
}
// Commit the SQLite transaction while we hold the mutex.
self.transaction.commit()?;
metadata.generation += 1;
metadata.partition_map = self.partition_map;
if self.schema != *(metadata.schema) {
metadata.schema = Arc::new(self.schema);
}
Ok(self.last_report)
}
}
impl Conn {
// Intentionally not public.
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
@ -123,17 +205,8 @@ impl Conn {
inputs)
}
/// Transact entities against the Mentat store, using the given connection and the current
/// metadata.
pub fn transact(&mut self,
sqlite: &mut rusqlite::Connection,
transaction: &str) -> Result<TxReport> {
let assertion_vector = edn::parse::value(transaction)?;
let entities = mentat_tx_parser::Tx::parse(&assertion_vector)?;
let tx = sqlite.transaction()?;
pub fn begin_transaction<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result<InProgress<'m, 'conn>> {
let tx = sqlite.transaction_with_behavior(TransactionBehavior::Immediate)?;
let (current_generation, current_partition_map, current_schema) =
{
// The mutex is taken during this block.
@ -145,28 +218,32 @@ impl Conn {
current.schema.clone())
};
// The transaction is processed while the mutex is not held.
let (report, next_partition_map, next_schema) = transact(&tx, current_partition_map, &*current_schema, &*current_schema, entities)?;
Ok(InProgress {
mutex: &self.metadata,
transaction: tx,
generation: current_generation,
partition_map: current_partition_map,
schema: (*current_schema).clone(),
last_report: None,
})
}
{
// The mutex is taken during this block.
let mut metadata = self.metadata.lock().unwrap();
/// Transact entities against the Mentat store, using the given connection and the current
/// metadata.
pub fn transact(&mut self,
sqlite: &mut rusqlite::Connection,
transaction: &str) -> Result<TxReport> {
// Parse outside the SQL transaction. This is a tradeoff: we are limiting the scope of the
// transaction, and indeed we don't even create a SQL transaction if the provided input is
// invalid, but it means SQLite errors won't be found until the parse is complete, and if
// there's a race for the database (don't do that!) we are less likely to win it.
let assertion_vector = edn::parse::value(transaction)?;
let entities = mentat_tx_parser::Tx::parse(&assertion_vector)?;
if current_generation != metadata.generation {
// Somebody else wrote!
// Retrying is tracked by https://github.com/mozilla/mentat/issues/357.
bail!("Lost the transact() race!");
}
// Commit the SQLite transaction while we hold the mutex.
tx.commit()?;
metadata.generation += 1;
metadata.partition_map = next_partition_map;
if let Some(next_schema) = next_schema {
metadata.schema = Arc::new(next_schema);
}
}
let report = self.begin_transaction(sqlite)?
.transact_entities(entities)?
.commit()?
.expect("we always get a report");
Ok(report)
}
@ -177,6 +254,11 @@ mod tests {
use super::*;
extern crate mentat_parser_utils;
use mentat_core::{
TypedValue,
};
use mentat_db::USER0;
#[test]
fn test_transact_does_not_collide_existing_entids() {
@ -232,6 +314,98 @@ mod tests {
assert_eq!(report.tempids["temp"], next);
}
/// Return the entid that will be allocated to the next transacted tempid.
fn get_next_entid(conn: &Conn) -> i64 {
let partition_map = &conn.metadata.lock().unwrap().partition_map;
partition_map.get(":db.part/user").unwrap().index
}
#[test]
fn test_compound_transact() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let tempid_offset = get_next_entid(&conn);
let t = "[[:db/add \"one\" :db/ident :a/keyword1] \
[:db/add \"two\" :db/ident :a/keyword2]]";
// This can refer to `t`, 'cos they occur in separate txes.
let t2 = "[{:db.schema/attribute \"three\", :db/ident :a/keyword1}]";
// Scoped borrow of `conn`.
{
let in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully");
let in_progress = in_progress.transact(t).expect("transacted successfully");
let one = in_progress.last_report().unwrap().tempids.get("one").expect("found one").clone();
let two = in_progress.last_report().unwrap().tempids.get("two").expect("found two").clone();
assert!(one != two);
assert!(one == tempid_offset || one == tempid_offset + 1);
assert!(two == tempid_offset || two == tempid_offset + 1);
let during = in_progress.q_once("[:find ?x . :where [?x :db/ident :a/keyword1]]", None)
.expect("query succeeded");
assert_eq!(during, QueryResults::Scalar(Some(TypedValue::Ref(one))));
let report = in_progress.transact(t2)
.expect("t2 succeeded")
.commit()
.expect("commit succeeded");
let three = report.unwrap().tempids.get("three").expect("found three").clone();
assert!(one != three);
assert!(two != three);
}
// The DB part table changed.
let tempid_offset_after = get_next_entid(&conn);
assert_eq!(tempid_offset + 3, tempid_offset_after);
}
#[test]
fn test_compound_rollback() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let tempid_offset = get_next_entid(&conn);
// Nothing in the store => USER0 should be our starting point.
assert_eq!(tempid_offset, USER0);
let t = "[[:db/add \"one\" :db/ident :a/keyword1] \
[:db/add \"two\" :db/ident :a/keyword2]]";
// Scoped borrow of `sqlite`.
{
let in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully");
let in_progress = in_progress.transact(t).expect("transacted successfully");
let one = in_progress.last_report().unwrap().tempids.get("one").expect("found it").clone();
let two = in_progress.last_report().unwrap().tempids.get("two").expect("found it").clone();
// The IDs are contiguous, starting at the previous part index.
assert!(one != two);
assert!(one == tempid_offset || one == tempid_offset + 1);
assert!(two == tempid_offset || two == tempid_offset + 1);
// Inside the InProgress we can see our changes.
let during = in_progress.q_once("[:find ?x . :where [?x :db/ident :a/keyword1]]", None)
.expect("query succeeded");
assert_eq!(during, QueryResults::Scalar(Some(TypedValue::Ref(one))));
in_progress.rollback()
.expect("rollback succeeded");
}
let after = conn.q_once(&mut sqlite, "[:find ?x . :where [?x :db/ident :a/keyword1]]", None)
.expect("query succeeded");
assert_eq!(after, QueryResults::Scalar(None));
// The DB part table is unchanged.
let tempid_offset_after = get_next_entid(&conn);
assert_eq!(tempid_offset, tempid_offset_after);
}
#[test]
fn test_transact_errors() {
let mut sqlite = db::new_connection("").unwrap();

View file

@ -22,6 +22,7 @@ extern crate mentat_query_parser;
extern crate mentat_query_projector;
extern crate mentat_query_translator;
extern crate mentat_sql;
extern crate mentat_tx;
extern crate mentat_tx_parser;
use rusqlite::Connection;