Drivers and Querying¶
SQLSpec provides a unified driver interface across supported databases. This page covers the common execution patterns and points to adapter-specific configuration.
Supported Drivers (High Level)¶
PostgreSQL: asyncpg, psycopg (sync/async), psqlpy, ADBC
SQLite: sqlite3, aiosqlite, ADBC
MySQL: asyncmy, mysql-connector, pymysql
Analytics / Cloud: DuckDB, BigQuery, Spanner, Oracle, ADBC
Core Execution Pattern¶
sqlite session¶
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
db_path = tmp_path / "driver.sqlite"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(db_path)}))
with spec.provide_session(config) as session:
session.execute("create table if not exists health (id integer primary key)")
result = session.execute("select count(*) as total from health")
print(result.one())
Transactions¶
manual transaction¶
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
db_path = tmp_path / "transactions.db"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(db_path)}))
with spec.provide_session(config) as session:
session.execute("create table if not exists ledger (id integer primary key, note text)")
session.begin()
session.execute("insert into ledger (note) values ('committed')")
session.commit()
session.begin()
session.execute("insert into ledger (note) values ('rolled back')")
session.rollback()
result = session.execute("select note from ledger order by id")
print(result.all())
Parameter Binding¶
positional and named parameters¶
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
db_path = tmp_path / "params.db"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(db_path)}))
with spec.provide_session(config) as session:
session.execute("select :status as status, :status as status_copy", {"status": "active"})
result = session.execute("select ? as value", (42,))
print(result.one())
Schema Mapping¶
Use the schema_type parameter on select(), select_one(), and
select_one_or_none() to map result rows to dataclass instances automatically.
schema_type mapping¶
from dataclasses import dataclass
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
@dataclass
class User:
id: int
name: str
db_path = tmp_path / "schema.db"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(db_path)}))
with spec.provide_session(config) as session:
session.execute("create table users (id integer primary key, name text)")
session.execute("insert into users (name) values ('Alice'), ('Bob')")
# select returns list of dicts by default
rows = session.select("select id, name from users order by id")
print(rows) # [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
# Use schema_type to map rows to dataclass instances
users = session.select("select id, name from users order by id", schema_type=User)
print(users) # [User(id=1, name='Alice'), User(id=2, name='Bob')]
# select_one with schema_type
user = session.select_one("select id, name from users where name = ?", "Alice", schema_type=User)
print(user) # User(id=1, name='Alice')
# select_one_or_none returns None if no match
maybe_user = session.select_one_or_none("select id, name from users where name = ?", "Nobody", schema_type=User)
print(maybe_user) # None
Scalar Values and Pagination¶
select_value returns a single scalar from a one-row, one-column result.
select_value_or_none returns None when no rows match.
select_with_total returns both the data page and the total count for pagination.
scalar values, execute_many, and pagination¶
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
db_path = tmp_path / "batch.db"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(db_path)}))
with spec.provide_session(config) as session:
session.execute("create table users (id integer primary key, name text, email text)")
# execute_many inserts multiple rows in a single call
session.execute_many(
"insert into users (name, email) values (?, ?)",
[("Alice", "alice@example.com"), ("Bob", "bob@example.com"), ("Charlie", "charlie@example.com")],
)
# select_value returns a single scalar value
count = session.select_value("select count(*) from users")
print(count) # 3
# select_value with type conversion
count_int = session.select_value("select count(*) from users", value_type=int)
print(count_int) # 3
# select_value_or_none returns None when no rows match
email = session.select_value_or_none("select email from users where name = ?", "Nobody")
print(email) # None
# select_with_total for pagination
from sqlspec.core import SQL
query = SQL("select id, name from users").paginate(page=1, page_size=2)
data, total = session.select_with_total(query)
print(f"Page has {len(data)} rows, total matching: {total}")
Streaming Results¶
select_stream() returns a context-managed stream of dict rows fetched in
bounded chunks using each driver's native streaming primitive. Adapters without
a native path materialize the full result eagerly by default (not
bounded-memory). Pass native_only=True to require a native stream and raise
ImproperConfigurationError when the adapter has no native path.
streaming rows¶
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": ":memory:"}))
with spec.provide_session(config) as session:
session.execute("create table readings (id integer primary key, value integer)")
session.execute_many("insert into readings (value) values (?)", [(i,) for i in range(250)])
total = 0
with session.select_stream("select value from readings order by id", chunk_size=100) as stream:
for row in stream:
total += row["value"]
Adapter capability is discoverable via Config.supports_native_row_streaming.
Native paths: psycopg and CockroachDB-psycopg (server-side named cursors),
asyncpg and CockroachDB-asyncpg (cursors inside a stream-owned transaction),
pymysql/aiomysql/asyncmy (SSCursor), mysql-connector (unbuffered cursors),
sqlite/aiosqlite and oracledb (chunked fetchmany), psqlpy (server-side
cursor with array_size), and BigQuery (page-wise result iteration).
ADBC, DuckDB, mssql-python, arrow-odbc, and Spanner are eager-fallback only.
Lifetime and transaction rules:
Close the stream (or exhaust it) before issuing other statements on the same connection.
close() mid-iteration releases the cursor and is idempotent.
psycopg, asyncpg, and psqlpy streams open their own transaction (a savepoint when one is already active) and commit it on close, so they stream correctly even on autocommit connections.
MySQL unbuffered cursors drain remaining rows when closed mid-iteration.
oracledb streaming returns raw driver values for LOB columns.
BigQuery page_size is advisory; pages may exceed the requested chunk size.
An exception raised during iteration closes the stream and propagates; the connection remains usable afterwards (issue a rollback first on PostgreSQL drivers, whose transaction is aborted by the failed statement).
Statement Stacks¶
Statement stacks bundle multiple SQL statements plus parameter sets. Drivers that support native pipelines or batch execution can send the stack in a single round trip, while others execute each statement sequentially.
statement stack¶
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
from sqlspec.core.stack import StatementStack
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": ":memory:"}))
stack = (
StatementStack()
.push_execute("create table teams (id integer primary key, name text)")
.push_execute_many("insert into teams (name) values (:name)", [{"name": "Litestar"}, {"name": "SQLSpec"}])
.push_execute("select id, name from teams order by id")
)
with spec.provide_session(config) as session:
results = session.execute_stack(stack)
rows = results[-1].result.all()
Driver Configuration Examples¶
asyncpg config¶
from sqlspec.adapters.asyncpg import AsyncpgConfig
config = AsyncpgConfig(
connection_config={"dsn": "postgresql://user:pass@localhost:5432/app"},
pool_config={"min_size": 1, "max_size": 5},
)
cockroach + psycopg¶
from sqlspec.adapters.cockroach_psycopg import CockroachPsycopgSyncConfig
config = CockroachPsycopgSyncConfig(connection_config={"dsn": "postgresql://user:pass@localhost:26257/defaultdb"})
mysql connector config¶
from sqlspec.adapters.mysqlconnector import MysqlConnectorSyncConfig
config = MysqlConnectorSyncConfig(
connection_config={"host": "localhost", "user": "app", "password": "secret", "database": "app_db"}
)