Skip to content

Commit

Permalink
fix: retry until to fetch task output instead of long-lived request (N…
Browse files Browse the repository at this point in the history
…angoHQ#2831)

## Describe your changes
Instead of keeping the request to orchestrator `GET /task/ID/output`
alive for potentially minutes, we do shorter requests and retry until
the action task is supposed to expire

## Issue ticket number and link

## Checklist before requesting a review (skip if just adding/editing
APIs & templates)
- [ ] I added tests, otherwise the reason is: 
- [ ] I added observability, otherwise the reason is:
- [ ] I added analytics, otherwise the reason is:
  • Loading branch information
TBonnin authored Oct 9, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent e69b525 commit f3a0628
Showing 5 changed files with 64 additions and 23 deletions.
27 changes: 21 additions & 6 deletions packages/orchestrator/lib/clients/client.ts
Original file line number Diff line number Diff line change
@@ -8,8 +8,8 @@ import { route as postSchedulesSearchRoute } from '../routes/v1/schedules/postSe
import { route as getOutputRoute } from '../routes/v1/tasks/taskId/getOutput.js';
import { route as putTaskRoute } from '../routes/v1/tasks/putTaskId.js';
import { route as postHeartbeatRoute } from '../routes/v1/tasks/taskId/postHeartbeat.js';
import type { Result, Route } from '@nangohq/utils';
import { Ok, Err, routeFetch, getLogger, retryWithBackoff, httpRetryStrategy } from '@nangohq/utils';
import type { Result, RetryConfig, Route } from '@nangohq/utils';
import { Ok, Err, routeFetch, getLogger, retry } from '@nangohq/utils';
import type { Endpoint } from '@nangohq/types';
import type {
ClientError,
@@ -40,11 +40,20 @@ export class OrchestratorClient {
private routeFetch<E extends Endpoint<any>>(
route: Route<E>,
config?: {
timeoutMs: number;
timeoutMs?: number;
retryConfig?: RetryConfig<E['Reply']>;
}
): (props: { query?: E['Querystring']; body?: E['Body']; params?: E['Params'] }) => Promise<E['Reply']> {
return (props) => {
return retryWithBackoff(() => routeFetch(this.baseUrl, route, config)(props), { retry: httpRetryStrategy, numOfAttempts: 3, maxDelay: 50 });
const fetch = async () => {
return await routeFetch(this.baseUrl, route, { timeoutMs: config?.timeoutMs })(props);
};
const retryConfig: RetryConfig<E['Reply']> = config?.retryConfig || {
maxAttempts: 3,
delayMs: 50,
retryIf: (res) => 'error' in res
};
return retry(fetch, retryConfig);
};
}

@@ -164,8 +173,14 @@ export class OrchestratorClient {
return res;
}
const taskId = res.value.taskId;
const timeoutMs = (scheduleProps.timeoutSettingsInSecs.createdToStarted + scheduleProps.timeoutSettingsInSecs.startedToCompleted) * 1000;
const getOutput = await this.routeFetch(getOutputRoute, { timeoutMs })({ params: { taskId }, query: { longPolling: timeoutMs } });
const retryUntil = Date.now() + (scheduleProps.timeoutSettingsInSecs.createdToStarted + scheduleProps.timeoutSettingsInSecs.startedToCompleted) * 1000;
const getOutput = await this.routeFetch(getOutputRoute, {
retryConfig: {
maxAttempts: 1000,
delayMs: 100,
retryIf: (res) => 'error' in res && Date.now() < retryUntil
}
})({ params: { taskId }, query: { longPolling: 30_000 } });
if ('error' in getOutput) {
return Err({
name: getOutput.error.code,
6 changes: 3 additions & 3 deletions packages/records/lib/models/records.ts
Original file line number Diff line number Diff line change
@@ -219,9 +219,9 @@ export async function upsert({
await retry(upserting, {
maxAttempts: 3,
delayMs: 500,
retryIf: (error: Error) => {
if ('code' in error) {
const errorCode = (error as { code: string }).code;
retryIf: (res) => {
if ('code' in res) {
const errorCode = (res as { code: string }).code;
return errorCode === '40P01'; // deadlock_detected
}
return false;
2 changes: 1 addition & 1 deletion packages/utils/lib/express/route.ts
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ export const routeFetch = <E extends Endpoint<any>>(
baseUrl: string,
route: Route<E>,
config?: {
timeoutMs: number;
timeoutMs?: number | undefined;
}
) => {
return async function f({ query, body, params }: { query?: E['Querystring']; body?: E['Body']; params?: E['Params'] }): Promise<E['Reply']> {
24 changes: 16 additions & 8 deletions packages/utils/lib/retry.ts
Original file line number Diff line number Diff line change
@@ -2,21 +2,29 @@ import { AxiosError } from 'axios';
import type { BackoffOptions } from 'exponential-backoff';
import { backOff } from 'exponential-backoff';

interface RetryConfig {
export interface RetryConfig<T = unknown> {
maxAttempts: number;
delayMs: number | ((attempt: number) => number);
retryIf: (error: Error) => boolean;
retryIf?: (t: T) => boolean;
retryOnError?: (error: Error) => boolean;
}

export async function retry<T>(fn: () => T, config: RetryConfig): Promise<T> {
const { maxAttempts, delayMs, retryIf } = config;
export async function retry<T>(fn: () => T, { maxAttempts, delayMs, retryIf = () => false, retryOnError = () => true }: RetryConfig<T>): Promise<T> {
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
const wait = async () => {
const delay = typeof delayMs === 'number' ? delayMs : delayMs(attempt);
return new Promise((resolve) => setTimeout(resolve, delay));
};
try {
return fn();
const res = await Promise.resolve(fn());
if (attempt < maxAttempts && retryIf(res)) {
await wait();
} else {
return res;
}
} catch (error) {
if (attempt < maxAttempts && retryIf(error as Error)) {
const delay = typeof delayMs === 'number' ? delayMs : delayMs(attempt);
await new Promise((resolve) => setTimeout(resolve, delay));
if (attempt < maxAttempts && retryOnError(error as Error)) {
await wait();
} else {
throw error;
}
28 changes: 23 additions & 5 deletions packages/utils/lib/retry.unit.test.ts
Original file line number Diff line number Diff line change
@@ -14,8 +14,7 @@ describe('retry', () => {
},
{
maxAttempts: 3,
delayMs: () => 0,
retryIf: () => true
delayMs: () => 0
}
);
expect(result).toEqual(3);
@@ -31,8 +30,7 @@ describe('retry', () => {
},
{
maxAttempts: 3,
delayMs: () => 0,
retryIf: () => true
delayMs: () => 0
}
);
} catch (error: any) {
@@ -41,6 +39,26 @@ describe('retry', () => {
expect(count).toBe(3);
});

it('should not retry if result condition is false ', async () => {
let count = 0;
try {
await retry(
() => {
count++;
return count;
},
{
maxAttempts: 3,
delayMs: () => 0,
retryIf: (n) => n == -1
}
);
} catch (error: any) {
expect(error.message).toEqual('my error');
}
expect(count).toBe(1);
});

it('should not retry if error condition is false ', async () => {
let count = 0;
try {
@@ -55,7 +73,7 @@ describe('retry', () => {
{
maxAttempts: 3,
delayMs: () => 0,
retryIf: (error) => error.message === 'another error'
retryOnError: (error) => error.message === 'another error'
}
);
} catch (error: any) {

0 comments on commit f3a0628

Please sign in to comment.