Skip to content

Commit

Permalink
Merge branch 'master' into patch-2
Browse files Browse the repository at this point in the history
  • Loading branch information
iartemiev authored Aug 23, 2019
2 parents e4f0dba + 5526728 commit 54e6e1f
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 5 deletions.
5 changes: 5 additions & 0 deletions lgtm.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
extraction:
javascript:
index:
filters:
- exclude: "docs"
16 changes: 16 additions & 0 deletions packages/pubsub/__tests__/PubSub-unit-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,22 @@ describe('PubSub', () => {
provider: 'AWSIoTProvider',
})).rejects.toMatchObject({error: new Error('Failed to connect to the network')});
});

test('trigger observer error when disconnected', (done) => {
const pubsub = new PubSub();

const awsIotProvider = new AWSIoTProvider({
aws_pubsub_region: 'region',
aws_pubsub_endpoint: 'wss://iot.mymockendpoint.org:443/notrealmqtt'
});
pubsub.addPluggable(awsIotProvider);

pubsub.subscribe('topic').subscribe({
error: () => done()
});

awsIotProvider.onDisconnect({ errorCode: 1, clientId: '123' });
});
});

describe('MqttOverWSProvider local testing config', () => {
Expand Down
15 changes: 11 additions & 4 deletions packages/pubsub/src/Providers/AWSAppSyncProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,17 @@ export class AWSAppSyncProvider extends MqttOverWSProvider {
// reconnect everything we have in the map
await Promise.all(map.map(async ([clientId, { url, topics }]) => {
// connect to new client
const client = await this.connect(clientId, {
clientId,
url,
});
let client = null;
try {
client = await this.connect(clientId, {
clientId,
url,
});
} catch (err) {
observer.error({ message: 'Failed to connect', error: err });
observer.complete();
return undefined;
}

// subscribe to all topics for this client
// store topic-client mapping
Expand Down
7 changes: 7 additions & 0 deletions packages/pubsub/src/Providers/MqttOverWSProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,14 @@ export class MqttOverWSProvider extends AbstractPubSubProvider {
public onDisconnect({ clientId, errorCode, ...args }) {
if (errorCode !== 0) {
logger.warn(clientId, JSON.stringify({ errorCode, ...args }, null, 2));
this._topicObservers.forEach((observerForTopic, _observerTopic) => {
observerForTopic.forEach(observer => {
observer.error('Disconnected, error code: ' + errorCode);
observer.complete();
});
});
}
this._topicObservers = new Map();
}

public async newClient({ url, clientId }: MqttProvidertOptions): Promise<any> {
Expand Down
2 changes: 1 addition & 1 deletion packages/pushnotification/src/PushNotification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ export default class PushNotification {
}

let ret = null;
const dataPayload = dataObj.data;
const dataPayload = dataObj.data || {};
if (dataPayload['pinpoint.campaign.campaign_id']) {
ret = {
title: dataPayload['pinpoint.notification.title'],
Expand Down

0 comments on commit 54e6e1f

Please sign in to comment.