Skip to content

Commit e83f497

Browse files
author
Arkady Drovosekov
committed
queue-rb branch merged
1 parent bfd599f commit e83f497

26 files changed

+1215
-116
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ HTML_DOC_DIR = doc/html
1111
ERLC_OPTS = +debug_info -DTEST
1212
#ERLC_OPTS = +debug_info -DTEST -DPROPER -pa ../proper/ebin
1313
ERLC := erlc $(ERLC_OPTS)
14-
VSN=1.8
14+
VSN=2.2
1515
APP_NAME=ejobman
1616
LICENSE=MIT
1717

doc/readme

+3-1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ Version 1
7070
Version 2 (current)
7171
-------------------
7272

73+
amqp routing key is a job group
74+
7375
{type: rest},
7476
{info:
7577
{method: xxx,
@@ -81,7 +83,7 @@ Version 2 (current)
8183
{key2: val2}
8284
]
8385
},
84-
{group, xxx}
86+
{group, xxx} % will be eliminated in future versions
8587
},
8688

8789
Supported kinds of auth_info:

eall.config

+35-23
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
{error_logger_mf_maxfiles, 5} % 5 files max
1717
]},
1818
{ejobman, [
19+
{temp_rt_key_for_group, <<"new">>},
1920
% log is common and may also be defined in eworkman
2021
{log0, "/var/log/erpher/ej"},
2122
{pid_file0, "/var/run/erpher.pid"},
@@ -33,7 +34,7 @@
3334
{password, <<"guest">>},
3435
{vhost , <<"/">>},
3536
{exchange , <<"ejobman">>},
36-
{exchange_type , <<"topic">>},
37+
{exchange_type0 , <<"topic">>},
3738
{queue , <<"test_queue_2">>},
3839
{routing_key , <<"new">>}
3940
]},
@@ -51,7 +52,24 @@
5152
{run, 2}
5253
]}
5354
]},
54-
{handler, [
55+
{group_handler, [
56+
{http_connect_timeout, 15000},
57+
{http_timeout, 3600000},
58+
{max_children, 10}, % default group
59+
{job_groups, [
60+
[
61+
{name, <<"g1">>},
62+
{max_children, 1}
63+
],
64+
[
65+
{name, <<"g2">>},
66+
{max_children, 10}
67+
],
68+
[
69+
{name, <<"g3">>},
70+
{max_children, 100}
71+
]
72+
]},
5573
{schema_rewrite, [
5674
[
5775
{src_type, regex},
@@ -91,28 +109,24 @@
91109
{dst_host_hdr, "host2.localdomain"}
92110
]
93111
]},
112+
{debug, [
113+
{handler_child, 5},
114+
{handler_run, 2},
115+
{handler_job, 5},
116+
{msg, 3},
117+
{run, 2},
118+
{job, 5},
119+
{http, 3},
120+
{config, 0},
121+
{rewrite, 0}
122+
]}
123+
]},
124+
{handler, [
94125
{job_log, "/var/log/erpher/job"},
95126
% never | minute | hour | day | {dow, 0..7} | month | year
96127
{job_log_rotate, day},
97-
{http_connect_timeout, 15000},
98-
{http_timeout, 3600000},
99128
{stat_limit_n, 101}, % amount
100129
{stat_limit_t, 123}, % seconds
101-
{max_children, 10}, % to process short command
102-
{job_groups, [
103-
[
104-
{name, <<"g1">>},
105-
{max_children, 1}
106-
],
107-
[
108-
{name, <<"g2">>},
109-
{max_children, 10}
110-
],
111-
[
112-
{name, <<"g3">>},
113-
{max_children, 100}
114-
]
115-
]},
116130
{debug,
117131
[
118132
{handler_child, 2},
@@ -121,12 +135,10 @@
121135
{job_queue, 0},
122136
{job_result, 2}, % to log job results
123137
{run, 2},
124-
{stat, 0},
125-
{http, 3},
126-
{rewrite, 0}
138+
{stat, 0}
127139
]
128140
}
129-
]}
141+
]} % handler
130142
]},
131143
{eworkman, [
132144
{delay_for_log0, 10}, % ms. Start delay for ejobman to create a log

include/chi.hrl

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
-ifndef(ejobman_chi).
2+
-define(ejobman_chi, true).
3+
4+
-record(chi, {
5+
pid,
6+
id,
7+
mon,
8+
os_pid,
9+
tag,
10+
alive=true,
11+
stop={0,0,0}, % time of marking dead
12+
start={0,0,0} % time in now() format
13+
}).
14+
15+
-endif.

include/ejobman.hrl

+6-13
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22
-define(ejobman_params, true).
33

44
-include("nums.hrl").
5+
-include("chi.hrl").
56

67
% state of a worker gen_server
78
-record(child, {
89
name,
910
port,
1011
id,
1112
os_pid,
13+
gh_pid,
1214
group,
1315
tag,
1416
duration,
@@ -31,17 +33,6 @@
3133
max_children
3234
}).
3335

34-
-record(chi, {
35-
pid,
36-
id,
37-
mon,
38-
os_pid,
39-
tag,
40-
alive=true,
41-
stop={0,0,0}, % time of marking dead
42-
start={0,0,0} % time in now() format
43-
}).
44-
4536
-record(pool, {
4637
id,
4738
w_duration = 86400, % seconds
@@ -61,8 +52,8 @@
6152

6253
% state of a handler and a receiver gen_server
6354
-record(ejm, {
64-
ch_queues, % dict: group -> queue of jobs
65-
ch_data, % dict: group -> spawned children list
55+
ch_queues :: dict(), % dict: group -> queue of jobs
56+
ch_data :: dict(), % dict: group -> spawned children list
6657
max_children = 32767,
6758
http_connect_timeout = ?HTTP_CONNECT_TIMEOUT,
6859
http_timeout = ?HTTP_TIMEOUT,
@@ -72,6 +63,8 @@
7263
web_server_opts,
7364
log,
7465
pid_file,
66+
group_handler = [], % config for group handlers
67+
group_handler_run = [], % started group handlers. Unnecessary, in fact
7568
job_groups = [], % configured job groups
7669
job_log, % filename for job log
7770
job_log_last,

include/group_handler.hrl

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
-ifndef(group_handler).
2+
-define(group_handler, true).
3+
4+
-include("nums.hrl").
5+
6+
% state of a group_handler gen_server
7+
-record(egh, {
8+
id :: reference(), % id that is used by supervisor
9+
group, % group id
10+
max :: non_neg_integer(), % max children for the group
11+
ch_run = []:: list(), % running children
12+
ch_queue :: queue(), % fetched from amqp but not yet started children
13+
vhost :: binary(), % amqp virtual host
14+
conn,
15+
exchange :: binary(), % amqp exchange for a group
16+
queue :: binary(), % amqp queue for a group
17+
http_connect_timeout = ?HTTP_CONNECT_TIMEOUT :: non_neg_integer(),
18+
http_timeout = ?HTTP_TIMEOUT :: non_neg_integer(),
19+
schema_rewrite = [] :: list(),
20+
url_rewrite = [] :: list(),
21+
debug = [] :: list()
22+
}).
23+
24+
-endif.

include/job.hrl

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
%-define(T, 1000).
55

66
-record(job, {
7-
id :: reference(),
7+
id :: reference() | binary(),
88
type = 'rest',
99
method,
1010
url,

include/nums.hrl

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
-define(ejobman_nums, true).
33

44
-define(LOG_PROCS_INTERVAL, 60). % seconds
5+
-define(GID_DEFAULT, default).
56

67
-define(STAT_T_KEEP_MINUTES, 62).
78
-define(STAT_T_KEEP_HOURS, 26).

include/rabbit_session.hrl

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
-record(conn, {
1717
'channel' = false,
1818
'connection' = false,
19-
'consumer_tag' = false
19+
'consumer_tag' = false,
20+
'ticket' = false
2021
}).
2122

2223
-endif.

include/receiver.hrl

+6-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@
99
conn,
1010
log,
1111
pid_file,
12-
debug
12+
13+
% for this rt key look into job's group field for real rt key
14+
temp_rt_key_for_group :: binary(),
15+
16+
groups = [] :: list(),
17+
debug = [] :: list()
1318
}).
1419

1520
-endif.

src/ejobman.app.src

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{application, ejobman, [
22
{description, "ejobman"},
33
{id, "ejobman"},
4-
{vsn, "1.8"},
4+
{vsn, "2.2"},
55
{modules, [
66
ejobman_app,
77
ejobman_child,
@@ -10,9 +10,14 @@
1010
ejobman_conf,
1111
ejobman_conf_rabbit,
1212
ejobman_data,
13+
ejobman_group_handler_cmd,
14+
ejobman_group_handler,
15+
ejobman_group_handler_spawn,
16+
ejobman_group_sup,
1317
ejobman_handler_cmd,
1418
ejobman_handler,
1519
ejobman_log,
20+
ejobman_print_stat,
1621
ejobman_rb,
1722
ejobman_receiver_cmd,
1823
ejobman_receiver,
@@ -24,6 +29,7 @@
2429
ejobman_stat,
2530
ejobman_receiver,
2631
ejobman_handler,
32+
ejobman_group_supervisor,
2733
ejobman_child_supervisor
2834
]},
2935
{mod, {ejobman_app,[]}},

src/ejobman_child.erl

+4-5
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,6 @@ main_action(State) ->
160160
%%
161161
-spec process_cmd(#child{}) -> ok.
162162

163-
process_cmd(#child{from = 'undefined'}) ->
164-
ok;
165163
process_cmd(#child{url = <<>>}) ->
166164
ok;
167165
process_cmd(#child{url = [_ | _]} = St) ->
@@ -181,7 +179,7 @@ process_cmd(_) ->
181179
%% https://github.com/cmullaparthi/ibrowse
182180
%% @since 2011-07-18
183181
%%
184-
real_cmd(#child{id=Id, method=Method_bin, params=Params, tag=Tag,
182+
real_cmd(#child{id=Id, method=Method_bin, params=Params, tag=Tag, gh_pid=Gh_pid,
185183
http_connect_timeout=Conn_t, http_timeout=Http_t} = St) ->
186184
mpln_p_debug:pr({?MODULE, real_cmd, ?LINE, params, Id, self(),
187185
St}, St#child.debug, run, 4),
@@ -195,7 +193,7 @@ real_cmd(#child{id=Id, method=Method_bin, params=Params, tag=Tag,
195193
Req}, St#child.debug, http, 5),
196194
mpln_p_debug:pr({?MODULE, real_cmd, ?LINE, start, Id, self()},
197195
St#child.debug, run, 2),
198-
ejobman_receiver:send_ack(Id, Tag),
196+
ejobman_group_handler:send_ack(Gh_pid, Id, Tag),
199197
T1 = now(),
200198
ejobman_stat:add(Id, 'http_start',
201199
[{'header', mpln_misc_web:make_proplist_binary(Hdr)},
@@ -216,7 +214,8 @@ real_cmd(#child{id=Id, method=Method_bin, params=Params, tag=Tag,
216214
%%
217215
%% @doc sends result to ejobman_handler and ejobman_stat
218216
%%
219-
process_result(#child{id=Id, group=Group}, Res, T1, T2) ->
217+
process_result(#child{id=Id, gh_pid=Pid, group=Group}, Res, T1, T2) ->
218+
ejobman_group_handler:cmd_result(Pid, Res, T1, T2, Id),
220219
send_stat(Id, Res),
221220
ejobman_handler:cmd_result(Res, T1, T2, Group, Id).
222221

0 commit comments

Comments
 (0)