Skip to content

Commit

Permalink
Added multi-threaded frame analysis to daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
sam-dieck committed Nov 19, 2017
1 parent 0031c17 commit b4d3c51
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 87 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ openalpr-*.asc
openalpr-*.sig
openalpr-*.tar.gz
*.orig
.DS_Store

# vim editor files
*.swp

# Visual Studio files
src.sln
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ ELSE()
ENDIF()


set (CMAKE_CXX_STANDARD 11)
set(CMAKE_CSS_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -Wall ")
if (NOT IOS)
ADD_EXECUTABLE( alpr main.cpp )
Expand Down
191 changes: 104 additions & 87 deletions src/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "daemon/beanstalk.hpp"
#include "video/logging_videobuffer.h"
#include "daemon/daemonconfig.h"
#include "inc/safequeue.h"

#include "tclap/CmdLine.h"
#include "alpr.h"
Expand All @@ -23,7 +24,10 @@

using namespace alpr;

// prototypes
// Variables
SafeQueue<cv::Mat> framesQueue;

// Prototypes
void streamRecognitionThread(void* arg);
bool writeToQueue(std::string jsonResult);
bool uploadPost(CURL* curl, std::string url, std::string data);
Expand All @@ -45,6 +49,7 @@ struct CaptureThreadData
std::string stream_url;
std::string site_id;
int camera_id;
int analysis_threads;

bool clock_on;

Expand Down Expand Up @@ -200,6 +205,7 @@ int main( int argc, const char** argv )
tdata->country_code = daemon_config.country;
tdata->company_id = daemon_config.company_id;
tdata->site_id = daemon_config.site_id;
tdata->analysis_threads = daemon_config.analysis_threads;
tdata->top_n = daemon_config.topn;
tdata->pattern = daemon_config.pattern;
tdata->clock_on = clockOn;
Expand All @@ -209,10 +215,11 @@ int main( int argc, const char** argv )

if (daemon_config.uploadData)
{
// Kick off the data upload thread
UploadThreadData* udata = new UploadThreadData();
udata->upload_url = daemon_config.upload_url;
tthread::thread* thread_upload = new tthread::thread(dataUploadThread, (void*) udata );
// Kick off the data upload thread
UploadThreadData* udata = new UploadThreadData();
udata->upload_url = daemon_config.upload_url;
tthread::thread* thread_upload = new tthread::thread(dataUploadThread, (void*) udata );

threads.push_back(thread_upload);
}

Expand All @@ -231,6 +238,81 @@ int main( int argc, const char** argv )
}


void processingThread(void* arg)
{
CaptureThreadData* tdata = (CaptureThreadData*) arg;
Alpr alpr(tdata->country_code, tdata->config_file);
alpr.setTopN(tdata->top_n);
alpr.setDefaultRegion(tdata->pattern);

while (daemon_active) {

// Wait for a new frame
cv::Mat frame = framesQueue.pop();

// Process new frame
timespec startTime;
getTimeMonotonic(&startTime);

std::vector<AlprRegionOfInterest> regionsOfInterest;
regionsOfInterest.push_back(AlprRegionOfInterest(0,0, frame.cols, frame.rows));

AlprResults results = alpr.recognize(frame.data, frame.elemSize(), frame.cols, frame.rows, regionsOfInterest);

timespec endTime;
getTimeMonotonic(&endTime);
double totalProcessingTime = diffclock(startTime, endTime);

if (tdata->clock_on) {
LOG4CPLUS_INFO(logger, "Camera " << tdata->camera_id << " processed frame in: " << totalProcessingTime << " ms.");
}

if (results.plates.size() > 0) {

std::stringstream uuid_ss;
uuid_ss << tdata->site_id << "-cam" << tdata->camera_id << "-" << getEpochTimeMs();
std::string uuid = uuid_ss.str();

// Save the image to disk (using the UUID)
if (tdata->output_images) {
std::stringstream ss;
ss << tdata->output_image_folder << "/" << uuid << ".jpg";
cv::imwrite(ss.str(), frame);
}

// Update the JSON content to include UUID and camera ID
std::string json = alpr.toJson(results);
cJSON *root = cJSON_Parse(json.c_str());
cJSON_AddStringToObject(root, "uuid", uuid.c_str());
cJSON_AddNumberToObject(root, "camera_id", tdata->camera_id);
cJSON_AddStringToObject(root, "site_id", tdata->site_id.c_str());
cJSON_AddNumberToObject(root, "img_width", frame.cols);
cJSON_AddNumberToObject(root, "img_height", frame.rows);

// Add the company ID to the output if configured
if (tdata->company_id.length() > 0)
cJSON_AddStringToObject(root, "company_id", tdata->company_id.c_str());

char *out;
out=cJSON_PrintUnformatted(root);
cJSON_Delete(root);

std::string response(out);

free(out);

// Push the results to the Beanstalk queue
for (int j = 0; j < results.plates.size(); j++)
{
LOG4CPLUS_DEBUG(logger, "Writing plate " << results.plates[j].bestPlate.characters << " (" << uuid << ") to queue.");
}

writeToQueue(response);
}
}
}


