// Copyright 2018 Mozilla // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use // this file except in compliance with the License. You may obtain a copy of the // License at http://www.apache.org/licenses/LICENSE-2.0 // Unless required by applicable law or agreed to in writing, software distributed // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. use std::fmt; use std::collections::HashSet; use uuid::Uuid; use core_traits::{Entid, KnownEntid, TypedValue}; use edn::entities::{EntityPlace, LookupRef, TxFunction}; use edn::PlainSymbol; use mentat_db::{entids, timelines, PartitionMap, CORE_SCHEMA_VERSION}; use mentat_transaction::{InProgress, Queryable, TermBuilder}; use mentat_transaction::entity_builder::BuildTerms; use mentat_transaction::query::{QueryInputs, Variable}; use crate::bootstrap::BootstrapHelper; use public_traits::errors::Result; use crate::metadata::{PartitionsTable, SyncMetadata}; use crate::schema::ensure_current_version; use crate::tx_mapper::TxMapper; use crate::tx_processor::{Processor, TxReceiver}; use crate::tx_uploader::TxUploader; use crate::types::{GlobalTransactionLog, LocalTx, Tx, TxPart}; use tolstoy_traits::errors::TolstoyError; use crate::logger::d; use crate::syncer; pub struct Syncer {} #[derive(Debug, PartialEq, Clone)] pub enum SyncFollowup { None, FullSync, } impl fmt::Display for SyncFollowup { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { SyncFollowup::None => write!(f, "None"), SyncFollowup::FullSync => write!(f, "Full sync"), } } } #[derive(Debug, PartialEq, Clone)] pub enum SyncReport { IncompatibleRemoteBootstrap(i64, i64), BadRemoteState(String), NoChanges, RemoteFastForward, LocalFastForward, Merge(SyncFollowup), } pub enum SyncResult { Atomic(SyncReport), NonAtomic(Vec), } impl fmt::Display for SyncReport { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { SyncReport::IncompatibleRemoteBootstrap(local, remote) => write!( f, "Incompatible remote bootstrap transaction version. Local: {}, remote: {}.", local, remote ), SyncReport::BadRemoteState(err) => write!(f, "Bad remote state: {}", err), SyncReport::NoChanges => write!(f, "Neither local nor remote have any new changes"), SyncReport::RemoteFastForward => write!(f, "Fast-forwarded remote"), SyncReport::LocalFastForward => write!(f, "Fast-forwarded local"), SyncReport::Merge(follow_up) => write!( f, "Merged local and remote, requesting a follow-up: {}", follow_up ), } } } impl fmt::Display for SyncResult { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { SyncResult::Atomic(report) => write!(f, "Single atomic sync: {}", report), SyncResult::NonAtomic(reports) => { writeln!(f, "Series of atomic syncs ({})", reports.len())?; for report in reports { writeln!(f, "{}", report)?; } writeln!(f, "\\o/") } } } } #[derive(Debug, PartialEq)] enum SyncAction { NoOp, // TODO this is the same as remote fast-forward from local root. // It's currently distinguished from remote fast-forward for a more // path through the "first-sync against non-empty remote" flow. PopulateRemote, RemoteFastForward, LocalFastForward, // Generic name since we might merge, or rebase, or do something else. CombineChanges, } /// Represents remote state relative to previous sync. /// On first sync, it's always "Changed" unless remote is "Empty". pub enum RemoteDataState { Empty, Changed, Unchanged, } /// Remote state is expressed in terms of what "remote head" actually is, /// and what we think it is. impl<'a> From<(&'a Uuid, &'a Uuid)> for RemoteDataState { fn from((known_remote_head, actual_remote_head): (&Uuid, &Uuid)) -> RemoteDataState { if *actual_remote_head == Uuid::nil() { RemoteDataState::Empty } else if actual_remote_head != known_remote_head { RemoteDataState::Changed } else { RemoteDataState::Unchanged } } } /// Represents local state relative to previous sync. /// On first sync it's always "Changed". /// Local client can't be empty: there's always at least a bootstrap transaction. pub enum LocalDataState { Changed, Unchanged, } /// Local state is expressed in terms of presence of a "mapping" for the local head. /// Presence of a mapping means that we've uploaded our local head, /// indicating that there's no local changes. /// Absence of a mapping indicates that local head hasn't been uploaded /// and that we have local changes. impl From> for LocalDataState { fn from(mapped_local_head: Option) -> LocalDataState { match mapped_local_head { Some(_) => LocalDataState::Unchanged, None => LocalDataState::Changed, } } } // TODO rename this thing. pub struct LocalTxSet { txs: Vec, } impl LocalTxSet { pub fn new() -> LocalTxSet { LocalTxSet { txs: vec![] } } } impl Default for syncer::LocalTxSet { fn default() -> Self { Self::new() } } impl TxReceiver> for LocalTxSet { fn tx(&mut self, tx_id: Entid, datoms: &mut T) -> Result<()> where T: Iterator, { self.txs.push(LocalTx { tx: tx_id, parts: datoms.collect(), }); Ok(()) } fn done(self) -> Vec { self.txs } } impl Syncer { /// Produces a SyncAction based on local and remote states. fn what_do(remote_state: RemoteDataState, local_state: LocalDataState) -> SyncAction { match remote_state { RemoteDataState::Empty => SyncAction::PopulateRemote, RemoteDataState::Changed => match local_state { LocalDataState::Changed => SyncAction::CombineChanges, LocalDataState::Unchanged => SyncAction::LocalFastForward, }, RemoteDataState::Unchanged => match local_state { LocalDataState::Changed => SyncAction::RemoteFastForward, LocalDataState::Unchanged => SyncAction::NoOp, }, } } /// Upload local txs: (from_tx, HEAD]. Remote head is necessary here because we need to specify /// "parent" for each transaction we'll upload; remote head will be first transaction's parent. fn fast_forward_remote( db_tx: &mut rusqlite::Transaction<'_>, from_tx: Option, remote_client: &mut R, remote_head: &Uuid, ) -> Result<()> where R: GlobalTransactionLog, { // TODO consider moving head manipulations into uploader? let report; // Scope to avoid double-borrowing mutable remote_client. { // Prepare an uploader. let uploader = TxUploader::new( remote_client, remote_head, SyncMetadata::get_partitions(db_tx, PartitionsTable::Tolstoy)?, ); // Walk the local transactions in the database and upload them. report = Processor::process(db_tx, from_tx, uploader)?; } if let Some(last_tx_uploaded) = report.head { // Upload remote head. remote_client.set_head(&last_tx_uploaded)?; // On success: // - persist local mappings from the receiver // - update our local "remote head". TxMapper::set_lg_mappings( db_tx, report .temp_uuids .iter() .map(|v| (*v.0, v.1).into()) .collect(), )?; SyncMetadata::set_remote_head(db_tx, &last_tx_uploaded)?; } Ok(()) } fn local_tx_for_uuid(db_tx: &rusqlite::Transaction<'_>, uuid: &Uuid) -> Result { match TxMapper::get_tx_for_uuid(db_tx, uuid)? { Some(t) => Ok(t), None => bail!(TolstoyError::TxIncorrectlyMapped(0)), } } fn remote_parts_to_builder(builder: &mut TermBuilder, parts: Vec) -> Result<()> { for part in parts { let e: EntityPlace; let a = KnownEntid(part.a); let v = part.v; // Instead of providing a 'txInstant' datom directly, we map it // into a (transaction-tx) style assertion. // Transactor knows how to pick out a txInstant value out of these // assertions and use that value for the generated transaction's txInstant. if part.a == entids::DB_TX_INSTANT { e = EntityPlace::TxFunction(TxFunction { op: PlainSymbol("transaction-tx".to_string()), }); } else { e = KnownEntid(part.e).into(); } if part.added { builder.add(e, a, v)?; } else { builder.retract(e, a, v)?; } } Ok(()) } /// In context of a "transaction to be applied", a PartitionMap supplied here /// represents what a PartitionMap will be once this transaction is applied. /// This works well for regular assertions: entids are supplied, and we need /// them to be allocated in the user partition space. /// However, we need to decrement 'tx' partition's index, so that the transactor's /// allocated tx will match what came off the wire. /// N.B.: this depends on absence of holes in the 'tx' partition! fn rewind_tx_partition_by_one(partition_map: &mut PartitionMap) -> Result<()> { if let Some(tx_part) = partition_map.get_mut(":db.part/tx") { assert_eq!(false, tx_part.allow_excision); // Sanity check. let next_entid = tx_part.next_entid() - 1; tx_part.set_next_entid(next_entid); Ok(()) } else { bail!(TolstoyError::BadRemoteState( "Missing tx partition in an incoming transaction".to_string() )); } } fn fast_forward_local<'a, 'c>( in_progress: &mut InProgress<'a, 'c>, txs: Vec, ) -> Result { let mut last_tx = None; for tx in txs { let mut builder = TermBuilder::new(); // TODO both here and in the merge scenario we're doing the same thing with the partition maps // and with the txInstant datom rewriting. // Figure out how to combine these operations into a resuable primitive(s). // See notes in 'merge' for why we're doing this stuff. let mut partition_map = match tx.parts[0].partitions.clone() { Some(parts) => parts, None => { return Ok(SyncReport::BadRemoteState( "Missing partition map in incoming transaction".to_string(), )); } }; // Make space in the provided tx partition for the transaction we're about to create. // See function's notes for details. Syncer::rewind_tx_partition_by_one(&mut partition_map)?; Syncer::remote_parts_to_builder(&mut builder, tx.parts)?; // Allocate space for the incoming entids. in_progress.partition_map = partition_map; let report = in_progress.transact_builder(builder)?; last_tx = Some((report.tx_id, tx.tx)); } // We've just transacted a new tx, and generated a new tx entid. Map it to the corresponding // incoming tx uuid, advance our "locally known remote head". if let Some((entid, uuid)) = last_tx { SyncMetadata::set_remote_head_and_map( &mut in_progress.transaction, (entid, &uuid).into(), )?; } Ok(SyncReport::LocalFastForward) } fn merge( ip: &mut InProgress<'_, '_>, incoming_txs: Vec, mut local_txs_to_merge: Vec, ) -> Result { d(&"Rewinding local transactions.".to_string()); // 1) Rewind local to shared root. local_txs_to_merge.sort(); // TODO sort at the interface level? let (new_schema, new_partition_map) = timelines::move_from_main_timeline( &ip.transaction, &ip.schema, ip.partition_map.clone(), local_txs_to_merge[0].tx.., // A poor man's parent reference. This might be brittle, although // excisions are prohibited in the 'tx' partition, so this should hold... local_txs_to_merge[0].tx - 1, )?; if let Some(schema) = new_schema { ip.schema = schema }; ip.partition_map = new_partition_map; // 2) Transact incoming. // 2.1) Prepare remote tx tuples (TermBuilder, PartitionMap, Uuid), which represent // a remote transaction, its global identifier and partitions after it's applied. d(&"Transacting incoming...".to_string()); let mut builders = vec![]; for remote_tx in incoming_txs { let mut builder = TermBuilder::new(); let partition_map = match remote_tx.parts[0].partitions.clone() { Some(parts) => parts, None => { return Ok(SyncReport::BadRemoteState( "Missing partition map in incoming transaction".to_string(), )); } }; Syncer::remote_parts_to_builder(&mut builder, remote_tx.parts)?; builders.push((builder, partition_map, remote_tx.tx)); } let mut remote_report = None; for (builder, mut partition_map, remote_tx) in builders { // Make space in the provided tx partition for the transaction we're about to create. // See function's notes for details. Syncer::rewind_tx_partition_by_one(&mut partition_map)?; // This allocates our incoming entids in each builder, // letting us just use KnownEntid in the builders. ip.partition_map = partition_map; remote_report = Some((ip.transact_builder(builder)?.tx_id, remote_tx)); } d(&"Transacting local on top of incoming...".to_string()); // 3) Rebase local transactions on top of remote. let mut clean_rebase = true; for local_tx in local_txs_to_merge { let mut builder = TermBuilder::new(); // This is the beginnings of entity merging. // An entid might be already known to the Schema, or it // might be allocated in this transaction. // In the former case, refer to it verbatim. // In the latter case, rewrite it as a tempid, and let the transactor allocate it. let mut entids_that_will_allocate = HashSet::new(); // We currently support "strict schema merging": we'll smush attribute definitions, // but only if they're the same. // e.g. prohibited would be defining different cardinality for the same attribute. // Defining new attributes is allowed if: // - attribute is defined either on local or remote, // - attribute is defined on both local and remote in the same way. // Modifying an attribute is currently not supported (requires higher order schema migrations). // Note that "same" local and remote attributes might have different entids in the // two sets of transactions. // Set of entities that may alter "installed" attribute. // Since this is a rebase of local on top of remote, an "installed" // attribute might be one that was present in the root, or one that was // defined by remote. let mut might_alter_installed_attributes = HashSet::new(); // Set of entities that describe a new attribute, not present in the root // or on the remote. let mut will_not_alter_installed_attribute = HashSet::new(); // Note that at this point, remote and local have flipped - we're transacting // local on top of incoming (which are already in the schema). // Go through local datoms, and classify any schema-altering entids into // one of the two sets above. for part in &local_tx.parts { // If we have an ident definition locally, check if remote // already defined this ident. If it did, we'll need to ensure // both local and remote are defining it in the same way. if part.a == entids::DB_IDENT { match part.v { TypedValue::Keyword(ref local_kw) => { // Remote did not define this ident. Make a note of it, // so that we'll know to ignore its attribute datoms. if !ip.schema.ident_map.contains_key(local_kw) { will_not_alter_installed_attribute.insert(part.e); // Otherwise, we'll need to ensure we have the same attribute definition // for it. } else { might_alter_installed_attributes.insert(part.e); } } _ => panic!("programming error: wrong value type for a local ident"), } } else if entids::is_a_schema_attribute(part.a) && !will_not_alter_installed_attribute.contains(&part.e) { might_alter_installed_attributes.insert(part.e); } } for part in &local_tx.parts { match part.a { // We'll be ignoring this datom later on (to be generated by the transactor). // During a merge we're concerned with entities in the "user" partition, // while this falls into the "tx" partition. // We have preserved the original txInstant value on the alternate timeline. entids::DB_TX_INSTANT => continue, // 'e's will be replaced with tempids, letting transactor handle everything. // Non-unique entities are "duplicated". Unique entities are upserted. _ => { // Retractions never allocated tempids in the transactor. if part.added { entids_that_will_allocate.insert(part.e); } } } } // :db/ident is a db.unique/identity attribute, which means transactor will upsert // attribute assertions. E.g. if a new attribute was defined on local and not on remote, // it will be inserted. If both local and remote defined the same attribute // with different entids, we'll converge and use remote's entid. // Same follows for other types of db.unique/identity attributes. // If user-defined attribute is db.unique/identity, we'll "smush" local and remote // assertions against it. // For example, {:p/name "Grisha"} assertion on local and // {:p/name "Grisha"} assertion on remote will result in a single entity. // If user-defined attribute is not unique, however, no smushing will be performed. // The above example will result in two entities. for part in local_tx.parts { // Skip the "tx instant" datom: it will be generated by our transactor. // We don't care about preserving exact state of these datoms: they're already // stashed away on the timeline we've created above. if part.a == entids::DB_TX_INSTANT { continue; } let e: EntityPlace; let a = KnownEntid(part.a); let v = part.v; // Rewrite entids if they will allocate (see entity merging notes above). if entids_that_will_allocate.contains(&part.e) { e = builder.named_tempid(format!("{}", part.e)).into(); // Otherwise, refer to existing entities. } else { e = KnownEntid(part.e).into(); } // TODO we need to do the same rewriting for part.v if it's a Ref. // N.b.: attribute can't refer to an unallocated entity, so it's always a KnownEntid. // To illustrate, this is not a valid transaction, and will fail ("no entid found for ident: :person/name"): // [ // {:db/ident :person/name :db/valueType :db.type/string :db/cardinality :db.cardinality/one} // {:person/name "Grisha"} // ] // One would need to split that transaction into two, // at which point :person/name will refer to an allocated entity. match part.added { true => builder.add(e, a, v)?, false => { if entids_that_will_allocate.contains(&part.e) { builder.retract(e, a, v)?; continue; } // TODO handle tempids in ValuePlace, as well. // Retractions with non-upserting tempids are not currently supported. // We work around this by using a lookup-ref instead of the entity tempid. // However: // - lookup-ref can only be used for attributes which are :db/unique, // - a lookup-ref must resolve. If it doesn't, our transaction will fail. // And so: // - we skip retractions of non-unique attributes, // - we "pre-run" a lookup-ref to ensure it will resolve, // and skip the retraction otherwise. match ip.schema.attribute_map.get(&part.a) { Some(attributes) => { // A lookup-ref using a non-unique attribute will fail. // Skip this retraction, since we can't make sense of it. if attributes.unique.is_none() { continue; } } None => panic!( "programming error: missing attribute map for a known attribute" ), } // TODO prepare a query and re-use it for all retractions of this type let pre_lookup = ip.q_once( "[:find ?e . :in ?a ?v :where [?e ?a ?v]]", QueryInputs::with_value_sequence(vec![ (Variable::from_valid_name("?a"), a.into()), (Variable::from_valid_name("?v"), v.clone()), ]), )?; if pre_lookup.is_empty() { continue; } // TODO just use the value from the query instead of doing _another_ lookup-ref! builder.retract( EntityPlace::LookupRef(LookupRef { a: a.into(), v: v.clone(), }), a, v, )?; } } } // After all these checks, our builder might be empty: short-circuit. if builder.is_empty() { continue; } d(&"Savepoint before transacting a local tx...".to_string()); ip.savepoint("speculative_local")?; d(&format!( "Transacting builder filled with local txs... {:?}", builder )); let report = ip.transact_builder(builder)?; // Let's check that we didn't modify any schema attributes. // Our current attribute map in the schema isn't rich enough to allow // for this check: it's missing a notion of "attribute absence" - we can't // distinguish between a missing attribute and a default value. // Instead, we simply query the database, checking if transaction produced // any schema-altering datoms. for e in might_alter_installed_attributes.iter() { if let Some(resolved_e) = report.tempids.get(&format!("{}", e)) { if SyncMetadata::has_entity_assertions_in_tx( &ip.transaction, *resolved_e, report.tx_id, )? { bail!(TolstoyError::NotYetImplemented( "Can't sync with schema alterations yet.".to_string() )); } } } if !SyncMetadata::is_tx_empty(&ip.transaction, report.tx_id)? { d(&format!("tx {} is not a no-op", report.tx_id)); clean_rebase = false; ip.release_savepoint("speculative_local")?; } else { d(&format!( "Applied tx {} as a no-op. Rolling back the savepoint (empty tx clean-up).", report.tx_id )); ip.rollback_savepoint("speculative_local")?; } } // TODO // At this point, we've rebased local transactions on top of remote. // This would be a good point to create a "merge commit" and upload our loosing timeline. // Since we don't upload during a merge (instead, we request a follow-up sync), // set the locally known remote HEAD to what we received from the 'remote'. if let Some((entid, uuid)) = remote_report { SyncMetadata::set_remote_head_and_map(&mut ip.transaction, (entid, &uuid).into())?; } // If necessary, request a full sync as a follow-up to fast-forward remote. if clean_rebase { Ok(SyncReport::Merge(SyncFollowup::None)) } else { Ok(SyncReport::Merge(SyncFollowup::FullSync)) } } fn first_sync_against_non_empty( ip: &mut InProgress<'_, '_>, remote_client: &R, local_metadata: &SyncMetadata, ) -> Result where R: GlobalTransactionLog, { d(&"remote non-empty on first sync, adopting remote state.".to_string()); // 1) Download remote transactions. let incoming_txs = remote_client.transactions_after(&Uuid::nil())?; if incoming_txs.is_empty() { return Ok(SyncReport::BadRemoteState( "Remote specified non-root HEAD but gave no transactions".to_string(), )); } // 2) Process remote bootstrap. let remote_bootstrap = &incoming_txs[0]; let local_bootstrap = local_metadata.root; let bootstrap_helper = BootstrapHelper::new(remote_bootstrap); if !bootstrap_helper.is_compatible()? { return Ok(SyncReport::IncompatibleRemoteBootstrap( CORE_SCHEMA_VERSION as i64, bootstrap_helper.core_schema_version()?, )); } d(&format!( "mapping incoming bootstrap tx uuid to local bootstrap entid: {} -> {}", remote_bootstrap.tx, local_bootstrap )); // Map incoming bootstrap tx uuid to local bootstrap entid. // If there's more work to do, we'll move the head again. SyncMetadata::set_remote_head_and_map( &mut ip.transaction, (local_bootstrap, &remote_bootstrap.tx).into(), )?; // 3) Determine new local and remote data states, now that bootstrap has been dealt with. let remote_state = if incoming_txs.len() > 1 { RemoteDataState::Changed } else { RemoteDataState::Unchanged }; let local_state = if local_metadata.root != local_metadata.head { LocalDataState::Changed } else { LocalDataState::Unchanged }; // 4) The rest of this flow isn't that special anymore. // Since we've "merged" with the remote bootstrap, the "no-op" and // "local fast-forward" cases are reported as merges. match Syncer::what_do(remote_state, local_state) { SyncAction::NoOp => { Ok(SyncReport::Merge(SyncFollowup::None)) } SyncAction::PopulateRemote => { // This is a programming error. bail!(TolstoyError::UnexpectedState("Remote state can't be empty on first sync against non-empty remote".to_string())) } SyncAction::RemoteFastForward => { bail!(TolstoyError::NotYetImplemented("TODO fast-forward remote on first sync when remote is just bootstrap and local has more".to_string())) } SyncAction::LocalFastForward => { Syncer::fast_forward_local(ip, incoming_txs[1..].to_vec())?; Ok(SyncReport::Merge(SyncFollowup::None)) } SyncAction::CombineChanges => { let local_txs = Processor::process( &ip.transaction, Some(local_metadata.root), LocalTxSet::new())?; Syncer::merge( ip, incoming_txs[1..].to_vec(), local_txs, ) } } } pub fn sync(ip: &mut InProgress<'_, '_>, remote_client: &mut R) -> Result where R: GlobalTransactionLog, { d(&"sync flowing".to_string()); ensure_current_version(&mut ip.transaction)?; let remote_head = remote_client.head()?; d(&format!("remote head {:?}", remote_head)); let locally_known_remote_head = SyncMetadata::remote_head(&ip.transaction)?; d(&format!("local head {:?}", locally_known_remote_head)); let (root, head) = SyncMetadata::root_and_head_tx(&ip.transaction)?; let local_metadata = SyncMetadata::new(root, head); // impl From ... vs ::new() calls to constuct "state" objects? let local_state = TxMapper::get(&ip.transaction, local_metadata.head)?.into(); let remote_state = (&locally_known_remote_head, &remote_head).into(); // Currently, first sync against a non-empty remote is special. if locally_known_remote_head == Uuid::nil() && remote_head != Uuid::nil() { return Syncer::first_sync_against_non_empty(ip, remote_client, &local_metadata); } match Syncer::what_do(remote_state, local_state) { SyncAction::NoOp => { d(&"local HEAD did not move. Nothing to do!".to_string()); Ok(SyncReport::NoChanges) } SyncAction::PopulateRemote => { d(&"empty remote!".to_string()); Syncer::fast_forward_remote( &mut ip.transaction, None, remote_client, &remote_head, )?; Ok(SyncReport::RemoteFastForward) } SyncAction::RemoteFastForward => { d(&"local HEAD moved.".to_string()); let upload_from_tx = Syncer::local_tx_for_uuid(&ip.transaction, &locally_known_remote_head)?; d(&"Fast-forwarding the remote.".to_string()); // TODO it's possible that we've successfully advanced remote head previously, // but failed to advance our own local head. If that's the case, and we can recognize it, // our sync becomes just bumping our local head. AFAICT below would currently fail. Syncer::fast_forward_remote( &mut ip.transaction, Some(upload_from_tx), remote_client, &remote_head, )?; Ok(SyncReport::RemoteFastForward) } SyncAction::LocalFastForward => { d(&"fast-forwarding local store.".to_string()); Syncer::fast_forward_local( ip, remote_client.transactions_after(&locally_known_remote_head)?, )?; Ok(SyncReport::LocalFastForward) } SyncAction::CombineChanges => { d(&"combining changes from local and remote stores.".to_string()); // Get the starting point for out local set of txs to merge. let combine_local_from_tx = Syncer::local_tx_for_uuid(&ip.transaction, &locally_known_remote_head)?; let local_txs = Processor::process( &ip.transaction, Some(combine_local_from_tx), LocalTxSet::new(), )?; // Merge! Syncer::merge( ip, // Remote txs to merge... remote_client.transactions_after(&locally_known_remote_head)?, // ... with the local txs. local_txs, ) } } } } #[cfg(test)] mod tests { use super::*; #[test] fn test_what_do() { assert_eq!( SyncAction::PopulateRemote, Syncer::what_do(RemoteDataState::Empty, LocalDataState::Unchanged) ); assert_eq!( SyncAction::PopulateRemote, Syncer::what_do(RemoteDataState::Empty, LocalDataState::Changed) ); assert_eq!( SyncAction::NoOp, Syncer::what_do(RemoteDataState::Unchanged, LocalDataState::Unchanged) ); assert_eq!( SyncAction::RemoteFastForward, Syncer::what_do(RemoteDataState::Unchanged, LocalDataState::Changed) ); assert_eq!( SyncAction::LocalFastForward, Syncer::what_do(RemoteDataState::Changed, LocalDataState::Unchanged) ); assert_eq!( SyncAction::CombineChanges, Syncer::what_do(RemoteDataState::Changed, LocalDataState::Changed) ); } }