Skip to content

Commit

Permalink
WIP execution refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
abonander committed Apr 20, 2024
1 parent 6a4f61e commit b5b981d
Show file tree
Hide file tree
Showing 11 changed files with 628 additions and 197 deletions.
2 changes: 1 addition & 1 deletion sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ futures-channel = { version = "0.3.19", default-features = false, features = ["s
futures-core = { version = "0.3.19", default-features = false }
futures-io = "0.3.24"
futures-intrusive = "0.5.0"
futures-util = { version = "0.3.19", default-features = false, features = ["alloc", "sink", "io"] }
futures-util = { version = "0.3.30", default-features = false, features = ["alloc", "sink", "io"] }
hex = "0.4.3"
log = { version = "0.4.14", default-features = false }
memchr = { version = "2.4.1", default-features = false }
Expand Down
3 changes: 3 additions & 0 deletions sqlx-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use std::fmt::Debug;
use crate::arguments::Arguments;
use crate::column::Column;
use crate::connection::Connection;
use crate::result_set::ResultSet;
use crate::row::Row;

use crate::statement::Statement;
Expand All @@ -82,6 +83,8 @@ pub trait Database: 'static + Sized + Send + Debug {
/// The concrete `QueryResult` implementation for this database.
type QueryResult: 'static + Sized + Send + Sync + Default + Extend<Self::QueryResult>;

type ResultSet: ResultSet;

/// The concrete `Column` implementation for this database.
type Column: Column<Database = Self>;

Expand Down
11 changes: 1 addition & 10 deletions sqlx-core/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::mem;
use crate::database::Database;

/// The return type of [Encode::encode].
#[must_use = "NULL values may write no data to the argument buffer"]
pub enum IsNull {
/// The value is null; no data was written.
Yes,
Expand All @@ -17,20 +18,10 @@ pub enum IsNull {

/// Encode a single value to be sent to the database.
pub trait Encode<'q, DB: Database> {
/// Writes the value of `self` into `buf` in the expected format for the database.
#[must_use]
fn encode(self, buf: &mut <DB as Database>::ArgumentBuffer<'q>) -> IsNull
where
Self: Sized,
{
self.encode_by_ref(buf)
}

/// Writes the value of `self` into `buf` without moving `self`.
///
/// Where possible, make use of `encode` instead as it can take advantage of re-using
/// memory.
#[must_use]
fn encode_by_ref(&self, buf: &mut <DB as Database>::ArgumentBuffer<'q>) -> IsNull;

fn produces(&self) -> Option<DB::TypeInfo> {
Expand Down
230 changes: 54 additions & 176 deletions sqlx-core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use crate::database::Database;
use crate::describe::Describe;
use crate::error::Error;

use either::Either;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::{future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use std::fmt::Debug;
use std::marker::PhantomData;
use crate::query_string::QueryString;
use crate::result_set::ResultSet;

/// A type that contains or can provide a database
/// connection to use for executing queries against the database.
Expand All @@ -32,110 +32,19 @@ use std::fmt::Debug;
///
pub trait Executor<'c>: Send + Debug + Sized {
type Database: Database;
type ResultSet: ResultSet<
Database = Self::Database,
Row = <Self::Database as Database>::Row
>;

/// Execute the query and return the total number of rows affected.
fn execute<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<<Self::Database as Database>::QueryResult, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
self.execute_many(query).try_collect().boxed()
}
/// Execute a query as an implicitly prepared statement.
async fn execute_prepared(&mut self, params: ExecutePrepared<'_, Self::Database>) -> Self::ResultSet;

/// Execute multiple queries and return the rows affected from each query, in a stream.
fn execute_many<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxStream<'e, Result<<Self::Database as Database>::QueryResult, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
self.fetch_many(query)
.try_filter_map(|step| async move {
Ok(match step {
Either::Left(rows) => Some(rows),
Either::Right(_) => None,
})
})
.boxed()
}

/// Execute the query and return the generated results as a stream.
fn fetch<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxStream<'e, Result<<Self::Database as Database>::Row, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
self.fetch_many(query)
.try_filter_map(|step| async move {
Ok(match step {
Either::Left(_) => None,
Either::Right(row) => Some(row),
})
})
.boxed()
}

/// Execute multiple queries and return the generated results as a stream
/// from each query, in a stream.
fn fetch_many<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxStream<
'e,
Result<
Either<<Self::Database as Database>::QueryResult, <Self::Database as Database>::Row>,
Error,
>,
>
where
'c: 'e,
E: Execute<'q, Self::Database>;

/// Execute the query and return all the generated results, collected into a [`Vec`].
fn fetch_all<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<Vec<<Self::Database as Database>::Row>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
self.fetch(query).try_collect().boxed()
}

/// Execute the query and returns exactly one row.
fn fetch_one<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<<Self::Database as Database>::Row, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
self.fetch_optional(query)
.and_then(|row| match row {
Some(row) => future::ok(row),
None => future::err(Error::RowNotFound),
})
.boxed()
}

/// Execute the query and returns at most one row.
fn fetch_optional<'e, 'q: 'e, E: 'q>(
self,
query: E,
) -> BoxFuture<'e, Result<Option<<Self::Database as Database>::Row>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>;
/// Execute raw SQL without creating a prepared statement.
///
/// The SQL string may contain multiple statements separated by semicolons (`;`)
/// as well as DDL (`CREATE TABLE`, `ALTER TABLE`, etc.).
async fn execute_raw(&mut self, params: ExecuteRaw<'_, Self::Database>) -> Self::ResultSet;

/// Prepare the SQL query to inspect the type information of its parameters
/// and results.
Expand All @@ -146,26 +55,26 @@ pub trait Executor<'c>: Send + Debug + Sized {
/// This explicit API is provided to allow access to the statement metadata available after
/// it prepared but before the first row is returned.
#[inline]
fn prepare<'e, 'q: 'e>(
async fn prepare<'e, 'q: 'e>(
self,
query: &'q str,
) -> BoxFuture<'e, Result<<Self::Database as Database>::Statement<'q>, Error>>
) -> Result<<Self::Database as Database>::Statement<'q>, Error>
where
'c: 'e,
{
self.prepare_with(query, &[])
self.prepare_with(query, &[]).await
}

/// Prepare the SQL query, with parameter type information, to inspect the
/// type information about its parameters and results.
///
/// Only some database drivers (PostgreSQL, MSSQL) can take advantage of
/// this extra information to influence parameter type inference.
fn prepare_with<'e, 'q: 'e>(
async fn prepare_with<'e, 'q: 'e>(
self,
sql: &'q str,
parameters: &'e [<Self::Database as Database>::TypeInfo],
) -> BoxFuture<'e, Result<<Self::Database as Database>::Statement<'q>, Error>>
) -> Result<<Self::Database as Database>::Statement<'q>, Error>
where
'c: 'e;

Expand All @@ -183,73 +92,42 @@ pub trait Executor<'c>: Send + Debug + Sized {
'c: 'e;
}

