
1798 lines
71 KiB
Raw Normal View History

// Copyright 2016 Mozilla
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::borrow::{
use std::collections::{
use std::fs::{
use std::io::{
use std::path::{
use std::sync::{
use rusqlite;
use rusqlite::{
use edn;
use mentat_core::{
use mentat_core::intern_set::InternSet;
use mentat_db::cache::{
use mentat_db::db;
use mentat_db::{
use mentat_db::internal_types::TermWithTempIds;
use mentat_query_pull::{
use mentat_tx;
use mentat_tx::entities::{
use mentat_tolstoy::Syncer;
use uuid::Uuid;
use entity_builder::{
use errors::*;
use query::{
/// Connection metadata required to query from, or apply transactions to, a Mentat store.
/// Owned data for the volatile parts (generation and partition map), and `Arc` for the infrequently
/// changing parts (schema) that we want to share across threads.
/// See (NOTE(review): the link target is missing from this copy of the file -- restore it).
pub struct Metadata {
/// Bumped by one each time `InProgress::commit` succeeds; an `InProgress` compares its own
/// snapshot against this value to detect a competing writer.
pub generation: u64,
/// Replaced wholesale with the committing `InProgress`'s partition map.
pub partition_map: PartitionMap,
/// Swapped for a fresh `Arc` on commit only when the transaction's schema differs.
pub schema: Arc<Schema>,
/// Receives the committing transaction's cache changes via `commit_to`.
pub attribute_cache: SQLiteAttributeCache,
impl Metadata {
// Intentionally not public.
/// Assemble a `Metadata` from its four parts. Each argument is stored unchanged in the
/// corresponding field (`cache` becomes `attribute_cache`).
fn new(generation: u64, partition_map: PartitionMap, schema: Arc<Schema>, cache: SQLiteAttributeCache) -> Metadata {
Metadata {
generation: generation,
partition_map: partition_map,
schema: schema,
attribute_cache: cache,
/// A mutable, safe reference to the current Mentat store.
pub struct Conn {
/// `Mutex` since all reads and writes need to be exclusive. Internally, owned data for the
/// volatile parts (generation and partition map), and `Arc` for the infrequently changing parts
/// (schema, cache) that we want to share across threads. A consuming thread may use a shared
/// reference after the `Conn`'s `Metadata` has moved on.
/// The motivating case is multiple query threads taking references to the current schema to
/// perform long-running queries while a single writer thread moves the metadata -- partition
/// map and schema -- forward.
/// We want the attribute cache to be isolated across transactions, updated within
/// `InProgress` writes, and updated in the `Conn` on commit. To achieve this we
/// store the cache itself in an `Arc` inside `SQLiteAttributeCache`, so that `.get_mut()`
/// gives us copy-on-write semantics.
/// We store that cached `Arc` here in a `Mutex`, so that the main copy can be carefully
/// replaced on commit.
metadata: Mutex<Metadata>,
// TODO: maintain set of change listeners or handles to transaction report queues. #298.
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
// the schema changes. #315.
/// Observer registry. `register_observer` locks it to add an observer, and every
/// `InProgress` created from this `Conn` borrows it.
tx_observer_service: Mutex<TxObservationService>,
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
/// for applications that don't require complex connection management.
pub struct Store {
/// Mentat-level state (metadata and observers) for the store.
conn: Conn,
/// The SQLite handle that `Store` passes to `conn` for every operation.
sqlite: rusqlite::Connection,
impl Store {
/// Open a store at the supplied path, ensuring that it includes the bootstrap schema.
pub fn open(path: &str) -> Result<Store> {
// Open the SQLite connection first, then let `Conn::connect` prepare it.
let mut connection = ::new_connection(path)?;
let conn = Conn::connect(&mut connection)?;
Ok(Store {
conn: conn,
sqlite: connection,
/// Returns a totally blank store with no bootstrap schema. Use `open` instead.
pub fn open_empty(path: &str) -> Result<Store> {
// Only a non-empty path is checked for existence on disk.
// NOTE(review): the body of this check is missing from this copy of the file; presumably it
// refuses a path that already exists -- confirm.
if !path.is_empty() {
if Path::new(path).exists() {
let mut connection = ::new_connection(path)?;
let conn = Conn::empty(&mut connection)?;
Ok(Store {
conn: conn,
sqlite: connection,
/// Transact the EDN text `transaction` in a fresh transaction on this store.
/// NOTE(review): the lines after `ip.transact` (presumably commit and return of the report)
/// are not visible in this copy of the file.
pub fn transact(&mut self, transaction: &str) -> Result<TxReport> {
let mut ip = self.begin_transaction()?;
let report = ip.transact(transaction)?;
/// Query and attribute-lookup operations. Implemented in this file by `Store`, `InProgress`
/// and `InProgressRead`. Every query method accepts anything convertible into
/// `Option<QueryInputs>`, so callers may pass `None` when a query has no inputs.
pub trait Queryable {
/// Produce a `QueryExplanation` for `query` with the given `inputs`.
fn q_explain<T>(&self, query: &str, inputs: T) -> Result<QueryExplanation>
where T: Into<Option<QueryInputs>>;
/// Run `query` once with the given `inputs` and return its output.
fn q_once<T>(&self, query: &str, inputs: T) -> Result<QueryOutput>
where T: Into<Option<QueryInputs>>;
/// Prepare `query` with the given `inputs`, returning a `PreparedResult`.
fn q_prepare<T>(&self, query: &str, inputs: T) -> PreparedResult
where T: Into<Option<QueryInputs>>;
/// Look up the values of `attribute` on `entity`, returned as a `Vec`.
fn lookup_values_for_attribute<E>(&self, entity: E, attribute: &edn::Keyword) -> Result<Vec<TypedValue>>
where E: Into<Entid>;
/// Look up a single optional value of `attribute` on `entity`.
fn lookup_value_for_attribute<E>(&self, entity: E, attribute: &edn::Keyword) -> Result<Option<TypedValue>>
where E: Into<Entid>;
/// Pull operations: fetch a set of attributes for one or many entities. Implemented in this
/// file by `Store`, `InProgress` and `InProgressRead`.
pub trait Pullable {
/// Pull `attributes` for each of `entities`, returning a map keyed by entid.
fn pull_attributes_for_entities<E, A>(&self, entities: E, attributes: A) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
where E: IntoIterator<Item=Entid>,
A: IntoIterator<Item=Entid>;
/// Pull `attributes` for the single `entity`.
fn pull_attributes_for_entity<A>(&self, entity: Entid, attributes: A) -> Result<StructuredMap>
where A: IntoIterator<Item=Entid>;
/// Synchronisation entry point. The only implementation in this file is for `Store`.
pub trait Syncable {
/// Sync against `server_uri` on behalf of `user_uuid`. The `Store` implementation parses
/// `user_uuid` as a `Uuid` and fails if it is not valid.
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
/// Represents an in-progress, not yet committed, set of changes to the store.
/// Call `commit` to commit your changes, or `rollback` to discard them.
/// A transaction is held open until you do so.
/// Your changes will be implicitly dropped along with this struct.
pub struct InProgress<'a, 'c> {
/// The open SQLite transaction that all reads and writes go through.
transaction: rusqlite::Transaction<'c>,
/// The owning `Conn`'s metadata mutex; locked for the whole of `commit`.
mutex: &'a Mutex<Metadata>,
/// Generation recorded when the transaction began; `commit` bails if the `Conn` has moved on.
generation: u64,
/// Working partition map, replaced after each successful transact call.
partition_map: PartitionMap,
/// Working schema, cloned from the `Conn`'s schema at begin and replaced when a transact
/// call returns a new one.
schema: Schema,
/// Transaction-local attribute cache, pushed into the `Conn`'s cache by `commit`.
cache: InProgressSQLiteAttributeCache,
/// Toggled by `use_caching()` and consulted by `q_once`; starts out `true`.
use_caching: bool,
/// The owning `Conn`'s observer service.
tx_observer: &'a Mutex<TxObservationService>,
/// Watcher fed by every transact call; its `txes` are read by `commit`.
tx_observer_watcher: InProgressObserverTransactWatcher,
/// Represents an in-progress set of reads to the store. Just like `InProgress`,
/// which is read-write, but only allows for reads.
/// Implemented as a newtype around `InProgress`; the trait impls in this file forward to the
/// wrapped value.
pub struct InProgressRead<'a, 'c>(InProgress<'a, 'c>);
/// Pure delegation: every method forwards its arguments to the wrapped `InProgress`.
impl<'a, 'c> Queryable for InProgressRead<'a, 'c> {
fn q_once<T>(&self, query: &str, inputs: T) -> Result<QueryOutput>
where T: Into<Option<QueryInputs>> {
self.0.q_once(query, inputs)
fn q_prepare<T>(&self, query: &str, inputs: T) -> PreparedResult
where T: Into<Option<QueryInputs>> {
self.0.q_prepare(query, inputs)
fn q_explain<T>(&self, query: &str, inputs: T) -> Result<QueryExplanation>
where T: Into<Option<QueryInputs>> {
self.0.q_explain(query, inputs)
fn lookup_values_for_attribute<E>(&self, entity: E, attribute: &edn::Keyword) -> Result<Vec<TypedValue>>
where E: Into<Entid> {
self.0.lookup_values_for_attribute(entity, attribute)
fn lookup_value_for_attribute<E>(&self, entity: E, attribute: &edn::Keyword) -> Result<Option<TypedValue>>
where E: Into<Entid> {
self.0.lookup_value_for_attribute(entity, attribute)
/// Pure delegation: both pull methods forward to the wrapped `InProgress`.
impl<'a, 'c> Pullable for InProgressRead<'a, 'c> {
fn pull_attributes_for_entities<E, A>(&self, entities: E, attributes: A) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
where E: IntoIterator<Item=Entid>,
A: IntoIterator<Item=Entid> {
self.0.pull_attributes_for_entities(entities, attributes)
fn pull_attributes_for_entity<A>(&self, entity: Entid, attributes: A) -> Result<StructuredMap>
where A: IntoIterator<Item=Entid> {
self.0.pull_attributes_for_entity(entity, attributes)
/// Queries and lookups run against the open SQLite transaction, using the transaction's
/// working schema and its transaction-local attribute cache.
impl<'a, 'c> Queryable for InProgress<'a, 'c> {
/// Run `query` once. The `use_caching` flag selects between two code paths; the visible one
/// builds a `Known` from the working schema and cache.
/// NOTE(review): the query calls in both branches are missing from this copy of the file.
fn q_once<T>(&self, query: &str, inputs: T) -> Result<QueryOutput>
where T: Into<Option<QueryInputs>> {
if self.use_caching {
let known = Known::new(&self.schema, Some(&self.cache));
} else {
// NOTE(review): the call that consumes `known` is not visible here.
fn q_prepare<T>(&self, query: &str, inputs: T) -> PreparedResult
where T: Into<Option<QueryInputs>> {
let known = Known::new(&self.schema, Some(&self.cache));
// NOTE(review): the call that consumes `known` is not visible here.
fn q_explain<T>(&self, query: &str, inputs: T) -> Result<QueryExplanation>
where T: Into<Option<QueryInputs>> {
let known = Known::new(&self.schema, Some(&self.cache));
fn lookup_values_for_attribute<E>(&self, entity: E, attribute: &edn::Keyword) -> Result<Vec<TypedValue>>
where E: Into<Entid> {
let known = Known::new(&self.schema, Some(&self.cache));
// `&*(self.transaction)` derefs the transaction to the connection handle the free function expects.
lookup_values_for_attribute(&*(self.transaction), known, entity, attribute)
fn lookup_value_for_attribute<E>(&self, entity: E, attribute: &edn::Keyword) -> Result<Option<TypedValue>>
where E: Into<Entid> {
let known = Known::new(&self.schema, Some(&self.cache));
lookup_value_for_attribute(&*(self.transaction), known, entity, attribute)
/// Pulls run against the open transaction with the working schema. Errors from the free
/// pull functions are converted into this crate's error type with `into()`.
impl<'a, 'c> Pullable for InProgress<'a, 'c> {
fn pull_attributes_for_entities<E, A>(&self, entities: E, attributes: A) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
where E: IntoIterator<Item=Entid>,
A: IntoIterator<Item=Entid> {
pull_attributes_for_entities(&self.schema, &*(self.transaction), entities, attributes)
.map_err(|e| e.into())
fn pull_attributes_for_entity<A>(&self, entity: Entid, attributes: A) -> Result<StructuredMap>
where A: IntoIterator<Item=Entid> {
pull_attributes_for_entity(&self.schema, &*(self.transaction), entity, attributes)
.map_err(|e| e.into())
/// Schema lookups for a read-only transaction.
/// NOTE(review): every method body is missing from this copy of the file; presumably each one
/// forwards to the wrapped `InProgress` -- confirm against the full source.
impl<'a, 'c> HasSchema for InProgressRead<'a, 'c> {
fn entid_for_type(&self, t: ValueType) -> Option<KnownEntid> {
fn get_ident<T>(&self, x: T) -> Option<&Keyword> where T: Into<Entid> {
fn get_entid(&self, x: &Keyword) -> Option<KnownEntid> {
fn attribute_for_entid<T>(&self, x: T) -> Option<&Attribute> where T: Into<Entid> {
fn attribute_for_ident(&self, ident: &Keyword) -> Option<(&Attribute, KnownEntid)> {
/// Return true if the provided entid identifies an attribute in this schema.
fn is_attribute<T>(&self, x: T) -> bool where T: Into<Entid> {
/// Return true if the provided ident identifies an attribute in this schema.
fn identifies_attribute(&self, x: &Keyword) -> bool {
fn component_attributes(&self) -> &[Entid] {
/// Schema lookups for a read-write transaction.
/// NOTE(review): every method body is missing from this copy of the file; presumably each one
/// consults the transaction's working `schema` -- confirm against the full source.
impl<'a, 'c> HasSchema for InProgress<'a, 'c> {
fn entid_for_type(&self, t: ValueType) -> Option<KnownEntid> {
fn get_ident<T>(&self, x: T) -> Option<&Keyword> where T: Into<Entid> {
fn get_entid(&self, x: &Keyword) -> Option<KnownEntid> {
fn attribute_for_entid<T>(&self, x: T) -> Option<&Attribute> where T: Into<Entid> {
fn attribute_for_ident(&self, ident: &Keyword) -> Option<(&Attribute, KnownEntid)> {
/// Return true if the provided entid identifies an attribute in this schema.
fn is_attribute<T>(&self, x: T) -> bool where T: Into<Entid> {
/// Return true if the provided ident identifies an attribute in this schema.
fn identifies_attribute(&self, x: &Keyword) -> bool {
fn component_attributes(&self) -> &[Entid] {
/// Write-side operations on an open transaction: transacting, importing, caching attributes,
/// and finishing with `commit` or `rollback`.
impl<'a, 'c> InProgress<'a, 'c> {
/// Consume this transaction and wrap it in an `InProgressBuilder`.
/// NOTE(review): the body is not visible in this copy of the file.
pub fn builder(self) -> InProgressBuilder<'a, 'c> {
/// Choose whether to use in-memory caches for running queries.
pub fn use_caching(&mut self, yesno: bool) {
self.use_caching = yesno;
/// If you only have a reference to an `InProgress`, you can't use the easy builder.
/// This exists so you can make your own.
pub fn transact_builder(&mut self, builder: TermBuilder) -> Result<TxReport> {
// NOTE(review): the expression that yields `(terms, tempid_set)` from `builder` is missing
// from this copy of the file.
.and_then(|(terms, tempid_set)| {
self.transact_terms(terms, tempid_set)
/// Transact already-built terms. On success the working partition map is replaced, and the
/// working schema too when the transactor returns a new one.
pub fn transact_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport> where I: IntoIterator<Item=TermWithTempIds> {
// Combine the observer watcher with a cache watcher so one transact call feeds both.
let w = InProgressTransactWatcher::new(
&mut self.tx_observer_watcher,
let (report, next_partition_map, next_schema, _watcher) =
self.partition_map = next_partition_map;
// `next_schema` is `None` when the transaction did not alter the schema.
if let Some(schema) = next_schema {
self.schema = schema;
/// Transact parsed entities. Follows the same pattern as `transact_terms`: state is only
/// advanced after the transactor call has returned its results.
pub fn transact_entities<I>(&mut self, entities: I) -> Result<TxReport> where I: IntoIterator<Item=mentat_tx::entities::Entity> {
// We clone the partition map here, rather than trying to use a Cell or using a mutable
// reference, for two reasons:
// 1. `transact` allocates new IDs in partitions before and while doing work that might
// fail! We don't want to mutate this map on failure, so we can't just use &mut.
// 2. Even if we could roll that back, we end up putting this `PartitionMap` into our
// `Metadata` on return. If we used `Cell` or other mechanisms, we'd be using
// `Default::default` in those situations to extract the partition map, and so there
// would still be some cost.
let w = InProgressTransactWatcher::new(
&mut self.tx_observer_watcher,
let (report, next_partition_map, next_schema, _watcher) =
self.partition_map = next_partition_map;
if let Some(schema) = next_schema {
self.schema = schema;
/// Parse `transaction` as EDN entities; a parse failure is returned via `?`.
/// NOTE(review): the follow-on call (presumably `transact_entities`) is not visible here.
pub fn transact<B>(&mut self, transaction: B) -> Result<TxReport> where B: Borrow<str> {
let entities = edn::parse::entities(transaction.borrow())?;
/// Read the whole file at `path` into a string. I/O errors from opening or reading the
/// file are returned via `?`.
/// NOTE(review): the step that transacts the text is not visible in this copy of the file.
pub fn import<P>(&mut self, path: P) -> Result<TxReport>
where P: AsRef<Path> {
let mut file = File::open(path)?;
let mut text: String = String::new();
file.read_to_string(&mut text)?;
/// Roll back the underlying SQLite transaction, converting its error into this crate's
/// error type.
pub fn rollback(self) -> Result<()> {
self.transaction.rollback().map_err(|e| e.into())
/// Publish this transaction's state into the `Conn`'s metadata. Fails with
/// "Lost the transact() race!" if the `Conn`'s generation no longer matches the one recorded
/// when this transaction began.
pub fn commit(self) -> Result<()> {
// The mutex is taken during this entire method.
let mut metadata = self.mutex.lock().unwrap();
if self.generation != metadata.generation {
// Somebody else wrote!
// Retrying is tracked by
// (NOTE(review): the issue link is missing from this copy of the file.)
// This should not occur -- an attempt to take a competing IMMEDIATE transaction
// will fail with `SQLITE_BUSY`, causing this function to abort.
bail!("Lost the transact() race!");
// Commit the SQLite transaction while we hold the mutex.
// NOTE(review): the commit call itself is not visible in this copy of the file.
metadata.generation += 1;
metadata.partition_map = self.partition_map;
// Update the conn's cache if we made any changes.
self.cache.commit_to(&mut metadata.attribute_cache);
// Only allocate a new `Arc` when the schema actually changed.
if self.schema != *(metadata.schema) {
metadata.schema = Arc::new(self.schema);
// TODO: rebuild vocabularies and notify consumers that they've changed -- it's possible
// that a change has arrived over the wire and invalidated some local module.
// TODO: consider making vocabulary lookup lazy -- we won't need it much of the time.
// NOTE(review): what is done with `txes` is not visible here.
let txes = self.tx_observer_watcher.txes;
/// Register or deregister `attribute` in this transaction's attribute cache, in the given
/// direction. Fails with `ErrorKind::UnknownAttribute` when `attribute` cannot be resolved
/// against the working schema.
pub fn cache(&mut self,
attribute: &Keyword,
cache_direction: CacheDirection,
cache_action: CacheAction) -> Result<()> {
// NOTE(review): the schema lookup call between these two lines is missing from this copy.
let attribute_entid: Entid = self.schema
.ok_or_else(|| ErrorKind::UnknownAttribute(attribute.to_string()))?.1.into();
match cache_action {
CacheAction::Register => {
match cache_direction {
CacheDirection::Both => self.cache.register(&self.schema, &self.transaction, attribute_entid),
CacheDirection::Forward => self.cache.register_forward(&self.schema, &self.transaction, attribute_entid),
CacheDirection::Reverse => self.cache.register_reverse(&self.schema, &self.transaction, attribute_entid),
}.map_err(|e| e.into())
// NOTE(review): the deregister branch body is not visible in this copy of the file.
CacheAction::Deregister => {
/// A `TransactWatcher` that fans each event out to a cache watcher and an observer watcher,
/// and remembers the transaction id reported by `done`.
struct InProgressTransactWatcher<'a, 'o> {
/// Receives every datom and the `done` notification first.
cache_watcher: InProgressCacheTransactWatcher<'a>,
/// Borrowed from the `InProgress`; receives the same events after the cache watcher.
observer_watcher: &'o mut InProgressObserverTransactWatcher,
/// `None` until `done` has run successfully.
tx_id: Option<Entid>,
impl<'a, 'o> InProgressTransactWatcher<'a, 'o> {
/// Pair the two watchers; `tx_id` starts out as `None`.
fn new(observer_watcher: &'o mut InProgressObserverTransactWatcher, cache_watcher: InProgressCacheTransactWatcher<'a>) -> Self {
InProgressTransactWatcher {
cache_watcher: cache_watcher,
observer_watcher: observer_watcher,
tx_id: None,
impl<'a, 'o> TransactWatcher for InProgressTransactWatcher<'a, 'o> {
/// Forward one datom to the cache watcher, then to the observer watcher.
fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) {
self.cache_watcher.datom(op.clone(), e.clone(), a.clone(), v);
self.observer_watcher.datom(op.clone(), e.clone(), a.clone(), v);
/// Notify both watchers that transaction `t` is done. `tx_id` is recorded only if both
/// succeed; an error from either is returned immediately via `?`.
fn done(&mut self, t: &Entid, schema: &Schema) -> ::mentat_db::errors::Result<()> {
self.cache_watcher.done(t, schema)?;
self.observer_watcher.done(t, schema)?;
self.tx_id = Some(t.clone());
impl Store {
/// Intended for use from tests.
/// Hands out mutable access to the underlying SQLite connection.
pub fn sqlite_mut(&mut self) -> &mut rusqlite::Connection {
&mut self.sqlite
/// Report whether an observer is registered under `key`.
/// NOTE(review): the body is not visible in this copy of the file.
pub fn is_registered_as_observer(&self, key: &String) -> bool {
/// Convenience methods that pair the store's `Conn` with its own SQLite connection.
impl Store {
/// Take the store apart, returning the SQLite connection and the `Conn`.
pub fn dismantle(self) -> (rusqlite::Connection, Conn) {
(self.sqlite, self.conn)
/// Borrow the underlying `Conn`.
/// NOTE(review): the body is not visible in this copy of the file.
pub fn conn(&self) -> &Conn {
/// Begin a read-only transaction on the store's own connection.
pub fn begin_read<'m>(&'m mut self) -> Result<InProgressRead<'m, 'm>> {
self.conn.begin_read(&mut self.sqlite)
/// Begin a read-write transaction on the store's own connection.
pub fn begin_transaction<'m>(&'m mut self) -> Result<InProgress<'m, 'm>> {
self.conn.begin_transaction(&mut self.sqlite)
/// Cache `attr` in the given `direction`, using the `Conn`'s current schema.
/// NOTE(review): the remaining arguments of the `conn.cache` call are not visible here.
pub fn cache(&mut self, attr: &Keyword, direction: CacheDirection) -> Result<()> {
let schema = &self.conn.current_schema();
self.conn.cache(&mut self.sqlite,
/// Register `observer` under `key` with the `Conn`.
pub fn register_observer(&mut self, key: String, observer: Arc<TxObserver>) {
self.conn.register_observer(key, observer);
/// Remove the observer registered under `key`.
/// NOTE(review): the body is not visible in this copy of the file.
pub fn unregister_observer(&mut self, key: &String) {
/// Every method forwards to the matching `Conn` method, supplying the store's own SQLite
/// connection. The lookup methods convert `entity` to an `Entid` first, because the `Conn`
/// versions are not generic over the entity type.
impl Queryable for Store {
fn q_once<T>(&self, query: &str, inputs: T) -> Result<QueryOutput>
where T: Into<Option<QueryInputs>> {
self.conn.q_once(&self.sqlite, query, inputs)
fn q_prepare<T>(&self, query: &str, inputs: T) -> PreparedResult
where T: Into<Option<QueryInputs>> {
self.conn.q_prepare(&self.sqlite, query, inputs)
fn q_explain<T>(&self, query: &str, inputs: T) -> Result<QueryExplanation>
where T: Into<Option<QueryInputs>> {
self.conn.q_explain(&self.sqlite, query, inputs)
fn lookup_values_for_attribute<E>(&self, entity: E, attribute: &edn::Keyword) -> Result<Vec<TypedValue>>
where E: Into<Entid> {
self.conn.lookup_values_for_attribute(&self.sqlite, entity.into(), attribute)
fn lookup_value_for_attribute<E>(&self, entity: E, attribute: &edn::Keyword) -> Result<Option<TypedValue>>
where E: Into<Entid> {
self.conn.lookup_value_for_attribute(&self.sqlite, entity.into(), attribute)
/// Both methods forward to the matching `Conn` method with the store's own SQLite connection.
impl Pullable for Store {
fn pull_attributes_for_entities<E, A>(&self, entities: E, attributes: A) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
where E: IntoIterator<Item=Entid>,
A: IntoIterator<Item=Entid> {
self.conn.pull_attributes_for_entities(&self.sqlite, entities, attributes)
fn pull_attributes_for_entity<A>(&self, entity: Entid, attributes: A) -> Result<StructuredMap>
where A: IntoIterator<Item=Entid> {
self.conn.pull_attributes_for_entity(&self.sqlite, entity, attributes)
/// Which direction(s) of an attribute to cache.
/// NOTE(review): the variant list is missing from this copy of the file; the code here matches
/// on `Both`, `Forward` and `Reverse`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CacheDirection {
/// Whether to add an attribute to the cache or remove it.
/// NOTE(review): the variant list is missing from this copy of the file; the code here matches
/// on `Register` and `Deregister`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CacheAction {
impl Syncable for Store {
/// Parse `user_uuid` as a `Uuid` (returning the parse error via `?` if it is malformed), then
/// run `Syncer::flow` against the store's SQLite connection and `server_uri`.
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
let uuid = Uuid::parse_str(&user_uuid)?;
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
/// Construction, querying, transaction management, caching and observer registration for a
/// `Conn`. Methods that touch the database take the SQLite connection as an explicit argument.
impl Conn {
// Intentionally not public.
/// Build a `Conn` at generation 0 with a default attribute cache and a new observer service.
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
Conn {
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default())),
tx_observer_service: Mutex::new(TxObservationService::new()),
/// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but
/// _does not_ write the bootstrap schema. This constructor should only be used by
/// consumers that expect to populate raw transaction data themselves.
fn empty(sqlite: &mut rusqlite::Connection) -> Result<Conn> {
// NOTE(review): what happens to `tx` afterwards is not visible in this copy of the file.
let (tx, db) = db::create_empty_current_version(sqlite)
.chain_err(|| "Unable to initialize Mentat store")?;
Ok(Conn::new(db.partition_map, db.schema))
/// Ensure the SQLite handle holds a current-version Mentat store and build a `Conn` from the
/// resulting partition map and schema. Failures are chained with
/// "Unable to initialize Mentat store".
pub fn connect(sqlite: &mut rusqlite::Connection) -> Result<Conn> {
let db = db::ensure_current_version(sqlite)
.chain_err(|| "Unable to initialize Mentat store")?;
Ok(Conn::new(db.partition_map, db.schema))
/// Yield a clone of the current `Schema` instance.
pub fn current_schema(&self) -> Arc<Schema> {
// We always unwrap the mutex lock: if it's poisoned, this will propagate panics to all
// accessing threads. This is perhaps not reasonable; we expect the mutex to be held for
// very short intervals, but a panic during a critical update section is possible, since the
// lock encapsulates committing a SQL transaction.
// That being said, in the future we will provide an interface to take the mutex, providing
// maximum flexibility for Mentat consumers.
// This approach might need to change when we support interrupting query threads (#297), and
// will definitely need to change if we support interrupting transactor threads.
// Improving this is tracked by
// (NOTE(review): the issue link and the function body are missing from this copy of the file.)
/// Return the current attribute cache.
/// NOTE(review): the body is not visible in this copy of the file.
pub fn current_cache(&self) -> SQLiteAttributeCache {
/// Query the Mentat store, using the given connection and the current metadata.
pub fn q_once<T>(&self,
sqlite: &rusqlite::Connection,
query: &str,
inputs: T) -> Result<QueryOutput>
where T: Into<Option<QueryInputs>> {
// Doesn't clone, unlike `current_schema`.
let metadata = self.metadata.lock().unwrap();
let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache));
/// Query the Mentat store, using the given connection and the current metadata,
/// but without using the cache.
pub fn q_uncached<T>(&self,
sqlite: &rusqlite::Connection,
query: &str,
inputs: T) -> Result<QueryOutput>
where T: Into<Option<QueryInputs>> {
let metadata = self.metadata.lock().unwrap();
&*metadata.schema, // Doesn't clone, unlike `current_schema`.
/// Prepare `query` against the current schema and attribute cache. The result borrows
/// `sqlite` for `'sqlite`.
pub fn q_prepare<'sqlite, 'query, T>(&self,
sqlite: &'sqlite rusqlite::Connection,
query: &'query str,
inputs: T) -> PreparedResult<'sqlite>
where T: Into<Option<QueryInputs>> {
let metadata = self.metadata.lock().unwrap();
let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache));
/// Explain `query` against the current schema and attribute cache.
pub fn q_explain<T>(&self,
sqlite: &rusqlite::Connection,
query: &str,
inputs: T) -> Result<QueryExplanation>
where T: Into<Option<QueryInputs>>
let metadata = self.metadata.lock().unwrap();
let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache));
/// Pull `attributes` for each of `entities` using the current schema. The metadata lock is
/// held for the duration of the pull.
pub fn pull_attributes_for_entities<E, A>(&self,
sqlite: &rusqlite::Connection,
entities: E,
attributes: A) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
where E: IntoIterator<Item=Entid>,
A: IntoIterator<Item=Entid> {
let metadata = self.metadata.lock().unwrap();
let schema = &*metadata.schema;
pull_attributes_for_entities(schema, sqlite, entities, attributes)
.map_err(|e| e.into())
/// Pull `attributes` for a single `entity` using the current schema.
pub fn pull_attributes_for_entity<A>(&self,
sqlite: &rusqlite::Connection,
entity: Entid,
attributes: A) -> Result<StructuredMap>
where A: IntoIterator<Item=Entid> {
let metadata = self.metadata.lock().unwrap();
let schema = &*metadata.schema;
pull_attributes_for_entity(schema, sqlite, entity, attributes)
.map_err(|e| e.into())
/// Look up the values of `attribute` on `entity` using the current schema and cache.
pub fn lookup_values_for_attribute(&self,
sqlite: &rusqlite::Connection,
entity: Entid,
attribute: &edn::Keyword) -> Result<Vec<TypedValue>> {
let metadata = self.metadata.lock().unwrap();
let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache));
lookup_values_for_attribute(sqlite, known, entity, attribute)
/// Look up a single optional value of `attribute` on `entity` using the current schema
/// and cache.
pub fn lookup_value_for_attribute(&self,
sqlite: &rusqlite::Connection,
entity: Entid,
attribute: &edn::Keyword) -> Result<Option<TypedValue>> {
let metadata = self.metadata.lock().unwrap();
let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache));
lookup_value_for_attribute(sqlite, known, entity, attribute)
2017-12-07 20:19:59 +00:00
/// Take a SQLite transaction.
/// Opens a transaction with the requested `behavior` and snapshots the current metadata
/// into a new `InProgress`, which starts with caching enabled.
fn begin_transaction_with_behavior<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection, behavior: TransactionBehavior) -> Result<InProgress<'m, 'conn>> {
let tx = sqlite.transaction_with_behavior(behavior)?;
let (current_generation, current_partition_map, current_schema, cache_cow) =
// The mutex is taken during this block.
let ref current: Metadata = *self.metadata.lock().unwrap();
// Expensive, but the partition map is updated after every committed transaction.
// Cheap.
// NOTE(review): the tuple construction these two comments annotate is missing from this copy.
Ok(InProgress {
mutex: &self.metadata,
transaction: tx,
generation: current_generation,
partition_map: current_partition_map,
// The `InProgress` gets its own `Schema` value so it can be replaced during the transaction.
schema: (*current_schema).clone(),
cache: InProgressSQLiteAttributeCache::from_cache(cache_cow),
use_caching: true,
tx_observer: &self.tx_observer_service,
tx_observer_watcher: InProgressObserverTransactWatcher::new(),
// Helper to avoid passing connections around.
// Make both args mutable so that we can't have parallel access.
/// Begin a read using a `Deferred` SQLite transaction.
/// NOTE(review): the step that wraps the result in `InProgressRead` is not visible here.
pub fn begin_read<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result<InProgressRead<'m, 'conn>> {
self.begin_transaction_with_behavior(sqlite, TransactionBehavior::Deferred)
/// Begin a `Deferred` read and adjust the resulting `InProgress` before returning it.
/// NOTE(review): the closure body (presumably disabling caching) is not visible here.
pub fn begin_uncached_read<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result<InProgressRead<'m, 'conn>> {
self.begin_transaction_with_behavior(sqlite, TransactionBehavior::Deferred)
.map(|mut ip| {
/// IMMEDIATE means 'start the transaction now, but don't exclude readers'. It prevents other
/// connections from taking immediate or exclusive transactions. This is appropriate for our
/// writes and `InProgress`: it means we are ready to write whenever we want to, and nobody else
/// can start a transaction that's not `DEFERRED`, but we don't need exclusivity yet.
pub fn begin_transaction<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result<InProgress<'m, 'conn>> {
self.begin_transaction_with_behavior(sqlite, TransactionBehavior::Immediate)
/// Transact entities against the Mentat store, using the given connection and the current
/// metadata.
pub fn transact<B>(&mut self,
sqlite: &mut rusqlite::Connection,
transaction: B) -> Result<TxReport> where B: Borrow<str> {
// Parse outside the SQL transaction. This is a tradeoff: we are limiting the scope of the
// transaction, and indeed we don't even create a SQL transaction if the provided input is
// invalid, but it means SQLite errors won't be found until the parse is complete, and if
// there's a race for the database (don't do that!) we are less likely to win it.
let entities = edn::parse::entities(transaction.borrow())?;
let mut in_progress = self.begin_transaction(sqlite)?;
let report = in_progress.transact_entities(entities)?;
// NOTE(review): the commit and return of `report` are not visible in this copy of the file.
/// Adds or removes the values of a given attribute to an in-memory cache.
/// The attribute is a namespaced keyword: e.g., `:foo/bar`.
/// `cache_action` determines if the attribute should be added or removed from the cache.
/// `CacheAction::Register` is idempotent - each attribute is only added once.
/// `CacheAction::Deregister` returns an error if the attribute does not currently exist in the
/// cache (per the original note; the deregister branch is not visible in this copy).
/// Fails with `ErrorKind::UnknownAttribute` when `attribute` cannot be resolved.
pub fn cache(&mut self,
sqlite: &mut rusqlite::Connection,
schema: &Schema,
attribute: &Keyword,
cache_direction: CacheDirection,
cache_action: CacheAction) -> Result<()> {
let mut metadata = self.metadata.lock().unwrap();
let attribute_entid: Entid;
// Immutable borrow of metadata.
// NOTE(review): the entid is resolved against `metadata.schema`, while registration below
// uses the `schema` argument -- confirm the two are meant to be the same schema.
attribute_entid = metadata.schema
.ok_or_else(|| ErrorKind::UnknownAttribute(attribute.to_string()))?.1.into();
let cache = &mut metadata.attribute_cache;
match cache_action {
CacheAction::Register => {
match cache_direction {
CacheDirection::Both => cache.register(schema, sqlite, attribute_entid),
CacheDirection::Forward => cache.register_forward(schema, sqlite, attribute_entid),
CacheDirection::Reverse => cache.register_reverse(schema, sqlite, attribute_entid),
}.map_err(|e| e.into())
CacheAction::Deregister => {
/// Register `observer` under `key`. Panics if the observer-service mutex is poisoned.
pub fn register_observer(&mut self, key: String, observer: Arc<TxObserver>) {
self.tx_observer_service.lock().unwrap().register(key, observer);
/// Remove the observer registered under `key`.
/// NOTE(review): the body is not visible in this copy of the file.
pub fn unregister_observer(&mut self, key: &String) {
mod tests {
use super::*;
extern crate time;
extern crate mentat_parser_utils;
use std::collections::{
use std::path::{
use std::sync::mpsc;
2018-03-09 12:18:11 +00:00
use std::time::{
2018-03-09 12:18:11 +00:00
use mentat_core::{
2018-03-09 12:18:11 +00:00
use ::entity_builder::{
use ::query::{
use ::{
2018-03-09 12:18:11 +00:00
use ::vocabulary::{
use ::vocabulary::attribute::{
2018-03-09 12:18:11 +00:00
use mentat_db::USER0;
/// Transacting a numeric entid that has not been allocated yet must fail with
/// `UnrecognizedEntid`, and a later transaction must still allocate tempids from `next`.
fn test_transact_does_not_collide_existing_entids() {
// An empty path is passed for the connection.
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
// Let's find out the next ID that'll be allocated. We're going to try to collide with it
// a bit later.
// NOTE(review): the rest of this expression is missing from this copy of the file.
let next = conn.metadata.lock().expect("metadata")
let t = format!("[[:db/add {} :db.schema/attribute \"tempid\"]]", next + 1);
match conn.transact(&mut sqlite, t.as_str()).unwrap_err() {
Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::UnrecognizedEntid(e)), _) => {
assert_eq!(e, next + 1);
x => panic!("expected transact error, got {:?}", x),
// Transact two more tempids.
let t = "[[:db/add \"one\" :db.schema/attribute \"more\"]]";
let report = conn.transact(&mut sqlite, t)
.expect("transact succeeded");
// The failed transaction above must not have consumed any IDs.
assert_eq!(report.tempids["more"], next);
assert_eq!(report.tempids["one"], next + 1);
/// Transacting the exact entid that is about to be allocated must still fail with
/// `UnrecognizedEntid`, because the user supplied it; the next tempid then receives that ID.
fn test_transact_does_not_collide_new_entids() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
// Let's find out the next ID that'll be allocated. We're going to try to collide with it.
let next = conn.metadata.lock().expect("metadata").partition_map[":db.part/user"].index;
// If this were to be resolved, we'd get [:db/add 65537 :db.schema/attribute 65537], but
// we should reject this, because the first ID was provided by the user!
let t = format!("[[:db/add {} :db.schema/attribute \"tempid\"]]", next);
match conn.transact(&mut sqlite, t.as_str()).unwrap_err() {
Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::UnrecognizedEntid(e)), _) => {
// All this, despite this being the ID we were about to allocate!
assert_eq!(e, next);
x => panic!("expected transact error, got {:?}", x),
// And if we subsequently transact in a way that allocates one ID, we _will_ use that one.
// Note that `10` is a bootstrapped entid; we use it here as a known-good value.
let t = "[[:db/add 10 :db.schema/attribute \"temp\"]]";
let report = conn.transact(&mut sqlite, t)
.expect("transact succeeded");
assert_eq!(report.tempids["temp"], next);
/// Return the entid that will be allocated to the next transacted tempid.
/// Reads the `Conn`'s partition map under the metadata lock.
/// NOTE(review): the expression that extracts the index is missing from this copy of the file.
fn get_next_entid(conn: &Conn) -> i64 {
let partition_map = &conn.metadata.lock().unwrap().partition_map;
/// Two transact calls inside one `InProgress`, followed by a commit: changes are visible to
/// queries made through the `InProgress`, and the next-entid counter advances by three.
fn test_compound_transact() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let tempid_offset = get_next_entid(&conn);
let t = "[[:db/add \"one\" :db/ident :a/keyword1] \
[:db/add \"two\" :db/ident :a/keyword2]]";
// This can refer to `t`, 'cos they occur in separate txes.
let t2 = "[{:db.schema/attribute \"three\", :db/ident :a/keyword1}]";
// Scoped borrow of `conn`.
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully");
let report = in_progress.transact(t).expect("transacted successfully");
let one = report.tempids.get("one").expect("found one").clone();
let two = report.tempids.get("two").expect("found two").clone();
// The two tempids take the next two IDs, in either order.
assert!(one != two);
assert!(one == tempid_offset || one == tempid_offset + 1);
assert!(two == tempid_offset || two == tempid_offset + 1);
println!("RES: {:?}", in_progress.q_once("[:find ?v :where [?x :db/ident ?v]]", None).unwrap());
// The uncommitted ident is already queryable through the `InProgress`.
let during = in_progress.q_once("[:find ?x . :where [?x :db/ident :a/keyword1]]", None)
.expect("query succeeded");
assert_eq!(during.results, QueryResults::Scalar(Some(TypedValue::Ref(one).into())));
let report = in_progress.transact(t2).expect("t2 succeeded");
in_progress.commit().expect("commit succeeded");
let three = report.tempids.get("three").expect("found three").clone();
assert!(one != three);
assert!(two != three);
// The DB part table changed.
let tempid_offset_after = get_next_entid(&conn);
assert_eq!(tempid_offset + 3, tempid_offset_after);
/// Define a boolean attribute, assert it on two entities, then run a prepared query bound to
/// `?v = true` twice and expect the same single result both times.
fn test_simple_prepared_query() {
let mut c = db::new_connection("").expect("Couldn't open conn.");
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
// First transaction: declare the `:foo/boolean` attribute.
conn.transact(&mut c, r#"[
[:db/add "s" :db/ident :foo/boolean]
[:db/add "s" :db/valueType :db.type/boolean]
[:db/add "s" :db/cardinality :db.cardinality/one]
]"#).expect("successful transaction");
// Second transaction: one entity with `true`, one with `false`.
let report = conn.transact(&mut c, r#"[
[:db/add "u" :foo/boolean true]
[:db/add "p" :foo/boolean false]
]"#).expect("successful transaction");
let yes = report.tempids.get("u").expect("found it").clone();
let vv = Variable::from_valid_name("?v");
let values = QueryInputs::with_value_sequence(vec![(vv, true.into())]);
let read = conn.begin_read(&mut c).expect("read");
// N.B., you might choose to algebrize _without_ validating that the
// types are known. In this query we know that `?v` must be a boolean,
// and so we can kinda generate our own required input types!
let mut prepared = read.q_prepare(r#"[:find [?x ...]
:in ?v
:where [?x :foo/boolean ?v]]"#,
values).expect("prepare succeeded");
// NOTE(review): the right-hand side of the next statement was truncated in this copy of the
// file (presumably a call on `prepared`).
let yeses ="result");
assert_eq!(yeses.results, QueryResults::Coll(vec![TypedValue::Ref(yes).into()]));
// NOTE(review): truncated in the same way as the statement above.
let yeses_again ="result");
assert_eq!(yeses_again.results, QueryResults::Coll(vec![TypedValue::Ref(yes).into()]));
/// Changes made inside an `InProgress` are visible to it, but after a rollback neither the
/// data nor the next-entid counter has changed on the `Conn`.
fn test_compound_rollback() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let tempid_offset = get_next_entid(&conn);
// Nothing in the store => USER0 should be our starting point.
assert_eq!(tempid_offset, USER0);
let t = "[[:db/add \"one\" :db/ident :a/keyword1] \
[:db/add \"two\" :db/ident :a/keyword2]]";
// Scoped borrow of `sqlite`.
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully");
let report = in_progress.transact(t).expect("transacted successfully");
let one = report.tempids.get("one").expect("found it").clone();
let two = report.tempids.get("two").expect("found it").clone();
// The IDs are contiguous, starting at the previous part index.
assert!(one != two);
assert!(one == tempid_offset || one == tempid_offset + 1);
assert!(two == tempid_offset || two == tempid_offset + 1);
// Inside the InProgress we can see our changes.
let during = in_progress.q_once("[:find ?x . :where [?x :db/ident :a/keyword1]]", None)
.expect("query succeeded");
assert_eq!(during.results, QueryResults::Scalar(Some(TypedValue::Ref(one).into())));
// And we can do direct lookup, too.
let kw = in_progress.lookup_value_for_attribute(one, &edn::Keyword::namespaced("db", "ident"))
.expect("lookup succeeded");
assert_eq!(kw, Some(TypedValue::Keyword(edn::Keyword::namespaced("a", "keyword1").into())));
// NOTE(review): the receiver of this `.expect` (presumably the rollback call) is missing from
// this copy of the file.
.expect("rollback succeeded");
// After the rollback the ident is no longer found.
let after = conn.q_once(&mut sqlite, "[:find ?x . :where [?x :db/ident :a/keyword1]]", None)
.expect("query succeeded");
assert_eq!(after.results, QueryResults::Scalar(None));
// The DB part table is unchanged.
let tempid_offset_after = get_next_entid(&conn);
assert_eq!(tempid_offset, tempid_offset_after);
/// Feeds `Conn::transact` an alternating sequence of good and bad inputs,
/// checking the error kind reported for each bad input and the `tx_id` of each
/// good one. The expected ids (`0x10000000 + 1`, `+ 2`, `+ 3`) advance only on
/// the successful transactions, with failing ones in between.
fn test_transact_errors() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
// Good: empty transaction.
let report = conn.transact(&mut sqlite, "[]").unwrap();
assert_eq!(report.tx_id, 0x10000000 + 1);
// Bad EDN: missing closing ']'.
let report = conn.transact(&mut sqlite, "[[:db/add \"t\" :db/ident :a/keyword]");
match report.unwrap_err() {
Error(ErrorKind::EdnParseError(_), _) => { },
x => panic!("expected EDN parse error, got {:?}", x),
// Good EDN.
let report = conn.transact(&mut sqlite, "[[:db/add \"t\" :db/ident :a/keyword]]").unwrap();
assert_eq!(report.tx_id, 0x10000000 + 2);
// Bad transaction data: missing leading :db/add.
// This is also expected to surface as an `EdnParseError`.
let report = conn.transact(&mut sqlite, "[[\"t\" :db/ident :b/keyword]]");
match report.unwrap_err() {
Error(ErrorKind::EdnParseError(_), _) => { },
x => panic!("expected EDN parse error, got {:?}", x),
// Good transaction data.
let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :b/keyword]]").unwrap();
assert_eq!(report.tx_id, 0x10000000 + 3);
// Bad transaction based on state of store: conflicting upsert.
// The same tempid "u" is given two different `:db/ident` values; the expected
// error is a `SchemaConstraintViolation` wrapped in `ErrorKind::DbError`.
let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :a/keyword]
[:db/add \"u\" :db/ident :b/keyword]]");
match report.unwrap_err() {
Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::SchemaConstraintViolation(_)), _) => { },
x => panic!("expected schema constraint violation, got {:?}", x),
/// Registering a cache for an attribute that was never transacted into the
/// schema (`:foo/bat`; only `:foo/bar` and `:foo/baz` are defined) must fail
/// with `ErrorKind::UnknownAttribute` whose message is the keyword ":foo/bat".
fn test_add_to_cache_failure_no_attribute() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let _report = conn.transact(&mut sqlite, r#"[
{ :db/ident :foo/bar
:db/valueType :db.type/long },
{ :db/ident :foo/baz
:db/valueType :db.type/boolean }]"#).unwrap();
// Deliberately not one of the idents transacted above.
let kw = kw!(:foo/bat);
let schema = conn.current_schema();
let res = conn.cache(&mut sqlite, &schema, &kw, CacheDirection::Forward, CacheAction::Register);
match res.unwrap_err() {
Error(ErrorKind::UnknownAttribute(msg), _) => assert_eq!(msg, ":foo/bat"),
x => panic!("expected UnknownAttribute error, got {:?}", x),
// TODO expand tests to cover lookup_value_for_attribute comparing with and without caching
/// Compares `lookup_value_for_attribute` before and after registering a forward
/// cache for `:foo/bar`: the cached lookups must return the same value as the
/// uncached one and are asserted to take less wall-clock time.
// NOTE(review): nothing in this view commits `in_progress` before `conn` is
// queried; the closing lines of that scope appear to be missing — confirm
// against the upstream file.
fn test_lookup_attribute_with_caching() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let _report = conn.transact(&mut sqlite, r#"[
{ :db/ident :foo/bar
:db/valueType :db.type/long },
{ :db/ident :foo/baz
:db/valueType :db.type/boolean }]"#).expect("transaction expected to succeed");
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("transaction");
// `1..100` excludes 100, so this runs 99 times; each pass asserts six entities.
for _ in 1..100 {
let _report = in_progress.transact(r#"[
{ :foo/bar 100
:foo/baz false },
{ :foo/bar 200
:foo/baz true },
{ :foo/bar 100
:foo/baz false },
{ :foo/bar 300
:foo/baz true },
{ :foo/bar 400
:foo/baz false },
{ :foo/bar 500
:foo/baz true }]"#).expect("transaction expected to succeed");
// NOTE(review): the `expect` message says "rel results" but the call is
// `into_scalar()`; the message looks stale.
let entities = conn.q_once(&sqlite, r#"[:find ?e . :where [?e :foo/bar 400]]"#, None).expect("Expected query to work").into_scalar().expect("expected rel results");
let first = entities.expect("expected a result");
let entid = match first {
Binding::Scalar(TypedValue::Ref(entid)) => entid,
x => panic!("expected Some(Ref), got {:?}", x),
let kw = kw!(:foo/bar);
// Baseline: time a single lookup before any cache is registered.
let start = Instant::now();
let uncached_val = conn.lookup_value_for_attribute(&sqlite, entid, &kw).expect("Expected value on lookup");
let finish = Instant::now();
let uncached_elapsed_time = finish.duration_since(start);
println!("Uncached time: {:?}", uncached_elapsed_time);
let schema = conn.current_schema();
conn.cache(&mut sqlite, &schema, &kw, CacheDirection::Forward, CacheAction::Register).expect("expected caching to work");
// `1..5` yields four timed lookups against the now-registered cache.
for _ in 1..5 {
let start = Instant::now();
let cached_val = conn.lookup_value_for_attribute(&sqlite, entid, &kw).expect("Expected value on lookup");
let finish = Instant::now();
let cached_elapsed_time = finish.duration_since(start);
assert_eq!(cached_val, uncached_val);
println!("Cached time: {:?}", cached_elapsed_time);
// NOTE(review): this compares two wall-clock measurements, so the assertion
// can fail spuriously on a loaded machine.
assert!(cached_elapsed_time < uncached_elapsed_time);
/// Runs the same three-clause query before and after registering forward
/// caches for `:db/ident` and `:db/valueType` inside a transaction, printing
/// timings for both; then rolls that transaction back and repeats the query
/// and the cache registration in a second transaction, which is committed.
// NOTE(review): both `println!("… took {}µs",;` statements are missing their
// argument in this view — presumably the elapsed time between `start` and
// `end`; confirm against the upstream file.
fn test_cache_usage() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let db_ident = (*conn.current_schema()).get_entid(&kw!(:db/ident)).expect("db_ident").0;
// NOTE(review): this `expect` message says "db_ident" but the lookup is for
// `:db/valueType`; looks copy-pasted from the line above.
let db_type = (*conn.current_schema()).get_entid(&kw!(:db/valueType)).expect("db_ident").0;
println!("db/ident is {}", db_ident);
println!("db/type is {}", db_type);
// The attributes are interpolated as numeric entids rather than as keywords.
let query = format!("[:find ?ident . :where [?e {} :db/doc][?e {} ?type][?type {} ?ident]]",
db_ident, db_type, db_ident);
println!("Query is {}", query);
let mut ip = conn.begin_transaction(&mut sqlite).expect("began");
let ident = ip.q_once(query.as_str(), None).into_scalar_result().expect("query");
assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string").into()));
let start = time::PreciseTime::now();
ip.q_once(query.as_str(), None).into_scalar_result().expect("query");
let end = time::PreciseTime::now();
println!("Uncached took {}µs",;
ip.cache(&kw!(:db/ident), CacheDirection::Forward, CacheAction::Register).expect("registered");
ip.cache(&kw!(:db/valueType), CacheDirection::Forward, CacheAction::Register).expect("registered");
// Same query, now with both attributes registered in the cache: same answer expected.
let ident = ip.q_once(query.as_str(), None).into_scalar_result().expect("query");
assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string").into()));
let start = time::PreciseTime::now();
ip.q_once(query.as_str(), None).into_scalar_result().expect("query");
let end = time::PreciseTime::now();
println!("Cached took {}µs",;
// If we roll back the change, our caching operations are also rolled back.
ip.rollback().expect("rolled back");
let mut ip = conn.begin_transaction(&mut sqlite).expect("began");
let ident = ip.q_once(query.as_str(), None).into_scalar_result().expect("query");
assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string").into()));
ip.cache(&kw!(:db/ident), CacheDirection::Forward, CacheAction::Register).expect("registered");
ip.cache(&kw!(:db/valueType), CacheDirection::Forward, CacheAction::Register).expect("registered");
// NOTE(review): this is a commit, but the `expect` message says "rolled back";
// the message looks copy-pasted from the rollback above.
ip.commit().expect("rolled back");
/// Returns a `PathBuf` for the fixture file named by `rest`, starting from the
/// relative `fixtures/` directory.
// NOTE(review): the rest of the body is missing in this view — presumably it
// joins `rest` onto `fixtures`; confirm against the upstream file.
fn fixture_path(rest: &str) -> PathBuf {
let fixtures = Path::new("fixtures/");
/// Imports the cities schema and Seattle data fixtures, registers caches for
/// the three attributes the query uses, and requires that preparing the query
/// yields a `PreparedQuery::Constant`; any other variant panics.
// NOTE(review): several statements are truncated in this view (the end of the
// `q_prepare` call, `let results =…`, the timing `println!`, and the start of
// the final assertion); confirm against the upstream file.
fn test_prepared_query_with_cache() {
let mut store = Store::open("").expect("opened");
let mut in_progress = store.begin_transaction().expect("began");
in_progress.import(fixture_path("cities.schema")).expect("transacted schema");
in_progress.import(fixture_path("all_seattle.edn")).expect("transacted data");
// `:neighborhood/name` is cached in the reverse direction, the other two forward.
in_progress.cache(&kw!(:neighborhood/district), CacheDirection::Forward, CacheAction::Register).expect("cache done");
in_progress.cache(&kw!(:district/name), CacheDirection::Forward, CacheAction::Register).expect("cache done");
in_progress.cache(&kw!(:neighborhood/name), CacheDirection::Reverse, CacheAction::Register).expect("cache done");
let query = r#"[:find ?district
:in ?hood
[?neighborhood :neighborhood/name ?hood]
[?neighborhood :neighborhood/district ?d]
[?d :district/name ?district]]"#;
let hood = "Beacon Hill";
let inputs = QueryInputs::with_value_sequence(vec![(var!(?hood), TypedValue::typed_string(hood).into())]);
let mut prepared = in_progress.q_prepare(query, inputs)
// Only the `Constant` variant is accepted here.
match &prepared {
&PreparedQuery::Constant { select: ref _select } => {},
_ => panic!(),
let start = time::PreciseTime::now();
let results ="results");
let end = time::PreciseTime::now();
println!("Prepared cache execution took {}µs",;
vec![vec![TypedValue::typed_string("Greater Duwamish")]].into());
/// Helper trait that exposes attribute-cache lookups directly on a `Store`;
/// `test_cache_mutation` calls `get_entid_for_value` through it.
trait StoreCache {
/// Looks up the entity that has value `val` for attribute `attr`, if any.
fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option<Entid>;
/// Reports whether `attr` is cached in the reverse direction.
fn is_attribute_cached_reverse(&self, attr: Entid) -> bool;
/// Reports whether `attr` is cached in the forward direction.
fn is_attribute_cached_forward(&self, attr: Entid) -> bool;
/// `StoreCache` for `Store`, answering from the connection's current cache.
// NOTE(review): the bodies of the two `is_attribute_cached_*` methods are
// missing in this view; only `get_entid_for_value` is visible.
impl StoreCache for Store {
fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option<Entid> {
// Fetch the connection's current cache and delegate the lookup to it.
let cache = self.conn.current_cache();
cache.get_entid_for_value(attr, val)
fn is_attribute_cached_forward(&self, attr: Entid) -> bool {
fn is_attribute_cached_reverse(&self, attr: Entid) -> bool {
/// Checks that the attribute cache tracks transacted data. Cached lookups for
/// `:foo/bar`, `:foo/baz` and `:foo/x` are verified inside an in-progress
/// transaction and through the store: after adding data, after a retraction
/// that (per the inline comments) is rolled back, and after the same
/// retraction is applied again and committed.
// NOTE(review): this view is missing several lines — the `transact(r#"[` that
// opens both EDN literals, the tails of the `get_values_for_entid` chains, and
// the commit/rollback calls between phases; confirm against the upstream file.
fn test_cache_mutation() {
let mut store = Store::open("").expect("opened");
let mut in_progress = store.begin_transaction().expect("begun");
{ :db/ident :foo/bar
:db/cardinality :db.cardinality/one
:db/index true
:db/unique :db.unique/identity
:db/valueType :db.type/long },
{ :db/ident :foo/baz
:db/cardinality :db.cardinality/one
:db/valueType :db.type/boolean }
{ :db/ident :foo/x
:db/cardinality :db.cardinality/many
:db/valueType :db.type/long }]"#).expect("transact");
// Cache one….
in_progress.cache(&kw!(:foo/bar), CacheDirection::Reverse, CacheAction::Register).expect("cache done");
let foo_bar = store.conn.current_schema().get_entid(&kw!(:foo/bar)).expect("foo/bar").0;
let foo_baz = store.conn.current_schema().get_entid(&kw!(:foo/baz)).expect("foo/baz").0;
let foo_x = store.conn.current_schema().get_entid(&kw!(:foo/x)).expect("foo/x").0;
// … and cache the others via the store.
store.cache(&kw!(:foo/baz), CacheDirection::Both).expect("cache done");
store.cache(&kw!(:foo/x), CacheDirection::Forward).expect("cache done");
// Add some data.
let mut in_progress = store.begin_transaction().expect("begun");
{:foo/bar 15, :foo/baz false, :foo/x [1, 2, 3]}
{:foo/bar 99, :foo/baz true}
{:foo/bar -2, :foo/baz true}
// Data is in the cache.
// `first` is the entity whose `:foo/bar` value is 15.
let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
assert_eq!(in_progress.cache.get_value_for_entid(&in_progress.schema, foo_baz, first).expect("val"), &TypedValue::Boolean(false));
// All three values for :foo/x.
let all_three: BTreeSet<TypedValue> = in_progress.cache
.get_values_for_entid(&in_progress.schema, foo_x, first)
assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect());
// Data is still in the cache.
// From here the lookups go through `store` rather than `in_progress`.
let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
let cache: SQLiteAttributeCache = store.conn.current_cache();
assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false));
let all_three: BTreeSet<TypedValue> = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect());
// We can remove data and the cache reflects it, immediately and after commit.
let mut in_progress = store.begin_transaction().expect("began");
let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
in_progress.transact(format!("[[:db/retract {} :foo/x 2]]", first).as_str()).expect("transact");
let only_two: BTreeSet<TypedValue> = in_progress.cache
.get_values_for_entid(&in_progress.schema, foo_x, first)
assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect());
// Rollback: unchanged.
let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
let cache: SQLiteAttributeCache = store.conn.current_cache();
assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false));
let all_three: BTreeSet<TypedValue> = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect());
// Try again, but this time commit.
let mut in_progress = store.begin_transaction().expect("began");
let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
in_progress.transact(format!("[[:db/retract {} :foo/x 2]]", first).as_str()).expect("transact");
// The store's cache is now expected to show the retraction: only 1 and 3 remain.
let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
let cache: SQLiteAttributeCache = store.conn.current_cache();
assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false));
let only_two: BTreeSet<TypedValue> = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect());
2018-03-09 12:18:11 +00:00
/// Registers a no-op `TxObserver` with an empty attribute set under the key
/// "Test Observer".
// NOTE(review): no assertion is visible after the registration in this view;
// the check on the registered state appears to be missing.
fn test_register_observer() {
let mut conn = Store::open("").unwrap();
2018-03-09 12:18:11 +00:00
let key = "Test Observer".to_string();
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
conn.register_observer(key.clone(), Arc::new(tx_observer));
/// Registers a no-op `TxObserver` under the key "Test Observer" as the setup
/// for a deregistration check.
// NOTE(review): the deregistration call and its assertions are not visible in
// this view; only the registration step survives.
fn test_deregister_observer() {
let mut conn = Store::open("").unwrap();
2018-03-09 12:18:11 +00:00
let key = "Test Observer".to_string();
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
conn.register_observer(key.clone(), Arc::new(tx_observer));
/// Shared setup for the observer tests: opens a transaction on `conn`, installs
/// the vocabulary the tests rely on, and commits it.
// NOTE(review): the vocabulary definition itself is missing in this view; the
// observer tests look up `:todo/*` attributes and assert `:label/*` ones, so
// presumably it defines those. The "vocubulary" spelling in the `expect`
// message below is a typo in a runtime string and is left untouched here.
fn add_schema(conn: &mut Store) {
2018-03-09 12:18:11 +00:00
// transact some schema
let mut in_progress = conn.begin_transaction().expect("expected in progress");
2018-03-09 12:18:11 +00:00
2018-03-09 12:18:11 +00:00
2018-03-09 12:18:11 +00:00
2018-03-09 12:18:11 +00:00
2018-03-09 12:18:11 +00:00
2018-03-09 12:18:11 +00:00
)).expect("expected vocubulary");
2018-03-09 12:18:11 +00:00
in_progress.commit().expect("Expected vocabulary committed");
/// Record of what a test observer callback saw; the observer tests compare it
/// with the expected values once the transactions are done.
#[derive(Default, Debug)]
2018-03-09 12:18:11 +00:00
struct ObserverOutput {
/// Compared against the expected transaction ids at the end of each test.
txids: Vec<i64>,
/// Compared against the expected per-transaction changesets.
changes: Vec<BTreeSet<i64>>,
/// Key the callback was invoked with; stays `None` if it was never called.
called_key: Option<String>,
/// Registers an observer under the key "Test Observing", transacts three todo
/// entities and one label, and expects the observer callback to have recorded
/// that key together with the expected transaction ids and changesets.
// NOTE(review): this view is missing the statements that fill
// `registered_attrs`, the body of the callback loop, and the code that fills
// `tx_ids` / `changesets`; confirm the details against the upstream file.
fn test_observer_notified_on_registered_change() {
let mut conn = Store::open("").unwrap();
add_schema(&mut conn);
2018-03-09 12:18:11 +00:00
let name_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
let date_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
2018-03-09 12:18:11 +00:00
let mut registered_attrs = BTreeSet::new();
let key = "Test Observing".to_string();
let output = Arc::new(Mutex::new(ObserverOutput::default()));
// The callback holds only a weak handle, so the `Arc::try_unwrap(output)` at
// the end of the test is not blocked by a strong reference in the observer.
let mut_output = Arc::downgrade(&output);
let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
// because the TxObserver is in an Arc and is therefore Sync, we have to wrap the Sender in a Mutex to also
// make it Sync.
let thread_tx = Mutex::new(tx);
2018-03-09 12:18:11 +00:00
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
if let Some(out) = mut_output.upgrade() {
let mut o = out.lock().unwrap();
o.called_key = Some(obs_key.to_string());
for (tx_id, changes) in batch.into_iter() {
2018-03-09 12:18:11 +00:00
2018-03-09 12:18:11 +00:00
conn.register_observer(key.clone(), Arc::clone(&tx_observer));
// Expected values, compared with what the observer recorded at the end.
let mut tx_ids = Vec::new();
let mut changesets = Vec::new();
Add type checking and constraint checking to the transactor. (#663, #532, #679) This should address #663, by re-inserting type checking in the transactor stack after the entry point used by the term builder. Before this commit, we were using an SQLite UNIQUE index to assert that no `[e a]` pair, with `a` a cardinality one attribute, was asserted more than once. However, that's not in line with Datomic, which treats transaction inputs as a set and allows a single datom like `[e a v]` to appear multiple times. It's both awkward and not particularly efficient to look for _distinct_ repetitions in SQL, so we accept some runtime cost in order to check for repetitions in the transactor. This will allow us to address #532, which is really about whether we treat inputs as sets. A side benefit is that we can provide more helpful error messages when the transactor does detect that the input truly violates the cardinality constraints of the schema. This commit builds a trie while error checking and collecting final terms, which should be fairly efficient. It also allows a simpler expression of input-provided :db/txInstant datoms, which in turn uncovered a small issue with the transaction watcher, where-by the watcher would not see non-input-provided :db/txInstant datoms. This transition to Datomic-like input-as-set semantics allows us to address #532. Previously, two tempids that upserted to the same entid would produce duplicate datoms, and that would have been rejected by the transactor -- correctly, since we did not allow duplicate datoms under the input-as-list semantics. With input-as-set semantics, duplicate datoms are allowed; and that means that we must allow tempids to be equivalent, i.e., to resolve to the same tempid. 
To achieve this, we: - index the set of tempids - identify tempid indices that share an upsert - map tempids to a dense set of contiguous integer labels We use the well-known union-find algorithm, as implemented by petgraph, to efficiently manage the set of equivalent tempids. Along the way, I've fixed and added tests for two small errors in the transactor. First, don't drop datoms resolved by upsert (#679). Second, ensure that complex upserts are allocated. I don't know quite what happened here. The Clojure implementation correctly kept complex upserts that hadn't resolved as complex upserts (see and then allocated complex upserts if they didn't resolve (see Based on the code comments, I think the Rust implementation must have incorrectly tried to optimize by handling all complex upserts in at most a single generation of evolution, and that's just not correct. We're effectively implementing a topological sort, using very specific domain knowledge, and its not true that a node in a topological sort can be considered only once!
2018-04-30 22:16:05 +00:00
let db_tx_instant_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:db/txInstant)).expect("entid to exist for :db/txInstant").into();
// NOTE(review): the `expect` message says "name" but this looks up `:todo/uuid`.
let uuid_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/uuid)).expect("entid to exist for name").into();
2018-03-09 12:18:11 +00:00
let mut in_progress = conn.begin_transaction().expect("expected transaction");
2018-03-09 12:18:11 +00:00
// Three todos; those with an even index also get a completion date.
for i in 0..3 {
let mut changeset = BTreeSet::new();
Add type checking and constraint checking to the transactor. (#663, #532, #679) This should address #663, by re-inserting type checking in the transactor stack after the entry point used by the term builder. Before this commit, we were using an SQLite UNIQUE index to assert that no `[e a]` pair, with `a` a cardinality one attribute, was asserted more than once. However, that's not in line with Datomic, which treats transaction inputs as a set and allows a single datom like `[e a v]` to appear multiple times. It's both awkward and not particularly efficient to look for _distinct_ repetitions in SQL, so we accept some runtime cost in order to check for repetitions in the transactor. This will allow us to address #532, which is really about whether we treat inputs as sets. A side benefit is that we can provide more helpful error messages when the transactor does detect that the input truly violates the cardinality constraints of the schema. This commit builds a trie while error checking and collecting final terms, which should be fairly efficient. It also allows a simpler expression of input-provided :db/txInstant datoms, which in turn uncovered a small issue with the transaction watcher, where-by the watcher would not see non-input-provided :db/txInstant datoms. This transition to Datomic-like input-as-set semantics allows us to address #532. Previously, two tempids that upserted to the same entid would produce duplicate datoms, and that would have been rejected by the transactor -- correctly, since we did not allow duplicate datoms under the input-as-list semantics. With input-as-set semantics, duplicate datoms are allowed; and that means that we must allow tempids to be equivalent, i.e., to resolve to the same tempid. 
To achieve this, we: - index the set of tempids - identify tempid indices that share an upsert - map tempids to a dense set of contiguous integer labels We use the well-known union-find algorithm, as implemented by petgraph, to efficiently manage the set of equivalent tempids. Along the way, I've fixed and added tests for two small errors in the transactor. First, don't drop datoms resolved by upsert (#679). Second, ensure that complex upserts are allocated. I don't know quite what happened here. The Clojure implementation correctly kept complex upserts that hadn't resolved as complex upserts (see and then allocated complex upserts if they didn't resolve (see Based on the code comments, I think the Rust implementation must have incorrectly tried to optimize by handling all complex upserts in at most a single generation of evolution, and that's just not correct. We're effectively implementing a topological sort, using very specific domain knowledge, and its not true that a node in a topological sort can be considered only once!
2018-04-30 22:16:05 +00:00
2018-03-09 12:18:11 +00:00
let name = format!("todo{}", i);
let uuid = Uuid::new_v4();
let mut builder = in_progress.builder().describe_tempid(&name);
builder.add_kw(&kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid");
2018-03-09 12:18:11 +00:00
builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name");
2018-03-09 12:18:11 +00:00
if i % 2 == 0 {
builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date");
2018-03-09 12:18:11 +00:00
// `transact` hands the in-progress transaction back together with the result.
let (ip, r) = builder.transact();
let report = r.expect("expected a report");
2018-03-09 12:18:11 +00:00
in_progress = ip;
// One more entity that uses only `:label/*` attributes, then commit via the builder.
let mut builder = in_progress.builder().describe_tempid("Label");
builder.add_kw(&kw!(:label/name), TypedValue::typed_string("Label 1")).expect("Expected added name");
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
builder.commit().expect("expect transaction to occur");
// Wait up to 100 ms for a message on the channel; the outcome is deliberately
// discarded (`let _`), so a timeout does not fail the test by itself.
let delay = Duration::from_millis(100);
let _ = rx.recv_timeout(delay);
2018-03-09 12:18:11 +00:00
// Take the output out of its `Arc<Mutex<_>>`; both steps fail the test if they cannot.
let out = Arc::try_unwrap(output).expect("unwrapped");
let o = out.into_inner().expect("Expected an Output");
assert_eq!(o.called_key, Some(key.clone()));
assert_eq!(o.txids, tx_ids);
assert_eq!(o.changes, changesets);
2018-03-09 12:18:11 +00:00
/// Counterpart to the "notified" test: transacts only `:label/*` attributes and
/// expects the observer callback never to run, so `called_key` stays `None`
/// and the recorded tx ids and changesets stay empty.
// NOTE(review): the statements that fill `registered_attrs` and the body of
// the callback loop are missing in this view; presumably the observer is
// registered for the `:todo/*` attributes looked up below — confirm.
fn test_observer_not_notified_on_unregistered_change() {
let mut conn = Store::open("").unwrap();
add_schema(&mut conn);
2018-03-09 12:18:11 +00:00
let name_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
let date_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
2018-03-09 12:18:11 +00:00
let mut registered_attrs = BTreeSet::new();
let key = "Test Observing".to_string();
let output = Arc::new(Mutex::new(ObserverOutput::default()));
// The callback holds only a weak handle, so the `Arc::try_unwrap(output)` at
// the end of the test is not blocked by a strong reference in the observer.
let mut_output = Arc::downgrade(&output);
let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
let thread_tx = Mutex::new(tx);
2018-03-09 12:18:11 +00:00
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
if let Some(out) = mut_output.upgrade() {
let mut o = out.lock().unwrap();
o.called_key = Some(obs_key.to_string());
for (tx_id, changes) in batch.into_iter() {
2018-03-09 12:18:11 +00:00
2018-03-09 12:18:11 +00:00
conn.register_observer(key.clone(), Arc::clone(&tx_observer));
// Expected values are empty: no notification should be recorded.
let tx_ids = Vec::<Entid>::new();
let changesets = Vec::<BTreeSet<Entid>>::new();
let mut in_progress = conn.begin_transaction().expect("expected transaction");
2018-03-09 12:18:11 +00:00
// Three entities that use only `:label/*` attributes.
for i in 0..3 {
let name = format!("label{}", i);
let mut builder = in_progress.builder().describe_tempid(&name);
builder.add_kw(&kw!(:label/name), TypedValue::typed_string(&name)).expect("Expected added name");
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
// The transaction result is ignored here; only the in-progress handle is kept.
let (ip, _) = builder.transact();
in_progress = ip;
// Wait up to 100 ms for a message on the channel; the outcome is deliberately
// discarded (`let _`).
let delay = Duration::from_millis(100);
let _ = rx.recv_timeout(delay);
2018-03-09 12:18:11 +00:00
let out = Arc::try_unwrap(output).expect("unwrapped");
let o = out.into_inner().expect("Expected an Output");
assert_eq!(o.called_key, None);
assert_eq!(o.txids, tx_ids);
assert_eq!(o.changes, changesets);
2018-03-09 12:18:11 +00:00