mentat/tolstoy/src/syncer.rs
2021-08-22 17:41:50 -04:00

901 lines
35 KiB
Rust

// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::fmt;
use std::collections::HashSet;
use uuid::Uuid;
use core_traits::{Entid, KnownEntid, TypedValue};
use edn::entities::{EntityPlace, LookupRef, TxFunction};
use edn::PlainSymbol;
use mentat_db::{entids, timelines, PartitionMap, CORE_SCHEMA_VERSION};
use mentat_transaction::{InProgress, Queryable, TermBuilder};
use mentat_transaction::entity_builder::BuildTerms;
use mentat_transaction::query::{QueryInputs, Variable};
use crate::bootstrap::BootstrapHelper;
use public_traits::errors::Result;
use crate::metadata::{PartitionsTable, SyncMetadata};
use crate::schema::ensure_current_version;
use crate::tx_mapper::TxMapper;
use crate::tx_processor::{Processor, TxReceiver};
use crate::tx_uploader::TxUploader;
use crate::types::{GlobalTransactionLog, LocalTx, Tx, TxPart};
use tolstoy_traits::errors::TolstoyError;
use crate::logger::d;
use crate::syncer;
/// Syncs a local Mentat store against a remote transaction log.
/// Stateless: all functionality is exposed as associated functions.
pub struct Syncer {}
/// Action the caller should take after a sync operation completes.
#[derive(Debug, PartialEq, Clone)]
pub enum SyncFollowup {
    /// No further work is required.
    None,
    /// Caller should perform a full, follow-up sync
    /// (e.g. after a merge that produced new local state).
    FullSync,
}
impl fmt::Display for SyncFollowup {
    /// Renders a short, human-readable description of the follow-up.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            SyncFollowup::None => "None",
            SyncFollowup::FullSync => "Full sync",
        };
        write!(f, "{}", label)
    }
}
/// Outcome of a single atomic sync operation.
#[derive(Debug, PartialEq, Clone)]
pub enum SyncReport {
    /// Remote bootstrap transaction has an incompatible core schema
    /// version: (local version, remote version).
    IncompatibleRemoteBootstrap(i64, i64),
    /// Remote state was invalid or inconsistent; payload describes why.
    BadRemoteState(String),
    /// Neither local nor remote had any new transactions.
    NoChanges,
    /// Local transactions were uploaded; remote head advanced.
    RemoteFastForward,
    /// Remote transactions were applied on top of local.
    LocalFastForward,
    /// Local and remote changes were combined; carries a follow-up request.
    Merge(SyncFollowup),
}
/// Result of a syncing session, which may consist of
/// one or several atomic sync operations.
pub enum SyncResult {
    /// A single atomic sync.
    Atomic(SyncReport),
    /// A series of atomic syncs, reported in order.
    NonAtomic(Vec<SyncReport>),
}
impl fmt::Display for SyncReport {
    /// Renders a one-line, human-readable description of the report.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            SyncReport::IncompatibleRemoteBootstrap(local, remote) => write!(
                f,
                "Incompatible remote bootstrap transaction version. Local: {}, remote: {}.",
                local, remote
            ),
            SyncReport::BadRemoteState(ref err) => write!(f, "Bad remote state: {}", err),
            SyncReport::NoChanges => {
                write!(f, "Neither local nor remote have any new changes")
            }
            SyncReport::RemoteFastForward => write!(f, "Fast-forwarded remote"),
            SyncReport::LocalFastForward => write!(f, "Fast-forwarded local"),
            SyncReport::Merge(ref follow_up) => write!(
                f,
                "Merged local and remote, requesting a follow-up: {}",
                follow_up
            ),
        }
    }
}
impl fmt::Display for SyncResult {
    /// Renders either the single report, or the whole series, one per line.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            SyncResult::Atomic(ref report) => write!(f, "Single atomic sync: {}", report),
            SyncResult::NonAtomic(ref reports) => {
                writeln!(f, "Series of atomic syncs ({})", reports.len())?;
                for report in reports.iter() {
                    writeln!(f, "{}", report)?;
                }
                writeln!(f, "\\o/")
            }
        }
    }
}
/// The syncing step to take, derived from the combination of local and
/// remote data states (see `Syncer::what_do`).
#[derive(Debug, PartialEq)]
enum SyncAction {
    /// Neither side changed: nothing to do.
    NoOp,
    // TODO this is the same as remote fast-forward from local root.
    // It's currently distinguished from remote fast-forward for a more
    // direct path through the "first-sync against non-empty remote" flow.
    /// Remote is empty: upload everything local.
    PopulateRemote,
    /// Only local changed: upload local transactions.
    RemoteFastForward,
    /// Only remote changed: apply remote transactions locally.
    LocalFastForward,
    // Generic name since we might merge, or rebase, or do something else.
    /// Both sides changed: combine the two sets of changes.
    CombineChanges,
}
/// Represents remote state relative to previous sync.
/// On first sync, it's always "Changed" unless remote is "Empty".
pub enum RemoteDataState {
    /// Remote has no transactions (its head is the nil uuid).
    Empty,
    /// Remote head moved since we last synced.
    Changed,
    /// Remote head is exactly where we last saw it.
    Unchanged,
}
/// Remote state is expressed in terms of what "remote head" actually is,
/// and what we think it is.
impl<'a> From<(&'a Uuid, &'a Uuid)> for RemoteDataState {
    fn from((known_remote_head, actual_remote_head): (&Uuid, &Uuid)) -> RemoteDataState {
        // First component: is the actual remote head the nil (root) uuid?
        // Second component: does it match what we last recorded?
        match (
            *actual_remote_head == Uuid::nil(),
            actual_remote_head == known_remote_head,
        ) {
            (true, _) => RemoteDataState::Empty,
            (false, false) => RemoteDataState::Changed,
            (false, true) => RemoteDataState::Unchanged,
        }
    }
}
/// Represents local state relative to previous sync.
/// On first sync it's always "Changed".
/// Local client can't be empty: there's always at least a bootstrap transaction.
pub enum LocalDataState {
    /// Local has transactions that haven't been uploaded yet.
    Changed,
    /// Local head has already been uploaded (a mapping exists for it).
    Unchanged,
}
/// Local state is expressed in terms of presence of a "mapping" for the local head.
/// Presence of a mapping means that we've uploaded our local head,
/// indicating that there's no local changes.
/// Absence of a mapping indicates that local head hasn't been uploaded
/// and that we have local changes.
impl From<Option<Uuid>> for LocalDataState {
    fn from(mapped_local_head: Option<Uuid>) -> LocalDataState {
        if mapped_local_head.is_some() {
            LocalDataState::Unchanged
        } else {
            LocalDataState::Changed
        }
    }
}
// TODO rename this thing.
/// Accumulates local transactions (with their datoms) as they're streamed
/// out of the transaction processor; see the `TxReceiver` impl below.
pub struct LocalTxSet {
    // Transactions in the order they were received.
    txs: Vec<LocalTx>,
}
impl LocalTxSet {
    /// Constructs an empty set of local transactions.
    pub fn new() -> LocalTxSet {
        LocalTxSet { txs: Vec::new() }
    }
}
impl Default for syncer::LocalTxSet {
fn default() -> Self {
Self::new()
}
}
impl TxReceiver<Vec<LocalTx>> for LocalTxSet {
    /// Collects one transaction's datoms and stashes them away.
    fn tx<T>(&mut self, tx_id: Entid, datoms: &mut T) -> Result<()>
    where
        T: Iterator<Item = TxPart>,
    {
        let parts: Vec<TxPart> = datoms.collect();
        self.txs.push(LocalTx { tx: tx_id, parts });
        Ok(())
    }

