diff --git a/apps/sim/lib/core/config/redis.test.ts b/apps/sim/lib/core/config/redis.test.ts
new file mode 100644
index 0000000000..7c740e2ec4
--- /dev/null
+++ b/apps/sim/lib/core/config/redis.test.ts
@@ -0,0 +1,176 @@
+import { createEnvMock, createMockRedis, loggerMock } from '@sim/testing'
+import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
+
+const mockRedisInstance = createMockRedis()
+
+vi.mock('@sim/logger', () => loggerMock)
+vi.mock('@/lib/core/config/env', () => createEnvMock({ REDIS_URL: 'redis://localhost:6379' }))
+vi.mock('ioredis', () => ({
+  default: vi.fn(() => mockRedisInstance),
+}))
+
+describe('redis config', () => {
+  beforeEach(() => {
+    vi.clearAllMocks()
+    vi.useFakeTimers()
+  })
+
+  afterEach(() => {
+    vi.useRealTimers()
+    vi.resetModules()
+  })
+
+  describe('onRedisReconnect', () => {
+    it('should register and invoke reconnect listeners', async () => {
+      const { onRedisReconnect, getRedisClient } = await import('./redis')
+      const listener = vi.fn()
+      onRedisReconnect(listener)
+
+      getRedisClient()
+
+      mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
+      await vi.advanceTimersByTimeAsync(30_000)
+      await vi.advanceTimersByTimeAsync(30_000)
+      await vi.advanceTimersByTimeAsync(30_000)
+
+      expect(listener).toHaveBeenCalledTimes(1)
+    })
+
+    it('should not invoke listeners when PINGs succeed', async () => {
+      const { onRedisReconnect, getRedisClient } = await import('./redis')
+      const listener = vi.fn()
+      onRedisReconnect(listener)
+
+      getRedisClient()
+      mockRedisInstance.ping.mockResolvedValue('PONG')
+
+      await vi.advanceTimersByTimeAsync(30_000)
+      await vi.advanceTimersByTimeAsync(30_000)
+      await vi.advanceTimersByTimeAsync(30_000)
+
+      expect(listener).not.toHaveBeenCalled()
+    })
+
+    it('should reset failure count on successful PING', async () => {
+      const { onRedisReconnect, getRedisClient } = await import('./redis')
+      const listener = vi.fn()
+      onRedisReconnect(listener)
+
+      getRedisClient()
+
+      // 2 failures then a success — should reset counter
+      mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
+      await vi.advanceTimersByTimeAsync(30_000)
+      mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
+      await vi.advanceTimersByTimeAsync(30_000)
+      mockRedisInstance.ping.mockResolvedValueOnce('PONG')
+      await vi.advanceTimersByTimeAsync(30_000)
+
+      // 2 more failures — should NOT trigger reconnect (counter was reset)
+      mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
+      await vi.advanceTimersByTimeAsync(30_000)
+      mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
+      await vi.advanceTimersByTimeAsync(30_000)
+
+      expect(listener).not.toHaveBeenCalled()
+    })
+
+    it('should call disconnect(true) after 3 consecutive PING failures', async () => {
+      const { getRedisClient } = await import('./redis')
+      getRedisClient()
+
+      mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
+      await vi.advanceTimersByTimeAsync(30_000)
+      await vi.advanceTimersByTimeAsync(30_000)
+
+      expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
+
+      await vi.advanceTimersByTimeAsync(30_000)
+      expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true)
+    })
+
+    it('should handle listener errors gracefully without breaking health check', async () => {
+      const { onRedisReconnect, getRedisClient } = await import('./redis')
+      const badListener = vi.fn(() => {
+        throw new Error('listener crashed')
+      })
+      const goodListener = vi.fn()
+      onRedisReconnect(badListener)
+      onRedisReconnect(goodListener)
+
+      getRedisClient()
+      mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
+      await vi.advanceTimersByTimeAsync(30_000)
+      await vi.advanceTimersByTimeAsync(30_000)
+      await vi.advanceTimersByTimeAsync(30_000)
+
+      expect(badListener).toHaveBeenCalledTimes(1)
+      expect(goodListener).toHaveBeenCalledTimes(1)
+    })
+  })
+
+  describe('closeRedisConnection', () => {
+    it('should clear the PING interval', async () => {
+      const { getRedisClient, closeRedisConnection } = await import('./redis')
+      getRedisClient()
+
+      mockRedisInstance.quit.mockResolvedValue('OK')
+      await closeRedisConnection()
+
+      // After closing, PING failures should not trigger disconnect
+      mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
+      await vi.advanceTimersByTimeAsync(30_000 * 5)
+      expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
+    })
+  })
+
+  describe('retryStrategy', () => {
+    async function captureRetryStrategy(): Promise<(times: number) => number> {
+      vi.resetModules()
+
+      vi.doMock('@sim/logger', () => loggerMock)
+      vi.doMock('@/lib/core/config/env', () =>
+        createEnvMock({ REDIS_URL: 'redis://localhost:6379' })
+      )
+
+      let capturedConfig: Record<string, unknown> = {}
+      vi.doMock('ioredis', () => ({
+        default: vi.fn((_url: string, config: Record<string, unknown>) => {
+          capturedConfig = config
+          return { ping: vi.fn(), on: vi.fn() }
+        }),
+      }))
+
+      const { getRedisClient } = await import('./redis')
+      getRedisClient()
+
+      return capturedConfig.retryStrategy as (times: number) => number
+    }
+
+    it('should use exponential backoff with jitter', async () => {
+      const retryStrategy = await captureRetryStrategy()
+      expect(retryStrategy).toBeDefined()
+
+      // Base for attempt 1: min(1000 * 2^0, 10000) = 1000, jitter up to 300
+      const delay1 = retryStrategy(1)
+      expect(delay1).toBeGreaterThanOrEqual(1000)
+      expect(delay1).toBeLessThanOrEqual(1300)
+
+      // Base for attempt 3: min(1000 * 2^2, 10000) = 4000, jitter up to 1200
+      const delay3 = retryStrategy(3)
+      expect(delay3).toBeGreaterThanOrEqual(4000)
+      expect(delay3).toBeLessThanOrEqual(5200)
+
+      // Base for attempt 5: min(1000 * 2^4, 10000) = 10000, jitter up to 3000
+      const delay5 = retryStrategy(5)
+      expect(delay5).toBeGreaterThanOrEqual(10000)
+      expect(delay5).toBeLessThanOrEqual(13000)
+    })
+
+    it('should cap at 30s for attempts beyond 10', async () => {
+      const retryStrategy = await captureRetryStrategy()
+      expect(retryStrategy(11)).toBe(30000)
+      expect(retryStrategy(100)).toBe(30000)
+    })
+  })
+})
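The bounds asserted in the `retryStrategy` tests above follow directly from the new backoff formula introduced in `redis.ts` below: a base delay of `min(1000 * 2^(times - 1), 10000)` plus up to 30% jitter. A standalone sketch of that arithmetic (illustrative, not part of the diff):

```ts
// Bounds of the jittered backoff asserted above, assuming
// base = min(1000 * 2 ** (times - 1), 10000) and jitter in [0, 0.3 * base].
function backoffBounds(times: number): { min: number; max: number } {
  const base = Math.min(1000 * 2 ** (times - 1), 10_000)
  return { min: base, max: Math.round(base * 1.3) }
}

console.log(backoffBounds(1)) // { min: 1000, max: 1300 }
console.log(backoffBounds(3)) // { min: 4000, max: 5200 }
console.log(backoffBounds(5)) // { min: 10000, max: 13000 }
```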
diff --git a/apps/sim/lib/core/config/redis.ts b/apps/sim/lib/core/config/redis.ts
index ede72eaea9..4db71b49b2 100644
--- a/apps/sim/lib/core/config/redis.ts
+++ b/apps/sim/lib/core/config/redis.ts
@@ -7,6 +7,63 @@ const logger = createLogger('Redis')
 
 const redisUrl = env.REDIS_URL
 let globalRedisClient: Redis | null = null
+let pingFailures = 0
+let pingInterval: NodeJS.Timeout | null = null
+let pingInFlight = false
+
+const PING_INTERVAL_MS = 30_000
+const MAX_PING_FAILURES = 3
+
+/** Callbacks invoked when the PING health check forces a reconnect. */
+const reconnectListeners: Array<() => void> = []
+
+/**
+ * Register a callback that fires when the PING health check forces a reconnect.
+ * Useful for resetting cached adapters that hold a stale Redis reference.
+ */
+export function onRedisReconnect(cb: () => void): void {
+  reconnectListeners.push(cb)
+}
+
+function startPingHealthCheck(redis: Redis): void {
+  if (pingInterval) return
+
+  pingInterval = setInterval(async () => {
+    if (pingInFlight) return
+    pingInFlight = true
+    try {
+      await redis.ping()
+      pingFailures = 0
+    } catch (error) {
+      pingFailures++
+      logger.warn('Redis PING failed', {
+        consecutiveFailures: pingFailures,
+        error: error instanceof Error ? error.message : String(error),
+      })
+
+      if (pingFailures >= MAX_PING_FAILURES) {
+        logger.error('Redis PING failed 3 consecutive times — forcing reconnect', {
+          consecutiveFailures: pingFailures,
+        })
+        pingFailures = 0
+        for (const cb of reconnectListeners) {
+          try {
+            cb()
+          } catch (cbError) {
+            logger.error('Redis reconnect listener error', { error: cbError })
+          }
+        }
+        try {
+          redis.disconnect(true)
+        } catch (disconnectError) {
+          logger.error('Error during forced Redis disconnect', { error: disconnectError })
+        }
+      }
+    } finally {
+      pingInFlight = false
+    }
+  }, PING_INTERVAL_MS)
+}
 
 /**
  * Get a Redis client instance.
@@ -35,8 +92,10 @@ export function getRedisClient(): Redis | null {
           logger.error(`Redis reconnection attempt ${times}`, { nextRetryMs: 30000 })
           return 30000
         }
-        const delay = Math.min(times * 500, 5000)
-        logger.warn(`Redis reconnecting`, { attempt: times, nextRetryMs: delay })
+        const base = Math.min(1000 * 2 ** (times - 1), 10000)
+        const jitter = Math.random() * base * 0.3
+        const delay = Math.round(base + jitter)
+        logger.warn('Redis reconnecting', { attempt: times, nextRetryMs: delay })
         return delay
       },
@@ -54,6 +113,8 @@ export function getRedisClient(): Redis | null {
     globalRedisClient.on('close', () => logger.warn('Redis connection closed'))
     globalRedisClient.on('end', () => logger.error('Redis connection ended'))
 
+    startPingHealthCheck(globalRedisClient)
+
     return globalRedisClient
   } catch (error) {
     logger.error('Failed to initialize Redis client', { error })
@@ -118,6 +179,11 @@ export async function releaseLock(lockKey: string, value: string): Promise
 export async function closeRedisConnection(): Promise<void> {
+  if (pingInterval) {
+    clearInterval(pingInterval)
+    pingInterval = null
+  }
+
   if (globalRedisClient) {
     try {
       await globalRedisClient.quit()
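A minimal consumer sketch for the new `onRedisReconnect` hook, assuming a module that caches a value derived from the client (the `cachedQueue` name is illustrative, not part of this PR):

```ts
import { getRedisClient, onRedisReconnect } from '@/lib/core/config/redis'

// Hypothetical consumer that caches an object built from the Redis client.
let cachedQueue: { redis: ReturnType<typeof getRedisClient> } | null = null

// When the PING health check forces a reconnect, drop the cache so the next
// call rebuilds against a fresh connection instead of a stale reference.
onRedisReconnect(() => {
  cachedQueue = null
})

export function getQueue() {
  if (!cachedQueue) {
    cachedQueue = { redis: getRedisClient() }
  }
  return cachedQueue
}
```

This is the same pattern the rate-limiter storage factory adopts later in this PR.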
diff --git a/apps/sim/lib/core/rate-limiter/rate-limiter.test.ts b/apps/sim/lib/core/rate-limiter/rate-limiter.test.ts
index 6aaf4ef332..658febd7d6 100644
--- a/apps/sim/lib/core/rate-limiter/rate-limiter.test.ts
+++ b/apps/sim/lib/core/rate-limiter/rate-limiter.test.ts
@@ -172,7 +172,7 @@ describe('RateLimiter', () => {
       )
     })
 
-    it('should deny on storage error (fail closed)', async () => {
+    it('should allow on storage error (fail open)', async () => {
       mockAdapter.consumeTokens.mockRejectedValue(new Error('Storage error'))
 
       const result = await rateLimiter.checkRateLimitWithSubscription(
@@ -182,8 +182,8 @@ describe('RateLimiter', () => {
         false
       )
 
-      expect(result.allowed).toBe(false)
-      expect(result.remaining).toBe(0)
+      expect(result.allowed).toBe(true)
+      expect(result.remaining).toBe(1)
     })
 
     it('should work for all non-manual trigger types', async () => {
diff --git a/apps/sim/lib/core/rate-limiter/rate-limiter.ts b/apps/sim/lib/core/rate-limiter/rate-limiter.ts
index 53711429f8..a48c33a0ab 100644
--- a/apps/sim/lib/core/rate-limiter/rate-limiter.ts
+++ b/apps/sim/lib/core/rate-limiter/rate-limiter.ts
@@ -100,17 +100,16 @@ export class RateLimiter {
         retryAfterMs: result.retryAfterMs,
       }
     } catch (error) {
-      logger.error('Rate limit storage error - failing closed (denying request)', {
+      logger.error('Rate limit storage error - failing open (allowing request)', {
        error: error instanceof Error ? error.message : String(error),
         userId,
         triggerType,
         isAsync,
       })
       return {
-        allowed: false,
-        remaining: 0,
+        allowed: true,
+        remaining: 1,
         resetAt: new Date(Date.now() + RATE_LIMIT_WINDOW_MS),
-        retryAfterMs: RATE_LIMIT_WINDOW_MS,
       }
     }
   }
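The catch block now follows the generic fail-open shape sketched below (a standalone sketch, assuming a `consume` callback standing in for the adapter call): on a storage error the limiter admits the request and synthesizes a `resetAt` one window out, with no `retryAfterMs`.

```ts
// Standalone sketch of a fail-open rate-limit guard. consume() and windowMs
// are illustrative stand-ins for the adapter call and RATE_LIMIT_WINDOW_MS.
async function checkWithFailOpen(
  consume: () => Promise<{ allowed: boolean; remaining: number; resetAt: Date }>,
  windowMs: number
): Promise<{ allowed: boolean; remaining: number; resetAt: Date }> {
  try {
    return await consume()
  } catch {
    // Storage outage: admit the request rather than returning a denial.
    return { allowed: true, remaining: 1, resetAt: new Date(Date.now() + windowMs) }
  }
}
```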
diff --git a/apps/sim/lib/core/rate-limiter/storage/factory.test.ts b/apps/sim/lib/core/rate-limiter/storage/factory.test.ts
new file mode 100644
index 0000000000..58098b377e
--- /dev/null
+++ b/apps/sim/lib/core/rate-limiter/storage/factory.test.ts
@@ -0,0 +1,129 @@
+import { loggerMock } from '@sim/testing'
+import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
+
+vi.mock('@sim/logger', () => loggerMock)
+
+const reconnectCallbacks: Array<() => void> = []
+
+vi.mock('@/lib/core/config/redis', () => ({
+  getRedisClient: vi.fn(() => null),
+  onRedisReconnect: vi.fn((cb: () => void) => {
+    reconnectCallbacks.push(cb)
+  }),
+}))
+
+vi.mock('@/lib/core/storage', () => ({
+  getStorageMethod: vi.fn(() => 'db'),
+}))
+
+vi.mock('./db-token-bucket', () => ({
+  DbTokenBucket: vi.fn(() => ({ type: 'db' })),
+}))
+
+vi.mock('./redis-token-bucket', () => ({
+  RedisTokenBucket: vi.fn(() => ({ type: 'redis' })),
+}))
+
+describe('rate limit storage factory', () => {
+  beforeEach(() => {
+    vi.clearAllMocks()
+    reconnectCallbacks.length = 0
+  })
+
+  afterEach(() => {
+    vi.resetModules()
+  })
+
+  it('should fall back to DbTokenBucket when Redis is configured but client unavailable', async () => {
+    const { getStorageMethod } = await import('@/lib/core/storage')
+    vi.mocked(getStorageMethod).mockReturnValue('redis')
+
+    const { getRedisClient } = await import('@/lib/core/config/redis')
+    vi.mocked(getRedisClient).mockReturnValue(null)
+
+    const { createStorageAdapter, resetStorageAdapter } = await import('./factory')
+    resetStorageAdapter()
+
+    const adapter = createStorageAdapter()
+    expect(adapter).toEqual({ type: 'db' })
+  })
+
+  it('should use RedisTokenBucket when Redis client is available', async () => {
+    const { getStorageMethod } = await import('@/lib/core/storage')
+    vi.mocked(getStorageMethod).mockReturnValue('redis')
+
+    const { getRedisClient } = await import('@/lib/core/config/redis')
+    vi.mocked(getRedisClient).mockReturnValue({ ping: vi.fn() } as never)
+
+    const { createStorageAdapter, resetStorageAdapter } = await import('./factory')
+    resetStorageAdapter()
+
+    const adapter = createStorageAdapter()
+    expect(adapter).toEqual({ type: 'redis' })
+  })
+
+  it('should use DbTokenBucket when storage method is db', async () => {
+    const { getStorageMethod } = await import('@/lib/core/storage')
+    vi.mocked(getStorageMethod).mockReturnValue('db')
+
+    const { createStorageAdapter, resetStorageAdapter } = await import('./factory')
+    resetStorageAdapter()
+
+    const adapter = createStorageAdapter()
+    expect(adapter).toEqual({ type: 'db' })
+  })
+
+  it('should cache the adapter and return same instance', async () => {
+    const { getStorageMethod } = await import('@/lib/core/storage')
+    vi.mocked(getStorageMethod).mockReturnValue('db')
+
+    const { createStorageAdapter, resetStorageAdapter } = await import('./factory')
+    resetStorageAdapter()
+
+    const adapter1 = createStorageAdapter()
+    const adapter2 = createStorageAdapter()
+    expect(adapter1).toBe(adapter2)
+  })
+
+  it('should register a reconnect listener that resets cached adapter', async () => {
+    const { getStorageMethod } = await import('@/lib/core/storage')
+    vi.mocked(getStorageMethod).mockReturnValue('db')
+
+    const { createStorageAdapter, resetStorageAdapter } = await import('./factory')
+    resetStorageAdapter()
+
+    const adapter1 = createStorageAdapter()
+
+    // Simulate Redis reconnect — should reset cached adapter
+    expect(reconnectCallbacks.length).toBeGreaterThan(0)
+    reconnectCallbacks[0]()
+
+    // Next call should create a fresh adapter
+    const adapter2 = createStorageAdapter()
+    expect(adapter2).not.toBe(adapter1)
+  })
+
+  it('should re-evaluate storage on next call after reconnect resets cache', async () => {
+    const { getStorageMethod } = await import('@/lib/core/storage')
+    const { getRedisClient } = await import('@/lib/core/config/redis')
+
+    // Start with Redis unavailable — falls back to DB
+    vi.mocked(getStorageMethod).mockReturnValue('redis')
+    vi.mocked(getRedisClient).mockReturnValue(null)
+
+    const { createStorageAdapter, resetStorageAdapter } = await import('./factory')
+    resetStorageAdapter()
+
+    const adapter1 = createStorageAdapter()
+    expect(adapter1).toEqual({ type: 'db' })
+
+    // Simulate reconnect
+    reconnectCallbacks[0]()
+
+    // Now Redis is available
+    vi.mocked(getRedisClient).mockReturnValue({ ping: vi.fn() } as never)
+
+    const adapter2 = createStorageAdapter()
+    expect(adapter2).toEqual({ type: 'redis' })
+  })
+})
diff --git a/apps/sim/lib/core/rate-limiter/storage/factory.ts b/apps/sim/lib/core/rate-limiter/storage/factory.ts
index ff6b9961c1..948e51ad90 100644
--- a/apps/sim/lib/core/rate-limiter/storage/factory.ts
+++ b/apps/sim/lib/core/rate-limiter/storage/factory.ts
@@ -1,5 +1,5 @@
 import { createLogger } from '@sim/logger'
-import { getRedisClient } from '@/lib/core/config/redis'
+import { getRedisClient, onRedisReconnect } from '@/lib/core/config/redis'
 import { getStorageMethod, type StorageMethod } from '@/lib/core/storage'
 import type { RateLimitStorageAdapter } from './adapter'
 import { DbTokenBucket } from './db-token-bucket'
@@ -8,21 +8,33 @@ import { RedisTokenBucket } from './redis-token-bucket'
 const logger = createLogger('RateLimitStorage')
 
 let cachedAdapter: RateLimitStorageAdapter | null = null
+let reconnectListenerRegistered = false
 
 export function createStorageAdapter(): RateLimitStorageAdapter {
   if (cachedAdapter) {
     return cachedAdapter
   }
 
+  if (!reconnectListenerRegistered) {
+    onRedisReconnect(() => {
+      cachedAdapter = null
+    })
+    reconnectListenerRegistered = true
+  }
+
   const storageMethod = getStorageMethod()
 
   if (storageMethod === 'redis') {
     const redis = getRedisClient()
     if (!redis) {
-      throw new Error('Redis configured but client unavailable')
+      logger.warn(
+        'Redis configured but client unavailable - falling back to PostgreSQL for rate limiting'
+      )
+      cachedAdapter = new DbTokenBucket()
+    } else {
+      logger.info('Rate limiting: Using Redis')
+      cachedAdapter = new RedisTokenBucket(redis)
     }
-    logger.info('Rate limiting: Using Redis')
-    cachedAdapter = new RedisTokenBucket(redis)
   } else {
     logger.info('Rate limiting: Using PostgreSQL')
     cachedAdapter = new DbTokenBucket()
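Call sites are unchanged: they keep resolving the adapter lazily through the factory, and the registered listener makes a stale cache self-heal after a forced reconnect. Roughly (the comment describes behavior; the adapter's method surface is outside this diff):

```ts
import { createStorageAdapter } from '@/lib/core/rate-limiter/storage/factory'

// Each call after the cache is cleared (startup, or a forced reconnect)
// re-runs the storage-method check, so a Redis outage degrades to
// DbTokenBucket and a later recovery upgrades back to RedisTokenBucket.
const adapter = createStorageAdapter()
```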
diff --git a/apps/sim/lib/execution/isolated-vm.test.ts b/apps/sim/lib/execution/isolated-vm.test.ts
index 17fb20c0d7..0a7059dfb3 100644
--- a/apps/sim/lib/execution/isolated-vm.test.ts
+++ b/apps/sim/lib/execution/isolated-vm.test.ts
@@ -1,4 +1,5 @@
 import { EventEmitter } from 'node:events'
+import { createEnvMock, loggerMock } from '@sim/testing'
 import { afterEach, describe, expect, it, vi } from 'vitest'
 
 type MockProc = EventEmitter & {
@@ -130,13 +131,7 @@ async function loadExecutionModule(options: {
     return next() as any
   })
 
-  vi.doMock('@sim/logger', () => ({
-    createLogger: () => ({
-      info: vi.fn(),
-      warn: vi.fn(),
-      error: vi.fn(),
-    }),
-  }))
+  vi.doMock('@sim/logger', () => loggerMock)
 
   const secureFetchMock = vi.fn(
     options.secureFetchImpl ??
@@ -154,8 +149,12 @@ async function loadExecutionModule(options: {
     secureFetchWithValidation: secureFetchMock,
   }))
 
-  vi.doMock('@/lib/core/config/env', () => ({
-    env: {
+  vi.doMock('@/lib/core/utils/logging', () => ({
+    sanitizeUrlForLog: vi.fn((url: string) => url),
+  }))
+
+  vi.doMock('@/lib/core/config/env', () =>
+    createEnvMock({
       IVM_POOL_SIZE: '1',
       IVM_MAX_CONCURRENT: '100',
       IVM_MAX_PER_WORKER: '100',
@@ -168,8 +167,8 @@ async function loadExecutionModule(options: {
       IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: '1000',
       IVM_QUEUE_TIMEOUT_MS: '1000',
       ...(options.envOverrides ?? {}),
-    },
-  }))
+    })
+  )
 
   const redisEval = options.redisEvalImpl ? vi.fn(options.redisEvalImpl) : undefined
   vi.doMock('@/lib/core/config/redis', () => ({
@@ -319,7 +318,7 @@ describe('isolated-vm scheduler', () => {
     expect(result.error?.message).toContain('Too many concurrent')
   })
 
-  it('fails closed when Redis is configured but unavailable', async () => {
+  it('falls back to local execution when Redis is configured but unavailable', async () => {
     const { executeInIsolatedVM } = await loadExecutionModule({
       envOverrides: {
         REDIS_URL: 'redis://localhost:6379',
@@ -328,7 +327,7 @@ describe('isolated-vm scheduler', () => {
     })
 
     const result = await executeInIsolatedVM({
-      code: 'return "blocked"',
+      code: 'return "ok"',
       params: {},
       envVars: {},
       contextVariables: {},
@@ -337,10 +336,11 @@ describe('isolated-vm scheduler', () => {
       ownerKey: 'user:redis-down',
     })
 
-    expect(result.error?.message).toContain('temporarily unavailable')
+    expect(result.error).toBeUndefined()
+    expect(result.result).toBe('ok')
   })
 
-  it('fails closed when Redis lease evaluation errors', async () => {
+  it('falls back to local execution when Redis lease evaluation errors', async () => {
     const { executeInIsolatedVM } = await loadExecutionModule({
       envOverrides: {
         REDIS_URL: 'redis://localhost:6379',
@@ -356,7 +356,7 @@ describe('isolated-vm scheduler', () => {
     })
 
     const result = await executeInIsolatedVM({
-      code: 'return "blocked"',
+      code: 'return "ok"',
       params: {},
       envVars: {},
       contextVariables: {},
@@ -365,7 +365,8 @@ describe('isolated-vm scheduler', () => {
       ownerKey: 'user:redis-error',
     })
 
-    expect(result.error?.message).toContain('temporarily unavailable')
+    expect(result.error).toBeUndefined()
+    expect(result.result).toBe('ok')
   })
 
   it('applies weighted owner scheduling when draining queued executions', async () => {
diff --git a/apps/sim/lib/execution/isolated-vm.ts b/apps/sim/lib/execution/isolated-vm.ts
index ae14cc478a..0efeee09b4 100644
--- a/apps/sim/lib/execution/isolated-vm.ts
+++ b/apps/sim/lib/execution/isolated-vm.ts
@@ -987,15 +987,8 @@ export async function executeInIsolatedVM(
     }
   }
   if (leaseAcquireResult === 'unavailable') {
-    maybeCleanupOwner(ownerKey)
-    return {
-      result: null,
-      stdout: '',
-      error: {
-        message: 'Code execution is temporarily unavailable. Please try again in a moment.',
-        name: 'Error',
-      },
-    }
+    logger.warn('Distributed lease unavailable, falling back to local execution', { ownerKey })
+    // Continue execution — local pool still enforces per-process concurrency limits
   }
 
   let settled = false
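For reference, the replaced branch cleaned up the owner and returned a "temporarily unavailable" error (fail closed); the new branch logs and proceeds, leaving the per-process pool as the remaining guard. A reduced sketch of the decision, with names standing in for scheduler logic elided above:

```ts
// Reduced sketch of the new degradation path. tryAcquireLease and runLocally
// are placeholders, not functions from isolated-vm.ts.
type LeaseResult = 'acquired' | 'unavailable'

async function runWithDistributedLease(
  tryAcquireLease: () => Promise<LeaseResult>,
  runLocally: () => Promise<unknown>
): Promise<unknown> {
  const lease = await tryAcquireLease().catch((): LeaseResult => 'unavailable')
  if (lease === 'unavailable') {
    // Before: fail closed with a "temporarily unavailable" error result.
    // Now: log and continue; local concurrency limits still apply.
  }
  return runLocally()
}
```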