Skip to content

Commit 57194bd

Browse files
Add task cleanup, switch to triggering off trigger.dev
1 parent ea4ecff commit 57194bd

File tree

22 files changed

+32566
-543
lines changed

22 files changed

+32566
-543
lines changed
Lines changed: 7 additions & 296 deletions
Original file line numberDiff line numberDiff line change
@@ -1,314 +1,25 @@
1-
import { db } from '@sim/db'
2-
import {
3-
a2aAgent,
4-
knowledgeBase,
5-
mcpServers,
6-
memory,
7-
subscription,
8-
userTableDefinitions,
9-
workflow,
10-
workflowFolder,
11-
workflowMcpServer,
12-
workspace,
13-
workspaceFile,
14-
workspaceFiles,
15-
} from '@sim/db/schema'
161
import { createLogger } from '@sim/logger'
17-
import { and, eq, inArray, isNotNull, isNull, lt, or, sql } from 'drizzle-orm'
18-
import type { PgColumn, PgTable } from 'drizzle-orm/pg-core'
192
import { type NextRequest, NextResponse } from 'next/server'
203
import { verifyCronAuth } from '@/lib/auth/internal'
21-
import { sqlIsPaid, sqlIsPro, sqlIsTeam } from '@/lib/billing/plan-helpers'
22-
import { ENTITLED_SUBSCRIPTION_STATUSES } from '@/lib/billing/subscriptions/utils'
23-
import { env } from '@/lib/core/config/env'
24-
import { isUsingCloudStorage, StorageService } from '@/lib/uploads'
4+
import { getJobQueue } from '@/lib/core/async-jobs'
255

266
export const dynamic = 'force-dynamic'
277

288
const logger = createLogger('SoftDeleteCleanupAPI')
299

