From 701e840b5ff9c84b52877f84c26ad928641724cb Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 24 Jun 2025 16:23:54 +0300 Subject: [PATCH 01/20] --wip-- [skip ci] --- packages/client/lib/client/commands-queue.ts | 18 ++++++++++++ packages/client/lib/client/index.ts | 31 +++++++++++++++++--- packages/client/lib/client/socket.ts | 2 ++ 3 files changed, 47 insertions(+), 4 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 52a07a7e3b..63658a841e 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -65,6 +65,7 @@ export default class RedisCommandsQueue { } #invalidateCallback?: (key: RedisArgument | null) => unknown; + #movingCallback?: (afterMs: number, host: string, port: number) => void; constructor( respVersion: RespVersions, @@ -134,6 +135,19 @@ export default class RedisCommandsQueue { } break; } + case 'MOVING': { + if (this.#movingCallback) { + console.log('received moving', push) + const [_, afterMs, url] = push; + let [host, port] = url.toString().split(':'); + //['18.200.246.58'] - for some reason the server sends the host this way + if(host.includes('[')) { + host = host.slice(2, -2); + } + this.#movingCallback(afterMs, host, Number(port)); + } + break; + } } } }, @@ -145,6 +159,10 @@ export default class RedisCommandsQueue { this.#invalidateCallback = callback; } + setMovingCallback(callback?: (afterMs: number, host: string, port: number) => void) { + this.#movingCallback = callback; + } + addCommand( args: ReadonlyArray, options?: CommandOptions diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 128dc59967..2731361815 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -366,7 +366,7 @@ export default class RedisClient< } readonly #options?: RedisClientOptions; - readonly #socket: RedisSocket; + #socket: RedisSocket; readonly #queue: RedisCommandsQueue; #selectedDB = 0; #monitorCallback?: MonitorCallback; @@ -431,7 +431,26 @@ export default class RedisClient< this.#validateOptions(options) this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); - this.#socket = this.#initiateSocket(); + this.#socket = this.#initiateSocket(this.#options); + + this.#queue.setMovingCallback(async (afterMs: number, host: string, port: number) => { + console.log(`Moving to ${host}:${port} before ${afterMs}ms`); + const oldSocket = this.#socket; + const newSocket = this.#initiateSocket({ + ...this.#options, + socket: { + ...this.#options?.socket, + host, + port + } + }); + newSocket.on('ready', () => { + console.log(`Connected to ${host}:${port}, destroying old socket`); + oldSocket.destroy() + this.#socket = newSocket + }); + await newSocket.connect() + }); if (options?.clientSideCache) { if (options.clientSideCache instanceof ClientSideCacheProvider) { @@ -657,8 +676,9 @@ export default class RedisClient< return commands; } - #initiateSocket(): RedisSocket { + #initiateSocket(options?: RedisClientOptions): RedisSocket { const socketInitiator = async () => { + console.log('Initiator...'); const promises = [], chainId = Symbol('Socket Initiator'); @@ -688,8 +708,9 @@ export default class RedisClient< } }; - return new RedisSocket(socketInitiator, this.#options?.socket) + return new RedisSocket(socketInitiator, options?.socket) .on('data', chunk => { + console.log('Data received', chunk); try { this.#queue.decoder.write(chunk); } catch (err) { @@ -698,6 +719,7 @@ export default class RedisClient< } }) .on('error', err => { + console.error('Socket error', err); this.emit('error', err); this.#clientSideCache?.onError(); if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { @@ -708,6 +730,7 @@ export default class RedisClient< }) .on('connect', () => this.emit('connect')) .on('ready', () => { + console.log('Socket ready'); this.emit('ready'); this.#setPingTimer(); this.#maybeScheduleWrite(); diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 5f0bcc4492..dad0144d3e 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -209,6 +209,7 @@ export default class RedisSocket extends EventEmitter { } async #connect(): Promise { + console.log('Connecting...'); let retries = 0; do { try { @@ -224,6 +225,7 @@ export default class RedisSocket extends EventEmitter { } this.#isReady = true; this.#socketEpoch++; + console.log('Socket connected, emit ready'); this.emit('ready'); } catch (err) { const retryIn = this.#shouldReconnect(retries++, err as Error); From b5fa59c64f147038736f5dfa9e4cea44daa5b080 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Thu, 26 Jun 2025 16:02:19 +0300 Subject: [PATCH 02/20] --wip-- [skip ci] --- packages/client/lib/client/commands-queue.ts | 1 + packages/client/lib/client/index.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 63658a841e..a2179c8fe0 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -121,6 +121,7 @@ export default class RedisCommandsQueue { //TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used onPush: push => { if (!this.#onPush(push)) { + console.log('Push received', push.toString()); // currently only supporting "invalidate" over RESP3 push messages switch (push[0].toString()) { case "invalidate": { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 2731361815..db179f5c2d 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -710,7 +710,7 @@ export default class RedisClient< return new RedisSocket(socketInitiator, options?.socket) .on('data', chunk => { - console.log('Data received', chunk); + console.log('Data received', chunk.toString()); try { this.#queue.decoder.write(chunk); } catch (err) { From 2933b57f3fcf7936b99fc80c225824ac8e529d0c Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 11 Jul 2025 11:28:13 +0300 Subject: [PATCH 03/20] expose new options --- packages/client/lib/client/index.ts | 42 +++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index db179f5c2d..1786edf7b7 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -144,6 +144,44 @@ export interface RedisClientOptions< * Tag to append to library name that is sent to the Redis server */ clientInfoTag?: string; + + /** + * Configuration for handling Redis Enterprise graceful maintenance scenarios. + * + * When Redis Enterprise performs maintenance operations, nodes will be replaced, resulting in disconnects. + * This configuration allows the client to handle these scenarios gracefully by automatically + * reconnecting and managing command execution during maintenance windows. + * + * @example Basic graceful maintenance configuration + * ``` + * const client = createClient({ + * gracefulMaintenance: { + * handleFailedCommands: 'retry', + * handleTimeouts: 'exception', + * } + * }); + * ``` + * + * @example Graceful maintenance with timeout smoothing + * ``` + * const client = createClient({ + * gracefulMaintenance: { + * handleFailedCommands: 'retry', + * handleTimeouts: 5000, // Extend timeouts to 5 seconds during maintenance + * } + * }); + * ``` + */ + gracefulMaintenance?: { + /** + * Designates how failed commands should be handled. A failed command is when the time isn’t sufficient to deal with the responses on the old connection before the server shuts it down + */ + handleFailedCommands: 'exception' | 'retry', + /** + * Specify whether we should throw a MaintenanceTimeout exception or provide more relaxed timeout, in order to minimize command timeouts during maintenance. + */ + handleTimeouts: 'exception' | number, + } } type WithCommands< @@ -468,6 +506,10 @@ export default class RedisClient< throw new Error('Client Side Caching is only supported with RESP3'); } + if (options?.gracefulMaintenance && options?.RESP !== 3) { + throw new Error('Graceful Maintenance is only supported with RESP3'); + } + } #initiateOptions(options?: RedisClientOptions): RedisClientOptions | undefined { From 143f0771c14c55999ca020797ad967d9a6a3ac7d Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 11 Jul 2025 15:58:22 +0300 Subject: [PATCH 04/20] implement queue drain mechanism --- packages/client/lib/client/commands-queue.ts | 28 ++++----- packages/client/lib/client/index.ts | 62 +++++++++++++++++--- packages/client/lib/client/linked-list.ts | 33 ++++++++++- 3 files changed, 97 insertions(+), 26 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index a2179c8fe0..27bc18a610 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,10 +1,11 @@ -import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list'; +import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList, makeEmptyAware } from './linked-list'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; import { AbortError, ErrorReply, TimeoutError } from '../errors'; import { MonitorCallback } from '.'; +import EventEmitter from 'events'; export interface CommandOptions { chainId?: symbol; @@ -54,18 +55,18 @@ export default class RedisCommandsQueue { readonly #respVersion; readonly #maxLength; readonly #toWrite = new DoublyLinkedList(); - readonly #waitingForReply = new SinglyLinkedList(); + readonly #waitingForReply: SinglyLinkedList; readonly #onShardedChannelMoved; #chainInExecution: symbol | undefined; readonly decoder; readonly #pubSub = new PubSub(); + readonly events = new EventEmitter(); get isPubSubActive() { return this.#pubSub.isActive; } #invalidateCallback?: (key: RedisArgument | null) => unknown; - #movingCallback?: (afterMs: number, host: string, port: number) => void; constructor( respVersion: RespVersions, @@ -76,6 +77,9 @@ export default class RedisCommandsQueue { this.#maxLength = maxLength; this.#onShardedChannelMoved = onShardedChannelMoved; this.decoder = this.#initiateDecoder(); + const [waitingForReply, emptyEmitter] = makeEmptyAware(new SinglyLinkedList()) + this.#waitingForReply = waitingForReply; + emptyEmitter.on('empty', this.events.on.bind(this.events, 'waitingForReplyEmpty')) } #onReply(reply: ReplyUnion) { @@ -137,16 +141,10 @@ export default class RedisCommandsQueue { break; } case 'MOVING': { - if (this.#movingCallback) { - console.log('received moving', push) - const [_, afterMs, url] = push; - let [host, port] = url.toString().split(':'); - //['18.200.246.58'] - for some reason the server sends the host this way - if(host.includes('[')) { - host = host.slice(2, -2); - } - this.#movingCallback(afterMs, host, Number(port)); - } + console.log('received moving', push) + const [_, afterMs, url] = push; + const [host, port] = url.toString().split(':'); + this.events.emit('moving', afterMs, host, Number(port)) break; } } @@ -160,8 +158,8 @@ export default class RedisCommandsQueue { this.#invalidateCallback = callback; } - setMovingCallback(callback?: (afterMs: number, host: string, port: number) => void) { - this.#movingCallback = callback; + isWaitingForReply(): boolean { + return this.#waitingForReply.length > 0; } addCommand( diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 1786edf7b7..f8d61c316b 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -417,11 +417,16 @@ export default class RedisClient< #watchEpoch?: number; #clientSideCache?: ClientSideCacheProvider; #credentialsSubscription: Disposable | null = null; + // Flag used to pause writing to the socket during maintenance windows. + // When true, prevents new commands from being written while waiting for: + // 1. New socket to be ready after maintenance redirect + // 2. In-flight commands on the old socket to complete + #paused = false; + get clientSideCache() { return this._self.#clientSideCache; } - get options(): RedisClientOptions | undefined { return this._self.#options; } @@ -470,11 +475,27 @@ export default class RedisClient< this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); this.#socket = this.#initiateSocket(this.#options); - - this.#queue.setMovingCallback(async (afterMs: number, host: string, port: number) => { + // Queue + // toWrite [ C D E ] + // waitingForReply [ A B ] + // + // time: ---1-2---3-4-5-6--------------------------- + // + // 1. [EVENT] MOVING PN received + // 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete ) + // 3. [EVENT] New sock connected + // 4. [EVENT] In-flight commands completed + // 5. [ACTION] Unpause writing -> we are going to write to the new socket from now on + // 6. [ACTION] Destroy old socket + this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => { + // 1 console.log(`Moving to ${host}:${port} before ${afterMs}ms`); + + // 2 + this.#paused = true; + const oldSocket = this.#socket; - const newSocket = this.#initiateSocket({ + this.#socket = this.#initiateSocket({ ...this.#options, socket: { ...this.#options?.socket, @@ -482,12 +503,32 @@ export default class RedisClient< port } }); - newSocket.on('ready', () => { - console.log(`Connected to ${host}:${port}, destroying old socket`); - oldSocket.destroy() - this.#socket = newSocket + + // 3 + this.#socket.once('ready', () => { + //TODO handshake...??? + console.log(`Connected to ${host}:${port}`); + + // 4 + if(!this.#queue.isWaitingForReply()) { + // 5 and 6 + oldSocket.destroy(); + this.#paused = false; + } + }); + + // 4 + this.#queue.events.once('waitingForReplyEmpty', () => { + + // 3 + if(this.#socket.isReady) { + // 5 and 6 + oldSocket.destroy(); + this.#paused = false; + } }); - await newSocket.connect() + + await this.#socket.connect() }); if (options?.clientSideCache) { @@ -1120,6 +1161,9 @@ export default class RedisClient< } #write() { + if(this.#paused) { + return + } this.#socket.write(this.#queue.commandsToWrite()); } diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index 29678f027b..3be04cbd3d 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -1,3 +1,5 @@ +import EventEmitter from "events"; + export interface DoublyLinkedNode { value: T; previous: DoublyLinkedNode | undefined; @@ -32,7 +34,7 @@ export class DoublyLinkedList { next: undefined, value }; - } + } return this.#tail = this.#tail.next = { previous: this.#tail, @@ -93,7 +95,7 @@ export class DoublyLinkedList { node.previous!.next = node.next; node.previous = undefined; } - + node.next = undefined; } @@ -111,6 +113,33 @@ export class DoublyLinkedList { } } +// This function takes an object that has a `length` property +// and returns both a proxy and an event emitter. +// The proxy will act the same as the original object. +// And the event emitter will emit an `empty` event whenever the `length` becomes zero. +export const makeEmptyAware = (obj: T): [T, EventEmitter] => { + const eventEmitter = new EventEmitter(); + const proxy = new Proxy(obj, { + get(target, prop, receiver) { + const original = Reflect.get(target, prop, receiver); + if (typeof original === 'function') { + return function (...args: any[]) { + const oldLength = target.length; + const ret = original.apply(target, args); + const newLength = target.length; + if(oldLength !== newLength && newLength === 0) { + eventEmitter.emit('empty') + } + return ret + }; + } else { + return original; + }; + }, + }); + return [ proxy, eventEmitter ]; +} + export interface SinglyLinkedNode { value: T; next: SinglyLinkedNode | undefined; From 272e34edc6c28d03b59050f7efde78ad715f77f4 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Mon, 14 Jul 2025 15:17:13 +0300 Subject: [PATCH 05/20] fix typo --- packages/client/lib/client/commands-queue.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 27bc18a610..d88d69c9ac 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -79,7 +79,7 @@ export default class RedisCommandsQueue { this.decoder = this.#initiateDecoder(); const [waitingForReply, emptyEmitter] = makeEmptyAware(new SinglyLinkedList()) this.#waitingForReply = waitingForReply; - emptyEmitter.on('empty', this.events.on.bind(this.events, 'waitingForReplyEmpty')) + emptyEmitter.on('empty', this.events.emit.bind(this.events, 'waitingForReplyEmpty')) } #onReply(reply: ReplyUnion) { From 09f107e23be5d716131f19e0c86d3fba071a02f9 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Mon, 14 Jul 2025 15:20:03 +0300 Subject: [PATCH 06/20] fix proxy bug with this --- packages/client/lib/client/linked-list.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index 3be04cbd3d..d92feb2714 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -120,8 +120,8 @@ export class DoublyLinkedList { export const makeEmptyAware = (obj: T): [T, EventEmitter] => { const eventEmitter = new EventEmitter(); const proxy = new Proxy(obj, { - get(target, prop, receiver) { - const original = Reflect.get(target, prop, receiver); + get(target, prop, _receiver) { + const original = Reflect.get(target, prop, target); if (typeof original === 'function') { return function (...args: any[]) { const oldLength = target.length; From 8973002ba9b4068f49c32608daa3dcb6f68f7cf7 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Mon, 14 Jul 2025 15:30:38 +0300 Subject: [PATCH 07/20] refactor - remove proxy, use subclass --- packages/client/lib/client/commands-queue.ts | 9 ++-- packages/client/lib/client/linked-list.ts | 50 +++++++++----------- 2 files changed, 27 insertions(+), 32 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index d88d69c9ac..4ad2acf3c5 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,4 +1,4 @@ -import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList, makeEmptyAware } from './linked-list'; +import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList, EmptyAwareSinglyLinkedList } from './linked-list'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; @@ -55,7 +55,7 @@ export default class RedisCommandsQueue { readonly #respVersion; readonly #maxLength; readonly #toWrite = new DoublyLinkedList(); - readonly #waitingForReply: SinglyLinkedList; + readonly #waitingForReply: EmptyAwareSinglyLinkedList; readonly #onShardedChannelMoved; #chainInExecution: symbol | undefined; readonly decoder; @@ -77,9 +77,8 @@ export default class RedisCommandsQueue { this.#maxLength = maxLength; this.#onShardedChannelMoved = onShardedChannelMoved; this.decoder = this.#initiateDecoder(); - const [waitingForReply, emptyEmitter] = makeEmptyAware(new SinglyLinkedList()) - this.#waitingForReply = waitingForReply; - emptyEmitter.on('empty', this.events.emit.bind(this.events, 'waitingForReplyEmpty')) + this.#waitingForReply = new EmptyAwareSinglyLinkedList() + this.#waitingForReply.events.on('empty', this.events.emit.bind(this.events, 'waitingForReplyEmpty')) } #onReply(reply: ReplyUnion) { diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index d92feb2714..c4020b250c 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -113,33 +113,6 @@ export class DoublyLinkedList { } } -// This function takes an object that has a `length` property -// and returns both a proxy and an event emitter. -// The proxy will act the same as the original object. -// And the event emitter will emit an `empty` event whenever the `length` becomes zero. -export const makeEmptyAware = (obj: T): [T, EventEmitter] => { - const eventEmitter = new EventEmitter(); - const proxy = new Proxy(obj, { - get(target, prop, _receiver) { - const original = Reflect.get(target, prop, target); - if (typeof original === 'function') { - return function (...args: any[]) { - const oldLength = target.length; - const ret = original.apply(target, args); - const newLength = target.length; - if(oldLength !== newLength && newLength === 0) { - eventEmitter.emit('empty') - } - return ret - }; - } else { - return original; - }; - }, - }); - return [ proxy, eventEmitter ]; -} - export interface SinglyLinkedNode { value: T; next: SinglyLinkedNode | undefined; @@ -230,3 +203,26 @@ export class SinglyLinkedList { } } } + +export class EmptyAwareSinglyLinkedList extends SinglyLinkedList { + readonly events = new EventEmitter(); + reset() { + super.reset(); + this.events.emit('empty'); + } + shift(): T | undefined { + const old = this.length; + const ret = super.shift(); + if(old !== this.length && this.length === 0) { + this.events.emit('empty'); + } + return ret; + } + remove(node: SinglyLinkedNode, parent: SinglyLinkedNode | undefined): T | undefined { + const old = this.length; + super.remove(node, parent); + if(old !== this.length && this.length === 0) { + this.events.emit('empty'); + } + } +} From 2fcabe51d3081104e3ec017836a17e4863cad180 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 15 Jul 2025 11:42:25 +0300 Subject: [PATCH 08/20] --wip-- [skip ci] --- packages/client/lib/client/commands-queue.ts | 2 +- packages/client/lib/client/index.ts | 11 +++++++---- packages/client/lib/client/linked-list.ts | 2 +- packages/client/lib/client/socket.ts | 1 + 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 4ad2acf3c5..a3799c1f6e 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,4 +1,4 @@ -import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList, EmptyAwareSinglyLinkedList } from './linked-list'; +import { DoublyLinkedNode, DoublyLinkedList, EmptyAwareSinglyLinkedList } from './linked-list'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index f8d61c316b..e3d822c68f 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -492,7 +492,8 @@ export default class RedisClient< console.log(`Moving to ${host}:${port} before ${afterMs}ms`); // 2 - this.#paused = true; + console.log(`Pausing writing until new socket is ready and all in-flight commands are completed`); + // this.#paused = true; const oldSocket = this.#socket; this.#socket = this.#initiateSocket({ @@ -512,6 +513,8 @@ export default class RedisClient< // 4 if(!this.#queue.isWaitingForReply()) { // 5 and 6 + console.log(`All in-flight commands completed`); + console.log(`Resume writing`) oldSocket.destroy(); this.#paused = false; } @@ -519,10 +522,12 @@ export default class RedisClient< // 4 this.#queue.events.once('waitingForReplyEmpty', () => { - + console.log(`All in-flight commands completed`); // 3 if(this.#socket.isReady) { // 5 and 6 + console.log(`Connected to ${host}:${port}`); + console.log(`Resume writing`) oldSocket.destroy(); this.#paused = false; } @@ -793,7 +798,6 @@ export default class RedisClient< return new RedisSocket(socketInitiator, options?.socket) .on('data', chunk => { - console.log('Data received', chunk.toString()); try { this.#queue.decoder.write(chunk); } catch (err) { @@ -802,7 +806,6 @@ export default class RedisClient< } }) .on('error', err => { - console.error('Socket error', err); this.emit('error', err); this.#clientSideCache?.onError(); if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index c4020b250c..6db71fe529 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -218,7 +218,7 @@ export class EmptyAwareSinglyLinkedList extends SinglyLinkedList { } return ret; } - remove(node: SinglyLinkedNode, parent: SinglyLinkedNode | undefined): T | undefined { + remove(node: SinglyLinkedNode, parent: SinglyLinkedNode | undefined) { const old = this.length; super.remove(node, parent); if(old !== this.length && this.length === 0) { diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index dad0144d3e..59afb7bea8 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -219,6 +219,7 @@ export default class RedisSocket extends EventEmitter { try { await this.#initiator(); } catch (err) { + console.log('Initiator failed', err); this.#socket.destroy(); this.#socket = undefined; throw err; From d02bf949355a016995754e55e2c8511ded4ac39c Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Wed, 16 Jul 2025 15:19:34 +0300 Subject: [PATCH 09/20] --wip-- [skip ci] --- packages/client/lib/client/index.ts | 160 ++++++++++++++------------- packages/client/lib/client/socket.ts | 30 ++--- 2 files changed, 98 insertions(+), 92 deletions(-) diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index e3d822c68f..1ea9b7a8e7 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -474,7 +474,7 @@ export default class RedisClient< this.#validateOptions(options) this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); - this.#socket = this.#initiateSocket(this.#options); + this.#socket = this.#createSocket(this.#options); // Queue // toWrite [ C D E ] // waitingForReply [ A B ] @@ -487,54 +487,54 @@ export default class RedisClient< // 4. [EVENT] In-flight commands completed // 5. [ACTION] Unpause writing -> we are going to write to the new socket from now on // 6. [ACTION] Destroy old socket - this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => { - // 1 - console.log(`Moving to ${host}:${port} before ${afterMs}ms`); - - // 2 - console.log(`Pausing writing until new socket is ready and all in-flight commands are completed`); - // this.#paused = true; - - const oldSocket = this.#socket; - this.#socket = this.#initiateSocket({ - ...this.#options, - socket: { - ...this.#options?.socket, - host, - port - } - }); - - // 3 - this.#socket.once('ready', () => { - //TODO handshake...??? - console.log(`Connected to ${host}:${port}`); - - // 4 - if(!this.#queue.isWaitingForReply()) { - // 5 and 6 - console.log(`All in-flight commands completed`); - console.log(`Resume writing`) - oldSocket.destroy(); - this.#paused = false; - } - }); - - // 4 - this.#queue.events.once('waitingForReplyEmpty', () => { - console.log(`All in-flight commands completed`); - // 3 - if(this.#socket.isReady) { - // 5 and 6 - console.log(`Connected to ${host}:${port}`); - console.log(`Resume writing`) - oldSocket.destroy(); - this.#paused = false; - } - }); - - await this.#socket.connect() - }); + // this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => { + // // 1 + // console.log(`Moving to ${host}:${port} before ${afterMs}ms`); + + // // 2 + // console.log(`Pausing writing until new socket is ready and all in-flight commands are completed`); + // // this.#paused = true; + + // const oldSocket = this.#socket; + // this.#socket = this.#initiateSocket({ + // ...this.#options, + // socket: { + // ...this.#options?.socket, + // host, + // port + // } + // }); + + // // 3 + // this.#socket.once('ready', () => { + // //TODO handshake...??? + // console.log(`Connected to ${host}:${port}`); + + // // 4 + // if(!this.#queue.isWaitingForReply()) { + // // 5 and 6 + // console.log(`All in-flight commands completed`); + // console.log(`Resume writing`) + // oldSocket.destroy(); + // this.#paused = false; + // } + // }); + + // // 4 + // this.#queue.events.once('waitingForReplyEmpty', () => { + // console.log(`All in-flight commands completed`); + // // 3 + // if(this.#socket.isReady) { + // // 5 and 6 + // console.log(`Connected to ${host}:${port}`); + // console.log(`Resume writing`) + // oldSocket.destroy(); + // this.#paused = false; + // } + // }); + + // await this.#socket.connect() + // }); if (options?.clientSideCache) { if (options.clientSideCache instanceof ClientSideCacheProvider) { @@ -764,39 +764,42 @@ export default class RedisClient< return commands; } - #initiateSocket(options?: RedisClientOptions): RedisSocket { - const socketInitiator = async () => { - console.log('Initiator...'); - const promises = [], - chainId = Symbol('Socket Initiator'); + async #initiateSocket(options?: RedisClientOptions): Promise { + await this.#socket.waitForReady(); + console.log('Initiator...'); + const promises = []; + const chainId = Symbol('Socket Initiator'); + + const resubscribePromise = this.#queue.resubscribe(chainId); + if (resubscribePromise) { + promises.push(resubscribePromise); + } - const resubscribePromise = this.#queue.resubscribe(chainId); - if (resubscribePromise) { - promises.push(resubscribePromise); - } + if (this.#monitorCallback) { + promises.push( + this.#queue.monitor( + this.#monitorCallback, + { + typeMapping: this._commandOptions?.typeMapping, + chainId, + asap: true + } + ) + ); + } - if (this.#monitorCallback) { - promises.push( - this.#queue.monitor( - this.#monitorCallback, - { - typeMapping: this._commandOptions?.typeMapping, - chainId, - asap: true - } - ) - ); - } + promises.push(...(await this.#handshake(chainId, true))); - promises.push(...(await this.#handshake(chainId, true))); + this.#setPingTimer(); - if (promises.length) { - this.#write(); - return Promise.all(promises); - } - }; + if (promises.length) { + this.#write(); + await Promise.all(promises); + } + } - return new RedisSocket(socketInitiator, options?.socket) + #createSocket(options?: RedisClientOptions): RedisSocket { + return new RedisSocket(options?.socket) .on('data', chunk => { try { this.#queue.decoder.write(chunk); @@ -818,8 +821,6 @@ export default class RedisClient< .on('ready', () => { console.log('Socket ready'); this.emit('ready'); - this.#setPingTimer(); - this.#maybeScheduleWrite(); }) .on('reconnecting', () => this.emit('reconnecting')) .on('drain', () => this.#maybeScheduleWrite()) @@ -932,6 +933,7 @@ export default class RedisClient< async connect() { await this._self.#socket.connect(); + await this._self.#initiateSocket(this._self.#options); return this as unknown as RedisClientType; } diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 59afb7bea8..c8d3541d40 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -51,10 +51,7 @@ export type RedisTcpSocketOptions = RedisTcpOptions | RedisTlsOptions; export type RedisSocketOptions = RedisTcpSocketOptions | RedisIpcOptions; -export type RedisSocketInitiator = () => void | Promise; - export default class RedisSocket extends EventEmitter { - readonly #initiator; readonly #connectTimeout; readonly #reconnectStrategy; readonly #socketFactory; @@ -82,16 +79,23 @@ export default class RedisSocket extends EventEmitter { return this.#socketEpoch; } - constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { + constructor(options?: RedisSocketOptions) { super(); - this.#initiator = initiator; this.#connectTimeout = options?.connectTimeout ?? 5000; this.#reconnectStrategy = this.#createReconnectStrategy(options); this.#socketFactory = this.#createSocketFactory(options); this.#socketTimeout = options?.socketTimeout; } + async waitForReady(): Promise { + if (this.#isReady) return + return new Promise((resolve, reject) => { + this.once('ready', resolve); + this.once('error', reject); + }); + } + #createReconnectStrategy(options?: RedisSocketOptions): ReconnectStrategyFunction { const strategy = options?.reconnectStrategy; if (strategy === false || typeof strategy === 'number') { @@ -216,14 +220,14 @@ export default class RedisSocket extends EventEmitter { this.#socket = await this.#createSocket(); this.emit('connect'); - try { - await this.#initiator(); - } catch (err) { - console.log('Initiator failed', err); - this.#socket.destroy(); - this.#socket = undefined; - throw err; - } + // try { + // await this.#initiator(); + // } catch (err) { + // console.log('Initiator failed', err); + // this.#socket.destroy(); + // this.#socket = undefined; + // throw err; + // } this.#isReady = true; this.#socketEpoch++; console.log('Socket connected, emit ready'); From efd7919513a651df3f9e2d5cc26ae87135f5ac75 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Thu, 17 Jul 2025 17:09:17 +0300 Subject: [PATCH 10/20] --wip-- [skip ci] --- .../client/enterprise-maintenance-manager.ts | 50 +++++++++++++++++ packages/client/lib/client/index.ts | 55 ++++++++++++------- 2 files changed, 84 insertions(+), 21 deletions(-) create mode 100644 packages/client/lib/client/enterprise-maintenance-manager.ts diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts new file mode 100644 index 0000000000..80a4ff0a3f --- /dev/null +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -0,0 +1,50 @@ +import { RedisClientOptions } from "."; +import RedisCommandsQueue from "./commands-queue"; +import RedisSocket from "./socket"; + +export default class EnterpriseMaintenanceManager { + client: any; + commandsQueue: RedisCommandsQueue; + options: RedisClientOptions; + constructor( + client: any, + commandsQueue: RedisCommandsQueue, + options: RedisClientOptions, + ) { + this.client = client; + this.commandsQueue = commandsQueue; + this.options = options; + + this.commandsQueue.events.on("moving", this.#onMoving); + } + + #onMoving = async (_afterMs: number, host: string, port: number) => { + + this.client.pause() + + const socket = new RedisSocket({ + ...this.options.socket, + host, + port + }); + await socket.connect(); + + //wait until waitingForReply is empty + await new Promise(resolve => { + if(!this.commandsQueue.isWaitingForReply()) { + resolve() + } else { + this.commandsQueue.events.once('waitingForReplyEmpty', resolve) + } + }) + + const oldSocket = this.client.socket + oldSocket.removeAllListeners(); + oldSocket.destroy(); + + this.client.socket = socket; + + this.client.resume() + }; + +} diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 1ea9b7a8e7..41f47d4816 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -460,6 +460,15 @@ export default class RedisClient< return this._self.#dirtyWatch !== undefined } + get socket() { + return this._self.#socket; + } + + set socket(socket: RedisSocket) { + this._self.#socket = socket; + this.#initiateSocket(); + } + /** * Marks the client's WATCH command as invalidated due to a topology change. * This will cause any subsequent EXEC in a transaction to fail with a WatchError. @@ -557,6 +566,7 @@ export default class RedisClient< } } + #initiateOptions(options?: RedisClientOptions): RedisClientOptions | undefined { // Convert username/password to credentialsProvider if no credentialsProvider is already in place @@ -766,6 +776,29 @@ export default class RedisClient< async #initiateSocket(options?: RedisClientOptions): Promise { await this.#socket.waitForReady(); + + this.#socket + .on('data', chunk => { + try { + this.#queue.decoder.write(chunk); + } catch (err) { + this.#queue.resetDecoder(); + this.emit('error', err); + } + }) + .on('error', err => { + this.emit('error', err); + this.#clientSideCache?.onError(); + if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { + this.#queue.flushWaitingForReply(err); + } else { + this.#queue.flushAll(err); + } + }) + .on('reconnecting', () => this.emit('reconnecting')) + .on('drain', () => this.#maybeScheduleWrite()) + .on('end', () => this.emit('end')); + console.log('Initiator...'); const promises = []; const chainId = Symbol('Socket Initiator'); @@ -800,31 +833,11 @@ export default class RedisClient< #createSocket(options?: RedisClientOptions): RedisSocket { return new RedisSocket(options?.socket) - .on('data', chunk => { - try { - this.#queue.decoder.write(chunk); - } catch (err) { - this.#queue.resetDecoder(); - this.emit('error', err); - } - }) - .on('error', err => { - this.emit('error', err); - this.#clientSideCache?.onError(); - if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { - this.#queue.flushWaitingForReply(err); - } else { - this.#queue.flushAll(err); - } - }) .on('connect', () => this.emit('connect')) .on('ready', () => { console.log('Socket ready'); this.emit('ready'); - }) - .on('reconnecting', () => this.emit('reconnecting')) - .on('drain', () => this.#maybeScheduleWrite()) - .on('end', () => this.emit('end')); + }); } #pingTimer?: NodeJS.Timeout; From 3c7f27eab2a740479c9ab7c40a22047940f76b43 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 18 Jul 2025 12:06:33 +0300 Subject: [PATCH 11/20] extract socket orchestration in separate class --- .../client/enterprise-maintenance-manager.ts | 68 ++++++++++++----- packages/client/lib/client/index.ts | 74 ++++--------------- packages/client/lib/client/socket.ts | 2 + 3 files changed, 67 insertions(+), 77 deletions(-) diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index 80a4ff0a3f..b15698d4f8 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -18,33 +18,67 @@ export default class EnterpriseMaintenanceManager { this.commandsQueue.events.on("moving", this.#onMoving); } - #onMoving = async (_afterMs: number, host: string, port: number) => { + // Queue + // toWrite [ C D E ] + // waitingForReply [ A B ] + // + // time: ---1-2---3-4-5-6--------------------------- + // + // 1. [EVENT] MOVING PN received + // 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete ) + // 3. [EVENT] New socket connected + // 4. [EVENT] In-flight commands completed + // 5. [ACTION] Destroy old socket + // 6. [ACTION] Resume writing -> we are going to write to the new socket from now on + #onMoving = async ( + _afterMs: number, + host: string, + port: number, + ): Promise => { + // 1 [EVENT] MOVING PN received + console.log('[EnterpriseMaintenanceManager] Pausing client'); + // 2 [ACTION] Pause writing + this.client.pause(); - this.client.pause() - - const socket = new RedisSocket({ + console.log(`[EnterpriseMaintenanceManager] Creating new socket for ${host}:${port}`); + const newSocket = new RedisSocket({ ...this.options.socket, host, - port + port, }); - await socket.connect(); + console.log('[EnterpriseMaintenanceManager] Connecting to new socket'); + await newSocket.connect(); + // 3 [EVENT] New socket connected + console.log('[EnterpriseMaintenanceManager] New socket connected'); - //wait until waitingForReply is empty - await new Promise(resolve => { - if(!this.commandsQueue.isWaitingForReply()) { - resolve() + // Wait until waitingForReply is empty + console.log('[EnterpriseMaintenanceManager] Waiting for reply queue to empty'); + await new Promise((resolve) => { + if (!this.commandsQueue.isWaitingForReply()) { + console.log('[EnterpriseMaintenanceManager] Reply queue already empty'); + resolve(); } else { - this.commandsQueue.events.once('waitingForReplyEmpty', resolve) + console.log('[EnterpriseMaintenanceManager] Reply queue not empty, waiting for empty event'); + this.commandsQueue.events.once("waitingForReplyEmpty", () => { + console.log('[EnterpriseMaintenanceManager] Reply queue now empty'); + resolve(); + }); } - }) + }); + // 4 [EVENT] Reply queue now empty - const oldSocket = this.client.socket + // 5 [ACTION] Destroy old socket + // Switch to the new socket and clean up the old one + console.log('[EnterpriseMaintenanceManager] Switching to new socket and cleaning up old one'); + const oldSocket = this.client.socket; + this.client.socket = newSocket; oldSocket.removeAllListeners(); oldSocket.destroy(); + console.log('[EnterpriseMaintenanceManager] Old socket destroyed'); - this.client.socket = socket; - - this.client.resume() + // 6 [ACTION] Resume writing + console.log('[EnterpriseMaintenanceManager] Resuming client'); + this.client.resume(); + console.log('[EnterpriseMaintenanceManager] Socket migration complete'); }; - } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 41f47d4816..286fd8e25b 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -20,6 +20,7 @@ import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } import { BasicCommandParser, CommandParser } from './parser'; import SingleEntryCache from '../single-entry-cache'; import { version } from '../../package.json' +import EnterpriseMaintenanceManager from './enterprise-maintenance-manager'; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -469,6 +470,15 @@ export default class RedisClient< this.#initiateSocket(); } + pause() { + this._self.#paused = true; + } + + resume() { + this._self.#paused = false; + this._self.#maybeScheduleWrite(); + } + /** * Marks the client's WATCH command as invalidated due to a topology change. * This will cause any subsequent EXEC in a transaction to fail with a WatchError. @@ -484,66 +494,10 @@ export default class RedisClient< this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); this.#socket = this.#createSocket(this.#options); - // Queue - // toWrite [ C D E ] - // waitingForReply [ A B ] - // - // time: ---1-2---3-4-5-6--------------------------- - // - // 1. [EVENT] MOVING PN received - // 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete ) - // 3. [EVENT] New sock connected - // 4. [EVENT] In-flight commands completed - // 5. [ACTION] Unpause writing -> we are going to write to the new socket from now on - // 6. [ACTION] Destroy old socket - // this.options?.gracefulMaintenance && this.#queue.events.on('moving', async (afterMs: number, host: string, port: number) => { - // // 1 - // console.log(`Moving to ${host}:${port} before ${afterMs}ms`); - - // // 2 - // console.log(`Pausing writing until new socket is ready and all in-flight commands are completed`); - // // this.#paused = true; - - // const oldSocket = this.#socket; - // this.#socket = this.#initiateSocket({ - // ...this.#options, - // socket: { - // ...this.#options?.socket, - // host, - // port - // } - // }); - - // // 3 - // this.#socket.once('ready', () => { - // //TODO handshake...??? - // console.log(`Connected to ${host}:${port}`); - - // // 4 - // if(!this.#queue.isWaitingForReply()) { - // // 5 and 6 - // console.log(`All in-flight commands completed`); - // console.log(`Resume writing`) - // oldSocket.destroy(); - // this.#paused = false; - // } - // }); - - // // 4 - // this.#queue.events.once('waitingForReplyEmpty', () => { - // console.log(`All in-flight commands completed`); - // // 3 - // if(this.#socket.isReady) { - // // 5 and 6 - // console.log(`Connected to ${host}:${port}`); - // console.log(`Resume writing`) - // oldSocket.destroy(); - // this.#paused = false; - // } - // }); - - // await this.#socket.connect() - // }); + + if(options?.gracefulMaintenance) { + new EnterpriseMaintenanceManager(this, this.#queue, this.#options!); + } if (options?.clientSideCache) { if (options.clientSideCache instanceof ClientSideCacheProvider) { diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index c8d3541d40..7e09985a8d 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -250,6 +250,7 @@ export default class RedisSocket extends EventEmitter { let onTimeout; if (this.#connectTimeout !== undefined) { + console.log('#connectTimeout',this.#connectTimeout) onTimeout = () => socket.destroy(new ConnectionTimeoutError()); socket.once('timeout', onTimeout); socket.setTimeout(this.#connectTimeout); @@ -266,6 +267,7 @@ export default class RedisSocket extends EventEmitter { } if (this.#socketTimeout) { + console.log('#socketTimeout',this.#socketTimeout) socket.once('timeout', () => { socket.destroy(new SocketTimeoutError(this.#socketTimeout!)); }); From 9a8f4b9ca99567cb4102dd241de11df3c3db2383 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 18 Jul 2025 14:55:27 +0300 Subject: [PATCH 12/20] refactor - remove reference to client --- .../client/enterprise-maintenance-manager.ts | 29 ++++++------------- packages/client/lib/client/index.ts | 25 +++++++--------- 2 files changed, 20 insertions(+), 34 deletions(-) diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index b15698d4f8..5fd10d3f64 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -1,24 +1,23 @@ +import EventEmitter from "events"; import { RedisClientOptions } from "."; import RedisCommandsQueue from "./commands-queue"; import RedisSocket from "./socket"; -export default class EnterpriseMaintenanceManager { - client: any; +export default class EnterpriseMaintenanceManager extends EventEmitter { commandsQueue: RedisCommandsQueue; options: RedisClientOptions; constructor( - client: any, commandsQueue: RedisCommandsQueue, options: RedisClientOptions, ) { - this.client = client; + super(); this.commandsQueue = commandsQueue; this.options = options; this.commandsQueue.events.on("moving", this.#onMoving); } - // Queue + // Queue: // toWrite [ C D E ] // waitingForReply [ A B ] // @@ -27,7 +26,7 @@ export default class EnterpriseMaintenanceManager { // 1. [EVENT] MOVING PN received // 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete ) // 3. [EVENT] New socket connected - // 4. [EVENT] In-flight commands completed + // 4. [EVENT] WaitingForReply commands completed // 5. [ACTION] Destroy old socket // 6. [ACTION] Resume writing -> we are going to write to the new socket from now on #onMoving = async ( @@ -38,7 +37,7 @@ export default class EnterpriseMaintenanceManager { // 1 [EVENT] MOVING PN received console.log('[EnterpriseMaintenanceManager] Pausing client'); // 2 [ACTION] Pause writing - this.client.pause(); + this.emit('pause') console.log(`[EnterpriseMaintenanceManager] Creating new socket for ${host}:${port}`); const newSocket = new RedisSocket({ @@ -65,20 +64,10 @@ export default class EnterpriseMaintenanceManager { }); } }); - // 4 [EVENT] Reply queue now empty + // 4 [EVENT] WaitingForReply commands completed - // 5 [ACTION] Destroy old socket - // Switch to the new socket and clean up the old one - console.log('[EnterpriseMaintenanceManager] Switching to new socket and cleaning up old one'); - const oldSocket = this.client.socket; - this.client.socket = newSocket; - oldSocket.removeAllListeners(); - oldSocket.destroy(); - console.log('[EnterpriseMaintenanceManager] Old socket destroyed'); + // 5 + 6 + this.emit('resume', newSocket); - // 6 [ACTION] Resume writing - console.log('[EnterpriseMaintenanceManager] Resuming client'); - this.client.resume(); - console.log('[EnterpriseMaintenanceManager] Socket migration complete'); }; } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 286fd8e25b..79338f1d24 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -461,21 +461,16 @@ export default class RedisClient< return this._self.#dirtyWatch !== undefined } - get socket() { - return this._self.#socket; - } - - set socket(socket: RedisSocket) { - this._self.#socket = socket; - this.#initiateSocket(); - } - - pause() { + #pauseForMaintenance() { this._self.#paused = true; } - resume() { + #resumeFromMaintenance(newSocket: RedisSocket) { + this._self.#socket.removeAllListeners(); + this._self.#socket.destroy(); + this._self.#socket = newSocket; this._self.#paused = false; + this._self.#initiateSocket(); this._self.#maybeScheduleWrite(); } @@ -496,7 +491,9 @@ export default class RedisClient< this.#socket = this.#createSocket(this.#options); if(options?.gracefulMaintenance) { - new EnterpriseMaintenanceManager(this, this.#queue, this.#options!); + new EnterpriseMaintenanceManager(this.#queue, this.#options!) + .on('pause', this.#pauseForMaintenance.bind(this)) + .on('resume', this.#resumeFromMaintenance.bind(this)) } if (options?.clientSideCache) { @@ -728,7 +725,7 @@ export default class RedisClient< return commands; } - async #initiateSocket(options?: RedisClientOptions): Promise { + async #initiateSocket(): Promise { await this.#socket.waitForReady(); this.#socket @@ -900,7 +897,7 @@ export default class RedisClient< async connect() { await this._self.#socket.connect(); - await this._self.#initiateSocket(this._self.#options); + await this._self.#initiateSocket(); return this as unknown as RedisClientType; } From 4c819d01c721fec723cb28e791a8f986fed962bb Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 18 Jul 2025 14:55:36 +0300 Subject: [PATCH 13/20] remove logs --- packages/client/lib/client/commands-queue.ts | 2 -- .../client/lib/client/enterprise-maintenance-manager.ts | 8 -------- packages/client/lib/client/index.ts | 2 -- packages/client/lib/client/socket.ts | 4 ---- 4 files changed, 16 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index a3799c1f6e..a8a5fa2307 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -124,7 +124,6 @@ export default class RedisCommandsQueue { //TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used onPush: push => { if (!this.#onPush(push)) { - console.log('Push received', push.toString()); // currently only supporting "invalidate" over RESP3 push messages switch (push[0].toString()) { case "invalidate": { @@ -140,7 +139,6 @@ export default class RedisCommandsQueue { break; } case 'MOVING': { - console.log('received moving', push) const [_, afterMs, url] = push; const [host, port] = url.toString().split(':'); this.events.emit('moving', afterMs, host, Number(port)) diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index 5fd10d3f64..77f18b6049 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -35,31 +35,23 @@ export default class EnterpriseMaintenanceManager extends EventEmitter { port: number, ): Promise => { // 1 [EVENT] MOVING PN received - console.log('[EnterpriseMaintenanceManager] Pausing client'); // 2 [ACTION] Pause writing this.emit('pause') - console.log(`[EnterpriseMaintenanceManager] Creating new socket for ${host}:${port}`); const newSocket = new RedisSocket({ ...this.options.socket, host, port, }); - console.log('[EnterpriseMaintenanceManager] Connecting to new socket'); await newSocket.connect(); // 3 [EVENT] New socket connected - console.log('[EnterpriseMaintenanceManager] New socket connected'); // Wait until waitingForReply is empty - console.log('[EnterpriseMaintenanceManager] Waiting for reply queue to empty'); await new Promise((resolve) => { if (!this.commandsQueue.isWaitingForReply()) { - console.log('[EnterpriseMaintenanceManager] Reply queue already empty'); resolve(); } else { - console.log('[EnterpriseMaintenanceManager] Reply queue not empty, waiting for empty event'); this.commandsQueue.events.once("waitingForReplyEmpty", () => { - console.log('[EnterpriseMaintenanceManager] Reply queue now empty'); resolve(); }); } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 79338f1d24..f7dda94f30 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -750,7 +750,6 @@ export default class RedisClient< .on('drain', () => this.#maybeScheduleWrite()) .on('end', () => this.emit('end')); - console.log('Initiator...'); const promises = []; const chainId = Symbol('Socket Initiator'); @@ -786,7 +785,6 @@ export default class RedisClient< return new RedisSocket(options?.socket) .on('connect', () => this.emit('connect')) .on('ready', () => { - console.log('Socket ready'); this.emit('ready'); }); } diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 7e09985a8d..0a61feef69 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -213,7 +213,6 @@ export default class RedisSocket extends EventEmitter { } async #connect(): Promise { - console.log('Connecting...'); let retries = 0; do { try { @@ -230,7 +229,6 @@ export default class RedisSocket extends EventEmitter { // } this.#isReady = true; this.#socketEpoch++; - console.log('Socket connected, emit ready'); this.emit('ready'); } catch (err) { const retryIn = this.#shouldReconnect(retries++, err as Error); @@ -250,7 +248,6 @@ export default class RedisSocket extends EventEmitter { let onTimeout; if (this.#connectTimeout !== undefined) { - console.log('#connectTimeout',this.#connectTimeout) onTimeout = () => socket.destroy(new ConnectionTimeoutError()); socket.once('timeout', onTimeout); socket.setTimeout(this.#connectTimeout); @@ -267,7 +264,6 @@ export default class RedisSocket extends EventEmitter { } if (this.#socketTimeout) { - console.log('#socketTimeout',this.#socketTimeout) socket.once('timeout', () => { socket.destroy(new SocketTimeoutError(this.#socketTimeout!)); }); From 3ba02826919e440edeb2ee978943058ca52a9165 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 18 Jul 2025 15:12:59 +0300 Subject: [PATCH 14/20] cosmetics --- packages/client/lib/client/commands-queue.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index a8a5fa2307..bafe5b64cc 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -55,7 +55,7 @@ export default class RedisCommandsQueue { readonly #respVersion; readonly #maxLength; readonly #toWrite = new DoublyLinkedList(); - readonly #waitingForReply: EmptyAwareSinglyLinkedList; + readonly #waitingForReply = new EmptyAwareSinglyLinkedList(); readonly #onShardedChannelMoved; #chainInExecution: symbol | undefined; readonly decoder; @@ -77,7 +77,6 @@ export default class RedisCommandsQueue { this.#maxLength = maxLength; this.#onShardedChannelMoved = onShardedChannelMoved; this.decoder = this.#initiateDecoder(); - this.#waitingForReply = new EmptyAwareSinglyLinkedList() this.#waitingForReply.events.on('empty', this.events.emit.bind(this.events, 'waitingForReplyEmpty')) } From dbaa8ade2785e4691f4800b88b6986c42b4cde84 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 18 Jul 2025 15:17:08 +0300 Subject: [PATCH 15/20] remove unused code --- packages/client/lib/client/socket.ts | 9 --------- 1 file changed, 9 deletions(-) diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 0a61feef69..5d28f59ace 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -218,15 +218,6 @@ export default class RedisSocket extends EventEmitter { try { this.#socket = await this.#createSocket(); this.emit('connect'); - - // try { - // await this.#initiator(); - // } catch (err) { - // console.log('Initiator failed', err); - // this.#socket.destroy(); - // this.#socket = undefined; - // throw err; - // } this.#isReady = true; this.#socketEpoch++; this.emit('ready'); From 24393f7177300482883660f734e24f4b71447f4b Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 18 Jul 2025 15:24:19 +0300 Subject: [PATCH 16/20] refactor - extract utility method to wait for in-flight commands to complete --- packages/client/lib/client/commands-queue.ts | 12 +++++++++--- .../client/enterprise-maintenance-manager.ts | 17 ++++------------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index bafe5b64cc..ca17dcfe9b 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -77,7 +77,6 @@ export default class RedisCommandsQueue { this.#maxLength = maxLength; this.#onShardedChannelMoved = onShardedChannelMoved; this.decoder = this.#initiateDecoder(); - this.#waitingForReply.events.on('empty', this.events.emit.bind(this.events, 'waitingForReplyEmpty')) } #onReply(reply: ReplyUnion) { @@ -154,8 +153,15 @@ export default class RedisCommandsQueue { this.#invalidateCallback = callback; } - isWaitingForReply(): boolean { - return this.#waitingForReply.length > 0; + async waitForInflightCommandsToComplete(): Promise { + // In-flight commands already completed + if(this.#waitingForReply.length === 0) { + return + }; + // Otherwise wait for in-flight commands to fire `empty` event + return new Promise(resolve => { + this.#waitingForReply.events.on('empty', resolve) + }); } addCommand( diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index 77f18b6049..a69f41a13e 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -19,14 +19,14 @@ export default class EnterpriseMaintenanceManager extends EventEmitter { // Queue: // toWrite [ C D E ] - // waitingForReply [ A B ] + // waitingForReply [ A B ] - aka In-flight commands // // time: ---1-2---3-4-5-6--------------------------- // // 1. [EVENT] MOVING PN received // 2. [ACTION] Pause writing ( we need to wait for new socket to connect and for all in-flight commands to complete ) // 3. [EVENT] New socket connected - // 4. [EVENT] WaitingForReply commands completed + // 4. [EVENT] In-flight commands completed // 5. [ACTION] Destroy old socket // 6. [ACTION] Resume writing -> we are going to write to the new socket from now on #onMoving = async ( @@ -46,17 +46,8 @@ export default class EnterpriseMaintenanceManager extends EventEmitter { await newSocket.connect(); // 3 [EVENT] New socket connected - // Wait until waitingForReply is empty - await new Promise((resolve) => { - if (!this.commandsQueue.isWaitingForReply()) { - resolve(); - } else { - this.commandsQueue.events.once("waitingForReplyEmpty", () => { - resolve(); - }); - } - }); - // 4 [EVENT] WaitingForReply commands completed + await this.commandsQueue.waitForInflightCommandsToComplete(); + // 4 [EVENT] In-flight commands completed // 5 + 6 this.emit('resume', newSocket); From 721b99bc029ad8dd1f31997503ad3b8575102755 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Thu, 24 Jul 2025 13:11:53 +0300 Subject: [PATCH 17/20] rename flag --- packages/client/lib/client/index.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index f7dda94f30..917e500bc5 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -422,7 +422,7 @@ export default class RedisClient< // When true, prevents new commands from being written while waiting for: // 1. New socket to be ready after maintenance redirect // 2. In-flight commands on the old socket to complete - #paused = false; + #pausedForMaintenance = false; get clientSideCache() { return this._self.#clientSideCache; @@ -462,14 +462,14 @@ export default class RedisClient< } #pauseForMaintenance() { - this._self.#paused = true; + this._self.#pausedForMaintenance = true; } #resumeFromMaintenance(newSocket: RedisSocket) { this._self.#socket.removeAllListeners(); this._self.#socket.destroy(); this._self.#socket = newSocket; - this._self.#paused = false; + this._self.#pausedForMaintenance = false; this._self.#initiateSocket(); this._self.#maybeScheduleWrite(); } @@ -1128,7 +1128,7 @@ export default class RedisClient< } #write() { - if(this.#paused) { + if(this.#pausedForMaintenance) { return } this.#socket.write(this.#queue.commandsToWrite()); From 6717cb65322310f0a8cfa75f044b23142edfebee Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 25 Jul 2025 16:45:02 +0300 Subject: [PATCH 18/20] implement timeout relaxation --- packages/client/lib/client/commands-queue.ts | 77 +++++++++++++++++-- .../client/enterprise-maintenance-manager.ts | 48 ++++++++---- packages/client/lib/client/index.ts | 11 +-- packages/client/lib/client/linked-list.ts | 9 +++ packages/client/lib/client/socket.ts | 19 ++++- packages/client/lib/errors.ts | 7 ++ 6 files changed, 143 insertions(+), 28 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index ca17dcfe9b..9cc454791d 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -3,9 +3,10 @@ import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; -import { AbortError, ErrorReply, TimeoutError } from '../errors'; +import { AbortError, ErrorReply, TimeoutDuringMaintanance, TimeoutError } from '../errors'; import { MonitorCallback } from '.'; import EventEmitter from 'events'; +import assert from 'assert'; export interface CommandOptions { chainId?: symbol; @@ -31,6 +32,7 @@ export interface CommandToWrite extends CommandWaitingForReply { timeout: { signal: AbortSignal; listener: () => unknown; + originalTimeout: number | undefined; } | undefined; } @@ -62,6 +64,50 @@ export default class RedisCommandsQueue { readonly #pubSub = new PubSub(); readonly events = new EventEmitter(); + // If this value is set, we are in a maintenance mode. + // This means any existing commands should have their timeout + // overwritten to the new timeout. And all new commands should + // have their timeout set as the new timeout. + #maintenanceCommandTimeout: number | undefined + + setMaintenanceCommandTimeout(ms: number | undefined) { + // Prevent possible api misuse + if (this.#maintenanceCommandTimeout === ms) return; + + this.#maintenanceCommandTimeout = ms; + + // Overwrite timeouts of all eligible toWrite commands + this.#toWrite.forEachNode(node => { + const command = node.value; + + // If the command didnt have a timeout, skip it + if (!command.timeout) return; + + // Remove existing timeout listener + RedisCommandsQueue.#removeTimeoutListener(command) + + //TODO see if this is needed + // // Keep a flag to know if we were in maintenance at this point in time. + // // To be used in the timeout listener, which needs to know which exact error to use. + // const wasMaintenance = !!this.#maintenanceCommandTimeout + + // Determine newTimeout + const newTimeout = this.#maintenanceCommandTimeout ?? command.timeout?.originalTimeout; + assert(newTimeout !== undefined, 'Trying to reset timeout to `undefined`') + + const signal = AbortSignal.timeout(newTimeout); + command.timeout = { + signal, + listener: () => { + this.#toWrite.remove(node); + command.reject(this.#maintenanceCommandTimeout ? new TimeoutDuringMaintanance(newTimeout) : new TimeoutError()); + }, + originalTimeout: command.timeout.originalTimeout + }; + signal.addEventListener('abort', command.timeout.listener, { once: true }); + }); + } + get isPubSubActive() { return this.#pubSub.isActive; } @@ -139,7 +185,16 @@ export default class RedisCommandsQueue { case 'MOVING': { const [_, afterMs, url] = push; const [host, port] = url.toString().split(':'); - this.events.emit('moving', afterMs, host, Number(port)) + this.events.emit('moving', afterMs, host, Number(port)); + break; + } + case 'MIGRATING': { + console.log('GOT MIGRATING', push.map(p => p.toString())); + this.events.emit('migrating'); + break; + } + case 'MIGRATED': { + this.events.emit('migrated'); break; } } @@ -187,15 +242,25 @@ export default class RedisCommandsQueue { typeMapping: options?.typeMapping }; - const timeout = options?.timeout; + // If #commandTimeout was explicitly set, this + // means we are in maintenance mode and should + // use it instead of the timeout provided by the command + const timeout = this.#maintenanceCommandTimeout || options?.timeout if (timeout) { + + //TODO see if this is needed + // // Keep a flag to know if we were in maintenance at this point in time. + // // To be used in the timeout listener, which needs to know which exact error to use. + // const wasMaintenance = !!this.#maintenanceCommandTimeout + const signal = AbortSignal.timeout(timeout); value.timeout = { signal, listener: () => { this.#toWrite.remove(node); - value.reject(new TimeoutError()); - } + value.reject(this.#maintenanceCommandTimeout ? new TimeoutDuringMaintanance(timeout) : new TimeoutError()); + }, + originalTimeout: options?.timeout }; signal.addEventListener('abort', value.timeout.listener, { once: true }); } @@ -451,7 +516,7 @@ export default class RedisCommandsQueue { } static #removeTimeoutListener(command: CommandToWrite) { - command.timeout!.signal.removeEventListener('abort', command.timeout!.listener); + command.timeout?.signal.removeEventListener('abort', command.timeout!.listener); } static #flushToWrite(toBeSent: CommandToWrite, err: Error) { diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index a69f41a13e..c2a9c46baa 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -4,17 +4,16 @@ import RedisCommandsQueue from "./commands-queue"; import RedisSocket from "./socket"; export default class EnterpriseMaintenanceManager extends EventEmitter { - commandsQueue: RedisCommandsQueue; - options: RedisClientOptions; - constructor( - commandsQueue: RedisCommandsQueue, - options: RedisClientOptions, - ) { + #commandsQueue: RedisCommandsQueue; + #options: RedisClientOptions; + constructor(commandsQueue: RedisCommandsQueue, options: RedisClientOptions) { super(); - this.commandsQueue = commandsQueue; - this.options = options; + this.#commandsQueue = commandsQueue; + this.#options = options; - this.commandsQueue.events.on("moving", this.#onMoving); + this.#commandsQueue.events.on("moving", this.#onMoving); + this.#commandsQueue.events.on("migrating", this.#onMigrating); + this.#commandsQueue.events.on("migrated", this.#onMigrated); } // Queue: @@ -36,21 +35,44 @@ export default class EnterpriseMaintenanceManager extends EventEmitter { ): Promise => { // 1 [EVENT] MOVING PN received // 2 [ACTION] Pause writing - this.emit('pause') + this.emit("pause"); const newSocket = new RedisSocket({ - ...this.options.socket, + ...this.#options.socket, host, port, }); + //todo + newSocket.setMaintenanceTimeout(); await newSocket.connect(); // 3 [EVENT] New socket connected - await this.commandsQueue.waitForInflightCommandsToComplete(); + await this.#commandsQueue.waitForInflightCommandsToComplete(); // 4 [EVENT] In-flight commands completed // 5 + 6 - this.emit('resume', newSocket); + this.emit("resume", newSocket); + }; + #onMigrating = async () => { + this.#commandsQueue.setMaintenanceCommandTimeout(this.#getCommandTimeout()); + this.emit("maintenance", this.#getSocketTimeout()); }; + + #onMigrated = async () => { + this.#commandsQueue.setMaintenanceCommandTimeout(undefined); + this.emit("maintenance", undefined); + } + + #getSocketTimeout(): number | undefined { + return this.#options.gracefulMaintenance?.handleTimeouts === "error" + ? this.#options.socket?.socketTimeout + : this.#options.gracefulMaintenance?.handleTimeouts; + } + + #getCommandTimeout(): number | undefined { + return this.#options.gracefulMaintenance?.handleTimeouts === "error" + ? this.#options.commandOptions?.timeout + : this.#options.gracefulMaintenance?.handleTimeouts; + } } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 917e500bc5..58e77feb1a 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -179,9 +179,9 @@ export interface RedisClientOptions< */ handleFailedCommands: 'exception' | 'retry', /** - * Specify whether we should throw a MaintenanceTimeout exception or provide more relaxed timeout, in order to minimize command timeouts during maintenance. + * Specify whether we should throw a TimeoutDuringMaintanance exception or provide more relaxed timeout, in order to minimize command timeouts during maintenance. */ - handleTimeouts: 'exception' | number, + handleTimeouts: 'error' | number, } } @@ -461,10 +461,6 @@ export default class RedisClient< return this._self.#dirtyWatch !== undefined } - #pauseForMaintenance() { - this._self.#pausedForMaintenance = true; - } - #resumeFromMaintenance(newSocket: RedisSocket) { this._self.#socket.removeAllListeners(); this._self.#socket.destroy(); @@ -492,8 +488,9 @@ export default class RedisClient< if(options?.gracefulMaintenance) { new EnterpriseMaintenanceManager(this.#queue, this.#options!) - .on('pause', this.#pauseForMaintenance.bind(this)) + .on('pause', () => this._self.#pausedForMaintenance = true ) .on('resume', this.#resumeFromMaintenance.bind(this)) + .on('maintenance', (mtm: number | undefined) => this._self.#socket.setMaintenanceTimeout(mtm)) } if (options?.clientSideCache) { diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index 6db71fe529..1fc0458cfe 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -111,6 +111,15 @@ export class DoublyLinkedList { node = node.next; } } + + forEachNode(fn: (node: DoublyLinkedNode) => void) { + let node = this.#head; + while(node) { + fn(node); + node = node.next; + } + } + } export interface SinglyLinkedNode { diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 5d28f59ace..91d30abaa5 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -1,7 +1,7 @@ import { EventEmitter, once } from 'node:events'; import net from 'node:net'; import tls from 'node:tls'; -import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError, SocketTimeoutError } from '../errors'; +import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError, SocketTimeoutError, TimeoutDuringMaintanance } from '../errors'; import { setTimeout } from 'node:timers/promises'; import { RedisArgument } from '../RESP/types'; @@ -57,6 +57,8 @@ export default class RedisSocket extends EventEmitter { readonly #socketFactory; readonly #socketTimeout; + #maintenanceTimeout: number | undefined; + #socket?: net.Socket | tls.TLSSocket; #isOpen = false; @@ -234,6 +236,16 @@ export default class RedisSocket extends EventEmitter { } while (this.#isOpen && !this.#isReady); } + setMaintenanceTimeout(ms?: number) { + this.#maintenanceTimeout = ms; + + if(ms !== undefined) { + this.#socket?.setTimeout(ms); + } else if (this.#socketTimeout !== undefined) { + this.#socket?.setTimeout(this.#socketTimeout); + } + } + async #createSocket(): Promise { const socket = this.#socketFactory.create(); @@ -256,7 +268,10 @@ export default class RedisSocket extends EventEmitter { if (this.#socketTimeout) { socket.once('timeout', () => { - socket.destroy(new SocketTimeoutError(this.#socketTimeout!)); + const error = this.#maintenanceTimeout + ? new TimeoutDuringMaintanance(this.#socketTimeout!) + : new SocketTimeoutError(this.#socketTimeout!) + socket.destroy(error); }); socket.setTimeout(this.#socketTimeout); } diff --git a/packages/client/lib/errors.ts b/packages/client/lib/errors.ts index db37ec1a9b..21e748807d 100644 --- a/packages/client/lib/errors.ts +++ b/packages/client/lib/errors.ts @@ -76,6 +76,13 @@ export class BlobError extends ErrorReply {} export class TimeoutError extends Error {} +export class TimeoutDuringMaintanance extends Error { + constructor(timeout: number) { + super(`Socket timeout during maintenance. Expecting data, but didn't receive any in ${timeout}ms.`); + } +} + + export class MultiErrorReply extends ErrorReply { replies: Array; errorIndexes: Array; From f24bc837e18a9ae2d693d199d9617f1c78b78efa Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Mon, 28 Jul 2025 13:13:49 +0300 Subject: [PATCH 19/20] add failover pns --- packages/client/lib/client/commands-queue.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 9cc454791d..1ad008e1ed 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -188,12 +188,13 @@ export default class RedisCommandsQueue { this.events.emit('moving', afterMs, host, Number(port)); break; } - case 'MIGRATING': { - console.log('GOT MIGRATING', push.map(p => p.toString())); + case 'MIGRATING': + case 'FAILING_OVER': { this.events.emit('migrating'); break; } - case 'MIGRATED': { + case 'MIGRATED': + case 'FAILED_OVER': { this.events.emit('migrated'); break; } From fb0e62cd0a15475e9448f8ab97ca7949700f878c Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Mon, 28 Jul 2025 14:43:53 +0300 Subject: [PATCH 20/20] refactor: extract push handlers out of the queue --- packages/client/lib/client/commands-queue.ts | 52 +++++-------------- .../client/enterprise-maintenance-manager.ts | 28 ++++++++-- packages/client/lib/client/index.ts | 14 ++++- 3 files changed, 51 insertions(+), 43 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 1ad008e1ed..e6e72c0984 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -5,7 +5,6 @@ import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/ty import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; import { AbortError, ErrorReply, TimeoutDuringMaintanance, TimeoutError } from '../errors'; import { MonitorCallback } from '.'; -import EventEmitter from 'events'; import assert from 'assert'; export interface CommandOptions { @@ -53,6 +52,13 @@ const RESP2_PUSH_TYPE_MAPPING = { [RESP_TYPES.SIMPLE_STRING]: Buffer }; +// Try to handle a push notification. Return whether you +// successfully consumed the notification or not. This is +// important in order for the queue to be able to pass the +// notification to another handler if the current one did not +// succeed. +type PushHandler = (pushItems: Array) => boolean; + export default class RedisCommandsQueue { readonly #respVersion; readonly #maxLength; @@ -62,7 +68,8 @@ export default class RedisCommandsQueue { #chainInExecution: symbol | undefined; readonly decoder; readonly #pubSub = new PubSub(); - readonly events = new EventEmitter(); + + #pushHandlers: PushHandler[] = [this.#onPush.bind(this)]; // If this value is set, we are in a maintenance mode. // This means any existing commands should have their timeout @@ -112,8 +119,6 @@ export default class RedisCommandsQueue { return this.#pubSub.isActive; } - #invalidateCallback?: (key: RedisArgument | null) => unknown; - constructor( respVersion: RespVersions, maxLength: number | null | undefined, @@ -155,6 +160,7 @@ export default class RedisCommandsQueue { } return true; } + return false } #getTypeMapping() { @@ -167,46 +173,16 @@ export default class RedisCommandsQueue { onErrorReply: err => this.#onErrorReply(err), //TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used onPush: push => { - if (!this.#onPush(push)) { - // currently only supporting "invalidate" over RESP3 push messages - switch (push[0].toString()) { - case "invalidate": { - if (this.#invalidateCallback) { - if (push[1] !== null) { - for (const key of push[1]) { - this.#invalidateCallback(key); - } - } else { - this.#invalidateCallback(null); - } - } - break; - } - case 'MOVING': { - const [_, afterMs, url] = push; - const [host, port] = url.toString().split(':'); - this.events.emit('moving', afterMs, host, Number(port)); - break; - } - case 'MIGRATING': - case 'FAILING_OVER': { - this.events.emit('migrating'); - break; - } - case 'MIGRATED': - case 'FAILED_OVER': { - this.events.emit('migrated'); - break; - } - } + for(const pushHandler of this.#pushHandlers) { + if(pushHandler(push)) return } }, getTypeMapping: () => this.#getTypeMapping() }); } - setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) { - this.#invalidateCallback = callback; + addPushHandler(handler: PushHandler): void { + this.#pushHandlers.push(handler); } async waitForInflightCommandsToComplete(): Promise { diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index c2a9c46baa..855eb42ca8 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -11,11 +11,31 @@ export default class EnterpriseMaintenanceManager extends EventEmitter { this.#commandsQueue = commandsQueue; this.#options = options; - this.#commandsQueue.events.on("moving", this.#onMoving); - this.#commandsQueue.events.on("migrating", this.#onMigrating); - this.#commandsQueue.events.on("migrated", this.#onMigrated); + this.#commandsQueue.addPushHandler(this.#onPush); } + #onPush = (push: Array): boolean => { + switch (push[0].toString()) { + case "MOVING": { + const [_, afterMs, url] = push; + const [host, port] = url.toString().split(":"); + this.#onMoving(afterMs, host, Number(port)); + return true; + } + case "MIGRATING": + case "FAILING_OVER": { + this.#onMigrating(); + return true; + } + case "MIGRATED": + case "FAILED_OVER": { + this.#onMigrated(); + return true; + } + } + return false + }; + // Queue: // toWrite [ C D E ] // waitingForReply [ A B ] - aka In-flight commands @@ -62,7 +82,7 @@ export default class EnterpriseMaintenanceManager extends EventEmitter { #onMigrated = async () => { this.#commandsQueue.setMaintenanceCommandTimeout(undefined); this.emit("maintenance", undefined); - } + }; #getSocketTimeout(): number | undefined { return this.#options.gracefulMaintenance?.handleTimeouts === "error" diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 58e77feb1a..52317469d0 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -500,7 +500,19 @@ export default class RedisClient< const cscConfig = options.clientSideCache; this.#clientSideCache = new BasicClientSideCache(cscConfig); } - this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache)); + this.#queue.addPushHandler((push: Array): boolean => { + if (push[0].toString() !== 'invalidate') return false; + + if (push[1] !== null) { + for (const key of push[1]) { + this.#clientSideCache?.invalidate(key) + } + } else { + this.#clientSideCache?.invalidate(null) + } + + return true + }); } }