    /// Yields everything collected so far, consuming the receiver.
    fn done(self) -> Vec<LocalTx> {
        self.txs
    }
}
impl Syncer {
/// Produces a SyncAction based on local and remote states.
fn what_do(remote_state: RemoteDataState, local_state: LocalDataState) -> SyncAction {
    match (remote_state, local_state) {
        // An empty remote gets populated regardless of local state.
        (RemoteDataState::Empty, _) => SyncAction::PopulateRemote,
        (RemoteDataState::Changed, LocalDataState::Changed) => SyncAction::CombineChanges,
        (RemoteDataState::Changed, LocalDataState::Unchanged) => SyncAction::LocalFastForward,
        (RemoteDataState::Unchanged, LocalDataState::Changed) => SyncAction::RemoteFastForward,
        (RemoteDataState::Unchanged, LocalDataState::Unchanged) => SyncAction::NoOp,
    }
}
/// Upload local txs: (from_tx, HEAD]. Remote head is necessary here because we need to specify
/// "parent" for each transaction we'll upload; remote head will be first transaction's parent.
/// On success, advances the remote head and persists the local tx->uuid mappings.
fn fast_forward_remote<R>(
    db_tx: &mut rusqlite::Transaction<'_>,
    from_tx: Option<Entid>,
    remote_client: &mut R,
    remote_head: &Uuid,
) -> Result<()>
where
    R: GlobalTransactionLog,
{
    // TODO consider moving head manipulations into uploader?
    let report;
    // Scope to avoid double-borrowing mutable remote_client.
    {
        // Prepare an uploader.
        let uploader = TxUploader::new(
            remote_client,
            remote_head,
            SyncMetadata::get_partitions(db_tx, PartitionsTable::Tolstoy)?,
        );
        // Walk the local transactions in the database and upload them.
        report = Processor::process(db_tx, from_tx, uploader)?;
    }
    // If nothing was uploaded (no head in the report), there's nothing to record.
    if let Some(last_tx_uploaded) = report.head {
        // Upload remote head.
        remote_client.set_head(&last_tx_uploaded)?;
        // On success:
        // - persist local mappings from the receiver
        // - update our local "remote head".
        TxMapper::set_lg_mappings(
            db_tx,
            report
                .temp_uuids
                .iter()
                .map(|v| (*v.0, v.1).into())
                .collect(),
        )?;
        SyncMetadata::set_remote_head(db_tx, &last_tx_uploaded)?;
    }
    Ok(())
}
/// Resolves a tx uuid into the corresponding local tx entid,
/// failing if no mapping is known for it.
fn local_tx_for_uuid(db_tx: &rusqlite::Transaction<'_>, uuid: &Uuid) -> Result<Entid> {
    if let Some(tx) = TxMapper::get_tx_for_uuid(db_tx, uuid)? {
        Ok(tx)
    } else {
        bail!(TolstoyError::TxIncorrectlyMapped(0))
    }
}
/// Feeds a transaction's parts into a TermBuilder as adds/retractions.
fn remote_parts_to_builder(builder: &mut TermBuilder, parts: Vec<TxPart>) -> Result<()> {
    for part in parts {
        let attribute = KnownEntid(part.a);
        let value = part.v;
        // Instead of providing a 'txInstant' datom directly, we map it
        // into a (transaction-tx) style assertion.
        // Transactor knows how to pick out a txInstant value out of these
        // assertions and use that value for the generated transaction's txInstant.
        let entity: EntityPlace<TypedValue> = if part.a == entids::DB_TX_INSTANT {
            EntityPlace::TxFunction(TxFunction {
                op: PlainSymbol("transaction-tx".to_string()),
            })
        } else {
            KnownEntid(part.e).into()
        };
        match part.added {
            true => builder.add(entity, attribute, value)?,
            false => builder.retract(entity, attribute, value)?,
        }
    }
    Ok(())
}
/// In context of a "transaction to be applied", a PartitionMap supplied here
/// represents what a PartitionMap will be once this transaction is applied.
/// This works well for regular assertions: entids are supplied, and we need
/// them to be allocated in the user partition space.
/// However, we need to decrement 'tx' partition's index, so that the transactor's
/// allocated tx will match what came off the wire.
/// N.B.: this depends on absence of holes in the 'tx' partition!
fn rewind_tx_partition_by_one(partition_map: &mut PartitionMap) -> Result<()> {
    match partition_map.get_mut(":db.part/tx") {
        Some(tx_part) => {
            // Sanity check: excisions must never be allowed in the tx partition.
            assert_eq!(false, tx_part.allow_excision);
            let next_entid = tx_part.next_entid() - 1;
            tx_part.set_next_entid(next_entid);
            Ok(())
        }
        None => bail!(TolstoyError::BadRemoteState(
            "Missing tx partition in an incoming transaction".to_string()
        )),
    }
}
/// Applies incoming remote transactions on top of the local store, then
/// maps the last applied tx to its remote uuid and advances the locally
/// known remote head.
fn fast_forward_local<'a, 'c>(
    in_progress: &mut InProgress<'a, 'c>,
    txs: Vec<Tx>,
) -> Result<SyncReport> {
    // (local tx entid, remote tx uuid) of the last transaction we apply.
    let mut last_tx = None;
    for tx in txs {
        let mut builder = TermBuilder::new();
        // TODO both here and in the merge scenario we're doing the same thing with the partition maps
        // and with the txInstant datom rewriting.
        // Figure out how to combine these operations into a reusable primitive(s).
        // See notes in 'merge' for why we're doing this stuff.
        // N.B.: assumes each incoming tx carries at least one part — TODO confirm upstream guarantees this.
        let mut partition_map = match tx.parts[0].partitions.clone() {
            Some(parts) => parts,
            None => {
                return Ok(SyncReport::BadRemoteState(
                    "Missing partition map in incoming transaction".to_string(),
                ));
            }
        };
        // Make space in the provided tx partition for the transaction we're about to create.
        // See function's notes for details.
        Syncer::rewind_tx_partition_by_one(&mut partition_map)?;
        Syncer::remote_parts_to_builder(&mut builder, tx.parts)?;
        // Allocate space for the incoming entids.
        in_progress.partition_map = partition_map;
        let report = in_progress.transact_builder(builder)?;
        last_tx = Some((report.tx_id, tx.tx));
    }
    // We've just transacted a new tx, and generated a new tx entid. Map it to the corresponding
    // incoming tx uuid, advance our "locally known remote head".
    if let Some((entid, uuid)) = last_tx {
        SyncMetadata::set_remote_head_and_map(
            &mut in_progress.transaction,
            (entid, &uuid).into(),
        )?;
    }
    Ok(SyncReport::LocalFastForward)
}
/// Combines diverged local and remote histories:
/// 1) moves local txs since the shared root off the main timeline,
/// 2) transacts the incoming remote txs,
/// 3) rebases the local txs on top, with "strict schema merging" and
///    tempid-based entity merging (upserts via unique attributes).
/// Returns `Merge(None)` if the rebase was a clean no-op, otherwise
/// `Merge(FullSync)` to request a follow-up upload of the merged state.
fn merge(
    ip: &mut InProgress<'_, '_>,
    incoming_txs: Vec<Tx>,
    mut local_txs_to_merge: Vec<LocalTx>,
) -> Result<SyncReport> {
    d(&"Rewinding local transactions.".to_string());
    // 1) Rewind local to shared root.
    local_txs_to_merge.sort(); // TODO sort at the interface level?
    let (new_schema, new_partition_map) = timelines::move_from_main_timeline(
        &ip.transaction,
        &ip.schema,
        ip.partition_map.clone(),
        local_txs_to_merge[0].tx..,
        // A poor man's parent reference. This might be brittle, although
        // excisions are prohibited in the 'tx' partition, so this should hold...
        local_txs_to_merge[0].tx - 1,
    )?;
    if let Some(schema) = new_schema {
        ip.schema = schema
    };
    ip.partition_map = new_partition_map;
    // 2) Transact incoming.
    // 2.1) Prepare remote tx tuples (TermBuilder, PartitionMap, Uuid), which represent
    // a remote transaction, its global identifier and partitions after it's applied.
    d(&"Transacting incoming...".to_string());
    let mut builders = vec![];
    for remote_tx in incoming_txs {
        let mut builder = TermBuilder::new();
        let partition_map = match remote_tx.parts[0].partitions.clone() {
            Some(parts) => parts,
            None => {
                return Ok(SyncReport::BadRemoteState(
                    "Missing partition map in incoming transaction".to_string(),
                ));
            }
        };
        Syncer::remote_parts_to_builder(&mut builder, remote_tx.parts)?;
        builders.push((builder, partition_map, remote_tx.tx));
    }
    // (local tx entid, remote tx uuid) of the last incoming tx we transact.
    let mut remote_report = None;
    for (builder, mut partition_map, remote_tx) in builders {
        // Make space in the provided tx partition for the transaction we're about to create.
        // See function's notes for details.
        Syncer::rewind_tx_partition_by_one(&mut partition_map)?;
        // This allocates our incoming entids in each builder,
        // letting us just use KnownEntid in the builders.
        ip.partition_map = partition_map;
        remote_report = Some((ip.transact_builder(builder)?.tx_id, remote_tx));
    }
    d(&"Transacting local on top of incoming...".to_string());
    // 3) Rebase local transactions on top of remote.
    // Stays true only if every rebased local tx turns out to be a no-op.
    let mut clean_rebase = true;
    for local_tx in local_txs_to_merge {
        let mut builder = TermBuilder::new();
        // This is the beginnings of entity merging.
        // An entid might be already known to the Schema, or it
        // might be allocated in this transaction.
        // In the former case, refer to it verbatim.
        // In the latter case, rewrite it as a tempid, and let the transactor allocate it.
        let mut entids_that_will_allocate = HashSet::new();
        // We currently support "strict schema merging": we'll smush attribute definitions,
        // but only if they're the same.
        // e.g. prohibited would be defining different cardinality for the same attribute.
        // Defining new attributes is allowed if:
        // - attribute is defined either on local or remote,
        // - attribute is defined on both local and remote in the same way.
        // Modifying an attribute is currently not supported (requires higher order schema migrations).
        // Note that "same" local and remote attributes might have different entids in the
        // two sets of transactions.
        // Set of entities that may alter "installed" attribute.
        // Since this is a rebase of local on top of remote, an "installed"
        // attribute might be one that was present in the root, or one that was
        // defined by remote.
        let mut might_alter_installed_attributes = HashSet::new();
        // Set of entities that describe a new attribute, not present in the root
        // or on the remote.
        let mut will_not_alter_installed_attribute = HashSet::new();
        // Note that at this point, remote and local have flipped - we're transacting
        // local on top of incoming (which are already in the schema).
        // Go through local datoms, and classify any schema-altering entids into
        // one of the two sets above.
        for part in &local_tx.parts {
            // If we have an ident definition locally, check if remote
            // already defined this ident. If it did, we'll need to ensure
            // both local and remote are defining it in the same way.
            if part.a == entids::DB_IDENT {
                match part.v {
                    TypedValue::Keyword(ref local_kw) => {
                        // Remote did not define this ident. Make a note of it,
                        // so that we'll know to ignore its attribute datoms.
                        if !ip.schema.ident_map.contains_key(local_kw) {
                            will_not_alter_installed_attribute.insert(part.e);
                        // Otherwise, we'll need to ensure we have the same attribute definition
                        // for it.
                        } else {
                            might_alter_installed_attributes.insert(part.e);
                        }
                    }
                    _ => panic!("programming error: wrong value type for a local ident"),
                }
            } else if entids::is_a_schema_attribute(part.a)
                && !will_not_alter_installed_attribute.contains(&part.e)
            {
                might_alter_installed_attributes.insert(part.e);
            }
        }
        for part in &local_tx.parts {
            match part.a {
                // We'll be ignoring this datom later on (to be generated by the transactor).
                // During a merge we're concerned with entities in the "user" partition,
                // while this falls into the "tx" partition.
                // We have preserved the original txInstant value on the alternate timeline.
                entids::DB_TX_INSTANT => continue,
                // 'e's will be replaced with tempids, letting transactor handle everything.
                // Non-unique entities are "duplicated". Unique entities are upserted.
                _ => {
                    // Retractions never allocated tempids in the transactor.
                    if part.added {
                        entids_that_will_allocate.insert(part.e);
                    }
                }
            }
        }
        // :db/ident is a db.unique/identity attribute, which means transactor will upsert
        // attribute assertions. E.g. if a new attribute was defined on local and not on remote,
        // it will be inserted. If both local and remote defined the same attribute
        // with different entids, we'll converge and use remote's entid.
        // Same follows for other types of db.unique/identity attributes.
        // If user-defined attribute is db.unique/identity, we'll "smush" local and remote
        // assertions against it.
        // For example, {:p/name "Grisha"} assertion on local and
        // {:p/name "Grisha"} assertion on remote will result in a single entity.
        // If user-defined attribute is not unique, however, no smushing will be performed.
        // The above example will result in two entities.
        for part in local_tx.parts {
            // Skip the "tx instant" datom: it will be generated by our transactor.
            // We don't care about preserving exact state of these datoms: they're already
            // stashed away on the timeline we've created above.
            if part.a == entids::DB_TX_INSTANT {
                continue;
            }
            let e: EntityPlace<TypedValue>;
            let a = KnownEntid(part.a);
            let v = part.v;
            // Rewrite entids if they will allocate (see entity merging notes above).
            if entids_that_will_allocate.contains(&part.e) {
                e = builder.named_tempid(format!("{}", part.e)).into();
            // Otherwise, refer to existing entities.
            } else {
                e = KnownEntid(part.e).into();
            }
            // TODO we need to do the same rewriting for part.v if it's a Ref.
            // N.b.: attribute can't refer to an unallocated entity, so it's always a KnownEntid.
            // To illustrate, this is not a valid transaction, and will fail ("no entid found for ident: :person/name"):
            // [
            //  {:db/ident :person/name :db/valueType :db.type/string :db/cardinality :db.cardinality/one}
            //  {:person/name "Grisha"}
            // ]
            // One would need to split that transaction into two,
            // at which point :person/name will refer to an allocated entity.
            match part.added {
                true => builder.add(e, a, v)?,
                false => {
                    if entids_that_will_allocate.contains(&part.e) {
                        builder.retract(e, a, v)?;
                        continue;
                    }
                    // TODO handle tempids in ValuePlace, as well.
                    // Retractions with non-upserting tempids are not currently supported.
                    // We work around this by using a lookup-ref instead of the entity tempid.
                    // However:
                    // - lookup-ref can only be used for attributes which are :db/unique,
                    // - a lookup-ref must resolve. If it doesn't, our transaction will fail.
                    // And so:
                    // - we skip retractions of non-unique attributes,
                    // - we "pre-run" a lookup-ref to ensure it will resolve,
                    //   and skip the retraction otherwise.
                    match ip.schema.attribute_map.get(&part.a) {
                        Some(attributes) => {
                            // A lookup-ref using a non-unique attribute will fail.
                            // Skip this retraction, since we can't make sense of it.
                            if attributes.unique.is_none() {
                                continue;
                            }
                        }
                        None => panic!(
                            "programming error: missing attribute map for a known attribute"
                        ),
                    }
                    // TODO prepare a query and re-use it for all retractions of this type
                    let pre_lookup = ip.q_once(
                        "[:find ?e . :in ?a ?v :where [?e ?a ?v]]",
                        QueryInputs::with_value_sequence(vec![
                            (Variable::from_valid_name("?a"), a.into()),
                            (Variable::from_valid_name("?v"), v.clone()),
                        ]),
                    )?;
                    if pre_lookup.is_empty() {
                        continue;
                    }
                    // TODO just use the value from the query instead of doing _another_ lookup-ref!
                    builder.retract(
                        EntityPlace::LookupRef(LookupRef {
                            a: a.into(),
                            v: v.clone(),
                        }),
                        a,
                        v,
                    )?;
                }
            }
        }
        // After all these checks, our builder might be empty: short-circuit.
        if builder.is_empty() {
            continue;
        }
        d(&"Savepoint before transacting a local tx...".to_string());
        ip.savepoint("speculative_local")?;
        d(&format!(
            "Transacting builder filled with local txs... {:?}",
            builder
        ));
        let report = ip.transact_builder(builder)?;
        // Let's check that we didn't modify any schema attributes.
        // Our current attribute map in the schema isn't rich enough to allow
        // for this check: it's missing a notion of "attribute absence" - we can't
        // distinguish between a missing attribute and a default value.
        // Instead, we simply query the database, checking if transaction produced
        // any schema-altering datoms.
        for e in might_alter_installed_attributes.iter() {
            if let Some(resolved_e) = report.tempids.get(&format!("{}", e)) {
                if SyncMetadata::has_entity_assertions_in_tx(
                    &ip.transaction,
                    *resolved_e,
                    report.tx_id,
                )? {
                    bail!(TolstoyError::NotYetImplemented(
                        "Can't sync with schema alterations yet.".to_string()
                    ));
                }
            }
        }
        if !SyncMetadata::is_tx_empty(&ip.transaction, report.tx_id)? {
            d(&format!("tx {} is not a no-op", report.tx_id));
            clean_rebase = false;
            ip.release_savepoint("speculative_local")?;
        } else {
            d(&format!(
                "Applied tx {} as a no-op. Rolling back the savepoint (empty tx clean-up).",
                report.tx_id
            ));
            ip.rollback_savepoint("speculative_local")?;
        }
    }
    // TODO
    // At this point, we've rebased local transactions on top of remote.
    // This would be a good point to create a "merge commit" and upload our losing timeline.
    // Since we don't upload during a merge (instead, we request a follow-up sync),
    // set the locally known remote HEAD to what we received from the 'remote'.
    if let Some((entid, uuid)) = remote_report {
        SyncMetadata::set_remote_head_and_map(&mut ip.transaction, (entid, &uuid).into())?;
    }
    // If necessary, request a full sync as a follow-up to fast-forward remote.
    if clean_rebase {
        Ok(SyncReport::Merge(SyncFollowup::None))
    } else {
        Ok(SyncReport::Merge(SyncFollowup::FullSync))
    }
}
/// Handles the first sync against a remote that already has data: adopts
/// the remote bootstrap transaction (after a compatibility check), maps it
/// to the local bootstrap, then fast-forwards or merges the remainder.
fn first_sync_against_non_empty<R>(
    ip: &mut InProgress<'_, '_>,
    remote_client: &R,
    local_metadata: &SyncMetadata,
) -> Result<SyncReport>
where
    R: GlobalTransactionLog,
{
    d(&"remote non-empty on first sync, adopting remote state.".to_string());
    // 1) Download remote transactions.
    let incoming_txs = remote_client.transactions_after(&Uuid::nil())?;
    if incoming_txs.is_empty() {
        return Ok(SyncReport::BadRemoteState(
            "Remote specified non-root HEAD but gave no transactions".to_string(),
        ));
    }
    // 2) Process remote bootstrap.
    let remote_bootstrap = &incoming_txs[0];
    let local_bootstrap = local_metadata.root;
    let bootstrap_helper = BootstrapHelper::new(remote_bootstrap);
    // Reject remotes whose bootstrap is of a different core schema version.
    if !bootstrap_helper.is_compatible()? {
        return Ok(SyncReport::IncompatibleRemoteBootstrap(
            CORE_SCHEMA_VERSION as i64,
            bootstrap_helper.core_schema_version()?,
        ));
    }
    d(&format!(
        "mapping incoming bootstrap tx uuid to local bootstrap entid: {} -> {}",
        remote_bootstrap.tx, local_bootstrap
    ));
    // Map incoming bootstrap tx uuid to local bootstrap entid.
    // If there's more work to do, we'll move the head again.
    SyncMetadata::set_remote_head_and_map(
        &mut ip.transaction,
        (local_bootstrap, &remote_bootstrap.tx).into(),
    )?;
    // 3) Determine new local and remote data states, now that bootstrap has been dealt with.
    let remote_state = if incoming_txs.len() > 1 {
        RemoteDataState::Changed
    } else {
        RemoteDataState::Unchanged
    };
    let local_state = if local_metadata.root != local_metadata.head {
        LocalDataState::Changed
    } else {
        LocalDataState::Unchanged
    };
    // 4) The rest of this flow isn't that special anymore.
    // Since we've "merged" with the remote bootstrap, the "no-op" and
    // "local fast-forward" cases are reported as merges.
    match Syncer::what_do(remote_state, local_state) {
        SyncAction::NoOp => Ok(SyncReport::Merge(SyncFollowup::None)),
        SyncAction::PopulateRemote => {
            // This is a programming error.
            bail!(TolstoyError::UnexpectedState(
                "Remote state can't be empty on first sync against non-empty remote"
                    .to_string()
            ))
        }
        SyncAction::RemoteFastForward => {
            bail!(TolstoyError::NotYetImplemented("TODO fast-forward remote on first sync when remote is just bootstrap and local has more".to_string()))
        }
        SyncAction::LocalFastForward => {
            // Skip the bootstrap tx: it was mapped above.
            Syncer::fast_forward_local(ip, incoming_txs[1..].to_vec())?;
            Ok(SyncReport::Merge(SyncFollowup::None))
        }
        SyncAction::CombineChanges => {
            // Collect local txs after the local bootstrap...
            let local_txs = Processor::process(
                &ip.transaction,
                Some(local_metadata.root),
                LocalTxSet::new(),
            )?;
            // ... and merge them with the incoming ones (sans bootstrap).
            Syncer::merge(ip, incoming_txs[1..].to_vec(), local_txs)
        }
    }
}
/// Main sync entry point: compares the local and remote heads and then
/// performs a no-op, populates the remote, fast-forwards one side, or
/// merges diverged histories. Returns a report describing what happened.
pub fn sync<R>(ip: &mut InProgress<'_, '_>, remote_client: &mut R) -> Result<SyncReport>
where
    R: GlobalTransactionLog,
{
    d(&"sync flowing".to_string());
    // Ensure the tolstoy metadata tables exist and are at the current version.
    ensure_current_version(&mut ip.transaction)?;
    let remote_head = remote_client.head()?;
    d(&format!("remote head {:?}", remote_head));
    let locally_known_remote_head = SyncMetadata::remote_head(&ip.transaction)?;
    d(&format!("local head {:?}", locally_known_remote_head));
    let (root, head) = SyncMetadata::root_and_head_tx(&ip.transaction)?;
    let local_metadata = SyncMetadata::new(root, head);
    // impl From ... vs ::new() calls to construct "state" objects?
    let local_state = TxMapper::get(&ip.transaction, local_metadata.head)?.into();
    let remote_state = (&locally_known_remote_head, &remote_head).into();
    // Currently, first sync against a non-empty remote is special.
    if locally_known_remote_head == Uuid::nil() && remote_head != Uuid::nil() {
        return Syncer::first_sync_against_non_empty(ip, remote_client, &local_metadata);
    }
    match Syncer::what_do(remote_state, local_state) {
        SyncAction::NoOp => {
            d(&"local HEAD did not move. Nothing to do!".to_string());
            Ok(SyncReport::NoChanges)
        }
        SyncAction::PopulateRemote => {
            d(&"empty remote!".to_string());
            // Upload everything we have (from_tx is None).
            Syncer::fast_forward_remote(
                &mut ip.transaction,
                None,
                remote_client,
                &remote_head,
            )?;
            Ok(SyncReport::RemoteFastForward)
        }
        SyncAction::RemoteFastForward => {
            d(&"local HEAD moved.".to_string());
            let upload_from_tx =
                Syncer::local_tx_for_uuid(&ip.transaction, &locally_known_remote_head)?;
            d(&"Fast-forwarding the remote.".to_string());
            // TODO it's possible that we've successfully advanced remote head previously,
            // but failed to advance our own local head. If that's the case, and we can recognize it,
            // our sync becomes just bumping our local head. AFAICT below would currently fail.
            Syncer::fast_forward_remote(
                &mut ip.transaction,
                Some(upload_from_tx),
                remote_client,
                &remote_head,
            )?;
            Ok(SyncReport::RemoteFastForward)
        }
        SyncAction::LocalFastForward => {
            d(&"fast-forwarding local store.".to_string());
            Syncer::fast_forward_local(
                ip,
                remote_client.transactions_after(&locally_known_remote_head)?,
            )?;
            Ok(SyncReport::LocalFastForward)
        }
        SyncAction::CombineChanges => {
            d(&"combining changes from local and remote stores.".to_string());
            // Get the starting point for our local set of txs to merge.
            let combine_local_from_tx =
                Syncer::local_tx_for_uuid(&ip.transaction, &locally_known_remote_head)?;
            let local_txs = Processor::process(
                &ip.transaction,
                Some(combine_local_from_tx),
                LocalTxSet::new(),
            )?;
            // Merge!
            Syncer::merge(
                ip,
                // Remote txs to merge...
                remote_client.transactions_after(&locally_known_remote_head)?,
                // ... with the local txs.
                local_txs,
            )
        }
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Exhaustively checks the (remote, local) -> SyncAction decision table.
    #[test]
    fn test_what_do() {
        let cases = vec![
            (RemoteDataState::Empty, LocalDataState::Unchanged, SyncAction::PopulateRemote),
            (RemoteDataState::Empty, LocalDataState::Changed, SyncAction::PopulateRemote),
            (RemoteDataState::Unchanged, LocalDataState::Unchanged, SyncAction::NoOp),
            (RemoteDataState::Unchanged, LocalDataState::Changed, SyncAction::RemoteFastForward),
            (RemoteDataState::Changed, LocalDataState::Unchanged, SyncAction::LocalFastForward),
            (RemoteDataState::Changed, LocalDataState::Changed, SyncAction::CombineChanges),
        ];
        for (remote, local, expected) in cases {
            assert_eq!(expected, Syncer::what_do(remote, local));
        }
    }
}