Skip to content

Commit

Permalink
kinesis API for AWS
Browse files Browse the repository at this point in the history
  • Loading branch information
Shuai Zhang committed Jul 10, 2015
1 parent 0cf217a commit baaaf21
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 3 deletions.
57 changes: 55 additions & 2 deletions src/lua/api-gateway/aws/kinesis/KinesisService.lua
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ end

-- API: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html
-- {
-- "ExclusiveStartStreamName": "string",
-- "Limit": number
-- "ExclusiveStartStreamName": "string", --optional
-- "Limit": number --optional
-- }
function _M:listStreams(streamName, limit)
local arguments = {
Expand All @@ -83,6 +83,59 @@ end



-- API: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
-- {
-- "StreamName": "string"
-- "Data": blob,
-- "PartitionKey": "string",
-- }
function _M:putRecord(streamName, data, partitionKey)
assert(streamName ~= nil, "Please provide a valid streamName.")
local arguments = {
StreamName = streamName,
Data = ngx.encode_base64(data),
PartitionKey = partitionKey
}
local ok, code, headers, status, body = self:performAction("PutRecord", arguments, "/", "POST", true)

if (code == ngx.HTTP_OK and body ~= nil) then
return cjson.decode(body), code, headers, status, body
end
return nil, code, headers, status, body
end



-- API: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
-- {
-- "StreamName": "string"
-- "Records": [
-- {
-- "Data": blob,
-- "PartitionKey": "string"
-- }
-- ],
-- }
function _M:putRecords(streamName, records)
assert(streamName ~= nil, "Please provide a valid streamName.")
-- encode data for each of records as base64
for i, record in ipairs(records) do
record.Data = ngx.encode_base64(record.Data)
end

local arguments = {
StreamName = streamName,
Records = records
}
local ok, code, headers, status, body = self:performAction("PutRecords", arguments, "/", "POST", true)

if (code == ngx.HTTP_OK and body ~= nil) then
return cjson.decode(body), code, headers, status, body
end
return nil, code, headers, status, body
end



return _M

22 changes: 21 additions & 1 deletion test/perl/kinesis.t
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ __DATA__
content_by_lua '
local KinesisService = require "api-gateway.aws.kinesis.KinesisService"
local cjson = require "cjson"
local service = KinesisService:new({
security_credentials_host = "127.0.0.1",
Expand All @@ -113,6 +114,26 @@ __DATA__
assert(table.getn(list.StreamNames) > 0, "At least one stream should have been created")
end

-- use putRecord to put a single record into DEFAULT_STREAM_NAME
local data = "testas"
local partitionKey = "partition"
local json_response, code, headers, status, body = service:putRecord(DEFAULT_STREAM_NAME, data, partitionKey)
assert( code == 200, "PutRecord Action should have returned with 200, but it returned with:" .. tostring(code) .. ", response:" .. tostring(body) )

-- use putRecords to put a batch of records
local records = {
{
Data = "55555",
PartitionKey = "partitionKey1"
},
{
Data = "7777777",
PartitionKey = "partitionKey2"
}
}
local json_response, code, headers, status, body = service:putRecords(DEFAULT_STREAM_NAME, records)
assert( code == 200, "PutRecords Action should have returned with 200, but it returned with:" .. tostring(code) .. ", response:" .. tostring(body) )
assert(json_response.FailedRecordCount == 0, "There are failed records during put")

-- pick the first stream
local streamName = list.StreamNames[1]
Expand All @@ -124,7 +145,6 @@ __DATA__
assert( code == 200, "DeleteStream Action should have returned with 200, but it returned with:" .. tostring(code) .. ", response:" .. tostring(body) )
end

-- TODO: test putRecords
';
}
--- timeout: 70
Expand Down

0 comments on commit baaaf21

Please sign in to comment.