Skip to content

Commit

Permalink
pause() replace by WaitGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
alpc62 committed Aug 5, 2020
1 parent ee2b426 commit a375a71
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 122 deletions.
12 changes: 9 additions & 3 deletions tutorial/tutorial-01-wget.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/

#include <netdb.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
Expand All @@ -26,6 +25,7 @@
#include "workflow/HttpMessage.h"
#include "workflow/HttpUtil.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"

#define REDIRECT_MAX 5
#define RETRY_MAX 2
Expand Down Expand Up @@ -97,7 +97,12 @@ void wget_callback(WFHttpTask *task)
fprintf(stderr, "\nSuccess. Press Ctrl-C to exit.\n");
}

void sig_handler(int signo) { }
static WFFacilities::WaitGroup wait_group(1);

void sig_handler(int signo)
{
wait_group.done();
}

int main(int argc, char *argv[])
{
Expand Down Expand Up @@ -125,7 +130,8 @@ int main(int argc, char *argv[])
req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");
req->add_header_pair("Connection", "close");
task->start();
pause();

wait_group.wait();
return 0;
}

12 changes: 9 additions & 3 deletions tutorial/tutorial-02-redis_cli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
*/

#include <netdb.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <string>
#include "workflow/RedisMessage.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"

#define RETRY_MAX 2

Expand Down Expand Up @@ -102,7 +102,12 @@ void redis_callback(WFRedisTask *task)
}
}

void sig_handler(int signo) { }
static WFFacilities::WaitGroup wait_group(1);

void sig_handler(int signo)
{
wait_group.done();
}

int main(int argc, char *argv[])
{
Expand Down Expand Up @@ -145,7 +150,8 @@ int main(int argc, char *argv[])
* Workflow::start_series_work(task, nullptr) or
* Workflow::create_series_work(task, nullptr)->start() */
task->start();
pause();

wait_group.wait();
return 0;
}

21 changes: 6 additions & 15 deletions tutorial/tutorial-03-wget_to_redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,16 @@

/* Tuturial-03. Store wget result in redis: key=URL, value=Http Body*/
#include <netdb.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <string>
#include <mutex>
#include <condition_variable>
#include "workflow/HttpMessage.h"
#include "workflow/HttpUtil.h"
#include "workflow/RedisMessage.h"
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"

using namespace protocol;

Expand Down Expand Up @@ -110,9 +108,6 @@ void http_callback(WFHttpTask *task)

int main(int argc, char *argv[])
{
std::mutex mutex;
std::condition_variable cond;
bool finished = false;
WFHttpTask *http_task;

if (argc != 3)
Expand Down Expand Up @@ -152,7 +147,9 @@ int main(int argc, char *argv[])
/* no more than 30 seconds receiving http response. */
http_task->set_receive_timeout(30 * 1000);

auto series_callback = [&mutex, &cond, &finished](const SeriesWork *series)
WFFacilities::WaitGroup wait_group(1);

auto series_callback = [&wait_group](const SeriesWork *series)
{
tutorial_series_context *context = (tutorial_series_context *)
series->get_context();
Expand All @@ -163,10 +160,7 @@ int main(int argc, char *argv[])
fprintf(stderr, "Series finished. failed!\n");

/* signal the main() to terminate */
mutex.lock();
finished = true;
cond.notify_one();
mutex.unlock();
wait_group.done();
};

/* Create a series */
Expand All @@ -175,10 +169,7 @@ int main(int argc, char *argv[])
series->set_context(&context);
series->start();

std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();
wait_group.wait();
return 0;
}

11 changes: 8 additions & 3 deletions tutorial/tutorial-04-http_echo_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
Expand All @@ -30,6 +29,7 @@
#include "workflow/HttpUtil.h"
#include "workflow/WFServer.h"
#include "workflow/WFHttpServer.h"
#include "workflow/WFFacilities.h"

void process(WFHttpTask *server_task)
{
Expand Down Expand Up @@ -92,7 +92,12 @@ void process(WFHttpTask *server_task)
addrstr, port, seq);
}

void sig_handler(int signo) { }
static WFFacilities::WaitGroup wait_group(1);

void sig_handler(int signo)
{
wait_group.done();
}

