Skip to content

Querying Data

Fullfinity provides a Django-like query API for fetching data.

products = await Product.filter().all()

Use get() when you expect exactly one result. It raises an error if zero or multiple records match:

# Get exactly one record - raises if not found or multiple found
product = await Product.filter(id=1).get()
product = await Product.filter(sku="LAPTOP-001").get()
# Raises MissingError if no record found
# Raises UserError if multiple records found

Use first() when you want one record or None:

# Returns first match or None
product = await Product.filter(active=True).first()

Use last() to get the last record. It reverses the current ordering (or defaults to id DESC):

# Returns last match or None
product = await Product.filter(active=True).last()
# With explicit ordering - gets last by created_date
product = await Product.filter(active=True).order_by("created_date ASC").last()

Look up a record, creating it if it doesn’t exist:

# Returns (instance, created) tuple
user, created = await User.get_or_create(
email="test@example.com",
defaults={"name": "Test User", "active": True}
)
if created:
print("New user created")
else:
print("Existing user found")

The defaults parameter contains fields used only when creating (not for lookup).

# Equality
products = await Product.filter(active=True).all()
# Multiple conditions (AND)
products = await Product.filter(active=True, category="electronics").all()
LookupDescriptionExample
__eqEqual (default)age__eq=25
__neqNot equalstatus__neq="deleted"
__gtGreater thanprice__gt=100
__gteGreater than or equalage__gte=18
__ltLess thanstock__lt=10
__lteLess than or equalprice__lte=1000
__inIn listid__in=[1, 2, 3]
__ninNot in liststatus__nin=["deleted", "archived"]
__isnullIs NULLdeleted_at__isnull=True
__isnotnullIs not NULLemail__isnotnull=True
__containsContains (case-sensitive)name__contains="Pro"
__icontainsContains (case-insensitive)name__icontains="pro"
__startswithStarts withemail__startswith="admin"
__endswithEnds withemail__endswith="@company.com"
__iexactExact (case-insensitive)email__iexact="JOHN@EXAMPLE.COM"
__ncontainsDoes not contain (case-sensitive)name__ncontains="test"
__nicontainsDoes not contain (case-insensitive)name__nicontains="test"
__nstartswithDoes not start withcode__nstartswith="TEMP"
__nendswithDoes not end withemail__nendswith="@spam.com"
__niexactNot exact (case-insensitive)status__niexact="DELETED"

:::caution Lookups that do not exist on the database path There is no __between / __range (express ranges with __gte + __lte), no date-component lookups (__year / __month / __day / __date — bucket dates with the DateTrunc annotation below instead), and no case-insensitive prefix/suffix lookups for queries (__istartswith / __iendswith are evaluated only in-memory for Q-expression strings, not in SQL). The only case-insensitive lookups available against the database are __iexact, __icontains, __nicontains, and __niexact. Using an unsupported suffix raises QParseError. :::

# Price between 100 and 1000
products = await Product.filter(
price__gte=100,
price__lte=1000
).all()
# Name contains "laptop" (case-insensitive)
products = await Product.filter(name__icontains="laptop").all()
# Not deleted
users = await User.filter(deleted_at__isnull=True).all()
# Status is one of these
orders = await Order.filter(status__in=["pending", "processing"]).all()

For complex queries with OR, NOT, and nested conditions:

from fullfinity.engine.Q import Q
# OR condition
users = await User.filter(
Q(role="admin") | Q(role="manager")
).all()
# AND condition (explicit)
users = await User.filter(
Q(active=True) & Q(verified=True)
).all()
# NOT condition
users = await User.filter(
~Q(status="deleted")
).all()
# Complex nested
users = await User.filter(
Q(active=True) & (Q(role="admin") | Q(role="manager"))
).all()
# Combine Q with kwargs
users = await User.filter(
Q(role="admin") | Q(role="manager"),
active=True,
company__country="US"
).all()
# Single level
orders = await Order.filter(customer__name="Acme Corp").all()
# Multiple levels
invoices = await Invoice.filter(
customer__company__country__code="US"
).all()
# With operators
orders = await Order.filter(customer__age__gte=18).all()

Filter through reverse relationships using the related_name:

