mentat/tolstoy/src/tx_uploader.rs

// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::collections::HashMap;
use uuid::Uuid;
use core_traits::Entid;
use mentat_db::{PartitionMap, V1_PARTS};
use public_traits::errors::Result;
use crate::tx_processor::TxReceiver;
use crate::types::{GlobalTransactionLog, TxPart};
use crate::logger::d;
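
/// The result of an upload pass: a mapping from local transaction entids to the
/// fresh UUIDs they were uploaded under, and the UUID of the last uploaded
/// transaction (the candidate new head), if anything was uploaded.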
pub struct UploaderReport {
    pub temp_uuids: HashMap<Entid, Uuid>,
    pub head: Option<Uuid>,
}
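
/// Uploads a sequence of local transactions to a remote `GlobalTransactionLog`,
/// generating a fresh UUID for each transaction and chaining each one onto the
/// previous upload (or onto the current remote head for the first).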
pub(crate) struct TxUploader<'c> {
    tx_temp_uuids: HashMap<Entid, Uuid>,
    remote_client: &'c mut dyn GlobalTransactionLog,
    remote_head: &'c Uuid,
    rolling_temp_head: Option<Uuid>,
    local_partitions: PartitionMap,
}

impl<'c> TxUploader<'c> {
    pub fn new(
        client: &'c mut dyn GlobalTransactionLog,
        remote_head: &'c Uuid,
        local_partitions: PartitionMap,
    ) -> TxUploader<'c> {
        TxUploader {
            tx_temp_uuids: HashMap::new(),
            remote_client: client,
            remote_head,
            rolling_temp_head: None,
            local_partitions,
        }
    }
}
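
// A sketch of how this type is meant to be driven (assumed from the `TxReceiver`
// contract: a tx processor owns the receiver, calls `tx` once per local
// transaction, and finishes by calling `done`):
//
//     let mut uploader = TxUploader::new(remote_client, &remote_head, local_partitions);
//     // ... feed each transaction to `uploader.tx(tx_id, &mut datoms)` ...
//     let report = uploader.done();
//     // `report.head` is the new remote head to advance to, if the upload succeeded.
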
/// Given a set of entids and a partition map, returns a new PartitionMap that would result from
/// expanding the partitions to fit the entids.
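///
/// For example, if the user partition's next entid is 65536 and the largest user
/// entid encountered is 65540, the returned map's user partition will have a
/// next entid of 65541.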
fn allocate_partition_map_for_entids<T>(entids: T, local_partitions: &PartitionMap) -> PartitionMap
where
    T: Iterator<Item = Entid>,
{
    let mut parts = HashMap::new();
    for name in V1_PARTS.iter().map(|&(ref part, ..)| part.to_string()) {
        // This shouldn't fail: locally-sourced partitions must be present within V1_PARTS.
        let p = local_partitions.get(&name).unwrap();
        parts.insert(name, (p, p.clone()));
    }
    // For each partition, advance its next entid to one greater than the largest
    // entid encountered within its entid space.
    for entid in entids {
        for (p, new_p) in parts.values_mut() {
            if p.allows_entid(entid) && entid >= new_p.next_entid() {
                new_p.set_next_entid(entid + 1);
            }
        }
    }
    let mut m = PartitionMap::default();
    for (name, (_, new_p)) in parts {
        m.insert(name, new_p);
    }
    m
}

impl<'c> TxReceiver<UploaderReport> for TxUploader<'c> {
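    /// Upload a single local transaction: put each datom as its own chunk, then
    /// put a transaction record that ties the chunks together and chains onto the
    /// previously uploaded transaction (or onto the remote head).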
    fn tx<T>(&mut self, tx_id: Entid, datoms: &mut T) -> Result<()>
    where
        T: Iterator<Item = TxPart>,
    {
        // Yes, we generate a new UUID for a given tx, even if we might already
        // have one mapped locally. A pre-existing local mapping will be replaced
        // if this sync succeeds entirely.
        // If we're seeing this tx again, it implies that the previous attempt to
        // sync didn't update our local head. Something went wrong last time, and
        // it's unwise to try to re-use these remote tx mappings. We just leave
        // garbage txs to be GC'd on the server.
        let tx_uuid = Uuid::new_v4();
        self.tx_temp_uuids.insert(tx_id, tx_uuid);
        let mut tx_chunks = vec![];
        // TODO: separate bits of network work should be combined into a single 'future'.
        let mut datoms: Vec<TxPart> = datoms.collect();
        // TODO: this should live within a transaction, once server support is in place.
        // For now, we upload the PartitionMap in the transaction's first chunk.
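        // NB: `datoms` is never empty here; every transaction carries at least its
        // own :db/txInstant datom, so indexing into the first chunk is safe.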
        datoms[0].partitions = Some(allocate_partition_map_for_entids(
            datoms.iter().map(|d| d.e),
            &self.local_partitions,
        ));
        // Upload all chunks.
        for datom in &datoms {
            let datom_uuid = Uuid::new_v4();
            tx_chunks.push(datom_uuid);
            d(&format!("putting chunk: {:?}, {:?}", &datom_uuid, &datom));
            // TODO: switch over to CBOR once we're past debugging stuff.
            // See https://github.com/mozilla/mentat/issues/570
            // let cbor_val = serde_cbor::to_value(&datom)?;
            // self.remote_client.put_chunk(&datom_uuid, &serde_cbor::ser::to_vec_sd(&cbor_val)?)?;
            self.remote_client.put_chunk(&datom_uuid, &datom)?;
        }
        // Upload the tx itself.
        // NB: at this point, we may choose to update remote & local heads.
        // Depending on how much we're uploading, and how unreliable our connection
        // is, that might be a good way to ensure we make at least some progress.
        // It comes at the cost of possibly increasing races against other clients.
        let tx_parent = match self.rolling_temp_head {
            Some(p) => p,
            None => *self.remote_head,
        };
        d(&format!(
            "putting transaction: {:?}, {:?}, {:?}",
            &tx_uuid, &tx_parent, &tx_chunks
        ));
        self.remote_client
            .put_transaction(&tx_uuid, &tx_parent, &tx_chunks)?;
        d(&format!("updating rolling head: {:?}", tx_uuid));
        self.rolling_temp_head = Some(tx_uuid);
        Ok(())
    }
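
    /// Consume the uploader, returning the mapping of local tx entids to their
    /// remote UUIDs and the UUID of the last uploaded transaction, if any.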
    fn done(self) -> UploaderReport {
        UploaderReport {
            temp_uuids: self.tx_temp_uuids,
            head: self.rolling_temp_head,
        }
    }
}

#[cfg(test)]
pub mod tests {
    use super::*;
    use mentat_db::{Partition, V1_PARTS};
    use crate::schema::{PARTITION_DB, PARTITION_TX, PARTITION_USER};

    fn bootstrap_partition_map() -> PartitionMap {
        V1_PARTS
            .iter()
            .map(|&(ref part, start, end, index, allow_excision)| {
                (
                    part.to_string(),
                    Partition::new(start, end, index, allow_excision),
                )
            })
            .collect()
    }

    #[test]
    fn test_allocate_partition_map_for_entids() {
        let bootstrap_map = bootstrap_partition_map();

        // An empty list of entids should not allocate any space in the partitions.
        let entids: Vec<Entid> = vec![];
        let no_op_map = allocate_partition_map_for_entids(entids.into_iter(), &bootstrap_map);
        assert_eq!(bootstrap_map, no_op_map);

        // Only the user partition.
        let entids = vec![65536];
        let new_map = allocate_partition_map_for_entids(entids.into_iter(), &bootstrap_map);
        assert_eq!(65537, new_map.get(PARTITION_USER).unwrap().next_entid());
        // Other partitions are untouched.
        assert_eq!(41, new_map.get(PARTITION_DB).unwrap().next_entid());
        assert_eq!(268435456, new_map.get(PARTITION_TX).unwrap().next_entid());

        // Only the tx partition.
        let entids = vec![268435666];
        let new_map = allocate_partition_map_for_entids(entids.into_iter(), &bootstrap_map);
        assert_eq!(268435667, new_map.get(PARTITION_TX).unwrap().next_entid());
        // Other partitions are untouched.
        assert_eq!(65536, new_map.get(PARTITION_USER).unwrap().next_entid());
        assert_eq!(41, new_map.get(PARTITION_DB).unwrap().next_entid());

        // Only the db partition.
        let entids = vec![41];
        let new_map = allocate_partition_map_for_entids(entids.into_iter(), &bootstrap_map);
        assert_eq!(42, new_map.get(PARTITION_DB).unwrap().next_entid());
        // Other partitions are untouched.
        assert_eq!(65536, new_map.get(PARTITION_USER).unwrap().next_entid());
        assert_eq!(268435456, new_map.get(PARTITION_TX).unwrap().next_entid());

        // User and tx partitions.
        let entids = vec![65537, 268435456];
        let new_map = allocate_partition_map_for_entids(entids.into_iter(), &bootstrap_map);
        assert_eq!(65538, new_map.get(PARTITION_USER).unwrap().next_entid());
        assert_eq!(268435457, new_map.get(PARTITION_TX).unwrap().next_entid());
        // The db partition is untouched.
        assert_eq!(41, new_map.get(PARTITION_DB).unwrap().next_entid());

        // Db, user and tx partitions.
        let entids = vec![41, 65666, 268435457];
        let new_map = allocate_partition_map_for_entids(entids.into_iter(), &bootstrap_map);
        assert_eq!(65667, new_map.get(PARTITION_USER).unwrap().next_entid());
        assert_eq!(268435458, new_map.get(PARTITION_TX).unwrap().next_entid());
        assert_eq!(42, new_map.get(PARTITION_DB).unwrap().next_entid());
    }
}