Active Record Mixin
The ActiveRecordMixin
class provides a set of ActiveRecord-like helper methods
for SQLAlchemy models, allowing for more intuitive and chainable database
operations with async/await support.
It implements the functionality of both Session
and
Smart Queries
mixins.
Info
The examples below assume the following models:
from sqlalchemy import ForeignKey, String
from sqlalchemy.ext.hybrid import hybrid_method, hybrid_property
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlactive.base_model import ActiveRecordBaseModel
class BaseModel(ActiveRecordBaseModel):
__abstract__ = True
class User(BaseModel):
__tablename__ = 'users'
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True, index=True
)
username: Mapped[str] = mapped_column(
String(18), nullable=False, unique=True
)
name: Mapped[str] = mapped_column(String(50), nullable=False)
age: Mapped[int] = mapped_column(nullable=False)
posts: Mapped[list['Post']] = relationship(back_populates='user')
comments: Mapped[list['Comment']] = relationship(back_populates='user')
@hybrid_property
def is_adult(self) -> int:
return self.age > 18
@hybrid_method
def older_than(self, other: 'User') -> bool:
return self.age > other.age
class Post(BaseModel):
__tablename__ = 'posts'
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True, index=True
)
title: Mapped[str] = mapped_column(String(100), nullable=False)
body: Mapped[str] = mapped_column(nullable=False)
rating: Mapped[int] = mapped_column(nullable=False)
user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
user: Mapped['User'] = relationship(back_populates='posts')
comments: Mapped[list['Comment']] = relationship(back_populates='post')
class Comment(BaseModel):
__tablename__ = 'comments'
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True, index=True
)
body: Mapped[str] = mapped_column(nullable=False)
post_id: Mapped[int] = mapped_column(ForeignKey('posts.id'))
user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
post: Mapped['Post'] = relationship(back_populates='comments')
user: Mapped['User'] = relationship(back_populates='comments')
class Product(BaseModel):
__tablename__ = 'products'
id: Mapped[int] = mapped_column(
primary_key=True, autoincrement=True, index=True
)
name: Mapped[str] = mapped_column(String(100), nullable=False)
description: Mapped[str] = mapped_column(String(100), nullable=False)
price: Mapped[float] = mapped_column(nullable=False)
sells: Mapped[list['Sell']] = relationship(
back_populates='product', viewonly=True
)
class Sell(BaseModel):
__tablename__ = 'sells'
id: Mapped[int] = mapped_column(primary_key=True)
product_id: Mapped[int] = mapped_column(
ForeignKey('products.id'), primary_key=True
)
quantity: Mapped[int] = mapped_column(nullable=False)
product: Mapped['Product'] = relationship(back_populates='sells')
Usage
To use the ActiveRecordMixin
, create a base model class that inherits from it
and set the __abstract__
attribute to True
:
from sqlalchemy import Mapped, mapped_column
from sqlactive import ActiveRecordMixin
class BaseModel(ActiveRecordMixin):
__abstract__ = True
class User(BaseModel):
__tablename__ = 'users'
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
# ...and more
Tip
You can also make your base inherit from the ActiveRecordBaseModel
class
which is a combination of ActiveRecordMixin
, SerializationMixin
and
TimestampMixin
.
About Relationships
All relations used in filtering/sorting/grouping should be explicitly set,
not just being a backref
. This is because SQLActive does not know the
relation direction and cannot infer it. So, when defining a relationship
like:
It is required to define the reverse relationship:
Core Features
Creation, Updating, and Deletion
Creating Records
# Create a single record
bob = await User.insert(name='Bob')
joe = await User.create(name='Joe') # Synonym for insert()
# Create multiple records
users = [User(name='Alice'), User(name='Bob')]
await User.insert_all(users) # Shortcut for save_all()
Updating Records
# Update a single record
await user.update(name='Bob2')
# Update multiple records
users = await User.where(age=25).all()
for user in users:
user.name = f"{user.name} Jr."
await User.update_all(users) # Shortcut for save_all()
Deleting Records
# Delete a single record
await user.delete()
await user.remove() # Synonym for delete()
# Delete multiple records
users = await User.where(age=25).all()
await User.delete_all(users)
# Delete by primary keys
await User.destroy(1, 2, 3) # Deletes users with IDs 1, 2, and 3
Temporary Records
If you need to create a record for a short period of time, you can use the
with
statement:
The with
statement will create the record using the save()
method.
The record will be deleted at the end of the block using the delete()
.
Here is an example of using temporary records to test the isnull
filter operator:
post1 = Post(
title='Lorem ipsum',
body='Lorem ipsum dolor sit amet, consectetur adipiscing elit.',
rating=4,
user_id=1,
topic='Some topic', # this post has a topic
)
post2 = Post(
title='Lorem ipsum',
body='Lorem ipsum dolor sit amet, consectetur adipiscing elit.',
rating=4,
user_id=1, # this post does not have a topic
)
# post1 and post2 will be deleted at the end of the block
async with post1, post2:
# If isnull=True, only posts without a topic must be returned
posts = await Post.where(topic__isnull=True).all()
assert all([p.topic is None for p in posts]) is True
# If isnull=False, only posts with a topic must be returned
posts = await Post.where(topic__isnull=False).all()
assert all([post.topic is not None for p in posts]) is True
Querying
Basic Queries
# Get all records
users = await User.all()
# Get first record
user = await User.first() # None if no results found
# Get one record
user = await User.one() # Raises if no results found
user = await User.one_or_none() # Returns None if no results found
Filtering
The mixin supports both Django-like syntax and SQLAlchemy syntax for filtering:
# Django-like syntax
users = await User.where(name__like='%John%').all()
users = await User.where(name__like='%John%', age=30).all()
# SQLAlchemy syntax
users = await User.where(User.name == 'John Doe').all()
# Mixed syntax
users = await User.where(User.age == 30, name__like='%John%').all()
# Synonyms
users = await User.filter(name__like='%John%').all()
user = await User.find(name__like='%John%', age=30).one()
Sorting and Pagination
from sqlalchemy.sql import asc, desc
# Sorting (Django-like syntax)
users = await User.order_by('-created_at').all() # Descending order
users = await User.order_by('name').all() # Ascending order
users = await User.order_by('-created_at', 'name').all() # Multiple columns
# Sorting (SQLAlchemy syntax)
users = await User.sort(User.created_at.desc()).all() # Synonym for order_by()
users = await User.sort(asc(User.name)).all()
# Sorting (mixed syntax)
users = await User.order_by('-created_at', User.name.asc()).all()
users = await User.sort('-age', asc(User.name)).all()
# Pagination
users = await User.offset(10).limit(5).all() # Skip 10, take 5
users = await User.skip(10).take(5).all() # Same as above
Grouping
# Grouping (Django-like syntax)
users = await User.group_by(User.age).all()
users = await User.group_by(User.age, User.name).all()
# Grouping (SQLAlchemy syntax)
users = await User.group_by('age').all()
users = await User.group_by('age', 'name').all()
# Grouping (mixed syntax)
users = await User.group_by(User.age, 'name').all()
Eager Loading
Join Loading
comment = await Comment.join(
Comment.user, Comment.post # Left outer join
).first()
comment = await Comment.join(
Comment.user,
(Comment.post, True) # True means inner join
).first()
comments = await Comment.join(Comment.user, Comment.post)
.unique_all() # required for joinedload()
Subquery Loading
users = await User.with_subquery(
User.posts, # subquery loading
(User.comments, True) # True means selectinload
).unique_all() # important!
# With limiting and sorting (important for correct results)
users = await User.with_subquery(User.posts)
.limit(1)
.sort('id') # important!
.unique_all() # important!
Complex Schema Loading
from sqlactive import JOINED, SUBQUERY
schema = {
User.posts: JOINED, # joinedload user
User.comments: (SUBQUERY, { # load comments in separate query
Comment.user: JOINED # but join user in this separate query
})
}
user = await User.with_schema(schema).first()
Smart Queries
The Smart Query Mixin
provides a powerful smart query
builder that combines filtering, sorting, grouping and eager loading:
from sqlactive import JOINED
# Complex query with multiple features
users = await User.smart_query(
criterion=(User.age >= 18,),
filters={'name__like': '%John%'},
sort_columns=(User.username,),
sort_attrs=['-created_at'],
group_columns=(User.username,),
group_attrs=['age'],
schema={User.posts: JOINED}
).all()
API Reference
Class Properties
Most of class properties are inherited from
InspectionMixin
.
query
Returns a new
sqlalchemy.sql.Select
instance for the model.This is a shortcut for
select(cls)
.Examples
Is equivalent to:
Instance Methods
fill
Fills the object with values from
kwargs
without saving to the database.Parameters
kwargs
: Key-value pairs of columns to set.Returns
Self
: The instance itself for method chaining.Raises
AttributeError
: If attribute doesn't exist.NoSettableError
: If attribute is not settable.Examples
save
Saves the current row.
Note
All database errors will trigger a rollback and be raised.
Returns
Self
: The instance itself for method chaining.Raises
SQLAlchemyError
: If saving fails.Examples
update
Updates the current row with the provided values.
This is the same as calling
self.fill(**kwargs).save()
.Parameters
kwargs
: Key-value pairs of attributes to update.Returns
Self
: The instance itself for method chaining.Examples
delete
Deletes the current row.
Danger
This is not a soft delete method. It will permanently delete the row from the database. So, if you want to keep the row in the database, you can implement a custom soft delete method, i.e. using
save()
method to update the row with a flag indicating if the row is deleted or not (i.e. a booleanis_deleted
column).Raises
SQLAlchemyError
: If deleting fails.Examples
remove
Synonym for
delete()
.
Class Methods
insert
Inserts a new row and returns the saved instance.
Parameters
kwargs
: Key-value pairs for the new instance.Returns
Self
: The created instance for method chaining.Examples
create
Synonym for
insert()
.
save_all
Saves multiple rows in a single transaction.
When using this method to update existing rows, instances are not refreshed after commit by default. Accessing the attributes of the updated rows without refreshing them after commit will raise an
sqlalchemy.orm.exc.DetachedInstanceError
.To access the attributes of updated rows, the
refresh
flag must be set toTrue
in order to refresh them after commit.Warning
Refreshing multiple instances may be expensive, which may lead to a higher latency due to additional database queries.
Note
When inserting new rows, refreshing the instances after commit is no necessary. The instances are already available after commit, but you still can use the
refresh
flag to refresh them if needed.Parameters
rows
: Sequence of rows to be saved.refresh
: Whether to refresh the rows after commit (default:False
).Raises
SQLAlchemyError
: If saving fails.Examples
Inserting new rows:
>>> users = [ ... User(name='Bob Williams', age=30), ... User(name='Jane Doe', age=31), ... User(name='John Doe', age=32), ... ] >>> await User.save_all(users) >>> users[0].name Bob Williams >>> users[1].age 31
Updating existing rows (with refreshing after commit):
>>> users = User.where(name__endswith='Doe').all() >>> for user in users: ... user.name = user.name.replace('Doe', 'Smith') >>> await User.save_all(users, refresh=True) >>> users[0].name Jane Smith >>> users[1].name John Smith
Updating existing rows (without refreshing after commit):
insert_all
Inserts multiple rows in a single transaction.
This is mostly a shortcut for
save_all()
when inserting new rows.Note
When inserting new rows, refreshing the instances after commit is not necessary. The instances are already available after commit, but you still can use the
refresh
flag to refresh them if needed.See the
save_all()
method for more details.
update_all
Updates multiple rows in a single transaction.
This is mostly a shortcut for
save_all()
when updating existing rows.If you are planning to access the attributes of the updated instances after commit, you must set the
refresh
flag toTrue
in order to refresh them. Accessing the attributes of the updated instances without refreshing them after commit will raise ansqlalchemy.orm.exc.DetachedInstanceError
.Warning
Refreshing multiple instances may be expensive, which may lead to a higher latency due to additional database queries.
See the
save_all()
method for more details.
delete_all
Deletes multiple rows in a single transaction.
Danger
This is not a soft delete method. It will permanently delete the row from the database. So, if you want to keep the row in the database, you can implement a custom soft delete method, i.e. using
save()
method to update the row with a flag indicating if the row is deleted or not (i.e. a booleanis_deleted
column).Parameters
rows
: Sequence of rows to be deleted.Raises
SQLAlchemyError
: If deleting fails.Examples
destroy
Deletes multiple rows by their primary key.
This method can only be used if the model has a single primary key. Otherwise, it will raise a
CompositePrimaryKeyError
exception.Danger
This is not a soft delete method. It will permanently delete the row from the database. So, if you want to keep the row in the database, you can implement a custom soft delete method, i.e. using
save()
method to update the row with a flag indicating if the row is deleted or not (i.e. a booleanis_deleted
column).Parameters
ids
: Primary key values of rows to delete.Raises
CompositePrimaryKeyError
: If the model has a composite primary key.SQLAlchemyError
: If deleting fails.Examples
get
@classmethod
async def get(
pk: object,
join: Sequence[EagerLoadPath] | None = None,
subquery: Sequence[EagerLoadPath] | None = None,
schema: EagerSchema | None = None,
) -> Self | None
Fetches a row by primary key or
None
if no result is found.If multiple results are found, it will raise a
sqlalchemy.exc.MultipleResultsFound
exception.Parameters
pk
: Primary key value. It can also be a dict of composite primary key values.join
: Paths to join eager load. IMPORTANT See the docs ofjoin
method for details.subquery
: Paths to subquery eager load. IMPORTANT See the docs ofwith_subquery
method for details.schema
: Schema for the eager loading. IMPORTANT See the docs ofwith_schema
method for details.Returns
Self
: The instance for method chaining if found.None
: If no result is found.Raises
MultipleResultsFound
: If multiple rows match.Examples
get_or_fail
@classmethod
async def get_or_fail(
pk: object,
join: Sequence[EagerLoadPath] | None = None,
subquery: Sequence[EagerLoadPath] | None = None,
schema: EagerSchema | None = None,
) -> Self
Fetches a row by primary key or raises a
sqlalchemy.exc.NoResultFound
exception if no result is found.If multiple results are found, it will raise a
sqlalchemy.exc.MultipleResultsFound
exception.Parameters
pk
: Primary key value. It can also be a dict of composite primary key values.join
: Paths to join eager load. IMPORTANT See the docs ofjoin
method for details.subquery
: Paths to subquery eager load. IMPORTANT See the docs ofwith_subquery
method for details.schema
: Schema for the eager loading. IMPORTANT See the docs ofwith_schema
method for details.Returns
Self
: The instance for method chaining.Raises
NoResultFound
: If no result is found.MultipleResultsFound
: If multiple rows match.Examples
scalars
Returns a
sqlalchemy.engine.ScalarResult
instance containing all rows.Returns
sqlalchemy.engine.ScalarResult[Self]
: Result instance containing all scalars.Examples
first
Fetches the first row or
None
if no results are found.If
scalar
isTrue
, returns a scalar value (default).Parameters
scalar
: IfTrue
, returns a scalar value (Self
), otherwise returns a row (default:True
).Returns
Self
: Instance for method chaining (scalar).sqlalchemy.engine.Row[tuple[Any, ...]]
: Row.None
: If no result is found.Examples
Usage:
>>> user = await User.first() >>> user User(id=1) >>> user = await User.first(scalar=False) >>> user (User(id=1),)
Selecting specific columns:
one
Fetches one row or raises a
sqlalchemy.exc.NoResultFound
exception if no results are found.If multiple results are found, it will raise a
sqlalchemy.exc.MultipleResultsFound
exception.If
scalar
isTrue
, returns a scalar value (default).Parameters
scalar
: IfTrue
, returns a scalar value (Self
), otherwise returns a row (default:True
).Returns
Self
: Instance for method chaining (scalar).sqlalchemy.engine.Row[tuple[Any, ...]]
: Row.Raises
NoResultFound
: If no row is found.MultipleResultsFound
: If multiple rows match.Examples
Usage:
>>> user = await User.where(name='John Doe').one() >>> user User(id=1) >>> user = await User.where(name='John Doe').one(scalar=False) >>> user (User(id=1),) >>> user = await User.where(name='Unknown').one() Traceback (most recent call last): ... sqlalchemy.exc.NoResultFound: No row was found when one was required >>> user = await User.one() Traceback (most recent call last): ... sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when one was required
Selecting specific columns:
one_or_none
Fetches one row or
None
if no results are found.If multiple results are found, it will raise a
sqlalchemy.exc.MultipleResultsFound
exception.If
scalar
isTrue
, returns a scalar value (default).Parameters
scalar
: IfTrue
, returns a scalar value (Self
), otherwise returns a row (default:True
).Returns
Self
: Instance for method chaining (scalar).sqlalchemy.engine.Row[tuple[Any, ...]]
: Row.None
: If no result is found.Raises
MultipleResultsFound
: If multiple rows match.Examples
Usage:
>>> user = await User.where(name='John Doe').one_or_none() >>> user User(id=1) >>> user = await User.where(name='John Doe').one_or_none(scalar=False) >>> user (User(id=1),) >>> user = await User.where(name='Unknown').one_or_none() >>> user None >>> user = await User.one_or_none() Traceback (most recent call last): ... sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when one was required
Selecting specific columns:
all
Fetches all rows.
If
scalars
isTrue
, returns scalar values (default).Parameters
scalars
: IfTrue
, returns scalar values (Sequence[Self]
), otherwise returns rows (default:True
).Returns
Sequence[Self]
: Instances (scalars).Sequence[sqlalchemy.engine.Row[tuple[Any, ...]]]
: Rows.Examples
Usage:
>>> users = await User.all() >>> users [User(id=1), User(id=2), ...] >>> users = await User.all(scalars=False) >>> users [(User(id=1),), (User(id=2),), ...]
Selecting specific columns:
count
Fetches the number of rows.
Returns
int
: Number of rows.Examples
unique
Similar to
scalars()
but applies unique filtering to the objects returned in the result instance.If
scalars
isFalse
, returns asqlalchemy.engine.Result
instance instead of asqlalchemy.engine.ScalarResult
instance.Note
This method is different from
distinct()
in that it applies unique filtering to the objects returned in the result instance. If you need to apply unique filtering on the query (a DISTINCT clause), usedistinct()
instead.Parameters
scalars
: IfTrue
, returns asqlalchemy.engine.ScalarResult
instance. Otherwise, returns asqlalchemy.engine.Result
instance (default:True
).Returns
sqlalchemy.engine.ScalarResult[Self]
: Result instance containing all scalars.sqlalchemy.engine.Result[tuple[Any, ...]]
: Result instance containing all rows.Examples
unique_first
Similar to
first()
but applies unique filtering to the objects returned by eithersqlalchemy.engine.ScalarResult
orsqlalchemy.engine.Result
depending on the value ofscalar
.Note
This method is different from
distinct()
in that it applies unique filtering to the objects returned in the result instance. If you need to apply unique filtering on the query (a DISTINCT clause), usedistinct()
instead.
unique_one
Similar to
one()
but applies unique filtering to the objects returned by eithersqlalchemy.engine.ScalarResult
orsqlalchemy.engine.Result
depending on the value ofscalar
.Note
This method is different from
distinct()
in that it applies unique filtering to the objects returned in the result instance. If you need to apply unique filtering on the query (a DISTINCT clause), usedistinct()
instead.
unique_one_or_none
@classmethod
async def unique_one_or_none(scalar: bool = True) -> Self | Row[tuple[Any, ...]] | None
Similar to
one_or_none()
but applies unique filtering to the objects returned by eithersqlalchemy.engine.ScalarResult
orsqlalchemy.engine.Result
depending on the value ofscalar
.Note
This method is different from
distinct()
in that it applies unique filtering to the objects returned in the result instance. If you need to apply unique filtering on the query (a DISTINCT clause), usedistinct()
instead.See
unique()
andone_or_none()
for more details.
unique_all
@classmethod
async def unique_all(scalars: bool = True) -> Sequence[Self] | Sequence[Row[tuple[Any, ...]]]
Similar to
all()
but applies unique filtering to the objects returned by eithersqlalchemy.engine.ScalarResult
orsqlalchemy.engine.Result
depending on the value ofscalars
.Note
This method is different from
distinct()
in that it applies unique filtering to the objects returned in the result instance. If you need to apply unique filtering on the query (a DISTINCT clause), usedistinct()
instead.
unique_count
Similar to
count()
but applies unique filtering to the objects returned bysqlalchemy.engine.ScalarResult
.Note
This method is different from
distinct()
in that it applies unique filtering to the objects returned in the result instance. If you need to apply unique filtering on the query (a DISTINCT clause), usedistinct()
instead.
select
Replaces the columns clause with the given entities.
The existing set of FROMs are maintained, including those implied by the current columns clause.
Parameters
entities
: The entities to select.Returns
AsyncQuery[Self]
: Async query instance for chaining.Examples
distinct
Applies DISTINCT to the SELECT statement overall.
Returns
AsyncQuery[Self]
: Async query instance for chaining.Examples
options
Applies the given list of mapper options.
Warning
Quoting from the joined eager loading docs:
When including `joinedload()` in reference to a one-to-many or many-to-many collection, the `Result.unique()` method must be applied to the returned result, which will uniquify the incoming rows by primary key that otherwise are multiplied out by the join. The ORM will raise an error if this is not present. This is not automatic in modern SQLAlchemy, as it changes the behavior of the result set to return fewer ORM objects than the statement would normally return in terms of number of rows. Therefore SQLAlchemy keeps the use of `Result.unique()` explicit, so there is no ambiguity that the returned objects are being uniquified on primary key.
This is, when fetching many rows and using joined eager loading, the
unique()
method or related (i.e.unique_all()
) must be called to ensure that the rows are unique on primary key (see the examples below).To learn more about options, see the Query.options docs.
Parameters
args
: The options to apply.Returns
AsyncQuery[Self]
: Async query instance for chaining.Examples
Joined eager loading:
>>> users = await User.options(joinedload(User.posts)) ... .unique_all() # required for joinedload() >>> users [User(id=1), User(id=2), ...] >>> users[0].posts [Post(id=1), Post(id=2), ...] >>> user = await User.options(joinedload(User.posts)).first() >>> user User(id=1) >>> users.posts [Post(id=1), Post(id=2), ...]
Subquery eager loading:
>>> users = await User.options(subqueryload(User.posts)).all() >>> users [User(id=1), User(id=2), ...] >>> users[0].posts [Post(id=1), Post(id=2), ...]
Eager loading without calling unique() before all():
where
Applies one or more WHERE criteria to the query.
It supports both Django-like syntax and SQLAlchemy syntax.
Parameters
criteria
: SQLAlchemy style filter expressions.filters
: Django-style filters.Returns
AsyncQuery[Self]
: Async query instance for chaining.Examples
Using Django-like syntax:
>>> users = await User.where(age__gte=18).all() >>> users [User(id=1), User(id=2), ...] >>> users = await User.where(name__like='%John%', age=30).all() >>> users [User(id=2)]
Using SQLAlchemy syntax:
>>> users = await User.where(User.age >= 18).all() >>> users [User(id=1), User(id=2), ...] >>> users = await User.where(User.name == 'John Doe', User.age == 30).all() >>> users [User(id=2)]
Using both syntaxes:
filter
Synonym for
where()
.
find
Synonym for
where()
.
search
@classmethod
def search(
search_term: str,
columns: Sequence[str | InstrumentedAttribute[Any]] | None = None,
) -> AsyncQuery[Self]
Applies a search filter to the query.
Searches for
search_term
in the searchable columns of the model. Ifcolumns
are provided, searches only these columns.Parameters
search_term
: Search term.columns
: Columns to search in.Returns
AsyncQuery[Self]
: Async query instance for chaining.Examples
Usage:
>>> users = await User.search(search_term='John').all() >>> users [User(id=2), User(id=6)] >>> users[0].name John Doe >>> users[0].username John321 >>> users[1].name Diana Johnson >>> users[1].username Diana84
Searching specific columns:
order_by
Applies one or more ORDER BY criteria to the query.
It supports both Django-like syntax and SQLAlchemy syntax.
Parameters
columns
: Django-like or SQLAlchemy sort expressions.Returns
AsyncQuery[Self]
: Async query instance for chaining.Examples
Using Django-like syntax:
>>> posts = await Post.order_by('-rating', 'user___name').all() >>> posts [Post(id=1), Post(id=4), ...]
Using SQLAlchemy syntax:
Using both syntaxes:
sort
Synonym for
order_by()
.
group_by
@classmethod
def group_by(
*columns: ColumnExpressionOrStrLabelArgument[Any],
select_columns: Sequence[_ColumnsClauseArgument[Any]] | None = None,
) -> AsyncQuery[Self]
Applies one or more GROUP BY criteria to the query.
It supports both Django-like syntax and SQLAlchemy syntax.
It is recommended to select specific columns. You can use the
select_columns
parameter to select specific columns.Parameters
columns
: Django-like or SQLAlchemy columns.select_columns
: Columns to be selected (recommended).Returns
AsyncQuery[Self]
: Async query instance for chaining.Examples
Usage:
>>> from sqlalchemy.sql.functions import func >>> columns = (User.age, func.count(User.name)) >>> async_query = User.group_by( ... User.age, select_columns=columns ... ) >>> rows = await async_query.all(scalars=False) [(30, 2), (32, 1), ...]
You can also call
select()
before callinggroup_by()
:
offset
Applies an OFFSET clause to the query.
Parameters
offset
: Number of rows to skip.Returns
AsyncQuery[Self]
: Async query instance for chaining.Raises
ValueError
: Ifoffset
is negative.Examples
Usage:
skip
Synonym for
offset()
.
limit
Applies a LIMIT clause to the query.
Parameters
limit
: Maximum number of rows to return.Returns
AsyncQuery[Self]
: Async query instance for chaining.Raises
ValueError
: Iflimit
is negative.Examples
take
Synonym for
limit()
.
top
Synonym for
limit()
.
join
Joined eager loading using LEFT OUTER JOIN.
When a tuple is passed, the second element must be boolean, and if
True
, the join isINNER JOIN
, otherwiseLEFT OUTER JOIN
.Note
Only direct relationships can be loaded.
Parameters
paths
: Relationship attributes to join.Returns
AsyncQuery[Self]
: Async query instance for chaining.Raises
ValueError
: If the second element of tuple is not boolean.Examples
Usage:
>>> comment = await Comment.join( ... Comment.user, # LEFT OUTER JOIN ... (Comment.post, True) # True = INNER JOIN ... ).first() >>> comment Comment(id=1) >>> comment.user User(id=1) >>> comment.post Post(id=1) >>> Comment.join( ... Comment.user, ... (Comment.post, 'inner') # invalid argument ... ) Traceback (most recent call last): ... ValueError: expected boolean for second element of tuple, got str: 'inner'
with_subquery
Subqueryload or Selectinload eager loading.
Emits a second SELECT statement (Subqueryload) for each relationship to be loaded, across all result objects at once.
When a tuple is passed, the second element must be boolean. If it is
True
, the eager loading strategy is SELECT IN (Selectinload), otherwise SELECT JOIN (Subqueryload).Warning
A query which makes use of
subqueryload()
in conjunction with a limiting modifier such asQuery.limit()
orQuery.offset()
should always includeQuery.order_by()
against unique column(s) such as the primary key, so that the additional queries emitted bysubqueryload()
include the same ordering as used by the parent query. Without it, there is a chance that the inner query could return the wrong rows, as specified in The importance of ordering.Incorrect, LIMIT without ORDER BY:
Incorrect if User.name is not unique:
Correct:
To get more information about SELECT IN and SELECT JOIN strategies, , see the
loading relationships docs
.Note
Only direct relationships can be loaded.
Parameters
paths
: Relationship attributes to load.Returns
AsyncQuery[Self]
: Async query instance for chaining.Raises
ValueError
: If the second element of tuple is not boolean.Examples
Usage:
>>> users = await User.with_subquery( ... User.posts, # SELECT JOIN ... (User.comments, True) # True = SELECT IN ... ).all() >>> users[0] User(id=1) >>> users[0].posts # loaded using SELECT JOIN [Post(id=1), Post(id=2), ...] >>> users[0].posts[0].comments # loaded using SELECT IN [Comment(id=1), Comment(id=2), ...] >>> User.with_subquery( ... User.posts, ... (User.comments, 'selectin') # invalid argument ... ) Traceback (most recent call last): ... ValueError: expected boolean for second element of tuple, got str: 'selectin'
Using a limiting modifier:
>>> user = await User.with_subquery( ... User.posts, # SELECT JOIN ... (User.comments, True) # True = SELECT IN ... ).sort('id') # sorting modifier (Important!!!) ... .first() # limiting modifier >>> user = await User.with_subquery( ... User.posts, # SELECT JOIN ... (User.comments, True) # True = SELECT IN ... ).limit(1) # limiting modifier ... .sort('id') # sorting modifier (Important!!!) ... .all()[0] >>> user User(id=1) >>> user.posts # loaded using SELECT JOIN [Post(id=1), Post(id=2), ...] >>> user.posts[0].comments # loaded using SELECT IN [Comment(id=1), Comment(id=2), ...]
with_schema
Joined, subqueryload and selectinload eager loading.
Useful for complex cases where you need to load nested relationships in separate queries.
Warning
A query which makes use of
subqueryload()
in conjunction with a limiting modifier such asQuery.limit()
orQuery.offset()
should always includeQuery.order_by()
against unique column(s) such as the primary key, so that the additional queries emitted bysubqueryload()
include the same ordering as used by the parent query. Without it, there is a chance that the inner query could return the wrong rows, as specified in The importance of ordering.Incorrect, LIMIT without ORDER BY:
Incorrect if User.name is not unique:
Correct:
To get more information about SELECT IN and SELECT JOIN strategies, , see the
loading relationships docs
.Parameters
schema
: Dictionary defining the loading strategy.Returns
AsyncQuery[Self]
: Async query instance for chaining.>>> from sqlactive import JOINED, SUBQUERY >>> schema = { ... User.posts: JOINED, # joinedload user ... User.comments: (SUBQUERY, { # load comments in separate query ... Comment.user: JOINED # but, in this separate query, join user ... }) ... } >>> user = await User.with_schema(schema) ... .order_by(User.id) # important when limiting ... .first() # limiting modifier >>> user User(id=1) >>> user.posts [Post(id=1), Post(id=2), ...] >>> user.posts[0].comments [Comment(id=1), Comment(id=2), ...] >>> user.posts[0].comments[0].user User(id=1)
smart_query
@classmethod
def smart_query(
criteria: Sequence[ColumnElement[bool]] | None = None,
filters: DjangoFilters | None = None,
sort_columns: Sequence[ColumnExpressionOrStrLabelArgument[Any]] | None = None,
sort_attrs: Sequence[str] | None = None,
group_columns: Sequence[ColumnExpressionOrStrLabelArgument[Any]] | None = None,
group_attrs: Sequence[str] | None = None,
schema: EagerSchema | None = None,
) -> AsyncQuery[Self]
Creates a query combining filtering, sorting, grouping and eager loading. Then, wraps the query into an
AsyncQuery
instance and returns it.See
smart_query() from SmartQueryMixin
for details.
get_async_query
Returns an
AsyncQuery
instance with the providedsqlalchemy.sql.Select
instance.If no
sqlalchemy.sql.Select
instance is provided, it uses thequery
property of the model.Parameters
query
: SQLAlchemy query.Returns
AsyncQuery[Self]
: Async query instance for chaining.Examples
get_primary_key_name
Deprecated
This function is deprecated since version 0.2 and will be removed in future versions.
Use primary_key_name
property instead.
Gets the primary key name of the model.
Warning
This method can only be used if the model has a single primary key. If the model has a composite primary key, an
CompositePrimaryKeyError
is raised.Returns
str
: The name of the primary key.Raises
CompositePrimaryKeyError
: If the model has a composite primary key.Examples
Important Notes
-
When using
subqueryload()
with limiting modifiers (limit()
,offset()
), always includeorder_by()
with unique columns (like primary key) to ensure correct results. -
For joined eager loading with one-to-many or many-to-many relationships, use the
unique()
method or related (i.e.unique_all()
) to prevent duplicate rows: