Event Bus
Quark can publish a lifecycle event after every Create,
Update, and Delete. Wire an EventBus implementation with
Client.UseEventBus and each committed write emits a typed event you
can route to a logger, OpenTelemetry, or an external broker
(NATS / Kafka / Redis Streams).
client.UseEventBus(quark.NewLoggerEventBus(slog.Default()))
// Every committed Create/Update/Delete now publishes an event.
quark.For[Order](ctx, client).Create(&order) // → "created" event
The interfaces
type Event interface {
Kind() string // "created" | "updated" | "deleted"
Table() string
Payload() any // the model value involved in the operation
}
type EventBus interface {
Publish(ctx context.Context, event Event) error
}
Payload() returns the model pointer the CRUD operation acted on, so
a subscriber can type-switch to recover the concrete struct:
func (b *myBus) Publish(ctx context.Context, e quark.Event) error {
if o, ok := e.Payload().(*Order); ok && e.Kind() == "created" {
return b.broker.Publish(ctx, "orders.created", o)
}
return nil
}
In-tree implementations
Two reference buses ship with Quark:
-
quark.NewLoggerEventBus(logger)— writes each event to anslog.Loggerat Info level. A nil logger falls back toslog.Default(). Never returns an error. -
quark.NewOTelEventBus(logger)— writes a correlation-tagged log record (event=quark.event.emit) suited to OpenTelemetry log/trace bridging. It deliberately does not pull the OTel SDK into the core package; for first-class spans, implementEventBusyourself.Requires an slog→OTel bridgeOTelEventBusdoes not emit a span on its own — it writes an slog record tagged for correlation. The OTel linkage only happens if your process installs an slog handler that bridges to OpenTelemetry (e.g.go.opentelemetry.io/contrib/bridges/otelslog). Without that bridge the record stays in slog only. If you need a real span emitted per event, implementEventBusagainst your tracer directly.
Both are starting points. Production deployments implement EventBus
against their broker of choice.
Delivery semantics
Quark does not ship a transactional outbox in v0.9. Events are published synchronously after the write commits. If the process crashes between the commit and the publish, the event is lost — the data is durable but the event never went out. If you need exactly-once or guaranteed delivery, write your own outbox table and publish from a poller, or make your subscriber idempotent and accept at-least-once. See ADR-0013.
When the write runs inside an explicit transaction
(client.Tx + ForTx[T]), the publish is registered via
Tx.OnCommit:
it fires after the commit is durable and is discarded on
rollback — a rolled-back write never emits an event.
When the write runs outside a transaction (For[T] against the
Client), the publish runs inline immediately after the statement.
What does and doesn't emit
Events fire from the per-row CRUD methods: Create (created),
Update / UpdateFields (updated), and Delete (deleted).
- Bulk and WHERE-based methods do NOT emit:
CreateBatch,UpdateBatch,DeleteBatch,DeleteBy, andUpdateMaphave no per-row*Tto attach to the event, so they are out of scope for v0.9. If you need events for bulk work, loop the single-row methods. - Emission is not gated on rows affected.
Update/Deletepublish their event whenever the call succeeds, even if theWHEREmatched zero rows. This is deliberate: "rows affected = 0" is ambiguous across engines (MySQL reports 0 for a no-change update that did match a row; PostgreSQL reports the match count), so gating on it would make emission engine-dependent. Treat an event as "the operation was attempted and committed", not "a row was definitely mutated". Subscribers that need the exact count should carry it in the payload themselves.
When Publish fails
The write is already persisted; an emit failure never rolls anything back. How the failure surfaces depends on the path:
| Path | On Publish error |
|---|---|
Non-transactional (For[T]) | The CRUD method returns the error wrapped in quark.ErrEventEmitFailed. The row stays written — do NOT retry the write; retry the emit or rely on subscriber idempotency. |
Transactional (ForTx[T] in Client.Tx) | Logged with event=quark.event.emit_failure. NOT propagated — the commit already returned success, so there is no return value left to carry it. |
if err := quark.For[Order](ctx, client).Create(&order); err != nil {
if errors.Is(err, quark.ErrEventEmitFailed) {
// The order IS saved. Only the event failed to publish.
// Re-publish or enqueue for retry; do NOT re-create.
} else {
// The write itself failed.
}
}
Connecting an external broker (skeleton)
type NATSBus struct{ nc *nats.Conn }
func (b *NATSBus) Publish(ctx context.Context, e quark.Event) error {
subject := "quark." + e.Table() + "." + e.Kind()
data, err := json.Marshal(e.Payload())
if err != nil {
return err
}
return b.nc.Publish(subject, data) // at-least-once; NATS handles delivery
}
client.UseEventBus(&NATSBus{nc: nc})
A Kafka or Redis Streams bus follows the same shape: serialize
e.Payload(), derive a topic/subject from e.Table() + e.Kind(),
hand off to the broker.
Not the same as LISTEN/NOTIFY
EventBus is the outbound CRUD-event side. The inbound
PostgreSQL LISTEN/NOTIFY listener (ListenerFactory.CreateListener,
the helper Notify) is a separate, unrelated mechanism that needs a
dedicated connection outside the pool.
The event bus delivers outbound events only (your code reacts to
Quark's CRUD inside the same process). There is no inbound listener:
ListenerFactory.CreateListener returns ErrDialectNotSupported on every
dialect today. If you need to consume PostgreSQL LISTEN/NOTIFY (a
dedicated pgx.Conn that survives the pool, with reconnect/backpressure
semantics), drive it yourself outside Quark for now. Inbound support is
planned post-v1.0 — it is a tracked v1.0 known limitation, not a bug.
The v0.8.0 placeholder struct EventBus (a LISTEN/NOTIFY factory that
only ever returned ErrDialectNotSupported) was renamed to
ListenerFactory to free the EventBus name for the interface
documented here. NewEventBus → NewListenerFactory. The struct was
non-functional, so no working code breaks; see
docs/MIGRATION_v0.9.0.md.