# Contact has: parent = ManyToOne("Contact", related_name="contacts")
# Find companies that have child contacts named "John"
companies = await Contact.filter(contacts__name__icontains="John").all()
# Find companies with employees in California
companies = await Contact.filter(contacts__state__code="CA").all()
# Products with at least one active variant
# ProductVariant has: product = ManyToOne("Product", related_name="variants")
products = await Product.filter(variants__active=True).all()
# Orders containing a specific product
# OrderLine has: order = ManyToOne("Order", related_name="lines")
orders = await Order.filter(lines__product__id=product_id).all()
# Single hop: the M2M is directly on the queried model
products = await Product.filter(tags__name="Featured").all()
# Multi-hop: reach an M2M through a prior relation. Here `team` is a
# ManyToOne and `members` is a ManyToMany (through table) on the team — the
# join is resolved against the team, not the root model.
tickets = await Ticket.filter(team__members__id__eq=uid).all()

A ManyToMany can be traversed at any depth, including after other relation hops; the through table is matched against the model that owns the M2M.

# Ascending
products = await Product.filter().order_by("name ASC").all()
# Descending
products = await Product.filter().order_by("created_at DESC").all()
# Multiple fields
products = await Product.filter().order_by("category ASC", "name ASC").all()
# Limit
products = await Product.filter().limit(10).all()
# Offset
products = await Product.filter().offset(20).all()
# Combined (page 3, 10 per page)
products = await Product.filter().offset(20).limit(10).all()
# Helper function
async def paginate(model, page=1, per_page=20, **filters):
offset = (page - 1) * per_page
items = await model.filter(**filters).offset(offset).limit(per_page).all()
total = await model.filter(**filters).count()
return {
"items": items,
"page": page,
"per_page": per_page,
"total": total,
"pages": (total + per_page - 1) // per_page
}
# Count
total = await Product.filter().count()
active_count = await Product.filter(active=True).count()
# Sum
total_revenue = await Order.filter(status="paid").sum("total")
# Average
avg_price = await Product.filter(active=True).avg("price")
# Min/Max
min_price = await Product.filter().min("price")
max_price = await Product.filter().max("price")

exclude() is the inverse of filter() — every condition is negated. Kwargs become ~Q(...) and any Q object passed in is inverted:

# Everything except deleted
users = await User.exclude(status="deleted").all()
# Negate a Q expression
adults = await User.exclude(Q(age__lt=18)).all()
# Chains with filter()
await Product.filter(active=True).exclude(category="archived").all()
# Whole-row DISTINCT
rows = await Order.filter(state="confirmed").distinct().all()
# DISTINCT ON specific (stored) fields
cities = await Company.distinct("city").values_list("city", flat=True)

Passing field names switches to DISTINCT ON; the named fields must be stored columns (a store=False field raises ValidationError).

group_by() groups rows; annotate(alias=expression) adds computed aggregate columns under your chosen alias. Both are typically used together.

from fullfinity.engine.base import Sum, Avg, Min, Max, Count, RawSQL, DateTrunc
# Revenue per category
rows = await (
Order.filter(state="confirmed")
.annotate(revenue=Sum("total"), order_count=Count())
.group_by("category")
.all()
)
# Monthly totals — bucket a date with DateTrunc, then group on the alias
rows = await (
Order.filter(state="confirmed")
.annotate(month=DateTrunc("month", "created_date"), revenue=Sum("total"))
.group_by("month")
.all()
)

The aggregate/expression classes are all exported from fullfinity.engine.base:

ClassConstructorSQL
CountCount(field=None)COUNT(field) or COUNT(*)
SumSum(field)SUM(field)
AvgAvg(field)AVG(field)
MinMin(field)MIN(field)
MaxMax(field)MAX(field)
DateTruncDateTrunc(interval, field)DATE_TRUNC('interval', field) — interval is 'day', 'week', 'month', 'quarter', 'year'
RawSQLRawSQL(sql, params=None)the raw SQL fragment, aliased

group_by() accepts relation paths ("category__name") and rejects grouping by a non-relational store=False field.

For pessimistic locking, select_for_update() emits SELECT ... FOR UPDATE, holding the row lock until the surrounding transaction commits (each request runs in a transaction). Use it to serialize concurrent updates to the same row:

# Lock the row, then mutate it — concurrent requests block until this commits
quant = await Quant.filter(id=1).select_for_update().first()
await quant.update(quantity=quant.quantity - 10)
ArgumentEffect
nowait=TrueFail immediately if a row is already locked (SELECT ... FOR UPDATE NOWAIT)
skip_locked=TrueSkip rows that are already locked (SELECT ... FOR UPDATE SKIP LOCKED)

