diff --git a/langchain-core/src/callbacks/manager.ts b/langchain-core/src/callbacks/manager.ts index 1a96fe1729c5..b80770fc1dd6 100644 --- a/langchain-core/src/callbacks/manager.ts +++ b/langchain-core/src/callbacks/manager.ts @@ -61,13 +61,6 @@ export interface BaseCallbackConfig { * Tags are passed to all callbacks, metadata is passed to handle*Start callbacks. */ callbacks?: Callbacks; - - /** - * Runtime values for attributes previously made configurable on this Runnable, - * or sub-Runnables. - */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - configurable?: Record; } export function parseCallbackConfigArg( diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index e1ef667323ee..79557fb69181 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -3,7 +3,6 @@ import pRetry from "p-retry"; import { CallbackManager, CallbackManagerForChainRun, - BaseCallbackConfig, } from "../callbacks/manager.js"; import { LogStreamCallbackHandler, @@ -59,12 +58,6 @@ export interface RunnableInterface< batchOptions?: RunnableBatchOptions ): Promise<(RunOutput | Error)[]>; - batch( - inputs: RunInput[], - options?: Partial | Partial[], - batchOptions?: RunnableBatchOptions - ): Promise<(RunOutput | Error)[]>; - stream( input: RunInput, options?: Partial @@ -433,15 +426,8 @@ export abstract class Runnable< let finalOutputSupported = true; const callbackManager_ = await getCallbackMangerForConfig(options); - let runManager: CallbackManagerForChainRun | undefined; - const serializedRepresentation = this.toJSON(); - async function* wrapInputForTracing() { - for await (const chunk of inputGenerator) { - if (!runManager) { - // Start the run manager AFTER the iterator starts to preserve - // tracing order - runManager = await callbackManager_?.handleChainStart( - serializedRepresentation, + const runManager = await callbackManager_?.handleChainStart( + this.toJSON(), { input: "" }, undefined, options?.runType, @@ -449,14 +435,15 @@ export abstract class Runnable< undefined, options?.runName ); - } + async function* wrapInputForTracing() { + for await (const chunk of inputGenerator) { if (finalInputSupported) { if (finalInput === undefined) { finalInput = chunk; } else { try { // eslint-disable-next-line @typescript-eslint/no-explicit-any - finalInput = (finalInput as any).concat(chunk); + finalInput = concat(finalInput, chunk as any); } catch { finalInput = undefined; finalInputSupported = false; @@ -482,7 +469,7 @@ export abstract class Runnable< } else { try { // eslint-disable-next-line @typescript-eslint/no-explicit-any - finalOutput = (finalOutput as any).concat(chunk); + finalOutput = concat(finalOutput, chunk as any); } catch { finalOutput = undefined; finalOutputSupported = false; @@ -507,7 +494,8 @@ export abstract class Runnable< _patchConfig( config: Partial = {}, - callbackManager: CallbackManager | undefined = undefined + callbackManager: CallbackManager | undefined = undefined, + recursionLimit: number | undefined = undefined ): Partial { const newConfig = { ...config }; if (callbackManager !== undefined) { @@ -518,6 +506,9 @@ export abstract class Runnable< delete newConfig.runName; return { ...newConfig, callbacks: callbackManager }; } + if (recursionLimit !== undefined) { + newConfig.recursionLimit = recursionLimit; + } return newConfig; } @@ -556,7 +547,7 @@ export abstract class Runnable< // Make a best effort to gather, for any type that supports concat. // This method should throw an error if gathering fails. // eslint-disable-next-line @typescript-eslint/no-explicit-any - finalChunk = (finalChunk as any).concat(chunk); + finalChunk = concat(finalChunk, chunk as any); } } yield* this._streamIterator(finalChunk, options); @@ -670,7 +661,7 @@ export abstract class Runnable< export type RunnableBindingArgs< RunInput, RunOutput, - CallOptions extends RunnableConfig + CallOptions extends RunnableConfig = RunnableConfig > = { bound: Runnable; kwargs?: Partial; @@ -684,7 +675,7 @@ export type RunnableBindingArgs< export class RunnableBinding< RunInput, RunOutput, - CallOptions extends RunnableConfig + CallOptions extends RunnableConfig = RunnableConfig > extends Runnable { static lc_name() { return "RunnableBinding"; @@ -892,7 +883,7 @@ export class RunnableBinding< export class RunnableEach< RunInputItem, RunOutputItem, - CallOptions extends BaseCallbackConfig + CallOptions extends RunnableConfig > extends Runnable { static lc_name() { return "RunnableEach"; @@ -1360,7 +1351,7 @@ export class RunnableSequence< } else { try { // eslint-disable-next-line @typescript-eslint/no-explicit-any - finalOutput = (finalOutput as any).concat(chunk); + finalOutput = concat(finalOutput, chunk as any); } catch (e) { finalOutput = undefined; concatSupported = false; @@ -1473,7 +1464,7 @@ export class RunnableMap< async invoke( input: RunInput, - options?: Partial + options?: Partial ): Promise { const callbackManager_ = await getCallbackMangerForConfig(options); const runManager = await callbackManager_?.handleChainStart( @@ -1537,14 +1528,21 @@ export class RunnableLambda extends Runnable< async _invoke( input: RunInput, - config?: Partial, + config?: Partial, runManager?: CallbackManagerForChainRun ) { let output = await this.func(input, { config }); if (output && Runnable.isRunnable(output)) { + if (config?.recursionLimit === 0) { + throw new Error("Recursion limit reached."); + } output = await output.invoke( input, - this._patchConfig(config, runManager?.getChild()) + this._patchConfig( + config, + runManager?.getChild(), + (config?.recursionLimit ?? 25) - 1 + ) ); } return output; @@ -1552,7 +1550,7 @@ export class RunnableLambda extends Runnable< async invoke( input: RunInput, - options?: Partial + options?: Partial ): Promise { return this._callWithConfig(this._invoke, input, options); } @@ -1597,7 +1595,7 @@ export class RunnableWithFallbacks extends Runnable< async invoke( input: RunInput, - options?: Partial + options?: Partial ): Promise { const callbackManager_ = await CallbackManager.configure( options?.callbacks, @@ -1639,25 +1637,25 @@ export class RunnableWithFallbacks extends Runnable< async batch( inputs: RunInput[], - options?: Partial | Partial[], + options?: Partial | Partial[], batchOptions?: RunnableBatchOptions & { returnExceptions?: false } ): Promise; async batch( inputs: RunInput[], - options?: Partial | Partial[], + options?: Partial | Partial[], batchOptions?: RunnableBatchOptions & { returnExceptions: true } ): Promise<(RunOutput | Error)[]>; async batch( inputs: RunInput[], - options?: Partial | Partial[], + options?: Partial | Partial[], batchOptions?: RunnableBatchOptions ): Promise<(RunOutput | Error)[]>; async batch( inputs: RunInput[], - options?: Partial | Partial[], + options?: Partial | Partial[], batchOptions?: RunnableBatchOptions ): Promise<(RunOutput | Error)[]> { if (batchOptions?.returnExceptions) { diff --git a/langchain-core/src/runnables/config.ts b/langchain-core/src/runnables/config.ts index e604071a7b09..ec616e708f8f 100644 --- a/langchain-core/src/runnables/config.ts +++ b/langchain-core/src/runnables/config.ts @@ -3,7 +3,19 @@ import { CallbackManager, } from "../callbacks/manager.js"; -export type RunnableConfig = BaseCallbackConfig; +export interface RunnableConfig extends BaseCallbackConfig { + /** + * Runtime values for attributes previously made configurable on this Runnable, + * or sub-Runnables. + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + configurable?: Record; + + /** + * Maximum number of times a call can recurse. If not provided, defaults to 25. + */ + recursionLimit?: number; +} export async function getCallbackMangerForConfig(config?: RunnableConfig) { return CallbackManager.configure( @@ -28,6 +40,8 @@ export function mergeConfigs( copy[key] = { ...copy[key], ...options[key] }; } else if (key === "tags") { copy[key] = (copy[key] ?? []).concat(options[key] ?? []); + } else if (key === "configurable") { + copy[key] = { ...copy[key], ...options[key] }; } else if (key === "callbacks") { const baseCallbacks = copy.callbacks; const providedCallbacks = options.callbacks ?? config.callbacks; diff --git a/langchain-core/src/runnables/history.ts b/langchain-core/src/runnables/history.ts index 29e65802eb79..bf4c00862c36 100644 --- a/langchain-core/src/runnables/history.ts +++ b/langchain-core/src/runnables/history.ts @@ -1,4 +1,3 @@ -import { BaseCallbackConfig } from "../callbacks/manager.js"; import { BaseChatMessageHistory, BaseListChatMessageHistory, @@ -28,10 +27,7 @@ type GetSessionHistoryCallable = ( | BaseListChatMessageHistory; export interface RunnableWithMessageHistoryInputs - extends Omit< - RunnableBindingArgs, - "bound" | "config" - > { + extends Omit, "bound" | "config"> { runnable: Runnable; getMessageHistory: GetSessionHistoryCallable; inputMessagesKey?: string; @@ -43,7 +39,7 @@ export interface RunnableWithMessageHistoryInputs export class RunnableWithMessageHistory< RunInput, RunOutput -> extends RunnableBinding { +> extends RunnableBinding { runnable: Runnable; inputMessagesKey?: string; @@ -151,7 +147,7 @@ export class RunnableWithMessageHistory< return returnType; } - async _exitHistory(run: Run, config: BaseCallbackConfig): Promise { + async _exitHistory(run: Run, config: RunnableConfig): Promise { const history = config.configurable?.messageHistory; // Get input messages @@ -176,7 +172,7 @@ export class RunnableWithMessageHistory< } } - async _mergeConfig(...configs: Array) { + async _mergeConfig(...configs: Array) { const config = await super._mergeConfig(...configs); // Extract sessionId if (!config.configurable || !config.configurable.sessionId) {