use interrupt_support::Interruptee;
use rusqlite::{
    types::{Null, ToSql},
    Connection, Row, Transaction,
};
use sql_support::ConnExt;
use sync15_traits::Payload;
use sync_guid::Guid as SyncGuid;
use crate::api::{StorageChanges, StorageValueChange};
use crate::error::*;
use super::{merge, remove_matching_keys, JsonMap, Record, RecordData};
#[derive(Debug, PartialEq)]
pub enum DataState {
    
    Deleted,
    
    Exists(JsonMap),
}
fn changes_for_new_incoming(new: &JsonMap) -> StorageChanges {
    let mut result = StorageChanges::with_capacity(new.len());
    for (key, val) in new.iter() {
        result.push(StorageValueChange {
            key: key.clone(),
            old_value: None,
            new_value: Some(val.clone()),
        });
    }
    result
}
fn json_map_from_row(row: &Row<'_>, col: &str) -> Result<DataState> {
    let s = row.get::<_, Option<String>>(col)?;
    Ok(match s {
        None => DataState::Deleted,
        Some(s) => match serde_json::from_str(&s) {
            Ok(serde_json::Value::Object(m)) => DataState::Exists(m),
            _ => {
                
                
                
                
                
                
                DataState::Deleted
            }
        },
    })
}
pub fn stage_incoming(
    tx: &Transaction<'_>,
    incoming_payloads: Vec<Payload>,
    signal: &dyn Interruptee,
) -> Result<()> {
    let mut incoming_records = Vec::with_capacity(incoming_payloads.len());
    for payload in incoming_payloads {
        incoming_records.push(payload.into_record::<Record>()?);
    }
    sql_support::each_sized_chunk(
        &incoming_records,
        
        sql_support::default_max_variable_number() / 3,
        |chunk, _| -> Result<()> {
            let sql = format!(
                "INSERT OR REPLACE INTO temp.storage_sync_staging
                (guid, ext_id, data)
                VALUES {}",
                sql_support::repeat_multi_values(chunk.len(), 3)
            );
            let mut params = Vec::with_capacity(chunk.len() * 3);
            for record in chunk {
                signal.err_if_interrupted()?;
                params.push(&record.guid as &dyn ToSql);
                match &record.data {
                    RecordData::Data {
                        ref ext_id,
                        ref data,
                    } => {
                        params.push(ext_id);
                        params.push(data);
                    }
                    RecordData::Tombstone => {
                        params.push(&Null);
                        params.push(&Null);
                    }
                }
            }
            tx.execute(&sql, ¶ms)?;
            Ok(())
        },
    )?;
    Ok(())
}
#[derive(Debug, PartialEq)]
pub enum IncomingState {
    
    
    
    IncomingOnlyData { ext_id: String, data: JsonMap },
    
    
    
    IncomingOnlyTombstone,
    
    
    
    
    
    
    
    HasLocal {
        ext_id: String,
        incoming: DataState,
        local: DataState,
    },
    
    
    
    NotLocal {
        ext_id: String,
        incoming: DataState,
        mirror: DataState,
    },
    
    
    Everywhere {
        ext_id: String,
        incoming: DataState,
        mirror: DataState,
        local: DataState,
    },
}
pub fn get_incoming(conn: &Connection) -> Result<Vec<(SyncGuid, IncomingState)>> {
    let sql = "
        SELECT
            s.guid as guid,
            l.ext_id as l_ext_id,
            m.ext_id as m_ext_id,
            s.ext_id as s_ext_id,
            s.data as s_data, m.data as m_data, l.data as l_data,
            l.sync_change_counter
        FROM temp.storage_sync_staging s
        LEFT JOIN storage_sync_mirror m ON m.guid = s.guid
        LEFT JOIN storage_sync_data l on l.ext_id IN (m.ext_id, s.ext_id);";
    fn from_row(row: &Row<'_>) -> Result<(SyncGuid, IncomingState)> {
        let guid = row.get("guid")?;
        
        
        let mirror_ext_id: Option<String> = row.get("m_ext_id")?;
        let local_ext_id: Option<String> = row.get("l_ext_id")?;
        let staged_ext_id: Option<String> = row.get("s_ext_id")?;
        let incoming = json_map_from_row(row, "s_data")?;
        
        
        let state = match (local_ext_id, mirror_ext_id) {
            (None, None) => {
                match staged_ext_id {
                    Some(ext_id) => {
                        let data = match incoming {
                            
                            
                            
                            DataState::Deleted => JsonMap::new(),
                            DataState::Exists(data) => data,
                        };
                        IncomingState::IncomingOnlyData { ext_id, data }
                    }
                    None => IncomingState::IncomingOnlyTombstone,
                }
            }
            (Some(ext_id), None) => IncomingState::HasLocal {
                ext_id,
                incoming,
                local: json_map_from_row(row, "l_data")?,
            },
            (None, Some(ext_id)) => IncomingState::NotLocal {
                ext_id,
                incoming,
                mirror: json_map_from_row(row, "m_data")?,
            },
            (Some(ext_id), Some(_)) => IncomingState::Everywhere {
                ext_id,
                incoming,
                mirror: json_map_from_row(row, "m_data")?,
                local: json_map_from_row(row, "l_data")?,
            },
        };
        Ok((guid, state))
    }
    Ok(conn.conn().query_rows_and_then_named(sql, &[], from_row)?)
}
#[derive(Debug, PartialEq)]
pub enum IncomingAction {
    
