Skip to content

Commit

Permalink
Resolve "Handle different smart pointer port types internally"
Browse files Browse the repository at this point in the history
Closes #12
  • Loading branch information
jalbright-geon committed Nov 8, 2024
1 parent eb8130a commit bc5748f
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 24 deletions.
9 changes: 7 additions & 2 deletions include/composite/input_port.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ class input_port : public port {
}

auto type_id() const noexcept -> std::size_t override {
return typeid(T).hash_code();
return typeid(value_type).hash_code();
}

auto is_unique_type() const noexcept -> bool override {
return traits::is_unique_ptr_v<T>;
}

auto get_data() -> std::tuple<buffer_type, timestamp_type> {
Expand All @@ -92,7 +96,8 @@ class input_port : public port {
}

private:
friend class output_port<T>;
friend class output_port<std::unique_ptr<value_type>>;
friend class output_port<std::shared_ptr<value_type>>;

auto add_data(std::tuple<buffer_type, timestamp_type>&& data) -> void {
const auto lock = std::scoped_lock{m_data_mtx};
Expand Down
187 changes: 170 additions & 17 deletions include/composite/output_port.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/

#pragma once

#include "port.hpp"
#include "input_port.hpp"
#include "timestamp.hpp"

#include <algorithm>
#include <ranges>
#include <string_view>
#include <thread>
#include <typeinfo>

namespace composite {
Expand All @@ -38,35 +40,90 @@ class output_port : public port {

explicit output_port(std::string_view name) : port(name) {}

~output_port() override {
m_thread.request_stop();
if (m_thread.joinable()) {
m_thread.join();
}
}

auto type_id() const noexcept -> std::size_t override {
return typeid(T).hash_code();
return typeid(value_type).hash_code();
}

auto is_unique_type() const noexcept -> bool override {
return traits::is_unique_ptr_v<T>;
}

auto send_data(buffer_type data, timestamp_type ts) -> void {
for (auto i : std::views::iota(size_t{0}, m_connected_ports.size())) {
if (auto port = m_connected_ports.at(i); port != nullptr) {
if constexpr (traits::is_unique_ptr_v<T>) {
if (i == m_connected_ports.size() - 1) {
// last port, move incoming
port->add_data({std::move(data), ts});
} else {
// make a copy of the incoming data
auto data_copy = std::make_unique<value_type>(*data);
port->add_data({std::move(data_copy), ts});
if (m_thread.joinable()) {
const auto lock = std::scoped_lock{m_data_mtx};
m_queue.emplace_back(std::make_tuple(std::move(data), ts));
m_data_cv.notify_one();
} else if constexpr (traits::is_unique_ptr_v<T>) {
if (m_connected_ports.size() == 1) {
if (auto dst_port = m_connected_ports.front(); dst_port != nullptr) {
if (dst_port->is_unique_type()) { // u -> u
auto dst = static_cast<input_port<T>*>(dst_port);
dst->add_data({std::move(data), ts});
} else { // u -> s
auto dst = static_cast<input_port<std::shared_ptr<value_type>>*>(dst_port);
dst->add_data({std::shared_ptr<value_type>{data.release()}, ts});
}
}
} else { // u -> s,s,...
auto shared_data = std::shared_ptr<value_type>{data.release()};
for (auto port : m_connected_ports) {
if (port != nullptr) {
auto dst = static_cast<input_port<std::shared_ptr<value_type>>*>(port);
dst->add_data({shared_data, ts});
}
}
}
} else { // shared_ptr
if (m_connected_ports.size() == 1) {
if (auto dst_port = m_connected_ports.front(); dst_port != nullptr) {
if (dst_port->is_unique_type()) { // s -> u
auto dst = static_cast<input_port<std::unique_ptr<value_type>>*>(dst_port);
dst->add_data({std::make_unique<value_type>(std::move(*data)), ts});
} else { // s -> s
auto dst = static_cast<input_port<T>*>(dst_port);
dst->add_data({data, ts});
}
}
} else { // s -> s,s,...
for (auto& port : m_connected_ports) {
if (port != nullptr) {
auto dst = static_cast<input_port<T>*>(port);
dst->add_data({data, ts});
}
} else { // shared_ptr
port->add_data({data, ts});
}
}
}
}

auto connect(port* port) -> void override {
m_connected_ports.emplace_back(static_cast<input_port<T>*>(port));
m_connected_ports.emplace_back(port);
// sort with unique_ptr ports at the back
std::ranges::sort(m_connected_ports, [](const auto a, const auto b) {
return (!a->is_unique_type() && b->is_unique_type());
});
// start thread if required
auto thread_required = false;
if (m_connected_ports.size() > 1 && port->is_unique_type()) {
thread_required = true;
}
if (thread_required && !m_thread.joinable()) {
m_thread = std::jthread(&output_port::thread_func, this);
}
}

auto disconnect() -> void {
m_connected_ports.clear();
m_thread.request_stop();
if (m_thread.joinable()) {
m_thread.join();
}
}

auto is_connected() const -> bool {
Expand All @@ -76,13 +133,109 @@ class output_port : public port {
void eos(bool value) const {
for (auto port : m_connected_ports) {
if (port) {
port->eos(value);
if (port->is_unique_type()) {
static_cast<input_port<std::unique_ptr<value_type>>*>(port)->eos(value);
} else { // shared_ptr
static_cast<input_port<std::shared_ptr<value_type>>*>(port)->eos(value);
}
}
}
}

private:
std::vector<input_port<T>*> m_connected_ports;
std::vector<port*> m_connected_ports;
std::jthread m_thread;
std::deque<std::tuple<buffer_type, timestamp_type>> m_queue;
std::mutex m_data_mtx;
std::condition_variable m_data_cv;

auto thread_func(std::stop_token token) -> void {
while (!token.stop_requested()) {
using namespace std::chrono_literals;
auto lock = std::unique_lock{m_data_mtx};
m_data_cv.wait_for(lock, 1s, [this]{ return !m_queue.empty(); });
if (!m_queue.empty()) {
auto [data, ts] = std::move(m_queue.front());
m_queue.pop_front();
lock.unlock();
// Determine structure of fan-out
auto all_unique = true;
for (const auto port : m_connected_ports) {
if (port != nullptr && !port->is_unique_type()) {
all_unique = false;
break;
}
}
// Use output_port type to determine how data needs to be copied/moved
if constexpr (traits::is_unique_ptr_v<T>) {
if (all_unique) { // u -> u,u,...
send_all_unique({std::move(data), ts});
} else { // u -> s,...,u,...
auto shared_data = std::make_shared<value_type>(*data);
for (auto i : std::views::iota(size_t{0}, m_connected_ports.size())) {
if (auto port = m_connected_ports.at(i); port != nullptr) {
if (port->is_unique_type()) {
auto dst = static_cast<input_port<T>*>(port);
if (i == m_connected_ports.size() - 1) {
// last port, move incoming
dst->add_data({std::move(data), ts});
} else {
// make a copy of the incoming data
auto data_copy = std::make_unique<value_type>(*data);
dst->add_data({std::move(data_copy), ts});
}
} else { // shared_ptr
auto dst = static_cast<input_port<std::shared_ptr<value_type>>*>(port);
dst->add_data({shared_data, ts});
}
}
}
}
} else { // shared_ptr
if (all_unique) { // s -> u,u,...
send_all_unique({std::make_unique<value_type>(std::move(*data)), ts});
} else { // s -> s,...,u,...
auto unique_data = std::make_unique<value_type>(*data);
for (auto i : std::views::iota(size_t{0}, m_connected_ports.size())) {
if (auto port = m_connected_ports.at(i); port != nullptr) {
if (port->is_unique_type()) {
auto dst = static_cast<input_port<std::unique_ptr<value_type>>*>(port);
if (i == m_connected_ports.size() - 1) {
// last port, move incoming
dst->add_data({std::move(unique_data), ts});
} else {
// make a copy of the incoming data
auto data_copy = std::make_unique<value_type>(*unique_data);
dst->add_data({std::move(data_copy), ts});
}
} else { // shared_ptr
auto dst = static_cast<input_port<T>*>(port);
dst->add_data({data, ts});
}
}
}
}
}
}
}
}

auto send_all_unique(std::tuple<std::unique_ptr<value_type>, timestamp_type>&& data) -> void {
auto [unique_data, ts] = std::move(data);
for (auto i : std::views::iota(size_t{0}, m_connected_ports.size())) {
if (auto port = m_connected_ports.at(i); port != nullptr) {
auto dst = static_cast<input_port<std::unique_ptr<value_type>>*>(port);
if (i == m_connected_ports.size() - 1) {
// last port, move incoming
dst->add_data({std::move(unique_data), ts});
} else {
// make a copy of the incoming data
auto data_copy = std::make_unique<value_type>(*unique_data);
dst->add_data({std::move(data_copy), ts});
}
}
}
}

}; // class output_port

Expand Down
5 changes: 2 additions & 3 deletions include/composite/port.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ template<typename T> concept smart_ptr = is_shared_ptr_v<T> || is_unique_ptr_v<T

class port {
public:
explicit port(std::string_view name) :
m_name(name) {
}
explicit port(std::string_view name) : m_name(name) {}

virtual ~port() = default;

Expand All @@ -56,6 +54,7 @@ class port {
}

virtual auto type_id() const noexcept -> std::size_t = 0;
virtual auto is_unique_type() const noexcept -> bool = 0;

virtual auto connect(port* port) -> void {
// to be implemented by derived class
Expand Down
5 changes: 3 additions & 2 deletions include/composite/port_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
namespace composite {

class port_set {
using port_map_t = std::map<std::string, port*>;
public:
using port_map_type = std::map<std::string, port*>;

auto add_port(port* port) -> void {
m_ports.try_emplace(port->name(), port);
}
Expand All @@ -41,7 +42,7 @@ class port_set {
}

private:
port_map_t m_ports;
port_map_type m_ports;

}; // class port_set

Expand Down

0 comments on commit bc5748f

Please sign in to comment.