diff --git a/src/attachments.ts b/src/attachments.ts new file mode 100644 index 0000000..b27b6b6 --- /dev/null +++ b/src/attachments.ts @@ -0,0 +1,143 @@ +import { resolveUrl, getExt } from "./utils.js"; +import type { InboundAttachment, InboundAttachmentKind, AttachmentRecord } from "./types/types.js"; + +const IMAGE_EXTENSIONS = new Set(["png", "jpg", "jpeg", "gif", "webp", "bmp", "tiff", "tif"]); +const VIDEO_EXTENSIONS = new Set(["mp4", "mov", "mkv", "webm", "avi", "m4v"]); +const AUDIO_EXTENSIONS = new Set(["mp3", "m4a", "ogg", "oga", "opus", "wav", "flac", "aac", "amr", "weba"]); +const DOCUMENT_EXTENSIONS = new Set(["pdf", "doc", "docx", "ppt", "pptx", "xls", "xlsx", "txt", "md", "csv", "json"]); +const DOCUMENT_MIME_TYPES = new Set([ + "application/pdf", + "application/msword", + "application/vnd.ms-excel", + "application/vnd.ms-powerpoint", + "application/vnd.openxmlformats-officedocument.presentationml.presentation", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/json", + "text/csv", + "text/markdown", + "text/plain", +]); + +export function getMessageAttachmentInputs(message: { + attachments?: unknown[]; + file?: unknown; + files?: unknown[]; +}): unknown[] { + const hasId = (r: AttachmentRecord) => typeof r._id === "string" && r._id.length > 0; + const fileRecords = toRecords([ + ...(message.file ? [message.file] : []), + ...(message.files ?? []), + ]); + const fileIds = new Set(fileRecords.filter(hasId).map((r) => r._id)); + const attachmentRecords = toRecords(message.attachments ?? []).filter( + (r) => !hasId(r) || !fileIds.has(r._id), + ); + + const hasUrl = (r: AttachmentRecord) => + typeof r.url === "string" || + typeof r.title_link === "string" || + typeof r.image_url === "string" || + typeof r.video_url === "string" || + typeof r.audio_url === "string"; + + const merged: AttachmentRecord[] = []; + const paired = new Set(); + + for (const fileRec of fileRecords) { + const matchIdx = attachmentRecords.findIndex((att, i) => + !paired.has(i) && ( + (fileRec._id && att._id && fileRec._id === att._id) || + (!hasId(att) && hasUrl(att)) + ), + ); + if (matchIdx !== -1) { + paired.add(matchIdx); + const att = attachmentRecords[matchIdx]!; + const m = { ...att } as AttachmentRecord; + if (fileRec._id) m._id = fileRec._id; + if (fileRec.type) m.type = fileRec.type; + if (fileRec.name) m.name = fileRec.name; + if (typeof fileRec.size === "number") m.size = fileRec.size; + merged.push(m); + } else { + merged.push(fileRec); + } + } + + for (let i = 0; i < attachmentRecords.length; i++) { + if (!paired.has(i)) merged.push(attachmentRecords[i]!); + } + + return merged; +} + +export function normalizeInboundAttachments( + inputs: unknown[], + options?: { serverUrl?: string }, +): InboundAttachment[] { + return inputs.map((input) => toAttachment(input, options)); +} + +function toAttachment(input: unknown, options?: { serverUrl?: string }): InboundAttachment { + const record = asRecord(input); + const mimeType = getMime(record); + const url = getUrl(record, options?.serverUrl); + const fileName = getFileName(record, url); + return { + kind: classify(mimeType, fileName), + source: record?._id ? "rocketchat-file" : "rocketchat-attachment", + raw: input, + ...(mimeType !== undefined ? { mimeType } : {}), + ...(fileName !== undefined ? { fileName } : {}), + ...(url !== undefined ? { url } : {}), + ...(typeof record?.size === "number" ? { sizeBytes: record.size } : {}), + }; +} + +function asRecord(input: unknown): AttachmentRecord | null { + return input && typeof input === "object" && !Array.isArray(input) ? input as AttachmentRecord : null; +} + +function toRecords(inputs: unknown[]): AttachmentRecord[] { + return inputs.map(asRecord).filter((r): r is AttachmentRecord => r !== null); +} + +function getMime(record: AttachmentRecord | null): string | undefined { + const v = record?.type ?? record?.mimeType ?? record?.mimetype ?? record?.contentType; + return typeof v === "string" && v.trim().length > 0 ? v.trim().toLowerCase() : undefined; +} + +function getUrl(record: AttachmentRecord | null, serverUrl: string | undefined): string | undefined { + const candidates = [record?.url, record?.title_link, record?.image_url, record?.video_url, record?.audio_url]; + const raw = candidates.find((v): v is string => typeof v === "string" && v.length > 0); + return raw ? resolveUrl(raw, serverUrl) : undefined; +} + +function getFileName(record: AttachmentRecord | null, url: string | undefined): string | undefined { + const name = [record?.title, record?.name, record?.filename].find( + (v): v is string => typeof v === "string" && v.trim().length > 0, + ); + if (name) return name.trim(); + if (!url) return undefined; + try { + const seg = new URL(url).pathname.split("/").filter(Boolean).at(-1); + return seg ? decodeURIComponent(seg) : undefined; + } catch { return undefined; } +} + +function classify(mimeType: string | undefined, fileName: string | undefined): InboundAttachmentKind { + if (mimeType?.startsWith("image/")) return "image"; + if (mimeType?.startsWith("audio/")) return "audio"; + if (mimeType?.startsWith("video/")) return "video"; + if (mimeType?.startsWith("text/") || (mimeType && DOCUMENT_MIME_TYPES.has(mimeType))) return "document"; + const ext = getExt(fileName); + if (!ext) return "unknown"; + if (IMAGE_EXTENSIONS.has(ext)) return "image"; + if (AUDIO_EXTENSIONS.has(ext)) return "audio"; + if (VIDEO_EXTENSIONS.has(ext)) return "video"; + if (DOCUMENT_EXTENSIONS.has(ext)) return "document"; + return "unknown"; +} + + diff --git a/src/cli/admin-api.ts b/src/cli/admin-api.ts index 8472c12..51df0d9 100644 --- a/src/cli/admin-api.ts +++ b/src/cli/admin-api.ts @@ -1,6 +1,23 @@ import { RocketChatClientError } from "../client.js"; +import { getErrorMessage } from "../utils.js"; import type { RCLoginResult, RCUser, JsonObject } from "../types/types.js"; +function extractRecord(json: JsonObject, field: string): Record { + const value = json[field]; + if (!value || typeof value !== "object" || Array.isArray(value)) { + throw new RocketChatClientError(`RC API response missing or invalid "${field}"`); + } + return value as Record; +} + +function extractString(obj: Record, key: string): string { + const v = obj[key]; + if (typeof v !== "string" || v.length === 0) { + throw new RocketChatClientError(`RC API response missing or invalid "${key}"`); + } + return v; +} + type RCFetchOpts = { method?: string; body?: Record; @@ -27,16 +44,10 @@ async function adminFetch(baseUrl: string, path: string, opts: RCFetchOpts = {}) return json; } -function getErrorMessage(payload: JsonObject, fallback: string): string { - if (typeof payload.error === "string" && payload.error.length > 0) return payload.error; - if (typeof payload.message === "string" && payload.message.length > 0) return payload.message; - return fallback; -} - export async function loginAs(baseUrl: string, user: string, password: string): Promise { const json = await adminFetch(baseUrl, "/api/v1/login", { body: { user, password } }); - const data = json.data as { userId: string; authToken: string }; - return { userId: data.userId, authToken: data.authToken }; + const data = extractRecord(json, "data"); + return { userId: extractString(data, "userId"), authToken: extractString(data, "authToken") }; } export async function createBotUser( @@ -58,8 +69,8 @@ export async function createBotUser( sendWelcomeEmail: false, }, }); - const user = json.user as RCUser; - return { _id: user._id, username: user.username, name: user.name }; + const userRecord = extractRecord(json, "user"); + return { _id: extractString(userRecord, "_id"), username: extractString(userRecord, "username"), name: extractString(userRecord, "name") }; } export async function getUserByUsername( @@ -86,8 +97,8 @@ export async function createDirectMessage(baseUrl: string, auth: RCLoginResult, authToken: auth.authToken, body: { username }, }); - const room = json.room as { _id: string }; - return room._id; + const room = extractRecord(json, "room"); + return extractString(room, "_id"); } export async function sendMessage(baseUrl: string, auth: RCLoginResult, roomId: string, text: string): Promise { diff --git a/src/cli/config-updater.ts b/src/cli/config-updater.ts index a294c3d..548b152 100644 --- a/src/cli/config-updater.ts +++ b/src/cli/config-updater.ts @@ -1,19 +1,19 @@ import { existsSync, readFileSync, writeFileSync, renameSync } from "node:fs"; import { resolve } from "node:path"; import { homedir } from "node:os"; - -type TokenAuth = { mode: "token"; userId: string; accessToken: string }; +import type { AuthCredentials, JsonObject } from "../types/types.js"; const OC_CONFIG_PATH = resolve(homedir(), ".openclaw", "openclaw.json"); -type OcConfig = Record; +/** Config updater only writes token auth (CLI setup always resolves to a token) */ +type TokenAuth = Extract; -function readConfig(): OcConfig { +function readConfig(): JsonObject { if (!existsSync(OC_CONFIG_PATH)) return {}; return JSON.parse(readFileSync(OC_CONFIG_PATH, "utf-8")); } -function writeConfig(cfg: OcConfig): void { +function writeConfig(cfg: JsonObject): void { const tmp = OC_CONFIG_PATH + ".tmp"; writeFileSync(tmp, JSON.stringify(cfg, null, 2) + "\n", "utf-8"); renameSync(tmp, OC_CONFIG_PATH); @@ -55,4 +55,3 @@ export function updateConfig(opts: { writeConfig(cfg); } - diff --git a/src/client.ts b/src/client.ts index d2c1f6f..7884719 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,3 +1,12 @@ +import { mkdir, writeFile, readFile } from "node:fs/promises"; +import { join, basename } from "node:path"; +import { randomUUID } from "node:crypto"; + +import { resolveOpenClawDir, resolveUrl, getExt, getErrorMessage } from "./utils.js"; + +const MAX_DOWNLOAD_BYTES = 20 * 1024 * 1024; +const ALLOWED_DOWNLOAD_MIME_PREFIXES = ["image/"]; + import type { PluginAccountConfig, RocketChatIdentity, @@ -28,6 +37,7 @@ export class RocketChatClient { private readonly serverUrl: string; private readonly auth: PluginAccountConfig["auth"]; private readonly fetchFn: typeof fetch; + private readonly mediaDir: string; private identity: RocketChatIdentity | null = null; private resolvedUserId: string | null = null; private resolvedAuthToken: string | null = null; @@ -36,6 +46,7 @@ export class RocketChatClient { this.serverUrl = options.serverUrl.replace(/\/+$/, ""); this.auth = options.auth; this.fetchFn = options.fetch ?? globalThis.fetch; + this.mediaDir = join(resolveOpenClawDir(), "media"); if (this.auth.mode === "token") { this.resolvedUserId = this.auth.userId; @@ -124,6 +135,108 @@ export class RocketChatClient { }); } + async downloadAttachmentToTempFile( + url: string, + options?: { fileName?: string }, + ): Promise { + await this.ensureInitialized(); + const requestUrl = resolveUrl(url, this.serverUrl); + if (isBlockedUrl(requestUrl)) { + throw new RocketChatClientError(`attachment download blocked: ${requestUrl} resolves to a private/internal address`); + } + const response = await this.fetchFn(requestUrl, { + method: "GET", + headers: { + Accept: "*/*", + "X-User-Id": this.resolvedUserId!, + "X-Auth-Token": this.resolvedAuthToken!, + }, + }); + if (!response.ok) { + throw new RocketChatClientError(`attachment download failed: ${response.statusText}`); + } + const contentTypeHeader = response.headers.get("Content-Type"); + const rawContentType = (contentTypeHeader ?? "").split(";")[0]?.trim().toLowerCase() ?? ""; + if (!rawContentType || !ALLOWED_DOWNLOAD_MIME_PREFIXES.some((p) => rawContentType.startsWith(p))) { + throw new RocketChatClientError( + `attachment download refused: unsupported content type "${contentTypeHeader ?? ""}"`, + ); + } + const contentLength = response.headers.get("Content-Length"); + if (contentLength) { + const bytes = Number.parseInt(contentLength, 10); + if (!Number.isNaN(bytes) && bytes > MAX_DOWNLOAD_BYTES) { + throw new RocketChatClientError( + `attachment download refused: Content-Length ${bytes} exceeds max ${MAX_DOWNLOAD_BYTES}`, + ); + } + } + const inboundDir = join(this.mediaDir, "inbound"); + await mkdir(inboundDir, { recursive: true }); + const ext = getExt(options?.fileName ?? url ?? "attachment"); + const safeName = (options?.fileName ?? "attachment").replace(/[^a-zA-Z0-9._-]/g, "_"); + const filePath = join(inboundDir, `${safeName}---${randomUUID().slice(0, 12)}${ext ? `.${ext}` : ""}`); + const bytes = Buffer.from(await response.arrayBuffer()); + if (bytes.length > MAX_DOWNLOAD_BYTES) { + throw new RocketChatClientError( + `attachment download refused: actual size ${bytes.length} exceeds max ${MAX_DOWNLOAD_BYTES}`, + ); + } + await writeFile(filePath, bytes); + return filePath; + } + + async uploadAttachment( + roomId: string, + filePath: string, + text?: string, + options?: { tmid?: string }, + ): Promise { + await this.ensureInitialized(); + const fileName = basename(filePath); + const fileBytes = await readFile(filePath); + const formData = new FormData(); + if (text?.trim()) formData.append("msg", text.trim()); + if (options?.tmid) formData.append("tmid", options.tmid); + formData.append("file", new Blob([fileBytes]), fileName); + const uploadResponse = await this.fetchFn( + new URL(`/api/v1/rooms.media/${encodeURIComponent(roomId)}`, this.serverUrl).toString(), + { + method: "POST", + headers: { + "X-User-Id": this.resolvedUserId!, + "X-Auth-Token": this.resolvedAuthToken!, + }, + body: formData, + }, + ); + if (!uploadResponse.ok) { + throw new RocketChatClientError(`attachment upload failed: ${uploadResponse.statusText}`); + } + const uploadPayload = await this.parseJsonResponse(uploadResponse); + const file = asObject(uploadPayload.file ?? {}); + const fileId = getString(file, "_id"); + const confirmResponse = await this.fetchFn( + new URL( + `/api/v1/rooms.mediaConfirm/${encodeURIComponent(roomId)}/${encodeURIComponent(fileId)}`, + this.serverUrl, + ).toString(), + { + method: "POST", + headers: { + "X-User-Id": this.resolvedUserId!, + "X-Auth-Token": this.resolvedAuthToken!, + }, + }, + ); + if (!confirmResponse.ok) { + throw new RocketChatClientError(`attachment confirm failed: ${confirmResponse.statusText}`); + } + const confirmPayload = await this.parseJsonResponse(confirmResponse); + const message = asObject(confirmPayload.message ?? {}); + return getString(message, "_id"); + } + private async ensureInitialized(): Promise { if (!this.resolvedUserId || !this.resolvedAuthToken) { await this.initialize(); @@ -150,8 +263,10 @@ export class RocketChatClient { private async parseJsonResponse(response: Response): Promise { let payload: JsonObject; try { - payload = (await response.json()) as JsonObject; + const parsed = await response.json(); + payload = asObject(parsed); } catch (err) { + if (err instanceof RocketChatClientError) throw err; throw new RocketChatClientError( `Rocket.Chat API returned non-JSON response (status ${response.status}): ${err instanceof Error ? err.message : String(err)}`, ); @@ -175,6 +290,29 @@ export class RocketChatClient { } } +function isBlockedUrl(url: string): boolean { + try { + const hostname = new URL(url).hostname.toLowerCase(); + if ( + hostname === "localhost" || + hostname === "127.0.0.1" || + hostname === "::1" || + hostname === "0.0.0.0" || + hostname.endsWith(".local") || + hostname.endsWith(".internal") || + hostname.startsWith("10.") || + hostname.startsWith("192.168.") || + /^172\.(1[6-9]|2\d|3[01])\./.test(hostname) || + /^169\.254\./.test(hostname) + ) { + return true; + } + return false; + } catch { + return false; + } +} + function asObject(value: unknown): JsonObject { if (value && typeof value === "object" && !Array.isArray(value)) { return value as JsonObject; @@ -197,18 +335,6 @@ function getOptionalString(object: JsonObject, key: string): string | null { return typeof value === "string" && value.length > 0 ? value : null; } -function getErrorMessage(payload: JsonObject, fallback: string): string { - if (typeof payload.error === "string" && payload.error.length > 0) { - return payload.error; - } - - if (typeof payload.message === "string" && payload.message.length > 0) { - return payload.message; - } - - return fallback; -} - function getRetryAfterMs(response: Response, payload: JsonObject): number { const retryAfterHeader = response.headers.get("Retry-After"); if (retryAfterHeader) { @@ -230,3 +356,5 @@ function getRetryAfterMs(response: Response, payload: JsonObject): number { return 30_000; } + + diff --git a/src/gateway.ts b/src/gateway.ts index 308db60..fccb61d 100644 --- a/src/gateway.ts +++ b/src/gateway.ts @@ -1,9 +1,8 @@ -import { homedir } from "node:os"; -import { join } from "node:path"; - +import { resolveOpenClawDir } from "./utils.js"; import { RocketChatClient, RocketChatRateLimitError } from "./client.js"; import { parsePluginConfig } from "./config.js"; import { FileCheckpointStore } from "./checkpoint-store.js"; +import { getMessageAttachmentInputs, normalizeInboundAttachments } from "./attachments.js"; import type { InboundEvent } from "./types/types.js"; import { shouldHandleInboundEvent } from "./channel.js"; import { dispatchInboundEventWithChannelRuntime } from "./inbound-dispatch.js"; @@ -12,8 +11,14 @@ import type { OpenClawConfig, GatewayContext, OpenClawConfigLike, + RocketChatIdentity, + OutboundReplyPayload, + ReplyDeliverInfo, } from "./types/types.js"; +const MAX_MESSAGE_LENGTH = 10_000; +const MAX_ATTACHMENTS = 5; + export type ClientEntry = { client: RocketChatClient; generation: number; wakeup: () => void }; export const activeClients = new Map(); let nextGeneration = 0; @@ -39,6 +44,152 @@ export function isConfigured(account: Partial | null | undefine return Boolean(account.auth); } +class PollState { + blockedUntilMs = 0; + consecutiveEmptyPolls = 0; + timer: ReturnType | null = null; + stopped = false; + warnedAboutMissingRuntime = false; + + getInterval(): number { + if (this.consecutiveEmptyPolls < 3) return 3_000; + if (this.consecutiveEmptyPolls < 10) return 10_000; + if (this.consecutiveEmptyPolls < 20) return 30_000; + return 60_000; + } + + recordCycle(foundMessages: boolean): void { + this.consecutiveEmptyPolls = foundMessages ? 0 : this.consecutiveEmptyPolls + 1; + } + + block(ms: number): void { + this.blockedUntilMs = Date.now() + ms; + this.consecutiveEmptyPolls = Math.max(this.consecutiveEmptyPolls, 10); + } + + isBlocked(): boolean { + return Date.now() < this.blockedUntilMs; + } + + resetBackoff(): void { + this.consecutiveEmptyPolls = 0; + } +} + +async function pollOnce( + client: RocketChatClient, + checkpoint: FileCheckpointStore, + identity: RocketChatIdentity, + mentionNames: string[], + ctx: GatewayContext, + account: ResolvedAccount, + state: PollState, +): Promise { + const stateData = await checkpoint.read(); + if (!stateData.updatedSince) { + await checkpoint.write({ updatedSince: new Date().toISOString(), recentMessageIds: stateData.recentMessageIds }); + return; + } + + const seenIds = new Set(stateData.recentMessageIds); + const subscriptions = await client.listSubscriptions(stateData.updatedSince); + let nextUpdatedSince = stateData.updatedSince; + let foundMessages = false; + + for (const sub of subscriptions) { + const subTs = sub._updatedAt ?? sub.updatedAt ?? null; + if (subTs && subTs > nextUpdatedSince) nextUpdatedSince = subTs; + + const messages = await client.syncMessages(sub.rid, stateData.updatedSince); + messages.sort((a, b) => (a.ts ?? a._updatedAt ?? "").localeCompare(b.ts ?? b._updatedAt ?? "")); + for (const msg of messages) { + const msgTs = msg.ts ?? msg._updatedAt ?? null; + if (msgTs && msgTs > nextUpdatedSince) nextUpdatedSince = msgTs; + + if (shouldSkipMessage(msg, identity.userId, seenIds)) continue; + + const event = toInboundEvent(account.accountId, sub, msg, account.serverUrl); + + if (!shouldHandleInboundEvent(event, { botUserId: identity.userId, mentionNames })) continue; + + foundMessages = true; + + logger.info(`[rocketchat:${account.accountId}] inbound from ${event.senderName}: "${event.text.slice(0, 80)}"`); + + if (ctx.channelRuntime) { + await handleMessage(ctx, event, client, account.accountId); + } else if (!state.warnedAboutMissingRuntime) { + state.warnedAboutMissingRuntime = true; + logger.error(`[rocketchat:${account.accountId}] channel runtime is unavailable; inbound messages will be ignored`); + } + + seenIds.add(msg._id); + await checkpoint.write({ updatedSince: nextUpdatedSince, recentMessageIds: [...seenIds].slice(-250) }); + } + } + + state.recordCycle(foundMessages); + logger.info(`[rocketchat:${account.accountId}] poll done (found=${foundMessages}, CEP=${state.consecutiveEmptyPolls}, nextInterval=${state.getInterval()}ms)`); +} + +async function handleMessage( + ctx: GatewayContext, + event: InboundEvent, + client: RocketChatClient, + accountId: string, +): Promise { + const replyTmid = event.tmid ?? undefined; + const channelRuntime = ctx.channelRuntime; + if (!channelRuntime) return; + + await client.reactToMessage( + event.messageId, + PROCESSING_EMOJIS[Math.floor(Math.random() * PROCESSING_EMOJIS.length)]!, + ).catch((err) => logger.error(`[rocketchat:${accountId}] reaction failed: ${err instanceof Error ? err.message : String(err)}`)); + + await dispatchInboundEventWithChannelRuntime({ + cfg: (ctx.cfg ?? {}) as OpenClawConfigLike, + accountId, + event, + channelRuntime, + client, + deliver: (payload, info) => sendReply(client, event.roomId, event.messageId, replyTmid, accountId, payload, info), + onRecordError: (error) => { + logger.error(`[rocketchat:${accountId}] failed to record inbound session: ${error instanceof Error ? error.message : String(error)}`); + }, + onDispatchError: (error, info) => { + logger.error(`[rocketchat:${accountId}] ${info.kind} dispatch failed: ${error instanceof Error ? error.message : String(error)}`); + }, + }); +} + +async function sendReply( + client: RocketChatClient, + roomId: string, + messageId: string, + replyTmid: string | undefined, + accountId: string, + payload: OutboundReplyPayload, + info: ReplyDeliverInfo, +): Promise { + if (info.kind !== "final") return; + + await client.reactToMessage(messageId, ":white_check_mark:").catch((err) => + logger.error(`[rocketchat:${accountId}] reaction failed: ${err instanceof Error ? err.message : String(err)}`) + ); + + if (payload.attachmentPath) { + try { + await client.uploadAttachment(roomId, payload.attachmentPath, payload.text, replyTmid ? { tmid: replyTmid } : undefined); + } catch (err) { + logger.error(`[rocketchat:${accountId}] upload failed: ${err instanceof Error ? err.message : String(err)}`); + await client.postMessage(roomId, payload.text ?? "", replyTmid ? { tmid: replyTmid } : undefined); + } + } else { + await client.postMessage(roomId, payload.text ?? "", replyTmid ? { tmid: replyTmid } : undefined); + } +} + export async function startGateway(ctx: GatewayContext): Promise { const account = ctx.account ?? resolveAccount(ctx.cfg ?? {}, ctx.accountId); if (!account || !account.enabled) { @@ -46,166 +197,62 @@ export async function startGateway(ctx: GatewayContext): Promise { return; } - const log = logger; - const auth = account.auth; const client = new RocketChatClient({ serverUrl: account.serverUrl, - auth, + auth: account.auth, }); const identity = await client.getIdentity(); const generation = nextGeneration++; ctx.setStatus?.("connected"); - log.info(`[rocketchat:${account.accountId}] connected as ${identity.username}`); + logger.info(`[rocketchat:${account.accountId}] connected as ${identity.username}`); - const stateDir = resolveStateDir(); + const stateDir = resolveOpenClawDir(); const checkpointPath = `${stateDir}/rocketchat/${account.accountId}.json`; const checkpoint = new FileCheckpointStore(checkpointPath, 250); - - let blockedUntilMs = 0; - let consecutiveEmptyPolls = 0; - let timer: ReturnType | null = null; - let stopped = false; - let warnedAboutMissingRuntime = false; - - const getPollInterval = (): number => { - if (consecutiveEmptyPolls < 3) return 3_000; - if (consecutiveEmptyPolls < 10) return 10_000; - if (consecutiveEmptyPolls < 20) return 30_000; - return 60_000; - }; - + const state = new PollState(); const mentionNames = dedupeMentions([identity.username, ...account.mentionNames]); const safePollOnce = async (): Promise => { - if (stopped) return; - if (Date.now() < blockedUntilMs) { - log.info(`[rocketchat:${account.accountId}] poll skipped, blocked for ${blockedUntilMs - Date.now()}ms`); + if (state.stopped) return; + if (state.isBlocked()) { + logger.info(`[rocketchat:${account.accountId}] poll skipped, blocked for ${state.blockedUntilMs - Date.now()}ms`); return; } - log.info(`[rocketchat:${account.accountId}] poll tick (CEP=${consecutiveEmptyPolls}, interval=${getPollInterval()}ms)`); try { - const state = await checkpoint.read(); - if (!state.updatedSince) { - await checkpoint.write({ updatedSince: new Date().toISOString(), recentMessageIds: state.recentMessageIds }); - return; - } - - const seenIds = new Set(state.recentMessageIds); - const subscriptions = await client.listSubscriptions(state.updatedSince); - let nextUpdatedSince = state.updatedSince; - let foundMessages = false; - - for (const sub of subscriptions) { - const subTs = sub._updatedAt ?? sub.updatedAt ?? null; - if (subTs && subTs > nextUpdatedSince) { - nextUpdatedSince = subTs; - } - - const messages = await client.syncMessages(sub.rid, state.updatedSince); - messages.sort((a, b) => (a.ts ?? a._updatedAt ?? "").localeCompare(b.ts ?? b._updatedAt ?? "")); - for (const msg of messages) { - const msgTs = msg.ts ?? msg._updatedAt ?? null; - if (msgTs && msgTs > nextUpdatedSince) { - nextUpdatedSince = msgTs; - } - - if (shouldSkipMessage(msg, identity.userId, seenIds)) { - continue; - } - - const event = toInboundEvent(account.accountId, sub, msg); - - if (!shouldHandleInboundEvent(event, { botUserId: identity.userId, mentionNames })) { - continue; - } - - foundMessages = true; - - log.info( - `[rocketchat:${account.accountId}] inbound from ${event.senderName}: "${event.text.slice(0, 80)}"`, - ); - - if (ctx.channelRuntime) { - const channelRuntime = ctx.channelRuntime; - const replyTmid = event.tmid ?? undefined; - - await client.reactToMessage( - event.messageId, - PROCESSING_EMOJIS[Math.floor(Math.random() * PROCESSING_EMOJIS.length)]! - ).catch((err) => log.error(`[rocketchat:${account.accountId}] reaction failed: ${err instanceof Error ? err.message : String(err)}`)); - - await dispatchInboundEventWithChannelRuntime({ - cfg: (ctx.cfg ?? {}) as OpenClawConfigLike, - accountId: account.accountId, - event, - channelRuntime, - deliver: async (payload, info) => { - if (info.kind === "final") { - await client.reactToMessage(event.messageId, ":white_check_mark:").catch((err) => log.error(`[rocketchat:${account.accountId}] reaction failed: ${err instanceof Error ? err.message : String(err)}`)); - await client.postMessage(event.roomId, payload.text ?? "", replyTmid ? { tmid: replyTmid } : undefined); - } - }, - onRecordError: (error) => { - log.error( - `[rocketchat:${account.accountId}] failed to record inbound session: ${error instanceof Error ? error.message : String(error)}`, - ); - }, - onDispatchError: (error, info) => { - log.error( - `[rocketchat:${account.accountId}] ${info.kind} dispatch failed: ${error instanceof Error ? error.message : String(error)}`, - ); - }, - }); - } else { - if (!warnedAboutMissingRuntime) { - warnedAboutMissingRuntime = true; - log.error( - `[rocketchat:${account.accountId}] channel runtime is unavailable; inbound messages will be ignored`, - ); - } - } - - seenIds.add(msg._id); - await checkpoint.write({ updatedSince: nextUpdatedSince, recentMessageIds: [...seenIds].slice(-250) }); - } - } - - consecutiveEmptyPolls = foundMessages ? 0 : consecutiveEmptyPolls + 1; - log.info(`[rocketchat:${account.accountId}] poll done (found=${foundMessages}, CEP=${consecutiveEmptyPolls}, nextInterval=${getPollInterval()}ms)`); + await pollOnce(client, checkpoint, identity, mentionNames, ctx, account, state); } catch (err) { if (err instanceof RocketChatRateLimitError) { - blockedUntilMs = Date.now() + err.retryAfterMs; - consecutiveEmptyPolls = Math.max(consecutiveEmptyPolls, 10); - log.error(`[rocketchat:${account.accountId}] rate limited, backing off ${err.retryAfterMs}ms`); + state.block(err.retryAfterMs); + logger.error(`[rocketchat:${account.accountId}] rate limited, backing off ${err.retryAfterMs}ms`); } else { - log.error(`[rocketchat:${account.accountId}] poll error: ${err instanceof Error ? err.message : String(err)}`); + logger.error(`[rocketchat:${account.accountId}] poll error: ${err instanceof Error ? err.message : String(err)}`); } } }; const scheduleNext = () => { - if (stopped) return; - const delay = getPollInterval(); - log.info(`[rocketchat:${account.accountId}] next poll in ${delay}ms (CEP=${consecutiveEmptyPolls})`); - timer = setTimeout(async () => { + if (state.stopped) return; + const delay = state.getInterval(); + logger.info(`[rocketchat:${account.accountId}] next poll in ${delay}ms (CEP=${state.consecutiveEmptyPolls})`); + state.timer = setTimeout(async () => { await safePollOnce(); scheduleNext(); }, delay); }; const wakeup = () => { - if (stopped) return; - log.info(`[rocketchat:${account.accountId}] wakeup triggered, resetting CEP`); - consecutiveEmptyPolls = 0; - if (timer) { - clearTimeout(timer); - timer = null; + if (state.stopped) return; + logger.info(`[rocketchat:${account.accountId}] wakeup triggered, resetting CEP`); + state.resetBackoff(); + if (state.timer) { + clearTimeout(state.timer); + state.timer = null; } scheduleNext(); }; - + activeClients.set(account.accountId, { client, generation, wakeup }); await safePollOnce(); @@ -214,17 +261,17 @@ export async function startGateway(ctx: GatewayContext): Promise { try { await new Promise((resolve) => { if (ctx.abortSignal?.aborted) { - stopped = true; + state.stopped = true; resolve(); return; } ctx.abortSignal?.addEventListener("abort", () => { - stopped = true; + state.stopped = true; resolve(); }, { once: true }); }); } finally { - if (timer) clearTimeout(timer); + if (state.timer) clearTimeout(state.timer); const current = activeClients.get(account.accountId); if (current?.generation === generation) { activeClients.delete(account.accountId); @@ -240,7 +287,7 @@ function shouldSkipMessage( ): boolean { if (!msg._id) return true; if (msg.t) return true; - if ((!msg.msg || msg.msg.trim().length === 0)) return true; + if ((!msg.msg || msg.msg.trim().length === 0) && getMessageAttachmentInputs(msg).length === 0) return true; if (msg.u?._id === botUserId) return true; if (seenIds.has(msg._id)) return true; return false; @@ -250,7 +297,9 @@ function toInboundEvent( accountId: string, sub: import("./types/types.js").RocketChatSubscriptionRecord, msg: import("./types/types.js").RocketChatMessageRecord, + serverUrl?: string, ): InboundEvent { + const rawAttachments = getMessageAttachmentInputs(msg); return { accountId, roomId: msg.rid, @@ -259,8 +308,9 @@ function toInboundEvent( tmid: msg.tmid ?? null, senderId: msg.u?._id ?? "", senderName: msg.u?.username ?? msg.u?.name ?? "", - text: msg.msg ?? "", + text: (msg.msg ?? "").slice(0, MAX_MESSAGE_LENGTH), mentions: (msg.mentions ?? []).map((m) => m.username ?? m.name ?? "").filter(Boolean), + attachments: normalizeInboundAttachments(rawAttachments.slice(0, MAX_ATTACHMENTS), serverUrl ? { serverUrl } : undefined), sentAt: msg.ts ?? new Date(0).toISOString(), raw: msg, }; @@ -272,14 +322,6 @@ function mapRoomType(t: string | undefined): InboundEvent["roomType"] { return "channel"; } -function resolveStateDir(): string { - const explicit = process.env.OPENCLAW_STATE_DIR?.trim(); - if (explicit) return explicit; - const home = process.env.OPENCLAW_HOME?.trim(); - if (home) return join(home, ".openclaw"); - return join(homedir(), ".openclaw"); -} - const PROCESSING_EMOJIS = [ ":eyes:", ":thinking:", ":hourglass:", ":gear:", ":robot:", ":arrows_counterclockwise:", ":bulb:", ":mag:" diff --git a/src/inbound-dispatch.ts b/src/inbound-dispatch.ts index 85cba65..101ff5b 100644 --- a/src/inbound-dispatch.ts +++ b/src/inbound-dispatch.ts @@ -1,4 +1,5 @@ -import type { InboundEvent, OpenClawConfigLike, OutboundReplyPayload, ReplyDeliverInfo, ChannelRuntimeLike } from "./types/types.js"; +import type { InboundEvent, OpenClawConfigLike, OutboundReplyPayload, ReplyDeliverInfo, ChannelRuntimeLike, InboundAttachment } from "./types/types.js"; +import type { RocketChatClient } from "./client.js"; export async function dispatchInboundEventWithChannelRuntime(params: { cfg: OpenClawConfigLike; @@ -8,6 +9,7 @@ export async function dispatchInboundEventWithChannelRuntime(params: { deliver(payload: OutboundReplyPayload, info: ReplyDeliverInfo): Promise; onRecordError(err: unknown): void; onDispatchError(err: unknown, info: ReplyDeliverInfo): void; + client?: RocketChatClient; }): Promise { const route = params.channelRuntime.routing.resolveAgentRoute({ cfg: params.cfg, @@ -62,6 +64,7 @@ export async function dispatchInboundEventWithChannelRuntime(params: { Timestamp: timestamp, OriginatingChannel: "rocketchat", OriginatingTo: to, + ...(await buildMediaContext(params.event.attachments, params.client)), }); await params.channelRuntime.session.recordInboundSession({ @@ -95,18 +98,20 @@ function normalizeOutboundReplyPayload(payload: unknown): OutboundReplyPayload { } const record = payload as Record; - const mediaUrls = Array.isArray(record.mediaUrls) - ? record.mediaUrls.filter((value): value is string => typeof value === "string" && value.trim().length > 0) - : undefined; const text = typeof record.text === "string" ? record.text : undefined; const mediaUrl = typeof record.mediaUrl === "string" ? record.mediaUrl : undefined; + const mediaUrls = Array.isArray(record.mediaUrls) + ? record.mediaUrls.filter((value): value is string => typeof value === "string" && value.trim().length > 0) + : undefined; + const attachmentPath = typeof record.attachmentPath === "string" ? record.attachmentPath : undefined; const replyToId = typeof record.replyToId === "string" ? record.replyToId : undefined; return { ...(text ? { text } : {}), ...(mediaUrl ? { mediaUrl } : {}), ...(mediaUrls && mediaUrls.length > 0 ? { mediaUrls } : {}), + ...(attachmentPath ? { attachmentPath } : {}), ...(replyToId ? { replyToId } : {}), }; } @@ -126,6 +131,52 @@ function buildRecipientAddress(event: InboundEvent): string { return `rocketchat:${event.roomId}`; } +async function buildMediaContext( + attachments: InboundAttachment[], + client?: RocketChatClient, +): Promise> { + if (attachments.length === 0) return {}; + + const results = await Promise.all( + attachments.map(async (attachment) => { + if (attachment.url && client) { + try { + const filePath = await client.downloadAttachmentToTempFile(attachment.url, attachment.fileName ? { fileName: attachment.fileName } : undefined); + return { kind: "path" as const, value: filePath, mimeType: attachment.mimeType }; + } catch { + // download failed — fall through to URL injection + } + } + + if (attachment.url) { + return { kind: "url" as const, value: attachment.url, mimeType: attachment.mimeType }; + } + + return null; + }), + ); + + const mediaUrls: string[] = []; + const mediaPaths: string[] = []; + const mediaTypes: string[] = []; + + for (const r of results) { + if (!r) continue; + if (r.kind === "path") { + mediaPaths.push(r.value); + } else { + mediaUrls.push(r.value); + } + if (r.mimeType) mediaTypes.push(r.mimeType); + } + + return { + ...(mediaUrls.length > 0 ? { MediaUrl: mediaUrls[0], MediaUrls: mediaUrls } : {}), + ...(mediaPaths.length > 0 ? { MediaPath: mediaPaths[0], MediaPaths: mediaPaths } : {}), + ...(mediaTypes.length > 0 ? { MediaType: mediaTypes[0], MediaTypes: mediaTypes } : {}), + }; +} + function toEpochMs(value: string): number | undefined { const timestamp = Date.parse(value); return Number.isNaN(timestamp) ? undefined : timestamp; diff --git a/src/index.ts b/src/index.ts index bd1bee7..7bbe6ba 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,26 +1,21 @@ -import type { GatewayApi } from "./types/types.js"; -import { rocketchatPlugin, startGateway, listAccountIds, resolveAccount, isConfigured } from "./plugin.js"; +import { defineChannelPluginEntry } from "openclaw/plugin-sdk/channel-core"; +import type { OpenClawPluginApi, OpenClawPluginDefinition } from "openclaw/plugin-sdk/core"; +import { rocketchatPlugin, startGateway } from "./plugin.js"; -export function register(api: GatewayApi) { - api.registerChannel?.({ plugin: rocketchatPlugin }); -} +export { startGateway } from "./plugin.js"; -export function activate(api: GatewayApi) { - api.registerGatewayMethod("rocketchat.gateway.startAccount", (ctx) => { - return startGateway(ctx as Parameters[0]); - }); -} - -export default { +const _entry = defineChannelPluginEntry({ id: "rocketchat", name: "Rocket.Chat", description: "Rocket.Chat channel plugin with REST polling outbound/inbound", plugin: rocketchatPlugin, - config: { - listAccountIds, - resolveAccount, - isConfigured, + registerFull: (api: OpenClawPluginApi) => { + api.registerGatewayMethod("rocketchat.gateway.startAccount", (ctx) => { + return startGateway(ctx as unknown as Parameters[0]); + }); }, - register, - activate, -}; +}); + +const entry: OpenClawPluginDefinition & { channelPlugin: typeof rocketchatPlugin } = _entry; + +export default entry; diff --git a/src/plugin.ts b/src/plugin.ts index 4883463..d65c6f5 100644 --- a/src/plugin.ts +++ b/src/plugin.ts @@ -1,79 +1,84 @@ +import { createChatChannelPlugin } from "openclaw/plugin-sdk/channel-core"; +import type { OpenClawConfig } from "openclaw/plugin-sdk/core"; import { RocketChatClient } from "./client.js"; import { startGateway, resolveAccount, listAccountIds, isConfigured, activeClients } from "./gateway.js"; -import type { OpenClawConfig } from "./types/types.js"; +import type { ResolvedAccount } from "./types/types.js"; export { startGateway, resolveAccount, listAccountIds, isConfigured }; -export const rocketchatPlugin = { - id: "rocketchat", - meta: { +export const rocketchatPlugin = createChatChannelPlugin({ + base: { id: "rocketchat", - label: "Rocket.Chat", - selectionLabel: "Rocket.Chat", - blurb: "Rocket.Chat channel plugin with REST polling outbound/inbound", - aliases: ["rc"], - }, - capabilities: { chatTypes: ["direct", "group", "channel"] }, - config: { - listAccountIds, - resolveAccount, - isConfigured, - }, - threading: { - topLevelReplyToMode: "reply" as const, - }, - messaging: { - targetPrefixes: ["rocketchat", "channel", "user", "@"], - normalizeTarget: (target: string): string | undefined => { - const trimmed = target?.trim(); - if (!trimmed) return undefined; - return trimmed.replace(/^rocketchat:(?:channel:|user:)?/i, "").replace(/^channel:/i, ""); + meta: { + id: "rocketchat", + label: "Rocket.Chat", + selectionLabel: "Rocket.Chat", + docsPath: "https://rocket.chat/docs", + blurb: "Rocket.Chat channel plugin with REST polling outbound/inbound", + aliases: ["rc"], }, - targetResolver: { - looksLikeId: (id: string): boolean => { - const trimmed = id?.trim(); - if (!trimmed) return false; - return /^[a-z0-9_]{4,32}$/i.test(trimmed) || /^rocketchat:/i.test(trimmed) || /^channel:/i.test(trimmed) || /^user:/i.test(trimmed) || /^@/.test(trimmed); + capabilities: { chatTypes: ["direct", "group", "channel"] }, + config: { + listAccountIds: listAccountIds as (cfg: OpenClawConfig) => string[], + resolveAccount: resolveAccount as (cfg: OpenClawConfig, accountId?: string | null) => ResolvedAccount, + isConfigured: ((account: unknown, _cfg: OpenClawConfig): boolean => isConfigured(account as Parameters[0])) as (account: ResolvedAccount, cfg: OpenClawConfig) => boolean, + }, + messaging: { + targetPrefixes: ["rocketchat", "channel", "user", "@"], + normalizeTarget: (target: string): string | undefined => { + const trimmed = target?.trim(); + if (!trimmed) return undefined; + return trimmed.replace(/^rocketchat:(?:channel:|user:)?/i, "").replace(/^channel:/i, ""); + }, + targetResolver: { + looksLikeId: (id: string): boolean => { + const trimmed = id?.trim(); + if (!trimmed) return false; + return /^[a-z0-9_]{4,32}$/i.test(trimmed) || /^rocketchat:/i.test(trimmed) || /^channel:/i.test(trimmed) || /^user:/i.test(trimmed) || /^@/.test(trimmed); + }, + hint: "", }, - hint: "", + }, + gateway: { + startAccount: (ctx: unknown) => startGateway(ctx as Parameters[0]), }, }, + threading: { + topLevelReplyToMode: "reply", + }, outbound: { - deliveryMode: "direct" as const, - resolveTarget: ({ to }: { to: string }) => { - const trimmed = to?.trim(); - if (!trimmed) return { ok: false as const, error: new Error("Rocket.Chat send requires a target id") }; - const normalized = trimmed.replace(/^rocketchat:(?:channel:|user:)?/i, "").replace(/^channel:/i, ""); - return { ok: true as const, to: normalized }; + base: { + deliveryMode: "direct", + resolveTarget: (params) => { + const to = params?.to?.trim(); + if (!to) return { ok: false as const, error: new Error("Rocket.Chat send requires a target id") }; + const normalized = to.replace(/^rocketchat:(?:channel:|user:)?/i, "").replace(/^channel:/i, ""); + return { ok: true as const, to: normalized }; + }, }, - sendText: async (params: { - cfg?: unknown; - accountId?: string; - to: string; - text: string; - replyToId?: string; - }): Promise<{ ok: boolean; messageId: string; channel: string }> => { - let account = resolveAccount(params.cfg ?? {}, params.accountId); - if (!account) { - const accounts = listAccountIds(params.cfg as OpenClawConfig); - if (accounts.length > 0) { - account = resolveAccount(params.cfg ?? {}, accounts[0]); + attachedResults: { + channel: "rocketchat", + sendText: async (ctx) => { + const accountId = ctx.accountId ?? undefined; + let account = resolveAccount(ctx.cfg as Parameters[0], accountId); + if (!account) { + const accounts = listAccountIds(ctx.cfg as Parameters[0]); + if (accounts.length > 0) { + account = resolveAccount(ctx.cfg as Parameters[0], accounts[0]); + } } - } - if (!account) throw new Error(`Unknown Rocket.Chat account: ${params.accountId}`); + if (!account) throw new Error(`Unknown Rocket.Chat account: ${ctx.accountId}`); - const entry = activeClients.get(account.accountId); - let client = entry?.client ?? null; - if (!client) { - client = new RocketChatClient({ serverUrl: account.serverUrl, auth: account.auth }); - } - const tmidOptions = params.replyToId ? { tmid: params.replyToId } : undefined; - const messageId = await client.postMessage(params.to, params.text, tmidOptions); - entry?.wakeup?.(); - return { ok: true, messageId, channel: "rocketchat" }; + const entry = activeClients.get(account.accountId); + let client = entry?.client ?? null; + if (!client) { + client = new RocketChatClient({ serverUrl: account.serverUrl, auth: account.auth }); + } + const tmidOptions = ctx.replyToId ? { tmid: ctx.replyToId } : undefined; + const messageId = await client.postMessage(ctx.to, ctx.text, tmidOptions); + entry?.wakeup?.(); + return { ok: true, messageId }; + }, }, }, - gateway: { - startAccount: startGateway, - }, -}; +}); diff --git a/src/types/types.ts b/src/types/types.ts index f00c43b..0d72e10 100644 --- a/src/types/types.ts +++ b/src/types/types.ts @@ -1,6 +1,18 @@ import type { PluginAccountConfig } from "../config.js"; export type { PluginConfig, PluginAccountConfig } from "../config.js"; +export type InboundAttachmentKind = "image" | "audio" | "document" | "video" | "unknown"; + +export type InboundAttachment = { + kind: InboundAttachmentKind; + mimeType?: string; + fileName?: string; + url?: string; + sizeBytes?: number; + source: "rocketchat-attachment" | "rocketchat-file"; + raw: unknown; +}; + export type RocketChatIdentity = { userId: string; authToken: string; @@ -34,6 +46,9 @@ export type RocketChatMessageRecord = { username?: string; name?: string; }>; + attachments?: unknown[]; + file?: unknown; + files?: unknown[]; }; export type RocketChatClientOptions = { @@ -72,18 +87,12 @@ export type InboundEvent = { senderName: string; text: string; mentions: string[]; + attachments: InboundAttachment[]; sentAt: string; raw: RocketChatMessageRecord; }; -export type OpenClawConfigLike = { - session?: { - store?: string; - }; - channels?: { - rocketchat?: unknown; - }; -}; +export type OpenClawConfigLike = OpenClawConfig; export type RoutePeer = { kind: InboundEvent["roomType"]; @@ -105,6 +114,7 @@ export type OutboundReplyPayload = { text?: string; mediaUrl?: string; mediaUrls?: string[]; + attachmentPath?: string; replyToId?: string; }; @@ -199,4 +209,21 @@ export type RCUser = { name: string; }; +export type AttachmentRecord = { + _id?: string; + title?: string; + title_link?: string; + url?: string; + image_url?: string; + video_url?: string; + audio_url?: string; + type?: string; + mimeType?: string; + mimetype?: string; + contentType?: string; + name?: string; + filename?: string; + size?: number; +}; + diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 0000000..51d509b --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,36 @@ +import { homedir } from "node:os"; +import { join } from "node:path"; +import type { JsonObject } from "./types/types.js"; + +export function resolveOpenClawDir(): string { + const explicit = process.env.OPENCLAW_STATE_DIR?.trim(); + if (explicit) return explicit; + const home = process.env.OPENCLAW_HOME?.trim(); + if (home) return join(home, ".openclaw"); + return join(homedir(), ".openclaw"); +} + +export function resolveUrl(url: string, base?: string): string { + try { return new URL(url).toString(); } catch { /* relative */ } + if (!base) return url; + try { + return new URL(url, base.endsWith("/") ? base : base + "/").toString(); + } catch { + return url; + } +} + +export function getErrorMessage(payload: JsonObject, fallback: string): string { + if (typeof payload.error === "string" && payload.error.length > 0) return payload.error; + if (typeof payload.message === "string" && payload.message.length > 0) return payload.message; + return fallback; +} + +export function getExt(name: string | undefined): string | undefined { + if (!name) return undefined; + const clean = name.trim().toLowerCase(); + const part = clean.split("?").shift()!.split("#").shift()!; + const dot = part.lastIndexOf("."); + if (dot <= 0 || dot === part.length - 1) return undefined; + return part.slice(dot + 1); +}