1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
use interrupt_support::Interruptee;
use rusqlite::{Connection, Row, Transaction};
use sql_support::ConnExt;
use sync15_traits::Payload;
use sync_guid::Guid as SyncGuid;
use crate::error::*;
use super::{Record, RecordData};
/// Converts one result row into an outgoing sync [`Payload`].
///
/// Reads the `guid`, `ext_id` and `data` columns (in that order). A non-NULL
/// `data` column produces a payload built from a [`Record`] carrying the
/// extension id and its data; a NULL `data` column produces a tombstone
/// payload for `guid`.
///
/// # Errors
///
/// Propagates any failure from reading a column or from
/// `Payload::from_record`.
fn outgoing_from_row(row: &Row<'_>) -> Result<Payload> {
    let guid: SyncGuid = row.get("guid")?;
    let ext_id: String = row.get("ext_id")?;
    if let Some(data) = row.get::<_, Option<String>>("data")? {
        let record = Record {
            guid,
            data: RecordData::Data { ext_id, data },
        };
        // `?` is kept so the error from `from_record` is converted into the
        // crate's error type.
        Ok(Payload::from_record(record)?)
    } else {
        // No data stored for this row: upload a tombstone instead.
        Ok(Payload::new_tombstone(guid))
    }
}
/// Stages local changes for upload and applies the incoming staging table.
///
/// Runs three statements as a single batch on `tx`:
///
/// 1. Copies every `storage_sync_data` row with `sync_change_counter > 0`
///    into `storage_sync_outgoing_staging`, reusing the guid from the mirror
///    or the incoming staging table when one exists and generating a new one
///    otherwise.
/// 2. Copies the incoming staging table into `storage_sync_mirror`.
/// 3. Copies incoming staged rows that have an `ext_id` and are not being
///    re-uploaded into `storage_sync_data` with a change counter of 0.
///
/// # Errors
///
/// Returns an error if executing the batch fails.
pub fn stage_outgoing(tx: &Transaction<'_>) -> Result<()> {
    // The SQL text is intentionally flush-left so the literal's contents are
    // exactly what is sent to SQLite.
    const STAGE_OUTGOING_SQL: &str = "
-- Stage outgoing items. The item may not yet have a GUID (ie, it might
-- not already be in either the mirror nor the incoming staging table),
-- so we generate one if it doesn't exist.
INSERT INTO storage_sync_outgoing_staging
(guid, ext_id, data, sync_change_counter)
SELECT coalesce(m.guid, s.guid, generate_guid()),
l.ext_id, l.data, l.sync_change_counter
FROM storage_sync_data l
-- left joins as one or both may not exist.
LEFT JOIN storage_sync_mirror m ON m.ext_id = l.ext_id
LEFT JOIN storage_sync_staging s ON s.ext_id = l.ext_id
WHERE sync_change_counter > 0;
-- At this point, we've merged in all new records, so copy incoming
-- staging into the mirror so that it matches what's on the server.
INSERT OR REPLACE INTO storage_sync_mirror (guid, ext_id, data)
SELECT guid, ext_id, data FROM temp.storage_sync_staging;
-- And copy any incoming records that we aren't reuploading into the
-- local table. We'll copy the outgoing ones into the mirror and local
-- after we upload them.
INSERT OR REPLACE INTO storage_sync_data (ext_id, data, sync_change_counter)
SELECT ext_id, data, 0
FROM storage_sync_staging s
WHERE ext_id IS NOT NULL
AND NOT EXISTS(SELECT 1 FROM storage_sync_outgoing_staging o
WHERE o.guid = s.guid);";

    tx.execute_batch(STAGE_OUTGOING_SQL)?;
    Ok(())
}
/// Loads every staged outgoing record as a sync [`Payload`].
///
/// Selects `guid`, `ext_id` and `data` from `storage_sync_outgoing_staging`
/// and converts each row with `outgoing_from_row`. `signal` is checked before
/// each row is converted, so an interrupt aborts the query part-way through.
///
/// # Errors
///
/// Returns an error if the query fails, if `signal` reports an interruption,
/// or if a row cannot be converted into a payload.
pub fn get_outgoing(conn: &Connection, signal: &dyn Interruptee) -> Result<Vec<Payload>> {
    let sql = "SELECT guid, ext_id, data FROM storage_sync_outgoing_staging";
    let payloads = conn
        .conn()
        .query_rows_and_then_named(sql, &[], |row| -> Result<_> {
            signal.err_if_interrupted()?;
            // Already the crate `Result`, so return it as-is rather than
            // unwrapping with `?` and re-wrapping in `Ok`.
            outgoing_from_row(row)
        })?;
    log::debug!("get_outgoing found {} items", payloads.len());
    // The query helper already yields the collected rows; no need to
    // re-collect them into a second vector.
    Ok(payloads)
}
/// Marks the given outgoing records as uploaded.
///
/// Sets `was_uploaded = 1` on every `storage_sync_outgoing_staging` row whose
/// `guid` appears in `items`. The guids are handed to the database in chunks
/// via `sql_support::each_chunk`, and `signal` is checked before each chunk's
/// statement is built and executed.
///
/// NOTE(review): nothing in this function touches `storage_sync_data`, yet the
/// test in this file expects the local `sync_change_counter` to be 0
/// afterwards — presumably handled by schema-side logic keyed on
/// `was_uploaded`; confirm against the schema.
///
/// # Errors
///
/// Returns an error if `signal` reports an interruption or if an `UPDATE`
/// statement fails.
pub fn record_uploaded(
    tx: &Transaction<'_>,
    items: &[SyncGuid],
    signal: &dyn Interruptee,
) -> Result<()> {
    log::debug!(
        "record_uploaded recording that {} items were uploaded",
        items.len()
    );

    sql_support::each_chunk(items, |guids, _| -> Result<()> {
        signal.err_if_interrupted()?;
        // One bound-variable placeholder per guid in this chunk.
        let placeholders = sql_support::repeat_sql_vars(guids.len());
        let update = format!(
            "UPDATE storage_sync_outgoing_staging SET\n\
             was_uploaded = 1\n\
             WHERE guid IN ({})",
            placeholders,
        );
        tx.execute(&update, guids)?;
        Ok(())
    })?;

    Ok(())
}
#[cfg(test)]
mod tests {
    use super::super::test::new_syncable_mem_db;
    use super::*;
    use interrupt_support::NeverInterrupts;

    /// Round-trips a single changed record through staging, fetching and
    /// upload bookkeeping.
    #[test]
    fn test_simple() -> Result<()> {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction()?;

        // Seed two local rows; only the one with a non-zero change counter
        // should end up staged for upload.
        let seed_sql = r#"
INSERT INTO storage_sync_data (ext_id, data, sync_change_counter)
VALUES
('ext_no_changes', '{"foo":"bar"}', 0),
('ext_with_changes', '{"foo":"bar"}', 1);
"#;
        tx.execute_batch(seed_sql)?;

        stage_outgoing(&tx)?;
        let changes = get_outgoing(&tx, &NeverInterrupts)?;
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].data["extId"], "ext_with_changes".to_string());

        // Report every fetched payload as successfully uploaded.
        let uploaded: Vec<SyncGuid> = changes.into_iter().map(|payload| payload.id).collect();
        record_uploaded(&tx, uploaded.as_slice(), &NeverInterrupts)?;

        // After the upload is recorded, the local row must no longer be
        // flagged as changed.
        let counter: i32 = tx.conn().query_one(
            "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_with_changes'",
        )?;
        assert_eq!(counter, 0);
        Ok(())
    }
}