1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use std::{error::Error, fmt}; use serde::{Deserialize, Serialize}; use super::{Guid, Payload, ServerTimestamp}; /// A bridged Sync engine implements all the methods needed to support /// Desktop Sync. pub trait BridgedEngine { /// The type returned for errors. type Error; /// Returns the last sync time, in milliseconds, for this engine's /// collection. This is called before each sync, to determine the lower /// bound for new records to fetch from the server. fn last_sync(&self) -> Result<i64, Self::Error>; /// Sets the last sync time, in milliseconds. This is called throughout /// the sync, to fast-forward the stored last sync time to match the /// timestamp on the uploaded records. fn set_last_sync(&self, last_sync_millis: i64) -> Result<(), Self::Error>; /// Returns the sync ID for this engine's collection. This is only used in /// tests. fn sync_id(&self) -> Result<Option<String>, Self::Error>; /// Resets the sync ID for this engine's collection, returning the new ID. /// As a side effect, implementations should reset all local Sync state, /// as in `reset`. fn reset_sync_id(&self) -> Result<String, Self::Error>; /// Ensures that the locally stored sync ID for this engine's collection /// matches the `new_sync_id` from the server. If the two don't match, /// implementations should reset all local Sync state, as in `reset`. /// This method returns the assigned sync ID, which can be either the /// `new_sync_id`, or a different one if the engine wants to force other /// devices to reset their Sync state for this collection the next time they /// sync. fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String, Self::Error>; /// Indicates that the engine is about to start syncing. This is called /// once per sync, and always before `store_incoming`. fn sync_started(&self) -> Result<(), Self::Error>; /// Stages a batch of incoming Sync records. This is called multiple /// times per sync, once for each batch. Implementations can use the /// signal to check if the operation was aborted, and cancel any /// pending work. fn store_incoming(&self, incoming_cleartexts: &[IncomingEnvelope]) -> Result<(), Self::Error>; /// Applies all staged records, reconciling changes on both sides and /// resolving conflicts. Returns a list of records to upload. fn apply(&self) -> Result<ApplyResults, Self::Error>; /// Indicates that the given record IDs were uploaded successfully to the /// server. This is called multiple times per sync, once for each batch /// upload. fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<(), Self::Error>; /// Indicates that all records have been uploaded. At this point, any record /// IDs marked for upload that haven't been passed to `set_uploaded`, can be /// assumed to have failed: for example, because the server rejected a record /// with an invalid TTL or sort index. fn sync_finished(&self) -> Result<(), Self::Error>; /// Resets all local Sync state, including any change flags, mirrors, and /// the last sync time, such that the next sync is treated as a first sync /// with all new local data. Does not erase any local user data. fn reset(&self) -> Result<(), Self::Error>; /// Erases all local user data for this collection, and any Sync metadata. /// This method is destructive, and unused for most collections. fn wipe(&self) -> Result<(), Self::Error>; } #[derive(Clone, Debug, Default)] pub struct ApplyResults { /// List of records pub envelopes: Vec<OutgoingEnvelope>, /// The number of incoming records whose contents were merged because they /// changed on both sides. None indicates we aren't reporting this /// information. pub num_reconciled: Option<usize>, } impl ApplyResults { pub fn new(envelopes: Vec<OutgoingEnvelope>, num_reconciled: impl Into<Option<usize>>) -> Self { Self { envelopes, num_reconciled: num_reconciled.into(), } } } // Shorthand for engines that don't care. impl From<Vec<OutgoingEnvelope>> for ApplyResults { fn from(envelopes: Vec<OutgoingEnvelope>) -> Self { Self { envelopes, num_reconciled: None, } } } /// An envelope for an incoming item, passed to `BridgedEngine::store_incoming`. /// Envelopes are a halfway point between BSOs, the format used for all items on /// the Sync server, and records, which are specific to each engine. /// /// A BSO is a JSON object with metadata fields (`id`, `modifed`, `sortindex`), /// and a BSO payload that is itself a JSON string. For encrypted records, the /// BSO payload has a ciphertext, which must be decrypted to yield a cleartext. /// The cleartext is a JSON string (that's three levels of JSON wrapping, if /// you're keeping score: the BSO itself, BSO payload, and cleartext) with the /// actual record payload. /// /// An envelope combines the metadata fields from the BSO, and the cleartext /// from the encrypted BSO payload. #[derive(Clone, Debug, Deserialize)] pub struct IncomingEnvelope { pub id: Guid, pub modified: ServerTimestamp, #[serde(default)] pub sortindex: Option<i32>, #[serde(default)] pub ttl: Option<u32>, // Don't provide access to the cleartext directly. We want all callers to // use `IncomingEnvelope::payload`, so that we can validate the cleartext. cleartext: String, } impl IncomingEnvelope { /// Parses and returns the record payload from this envelope. Returns an /// error if the envelope's cleartext isn't valid JSON, or the payload is /// invalid. pub fn payload(&self) -> Result<Payload, PayloadError> { let payload: Payload = serde_json::from_str(&self.cleartext)?; if payload.id != self.id { return Err(PayloadError::MismatchedId { envelope: self.id.clone(), payload: payload.id, }); } // Remove auto field data from payload and replace with real data Ok(payload .with_auto_field("ttl", self.ttl) .with_auto_field("sortindex", self.sortindex)) } } /// An envelope for an outgoing item, returned from `BridgedEngine::apply`. This /// is similar to `IncomingEnvelope`, but omits fields that are only set by the /// server, like `modified`. #[derive(Clone, Debug, Serialize)] pub struct OutgoingEnvelope { id: Guid, cleartext: String, sortindex: Option<i32>, ttl: Option<u32>, } impl From<Payload> for OutgoingEnvelope { fn from(mut payload: Payload) -> Self { let id = payload.id.clone(); // Remove auto field data from OutgoingEnvelope payload let ttl = payload.take_auto_field("ttl"); let sortindex = payload.take_auto_field("sortindex"); OutgoingEnvelope { id, cleartext: payload.into_json_string(), sortindex, ttl, } } } /// An error that indicates a payload is invalid. #[derive(Debug)] pub enum PayloadError { /// The payload contains invalid JSON. Invalid(serde_json::Error), /// The ID of the BSO in the envelope doesn't match the ID in the payload. MismatchedId { envelope: Guid, payload: Guid }, } impl Error for PayloadError {} impl From<serde_json::Error> for PayloadError { fn from(err: serde_json::Error) -> PayloadError { PayloadError::Invalid(err) } } impl fmt::Display for PayloadError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { PayloadError::Invalid(err) => err.fmt(f), PayloadError::MismatchedId { envelope, payload } => write!( f, "ID `{}` in envelope doesn't match `{}` in payload", envelope, payload ), } } }