Version: 0.10.0

Caching and Observability

Quark's extension points sit around the normal database/sql execution path. You can add a query cache, middleware, observers, lifecycle hooks, and OpenTelemetry without changing model code.

Query Caching

Attach a CacheStore to the client, then enable caching per query:

import (
    "time"

    "github.com/jcsvwinston/quark"
    "github.com/jcsvwinston/quark/cache/memory"
)

store := memory.New()
defer store.Close()

client, err := quark.New("postgres", dsn,
    quark.WithCacheStore(store),
)

users, err := quark.For[User](ctx, client).
    Where("active", "=", true).
    Limit(100).
    Cache(5*time.Minute).
    List()

The cache key includes the dialect, tenant ID, schema, SQL string, and bound arguments. A cached query returns the decoded []T without hitting the database.
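To make the key composition concrete, here is a minimal sketch of how a key could be derived from those five inputs. The `keyParts` struct, the `cacheKey` function, and the SHA-256 encoding are all assumptions made for illustration; Quark's actual key format is not described on this page.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// keyParts mirrors the inputs listed in the text. The struct and its field
// names are hypothetical, not Quark's internal types.
type keyParts struct {
	Dialect  string
	TenantID string
	Schema   string
	SQL      string
	Args     []any
}

// cacheKey hashes every part with a length prefix so that adjacent fields
// cannot run together and yield the same key for different queries.
func cacheKey(p keyParts) string {
	h := sha256.New()
	for _, s := range []string{p.Dialect, p.TenantID, p.Schema, p.SQL} {
		fmt.Fprintf(h, "%d:%s|", len(s), s)
	}
	for _, a := range p.Args {
		s := fmt.Sprintf("%T=%v", a, a) // include the type so 1 and "1" differ
		fmt.Fprintf(h, "%d:%s|", len(s), s)
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	base := keyParts{
		Dialect:  "postgres",
		TenantID: "t1",
		Schema:   "public",
		SQL:      "SELECT * FROM users WHERE active = $1 LIMIT 100",
		Args:     []any{true},
	}
	other := base
	other.TenantID = "t2"

	fmt.Println(cacheKey(base) == cacheKey(base))  // same inputs give the same key
	fmt.Println(cacheKey(base) == cacheKey(other)) // a different tenant gives a different key
}
```

Including the tenant ID and schema in the key is what keeps one tenant's cached rows from being served to another, even when the SQL text and arguments are identical.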

Tags and Invalidation

Cache(ttl) automatically tags the entry with the model table name. Writes invalidate the table tag after successful Exec operations.

// Tagged as "users" automatically.
users, err := quark.For[User](ctx, client).
    Cache(5*time.Minute).
    List()

// Invalidates "users".
err = quark.For[User](ctx, client).Create(&newUser)

When you pass custom tags, include the table tag yourself if you still want automatic write invalidation to catch the entry:

users, err := quark.For[User](ctx, client).
    Where("active", "=", true).
    Cache(5*time.Minute, "users", "users:active").
    List()

_ = store.InvalidateTags(ctx, "users:active")

Without the "users" tag, a write to the users table will not know that your custom tag represents user data.

Stampede protection

Every CacheStore installed via WithCacheStore is wrapped automatically with three in-process protections (ADR-0011):

  • Singleflight: N concurrent callers for the same key collapse to one compute. The others wait on the result. A miss never produces a database stampede on a hot key.
  • TTL jitter: every Set randomises the TTL by ±jitterPct (default ±10%), so batch-warmed entries don't all expire in the same instant.
  • Probabilistic early refresh (XFetch): near expiry, Get may probabilistically signal "refresh me now"; the value is recomputed while the cached copy is still valid, flattening the load curve.

The wrapper is installed without changes to the underlying store (memory.Store, redis.Store, or any third-party CacheStore) and without changes to the public CacheStore interface. The wrapper itself cannot be opted out of (the protections are "all or nothing", per the cache playbook), although the tuning knobs below can set jitter or XFetch to zero.

client, _ := quark.New("pgx", dsn,
    quark.WithCacheStore(memory.New()),
    quark.WithCacheJitter(0.2),     // ±20% TTL jitter (default 0.1)
    quark.WithCacheXFetchBeta(0.5), // tune XFetch aggressiveness
)

Tuning knobs:

| Option | Default | Notes |
| WithCacheJitter(pct) | 0.1 | Range [0, 1]; values outside are clamped. 0 disables jitter; singleflight and XFetch stay on. |
| WithCacheXFetchBeta(β) | 1.0 | Range β ≥ 0. Higher β triggers earlier refresh. β = 0 disables XFetch; singleflight and jitter stay on. |

Known limitation: the singleflight is in-process only. In a multi-replica deployment, each of the N processes may still compute the same hot key once. That is much less severe than an in-process stampede, but real. A successor ADR would add a distributed-lock hook if cross-instance demand appears.

Memory Store

import "github.com/jcsvwinston/quark/cache/memory"

store := memory.New()
defer store.Close()

The memory store is thread-safe and keeps a reverse index from tag to cache keys. It has a cleanup loop that evicts expired entries roughly once per minute. It is process-local, so it is ideal for tests, single-process services, and short TTLs.

Redis Store

import rediscache "github.com/jcsvwinston/quark/cache/redis"

store := rediscache.New(rediscache.Options{
    Addr:     "localhost:6379",
    Password: "",
    DB:       0,
})

if err := store.Ping(ctx); err != nil {
    return err
}

client, err := quark.New("postgres", dsn,
    quark.WithCacheStore(store),
)

The Redis store uses keys prefixed with quark:cache: and Redis sets prefixed with quark:tag: for tag invalidation.

CacheStore Interface

type CacheStore interface {
    Get(ctx context.Context, key string) ([]byte, error)
    Set(ctx context.Context, key string, val []byte, ttl time.Duration, tags ...string) error
    Delete(ctx context.Context, key string) error
    InvalidateTags(ctx context.Context, tags ...string) error
}

Implement this interface when you need another backend, such as Memcached, Ristretto, an encrypted cache, or a tenant-aware distributed cache.

Lifecycle Hooks

Hooks live on the model:

func (u *User) BeforeCreate(ctx context.Context) error {
    now := time.Now()
    u.CreatedAt = now
    u.UpdatedAt = now
    return nil
}

func (u *User) AfterDelete(ctx context.Context) error {
    audit.FromContext(ctx).Record("user.deleted", u.ID)
    return nil
}

Available hooks:

| Hook | Typical use |
| BeforeCreate | timestamps, default values, derived fields |
| AfterCreate | audit events, domain outbox rows |
| BeforeUpdate | timestamps, optimistic checks |
| AfterUpdate | cache hints, audit events |
| BeforeDelete | authorization checks, audit metadata |
| AfterDelete | cleanup and outbox events |

Hooks run for entity-level Create, Update, Delete, and HardDelete. Bulk operations should keep side effects in service-level orchestration.

Middleware

Middleware wraps SQL execution. It can wrap multi-row queries, single-row queries, and exec statements independently.

type LogMiddleware struct {
    quark.BaseMiddleware
}

func (m *LogMiddleware) WrapExec(next quark.ExecFunc) quark.ExecFunc {
    return func(ctx context.Context, exec quark.Executor, sqlStr string, args []any) (sql.Result, error) {
        start := time.Now()
        res, err := next(ctx, exec, sqlStr, args)
        log.Printf("exec duration=%s sql=%s err=%v", time.Since(start), sqlStr, err)
        return res, err
    }
}

client, err := quark.New("postgres", dsn,
    quark.WithMiddleware(&LogMiddleware{}),
)

Middleware is executed in registration order, with the first registered middleware wrapping the later ones.

Query Observers

Observers receive a QueryEvent after execution:

type MetricsObserver struct{}

func (o *MetricsObserver) ObserveQuery(e quark.QueryEvent) {
    metrics.RecordDatabaseQuery(
        e.Table,
        e.Operation,
        e.Duration,
        e.Rows,
        e.Error,
    )
}

client, err := quark.New("postgres", dsn,
    quark.WithQueryObserver(&MetricsObserver{}),
)

QueryEvent fields:

| Field | Type | Description |
| SQL | string | SQL sent to the driver. |
| Args | []any | Bound arguments. |
| Duration | time.Duration | Execution duration measured by Quark. |
| Rows | int64 | Rows returned or affected when known. |
| Error | error | Error observed at execution time. |
| Table | string | Model table when available. |
| Operation | string | Examples: SELECT, EXEC, QUERY_ROW, RAW_QUERY, RAW_EXEC. |

Observers are a good fit for metrics, structured logging, query sampling, and auditing. Redact sensitive arguments before exporting them.
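A minimal sketch of redaction inside an observer follows. The `QueryEvent` struct here is a local stand-in that mirrors the documented fields so the example runs on its own; `redactArgs` and `logObserver` are hypothetical helpers, and replacing each value with its type name is just one possible policy.

```go
package main

import (
	"fmt"
	"time"
)

// QueryEvent is a local stand-in mirroring the documented fields.
type QueryEvent struct {
	SQL       string
	Args      []any
	Duration  time.Duration
	Rows      int64
	Error     error
	Table     string
	Operation string
}

// redactArgs returns a copy in which every value is replaced by its type
// name, so logs keep the shape of the arguments without the data.
func redactArgs(args []any) []any {
	out := make([]any, len(args))
	for i, a := range args {
		if a == nil {
			out[i] = "<nil>"
			continue
		}
		out[i] = fmt.Sprintf("<%T>", a)
	}
	return out
}

// logObserver collects one redacted log line per event.
type logObserver struct {
	lines []string
}

func (o *logObserver) ObserveQuery(e QueryEvent) {
	o.lines = append(o.lines, fmt.Sprintf("op=%s table=%s dur=%s sql=%q args=%v",
		e.Operation, e.Table, e.Duration, e.SQL, redactArgs(e.Args)))
}

func main() {
	o := &logObserver{}
	o.ObserveQuery(QueryEvent{
		SQL:       "SELECT * FROM users WHERE email = $1 AND age > $2",
		Args:      []any{"ada@example.com", 30},
		Duration:  3 * time.Millisecond,
		Table:     "users",
		Operation: "SELECT",
	})
	fmt.Println(o.lines[0]) // the args appear as [<string> <int>]
}
```

The helper copies rather than mutates, so other observers that receive the same event still see the original arguments.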

Slow query logging

A quick way to catch regressions before any external tooling: configure a threshold and let Quark log slow operations through the Client logger.

client, _ := quark.New("pgx", dsn,
    quark.WithSlowQueryThreshold(100*time.Millisecond),
)

Every operation that exceeds the threshold emits a structured WARN with duration_ms, threshold_ms, operation, table, rows, and sql. The SQL is logged in parameterised form; bind arguments are never included. A threshold of 0 disables slow query logging. See the Observability reference for the full field list.

OpenTelemetry

import quarkotel "github.com/jcsvwinston/quark/otel"

client, err := quark.New("postgres", dsn,
    quark.WithMiddleware(quarkotel.New(
        quarkotel.WithDBSystem("postgres"),
    )),
)

Spans

The OTel middleware creates spans named:

| Execution path | Span name |
| ExecContext | quark.exec |
| QueryContext | quark.query |
| QueryRowContext | quark.query_row |

Spans include db.statement (parameterised SQL — bind arguments are not attached by default, see span redaction below) and db.operation. Errors from Exec and Query are recorded on the span. QueryRow driver errors surface later during Scan, so the current middleware cannot observe every QueryRow scan error.

Span redaction

Bind arguments are redacted from spans by default. Opt in to IncludeArgs only for local debugging:

quarkotel.New(quarkotel.WithSpanRedaction(quarkotel.IncludeArgs))

See the Observability API reference for the full contract.

Metrics

The middleware also emits three OTel metrics instruments on the github.com/jcsvwinston/quark meter:

| Instrument | Kind | Unit | Notes |
| quark.queries.total | counter | | Every operation increments. |
| quark.queries.duration | histogram | ms | Wall-clock duration including middleware overhead. |
| quark.queries.rows | histogram | | Rows affected (Exec only). SELECT does not emit (counting rows would require wrapping *sql.Rows). |

Data points carry db.operation and — when WithDBSystem is set — db.system.

Install your MeterProvider before the first query; the middleware resolves it lazily from the OTel global provider, same as the tracer.

PostgreSQL Notifications

Quark exposes a small notification helper:

err := quark.Notify(ctx, client, "user_events", `{"type":"signup","id":123}`)

Notify validates the channel name and currently supports PostgreSQL through pg_notify. The inbound ListenerFactory.CreateListener (renamed from EventBus in v0.9.0) returns ErrDialectNotSupported; LISTEN/NOTIFY is out of scope for Phase 5. For outbound CRUD lifecycle events, see the Event Bus guide (Client.UseEventBus).

Production Combination

store := memory.New()
defer store.Close()

client, err := quark.New("postgres", dsn,
    quark.WithCacheStore(store),
    quark.WithMiddleware(quarkotel.New()),
    quark.WithMiddleware(&LogMiddleware{}),
    quark.WithQueryObserver(&MetricsObserver{}),
)

A practical ordering is:

  1. Use middleware for behavior around execution, such as tracing or retry.
  2. Use observers for post-execution telemetry.
  3. Use hooks for entity-specific lifecycle behavior.
  4. Use cache tags intentionally, especially when custom tags are introduced.