Skip to content

Commit

Permalink
Merge pull request ceph#11269 from yehudasa/wip-multipart-uploads-cp
Browse files Browse the repository at this point in the history
rgw: multipart upload copy

Reviewed-by: Casey Bodley <[email protected]>
  • Loading branch information
cbodley authored Oct 12, 2016
2 parents 09ac5db + 3e34380 commit 90b8196
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 14 deletions.
2 changes: 1 addition & 1 deletion doc/radosgw/s3.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ The following table describes the support status for current Amazon S3 functiona
+---------------------------------+-----------------+----------------------------------------+
| **Copy Object** | Supported | |
+---------------------------------+-----------------+----------------------------------------+
| **Multipart Uploads** | Supported | (missing Copy Part) |
| **Multipart Uploads** | Supported | |
+---------------------------------+-----------------+----------------------------------------+

Unsupported Header Fields
Expand Down
98 changes: 97 additions & 1 deletion src/rgw/rgw_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2484,6 +2484,31 @@ void RGWDeleteBucket::execute()

int RGWPutObj::verify_permission()
{
if (copy_source) {

RGWAccessControlPolicy cs_policy(s->cct);
map<string, bufferlist> cs_attrs;
rgw_bucket cs_bucket(copy_source_bucket_info.bucket);
rgw_obj_key cs_object(copy_source_object_name, copy_source_version_id);

rgw_obj obj(cs_bucket, cs_object.name);
obj.set_instance(cs_object.instance);
store->set_atomic(s->obj_ctx, obj);
store->set_prefetch_data(s->obj_ctx, obj);

/* check source object permissions */
if (read_policy(store, s, copy_source_bucket_info, cs_attrs, &cs_policy, cs_bucket, cs_object) < 0) {
return -EACCES;
}

/* admin request overrides permission checks */
if (!s->auth_identity->is_admin_of(cs_policy.get_owner().get_id()) &&
!cs_policy.verify_permission(*s->auth_identity, s->perm_mask, RGW_PERM_READ)) {
return -EACCES;
}

}

if (!verify_bucket_permission(s, RGW_PERM_WRITE)) {
return -EACCES;
}
Expand Down Expand Up @@ -2662,6 +2687,59 @@ void RGWPutObj::pre_exec()
rgw_bucket_object_pre_exec(s);
}

class RGWPutObj_CB : public RGWGetDataCB
{
RGWPutObj *op;
public:
RGWPutObj_CB(RGWPutObj *_op) : op(_op) {}
virtual ~RGWPutObj_CB() {}

int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) {
return op->get_data_cb(bl, bl_ofs, bl_len);
}
};

int RGWPutObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
{
bufferlist bl_tmp;
bl.copy(bl_ofs, bl_len, bl_tmp);

bl_aux.append(bl_tmp);

return bl_len;
}

int RGWPutObj::get_data(const off_t fst, const off_t lst, bufferlist& bl)
{
RGWPutObj_CB cb(this);
int ret = 0;

int64_t new_ofs, new_end;

new_ofs = fst;
new_end = lst;

rgw_obj_key obj_key(copy_source_object_name, copy_source_version_id);
rgw_obj obj(copy_source_bucket_info.bucket, obj_key.name);
obj.set_instance(obj_key.instance);

RGWRados::Object op_target(store, copy_source_bucket_info, *static_cast<RGWObjectCtx *>(s->obj_ctx), obj);
RGWRados::Object::Read read_op(&op_target);

ret = read_op.prepare(&new_ofs, &new_end);
if (ret < 0)
return ret;

ret = read_op.iterate(new_ofs, new_end, &cb);
if (ret < 0) {
return ret;
}

bl.claim_append(bl_aux);

return ret;
}

void RGWPutObj::execute()
{
RGWPutObjProcessor *processor = NULL;
Expand All @@ -2675,6 +2753,9 @@ void RGWPutObj::execute()
map<string, string>::iterator iter;
bool multipart;

off_t fst;
off_t lst;

bool need_calc_md5 = (dlo_manifest == NULL) && (slo_info == NULL);

perfcounter->inc(l_rgw_put);
Expand Down Expand Up @@ -2753,9 +2834,24 @@ void RGWPutObj::execute()
goto done;
}