void streamRecognitionThread(void* arg)
{
CaptureThreadData* tdata = (CaptureThreadData*) arg;
Expand All @@ -239,106 +321,41 @@ void streamRecognitionThread(void* arg)
LOG4CPLUS_INFO(logger, "pattern: " << tdata->pattern);
LOG4CPLUS_INFO(logger, "Stream " << tdata->camera_id << ": " << tdata->stream_url);

Alpr alpr(tdata->country_code, tdata->config_file);
alpr.setTopN(tdata->top_n);
alpr.setDefaultRegion(tdata->pattern);


int framenum = 0;
/* Create processing threads */
const int num_threads = tdata->analysis_threads;
tthread::thread* threads[num_threads];

for (int i = 0; i < num_threads; i++) {
LOG4CPLUS_INFO(logger, "Spawning Thread " << i );
tthread::thread* t = new tthread::thread(processingThread, (void*) tdata);
threads[i] = t;
}

cv::Mat frame;
LoggingVideoBuffer videoBuffer(logger);

videoBuffer.connect(tdata->stream_url, 5);

cv::Mat latestFrame;

std::vector<uchar> buffer;

LOG4CPLUS_INFO(logger, "Starting camera " << tdata->camera_id);

while (daemon_active)
{
std::vector<cv::Rect> regionsOfInterest;
int response = videoBuffer.getLatestFrame(&latestFrame, regionsOfInterest);
int response = videoBuffer.getLatestFrame(&frame, regionsOfInterest);

if (response != -1)
{

timespec startTime;
getTimeMonotonic(&startTime);

std::vector<AlprRegionOfInterest> regionsOfInterest;
regionsOfInterest.push_back(AlprRegionOfInterest(0,0, latestFrame.cols, latestFrame.rows));

AlprResults results = alpr.recognize(latestFrame.data, latestFrame.elemSize(), latestFrame.cols, latestFrame.rows, regionsOfInterest);

timespec endTime;
getTimeMonotonic(&endTime);
double totalProcessingTime = diffclock(startTime, endTime);

if (tdata->clock_on)
{
LOG4CPLUS_INFO(logger, "Camera " << tdata->camera_id << " processed frame in: " << totalProcessingTime << " ms.");
}

if (results.plates.size() > 0)
{

std::stringstream uuid_ss;
uuid_ss << tdata->site_id << "-cam" << tdata->camera_id << "-" << getEpochTimeMs();
std::string uuid = uuid_ss.str();

// Save the image to disk (using the UUID)
if (tdata->output_images)
{
std::stringstream ss;
ss << tdata->output_image_folder << "/" << uuid << ".jpg";

cv::imwrite(ss.str(), latestFrame);
}

// Update the JSON content to include UUID and camera ID

std::string json = alpr.toJson(results);

cJSON *root = cJSON_Parse(json.c_str());
cJSON_AddStringToObject(root, "uuid", uuid.c_str());
cJSON_AddNumberToObject(root, "camera_id", tdata->camera_id);
cJSON_AddStringToObject(root, "site_id", tdata->site_id.c_str());
cJSON_AddNumberToObject(root, "img_width", latestFrame.cols);
cJSON_AddNumberToObject(root, "img_height", latestFrame.rows);

// Add the company ID to the output if configured
if (tdata->company_id.length() > 0)
cJSON_AddStringToObject(root, "company_id", tdata->company_id.c_str());

char *out;
out=cJSON_PrintUnformatted(root);
cJSON_Delete(root);

std::string response(out);

free(out);

// Push the results to the Beanstalk queue
for (int j = 0; j < results.plates.size(); j++)
{
LOG4CPLUS_DEBUG(logger, "Writing plate " << results.plates[j].bestPlate.characters << " (" << uuid << ") to queue.");
}

writeToQueue(response);
if (response != -1) {
if (framesQueue.empty()) {
framesQueue.push(frame);
}
}

usleep(10000);
}


videoBuffer.disconnect();

LOG4CPLUS_INFO(logger, "Video processing ended");

delete tdata;
for (int i = 0; i < num_threads; i++) {
delete threads[i];
}
}


Expand Down
1 change: 1 addition & 0 deletions src/daemon/daemonconfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ DaemonConfig::DaemonConfig(std::string config_file, std::string config_defaults_

country = getString(&ini, &defaultIni, "daemon", "country", "us");
topn = getInt(&ini, &defaultIni, "daemon", "topn", 20);
analysis_threads = getInt(&ini, &defaultIni, "daemon", "analysis_threads", 1);

storePlates = getBoolean(&ini, &defaultIni, "daemon", "store_plates", false);
imageFolder = getString(&ini, &defaultIni, "daemon", "store_plates_location", "/tmp/");
Expand Down
1 change: 1 addition & 0 deletions src/daemon/daemonconfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class DaemonConfig {
std::string country;

int topn;
int analysis_threads;
bool storePlates;
std::string imageFolder;
bool uploadData;
Expand Down
48 changes: 48 additions & 0 deletions src/inc/safequeue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#ifndef SAFE_QUEUE_H_
#define SAFE_QUEUE_H_

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

template <typename T>
class SafeQueue
{
public:
T pop()
{
std::unique_lock<std::mutex> mlock(_mutex);
while (_queue.empty()) {
_cond.wait(mlock);
}
auto val = _queue.front();
_queue.pop();
return val;
}

void push(const T& item)
{
std::unique_lock<std::mutex> mlock(_mutex);
_queue.push(item);
mlock.unlock();
_cond.notify_one();
}

bool empty()
{
return _queue.empty();
}

SafeQueue() = default;
// Disable copying and assignments
SafeQueue(const SafeQueue&) = delete;
SafeQueue& operator=(const SafeQueue&) = delete;

private:
std::queue<T> _queue;
std::mutex _mutex;
std::condition_variable _cond;
};

#endif

0 comments on commit b4d3c51

Please sign in to comment.