feat(webapp,run-engine): mollifier drainer replay + stale sweep + cancelled-run engine API#3754
Conversation
🦋 Changeset detectedLatest commit: 1be3361 The changes in this PR will be included in the next version bump. This PR includes changesets to release 32 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
✅ Files skipped from review due to trivial changes (1)
📜 Recent review details⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (21)
🧰 Additional context used📓 Path-based instructions (10)**/*.{ts,tsx}📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Files:
{packages/core,apps/webapp}/**/*.{ts,tsx}📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Files:
**/*.{ts,tsx,js,jsx}📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Files:
**/*.ts📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)
Files:
apps/webapp/**/*.{ts,tsx}📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Files:
apps/webapp/**/*.server.ts📄 CodeRabbit inference engine (apps/webapp/CLAUDE.md)
Files:
**/*.{js,jsx,ts,tsx,json,md,yml,yaml}📄 CodeRabbit inference engine (AGENTS.md)
Files:
**/*.{test,spec}.{ts,tsx}📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Files:
apps/webapp/**/*.test.{ts,tsx}📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Files:
**/*.test.{ts,tsx,js,jsx}📄 CodeRabbit inference engine (AGENTS.md)
Files:
🧠 Learnings (11)📚 Learning: 2026-03-22T13:26:12.060ZApplied to files:
📚 Learning: 2026-03-22T19:24:14.403ZApplied to files:
📚 Learning: 2026-05-18T08:21:27.694ZApplied to files:
📚 Learning: 2026-05-18T08:21:27.694ZApplied to files:
📚 Learning: 2026-03-29T19:16:28.864ZApplied to files:
📚 Learning: 2026-05-05T09:38:02.512ZApplied to files:
📚 Learning: 2026-05-12T21:04:05.815ZApplied to files:
📚 Learning: 2026-05-14T08:21:07.614ZApplied to files:
📚 Learning: 2026-05-07T12:25:18.271ZApplied to files:
📚 Learning: 2026-05-28T20:02:10.647ZApplied to files:
📚 Learning: 2026-05-18T14:40:02.173ZApplied to files:
🔇 Additional comments (9)
WalkthroughThis PR implements a terminal-failure callback mechanism for the mollifier drainer that ensures SYSTEM_FAILURE rows are persisted even when retryable database errors exhaust retries. It adds a durable Redis-backed stale-entry sweep that scans buffered entries across orgs/envs, reports OpenTelemetry metrics, and stores per-env stale counts. The changes extend run-engine with createCancelledRun and a configurable emitRunFailedEvent flag, wire drainer handlers into the webapp, and add tests and worker initialization for both features. Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
|
626a8dc to
af7368e
Compare
31f4726 to
b05929b
Compare
5a7bc19 to
baa6f17
Compare
b05929b to
b89da52
Compare
74fdf6d to
c6fa61f
Compare
Two bugs flagged by Devin on PR #3754: 1. entry.server.tsx reverted to \`void sessionsReplicationInstance;\`, which esbuild tree-shakes under \`"sideEffects": false\`. Restored the globalThis assignment + warning comment from #3738 (incident TRI-9864). Without this the sessions→ClickHouse logical replication slot stops being consumed at boot. 2. createFailedTaskRun unconditionally emitted \`runFailed\`, which the \`completeFailedRunEvent\` listener uses to write a span completion into ClickHouse. But TriggerFailedTaskService.call() already wraps createFailedTaskRun inside \`repository.traceEvent({ incomplete: false, isError: true })\` which writes its own completion row for the same (traceId, spanId). Two completions racing on the same span row is a real observability bug. Added an \`emitRunFailedEvent: boolean = true\` opt-out. The TriggerFailedTaskService.call() path now passes \`false\` and enqueues \`PerformTaskRunAlertsService\` directly after the trace event closes so the alerts side of \`runFailed\` is preserved. \`callWithoutTraceEvents\` and the mollifier drainer's terminal- failure path keep the default emit (they have no outer trace event managing the span). Regression test pins the opt-out: \`emitRunFailedEvent: false\` writes the PG row but does NOT fire the bus event. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two bugs flagged by Devin on PR #3754: 1. entry.server.tsx reverted to \`void sessionsReplicationInstance;\`, which esbuild tree-shakes under \`"sideEffects": false\`. Restored the globalThis assignment + warning comment from #3738 (incident TRI-9864). Without this the sessions→ClickHouse logical replication slot stops being consumed at boot. 2. createFailedTaskRun unconditionally emitted \`runFailed\`, which the \`completeFailedRunEvent\` listener uses to write a span completion into ClickHouse. But TriggerFailedTaskService.call() already wraps createFailedTaskRun inside \`repository.traceEvent({ incomplete: false, isError: true })\` which writes its own completion row for the same (traceId, spanId). Two completions racing on the same span row is a real observability bug. Added an \`emitRunFailedEvent: boolean = true\` opt-out. The TriggerFailedTaskService.call() path now passes \`false\` and enqueues \`PerformTaskRunAlertsService\` directly after the trace event closes so the alerts side of \`runFailed\` is preserved. \`callWithoutTraceEvents\` and the mollifier drainer's terminal- failure path keep the default emit (they have no outer trace event managing the span). Regression test pins the opt-out: \`emitRunFailedEvent: false\` writes the PG row but does NOT fire the bus event. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
01f3958 to
449a0bc
Compare
242ba73 to
6a8404d
Compare
449a0bc to
ffe51b8
Compare
Two bugs flagged by Devin on PR #3754: 1. entry.server.tsx reverted to \`void sessionsReplicationInstance;\`, which esbuild tree-shakes under \`"sideEffects": false\`. Restored the globalThis assignment + warning comment from #3738 (incident TRI-9864). Without this the sessions→ClickHouse logical replication slot stops being consumed at boot. 2. createFailedTaskRun unconditionally emitted \`runFailed\`, which the \`completeFailedRunEvent\` listener uses to write a span completion into ClickHouse. But TriggerFailedTaskService.call() already wraps createFailedTaskRun inside \`repository.traceEvent({ incomplete: false, isError: true })\` which writes its own completion row for the same (traceId, spanId). Two completions racing on the same span row is a real observability bug. Added an \`emitRunFailedEvent: boolean = true\` opt-out. The TriggerFailedTaskService.call() path now passes \`false\` and enqueues \`PerformTaskRunAlertsService\` directly after the trace event closes so the alerts side of \`runFailed\` is preserved. \`callWithoutTraceEvents\` and the mollifier drainer's terminal- failure path keep the default emit (they have no outer trace event managing the span). Regression test pins the opt-out: \`emitRunFailedEvent: false\` writes the PG row but does NOT fire the bus event. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
6a8404d to
bc9f4e2
Compare
ffe51b8 to
7ddb17d
Compare
Two bugs flagged by Devin on PR #3754: 1. entry.server.tsx reverted to \`void sessionsReplicationInstance;\`, which esbuild tree-shakes under \`"sideEffects": false\`. Restored the globalThis assignment + warning comment from #3738 (incident TRI-9864). Without this the sessions→ClickHouse logical replication slot stops being consumed at boot. 2. createFailedTaskRun unconditionally emitted \`runFailed\`, which the \`completeFailedRunEvent\` listener uses to write a span completion into ClickHouse. But TriggerFailedTaskService.call() already wraps createFailedTaskRun inside \`repository.traceEvent({ incomplete: false, isError: true })\` which writes its own completion row for the same (traceId, spanId). Two completions racing on the same span row is a real observability bug. Added an \`emitRunFailedEvent: boolean = true\` opt-out. The TriggerFailedTaskService.call() path now passes \`false\` and enqueues \`PerformTaskRunAlertsService\` directly after the trace event closes so the alerts side of \`runFailed\` is preserved. \`callWithoutTraceEvents\` and the mollifier drainer's terminal- failure path keep the default emit (they have no outer trace event managing the span). Regression test pins the opt-out: \`emitRunFailedEvent: false\` writes the PG row but does NOT fire the bus event. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
637e8c0 to
65219db
Compare
4229f9a to
4f31074
Compare
Two bugs flagged by Devin on PR #3754: 1. entry.server.tsx reverted to \`void sessionsReplicationInstance;\`, which esbuild tree-shakes under \`"sideEffects": false\`. Restored the globalThis assignment + warning comment from #3738 (incident TRI-9864). Without this the sessions→ClickHouse logical replication slot stops being consumed at boot. 2. createFailedTaskRun unconditionally emitted \`runFailed\`, which the \`completeFailedRunEvent\` listener uses to write a span completion into ClickHouse. But TriggerFailedTaskService.call() already wraps createFailedTaskRun inside \`repository.traceEvent({ incomplete: false, isError: true })\` which writes its own completion row for the same (traceId, spanId). Two completions racing on the same span row is a real observability bug. Added an \`emitRunFailedEvent: boolean = true\` opt-out. The TriggerFailedTaskService.call() path now passes \`false\` and enqueues \`PerformTaskRunAlertsService\` directly after the trace event closes so the alerts side of \`runFailed\` is preserved. \`callWithoutTraceEvents\` and the mollifier drainer's terminal- failure path keep the default emit (they have no outer trace event managing the span). Regression test pins the opt-out: \`emitRunFailedEvent: false\` writes the PG row but does NOT fire the bus event. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
65219db to
ccdcd9c
Compare
4f31074 to
e56b937
Compare
ccdcd9c to
5f50940
Compare
39ff4de to
af9bf63
Compare
44286e0 to
ac7fb86
Compare
Two bugs flagged by Devin on PR #3754: 1. entry.server.tsx reverted to \`void sessionsReplicationInstance;\`, which esbuild tree-shakes under \`"sideEffects": false\`. Restored the globalThis assignment + warning comment from #3738 (incident TRI-9864). Without this the sessions→ClickHouse logical replication slot stops being consumed at boot. 2. createFailedTaskRun unconditionally emitted \`runFailed\`, which the \`completeFailedRunEvent\` listener uses to write a span completion into ClickHouse. But TriggerFailedTaskService.call() already wraps createFailedTaskRun inside \`repository.traceEvent({ incomplete: false, isError: true })\` which writes its own completion row for the same (traceId, spanId). Two completions racing on the same span row is a real observability bug. Added an \`emitRunFailedEvent: boolean = true\` opt-out. The TriggerFailedTaskService.call() path now passes \`false\` and enqueues \`PerformTaskRunAlertsService\` directly after the trace event closes so the alerts side of \`runFailed\` is preserved. \`callWithoutTraceEvents\` and the mollifier drainer's terminal- failure path keep the default emit (they have no outer trace event managing the span). Regression test pins the opt-out: \`emitRunFailedEvent: false\` writes the PG row but does NOT fire the bus event. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…on terminal failure Two Devin review findings on PR #3754, both real and unresolved: 1. Sharded stale sweep's counts hash never cleared for fully-drained envs — gauge stayed permanently elevated, false-alerting the recommended `> 0 for 5m` rule. Root cause: when an env's last buffered entry is popped, the buffer's atomic Lua removes the env from `mollifier:org-envs:${orgId}` (and removes the org from `mollifier:orgs` if it has no other envs). The sweep's inner loop walks `buffer.listEnvsForOrg(orgId)`, so the env disappears from the iteration entirely — `setEnvStaleCount(envId, 0)` (which HDELs the field) is never called, and the counts hash retains the env's last-known stale count forever. Fix (Devin's Approach 2): cycle-bounded reconciliation. Add a Redis SET `mollifier:stale_sweep:visited` that the sweep SADDs into for every env it touches. When the cursor wraps (cycle complete), `reconcileVisited()` does `HKEYS counts → SMEMBERS visited → HDEL the difference → DEL visited`. Pipelined; orphans clear within at most one full cursor cycle of the env going quiet, which matches the sharding contract's existing one-cycle freshness window. Test: "evicts fully-drained envs from the counts hash at cycle wrap" — accepts an entry, sweep flags it stale, pops the entry (env vanishes from listEnvsForOrg), runs another sweep that triggers wrap, asserts the env is HDEL'd from both the snapshot and the underlying counts hash. 2. Drainer handler's terminal SYSTEM_FAILURE write dropped the snapshot's `batch` field. If the buffered run was part of a batch, the failure row wasn't associated with the batch and the batch parent's completion tracking could hang indefinitely waiting on a child that landed but isn't visible to the batch. Fix: extract `snapshot.batch` with structural type guards and pass it through to `createFailedTaskRun`. Same defensive pattern as the other snapshot fields in this code path (the snapshot is typed `Record<string, unknown>` because it came from cjson-decoded buffer payload). Test: "propagates the batch association into createFailedTaskRun" — asserts the call site receives `{ id, index }` from the snapshot. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…Redis Devin review on PR #3754: `stop()` previously called `deps.state.close()` immediately after `clearInterval`, but `tick()` only checks `stopped` at entry. A tick that was already past that guard would keep making `state.*` calls against an ioredis client that `stop()` had already `quit()`ed — those calls would throw, the tick's own try/catch would swallow them as `mollifier.stale_sweep.failed` warnings, and every graceful shutdown would emit spurious noise. Track the current tick promise as `currentTick`. `stop()` awaits it (if present) before invoking `state.close()`, so the tick's last state call lands BEFORE the Redis client is quit. The tick's own try/catch handles the (unexpected) case where it rejects; the await in `stop()` is solely for ordering. Also drop the `instanceof MollifierStaleSweepState` guard around `state.close()` — `close()` is part of the `StaleSweepStateStore` contract, so unconditional invocation is correct. Test fake states implement `close()` as a no-op. Test: `stop() waits for an in-flight tick to finish before closing the state` — gates a fake state's `readCursor` on a Deferred, kicks off the interval, waits for the tick to start, then races `stop()` against the gate. Asserts the stop promise stays unresolved while the tick is mid-flight and that the tick's final state operation lands before `close()`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three unresolved Devin threads, all addressed in this commit. Committed
locally only — not pushed.
1. `callWithoutTraceEvents` was inheriting the new `emitRunFailedEvent`
default of `true`, so its `createFailedTaskRun` call would fire the
`runFailed` bus emit and the listener would write a ClickHouse
completion event row with empty `traceId`/`spanId` — orphan row,
directly contradicting the method's "without trace events" contract.
Pass `emitRunFailedEvent: false` and enqueue
`PerformTaskRunAlertsService` directly, mirroring the `call()`
pattern so customers' ERROR channels still see the failure.
2. The cjson empty-tags defense lived only on `createCancelledRun`, not
on `engine.trigger`. When the mollifier buffer's mutate-side Lua
re-serialises a payload (e.g. `append_tags` on a buffered run that
never had tags), an empty Lua table encodes as `{}` and decodes
back to a JS object — and the previous `tags.length === 0` check
passes that object straight to Prisma's `String[]` column.
Mirror the same `Array.isArray && tags.length > 0 ? tags : undefined`
guard `createCancelledRun` already uses. The defense is symmetric
with the existing tested case for createCancelledRun, so the same
contract holds for the trigger replay path.
3. `runCancelled` handler's `cancelRunEvent` lookup fails for
buffered-only runs (no primary trace event exists, since the
mollifier gate skipped `repository.traceEvent` for the
not-yet-materialised run). The handler's `tryCatch` swallowed the
error, but the systematic `[runCancelled] Failed to cancel run
event` log fired on every cancelled buffered run.
Add `emitRunCancelledEvent: boolean = true` to `createCancelledRun`
(symmetric with the existing `emitRunFailedEvent` flag on
`createFailedTaskRun`); drainer handler passes `false`. CANCELED PG
row still writes; only the trace-event mirror is skipped.
Tests:
- `RunEngine.createCancelledRun > emitRunCancelledEvent: false
suppresses the bus emit but still writes the CANCELED PG row` —
pins the new flag's semantics.
- `createDrainerHandler > calls createCancelledRun with
emitRunCancelledEvent: false (suppresses orphan trace-event log
noise)` — pins the call site's contract.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
af9bf63 to
4500f0c
Compare
…celled-run engine API The replay side of the mollifier: - DrainerHandler that reads buffered snapshots and replays them through engine.trigger to materialise PG rows. - RunEngine.createCancelledRun: new public method the handler uses to write CANCELED rows directly from snapshots (bypass queue + waitpoint, emit runCancelled). Tolerates cjson empty-table tags. - Drainer fairness: org → env rotation so a heavy env doesn't starve light ones in the same org. - Stale-entry sweep + telemetry + alertable gauge for stuck drainers. Both drainer and sweep default-off; nothing fires unless flagged on. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- `isRetryablePgError`: also accept `errorCode === "P1001"` so `PrismaClientInitializationError` (which surfaces P1001 on a different field than `PrismaClientKnownRequestError`) retries. - Drop `envId` from OTel metric labels on `mollifier.realtime_subscriptions.buffered`, `mollifier.stale_entries`, and the `mollifier.stale_entries.current` gauge. `envId` is a banned high-cardinality attribute; the structured warn log alongside each counter tick still carries envId for forensic drill-down. - Stale-sweep test name + comments now match the assertion shape (all three entries stale, not "two stale + one fresh"). - `RunEngine.createCancelledRun` P2002 path now requires the existing row's status to be CANCELED; a non-canceled conflict throws rather than silently reporting success, so the caller can route to `engine.cancelRun()` or skip. - Regression test pins the new conflict guard. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…leton Importing the production drainer wiring transitively loads \`~/v3/runEngine.server\`, whose top-level \`singleton(...)\` eagerly constructs a RunEngine. The constructor spins up Prisma + Redis workers that try to connect to localhost — in CI (no PG, no Redis) that produces an unhandled \`PrismaClientInitializationError\` which fails the run even though every assertion passes. Mock the runEngine and prisma modules so the unit test exercises only the bootstrap's error classification, not a live engine. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Container startup + the sweep loop can exceed Vitest's 5s default on
CI runners (passes in ~1.7-2s locally). Matches the explicit
\`{ timeout: 20_000 }\` other mollifier redisTests carry across the
project.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two bugs flagged by Devin on PR #3754: 1. entry.server.tsx reverted to \`void sessionsReplicationInstance;\`, which esbuild tree-shakes under \`"sideEffects": false\`. Restored the globalThis assignment + warning comment from #3738 (incident TRI-9864). Without this the sessions→ClickHouse logical replication slot stops being consumed at boot. 2. createFailedTaskRun unconditionally emitted \`runFailed\`, which the \`completeFailedRunEvent\` listener uses to write a span completion into ClickHouse. But TriggerFailedTaskService.call() already wraps createFailedTaskRun inside \`repository.traceEvent({ incomplete: false, isError: true })\` which writes its own completion row for the same (traceId, spanId). Two completions racing on the same span row is a real observability bug. Added an \`emitRunFailedEvent: boolean = true\` opt-out. The TriggerFailedTaskService.call() path now passes \`false\` and enqueues \`PerformTaskRunAlertsService\` directly after the trace event closes so the alerts side of \`runFailed\` is preserved. \`callWithoutTraceEvents\` and the mollifier drainer's terminal- failure path keep the default emit (they have no outer trace event managing the span). Regression test pins the opt-out: \`emitRunFailedEvent: false\` writes the PG row but does NOT fire the bus event. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rename the catch-all mollifier.md and trim it to the drainer replay handler, stale sweep, telemetry gauge, and run-engine cancelled/failed APIs; later read/mutation/dashboard work is documented in its own PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tage in mollifier drainer The mollifier drainer's cancel bifurcation called engine.createCancelledRun without handling its documented conflict contract: when the normal trigger replay path races ahead and materialises a live (non-CANCELED) row, the engine throws a conflict so the caller can "decide between engine.cancelRun() and skipping". The handler did neither — the conflict propagated, isRetryablePgError returned false, and the drainer buffer.fail()'d the entry, silently losing the cancellation while the run kept executing. Now route conflicts to engine.cancelRun() so the cancel actually wins. Separately, when engine.trigger fails non-retryably and the SYSTEM_FAILURE fallback write then fails because PG is transiently unreachable, rethrowing the original non-retryable error made the drainer buffer.fail() the entry — losing the run with no PG row ever landing, and dropping the write error entirely. Rethrow the retryable write error instead so the drainer requeues; the failure row lands once PG recovers. Non-retryable write failures still rethrow the original error as before. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Remove plan-tracking shorthand (Q# bifurcation, Phase C1/Q4) from replay-layer mollifier comments and test names; reword to plain English. Comment/test-name only; no behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…labels The mollifier queue moved from ZSET to LIST in an earlier refactor, but two comments still described it as a ZSET (`mollifierStaleSweep.server.ts:9` and `env.server.ts:1104` — both narrating the periodic stale sweep). Update to LIST. Also clean four residual internal plan-doc labels left over from prior cleanup passes: - `createCancelledRun` docstring (`engine/index.ts`) referenced "Q4 mollifier- cancel design" and "F4 bypass" — both dead nomenclature now that the gate's C1/C3/F4 labels have been rewritten. Restate the waitpoint-skip rationale in plain English: the mollifier gate refuses to buffer triggerAndWait children, so a cancelled buffered run never has a waiting parent to unblock. - `createCancelledRun.test.ts` empty-tags regression dropped "Found while running the Phase F challenge suite." — the comment describes the bug itself, which is self-contained. - `mollifierStaleSweep.test.ts` "scans across multiple orgs" rephrased away from "Phase-3 design has org-level fairness"; the prose now states the invariant directly. Comment/docstring-only; no behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous sweep was unbounded along two dimensions: every tick walked
every org and every env (via buffer.listOrgs + listEnvsForOrg). At the
sweep's default per-env entry cap of 1000, an incident-scale fan-out
gave O(orgs * envs * 1000) Redis round-trips per tick — running far
longer than the 5-minute interval and triggering the inFlight guard
to drop every subsequent tick until the slow pass finished.
Shard the work via a durable cursor:
- New file `mollifierStaleSweepState.server.ts` owns three Redis keys
(`mollifier:stale_sweep:{cursor,org_list,counts}`), all under the
mollifier namespace but separated from the buffer's own state. The
state class has its own Redis client; the buffer's existing
`MollifierBuffer` API surface is untouched.
- On `cursor === 0` the org list is rebuilt by snapshotting
`buffer.listOrgs()` into the frozen LIST — the cycle's frozen view.
- Each tick consumes up to `maxOrgsPerPass` orgs (default 100),
processes them, and advances the cursor. When the cursor reaches the
end of the LIST it wraps to 0; the next tick rebuilds and starts the
next cycle.
- The per-env counts HASH is the source of truth for the gauge
snapshot. Visiting an env with zero stale entries HDEL's its hash
field — gauge clears immediately on revisit. Envs not revisited this
tick keep their last-known value (durability across ticks AND across
webapp restarts), accepting a worst-case lag of one full cursor cycle
before a no-longer-stale env clears.
Snapshot contract change: only envs with non-zero stale counts appear
in the reported `Map`. The telemetry layer (`mollifierTelemetry.server.ts`
`reportStaleEntrySnapshot`) sums values, so absence is equivalent to
zero for the gauge — the alert behaviour is unchanged.
Tests:
- New: "shards work across ticks: cursor advances by maxOrgsPerPass and
wraps after a full cycle" — drives a 5-org fixture with cap=2,
asserts the cursor's three-tick progression and wrap.
- New: "clears an env from the durable snapshot on revisit when it has
entries but none currently stale" — same entry flips
stale→not-stale by varying the sweep's `now`, asserts HDEL on
revisit.
- Existing tests updated to inject `state`; one assertion shape
rewritten ("snapshot omits envs that have entries but none stale")
to match the new HDEL semantics.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Five gaps in the sharded sweep's test coverage, added 9 new tests covering each: - State durability across process restart: state1 populates the cursor + counts hash, closes its Redis client (simulated webapp restart), state2 constructs against the same Redis and picks up exactly where state1 left off. This is the headline benefit of storing sweep state in Redis instead of process memory; without this test it could silently regress. - Cycle wrap rebuilds the org list: a third org joins between cycles and is visible only in the next cycle's snapshot. Pins the rebuildOrgList-on-cursor=0 contract. - Empty buffer (no orgs) advances cleanly with zero work, empty snapshot, cursor stays at 0 instead of tripping the wrap math. - Buffer-null branch's clearAll: previously asserted only "snapshot is empty"; now also asserts the durable state was actually wiped (cursor=0, counts hash empty) so a re-enable doesn't resume on a stale cursor. - MollifierStaleSweepState direct unit tests (5 tests): readCursor default + corrupt-value tolerance, writeCursor round-trip, rebuildOrgList replace-not-append semantics, setEnvStaleCount HSET vs HDEL, clearAll DELs all three keys. Suite total: 7 existing + 9 new = 16 tests. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…on terminal failure Two Devin review findings on PR #3754, both real and unresolved: 1. Sharded stale sweep's counts hash never cleared for fully-drained envs — gauge stayed permanently elevated, false-alerting the recommended `> 0 for 5m` rule. Root cause: when an env's last buffered entry is popped, the buffer's atomic Lua removes the env from `mollifier:org-envs:${orgId}` (and removes the org from `mollifier:orgs` if it has no other envs). The sweep's inner loop walks `buffer.listEnvsForOrg(orgId)`, so the env disappears from the iteration entirely — `setEnvStaleCount(envId, 0)` (which HDELs the field) is never called, and the counts hash retains the env's last-known stale count forever. Fix (Devin's Approach 2): cycle-bounded reconciliation. Add a Redis SET `mollifier:stale_sweep:visited` that the sweep SADDs into for every env it touches. When the cursor wraps (cycle complete), `reconcileVisited()` does `HKEYS counts → SMEMBERS visited → HDEL the difference → DEL visited`. Pipelined; orphans clear within at most one full cursor cycle of the env going quiet, which matches the sharding contract's existing one-cycle freshness window. Test: "evicts fully-drained envs from the counts hash at cycle wrap" — accepts an entry, sweep flags it stale, pops the entry (env vanishes from listEnvsForOrg), runs another sweep that triggers wrap, asserts the env is HDEL'd from both the snapshot and the underlying counts hash. 2. Drainer handler's terminal SYSTEM_FAILURE write dropped the snapshot's `batch` field. If the buffered run was part of a batch, the failure row wasn't associated with the batch and the batch parent's completion tracking could hang indefinitely waiting on a child that landed but isn't visible to the batch. Fix: extract `snapshot.batch` with structural type guards and pass it through to `createFailedTaskRun`. Same defensive pattern as the other snapshot fields in this code path (the snapshot is typed `Record<string, unknown>` because it came from cjson-decoded buffer payload). Test: "propagates the batch association into createFailedTaskRun" — asserts the call site receives `{ id, index }` from the snapshot. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…Redis Devin review on PR #3754: `stop()` previously called `deps.state.close()` immediately after `clearInterval`, but `tick()` only checks `stopped` at entry. A tick that was already past that guard would keep making `state.*` calls against an ioredis client that `stop()` had already `quit()`ed — those calls would throw, the tick's own try/catch would swallow them as `mollifier.stale_sweep.failed` warnings, and every graceful shutdown would emit spurious noise. Track the current tick promise as `currentTick`. `stop()` awaits it (if present) before invoking `state.close()`, so the tick's last state call lands BEFORE the Redis client is quit. The tick's own try/catch handles the (unexpected) case where it rejects; the await in `stop()` is solely for ordering. Also drop the `instanceof MollifierStaleSweepState` guard around `state.close()` — `close()` is part of the `StaleSweepStateStore` contract, so unconditional invocation is correct. Test fake states implement `close()` as a no-op. Test: `stop() waits for an in-flight tick to finish before closing the state` — gates a fake state's `readCursor` on a Deferred, kicks off the interval, waits for the tick to start, then races `stop()` against the gate. Asserts the stop promise stays unresolved while the tick is mid-flight and that the tick's final state operation lands before `close()`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three unresolved Devin threads, all addressed in this commit. Committed
locally only — not pushed.
1. `callWithoutTraceEvents` was inheriting the new `emitRunFailedEvent`
default of `true`, so its `createFailedTaskRun` call would fire the
`runFailed` bus emit and the listener would write a ClickHouse
completion event row with empty `traceId`/`spanId` — orphan row,
directly contradicting the method's "without trace events" contract.
Pass `emitRunFailedEvent: false` and enqueue
`PerformTaskRunAlertsService` directly, mirroring the `call()`
pattern so customers' ERROR channels still see the failure.
2. The cjson empty-tags defense lived only on `createCancelledRun`, not
on `engine.trigger`. When the mollifier buffer's mutate-side Lua
re-serialises a payload (e.g. `append_tags` on a buffered run that
never had tags), an empty Lua table encodes as `{}` and decodes
back to a JS object — and the previous `tags.length === 0` check
passes that object straight to Prisma's `String[]` column.
Mirror the same `Array.isArray && tags.length > 0 ? tags : undefined`
guard `createCancelledRun` already uses. The defense is symmetric
with the existing tested case for createCancelledRun, so the same
contract holds for the trigger replay path.
3. `runCancelled` handler's `cancelRunEvent` lookup fails for
buffered-only runs (no primary trace event exists, since the
mollifier gate skipped `repository.traceEvent` for the
not-yet-materialised run). The handler's `tryCatch` swallowed the
error, but the systematic `[runCancelled] Failed to cancel run
event` log fired on every cancelled buffered run.
Add `emitRunCancelledEvent: boolean = true` to `createCancelledRun`
(symmetric with the existing `emitRunFailedEvent` flag on
`createFailedTaskRun`); drainer handler passes `false`. CANCELED PG
row still writes; only the trace-event mirror is skipped.
Tests:
- `RunEngine.createCancelledRun > emitRunCancelledEvent: false
suppresses the bus emit but still writes the CANCELED PG row` —
pins the new flag's semantics.
- `createDrainerHandler > calls createCancelledRun with
emitRunCancelledEvent: false (suppresses orphan trace-event log
noise)` — pins the call site's contract.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ts exhaustion Before: when engine.trigger() threw a retryable PG error (P1001 / P2024 / ECONNRESET) and the drainer exhausted TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS (default 3), MollifierDrainer.processEntry called buffer.fail() directly, which atomically marks the entry FAILED and DELs the hash. No PG row was ever written, so the customer's run silently disappeared. This fix introduces a new optional `onTerminalFailure` callback on MollifierDrainerOptions that fires before buffer.fail() on any terminal path. The webapp wires it to a wrapper around the existing createFailedTaskRun(...) snapshot block from the non-retryable path — factored into a shared writeMollifierTerminalFailureRow helper so both paths land an identical SYSTEM_FAILURE row. Behaviour: - Non-retryable error: handler writes the row inline (unchanged), then returns normally. Drainer ack()s. - Retryable error, attempts left: drainer requeues (unchanged). - Retryable error, attempts exhausted: drainer invokes onTerminalFailure with cause: "max-attempts-exhausted"; callback writes the SYSTEM_FAILURE row, then drainer calls buffer.fail(). - Callback itself throws retryable: drainer requeues so the next tick gets another shot at the PG write. - Callback throws non-retryable: log and fall through to buffer.fail so a genuinely malformed snapshot doesn't loop forever. Also corrects the stale Lua comment in buffer.ts's failMollifierEntry script that claimed the drainer "has already written a SYSTEM_FAILURE PG row for terminal failures" — only true for the non-retryable path before this change. Includes 5 unit tests in drainer.test.ts covering max-attempts-exhausted, non-retryable, callback-retryable-rerequeue, callback-non-retryable-falls-through, and no-callback back-compat. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…failures `writeMollifierTerminalFailureRow` was calling `createFailedTaskRun` without `emitRunFailedEvent: false`. The engine's `runFailed` event handler then called `completeFailedRunEvent`, which looks up the run's primary trace span. Buffered-only runs never had a primary trace event written for them — the mollifier gate intercepts BEFORE `repository.traceEvent` runs — so the lookup always failed, producing a systematic `[runFailed] Failed to complete failed run event` error log per drainer terminal failure. The errors are caught by the handler's `tryCatch`, but the per-failure noise pollutes Sentry and masks real errors. Mirrors what `TriggerFailedTaskService` already does at `triggerFailedTask.server.ts:212` and `:324`: - Pass `emitRunFailedEvent: false` to suppress the trace-event mirror. - Manually enqueue `PerformTaskRunAlertsService.enqueue(run.id)` so the customer's ERROR channel still fires. Best-effort, logged on failure but does not bubble (the SYSTEM_FAILURE row landing is the load-bearing customer-visible outcome). Also tightens the helper's return type from `boolean` to `TaskRun | null` so the alert side has the run id without an extra lookup. Both existing call sites (inline non-retryable + max-attempts-exhausted callback) keep working — the `if (!wrote)` truthy check is identical under `TaskRun | null`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
4500f0c to
8cab7c8
Compare
…RE fallback on cancel-bifurcation Two fixes from CodeRabbit on PR #3754: 1. `TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED` was inheriting `TRIGGER_MOLLIFIER_ENABLED`, so any deployment already running the mollifier would auto-start the sweep worker on upgrade — defeating the point of having a separate kill switch. Hard-default to "0" so the sweep is an explicit ops decision. 2. In the cancel-bifurcation path, a non-conflict + non-retryable error from `createCancelledRun` rethrew out of the handler. The drainer's `onTerminalFailure` handler gates on `cause === "max-attempts-exhausted"` and skips "non-retryable", so `buffer.fail()` deleted the entry without ever writing a PG row — the cancelled run disappeared silently. Mirror the SYSTEM_FAILURE fallback the non-cancel trigger path already uses: write a terminal row via the shared helper, falling back to the original throw only when the write also fails non-retryably (and re-throw retryable write errors so the drainer requeues). UX trade: the customer initiated a cancel but sees SYSTEM_FAILURE if the cancel write itself was structurally rejected. Terminal-but- mislabelled beats terminal silence — the alternative was the run disappearing from the dashboard entirely. The path is narrow (only non-conflict, non-retryable createCancelledRun failures). Two new unit tests cover the cancel branch: non-retryable creates the SYSTEM_FAILURE row + ACKs the entry; retryable rethrows so the drainer requeues. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… slice
`MollifierStaleSweepState.readOrgListSlice` was logging-and-returning
`{ orgs: [], total: 0 }` on pipeline-abort or per-result Redis errors.
The caller (`runStaleSweepOnce`) treated that as a clean empty cycle:
wrote cursor=0, reconciled visited envs against the empty result, and
cleared the stale-entry gauge — silencing the alerts the sweep exists
to raise.
Re-throw the underlying error instead. The interval wrapper's catch
logs `stale_sweep.failed` for the failed tick; durable cursor + counts
hash stay untouched so the gauge keeps reporting its last-known value
until a healthy tick repopulates it.
New unit test on the caller contract pins this: error propagates,
cursor stays at its seeded value, counts hash retains the seeded env,
no snapshot is reported.
Separately documents a known limitation in the .server-changes entry:
the sweep runs per-webapp-instance, so its stale-entry counter
multiplies by N webapps in HA until a distributed lease lands as a
follow-up. The system is HA-safe (Redis ops are atomic, no torn
state); only the metric output is inflated.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
The replay side of the mollifier:
DrainerHandler: reads buffered snapshots and replays them throughengine.triggerto materialise PG rows.RunEngine.createCancelledRun: new public method the handler uses to write CANCELED rows directly from snapshots (bypass queue + waitpoint, emitrunCancelled). Tolerates the cjson empty-table tags edge case found during validation.Both the drainer and sweep default-off; nothing fires unless flagged on (
TRIGGER_MOLLIFIER_DRAINER_ENABLED,TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED).Stacked on the trigger-time decisions PR.
Test plan
Ship-gate follow-up fix
Drainer writes SYSTEM_FAILURE on max-attempts exhaustion. Adds an
onTerminalFailurecallback onMollifierDrainerOptionsso the customer's run lands a SYSTEM_FAILURE PG row even when the drainer exhaustsMAX_ATTEMPTSon a retryable PG error (previouslybuffer.fail()was called with no row written → silent data loss). The callback runs beforebuffer.fail()on every terminal path (non-retryable AND max-attempts-exhausted), and re-throwing a retryable error from the callback causes the drainer to requeue rather than fail.Bumps
@trigger.dev/redis-workerto a minor changeset (additive option + new exported types). Includes 5 unit tests covering both terminal causes plus the requeue-on-retryable-callback-failure path and no-callback back-compat.