-
Notifications
You must be signed in to change notification settings - Fork 0
/
process_pool.c
112 lines (94 loc) · 3.27 KB
/
process_pool.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include "list.h"
#include "process_pool.h"
/*
 * Worker main loop: serve tasks arriving on pipes->read_pipe_fd until EOF.
 *
 * Each task_t read from the pipe is executed via task.function(task.argument)
 * and the resulting task_output_t is written to pipes->write_pipe_fd.
 *
 * This function never returns: it ends the calling process with exit(0) once
 * the task pipe reaches EOF, or with exit(1) on a pipe read/write failure.
 */
void process_loop(pipes_t *pipes) {
    task_t task;
    task_output_t result;

    for (;;) {
        /* A pipe read may deliver fewer bytes than requested, so keep
         * reading until a complete task_t has been assembled. */
        size_t have = 0;
        ssize_t got = 0;
        while (have < sizeof(task)) {
            got = read(pipes->read_pipe_fd, (char *)&task + have, sizeof(task) - have);
            if (got <= 0) {
                break;
            }
            have += (size_t)got;
        }
        if (have < sizeof(task)) {
            /* EOF exactly on a task boundary is the normal shutdown signal;
             * a read error or a truncated task is a failure. */
            if (got < 0 || have != 0) {
                fprintf(stderr, "Error reading task from pipe.\n");
                exit(1);
            }
            break;
        }

        printf("Perform task = %d by pid = %d\n", task.argument, getpid());
        result = task.function(task.argument);

        /* Likewise push the whole result, tolerating short writes. */
        size_t sent = 0;
        while (sent < sizeof(result)) {
            ssize_t put = write(pipes->write_pipe_fd, (const char *)&result + sent, sizeof(result) - sent);
            if (put <= 0) {
                fprintf(stderr, "Error writing result to pipe.\n");
                exit(1);
            }
            sent += (size_t)put;
        }
    }

    close(pipes->read_pipe_fd);
    close(pipes->write_pipe_fd);
    exit(0);
}
/*
 * Create a pool of process_count forked worker processes.
 *
 * For every worker two pipes are created: one carrying tasks from the parent
 * to the child and one carrying results back. The child runs process_loop()
 * on its ends; the parent stores its ends in pool->processes_pipes[i]
 * (read_pipe_fd = results from the worker, write_pipe_fd = tasks to it).
 *
 * Returns the new pool, or NULL when process_count is not positive.
 * Allocation, pipe() and fork() failures terminate the process with exit(1).
 */
process_pool_t *process_pool_init(int process_count) {
    if (process_count <= 0) {
        fprintf(stderr, "Invalid process count.\n");
        return NULL;
    }

    process_pool_t *pool = calloc(1, sizeof(process_pool_t));
    if (pool == NULL) {
        fprintf(stderr, "Error allocating process pool.\n");
        exit(1);
    }
    pool->process_count = process_count;
    pool->processes_pipes = calloc((size_t)process_count, sizeof(pipes_t));
    if (pool->processes_pipes == NULL) {
        fprintf(stderr, "Error allocating process pool.\n");
        exit(1);
    }

    for (int i = 0; i < process_count; ++i) {
        int pipe_child_parent[2];
        int pipe_parent_child[2];
        if (pipe(pipe_child_parent) == -1 ||
            pipe(pipe_parent_child) == -1) {
            fprintf(stderr, "Error creating pipe.\n");
            exit(1);
        }

        /* The child inherits a copy of any unflushed stdio buffers and
         * process_loop() ends with exit(), which flushes them; flush now so
         * pending output is not emitted again by every worker. */
        fflush(NULL);

        pid_t pid = fork();
        if (pid == -1) {
            fprintf(stderr, "Error forking.\n");
            exit(1);
        }
        if (pid == 0) {
            // child process
            /* Drop the parent-side descriptors of earlier workers that were
             * inherited through fork(): a stray copy of a task-pipe write end
             * would keep that worker from seeing EOF when the parent closes
             * its own copy. */
            for (int j = 0; j < i; ++j) {
                close(pool->processes_pipes[j].read_pipe_fd);
                close(pool->processes_pipes[j].write_pipe_fd);
            }
            close(pipe_child_parent[0]);
            close(pipe_parent_child[1]);
            pipes_t pipes = {.read_pipe_fd = pipe_parent_child[0], .write_pipe_fd = pipe_child_parent[1]};
            process_loop(&pipes); /* never returns */
        }

        // parent process: keep only the ends facing the worker
        close(pipe_child_parent[1]);
        close(pipe_parent_child[0]);
        pool->processes_pipes[i].read_pipe_fd = pipe_child_parent[0];
        pool->processes_pipes[i].write_pipe_fd = pipe_parent_child[1];
    }
    return pool;
}
/*
 * Collect worker results into output_list until pool->done_count reaches
 * pool->task_count, then reap the worker processes.
 *
 * Results are gathered in passes over the workers, reading at most one
 * task_output_t from each worker per pass. Every result is heap-allocated and
 * handed to list_push(). Collection also stops when a complete pass yields no
 * result at all (every worker is at EOF or failing), so a worker that died
 * early cannot make this loop spin forever.
 */
