Skip to content

Commit

Permalink
Create better distribution of files over async threads
Browse files Browse the repository at this point in the history
The actual port id is used to create a key from the
pointer value which is the ErlDrvPort. To do this
a new driver api function driver_async_port_key is
added and the driver API minor version is updated.

The documentation is updated and the faulty description of
how to spread ports over async threads is updated to
use the new API.

Testcase also added.
  • Loading branch information
bufflig committed Aug 23, 2013
1 parent 7204e78 commit 6d1f1d4
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 10 deletions.
24 changes: 21 additions & 3 deletions erts/doc/src/erl_driver.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1981,7 +1981,7 @@ ERL_DRV_EXT2TERM char *buf, ErlDrvUInt len
thread, the following call can be used:</p>
<p></p>
<code type="none"><![CDATA[
unsigned int myKey = (unsigned int) myPort;
unsigned int myKey = driver_async_port_key(myPort);
r = driver_async(myPort, &myKey, myData, myFunc);
]]></code>
Expand Down Expand Up @@ -2021,6 +2021,24 @@ ERL_DRV_EXT2TERM char *buf, ErlDrvUInt len
</note>
</desc>
</func>
<func>
<name><ret>unsigned int</ret><nametext>driver_async_port_key (ErlDrvPort port)</nametext></name>
<fsummary>Calculate an async key from an ErlDrvPort</fsummary>
<desc>
<marker id="driver_async_port_key"></marker>
<p>This function calculates a key for later use in <seealso
marker="#driver_async">driver_async()</seealso>. The keys are
evenly distributed so that a fair mapping between port id's
and async thread id's is achieved.</p>
<note>
<p>Before OTP-R16, the actual port id could be used as a key
with proper casting, but after the rewrite of the port
subsystem, this is no longer the case. With this function, you
can achieve the same distribution based on port id's as before
OTP-R16.</p>
</note>
</desc>
</func>
<func>
<name><ret>int</ret><nametext>driver_async_cancel(long id)</nametext></name>
<fsummary>Cancel an asynchronous call</fsummary>
Expand All @@ -2033,10 +2051,10 @@ ERL_DRV_EXT2TERM char *buf, ErlDrvUInt len
The user had to implement synchronization of cancellation anyway.
It also unnecessarily complicated the implementation. Therefore,
as of OTP-R15B <c>driver_async_cancel()</c> is deprecated, and
scheduled for removal in OTP-R16. It will currently always fail,
scheduled for removal in OTP-R17. It will currently always fail,
and return 0.</p>
<warning><p><c>driver_async_cancel()</c> is deprecated and will
be removed in the OTP-R16 release.</p>
be removed in the OTP-R17 release.</p>
</warning>

</desc>
Expand Down
14 changes: 14 additions & 0 deletions erts/emulator/beam/erl_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,20 @@ int erts_async_ready_clean(void *varq, void *val)

#endif

/*
** Generate a fair async key prom an ErlDrvPort
** The port data gives a fair distribution grom port pointer
** to unsigned integer - to be used in key for driver_async below.
*/
unsigned int driver_async_port_key(ErlDrvPort port)
{
ErlDrvTermData td = driver_mk_port(port);
if (td == (ErlDrvTermData) NIL) {
return 0;
}
return (unsigned int) (UWord) internal_port_data(td);
}

/*
** Schedule async_invoke on a worker thread
** NOTE will be syncrounous when threads are unsupported
Expand Down
4 changes: 3 additions & 1 deletion erts/emulator/beam/erl_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ typedef struct {

#define ERL_DRV_EXTENDED_MARKER (0xfeeeeeed)
#define ERL_DRV_EXTENDED_MAJOR_VERSION 2
#define ERL_DRV_EXTENDED_MINOR_VERSION 1
#define ERL_DRV_EXTENDED_MINOR_VERSION 2

/*
* The emulator will refuse to load a driver with different major
Expand Down Expand Up @@ -638,6 +638,8 @@ EXTERN int erl_drv_send_term(ErlDrvTermData port,
int len);

/* Async IO functions */
EXTERN unsigned int driver_async_port_key(ErlDrvPort port);

