Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .changeset/wait-node-executor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
"@objectstack/service-automation": minor
---

Implement the `wait` node executor — durable timer / signal pause.

The flow designer offered a `wait` node but the engine had no executor for it, so
a flow using it couldn't run. `wait` now suspends the run on entry (ADR-0019
durable pause, the same suspend/resume machinery as `screen` / `approval`) and
resumes by one of two paths, per `waitEventConfig.eventType`:

- **timer** — schedules a one-shot job (`IJobService`, `{ type: 'once', at }`)
that calls `engine.resume(runId)` when the ISO-8601 `timerDuration` elapses.
With no job service the run still suspends and is resumable via an external
`resume(runId)` (logged) — never silently no-ops or fails the flow.
- **signal / webhook / manual / condition** — suspends with the signal name as
the correlation key; an external producer resumes the run when the event
arrives.

Reads its run id from the engine-injected `$runId` variable (same mechanism the
approval node uses). Adds a `parseIsoDuration` helper (`PT1H`, `P3D`, `PT90M`,
`P1DT12H`, bare ms). Registered as a built-in node, so a bare
`AutomationServicePlugin` now ships 13 executors including `wait`.

Tests: `wait-node.test.ts` — duration parsing, suspend→resume traversal,
one-shot job scheduling + handler-driven resume, named-signal suspend.
service-automation **113 passing**. A worked `showcase_task_follow_up` flow
(wait → notify) demonstrates it end-to-end.
62 changes: 62 additions & 0 deletions examples/app-showcase/src/flows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,67 @@ export const TaskCompletedRestPingFlow = defineFlow({
],
});

/**
* Task Follow-up Reminder — the worked `wait` (durable timer) example.
*
* When a task is created, the flow pauses at a `wait` node for a fixed delay,
* then reminds the assignee to update it. The `wait` node *suspends* the run
* (ADR-0019 durable pause, like `screen`/`approval`); a one-shot job scheduled
* via the job service (`{ type: 'once', at }`) resumes it when the timer
* elapses — so the delayed reminder fires end-to-end with no manual
* `engine.resume()`. Without a job service the run still suspends and can be
* resumed by an external `resume(runId)` (it never silently no-ops).
*
* A short demo delay keeps it observable in-session; a production reminder would
* use e.g. `timerDuration: 'P3D'`. Install
* `requires: ['automation', 'triggers', 'job', 'messaging']`.
*/
export const TaskFollowUpFlow = defineFlow({
name: 'showcase_task_follow_up',
label: 'Task Follow-up Reminder (wait)',
description: 'Waits a fixed delay after a task is created, then reminds the assignee — demonstrates the durable wait node.',
type: 'autolaunched',
nodes: [
{
id: 'start',
type: 'start',
label: 'On Task Created',
config: {
objectName: 'showcase_task',
triggerType: 'record-after-create',
},
},
{
id: 'hold',
type: 'wait',
label: 'Wait 1 min',
// Timer wait: suspends the run, then a one-shot job resumes it after the
// duration. ISO-8601 duration; production reminders would use e.g. 'P3D'.
waitEventConfig: { eventType: 'timer', timerDuration: 'PT1M', onTimeout: 'continue' },
},
{
id: 'remind',
type: 'notify',
label: 'Remind Assignee',
config: {
topic: 'task.followup',
recipients: ['{record.assignee}'],
channels: ['inbox'],
severity: 'info',
title: 'Follow up on: {record.title}',
message: 'This task has been open for a while — please update its status.',
actionUrl: '/showcase_task/{record.id}',
},
},
{ id: 'end', type: 'end', label: 'End' },
],
edges: [
{ id: 'e1', source: 'start', target: 'hold' },
{ id: 'e2', source: 'hold', target: 'remind' },
{ id: 'e3', source: 'remind', target: 'end' },
],
});

