Skip to content

Commit

Permalink
[Serving] Support multi-tiered Embedding configuration in serving. (D…
Browse files Browse the repository at this point in the history
  • Loading branch information
candyzone authored Aug 17, 2022
1 parent 58cbb41 commit 1b0a70b
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 4 deletions.
14 changes: 10 additions & 4 deletions docs/Processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,7 @@ void* initialize(const char* model_entry, const char* model_config, int* state);

# [feature_store_type是'redis'需要], redis读线程数
"read_thread_num": 4,

# [feature_store_type是'redis'需要],redis更新模型线程数
"update_thread_num": 1,
# [feature_store_type是'redis'需要],redis更新模型线程数 "update_thread_num": 1,

# 默认序列化使用protobuf(预留参数)
"serialize_protocol": "protobuf",
Expand Down Expand Up @@ -369,7 +367,15 @@ void* initialize(const char* model_entry, const char* model_config, int* state);

# timeline保存位置,支持oss和local
# local: "/root/timeline/"
"timeline_path": "oss://mybucket/timeline/"
"timeline_path": "oss://mybucket/timeline/",

# EmbeddingVariable 存储配置
# 0: 使用原图配置, 1: DRAM单级存储, 12: DRAM+SSDHASH 多级存储
# 默认值: 0
"ev_storage_type": 12,

# 多级存储路径, 如果设置了多级存储
"ev_storage_path": "/ssd/1/"
}
```
#### 模型路径配置
Expand Down
18 changes: 18 additions & 0 deletions serving/processor/framework/graph_optimizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,10 @@ Status SavedModelOptimizer::RunNativeTFGraphPass() {
TF_RETURN_IF_ERROR(RewriteEmbeddingLookupGraph(var_parts, import_nodes));
}

if (option_.st) {
TF_RETURN_IF_ERROR(RewriteEmbeddingVariableAttr(option_.st, option_.path));
}

// Add other passes here

// replace the graph def in saved_model_bundle
Expand Down Expand Up @@ -2355,6 +2359,20 @@ Node* SavedModelOptimizer::UpdateRestoreShardNodeInputs(
return shard_node;
}

Status SavedModelOptimizer::RewriteEmbeddingVariableAttr(
embedding::StorageType st, std::string path) {
for (Node* node : graph_.nodes()) {
if (node->op_def().name() == "KvResourceImportV2" ||
node->op_def().name() == "InitializeKvVariableOp") {
node->AddAttr("storage_type", st);
node->AddAttr("storage_path", path);

LOG(INFO) << "debug op: " << node->DebugString();
}
}
return Status::OK();
}

} // namespace processor
} // namespace tensorflow

7 changes: 7 additions & 0 deletions serving/processor/framework/graph_optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.

#include "tensorflow/cc/saved_model/loader.h"
#include "tensorflow/core/graph/graph.h"
#include "tensorflow/core/framework/embedding/config.pb.h"
#include "tensorflow/core/framework/graph.pb.h"
#include "tensorflow/core/framework/node_def.pb.h"
#include "tensorflow/core/framework/node_def_util.h"
Expand Down Expand Up @@ -144,6 +145,10 @@ struct GraphOptimizerOption {
// current instance partition id
int partition_id = -1;
int shard_instance_count = 0;

// multi tiered embedding
embedding::StorageType st = embedding::StorageType::INVALID;
std::string path;
};

struct SrcInfo {
Expand Down Expand Up @@ -258,6 +263,8 @@ class SavedModelOptimizer : public GraphOptimizer {
std::unordered_map<std::string, std::vector<Node*>>& origin_import_nodes,
std::vector<Node*>& new_kv_import_nodes);

Status RewriteEmbeddingVariableAttr(embedding::StorageType st, std::string path);

Node* storage_pointer_node_ = nullptr;// storage placeholder node
Node* version_node_ = nullptr; // version placeholder node
Node* incr_ckpt_node_ = nullptr; // indicate if import incr ckpt
Expand Down
18 changes: 18 additions & 0 deletions serving/processor/serving/model_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,24 @@ Status ModelConfigFactory::Create(const char* model_config, ModelConfig** config
}
}

if (!json_config["ev_storage_type"].isNull()) {
auto st = json_config["ev_storage_type"].asInt();
switch (st) {
case embedding::StorageType::INVALID:
break;
case embedding::StorageType::DRAM:
(*config)->storage_type = embedding::StorageType::DRAM;
break;
case embedding::StorageType::DRAM_SSDHASH:
(*config)->storage_type = embedding::StorageType::DRAM_SSDHASH;
(*config)->storage_path = json_config["ev_storage_path"].asString();
break;
default:
return Status(error::Code::INVALID_ARGUMENT,
"[TensorFlow] Only support ev storage type {DRAM, DRAM_SSDHASH}.");
}
}

return Status::OK();
}

Expand Down
5 changes: 5 additions & 0 deletions serving/processor/serving/model_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define SERVING_PROCESSOR_SERVING_MODEL_CONFIG_H

#include <string>
#include "tensorflow/core/framework/embedding/config.pb.h"
#include "tensorflow/core/lib/core/status.h"

namespace tensorflow {
Expand Down Expand Up @@ -54,6 +55,10 @@ struct ModelConfig {

// session use self-owned thread pool
bool use_per_session_threads = false;

// EmbeddingVariable Config
embedding::StorageType storage_type = embedding::StorageType::INVALID;
std::string storage_path;
};

class ModelConfigFactory {
Expand Down
58 changes: 58 additions & 0 deletions serving/processor/serving/model_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,64 @@ const std::string oss_and_redis_config = " \
EXPECT_EQ("test_key", config->oss_access_key);
}

TEST_F(ModelConfigTest, ShouldSuccessWhenConfigEmbeddingConfig) {
const std::string oss_config = " \
{ \
\"serialize_protocol\": \"protobuf\", \
\"inter_op_parallelism_threads\" : 4, \
\"intra_op_parallelism_threads\" : 2, \
\"init_timeout_minutes\" : 1, \
\"signature_name\": \"tensorflow_serving\", \
\"checkpoint_dir\" : \"oss://test_ckpt/1\", \
\"savedmodel_dir\" : \"oss://test_savedmodel/1\", \
\"feature_store_type\" : \"memory\", \
\"redis_url\" :\"test_url\", \
\"redis_password\" :\"test_password\", \
\"read_thread_num\" : 2, \
\"update_thread_num\":1, \
\"model_store_type\": \"oss\", \
\"oss_endpoint\": \"test.endpoint\", \
\"oss_access_id\" : \"test_id\", \
\"ev_storage_type\" : 12, \
\"ev_storage_path\" : \"123\", \
\"oss_access_key\" : \"test_key\" \
}";

ModelConfig* config = nullptr;
EXPECT_TRUE(
ModelConfigFactory::Create(oss_config.c_str(), &config).ok());
EXPECT_EQ(12, config->storage_type);
EXPECT_EQ("123", config->storage_path);
}

TEST_F(ModelConfigTest, ShouldFailureWhenConfigEmbeddingConfig) {
const std::string oss_config = " \
{ \
\"serialize_protocol\": \"protobuf\", \
\"inter_op_parallelism_threads\" : 4, \
\"intra_op_parallelism_threads\" : 2, \
\"init_timeout_minutes\" : 1, \
\"signature_name\": \"tensorflow_serving\", \
\"checkpoint_dir\" : \"oss://test_ckpt/1\", \
\"savedmodel_dir\" : \"oss://test_savedmodel/1\", \
\"feature_store_type\" : \"memory\", \
\"redis_url\" :\"test_url\", \
\"redis_password\" :\"test_password\", \
\"read_thread_num\" : 2, \
\"update_thread_num\":1, \
\"model_store_type\": \"oss\", \
\"oss_endpoint\": \"test.endpoint\", \
\"oss_access_id\" : \"test_id\", \
\"ev_storage_type\" : 14, \
\"ev_storage_path\" : \"123\", \
\"oss_access_key\" : \"test_key\" \
}";

ModelConfig* config = nullptr;
EXPECT_EQ(error::Code::INVALID_ARGUMENT,
ModelConfigFactory::Create(oss_config.c_str(), &config).code());
}

} // processor
} // tensorflow

3 changes: 3 additions & 0 deletions serving/processor/serving/model_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ Status LocalSessionInstance::Init(ModelConfig* config,
PartitionPolicy::GetGlobalPolicy()->GetShardInstanceCount();
}

option.st = config->storage_type;
option.path = config->storage_path;

optimizer_ = new SavedModelOptimizer(config->signature_name,
&meta_graph_def_, option);
TF_RETURN_IF_ERROR(optimizer_->Optimize());
Expand Down

0 comments on commit 1b0a70b

Please sign in to comment.