176 changes: 176 additions & 0 deletions apps/sim/lib/core/config/redis.test.ts
@@ -0,0 +1,176 @@
import { createEnvMock, createMockRedis, loggerMock } from '@sim/testing'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'

const mockRedisInstance = createMockRedis()

vi.mock('@sim/logger', () => loggerMock)
vi.mock('@/lib/core/config/env', () => createEnvMock({ REDIS_URL: 'redis://localhost:6379' }))
vi.mock('ioredis', () => ({
default: vi.fn(() => mockRedisInstance),
}))

describe('redis config', () => {
beforeEach(() => {
vi.clearAllMocks()
vi.useFakeTimers()
})

afterEach(() => {
vi.useRealTimers()
vi.resetModules()
})

describe('onRedisReconnect', () => {
it('should register and invoke reconnect listeners', async () => {
const { onRedisReconnect, getRedisClient } = await import('./redis')
const listener = vi.fn()
onRedisReconnect(listener)

getRedisClient()

mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)

expect(listener).toHaveBeenCalledTimes(1)
})

it('should not invoke listeners when PINGs succeed', async () => {
const { onRedisReconnect, getRedisClient } = await import('./redis')
const listener = vi.fn()
onRedisReconnect(listener)

getRedisClient()
mockRedisInstance.ping.mockResolvedValue('PONG')

await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)

expect(listener).not.toHaveBeenCalled()
})

it('should reset failure count on successful PING', async () => {
const { onRedisReconnect, getRedisClient } = await import('./redis')
const listener = vi.fn()
onRedisReconnect(listener)

getRedisClient()

// 2 failures then a success — should reset counter
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
await vi.advanceTimersByTimeAsync(30_000)
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
await vi.advanceTimersByTimeAsync(30_000)
mockRedisInstance.ping.mockResolvedValueOnce('PONG')
await vi.advanceTimersByTimeAsync(30_000)

// 2 more failures — should NOT trigger reconnect (counter was reset)
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
await vi.advanceTimersByTimeAsync(30_000)
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
await vi.advanceTimersByTimeAsync(30_000)

expect(listener).not.toHaveBeenCalled()
})

it('should call disconnect(true) after 3 consecutive PING failures', async () => {
const { getRedisClient } = await import('./redis')
getRedisClient()

mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)

expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()

await vi.advanceTimersByTimeAsync(30_000)
expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true)
})

it('should handle listener errors gracefully without breaking health check', async () => {
const { onRedisReconnect, getRedisClient } = await import('./redis')
const badListener = vi.fn(() => {
throw new Error('listener crashed')
})
const goodListener = vi.fn()
onRedisReconnect(badListener)
onRedisReconnect(goodListener)

getRedisClient()
mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)

expect(badListener).toHaveBeenCalledTimes(1)
expect(goodListener).toHaveBeenCalledTimes(1)
})
})

describe('closeRedisConnection', () => {
it('should clear the PING interval', async () => {
const { getRedisClient, closeRedisConnection } = await import('./redis')
getRedisClient()

mockRedisInstance.quit.mockResolvedValue('OK')
await closeRedisConnection()

// After closing, PING failures should not trigger disconnect
mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
await vi.advanceTimersByTimeAsync(30_000 * 5)
expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
})
})

describe('retryStrategy', () => {
async function captureRetryStrategy(): Promise<(times: number) => number> {
vi.resetModules()

vi.doMock('@sim/logger', () => loggerMock)
vi.doMock('@/lib/core/config/env', () =>
createEnvMock({ REDIS_URL: 'redis://localhost:6379' })
)

let capturedConfig: Record<string, unknown> = {}
vi.doMock('ioredis', () => ({
default: vi.fn((_url: string, config: Record<string, unknown>) => {
capturedConfig = config
return { ping: vi.fn(), on: vi.fn() }
}),
}))

const { getRedisClient } = await import('./redis')
getRedisClient()

return capturedConfig.retryStrategy as (times: number) => number
}

it('should use exponential backoff with jitter', async () => {
const retryStrategy = await captureRetryStrategy()
expect(retryStrategy).toBeDefined()

// Base for attempt 1: min(1000 * 2^0, 10000) = 1000, jitter up to 300
const delay1 = retryStrategy(1)
expect(delay1).toBeGreaterThanOrEqual(1000)
expect(delay1).toBeLessThanOrEqual(1300)

// Base for attempt 3: min(1000 * 2^2, 10000) = 4000, jitter up to 1200
const delay3 = retryStrategy(3)
expect(delay3).toBeGreaterThanOrEqual(4000)
expect(delay3).toBeLessThanOrEqual(5200)

// Base for attempt 5: min(1000 * 2^4, 10000) = 10000, jitter up to 3000
const delay5 = retryStrategy(5)
expect(delay5).toBeGreaterThanOrEqual(10000)
expect(delay5).toBeLessThanOrEqual(13000)
})

it('should cap at 30s for attempts beyond 10', async () => {
const retryStrategy = await captureRetryStrategy()
expect(retryStrategy(11)).toBe(30000)
expect(retryStrategy(100)).toBe(30000)
})
})
})
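
For reference, the retry schedule these tests pin down can be written as a standalone TypeScript sketch. The exponential base and the 30% jitter come straight from the implementation below; the exact "times > 10" guard for the flat 30s delay is collapsed out of this diff, so treat that threshold as an inference from the "cap at 30s" test rather than confirmed code.

// Sketch of the retry schedule asserted above (not the shipped function).
// Attempts 1..10: exponential base capped at 10s, plus up to 30% jitter.
// Attempts beyond 10: flat 30s (threshold inferred from the cap test).
function sketchRetryDelay(times: number): number {
  if (times > 10) return 30_000
  const base = Math.min(1000 * 2 ** (times - 1), 10_000)
  const jitter = Math.random() * base * 0.3
  return Math.round(base + jitter)
}
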
70 changes: 68 additions & 2 deletions apps/sim/lib/core/config/redis.ts
@@ -7,6 +7,63 @@ const logger = createLogger('Redis')
const redisUrl = env.REDIS_URL

let globalRedisClient: Redis | null = null
let pingFailures = 0
let pingInterval: NodeJS.Timeout | null = null
let pingInFlight = false

const PING_INTERVAL_MS = 30_000
const MAX_PING_FAILURES = 3

/** Callbacks invoked when the PING health check forces a reconnect. */
const reconnectListeners: Array<() => void> = []

/**
* Register a callback that fires when the PING health check forces a reconnect.
* Useful for resetting cached adapters that hold a stale Redis reference.
*/
export function onRedisReconnect(cb: () => void): void {
reconnectListeners.push(cb)
}

function startPingHealthCheck(redis: Redis): void {
if (pingInterval) return

pingInterval = setInterval(async () => {
if (pingInFlight) return
pingInFlight = true
try {
await redis.ping()
pingFailures = 0
} catch (error) {
pingFailures++
logger.warn('Redis PING failed', {
consecutiveFailures: pingFailures,
error: error instanceof Error ? error.message : String(error),
})

if (pingFailures >= MAX_PING_FAILURES) {
logger.error('Redis PING failed 3 consecutive times — forcing reconnect', {
consecutiveFailures: pingFailures,
})
pingFailures = 0
for (const cb of reconnectListeners) {
try {
cb()
} catch (cbError) {
logger.error('Redis reconnect listener error', { error: cbError })
}
}
try {
redis.disconnect(true)
} catch (disconnectError) {
logger.error('Error during forced Redis disconnect', { error: disconnectError })
}
}
} finally {
pingInFlight = false
}
}, PING_INTERVAL_MS)
}

/**
* Get a Redis client instance.
@@ -35,8 +92,10 @@ export function getRedisClient(): Redis | null {
logger.error(`Redis reconnection attempt ${times}`, { nextRetryMs: 30000 })
return 30000
}
const delay = Math.min(times * 500, 5000)
logger.warn(`Redis reconnecting`, { attempt: times, nextRetryMs: delay })
const base = Math.min(1000 * 2 ** (times - 1), 10000)
const jitter = Math.random() * base * 0.3
const delay = Math.round(base + jitter)
logger.warn('Redis reconnecting', { attempt: times, nextRetryMs: delay })
return delay
},

@@ -54,6 +113,8 @@ export function getRedisClient(): Redis | null {
globalRedisClient.on('close', () => logger.warn('Redis connection closed'))
globalRedisClient.on('end', () => logger.error('Redis connection ended'))

startPingHealthCheck(globalRedisClient)

return globalRedisClient
} catch (error) {
logger.error('Failed to initialize Redis client', { error })
@@ -118,6 +179,11 @@ export async function releaseLock(lockKey: string, value: string): Promise<boole
* Use for graceful shutdown.
*/
export async function closeRedisConnection(): Promise<void> {
if (pingInterval) {
clearInterval(pingInterval)
pingInterval = null
}

if (globalRedisClient) {
try {
await globalRedisClient.quit()
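A minimal usage sketch for the new hook, outside the diff itself. It assumes the "@/lib/core/config/redis" import alias matches the aliases mocked in the tests above; the cached-adapter shape and the SIGTERM hook are illustrative assumptions, not code from this PR.

import {
  closeRedisConnection,
  getRedisClient,
  onRedisReconnect,
} from '@/lib/core/config/redis'

// Hypothetical cached adapter that holds a Redis reference; the name and
// shape are illustrative only and are not defined anywhere in this PR.
let cachedAdapter: { redis: ReturnType<typeof getRedisClient> } | null = null

// Drop the stale reference when the PING health check forces a reconnect,
// so the next caller rebuilds the adapter against the fresh client.
onRedisReconnect(() => {
  cachedAdapter = null
})

function getAdapter() {
  if (!cachedAdapter) {
    cachedAdapter = { redis: getRedisClient() }
  }
  return cachedAdapter
}

// Graceful shutdown, per the doc comment on closeRedisConnection: clears the
// PING interval and quits the client before the process exits.
process.on('SIGTERM', async () => {
  await closeRedisConnection()
  process.exit(0)
})
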
6 changes: 3 additions & 3 deletions apps/sim/lib/core/rate-limiter/rate-limiter.test.ts
@@ -172,7 +172,7 @@ describe('RateLimiter', () => {
)
})

it('should deny on storage error (fail closed)', async () => {
it('should allow on storage error (fail open)', async () => {
mockAdapter.consumeTokens.mockRejectedValue(new Error('Storage error'))

const result = await rateLimiter.checkRateLimitWithSubscription(
@@ -182,8 +182,8 @@
false
)

expect(result.allowed).toBe(false)
expect(result.remaining).toBe(0)
expect(result.allowed).toBe(true)
expect(result.remaining).toBe(1)
})

it('should work for all non-manual trigger types', async () => {
7 changes: 3 additions & 4 deletions apps/sim/lib/core/rate-limiter/rate-limiter.ts
@@ -100,17 +100,16 @@ export class RateLimiter {
retryAfterMs: result.retryAfterMs,
}
} catch (error) {
logger.error('Rate limit storage error - failing closed (denying request)', {
logger.error('Rate limit storage error - failing open (allowing request)', {
error: error instanceof Error ? error.message : String(error),
userId,
triggerType,
isAsync,
})
return {
allowed: false,
remaining: 0,
allowed: true,
remaining: 1,
resetAt: new Date(Date.now() + RATE_LIMIT_WINDOW_MS),
retryAfterMs: RATE_LIMIT_WINDOW_MS,
}
}
}