1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use crate::bso_record::{CleartextBso, EncryptedBso};
use crate::client::{Sync15ClientResponse, Sync15StorageClient};
use crate::error::{self, ErrorKind, ErrorResponse, Result};
use crate::key_bundle::KeyBundle;
use crate::request::{CollectionRequest, NormalResponseHandler, UploadInfo};
use crate::util::ServerTimestamp;
use crate::CollState;
use std::borrow::Cow;
pub use sync15_traits::{IncomingChangeset, OutgoingChangeset, RecordChangeset};
/// Encrypts every record of an outgoing changeset with `key`.
///
/// Each change is wrapped in a `CleartextBso` tagged with the changeset's
/// collection name and then encrypted.
///
/// # Errors
///
/// Stops at, and returns, the first encryption failure.
pub fn encrypt_outgoing(o: OutgoingChangeset, key: &KeyBundle) -> Result<Vec<EncryptedBso>> {
    let collection = o.collection;
    let mut encrypted = Vec::with_capacity(o.changes.len());
    for change in o.changes {
        // `from_payload` takes the collection name by value, so each record
        // gets its own copy.
        let cleartext = CleartextBso::from_payload(change, collection.clone());
        encrypted.push(cleartext.encrypt(key)?);
    }
    Ok(encrypted)
}
/// Downloads the records selected by `collection_request` and decrypts them.
///
/// On a `Success` response, `state.last_modified` is overwritten with the
/// timestamp reported in that response, and the same timestamp is stored on
/// the returned changeset.
///
/// # Errors
///
/// Returns an error when the request itself fails, when the response is
/// anything other than `Success` (converted with `create_storage_error`), or
/// when any record fails to decrypt with `state.key`.
pub fn fetch_incoming(
    client: &Sync15StorageClient,
    state: &mut CollState,
    collection_request: &CollectionRequest,
) -> Result<IncomingChangeset> {
    let response = client.get_encrypted_records(collection_request)?;
    let (encrypted, server_timestamp) = match response {
        Sync15ClientResponse::Success {
            record,
            last_modified,
            ..
        } => (record, last_modified),
        failure => return Err(failure.create_storage_error().into()),
    };

    // Written before decryption, so it is updated even when a record below
    // fails to decrypt and this function returns an error.
    state.last_modified = server_timestamp;

    let mut incoming =
        IncomingChangeset::new(collection_request.collection.clone(), server_timestamp);
    incoming.changes.reserve(encrypted.len());
    for bso in encrypted {
        let cleartext = bso.decrypt(&state.key)?;
        incoming.changes.push(cleartext.into_timestamped_payload());
    }
    Ok(incoming)
}
/// A pending upload of already-encrypted records to a single collection.
///
/// Built with `new` or `new_from_changeset` and consumed by `upload`.
#[derive(Debug, Clone)]
pub struct CollectionUpdate<'a> {
/// Storage client that `upload` asks for a post queue.
client: &'a Sync15StorageClient,
/// Collection state; `upload` reads its `config`, and `new_from_changeset`
/// reads its `key` and `last_modified`.
state: &'a CollState,
/// Name of the collection the records are posted to.
collection: Cow<'static, str>,
/// Timestamp handed to the post queue when it is created.
/// NOTE(review): the name presumably abbreviates "X-If-Unmodified-Since" — confirm.
xius: ServerTimestamp,
/// Encrypted records that `upload` enqueues, in order.
to_update: Vec<EncryptedBso>,
/// When true, `upload` fails with `RecordTooLargeError` as soon as a record
/// cannot be enqueued, and asserts that no ids ended up in `failed_ids`.
fully_atomic: bool,
}
impl<'a> CollectionUpdate<'a> {
    /// Creates an update from records that are already encrypted.
    ///
    /// No validation is performed; the arguments are stored as given.
    pub fn new(
        client: &'a Sync15StorageClient,
        state: &'a CollState,
        collection: Cow<'static, str>,
        xius: ServerTimestamp,
        records: Vec<EncryptedBso>,
        fully_atomic: bool,
    ) -> CollectionUpdate<'a> {
        CollectionUpdate {
            client,
            state,
            collection,
            xius,
            to_update: records,
            fully_atomic,
        }
    }

    /// Creates an update by encrypting an outgoing changeset with `state.key`.
    ///
    /// The changeset's timestamp becomes the update's `xius` value.
    ///
    /// # Errors
    ///
    /// Returns a `PreconditionFailed` storage error, without encrypting
    /// anything, when the changeset's timestamp is older than
    /// `state.last_modified`. Also returns any error raised while encrypting
    /// the records.
    pub fn new_from_changeset(
        client: &'a Sync15StorageClient,
        state: &'a CollState,
        changeset: OutgoingChangeset,
        fully_atomic: bool,
    ) -> Result<CollectionUpdate<'a>> {
        // Cloned because `changeset` is consumed by `encrypt_outgoing` below
        // while the name is still needed afterwards.
        let collection = changeset.collection.clone();
        let xius = changeset.timestamp;
        if xius < state.last_modified {
            // The changeset is based on a state older than the one we
            // already know about, so refuse before doing any work.
            return Err(
                ErrorKind::StorageHttpError(ErrorResponse::PreconditionFailed {
                    route: collection.into_owned(),
                })
                .into(),
            );
        }
        let to_update = crate::changeset::encrypt_outgoing(changeset, &state.key)?;
        Ok(CollectionUpdate::new(
            client,
            state,
            collection,
            xius,
            to_update,
            fully_atomic,
        ))
    }

    /// Enqueues every record on a post queue, flushes it, and returns the
    /// queue's completed upload info.
    ///
    /// # Errors
    ///
    /// Returns an error if the post queue cannot be created, or if enqueueing
    /// or flushing fails. When `fully_atomic` is set, returns
    /// `RecordTooLargeError` as soon as the queue declines a record.
    ///
    /// # Panics
    ///
    /// Panics when `fully_atomic` is set and the completed upload info still
    /// lists failed ids; that would indicate a bug, since such an upload
    /// should already have returned an error.
    pub fn upload(self) -> error::Result<UploadInfo> {
        let mut q = self.client.new_post_queue(
            &self.collection,
            &self.state.config,
            self.xius,
            NormalResponseHandler::new(!self.fully_atomic),
        )?;

        for record in self.to_update {
            let enqueued = q.enqueue(&record)?;
            // NOTE(review): when `fully_atomic` is false, a record the queue
            // declines is skipped here and this function does not add it to
            // `failed_ids` — confirm whether the queue reports it itself.
            if !enqueued && self.fully_atomic {
                return Err(ErrorKind::RecordTooLargeError.into());
            }
        }

        q.flush(true)?;
        let info = q.completed_upload_info();
        if self.fully_atomic {
            assert_eq!(
                info.failed_ids.len(),
                0,
                "Bug: Should have failed by now if we aren't allowing dropped records"
            );
        }
        Ok(info)
    }
}