Skip to content

Commit

Permalink
Merge pull request ceph#44301 from yuvalif/wip-yuval-cloudevents
Browse files Browse the repository at this point in the history
rgw/notifications: add cloudevents support to HTTP endpoint
  • Loading branch information
yuvalif authored Jan 9, 2022
2 parents d889215 + e35d511 commit 493181d
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 85 deletions.
3 changes: 3 additions & 0 deletions doc/radosgw/notifications.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ To update a topic, use the same command used for topic creation, with the topic
[&Attributes.entry.7.key=OpaqueData&Attributes.entry.7.value=<opaque data>]
[&Attributes.entry.8.key=push-endpoint&Attributes.entry.8.value=<endpoint>]
[&Attributes.entry.9.key=persistent&Attributes.entry.9.value=true|false]
[&Attributes.entry.10.key=cloudevents&Attributes.entry.10.value=true|false]

Request parameters:

Expand All @@ -134,6 +135,7 @@ Request parameters:
- URI: ``http[s]://<fqdn>[:<port]``
- port defaults to: 80/443 for HTTP/S accordingly
- verify-ssl: indicate whether the server certificate is validated by the client or not ("true" by default)
- cloudevents: indicate whether the HTTP header should contain attributes according to the `S3 CloudEvents Spec`_ ("false" by default)

- AMQP0.9.1 endpoint

Expand Down Expand Up @@ -450,3 +452,4 @@ pushed or pulled using the pubsub sync module. For example:
.. _S3 Notification Compatibility: ../s3-notification-compatibility
.. _AWS Create Topic: https://docs.aws.amazon.com/sns/latest/api/API_CreateTopic.html
.. _Bucket Operations: ../s3/bucketops
.. _S3 CloudEvents Spec: https://github.com/cloudevents/spec/blob/main/cloudevents/adapters/aws-s3.md
10 changes: 5 additions & 5 deletions src/rgw/rgw_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -918,9 +918,9 @@ RGWHTTPArgs::get_optional(const std::string& name) const
}
}

int RGWHTTPArgs::get_bool(const string& name, bool *val, bool *exists)
int RGWHTTPArgs::get_bool(const string& name, bool *val, bool *exists) const
{
map<string, string>::iterator iter;
map<string, string>::const_iterator iter;
iter = val_map.find(name);
bool e = (iter != val_map.end());
if (exists)
Expand All @@ -941,13 +941,13 @@ int RGWHTTPArgs::get_bool(const string& name, bool *val, bool *exists)
return 0;
}

int RGWHTTPArgs::get_bool(const char *name, bool *val, bool *exists)
int RGWHTTPArgs::get_bool(const char *name, bool *val, bool *exists) const
{
string s(name);
return get_bool(s, val, exists);
}

void RGWHTTPArgs::get_bool(const char *name, bool *val, bool def_val)
void RGWHTTPArgs::get_bool(const char *name, bool *val, bool def_val) const
{
bool exists = false;
if ((get_bool(name, val, &exists) < 0) ||
Expand All @@ -956,7 +956,7 @@ void RGWHTTPArgs::get_bool(const char *name, bool *val, bool def_val)
}
}

int RGWHTTPArgs::get_int(const char *name, int *val, int def_val)
int RGWHTTPArgs::get_int(const char *name, int *val, int def_val) const
{
bool exists = false;
string val_str;
Expand Down
8 changes: 4 additions & 4 deletions src/rgw/rgw_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,10 @@ class RGWHTTPArgs {
const std::string& get(const std::string& name, bool *exists = NULL) const;
boost::optional<const std::string&>
get_optional(const std::string& name) const;
int get_bool(const std::string& name, bool *val, bool *exists);
int get_bool(const char *name, bool *val, bool *exists);
void get_bool(const char *name, bool *val, bool def_val);
int get_int(const char *name, int *val, int def_val);
int get_bool(const std::string& name, bool *val, bool *exists) const;
int get_bool(const char *name, bool *val, bool *exists) const;
void get_bool(const char *name, bool *val, bool def_val) const;
int get_int(const char *name, int *val, int def_val) const;

/** Get the value for specific system argument parameter */
std::string sys_get(const std::string& name, bool *exists = nullptr) const;
Expand Down
87 changes: 32 additions & 55 deletions src/rgw/rgw_pubsub_push.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <algorithm>
#include "include/buffer_fwd.h"
#include "common/Formatter.h"
#include "common/iso_8601.h"
#include "common/async/completion.h"
#include "rgw_common.h"
#include "rgw_data_sync.h"
Expand Down Expand Up @@ -39,14 +40,26 @@ std::string json_format_pubsub_event(const EventType& event) {
f.flush(ss);
return ss.str();
}

bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_value) {
bool value;
bool exists;
if (args.get_bool(name.c_str(), &value, &exists) == -EINVAL) {
throw RGWPubSubEndpoint::configuration_error("invalid boolean value for " + name);
}
if (!exists) {
return default_value;
}
return value;
}

class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
private:
const std::string endpoint;
std::string str_ack_level;
typedef unsigned ack_level_t;
ack_level_t ack_level; // TODO: not used for now
bool verify_ssl;
const bool verify_ssl;
const bool cloudevents;
static const ack_level_t ACK_LEVEL_ANY = 0;
static const ack_level_t ACK_LEVEL_NON_ERROR = 1;

Expand Down Expand Up @@ -98,11 +111,11 @@ class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
};

