Async Query
The AsyncQuery
class is an Async wrapper for sqlalchemy.sql.Select
.
It implements the functionality of both Session
and Smart Queries
mixins.
Warning
All relations used in filtering/sorting/grouping should be explicitly set,
not just being a backref
. See the
About Relationships section
for more information.
Info
The examples below assume the following models:
from sqlalchemy import ForeignKey, String
from sqlalchemy.ext.hybrid import hybrid_method, hybrid_property
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlactive.base_model import ActiveRecordBaseModel
class BaseModel(ActiveRecordBaseModel):
__abstract__ = True
class User(BaseModel):
__tablename__ = 'users'
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True, index=True
)
username: Mapped[str] = mapped_column(
String(18), nullable=False, unique=True
)
name: Mapped[str] = mapped_column(String(50), nullable=False)
age: Mapped[int] = mapped_column(nullable=False)
posts: Mapped[list['Post']] = relationship(back_populates='user')
comments: Mapped[list['Comment']] = relationship(back_populates='user')
@hybrid_property
def is_adult(self) -> int:
return self.age > 18
@hybrid_method
def older_than(self, other: 'User') -> bool:
return self.age > other.age
class Post(BaseModel):
__tablename__ = 'posts'
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True, index=True
)
title: Mapped[str] = mapped_column(String(100), nullable=False)
body: Mapped[str] = mapped_column(nullable=False)
rating: Mapped[int] = mapped_column(nullable=False)
user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
user: Mapped['User'] = relationship(back_populates='posts')
comments: Mapped[list['Comment']] = relationship(back_populates='post')
class Comment(BaseModel):
__tablename__ = 'comments'
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True, index=True
)
body: Mapped[str] = mapped_column(nullable=False)
post_id: Mapped[int] = mapped_column(ForeignKey('posts.id'))
user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
post: Mapped['Post'] = relationship(back_populates='comments')
user: Mapped['User'] = relationship(back_populates='comments')
class Product(BaseModel):
__tablename__ = 'products'
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True, index=True
)
name: Mapped[str] = mapped_column(String(100), nullable=False)
description: Mapped[str] = mapped_column(String(100), nullable=False)
price: Mapped[float] = mapped_column(nullable=False)
sells: Mapped[list['Sell']] = relationship(
back_populates='product', viewonly=True
)
class Sell(BaseModel):
__tablename__ = 'sells'
id: Mapped[int] = mapped_column(primary_key=True)
product_id: Mapped[int] = mapped_column(
ForeignKey('products.id'), primary_key=True
)
quantity: Mapped[int] = mapped_column(nullable=False)
product: Mapped['Product'] = relationship(back_populates='sells')
Usage
The AsyncQuery
class provides a set of helper methods for asynchronously
executing the query.
Setting the Session
This class needs an sqlalchemy.ext.asyncio.async_scoped_session
instance to perform the actual query. The set_session
class
method must be called before using this class.
When calling the set_session
method from a base model
(either ActiveRecordBaseModel
, a subclass of it or a model,
i.e. User
), the session will be set automatically.
Calling
set_session
from either a base model or a model:# from your base model class (recommended) YourBaseModel.set_session(session) # from the ActiveRecordBaseModel class ActiveRecordBaseModel.set_session(session) # from your model User.set_session(session) # create a instance query = select(User) async_query = AsyncQuery(query)
Calling
set_session
from theAsyncQuery
instance:
Performing Queries
Example of usage:
query = select(User)
async_query = AsyncQuery(query)
async_query = async_query.where(name__like='%John%').sort('-created_at')
async_query = async_query.limit(2)
users = await async_query.all()
To get the sqlalchemy.sql.Select
instance to use native SQLAlchemy methods
use the query
property:
query = select(User)
async_query = AsyncQuery(query)
async_query.query # <sqlalchemy.sql.Select at 0x...>
Warning
If a NoSessionError
is raised, it means that there is no session
associated with the AsyncQuery
instance. This can happen
if the set_session
method of the base model has not been called
or if the model has not been initialized with a session.
In this case, you must provide a session by calling the set_session
either from the model or the AsyncQuery
instance as described above.
API Reference
Attributes
query
The wrapped
sqlalchemy.sql.Select
instance.Examples
Instance Methods
execute
Executes the query and returns a
sqlalchemy.engine.Result
instance containing the results.Returns
sqlalchemy.engine.Result[Any]
: Result of the query.
scalars
Returns a
sqlalchemy.engine.ScalarResult
instance containing all rows.Returns
sqlalchemy.engine.ScalarResult[T]
: Result instance containing all scalars.Examples
>>> query = select(User) >>> async_query = AsyncQuery(query) >>> result = await async_query.scalars() >>> result <sqlalchemy.engine.result.ScalarResult object at 0x...> >>> users = result.all() >>> users [User(id=1), User(id=2), ...] >>> result = await async_query.where(name='John Doe').scalars() >>> users = result.all() >>> users [User(id=2)]
first
Fetches the first row or
None
if no results are found.If
scalar
isTrue
, returns a scalar value (default).Parameters
scalar
: IfTrue
, returns a scalar value (T
), otherwise returns a row (default:True
).Returns
T
: Instance for method chaining (scalar).sqlalchemy.engine.Row[tuple[Any, ...]]
: Row.None
: If no result is found.Examples
Usage:
>>> query = select(User) >>> async_query = AsyncQuery(query) >>> user = await async_query.first() >>> user User(id=1) >>> user = await async_query.first(scalar=False) >>> user (User(id=1),)
Selecting specific columns:
one
Fetches one row or raises a
sqlalchemy.exc.NoResultFound
exception if no results are found.If multiple results are found, it will raise a
sqlalchemy.exc.MultipleResultsFound
exception.If
scalar
isTrue
, returns a scalar value (default).Parameters
scalar
: IfTrue
, returns a scalar value (T
), otherwise returns a row (default:True
).Returns
T
: Instance for method chaining (scalar).sqlalchemy.engine.Row[tuple[Any, ...]]
: Row.Raises
NoResultFound
: If no row is found.MultipleResultsFound
: If multiple rows match.Examples
Usage:
>>> query = select(User) >>> async_query = AsyncQuery(query) >>> user = await async_query.where(name='John Doe').one() >>> user User(id=1) >>> user = await async_query.where(name='John Doe') ... .one(scalar=False) >>> user (User(id=1),) >>> user = await async_query.where(name='Unknown').one() Traceback (most recent call last): ... sqlalchemy.exc.NoResultFound: No row was found when one was required >>> user = await async_query.one() Traceback (most recent call last): ... sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when one was required
Selecting specific columns:
one_or_none
Fetches one row or
None
if no results are found.If multiple results are found, it will raise a
sqlalchemy.exc.MultipleResultsFound
exception.If
scalar
isTrue
, returns a scalar value (default).Parameters
scalar
: IfTrue
, returns a scalar value (T
), otherwise returns a row (default:True
).Returns
T
: Instance for method chaining (scalar).sqlalchemy.engine.Row[tuple[Any, ...]]
: Row.None
: If no result is found.Raises
MultipleResultsFound
: If multiple rows match.Examples
Usage:
>>> query = select(User) >>> async_query = AsyncQuery(query) >>> user = await async_query.where(name='John Doe') ... .one_or_none() >>> user User(id=1) >>> user = await async_query.where(name='John Doe') ... .one_or_none(scalar=False) >>> user (User(id=1),) >>> user = await async_query.where(name='Unknown') ... .one_or_none() >>> user None >>> user = await async_query.one_or_none() Traceback (most recent call last): ... sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when one was required
Selecting specific columns:
all
Fetches all rows.
If
scalars
isTrue
, returns scalar values (default).Parameters
scalars
: IfTrue
, returns scalar values (Sequence[T]
), otherwise returns rows (default:True
).Returns
Sequence[T]
: Instances (scalars).Sequence[sqlalchemy.engine.Row[tuple[Any, ...]]]
: Rows.Examples
Usage:
>>> query = select(User) >>> async_query = AsyncQuery(query) >>> users = await async_query.all() >>> users [User(id=1), User(id=2), ...] >>> users = await async_query.all(scalars=False) >>> users [(User(id=1),), (User(id=2),), ...]
Selecting specific columns:
count
Fetches the number of rows.
Returns
int
: The number of rows.Examples
unique
Similar to
scalars()
but applies unique filtering to the objects returned in the result instance.If
scalars
isFalse
, returns asqlalchemy.engine.Result
instance instead of asqlalchemy.engine.ScalarResult
instance.Note
This method is different from
distinct()
in that it applies unique filtering to the objects returned in the result instance. If you need to apply unique filtering on the query (a DISTINCT clause), usedistinct()
instead.Parameters
scalars
: IfTrue
, returns asqlalchemy.engine.ScalarResult
instance. Otherwise, returns asqlalchemy.engine.Result
instance (default:True
).Returns
sqlalchemy.engine.ScalarResult[T]
: Result instance containing all scalars.sqlalchemy.engine.Result[tuple[Any, ...]]
: Result instance containing all rows.Examples
unique_first
Similar to
first()
but applies unique filtering to the objects returned by eithersqlalchemy.engine.ScalarResult
orsqlalchemy.engine.Result
depending on the value ofscalar
.Note
This method is different from
distinct()
in that it applies unique filtering to the objects returned in the result instance. If you need to apply unique filtering on the query (a DISTINCT clause), usedistinct()
instead.
unique_one
Similar to
one()
but applies unique filtering to the objects returned by eithersqlalchemy.engine.ScalarResult
orsqlalchemy.engine.Result
depending on the value ofscalar
.Note
This method is different from
distinct()
in that it applies unique filtering to the objects returned in the result instance. If you need to apply unique filtering on the query (a DISTINCT clause), usedistinct()
instead.
unique_one_or_none
Similar to
one_or_none()
but applies unique filtering to the objects returned by eithersqlalchemy.engine.ScalarResult
orsqlalchemy.engine.Result
depending on the value ofscalar
.Note
This method is different from
distinct()
in that it applies unique filtering to the objects returned in the result instance. If you need to apply unique filtering on the query (a DISTINCT clause), usedistinct()
instead.See
unique()
andone_or_none()
for more details.
unique_all
Similar to
all()
but applies unique filtering to the objects returned by eithersqlalchemy.engine.ScalarResult
orsqlalchemy.engine.Result
depending on the value ofscalars
.Note
This method is different from
distinct()
in that it applies unique filtering to the objects returned in the result instance. If you need to apply unique filtering on the query (a DISTINCT clause), usedistinct()
instead.
unique_count
Similar to
count()
but applies unique filtering to the objects returned bysqlalchemy.engine.ScalarResult
.Note
This method is different from
distinct()
in that it applies unique filtering to the objects returned in the result instance. If you need to apply unique filtering on the query (a DISTINCT clause), usedistinct()
instead.
select
Replaces the columns clause with the given entities.
The existing set of FROMs are maintained, including those implied by the current columns clause.
Parameters
entities
: The entities to select.Returns
Self
: The instance itself for method chaining.Examples
>>> query = select(User) >>> async_query = AsyncQuery(query) >>> async_query.order_by('-created_at') >>> async_query SELECT users.id, users.username, users.name, ... FROM users ORDER BY users.created_at DESC >>> async_query.select(User.name, User.age) >>> async_query SELECT users.name, users.age FROM users ORDER BY users.created_at DESC
distinct
Applies DISTINCT to the SELECT statement overall.
Returns
Self
: The instance itself for method chaining.Examples
options
Applies the given list of mapper options.
Warning
Quoting from the joined eager loading docs:
When including `joinedload()` in reference to a one-to-many or many-to-many collection, the `Result.unique()` method must be applied to the returned result, which will uniquify the incoming rows by primary key that otherwise are multiplied out by the join. The ORM will raise an error if this is not present. This is not automatic in modern SQLAlchemy, as it changes the behavior of the result set to return fewer ORM objects than the statement would normally return in terms of number of rows. Therefore SQLAlchemy keeps the use of `Result.unique()` explicit, so there is no ambiguity that the returned objects are being uniquified on primary key.
This is, when fetching many rows and using joined eager loading, the
unique()
method or related (i.e.unique_all()
) must be called to ensure that the rows are unique on primary key (see the examples below).To learn more about options, see the Query.options docs.
Parameters
args
: The options to apply.Returns
Self
: The instance itself for method chaining.Examples
Joined eager loading:
>>> query = select(User) >>> aq = AsyncQuery(query) >>> users = await aq.options(joinedload(User.posts)) ... .unique_all() # required for joinedload() >>> users [User(id=1), User(id=2), ...] >>> users[0].posts [Post(id=1), Post(id=2), ...] >>> user = await aq.options(joinedload(User.posts)).first() >>> user User(id=1) >>> users.posts [Post(id=1), Post(id=2), ...]
Subquery eager loading:
>>> users = await aq.options(subqueryload(User.posts)).all() >>> users [User(id=1), User(id=2), ...] >>> users[0].posts [Post(id=1), Post(id=2), ...]
Eager loading without calling unique() before all():
where
Applies one or more WHERE criteria to the query.
It supports both Django-like syntax and SQLAlchemy syntax.
Parameters
criteria
: SQLAlchemy style filter expressions.filters
: Django-style filters.Returns
Self
: The instance itself for method chaining.Examples
Using Django-like syntax:
>>> query = select(User) >>> async_query = AsyncQuery(query) >>> users = await async_query.where(age__gte=18).all() >>> users [User(id=1), User(id=2), ...] >>> users = await async_query.where( ... name__like='%John%', ... age=30 ... ).all() >>> users [User(id=2)]
Using SQLAlchemy syntax:
>>> users = await async_query.where(User.age >= 18).all() >>> users [User(id=1), User(id=2), ...] >>> users = await async_query.where( ... User.name == 'John Doe', ... User.age == 30 ... ).all() >>> users [User(id=2)]
Using both syntaxes:
filter
Synonym for
where()
.
find
Synonym for
where()
.
search
def search(
search_term: str,
columns: Sequence[str | InstrumentedAttribute[Any]] | None = None,
) -> Self
Applies a search filter to the query.
Searches for
search_term
in the searchable columns of the model. Ifcolumns
are provided, searches only these columns.Parameters
search_term
: Search term.columns
: Columns to search in.Returns
Self
: The instance itself for method chaining.Examples
Usage:
>>> query = select(User) >>> async_query = AsyncQuery(query) >>> users = await async_query.search(search_term='John').all() >>> users [User(id=2), User(id=6)] >>> users[0].name John Doe >>> users[0].username John321 >>> users[1].name Diana Johnson >>> users[1].username Diana84
Searching specific columns:
order_by
Applies one or more ORDER BY criteria to the query.
It supports both Django-like syntax and SQLAlchemy syntax.
Parameters
columns
: Django-like or SQLAlchemy sort expressions.Returns
Self
: The instance itself for method chaining.Examples
Using Django-like syntax:
>>> query = select(Post) >>> async_query = AsyncQuery(query) >>> posts = await async_query.order_by('-rating', 'user___name').all() >>> posts [Post(id=1), Post(id=4), ...]
Using SQLAlchemy syntax:
Using both syntaxes:
sort
Synonym for
order_by()
.
group_by
def group_by(
*columns: ColumnExpressionOrStrLabelArgument,
select_columns: Sequence[_ColumnsClauseArgument[Any]] | None = None,
) -> Self
Applies one or more GROUP BY criteria to the query.
It supports both Django-like syntax and SQLAlchemy syntax.
It is recommended to select specific columns. You can use the
select_columns
parameter to select specific columns.Parameters
columns
: Django-like or SQLAlchemy columns.select_columns
: Columns to be selected (recommended).Returns
Self
: The instance itself for method chaining.Examples
Usage:
>>> from sqlalchemy.sql.functions import func >>> query = select(User) >>> async_query = AsyncQuery(query) >>> columns = (User.age, func.count(User.name)) >>> async_query.group_by( ... User.age, select_columns=columns ... ) >>> rows = await async_query.all(scalars=False) [(30, 2), (32, 1), ...]
You can also call
select()
before callinggroup_by()
:>>> from sqlalchemy.sql import text, func >>> query = select(Post) >>> async_query = AsyncQuery(query) >>> async_query.select( ... Post.rating, ... text('users_1.name'), ... func.count(Post.title) ... ) >>> async_query.group_by('rating', 'user___name') >>> rows = async_query.all(scalars=False) >>> rows [(4, 'John Doe', 1), (5, 'Jane Doe', 1), ...]
offset
Applies an OFFSET clause to the query.
Parameters
offset
: Number of rows to skip.Returns
Self
: The instance itself for method chaining.Raises
ValueError
: Ifoffset
is negative.Examples
Usage:
>>> query = select(User) >>> async_query = AsyncQuery(query) >>> users = await async_query.all() >>> users [User(id=1), User(id=2), ...] >>> users = await async_query.offset(10).all() >>> users [User(id=11), User(id=12), ...] >>> async_query.offset(-1) Traceback (most recent call last): ... ValueError: offset must be >= 0
skip
Synonym for
offset()
.
limit
Applies a LIMIT clause to the query.
Parameters
limit
: Maximum number of rows to return.Returns
Self
: The instance itself for method chaining.Raises
ValueError
: Iflimit
is negative.Examples
>>> query = select(User) >>> async_query = AsyncQuery(query) >>> users = await async_query.all() >>> users [User(id=1), User(id=2), ...] >>> users = await async_query.limit(2).all() >>> users [User(id=1), User(id=2)] >>> async_query.limit(-1) Traceback (most recent call last): ... ValueError: limit must be >= 0
take
Synonym for
limit()
.
top
Synonym for
limit()
.
join
Joined eager loading using LEFT OUTER JOIN.
When a tuple is passed, the second element must be boolean, and if
True
, the join isINNER JOIN
, otherwiseLEFT OUTER JOIN
.Note
Only direct relationships can be loaded.
Parameters
paths
: Relationship attributes to join.Returns
Self
: The instance itself for method chaining.Raises
ValueError
: If the second element of tuple is not boolean.Examples
Usage:
>>> query = select(Comment) >>> async_query = AsyncQuery(query) >>> comment = await async_query.join( ... Comment.user, # LEFT OUTER JOIN ... (Comment.post, True) # True = INNER JOIN ... ).first() >>> comment Comment(id=1) >>> comment.user User(id=1) >>> comment.post Post(id=1) >>> async_query.join( ... Comment.user, ... (Comment.post, 'inner') # invalid argument ... ) Traceback (most recent call last): ... ValueError: expected boolean for second element of tuple, got str: 'inner'
with_subquery
Subqueryload or Selectinload eager loading.
Emits a second SELECT statement (Subqueryload) for each relationship to be loaded, across all result objects at once.
When a tuple is passed, the second element must be boolean. If it is
True
, the eager loading strategy is SELECT IN (Selectinload), otherwise SELECT JOIN (Subqueryload).Warning
A query which makes use of
subqueryload()
in conjunction with a limiting modifier such asQuery.limit()
orQuery.offset()
should always includeQuery.order_by()
against unique column(s) such as the primary key, so that the additional queries emitted bysubqueryload()
include the same ordering as used by the parent query. Without it, there is a chance that the inner query could return the wrong rows, as specified in The importance of ordering.Incorrect, LIMIT without ORDER BY:
Incorrect if User.name is not unique:
Correct:
To get more information about SELECT IN and SELECT JOIN strategies, , see the
loading relationships docs
.Note
Only direct relationships can be loaded.
Parameters
paths
: Relationship attributes to load.Returns
Self
: The instance itself for method chaining.Raises
ValueError
: If the second element of tuple is not boolean.Examples
Usage:
>>> query = select(User) >>> async_query = AsyncQuery(query) >>> users = await async_query.with_subquery( ... User.posts, # SELECT JOIN ... (User.comments, True) # True = SELECT IN ... ).all() >>> users[0] User(id=1) >>> users[0].posts # loaded using SELECT JOIN [Post(id=1), Post(id=2), ...] >>> users[0].posts[0].comments # loaded using SELECT IN [Comment(id=1), Comment(id=2), ...] >>> async_query.with_subquery( ... User.posts, ... (User.comments, 'selectin') # invalid argument ... ) Traceback (most recent call last): ... ValueError: expected boolean for second element of tuple, got str: 'selectin'
Using a limiting modifier:
>>> user = await async_query.with_subquery( ... User.posts, # SELECT JOIN ... (User.comments, True) # True = SELECT IN ... ).sort('id') # sorting modifier (Important!!!) ... .first() # limiting modifier >>> user = await async_query.with_subquery( ... User.posts, # SELECT JOIN ... (User.comments, True) # True = SELECT IN ... ).limit(1) # limiting modifier ... .sort('id') # sorting modifier (Important!!!) ... .all()[0] >>> user User(id=1) >>> user.posts # loaded using SELECT JOIN [Post(id=1), Post(id=2), ...] >>> user.posts[0].comments # loaded using SELECT IN [Comment(id=1), Comment(id=2), ...]
with_schema
Joined, subqueryload and selectinload eager loading.
Useful for complex cases where you need to load nested relationships in separate queries.
Warning
A query which makes use of
subqueryload()
in conjunction with a limiting modifier such asQuery.limit()
orQuery.offset()
should always includeQuery.order_by()
against unique column(s) such as the primary key, so that the additional queries emitted bysubqueryload()
include the same ordering as used by the parent query. Without it, there is a chance that the inner query could return the wrong rows, as specified in The importance of ordering.Incorrect, LIMIT without ORDER BY:
Incorrect if User.name is not unique:
Correct:
To get more information about SELECT IN and SELECT JOIN strategies, , see the
loading relationships docs
.Parameters
schema
: Dictionary defining the loading strategy.Returns
Self
: The instance itself for method chaining.>>> from sqlactive import JOINED, SUBQUERY >>> schema = { ... User.posts: JOINED, # joinedload user ... User.comments: (SUBQUERY, { # load comments in separate query ... Comment.user: JOINED # but, in this separate query, join user ... }) ... } >>> query = select(User) >>> aq = AsyncQuery(query) >>> user = await aq.with_schema(schema) ... .order_by(User.id) # important when limiting ... .first() # limiting modifier >>> user User(id=1) >>> user.posts [Post(id=1), Post(id=2), ...] >>> user.posts[0].comments [Comment(id=1), Comment(id=2), ...] >>> user.posts[0].comments[0].user User(id=1)