diff --git a/apps/sim/lib/core/utils/retry.ts b/apps/sim/lib/core/utils/retry.ts new file mode 100644 index 0000000000..a0e06b8a4d --- /dev/null +++ b/apps/sim/lib/core/utils/retry.ts @@ -0,0 +1,64 @@ +export interface RetryOptions { + maxAttempts?: number + initialDelayMs?: number + maxDelayMs?: number + backoffMultiplier?: number + jitterRatio?: number + isRetryable?: (error: unknown) => boolean + onRetry?: (args: { attempt: number; error: unknown; delayMs: number }) => void + sleepFn?: (ms: number) => Promise +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +export async function withRetry( + operation: () => Promise, + options: RetryOptions = {} +): Promise { + const { + maxAttempts = 5, + initialDelayMs = 100, + maxDelayMs = 10_000, + backoffMultiplier = 2, + jitterRatio = 0.1, + isRetryable = () => true, + onRetry, + sleepFn = sleep, + } = options + + if (maxAttempts < 1) { + throw new Error('maxAttempts must be >= 1') + } + + let delayMs = Math.max(0, initialDelayMs) + + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + try { + return await operation() + } catch (error) { + const isLastAttempt = attempt >= maxAttempts + if (isLastAttempt) { + throw error + } + if (!isRetryable(error)) { + throw error + } + + const jitter = jitterRatio > 0 ? (Math.random() * 2 - 1) * jitterRatio * delayMs : 0 + const nextDelayMs = Math.max(0, delayMs + jitter) + const cappedDelayMs = Math.min(nextDelayMs, maxDelayMs) + + onRetry?.({ attempt, error, delayMs: cappedDelayMs }) + + if (cappedDelayMs > 0) { + await sleepFn(cappedDelayMs) + } + + delayMs = Math.min(delayMs * Math.max(1, backoffMultiplier), maxDelayMs) + } + } + + throw new Error('Retry operation failed unexpectedly') +} diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts new file mode 100644 index 0000000000..341d302a2a --- /dev/null +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts @@ -0,0 +1,140 @@ +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' + +vi.mock('@sim/db', () => ({ + db: { + transaction: vi.fn(), + }, +})) + +import { db } from '@sim/db' +import { + PausedExecutionNotFoundError, + PausePointNotFoundError, + PauseSnapshotNotReadyError, +} from '@/lib/workflows/executor/pause-resume-errors' + +describe('PauseResumeManager.enqueueOrStartResume', () => { + let PauseResumeManager: typeof import('@/lib/workflows/executor/human-in-the-loop-manager').PauseResumeManager + + beforeAll(async () => { + vi.stubEnv('NEXT_PUBLIC_APP_URL', 'http://localhost:3000') + ;({ PauseResumeManager } = await import('@/lib/workflows/executor/human-in-the-loop-manager')) + }) + + beforeEach(() => { + vi.useFakeTimers() + vi.clearAllMocks() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('retries when paused execution is not yet persisted', async () => { + vi.mocked(db.transaction) + .mockRejectedValueOnce(new PausedExecutionNotFoundError()) + .mockRejectedValueOnce(new PausedExecutionNotFoundError()) + .mockResolvedValueOnce({ + status: 'queued', + resumeExecutionId: 'exec-1', + queuePosition: 1, + } as any) + + const promise = PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-1', + contextId: 'ctx-1', + resumeInput: { ok: true }, + userId: 'user-1', + }) + + await vi.runAllTimersAsync() + + await expect(promise).resolves.toMatchObject({ + status: 'queued', + resumeExecutionId: 'exec-1', + queuePosition: 1, + }) + expect(db.transaction).toHaveBeenCalledTimes(3) + }) + + it('retries when snapshot is not ready yet', async () => { + vi.mocked(db.transaction) + .mockRejectedValueOnce(new PauseSnapshotNotReadyError()) + .mockResolvedValueOnce({ + status: 'queued', + resumeExecutionId: 'exec-2', + queuePosition: 1, + } as any) + + const promise = PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-2', + contextId: 'ctx-2', + resumeInput: null, + userId: 'user-2', + }) + + await vi.runAllTimersAsync() + + await expect(promise).resolves.toMatchObject({ + status: 'queued', + resumeExecutionId: 'exec-2', + }) + expect(db.transaction).toHaveBeenCalledTimes(2) + }) + + it('does not retry non-transient errors', async () => { + vi.mocked(db.transaction).mockRejectedValueOnce(new PausePointNotFoundError()) + + const promise = PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-3', + contextId: 'ctx-3', + resumeInput: null, + userId: 'user-3', + }) + + await expect(promise).rejects.toThrow('Pause point not found for execution') + expect(db.transaction).toHaveBeenCalledTimes(1) + }) + + it('stops retrying after max attempts', async () => { + vi.mocked(db.transaction).mockRejectedValue(new PausedExecutionNotFoundError()) + + const promise = PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-4', + contextId: 'ctx-4', + resumeInput: null, + userId: 'user-4', + }) + + const assertion = expect(promise).rejects.toThrow(PausedExecutionNotFoundError) + await vi.runAllTimersAsync() + await assertion + expect(db.transaction).toHaveBeenCalledTimes(8) + }) + + it('retries across transient failures until success', async () => { + vi.mocked(db.transaction) + .mockRejectedValueOnce(new PausedExecutionNotFoundError()) + .mockRejectedValueOnce(new PauseSnapshotNotReadyError()) + .mockResolvedValueOnce({ + status: 'queued', + resumeExecutionId: 'exec-5', + queuePosition: 1, + } as any) + + const promise = PauseResumeManager.enqueueOrStartResume({ + executionId: 'exec-5', + contextId: 'ctx-5', + resumeInput: null, + userId: 'user-5', + }) + + await vi.runAllTimersAsync() + + await expect(promise).resolves.toMatchObject({ + status: 'queued', + resumeExecutionId: 'exec-5', + }) + expect(db.transaction).toHaveBeenCalledTimes(3) + }) +}) diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 936f7cd298..70720bee98 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -4,15 +4,26 @@ import { pausedExecutions, resumeQueue, workflowExecutionLogs } from '@sim/db/sc import { createLogger } from '@sim/logger' import { and, asc, desc, eq, inArray, lt, type SQL, sql } from 'drizzle-orm' import type { Edge } from 'reactflow' +import { withRetry } from '@/lib/core/utils/retry' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' +import { + PausedExecutionNotFoundError, + PausePointNotFoundError, + PausePointNotPausedError, + PauseSnapshotNotReadyError, +} from '@/lib/workflows/executor/pause-resume-errors' import { ExecutionSnapshot } from '@/executor/execution/snapshot' import type { ExecutionResult, PausePoint, SerializedSnapshot } from '@/executor/types' import type { SerializedConnection } from '@/serializer/types' const logger = createLogger('HumanInTheLoopManager') +const _RESUME_LOOKUP_MAX_ATTEMPTS = 8 +const _RESUME_LOOKUP_INITIAL_DELAY_MS = 25 +const _RESUME_LOOKUP_MAX_DELAY_MS = 400 + interface ResumeQueueEntrySummary { id: string pausedExecutionId: string @@ -162,120 +173,144 @@ export class PauseResumeManager { static async enqueueOrStartResume(args: EnqueueResumeArgs): Promise { const { executionId, contextId, resumeInput, userId } = args - return await db.transaction(async (tx) => { - const pausedExecution = await tx - .select() - .from(pausedExecutions) - .where(eq(pausedExecutions.executionId, executionId)) - .for('update') - .limit(1) - .then((rows) => rows[0]) + return withRetry( + () => + db.transaction(async (tx) => { + const pausedExecution = await tx + .select() + .from(pausedExecutions) + .where(eq(pausedExecutions.executionId, executionId)) + .for('update') + .limit(1) + .then((rows) => rows[0]) + + if (!pausedExecution) { + throw new PausedExecutionNotFoundError() + } - if (!pausedExecution) { - throw new Error('Paused execution not found or already resumed') - } + const pausePoints = pausedExecution.pausePoints as Record + const pausePoint = pausePoints?.[contextId] + if (!pausePoint) { + throw new PausePointNotFoundError() + } + if (pausePoint.resumeStatus !== 'paused') { + throw new PausePointNotPausedError() + } + if (!pausePoint.snapshotReady) { + throw new PauseSnapshotNotReadyError() + } - const pausePoints = pausedExecution.pausePoints as Record - const pausePoint = pausePoints?.[contextId] - if (!pausePoint) { - throw new Error('Pause point not found for execution') - } - if (pausePoint.resumeStatus !== 'paused') { - throw new Error('Pause point already resumed or in progress') - } - if (!pausePoint.snapshotReady) { - throw new Error('Snapshot not ready; execution still finalizing pause') - } + const activeResume = await tx + .select({ id: resumeQueue.id }) + .from(resumeQueue) + .where( + and( + eq(resumeQueue.parentExecutionId, executionId), + inArray(resumeQueue.status, ['claimed'] as const) + ) + ) + .limit(1) + .then((rows) => rows[0]) + + const resumeExecutionId = executionId + const now = new Date() + + if (activeResume) { + const [entry] = await tx + .insert(resumeQueue) + .values({ + id: randomUUID(), + pausedExecutionId: pausedExecution.id, + parentExecutionId: executionId, + newExecutionId: resumeExecutionId, + contextId, + resumeInput: resumeInput ?? null, + status: 'pending', + queuedAt: now, + }) + .returning({ id: resumeQueue.id, queuedAt: resumeQueue.queuedAt }) - const activeResume = await tx - .select({ id: resumeQueue.id }) - .from(resumeQueue) - .where( - and( - eq(resumeQueue.parentExecutionId, executionId), - inArray(resumeQueue.status, ['claimed'] as const) - ) - ) - .limit(1) - .then((rows) => rows[0]) + await tx + .update(pausedExecutions) + .set({ + pausePoints: sql`jsonb_set(pause_points, ARRAY[${contextId}, 'resumeStatus'], '"queued"'::jsonb)`, + }) + .where(eq(pausedExecutions.id, pausedExecution.id)) + + pausePoint.resumeStatus = 'queued' + + const [positionRow = { position: 0 }] = await tx + .select({ position: sql`count(*)` }) + .from(resumeQueue) + .where( + and( + eq(resumeQueue.parentExecutionId, executionId), + eq(resumeQueue.status, 'pending'), + lt(resumeQueue.queuedAt, entry.queuedAt) + ) + ) - const resumeExecutionId = executionId - const now = new Date() + return { + status: 'queued', + resumeExecutionId, + queuePosition: Number(positionRow.position ?? 0) + 1, + } + } - if (activeResume) { - const [entry] = await tx - .insert(resumeQueue) - .values({ - id: randomUUID(), + const resumeEntryId = randomUUID() + await tx.insert(resumeQueue).values({ + id: resumeEntryId, pausedExecutionId: pausedExecution.id, parentExecutionId: executionId, newExecutionId: resumeExecutionId, contextId, resumeInput: resumeInput ?? null, - status: 'pending', + status: 'claimed', queuedAt: now, + claimedAt: now, }) - .returning({ id: resumeQueue.id, queuedAt: resumeQueue.queuedAt }) - - await tx - .update(pausedExecutions) - .set({ - pausePoints: sql`jsonb_set(pause_points, ARRAY[${contextId}, 'resumeStatus'], '"queued"'::jsonb)`, - }) - .where(eq(pausedExecutions.id, pausedExecution.id)) - - pausePoint.resumeStatus = 'queued' - - const [positionRow = { position: 0 }] = await tx - .select({ position: sql`count(*)` }) - .from(resumeQueue) - .where( - and( - eq(resumeQueue.parentExecutionId, executionId), - eq(resumeQueue.status, 'pending'), - lt(resumeQueue.queuedAt, entry.queuedAt) - ) - ) - - return { - status: 'queued', - resumeExecutionId, - queuePosition: Number(positionRow.position ?? 0) + 1, - } - } - - const resumeEntryId = randomUUID() - await tx.insert(resumeQueue).values({ - id: resumeEntryId, - pausedExecutionId: pausedExecution.id, - parentExecutionId: executionId, - newExecutionId: resumeExecutionId, - contextId, - resumeInput: resumeInput ?? null, - status: 'claimed', - queuedAt: now, - claimedAt: now, - }) - await tx - .update(pausedExecutions) - .set({ - pausePoints: sql`jsonb_set(pause_points, ARRAY[${contextId}, 'resumeStatus'], '"resuming"'::jsonb)`, - }) - .where(eq(pausedExecutions.id, pausedExecution.id)) + await tx + .update(pausedExecutions) + .set({ + pausePoints: sql`jsonb_set(pause_points, ARRAY[${contextId}, 'resumeStatus'], '"resuming"'::jsonb)`, + }) + .where(eq(pausedExecutions.id, pausedExecution.id)) - pausePoint.resumeStatus = 'resuming' + pausePoint.resumeStatus = 'resuming' - return { - status: 'starting', - resumeExecutionId, - resumeEntryId, - pausedExecution, - contextId, - resumeInput, - userId, + return { + status: 'starting', + resumeExecutionId, + resumeEntryId, + pausedExecution, + contextId, + resumeInput, + userId, + } + }), + { + maxAttempts: _RESUME_LOOKUP_MAX_ATTEMPTS, + initialDelayMs: _RESUME_LOOKUP_INITIAL_DELAY_MS, + maxDelayMs: _RESUME_LOOKUP_MAX_DELAY_MS, + backoffMultiplier: 2, + jitterRatio: 0, + isRetryable: (error) => + error instanceof PausedExecutionNotFoundError || + error instanceof PauseSnapshotNotReadyError, + onRetry: ({ attempt, error, delayMs }) => { + logger.warn( + `Transient resume lookup failure; retrying (attempt ${attempt}/${_RESUME_LOOKUP_MAX_ATTEMPTS})`, + { + executionId, + contextId, + delayMs, + error: error instanceof Error ? error.message : String(error), + } + ) + }, } - }) + ) } static async startResumeExecution(args: StartResumeExecutionArgs): Promise { diff --git a/apps/sim/lib/workflows/executor/pause-resume-errors.ts b/apps/sim/lib/workflows/executor/pause-resume-errors.ts new file mode 100644 index 0000000000..894aa73f09 --- /dev/null +++ b/apps/sim/lib/workflows/executor/pause-resume-errors.ts @@ -0,0 +1,27 @@ +export class PausedExecutionNotFoundError extends Error { + constructor(message = 'Paused execution not found or already resumed') { + super(message) + this.name = 'PausedExecutionNotFoundError' + } +} + +export class PauseSnapshotNotReadyError extends Error { + constructor(message = 'Snapshot not ready; execution still finalizing pause') { + super(message) + this.name = 'PauseSnapshotNotReadyError' + } +} + +export class PausePointNotFoundError extends Error { + constructor(message = 'Pause point not found for execution') { + super(message) + this.name = 'PausePointNotFoundError' + } +} + +export class PausePointNotPausedError extends Error { + constructor(message = 'Pause point already resumed or in progress') { + super(message) + this.name = 'PausePointNotPausedError' + } +}