/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

// This module helps you perform a sync of multiple stores and manage the
// global and local state between syncs.

use crate::client::{BackoffListener, Sync15StorageClient, Sync15StorageClientInit};
use crate::clients::{self, CommandProcessor, CLIENTS_TTL_REFRESH};
use crate::coll_state::StoreSyncAssociation;
use crate::error::Error;
use crate::key_bundle::KeyBundle;
use crate::state::{EngineChangesNeeded, GlobalState, PersistedGlobalState, SetupStateMachine};
use crate::status::{ServiceStatus, SyncResult};
use crate::sync::{self, Store};
use crate::telemetry;
use interrupt_support::Interruptee;
use std::collections::HashMap;
use std::mem;
use std::result;
use std::time::{Duration, SystemTime};

/// Info about the client to use. We reuse the client unless
/// we discover the client_init has changed, in which case we re-create one.
#[derive(Debug)]
struct ClientInfo {
    // the client_init used to create `client`.
    client_init: Sync15StorageClientInit,
    // the client (our tokenserver state machine state, and our http library's state)
    client: Sync15StorageClient,
}

impl ClientInfo {
    fn new(ci: &Sync15StorageClientInit) -> Result<Self, Error> {
        Ok(Self {
            client_init: ci.clone(),
            client: Sync15StorageClient::new(ci.clone())?,
        })
    }
}

/// Info we want callers to store *in memory* for us so that subsequent
/// syncs are faster. This should never be persisted to storage as it holds
/// sensitive information, such as the sync decryption keys.
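///
/// A minimal sketch of the intended usage (hypothetical caller code with
/// made-up helper variables, not compiled): create one instance, keep it
/// alive for the life of the application, and pass it to every sync.
///
/// ```ignore
/// // Created once and kept in memory only - never written to disk.
/// let mut mem_state = MemoryCachedState::default();
/// // ... later, before each sync attempt ...
/// if let Some(next) = mem_state.get_next_sync_after() {
///     if SystemTime::now() < next {
///         return; // still backed off; skip this sync attempt
///     }
/// }
/// sync_multiple(&stores, &mut persisted, &mut mem_state, &init, &key, &interruptee, None);
/// ```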
#[derive(Debug, Default)]
pub struct MemoryCachedState {
    last_client_info: Option<ClientInfo>,
    last_global_state: Option<GlobalState>,
    // These are just stored in memory, as persisting an invalid value far in the
    // future has the potential to break sync for good.
    next_sync_after: Option<SystemTime>,
    next_client_refresh_after: Option<SystemTime>,
}

impl MemoryCachedState {
    // Called when we notice the cached state is stale.
    pub fn clear_sensitive_info(&mut self) {
        self.last_client_info = None;
        self.last_global_state = None;
        // Leave the backoff time, as there's no reason to think it's not still
        // true.
    }
    pub fn get_next_sync_after(&self) -> Option<SystemTime> {
        self.next_sync_after
    }
    pub fn should_refresh_client(&self) -> bool {
        match self.next_client_refresh_after {
            Some(t) => SystemTime::now() > t,
            None => true,
        }
    }
    pub fn note_client_refresh(&mut self) {
        self.next_client_refresh_after =
            Some(SystemTime::now() + Duration::from_secs(CLIENTS_TTL_REFRESH));
    }
}

/// Sync multiple stores
/// * `stores` - The stores to sync
/// * `persisted_global_state` - The global state to use, or None if never
///   before provided. At the end of the sync, and even when the sync fails,
///   the value in this cell should be persisted to permanent storage and
///   provided next time the sync is called.
/// * `mem_cached_state` - The in-memory cached state to reuse between syncs.
///   At the end of the sync, the value should be kept *in memory only* - it
///   should not be persisted to disk.
/// * `storage_init` - Information about how the sync http client should be
///   configured.
/// * `root_sync_key` - The KeyBundle used for encryption.
///
/// The returned `SyncResult` includes a map keyed by store name and holding an
/// error value - if any store fails, the sync will continue on to other
/// stores, but the error will be placed in this map. The absence of a name in
/// the map implies the store succeeded.
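///
/// # Example
///
/// A minimal sketch of how a caller might drive a sync, assuming a
/// hypothetical `MyStore` type implementing `Store`, and caller-supplied
/// `storage_init`, `root_sync_key`, `interruptee` and persistence helpers
/// (not compiled):
///
/// ```ignore
/// let store = MyStore::new();
/// let stores: Vec<&dyn Store> = vec![&store];
/// // Whatever we persisted after the last sync, or None on the first run.
/// let mut persisted_state: Option<String> = load_persisted_state();
/// // Held in memory between syncs, never written to disk.
/// let mut mem_state = MemoryCachedState::default();
///
/// let result = sync_multiple(
///     &stores,
///     &mut persisted_state,
///     &mut mem_state,
///     &storage_init,
///     &root_sync_key,
///     &interruptee,
///     None, // no SyncRequestInfo
/// );
///
/// // Persist the (possibly updated) global state - even if the sync failed.
/// if let Some(ref s) = persisted_state {
///     save_persisted_state(s);
/// }
/// ```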
pub fn sync_multiple(
    stores: &[&dyn Store],
    persisted_global_state: &mut Option<String>,
    mem_cached_state: &mut MemoryCachedState,
    storage_init: &Sync15StorageClientInit,
    root_sync_key: &KeyBundle,
    interruptee: &dyn Interruptee,
    req_info: Option<SyncRequestInfo<'_>>,
) -> SyncResult {
    sync_multiple_with_command_processor(
        None,
        stores,
        persisted_global_state,
        mem_cached_state,
        storage_init,
        root_sync_key,
        interruptee,
        req_info,
    )
}

/// Like `sync_multiple`, but specifies an optional command processor to handle
/// commands from the clients collection. This function is called by the sync
/// manager, which provides its own processor.
#[allow(clippy::too_many_arguments)]
pub fn sync_multiple_with_command_processor(
    command_processor: Option<&dyn CommandProcessor>,
    stores: &[&dyn Store],
    persisted_global_state: &mut Option<String>,
    mem_cached_state: &mut MemoryCachedState,
    storage_init: &Sync15StorageClientInit,
    root_sync_key: &KeyBundle,
    interruptee: &dyn Interruptee,
    req_info: Option<SyncRequestInfo<'_>>,
) -> SyncResult {
    log::info!("Syncing {} stores", stores.len());
    let mut sync_result = SyncResult {
        service_status: ServiceStatus::OtherError,
        result: Ok(()),
        declined: None,
        next_sync_after: None,
        engine_results: HashMap::with_capacity(stores.len()),
        telemetry: telemetry::SyncTelemetryPing::new(),
    };
    let backoff = crate::client::new_backoff_listener();
    let req_info = req_info.unwrap_or_default();
    let driver = SyncMultipleDriver {
        command_processor,
        stores,
        storage_init,
        interruptee,
        engines_to_state_change: req_info.engines_to_state_change,
        backoff: backoff.clone(),
        root_sync_key,
        result: &mut sync_result,
        persisted_global_state,
        mem_cached_state,
        saw_auth_error: false,
        ignore_soft_backoff: req_info.is_user_action,
    };
    match driver.sync() {
        Ok(()) => {
            log::debug!(
                "sync was successful, final status={:?}",
                sync_result.service_status
            );
        }
        Err(e) => {
            log::warn!(
                "sync failed: {}, final status={:?}\nBacktrace: {:?}",
                e,
                sync_result.service_status,
                e.backtrace()
            );
            sync_result.result = Err(e);
        }
    }
    // Respect `backoff` value when computing the next sync time even if we were
    // ignoring it during the sync
    sync_result.set_sync_after(backoff.get_required_wait(false).unwrap_or_default());
    mem_cached_state.next_sync_after = sync_result.next_sync_after;
    log::trace!("Sync result: {:?}", sync_result);
    sync_result
}

/// This is essentially a bag of information that the sync manager knows, but
/// which we otherwise wouldn't. It should probably be rethought if it gains
/// many more fields.
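///
/// For example, a sync manager might build one like this when the user has
/// just toggled an engine and manually triggered a sync (illustrative only):
///
/// ```ignore
/// let mut changes = HashMap::new();
/// changes.insert("bookmarks".to_string(), false); // user disabled bookmarks
/// let req_info = SyncRequestInfo {
///     engines_to_state_change: Some(&changes),
///     is_user_action: true,
/// };
/// ```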
#[derive(Debug, Default)]
pub struct SyncRequestInfo<'a> {
    pub engines_to_state_change: Option<&'a HashMap<String, bool>>,
    pub is_user_action: bool,
}

// The sync multiple driver
struct SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> {
    command_processor: Option<&'info dyn CommandProcessor>,
    stores: &'info [&'info dyn Store],
    storage_init: &'info Sync15StorageClientInit,
    root_sync_key: &'info KeyBundle,
    interruptee: &'info dyn Interruptee,
    backoff: BackoffListener,
    engines_to_state_change: Option<&'info HashMap<String, bool>>,
    result: &'res mut SyncResult,
    persisted_global_state: &'pgs mut Option<String>,
    mem_cached_state: &'mcs mut MemoryCachedState,
    ignore_soft_backoff: bool,
    saw_auth_error: bool,
}

impl<'info, 'res, 'pgs, 'mcs> SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> {
    /// The actual worker for sync_multiple.
    fn sync(mut self) -> result::Result<(), Error> {
        log::info!("Loading/initializing persisted state");
        let mut pgs = self.prepare_persisted_state();

        log::info!("Preparing client info");
        let client_info = self.prepare_client_info()?;

        if self.was_interrupted() {
            return Ok(());
        }

        log::info!("Entering sync state machine");
        // Advance the state machine to the point where it can perform a full
        // sync. This may involve uploading meta/global, crypto/keys etc.
        let mut global_state = self.run_state_machine(&client_info, &mut pgs)?;

        if self.was_interrupted() {
            return Ok(());
        }

        // Set the service status to OK here - we may adjust it based on an individual
        // store failing.
        self.result.service_status = ServiceStatus::Ok;

        let clients_engine = if let Some(command_processor) = self.command_processor {
            log::info!("Synchronizing clients engine");
            let should_refresh = self.mem_cached_state.should_refresh_client();
            let mut engine = clients::Engine::new(command_processor, self.interruptee);
            if let Err(e) = engine.sync(
                &client_info.client,
                &global_state,
                &self.root_sync_key,
                should_refresh,
            ) {
                // Record telemetry with the error just in case...
                let mut telem_sync = telemetry::SyncTelemetry::new();
                let mut telem_engine = telemetry::Engine::new("clients");
                telem_engine.failure(&e);
                telem_sync.engine(telem_engine);
                self.result.service_status = ServiceStatus::from_err(&e);

                // ...And bail, because a clients engine sync failure is fatal.
                return Err(e);
            }
            // We don't record telemetry for successful clients engine
            // syncs, since we only keep client records in memory, we
            // expect the counts to be the same most times, and a
            // failure aborts the entire sync.
            if self.was_interrupted() {
                return Ok(());
            }
            self.mem_cached_state.note_client_refresh();
            Some(engine)
        } else {
            None
        };

        log::info!("Synchronizing stores");

        let telem_sync = self.sync_stores(&client_info, &mut global_state, clients_engine.as_ref());
        self.result.telemetry.sync(telem_sync);

        log::info!("Finished syncing stores.");

        if !self.saw_auth_error {
            log::trace!("Updating persisted global state");
            self.mem_cached_state.last_client_info = Some(client_info);
            self.mem_cached_state.last_global_state = Some(global_state);
        }

        Ok(())
    }

    fn was_interrupted(&mut self) -> bool {
        if self.interruptee.was_interrupted() {
            log::info!("Interrupted, bailing out");
            self.result.service_status = ServiceStatus::Interrupted;
            true
        } else {
            false
        }
    }

    fn sync_stores(
        &mut self,
        client_info: &ClientInfo,
        global_state: &mut GlobalState,
        clients: Option<&clients::Engine<'_>>,
    ) -> telemetry::SyncTelemetry {
        let mut telem_sync = telemetry::SyncTelemetry::new();
        for store in self.stores {
            let name = store.collection_name();
            if self
                .backoff
                .get_required_wait(self.ignore_soft_backoff)
                .is_some()
            {
                log::warn!("Got backoff, bailing out of sync early");
                break;
            }
            if global_state.global.declined.iter().any(|e| e == &*name) {
                log::info!("The {} engine is declined. Skipping", name);
                continue;
            }
            log::info!("Syncing {} engine!", name);

            let mut telem_engine = telemetry::Engine::new(&*name);
            let result = sync::synchronize_with_clients_engine(
                &client_info.client,
                &global_state,
                self.root_sync_key,
                clients,
                *store,
                true,
                &mut telem_engine,
                self.interruptee,
            );

            match result {
                Ok(()) => log::info!("Sync of {} was successful!", name),
                Err(ref e) => {
                    log::warn!("Sync of {} failed! {:?}", name, e);
                    let this_status = ServiceStatus::from_err(&e);
                    // The only error which forces us to discard our state is an
                    // auth error.
                    self.saw_auth_error =
                        self.saw_auth_error || this_status == ServiceStatus::AuthenticationError;
                    telem_engine.failure(e);
                    // If the failure from the store looks like anything other than
                    // a "store error" we don't bother trying the others.
                    if this_status != ServiceStatus::OtherError {
                        telem_sync.engine(telem_engine);
                        self.result.engine_results.insert(name.into(), result);
                        self.result.service_status = this_status;
                        break;
                    }
                }
            }
            telem_sync.engine(telem_engine);
            self.result.engine_results.insert(name.into(), result);
            if self.was_interrupted() {
                break;
            }
        }
        telem_sync
    }

    fn run_state_machine(
        &mut self,
        client_info: &ClientInfo,
        pgs: &mut PersistedGlobalState,
    ) -> result::Result<GlobalState, Error> {
        let last_state = mem::replace(&mut self.mem_cached_state.last_global_state, None);

        let mut state_machine = SetupStateMachine::for_full_sync(
            &client_info.client,
            &self.root_sync_key,
            pgs,
            self.engines_to_state_change,
            self.interruptee,
        );

        log::info!("Advancing state machine to ready (full)");
        let res = state_machine.run_to_ready(last_state);
        // Grab this now, even though we don't need it until later, to avoid a
        // lifetime issue.
        let changes = state_machine.changes_needed.take();
        // The state machine might have updated our persisted_global_state, so
        // update the caller's repr of it.
        *self.persisted_global_state = Some(serde_json::to_string(&pgs)?);

        // Now that we've gone through the state machine, store the declined list in
        // the sync_result
        self.result.declined = Some(pgs.get_declined().to_vec());
        log::debug!(
            "Declined engines list after state machine set to: {:?}",
            self.result.declined,
        );

        if let Some(c) = changes {
            self.wipe_or_reset_engines(c, &client_info.client)?;
        }
        let state = match res {
            Err(e) => {
                self.result.service_status = ServiceStatus::from_err(&e);
                return Err(e);
            }
            Ok(state) => state,
        };
        self.result.telemetry.uid(client_info.client.hashed_uid()?);
        // As with client_info, leave None in the cache for now - our caller
        // repopulates it after a successful sync, so on error we start from scratch.
        self.mem_cached_state.last_global_state = None;
        Ok(state)
    }

    fn wipe_or_reset_engines(
        &mut self,
        changes: EngineChangesNeeded,
        client: &Sync15StorageClient,
    ) -> result::Result<(), Error> {
        if changes.local_resets.is_empty() && changes.remote_wipes.is_empty() {
            return Ok(());
        }
        for e in &changes.remote_wipes {
            log::info!("Engine {:?} just got disabled locally, wiping server", e);
            client.wipe_remote_engine(&e)?;
        }

        for s in self.stores {
            let name = s.collection_name();
            if changes.local_resets.contains(&*name) {
                log::info!("Resetting engine {}, as it was declined remotely", name);
                s.reset(&StoreSyncAssociation::Disconnected)?;
            }
        }

        Ok(())
    }

    fn prepare_client_info(&mut self) -> result::Result<ClientInfo, Error> {
        let mut client_info = match mem::replace(&mut self.mem_cached_state.last_client_info, None)
        {
            Some(client_info) => {
                // if our storage_init has changed it probably means the user has
                // changed, courtesy of the 'kid' in the structure. Thus, we can't
                // reuse the client or the memory cached state. We do keep the disk
                // state as currently that's only the declined list.
                if client_info.client_init != *self.storage_init {
                    log::info!("Discarding all state as the account might have changed");
                    *self.mem_cached_state = MemoryCachedState::default();
                    ClientInfo::new(self.storage_init)?
                } else {
                    log::debug!("Reusing memory-cached client_info");
                    // we can reuse it (which should be the common path)
                    client_info
                }
            }
            None => {
                log::debug!("mem_cached_state was stale or missing, need setup");
                // We almost certainly have no other state here, but to be safe, we
                // throw away any memory state we do have.
                self.mem_cached_state.clear_sensitive_info();
                ClientInfo::new(self.storage_init)?
            }
        };
        // Ensure we use the correct listener here rather than on all the branches
        // above, since it seems less error prone.
        client_info.client.backoff = self.backoff.clone();
        Ok(client_info)
    }

    fn prepare_persisted_state(&mut self) -> PersistedGlobalState {
        // Note that any failure to use a persisted state means we also decline
        // to use our memory cached state, so that we fully rebuild that
        // persisted state for next time.
        match self.persisted_global_state {
            Some(persisted_string) if !persisted_string.is_empty() => {
                match serde_json::from_str::<PersistedGlobalState>(&persisted_string) {
                    Ok(state) => {
                        log::trace!("Read persisted state: {:?}", state);
                        // Note that we don't set `result.declined` from the
                        // data in state - it remains None, which explicitly
                        // indicates "we don't have updated info".
                        state
                    }
                    _ => {
                        // Don't log the error since it might contain sensitive
                        // info (although currently it only contains the declined engines list)
                        log::error!(
                            "Failed to parse PersistedGlobalState from JSON! Falling back to default"
                        );
                        *self.mem_cached_state = MemoryCachedState::default();
                        PersistedGlobalState::default()
                    }
                }
            }
            _ => {
                log::info!(
                    "The application didn't give us persisted state - \
                     this is only expected on the very first run for a given user."
                );
                *self.mem_cached_state = MemoryCachedState::default();
                PersistedGlobalState::default()
            }
        }
    }
}