-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnats_adapter.lua
93 lines (86 loc) · 3.78 KB
/
nats_adapter.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
local json = require ("lunajson")
local nats = require ("nats")
--- Create a NATS client for the given endpoint and connect it.
-- The client object is not returned; it is stored in the global variable
-- `client`, which is where the publishing code later in this script
-- picks it up.
-- @param nats_host value passed to the NATS library as the `host` option
-- @param nats_port value passed to the NATS library as the `port` option
function nats_connect(nats_host, nats_port)
  client = nats.connect({ host = nats_host, port = nats_port })
  client:connect()
end
--- Build the notification table for one request.
-- Despite the name, this returns a plain Lua table; the caller is
-- responsible for serialising it. As a side effect the global
-- `supported_event` is set to true.
-- @param request    request object; the fields read are ZoneGroup.Name, Time,
--                   User.Id, Id, RGWId, Bucket (Name, Id, User.Id),
--                   Object (Name, Size, Instance), HTTP.Metadata and Tags
-- @param eventName  event name stored in the record as `eventName`
-- @param opaqueData value stored in the record as `opaqueData`
-- @param configure  value stored in the record as `s3.configurationId`
-- @return table with a single `Records` entry holding the event record
function toJson(request, eventName, opaqueData, configure)
  supported_event = true

  return {
    Records = {
      eventVersion = "2.1",
      eventSource = "ceph:s3",
      awsRegion = request.ZoneGroup.Name,
      eventTime = request.Time,
      eventName = eventName,
      userIdentity = {
        principalId = request.User.Id,
      },
      requestParameters = {
        sourceIPAddress = "",
      },
      responseElements = {
        ["x-amz-request-id"] = request.Id,
        ["x-amz-id-2"] = request.RGWId,
      },
      s3 = {
        s3SchemaVersion = "1.0",
        configurationId = configure,
        bucket = {
          name = request.Bucket.Name,
          ownerIdentity = {
            principalId = request.Bucket.User.Id,
          },
          arn = "arn:aws:s3:" .. request.ZoneGroup.Name .. "::" .. request.Bucket.Name,
          id = request.Bucket.Id,
        },
        object = {
          key = request.Object.Name,
          size = request.Object.Size,
          eTag = "", -- eTag is not supported yet
          versionId = request.Object.Instance,
          -- current time rendered as lowercase hexadecimal
          sequencer = ("%x"):format(os.time()),
          -- one-element list holding the JSON-encoded metadata
          metadata = {
            json.encode(request.HTTP.Metadata),
          },
          -- one-element list holding the JSON-encoded tags
          tags = {
            json.encode(request.Tags),
          },
        },
      },
      eventId = "",
      opaqueData = opaqueData,
    },
  }
end
-- Script body: for requests against the configured bucket, publish a
-- notification to NATS when the operation is one of the supported ones.

-- Reset on every run. toJson() sets this global to true when it builds a
-- record, so it has to stay a global rather than a local.
supported_event = false

-- Configuration (example values; edit to match the deployment).
local configure = "mynotif1"            -- stored as s3.configurationId
local opaqueData = "[email protected]"   -- stored as opaqueData in the record
local topic = "Bucket_Notification"     -- NATS subject to publish on
local bucket_name = "mybucket"          -- only this bucket produces events
local nats_host = '0.0.0.0'
local nats_port = 4222

-- Supported operations (Request.RGWOp) and the event name reported for each.
local event_names = {
  -- Object Created
  put_obj = 'ObjectCreated:Put',
  post_obj = 'ObjectCreated:Post',
  copy_obj = 'ObjectCreated:Copy',
  -- Object Removed
  delete_obj = 'ObjectRemoved:Delete',
}

if bucket_name == Request.Bucket.Name then
  local notification
  local event_name = event_names[Request.RGWOp]
  if event_name ~= nil then
    notification = toJson(Request, event_name, opaqueData, configure)
  end

  if supported_event == true then
    -- Pass the configured endpoint explicitly. Previously nats_connect() was
    -- called without arguments, so nats_host/nats_port never reached the
    -- NATS library even though the log line below reports them.
    nats_connect(nats_host, nats_port)
    local payload = json.encode(notification)
    client:publish(topic, payload)
    RGWDebugLog("bucket notification sent to nats://" .. nats_host .. ":" .. nats_port .. "/" .. topic)
  end
end