Drivers and Querying

SQLSpec provides a unified driver interface across supported databases. This page covers the common execution patterns and points to adapter-specific configuration.

Supported Drivers (High Level)

  • PostgreSQL: asyncpg, psycopg (sync/async), psqlpy, ADBC

  • SQLite: sqlite3, aiosqlite, ADBC

  • MySQL: asyncmy, aiomysql, mysql-connector, pymysql

  • Analytics / Cloud: DuckDB, BigQuery, Spanner, Oracle, ADBC

Core Execution Pattern

sqlite session
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

# tmp_path is supplied by the surrounding example harness; the SQLite file is
# created beneath it.
database_file = tmp_path / "driver.sqlite"
connection_settings = {"database": str(database_file)}

# add_config returns the handle that provide_session is given below.
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config=connection_settings))

with spec.provide_session(config) as session:
    session.execute("create table if not exists health (id integer primary key)")

    # Run the count query and print the row obtained through one().
    health_count = session.execute("select count(*) as total from health")
    print(health_count.one())

Transactions

manual transaction
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

ledger_file = tmp_path / "transactions.db"
spec = SQLSpec()
config = spec.add_config(
    SqliteConfig(connection_config={"database": str(ledger_file)})
)

with spec.provide_session(config) as session:
    session.execute("create table if not exists ledger (id integer primary key, note text)")

    # First unit of work: begin, insert, then commit.
    session.begin()
    session.execute("insert into ledger (note) values ('committed')")
    session.commit()

    # Second unit of work: begin, insert, then roll back.
    session.begin()
    session.execute("insert into ledger (note) values ('rolled back')")
    session.rollback()

    # Read the ledger back in insertion order and print every row.
    ledger_rows = session.execute("select note from ledger order by id")
    print(ledger_rows.all())

Parameter Binding

positional and named parameters
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

params_file = tmp_path / "params.db"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(params_file)}))

with spec.provide_session(config) as session:
    # Named style: the mapping key matches the :status placeholder, which
    # appears twice in the statement.
    named_values = {"status": "active"}
    session.execute("select :status as status, :status as status_copy", named_values)

    # Positional style: the tuple supplies the value for the ? placeholder.
    positional_values = (42,)
    positional_result = session.execute("select ? as value", positional_values)
    print(positional_result.one())

Schema Mapping

Use the schema_type parameter on select(), select_one(), and select_one_or_none() to map result rows to dataclass instances automatically.

schema_type mapping
from dataclasses import dataclass

from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

@dataclass
class User:
    """Row shape used as the ``schema_type`` target in this example."""

    # Field names match the columns selected from the users table.
    id: int
    name: str

database_file = tmp_path / "schema.db"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(database_file)}))

# The two queries reused by the lookups below.
list_users_sql = "select id, name from users order by id"
find_user_sql = "select id, name from users where name = ?"

with spec.provide_session(config) as session:
    session.execute("create table users (id integer primary key, name text)")
    session.execute("insert into users (name) values ('Alice'), ('Bob')")

    # Without schema_type, select yields a list of dicts
    plain_rows = session.select(list_users_sql)
    print(plain_rows)  # [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

    # With schema_type, every row becomes a User instance
    typed_users = session.select(list_users_sql, schema_type=User)
    print(typed_users)  # [User(id=1, name='Alice'), User(id=2, name='Bob')]

    # select_one combined with schema_type gives back a single User
    alice = session.select_one(find_user_sql, "Alice", schema_type=User)
    print(alice)  # User(id=1, name='Alice')

    # select_one_or_none gives None when nothing matches
    missing_user = session.select_one_or_none(find_user_sql, "Nobody", schema_type=User)
    print(missing_user)  # None

Scalar Values and Pagination

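execute_many runs a single statement against a list of parameter sets, inserting multiple rows in one call. select_value returns a single scalar from a one-row, one-column result. select_value_or_none returns None when no rows match. select_with_total returns both the data page and the total count for pagination.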

scalar values, execute_many, and pagination
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
from sqlspec.core import SQL

db_path = tmp_path / "batch.db"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(db_path)}))

with spec.provide_session(config) as session:
    session.execute("create table users (id integer primary key, name text, email text)")

    # execute_many inserts multiple rows in a single call; each tuple is one
    # parameter set for the (name, email) placeholders.
    session.execute_many(
        "insert into users (name, email) values (?, ?)",
        [
            ("Alice", "alice@example.com"),
            ("Bob", "bob@example.com"),
            ("Charlie", "charlie@example.com"),
        ],
    )

    # select_value returns a single scalar value
    count = session.select_value("select count(*) from users")
    print(count)  # 3

    # select_value with type conversion
    count_int = session.select_value("select count(*) from users", value_type=int)
    print(count_int)  # 3

    # select_value_or_none returns None when no rows match
    email = session.select_value_or_none("select email from users where name = ?", "Nobody")
    print(email)  # None

    # select_with_total for pagination: the first page holds at most two rows,
    # while the total counts every matching row.
    query = SQL("select id, name from users").paginate(page=1, page_size=2)
    data, total = session.select_with_total(query)
    print(f"Page has {len(data)} rows, total matching: {total}")  # Page has 2 rows, total matching: 3

Streaming Results

select_stream() returns a context-managed stream of dict rows fetched in bounded chunks using each driver's native streaming primitive. Adapters without a native path materialize the full result eagerly by default (not bounded-memory). Pass native_only=True to require a native stream and raise ImproperConfigurationError when the adapter has no native path.

streaming rows
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": ":memory:"}))

with spec.provide_session(config) as session:
    session.execute("create table readings (id integer primary key, value integer)")

    # One single-element tuple per row to insert: the values 0 through 249.
    reading_params = [(reading,) for reading in range(250)]
    session.execute_many("insert into readings (value) values (?)", reading_params)

    # Iterate the stream inside its context manager and add up the "value"
    # entry of every dict row.
    with session.select_stream("select value from readings order by id", chunk_size=100) as stream:
        total = sum(row["value"] for row in stream)

Adapter capability is discoverable via Config.supports_native_row_streaming. Native paths: psycopg and CockroachDB-psycopg (server-side named cursors), asyncpg and CockroachDB-asyncpg (cursors inside a stream-owned transaction), pymysql/aiomysql/asyncmy (SSCursor), mysql-connector (unbuffered cursors), sqlite/aiosqlite and oracledb (chunked fetchmany), psqlpy (server-side cursor with array_size), and BigQuery (page-wise result iteration). ADBC, DuckDB, mssql-python, arrow-odbc, and Spanner are eager-fallback only.

Lifetime and transaction rules:

  • Close the stream (or exhaust it) before issuing other statements on the same connection. Calling close() mid-iteration releases the cursor, and close() is idempotent, so calling it again is harmless.

  • psycopg, asyncpg, and psqlpy streams open their own transaction (a savepoint when one is already active) and commit it on close, so they stream correctly even on autocommit connections.

  • MySQL unbuffered cursors drain remaining rows when closed mid-iteration.

  • oracledb streaming returns raw driver values for LOB columns.

  • BigQuery page_size is advisory; pages may exceed the requested chunk size.

  • An exception raised during iteration closes the stream and propagates; the connection remains usable afterwards (issue a rollback first on PostgreSQL drivers, whose transaction is aborted by the failed statement).

Statement Stacks

Statement stacks bundle multiple SQL statements plus parameter sets. Drivers that support native pipelines or batch execution can send the stack in a single round trip, while others execute each statement sequentially.

statement stack
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
from sqlspec.core.stack import StatementStack

spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": ":memory:"}))

# Parameter sets for the bulk insert, one mapping per row.
team_rows = [{"name": "Litestar"}, {"name": "SQLSpec"}]

# Assemble the stack step by step, continuing from the value each push_* call
# returns.
stack = StatementStack()
stack = stack.push_execute("create table teams (id integer primary key, name text)")
stack = stack.push_execute_many("insert into teams (name) values (:name)", team_rows)
stack = stack.push_execute("select id, name from teams order by id")

with spec.provide_session(config) as session:
    stack_results = session.execute_stack(stack)

    # Take the last entry (the select was pushed last) and read its rows.
    final_entry = stack_results[-1]
    rows = final_entry.result.all()

Driver Configuration Examples

asyncpg config
from sqlspec.adapters.asyncpg import AsyncpgConfig

# The connection target and the pool sizing are passed as two separate mappings.
connection_settings = {"dsn": "postgresql://user:pass@localhost:5432/app"}
pool_settings = {"min_size": 1, "max_size": 5}
config = AsyncpgConfig(connection_config=connection_settings, pool_config=pool_settings)

cockroach + psycopg config
from sqlspec.adapters.cockroach_psycopg import CockroachPsycopgSyncConfig

# Build the DSN first, then hand it to the config as the only connection setting.
cockroach_dsn = "postgresql://user:pass@localhost:26257/defaultdb"
config = CockroachPsycopgSyncConfig(connection_config={"dsn": cockroach_dsn})

mysql-connector config
from sqlspec.adapters.mysqlconnector import MysqlConnectorSyncConfig

# Connection settings are spelled out one per line and passed as a single mapping.
mysql_settings = {
    "host": "localhost",
    "user": "app",
    "password": "secret",
    "database": "app_db",
}
config = MysqlConnectorSyncConfig(connection_config=mysql_settings)