Both default to False; passing both True raises ValidationError.

The with_fields() method allows you to control exactly which fields are fetched and hydrated, improving performance by avoiding unnecessary computation.

# Only fetch specific fields
products = await Product.with_fields("id", "name", "price").all()

Fields in Fullfinity fall into two categories:

Field TypeStorageBehavior
Stored fieldsIn databaseAlways available after query
Non-stored calculated fields (store=False)Computed at runtimeMust be hydrated
class Product(Model):
name = Char(max_length=255) # Stored - always available
price = Float() # Stored - always available
quantity = Integer() # Stored - always available
# Non-stored calculated field - requires hydration
margin = Float(calculate="calc_margin", store=False)
# Non-stored related field - requires hydration
currency_symbol = Char(related_field="currency__symbol", store=False)

By default, only root-level records have their non-stored calculated fields hydrated:

# Root records: all calculated fields hydrated ✓
products = await Product.filter().all()
print(product.margin) # Works - root level
# Nested records: calculated fields NOT hydrated ✗
order = await Order.filter(id=1).prefetch_related("lines__product").first()
print(order.lines[0].product.margin) # Raises AttributeError!

Accessing a non-hydrated calculated field returns a LazyRelation that raises on use or can be awaited:

# Option 1: Raises on direct use
order.total + 10 # RelationshipError: Calculated field 'total' on Order was not hydrated...
# Option 2: Await to fetch
total = await order.total # Fetches and returns the value

This fail-fast behavior prevents silent bugs from None values.

Use with_fields() with double-underscore notation to hydrate calculated fields on related records:

# Hydrate calculated fields on nested records
orders = await Order.filter().with_fields(
"total", # Root calculated field
"lines__subtotal", # O2M calculated field
"lines__product__margin", # Deeply nested calculated field
).all()
# Now you can access them
for order in orders:
print(order.total) # ✓
for line in order.lines:
print(line.subtotal) # ✓
print(line.product.margin) # ✓
ScenarioWhat Gets Hydrated
with_fields("name", "price")Only name and price on root records
with_fields("contact")Prefetch contact + hydrate display_name
with_fields("contact__email")Prefetch contact + hydrate display_name + email
with_fields("lines")Prefetch lines (O2M) + hydrate display_name on each
with_fields("lines__subtotal")Prefetch lines + hydrate display_name + subtotal
with_fields("tags")Prefetch tags (M2M) + hydrate display_name on each

:::info Key Points

  • Stored fields (like id, name) are always available - they come from the database
  • display_name is automatically included for all prefetched relations
  • Only non-stored calculated fields need explicit hydration via with_fields() :::
# Efficient: Only compute what's needed
orders = await Order.filter(state="confirmed").with_fields(
"name",
"total", # Calculated on Order
"lines__subtotal", # Calculated on OrderLine
).all()
for order in orders:
print(order.name) # ✓ Stored field (always available)
print(order.total) # ✓ Explicitly requested
for line in order.lines:
print(line.id) # ✓ Stored field (always available)
print(line.subtotal) # ✓ Explicitly requested
print(line.display_name) # ✓ Auto-included for relations
# Combine with filters and sorting
products = await Product.filter(active=True).with_fields(
"name", "margin", "stock_value"
).order_by("name ASC").limit(50).all()

Relations that haven’t been fetched return a LazyRelation sentinel:

order = await Order.filter(id=1).first() # No prefetch
order.lines # Returns LazyRelation sentinel
# Option 1: Raises on direct use
for line in order.lines: # RelationshipError!
pass
# Option 2: Await to fetch
lines = await order.lines # Fetches and returns the lines
for line in lines:
print(line.amount)
# Option 3: Use prefetch_related for bulk operations (recommended)
order = await Order.filter(id=1).prefetch_related("lines").first()
for line in order.lines: # Already loaded
print(line.amount)
Access PatternStored FieldsNon-stored CalculatedRelations
.all()✓ Available✓ Hydrated (root only)LazyRelation (await to fetch)
.with_fields("field")✓ Available✓ If requestedLazyRelation (await to fetch)
.with_fields("rel")✓ AvailableRoot only✓ + display_name
.with_fields("rel__field")✓ AvailableRoot only✓ + display_name + field
.prefetch_related("rel")✓ Available✓ Root only✓ + display_name
.skip_calculated_fields()✓ AvailableLazyRelation (await to fetch)LazyRelation (await to fetch)