fst = copy_source_range_fst;
lst = copy_source_range_lst;

do {
bufferlist data_in;
len = get_data(data_in);
if (fst > lst)
break;
if (!copy_source) {
len = get_data(data_in);
} else {
uint64_t cur_lst = min(fst + s->cct->_conf->rgw_max_chunk_size - 1, lst);
op_ret = get_data(fst, cur_lst, data_in);
if (op_ret < 0)
goto done;
len = data_in.length();
s->content_length += len;
fst += len;
}
if (len < 0) {
op_ret = len;
goto done;
Expand Down
17 changes: 17 additions & 0 deletions src/rgw/rgw_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,15 @@ class RGWPutObj : public RGWOp {
const char *supplied_etag;
const char *if_match;
const char *if_nomatch;
const char *copy_source;
const char *copy_source_range;
RGWBucketInfo copy_source_bucket_info;
string copy_source_tenant_name;
string copy_source_bucket_name;
string copy_source_object_name;
string copy_source_version_id;
off_t copy_source_range_fst;
off_t copy_source_range_lst;
string etag;
bool chunked_upload;
RGWAccessControlPolicy policy;
Expand All @@ -664,6 +673,7 @@ class RGWPutObj : public RGWOp {
ceph::real_time mtime;
uint64_t olh_epoch;
string version_id;
bufferlist bl_aux;

ceph::real_time delete_at;

Expand All @@ -673,6 +683,10 @@ class RGWPutObj : public RGWOp {
supplied_etag(NULL),
if_match(NULL),
if_nomatch(NULL),
copy_source(NULL),
copy_source_range(NULL),
copy_source_range_fst(0),
copy_source_range_lst(0),
chunked_upload(0),
dlo_manifest(NULL),
slo_info(NULL),
Expand All @@ -698,6 +712,9 @@ class RGWPutObj : public RGWOp {
void pre_exec();
void execute();

int get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len);
int get_data(const off_t fst, const off_t lst, bufferlist& bl);

virtual int get_params() = 0;
virtual int get_data(bufferlist& bl) = 0;
virtual void send_response() = 0;
Expand Down
115 changes: 103 additions & 12 deletions src/rgw/rgw_rest_s3.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1060,18 +1060,92 @@ void RGWDeleteBucket_ObjStore_S3::send_response()

int RGWPutObj_ObjStore_S3::get_params()
{
RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
map<string, bufferlist> src_attrs;
size_t pos;
int ret;

RGWAccessControlPolicy_S3 s3policy(s->cct);
if (!s->length)
return -ERR_LENGTH_REQUIRED;

int r = create_s3_policy(s, store, s3policy, s->owner);
if (r < 0)
return r;
ret = create_s3_policy(s, store, s3policy, s->owner);
if (ret < 0)
return ret;

policy = s3policy;

if_match = s->info.env->get("HTTP_IF_MATCH");
if_nomatch = s->info.env->get("HTTP_IF_NONE_MATCH");
copy_source = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE");
copy_source_range = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_RANGE");

/* handle x-amz-copy-source */

if (copy_source) {
copy_source_bucket_name = copy_source;
pos = copy_source_bucket_name.find("/");
if (pos == std::string::npos) {
ret = -EINVAL;
ldout(s->cct, 5) << "x-amz-copy-source bad format" << dendl;
return ret;
}
copy_source_object_name = copy_source_bucket_name.substr(pos + 1, copy_source_bucket_name.size());
copy_source_bucket_name = copy_source_bucket_name.substr(0, pos);
#define VERSION_ID_STR "?versionId="
pos = copy_source_object_name.find(VERSION_ID_STR);
if (pos == std::string::npos) {
url_decode(copy_source_object_name, copy_source_object_name);
} else {
copy_source_version_id = copy_source_object_name.substr(pos + sizeof(VERSION_ID_STR) - 1);
url_decode(copy_source_object_name.substr(0, pos), copy_source_object_name);
}
pos = copy_source_bucket_name.find(":");
if (pos == std::string::npos) {
copy_source_tenant_name = s->src_tenant_name;
} else {
copy_source_tenant_name = copy_source_bucket_name.substr(0, pos);
copy_source_bucket_name = copy_source_bucket_name.substr(pos + 1, copy_source_bucket_name.size());
if (copy_source_bucket_name.empty()) {
ret = -EINVAL;
ldout(s->cct, 5) << "source bucket name is empty" << dendl;
return ret;
}
}
ret = store->get_bucket_info(obj_ctx,
copy_source_tenant_name,
copy_source_bucket_name,
copy_source_bucket_info,
NULL, &src_attrs);
if (ret < 0) {
ldout(s->cct, 5) << __func__ << "(): get_bucket_info() returned ret=" << ret << dendl;
return ret;
}

/* handle x-amz-copy-source-range */

if (copy_source_range) {
string range = copy_source_range;
pos = range.find("=");
if (pos == std::string::npos) {
ret = -EINVAL;
ldout(s->cct, 5) << "x-amz-copy-source-range bad format" << dendl;
return ret;
}
range = range.substr(pos + 1);
pos = range.find("-");
if (pos == std::string::npos) {
ret = -EINVAL;
ldout(s->cct, 5) << "x-amz-copy-source-range bad format" << dendl;
return ret;
}
string first = range.substr(0, pos);
string last = range.substr(pos + 1);
copy_source_range_fst = strtoull(first.c_str(), NULL, 10);
copy_source_range_lst = strtoull(last.c_str(), NULL, 10);
}

} /* copy_source */

return RGWPutObj_ObjStore::get_params();
}
Expand Down Expand Up @@ -1257,8 +1331,28 @@ void RGWPutObj_ObjStore_S3::send_response()
s->cct->_conf->rgw_s3_success_create_obj_status);
set_req_state_err(s, op_ret);
}
dump_etag(s, etag.c_str());
dump_content_length(s, 0);
if (!copy_source) {
dump_etag(s, etag.c_str());
dump_content_length(s, 0);
} else {
dump_errno(s);
end_header(s, this, "application/xml");
dump_start(s);
struct tm tmp;
utime_t ut(mtime);
time_t secs = (time_t)ut.sec();
gmtime_r(&secs, &tmp);
char buf[TIME_BUF_SIZE];
s->formatter->open_object_section_in_ns("CopyPartResult",
"http://s3.amazonaws.com/doc/2006-03-01/");
if (strftime(buf, sizeof(buf), "%Y-%m-%dT%T.000Z", &tmp) > 0) {
s->formatter->dump_string("LastModified", buf);
}
s->formatter->dump_string("ETag", etag);
s->formatter->close_section();
rgw_flush_formatter_and_reset(s, s->formatter);
return;
}
}
if (s->system_request && !real_clock::is_zero(mtime)) {
dump_epoch_header(s, "Rgwx-Mtime", mtime);
Expand Down Expand Up @@ -2104,10 +2198,6 @@ int RGWCopyObj_ObjStore_S3::init_dest_policy()

int RGWCopyObj_ObjStore_S3::get_params()
{
if (s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_RANGE")) {
return -ERR_NOT_IMPLEMENTED;
}

if_mod = s->info.env->get("HTTP_X_AMZ_COPY_IF_MODIFIED_SINCE");
if_unmod = s->info.env->get("HTTP_X_AMZ_COPY_IF_UNMODIFIED_SINCE");
if_match = s->info.env->get("HTTP_X_AMZ_COPY_IF_MATCH");
Expand Down Expand Up @@ -3133,10 +3223,11 @@ int RGWHandler_REST_S3::init(RGWRados *store, struct req_state *s,
s->has_acl_header = s->info.env->exists_prefix("HTTP_X_AMZ_GRANT");

const char *copy_source = s->info.env->get("HTTP_X_AMZ_COPY_SOURCE");
if (copy_source) {

if (copy_source && !s->info.env->get("HTTP_X_AMZ_COPY_SOURCE_RANGE")) {
ret = RGWCopyObj::parse_copy_location(copy_source,
s->init_state.src_bucket,
s->src_object);
s->init_state.src_bucket,
s->src_object);
if (!ret) {
ldout(s->cct, 0) << "failed to parse copy location" << dendl;
return -EINVAL; // XXX why not -ERR_INVALID_BUCKET_NAME or -ERR_BAD_URL?
Expand Down

0 comments on commit 90b8196

Please sign in to comment.