forked from redis/ioredis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathScanStream.ts
65 lines (59 loc) · 1.36 KB
/
ScanStream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import { Readable, ReadableOptions } from "stream";
/**
* Options for ScanStream
*
* @export
* @interface IScanStreamOptions
* @extends {ReadableOptions}
*/
export interface IScanStreamOptions extends ReadableOptions {
key?: string;
match?: string;
command: string;
redis: any;
count?: string | number;
}
/**
* Convenient class to convert the process of scaning keys to a readable stream.
*
* @export
* @class ScanStream
* @extends {Readable}
*/
export default class ScanStream extends Readable {
private _redisCursor = "0";
private _redisDrained = false;
constructor(private opt: IScanStreamOptions) {
super(opt);
}
_read() {
if (this._redisDrained) {
this.push(null);
return;
}
const args: string[] = [this._redisCursor];
if (this.opt.key) {
args.unshift(this.opt.key);
}
if (this.opt.match) {
args.push("MATCH", this.opt.match);
}
if (this.opt.count) {
args.push("COUNT", String(this.opt.count));
}
this.opt.redis[this.opt.command](args, (err, res) => {
if (err) {
this.emit("error", err);
return;
}
this._redisCursor = res[0] instanceof Buffer ? res[0].toString() : res[0];
if (this._redisCursor === "0") {
this._redisDrained = true;
}
this.push(res[1]);
});
}
close() {
this._redisDrained = true;
}
}