From 1e8622cdb4580591594cda6d998ba914448d08d7 Mon Sep 17 00:00:00 2001 From: Dragbos <145892326+drakeRAGE@users.noreply.github.com> Date: Mon, 18 Aug 2025 20:16:32 +0530 Subject: [PATCH] Update useRealtime.ts to add filter functionality based on date and status --- packages/react-hooks/src/hooks/useRealtime.ts | 171 ++++++++++++------ 1 file changed, 113 insertions(+), 58 deletions(-) diff --git a/packages/react-hooks/src/hooks/useRealtime.ts b/packages/react-hooks/src/hooks/useRealtime.ts index 9492c085de..31f71795a6 100644 --- a/packages/react-hooks/src/hooks/useRealtime.ts +++ b/packages/react-hooks/src/hooks/useRealtime.ts @@ -1,13 +1,7 @@ "use client"; -import { - AnyTask, - ApiClient, - InferRunTypes, - RealtimeRun, - RealtimeRunSkipColumns, -} from "@trigger.dev/core/v3"; -import { useCallback, useEffect, useId, useRef, useState } from "react"; +import { AnyTask, ApiClient, InferRunTypes, RealtimeRun } from "@trigger.dev/core/v3"; +import { useCallback, useEffect, useId, useRef, useState, useMemo } from "react"; import { KeyedMutator, useSWR } from "../utils/trigger-swr.js"; import { useApiClient, UseApiClientOptions } from "./useApiClient.js"; import { createThrottledQueue } from "../utils/throttle.js"; @@ -35,13 +29,6 @@ export type UseRealtimeSingleRunOptions = UseRe * Set this to false if you are making updates to the run metadata after completion through child runs */ stopOnCompletion?: boolean; - - /** - * Skip columns from the subscription. - * - * @default [] - */ - skipColumns?: RealtimeRunSkipColumns; }; export type UseRealtimeRunInstance = { @@ -114,7 +101,6 @@ export function useRealtimeRun( await processRealtimeRun( runId, - { skipColumns: options?.skipColumns }, apiClient, mutateRun, setError, @@ -275,7 +261,6 @@ export function useRealtimeRunWithStreams< await processRealtimeRunWithStreams( runId, - { skipColumns: options?.skipColumns }, apiClient, mutateRun, mutateStreams, @@ -349,32 +334,6 @@ export type UseRealtimeRunsInstance = { stop: () => void; }; -export type UseRealtimeRunsWithTagOptions = UseRealtimeRunOptions & { - /** - * Filter runs by the time they were created. You must specify the duration string like "1h", "10s", "30m", etc. - * - * @example - * "1h" - 1 hour ago - * "10s" - 10 seconds ago - * "30m" - 30 minutes ago - * "1d" - 1 day ago - * "1w" - 1 week ago - * - * The maximum duration is 1 week - * - * @note The timestamp will be calculated on the server side when you first subscribe to the runs. - * - */ - createdAt?: string; - - /** - * Skip columns from the subscription. - * - * @default [] - */ - skipColumns?: RealtimeRunSkipColumns; -}; - /** * Hook to subscribe to realtime updates of task runs filtered by tag(s). * @@ -389,13 +348,73 @@ export type UseRealtimeRunsWithTagOptions = UseRealtimeRunOptions & { * const { runs, error } = useRealtimeRunsWithTag('my-tag'); * // Or with multiple tags * const { runs, error } = useRealtimeRunsWithTag(['tag1', 'tag2']); - * // Or with a createdAt filter - * const { runs, error } = useRealtimeRunsWithTag('my-tag', { createdAt: '1h' }); * ``` */ + +export interface RealtimeFilterOptions { + /** + * Inclusive start date. Accepts Date or ISO date string or epoch number. + */ + startDate?: Date | string | number; + /** + * Inclusive end date. Accepts Date or ISO date string or epoch number. + */ + endDate?: Date | string | number; + /** + * Allowed run statuses (exact string matches). + */ + statuses?: string[]; +} + +/** Utility: normalize a possible date input to a Date instance, or null */ +function normalizeToDate(value?: Date | string | number): Date | null { + if (value === undefined || value === null) return null; + if (value instanceof Date) return value; + if (typeof value === "number") return new Date(value); + if (typeof value === "string") { + const d = new Date(value); + return Number.isNaN(d.getTime()) ? null : d; + } + return null; +} + +/** Utility: pull a best-effort Date from a run object in a type-safe way */ +function getRunDate(run: RealtimeRun): Date | null { + // Common timestamp fields used by different APIs + const candidates = ["startedAt", "createdAt", "started_at", "created_at"] as const; + + for (const key of candidates) { + const val = (run as unknown as Record)[key]; + if (val instanceof Date) return val; + if (typeof val === "string" || typeof val === "number") { + const d = normalizeToDate(val as string | number); + if (d) return d; + } + } + + return null; +} + +/** Utility: get status string from run in a safe way */ +function getRunStatus(run: RealtimeRun): string { + const status = (run as unknown as Record).status; + if (typeof status === "string") return status; + if (typeof status === "number") return String(status); + return ""; +} + +/** Stable serialisation/keys for filters so deps are stable and readable */ +function createFiltersKey(filters?: RealtimeFilterOptions): string { + if (!filters) return ""; + const startIso = normalizeToDate(filters.startDate)?.toISOString() ?? ""; + const endIso = normalizeToDate(filters.endDate)?.toISOString() ?? ""; + const statuses = Array.isArray(filters.statuses) ? filters.statuses.join(",") : ""; + return `${startIso}|${endIso}|${statuses}`; +} + export function useRealtimeRunsWithTag( tag: string | string[], - options?: UseRealtimeRunsWithTagOptions + options?: UseRealtimeRunOptions & { filters?: RealtimeFilterOptions } ): UseRealtimeRunsInstance { const hookId = useId(); const idKey = options?.id ?? hookId; @@ -439,7 +458,6 @@ export function useRealtimeRunsWithTag( await processRealtimeRunsWithTag( tag, - { createdAt: options?.createdAt, skipColumns: options?.skipColumns }, apiClient, mutateRuns, runsRef, @@ -471,9 +489,51 @@ export function useRealtimeRunsWithTag( return () => { stop(); }; - }, [tag, stop, options?.enabled]); + // Including filtersKey here to restart the streaming request when filters change + // This ensures we get fresh data when filter criteria are modified + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [tag, stop, options?.enabled, createFiltersKey(options?.filters)]); + // Stable key for useMemo deps + const filtersKey = useMemo( + () => createFiltersKey(options?.filters), + [options?.filters] + ); // Client-side filtering: useMemo + typed return + const filteredRuns = useMemo[]>(() => { + const list = runs ?? []; + const f = options?.filters; + if (!f) return list; + + const start = normalizeToDate(f.startDate); + const end = normalizeToDate(f.endDate); + const allowedStatuses = Array.isArray(f.statuses) && f.statuses.length > 0 + ? f.statuses.map((s) => s.toString()) + : null; + + // small, readable filter function + return list.filter((run: any) => { + const runDate = getRunDate(run); + if (start && runDate) { + if (runDate < start) return false; + } + if (end && runDate) { + if (runDate > end) return false; + } - return { runs: runs ?? [], error, stop }; + if (allowedStatuses) { + const status = getRunStatus(run); + if (!allowedStatuses.includes(status)) return false; + } + + return true; + }); + // filtersKey so memo invalidates when filters change + }, [runs, filtersKey]); + + return { + runs: filteredRuns, + error, + stop, + }; } /** @@ -591,7 +651,7 @@ async function processRealtimeBatch( } } -// Inserts and then orders by the run createdAt timestamp, and ensures that the run is not duplicated +// Inserts and then orders by the run number, and ensures that the run is not duplicated function insertRunShapeInOrder( previousRuns: RealtimeRun[], run: RealtimeRun @@ -601,8 +661,8 @@ function insertRunShapeInOrder( return previousRuns.map((r) => (r.id === run.id ? run : r)); } - const runCreatedAt = run.createdAt; - const index = previousRuns.findIndex((r) => r.createdAt > runCreatedAt); + const runNumber = run.number; + const index = previousRuns.findIndex((r) => r.number > runNumber); if (index === -1) { return [...previousRuns, run]; } @@ -612,14 +672,13 @@ function insertRunShapeInOrder( async function processRealtimeRunsWithTag( tag: string | string[], - filters: { createdAt?: string; skipColumns?: RealtimeRunSkipColumns }, apiClient: ApiClient, mutateRunsData: KeyedMutator[]>, existingRunsRef: React.MutableRefObject[]>, onError: (e: Error) => void, abortControllerRef: React.MutableRefObject ) { - const subscription = apiClient.subscribeToRunsWithTag>(tag, filters, { + const subscription = apiClient.subscribeToRunsWithTag>(tag, { signal: abortControllerRef.current?.signal, onFetchError: onError, }); @@ -655,7 +714,6 @@ async function processRealtimeRunWithStreams< TStreams extends Record = Record, >( runId: string, - filters: { skipColumns?: RealtimeRunSkipColumns }, apiClient: ApiClient, mutateRunData: KeyedMutator>, mutateStreamData: KeyedMutator>, @@ -669,7 +727,6 @@ async function processRealtimeRunWithStreams< signal: abortControllerRef.current?.signal, closeOnComplete: stopOnCompletion, onFetchError: onError, - skipColumns: filters.skipColumns, }); type StreamUpdate = { @@ -716,7 +773,6 @@ async function processRealtimeRunWithStreams< async function processRealtimeRun( runId: string, - filters: { skipColumns?: RealtimeRunSkipColumns }, apiClient: ApiClient, mutateRunData: KeyedMutator>, onError: (e: Error) => void, @@ -727,7 +783,6 @@ async function processRealtimeRun( signal: abortControllerRef.current?.signal, closeOnComplete: stopOnCompletion, onFetchError: onError, - skipColumns: filters.skipColumns, }); for await (const part of subscription) {