Caching and Observability
Quark's extension points sit around the normal database/sql execution path.
You can add a query cache, middleware, observers, lifecycle hooks, and
OpenTelemetry without changing model code.
Query Caching
Attach a CacheStore to the client, then enable caching per query:
import (
"time"
"github.com/jcsvwinston/quark"
"github.com/jcsvwinston/quark/cache/memory"
)
store := memory.New()
defer store.Close()
client, err := quark.New("postgres", dsn,
quark.WithCacheStore(store),
)
users, err := quark.For[User](ctx, client).
Where("active", "=", true).
Limit(100).
Cache(5*time.Minute).
List()
The cache key includes the dialect, tenant ID, schema, SQL string, and bound
arguments. A cached query returns the decoded []T without hitting the database.
Tags and Invalidation
Cache(ttl) automatically tags the entry with the model table name and
the per-row tag <table>:<pk> for any rows the query touches. Writes
invalidate both tags after successful Exec operations: Create /
Update / UpdateFields / Tracked.Save / Delete register the
affected PK on top of the table tag.
// Tagged as "users" and (after the read) "users:42" automatically.
user, err := quark.For[User](ctx, client).
Cache(5*time.Minute).
First(42)
// Invalidates the table tag "users" AND the row tag "users:42".
err = quark.For[User](ctx, client).
UpdateFields(&User{ID: 42, Name: "Alice"}, "name")
The per-row tag (<table>:<pk>) was added in v0.8 (F4-6) so a single-row
write does not blow away every cached query on the table. Composite PKs
join with :. Bulk and WHERE-based methods (UpdateBatch,
DeleteBatch, DeleteBy) invalidate the table tag because the affected
PKs are not enumerable.
On Redis, the tag-set TTL takes the MAX across writes via
ExpireNX + ExpireGT (Redis 7+) — older entries can't shorten the
window that a newer entry needs.
When you pass custom tags, include the table tag yourself if you still want automatic write invalidation to catch the entry:
users, err := quark.For[User](ctx, client).
Where("active", "=", true).
Cache(5*time.Minute, "users", "users:active").
List()
_ = store.InvalidateTags(ctx, "users:active")
Without the "users" tag, a write to the users table will not know that your
custom tag represents user data.
Stampede protection
:::warning In-process only — cross-instance is post-v1.0 The protections below (singleflight, jitter, XFetch) are per-process. In a multi-replica deployment each replica collapses its own concurrent callers, but N replicas can still each compute the same hot key once — much less severe than an in-process stampede, but real. Cross-instance coordination (an optional distributed-lock hook) is a tracked v1.0 known limitation, planned post-v1.0 (ADR-0011 §"Cuándo reabrir"). :::
Every CacheStore installed via WithCacheStore is wrapped automatically
with three in-process protections (ADR-0011):
- Singleflight —
Nconcurrent callers for the same key collapse to one compute. The others wait on the result. A miss never produces a database stampede on a hot key. - TTL jitter — every
Setrandomises the TTL by ±jitterPct(default±10%), so batch-warmed entries don't all expire in the same instant. - Probabilistic early refresh (XFetch) — near expiry,
Getmay signal refresh me now probabilistically; the value is recomputed while the cached copy is still valid, flattening the load curve.
The wrapper is installed without changes to the underlying store
(memory.Store, redis.Store, or any third-party CacheStore) and
without changes to the public CacheStore interface — opt-out is not
possible (the protections are "all or nothing", per the cache playbook).
client, _ := quark.New("pgx", dsn,
quark.WithCacheStore(memory.New()),
quark.WithCacheJitter(0.2), // ±20% TTL jitter (default 0.1)
quark.WithCacheXFetchBeta(0.5), // tune XFetch aggressiveness
)
Tuning knobs:
| Option | Default | Notes |
|---|---|---|
WithCacheJitter(pct) | 0.1 | Range [0, 1]; values outside are clamped. 0 disables jitter, singleflight + XFetch stay on. |
WithCacheXFetchBeta(β) | 1.0 | Range β ≥ 0. Higher β triggers earlier refresh. β = 0 disables XFetch, singleflight + jitter stay on. |
(The in-process scope of these protections is called out in the warning at the top of this section.)
Memory Store
import "github.com/jcsvwinston/quark/cache/memory"
store := memory.New()
defer store.Close()
The memory store is thread-safe and keeps a reverse index from tag to cache keys. It has a cleanup loop that evicts expired entries roughly once per minute. It is process-local, so it is ideal for tests, single-process services, and short TTLs.
Redis Store
import rediscache "github.com/jcsvwinston/quark/cache/redis"
store := rediscache.New(rediscache.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
if err := store.Ping(ctx); err != nil {
return err
}
client, err := quark.New("postgres", dsn,
quark.WithCacheStore(store),
)
The Redis store uses keys prefixed with quark:cache: and Redis sets prefixed
with quark:tag: for tag invalidation.
CacheStore Interface
type CacheStore interface {
Get(ctx context.Context, key string) ([]byte, error)
Set(ctx context.Context, key string, val []byte, ttl time.Duration, tags ...string) error
Delete(ctx context.Context, key string) error
InvalidateTags(ctx context.Context, tags ...string) error
}
Implement this interface when you need another backend, such as Memcached, Ristretto, an encrypted cache, or a tenant-aware distributed cache.
Lifecycle Hooks
Hooks live on the model:
func (u *User) BeforeCreate(ctx context.Context) error {
now := time.Now()
u.CreatedAt = now
u.UpdatedAt = now
return nil
}
func (u *User) AfterDelete(ctx context.Context) error {
audit.FromContext(ctx).Record("user.deleted", u.ID)
return nil
}
Available hooks:
| Hook | Typical use |
|---|---|
BeforeCreate | timestamps, default values, derived fields |
AfterCreate | audit events, domain outbox rows |
BeforeUpdate | timestamps, optimistic checks |
AfterUpdate | cache hints, audit events |
BeforeDelete | authorization checks, audit metadata |
AfterDelete | cleanup and outbox events |
Hooks run for entity-level Create, Update, Delete, and HardDelete. Bulk
operations should keep side effects in service-level orchestration.
Middleware
Middleware wraps SQL execution. It can wrap multi-row queries, single-row queries, and exec statements independently.
type LogMiddleware struct {
quark.BaseMiddleware
}
func (m *LogMiddleware) WrapExec(next quark.ExecFunc) quark.ExecFunc {
return func(ctx context.Context, exec quark.Executor, sqlStr string, args []any) (sql.Result, error) {
start := time.Now()
res, err := next(ctx, exec, sqlStr, args)
log.Printf("exec duration=%s sql=%s err=%v", time.Since(start), sqlStr, err)
return res, err
}
}
client, err := quark.New("postgres", dsn,
quark.WithMiddleware(&LogMiddleware{}),
)
Middleware is executed in registration order, with the first registered middleware wrapping the later ones.
Query Observers
Observers receive a QueryEvent after execution:
type MetricsObserver struct{}
func (o *MetricsObserver) ObserveQuery(e quark.QueryEvent) {
metrics.RecordDatabaseQuery(
e.Table,
e.Operation,
e.Duration,
e.Rows,
e.Error,
)
}
client, err := quark.New("postgres", dsn,
quark.WithQueryObserver(&MetricsObserver{}),
)
QueryEvent fields:
| Field | Type | Description |
|---|---|---|
SQL | string | SQL sent to the driver. |
Args | []any | Bound arguments. |
Duration | time.Duration | Execution duration measured by Quark. |
Rows | int64 | Rows returned or affected when known. |
Error | error | Error observed at execution time. |
Table | string | Model table when available. |
Operation | string | Examples: SELECT, EXEC, QUERY_ROW, RAW_QUERY, RAW_EXEC. |
Observers are a good fit for metrics, structured logging, query sampling, and auditing. Redact sensitive arguments before exporting them.
Slow query logging
A quick way to catch regressions before any external tooling: configure a
threshold and let Quark log slow operations through the Client logger.
client, _ := quark.New("pgx", dsn,
quark.WithSlowQueryThreshold(100*time.Millisecond),
)
Every operation that exceeds the threshold emits a structured WARN with
duration_ms, threshold_ms, operation, table, rows and sql —
parameterised, bind args never included. Threshold 0 disables. See the
Observability reference
for the full field list.
OpenTelemetry
import quarkotel "github.com/jcsvwinston/quark/otel"
client, err := quark.New("postgres", dsn,
quark.WithMiddleware(quarkotel.New(
quarkotel.WithDBSystem("postgres"),
)),
)
Spans
The OTel middleware creates spans named:
| Execution path | Span name |
|---|---|
ExecContext | quark.exec |
QueryContext | quark.query |
QueryRowContext | quark.query_row |
Spans include db.statement (parameterised SQL — bind arguments are not
attached by default, see span redaction below) and db.operation. Errors
from Exec and Query are recorded on the span. QueryRow driver errors
surface later during Scan, so the current middleware cannot observe every
QueryRow scan error.
Span redaction
Bind arguments are redacted from spans by default. Opt in to IncludeArgs
only for local debugging:
quarkotel.New(quarkotel.WithSpanRedaction(quarkotel.IncludeArgs))
See the Observability API reference for the full contract.
Metrics
The middleware also emits three OTel metrics instruments on the
github.com/jcsvwinston/quark meter:
| Instrument | Kind | Unit | Notes |
|---|---|---|---|
quark.queries.total | counter | — | Every operation increments. |
quark.queries.duration | histogram | ms | Wall-clock duration including middleware overhead. |
quark.queries.rows | histogram | — | Rows affected — Exec only. SELECT does not emit (counting rows would require wrapping *sql.Rows). |
Data points carry db.operation and — when WithDBSystem is set — db.system.
Install your MeterProvider before the first query; the middleware resolves
it lazily from the OTel global provider, same as the tracer.
PostgreSQL Notifications
Quark exposes a small notification helper:
err := quark.Notify(ctx, client, "user_events", `{"type":"signup","id":123}`)
Notify validates the channel name and currently supports PostgreSQL through
pg_notify. The inbound ListenerFactory.CreateListener (renamed from
EventBus in v0.9.0) returns ErrDialectNotSupported — LISTEN/NOTIFY is out
of scope for Fase 5. For outbound CRUD lifecycle events, see the
Event Bus guide (Client.UseEventBus).
Production Combination
store := memory.New()
defer store.Close()
client, err := quark.New("postgres", dsn,
quark.WithCacheStore(store),
quark.WithMiddleware(quarkotel.New()),
quark.WithMiddleware(&LogMiddleware{}),
quark.WithQueryObserver(&MetricsObserver{}),
)
A practical ordering is:
- Use middleware for behavior around execution, such as tracing or retry.
- Use observers for post-execution telemetry.
- Use hooks for entity-specific lifecycle behavior.
- Use cache tags intentionally, especially when custom tags are introduced.