/// A type that may be executed against a database connection.
///
/// Implemented for the following:
///
/// * [`&str`](std::str)
/// * [`Query`](super::query::Query)
///
pub trait Execute<'q, DB: Database>: Send + Sized {
/// Gets the SQL that will be executed.
fn sql(&self) -> &'q str;

/// Gets the previously cached statement, if available.
fn statement(&self) -> Option<&DB::Statement<'q>>;

/// Returns the arguments to be bound against the query string.
/// Arguments struct for [`Executor::execute_prepared()`].
pub struct ExecutePrepared<'q, DB: Database> {
/// The SQL string to execute.
pub query: QueryString<'_>,
/// The bind arguments for the query string; must match the number of placeholders.
pub arguments: <DB as Database>::Arguments<'q>,
/// The maximum number of rows to return.
///
/// Returning `None` for `Arguments` indicates to use a "simple" query protocol and to not
/// prepare the query. Returning `Some(Default::default())` is an empty arguments object that
/// will be prepared (and cached) before execution.
fn take_arguments(&mut self) -> Option<<DB as Database>::Arguments<'q>>;

/// Returns `true` if the statement should be cached.
fn persistent(&self) -> bool;
}

// NOTE: `Execute` is explicitly not implemented for String and &String to make it slightly more
// involved to write `conn.execute(format!("SELECT {val}"))`
impl<'q, DB: Database> Execute<'q, DB> for &'q str {
#[inline]
fn sql(&self) -> &'q str {
self
}