30-
const BATCH_SIZE = 2000
31-
const MAX_BATCHES_PER_TABLE = 10
32-
33-
interface TableCleanupResult {
34-
table: string
35-
deleted: number
36-
failed: number
37-
}
38-
39-
/**
40-
* Batch-delete rows from a table where the soft-delete column is older than the retention date,
41-
* scoped to the given workspace IDs.
42-
*/
43-
async function cleanupTable(
44-
tableDef: PgTable,
45-
softDeleteCol: PgColumn,
46-
workspaceIdCol: PgColumn,
47-
workspaceIds: string[],
48-
retentionDate: Date,
49-
tableName: string
50-
): Promise<TableCleanupResult> {
51-
const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 }
52-
53-
if (workspaceIds.length === 0) {
54-
logger.info(`[${tableName}] Skipped — no workspaces in this tier`)
55-
return result
56-
}
57-
58-
let batchesProcessed = 0
59-
let hasMore = true
60-
61-
while (hasMore && batchesProcessed < MAX_BATCHES_PER_TABLE) {
62-
try {
63-
const deleted = await db
64-
.delete(tableDef)
65-
.where(
66-
and(
67-
inArray(workspaceIdCol, workspaceIds),
68-
isNotNull(softDeleteCol),
69-
lt(softDeleteCol, retentionDate)
70-
)
71-
)
72-
.returning({ id: sql`id` })
73-
74-
result.deleted += deleted.length
75-
hasMore = deleted.length === BATCH_SIZE
76-
batchesProcessed++
77-
78-
if (deleted.length > 0) {
79-
logger.info(`[${tableName}] Batch ${batchesProcessed}: deleted ${deleted.length} rows`)
80-
} else {
81-
logger.info(`[${tableName}] No expired soft-deleted rows found`)
82-
}
83-
} catch (error) {
84-
result.failed++
85-
logger.error(`[${tableName}] Batch delete failed:`, { error })
86-
hasMore = false
87-
}
88-
}
89-
90-
return result
91-
}
92-
93-
/**
94-
* Clean up soft-deleted workspace files from cloud storage before hard-deleting.
95-
*/
96-
async function cleanupWorkspaceFileStorage(
97-
workspaceIds: string[],
98-
retentionDate: Date
99-
): Promise<{ filesDeleted: number; filesFailed: number }> {
100-
const stats = { filesDeleted: 0, filesFailed: 0 }
101-
102-
if (!isUsingCloudStorage() || workspaceIds.length === 0) return stats
103-
104-
// Fetch keys of files about to be deleted
105-
const filesToDelete = await db
106-
.select({ key: workspaceFiles.key })
107-
.from(workspaceFiles)
108-
.where(
109-
and(
110-
inArray(workspaceFiles.workspaceId, workspaceIds),
111-
isNotNull(workspaceFiles.deletedAt),
112-
lt(workspaceFiles.deletedAt, retentionDate)
113-
)
114-
)
115-
.limit(BATCH_SIZE * MAX_BATCHES_PER_TABLE)
116-
117-
for (const file of filesToDelete) {
118-
try {
119-
await StorageService.deleteFile({ key: file.key, context: 'workspace' })
120-
stats.filesDeleted++
121-
} catch (error) {
122-
stats.filesFailed++
123-
logger.error(`Failed to delete storage file ${file.key}:`, { error })
124-
}
125-
}
126-
127-
return stats
128-
}
129-
130-
/** All tables to clean up with their soft-delete column and workspace column. */
131-
const CLEANUP_TARGETS = [
132-
{ table: workflow, softDeleteCol: workflow.archivedAt, wsCol: workflow.workspaceId, name: 'workflow' },
133-
{ table: workflowFolder, softDeleteCol: workflowFolder.archivedAt, wsCol: workflowFolder.workspaceId, name: 'workflowFolder' },
134-
{ table: knowledgeBase, softDeleteCol: knowledgeBase.deletedAt, wsCol: knowledgeBase.workspaceId, name: 'knowledgeBase' },
135-
{ table: userTableDefinitions, softDeleteCol: userTableDefinitions.archivedAt, wsCol: userTableDefinitions.workspaceId, name: 'userTableDefinitions' },
136-
{ table: workspaceFile, softDeleteCol: workspaceFile.deletedAt, wsCol: workspaceFile.workspaceId, name: 'workspaceFile' },
137-
{ table: workspaceFiles, softDeleteCol: workspaceFiles.deletedAt, wsCol: workspaceFiles.workspaceId, name: 'workspaceFiles' },
138-
{ table: memory, softDeleteCol: memory.deletedAt, wsCol: memory.workspaceId, name: 'memory' },
139-
{ table: mcpServers, softDeleteCol: mcpServers.deletedAt, wsCol: mcpServers.workspaceId, name: 'mcpServers' },
140-
{ table: workflowMcpServer, softDeleteCol: workflowMcpServer.deletedAt, wsCol: workflowMcpServer.workspaceId, name: 'workflowMcpServer' },
141-
{ table: a2aAgent, softDeleteCol: a2aAgent.archivedAt, wsCol: a2aAgent.workspaceId, name: 'a2aAgent' },
142-
] as const
143-
144-
async function cleanupTier(
145-
workspaceIds: string[],
146-
retentionDate: Date,
147-
tierLabel: string
148-
): Promise<{ tables: TableCleanupResult[]; filesDeleted: number; filesFailed: number }> {
149-
const tables: TableCleanupResult[] = []
150-
151-
if (workspaceIds.length === 0) {
152-
return { tables, filesDeleted: 0, filesFailed: 0 }
153-
}
154-
155-
// Clean cloud storage files before hard-deleting file metadata rows
156-
const fileStats = await cleanupWorkspaceFileStorage(workspaceIds, retentionDate)
157-
158-
for (const target of CLEANUP_TARGETS) {
159-
const result = await cleanupTable(
160-
target.table,
161-
target.softDeleteCol,
162-
target.wsCol,
163-
workspaceIds,
164-
retentionDate,
165-
`${tierLabel}/${target.name}`
166-
)
167-
tables.push(result)
168-
}
169-
170-
return { tables, ...fileStats }
171-
}
172-
17310
export async function GET(request: NextRequest) {
17411
try {
17512
const authError = verifyCronAuth(request, 'soft-delete cleanup')
17613
if (authError) return authError
17714

178-
const startTime = Date.now()
179-
180-
const freeRetentionDays = Number(env.FREE_PLAN_LOG_RETENTION_DAYS || '7')
181-
const paidRetentionDays = Number(env.PAID_PLAN_LOG_RETENTION_DAYS || '30')
182-
183-
const freeRetentionDate = new Date(Date.now() - freeRetentionDays * 24 * 60 * 60 * 1000)
184-
const paidRetentionDate = new Date(Date.now() - paidRetentionDays * 24 * 60 * 60 * 1000)
185-
186-
logger.info('Starting soft-delete cleanup', {
187-
freeRetentionDays,
188-
paidRetentionDays,
189-
freeRetentionDate: freeRetentionDate.toISOString(),
190-
paidRetentionDate: paidRetentionDate.toISOString(),
191-
})
192-
193-
// --- Group 1: Free workspaces ---
194-
195-
const freeWorkspaceRows = await db
196-
.select({ id: workspace.id })
197-
.from(workspace)
198-
.leftJoin(
199-
subscription,
200-
and(
201-
eq(subscription.referenceId, workspace.billedAccountUserId),
202-
inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES),
203-
sqlIsPaid(subscription.plan)
204-
)
205-
)
206-
.where(and(isNull(subscription.id), isNull(workspace.archivedAt)))
207-
208-
const freeIds = freeWorkspaceRows.map((w) => w.id)
209-
logger.info(`[free] Found ${freeIds.length} workspaces, retention cutoff: ${freeRetentionDate.toISOString()}`)
210-
const freeResults = await cleanupTier(freeIds, freeRetentionDate, 'free')
211-
logger.info(`[free] Result: ${freeResults.tables.reduce((s, t) => s + t.deleted, 0)} total rows deleted across ${CLEANUP_TARGETS.length} tables`)
212-
213-
// --- Group 2: Pro/Team workspaces ---
214-
215-
const paidWorkspaceRows = await db
216-
.select({ id: workspace.id })
217-
.from(workspace)
218-
.innerJoin(
219-
subscription,
220-
and(
221-
eq(subscription.referenceId, workspace.billedAccountUserId),
222-
inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES),
223-
or(sqlIsPro(subscription.plan)!, sqlIsTeam(subscription.plan)!)
224-
)
225-
)
226-
.where(isNull(workspace.archivedAt))
227-
228-
const paidIds = paidWorkspaceRows.map((w) => w.id)
229-
logger.info(`[paid] Found ${paidIds.length} workspaces, retention cutoff: ${paidRetentionDate.toISOString()}`)
230-
const paidResults = await cleanupTier(paidIds, paidRetentionDate, 'paid')
231-
logger.info(`[paid] Result: ${paidResults.tables.reduce((s, t) => s + t.deleted, 0)} total rows deleted across ${CLEANUP_TARGETS.length} tables`)
232-
233-
// --- Group 3: Enterprise with custom softDeleteRetentionHours ---
234-
235-
const enterpriseWorkspaceRows = await db
236-
.select({
237-
id: workspace.id,
238-
softDeleteRetentionHours: workspace.softDeleteRetentionHours,
239-
})
240-
.from(workspace)
241-
.innerJoin(
242-
subscription,
243-
and(
244-
eq(subscription.referenceId, workspace.billedAccountUserId),
245-
inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES),
246-
eq(subscription.plan, 'enterprise')
247-
)
248-
)
249-
.where(
250-
and(isNull(workspace.archivedAt), isNotNull(workspace.softDeleteRetentionHours))
251-
)
252-
253-
const enterpriseGroups = new Map<number, string[]>()
254-
for (const ws of enterpriseWorkspaceRows) {
255-
const hours = ws.softDeleteRetentionHours!
256-
const group = enterpriseGroups.get(hours) ?? []
257-
group.push(ws.id)
258-
enterpriseGroups.set(hours, group)
259-
}
260-
261-
logger.info(`[enterprise] Found ${enterpriseWorkspaceRows.length} workspaces with custom retention (${enterpriseGroups.size} distinct retention periods). Workspaces with NULL retention are skipped.`)
262-
263-
const enterpriseTables: TableCleanupResult[] = []
264-
let enterpriseFilesDeleted = 0
265-
let enterpriseFilesFailed = 0
266-
267-
for (const [hours, ids] of enterpriseGroups) {
268-
const retentionDate = new Date(Date.now() - hours * 60 * 60 * 1000)
269-
logger.info(`[enterprise-${hours}h] Processing ${ids.length} workspaces, retention cutoff: ${retentionDate.toISOString()}`)
270-
const groupResults = await cleanupTier(ids, retentionDate, `enterprise-${hours}h`)
271-
enterpriseTables.push(...groupResults.tables)
272-
enterpriseFilesDeleted += groupResults.filesDeleted
273-
enterpriseFilesFailed += groupResults.filesFailed
274-
}
275-
276-
const timeElapsed = (Date.now() - startTime) / 1000
15+
const jobQueue = await getJobQueue()
16+
const jobId = await jobQueue.enqueue('cleanup-soft-deletes', {})
27717

