Enforce partition integrity when setting its index r=nalexander

Timelines work starts to perform modifications on the partitions
that go beyond simple allocations. This change pre-emptively protects
partition integrity by asserting that index modifications are legal.
This commit is contained in:
Grisha Kruglov 2018-07-12 14:42:59 -07:00 committed by Grisha Kruglov
parent 38a92229d7
commit 6290cc9db2
4 changed files with 92 additions and 23 deletions

View file

@ -297,7 +297,7 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<DB> {
// This is necessary: `transact` will only UPDATE parts, not INSERT them if they're missing. // This is necessary: `transact` will only UPDATE parts, not INSERT them if they're missing.
for (part, partition) in db.partition_map.iter() { for (part, partition) in db.partition_map.iter() {
// TODO: Convert "keyword" part to SQL using Value conversion. // TODO: Convert "keyword" part to SQL using Value conversion.
tx.execute("INSERT INTO parts (part, start, end, idx, allow_excision) VALUES (?, ?, ?, ?, ?)", &[part, &partition.start, &partition.end, &partition.index, &partition.allow_excision])?; tx.execute("INSERT INTO parts (part, start, end, idx, allow_excision) VALUES (?, ?, ?, ?, ?)", &[part, &partition.start, &partition.end, &partition.next_entid(), &partition.allow_excision])?;
} }
// TODO: return to transact_internal to self-manage the encompassing SQLite transaction. // TODO: return to transact_internal to self-manage the encompassing SQLite transaction.
@ -980,9 +980,11 @@ pub fn update_partition_map(conn: &rusqlite::Connection, partition_map: &Partiti
let s = format!("UPDATE parts SET idx = CASE {} ELSE idx END", let s = format!("UPDATE parts SET idx = CASE {} ELSE idx END",
repeat("WHEN part = ? THEN ?").take(partition_map.len()).join(" ")); repeat("WHEN part = ? THEN ?").take(partition_map.len()).join(" "));
// Lifetimes of temporary values make this building a slice of references annoying if we're
// using partition.next_entid() getter; instead, we peek into partition directly.
let params: Vec<&ToSql> = partition_map.iter().flat_map(|(name, partition)| { let params: Vec<&ToSql> = partition_map.iter().flat_map(|(name, partition)| {
once(name as &ToSql) once(name as &ToSql)
.chain(once(&partition.index as &ToSql)) .chain(once(&partition.next_entid_to_allocate as &ToSql))
}).collect(); }).collect();
// TODO: only cache the latest of these statements. Changing the set of partitions isn't // TODO: only cache the latest of these statements. Changing the set of partitions isn't
@ -1088,9 +1090,9 @@ impl PartitionMap {
pub(crate) fn allocate_entids(&mut self, partition: &str, n: usize) -> Range<i64> { pub(crate) fn allocate_entids(&mut self, partition: &str, n: usize) -> Range<i64> {
match self.get_mut(partition) { match self.get_mut(partition) {
Some(partition) => { Some(partition) => {
let idx = partition.index; let idx = partition.next_entid();
partition.index += n as i64; partition.set_next_entid(idx + n as i64);
idx..partition.index idx..partition.next_entid()
}, },
// This is a programming error. // This is a programming error.
None => panic!("Cannot allocate entid from unknown partition: {}", partition), None => panic!("Cannot allocate entid from unknown partition: {}", partition),

View file

@ -379,7 +379,7 @@ impl TestConn {
} }
pub fn last_tx_id(&self) -> Entid { pub fn last_tx_id(&self) -> Entid {
self.partition_map.get(&":db.part/tx".to_string()).unwrap().index - 1 self.partition_map.get(&":db.part/tx".to_string()).unwrap().next_entid() - 1
} }
pub fn last_transaction(&self) -> Datoms { pub fn last_transaction(&self) -> Datoms {
@ -415,7 +415,7 @@ impl TestConn {
// Add a fake partition to allow tests to do things like // Add a fake partition to allow tests to do things like
// [:db/add 111 :foo/bar 222] // [:db/add 111 :foo/bar 222]
{ {
let fake_partition = Partition { start: 100, end: 2000, index: 1000, allow_excision: true }; let fake_partition = Partition::new(100, 2000, 1000, true);
parts.insert(":db.part/fake".into(), fake_partition); parts.insert(":db.part/fake".into(), fake_partition);
} }

View file

@ -48,28 +48,41 @@ use errors;
#[cfg_attr(feature = "syncable", derive(Serialize,Deserialize))] #[cfg_attr(feature = "syncable", derive(Serialize,Deserialize))]
pub struct Partition { pub struct Partition {
/// The first entid in the partition. /// The first entid in the partition.
pub start: i64, pub start: Entid,
/// Maximum allowed entid in the partition. /// Maximum allowed entid in the partition.
pub end: i64, pub end: Entid,
/// The next entid to be allocated in the partition.
pub index: i64,
/// `true` if entids in the partition can be excised with `:db/excise`. /// `true` if entids in the partition can be excised with `:db/excise`.
pub allow_excision: bool, pub allow_excision: bool,
/// The next entid to be allocated in the partition.
/// Unless you must use this directly, prefer using provided setter and getter helpers.
pub(crate) next_entid_to_allocate: Entid,
} }
impl Partition { impl Partition {
pub fn new(start: i64, end: i64, index: i64, allow_excision: bool) -> Partition { pub fn new(start: Entid, end: Entid, next_entid_to_allocate: Entid, allow_excision: bool) -> Partition {
assert!(start <= index, "A partition represents a monotonic increasing sequence of entids."); assert!(
Partition { start, end, index, allow_excision } start <= next_entid_to_allocate && next_entid_to_allocate <= end,
"A partition represents a monotonic increasing sequence of entids."
);
Partition { start, end, next_entid_to_allocate, allow_excision }
} }
pub fn contains_entid(&self, e: i64) -> bool { pub fn contains_entid(&self, e: Entid) -> bool {
(e >= self.start) && (e < self.index) (e >= self.start) && (e < self.next_entid_to_allocate)
} }
pub fn allows_entid(&self, e: i64) -> bool { pub fn allows_entid(&self, e: Entid) -> bool {
(e >= self.start) && (e <= self.end) (e >= self.start) && (e <= self.end)
} }
pub fn next_entid(&self) -> Entid {
self.next_entid_to_allocate
}
pub fn set_next_entid(&mut self, e: Entid) {
assert!(self.allows_entid(e), "Partition index must be within its allocated space.");
self.next_entid_to_allocate = e;
}
} }
/// Map partition names to `Partition` instances. /// Map partition names to `Partition` instances.
@ -146,3 +159,57 @@ pub trait TransactableValue: Clone {
fn as_tempid(&self) -> Option<TempId>; fn as_tempid(&self) -> Option<TempId>;
} }
#[cfg(test)]
mod tests {
use super::Partition;
#[test]
#[should_panic(expected = "A partition represents a monotonic increasing sequence of entids.")]
fn test_partition_limits_sanity1() {
Partition::new(100, 1000, 1001, true);
}
#[test]
#[should_panic(expected = "A partition represents a monotonic increasing sequence of entids.")]
fn test_partition_limits_sanity2() {
Partition::new(100, 1000, 99, true);
}
#[test]
#[should_panic(expected = "Partition index must be within its allocated space.")]
fn test_partition_limits_boundary1() {
let mut part = Partition::new(100, 1000, 100, true);
part.set_next_entid(2000);
}
#[test]
#[should_panic(expected = "Partition index must be within its allocated space.")]
fn test_partition_limits_boundary2() {
let mut part = Partition::new(100, 1000, 100, true);
part.set_next_entid(1001);
}
#[test]
#[should_panic(expected = "Partition index must be within its allocated space.")]
fn test_partition_limits_boundary3() {
let mut part = Partition::new(100, 1000, 100, true);
part.set_next_entid(99);
}
#[test]
#[should_panic(expected = "Partition index must be within its allocated space.")]
fn test_partition_limits_boundary4() {
let mut part = Partition::new(100, 1000, 100, true);
part.set_next_entid(-100);
}
#[test]
fn test_partition_limits_boundary5() {
let mut part = Partition::new(100, 1000, 100, true);
part.set_next_entid(100); // First entid that's allowed.
part.set_next_entid(101); // Just after first.
part.set_next_entid(1000); // Last entid that's allowed.
part.set_next_entid(999); // Just before last.
}
}

View file

@ -533,7 +533,7 @@ impl<'a, 'c> InProgress<'a, 'c> {
} }
pub fn last_tx_id(&self) -> Entid { pub fn last_tx_id(&self) -> Entid {
self.partition_map[":db.part/tx"].index - 1 self.partition_map[":db.part/tx"].next_entid() - 1
} }
} }
@ -630,7 +630,7 @@ impl Conn {
// The mutex is taken during this entire method. // The mutex is taken during this entire method.
let metadata = self.metadata.lock().unwrap(); let metadata = self.metadata.lock().unwrap();
metadata.partition_map[":db.part/tx"].index - 1 metadata.partition_map[":db.part/tx"].next_entid() - 1
} }
/// Query the Mentat store, using the given connection and the current metadata. /// Query the Mentat store, using the given connection and the current metadata.
@ -884,7 +884,7 @@ mod tests {
// Let's find out the next ID that'll be allocated. We're going to try to collide with it // Let's find out the next ID that'll be allocated. We're going to try to collide with it
// a bit later. // a bit later.
let next = conn.metadata.lock().expect("metadata") let next = conn.metadata.lock().expect("metadata")
.partition_map[":db.part/user"].index; .partition_map[":db.part/user"].next_entid();
let t = format!("[[:db/add {} :db.schema/attribute \"tempid\"]]", next + 1); let t = format!("[[:db/add {} :db.schema/attribute \"tempid\"]]", next + 1);
match conn.transact(&mut sqlite, t.as_str()) { match conn.transact(&mut sqlite, t.as_str()) {
@ -908,7 +908,7 @@ mod tests {
let mut conn = Conn::connect(&mut sqlite).unwrap(); let mut conn = Conn::connect(&mut sqlite).unwrap();
// Let's find out the next ID that'll be allocated. We're going to try to collide with it. // Let's find out the next ID that'll be allocated. We're going to try to collide with it.
let next = conn.metadata.lock().expect("metadata").partition_map[":db.part/user"].index; let next = conn.metadata.lock().expect("metadata").partition_map[":db.part/user"].next_entid();
// If this were to be resolved, we'd get [:db/add 65537 :db.schema/attribute 65537], but // If this were to be resolved, we'd get [:db/add 65537 :db.schema/attribute 65537], but
// we should reject this, because the first ID was provided by the user! // we should reject this, because the first ID was provided by the user!
@ -931,9 +931,9 @@ mod tests {
} }
/// Return the entid that will be allocated to the next transacted tempid. /// Return the entid that will be allocated to the next transacted tempid.
fn get_next_entid(conn: &Conn) -> i64 { fn get_next_entid(conn: &Conn) -> Entid {
let partition_map = &conn.metadata.lock().unwrap().partition_map; let partition_map = &conn.metadata.lock().unwrap().partition_map;
partition_map.get(":db.part/user").unwrap().index partition_map.get(":db.part/user").unwrap().next_entid()
} }
#[test] #[test]