Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
yanning.zhang committed Sep 18, 2023
1 parent 5dc84e4 commit 51f3829
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/index.ts → src/crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import logger from "./logger.js";
import Bottleneck from "bottleneck";
import seenreq from "seenreq";

import type { crawlerOptions } from "./types/index.d.ts";
import type { crawlerOptions } from "./types/crawler.js";

const normalizeContentType = (contentType: string) => {
//@todo
Expand Down
134 changes: 134 additions & 0 deletions src/rateLimiter/cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
class Cluster {
private maxConcurrent: number;
private rateLimit: number;
private priorityRange: number;
private defaultPriority: number;
private homogeneous: boolean;
private limiters: Record<string, Bottleneck> = {};
private Bottleneck: any;

constructor(maxConcurrent: number, rateLimit: number, priorityRange: number, defaultPriority: number, homogeneous: boolean) {
this.maxConcurrent = maxConcurrent;
this.rateLimit = rateLimit;
this.priorityRange = priorityRange;
this.defaultPriority = defaultPriority;
this.homogeneous = homogeneous ? true : false;
this.Bottleneck = require("./Bottleneck").default;
}

key(key: string = ""): Bottleneck {
if (!this.limiters[key]) {
this.limiters[key] = new this.Bottleneck(
this.maxConcurrent,
this.rateLimit,
this.priorityRange,
this.defaultPriority,
this.homogeneous ? this : null
);
this.limiters[key].setName(key);
}
return this.limiters[key];
}

deleteKey(key: string = ""): boolean {
return delete this.limiters[key];
}

all(cb: (limiter: Bottleneck) => any[]): any[] {
const results: any[] = [];
for (const k in this.limiters) {
if (Object.prototype.hasOwnProperty.call(this.limiters, k)) {
const v = this.limiters[k];
results.push(cb(v));
}
}
return results;
}

keys(): string[] {
return Object.keys(this.limiters);
}

private _waitingClients(): number {
let count = 0;
const keys = this.keys();
keys.forEach((key) => {
count += this.limiters[key]._waitingClients.size();
});
return count;
}

private _unfinishedClients(): number {
let count = 0;
const keys = this.keys();
keys.forEach((key) => {
count += this.limiters[key]._waitingClients.size();
count += this.limiters[key]._tasksRunning;
});
return count;
}

dequeue(name: string): { next: (done: () => void, limiter: string | null) => void; limiter: string } | undefined {
const keys = this.keys();
for (let i = 0; i < keys.length; ++i) {
if (this.limiters[keys[i]]._waitingClients.size()) {
return {
next: this.limiters[keys[i]]._waitingClients.dequeue(),
limiter: name,
};
}
}
}

private _status(): string {
const status: string[] = [];
const keys = this.keys();
keys.forEach((key) => {
status.push([
'key: ' + key,
'running: ' + this.limiters[key]._tasksRunning,
'waiting: ' + this.limiters[key]._waitingClients.size(),
].join());
});
return status.join(';');
}

startAutoCleanup(): void {
this.stopAutoCleanup();
const base = (this.interval = setInterval(() => {
const time = Date.now();
for (const k in this.limiters) {
const v = this.limiters[k];
if (v._nextRequest + 1000 * 60 * 5 < time) {
this.deleteKey(k);
}
}
}, 1000 * 30));
if (typeof base.unref === "function") {
base.unref();
}
}

stopAutoCleanup(): void {
clearInterval(this.interval);
}

get waitingClients(): number {
return this._waitingClients();
}

get unfinishedClients(): number {
return this._unfinishedClients();
}

get status(): string {
return this._status();
}

get empty(): boolean {
return this._unfinishedClients() > 0 ? false : true;
}
}

export default Cluster;

151 changes: 151 additions & 0 deletions src/rateLimiter/rateLimiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
class PriorityQueue<T> {
private slots: T[][] = [];
private total: number | null = null;

constructor(size: number) {
size = Math.max(+size | 0, 1);
for (let i = 0; i < size; i += 1) {
this.slots.push([]);
}
}

size(): number {
if (this.total === null) {
this.total = 0;
for (const slot of this.slots) {
this.total += slot.length;
}
}
return this.total;
}

enqueue(obj: T, priority: number): void {
priority = (priority && +priority | 0) || 0;
this.total = null;
if (priority) {
const priorityOrig = priority;
if (priority < 0 || priority >= this.slots.length) {
priority = this.slots.length - 1;
console.error(`invalid priority: ${priorityOrig} must be between 0 and ${priority}`);
}
}
this.slots[priority].push(obj);
}

dequeue(callback: (obj: T | null) => void): void {
let obj: T | null = null;
for (const slot of this.slots) {
if (slot.length) {
obj = slot.shift() || null;
break;
}
}
callback(obj);
}
}

class RateLimiter {
private name: string | undefined;
private rateLimit: number;
private maxConcurrent: number;
private _waitingClients: PriorityQueue<(done: () => void, limiter: null) => void>;
private _priorityRange: number;
private _defaultPriority: number;
private _nextRequest: number;
private _tasksRunning: number;

constructor(
maxConcurrent: number,
rateLimit: number,
priorityRange: number,
defaultPriority: number,
cluster: any
) {
if (isNaN(maxConcurrent) || isNaN(rateLimit)) {
throw new Error("maxConcurrent and rateLimit must be numbers");
}

priorityRange = priorityRange || 1;
if (isNaN(priorityRange)) {
throw new Error("priorityRange must be a number");
}
priorityRange = parseInt(priorityRange.toString(), 10);
defaultPriority = defaultPriority ? defaultPriority : Math.floor(priorityRange / 2);
if (isNaN(defaultPriority)) {
throw new Error("defaultPriority must be a number");
}
defaultPriority = defaultPriority >= priorityRange ? priorityRange - 1 : defaultPriority;
defaultPriority = parseInt(defaultPriority.toString(), 10);

this.name = undefined;
this.rateLimit = parseInt(rateLimit.toString(), 10);
this.maxConcurrent = this.rateLimit ? 1 : parseInt(maxConcurrent.toString(), 10);
this._waitingClients = new PriorityQueue<(done: () => void, limiter: null) => void>(priorityRange);
this._priorityRange = priorityRange;
this._defaultPriority = defaultPriority;
this._nextRequest = Date.now();
this._tasksRunning = 0;
this.cluster = cluster;
}

setName(name: string): void {
this.name = name;
}

setRateLimit(rateLimit: number): void {
if (isNaN(rateLimit)) {
throw new Error("rateLimit must be a number");
}
this.rateLimit = parseInt(rateLimit.toString(), 10);
if (this.rateLimit > 0) {
this.maxConcurrent = 1;
}
}

submit(options: number | { priority: number }, clientCallback: (done: () => void, limiter: null) => void): void {
const priority = typeof options === "number" ? parseInt(options.toString(), 10) : options.priority;
const validPriority = Number.isInteger(priority) ? priority : this._defaultPriority;
const clampedPriority = validPriority > this._priorityRange - 1 ? this._priorityRange - 1 : validPriority;
this._waitingClients.enqueue(clientCallback, clampedPriority);
this._tryToRun();
}

private _tryToRun(): void {
if (this._tasksRunning < this.maxConcurrent && this.hasWaitingClients()) {
++this._tasksRunning;
const wait = Math.max(this._nextRequest - Date.now(), 0);
this._nextRequest = Date.now() + wait + this.rateLimit;
const obj = this.dequeue();
const next = obj.next;
setTimeout(() => {
const done = () => {
--this._tasksRunning;
this._tryToRun();
};
next(done, null);
}, wait);
}
}

hasWaitingClients(): boolean {
if (this._waitingClients.size()) {
return true;
}
if (this.cluster && this.cluster._waitingClients()) {
return true;
}
return false;
}

dequeue(): { next: (done: () => void, limiter: null) => void; limiter: null } {
if (this._waitingClients.size()) {
return {
next: this._waitingClients.dequeue(),
limiter: null,
};
}
return this.cluster.dequeue(this.name);
}
}

export default Bottleneck;
11 changes: 11 additions & 0 deletions src/test.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"src/": {
"types/": {
"files": ["index.d.ts", "rateLimiter.d.ts"]
},
"rateLimiter/": {
"files": ["index.ts", "cluster.ts"]
},
"files": ["index.ts", "logger.ts"]
}
}
File renamed without changes.
18 changes: 0 additions & 18 deletions src/vtest.ts
Original file line number Diff line number Diff line change
@@ -1,18 +0,0 @@
const a = "tst",
b = {
a: 1,
b: 2,
},
c = null,
d = undefined,
e = new Date(),
f = function () {},
g = [];
// console.log(Object.prototype.toString.call(a));
// console.log(Object.prototype.toString.call(b));
// console.log(Object.prototype.toString.call(c));
// console.log(Object.prototype.toString.call(d));
// console.log(Object.prototype.toString.call(e));
// console.log(Object.prototype.toString.call(f));
// console.log(Object.prototype.toString.call(g));
console.log(a.toString());

0 comments on commit 51f3829

Please sign in to comment.