Skip to content
This repository has been archived by the owner on Nov 10, 2022. It is now read-only.

Commit

Permalink
Updated w3c-visserver-api to use Boost.Beast server implementation
Browse files Browse the repository at this point in the history
 - Integrated new server implementation supporting Web-Socket and HTTP
   on same port. Both plain and SSL connections possible.
 - Updated IServer interface to allow for server listeners and different
   connection types
 - Refactored SubscriptionHandler and WsChannel to streamline connection
   ID for different servers. Small cleanup done.
 - Updated test client to allow use of old client or new based on
   Boost.Beast

Old and new implementations of client and server are left, but could be removed
in future if there is no need for them anymore

Signed-off-by: Miladinovic Bojan <[email protected]>
  • Loading branch information
Miladinovic Bojan committed Oct 29, 2019
1 parent d5867fa commit 3596941
Show file tree
Hide file tree
Showing 13 changed files with 332 additions and 90 deletions.
2 changes: 2 additions & 0 deletions w3c-visserver-api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ if(BUILD_TEST_CLIENT)
add_executable(testclient ${CMAKE_CURRENT_SOURCE_DIR}/test/testclient.cpp)
target_link_libraries(testclient simple-websocket-server)
target_include_directories(testclient PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/jsoncons)
target_include_directories(testclient INTERFACE ${Boost_INCLUDE_DIR})
target_link_libraries(testclient ${Boost_LIBRARIES} ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../examples/demo-certificates/Client.pem ${CMAKE_CURRENT_BINARY_DIR} COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../examples/demo-certificates/Client.key ${CMAKE_CURRENT_BINARY_DIR} COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../examples/demo-certificates/CA.pem ${CMAKE_CURRENT_BINARY_DIR} COPYONLY)
Expand Down
18 changes: 15 additions & 3 deletions w3c-visserver-api/include/IServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,26 @@
#define __IWSSERVER_H__

#include <string>
#include <memory>

class IVssCommandProcessor;

/**
* \class ObserverType
* \brief Server traffic types which can be observed
*/
enum class ObserverType {
WEBSOCKET = 0x01, //!< Receive Web-Socket traffic data
HTTP = 0x02, //!< Receive HTTP traffic data
ALL = 0x03 //!< Receive all traffic
};

