use interrupt_support::Interruptee;
use rusqlite::{
types::{Null, ToSql},
Connection, Row, Transaction,
};
use sql_support::ConnExt;
use sync15_traits::Payload;
use sync_guid::Guid as SyncGuid;
use crate::api::{StorageChanges, StorageValueChange};
use crate::error::*;
use super::{merge, remove_matching_keys, JsonMap, Record, RecordData};
/// The state of a JSON data column as read by `json_map_from_row`.
///
/// A NULL column, or text that does not parse as a JSON object, is reported
/// as `Deleted`; a JSON object is reported as `Exists`.
#[derive(Debug, PartialEq)]
pub enum DataState {
/// There is no usable data (the column was NULL or did not hold a JSON object).
Deleted,
/// The column held a JSON object, carried here as a map.
Exists(JsonMap),
}
/// Builds the change list for incoming data that replaces nothing.
///
/// Every key of `new` is reported as a freshly added value: `old_value` is
/// `None` and `new_value` is a clone of the incoming value.
fn changes_for_new_incoming(new: &JsonMap) -> StorageChanges {
    let mut changes = StorageChanges::with_capacity(new.len());
    new.iter()
        .map(|(name, value)| StorageValueChange {
            key: name.clone(),
            old_value: None,
            new_value: Some(value.clone()),
        })
        .for_each(|change| changes.push(change));
    changes
}
/// Reads the nullable text column `col` from `row` and interprets it as a
/// JSON object.
///
/// Returns `DataState::Exists` only when the column holds text that parses
/// as a JSON object. A NULL column, invalid JSON, and valid JSON that is not
/// an object all yield `DataState::Deleted`.
///
/// # Errors
///
/// Fails only if the column itself cannot be read from the row; parse
/// failures are not reported as errors.
fn json_map_from_row(row: &Row<'_>, col: &str) -> Result<DataState> {
    let raw: Option<String> = row.get(col)?;
    // Parse failures are folded into `None` so they take the same path as a
    // NULL column below.
    let parsed = raw.and_then(|text| serde_json::from_str::<serde_json::Value>(&text).ok());
    Ok(match parsed {
        Some(serde_json::Value::Object(map)) => DataState::Exists(map),
        _ => DataState::Deleted,
    })
}
/// Writes the incoming sync payloads into `temp.storage_sync_staging`.
///
/// Each payload is first converted into a `Record`. The records are then
/// inserted in chunks with `INSERT OR REPLACE`, binding three values per
/// record (`guid`, `ext_id`, `data`). A tombstone record binds NULL for both
/// `ext_id` and `data`.
///
/// # Errors
///
/// Returns an error if a payload cannot be converted into a `Record`, if
/// `signal` reports an interruption while parameters are being collected, or
/// if executing the insert fails.
pub fn stage_incoming(
    tx: &Transaction<'_>,
    incoming_payloads: Vec<Payload>,
    signal: &dyn Interruptee,
) -> Result<()> {
    // Convert everything up front: a payload that fails to convert aborts
    // before any row is written.
    let mut incoming_records = Vec::with_capacity(incoming_payloads.len());
    for payload in incoming_payloads {
        incoming_records.push(payload.into_record::<Record>()?);
    }
    sql_support::each_sized_chunk(
        &incoming_records,
        // Each record binds three variables, so a chunk may hold at most a
        // third of the variable limit.
        sql_support::default_max_variable_number() / 3,
        |chunk, _| -> Result<()> {
            let sql = format!(
                "INSERT OR REPLACE INTO temp.storage_sync_staging
(guid, ext_id, data)
VALUES {}",
                sql_support::repeat_multi_values(chunk.len(), 3)
            );
            let mut params = Vec::with_capacity(chunk.len() * 3);
            for record in chunk {
                signal.err_if_interrupted()?;
                params.push(&record.guid as &dyn ToSql);
                match &record.data {
                    RecordData::Data { ext_id, data } => {
                        params.push(ext_id);
                        params.push(data);
                    }
                    RecordData::Tombstone => {
                        params.push(&Null);
                        params.push(&Null);
                    }
                }
            }
            tx.execute(&sql, &params)?;
            Ok(())
        },
    )?;
    Ok(())
}
/// Describes where the data for one staged incoming record exists, as
/// classified by `get_incoming` from the staging, mirror and local tables.
#[derive(Debug, PartialEq)]
pub enum IncomingState {
/// Neither a local nor a mirror row was found, and the staged record has an
/// `ext_id`. `data` is the staged JSON object, or an empty map when the
/// staged data is missing or unusable.
IncomingOnlyData { ext_id: String, data: JsonMap },
/// Neither a local nor a mirror row was found, and the staged record has no
/// `ext_id`.
IncomingOnlyTombstone,
/// A local row was found but no mirror `ext_id`; `ext_id` is the local
/// row's.
HasLocal {
ext_id: String,
incoming: DataState,
local: DataState,
},
/// A mirror `ext_id` was found but no local row; `ext_id` is the mirror
/// row's.
NotLocal {
ext_id: String,
incoming: DataState,
mirror: DataState,
},
/// Both a local row and a mirror `ext_id` were found; `ext_id` is the local
/// row's.
Everywhere {
ext_id: String,
incoming: DataState,
mirror: DataState,
local: DataState,
},
}
/// Loads every staged incoming record together with its classification.
///
/// Each row of `temp.storage_sync_staging` is left-joined to the mirror table
/// by guid and to the local data table by the mirror's or the staged
/// record's `ext_id`. Which of those joins produced an `ext_id` determines
/// the `IncomingState` variant.
///
/// # Errors
///
/// Returns an error if the query fails or a column cannot be read.
pub fn get_incoming(conn: &Connection) -> Result<Vec<(SyncGuid, IncomingState)>> {
    /// Turns one joined row into its guid and `IncomingState`.
    fn classify_row(row: &Row<'_>) -> Result<(SyncGuid, IncomingState)> {
        let guid = row.get("guid")?;
        let mirror_ext_id: Option<String> = row.get("m_ext_id")?;
        let local_ext_id: Option<String> = row.get("l_ext_id")?;
        let staged_ext_id: Option<String> = row.get("s_ext_id")?;
        let incoming = json_map_from_row(row, "s_data")?;

        let state = match (local_ext_id, mirror_ext_id, staged_ext_id) {
            // Nothing but the staged record, and it carries no ext_id.
            (None, None, None) => IncomingState::IncomingOnlyTombstone,
            // Nothing but the staged record; missing or unusable staged data
            // is treated as an empty map.
            (None, None, Some(ext_id)) => {
                let data = match incoming {
                    DataState::Exists(map) => map,
                    DataState::Deleted => JsonMap::new(),
                };
                IncomingState::IncomingOnlyData { ext_id, data }
            }
            (Some(ext_id), None, _) => IncomingState::HasLocal {
                ext_id,
                incoming,
                local: json_map_from_row(row, "l_data")?,
            },
            (None, Some(ext_id), _) => IncomingState::NotLocal {
                ext_id,
                incoming,
                mirror: json_map_from_row(row, "m_data")?,
            },
            (Some(ext_id), Some(_), _) => IncomingState::Everywhere {
                ext_id,
                incoming,
                mirror: json_map_from_row(row, "m_data")?,
                local: json_map_from_row(row, "l_data")?,
            },
        };
        Ok((guid, state))
    }

    // NOTE: `l.sync_change_counter` is selected but never read by the row
    // mapper above.
    let sql = "
SELECT
s.guid as guid,
l.ext_id as l_ext_id,
m.ext_id as m_ext_id,
s.ext_id as s_ext_id,
s.data as s_data, m.data as m_data, l.data as l_data,
l.sync_change_counter
FROM temp.storage_sync_staging s
LEFT JOIN storage_sync_mirror m ON m.guid = s.guid
LEFT JOIN storage_sync_data l on l.ext_id IN (m.ext_id, s.ext_id);";

    let rows = conn
        .conn()
        .query_rows_and_then_named(sql, &[], classify_row)?;
    Ok(rows)
}
/// The action planned for one incoming record; produced by `plan_incoming`
/// and carried out by `apply_actions`.
#[derive(Debug, PartialEq)]
pub enum IncomingAction {
/// Delete the local row for `ext_id` and record `changes` as applied.
DeleteLocally {
ext_id: String,
changes: StorageChanges,
},
/// Insert or replace the local row with `data`, with its change counter
/// set to 0, and record `changes` as applied.
TakeRemote {
ext_id: String,
data: JsonMap,
changes: StorageChanges,
},
/// Update the local row's data to `data`, increment its change counter,
/// and record `changes` as applied.
Merge {
ext_id: String,
data: JsonMap,
changes: StorageChanges,
},
/// Leave the local data alone and reset its change counter to 0.
Same { ext_id: String },
/// Nothing to do for this record.
Nothing,
}
pub fn plan_incoming(s: IncomingState) -> IncomingAction {
match s {
IncomingState::Everywhere {
ext_id,
incoming,
local,
mirror,
} => {
match (incoming, local, mirror) {
(
DataState::Exists(incoming_data),
DataState::Exists(local_data),
DataState::Exists(mirror_data),
) => {
merge(ext_id, incoming_data, local_data, Some(mirror_data))
}
(
DataState::Exists(incoming_data),
DataState::Exists(local_data),
DataState::Deleted,
) => {
merge(ext_id, incoming_data, local_data, None)
}
(DataState::Exists(incoming_data), DataState::Deleted, _) => {
IncomingAction::TakeRemote {
ext_id,
changes: changes_for_new_incoming(&incoming_data),
data: incoming_data,
}
}
(DataState::Deleted, DataState::Exists(local_data), DataState::Exists(mirror)) => {
let (result, changes) = remove_matching_keys(local_data, &mirror);
if result.is_empty() {
IncomingAction::DeleteLocally { ext_id, changes }
} else {
IncomingAction::Merge {
ext_id,
data: result,
changes,
}
}
}
(DataState::Deleted, DataState::Exists(local_data), DataState::Deleted) => {
IncomingAction::Merge {
ext_id,
data: local_data,
changes: StorageChanges::new(),
}
}
(DataState::Deleted, DataState::Deleted, _) => {
IncomingAction::Same { ext_id }
}
}
}
IncomingState::HasLocal {
ext_id,
incoming,
local,
} => {
match (incoming, local) {
(DataState::Exists(incoming_data), DataState::Exists(local_data)) => {
merge(ext_id, incoming_data, local_data, None)
}
(DataState::Deleted, DataState::Exists(local_data)) => {
IncomingAction::Merge {
ext_id,
data: local_data,
changes: StorageChanges::new(),
}
}
(DataState::Exists(incoming_data), DataState::Deleted) => {
IncomingAction::TakeRemote {
ext_id,
changes: changes_for_new_incoming(&incoming_data),
data: incoming_data,
}
}
(DataState::Deleted, DataState::Deleted) => {
IncomingAction::Same { ext_id }
}
}
}
IncomingState::NotLocal {
ext_id, incoming, ..
} => {
match incoming {
DataState::Exists(data) => IncomingAction::TakeRemote {
ext_id,
changes: changes_for_new_incoming(&data),
data,
},
DataState::Deleted => IncomingAction::Same { ext_id },
}
}
IncomingState::IncomingOnlyData { ext_id, data } => {
IncomingAction::TakeRemote {
ext_id,
changes: changes_for_new_incoming(&data),
data,
}
}
IncomingState::IncomingOnlyTombstone => {
IncomingAction::Nothing
}
}
}
/// Records the changes applied for `ext_id` in `temp.storage_sync_applied`.
///
/// `changes` is stored as its JSON serialization.
///
/// # Errors
///
/// Returns an error if serialization or the insert fails.
fn insert_changes(tx: &Transaction<'_>, ext_id: &str, changes: &StorageChanges) -> Result<()> {
    let serialized = serde_json::to_string(changes)?;
    tx.execute_named_cached(
        "INSERT INTO temp.storage_sync_applied (ext_id, changes)
VALUES (:ext_id, :changes)",
        &[(":ext_id", &ext_id), (":changes", &serialized)],
    )?;
    Ok(())
}
/// Carries out the planned actions against the local data table.
///
/// For each `(guid, action)` pair the matching statement is run on
/// `storage_sync_data`. Actions that carry a change list (`DeleteLocally`,
/// `TakeRemote`, `Merge`) also record it through `insert_changes`. `signal`
/// is checked before every action.
///
/// # Errors
///
/// Returns an error on interruption or when any statement fails.
pub fn apply_actions(
    tx: &Transaction<'_>,
    actions: Vec<(SyncGuid, IncomingAction)>,
    signal: &dyn Interruptee,
) -> Result<()> {
    for (guid, action) in actions {
        signal.err_if_interrupted()?;
        log::trace!("action for '{:?}': {:?}", guid, action);

        // Run the statement for this action; hand back what, if anything,
        // must be recorded as applied changes afterwards.
        let to_record = match action {
            IncomingAction::Nothing => None,
            IncomingAction::Same { ext_id } => {
                tx.execute_named_cached(
                    "UPDATE storage_sync_data SET sync_change_counter = 0 WHERE ext_id = :ext_id",
                    &[(":ext_id", &ext_id)],
                )?;
                None
            }
            IncomingAction::DeleteLocally { ext_id, changes } => {
                tx.execute_named_cached(
                    "DELETE FROM storage_sync_data WHERE ext_id = :ext_id",
                    &[(":ext_id", &ext_id)],
                )?;
                Some((ext_id, changes))
            }
            IncomingAction::TakeRemote {
                ext_id,
                data,
                changes,
            } => {
                let json = serde_json::Value::Object(data);
                tx.execute_named_cached(
                    "INSERT OR REPLACE INTO storage_sync_data(ext_id, data, sync_change_counter)
VALUES (:ext_id, :data, 0)",
                    &[(":ext_id", &ext_id), (":data", &json)],
                )?;
                Some((ext_id, changes))
            }
            IncomingAction::Merge {
                ext_id,
                data,
                changes,
            } => {
                let json = serde_json::Value::Object(data);
                tx.execute_named_cached(
                    "UPDATE storage_sync_data SET data = :data, sync_change_counter = sync_change_counter + 1 WHERE ext_id = :ext_id",
                    &[(":ext_id", &ext_id), (":data", &json)],
                )?;
                Some((ext_id, changes))
            }
        };

        if let Some((ext_id, changes)) = to_record {
            insert_changes(tx, &ext_id, &changes)?;
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
use super::super::test::new_syncable_mem_db;
use super::*;
use crate::api;
use interrupt_support::NeverInterrupts;
use rusqlite::NO_PARAMS;
use serde_json::{json, Value};
use sync15_traits::Payload;
/// Runs `stmt` and returns its single `u32` result, or the default (0) when
/// the query yields no value. Panics if the query fails.
fn ssi(conn: &Connection, stmt: &str) -> u32 {
    let value: Option<u32> = conn.try_query_one(stmt, &[], true).expect("must work");
    value.unwrap_or_default()
}
/// Converts a JSON array of records into payloads.
///
/// Panics if `array` is not a JSON array or an element is not a valid
/// payload.
fn array_to_incoming(mut array: Value) -> Vec<Payload> {
    array
        .as_array_mut()
        .expect("you must pass a json array")
        .iter_mut()
        .map(|elt| Payload::from_json(elt.take()).expect("must be valid"))
        .collect()
}
// Builds a JSON map from `json!` object syntax; panics if the tokens do not
// describe a JSON object.
macro_rules! map {
    ($($body:tt)+) => {
        json!($($body)+).as_object().cloned().unwrap()
    };
}
// Builds a single `StorageValueChange` for `$key`.
//
// A literal `None` in the old/new position means "no value"; any other token
// tree is passed through `json!` and wrapped in `Some`. The `None` arms come
// first so a literal `None` is never treated as a JSON value.
//
// Each arm expands to a bare expression with no trailing semicolon, because
// the macro is used in expression position.
macro_rules! change {
    ($key:literal, None, None) => {
        StorageValueChange {
            key: $key.to_string(),
            old_value: None,
            new_value: None,
        }
    };
    ($key:literal, $old:tt, None) => {
        StorageValueChange {
            key: $key.to_string(),
            old_value: Some(json!($old)),
            new_value: None,
        }
    };
    ($key:literal, None, $new:tt) => {
        StorageValueChange {
            key: $key.to_string(),
            old_value: None,
            new_value: Some(json!($new)),
        }
    };
    ($key:literal, $old:tt, $new:tt) => {
        StorageValueChange {
            key: $key.to_string(),
            old_value: Some(json!($old)),
            new_value: Some(json!($new)),
        }
    };
}
// Collects the given change expressions, in order, into a `StorageChanges`.
macro_rules! changes {
    ( $( $item:expr ),* ) => {
        {
            let mut list = StorageChanges::new();
            $( list.push($item); )*
            list
        }
    };
}
/// Staging one incoming record must leave exactly one row in the staging
/// table.
#[test]
fn test_incoming_populates_staging() -> Result<()> {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction()?;

    let payloads = array_to_incoming(json!([
        {
            "id": "guidAAAAAAAA",
            "extId": "ext1@example.com",
            "data": json!({"foo": "bar"}).to_string(),
        }
    ]));
    stage_incoming(&tx, payloads, &NeverInterrupts)?;

    let staged_rows = ssi(&tx, "SELECT count(*) FROM temp.storage_sync_staging");
    assert_eq!(staged_rows, 1);
    Ok(())
}
/// Checks the state reported for one staged record as a mirror row and then
/// local data are added for the same extension.
#[test]
fn test_fetch_incoming_state() -> Result<()> {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction()?;

    // Step 1: the record exists only in staging.
    tx.execute(
        r#"
INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
VALUES ('guid', 'ext_id', '{"foo":"bar"}')
"#,
        NO_PARAMS,
    )?;
    let mut rows = get_incoming(&tx)?;
    assert_eq!(rows.len(), 1);
    let (guid, state) = rows.remove(0);
    assert_eq!(guid, SyncGuid::new("guid"));
    assert_eq!(
        state,
        IncomingState::IncomingOnlyData {
            ext_id: "ext_id".to_string(),
            data: map!({"foo": "bar"}),
        }
    );

    // Step 2: a mirror row for the same guid appears.
    tx.execute(
        r#"
INSERT INTO storage_sync_mirror (guid, ext_id, data)
VALUES ('guid', 'ext_id', '{"foo":"new"}')
"#,
        NO_PARAMS,
    )?;
    let rows = get_incoming(&tx)?;
    assert_eq!(rows.len(), 1);
    let expected = IncomingState::NotLocal {
        ext_id: "ext_id".to_string(),
        incoming: DataState::Exists(map!({"foo": "bar"})),
        mirror: DataState::Exists(map!({"foo": "new"})),
    };
    assert_eq!(rows[0].1, expected);

    // Step 3: local data is written for the extension as well.
    api::set(&tx, "ext_id", json!({"foo": "local"}))?;
    let rows = get_incoming(&tx)?;
    assert_eq!(rows.len(), 1);
    let expected = IncomingState::Everywhere {
        ext_id: "ext_id".to_string(),
        incoming: DataState::Exists(map!({"foo": "bar"})),
        local: DataState::Exists(map!({"foo": "local"})),
        mirror: DataState::Exists(map!({"foo": "new"})),
    };
    assert_eq!(rows[0].1, expected);
    Ok(())
}
/// A staged tombstone (NULL ext_id and data) must be reported as
/// `IncomingOnlyTombstone`, however the mirror and local tables are populated
/// in this test.
#[test]
fn test_fetch_incoming_state_nulls() -> Result<()> {
    // Asserts that exactly one incoming row exists and that it is a bare
    // tombstone.
    fn assert_single_tombstone(conn: &Connection) -> Result<()> {
        let rows = get_incoming(conn)?;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].1, IncomingState::IncomingOnlyTombstone);
        Ok(())
    }

    let mut db = new_syncable_mem_db();
    let tx = db.transaction()?;

    // Only the staged tombstone exists.
    tx.execute(
        r#"
INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
VALUES ('guid', NULL, NULL)
"#,
        NO_PARAMS,
    )?;
    assert_single_tombstone(&tx)?;

    // A mirror row with a NULL ext_id changes nothing.
    tx.execute(
        r#"
INSERT INTO storage_sync_mirror (guid, ext_id, data)
VALUES ('guid', NULL, NULL)
"#,
        NO_PARAMS,
    )?;
    assert_single_tombstone(&tx)?;

    // A local row cannot be matched, because neither the mirror nor the
    // staged row has an ext_id.
    tx.execute(
        r#"
INSERT INTO storage_sync_data (ext_id, data)
VALUES ('ext_id', NULL)
"#,
        NO_PARAMS,
    )?;
    assert_single_tombstone(&tx)?;
    Ok(())
}
/// The parts of a `storage_sync_data` row that the tests inspect; filled in
/// by `get_local_item`.
#[derive(Debug, PartialEq)]
struct LocalItem {
// The row's `data` column, decoded with `json_map_from_row`.
data: DataState,
// The row's `sync_change_counter` column.
sync_change_counter: i32,
}
fn get_local_item(conn: &Connection) -> Option<LocalItem> {
conn.try_query_row::<_, Error, _>(
"SELECT data, sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_id'",
&[],
|row| {
let data = json_map_from_row(row, "data")?;
let sync_change_counter = row.get::<_, i32>(1)?;
Ok(LocalItem {
data,
sync_change_counter,
})
},
true,
)
.expect("query should work")
}
fn get_applied_item_changes(conn: &Connection) -> Option<StorageChanges> {
conn.try_query_row::<_, Error, _>(
"SELECT changes FROM temp.storage_sync_applied WHERE ext_id = 'ext_id'",
&[],
|row| Ok(serde_json::from_str(&row.get::<_, String>("changes")?)?),
true,
)
.expect("query should work")
.map(|val: serde_json::Value| {
let ob = val.as_object().expect("should be an object of items");
let mut result = StorageChanges::with_capacity(ob.len());
for (key, val) in ob.into_iter() {
let details = val.as_object().expect("elts should be objects");
result.push(StorageValueChange {
key: key.to_string(),
old_value: details.get("oldValue").cloned(),
new_value: details.get("newValue").cloned(),
});
}
result
})
}
/// Applies a single action, attributed to the guid `"guid"`. Panics if
/// applying fails.
fn do_apply_action(tx: &Transaction<'_>, action: IncomingAction) {
    let actions = vec![(SyncGuid::new("guid"), action)];
    apply_actions(tx, actions, &NeverInterrupts).expect("should apply");
}
/// Exercises each data-touching action against freshly seeded local data.
/// Every scenario runs in its own transaction, which is rolled back at the
/// end.
#[test]
fn test_apply_actions() -> Result<()> {
    // Writes {"foo": "local"} for 'ext_id' and checks it reads back.
    fn seed_local(tx: &Transaction<'_>) -> Result<()> {
        api::set(tx, "ext_id", json!({"foo": "local"}))?;
        assert_eq!(
            api::get(tx, "ext_id", json!(null))?,
            json!({"foo": "local"})
        );
        Ok(())
    }

    // The local row the test expects: {"foo": <foo>} with the given counter.
    fn local_item(foo: &str, sync_change_counter: i32) -> Option<LocalItem> {
        Some(LocalItem {
            data: DataState::Exists(map!({ "foo": foo })),
            sync_change_counter,
        })
    }

    let mut db = new_syncable_mem_db();

    // DeleteLocally: the row disappears and the changes are recorded.
    let tx = db.transaction().expect("transaction should work");
    seed_local(&tx)?;
    let changes = changes![change!("foo", "local", None)];
    do_apply_action(
        &tx,
        IncomingAction::DeleteLocally {
            ext_id: "ext_id".to_string(),
            changes: changes.clone(),
        },
    );
    assert_eq!(api::get(&tx, "ext_id", json!(null))?, json!({}));
    assert!(get_local_item(&tx).is_none());
    assert_eq!(get_applied_item_changes(&tx), Some(changes));
    tx.rollback()?;

    // TakeRemote: the data is replaced and the counter reset to 0.
    let tx = db.transaction().expect("transaction should work");
    seed_local(&tx)?;
    assert_eq!(get_local_item(&tx), local_item("local", 1));
    let changes = changes![change!("foo", "local", "remote")];
    do_apply_action(
        &tx,
        IncomingAction::TakeRemote {
            ext_id: "ext_id".to_string(),
            data: map!({"foo": "remote"}),
            changes: changes.clone(),
        },
    );
    assert_eq!(get_local_item(&tx), local_item("remote", 0));
    assert_eq!(get_applied_item_changes(&tx), Some(changes));
    tx.rollback()?;

    // Merge: the data is replaced and the counter incremented.
    let tx = db.transaction().expect("transaction should work");
    seed_local(&tx)?;
    assert_eq!(get_local_item(&tx), local_item("local", 1));
    let changes = changes![change!("foo", "local", "remote")];
    do_apply_action(
        &tx,
        IncomingAction::Merge {
            ext_id: "ext_id".to_string(),
            data: map!({"foo": "remote"}),
            changes: changes.clone(),
        },
    );
    assert_eq!(get_local_item(&tx), local_item("remote", 2));
    assert_eq!(get_applied_item_changes(&tx), Some(changes));
    tx.rollback()?;

    // Same: the data is untouched, the counter reset, nothing recorded.
    let tx = db.transaction().expect("transaction should work");
    seed_local(&tx)?;
    assert_eq!(get_local_item(&tx), local_item("local", 1));
    do_apply_action(
        &tx,
        IncomingAction::Same {
            ext_id: "ext_id".to_string(),
        },
    );
    assert_eq!(get_local_item(&tx), local_item("local", 0));
    assert_eq!(get_applied_item_changes(&tx), None);
    tx.rollback()?;

    Ok(())
}
}