Skip to content

Realtime Rails (WebSocket)

The React app holds one WebSocket to the server (/api/ws) for all live updates: in-app notifications, the collaborate chat, presence, import progress, and the “this record changed — reload?” prompt. This page describes that transport and the two things every feature reuses instead of hand-rolling: the notification primitive and the record-watch flow.

Delivery is cross-worker via Valkey pub/sub (fullfinity/engine/websocket.py). Each worker holds its own clients dict (access_token → socket); to reach a user we publish an envelope to the ws:fanout channel and whichever worker holds the target socket delivers it locally. So send_* helpers work regardless of which worker the recipient is connected to.

Every frame the client receives is one of three types, routed in app/src/components/WebSocketProvider.jsx:

typePurpose
notificationa plain server toast (title/message/icon)
activity_updatein-app notification frames, routed by data.kind (see below)
realtime_datachat / typing / presence / import-progress / client-action frames

An activity_update frame is not one thing — it carries a data.kind so each source is handled on its own terms. Adding a new source is a new case, not surgery on a shared block:

kindClient handling
message (default)bump the bell badge, live-append to an open record’s chatter (recordMessageBus), and toast who · what
record_messagelive-append to an open record’s chatter only (recordMessageBus) — no bell/toast. Pushed to the record’s viewers (the watch registry) so an open Form updates even when the viewer doesn’t follow it (e.g. an agent handling a live chat).
record_changedhand to recordChangeBus; the open Form offers a reload (no bell, no generic toast)

This is the API a third-party module uses to make a message reach people. When a user receives a message (chatter post, @mention, follower notification, helpdesk reply, live chat), every notifier builds the same shape so the toast, the bell, and Web Push are consistent. That shape is Message.notification_payload() (fullfinity/modules/core/models/collaborate.py):

{
"kind": "message",
"model": "...", "document": 123, "message_id": 45,
"title": "Jane Doe", # who
"body": "Can you take a look?" # what (first 140 chars, HTML stripped)
}

The transport is the existing send_activity_update(user_id, payload)do not invent a parallel notify channel.

The one call you usually want: message.notify_record_followers()

Section titled “The one call you usually want: message.notify_record_followers()”

If your module creates a message with a bare Message.create(...) (a reply box, an inbound webhook, an outbound email, a bot post) and you want it to land in the right people’s badge / unread inbox / live toast, call one method right after creating it:

msg = (await get_model("Message").create(
content=body, author=contact, message_type="Message",
model="HelpdeskTicket", document=ticket_id,
))[0]
await msg.notify_record_followers() # ← resolves audience, pushes WS + Web Push, records the inbox rows

notify_record_followers() (on Message) does the whole job: it resolves the audience (record followers ∪ the message’s @mentions, minus the author), respects each recipient’s notification preference, sends the WS push and Web Push, and links the recipients into notified_contacts — the rows that drive the per-recipient unread inbox (see below). It is the authored-message analogue of the inbound-mail path’s set_notified_contacts_from_email.

Call it exactly once per message. It both notifies and records the inbox rows, so a second call re-pushes a duplicate toast. Create the message at exactly one site and notify there; don’t also notify from a lifecycle seam on the same message (the helpdesk seam, for instance, deliberately does not notify — the create site owns it).

The chatter quick-reply and the @mention compose box don’t need this — they go through Message.create_message(..., send=True), which already runs the same audience + notify logic. notify_record_followers() is for the bare-create paths that bypass create_message.

Audience helpers (when you need to go lower-level)

Section titled “Audience helpers (when you need to go lower-level)”

Audience is per-event, not one global list — for a message it’s the record’s followers ∪ the message’s @mentions:

  • Message.record_followers(model, document) → the standing followers half (a Followers row per contact). Build a record’s audience from this; add a follower with Followers.create(model=..., document=..., contact=...).
  • set_notified_contacts(message, contacts, author, model, document) → the engine that filters by preference, pushes WS + Web Push, and returns the contact ids to link. notify_record_followers wraps this; reach for it directly only if you’ve already resolved a custom contact list.
  • send_activity_update(user.id, await message.notification_payload()) → the raw transport, for a one-off ping to a specific user (e.g. a domain event that isn’t a record message). Reuse notification_payload() for the shape; never hand-build a parallel envelope.

