diff --git a/src/lua/api-gateway/aws/kinesis/KinesisService.lua b/src/lua/api-gateway/aws/kinesis/KinesisService.lua index cac13e9..8b57039 100644 --- a/src/lua/api-gateway/aws/kinesis/KinesisService.lua +++ b/src/lua/api-gateway/aws/kinesis/KinesisService.lua @@ -65,8 +65,8 @@ end -- API: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html -- { --- "ExclusiveStartStreamName": "string", --- "Limit": number +-- "ExclusiveStartStreamName": "string", --optional +-- "Limit": number --optional -- } function _M:listStreams(streamName, limit) local arguments = { @@ -83,6 +83,59 @@ end +-- API: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html +-- { +-- "StreamName": "string" +-- "Data": blob, +-- "PartitionKey": "string", +-- } +function _M:putRecord(streamName, data, partitionKey) + assert(streamName ~= nil, "Please provide a valid streamName.") + local arguments = { + StreamName = streamName, + Data = ngx.encode_base64(data), + PartitionKey = partitionKey + } + local ok, code, headers, status, body = self:performAction("PutRecord", arguments, "/", "POST", true) + + if (code == ngx.HTTP_OK and body ~= nil) then + return cjson.decode(body), code, headers, status, body + end + return nil, code, headers, status, body +end + + + +-- API: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html +-- { +-- "StreamName": "string" +-- "Records": [ +-- { +-- "Data": blob, +-- "PartitionKey": "string" +-- } +-- ], +-- } +function _M:putRecords(streamName, records) + assert(streamName ~= nil, "Please provide a valid streamName.") + -- encode data for each of records as base64 + for i, record in ipairs(records) do + record.Data = ngx.encode_base64(record.Data) + end + + local arguments = { + StreamName = streamName, + Records = records + } + local ok, code, headers, status, body = self:performAction("PutRecords", arguments, "/", "POST", true) + + if (code == ngx.HTTP_OK and body ~= nil) then + return cjson.decode(body), code, headers, status, body + end + return nil, code, headers, status, body +end + + return _M diff --git a/test/perl/kinesis.t b/test/perl/kinesis.t index e40332b..0743e02 100644 --- a/test/perl/kinesis.t +++ b/test/perl/kinesis.t @@ -91,6 +91,7 @@ __DATA__ content_by_lua ' local KinesisService = require "api-gateway.aws.kinesis.KinesisService" + local cjson = require "cjson" local service = KinesisService:new({ security_credentials_host = "127.0.0.1", @@ -113,6 +114,26 @@ __DATA__ assert(table.getn(list.StreamNames) > 0, "At least one stream should have been created") end + -- use putRecord to put a single record into DEFAULT_STREAM_NAME + local data = "testas" + local partitionKey = "partition" + local json_response, code, headers, status, body = service:putRecord(DEFAULT_STREAM_NAME, data, partitionKey) + assert( code == 200, "PutRecord Action should have returned with 200, but it returned with:" .. tostring(code) .. ", response:" .. tostring(body) ) + + -- use putRecords to put a batch of records + local records = { + { + Data = "55555", + PartitionKey = "partitionKey1" + }, + { + Data = "7777777", + PartitionKey = "partitionKey2" + } + } + local json_response, code, headers, status, body = service:putRecords(DEFAULT_STREAM_NAME, records) + assert( code == 200, "PutRecords Action should have returned with 200, but it returned with:" .. tostring(code) .. ", response:" .. tostring(body) ) + assert(json_response.FailedRecordCount == 0, "There are failed records during put") -- pick the first stream local streamName = list.StreamNames[1] @@ -124,7 +145,6 @@ __DATA__ assert( code == 200, "DeleteStream Action should have returned with 200, but it returned with:" .. tostring(code) .. ", response:" .. tostring(body) ) end - -- TODO: test putRecords '; } --- timeout: 70