EXTERN long driver_async(ErlDrvPort ix,
unsigned int* key,
void (*async_invoke)(void*),
Expand Down
3 changes: 2 additions & 1 deletion erts/emulator/drivers/common/efile_drv.c
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ file_init(void)
return 0;
}


/*********************************************************************
* Driver entry point -> start
*/
Expand All @@ -788,7 +789,7 @@ file_start(ErlDrvPort port, char* command)
}
desc->fd = FILE_FD_INVALID;
desc->port = port;
desc->key = (unsigned int) (UWord) port;
desc->key = driver_async_port_key(port);
desc->flags = 0;
desc->invoke = NULL;
desc->d = NULL;
Expand Down
4 changes: 4 additions & 0 deletions erts/emulator/sys/win32/erl_win_dyn_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ WDD_TYPEDEF(int, erl_drv_output_term, (ErlDrvTermData, ErlDrvTermData*, int));
WDD_TYPEDEF(int, driver_output_term, (ErlDrvPort, ErlDrvTermData*, int));
WDD_TYPEDEF(int, erl_drv_send_term, (ErlDrvTermData, ErlDrvTermData, ErlDrvTermData*, int));
WDD_TYPEDEF(int, driver_send_term, (ErlDrvPort, ErlDrvTermData, ErlDrvTermData*, int));
WDD_TYPEDEF(unsigned int, driver_async_port_key, (ErlDrvPort));
WDD_TYPEDEF(long, driver_async, (ErlDrvPort,unsigned int*,void (*)(void*),void*,void (*)(void*)));
WDD_TYPEDEF(int, driver_async_cancel, (unsigned int));
WDD_TYPEDEF(int, driver_lock_driver, (ErlDrvPort));
Expand Down Expand Up @@ -197,6 +198,7 @@ typedef struct {
WDD_FTYPE(driver_output_term) *driver_output_term;
WDD_FTYPE(erl_drv_send_term) *erl_drv_send_term;
WDD_FTYPE(driver_send_term) *driver_send_term;
WDD_FTYPE(driver_async_port_key) *driver_async_port_key;
WDD_FTYPE(driver_async) *driver_async;
WDD_FTYPE(driver_async_cancel) *driver_async_cancel;
WDD_FTYPE(driver_lock_driver) *driver_lock_driver;
Expand Down Expand Up @@ -308,6 +310,7 @@ extern TWinDynDriverCallbacks WinDynDriverCallbacks;
#define driver_output_term (WinDynDriverCallbacks.driver_output_term)
#define erl_drv_send_term (WinDynDriverCallbacks.erl_drv_send_term)
#define driver_send_term (WinDynDriverCallbacks.driver_send_term)
#define driver_async_port_key (WinDynDriverCallbacks.driver_async_port_key)
#define driver_async (WinDynDriverCallbacks.driver_async)
#define driver_async_cancel (WinDynDriverCallbacks.driver_async_cancel)
#define driver_lock_driver (WinDynDriverCallbacks.driver_lock_driver)
Expand Down Expand Up @@ -443,6 +446,7 @@ do { \
((W).driver_output_term) = driver_output_term; \
((W).erl_drv_send_term) = erl_drv_send_term; \
((W).driver_send_term) = driver_send_term; \
((W).driver_async_port_key) = driver_async_port_key; \
((W).driver_async) = driver_async; \
((W).driver_async_cancel) = driver_async_cancel; \
((W).driver_lock_driver) = driver_lock_driver; \
Expand Down
88 changes: 83 additions & 5 deletions erts/emulator/test/efile_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
-module(efile_SUITE).
-export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1,
init_per_group/2,end_per_group/2]).
-export([iter_max_files/1]).
-export([iter_max_files/1, async_dist/1]).

