forked from pbkit/pbkit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfetch.ts
70 lines (68 loc) · 1.87 KB
/
fetch.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import {
createSubscribeFn,
defer,
Observer,
SubscribeFn,
} from "../core/runtime/async/observer.ts";
export interface ProgressResult {
originalResponse: Response;
response: Response;
subscribeProgress: SubscribeFn<Progress>;
}
export interface Progress {
current: number;
total: number;
}
export function progressResponse(originalResponse: Response): ProgressResult {
const deferredStart = defer<void>();
const subscribeReader = readerToSubscribeFn(
originalResponse.body?.getReader()!,
deferredStart,
);
const contentLength = +originalResponse.headers.get("content-length")! | 0;
let loaded = 0;
let progressObserver: Observer<Progress> | null = null;
const subscribeProgress: SubscribeFn<Progress> = (observer) => {
progressObserver = observer;
return () => {
progressObserver = null;
observer.complete();
};
};
const response = new Response(
new ReadableStream({
start(controller) {
subscribeReader({
next({ value }) {
if (!value) return;
controller.enqueue(value);
loaded += value.byteLength | 0;
progressObserver?.next({
current: loaded,
total: contentLength,
});
},
error(err) {
controller.error(err);
progressObserver?.error(err);
},
complete() {
controller.close();
progressObserver?.complete();
},
});
deferredStart.resolve();
},
}),
);
return { originalResponse, response, subscribeProgress };
}
function readerToSubscribeFn<T>(
reader: ReadableStreamDefaultReader<T>,
wait = Promise.resolve(),
): SubscribeFn<ReadableStreamReadResult<T>> {
return createSubscribeFn(async () => {
const readResult = await reader.read();
return [readResult, readResult.done];
}, wait);
}