public:
RGWPubSubHTTPEndpoint(const std::string& _endpoint,
const RGWHTTPArgs& args) : endpoint(_endpoint) {
RGWPubSubHTTPEndpoint(const std::string& _endpoint, const RGWHTTPArgs& args) :
endpoint(_endpoint), verify_ssl(get_bool(args, "verify-ssl", true)), cloudevents(get_bool(args, "cloudevents", false))
{
bool exists;

str_ack_level = args.get("http-ack-level", &exists);
const auto& str_ack_level = args.get("http-ack-level", &exists);
if (!exists || str_ack_level == "any") {
// "any" is default
ack_level = ACK_LEVEL_ANY;
Expand All @@ -114,17 +127,6 @@ class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
}
}

auto str_verify_ssl = args.get("verify-ssl", &exists);
boost::algorithm::to_lower(str_verify_ssl);
// verify server certificate by default
if (!exists || str_verify_ssl == "true") {
verify_ssl = true;
} else if (str_verify_ssl == "false") {
verify_ssl = false;
} else {
throw configuration_error("HTTP/S: verify-ssl must be true/false, not: " + str_verify_ssl);
}
}

RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
Expand All @@ -139,6 +141,17 @@ class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
bufferlist read_bl;
RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
const auto post_data = json_format_pubsub_event(event);
if (cloudevents) {
// following: https://github.com/cloudevents/spec/blob/v1.0.1/http-protocol-binding.md
// using "Binary Content Mode"
request.append_header("ce-specversion", "1.0");
request.append_header("ce-type", "com.amazonaws." + event.eventName);
request.append_header("ce-time", to_iso_8601(event.eventTime));
// default output of iso8601 is also RFC3339 compatible
request.append_header("ce-id", event.x_amz_request_id + "." + event.x_amz_id_2);
request.append_header("ce-source", event.eventSource + "." + event.awsRegion + "." + event.bucket_name);
request.append_header("ce-subject", event.object_key);
}
request.set_post_data(post_data);
request.set_send_length(post_data.length());
request.append_header("Content-Type", "application/json");
Expand All @@ -152,10 +165,8 @@ class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
std::string to_str() const override {
std::string str("HTTP/S Endpoint");
str += "\nURI: " + endpoint;
str += "\nAck Level: " + str_ack_level;
str += (verify_ssl ? "\nverify SSL" : "\ndon't verify SSL");
return str;

}
};

Expand Down Expand Up @@ -440,44 +451,10 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
kafka::connection_ptr_t conn;
const ack_level_t ack_level;

bool get_verify_ssl(const RGWHTTPArgs& args) {
bool exists;
auto str_verify_ssl = args.get("verify-ssl", &exists);
if (!exists) {
// verify server certificate by default
return true;
}
boost::algorithm::to_lower(str_verify_ssl);
if (str_verify_ssl == "true") {
return true;
}
if (str_verify_ssl == "false") {
return false;
}
throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
}

bool get_use_ssl(const RGWHTTPArgs& args) {
bool exists;
auto str_use_ssl = args.get("use-ssl", &exists);
if (!exists) {
// by default ssl not used
return false;
}
boost::algorithm::to_lower(str_use_ssl);
if (str_use_ssl == "true") {
return true;
}
if (str_use_ssl == "false") {
return false;
}
throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl);
}

ack_level_t get_ack_level(const RGWHTTPArgs& args) {
bool exists;
// get ack level
const auto str_ack_level = args.get("kafka-ack-level", &exists);
const auto& str_ack_level = args.get("kafka-ack-level", &exists);
if (!exists || str_ack_level == "broker") {
// "broker" is default
return ack_level_t::Broker;
Expand Down Expand Up @@ -584,7 +561,7 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
CephContext* _cct) :
cct(_cct),
topic(_topic),
conn(kafka::connect(_endpoint, get_use_ssl(args), get_verify_ssl(args), args.get_optional("ca-location"))) ,
conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"))) ,
ack_level(get_ack_level(args)) {
if (!conn) {
throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
Expand Down
3 changes: 3 additions & 0 deletions src/test/rgw/bucket_notification/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ boto >=2.6.0
boto3 >=1.0.0
configparser >=5.0.0
kafka-python >=2.0.0
pika
cloudevents
xmltodict
Loading

0 comments on commit 493181d

Please sign in to comment.