1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use crate::changeset::CollectionUpdate;
use crate::client::Sync15StorageClient;
use crate::clients;
use crate::coll_state::LocalCollStateMachine;
use crate::error::Error;
use crate::key_bundle::KeyBundle;
use crate::state::GlobalState;
use crate::telemetry;
use interrupt_support::Interruptee;
pub use sync15_traits::{IncomingChangeset, Store};
/// Syncs a single store without a clients engine.
///
/// This is a convenience wrapper: it forwards every argument unchanged to
/// [`synchronize_with_clients_engine`], supplying `None` for the clients
/// engine.
///
/// # Errors
///
/// Returns whatever error [`synchronize_with_clients_engine`] returns.
pub fn synchronize(
    client: &Sync15StorageClient,
    global_state: &GlobalState,
    root_sync_key: &KeyBundle,
    store: &dyn Store,
    fully_atomic: bool,
    telem_engine: &mut telemetry::Engine,
    interruptee: &dyn Interruptee,
) -> Result<(), crate::Error> {
    // Callers of this entry point have no clients engine to offer; the type
    // of this `None` is inferred from the callee's signature.
    let no_clients_engine = None;
    synchronize_with_clients_engine(
        client,
        global_state,
        root_sync_key,
        no_clients_engine,
        store,
        fully_atomic,
        telem_engine,
        interruptee,
    )
}
/// Runs one sync pass for the collection owned by `store`.
///
/// The steps, in order:
///
/// 1. Obtain the local collection state via
///    `LocalCollStateMachine::get_state`. If it yields `None`, a warning is
///    logged and the function returns `Ok(())` without syncing anything.
/// 2. If `clients` is `Some`, call `store.prepare_for_sync` with a closure
///    that returns `clients.get_client_data()`.
/// 3. Ask the store which collection requests it wants and fetch each one,
///    checking `interruptee` before every request. If the store asks for
///    nothing, a single empty changeset stamped with the current
///    `last_modified` is used instead.
/// 4. Hand all incoming changesets to `store.apply_incoming` and upload the
///    outgoing changeset it returns.
/// 5. Record outgoing telemetry on `telem_engine` and call
///    `store.sync_finished` with the upload's modified timestamp and the
///    successfully uploaded ids.
///
/// `fully_atomic` is passed through untouched to
/// `CollectionUpdate::new_from_changeset`.
///
/// # Errors
///
/// Propagates any error from obtaining the collection state, from the store
/// callbacks, from fetching or uploading, and from `interruptee` reporting an
/// interruption.
///
/// # Panics
///
/// Panics if the store returns a non-empty list of collection requests whose
/// last entry is not for the store's own collection.
// The sync entry point takes each collaborator as a separate argument.
#[allow(clippy::too_many_arguments)]
pub fn synchronize_with_clients_engine(
    client: &Sync15StorageClient,
    global_state: &GlobalState,
    root_sync_key: &KeyBundle,
    clients: Option<&clients::Engine<'_>>,
    store: &dyn Store,
    fully_atomic: bool,
    telem_engine: &mut telemetry::Engine,
    interruptee: &dyn Interruptee,
) -> Result<(), Error> {
    let collection = store.collection_name();
    log::info!("Syncing collection {}", collection);

    let mut coll_state = match LocalCollStateMachine::get_state(store, global_state, root_sync_key)?
    {
        Some(coll_state) => coll_state,
        None => {
            // Not an error: we simply skip this collection for this sync.
            log::warn!(
                "can't setup for the {} collection - hopefully it works later",
                collection
            );
            return Ok(());
        }
    };

    if let Some(clients) = clients {
        store.prepare_for_sync(&|| clients.get_client_data())?;
    }

    let collection_requests = store.get_collection_requests(coll_state.last_modified)?;
    let incoming = if collection_requests.is_empty() {
        log::info!("skipping incoming for {} - not needed.", collection);
        // Nothing to download, but later steps still need one changeset to
        // take the new timestamp from.
        vec![IncomingChangeset::new(collection, coll_state.last_modified)]
    } else {
        // The final request must be for this store's own collection, because
        // the timestamp of the last changeset becomes the new `last_modified`.
        assert_eq!(collection_requests.last().unwrap().collection, collection);

        let count = collection_requests.len();
        collection_requests
            .into_iter()
            .enumerate()
            .map(|(idx, collection_request)| {
                interruptee.err_if_interrupted()?;
                let incoming_changes =
                    crate::changeset::fetch_incoming(client, &mut coll_state, &collection_request)?;

                log::info!(
                    "Downloaded {} remote changes (request {} of {})",
                    incoming_changes.changes.len(),
                    // `enumerate` is zero-based; report a one-based position
                    // so the final request reads "N of N".
                    idx + 1,
                    count,
                );
                Ok(incoming_changes)
            })
            .collect::<Result<Vec<_>, Error>>()?
    };

    // Both branches above produce at least one changeset.
    let new_timestamp = incoming.last().expect("must have >= 1").timestamp;
    let mut outgoing = store.apply_incoming(incoming, telem_engine)?;

    interruptee.err_if_interrupted()?;

    // Stamp both the outgoing changeset and the local state with the
    // timestamp of the last incoming changeset before uploading.
    outgoing.timestamp = new_timestamp;
    coll_state.last_modified = new_timestamp;

    log::info!("Uploading {} outgoing changes", outgoing.changes.len());
    let upload_info =
        CollectionUpdate::new_from_changeset(client, &coll_state, outgoing, fully_atomic)?
            .upload()?;

    log::info!(
        "Upload success ({} records success, {} records failed)",
        upload_info.successful_ids.len(),
        upload_info.failed_ids.len()
    );

    // "Sent" counts every record we attempted, i.e. successes plus failures.
    let mut telem_outgoing = telemetry::EngineOutgoing::new();
    telem_outgoing.sent(upload_info.successful_ids.len() + upload_info.failed_ids.len());
    telem_outgoing.failed(upload_info.failed_ids.len());
    telem_engine.outgoing(telem_outgoing);

    store.sync_finished(upload_info.modified_timestamp, upload_info.successful_ids)?;

    log::info!("Sync finished!");
    Ok(())
}