Make the right people followers. A notification only reaches someone who is a follower (or @mentioned). So a module that “owns” a record for a user should make that user a follower — e.g. helpdesk auto-follows a ticket’s assigned_user (_ensure_assignee_follows, run on assignment), which is what puts the agent in the badge/inbox for every future message, with no per-message special-casing.

Per-recipient read state & the unified unread inbox

Section titled “Per-recipient read state & the unified unread inbox”

Read state for document messages is per recipient, not a single global flag. The row that already exists for “this contact was notified about this message” — FkMessageNotifiedContact (the notified_contacts junction populated by set_notified_contacts / notify_record_followers) — carries a read boolean. So “unread for me” is simply:

FkMessageNotifiedContact.filter(contact=my_contact_id, read=False)

That one query is the single definition of the messages inbox — the bell badge count and the Messages popover list both derive from it (no more badge-counts-X / popover-lists-Y mismatch). Because notified_contacts is already followers ∪ mentioned ∪ explicit recipients, this inbox folds in the “following” dimension for free.

  • Count (GET /api/activity-stream/unread-count) → FkMessageNotifiedContact.filter(contact=me, read=False).count().
  • List (POST /api/activity-stream with unread_only: true) → the messages on those unread rows, newest first. Without the flag the feed returns read+unread and stamps each item’s read from the same junction.
  • Mark read (POST /api/activity-stream/read, auth="internal") — scope is one of: {model, document} (mark every message on a record read — the Form dwell trigger), {message_id} (one message — the popover tick), or {all: true}. It resolves the target message ids and runs FkMessageNotifiedContact.filter(contact=me, message__in=ids, read=False).update(read=True), returning {updated: N}. No scope → no-op.

When messages get marked read (frontend). Two triggers, both in the React app:

  • DwellForm.jsx starts a 10s timer on a saved record’s Form (same gate as record-watch: not transient, not window-mode, real id) and calls markRecordRead(model, id) once it fires; the timer is cleared on unmount / id change so quick navigations don’t mark read.
  • Manual tick — each row in the Messages popover has a check ActionIcon calling markMessageRead(id), which persists and drops the row from the unread list.

NotificationContext (app/src/contexts/NotificationContext.jsx) owns markRecordRead, markMessageRead, and markAllMessagesRead; the API wrappers are in app/src/api/activityStream.js.

Presence is a TTL heartbeat, not a stored field. On WebSocket connect a per-user key presence:{db}:{user_id} is set in Valkey (db 11) with a ~35s TTL, refreshed on each client heartbeat (~20s) and deleted on disconnect (mark_online/refresh_presence/mark_offline in fullfinity/engine/websocket.py). A lapsed key is “offline”.

Presence is workspace-wide, not scoped to shared channels. On connect/disconnect, broadcast_presence(user_id, online) pushes a realtime_data frame ({event: "presence", user_id, online}) to every other user currently online in the same DB — it resolves that audience by scanning the per-DB presence keyspace (online_user_ids(db)), so a teal presence dot stays live on a user’s avatar anywhere it appears, not just inside a chat they share. Tenant isolation comes from the db in the key prefix.

A newly-connected client seeds its presence map from GET /api/chat/presence (returns the currently-online user ids) so dots are correct immediately, before any live frame arrives; pushed frames keep it live thereafter. The client store is presence in app/src/contexts/ChatContext.jsx (user_id → bool).

Avatars consume this through the single avatar component BaseAvatar (app/src/components/ui/fields/display/AvatarWidget.jsx): pass model="User" + recordId and it renders the live dot and opens a 1:1 direct-message floating widget on click (your own avatar shows its dot but isn’t click-to-chat). Only User avatars subscribe to chat state; every other avatar stays a plain presentational component.

When someone edits a record another user has open, that user’s Form prompts them to reload. No polling — it rides the same WS.