-export([do_iter_max_files/2]).
-export([do_iter_max_files/2, do_async_dist/1]).

-include_lib("test_server/include/test_server.hrl").

suite() -> [{ct_hooks,[ts_install_cth]}].

all() ->
[iter_max_files].
[iter_max_files, async_dist].

groups() ->
[].
Expand All @@ -45,6 +45,84 @@ init_per_group(_GroupName, Config) ->
end_per_group(_GroupName, Config) ->
Config.

do_async_dist(Dir) ->
X = 100,
AT = erlang:system_info(thread_pool_size),
Keys = file_keys(Dir,AT*X,[],[]),
Tab = ets:new(x,[ordered_set]),
[ ets:insert(Tab,{N,0}) || N <- lists:seq(0,AT-1) ],
[ ets:update_counter(Tab,(N rem AT),1) || N <- Keys ],
Res = [ V || {_,V} <- ets:tab2list(Tab) ],
ets:delete(Tab),
{Res, sdev(Res)/X}.

sdev(List) ->
Len = length(List),
Mean = lists:sum(List)/Len,
math:sqrt(lists:sum([ (X - Mean) * (X - Mean) || X <- List ]) / Len).

file_keys(_,0,FdList,FnList) ->
[ file:close(FD) || FD <- FdList ],
[ file:delete(FN) || FN <- FnList ],
[];
file_keys(Dir,Num,FdList,FnList) ->
Name = "dummy"++integer_to_list(Num),
FN = filename:join([Dir,Name]),
case file:open(FN,[write,raw]) of
{ok,FD} ->
{file_descriptor,prim_file,{Port,_}} = FD,
<<X:32/integer-big>> =
iolist_to_binary(erlang:port_control(Port,$K,[])),
[X | file_keys(Dir,Num-1,[FD|FdList],[FN|FnList])];
{error,_} ->
% Try freeing up FD's if there are any
case FdList of
[] ->
exit({cannot_open_file,FN});
_ ->
[ file:close(FD) || FD <- FdList ],
[ file:delete(F) || F <- FnList ],
file_keys(Dir,Num,[],[])
end
end.

async_dist(doc) ->
"Check that the distribution of files over async threads is fair";
async_dist(Config) when is_list(Config) ->
DataDir = ?config(data_dir,Config),
TestFile = filename:join(DataDir, "existing_file"),
Dir = filename:dirname(code:which(?MODULE)),
AsyncSizes = [7,10,100,255,256,64,63,65],
Max = 0.5,

lists:foreach(fun(Size) ->
{ok,Node} =
test_server:start_node
(test_iter_max_files,slave,
[{args,
"+A "++integer_to_list(Size)++
" -pa " ++ Dir}]),
{Distr,SD} = rpc:call(Node,?MODULE,do_async_dist,
[DataDir]),
test_server:stop_node(Node),
if
SD > Max ->
io:format("Bad async queue distribution for "
"~p async threads:~n"
" Standard deviation is ~p~n"
" Key distribution:~n ~lp~n",
[Size,SD,Distr]),
exit({bad_async_dist,Size,SD,Distr});
true ->
io:format("OK async queue distribution for "
"~p async threads:~n"
" Standard deviation is ~p~n"
" Key distribution:~n ~lp~n",
[Size,SD,Distr]),
ok
end
end, AsyncSizes),
ok.

%%
%% Open as many files as possible. Do this several times and check
Expand Down Expand Up @@ -98,7 +176,7 @@ open_files(Name) ->
?line case file:open(Name, [read,raw]) of
{ok, Fd} ->
[Fd| open_files(Name)];
{error, Reason} ->
% io:format("Error reason: ~p", [Reason]),
{error, _Reason} ->
% io:format("Error reason: ~p", [_Reason]),
[]
end.

0 comments on commit 6d1f1d4

Please sign in to comment.