diff --git a/.changeset/wait-node-executor.md b/.changeset/wait-node-executor.md new file mode 100644 index 000000000..75c798950 --- /dev/null +++ b/.changeset/wait-node-executor.md @@ -0,0 +1,28 @@ +--- +"@objectstack/service-automation": minor +--- + +Implement the `wait` node executor — durable timer / signal pause. + +The flow designer offered a `wait` node but the engine had no executor for it, so +a flow using it couldn't run. `wait` now suspends the run on entry (ADR-0019 +durable pause, the same suspend/resume machinery as `screen` / `approval`) and +resumes by one of two paths, per `waitEventConfig.eventType`: + +- **timer** — schedules a one-shot job (`IJobService`, `{ type: 'once', at }`) + that calls `engine.resume(runId)` when the ISO-8601 `timerDuration` elapses. + With no job service the run still suspends and is resumable via an external + `resume(runId)` (logged) — never silently no-ops or fails the flow. +- **signal / webhook / manual / condition** — suspends with the signal name as + the correlation key; an external producer resumes the run when the event + arrives. + +Reads its run id from the engine-injected `$runId` variable (same mechanism the +approval node uses). Adds a `parseIsoDuration` helper (`PT1H`, `P3D`, `PT90M`, +`P1DT12H`, bare ms). Registered as a built-in node, so a bare +`AutomationServicePlugin` now ships 13 executors including `wait`. + +Tests: `wait-node.test.ts` — duration parsing, suspend→resume traversal, +one-shot job scheduling + handler-driven resume, named-signal suspend. +service-automation **113 passing**. A worked `showcase_task_follow_up` flow +(wait → notify) demonstrates it end-to-end. diff --git a/examples/app-showcase/src/flows/index.ts b/examples/app-showcase/src/flows/index.ts index ad5f38a62..12e3bfcba 100644 --- a/examples/app-showcase/src/flows/index.ts +++ b/examples/app-showcase/src/flows/index.ts @@ -362,6 +362,67 @@ export const TaskCompletedRestPingFlow = defineFlow({ ], }); +/** + * Task Follow-up Reminder — the worked `wait` (durable timer) example. + * + * When a task is created, the flow pauses at a `wait` node for a fixed delay, + * then reminds the assignee to update it. The `wait` node *suspends* the run + * (ADR-0019 durable pause, like `screen`/`approval`); a one-shot job scheduled + * via the job service (`{ type: 'once', at }`) resumes it when the timer + * elapses — so the delayed reminder fires end-to-end with no manual + * `engine.resume()`. Without a job service the run still suspends and can be + * resumed by an external `resume(runId)` (it never silently no-ops). + * + * A short demo delay keeps it observable in-session; a production reminder would + * use e.g. `timerDuration: 'P3D'`. Install + * `requires: ['automation', 'triggers', 'job', 'messaging']`. + */ +export const TaskFollowUpFlow = defineFlow({ + name: 'showcase_task_follow_up', + label: 'Task Follow-up Reminder (wait)', + description: 'Waits a fixed delay after a task is created, then reminds the assignee — demonstrates the durable wait node.', + type: 'autolaunched', + nodes: [ + { + id: 'start', + type: 'start', + label: 'On Task Created', + config: { + objectName: 'showcase_task', + triggerType: 'record-after-create', + }, + }, + { + id: 'hold', + type: 'wait', + label: 'Wait 1 min', + // Timer wait: suspends the run, then a one-shot job resumes it after the + // duration. ISO-8601 duration; production reminders would use e.g. 'P3D'. + waitEventConfig: { eventType: 'timer', timerDuration: 'PT1M', onTimeout: 'continue' }, + }, + { + id: 'remind', + type: 'notify', + label: 'Remind Assignee', + config: { + topic: 'task.followup', + recipients: ['{record.assignee}'], + channels: ['inbox'], + severity: 'info', + title: 'Follow up on: {record.title}', + message: 'This task has been open for a while — please update its status.', + actionUrl: '/showcase_task/{record.id}', + }, + }, + { id: 'end', type: 'end', label: 'End' }, + ], + edges: [ + { id: 'e1', source: 'start', target: 'hold' }, + { id: 'e2', source: 'hold', target: 'remind' }, + { id: 'e3', source: 'remind', target: 'end' }, + ], +}); + export const allFlows = [ TaskCompletedFlow, ReassignWizardFlow, @@ -370,4 +431,5 @@ export const allFlows = [ TaskAssignedNotifyFlow, ScheduledDigestFlow, TaskCompletedRestPingFlow, + TaskFollowUpFlow, ]; diff --git a/packages/services/service-automation/src/builtin/index.ts b/packages/services/service-automation/src/builtin/index.ts index 56c64db84..dfa67ee0d 100644 --- a/packages/services/service-automation/src/builtin/index.ts +++ b/packages/services/service-automation/src/builtin/index.ts @@ -33,6 +33,7 @@ import { registerScreenNodes } from './screen-nodes.js'; import { registerHttpNodes } from './http-nodes.js'; import { registerConnectorNodes } from './connector-nodes.js'; import { registerNotifyNode } from './notify-node.js'; +import { registerWaitNode } from './wait-node.js'; export { registerLogicNodes } from './logic-nodes.js'; export { registerCrudNodes } from './crud-nodes.js'; @@ -40,6 +41,7 @@ export { registerScreenNodes } from './screen-nodes.js'; export { registerHttpNodes } from './http-nodes.js'; export { registerConnectorNodes } from './connector-nodes.js'; export { registerNotifyNode } from './notify-node.js'; +export { registerWaitNode, parseIsoDuration } from './wait-node.js'; /** * Seed every built-in node executor into the engine. Called by @@ -53,6 +55,7 @@ export function installBuiltinNodes(engine: AutomationEngine, ctx: PluginContext registerHttpNodes(engine, ctx); registerConnectorNodes(engine, ctx); registerNotifyNode(engine, ctx); + registerWaitNode(engine, ctx); const types = engine.getRegisteredNodeTypes(); ctx.logger.info( diff --git a/packages/services/service-automation/src/builtin/wait-node.test.ts b/packages/services/service-automation/src/builtin/wait-node.test.ts new file mode 100644 index 000000000..1bf9988ce --- /dev/null +++ b/packages/services/service-automation/src/builtin/wait-node.test.ts @@ -0,0 +1,140 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect, beforeEach } from 'vitest'; +import { AutomationEngine } from '../engine.js'; +import type { NodeExecutor } from '../engine.js'; +import { registerWaitNode, parseIsoDuration } from './wait-node.js'; +import type { IJobService, JobHandler, JobSchedule } from '@objectstack/spec/contracts'; + +function silentLogger() { + return { info() {}, warn() {}, error() {}, debug() {}, child() { return silentLogger(); } } as any; +} + +/** ctx with no job service (timer degrades to suspend-only). */ +function ctxNoJob() { + return { logger: silentLogger(), getService() { throw new Error('no service'); } } as any; +} + +/** A fake job service that records `schedule()` calls and exposes the handler. */ +function fakeJobCtx() { + const scheduled: Array<{ name: string; schedule: JobSchedule; handler: JobHandler }> = []; + const cancelled: string[] = []; + const job: IJobService = { + async schedule(name, schedule, handler) { scheduled.push({ name, schedule, handler }); }, + async cancel(name) { cancelled.push(name); }, + async trigger() {}, + }; + const ctx = { logger: silentLogger(), getService: (id: string) => (id === 'job' ? job : undefined) } as any; + return { ctx, scheduled, cancelled }; +} + +/** A marker executor that records the order it ran, to prove traversal resumed. */ +function markerExecutor(ran: string[]): NodeExecutor { + return { type: 'mark', async execute(node) { ran.push(node.id); return { success: true }; } }; +} + +const waitFlow = (waitConfig: Record) => ({ + name: 'wait_flow', + label: 'Wait Flow', + type: 'autolaunched', + nodes: [ + { id: 'start', type: 'start', label: 'Start' }, + { id: 'pause', type: 'wait', label: 'Wait', waitEventConfig: waitConfig }, + { id: 'after', type: 'mark', label: 'After' }, + { id: 'end', type: 'end', label: 'End' }, + ], + edges: [ + { id: 'e1', source: 'start', target: 'pause' }, + { id: 'e2', source: 'pause', target: 'after' }, + { id: 'e3', source: 'after', target: 'end' }, + ], +}); + +describe('parseIsoDuration', () => { + it('parses ISO-8601 durations to ms', () => { + expect(parseIsoDuration('PT1H')).toBe(3_600_000); + expect(parseIsoDuration('P3D')).toBe(259_200_000); + expect(parseIsoDuration('PT90M')).toBe(5_400_000); + expect(parseIsoDuration('P1DT12H')).toBe(129_600_000); + expect(parseIsoDuration('PT30S')).toBe(30_000); + expect(parseIsoDuration('P1W')).toBe(604_800_000); + }); + it('treats a plain number / numeric string as ms', () => { + expect(parseIsoDuration(5000)).toBe(5000); + expect(parseIsoDuration('5000')).toBe(5000); + }); + it('returns undefined for unparseable / non-positive input', () => { + expect(parseIsoDuration('')).toBeUndefined(); + expect(parseIsoDuration('1 hour')).toBeUndefined(); + expect(parseIsoDuration('P')).toBeUndefined(); + expect(parseIsoDuration(0)).toBeUndefined(); + expect(parseIsoDuration(-5)).toBeUndefined(); + expect(parseIsoDuration(undefined)).toBeUndefined(); + }); +}); + +describe('wait node executor', () => { + let engine: AutomationEngine; + let ran: string[]; + + beforeEach(() => { + engine = new AutomationEngine(silentLogger()); + ran = []; + engine.registerNodeExecutor(markerExecutor(ran)); + }); + + it('suspends the run on entry and resumes downstream via resume(runId)', async () => { + registerWaitNode(engine, ctxNoJob()); + engine.registerFlow('wait_flow', waitFlow({ eventType: 'timer', timerDuration: 'PT1H' })); + + const paused = await engine.execute('wait_flow'); + expect(paused.status).toBe('paused'); + expect(paused.runId).toBeTruthy(); + expect(ran).toEqual([]); // downstream not yet run — the wait held the run + + const suspended = engine.listSuspendedRuns(); + expect(suspended).toHaveLength(1); + expect(suspended[0]).toMatchObject({ nodeId: 'pause', flowName: 'wait_flow' }); + + const resumed = await engine.resume(paused.runId!); + expect(resumed.success).toBe(true); + expect(resumed.status).toBeUndefined(); // ran to completion + expect(ran).toEqual(['after']); // traversal continued past the wait + }); + + it('schedules a one-shot job that resumes the run when a job service is present', async () => { + const { ctx, scheduled, cancelled } = fakeJobCtx(); + registerWaitNode(engine, ctx); + engine.registerFlow('wait_flow', waitFlow({ eventType: 'timer', timerDuration: 'PT2H' })); + + const before = Date.now(); + const paused = await engine.execute('wait_flow'); + expect(paused.status).toBe('paused'); + expect(ran).toEqual([]); + + // A single one-shot job was scheduled ~2h out. + expect(scheduled).toHaveLength(1); + expect(scheduled[0].schedule.type).toBe('once'); + const at = new Date(scheduled[0].schedule.at!).getTime(); + expect(at).toBeGreaterThanOrEqual(before + 7_200_000 - 1000); + expect(at).toBeLessThanOrEqual(Date.now() + 7_200_000 + 1000); + + // Firing the scheduled handler resumes the run + cancels the one-shot. + await scheduled[0].handler({ jobId: scheduled[0].name }); + expect(ran).toEqual(['after']); + expect(cancelled).toContain(scheduled[0].name); + }); + + it('suspends on a named signal and resumes when the signal arrives', async () => { + registerWaitNode(engine, ctxNoJob()); + engine.registerFlow('wait_flow', waitFlow({ eventType: 'signal', signalName: 'contract.renewed' })); + + const paused = await engine.execute('wait_flow'); + expect(paused.status).toBe('paused'); + expect(engine.listSuspendedRuns()[0]).toMatchObject({ nodeId: 'pause', correlation: 'contract.renewed' }); + + const resumed = await engine.resume(paused.runId!); + expect(resumed.success).toBe(true); + expect(ran).toEqual(['after']); + }); +}); diff --git a/packages/services/service-automation/src/builtin/wait-node.ts b/packages/services/service-automation/src/builtin/wait-node.ts new file mode 100644 index 000000000..bd42121d1 --- /dev/null +++ b/packages/services/service-automation/src/builtin/wait-node.ts @@ -0,0 +1,137 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import type { PluginContext } from '@objectstack/core'; +import { defineActionDescriptor } from '@objectstack/spec/automation'; +import type { IJobService } from '@objectstack/spec/contracts'; +import type { AutomationEngine } from '../engine.js'; + +/** + * `wait` built-in node — a durable pause (ADR-0019 suspend/resume), the timer / + * signal sibling of the human-input `screen` and `approval` nodes. + * + * On entry the node *suspends* the run (the engine snapshots the continuation + * and returns `{ status: 'paused', runId }`). How it resumes depends on + * `waitEventConfig.eventType`: + * + * - **timer** — schedule a one-shot job (`IJobService`, `{ type: 'once', at }`) + * that calls `engine.resume(runId)` when the duration elapses. With no job + * service the node still suspends, but resumption must come from an external + * `resume(runId)` (logged) — never silently no-ops or fails the flow, + * matching the platform's degrade-don't-crash convention. + * - **signal / webhook / manual / condition** — suspend with the signal name as + * the correlation key; an external producer resumes the run when the event + * arrives (`resume(runId)`), exactly like a decision-less approval. + * + * Reads its own run id from the `$runId` variable the engine injects at start + * (same mechanism the approval node uses to map external state back to the run). + */ +export function registerWaitNode(engine: AutomationEngine, ctx: PluginContext): void { + const getJobService = (): IJobService | undefined => { + try { + return ctx.getService('job'); + } catch { + return undefined; + } + }; + + engine.registerNodeExecutor({ + type: 'wait', + descriptor: defineActionDescriptor({ + type: 'wait', + version: '1.0.0', + name: 'Wait', + description: 'Pause the flow until a timer elapses or a named signal arrives.', + icon: 'timer-reset', + category: 'logic', + source: 'builtin', + // Durable pause — the run suspends and resumes later (timer/signal). + supportsPause: true, + isAsync: true, + }), + async execute(node, variables, _context) { + // Prefer the spec-structured `waitEventConfig` block; fall back to a loose + // `config` for hand-authored flows that put the same keys under config. + const loose = (node.config ?? {}) as Record; + const wec = (node.waitEventConfig ?? {}) as Record; + const eventType = String(wec.eventType ?? loose.eventType ?? 'timer'); + const runId = variables.get('$runId'); + + if (eventType === 'timer') { + const durationMs = + parseIsoDuration(wec.timerDuration ?? loose.timerDuration ?? loose.duration) ?? + (typeof wec.timeoutMs === 'number' ? wec.timeoutMs : undefined) ?? + (typeof loose.timeoutMs === 'number' ? (loose.timeoutMs as number) : undefined); + + const job = getJobService(); + if (job && runId != null && durationMs && durationMs > 0) { + const jobName = `flow-wait:${String(runId)}:${node.id}`; + const at = new Date(Date.now() + durationMs).toISOString(); + try { + await job.schedule(jobName, { type: 'once', at }, async () => { + try { + await engine.resume(String(runId)); + } finally { + // One-shot: drop the job so it never re-fires. + try { + await job.cancel?.(jobName); + } catch { + /* best-effort */ + } + } + }); + return { success: true, suspend: true, correlation: jobName }; + } catch (err) { + ctx.logger.warn( + `[wait] node '${node.id}': failed to schedule timer resume (${(err as Error)?.message ?? err}); ` + + `suspending without auto-resume (resume it via resume(runId))`, + ); + } + } else if (!job) { + ctx.logger.warn( + `[wait] node '${node.id}': no job service registered — suspending without an auto-resume timer ` + + `(resume it via resume(runId), or install the job service for durable timers)`, + ); + } + // Degrade: still suspend; resumption comes from an external resume(). + return { success: true, suspend: true, correlation: `timer:${node.id}` }; + } + + // signal / webhook / manual / condition — suspend; an external producer + // resumes the run when the named event arrives. + const signal = String(wec.signalName ?? loose.signalName ?? loose.signal ?? `wait:${node.id}`); + return { success: true, suspend: true, correlation: signal }; + }, + }); + + ctx.logger.info('[Wait Node] 1 built-in node executor registered'); +} + +/** + * Parse an ISO-8601 duration (the subset flows use — weeks/days + a time part + * of hours/minutes/seconds, e.g. `PT1H`, `P3D`, `PT90M`, `P1DT12H`) into + * milliseconds. A bare positive number is treated as milliseconds. Returns + * `undefined` for anything unparseable / non-positive. + */ +export function parseIsoDuration(input: unknown): number | undefined { + if (typeof input === 'number' && Number.isFinite(input)) return input > 0 ? input : undefined; + if (typeof input !== 'string') return undefined; + const s = input.trim(); + if (!s) return undefined; + // Plain numeric string ⇒ milliseconds. + if (/^\d+(?:\.\d+)?$/.test(s)) { + const n = Number(s); + return n > 0 ? n : undefined; + } + const m = /^P(?:(\d+)W)?(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?)?$/.exec(s); + if (!m) return undefined; + const [, w, d, h, min, sec] = m; + if (!w && !d && !h && !min && !sec) return undefined; + const totalSec = + Number(w ?? 0) * 7 * 86_400 + + Number(d ?? 0) * 86_400 + + Number(h ?? 0) * 3_600 + + Number(min ?? 0) * 60 + + Number(sec ?? 0); + const ms = totalSec * 1000; + return ms > 0 ? ms : undefined; +}