use crate::client::{BackoffListener, Sync15StorageClient, Sync15StorageClientInit};
use crate::clients::{self, CommandProcessor, CLIENTS_TTL_REFRESH};
use crate::coll_state::StoreSyncAssociation;
use crate::error::Error;
use crate::key_bundle::KeyBundle;
use crate::state::{EngineChangesNeeded, GlobalState, PersistedGlobalState, SetupStateMachine};
use crate::status::{ServiceStatus, SyncResult};
use crate::sync::{self, Store};
use crate::telemetry;
use interrupt_support::Interruptee;
use std::collections::HashMap;
use std::mem;
use std::result;
use std::time::{Duration, SystemTime};
/// Pairs a storage client with the init parameters it was created from.
///
/// `SyncMultipleDriver::prepare_client_info` compares `client_init` against the
/// init supplied for the current sync to decide whether a cached instance can
/// be reused.
#[derive(Debug)]
struct ClientInfo {
/// The init parameters `client` was constructed from (kept as a clone).
client_init: Sync15StorageClientInit,
/// The storage client built from those init parameters.
client: Sync15StorageClient,
}
impl ClientInfo {
    /// Builds a `ClientInfo` from `ci`.
    ///
    /// The init parameters are cloned twice: one copy is retained in
    /// `client_init`, the other is handed to `Sync15StorageClient::new`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Sync15StorageClient::new`.
    fn new(ci: &Sync15StorageClientInit) -> Result<Self, Error> {
        let client_init = ci.clone();
        let client = Sync15StorageClient::new(ci.clone())?;
        Ok(Self {
            client_init,
            client,
        })
    }
}
/// State kept in memory between syncs.
///
/// The caller owns the value and passes it to `sync_multiple`; the sync driver
/// reads and updates it. `Default` yields a value with every field `None`.
#[derive(Debug, Default)]
pub struct MemoryCachedState {
/// Client (and the init it was built from) left behind by the last sync that
/// did not see an authentication error.
last_client_info: Option<ClientInfo>,
/// Global state left behind by the last sync that did not see an
/// authentication error.
last_global_state: Option<GlobalState>,
/// Copy of `SyncResult::next_sync_after` from the most recent sync.
next_sync_after: Option<SystemTime>,
/// Point in time after which `should_refresh_client` reports `true`.
/// `None` means a refresh is always due.
next_client_refresh_after: Option<SystemTime>,
}
impl MemoryCachedState {
    /// Drops the cached client info and the cached global state.
    ///
    /// The `next_sync_after` and `next_client_refresh_after` timestamps are
    /// left untouched.
    pub fn clear_sensitive_info(&mut self) {
        self.last_client_info = None;
        self.last_global_state = None;
    }

    /// Returns the `next_sync_after` value recorded by the most recent sync,
    /// if any.
    pub fn get_next_sync_after(&self) -> Option<SystemTime> {
        self.next_sync_after
    }

    /// Reports whether a client refresh is due.
    ///
    /// This is `true` when no refresh has been noted yet, or when the current
    /// system time is strictly later than the recorded deadline.
    pub fn should_refresh_client(&self) -> bool {
        self.next_client_refresh_after
            .map_or(true, |deadline| SystemTime::now() > deadline)
    }