Backend. The viewer registry lives in Valkey (fullfinity/engine/websocket.py):

  • watch_record / unwatch_record — add/remove a viewer’s token to watch:{db}:{model}:{doc} (a Set). A per-token reverse index (watchidx:{db}:{token}) lets a disconnect clean up without scanning. A stale token is harmless — delivery filters to locally-held sockets — so the keys carry only a long backstop TTL; disconnect + unwatch frames do the real cleanup.
  • notify_records_changed(model, documents, exclude_token=None) — pings every watcher of any of those records except the editor’s own session, using one pipelined SMEMBERS for the whole batch (cost independent of batch size). notify_record_changed is the single-record convenience over it.
  • schedule_record_change_pings(model, documents, editor_user_id) — the entry point Model.update calls. It is fire-and-forget: it spawns a background task (resolving the editor’s token over the async client and excluding it) so the write never waits on Valkey for this best-effort ping. It’s called at the end of Model.update (the chokepoint all updates, including QuerySet.update, route through) and skipped for internal compute-cascade churn. Keeping it off the write’s critical path is deliberate: an awaited, per-row, sync-client version would add a blocking round-trip to every update; don’t reintroduce that.

A per-db gate keeps even the background work near-zero when nobody’s watching. A single presence key (watchers_active:{db}) is lit by watch_record and refreshed on each heartbeat while a client still has a Form open (refresh_watchers_presence), so it auto-expires shortly after the last watcher leaves — no member-set bookkeeping, no remove-races. Each worker caches the EXISTS result for ~2s (_db_has_watchers), so a burst of writes does at most one gate check per window. A stale “yes” just does a harmless empty lookup; a stale “no” can delay pings to a just-opened form by at most the cache TTL, then self-heals.

The WS receive loop (fullfinity/engine/api/api.py) handles inbound watch / unwatch frames and calls clear_watches on disconnect.

Frontend. Three small modules under app/src/utils/:

  • recordWatch.jswatchRecord(model, id) / unwatchRecord(model, id) send the frames and track the active set; setWatchSender (wired in WebSocketProvider) replays the active watches on every reconnect, since the server forgets them on disconnect.
  • recordChangeBus.jssubscribeRecordChange(cb) / emitRecordChange(payload), mirroring recordMessageBus.

Form.jsx watches while mounted on a saved record (not transient, not a window-mode sub-form), unwatches on unmount/navigation, and on a matching record_changed shows a dirty-aware toast:

  • Clean form → a single Reload (setViewReload(true) refetches the record).
  • Unsaved editsSave & reload (triggerFormSubmit()) or Discard & reload (setViewReload(true), dropping local edits for the server copy).
  1. Build the payload with a kind (reuse notification_payload() for message-shaped events).
  2. Push it over send_activity_update / publish_to_tokens — never a new socket or channel.
  3. If the client needs distinct handling, add a case to the activity_update switch in WebSocketProvider.jsx and feed an app-wide bus (like recordChangeBus) rather than reaching into a component directly.

Live-chat visitor widget (cross-origin, cookieless)

Section titled “Live-chat visitor widget (cross-origin, cookieless)”

The embeddable live-chat widget is not the React app and has no cookie, but it rides the same fanout. It opens wss://host/livechat/ws?token=<session>&db=<db> — the live-chat session token is the bearer (validated against LiveChatSession, no cookie), and the socket is registered in clients[session_token]. So publish_to_token(session_token, …) reaches the visitor cross-worker exactly like a user token — no fanout changes.

  • Receive (push-only): on connect the server sends {type: "history", messages: […]}, then pushes each new public message as {type: "message", message: {…}}. The push originates from a MessageLivechat(__inherit__="Message") seam (modules/helpdesk_livechat/models/message_livechat.py) that fires on every public Message on a ticket — so agent-console replies and ticket-chatter replies both reach the visitor, and the visitor’s own message echoes back (its is_agent is computed per session). The widget dedupes by message id.
  • Send (HTTP): the visitor POSTs messages/attachments (durable) — never over the WS — matching the internal “POST to persist, WS to deliver” split. There is no polling anywhere.
  • Presence: the visitor’s open WebSocket is the presence signal — touch_seen on connect + on each heartbeat frame (~25s), clear_presence on disconnect (drops the Valkey seen-key immediately). The agent’s avatar dot reads that key.
  • Typing: a typing frame over the WS → mark_typing (a short-TTL Valkey key the agent reads).
  • The DB context isn’t set by request middleware here, so the handler wraps each ORM op in DatabaseManager().transaction(db, None) + elevate() (the visitor is anonymous).