Skip to content

Commit

Permalink
[feat] add stage feat
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Dec 24, 2024
1 parent ad957a1 commit ccbe0e4
Show file tree
Hide file tree
Showing 22 changed files with 405 additions and 17 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ int main() {
* 修复辅助线程异常等待问题
* 更新`tutorial`内容

[2024.12.24 - v2.7.0 - Chunel]
* 提供`stage`(阶段)功能,用于`element`之间同步运行
* 更新`tutorial`内容

</details>

------------
Expand Down
3 changes: 2 additions & 1 deletion src/GraphCtrl/GraphElement/GAdapter/GSingleton/GSingleton.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class GSingleton : public GAdapter {
const std::string &name, CSize loop) final;

CStatus addManagers(GParamManagerPtr paramManager,
GEventManagerPtr eventManager) final;
GEventManagerPtr eventManager,
GStageManagerPtr stageManager) final;

CBool isHold() final;

Expand Down
7 changes: 4 additions & 3 deletions src/GraphCtrl/GraphElement/GAdapter/GSingleton/GSingleton.inl
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,14 @@ CStatus GSingleton<T>::addElementInfo(const std::set<GElementPtr> &depends,

template <typename T>
CStatus GSingleton<T>::addManagers(GParamManagerPtr paramManager,
GEventManagerPtr eventManager) {
GEventManagerPtr eventManager,
GStageManagerPtr stageManager) {
CGRAPH_FUNCTION_BEGIN

CGRAPH_ASSERT_INIT(false)
CGRAPH_ASSERT_NOT_NULL(paramManager, eventManager)
CGRAPH_ASSERT_NOT_NULL(paramManager, eventManager, stageManager)
auto element = dynamic_cast<T *>(s_singleton_.get());
status = element->addManagers(paramManager, eventManager);
status = element->addManagers(paramManager, eventManager, stageManager);

CGRAPH_FUNCTION_END
}
Expand Down
7 changes: 5 additions & 2 deletions src/GraphCtrl/GraphElement/GElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,16 @@ CStatus GElement::addElementInfo(const GElementPtrSet& depends,
}


CStatus GElement::addManagers(GParamManagerPtr paramManager, GEventManagerPtr eventManager) {
CStatus GElement::addManagers(GParamManagerPtr paramManager,
GEventManagerPtr eventManager,
GStageManagerPtr stageManager) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false)
CGRAPH_ASSERT_NOT_NULL(paramManager, eventManager)
CGRAPH_ASSERT_NOT_NULL(paramManager, eventManager, stageManager)

this->setGParamManager(paramManager);
this->setGEventManager(eventManager);
this->setGStageManager(stageManager);
if (aspect_manager_) {
aspect_manager_->setGParamManager(paramManager);
aspect_manager_->setGEventManager(eventManager);
Expand Down
5 changes: 4 additions & 1 deletion src/GraphCtrl/GraphElement/GElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,12 @@ class GElement : public GElementObject,
* 设置manager信息
* @param paramManager
* @param eventManager
* @param stageManager
* @return
*/
virtual CStatus addManagers(GParamManagerPtr paramManager,
GEventManagerPtr eventManager);
GEventManagerPtr eventManager,
GStageManagerPtr stageManager);

/**
* 包含切面相关功能的函数,fat取自fatjar的意思
Expand Down Expand Up @@ -481,6 +483,7 @@ class GElement : public GElementObject,

CGRAPH_DECLARE_GPARAM_MANAGER_WRAPPER_WITH_MEMBER
CGRAPH_DECLARE_GEVENT_MANAGER_WRAPPER_WITH_MEMBER
CGRAPH_DECLARE_STAGE_MANAGER_WRAPPER_WITH_MEMBER
};

using GElementRef = GElement &;
Expand Down
1 change: 1 addition & 0 deletions src/GraphCtrl/GraphElement/GElementObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "../GraphParam/GParamInclude.h"
#include "../GraphAspect/GAspectInclude.h"
#include "../GraphEvent/GEventInclude.h"
#include "../GraphStage/GStageInclude.h"

CGRAPH_NAMESPACE_BEGIN

Expand Down
8 changes: 5 additions & 3 deletions src/GraphCtrl/GraphElement/GGroup/GGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ CStatus GGroup::addElement(GElementPtr element) {

this->group_elements_arr_.emplace_back(element);
element->belong_ = this;
element->addManagers(param_manager_, event_manager_);
element->addManagers(param_manager_, event_manager_, stage_manager_);
CGRAPH_FUNCTION_END
}

Expand Down Expand Up @@ -92,16 +92,18 @@ CBool GGroup::isSerializable() const {


CStatus GGroup::addManagers(GParamManagerPtr paramManager,
GEventManagerPtr eventManager) {
GEventManagerPtr eventManager,
GStageManagerPtr stageManager) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_NOT_NULL(paramManager, eventManager)
CGRAPH_ASSERT_INIT(false)

this->setGParamManager(paramManager);
this->setGEventManager(eventManager);
this->setGStageManager(stageManager);
for (GElementPtr element : group_elements_arr_) {
CGRAPH_ASSERT_NOT_NULL(element)
status += element->addManagers(paramManager, eventManager);
status += element->addManagers(paramManager, eventManager, stageManager);
}

CGRAPH_FUNCTION_END
Expand Down
3 changes: 2 additions & 1 deletion src/GraphCtrl/GraphElement/GGroup/GGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class GGroup : public GElement {
explicit GGroup();

CStatus addManagers(GParamManagerPtr paramManager,
GEventManagerPtr eventManager) override;
GEventManagerPtr eventManager,
GStageManagerPtr stageManager) override;

CStatus init() override;

Expand Down
8 changes: 5 additions & 3 deletions src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,18 @@ CBool GRegion::isSerializable() const {


CStatus GRegion::addManagers(GParamManagerPtr paramManager,
GEventManagerPtr eventManager) {
GEventManagerPtr eventManager,
GStageManagerPtr stageManager) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false)
CGRAPH_ASSERT_NOT_NULL(paramManager, eventManager)
CGRAPH_ASSERT_NOT_NULL(paramManager, eventManager, stageManager)

this->setGParamManager(paramManager);
this->setGEventManager(eventManager);
this->setGStageManager(stageManager);
for (auto* cur : manager_->manager_elements_) {
CGRAPH_ASSERT_NOT_NULL(cur)
status += cur->addManagers(paramManager, eventManager);
status += cur->addManagers(paramManager, eventManager, stageManager);
}

CGRAPH_FUNCTION_END
Expand Down
3 changes: 2 additions & 1 deletion src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class GRegion : public GGroup {
CBool isSerializable() const final;

CStatus addManagers(GParamManagerPtr paramManager,
GEventManagerPtr eventManager) final;
GEventManagerPtr eventManager,
GStageManagerPtr stageManager) final;

CBool isSeparate(GElementCPtr a, GElementCPtr b) const final;

Expand Down
18 changes: 16 additions & 2 deletions src/GraphCtrl/GraphPipeline/GPipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ GPipeline::GPipeline() {
param_manager_ = CGRAPH_SAFE_MALLOC_COBJECT(GParamManager)
daemon_manager_ = CGRAPH_SAFE_MALLOC_COBJECT(GDaemonManager)
event_manager_ = CGRAPH_SAFE_MALLOC_COBJECT(GEventManager)
stage_manager_ = CGRAPH_SAFE_MALLOC_COBJECT(GStageManager)
}


Expand All @@ -27,20 +28,22 @@ GPipeline::~GPipeline() {
CGRAPH_DELETE_PTR(element_manager_)
CGRAPH_DELETE_PTR(param_manager_)
CGRAPH_DELETE_PTR(event_manager_)
CGRAPH_DELETE_PTR(stage_manager_)
}


CStatus GPipeline::init() {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false) // 必须是非初始化的状态下,才可以初始化。反之同理
CGRAPH_ASSERT_NOT_NULL(element_manager_, param_manager_, daemon_manager_, event_manager_)
CGRAPH_ASSERT_NOT_NULL(element_manager_, param_manager_, daemon_manager_, event_manager_, stage_manager_)

status += initEnv();
CGRAPH_FUNCTION_CHECK_STATUS

status += param_manager_->init();
status += event_manager_->init();
status += element_manager_->init();
status += stage_manager_->init();
status += daemon_manager_->init(); // daemon的初始化,需要晚于所有element的初始化
CGRAPH_FUNCTION_CHECK_STATUS

Expand Down Expand Up @@ -90,6 +93,7 @@ CStatus GPipeline::destroy() {
status += event_manager_->destroy();
status += daemon_manager_->destroy();
status += element_manager_->destroy();
status += stage_manager_->destroy();
status += param_manager_->destroy();
CGRAPH_FUNCTION_CHECK_STATUS

Expand Down Expand Up @@ -220,6 +224,16 @@ CStatus GPipeline::perf(std::ostream& oss) {
}


GPipelinePtr GPipeline::addGStage(const std::string& key, CInt threshold) {
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(stage_manager_)
CGRAPH_THROW_EXCEPTION_BY_CONDITION(threshold <= 0, "threshold value must bigger than 0")

stage_manager_->create(key, threshold);
return this;
}


GPipelinePtr GPipeline::setGEngineType(GEngineType type) {
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(element_manager_)
Expand Down Expand Up @@ -345,7 +359,7 @@ CStatus GPipeline::innerRegister(GElementPtr element, const GElementPtrSet &depe
status = element->addElementInfo(depends, curName, loop);
CGRAPH_FUNCTION_CHECK_STATUS

status = element->addManagers(param_manager_, event_manager_);
status = element->addManagers(param_manager_, event_manager_, stage_manager_);
CGRAPH_FUNCTION_CHECK_STATUS
status = element_manager_->add(element);
CGRAPH_FUNCTION_CHECK_STATUS
Expand Down
10 changes: 10 additions & 0 deletions src/GraphCtrl/GraphPipeline/GPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "../GraphElement/GElementInclude.h"
#include "../GraphDaemon/GDaemonInclude.h"
#include "../GraphEvent/GEventInclude.h"
#include "../GraphStage/GStageInclude.h"

CGRAPH_NAMESPACE_BEGIN

Expand Down Expand Up @@ -321,6 +322,14 @@ class GPipeline : public GPipelineObject,
c_enable_if_t<std::is_base_of<GEventParam, TParam>::value, int> = 0>
GPipeline* addGEvent(const std::string& key, TParam* param = nullptr);

/**
* 添加一个阶段
* @param key
* @param threshold
* @return
*/
GPipeline* addGStage(const std::string& key, CInt threshold);

/**
* 设置引擎策略
* @param type
Expand Down Expand Up @@ -414,6 +423,7 @@ class GPipeline : public GPipelineObject,
GParamManagerPtr param_manager_ = nullptr; // 参数管理类
GDaemonManagerPtr daemon_manager_ = nullptr; // 守护管理类
GEventManagerPtr event_manager_ = nullptr; // 事件管理类
GStageManagerPtr stage_manager_ = nullptr; // 阶段管理类

GSchedule schedule_; // 调度管理类
GElementRepository repository_; // 记录创建的所有element的仓库
Expand Down
70 changes: 70 additions & 0 deletions src/GraphCtrl/GraphStage/GStage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/***************************
@Author: Chunel
@Contact: [email protected]
@File: GStage.h
@Time: 2024/12/13 19:25
@Desc:
***************************/

#ifndef CGRAPH_GSTAGE_H
#define CGRAPH_GSTAGE_H

#include <atomic>
#include <mutex>
#include <condition_variable>

#include "GStageObject.h"

CGRAPH_NAMESPACE_BEGIN

class GStage : public GStageObject {
private:
GStage() = default;

/**
* 设置阈值信息
* @param threshold
* @return
*/
GStage* setThreshold(CInt threshold) {
threshold_ = threshold;
return this;
}

/**
* 进入等待区域
* @return
*/
CVoid waiting() {
{
CGRAPH_LOCK_GUARD wm(waiting_mutex_);
cur_value_++;
if (cur_value_ >= threshold_) {
// 如果超过了 threshold,则打开全部
cur_value_ = 0;
locker_.cv_.notify_all();
return;
}
}

CGRAPH_UNIQUE_LOCK lk(locker_.mtx_);
locker_.cv_.wait(lk, [this] {
return 0 == cur_value_ || cur_value_ >= threshold_;
});
}

private:
CInt threshold_ { 0 }; // 阈值信息
CInt cur_value_ { 0 }; // 当前值
UCvMutex locker_;
std::mutex waiting_mutex_;

friend class GStageManager;
friend class CAllocator;
};

using GStagePtr = GStage *;

CGRAPH_NAMESPACE_END

#endif //CGRAPH_GSTAGE_H
16 changes: 16 additions & 0 deletions src/GraphCtrl/GraphStage/GStageInclude.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/***************************
@Author: Chunel
@Contact: [email protected]
@File: GStageInclude.h
@Time: 2024/12/15 16:52
@Desc:
***************************/

#ifndef CGRAPH_GSTAGEINCLUDE_H
#define CGRAPH_GSTAGEINCLUDE_H

#include "GStage.h"
#include "GStageManager.h"
#include "GStageManagerWrapper.h"

#endif //CGRAPH_GSTAGEINCLUDE_H
Loading

0 comments on commit ccbe0e4

Please sign in to comment.