Skip to content

Commit

Permalink
fix: PubSub - Resolve ts strict errors in the MqttOverWSProvider (aws…
Browse files Browse the repository at this point in the history
  • Loading branch information
stocaaro authored Apr 6, 2022
1 parent eaaa2c4 commit f50bd68
Showing 1 changed file with 37 additions and 16 deletions.
53 changes: 37 additions & 16 deletions packages/pubsub/src/Providers/MqttOverWSProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,27 @@ export type MqttProvidertOptions = MqttProviderOptions;
class ClientsQueue {
private promises: Map<string, Promise<any>> = new Map();

async get(clientId: string, clientFactory: (string) => Promise<any>) {
let promise = this.promises.get(clientId);
if (promise) {
return promise;
async get(clientId: string, clientFactory?: (input: string) => Promise<any>) {
const cachedPromise = this.promises.get(clientId);
if (cachedPromise) {
return cachedPromise;
}

promise = clientFactory(clientId);
if (clientFactory) {
const newPromise = clientFactory(clientId);

this.promises.set(clientId, promise);
this.promises.set(clientId, newPromise);

return promise;
return newPromise;
}
return undefined;
}

get allClients() {
return Array.from(this.promises.keys());
}

remove(clientId) {
remove(clientId: string) {
this.promises.delete(clientId);
}
}
Expand Down Expand Up @@ -95,19 +98,29 @@ export class MqttOverWSProvider extends AbstractPubSubProvider {
.aws_appsync_dangerously_connect_to_http_endpoint_for_testing;
}

protected getTopicForValue(value) {
protected getTopicForValue(value: any) {
return typeof value === 'object' && value[topicSymbol];
}

getProviderName() {
return 'MqttOverWSProvider';
}

public onDisconnect({ clientId, errorCode, ...args }) {
public onDisconnect({
clientId,
errorCode,
...args
}: {
clientId?: string;
errorCode?: number;
}) {
if (errorCode !== 0) {
logger.warn(clientId, JSON.stringify({ errorCode, ...args }, null, 2));

const topicsToDelete = [];
const topicsToDelete: string[] = [];
if (!clientId) {
return;
}
const clientIdObservers = this._clientIdObservers.get(clientId);
if (!clientIdObservers) {
return;
Expand Down Expand Up @@ -142,10 +155,18 @@ export class MqttOverWSProvider extends AbstractPubSubProvider {
client.onMessageArrived = ({
destinationName: topic,
payloadString: msg,
}: {
destinationName: string;
payloadString: string;
}) => {
this._onMessage(topic, msg);
};
client.onConnectionLost = ({ errorCode, ...args }) => {
client.onConnectionLost = ({
errorCode,
...args
}: {
errorCode: number;
}) => {
this.onDisconnect({ clientId, errorCode, ...args });
};

Expand All @@ -171,7 +192,7 @@ export class MqttOverWSProvider extends AbstractPubSubProvider {
}

protected async disconnect(clientId: string): Promise<void> {
const client = await this.clientsQueue.get(clientId, () => null);
const client = await this.clientsQueue.get(clientId);

if (client && client.isConnected()) {
client.disconnect();
Expand Down Expand Up @@ -199,7 +220,7 @@ export class MqttOverWSProvider extends AbstractPubSubProvider {

private _onMessage(topic: string, msg: any) {
try {
const matchedTopicObservers = [];
const matchedTopicObservers: Set<SubscriptionObserver<any>>[] = [];
this._topicObservers.forEach((observerForTopic, observerTopic) => {
if (mqttTopicMatch(observerTopic, topic)) {
matchedTopicObservers.push(observerForTopic);
Expand Down Expand Up @@ -269,9 +290,9 @@ export class MqttOverWSProvider extends AbstractPubSubProvider {
logger.debug('Unsubscribing from topic(s)', targetTopics.join(','));

if (client) {
this._clientIdObservers.get(clientId).delete(observer);
this._clientIdObservers.get(clientId)?.delete(observer);
// No more observers per client => client not needed anymore
if (this._clientIdObservers.get(clientId).size === 0) {
if (this._clientIdObservers.get(clientId)?.size === 0) {
this.disconnect(clientId);
this._clientIdObservers.delete(clientId);
}
Expand Down

0 comments on commit f50bd68

Please sign in to comment.