1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use crate::changeset::CollectionUpdate;
use crate::client::Sync15StorageClient;
use crate::clients;
use crate::coll_state::LocalCollStateMachine;
use crate::error::Error;
use crate::key_bundle::KeyBundle;
use crate::state::GlobalState;
use crate::telemetry;
use interrupt_support::Interruptee;

pub use sync15_traits::{IncomingChangeset, Store};

/// Syncs the collection owned by `store` without a clients engine.
///
/// This is a convenience wrapper: every argument is forwarded unchanged to
/// [`synchronize_with_clients_engine`], with `None` supplied for its
/// `clients` parameter. The result of that call is returned as-is.
pub fn synchronize(
    client: &Sync15StorageClient,
    global_state: &GlobalState,
    root_sync_key: &KeyBundle,
    store: &dyn Store,
    fully_atomic: bool,
    telem_engine: &mut telemetry::Engine,
    interruptee: &dyn Interruptee,
) -> Result<(), crate::Error> {
    synchronize_with_clients_engine(
        client,
        global_state,
        root_sync_key,
        // No clients engine is available on this entry point.
        None,
        store,
        fully_atomic,
        telem_engine,
        interruptee,
    )
}

/// Syncs the collection owned by `store`, optionally consulting a clients
/// engine first.
///
/// The steps, in order:
///
/// 1. Obtain the collection state from `LocalCollStateMachine::get_state`.
///    If it yields `None`, a warning is logged and `Ok(())` is returned
///    without syncing anything.
/// 2. If `clients` is `Some`, call `store.prepare_for_sync` with a callback
///    that returns `clients.get_client_data()`.
/// 3. Ask the store for its collection requests and fetch the incoming
///    changes for each one, checking `interruptee` before every request.
///    When the store returns no requests, nothing is downloaded and a fresh
///    `IncomingChangeset` created with the collection's current
///    `last_modified` is used instead.
/// 4. Hand the incoming changesets to `store.apply_incoming`, which produces
///    the outgoing changeset.
/// 5. Upload the outgoing changeset via `CollectionUpdate`; `fully_atomic`
///    is forwarded to `CollectionUpdate::new_from_changeset`.
/// 6. Record the outgoing totals in `telem_engine` and call
///    `store.sync_finished` with the upload's modified timestamp and the
///    successfully uploaded ids.
///
/// # Errors
///
/// Propagates the first error returned by any of the steps above, including
/// the error from `interruptee.err_if_interrupted()`.
///
/// # Panics
///
/// Panics if the store returns a non-empty list of collection requests whose
/// last entry is not for the store's own collection.
// The argument list mirrors `synchronize` plus the optional clients engine.
#[allow(clippy::too_many_arguments)]
pub fn synchronize_with_clients_engine(
    client: &Sync15StorageClient,
    global_state: &GlobalState,
    root_sync_key: &KeyBundle,
    clients: Option<&clients::Engine<'_>>,
    store: &dyn Store,
    fully_atomic: bool,
    telem_engine: &mut telemetry::Engine,
    interruptee: &dyn Interruptee,
) -> Result<(), Error> {
    let collection = store.collection_name();
    log::info!("Syncing collection {}", collection);

    // our global state machine is ready - get the collection machine going.
    let mut coll_state = match LocalCollStateMachine::get_state(store, global_state, root_sync_key)?
    {
        Some(coll_state) => coll_state,
        None => {
            // XXX - this is either "error" or "declined".
            log::warn!(
                "can't setup for the {} collection - hopefully it works later",
                collection
            );
            return Ok(());
        }
    };

    if let Some(clients) = clients {
        store.prepare_for_sync(&|| clients.get_client_data())?;
    }

    let collection_requests = store.get_collection_requests(coll_state.last_modified)?;
    let incoming = if collection_requests.is_empty() {
        log::info!("skipping incoming for {} - not needed.", collection);
        vec![IncomingChangeset::new(collection, coll_state.last_modified)]
    } else {
        // The timestamp of the *last* changeset is used as the new collection
        // timestamp below, so the last request must be for our own collection.
        assert_eq!(collection_requests.last().unwrap().collection, collection);

        let count = collection_requests.len();
        collection_requests
            .into_iter()
            .enumerate()
            .map(|(idx, collection_request)| {
                interruptee.err_if_interrupted()?;
                let incoming_changes =
                    crate::changeset::fetch_incoming(client, &mut coll_state, &collection_request)?;

                // `enumerate` is zero-based; report a one-based position so
                // the message reads "request 1 of N" .. "request N of N".
                log::info!(
                    "Downloaded {} remote changes (request {} of {})",
                    incoming_changes.changes.len(),
                    idx + 1,
                    count,
                );
                Ok(incoming_changes)
            })
            // Collecting into a `Result` stops at the first failed request.
            .collect::<Result<Vec<_>, Error>>()?
    };

    // `incoming` always holds at least one changeset: either the single
    // placeholder above or one entry per (non-empty) collection request.
    let new_timestamp = incoming.last().expect("must have >= 1").timestamp;
    let mut outgoing = store.apply_incoming(incoming, telem_engine)?;

    interruptee.err_if_interrupted()?;
    // Bump the timestamps now just in case the upload fails.
    // xxx - duplication below smells wrong
    outgoing.timestamp = new_timestamp;
    coll_state.last_modified = new_timestamp;

    log::info!("Uploading {} outgoing changes", outgoing.changes.len());
    let upload_info =
        CollectionUpdate::new_from_changeset(client, &coll_state, outgoing, fully_atomic)?
            .upload()?;

    log::info!(
        "Upload success ({} records success, {} records failed)",
        upload_info.successful_ids.len(),
        upload_info.failed_ids.len()
    );
    // ideally we'd report this per-batch, but for now, let's just report it
    // as a total.
    let mut telem_outgoing = telemetry::EngineOutgoing::new();
    // "sent" counts every record we attempted, whether or not it succeeded.
    telem_outgoing.sent(upload_info.successful_ids.len() + upload_info.failed_ids.len());
    telem_outgoing.failed(upload_info.failed_ids.len());
    telem_engine.outgoing(telem_outgoing);

    store.sync_finished(upload_info.modified_timestamp, upload_info.successful_ids)?;

    log::info!("Sync finished!");
    Ok(())
}