void process_pool_wait_for_results(process_pool_t *pool, task_output_list_t *output_list) {
    int made_progress = 1;

    while (pool->done_count != pool->task_count && made_progress) {
        made_progress = 0;
        for (int i = 0; i < pool->process_count && pool->done_count != pool->task_count; ++i) {
            task_output_t *output = calloc(1, sizeof(*output));
            if (output == NULL) {
                fprintf(stderr, "Error allocating task output.\n");
                exit(1);
            }

            /* A pipe read may be short; assemble one complete result. */
            size_t have = 0;
            ssize_t got = 0;
            while (have < sizeof(*output)) {
                got = read(pool->processes_pipes[i].read_pipe_fd, (char *)output + have, sizeof(*output) - have);
                if (got <= 0) {
                    break;
                }
                have += (size_t)got;
            }

            if (have < sizeof(*output)) {
                /* Clean EOF means this worker has nothing more to report;
                 * anything else is a failed or truncated read. Either way,
                 * move on to the next worker instead of abandoning the pass. */
                if (got < 0 || have != 0) {
                    fprintf(stderr, "Error reading result from pipe.\n");
                }
                free(output);
                continue;
            }

            list_push(output_list, output);
            pool->done_count += 1;
            made_progress = 1;
        }
    }

    /* Reap children only after draining their pipes: a worker blocked on a
     * full result pipe cannot exit, so waiting first could deadlock. */
    while (wait(NULL) > 0) {
    }
}
/*
 * Distribute every task in input_list across the pool's workers and gather
 * the results into output_list.
 *
 * Tasks are popped from the back of input_list and handed out round-robin,
 * one per worker per pass; each popped task is freed after it has been sent.
 * Once the list is empty the task pipes are closed (which lets the workers
 * terminate) and process_pool_wait_for_results() collects the outputs.
 * Because the task pipes are closed here, a pool serves a single map call.
 *
 * NOTE(review): all tasks are written before any result is read; with very
 * large task lists the pipes could presumably fill up on both sides — confirm
 * the expected workload size.
 */
void process_pool_map(process_pool_t *pool, task_list_t *input_list, task_output_list_t *output_list) {
    pool->task_count = input_list->node_count;
    pool->done_count = 0;

    if (pool->process_count <= 0) {
        /* Nobody can run the tasks; report it rather than waiting forever
         * for results that will never arrive. */
        fprintf(stderr, "Process pool has no workers.\n");
        pool->task_count = 0;
        return;
    }

    task_t *task = NULL;
    do {
        for (int i = 0; i < pool->process_count; ++i) {
            task = list_pop_back(input_list);
            if (!task) {
                break;
            }
            ssize_t sent = write(pool->processes_pipes[i].write_pipe_fd, task, sizeof(*task));
            if (sent != (ssize_t)sizeof(*task)) {
                /* The task never reached a worker, so no result will come
                 * back for it; do not count it as pending. */
                fprintf(stderr, "Error sending task to worker.\n");
                pool->task_count -= 1;
            } else {
                printf("Sent task = %d\n", task->argument);
            }
            free(task);
        }
    } while (task != NULL);

    /* Closing the write ends delivers EOF to the workers. Mark them as closed
     * so process_pool_free() does not close the descriptors a second time. */
    for (int i = 0; i < pool->process_count; ++i) {
        close(pool->processes_pipes[i].write_pipe_fd);
        pool->processes_pipes[i].write_pipe_fd = -1;
    }

    process_pool_wait_for_results(pool, output_list);
}

/*
 * Shut the pool down and release its resources. Passing NULL is a no-op.
 *
 * Any task pipe that is still open is closed first so that idle workers see
 * EOF and exit; otherwise waiting for them would block forever when the pool
 * is freed without process_pool_map() having been called.
 *
 * Note that wait(NULL) reaps any child of the calling process, not only the
 * pool's workers.
 */
void process_pool_free(process_pool_t *pool) {
    if (pool == NULL) {
        return;
    }

    for (int i = 0; i < pool->process_count; ++i) {
        if (pool->processes_pipes[i].write_pipe_fd >= 0) {
            close(pool->processes_pipes[i].write_pipe_fd);
            pool->processes_pipes[i].write_pipe_fd = -1;
        }
    }

    while (wait(NULL) > 0) {
    }

    for (int i = 0; i < pool->process_count; ++i) {
        close(pool->processes_pipes[i].read_pipe_fd);
    }
    free(pool->processes_pipes);
    free(pool);
}