Skip to content

Commit

Permalink
Wait for children to complete in main process
Browse files Browse the repository at this point in the history
  • Loading branch information
blechschmidt committed Aug 10, 2018
1 parent 3fcb814 commit 4073a0d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 5 deletions.
10 changes: 9 additions & 1 deletion flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ static void handle_termination()
}

// times is the number of resulting processes, i.e. if times is two, the process will fork once
size_t split_process(size_t times)
size_t split_process(size_t times, pid_t *pids)
{
if(pids != NULL)
{
pids[0] = getpid();
}
for (size_t i = 0; i < times - 1; i++)
{
pid_t child = fork();
Expand All @@ -45,6 +49,10 @@ size_t split_process(size_t times)
return i + 1;
}
default:
if(pids != NULL)
{
pids[i + 1] = child;
}
break;
}
}
Expand Down
44 changes: 40 additions & 4 deletions main.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ void cleanup()
}
free(context.sockets.pipes);
free(context.sockets.master_pipes_read);
free(context.pids);
free(context.done);
}

void log_msg(const char* format, ...)
Expand Down Expand Up @@ -617,6 +619,7 @@ void my_stats_to_msg(stats_exchange_t *stats_msg)
stats_msg->current_rate = context.stats.current_rate;
stats_msg->success_rate = context.stats.success_rate;
stats_msg->numparsed = context.stats.numparsed;
stats_msg->done = (context.state >= STATE_DONE);
for(size_t i = 0; i <= context.cmd_args.resolve_count; i++)
{
stats_msg->timeouts[i] = context.stats.timeouts[i];
Expand Down Expand Up @@ -827,7 +830,20 @@ void check_progress()

void done()
{
context.state = STATE_DONE;
context.done[context.fork_index] = true;
if(context.fork_index != 0 || context.cmd_args.num_processes == 1)
{
context.state = STATE_DONE;
}
else
{
context.finished++;
context.state = (context.finished < context.cmd_args.num_processes ? STATE_WAIT_CHILDREN : STATE_DONE);
}
if(context.cmd_args.num_processes > 1 && context.fork_index != 0)
{
send_stats();
}
check_progress();
}

Expand Down Expand Up @@ -1463,7 +1479,11 @@ void read_control_message(socket_info_t *socket_info)
{
log_msg("Atomic read failed: Read %ld bytes.\n", read_result);
}

if(!context.done[process] && context.stat_messages[process].done)
{
context.finished++;
context.done[process] = true;
}
}

void make_query_sockets_nonblocking()
Expand Down Expand Up @@ -1513,7 +1533,9 @@ void run()
#endif

init_pipes();
context.fork_index = split_process(context.cmd_args.num_processes);
context.pids = safe_calloc(context.cmd_args.num_processes * sizeof(*context.pids));
context.done = safe_calloc(context.cmd_args.num_processes * sizeof(*context.done));
context.fork_index = split_process(context.cmd_args.num_processes, context.pids);
#ifdef HAVE_EPOLL
if(!context.cmd_args.busypoll)
{
Expand Down Expand Up @@ -1646,6 +1668,11 @@ void run()
else if ((pevents[i].events & EPOLLIN) && socket_info->type == SOCKET_TYPE_CONTROL)
{
read_control_message(socket_info);
if(context.finished >= context.cmd_args.num_processes)
{
context.state = STATE_DONE;
break;
}
}
}
timed_ring_handle(&context.ring, ring_timeout);
Expand Down Expand Up @@ -1674,6 +1701,11 @@ void run()
{
read_control_message(context.sockets.master_pipes_read + i);
}
if(context.finished >= context.cmd_args.num_processes)
{
context.state = STATE_DONE;
break;
}
}
}
}
Expand All @@ -1690,6 +1722,8 @@ void use_stdin()

int parse_cmd(int argc, char **argv)
{
bool domain_param = false;

context.cmd_args.argc = argc;
context.cmd_args.argv = argv;
context.cmd_args.help_function = print_help;
Expand Down Expand Up @@ -1992,6 +2026,7 @@ int parse_cmd(int argc, char **argv)
if (context.cmd_args.domains == NULL)
{
context.cmd_args.domains = argv[i];
domain_param = true;
if (strcmp(argv[i], "-") == 0)
{
use_stdin();
Expand Down Expand Up @@ -2021,6 +2056,7 @@ int parse_cmd(int argc, char **argv)
}
}
fclose(context.domainfile);
context.domainfile = NULL;
}
}
else
Expand All @@ -2046,7 +2082,7 @@ int parse_cmd(int argc, char **argv)
log_msg("Resolvers are required to be supplied.\n");
clean_exit(EXIT_FAILURE);
}
if (context.domainfile == NULL)
if (!domain_param)
{
if(!isatty(STDIN_FILENO))
{
Expand Down
7 changes: 7 additions & 0 deletions massdns.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ typedef struct {
size_t current_rate;
size_t success_rate;
size_t numparsed;
bool done;
} stats_exchange_t;

typedef struct
Expand Down Expand Up @@ -98,6 +99,7 @@ typedef enum
STATE_WARMUP, // Before the hash map size has been reached
STATE_QUERYING,
STATE_COOLDOWN,
STATE_WAIT_CHILDREN,
STATE_DONE
} state_t;

Expand Down Expand Up @@ -182,6 +184,11 @@ typedef struct
socket_info_t *master_pipes_read;
} sockets;

// Processes
size_t finished;
pid_t *pids;
bool *done;

FILE* outfile;
FILE* logfile;
FILE* domainfile;
Expand Down

0 comments on commit 4073a0d

Please sign in to comment.