Skip to content

Commit

Permalink
Merge pull request aws-amplify#3890 from elorzafe/fix-pubsub-mqtt-pro…
Browse files Browse the repository at this point in the history
…vider

fix(@aws-amplify/pubsub): call observer.error on disconnect error for MqttOverWSProvider
  • Loading branch information
elorzafe authored Aug 22, 2019
2 parents ece0b22 + 0795db6 commit 5526728
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
16 changes: 16 additions & 0 deletions packages/pubsub/__tests__/PubSub-unit-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,22 @@ describe('PubSub', () => {
provider: 'AWSIoTProvider',
})).rejects.toMatchObject({error: new Error('Failed to connect to the network')});
});

test('trigger observer error when disconnected', (done) => {
const pubsub = new PubSub();

const awsIotProvider = new AWSIoTProvider({
aws_pubsub_region: 'region',
aws_pubsub_endpoint: 'wss://iot.mymockendpoint.org:443/notrealmqtt'
});
pubsub.addPluggable(awsIotProvider);

pubsub.subscribe('topic').subscribe({
error: () => done()
});

awsIotProvider.onDisconnect({ errorCode: 1, clientId: '123' });
});
});

describe('MqttOverWSProvider local testing config', () => {
Expand Down
7 changes: 7 additions & 0 deletions packages/pubsub/src/Providers/MqttOverWSProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,14 @@ export class MqttOverWSProvider extends AbstractPubSubProvider {
public onDisconnect({ clientId, errorCode, ...args }) {
if (errorCode !== 0) {
logger.warn(clientId, JSON.stringify({ errorCode, ...args }, null, 2));
this._topicObservers.forEach((observerForTopic, _observerTopic) => {
observerForTopic.forEach(observer => {
observer.error('Disconnected, error code: ' + errorCode);
observer.complete();
});
});
}
this._topicObservers = new Map();
}

public async newClient({ url, clientId }: MqttProvidertOptions): Promise<any> {
Expand Down

0 comments on commit 5526728

Please sign in to comment.