Version: 1.1.0

Event Bus

Quark can publish a lifecycle event after each single-row Create, Update, and Delete. Register an EventBus implementation with Client.UseEventBus, and every committed write emits a typed event that you can route to a logger, OpenTelemetry, or an external broker (NATS, Kafka, or Redis Streams).

client.UseEventBus(quark.NewLoggerEventBus(slog.Default()))

// Every committed Create/Update/Delete now publishes an event.
quark.For[Order](ctx, client).Create(&order) // → "created" event

The interfaces

type Event interface {
    Kind() string  // "created" | "updated" | "deleted"
    Table() string
    Payload() any  // the model value involved in the operation
}

type EventBus interface {
    Publish(ctx context.Context, event Event) error
}

Payload() returns the model pointer the CRUD operation acted on, so a subscriber can use a type assertion (or a type switch) to recover the concrete struct:

func (b *myBus) Publish(ctx context.Context, e quark.Event) error {
    if o, ok := e.Payload().(*Order); ok && e.Kind() == "created" {
        return b.broker.Publish(ctx, "orders.created", o)
    }
    return nil
}

In-tree implementations

Two reference buses ship with Quark:

  • quark.NewLoggerEventBus(logger) — writes each event to an slog.Logger at Info level. A nil logger falls back to slog.Default(). Never returns an error.

  • quark.NewOTelEventBus(logger) — writes a correlation-tagged log record (event=quark.event.emit) suited to OpenTelemetry log/trace bridging. It deliberately does not pull the OTel SDK into the core package; for first-class spans, implement EventBus yourself.

    Requires an slog→OTel bridge

    OTelEventBus does not emit a span on its own — it writes an slog record tagged for correlation. The OTel linkage only happens if your process installs an slog handler that bridges to OpenTelemetry (e.g. go.opentelemetry.io/contrib/bridges/otelslog). Without that bridge the record stays in slog only. If you need a real span emitted per event, implement EventBus against your tracer directly.

Both are starting points. Production deployments implement EventBus against their broker of choice.

Delivery semantics

Best-effort, synchronous, no outbox

Quark does not ship a transactional outbox in v0.9. Events are published synchronously after the write commits. If the process crashes between the commit and the publish, the event is lost — the data is durable but the event never went out. If you need exactly-once or guaranteed delivery, write your own outbox table and publish from a poller, or make your subscriber idempotent and accept at-least-once. See ADR-0013.

When the write runs inside an explicit transaction (client.Tx + ForTx[T]), the publish is registered via Tx.OnCommit: it fires after the commit is durable and is discarded on rollback — a rolled-back write never emits an event.

When the write runs outside a transaction (For[T] against the Client), the publish runs inline immediately after the statement.

What does and doesn't emit

Events fire from the per-row CRUD methods: Create (created), Update / UpdateFields (updated), and Delete (deleted).

  • Bulk and WHERE-based methods do NOT emit: CreateBatch, UpdateBatch, DeleteBatch, DeleteBy, and UpdateMap have no per-row *T to attach to the event, so they are out of scope for v0.9. If you need events for bulk work, loop the single-row methods.
  • Emission is not gated on rows affected. Update / Delete publish their event whenever the call succeeds, even if the WHERE matched zero rows. This is deliberate: "rows affected = 0" is ambiguous across engines (MySQL reports 0 for a no-change update that did match a row; PostgreSQL reports the match count), so gating on it would make emission engine-dependent. Treat an event as "the operation was attempted and committed", not "a row was definitely mutated". Subscribers that need the exact count should carry it in the payload themselves.

When Publish fails

The write is already persisted; an emit failure never rolls anything back. How the failure surfaces depends on the path:

| Path | On Publish error |
| --- | --- |
| Non-transactional (For[T]) | The CRUD method returns the error wrapped in quark.ErrEventEmitFailed. The row stays written: do NOT retry the write; retry the emit or rely on subscriber idempotency. |
| Transactional (ForTx[T] in Client.Tx) | Logged with event=quark.event.emit_failure. NOT propagated: the commit already returned success, so there is no return value left to carry it. |

if err := quark.For[Order](ctx, client).Create(&order); err != nil {
    if errors.Is(err, quark.ErrEventEmitFailed) {
        // The order IS saved. Only the event failed to publish.
        // Re-publish or enqueue for retry; do NOT re-create.
    } else {
        // The write itself failed.
    }
}

Connecting an external broker (skeleton)

// Needs: context, encoding/json, github.com/nats-io/nats.go
type NATSBus struct{ nc *nats.Conn }

func (b *NATSBus) Publish(ctx context.Context, e quark.Event) error {
    subject := "quark." + e.Table() + "." + e.Kind()
    data, err := json.Marshal(e.Payload())
    if err != nil {
        return err
    }
    // Core NATS publish is fire-and-forget (at-most-once);
    // publish through JetStream if you need at-least-once delivery.
    return b.nc.Publish(subject, data)
}

client.UseEventBus(&NATSBus{nc: nc})

A Kafka or Redis Streams bus follows the same shape: serialize e.Payload(), derive a topic/subject from e.Table() + e.Kind(), hand off to the broker.
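
The broker-independent part of that shape (derive a subject, serialize the payload) can be factored out, as in the sketch below. The envelope layout, the JSON field names, and the helpers subjectFor and encode are assumptions made for illustration. Quark does not prescribe a wire format.

```go
package main

import "encoding/json"

// Event is a local stand-in that mirrors the quark.Event interface.
type Event interface {
    Kind() string
    Table() string
    Payload() any
}

// Order is a hypothetical model.
type Order struct {
    ID int `json:"id"`
}

// rowEvent is a hypothetical Event implementation.
type rowEvent struct {
    kind, table string
    payload     any
}

func (e rowEvent) Kind() string  { return e.kind }
func (e rowEvent) Table() string { return e.table }
func (e rowEvent) Payload() any  { return e.payload }

// envelope is a hypothetical wire format that carries the event metadata
// alongside the payload, so consumers do not have to parse the subject.
type envelope struct {
    Kind    string          `json:"kind"`
    Table   string          `json:"table"`
    Payload json.RawMessage `json:"payload"`
}

// subjectFor derives "<prefix>.<table>.<kind>", matching the NATS skeleton.
func subjectFor(prefix string, e Event) string {
    return prefix + "." + e.Table() + "." + e.Kind()
}

// encode serializes the payload first, then wraps it in the envelope.
func encode(e Event) ([]byte, error) {
    payload, err := json.Marshal(e.Payload())
    if err != nil {
        return nil, err
    }
    return json.Marshal(envelope{Kind: e.Kind(), Table: e.Table(), Payload: payload})
}
```

For a created Order with ID 42 and the prefix "quark", subjectFor yields quark.orders.created and encode yields {"kind":"created","table":"orders","payload":{"id":42}}.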

Not the same as LISTEN/NOTIFY

EventBus is the outbound CRUD-event side. The inbound PostgreSQL LISTEN/NOTIFY listener (ListenerFactory.CreateListener, the helper Notify) is a separate, unrelated mechanism that pins a dedicated connection from the pool.

Inbound listener (PostgreSQL only)

ListenerFactory.CreateListener returns a listener that consumes PostgreSQL NOTIFY messages. It pins one connection from the Client pool (acquired lazily on the first Listen) for the listener's lifetime, because a LISTEN registration lives on the physical connection and the pool rotates connections freely (ADR-0019). Every non-PostgreSQL dialect returns ErrDialectNotSupported.

listener, err := quark.NewListenerFactory(client).CreateListener()
if err != nil {
    return err // ErrDialectNotSupported on non-PostgreSQL dialects
}
defer listener.Close()

if err := listener.Listen(ctx, "orders"); err != nil {
    return err
}

// Emit from anywhere (a pooled connection, a trigger, another service):
// quark.Notify(ctx, client, "orders", `{"id":42}`)

for {
    payload, err := listener.Receive(ctx) // blocks until a NOTIFY arrives
    if err != nil {
        return err // ctx cancelled, or the connection dropped; reconnect
    }
    log.Printf("channel=%s payload=%s", payload.Channel, payload.Payload)
}

The listener returns these sentinel errors (match with errors.Is):

| Sentinel | Returned by | Condition |
| --- | --- | --- |
| ErrDialectNotSupported | CreateListener | the Client dialect is not PostgreSQL |
| ErrNoSubscription | Receive | called before any Listen (no channel subscribed) |
| ErrListenerClosed | Listen / Unlisten / Receive | called after Close; create a fresh listener |

Single-goroutine, no durable delivery

The listener is single-goroutine: Listen/Unlisten/Receive/Close are serialized over the one pinned connection. Receive blocks holding that connection, so you cannot Listen or Close from another goroutine while a Receive is in flight — register every channel first, then loop Receive in one goroutine. To stop, cancel the Receive context, then Close.

PostgreSQL LISTEN/NOTIFY is fire-and-forget: notifications emitted while the connection is down (network blip, failover, server restart) are lost — there is no durable buffer or replay. On a connection error Receive returns it; reconnect with a fresh CreateListener + Listen and reconcile state yourself. This is not a substitute for a durable queue.

The listener holds one connection out of MaxOpenConns for its whole lifetime. For a long-lived listener, size the pool accordingly or give it its own Client.

Renamed in v0.9.0

The v0.8.0 placeholder struct EventBus (a LISTEN/NOTIFY factory that only ever returned ErrDialectNotSupported) was renamed to ListenerFactory to free the EventBus name for the interface documented here, and NewEventBus became NewListenerFactory. The struct was non-functional, so no working code breaks; see docs/MIGRATION_v0.9.0.md.