Skip to content
33 changes: 3 additions & 30 deletions packages/server/src/CachePool.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { IActiveCache, MODE } from './Interface'
import Redis from 'ioredis'
import { Redis } from 'ioredis'
import { RedisConnector } from './RedisConnector'

/**
* This pool is to keep track of in-memory cache used for LLM and Embeddings
Expand All @@ -12,35 +13,7 @@ export class CachePool {
ssoTokenCache: { [key: string]: any } = {}

constructor() {
if (process.env.MODE === MODE.QUEUE) {
if (process.env.REDIS_URL) {
this.redisClient = new Redis(process.env.REDIS_URL, {
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
})
} else {
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined,
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
})
}
}
this.redisClient = new RedisConnector().getRedisClient()
}

/**
Expand Down
179 changes: 179 additions & 0 deletions packages/server/src/RedisConnector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import { InternalFlowiseError } from './errors/internalFlowiseError'
import logger from './utils/logger'
import { MODE } from './Interface'
import { Redis } from 'ioredis'
import { StatusCodes } from 'http-status-codes'

/**
* Class used to initialize and connect to Redis instance.
*
* Sync usage:
* const connector = new RedisConnector()
* const redis = connector.getRedisClient()
*
* Async usage:
* const connector = new RedisConnector()
* await connector.ready() // fully waits for Redis init
* const redis = connector.getRedisClient()
*/
export class RedisConnector {
/**
* @type {Redis}
*/
private redis!: Redis

/**
* @type {Record<string, unknown>}
*/
private connection!: Record<string, unknown>

/**
* @type {Promise<void>}
*/
private initPromise: Promise<void> | null = null

/**
* Sync constructor
*
* @constructor
*/
constructor() {}

/**
* Initializes Redis lazily (runs once).
*
* @returns {Promise<void>}
*/
private async init(): Promise<void> {
if (this.initPromise) return this.initPromise

this.initPromise = (async () => {
const keepAlive =
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: 0

const tlsOptions =
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: {}

switch (process.env.MODE) {
case MODE.QUEUE:
await this.initializeQueueMode(keepAlive, tlsOptions)
break

case MODE.MAIN:
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR,
`[server]: MODE ${process.env.MODE} not implemented`
)

default:
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR,
`Unrecognized MODE - ${process.env.MODE}`
)
}
})()

return this.initPromise
}

/**
* Queue mode initialization.
*
* @param {number} keepAlive - Keep alive in milliseconds (see https://redis.github.io/ioredis/index.html#RedisOptions)
* @param {Record<string, unknown>} tlsOptions - Record with key-value pairs (see https://redis.github.io/ioredis/index.html#RedisOptions)
*/
private async initializeQueueMode(keepAlive: number, tlsOptions: Record<string, unknown>): Promise<void> {
if (process.env.REDIS_URL) {
logger.info('[server] Queue mode using REDIS_URL.')

tlsOptions.rejectUnauthorized =
!(process.env.REDIS_URL.startsWith('rediss://') && process.env.REDIS_TLS !== 'true')

this.connection = {
keepAlive,
tls: tlsOptions,
enableReadyCheck: true,
reconnectOnError: this.connectOnError.bind(this)
}

this.redis = new Redis(process.env.REDIS_URL, this.connection)

} else {
logger.info('[server] Queue mode using HOST or localhost.')

this.connection = {
host: process.env.REDIS_HOST ?? 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
keepAlive,
tls: tlsOptions,
enableReadyCheck: true,
reconnectOnError: this.connectOnError.bind(this)
}

this.redis = new Redis(this.connection)
}

try {
await this.redis.connect()
} catch (err: any) {
logger.error(`[server]: Redis connection failed - ${err.message}`)
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, err.message)
}
}

/**
* Function to handle Redis failure, used as callback.
* https://redis.github.io/ioredis/interfaces/CommonRedisOptions.html#reconnectOnError
* @param {Error} err
* @returns {number} 1 - Always reconnect to Redis in case of errors (does not retry the failed command)
* @see https://redis.github.io/ioredis/interfaces/CommonRedisOptions.html#reconnectOnError
*/
private connectOnError(err: Error): number {
logger.error(`[server]: Redis connection error - ${err.message}`)
return 1
}

/**
* Sync-safe access:
* - If Redis isn't initialized: triggers async initialization.
* - Always returns the Redis instance synchronously.
*
* @returns {Redis}
*/
public getRedisClient(): Redis {
// Trigger async init if not yet started
void this.init()
return this.redis
}

/**
* Fully async safe usage:
* await connector.ready()
*
* @returns {Promise<void>}
*/
public async ready(): Promise<void> {
await this.init()
}

/**
* Sync-safe access
*
* @returns {Record<string, unknown>}
*/
public getRedisConnection(): Record<string, unknown> {
// Trigger async init if not yet started
void this.init()
return this.connection
}
}

export default RedisConnector
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Redis from 'ioredis'
import { RedisConnector } from '../../../RedisConnector'
import { RedisStore } from 'connect-redis'
import { getDatabaseSSLFromEnv } from '../../../DataSource'
import path from 'path'
Expand All @@ -13,24 +14,7 @@ let dbStore: Store | null = null

export const initializeRedisClientAndStore = (): RedisStore => {
if (!redisClient) {
if (process.env.REDIS_URL) {
redisClient = new Redis(process.env.REDIS_URL)
} else {
redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined
})
}
redisClient = new RedisConnector().getRedisClient()
}
if (!redisStore) {
redisStore = new RedisStore({ client: redisClient })
Expand Down
63 changes: 5 additions & 58 deletions packages/server/src/utils/rateLimit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { IChatFlow, MODE } from '../Interface'
import { Mutex } from 'async-mutex'
import { RedisStore } from 'rate-limit-redis'
import Redis from 'ioredis'
import { RedisConnector } from '../RedisConnector'
import { QueueEvents, QueueEventsListener, QueueEventsProducer } from 'bullmq'

interface CustomListener extends QueueEventsListener {
Expand All @@ -22,65 +23,11 @@ export class RateLimiterManager {
private queueEvents: QueueEvents

constructor() {
let redisConnector = new RedisConnector()
this.redisClient = redisConnector.getRedisClient()
if (process.env.MODE === MODE.QUEUE) {
if (process.env.REDIS_URL) {
this.redisClient = new Redis(process.env.REDIS_URL, {
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
})
} else {
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined,
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
})
}
this.queueEventsProducer = new QueueEventsProducer(QUEUE_NAME, { connection: this.getConnection() })
this.queueEvents = new QueueEvents(QUEUE_NAME, { connection: this.getConnection() })
}
}

getConnection() {
let tlsOpts = undefined
if (process.env.REDIS_URL && process.env.REDIS_URL.startsWith('rediss://')) {
tlsOpts = {
rejectUnauthorized: false
}
} else if (process.env.REDIS_TLS === 'true') {
tlsOpts = {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
}
return {
url: process.env.REDIS_URL || undefined,
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls: tlsOpts,
maxRetriesPerRequest: null,
enableReadyCheck: true,
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
this.queueEventsProducer = new QueueEventsProducer(QUEUE_NAME, { connection: redisConnector.getRedisConnection() })
this.queueEvents = new QueueEvents(QUEUE_NAME, { connection: redisConnector.getRedisConnection() })
}
}

Expand Down