Skip to content

Commit

Permalink
feature: Add support for immediate flag when using Kinesis analytic…
Browse files Browse the repository at this point in the history
…s providers (aws-amplify#10783)
  • Loading branch information
jimblanc authored Dec 16, 2022
1 parent 9d232ad commit d7a18e0
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,44 @@ describe('kinesis firehose provider test', () => {
spyon.mockRestore();
});

test('record with immediate transmission', async () => {
const kinesisProvider = new KinesisFirehoseProvider();
const putRecordBatchCommandSpy = jest.spyOn(
PutRecordBatchCommand.prototype,
'constructor'
);

jest.spyOn(Credentials, 'get').mockImplementationOnce(() => {
return Promise.resolve(credentials);
});

await expect(
kinesisProvider.record({
event: {
data: {
d: 1,
},
streamName: 'testStream',
immediate: true,
},
config: {},
})
).resolves.toBe(true);

// Ensure PutRecord was constructed as expected
expect(putRecordBatchCommandSpy).toHaveBeenCalledTimes(1);
expect(putRecordBatchCommandSpy).toHaveBeenCalledWith({
DeliveryStreamName: 'testStream',
Records: [
{
Data: new Uint8Array([123, 34, 100, 34, 58, 49, 125]), // Encoded data payload
},
],
});

expect(FirehoseClient.prototype.send).toHaveBeenCalledTimes(1);
});

test('record happy case', async () => {
const analytics = new KinesisFirehoseProvider();
analytics.configure({ region: 'region1' });
Expand Down
40 changes: 40 additions & 0 deletions packages/analytics/__tests__/Providers/AWSKinesisProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,45 @@ describe('kinesis provider test', () => {
expect(await analytics.record('params')).toBe(false);
});

test('record with immediate transmission', async () => {
const kinesisProvider = new KinesisProvider();
const putRecordCommandSpy = jest.spyOn(
PutRecordsCommand.prototype,
'constructor'
);

jest.spyOn(Credentials, 'get').mockImplementationOnce(() => {
return Promise.resolve(credentials);
});

await expect(
kinesisProvider.record({
event: {
data: {
d: 1,
},
streamName: 'testStream',
immediate: true,
},
config: {},
})
).resolves.toBe(true);

// Ensure PutRecord was constructed as expected
expect(putRecordCommandSpy).toHaveBeenCalledTimes(1);
expect(putRecordCommandSpy).toHaveBeenCalledWith({
Records: [
{
Data: new Uint8Array([123, 34, 100, 34, 58, 49, 125]), // Encoded data payload
PartitionKey: 'partition-identityId',
},
],
StreamName: 'testStream',
});

expect(KinesisClient.prototype.send).toHaveBeenCalledTimes(1);
});

test('record happy case', async () => {
const analytics = new KinesisProvider();

Expand All @@ -90,6 +129,7 @@ describe('kinesis provider test', () => {
});

jest.advanceTimersByTime(6000);
});
});

describe('passing parameters to KinesisClient', () => {
Expand Down
8 changes: 7 additions & 1 deletion packages/analytics/src/Providers/AWSKinesisProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ export class AWSKinesisProvider implements AnalyticsProvider {

Object.assign(params, { config: this._config, credentials });

return this._putToBuffer(params);
if (params.event?.immediate) {
this._sendEvents([params]);

return Promise.resolve(true);
} else {
return this._putToBuffer(params);
}
}

public updateEndpoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export interface KinesisAnalyticsEvent {
data: object | string;
partitionKey: string;
streamName: string;
immediate?: boolean;
}

0 comments on commit d7a18e0

Please sign in to comment.