class IServer {
public:
virtual ~IServer() {}

virtual void startServer(const std::string &endpointName) = 0;
virtual void sendToConnection(uint32_t connID, const std::string &message) = 0;
virtual void start() = 0;
virtual void AddListener(ObserverType, std::shared_ptr<IVssCommandProcessor>) = 0;
virtual void RemoveListener(ObserverType, std::shared_ptr<IVssCommandProcessor>) = 0;
virtual void SendToConnection(uint64_t connID, const std::string &message) = 0;
};
#endif
3 changes: 1 addition & 2 deletions w3c-visserver-api/include/ISubscriptionHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ class ISubscriptionHandler {
public:
virtual ~ISubscriptionHandler() {}

virtual uint32_t subscribe(WsChannel& channel,
virtual uint64_t subscribe(WsChannel& channel,
std::shared_ptr<IVssDatabase> db,
uint32_t channelID,
const std::string &path) = 0;
virtual int unsubscribe(uint32_t subscribeID) = 0;
virtual int unsubscribeAll(uint32_t connectionID) = 0;
Expand Down
5 changes: 2 additions & 3 deletions w3c-visserver-api/include/SubscriptionHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class WsServer;
class ILogger;

// Subscription ID: Client ID
typedef std::unordered_map<uint32_t, uint32_t> subscriptions_t;
typedef std::unordered_map<uint32_t, uint64_t> subscriptions_t;

// Subscription UUID
typedef std::string uuid_t;
Expand All @@ -61,9 +61,8 @@ class SubscriptionHandler : public ISubscriptionHandler {
std::shared_ptr<IAccessChecker> checkAccess);
~SubscriptionHandler();

uint32_t subscribe(WsChannel& channel,
uint64_t subscribe(WsChannel& channel,
std::shared_ptr<IVssDatabase> db,
uint32_t channelID,
const std::string &path);
int unsubscribe(uint32_t subscribeID);
int unsubscribeAll(uint32_t connectionID);
Expand Down
3 changes: 1 addition & 2 deletions w3c-visserver-api/include/VssCommandProcessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ class VssCommandProcessor : public IVssCommandProcessor {
std::string processGet(WsChannel& channel, uint32_t request_id, std::string path);
std::string processSet(WsChannel& channel, uint32_t request_id, std::string path,
jsoncons::json value);
std::string processSubscribe(WsChannel& channel, uint32_t request_id,
std::string path, uint32_t connectionID);
std::string processSubscribe(WsChannel& channel, uint32_t request_id, std::string path);
std::string processUnsubscribe(uint32_t request_id, uint32_t subscribeID);
std::string processGetMetaData(uint32_t request_id, std::string path);
std::string processAuthorize(WsChannel& channel, uint32_t request_id,
Expand Down
20 changes: 14 additions & 6 deletions w3c-visserver-api/include/WsChannel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,32 @@ using namespace jsoncons;
using jsoncons::json;

class WsChannel {
public:
enum class Type {
WEBSOCKET_PLAIN,
WEBSOCKET_SSL,
HTTP_PLAIN,
HTTP_SSL
};
private:
uint32_t connectionID;
uint64_t connectionID;
bool authorized = false;
string authToken;
json permissions;
Type typeOfConnection;

public:
void setConnID(uint32_t conID) { connectionID = conID; }

void setConnID(uint64_t conID) { connectionID = conID; }
void setAuthorized(bool isauth) { authorized = isauth; }
void setAuthToken(string tok) { authToken = tok; }
void setPermissions(json perm) { permissions = perm; }
void setType(Type type) { typeOfConnection = type; }

uint32_t getConnID() { return connectionID; }

uint64_t getConnID() { return connectionID; }
bool isAuthorized() { return authorized; }

string getAuthToken() { return authToken; }

json getPermissions() { return permissions; }
Type getType() { return typeOfConnection; }
};
#endif
9 changes: 7 additions & 2 deletions w3c-visserver-api/include/WsServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ class WsServer : public IServer {
int port);
~WsServer();
void startServer(const std::string &endpointName);
void sendToConnection(uint32_t connID, const std::string &message);
void start();
bool start();

// IServer

void AddListener(ObserverType, std::shared_ptr<IVssCommandProcessor>);
void RemoveListener(ObserverType, std::shared_ptr<IVssCommandProcessor>);
void SendToConnection(uint64_t connID, const std::string &message);
};
#endif
42 changes: 13 additions & 29 deletions w3c-visserver-api/src/SubscriptionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "ILogger.hpp"

using namespace std;
// using namespace jsoncons;
using namespace jsoncons::jsonpath;
using jsoncons::json;

Expand All @@ -46,14 +45,13 @@ SubscriptionHandler::~SubscriptionHandler() {
stopThread();
}

uint32_t SubscriptionHandler::subscribe(WsChannel& channel,
uint64_t SubscriptionHandler::subscribe(WsChannel& channel,
std::shared_ptr<IVssDatabase> db,
uint32_t channelID,
const string &path) {
// generate subscribe ID "randomly".
uint32_t subId = rand() % 9999999;
uint64_t subId = rand() % 9999999;
// embed connection ID into subID.
subId = channelID + subId;
subId = channel.getConnID() + subId;

bool isBranch = false;
string jPath = db->getVSSSpecificPath(path, isBranch, db->data_tree);
Expand All @@ -66,7 +64,6 @@ uint32_t SubscriptionHandler::subscribe(WsChannel& channel,
throw noPermissionException(msg.str());
}

int clientID = channelID / CLIENT_MASK;
jsoncons::json resArray = jsonpath::json_query(db->data_tree, jPath);

if (resArray.is_array() && resArray.size() == 1) {
Expand All @@ -79,7 +76,7 @@ uint32_t SubscriptionHandler::subscribe(WsChannel& channel,
+ string("ID with a new one"));
}

subscribeHandle[sigUUID][subId] = clientID;
subscribeHandle[sigUUID][subId] = channel.getConnID();

return subId;
} else if (resArray.is_array()) {
Expand Down Expand Up @@ -114,7 +111,7 @@ int SubscriptionHandler::unsubscribeAll(uint32_t connectionID) {
for (auto& uuid : subscribeHandle) {
auto subscriptions = &(uuid.second);
for (auto& subscription : *subscriptions) {
if (subscription.second == (connectionID / CLIENT_MASK)) {
if (subscription.second == (connectionID)) {
subscriptions->erase(subscription.first);
}
}
Expand All @@ -131,11 +128,10 @@ int SubscriptionHandler::updateByUUID(const string &signalUUID,
}

for (auto subID : handle->second) {
subMutex.lock();
std::lock_guard<std::mutex> lock(subMutex);
pair<uint32_t, json> newSub;
newSub = std::make_pair(subID.first, value);
newSub = std::make_pair(subID.second, value);
buffer.push(newSub);
subMutex.unlock();
}

return 0;
Expand All @@ -154,33 +150,29 @@ int SubscriptionHandler::updateByPath(const string &path, const json &value) {
}

void* SubscriptionHandler::subThreadRunner() {
// SubscriptionHandler* handler = (SubscriptionHandler*)instance;

logger->Log(LogLevel::VERBOSE, "SubscribeThread: Started Subscription Thread!");

while (isThreadRunning()) {
subMutex.lock();
if (buffer.size() > 0) {
pair<uint32_t, jsoncons::json> newSub = buffer.front();
std::lock_guard<std::mutex> lock(subMutex);
auto newSub = buffer.front();
buffer.pop();

uint32_t subID = newSub.first;
auto connId = newSub.first;
jsoncons::json value = newSub.second;

jsoncons::json answer;
answer["action"] = "subscribe";
answer["subscriptionId"] = subID;
answer["subscriptionId"] = connId;
answer.insert_or_assign("value", value);
answer["timestamp"] = time(NULL);

stringstream ss;
ss << pretty_print(answer);
string message = ss.str();

uint32_t connectionID = (subID / CLIENT_MASK) * CLIENT_MASK;
getServer()->sendToConnection(connectionID, message);
getServer()->SendToConnection(connId, message);
}
subMutex.unlock();
// sleep 10 ms
usleep(10000);
}
Expand All @@ -191,22 +183,14 @@ void* SubscriptionHandler::subThreadRunner() {

int SubscriptionHandler::startThread() {
subThread = thread(&SubscriptionHandler::subThreadRunner, this);
/*
if (pthread_create(&subscription_thread, NULL, &subThread, this)) {
logger->Log(LogLevel::ERROR, "SubscriptionHandler::startThread: Error creating subscription "
+ "handler thread");
return 1;
}
*/
threadRun = true;
return 0;
}

int SubscriptionHandler::stopThread() {
subMutex.lock();
std::lock_guard<std::mutex> lock(subMutex);
threadRun = false;
subThread.join();
subMutex.unlock();
return 0;
}

Expand Down
7 changes: 3 additions & 4 deletions w3c-visserver-api/src/VssCommandProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,13 @@ string VssCommandProcessor::processSet(WsChannel &channel,
}

string VssCommandProcessor::processSubscribe(WsChannel &channel,
uint32_t request_id, string path,
uint32_t connectionID) {
uint32_t request_id, string path) {
logger->Log(LogLevel::VERBOSE, string("VssCommandProcessor::processSubscribe: path received from client ")
+ string("for subscription"));

uint32_t subId = -1;
try {
subId = subHandler->subscribe(channel, database, connectionID, path);
subId = subHandler->subscribe(channel, database, path);
} catch (noPathFoundonTree &noPathFound) {
logger->Log(LogLevel::ERROR, string(noPathFound.what()));
return pathNotFoundResponse(request_id, "subscribe", path);
Expand Down Expand Up @@ -462,7 +461,7 @@ string VssCommandProcessor::processQuery(const string &req_json,
logger->Log(LogLevel::VERBOSE, "VssCommandProcessor::processQuery: subscribe query for "
+ path + " with request id " + to_string(request_id));
response =
processSubscribe(channel, request_id, path, channel.getConnID());
processSubscribe(channel, request_id, path);
} else if (action == "getMetadata") {
logger->Log(LogLevel::VERBOSE, "VssCommandProcessor::processQuery: metadata query for "
+ path + " with request id " + to_string(request_id));
Expand Down
6 changes: 0 additions & 6 deletions w3c-visserver-api/src/WebSockHttpFlexServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,15 @@
* *****************************************************************************
*/

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/make_unique.hpp>
#include <boost/config.hpp>
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
Expand Down
14 changes: 12 additions & 2 deletions w3c-visserver-api/src/WsServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ void WsServer::startServer(const string &endpointName) {
}
}

void WsServer::sendToConnection(uint32_t connectionID, const string &message) {
void WsServer::SendToConnection(uint64_t connectionID, const string &message) {
if (isSecure_) {
auto send_stream = make_shared<SecuredServer::SendStream>();
*send_stream << message;
Expand Down Expand Up @@ -268,11 +268,21 @@ void *startWSServer(void *arg) {
return NULL;
}

void WsServer::start() {
bool WsServer::start() {
pthread_t startWSServer_thread;

/* create the web socket server thread. */
if (pthread_create(&startWSServer_thread, NULL, &startWSServer, NULL)) {
logger->Log(LogLevel::ERROR, "main: Error creating websocket server run thread");
}

return true;
}

void WsServer::AddListener(ObserverType, std::shared_ptr<IVssCommandProcessor>)
{
}

void WsServer::RemoveListener(ObserverType, std::shared_ptr<IVssCommandProcessor>)
{
}
Loading

0 comments on commit 3596941

Please sign in to comment.