46c2a0801f
This should address #663 by re-inserting type checking in the transactor stack, after the entry point used by the term builder.

Before this commit, we were using an SQLite UNIQUE index to assert that no `[e a]` pair, with `a` a cardinality-one attribute, was asserted more than once. However, that's not in line with Datomic, which treats transaction inputs as a set and allows a single datom like `[e a v]` to appear multiple times. It's both awkward and not particularly efficient to look for _distinct_ repetitions in SQL, so we accept some runtime cost in order to check for repetitions in the transactor. This will allow us to address #532, which is really about whether we treat inputs as sets. A side benefit is that we can provide more helpful error messages when the transactor does detect that the input truly violates the cardinality constraints of the schema.

This commit builds a trie while error checking and collecting final terms, which should be fairly efficient. It also allows a simpler expression of input-provided :db/txInstant datoms, which in turn uncovered a small issue with the transaction watcher, whereby the watcher would not see non-input-provided :db/txInstant datoms.

This transition to Datomic-like input-as-set semantics is what allows us to address #532. Previously, two tempids that upserted to the same entid would produce duplicate datoms, and the transactor would have rejected that -- correctly, since we did not allow duplicate datoms under the input-as-list semantics. With input-as-set semantics, duplicate datoms are allowed; and that means that we must allow tempids to be equivalent, i.e., to resolve to the same entid. To achieve this, we:

- index the set of tempids;
- identify tempid indices that share an upsert;
- map tempids to a dense set of contiguous integer labels.

We use the well-known union-find algorithm, as implemented by petgraph, to efficiently manage the set of equivalent tempids; see the sketches after this message.

Along the way, I've fixed and added tests for two small errors in the transactor. First, don't drop datoms resolved by upsert (#679). Second, ensure that complex upserts are allocated. I don't know quite what happened here. The Clojure implementation correctly kept complex upserts that hadn't resolved as complex upserts (see 9a9dfb502a/src/common/datomish/transact.cljc, line 436) and then allocated complex upserts if they didn't resolve (see 9a9dfb502a/src/common/datomish/transact.cljc, line 509). Based on the code comments, I think the Rust implementation must have incorrectly tried to optimize by handling all complex upserts in at most a single generation of evolution, and that's just not correct. We're effectively implementing a topological sort using very specific domain knowledge, and it's not true that a node in a topological sort can be considered only once!
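
To make the set semantics concrete: a repeated `[e a v]` collapses harmlessly, while two *distinct* values for a cardinality-one `[e a]` pair is a genuine conflict. The sketch below illustrates the check using plain nested BTree collections and hypothetical names (`check_cardinality_one`, an integer stand-in for values); the transactor itself builds a trie while collecting final terms, but the shape of the check is the same.

```rust
use std::collections::{BTreeMap, BTreeSet};

type Entid = i64;
type Value = i64; // Stand-in for the sketch; the transactor works over typed values.

// Group asserted values by [e a]; duplicates of the same [e a v] collapse,
// matching set semantics. A cardinality-one attribute with more than one
// distinct value is a real conflict, and we can report exactly which one.
fn check_cardinality_one(
    assertions: &[(Entid, Entid, Value)],
    cardinality_one: &BTreeSet<Entid>,
) -> Result<(), String> {
    let mut seen: BTreeMap<(Entid, Entid), BTreeSet<Value>> = BTreeMap::new();
    for &(e, a, v) in assertions {
        seen.entry((e, a)).or_insert_with(BTreeSet::new).insert(v);
    }
    for (&(e, a), vs) in &seen {
        if cardinality_one.contains(&a) && vs.len() > 1 {
            return Err(format!(
                "entity {} has {} distinct values for cardinality-one attribute {}",
                e, vs.len(), a));
        }
    }
    Ok(())
}
```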
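
For the tempid-equivalence step, here is a minimal sketch of the three bullets above using petgraph's `UnionFind`. The inputs are hypothetical: tempids are assumed to be already indexed as `0..tempid_count`, and `upsert_pairs` lists pairs of tempid indices that upserted to the same entid.

```rust
extern crate petgraph;

use std::collections::BTreeMap;

use petgraph::unionfind::UnionFind;

// Given `tempid_count` indexed tempids and the pairs of tempid indices that
// share an upsert, return a dense labeling: equivalent tempids get the same
// contiguous integer label.
fn label_equivalent_tempids(tempid_count: usize, upsert_pairs: &[(usize, usize)]) -> Vec<usize> {
    let mut uf = UnionFind::<usize>::new(tempid_count);

    // Tempids that upserted to the same entid belong to one equivalence class.
    for &(left, right) in upsert_pairs {
        uf.union(left, right);
    }

    // Relabel each class representative with the next unused dense label.
    let mut dense: BTreeMap<usize, usize> = BTreeMap::new();
    (0..tempid_count)
        .map(|i| {
            let root = uf.find(i);
            let next = dense.len();
            *dense.entry(root).or_insert(next)
        })
        .collect()
}
```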
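
And for the "generations of evolution" point: a complex upsert can become resolvable only after some other upsert resolves, so a single pass isn't enough. Here is a hedged sketch of the fixpoint shape, with a deliberately simplified `ComplexUpsert` (real complex upserts don't reduce to a single dependency):

```rust
use std::collections::BTreeMap;

type Tempid = String;
type Entid = i64;

// Simplified for the sketch: a complex upsert resolves to the same entid as
// the tempid it depends on, once that dependency is known.
struct ComplexUpsert {
    tempid: Tempid,
    depends_on: Tempid,
}

fn resolve_in_generations(
    mut pending: Vec<ComplexUpsert>,
    mut resolved: BTreeMap<Tempid, Entid>,
    mut next_entid: Entid,
) -> BTreeMap<Tempid, Entid> {
    loop {
        let before = pending.len();
        // One generation: resolve every upsert whose dependency is now known.
        pending.retain(|u| {
            if let Some(&e) = resolved.get(&u.depends_on) {
                resolved.insert(u.tempid.clone(), e);
                false // Resolved; drop it from the pending set.
            } else {
                true // Revisit in a later generation.
            }
        });
        if pending.len() == before {
            break; // Fixed point: no progress this generation.
        }
    }
    // Anything still unresolved gets a freshly allocated entid.
    for u in pending {
        resolved.insert(u.tempid, next_entid);
        next_entid += 1;
    }
    resolved
}
```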
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

extern crate mentat;
extern crate mentat_core;
extern crate mentat_tolstoy;

use std::collections::BTreeMap;

use mentat::conn::Conn;
use mentat::new_connection;

use mentat_tolstoy::tx_processor::{
    Processor,
    TxReceiver,
    TxPart,
};
use mentat_tolstoy::errors::Result;
use mentat_core::{
    Entid,
    TypedValue,
    ValueType,
};

struct TxCountingReceiver {
    pub tx_count: usize,
    pub is_done: bool,
}

impl TxCountingReceiver {
    fn new() -> TxCountingReceiver {
        TxCountingReceiver {
            tx_count: 0,
            is_done: false,
        }
    }
}

impl TxReceiver for TxCountingReceiver {
    fn tx<T>(&mut self, _tx_id: Entid, _d: &mut T) -> Result<()>
        where T: Iterator<Item=TxPart> {
        self.tx_count += 1;
        Ok(())
    }

    fn done(&mut self) -> Result<()> {
        self.is_done = true;
        Ok(())
    }
}

#[derive(Debug)]
struct TestingReceiver {
    pub txes: BTreeMap<Entid, Vec<TxPart>>,
    pub is_done: bool,
}

impl TestingReceiver {
    fn new() -> TestingReceiver {
        TestingReceiver {
            txes: BTreeMap::new(),
            is_done: false,
        }
    }
}

impl TxReceiver for TestingReceiver {
    fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
        where T: Iterator<Item=TxPart> {
        let datoms = self.txes.entry(tx_id).or_insert_with(Vec::new);
        datoms.extend(d);
        Ok(())
    }

    fn done(&mut self) -> Result<()> {
        self.is_done = true;
        Ok(())
    }
}

fn assert_tx_datoms_count(receiver: &TestingReceiver, tx_num: usize, expected_datoms: usize) {
    let tx = receiver.txes.keys().nth(tx_num).expect("tx");
    let datoms = receiver.txes.get(tx).expect("datoms");
    assert_eq!(expected_datoms, datoms.len());
}

#[test]
fn test_reader() {
    let mut c = new_connection("").expect("Couldn't open conn.");
    let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
    {
        let db_tx = c.transaction().expect("db tx");
        // Don't inspect the bootstrap transaction, but we'd like to see that it's there.
        let mut receiver = TxCountingReceiver::new();
        assert_eq!(false, receiver.is_done);
        Processor::process(&db_tx, None, &mut receiver).expect("processor");
        assert_eq!(true, receiver.is_done);
        assert_eq!(1, receiver.tx_count);
    }

    let ids = conn.transact(&mut c, r#"[
        [:db/add "s" :db/ident :foo/numba]
        [:db/add "s" :db/valueType :db.type/long]
        [:db/add "s" :db/cardinality :db.cardinality/one]
    ]"#).expect("successful transaction").tempids;
    let numba_entity_id = ids.get("s").unwrap();

    let bootstrap_tx;
    {
        let db_tx = c.transaction().expect("db tx");
        // Expect to see one more transaction of four parts (one for the tx datom itself).
        let mut receiver = TestingReceiver::new();
        Processor::process(&db_tx, None, &mut receiver).expect("processor");

        println!("{:#?}", receiver);

        assert_eq!(2, receiver.txes.keys().count());
        assert_tx_datoms_count(&receiver, 1, 4);

        bootstrap_tx = Some(*receiver.txes.keys().nth(0).expect("bootstrap tx"));
    }

    let ids = conn.transact(&mut c, r#"[
        [:db/add "b" :foo/numba 123]
    ]"#).expect("successful transaction").tempids;
    let asserted_e = ids.get("b").unwrap();

    {
        let db_tx = c.transaction().expect("db tx");

        // Expect to see a single two-part transaction.
        let mut receiver = TestingReceiver::new();

        // Note that we're asking for the bootstrap tx to be skipped by the processor.
        Processor::process(&db_tx, bootstrap_tx, &mut receiver).expect("processor");

        assert_eq!(2, receiver.txes.keys().count());
        assert_tx_datoms_count(&receiver, 1, 2);

        // Inspect the transaction part.
        let tx_id = receiver.txes.keys().nth(1).expect("tx");
        let datoms = receiver.txes.get(tx_id).expect("datoms");
        let part = datoms.iter().find(|&part| &part.e == asserted_e).expect("to find asserted datom");

        assert_eq!(numba_entity_id, &part.a);
        assert!(part.v.matches_type(ValueType::Long));
        assert_eq!(TypedValue::Long(123), part.v);
        assert_eq!(true, part.added);
    }
}