    DeleteLocally {
        ext_id: String,
        changes: StorageChanges,
    },
    
    TakeRemote {
        ext_id: String,
        data: JsonMap,
        changes: StorageChanges,
    },
    
    Merge {
        ext_id: String,
        data: JsonMap,
        changes: StorageChanges,
    },
    
    Same { ext_id: String },
    
    Nothing,
}
pub fn plan_incoming(s: IncomingState) -> IncomingAction {
    match s {
        IncomingState::Everywhere {
            ext_id,
            incoming,
            local,
            mirror,
        } => {
            
            match (incoming, local, mirror) {
                (
                    DataState::Exists(incoming_data),
                    DataState::Exists(local_data),
                    DataState::Exists(mirror_data),
                ) => {
                    
                    merge(ext_id, incoming_data, local_data, Some(mirror_data))
                }
                (
                    DataState::Exists(incoming_data),
                    DataState::Exists(local_data),
                    DataState::Deleted,
                ) => {
                    
                    merge(ext_id, incoming_data, local_data, None)
                }
                (DataState::Exists(incoming_data), DataState::Deleted, _) => {
                    
                    IncomingAction::TakeRemote {
                        ext_id,
                        changes: changes_for_new_incoming(&incoming_data),
                        data: incoming_data,
                    }
                }
                (DataState::Deleted, DataState::Exists(local_data), DataState::Exists(mirror)) => {
                    
                    
                    
                    let (result, changes) = remove_matching_keys(local_data, &mirror);
                    if result.is_empty() {
                        
                        
                        IncomingAction::DeleteLocally { ext_id, changes }
                    } else {
                        IncomingAction::Merge {
                            ext_id,
                            data: result,
                            changes,
                        }
                    }
                }
                (DataState::Deleted, DataState::Exists(local_data), DataState::Deleted) => {
                    
                    
                    
                    
                    
                    
                    IncomingAction::Merge {
                        ext_id,
                        data: local_data,
                        changes: StorageChanges::new(),
                    }
                }
                (DataState::Deleted, DataState::Deleted, _) => {
                    
                    
                    IncomingAction::Same { ext_id }
                }
            }
        }
        IncomingState::HasLocal {
            ext_id,
            incoming,
            local,
        } => {
            
            
            
            match (incoming, local) {
                (DataState::Exists(incoming_data), DataState::Exists(local_data)) => {
                    
                    
                    
                    merge(ext_id, incoming_data, local_data, None)
                }
                (DataState::Deleted, DataState::Exists(local_data)) => {
                    
                    
                    
                    
                    IncomingAction::Merge {
                        ext_id,
                        data: local_data,
                        changes: StorageChanges::new(),
                    }
                }
                (DataState::Exists(incoming_data), DataState::Deleted) => {
                    
                    IncomingAction::TakeRemote {
                        ext_id,
                        changes: changes_for_new_incoming(&incoming_data),
                        data: incoming_data,
                    }
                }
                (DataState::Deleted, DataState::Deleted) => {
                    
                    IncomingAction::Same { ext_id }
                }
            }
        }
        IncomingState::NotLocal {
            ext_id, incoming, ..
        } => {
            
            
            
            match incoming {
                DataState::Exists(data) => IncomingAction::TakeRemote {
                    ext_id,
                    changes: changes_for_new_incoming(&data),
                    data,
                },
                DataState::Deleted => IncomingAction::Same { ext_id },
            }
        }
        IncomingState::IncomingOnlyData { ext_id, data } => {
            
            
            
            IncomingAction::TakeRemote {
                ext_id,
                changes: changes_for_new_incoming(&data),
                data,
            }
        }
        IncomingState::IncomingOnlyTombstone => {
            
            IncomingAction::Nothing
        }
    }
}
fn insert_changes(tx: &Transaction<'_>, ext_id: &str, changes: &StorageChanges) -> Result<()> {
    tx.execute_named_cached(
        "INSERT INTO temp.storage_sync_applied (ext_id, changes)
            VALUES (:ext_id, :changes)",
        &[
            (":ext_id", &ext_id),
            (":changes", &serde_json::to_string(&changes)?),
        ],
    )?;
    Ok(())
}
pub fn apply_actions(
    tx: &Transaction<'_>,
    actions: Vec<(SyncGuid, IncomingAction)>,
    signal: &dyn Interruptee,
) -> Result<()> {
    for (item, action) in actions {
        signal.err_if_interrupted()?;
        log::trace!("action for '{:?}': {:?}", item, action);
        match action {
            IncomingAction::DeleteLocally { ext_id, changes } => {
                
                tx.execute_named_cached(
                    "DELETE FROM storage_sync_data WHERE ext_id = :ext_id",
                    &[(":ext_id", &ext_id)],
                )?;
                insert_changes(tx, &ext_id, &changes)?;
            }
            
            IncomingAction::TakeRemote {
                ext_id,
                data,
                changes,
            } => {
                tx.execute_named_cached(
                    "INSERT OR REPLACE INTO storage_sync_data(ext_id, data, sync_change_counter)
                        VALUES (:ext_id, :data, 0)",
                    &[
                        (":ext_id", &ext_id),
                        (":data", &serde_json::Value::Object(data)),
                    ],
                )?;
                insert_changes(tx, &ext_id, &changes)?;
            }
            
            
            IncomingAction::Merge {
                ext_id,
                data,
                changes,
            } => {
                tx.execute_named_cached(
                    "UPDATE storage_sync_data SET data = :data, sync_change_counter = sync_change_counter + 1 WHERE ext_id = :ext_id",
                    &[
                        (":ext_id", &ext_id),
                        (":data", &serde_json::Value::Object(data)),
                    ]
                )?;
                insert_changes(tx, &ext_id, &changes)?;
            }
            
            
            IncomingAction::Same { ext_id } => {
                tx.execute_named_cached(
                    "UPDATE storage_sync_data SET sync_change_counter = 0 WHERE ext_id = :ext_id",
                    &[(":ext_id", &ext_id)],
                )?;
                
            }
            
            IncomingAction::Nothing => {}
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::super::test::new_syncable_mem_db;
    use super::*;
    use crate::api;
    use interrupt_support::NeverInterrupts;
    use rusqlite::NO_PARAMS;
    use serde_json::{json, Value};
    use sync15_traits::Payload;
    
    fn ssi(conn: &Connection, stmt: &str) -> u32 {
        conn.try_query_one(stmt, &[], true)
            .expect("must work")
            .unwrap_or_default()
    }
    fn array_to_incoming(mut array: Value) -> Vec<Payload> {
        let jv = array.as_array_mut().expect("you must pass a json array");
        let mut result = Vec::with_capacity(jv.len());
        for elt in jv {
            result.push(Payload::from_json(elt.take()).expect("must be valid"));
        }
        result
    }
    
    macro_rules! map {
        ($($map:tt)+) => {
            json!($($map)+).as_object().unwrap().clone()
        };
    }
    macro_rules! change {
        ($key:literal, None, None) => {
            StorageValueChange {
                key: $key.to_string(),
                old_value: None,
                new_value: None,
            };
        };
        ($key:literal, $old:tt, None) => {
            StorageValueChange {
                key: $key.to_string(),
                old_value: Some(json!($old)),
                new_value: None,
            };
        };
        ($key:literal, None, $new:tt) => {
            StorageValueChange {
                key: $key.to_string(),
                old_value: None,
                new_value: Some(json!($new)),
            };
        };
        ($key:literal, $old:tt, $new:tt) => {
            StorageValueChange {
                key: $key.to_string(),
                old_value: Some(json!($old)),
                new_value: Some(json!($new)),
            };
        };
    }
    macro_rules! changes {
        ( $( $change:expr ),* ) => {
            {
                let mut changes = StorageChanges::new();
                $(
                    changes.push($change);
                )*
                changes
            }
        };
    }
    #[test]
    fn test_incoming_populates_staging() -> Result<()> {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction()?;
        let incoming = json! {[
            {
                "id": "guidAAAAAAAA",
                "extId": "ext1@example.com",
                "data": json!({"foo": "bar"}).to_string(),
            }
        ]};
        stage_incoming(&tx, array_to_incoming(incoming), &NeverInterrupts)?;
        
        assert_eq!(
            ssi(&tx, "SELECT count(*) FROM temp.storage_sync_staging"),
            1
        );
        Ok(())
    }
    #[test]
    fn test_fetch_incoming_state() -> Result<()> {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction()?;
        
        tx.execute(
            r#"
            INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
            VALUES ('guid', 'ext_id', '{"foo":"bar"}')
        "#,
            NO_PARAMS,
        )?;
        let incoming = get_incoming(&tx)?;
        assert_eq!(incoming.len(), 1);
        assert_eq!(incoming[0].0, SyncGuid::new("guid"),);
        assert_eq!(
            incoming[0].1,
            IncomingState::IncomingOnlyData {
                ext_id: "ext_id".to_string(),
                data: map!({"foo": "bar"}),
            }
        );
        
        tx.execute(
            r#"
            INSERT INTO storage_sync_mirror (guid, ext_id, data)
            VALUES ('guid', 'ext_id', '{"foo":"new"}')
        "#,
            NO_PARAMS,
        )?;
        let incoming = get_incoming(&tx)?;
        assert_eq!(incoming.len(), 1);
        assert_eq!(
            incoming[0].1,
            IncomingState::NotLocal {
                ext_id: "ext_id".to_string(),
                incoming: DataState::Exists(map!({"foo": "bar"})),
                mirror: DataState::Exists(map!({"foo": "new"})),
            }
        );
        
        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
        let incoming = get_incoming(&tx)?;
        assert_eq!(incoming.len(), 1);
        assert_eq!(
            incoming[0].1,
            IncomingState::Everywhere {
                ext_id: "ext_id".to_string(),
                incoming: DataState::Exists(map!({"foo": "bar"})),
                local: DataState::Exists(map!({"foo": "local"})),
                mirror: DataState::Exists(map!({"foo": "new"})),
            }
        );
        Ok(())
    }
    
    #[test]
    fn test_fetch_incoming_state_nulls() -> Result<()> {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction()?;
        
        tx.execute(
            r#"
            INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
            VALUES ('guid', NULL, NULL)
        "#,
            NO_PARAMS,
        )?;
        let incoming = get_incoming(&tx)?;
        assert_eq!(incoming.len(), 1);
        assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone,);
        
        
        tx.execute(
            r#"
            INSERT INTO storage_sync_mirror (guid, ext_id, data)
            VALUES ('guid', NULL, NULL)
        "#,
            NO_PARAMS,
        )?;
        let incoming = get_incoming(&tx)?;
        assert_eq!(incoming.len(), 1);
        assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone);
        tx.execute(
            r#"
            INSERT INTO storage_sync_data (ext_id, data)
            VALUES ('ext_id', NULL)
        "#,
            NO_PARAMS,
        )?;
        let incoming = get_incoming(&tx)?;
        assert_eq!(incoming.len(), 1);
        assert_eq!(
            incoming[0].1,
            
            
            
            IncomingState::IncomingOnlyTombstone
        );
        Ok(())
    }
    
    #[derive(Debug, PartialEq)]
    struct LocalItem {
        data: DataState,
        sync_change_counter: i32,
    }
    fn get_local_item(conn: &Connection) -> Option<LocalItem> {
        conn.try_query_row::<_, Error, _>(
            "SELECT data, sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_id'",
            &[],
            |row| {
                let data = json_map_from_row(row, "data")?;
                let sync_change_counter = row.get::<_, i32>(1)?;
                Ok(LocalItem {
                    data,
                    sync_change_counter,
                })
            },
            true,
        )
        .expect("query should work")
    }
    fn get_applied_item_changes(conn: &Connection) -> Option<StorageChanges> {
        
        
        conn.try_query_row::<_, Error, _>(
            "SELECT changes FROM temp.storage_sync_applied WHERE ext_id = 'ext_id'",
            &[],
            |row| Ok(serde_json::from_str(&row.get::<_, String>("changes")?)?),
            true,
        )
        .expect("query should work")
        .map(|val: serde_json::Value| {
            let ob = val.as_object().expect("should be an object of items");
            let mut result = StorageChanges::with_capacity(ob.len());
            for (key, val) in ob.into_iter() {
                let details = val.as_object().expect("elts should be objects");
                result.push(StorageValueChange {
                    key: key.to_string(),
                    old_value: details.get("oldValue").cloned(),
                    new_value: details.get("newValue").cloned(),
                });
            }
            result
        })
    }
    fn do_apply_action(tx: &Transaction<'_>, action: IncomingAction) {
        let guid = SyncGuid::new("guid");
        apply_actions(tx, vec![(guid, action)], &NeverInterrupts).expect("should apply");
    }
    #[test]
    fn test_apply_actions() -> Result<()> {
        let mut db = new_syncable_mem_db();
        
        let tx = db.transaction().expect("transaction should work");
        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
        assert_eq!(
            api::get(&tx, "ext_id", json!(null))?,
            json!({"foo": "local"})
        );
        let changes = changes![change!("foo", "local", None)];
        do_apply_action(
            &tx,
            IncomingAction::DeleteLocally {
                ext_id: "ext_id".to_string(),
                changes: changes.clone(),
            },
        );
        assert_eq!(api::get(&tx, "ext_id", json!(null))?, json!({}));
        
        assert!(get_local_item(&tx).is_none());
        assert_eq!(get_applied_item_changes(&tx), Some(changes));
        tx.rollback()?;
        
        let tx = db.transaction().expect("transaction should work");
        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
        assert_eq!(
            api::get(&tx, "ext_id", json!(null))?,
            json!({"foo": "local"})
        );
        
        assert_eq!(
            get_local_item(&tx),
            Some(LocalItem {
                data: DataState::Exists(map!({"foo": "local"})),
                sync_change_counter: 1
            })
        );
        let changes = changes![change!("foo", "local", "remote")];
        do_apply_action(
            &tx,
            IncomingAction::TakeRemote {
                ext_id: "ext_id".to_string(),
                data: map!({"foo": "remote"}),
                changes: changes.clone(),
            },
        );
        
        assert_eq!(
            get_local_item(&tx),
            Some(LocalItem {
                data: DataState::Exists(map!({"foo": "remote"})),
                sync_change_counter: 0
            })
        );
        assert_eq!(get_applied_item_changes(&tx), Some(changes));
        tx.rollback()?;
        
        let tx = db.transaction().expect("transaction should work");
        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
        assert_eq!(
            api::get(&tx, "ext_id", json!(null))?,
            json!({"foo": "local"})
        );
        
        assert_eq!(
            get_local_item(&tx),
            Some(LocalItem {
                data: DataState::Exists(map!({"foo": "local"})),
                sync_change_counter: 1
            })
        );
        let changes = changes![change!("foo", "local", "remote")];
        do_apply_action(
            &tx,
            IncomingAction::Merge {
                ext_id: "ext_id".to_string(),
                data: map!({"foo": "remote"}),
                changes: changes.clone(),
            },
        );
        assert_eq!(
            get_local_item(&tx),
            Some(LocalItem {
                data: DataState::Exists(map!({"foo": "remote"})),
                sync_change_counter: 2
            })
        );
        assert_eq!(get_applied_item_changes(&tx), Some(changes));
        tx.rollback()?;
        
        let tx = db.transaction().expect("transaction should work");
        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
        assert_eq!(
            api::get(&tx, "ext_id", json!(null))?,
            json!({"foo": "local"})
        );
        
        assert_eq!(
            get_local_item(&tx),
            Some(LocalItem {
                data: DataState::Exists(map!({"foo": "local"})),
                sync_change_counter: 1
            })
        );
        do_apply_action(
            &tx,
            IncomingAction::Same {
                ext_id: "ext_id".to_string(),
            },
        );
        assert_eq!(
            get_local_item(&tx),
            Some(LocalItem {
                data: DataState::Exists(map!({"foo": "local"})),
                sync_change_counter: 0
            })
        );
        assert_eq!(get_applied_item_changes(&tx), None);
        tx.rollback()?;
        Ok(())
    }
}