Skip to content

Commit

Permalink
rgw: refine es related coroutines
Browse files Browse the repository at this point in the history
Signed-off-by: Chang Liu <[email protected]>
  • Loading branch information
liuchang0812 committed Dec 16, 2019
1 parent 2cd512e commit 722e2d0
Showing 1 changed file with 83 additions and 42 deletions.
125 changes: 83 additions & 42 deletions src/rgw/rgw_sync_module_es.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,63 +641,57 @@ struct es_obj_metadata {
}
};

class RGWElasticInitConfigCBCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
ElasticConfigRef conf;
ESInfo es_info;

struct _err_response {
struct err_reason {
vector<err_reason> root_cause;
string type;
string reason;
string index;

void decode_json(JSONObj *obj) {
JSONDecoder::decode_json("root_cause", root_cause, obj);
JSONDecoder::decode_json("type", type, obj);
JSONDecoder::decode_json("reason", reason, obj);
JSONDecoder::decode_json("index", index, obj);
}
} error;

void decode_json(JSONObj *obj) {
JSONDecoder::decode_json("error", error, obj);
}
} err_response;

class RGWElasticGetESInfoCBCR : public RGWCoroutine {
public:
RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
RGWElasticGetESInfoCBCR(RGWDataSyncEnv *_sync_env,
ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
conf(_conf) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch info for zone: " << sync_env->source_zone << dendl;
yield call(new RGWReadRESTResourceCR<ESInfo> (sync_env->cct,
conf->conn.get(),
sync_env->http_manager,
"/", nullptr /*params*/,
&(conf->default_headers),
&es_info));
&(conf->es_info)));
if (retcode < 0) {
ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch failed: " << retcode << dendl;
return set_cr_error(retcode);
}

ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl;
return set_cr_done();
}
return 0;
}
private:
RGWDataSyncEnv *sync_env;
ElasticConfigRef conf;
};

class RGWElasticPutIndexCBCR : public RGWCoroutine {
public:
RGWElasticPutIndexCBCR(RGWDataSyncEnv *_sync_env,
ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
conf(_conf) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 5) << conf->id << ": put elasticsearch index for zone: " << sync_env->source_zone << dendl;

yield {
string path = conf->get_index_path();
ldout(sync_env->cct, 5) << "got elastic version=" << es_info.get_version_str() << dendl;

es_index_settings settings(conf->num_replicas, conf->num_shards);

std::unique_ptr<es_index_config_base> index_conf;

if (es_info.version >= ES_V5) {
if (conf->es_info.version >= ES_V5) {
ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
index_conf.reset(new es_index_config<es_type_v5>(settings, es_info.version));
index_conf.reset(new es_index_config<es_type_v5>(settings, conf->es_info.version));
} else {
ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
index_conf.reset(new es_index_config<es_type_v2>(settings, es_info.version));
index_conf.reset(new es_index_config<es_type_v2>(settings, conf->es_info.version));
}
call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sync_env->cct,
conf->conn.get(),
Expand All @@ -707,10 +701,10 @@ class RGWElasticInitConfigCBCR : public RGWCoroutine {
*index_conf, nullptr, &err_response));
}
if (retcode < 0) {
ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;

if (err_response.error.type != "index_already_exists_exception" &&
err_response.error.type != "resource_already_exists_exception") {
err_response.error.type != "resource_already_exists_exception") {
ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
return set_cr_error(retcode);
}

Expand All @@ -721,6 +715,58 @@ class RGWElasticInitConfigCBCR : public RGWCoroutine {
return 0;
}

private:
RGWDataSyncEnv *sync_env;
ElasticConfigRef conf;

struct _err_response {
struct err_reason {
vector<err_reason> root_cause;
string type;
string reason;
string index;

void decode_json(JSONObj *obj) {
JSONDecoder::decode_json("root_cause", root_cause, obj);
JSONDecoder::decode_json("type", type, obj);
JSONDecoder::decode_json("reason", reason, obj);
JSONDecoder::decode_json("index", index, obj);
}
} error;

void decode_json(JSONObj *obj) {
JSONDecoder::decode_json("error", error, obj);
}
} err_response;
};

class RGWElasticInitConfigCBCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
ElasticConfigRef conf;

public:
RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
conf(_conf) {}
int operate() override {
reenter(this) {

yield call(new RGWElasticGetESInfoCBCR(sync_env, conf));

if (retcode < 0) {
return set_cr_error(retcode);
}

yield call(new RGWElasticPutIndexCBCR(sync_env, conf));
if (retcode < 0) {
return set_cr_error(retcode);
}
return set_cr_done();
}
return 0;
}

};

class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
Expand Down Expand Up @@ -827,12 +873,7 @@ class RGWElasticDataSyncModule : public RGWDataSyncModule {
RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override {
ldout(sync_env->cct, 5) << conf->id << ": start_sync" << dendl;
// try to get elastic search version
return new RGWReadRESTResourceCR<ESInfo>(sync_env->cct,
conf->conn.get(),
sync_env->http_manager,
"/", nullptr,
&(conf->default_headers),
&(conf->es_info));
return new RGWElasticGetESInfoCBCR(sync_env, conf);
}

RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
Expand Down

0 comments on commit 722e2d0

Please sign in to comment.