use rusqlite::{
self,
types::{FromSql, ToSql},
Connection, Result as SqlResult, Row, Savepoint, Transaction, TransactionBehavior, NO_PARAMS,
};
use std::iter::FromIterator;
use std::ops::Deref;
use std::time::Instant;
use crate::maybe_cached::MaybeCached;
pub struct Conn(rusqlite::Connection);
pub trait ConnExt {
fn conn(&self) -> &Connection;
fn set_pragma<T>(&self, pragma_name: &str, pragma_value: T) -> SqlResult<&Self>
where
T: ToSql,
Self: Sized,
{
self.conn()
.pragma_update(None, pragma_name, &pragma_value)?;
Ok(self)
}
fn prepare_maybe_cached<'conn>(
&'conn self,
sql: &str,
cache: bool,
) -> SqlResult<MaybeCached<'conn>> {
MaybeCached::prepare(self.conn(), sql, cache)
}
fn execute_all(&self, stmts: &[&str]) -> SqlResult<()> {
let conn = self.conn();
for sql in stmts {
let r = conn.execute(sql, NO_PARAMS);
match r {
Ok(_) => {}
Err(rusqlite::Error::ExecuteReturnedResults) => {}
Err(e) => return Err(e),
}
}
Ok(())
}
fn execute_cached<P>(&self, sql: &str, params: P) -> SqlResult<usize>
where
P: IntoIterator,
P::Item: ToSql,
{
let mut stmt = self.conn().prepare_cached(sql)?;
stmt.execute(params)
}
fn execute_named_cached(&self, sql: &str, params: &[(&str, &dyn ToSql)]) -> SqlResult<usize> {
crate::maybe_log_plan(self.conn(), sql, params);
let mut stmt = self.conn().prepare_cached(sql)?;
stmt.execute_named(params)
}
fn query_one<T: FromSql>(&self, sql: &str) -> SqlResult<T> {
crate::maybe_log_plan(self.conn(), sql, &[]);
let res: T = self
.conn()
.query_row_and_then(sql, NO_PARAMS, |row| row.get(0))?;
Ok(res)
}
fn try_query_one<T: FromSql>(
&self,
sql: &str,
params: &[(&str, &dyn ToSql)],
cache: bool,
) -> SqlResult<Option<T>>
where
Self: Sized,
{
crate::maybe_log_plan(self.conn(), sql, params);
use rusqlite::OptionalExtension;
let res: Option<Option<T>> = self
.conn()
.query_row_and_then_named(sql, params, |row| row.get(0), cache)
.optional()?;
Ok(res.unwrap_or_default())
}
fn query_row_and_then_named<T, E, F>(
&self,
sql: &str,
params: &[(&str, &dyn ToSql)],
mapper: F,
cache: bool,
) -> Result<T, E>
where
Self: Sized,
E: From<rusqlite::Error>,
F: FnOnce(&Row<'_>) -> Result<T, E>,
{
crate::maybe_log_plan(self.conn(), sql, params);
Ok(self
.try_query_row(sql, params, mapper, cache)?
.ok_or(rusqlite::Error::QueryReturnedNoRows)?)
}
fn query_rows_and_then_named<T, E, F>(
&self,
sql: &str,
params: &[(&str, &dyn ToSql)],
mapper: F,
) -> Result<Vec<T>, E>
where
Self: Sized,
E: From<rusqlite::Error>,
F: FnMut(&Row<'_>) -> Result<T, E>,
{
crate::maybe_log_plan(self.conn(), sql, params);
query_rows_and_then_named(self.conn(), sql, params, mapper, false)
}
fn query_rows_and_then_named_cached<T, E, F>(
&self,
sql: &str,
params: &[(&str, &dyn ToSql)],
mapper: F,
) -> Result<Vec<T>, E>
where
Self: Sized,
E: From<rusqlite::Error>,
F: FnMut(&Row<'_>) -> Result<T, E>,
{
crate::maybe_log_plan(self.conn(), sql, params);
query_rows_and_then_named(self.conn(), sql, params, mapper, true)
}
fn query_rows_into<Coll, T, E, F>(
&self,
sql: &str,
params: &[(&str, &dyn ToSql)],
mapper: F,
) -> Result<Coll, E>
where
Self: Sized,
E: From<rusqlite::Error>,
F: FnMut(&Row<'_>) -> Result<T, E>,
Coll: FromIterator<T>,
{
crate::maybe_log_plan(self.conn(), sql, params);
query_rows_and_then_named(self.conn(), sql, params, mapper, false)
}
fn query_rows_into_cached<Coll, T, E, F>(
&self,
sql: &str,
params: &[(&str, &dyn ToSql)],
mapper: F,
) -> Result<Coll, E>
where
Self: Sized,
E: From<rusqlite::Error>,
F: FnMut(&Row<'_>) -> Result<T, E>,
Coll: FromIterator<T>,
{
crate::maybe_log_plan(self.conn(), sql, params);
query_rows_and_then_named(self.conn(), sql, params, mapper, true)
}
fn try_query_row<T, E, F>(
&self,
sql: &str,
params: &[(&str, &dyn ToSql)],
mapper: F,
cache: bool,
) -> Result<Option<T>, E>
where
Self: Sized,
E: From<rusqlite::Error>,
F: FnOnce(&Row<'_>) -> Result<T, E>,
{
crate::maybe_log_plan(self.conn(), sql, params);
let conn = self.conn();
let mut stmt = MaybeCached::prepare(conn, sql, cache)?;
let mut rows = stmt.query_named(params)?;
rows.next()?.map(mapper).transpose()
}
fn unchecked_transaction(&self) -> SqlResult<UncheckedTransaction<'_>> {
UncheckedTransaction::new(self.conn(), TransactionBehavior::Deferred)
}
fn unchecked_transaction_imm(&self) -> SqlResult<UncheckedTransaction<'_>> {
UncheckedTransaction::new(self.conn(), TransactionBehavior::Immediate)
}
}
impl ConnExt for Connection {
#[inline]
fn conn(&self) -> &Connection {
self
}
}
impl<'conn> ConnExt for Transaction<'conn> {
#[inline]
fn conn(&self) -> &Connection {
&*self
}
}
impl<'conn> ConnExt for Savepoint<'conn> {
#[inline]
fn conn(&self) -> &Connection {
&*self
}
}
pub struct UncheckedTransaction<'conn> {
pub conn: &'conn Connection,
pub started_at: Instant,
pub finished: bool,
}
impl<'conn> UncheckedTransaction<'conn> {
pub fn new(conn: &'conn Connection, behavior: TransactionBehavior) -> SqlResult<Self> {
let query = match behavior {
TransactionBehavior::Deferred => "BEGIN DEFERRED",
TransactionBehavior::Immediate => "BEGIN IMMEDIATE",
TransactionBehavior::Exclusive => "BEGIN EXCLUSIVE",
_ => unreachable!(),
};
conn.execute_batch(query)
.map(move |_| UncheckedTransaction {
conn,
started_at: Instant::now(),
finished: false,
})
}
pub fn commit(mut self) -> SqlResult<()> {
if self.finished {
log::warn!("ignoring request to commit an already finished transaction");
return Ok(());
}
self.finished = true;
self.conn.execute_batch("COMMIT")?;
log::debug!("Transaction commited after {:?}", self.started_at.elapsed());
Ok(())
}
pub fn rollback(mut self) -> SqlResult<()> {
if self.finished {
log::warn!("ignoring request to rollback an already finished transaction");
return Ok(());
}
self.rollback_()
}
fn rollback_(&mut self) -> SqlResult<()> {
self.finished = true;
self.conn.execute_batch("ROLLBACK")?;
Ok(())
}
fn finish_(&mut self) -> SqlResult<()> {
if self.finished || self.conn.is_autocommit() {
return Ok(());
}
self.rollback_()?;
Ok(())
}
}
impl<'conn> Deref for UncheckedTransaction<'conn> {
type Target = Connection;
#[inline]
fn deref(&self) -> &Connection {
self.conn
}
}
impl<'conn> Drop for UncheckedTransaction<'conn> {
fn drop(&mut self) {
if let Err(e) = self.finish_() {
log::warn!("Error dropping an unchecked transaction: {}", e);
}
}
}
impl<'conn> ConnExt for UncheckedTransaction<'conn> {
#[inline]
fn conn(&self) -> &Connection {
&*self
}
}
fn query_rows_and_then_named<Coll, T, E, F>(
conn: &Connection,
sql: &str,
params: &[(&str, &dyn ToSql)],
mapper: F,
cache: bool,
) -> Result<Coll, E>
where
E: From<rusqlite::Error>,
F: FnMut(&Row<'_>) -> Result<T, E>,
Coll: FromIterator<T>,
{
let mut stmt = conn.prepare_maybe_cached(sql, cache)?;
let iter = stmt.query_and_then_named(params, mapper)?;
Ok(iter.collect::<Result<Coll, E>>()?)
}