    /// Records that a client refresh just happened by pushing the next refresh
    /// deadline `CLIENTS_TTL_REFRESH` seconds into the future.
    pub fn note_client_refresh(&mut self) {
        let ttl = Duration::from_secs(CLIENTS_TTL_REFRESH);
        self.next_client_refresh_after = Some(SystemTime::now() + ttl);
    }
}
pub fn sync_multiple(
stores: &[&dyn Store],
persisted_global_state: &mut Option<String>,
mem_cached_state: &mut MemoryCachedState,
storage_init: &Sync15StorageClientInit,
root_sync_key: &KeyBundle,
interruptee: &dyn Interruptee,
req_info: Option<SyncRequestInfo<'_>>,
) -> SyncResult {
sync_multiple_with_command_processor(
None,
stores,
persisted_global_state,
mem_cached_state,
storage_init,
root_sync_key,
interruptee,
req_info,
)
}
#[allow(clippy::too_many_arguments)]
pub fn sync_multiple_with_command_processor(
command_processor: Option<&dyn CommandProcessor>,
stores: &[&dyn Store],
persisted_global_state: &mut Option<String>,
mem_cached_state: &mut MemoryCachedState,
storage_init: &Sync15StorageClientInit,
root_sync_key: &KeyBundle,
interruptee: &dyn Interruptee,
req_info: Option<SyncRequestInfo<'_>>,
) -> SyncResult {
log::info!("Syncing {} stores", stores.len());
let mut sync_result = SyncResult {
service_status: ServiceStatus::OtherError,
result: Ok(()),
declined: None,
next_sync_after: None,
engine_results: HashMap::with_capacity(stores.len()),
telemetry: telemetry::SyncTelemetryPing::new(),
};
let backoff = crate::client::new_backoff_listener();
let req_info = req_info.unwrap_or_default();
let driver = SyncMultipleDriver {
command_processor,
stores,
storage_init,
interruptee,
engines_to_state_change: req_info.engines_to_state_change,
backoff: backoff.clone(),
root_sync_key,
result: &mut sync_result,
persisted_global_state,
mem_cached_state,
saw_auth_error: false,
ignore_soft_backoff: req_info.is_user_action,
};
match driver.sync() {
Ok(()) => {
log::debug!(
"sync was successful, final status={:?}",
sync_result.service_status
);
}
Err(e) => {
log::warn!(
"sync failed: {}, final status={:?}\nBacktrace: {:?}",
e,
sync_result.service_status,
e.backtrace()
);
sync_result.result = Err(e);
}
}
sync_result.set_sync_after(backoff.get_required_wait(false).unwrap_or_default());
mem_cached_state.next_sync_after = sync_result.next_sync_after;
log::trace!("Sync result: {:?}", sync_result);
sync_result
}
/// Optional per-request settings for `sync_multiple` and
/// `sync_multiple_with_command_processor`.
///
/// The default value has no engine state changes and `is_user_action == false`.
#[derive(Debug, Default)]
pub struct SyncRequestInfo<'a> {
/// Engine state changes requested for this sync; forwarded as-is to the setup
/// state machine.
/// NOTE(review): presumably maps engine name -> enabled; confirm against
/// `SetupStateMachine::for_full_sync`.
pub engines_to_state_change: Option<&'a HashMap<String, bool>>,
/// When `true`, the sync ignores soft backoff while syncing stores (it becomes
/// the driver's `ignore_soft_backoff` flag).
pub is_user_action: bool,
}
/// Holds everything needed for a single sync run; consumed by `sync()`.
///
/// The borrowed inputs share the `'info` lifetime, while the three mutable
/// outputs (`result`, `persisted_global_state`, `mem_cached_state`) each carry
/// their own lifetime.
struct SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> {
/// When `Some`, the clients engine is synced before the stores.
command_processor: Option<&'info dyn CommandProcessor>,
/// The stores to sync, visited in slice order.
stores: &'info [&'info dyn Store],
/// Init parameters used to create the storage client, and to check whether a
/// cached client still matches.
storage_init: &'info Sync15StorageClientInit,
/// Key bundle passed to the state machine, the clients engine and each store
/// sync.
root_sync_key: &'info KeyBundle,
/// Polled between steps so the sync can bail out early.
interruptee: &'info dyn Interruptee,
/// Backoff listener; a clone is installed on the storage client and it is
/// consulted before each store is synced.
backoff: BackoffListener,
/// Requested engine state changes, forwarded to the setup state machine.
engines_to_state_change: Option<&'info HashMap<String, bool>>,
/// The result being filled in for the caller.
result: &'res mut SyncResult,
/// Caller-owned persisted state (JSON); read at the start of the sync and
/// overwritten once the state machine has run.
persisted_global_state: &'pgs mut Option<String>,
/// Caller-owned in-memory cache carried between syncs.
mem_cached_state: &'mcs mut MemoryCachedState,
/// Passed to `backoff.get_required_wait` when deciding whether to stop
/// syncing stores.
ignore_soft_backoff: bool,
/// Set once any store fails with `ServiceStatus::AuthenticationError`; when
/// set, the client info and global state are not cached at the end of the sync.
saw_auth_error: bool,
}
impl<'info, 'res, 'pgs, 'mcs> SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> {
/// Drives one complete sync, consuming the driver.
///
/// Steps, in order: load the persisted state, create or reuse the storage
/// client, run the setup state machine, sync the clients engine (only when a
/// command processor was supplied), then sync every store. If no store
/// reported an authentication error, the client info and global state are
/// cached in `mem_cached_state` for the next sync.
///
/// An interruption observed between steps ends the sync with `Ok(())`; the
/// service status is then `ServiceStatus::Interrupted`.
///
/// # Errors
///
/// Returns the error from preparing the client, from the state machine, or
/// from the clients engine. Individual store failures are not returned here;
/// they are recorded in `result.engine_results`.
fn sync(mut self) -> result::Result<(), Error> {
    log::info!("Loading/initializing persisted state");
    let mut pgs = self.prepare_persisted_state();

    log::info!("Preparing client info");
    let client_info = self.prepare_client_info()?;
    if self.was_interrupted() {
        return Ok(());
    }

    log::info!("Entering sync state machine");
    let mut global_state = self.run_state_machine(&client_info, &mut pgs)?;
    if self.was_interrupted() {
        return Ok(());
    }

    // Start from OK; the clients engine or an individual store may still
    // replace this with a failure status.
    self.result.service_status = ServiceStatus::Ok;

    let clients_engine = if let Some(command_processor) = self.command_processor {
        log::info!("Synchronizing clients engine");
        let should_refresh = self.mem_cached_state.should_refresh_client();
        let mut engine = clients::Engine::new(command_processor, self.interruptee);
        if let Err(e) = engine.sync(
            &client_info.client,
            &global_state,
            &self.root_sync_key,
            should_refresh,
        ) {
            // Record the failure in telemetry. The sync record must be
            // attached to the ping, otherwise it is dropped when this
            // function returns and the failure is never reported.
            let mut telem_sync = telemetry::SyncTelemetry::new();
            let mut telem_engine = telemetry::Engine::new("clients");
            telem_engine.failure(&e);
            telem_sync.engine(telem_engine);
            self.result.telemetry.sync(telem_sync);
            self.result.service_status = ServiceStatus::from_err(&e);
            // A clients engine failure is fatal for the whole sync.
            return Err(e);
        }
        if self.was_interrupted() {
            return Ok(());
        }
        self.mem_cached_state.note_client_refresh();
        Some(engine)
    } else {
        None
    };

    log::info!("Synchronizing stores");
    let telem_sync = self.sync_stores(&client_info, &mut global_state, clients_engine.as_ref());
    self.result.telemetry.sync(telem_sync);
    log::info!("Finished syncing stores.");

    // After an auth error nothing is cached, so the next sync starts from
    // scratch.
    if !self.saw_auth_error {
        log::trace!("Updating persisted global state");
        self.mem_cached_state.last_client_info = Some(client_info);
        self.mem_cached_state.last_global_state = Some(global_state);
    }
    Ok(())
}
/// Polls the interruptee and returns whether the sync has been interrupted.
///
/// When it has, this also logs the fact and sets the result's service status
/// to `ServiceStatus::Interrupted`.
fn was_interrupted(&mut self) -> bool {
    let interrupted = self.interruptee.was_interrupted();
    if interrupted {
        log::info!("Interrupted, bailing out");
        self.result.service_status = ServiceStatus::Interrupted;
    }
    interrupted
}
/// Syncs each store in turn and returns the telemetry collected on the way.
///
/// For every store, in slice order:
/// * stop if the backoff listener reports a required wait;
/// * skip the store if its collection name is in the declined list;
/// * otherwise sync it, then record its telemetry and its result in
///   `result.engine_results`.
///
/// A store failure whose status is `ServiceStatus::OtherError` does not stop
/// the loop. Any other failure status is copied to `result.service_status`
/// and ends the loop. An authentication error also sets `saw_auth_error`.
/// The loop also ends once an interruption is observed after a store.
fn sync_stores(
    &mut self,
    client_info: &ClientInfo,
    global_state: &mut GlobalState,
    clients: Option<&clients::Engine<'_>>,
) -> telemetry::SyncTelemetry {
    let mut telem_sync = telemetry::SyncTelemetry::new();
    for store in self.stores {
        let name = store.collection_name();

        let must_wait = self
            .backoff
            .get_required_wait(self.ignore_soft_backoff)
            .is_some();
        if must_wait {
            log::warn!("Got backoff, bailing out of sync early");
            break;
        }

        let is_declined = global_state.global.declined.iter().any(|e| e == &*name);
        if is_declined {
            log::info!("The {} engine is declined. Skipping", name);
            continue;
        }

        log::info!("Syncing {} engine!", name);
        let mut telem_engine = telemetry::Engine::new(&*name);
        let result = sync::synchronize_with_clients_engine(
            &client_info.client,
            &*global_state,
            self.root_sync_key,
            clients,
            *store,
            // NOTE(review): meaning of this flag is not visible here; confirm
            // against `sync::synchronize_with_clients_engine`.
            true,
            &mut telem_engine,
            self.interruptee,
        );

        // `Some(status)` marks a failure severe enough to abandon the
        // remaining stores.
        let fatal_status = match &result {
            Ok(()) => {
                log::info!("Sync of {} was successful!", name);
                None
            }
            Err(e) => {
                log::warn!("Sync of {} failed! {:?}", name, e);
                let this_status = ServiceStatus::from_err(e);
                if this_status == ServiceStatus::AuthenticationError {
                    self.saw_auth_error = true;
                }
                telem_engine.failure(e);
                if this_status != ServiceStatus::OtherError {
                    Some(this_status)
                } else {
                    None
                }
            }
        };

        telem_sync.engine(telem_engine);
        self.result.engine_results.insert(name.into(), result);

        if let Some(status) = fatal_status {
            self.result.service_status = status;
            break;
        }
        if self.was_interrupted() {
            break;
        }
    }
    telem_sync
}
fn run_state_machine(
&mut self,
client_info: &ClientInfo,
pgs: &mut PersistedGlobalState,
) -> result::Result<GlobalState, Error> {
let last_state = mem::replace(&mut self.mem_cached_state.last_global_state, None);
let mut state_machine = SetupStateMachine::for_full_sync(
&client_info.client,
&self.root_sync_key,
pgs,
self.engines_to_state_change,
self.interruptee,
);
log::info!("Advancing state machine to ready (full)");
let res = state_machine.run_to_ready(last_state);
let changes = state_machine.changes_needed.take();
*self.persisted_global_state = Some(serde_json::to_string(&pgs)?);
self.result.declined = Some(pgs.get_declined().to_vec());
log::debug!(
"Declined engines list after state machine set to: {:?}",
self.result.declined,
);
if let Some(c) = changes {
self.wipe_or_reset_engines(c, &client_info.client)?;
}
let state = match res {
Err(e) => {
self.result.service_status = ServiceStatus::from_err(&e);
return Err(e);
}
Ok(state) => state,
};
self.result.telemetry.uid(client_info.client.hashed_uid()?);
self.mem_cached_state.last_global_state = None;
Ok(state)
}
/// Applies the engine changes requested by the state machine.
///
/// Every engine in `changes.remote_wipes` is wiped on the server through
/// `client`, then every store whose collection name appears in
/// `changes.local_resets` is reset to `StoreSyncAssociation::Disconnected`.
///
/// # Errors
///
/// Stops at, and returns, the first error from a remote wipe or a store reset.
fn wipe_or_reset_engines(
    &mut self,
    changes: EngineChangesNeeded,
    client: &Sync15StorageClient,
) -> result::Result<(), Error> {
    let nothing_to_do = changes.local_resets.is_empty() && changes.remote_wipes.is_empty();
    if nothing_to_do {
        return Ok(());
    }

    for engine_name in &changes.remote_wipes {
        log::info!(
            "Engine {:?} just got disabled locally, wiping server",
            engine_name
        );
        client.wipe_remote_engine(engine_name)?;
    }

    for store in self.stores.iter() {
        let name = store.collection_name();
        if !changes.local_resets.contains(&*name) {
            continue;
        }
        log::info!("Resetting engine {}, as it was declined remotely", name);
        store.reset(&StoreSyncAssociation::Disconnected)?;
    }
    Ok(())
}
/// Produces the `ClientInfo` to use for this sync.
///
/// The cached client info is always removed from `mem_cached_state`. It is
/// reused only when its init parameters equal `storage_init`. If they differ,
/// the whole in-memory cache is reset to its default and a fresh client is
/// built. If nothing was cached, the sensitive cache entries are cleared and a
/// fresh client is built.
///
/// Whichever path is taken, the returned client's backoff listener is replaced
/// with a clone of this driver's listener.
///
/// # Errors
///
/// Propagates the error from `ClientInfo::new`.
fn prepare_client_info(&mut self) -> result::Result<ClientInfo, Error> {
    let cached = mem::replace(&mut self.mem_cached_state.last_client_info, None);
    let mut client_info = match cached {
        None => {
            log::debug!("mem_cached_state was stale or missing, need setup");
            self.mem_cached_state.clear_sensitive_info();
            ClientInfo::new(self.storage_init)?
        }
        Some(previous) => {
            let same_init = previous.client_init == *self.storage_init;
            if same_init {
                log::debug!("Reusing memory-cached client_info");
                previous
            } else {
                log::info!("Discarding all state as the account might have changed");
                *self.mem_cached_state = MemoryCachedState::default();
                ClientInfo::new(self.storage_init)?
            }
        }
    };
    // Set the listener once here rather than separately in every branch above.
    client_info.client.backoff = self.backoff.clone();
    Ok(client_info)
}
/// Loads the `PersistedGlobalState` for this sync from the caller's JSON.
///
/// Falls back to `PersistedGlobalState::default()`, and resets the whole
/// in-memory cache to its default, when the caller supplied no string, an
/// empty string, or a string that does not parse.
fn prepare_persisted_state(&mut self) -> PersistedGlobalState {
    let parsed = match self.persisted_global_state {
        Some(json) if !json.is_empty() => {
            serde_json::from_str::<PersistedGlobalState>(json.as_str())
        }
        _ => {
            log::info!(
                "The application didn't give us persisted state - \
                 this is only expected on the very first run for a given user."
            );
            *self.mem_cached_state = MemoryCachedState::default();
            return PersistedGlobalState::default();
        }
    };

    if let Ok(state) = parsed {
        log::trace!("Read persisted state: {:?}", state);
        state
    } else {
        log::error!(
            "Failed to parse PersistedGlobalState from JSON! Falling back to default"
        );
        *self.mem_cached_state = MemoryCachedState::default();
        PersistedGlobalState::default()
    }
}
}