Saltar al contenido principal
Version: 0.11.0

Event Bus

Quark can publish a lifecycle event after every Create, Update, and Delete. Wire an EventBus implementation with Client.UseEventBus and each committed write emits a typed event you can route to a logger, OpenTelemetry, or an external broker (NATS / Kafka / Redis Streams).

client.UseEventBus(quark.NewLoggerEventBus(slog.Default()))

// Every committed Create/Update/Delete now publishes an event.
quark.For[Order](ctx, client).Create(&order) // → "created" event

The interfaces

type Event interface {
Kind() string // "created" | "updated" | "deleted"
Table() string
Payload() any // the model value involved in the operation
}

type EventBus interface {
Publish(ctx context.Context, event Event) error
}

Payload() returns the model pointer the CRUD operation acted on, so a subscriber can type-switch to recover the concrete struct:

func (b *myBus) Publish(ctx context.Context, e quark.Event) error {
if o, ok := e.Payload().(*Order); ok && e.Kind() == "created" {
return b.broker.Publish(ctx, "orders.created", o)
}
return nil
}

In-tree implementations

Two reference buses ship with Quark:

  • quark.NewLoggerEventBus(logger) — writes each event to an slog.Logger at Info level. A nil logger falls back to slog.Default(). Never returns an error.

  • quark.NewOTelEventBus(logger) — writes a correlation-tagged log record (event=quark.event.emit) suited to OpenTelemetry log/trace bridging. It deliberately does not pull the OTel SDK into the core package; for first-class spans, implement EventBus yourself.

    Requires an slog→OTel bridge

    OTelEventBus does not emit a span on its own — it writes an slog record tagged for correlation. The OTel linkage only happens if your process installs an slog handler that bridges to OpenTelemetry (e.g. go.opentelemetry.io/contrib/bridges/otelslog). Without that bridge the record stays in slog only. If you need a real span emitted per event, implement EventBus against your tracer directly.

Both are starting points. Production deployments implement EventBus against their broker of choice.

Delivery semantics

At-least-once, synchronous, no outbox

Quark does not ship a transactional outbox in v0.9. Events are published synchronously after the write commits. If the process crashes between the commit and the publish, the event is lost — the data is durable but the event never went out. If you need exactly-once or guaranteed delivery, write your own outbox table and publish from a poller, or make your subscriber idempotent and accept at-least-once. See ADR-0013.

When the write runs inside an explicit transaction (client.Tx + ForTx[T]), the publish is registered via Tx.OnCommit: it fires after the commit is durable and is discarded on rollback — a rolled-back write never emits an event.

When the write runs outside a transaction (For[T] against the Client), the publish runs inline immediately after the statement.

What does and doesn't emit

Events fire from the per-row CRUD methods: Create (created), Update / UpdateFields (updated), and Delete (deleted).

  • Bulk and WHERE-based methods do NOT emit: CreateBatch, UpdateBatch, DeleteBatch, DeleteBy, and UpdateMap have no per-row *T to attach to the event, so they are out of scope for v0.9. If you need events for bulk work, loop the single-row methods.
  • Emission is not gated on rows affected. Update / Delete publish their event whenever the call succeeds, even if the WHERE matched zero rows. This is deliberate: "rows affected = 0" is ambiguous across engines (MySQL reports 0 for a no-change update that did match a row; PostgreSQL reports the match count), so gating on it would make emission engine-dependent. Treat an event as "the operation was attempted and committed", not "a row was definitely mutated". Subscribers that need the exact count should carry it in the payload themselves.

When Publish fails

The write is already persisted; an emit failure never rolls anything back. How the failure surfaces depends on the path:

PathOn Publish error
Non-transactional (For[T])The CRUD method returns the error wrapped in quark.ErrEventEmitFailed. The row stays written — do NOT retry the write; retry the emit or rely on subscriber idempotency.
Transactional (ForTx[T] in Client.Tx)Logged with event=quark.event.emit_failure. NOT propagated — the commit already returned success, so there is no return value left to carry it.
if err := quark.For[Order](ctx, client).Create(&order); err != nil {
if errors.Is(err, quark.ErrEventEmitFailed) {
// The order IS saved. Only the event failed to publish.
// Re-publish or enqueue for retry; do NOT re-create.
} else {
// The write itself failed.
}
}

Connecting an external broker (skeleton)

type NATSBus struct{ nc *nats.Conn }

func (b *NATSBus) Publish(ctx context.Context, e quark.Event) error {
subject := "quark." + e.Table() + "." + e.Kind()
data, err := json.Marshal(e.Payload())
if err != nil {
return err
}
return b.nc.Publish(subject, data) // at-least-once; NATS handles delivery
}

client.UseEventBus(&NATSBus{nc: nc})

A Kafka or Redis Streams bus follows the same shape: serialize e.Payload(), derive a topic/subject from e.Table() + e.Kind(), hand off to the broker.

Not the same as LISTEN/NOTIFY

EventBus is the outbound CRUD-event side. The inbound PostgreSQL LISTEN/NOTIFY listener (ListenerFactory.CreateListener, the helper Notify) is a separate, unrelated mechanism that remains out of scope for Fase 5 — it needs a dedicated connection outside the pool. ListenerFactory.CreateListener returns ErrDialectNotSupported.

Renamed in v0.9.0

The v0.8.0 placeholder struct EventBus (a LISTEN/NOTIFY factory that only ever returned ErrDialectNotSupported) was renamed to ListenerFactory to free the EventBus name for the interface documented here. NewEventBusNewListenerFactory. The struct was non-functional, so no working code breaks; see docs/MIGRATION_v0.9.0.md.