For performance-critical queries where you only need raw database values, you can skip the computation of non-stored calculated fields:

# Fast existence check without computing display_name, etc.
exists = await Product.filter(identifier=ident).skip_calculated_fields().first()
# Use when you only need the record ID or stored fields
record = await Product.filter(code=code).skip_calculated_fields().first()
if record:
return record.id

What it skips:

  • Non-stored calculated fields (store=False, calculate="method")
  • Non-stored related fields (related_field="rel__field", store=False)

What it does NOT skip:

  • Stored computed fields (store=True) - already in database
  • Regular database columns

:::tip Performance Use skip_calculated_fields() for existence checks or simple lookups where you don’t need computed values. This can significantly improve performance when querying models with expensive calculated fields. :::

Within a request, the ORM maintains an identity map that ensures the same record ID always returns the same Python object instance. This provides consistency - if you modify a record and then fetch it again, you get your modified instance back.

In rare cases where you need fresh data from the database (ignoring any in-memory modifications), use bypass_cache():

# Normal behavior: returns cached/modified instance
invoice = await Invoice.filter(id=1).first()
invoice.name = "Modified"
invoice2 = await Invoice.filter(id=1).first()
print(invoice2.name) # "Modified" - same instance
# Bypass cache: fetch fresh from database
fresh = await Invoice.filter(id=1).bypass_cache().first()
print(fresh.name) # Original database value

All query methods can be chained:

orders = await Order.filter(
status__in=["pending", "confirmed"],
customer__country="US"
).order_by(
"created_at DESC"
).prefetch_related(
"customer",
"lines__product"
).limit(50).all()

Avoid N+1 queries:

# Without prefetch (N+1 problem)
orders = await Order.filter().all()
for order in orders:
await order.fetch_related("customer") # Extra query per order!
# With prefetch (efficient)
orders = await Order.filter().prefetch_related("customer").all()
for order in orders:
print(order.customer.name) # Already loaded
# Multiple relations
orders = await Order.filter().prefetch_related(
"customer",
"lines",
"lines__product"
).all()
# Nested relations
orders = await Order.filter().prefetch_related(
"customer__company__country"
).all()

All bulk operations go through the same create() and update() methods as single-record operations. Override these methods to intercept ALL creates/updates.

Insert multiple records in a single efficient query:

# Create multiple records at once
products = await Product.create([
{"name": "Product A", "price": 100, "active": True},
{"name": "Product B", "price": 200, "active": True},
{"name": "Product C", "price": 300, "active": False},
])
# With context (applied to all created instances)
products = await Product.create([...], ctx={"skip_variant": True})
# Returns list of created instances with IDs
for product in products:
print(f"Created: {product.id} - {product.name}")

The update() method works as both instance method and classmethod:

# Instance style - single record
await product.update(price=150, active=True)
# Instance style with context
await product.with_ctx(posting=True).update(state="Posted")
# Classmethod style - multiple records, ONE SQL query
await Product.update([product1, product2, product3], price=150, active=True)
# Classmethod style with context (applied to all records)
await Product.update(records, ctx={"posting": True}, state="Posted")

Update records matching a filter - delegates to Model.update():

# Update all matching records - goes through Model.update()
await Product.filter(category=5, active=True).update(price=100, featured=False)
# With Q expressions
await Product.filter(Q(stock=0) | Q(discontinued=True)).update(active=False)

The delete() method works as both instance method and classmethod:

# Instance style - single record
await product.delete()
# Classmethod style - multiple records
await Product.delete([product1, product2, product3])

Delete records matching a filter - delegates to Model.delete():

# Delete all matching records - goes through Model.delete()
await Product.filter(archived=True, expired_at__lt=cutoff_date).delete()
# With Q expressions
await Product.filter(Q(stock=0) & Q(discontinued=True)).delete()

Override create(), update(), and delete() to add custom logic that runs for ALL operations. No decorators needed — the metaclass auto-wraps these three lifecycle methods:

class Product(Model):
async def create(cls, records):
# records is ALWAYS a list (normalized by metaclass)
# Context is available on each record via record._ctx
for record in records:
record['audit_created_by'] = get_current_user()
if record._ctx and record._ctx.get('skip_audit'):
record.pop('audit_created_by', None)
return await super().create(records)
async def update(cls, records, **vals):
# This runs for ALL updates (instance, bulk, and QuerySet)
# Context is available on each record via record._ctx
if 'state' in vals and vals['state'] == 'posted':
for record in records:
if not record._ctx.get('skip_validation'):
await record.validate_can_post()
return await super().update(records, **vals)
async def delete(cls, records):
# This runs for ALL deletes (instance, bulk, and QuerySet)
for record in records:
if record.state == "posted":
raise UserError("Cannot delete posted records")
return await super().delete(records)

For complex queries that can’t be expressed with the ORM, use raw SQL via the environment:

from fullfinity.engine.context import env_ctx
env = env_ctx.get()
# Fetch multiple rows
results = await env.execute_sql(
"SELECT * FROM product WHERE price > $1 AND category_id = $2",
100, category_id
)
for row in results:
print(row["name"], row["price"])
# Fetch single row
product = await env.execute_sql_one(
"SELECT * FROM product WHERE id = $1", product_id
)
if product:
print(product["name"])
# Fetch scalar value (count, sum, etc.)
total = await env.execute_sql_scalar(
"SELECT SUM(quantity) FROM stock_quant WHERE product_id = $1", product_id
)
# Batch operations (efficient insert/update)
await env.execute_sql_many(
"INSERT INTO tag (name, color) VALUES ($1, $2)",
[("Sale", "red"), ("New", "green"), ("Featured", "blue")]
)
MethodDescriptionReturns
execute_sql(query, *args)Fetch multiple rowsList[Record]
execute_sql_one(query, *args)Fetch single rowRecord or None
execute_sql_scalar(query, *args)Fetch single valueAny
execute_sql_many(query, args_list)Batch insert/updateNone

Generate SQL without executing:

query, params = Product.filter(active=True).order_by("name ASC").to_sql()
print(f"Query: {query}")
print(f"Params: {params}")
from fullfinity.engine.Q import Q
async def search_products(
search_term=None,
category_id=None,
min_price=None,
max_price=None,
tags=None,
page=1,
per_page=20,
sort_by="name",
sort_order="ASC"
):
# Build base query
query = Product.filter(active=True)
# Apply filters
if search_term:
query = query.filter(
Q(name__icontains=search_term) |
Q(description__icontains=search_term) |
Q(sku__icontains=search_term)
)
if category_id:
query = query.filter(category=category_id)
if min_price:
query = query.filter(price__gte=min_price)
if max_price:
query = query.filter(price__lte=max_price)
if tags:
query = query.filter(tags__id__in=tags)
# Get total count
total = await query.count()
# Apply sorting
query = query.order_by(f"{sort_by} {sort_order}")
# Apply pagination
offset = (page - 1) * per_page
products = await query.offset(offset).limit(per_page).prefetch_related(
"category",
"tags"
).all()
return {
"items": products,
"total": total,
"page": page,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page
}

Q-expression strings (views & record rules)

Section titled “Q-expression strings (views & record rules)”

In Python code you build Q objects directly. But some places author a Q as a string of data rather than code — view modifiers (filter / visible / readonly / required), security record rules, and name_search filters. These strings are turned into a Q by a single evaluator, eval_q, which parses with Python’s ast and walks the result against a strict whitelist — it never executes arbitrary Python. The accepted grammar is a frozen contract (see core/tests/test_q_expression_grammar.py); only these constructs are allowed:

ConstructExample
Q(...) conditionsQ(state='Posted'), Q(amount__gt=0), Q(tag__in=['a','b'])
The D(...) date helperQ(due__lt=D('today', days=-30))
Connectors & | ~Q(active=True) & ~Q(state='Done')
Nested Q positionalsQ(Q(a=1) | Q(b=2), c=3)
Context names (supplied per surface)Q(user__id=uid), Q(company=cid)
Attribute on a context nameQ(partner=self.partner_id) (name_search)

Which context names are available depends on the surface: record rules expose uid / cid / company_id / cids / contact_id; dashboard/stat filters expose uid / cid / D; name_search exposes term and self (the form’s sibling field values). Anything else — arbitrary function calls, attribute chains onto values, comprehensions, subscripts, arithmetic, unknown names — raises QParseError and nothing runs.

Canonical form. Write equality bare and use Python literals:

"Q(state='Posted')" # canonical — eq is the default operator
"Q(state__eq='Posted')" # accepted, but the __eq is redundant
"Q(active=True)" # Python True/False/None
"Q(active=true)" # accepted (lowercase alias), prefer True