int main(int argc, char *argv[])
{
Expand All @@ -110,7 +115,7 @@ int main(int argc, char *argv[])
port = atoi(argv[1]);
if (server.start(port) == 0)
{
pause();
wait_group.wait();
server.stop();
}
else
Expand Down
11 changes: 8 additions & 3 deletions tutorial/tutorial-05-http_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
*/

#include <signal.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <utility>
#include "workflow/Workflow.h"
#include "workflow/HttpMessage.h"
#include "workflow/HttpUtil.h"
#include "workflow/WFHttpServer.h"
#include "workflow/WFFacilities.h"

struct tutorial_series_context
{
Expand Down Expand Up @@ -137,7 +137,12 @@ void process(WFHttpTask *proxy_task)
*series << http_task;
}

void sig_handler(int signo) { }
static WFFacilities::WaitGroup wait_group(1);

void sig_handler(int signo)
{
wait_group.done();
}

int main(int argc, char *argv[])
{
Expand All @@ -160,7 +165,7 @@ int main(int argc, char *argv[])

if (server.start(port) == 0)
{
pause();
wait_group.wait();
server.stop();
}
else
Expand Down
21 changes: 5 additions & 16 deletions tutorial/tutorial-06-parallel_wget.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
#include <string.h>
#include <utility>
#include <string>
#include <mutex>
#include <condition_variable>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/HttpMessage.h"
#include "workflow/HttpUtil.h"
#include "workflow/WFFacilities.h"

using namespace protocol;

Expand Down Expand Up @@ -106,23 +105,13 @@ int main(int argc, char *argv[])
pwork->add_series(series);
}

std::mutex mutex;
std::condition_variable cond;
bool finished = false;
WFFacilities::WaitGroup wait_group(1);

Workflow::start_series_work(pwork,
[&mutex, &cond, &finished](const SeriesWork *)
{
mutex.lock();
finished = true;
cond.notify_one();
mutex.unlock();
Workflow::start_series_work(pwork, [&wait_group](const SeriesWork *) {
wait_group.done();
});

std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();
wait_group.wait();
return 0;
}

25 changes: 7 additions & 18 deletions tutorial/tutorial-07-sort_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@

#include <stdlib.h>
#include <stdio.h>
#include <mutex>
#include <condition_variable>
#include "workflow/WFTaskFactory.h"

bool use_parallel_sort = false;
bool finished = false;
std::mutex mutex;
std::condition_variable cond;
#include "workflow/WFFacilities.h"

using namespace algorithm;

static WFFacilities::WaitGroup wait_group(1);

bool use_parallel_sort = false;

void callback(WFSortTask<int> *task)
{
/* Sort task's input and output are identical. */
Expand Down Expand Up @@ -60,12 +58,7 @@ void callback(WFSortTask<int> *task)
printf("Sort reversely:\n");
}
else
{
mutex.lock();
finished = true;
cond.notify_one();
mutex.unlock();
}
wait_group.done();
}

int main(int argc, char *argv[])
Expand Down Expand Up @@ -112,11 +105,7 @@ int main(int argc, char *argv[])
printf("Sort result:\n");
task->start();

std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();

wait_group.wait();
free(array);
return 0;
}
Expand Down
21 changes: 5 additions & 16 deletions tutorial/tutorial-08-matrix_multiply.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
#include <errno.h>
#include <string.h>
#include <vector>
#include <mutex>
#include <condition_variable>
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"

namespace algorithm
{
Expand Down Expand Up @@ -146,23 +145,13 @@ int main()
input->a = {{1, 2, 3}, {4, 5, 6}};
input->b = {{7, 8}, {9, 10}, {11, 12}};

std::mutex mutex;
std::condition_variable cond;
bool finished = false;
WFFacilities::WaitGroup wait_group(1);

Workflow::start_series_work(task,
[&mutex, &cond, &finished](const SeriesWork *)
{
mutex.lock();
finished = true;
cond.notify_one();
mutex.unlock();
Workflow::start_series_work(task, [&wait_group](const SeriesWork *) {
wait_group.done();
});

std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();
wait_group.wait();
return 0;
}

10 changes: 8 additions & 2 deletions tutorial/tutorial-09-http_file_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "workflow/WFHttpServer.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/Workflow.h"
#include "workflow/WFFacilities.h"

using namespace protocol;

Expand Down Expand Up @@ -87,7 +88,12 @@ void process(WFHttpTask *server_task, const char *root)
}
}

void sig_handler(int signo) { }
static WFFacilities::WaitGroup wait_group(1);

void sig_handler(int signo)
{
wait_group.done();
}

int main(int argc, char *argv[])
{
Expand All @@ -113,7 +119,7 @@ int main(int argc, char *argv[])

if (ret == 0)
{
pause();
wait_group.wait();
server.stop();
}
else
Expand Down
Loading

0 comments on commit a375a71

Please sign in to comment.