Skip to content

Expand useRealtimeRunsWithTag hook to support date range & run status filtering #2413

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 113 additions & 58 deletions packages/react-hooks/src/hooks/useRealtime.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
"use client";

import {
AnyTask,
ApiClient,
InferRunTypes,
RealtimeRun,
RealtimeRunSkipColumns,
} from "@trigger.dev/core/v3";
import { useCallback, useEffect, useId, useRef, useState } from "react";
import { AnyTask, ApiClient, InferRunTypes, RealtimeRun } from "@trigger.dev/core/v3";
import { useCallback, useEffect, useId, useRef, useState, useMemo } from "react";
import { KeyedMutator, useSWR } from "../utils/trigger-swr.js";
import { useApiClient, UseApiClientOptions } from "./useApiClient.js";
import { createThrottledQueue } from "../utils/throttle.js";
Expand Down Expand Up @@ -35,13 +29,6 @@ export type UseRealtimeSingleRunOptions<TTask extends AnyTask = AnyTask> = UseRe
* Set this to false if you are making updates to the run metadata after completion through child runs
*/
stopOnCompletion?: boolean;

/**
* Skip columns from the subscription.
*
* @default []
*/
skipColumns?: RealtimeRunSkipColumns;
};

export type UseRealtimeRunInstance<TTask extends AnyTask = AnyTask> = {
Expand Down Expand Up @@ -114,7 +101,6 @@ export function useRealtimeRun<TTask extends AnyTask>(

await processRealtimeRun(
runId,
{ skipColumns: options?.skipColumns },
apiClient,
mutateRun,
setError,
Expand Down Expand Up @@ -275,7 +261,6 @@ export function useRealtimeRunWithStreams<

await processRealtimeRunWithStreams(
runId,
{ skipColumns: options?.skipColumns },
apiClient,
mutateRun,
mutateStreams,
Expand Down Expand Up @@ -349,32 +334,6 @@ export type UseRealtimeRunsInstance<TTask extends AnyTask = AnyTask> = {
stop: () => void;
};

export type UseRealtimeRunsWithTagOptions = UseRealtimeRunOptions & {
/**
* Filter runs by the time they were created. You must specify the duration string like "1h", "10s", "30m", etc.
*
* @example
* "1h" - 1 hour ago
* "10s" - 10 seconds ago
* "30m" - 30 minutes ago
* "1d" - 1 day ago
* "1w" - 1 week ago
*
* The maximum duration is 1 week
*
* @note The timestamp will be calculated on the server side when you first subscribe to the runs.
*
*/
createdAt?: string;

/**
* Skip columns from the subscription.
*
* @default []
*/
skipColumns?: RealtimeRunSkipColumns;
};

/**
* Hook to subscribe to realtime updates of task runs filtered by tag(s).
*
Expand All @@ -389,13 +348,73 @@ export type UseRealtimeRunsWithTagOptions = UseRealtimeRunOptions & {
* const { runs, error } = useRealtimeRunsWithTag<typeof myTask>('my-tag');
* // Or with multiple tags
* const { runs, error } = useRealtimeRunsWithTag<typeof myTask>(['tag1', 'tag2']);
* // Or with a createdAt filter
* const { runs, error } = useRealtimeRunsWithTag<typeof myTask>('my-tag', { createdAt: '1h' });
* ```
*/

export interface RealtimeFilterOptions {
/**
* Inclusive start date. Accepts Date or ISO date string or epoch number.
*/
startDate?: Date | string | number;
/**
* Inclusive end date. Accepts Date or ISO date string or epoch number.
*/
endDate?: Date | string | number;
/**
* Allowed run statuses (exact string matches).
*/
statuses?: string[];
}

/** Utility: normalize a possible date input to a Date instance, or null */
function normalizeToDate(value?: Date | string | number): Date | null {
if (value === undefined || value === null) return null;
if (value instanceof Date) return value;
if (typeof value === "number") return new Date(value);
if (typeof value === "string") {
const d = new Date(value);
return Number.isNaN(d.getTime()) ? null : d;
}
return null;
}

/** Utility: pull a best-effort Date from a run object in a type-safe way */
function getRunDate<TTask extends AnyTask>(run: RealtimeRun<TTask>): Date | null {
// Common timestamp fields used by different APIs
const candidates = ["startedAt", "createdAt", "started_at", "created_at"] as const;

for (const key of candidates) {
const val = (run as unknown as Record<string, unknown>)[key];
if (val instanceof Date) return val;
if (typeof val === "string" || typeof val === "number") {
const d = normalizeToDate(val as string | number);
if (d) return d;
}
}

return null;
}

/** Utility: get status string from run in a safe way */
function getRunStatus<TTask extends AnyTask>(run: RealtimeRun<TTask>): string {
const status = (run as unknown as Record<string, unknown>).status;
if (typeof status === "string") return status;
if (typeof status === "number") return String(status);
return "";
}

/** Stable serialisation/keys for filters so deps are stable and readable */
function createFiltersKey(filters?: RealtimeFilterOptions): string {
if (!filters) return "";
const startIso = normalizeToDate(filters.startDate)?.toISOString() ?? "";
const endIso = normalizeToDate(filters.endDate)?.toISOString() ?? "";
const statuses = Array.isArray(filters.statuses) ? filters.statuses.join(",") : "";
return `${startIso}|${endIso}|${statuses}`;
}

export function useRealtimeRunsWithTag<TTask extends AnyTask>(
tag: string | string[],
options?: UseRealtimeRunsWithTagOptions
options?: UseRealtimeRunOptions & { filters?: RealtimeFilterOptions }
): UseRealtimeRunsInstance<TTask> {
const hookId = useId();
const idKey = options?.id ?? hookId;
Expand Down Expand Up @@ -439,7 +458,6 @@ export function useRealtimeRunsWithTag<TTask extends AnyTask>(

await processRealtimeRunsWithTag(
tag,
{ createdAt: options?.createdAt, skipColumns: options?.skipColumns },
apiClient,
mutateRuns,
runsRef,
Expand Down Expand Up @@ -471,9 +489,51 @@ export function useRealtimeRunsWithTag<TTask extends AnyTask>(
return () => {
stop();
};
}, [tag, stop, options?.enabled]);
// Including filtersKey here to restart the streaming request when filters change
// This ensures we get fresh data when filter criteria are modified
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [tag, stop, options?.enabled, createFiltersKey(options?.filters)]);
Comment on lines +492 to +495
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix useEffect dependencies to use stable filter key

The current implementation has inconsistent dependency handling - createFiltersKey is called inline in the dependency array but filtersKey is computed with useMemo below. This could cause unnecessary re-renders.

Apply this fix to use the memoized filtersKey:

+  // Stable key for useMemo deps
+  const filtersKey = useMemo(
+    () => createFiltersKey(options?.filters),
+    [options?.filters]
+  );
+
   useEffect(() => {
     if (typeof options?.enabled === "boolean" && !options.enabled) {
       return;
     }
 
     triggerRequest().finally(() => {});
 
     return () => {
       stop();
     };
-    // Including filtersKey here to restart the streaming request when filters change
-    // This ensures we get fresh data when filter criteria are modified
-    // eslint-disable-next-line react-hooks/exhaustive-deps
-  }, [tag, stop, options?.enabled, createFiltersKey(options?.filters)]);
-  // Stable key for useMemo deps
-  const filtersKey = useMemo(
-    () => createFiltersKey(options?.filters),
-    [options?.filters]
-  );
+  }, [tag, stop, options?.enabled, filtersKey]);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Including filtersKey here to restart the streaming request when filters change
// This ensures we get fresh data when filter criteria are modified
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [tag, stop, options?.enabled, createFiltersKey(options?.filters)]);
// Stable key for useMemo deps
const filtersKey = useMemo(
() => createFiltersKey(options?.filters),
[options?.filters]
);
useEffect(() => {
if (typeof options?.enabled === "boolean" && !options.enabled) {
return;
}
triggerRequest().finally(() => {});
return () => {
stop();
};
// Including filtersKey here to restart the streaming request when filters change
// This ensures we get fresh data when filter criteria are modified
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [tag, stop, options?.enabled, filtersKey]);
// …rest of the hook implementation…
// (Remove the original `useMemo` call that was below the effect.)
🤖 Prompt for AI Agents
In packages/react-hooks/src/hooks/useRealtime.ts around lines 492 to 495, the
useEffect dependency array calls createFiltersKey inline which is inconsistent
with the memoized filtersKey computed below and can trigger unnecessary
re-renders; change the dependency array to reference the memoized filtersKey
(i.e., replace createFiltersKey(options?.filters) with filtersKey), ensure
filtersKey is declared with useMemo before this useEffect, and remove the inline
eslint-disable comment if it becomes unnecessary.

// Stable key for useMemo deps
const filtersKey = useMemo(
() => createFiltersKey(options?.filters),
[options?.filters]
); // Client-side filtering: useMemo + typed return
const filteredRuns = useMemo<RealtimeRun<TTask>[]>(() => {
const list = runs ?? [];
const f = options?.filters;
if (!f) return list;

const start = normalizeToDate(f.startDate);
const end = normalizeToDate(f.endDate);
const allowedStatuses = Array.isArray(f.statuses) && f.statuses.length > 0
? f.statuses.map((s) => s.toString())
: null;

// small, readable filter function
return list.filter((run: any) => {
const runDate = getRunDate(run);
if (start && runDate) {
if (runDate < start) return false;
}
if (end && runDate) {
if (runDate > end) return false;
}

return { runs: runs ?? [], error, stop };
if (allowedStatuses) {
const status = getRunStatus(run);
if (!allowedStatuses.includes(status)) return false;
}

return true;
});
// filtersKey so memo invalidates when filters change
}, [runs, filtersKey]);

return {
runs: filteredRuns,
error,
stop,
};
}

/**
Expand Down Expand Up @@ -591,7 +651,7 @@ async function processRealtimeBatch<TTask extends AnyTask = AnyTask>(
}
}

// Inserts and then orders by the run createdAt timestamp, and ensures that the run is not duplicated
// Inserts and then orders by the run number, and ensures that the run is not duplicated
function insertRunShapeInOrder<TTask extends AnyTask>(
previousRuns: RealtimeRun<TTask>[],
run: RealtimeRun<TTask>
Expand All @@ -601,8 +661,8 @@ function insertRunShapeInOrder<TTask extends AnyTask>(
return previousRuns.map((r) => (r.id === run.id ? run : r));
}

const runCreatedAt = run.createdAt;
const index = previousRuns.findIndex((r) => r.createdAt > runCreatedAt);
const runNumber = run.number;
const index = previousRuns.findIndex((r) => r.number > runNumber);
if (index === -1) {
return [...previousRuns, run];
}
Expand All @@ -612,14 +672,13 @@ function insertRunShapeInOrder<TTask extends AnyTask>(

async function processRealtimeRunsWithTag<TTask extends AnyTask = AnyTask>(
tag: string | string[],
filters: { createdAt?: string; skipColumns?: RealtimeRunSkipColumns },
apiClient: ApiClient,
mutateRunsData: KeyedMutator<RealtimeRun<TTask>[]>,
existingRunsRef: React.MutableRefObject<RealtimeRun<TTask>[]>,
onError: (e: Error) => void,
abortControllerRef: React.MutableRefObject<AbortController | null>
) {
const subscription = apiClient.subscribeToRunsWithTag<InferRunTypes<TTask>>(tag, filters, {
const subscription = apiClient.subscribeToRunsWithTag<InferRunTypes<TTask>>(tag, {
signal: abortControllerRef.current?.signal,
onFetchError: onError,
});
Comment on lines +681 to 684
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify server-side filtering removal is intentional

The subscription no longer passes filter parameters to the server. Confirm that client-side filtering alone meets performance requirements for the expected data volumes.

Based on the retrieved learning about in-memory filtering being acceptable for paginated results (max 100 runs), this approach should be fine. However, let's verify if there are any pagination limits in place:


🏁 Script executed:

#!/bin/bash
# Check if subscribeToRunsWithTag has any built-in limits or pagination
ast-grep --pattern 'subscribeToRunsWithTag<$_>($$$) {
  $$$
}'

# Look for any limit/pagination parameters in the API client
rg -n "subscribeToRunsWithTag" -B3 -A10 --type=ts

Length of output: 6012


🏁 Script executed:

#!/bin/bash
# Show context around the subscribeToRunsWithTag call in useRealtime.ts
sed -n '650,720p' packages/react-hooks/src/hooks/useRealtime.ts

Length of output: 2351


🏁 Script executed:

#!/bin/bash
# Locate the definition of createSearchQueryForSubscribeToRuns
rg -n "createSearchQueryForSubscribeToRuns" -n packages/core/src/v3/apiClient

Length of output: 329


🏁 Script executed:

#!/bin/bash
# Show the createSearchQueryForSubscribeToRuns function definition
sed -n '1040,1120p' packages/core/src/v3/apiClient/index.ts

Length of output: 2048


🏁 Script executed:

#!/bin/bash
# Search for listRuns usage in useRealtime.ts
rg -n "listRuns" -C3 packages/react-hooks/src/hooks/useRealtime.ts

Length of output: 66


🏁 Script executed:

#!/bin/bash
# Show full implementation of subscribeToRunsWithTag in the API client
sed -n '900,1000p' packages/core/src/v3/apiClient/index.ts

Length of output: 3144


Fix parameter usage for subscribeToRunsWithTag and confirm filter removal is intentional

It looks like you’re accidentally passing the signal/onFetchError object as the filters param, so:

  • No createdAt-based server filter is applied (all runs with this tag will be streamed), and
  • The signal and onFetchError options never reach the SSE subscription.

Please update the call in
packages/react-hooks/src/hooks/useRealtime.ts (around lines 681–684):

- const subscription = apiClient.subscribeToRunsWithTag<InferRunTypes<TTask>>(tag, {
-   signal: abortControllerRef.current?.signal,
-   onFetchError: onError,
- });
+ const subscription = apiClient.subscribeToRunsWithTag<InferRunTypes<TTask>>(
+   tag,
+   /* filters */ undefined,
+   {
+     signal: abortControllerRef.current?.signal,
+     onFetchError: onError,
+   }
+ );

Then, if you intentionally removed any createdAt or pagination filters, please verify that your in-memory ordering/duplication logic (insertRunShape) will perform acceptably under the expected stream volume. Consider re-introducing a createdAt filter or using skipColumns to limit payload size if needed.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const subscription = apiClient.subscribeToRunsWithTag<InferRunTypes<TTask>>(tag, {
signal: abortControllerRef.current?.signal,
onFetchError: onError,
});
const subscription = apiClient.subscribeToRunsWithTag<InferRunTypes<TTask>>(
tag,
/* filters */ undefined,
{
signal: abortControllerRef.current?.signal,
onFetchError: onError,
}
);

Expand Down Expand Up @@ -655,7 +714,6 @@ async function processRealtimeRunWithStreams<
TStreams extends Record<string, any> = Record<string, any>,
>(
runId: string,
filters: { skipColumns?: RealtimeRunSkipColumns },
apiClient: ApiClient,
mutateRunData: KeyedMutator<RealtimeRun<TTask>>,
mutateStreamData: KeyedMutator<StreamResults<TStreams>>,
Expand All @@ -669,7 +727,6 @@ async function processRealtimeRunWithStreams<
signal: abortControllerRef.current?.signal,
closeOnComplete: stopOnCompletion,
onFetchError: onError,
skipColumns: filters.skipColumns,
});

type StreamUpdate = {
Expand Down Expand Up @@ -716,7 +773,6 @@ async function processRealtimeRunWithStreams<

async function processRealtimeRun<TTask extends AnyTask = AnyTask>(
runId: string,
filters: { skipColumns?: RealtimeRunSkipColumns },
apiClient: ApiClient,
mutateRunData: KeyedMutator<RealtimeRun<TTask>>,
onError: (e: Error) => void,
Expand All @@ -727,7 +783,6 @@ async function processRealtimeRun<TTask extends AnyTask = AnyTask>(
signal: abortControllerRef.current?.signal,
closeOnComplete: stopOnCompletion,
onFetchError: onError,
skipColumns: filters.skipColumns,
});

for await (const part of subscription) {
Expand Down