From 149d8105ea3c4dfd63c3c7e25b3be1e4c4f2ec45 Mon Sep 17 00:00:00 2001 From: Przemyslaw Tredak Date: Wed, 6 Feb 2019 14:02:32 -0800 Subject: [PATCH] Increase perfomance of BulkAppend and BulkFlush (#14067) * Better bulkappend * Fix lint --- src/engine/threaded_engine.h | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/engine/threaded_engine.h b/src/engine/threaded_engine.h index 18018cbdf3bb..4a2d4196dcd3 100644 --- a/src/engine/threaded_engine.h +++ b/src/engine/threaded_engine.h @@ -403,6 +403,10 @@ class ThreadedEngine : public Engine { BulkStatus& bulk_status = *BulkStatusStore::Get(); std::swap(bulk_status.bulk_size, bulk_size); if (bulk_status.count >= bulk_status.bulk_size) BulkFlush(); + if (!bulk_status.functions) { + bulk_status.functions.reset(new std::vector()); + } + bulk_status.functions->reserve(bulk_size); return bulk_size; } @@ -416,7 +420,7 @@ class ThreadedEngine : public Engine { /*! \brief context of current ops */ Context ctx; /*! \brief current op functions */ - SyncFn fn; + std::shared_ptr> functions; /*! \brief constant variables */ std::vector const_vars; /*! \brief mutable variables */ @@ -472,15 +476,12 @@ class ThreadedEngine : public Engine { std::vector const& const_vars, std::vector const& mutable_vars) { BulkStatus& bulk_status = *BulkStatusStore::Get(); + if (!bulk_status.functions) { + bulk_status.functions.reset(new std::vector()); + } + bulk_status.functions->push_back(exec_fn); if (!bulk_status.count) { bulk_status.ctx = exec_ctx; - bulk_status.fn = std::move(exec_fn); - } else { - auto prev_fn = std::move(bulk_status.fn); - bulk_status.fn = [exec_fn, prev_fn](RunContext rctx) { - prev_fn(rctx); - exec_fn(rctx); - }; } ++bulk_status.count; @@ -497,10 +498,12 @@ class ThreadedEngine : public Engine { if (!bulk_status.count) return; bulk_status.count = 0; DeduplicateVarHandle(&bulk_status.const_vars, &bulk_status.mutable_vars); - SyncFn fn = std::move(bulk_status.fn); - this->PushAsync([fn](RunContext ctx, CallbackOnComplete on_complete) { + auto functions = bulk_status.functions; + this->PushAsync([functions](RunContext ctx, CallbackOnComplete on_complete) { ctx.is_bulk = true; - fn(ctx); + for (auto& fn : *functions) { + fn(ctx); + } ctx.is_bulk = false; bool is_gpu = ctx.ctx.dev_mask() == gpu::kDevMask; if (is_gpu) { @@ -510,6 +513,8 @@ class ThreadedEngine : public Engine { }, bulk_status.ctx, bulk_status.const_vars, bulk_status.mutable_vars, FnProperty::kNormal, 0, "ImperativeBulk"); + bulk_status.functions.reset(new std::vector()); + bulk_status.functions->reserve(bulk_status.bulk_size); bulk_status.const_vars.clear(); bulk_status.mutable_vars.clear(); }