#[inline]
fn statement(&self) -> Option<&DB::Statement<'q>> {
None
}

#[inline]
fn take_arguments(&mut self) -> Option<<DB as Database>::Arguments<'q>> {
None
}

#[inline]
fn persistent(&self) -> bool {
true
}
/// Set to `Some(0)` to just get the result.
pub limit: Option<u64>,
/// The number of rows to request from the database at a time.
///
/// This is the maximum number of rows that will be buffered in-memory.
///
/// This will also be the maximum number of rows that need to be read and discarded should the
/// [`ResultSet`] be dropped early.
pub buffer: Option<usize>,
/// If `true`, prepare the statement with a name and cache it for later re-use.
pub persistent: bool,
_db: PhantomData<DB>
}

impl<'q, DB: Database> Execute<'q, DB> for (&'q str, Option<<DB as Database>::Arguments<'q>>) {
#[inline]
fn sql(&self) -> &'q str {
self.0
}

#[inline]
fn statement(&self) -> Option<&DB::Statement<'q>> {
None
}

#[inline]
fn take_arguments(&mut self) -> Option<<DB as Database>::Arguments<'q>> {
self.1.take()
}

#[inline]
fn persistent(&self) -> bool {
true
}
/// Arguments struct for [`Executor::execute_raw()`].
pub struct ExecuteRaw<'q, DB: Database> {
/// The SQL string to execute.
pub query: QueryString<'_>,
/// The maximum number of rows to return.
///
/// Set to `Some(0)` to just get the result.
pub limit: Option<u64>,
/// The number of rows to request from the database at a time.
///
/// This is the maximum number of rows that will be buffered in-memory.
///
/// This will also be the maximum number of rows that need to be read and discarded should the
/// [`ResultSet`] be dropped early.
pub buffer: Option<usize>,
_db: PhantomData<DB>
}
4 changes: 4 additions & 0 deletions sqlx-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ pub mod query_as;
pub mod query_builder;
pub mod query_scalar;

pub mod query_string;

pub mod raw_sql;

pub mod result_set;
pub mod row;
pub mod rt;
pub mod sync;
Expand Down
2 changes: 1 addition & 1 deletion sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ impl<DB: Database> DecrementSizeGuard<DB> {

pub fn from_permit(pool: Arc<PoolInner<DB>>, permit: AsyncSemaphoreReleaser<'_>) -> Self {
// here we effectively take ownership of the permit
permit.disarm();
permit.consume();
Self::new_permit(pool)
}

Expand Down
3 changes: 2 additions & 1 deletion sqlx-core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ use crate::database::{Database, HasStatementCache};
use crate::encode::Encode;
use crate::error::Error;
use crate::executor::{Execute, Executor};
use crate::query_string::QueryString;
use crate::statement::Statement;
use crate::types::Type;

/// A single SQL query as a prepared statement. Returned by [`query()`].
#[must_use = "query must be executed to affect database"]
pub struct Query<'q, DB: Database, A> {
pub(crate) statement: Either<&'q str, &'q DB::Statement<'q>>,
pub(crate) statement: Either<QueryString<'q>, &'q DB::Statement<'q>>,
pub(crate) arguments: Option<A>,
pub(crate) database: PhantomData<DB>,
pub(crate) persistent: bool,
Expand Down
Loading

0 comments on commit b5b981d

Please sign in to comment.