Skip to content

Commit

Permalink
Merge pull request ceph#11901 from ceph/wip-rgw-compression-type-random
Browse files Browse the repository at this point in the history
rgw: add rgw_compression_type=random for teuthology testing
  • Loading branch information
mattbenjamin authored Nov 21, 2016
2 parents 1f9b698 + 5769b44 commit 9fd205d
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 53 deletions.
18 changes: 18 additions & 0 deletions src/compressor/Compressor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ boost::optional<Compressor::CompressionMode> Compressor::get_comp_mode_type(cons

CompressorRef Compressor::create(CephContext *cct, const std::string &type)
{
// support "random" for teuthology testing
if (type == "random") {
static std::random_device seed;
static std::default_random_engine engine(seed());
static Spinlock mutex;

int alg = COMP_ALG_NONE;
std::uniform_int_distribution<> dist(0, COMP_ALG_LAST - 1);
{
std::lock_guard<Spinlock> lock(mutex);
alg = dist(engine);
}
if (alg == COMP_ALG_NONE) {
return nullptr;
}
return create(cct, alg);
}

CompressorRef cs_impl = NULL;
std::stringstream ss;
PluginRegistry *reg = cct->get_plugin_registry();
Expand Down
41 changes: 14 additions & 27 deletions src/rgw/rgw_compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,26 @@ int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, r
if ((ofs > 0 && compressed) || // if previous part was compressed
(ofs == 0)) { // or it's the first part
ldout(cct, 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
CompressorRef compressor = Compressor::create(cct, cct->_conf->rgw_compression_type);
if (!compressor.get()) {
if (ofs > 0 && compressed) {
lderr(cct) << "Cannot load compressor of type " << cct->_conf->rgw_compression_type
<< " for next part, compression process failed" << dendl;
int cr = compressor->compress(bl, in_bl);
if (cr < 0) {
if (ofs > 0) {
lderr(cct) << "Compression failed with exit code " << cr
<< " for next part, compression process failed" << dendl;
return -EIO;
}
// if compressor isn't available - just do not use it with log warning?
ldout(cct, 5) << "Cannot load compressor of type " << cct->_conf->rgw_compression_type
<< " for rgw, check rgw_compression_type config option" << dendl;
compressed = false;
ldout(cct, 5) << "Compression failed with exit code " << cr
<< " for first part, storing uncompressed" << dendl;
in_bl.claim(bl);
} else {
int cr = compressor->compress(bl, in_bl);
if (cr < 0) {
if (ofs > 0 && compressed) {
lderr(cct) << "Compression failed with exit code " << cr
<< " for next part, compression process failed" << dendl;
return -EIO;
}
ldout(cct, 5) << "Compression failed with exit code " << cr << dendl;
compressed = false;
in_bl.claim(bl);
} else {
compressed = true;
compressed = true;

compression_block newbl;
int bs = blocks.size();
newbl.old_ofs = ofs;
newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
newbl.len = in_bl.length();
blocks.push_back(newbl);
}
compression_block newbl;
int bs = blocks.size();
newbl.old_ofs = ofs;
newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
newbl.len = in_bl.length();
blocks.push_back(newbl);
}
} else {
compressed = false;
Expand Down
9 changes: 5 additions & 4 deletions src/rgw/rgw_compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ class RGWGetObj_Decompress : public RGWGetObj_Filter
class RGWPutObj_Compress : public RGWPutObj_Filter
{
CephContext* cct;
bool compressed;
bool compressed{false};
CompressorRef compressor;
std::vector<compression_block> blocks;
public:
RGWPutObj_Compress(CephContext* cct_, RGWPutObjDataProcessor* next) : RGWPutObj_Filter(next),
cct(cct_),
compressed(false) {}
RGWPutObj_Compress(CephContext* cct_, CompressorRef compressor,
RGWPutObjDataProcessor* next)
: RGWPutObj_Filter(next), cct(cct_), compressor(compressor) {}
virtual ~RGWPutObj_Compress(){}
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override;

Expand Down
70 changes: 54 additions & 16 deletions src/rgw/rgw_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2808,6 +2808,29 @@ int RGWPutObj::get_data(const off_t fst, const off_t lst, bufferlist& bl)
return ret;
}

// special handling for rgw_compression_type = "random" with multipart uploads
static CompressorRef get_compressor_plugin(const req_state *s)
{
const auto& compression_type = s->cct->_conf->rgw_compression_type;
if (compression_type != "random") {
return Compressor::create(s->cct, compression_type);
}

bool is_multipart{false};
const auto& upload_id = s->info.args.get("uploadId", &is_multipart);

if (!is_multipart) {
return Compressor::create(s->cct, compression_type);
}

// use a hash of the multipart upload id so all parts use the same plugin
const auto alg = std::hash<std::string>{}(upload_id) % Compressor::COMP_ALG_LAST;
if (alg == Compressor::COMP_ALG_NONE) {
return nullptr;
}
return Compressor::create(s->cct, alg);
}

void RGWPutObj::execute()
{
RGWPutObjProcessor *processor = NULL;
Expand All @@ -2824,7 +2847,7 @@ void RGWPutObj::execute()

off_t fst;
off_t lst;
bool compression_enabled;
CompressorRef plugin;
boost::optional<RGWPutObj_Compress> compressor;

bool need_calc_md5 = (dlo_manifest == NULL) && (slo_info == NULL);
Expand Down Expand Up @@ -2910,10 +2933,15 @@ void RGWPutObj::execute()

fst = copy_source_range_fst;
lst = copy_source_range_lst;
compression_enabled = s->cct->_conf->rgw_compression_type != "none";
if (compression_enabled) {
compressor = boost::in_place(s->cct, filter);
filter = &*compressor;
if (s->cct->_conf->rgw_compression_type != "none") {
plugin = get_compressor_plugin(s);
if (!plugin) {
ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type "
<< s->cct->_conf->rgw_compression_type << dendl;
} else {
compressor = boost::in_place(s->cct, plugin, filter);
filter = &*compressor;
}
}

do {
Expand Down Expand Up @@ -2993,8 +3021,8 @@ void RGWPutObj::execute()
goto done;
}

if (compression_enabled) {
compressor = boost::in_place(s->cct, filter);
if (compressor) {
compressor = boost::in_place(s->cct, plugin, filter);
filter = &*compressor;
}

Expand Down Expand Up @@ -3050,14 +3078,18 @@ void RGWPutObj::execute()

hash.Final(m);

if (compression_enabled && compressor->is_compressed()) {
if (compressor && compressor->is_compressed()) {
bufferlist tmp;
RGWCompressionInfo cs_info;
cs_info.compression_type = s->cct->_conf->rgw_compression_type;
cs_info.compression_type = plugin->get_type_name();
cs_info.orig_size = s->obj_size;
cs_info.blocks = move(compressor->get_compression_blocks());
::encode(cs_info, tmp);
attrs[RGW_ATTR_COMPRESSION] = tmp;
ldout(s->cct, 20) << "storing " << RGW_ATTR_COMPRESSION
<< " with type=" << cs_info.compression_type
<< ", orig_size=" << cs_info.orig_size
<< ", blocks=" << cs_info.blocks.size() << dendl;
}

buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
Expand Down Expand Up @@ -3168,7 +3200,6 @@ void RGWPostObj::execute()
MD5 hash;
buffer::list bl, aclbl;
int len = 0;
bool compression_enabled;
boost::optional<RGWPutObj_Compress> compressor;

// read in the data from the POST form
Expand Down Expand Up @@ -3209,10 +3240,17 @@ void RGWPostObj::execute()
if (op_ret < 0)
return;

compression_enabled = s->cct->_conf->rgw_compression_type != "none";
if (compression_enabled) {
compressor = boost::in_place(s->cct, filter);
filter = &*compressor;
const auto& compression_type = s->cct->_conf->rgw_compression_type;
CompressorRef plugin;
if (compression_type != "none") {
plugin = Compressor::create(s->cct, compression_type);
if (!plugin) {
ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type "
<< compression_type << dendl;
} else {
compressor = boost::in_place(s->cct, plugin, filter);
filter = &*compressor;
}
}

while (data_pending) {
Expand Down Expand Up @@ -3267,10 +3305,10 @@ void RGWPostObj::execute()
emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl));
}

if (compression_enabled && compressor->is_compressed()) {
if (compressor && compressor->is_compressed()) {
bufferlist tmp;
RGWCompressionInfo cs_info;
cs_info.compression_type = s->cct->_conf->rgw_compression_type;
cs_info.compression_type = plugin->get_type_name();
cs_info.orig_size = s->obj_size;
cs_info.blocks = move(compressor->get_compression_blocks());
::encode(cs_info, tmp);
Expand Down
20 changes: 14 additions & 6 deletions src/rgw/rgw_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7099,12 +7099,20 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
}

boost::optional<RGWPutObj_Compress> compressor;
CompressorRef plugin;

RGWPutObjDataProcessor *filter = &processor;
bool compression_enabled = cct->_conf->rgw_compression_type != "none";
if (compression_enabled) {
compressor = boost::in_place(cct, filter);
filter = &*compressor;

const auto& compression_type = cct->_conf->rgw_compression_type;
if (compression_type != "none") {
plugin = Compressor::create(cct, compression_type);
if (!plugin) {
ldout(cct, 1) << "Cannot load plugin for rgw_compression_type "
<< compression_type << dendl;
} else {
compressor = boost::in_place(cct, plugin, filter);
filter = &*compressor;
}
}

RGWRadosPutObj cb(cct, filter, &processor, opstate, progress_cb, progress_data);
Expand Down Expand Up @@ -7171,10 +7179,10 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
}
}
}
if (compression_enabled && compressor->is_compressed()) {
if (compressor && compressor->is_compressed()) {
bufferlist tmp;
RGWCompressionInfo cs_info;
cs_info.compression_type = cct->_conf->rgw_compression_type;
cs_info.compression_type = plugin->get_type_name();
cs_info.orig_size = cb.get_data_len();
cs_info.blocks = move(compressor->get_compression_blocks());
::encode(cs_info, tmp);
Expand Down

0 comments on commit 9fd205d

Please sign in to comment.