Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions apps/sim/lib/core/utils/retry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
export interface RetryOptions {
maxAttempts?: number
initialDelayMs?: number
maxDelayMs?: number
backoffMultiplier?: number
jitterRatio?: number
isRetryable?: (error: unknown) => boolean
onRetry?: (args: { attempt: number; error: unknown; delayMs: number }) => void
sleepFn?: (ms: number) => Promise<void>
}

function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms))
}

export async function withRetry<T>(
operation: () => Promise<T>,
options: RetryOptions = {}
): Promise<T> {
const {
maxAttempts = 5,
initialDelayMs = 100,
maxDelayMs = 10_000,
backoffMultiplier = 2,
jitterRatio = 0.1,
isRetryable = () => true,
onRetry,
sleepFn = sleep,
} = options

if (maxAttempts < 1) {
throw new Error('maxAttempts must be >= 1')
}

let delayMs = Math.max(0, initialDelayMs)

for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await operation()
} catch (error) {
const isLastAttempt = attempt >= maxAttempts
if (isLastAttempt) {
throw error
}
if (!isRetryable(error)) {
throw error
}

const jitter = jitterRatio > 0 ? (Math.random() * 2 - 1) * jitterRatio * delayMs : 0
const nextDelayMs = Math.max(0, delayMs + jitter)
const cappedDelayMs = Math.min(nextDelayMs, maxDelayMs)

onRetry?.({ attempt, error, delayMs: cappedDelayMs })

if (cappedDelayMs > 0) {
await sleepFn(cappedDelayMs)
}

delayMs = Math.min(delayMs * Math.max(1, backoffMultiplier), maxDelayMs)
}
}

throw new Error('Retry operation failed unexpectedly')
}
140 changes: 140 additions & 0 deletions apps/sim/lib/workflows/executor/human-in-the-loop-manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'

vi.mock('@sim/db', () => ({
db: {
transaction: vi.fn(),
},
}))

import { db } from '@sim/db'
import {
PausedExecutionNotFoundError,
PausePointNotFoundError,
PauseSnapshotNotReadyError,
} from '@/lib/workflows/executor/pause-resume-errors'

describe('PauseResumeManager.enqueueOrStartResume', () => {
let PauseResumeManager: typeof import('@/lib/workflows/executor/human-in-the-loop-manager').PauseResumeManager

beforeAll(async () => {
vi.stubEnv('NEXT_PUBLIC_APP_URL', 'http://localhost:3000')
;({ PauseResumeManager } = await import('@/lib/workflows/executor/human-in-the-loop-manager'))
})

beforeEach(() => {
vi.useFakeTimers()
vi.clearAllMocks()
})

afterEach(() => {
vi.useRealTimers()
})

it('retries when paused execution is not yet persisted', async () => {
vi.mocked(db.transaction)
.mockRejectedValueOnce(new PausedExecutionNotFoundError())
.mockRejectedValueOnce(new PausedExecutionNotFoundError())
.mockResolvedValueOnce({
status: 'queued',
resumeExecutionId: 'exec-1',
queuePosition: 1,
} as any)

const promise = PauseResumeManager.enqueueOrStartResume({
executionId: 'exec-1',
contextId: 'ctx-1',
resumeInput: { ok: true },
userId: 'user-1',
})

await vi.runAllTimersAsync()

await expect(promise).resolves.toMatchObject({
status: 'queued',
resumeExecutionId: 'exec-1',
queuePosition: 1,
})
expect(db.transaction).toHaveBeenCalledTimes(3)
})

it('retries when snapshot is not ready yet', async () => {
vi.mocked(db.transaction)
.mockRejectedValueOnce(new PauseSnapshotNotReadyError())
.mockResolvedValueOnce({
status: 'queued',
resumeExecutionId: 'exec-2',
queuePosition: 1,
} as any)

const promise = PauseResumeManager.enqueueOrStartResume({
executionId: 'exec-2',
contextId: 'ctx-2',
resumeInput: null,
userId: 'user-2',
})

await vi.runAllTimersAsync()

await expect(promise).resolves.toMatchObject({
status: 'queued',
resumeExecutionId: 'exec-2',
})
expect(db.transaction).toHaveBeenCalledTimes(2)
})

it('does not retry non-transient errors', async () => {
vi.mocked(db.transaction).mockRejectedValueOnce(new PausePointNotFoundError())

const promise = PauseResumeManager.enqueueOrStartResume({
executionId: 'exec-3',
contextId: 'ctx-3',
resumeInput: null,
userId: 'user-3',
})

await expect(promise).rejects.toThrow('Pause point not found for execution')
expect(db.transaction).toHaveBeenCalledTimes(1)
})

it('stops retrying after max attempts', async () => {
vi.mocked(db.transaction).mockRejectedValue(new PausedExecutionNotFoundError())

const promise = PauseResumeManager.enqueueOrStartResume({
executionId: 'exec-4',
contextId: 'ctx-4',
resumeInput: null,
userId: 'user-4',
})

const assertion = expect(promise).rejects.toThrow(PausedExecutionNotFoundError)
await vi.runAllTimersAsync()
await assertion
expect(db.transaction).toHaveBeenCalledTimes(8)
})

it('retries across transient failures until success', async () => {
vi.mocked(db.transaction)
.mockRejectedValueOnce(new PausedExecutionNotFoundError())
.mockRejectedValueOnce(new PauseSnapshotNotReadyError())
.mockResolvedValueOnce({
status: 'queued',
resumeExecutionId: 'exec-5',
queuePosition: 1,
} as any)

const promise = PauseResumeManager.enqueueOrStartResume({
executionId: 'exec-5',
contextId: 'ctx-5',
resumeInput: null,
userId: 'user-5',
})

await vi.runAllTimersAsync()

await expect(promise).resolves.toMatchObject({
status: 'queued',
resumeExecutionId: 'exec-5',
})
expect(db.transaction).toHaveBeenCalledTimes(3)
})
})
Loading