export const allFlows = [
TaskCompletedFlow,
ReassignWizardFlow,
Expand All @@ -370,4 +431,5 @@ export const allFlows = [
TaskAssignedNotifyFlow,
ScheduledDigestFlow,
TaskCompletedRestPingFlow,
TaskFollowUpFlow,
];
3 changes: 3 additions & 0 deletions packages/services/service-automation/src/builtin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ import { registerScreenNodes } from './screen-nodes.js';
import { registerHttpNodes } from './http-nodes.js';
import { registerConnectorNodes } from './connector-nodes.js';
import { registerNotifyNode } from './notify-node.js';
import { registerWaitNode } from './wait-node.js';

export { registerLogicNodes } from './logic-nodes.js';
export { registerCrudNodes } from './crud-nodes.js';
export { registerScreenNodes } from './screen-nodes.js';
export { registerHttpNodes } from './http-nodes.js';
export { registerConnectorNodes } from './connector-nodes.js';
export { registerNotifyNode } from './notify-node.js';
export { registerWaitNode, parseIsoDuration } from './wait-node.js';

/**
* Seed every built-in node executor into the engine. Called by
Expand All @@ -53,6 +55,7 @@ export function installBuiltinNodes(engine: AutomationEngine, ctx: PluginContext
registerHttpNodes(engine, ctx);
registerConnectorNodes(engine, ctx);
registerNotifyNode(engine, ctx);
registerWaitNode(engine, ctx);

const types = engine.getRegisteredNodeTypes();
ctx.logger.info(
Expand Down
140 changes: 140 additions & 0 deletions packages/services/service-automation/src/builtin/wait-node.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import { describe, it, expect, beforeEach } from 'vitest';
import { AutomationEngine } from '../engine.js';
import type { NodeExecutor } from '../engine.js';
import { registerWaitNode, parseIsoDuration } from './wait-node.js';
import type { IJobService, JobHandler, JobSchedule } from '@objectstack/spec/contracts';

function silentLogger() {
return { info() {}, warn() {}, error() {}, debug() {}, child() { return silentLogger(); } } as any;
}

/** ctx with no job service (timer degrades to suspend-only). */
function ctxNoJob() {
return { logger: silentLogger(), getService() { throw new Error('no service'); } } as any;
}

/** A fake job service that records `schedule()` calls and exposes the handler. */
function fakeJobCtx() {
const scheduled: Array<{ name: string; schedule: JobSchedule; handler: JobHandler }> = [];
const cancelled: string[] = [];
const job: IJobService = {
async schedule(name, schedule, handler) { scheduled.push({ name, schedule, handler }); },
async cancel(name) { cancelled.push(name); },
async trigger() {},
};
const ctx = { logger: silentLogger(), getService: (id: string) => (id === 'job' ? job : undefined) } as any;
return { ctx, scheduled, cancelled };
}

/** A marker executor that records the order it ran, to prove traversal resumed. */
function markerExecutor(ran: string[]): NodeExecutor {
return { type: 'mark', async execute(node) { ran.push(node.id); return { success: true }; } };
}

const waitFlow = (waitConfig: Record<string, unknown>) => ({
name: 'wait_flow',
label: 'Wait Flow',
type: 'autolaunched',
nodes: [
{ id: 'start', type: 'start', label: 'Start' },
{ id: 'pause', type: 'wait', label: 'Wait', waitEventConfig: waitConfig },
{ id: 'after', type: 'mark', label: 'After' },
{ id: 'end', type: 'end', label: 'End' },
],
edges: [
{ id: 'e1', source: 'start', target: 'pause' },
{ id: 'e2', source: 'pause', target: 'after' },
{ id: 'e3', source: 'after', target: 'end' },
],
});

describe('parseIsoDuration', () => {
it('parses ISO-8601 durations to ms', () => {
expect(parseIsoDuration('PT1H')).toBe(3_600_000);
expect(parseIsoDuration('P3D')).toBe(259_200_000);
expect(parseIsoDuration('PT90M')).toBe(5_400_000);
expect(parseIsoDuration('P1DT12H')).toBe(129_600_000);
expect(parseIsoDuration('PT30S')).toBe(30_000);
expect(parseIsoDuration('P1W')).toBe(604_800_000);
});
it('treats a plain number / numeric string as ms', () => {
expect(parseIsoDuration(5000)).toBe(5000);
expect(parseIsoDuration('5000')).toBe(5000);
});
it('returns undefined for unparseable / non-positive input', () => {
expect(parseIsoDuration('')).toBeUndefined();
expect(parseIsoDuration('1 hour')).toBeUndefined();
expect(parseIsoDuration('P')).toBeUndefined();
expect(parseIsoDuration(0)).toBeUndefined();
expect(parseIsoDuration(-5)).toBeUndefined();
expect(parseIsoDuration(undefined)).toBeUndefined();
});
});

describe('wait node executor', () => {
let engine: AutomationEngine;
let ran: string[];

beforeEach(() => {
engine = new AutomationEngine(silentLogger());
ran = [];
engine.registerNodeExecutor(markerExecutor(ran));
});

it('suspends the run on entry and resumes downstream via resume(runId)', async () => {
registerWaitNode(engine, ctxNoJob());
engine.registerFlow('wait_flow', waitFlow({ eventType: 'timer', timerDuration: 'PT1H' }));

const paused = await engine.execute('wait_flow');
expect(paused.status).toBe('paused');
expect(paused.runId).toBeTruthy();
expect(ran).toEqual([]); // downstream not yet run — the wait held the run

const suspended = engine.listSuspendedRuns();
expect(suspended).toHaveLength(1);
expect(suspended[0]).toMatchObject({ nodeId: 'pause', flowName: 'wait_flow' });

const resumed = await engine.resume(paused.runId!);
expect(resumed.success).toBe(true);
expect(resumed.status).toBeUndefined(); // ran to completion
expect(ran).toEqual(['after']); // traversal continued past the wait
});

it('schedules a one-shot job that resumes the run when a job service is present', async () => {
const { ctx, scheduled, cancelled } = fakeJobCtx();
registerWaitNode(engine, ctx);
engine.registerFlow('wait_flow', waitFlow({ eventType: 'timer', timerDuration: 'PT2H' }));

const before = Date.now();
const paused = await engine.execute('wait_flow');
expect(paused.status).toBe('paused');
expect(ran).toEqual([]);

// A single one-shot job was scheduled ~2h out.
expect(scheduled).toHaveLength(1);
expect(scheduled[0].schedule.type).toBe('once');
const at = new Date(scheduled[0].schedule.at!).getTime();
expect(at).toBeGreaterThanOrEqual(before + 7_200_000 - 1000);
expect(at).toBeLessThanOrEqual(Date.now() + 7_200_000 + 1000);

// Firing the scheduled handler resumes the run + cancels the one-shot.
await scheduled[0].handler({ jobId: scheduled[0].name });
expect(ran).toEqual(['after']);
expect(cancelled).toContain(scheduled[0].name);
});

it('suspends on a named signal and resumes when the signal arrives', async () => {
registerWaitNode(engine, ctxNoJob());
engine.registerFlow('wait_flow', waitFlow({ eventType: 'signal', signalName: 'contract.renewed' }));

const paused = await engine.execute('wait_flow');
expect(paused.status).toBe('paused');
expect(engine.listSuspendedRuns()[0]).toMatchObject({ nodeId: 'pause', correlation: 'contract.renewed' });

const resumed = await engine.resume(paused.runId!);
expect(resumed.success).toBe(true);
expect(ran).toEqual(['after']);
});
});
Loading