278-
const totalDeleted = (results: { tables: TableCleanupResult[] }) =>
279-
results.tables.reduce((sum, t) => sum + t.deleted, 0)
18+
logger.info('Soft-delete cleanup job dispatched', { jobId })
28019

281-
return NextResponse.json({
282-
message: `Soft-delete cleanup completed in ${timeElapsed.toFixed(2)}s`,
283-
tiers: {
284-
free: {
285-
workspaces: freeIds.length,
286-
retentionDays: freeRetentionDays,
287-
totalDeleted: totalDeleted(freeResults),
288-
filesDeleted: freeResults.filesDeleted,
289-
filesFailed: freeResults.filesFailed,
290-
tables: freeResults.tables,
291-
},
292-
paid: {
293-
workspaces: paidIds.length,
294-
retentionDays: paidRetentionDays,
295-
totalDeleted: totalDeleted(paidResults),
296-
filesDeleted: paidResults.filesDeleted,
297-
filesFailed: paidResults.filesFailed,
298-
tables: paidResults.tables,
299-
},
300-
enterprise: {
301-
workspaces: enterpriseWorkspaceRows.length,
302-
groups: enterpriseGroups.size,
303-
totalDeleted: enterpriseTables.reduce((sum, t) => sum + t.deleted, 0),
304-
filesDeleted: enterpriseFilesDeleted,
305-
filesFailed: enterpriseFilesFailed,
306-
tables: enterpriseTables,
307-
},
308-
},
309-
})
20+
return NextResponse.json({ triggered: true, jobId })
31021
} catch (error) {
311-
logger.error('Error in soft-delete cleanup:', { error })
312-
return NextResponse.json({ error: 'Failed to process soft-delete cleanup' }, { status: 500 })
22+
logger.error('Failed to dispatch soft-delete cleanup job:', { error })
23+
return NextResponse.json({ error: 'Failed to dispatch soft-delete cleanup' }, { status: 500 })
31324
}
31425
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { createLogger } from '@sim/logger'
2+
import { type NextRequest, NextResponse } from 'next/server'
3+
import { verifyCronAuth } from '@/lib/auth/internal'
4+
import { getJobQueue } from '@/lib/core/async-jobs'
5+
6+
export const dynamic = 'force-dynamic'
7+
8+
const logger = createLogger('TaskCleanupAPI')
9+
10+
export async function GET(request: NextRequest) {
11+
try {
12+
const authError = verifyCronAuth(request, 'task cleanup')
13+
if (authError) return authError
14+
15+
const jobQueue = await getJobQueue()
16+
const jobId = await jobQueue.enqueue('cleanup-tasks', {})
17+
18+
logger.info('Task cleanup job dispatched', { jobId })
19+
20+
return NextResponse.json({ triggered: true, jobId })
21+
} catch (error) {
22+
logger.error('Failed to dispatch task cleanup job:', { error })
23+
return NextResponse.json({ error: 'Failed to dispatch task cleanup' }, { status: 500 })
24+
}
25+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { createLogger } from '@sim/logger'
2+
import { type NextRequest, NextResponse } from 'next/server'
3+
import { verifyCronAuth } from '@/lib/auth/internal'
4+
import { getJobQueue } from '@/lib/core/async-jobs'
5+
6+
export const dynamic = 'force-dynamic'
7+
8+
const logger = createLogger('TaskRedactionAPI')
9+
10+
export async function GET(request: NextRequest) {
11+
try {
12+
const authError = verifyCronAuth(request, 'task context redaction')
13+
if (authError) return authError
14+
15+
const jobQueue = await getJobQueue()
16+
const jobId = await jobQueue.enqueue('redact-task-context', {})
17+
18+
logger.info('Task context redaction job dispatched', { jobId })
19+
20+
return NextResponse.json({ triggered: true, jobId })
21+
} catch (error) {
22+
logger.error('Failed to dispatch task context redaction job:', { error })
23+
return NextResponse.json(
24+
{ error: 'Failed to dispatch task context redaction' },
25+
{ status: 500 }
26+
)
27+
}
28+
}

0 commit comments

Comments
 (0)