Skip to content

Commit

Permalink
Merge branch 'bmk/esock/20240405/sendv/OTP-18845'
Browse files Browse the repository at this point in the history
  • Loading branch information
bmk committed Apr 10, 2024
2 parents 335741f + eb092d6 commit 44a98ee
Show file tree
Hide file tree
Showing 18 changed files with 1,525 additions and 53 deletions.
2 changes: 2 additions & 0 deletions erts/emulator/nifs/common/prim_socket_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,8 @@ extern ESockCmsgSpec* esock_lookup_cmsg_spec(ESockCmsgSpec* table,
size_t num,
ERL_NIF_TERM eType);



/* *** Sendfile 'stuff' ***
*/
#ifdef HAVE_SENDFILE
Expand Down
121 changes: 112 additions & 9 deletions erts/emulator/nifs/common/prim_socket_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,7 @@ const int esock_ioctl_flags_length = NUM(esock_ioctl_flags);
/* #define sock_send(s,buf,len,flag) send((s),(buf),(len),(flag)) */
/* #define sock_sendto(s,buf,blen,flag,addr,alen) \
sendto((s),(buf),(blen),(flag),(addr),(alen)) */
/* #define sock_sendv(s,iov,iovlen) writev((s),(iov),(iovlen)) */
#define sock_setopt(s,l,o,v,ln) setsockopt((s),(l),(o),(v),(ln))
#define sock_shutdown(s, how) shutdown((s), (how))

Expand Down Expand Up @@ -1024,6 +1025,7 @@ ESockSendfileCounters initESockSendfileCounters =
* nif_send
* nif_sendto
* nif_sendmsg
* nif_sendv
* nif_sendfile
* nif_recv
* nif_recvfrom
Expand Down Expand Up @@ -1104,6 +1106,7 @@ typedef struct {
ESockIOSend send;
ESockIOSendTo sendto;
ESockIOSendMsg sendmsg;
ESockIOSendv sendv;
ESockIOSendFileStart sendfile_start;
ESockIOSendFileContinue sendfile_cont;
ESockIOSendFileDeferredClose sendfile_dc;
Expand Down Expand Up @@ -2241,6 +2244,7 @@ static const struct in6_addr in6addr_loopback =
GLOBAL_ATOM_DECL(sendmsg); \
GLOBAL_ATOM_DECL(sendsrcaddr); \
GLOBAL_ATOM_DECL(sendto); \
GLOBAL_ATOM_DECL(sendv); \
GLOBAL_ATOM_DECL(seqpacket); \
GLOBAL_ATOM_DECL(setfib); \
GLOBAL_ATOM_DECL(set_peer_primary_addr); \
Expand Down Expand Up @@ -2572,12 +2576,19 @@ static ESockIoBackend io_backend = {0};
(SOCKR), (SENDR), \
(DP), (F), (TAP), (TAL)) : \
enif_raise_exception((ENV), MKA((ENV), "notsup")))
#define ESOCK_IO_SENDMSG(ENV, D, \
SOCKR, SENDR, EM, F, EIOV) \
((io_backend.sendmsg != NULL) ? \
io_backend.sendmsg((ENV), (D), \
(SOCKR), (SENDR), \
(EM), (F), (EIOV), &data) : \
#define ESOCK_IO_SENDMSG(ENV, D, \
SOCKR, SENDR, EM, F, EIOV) \
((io_backend.sendmsg != NULL) ? \
io_backend.sendmsg((ENV), (D), \
(SOCKR), (SENDR), \
(EM), (F), (EIOV), &data) : \
enif_raise_exception((ENV), MKA((ENV), "notsup")))
#define ESOCK_IO_SENDV(ENV, D, \
SOCKR, SENDR, EIOV) \
((io_backend.sendv != NULL) ? \
io_backend.sendv((ENV), (D), \
(SOCKR), (SENDR), \
(EIOV), &data) : \
enif_raise_exception((ENV), MKA((ENV), "notsup")))
#define ESOCK_IO_SENDFILE_START(ENV, D, \
SOR, SNR, \
Expand Down Expand Up @@ -3894,9 +3905,10 @@ extern ErlNifEnv* esock_alloc_env(const char* slogan)
* nif_connect(Sock, SockAddr)
* nif_listen(Sock, Backlog)
* nif_accept(LSock, Ref)
* nif_send(Sock, SendRef, Data, Flags)
* nif_sendto(Sock, SendRef, Data, Dest, Flags)
* nif_sendmsg(Sock, SendRef, Msg, Flags)
* nif_send(Sock, Data, Flags, SendRef)
* nif_sendto(Sock, Data, Dest, Flags, SendRef)
* nif_sendmsg(Sock, Msg, Flags, SendRef, IOV)
* nif_sendv(Sock, IOV, SendRef)
* nif_sendfile(Sock, SendRef, Offset, Count, InFileRef)
* nif_sendfile(Sock, SendRef, Offset, Count)
* nif_sendfile(Sock)
Expand Down Expand Up @@ -5768,6 +5780,69 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,



/* ----------------------------------------------------------------------
* nif_sendv
*
* Description:
* Send a message (in the form of a list of binaries = I/O vector) on a
* socket.
*
* Arguments:
* Socket (ref) - Points to the socket descriptor.
* IOV - List of binaries
* SendRef - A unique id reference() for this (sendv) request.
*/

static
ERL_NIF_TERM nif_sendv(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[])
{
ERL_NIF_TERM res, sockRef, sendRef, eIOV;
ESockDescriptor* descP;

ESOCK_ASSERT( argc == 3 );

SGDBG( ("SOCKET", "nif_sendv -> entry with argc: %d\r\n", argc) );

sockRef = argv[0]; // We need this in case we send abort (to the caller)
eIOV = argv[1];
sendRef = argv[2];

if (! ESOCK_GET_RESOURCE(env, sockRef, (void**) &descP)) {
SGDBG( ("SOCKET", "nif_sendv -> get resource failed\r\n") );
return enif_make_badarg(env);
}

/* Extract arguments and perform preliminary validation */

if (! enif_is_ref(env, sendRef)) {
SSDBG( descP, ("SOCKET", "nif_sendv -> argv decode failed\r\n") );
return enif_make_badarg(env);
}

MLOCK(descP->writeMtx);

SSDBG( descP,
("SOCKET", "nif_sendv(%T), {%d,0x%X} ->"
"\r\n sendRef: %T"
"\r\n",
sockRef, descP->sock, descP->writeState, sendRef) );

res = ESOCK_IO_SENDV(env, descP, sockRef, sendRef, eIOV);

MUNLOCK(descP->writeMtx);

SSDBG( descP, ("SOCKET", "nif_sendv(%T) -> done with"
"\r\n res: %T"
"\r\n", sockRef, res) );

return res;

}



#ifdef FOOBAR

/* ----------------------------------------------------------------------
Expand Down Expand Up @@ -10462,6 +10537,12 @@ ERL_NIF_TERM esock_cancel(ErlNifEnv* env,
MUNLOCK(descP->writeMtx);
return result;
}
if (COMPARE(op, esock_atom_sendv) == 0) {
MLOCK(descP->writeMtx);
result = ESOCK_IO_CANCEL_SEND(env, descP, sockRef, opRef);
MUNLOCK(descP->writeMtx);
return result;
}
}
}

Expand Down Expand Up @@ -13492,6 +13573,7 @@ ErlNifFunc esock_funcs[] =
{"nif_send", 4, nif_send, 0},
{"nif_sendto", 5, nif_sendto, 0},
{"nif_sendmsg", 5, nif_sendmsg, 0},
{"nif_sendv", 3, nif_sendv, 0},
{"nif_sendfile", 5, nif_sendfile, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"nif_sendfile", 4, nif_sendfile, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"nif_sendfile", 1, nif_sendfile, ERL_NIF_DIRTY_JOB_IO_BOUND},
Expand Down Expand Up @@ -13555,19 +13637,22 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
unsigned int ioNumThreads, ioNumThreadsDef;

/* +++ Local atoms and error reason atoms +++ */
// ESOCK_EPRINTF("\r\n[ESOCK] create local atoms\r\n");
#define LOCAL_ATOM_DECL(A) atom_##A = MKA(env, #A)
LOCAL_ATOMS;
// LOCAL_ERROR_REASON_ATOMS;
#undef LOCAL_ATOM_DECL

/* Global atom(s) and error reason atom(s) */
// ESOCK_EPRINTF("\r\n[ESOCK] create global atoms\r\n");
#define GLOBAL_ATOM_DECL(A) esock_atom_##A = MKA(env, #A)
GLOBAL_ATOMS;
GLOBAL_ERROR_REASON_ATOMS;
#undef GLOBAL_ATOM_DECL

esock_atom_socket_tag = MKA(env, "$socket");

// ESOCK_EPRINTF("\r\n[ESOCK] get registry pid\r\n");
if (! esock_extract_pid_from_map(env, load_info,
atom_registry,
&data.regPid)) {
Expand All @@ -13576,25 +13661,29 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
}

/* --esock-disable-registry */
// ESOCK_EPRINTF("\r\n[ESOCK] get use-registry\r\n");
data.useReg =
esock_get_bool_from_map(env, load_info,
esock_atom_use_registry,
ESOCK_USE_SOCKET_REGISTRY);

/* --esock-enable-iow */
// ESOCK_EPRINTF("\r\n[ESOCK] get enable-iow\r\n");
data.iow =
esock_get_bool_from_map(env, load_info,
atom_iow,
ESOCK_NIF_IOW_DEFAULT);

/* --enable-extended-error-info */
// ESOCK_EPRINTF("\r\n[ESOCK] maybe enable eei\r\n");
#if defined(ESOCK_USE_EXTENDED_ERROR_INFO)
data.eei = TRUE;
#else
data.eei = FALSE;
#endif

/* --esock-debug-file=<filename> */
// ESOCK_EPRINTF("\r\n[ESOCK] debug filename\r\n");
{
char *debug_filename;

Expand All @@ -13617,9 +13706,11 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
FREE(debug_filename);
}

// ESOCK_EPRINTF("\r\n[ESOCK] create protocols mutex\r\n");
data.protocolsMtx = MCREATE("esock.protocols");

/* +++ Global Counters +++ */
// ESOCK_EPRINTF("\r\n[ESOCK] create global counters mutex (and init counters)\r\n");
data.cntMtx = MCREATE("esock.gcnt");
data.numSockets = 0;
data.numTypeDGrams = 0;
Expand All @@ -13634,6 +13725,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
data.numProtoSCTP = 0;


// ESOCK_EPRINTF("\r\n[ESOCK] init opts and cmsg tables\r\n");
initOpts();
initCmsgTables();

Expand Down Expand Up @@ -13664,6 +13756,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
#endif


// ESOCK_EPRINTF("\r\n[ESOCK] init IOV max\r\n");
data.iov_max =
#if defined(NO_SYSCONF) || (! defined(_SC_IOV_MAX))
# ifdef IOV_MAX
Expand All @@ -13679,13 +13772,15 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)


/* This is (currently) intended for Windows use */
// ESOCK_EPRINTF("\r\n[ESOCK] (win) system info\r\n");
enif_system_info(&sysInfo, sizeof(ErlNifSysInfo));

/* We should have a config options for this:
* --esock-num-io-threads=16
*
* ESOCK_IO_NUM_THREADS
*/
// ESOCK_EPRINTF("\r\n[ESOCK] (win) number of schedulers\r\n");
ioNumThreadsDef =
(unsigned int) (sysInfo.scheduler_threads > 0) ?
2*sysInfo.scheduler_threads : 2;
Expand All @@ -13694,6 +13789,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_io_num_threads,
ioNumThreadsDef);

// ESOCK_EPRINTF("\r\n[ESOCK] init I/O backend callbacks\r\n");
#ifdef __WIN32__

io_backend.init = esaio_init;
Expand All @@ -13713,6 +13809,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
io_backend.send = esaio_send;
io_backend.sendto = esaio_sendto;
io_backend.sendmsg = esaio_sendmsg;
io_backend.sendv = esaio_sendv;
io_backend.sendfile_start = NULL;
io_backend.sendfile_cont = NULL;
io_backend.sendfile_dc = NULL;
Expand Down Expand Up @@ -13763,6 +13860,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
io_backend.send = essio_send;
io_backend.sendto = essio_sendto;
io_backend.sendmsg = essio_sendmsg;
io_backend.sendv = essio_sendv;
io_backend.sendfile_start = essio_sendfile_start;
io_backend.sendfile_cont = essio_sendfile_cont;
io_backend.sendfile_dc = essio_sendfile_deferred_close;
Expand Down Expand Up @@ -13796,17 +13894,22 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)

#endif

// ESOCK_EPRINTF("\r\n[ESOCK] init I/O backend\r\n");
if (ESOCK_IO_INIT(ioNumThreads) != ESOCK_IO_OK) {
esock_error_msg("Failed initiating I/O backend");
return 1; // Failure
}

// ESOCK_EPRINTF("\r\n[ESOCK] open socket (nif) resource\r\n");
esocks = enif_open_resource_type_x(env,
"sockets",
&esockInit,
ERL_NIF_RT_CREATE,
NULL);

/* ESOCK_EPRINTF("\r\n[ESOCK] open socket (nif) resource res: 0x%lX\r\n", */
/* esocks); */

if (esocks != NULL) {
int ores;

Expand Down
6 changes: 6 additions & 0 deletions erts/emulator/nifs/common/socket_asyncio.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ extern ERL_NIF_TERM esaio_sendmsg(ErlNifEnv* env,
int flags,
ERL_NIF_TERM eIOV,
const ESockData* dataP);
extern ERL_NIF_TERM esaio_sendv(ErlNifEnv* env,
ESockDescriptor* descP,
ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef,
ERL_NIF_TERM eIOV,
const ESockData* dataP);
/*
extern
ERL_NIF_TERM esaio_sendfile_start(ErlNifEnv* env,
Expand Down
24 changes: 14 additions & 10 deletions erts/emulator/nifs/common/socket_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ typedef long ssize_t;
GLOBAL_ATOM_DEF(sendmsg); \
GLOBAL_ATOM_DEF(sendsrcaddr); \
GLOBAL_ATOM_DEF(sendto); \
GLOBAL_ATOM_DEF(sendv); \
GLOBAL_ATOM_DEF(seqpacket); \
GLOBAL_ATOM_DEF(setfib); \
GLOBAL_ATOM_DEF(set_peer_primary_addr); \
Expand Down Expand Up @@ -656,24 +657,27 @@ GLOBAL_ERROR_REASON_ATOM_DEFS;
#define IS_UNDEFINED(T) IS_IDENTICAL((T), esock_atom_undefined)
#define IS_OK(T) IS_IDENTICAL((T), esock_atom_ok)

#define IS_ATOM(E, TE) enif_is_atom((E), (TE))
#define IS_BIN(E, TE) enif_is_binary((E), (TE))
#define IS_LIST(E, TE) enif_is_list((E), (TE))
#define IS_MAP(E, TE) enif_is_map((E), (TE))
#define IS_NUM(E, TE) enif_is_number((E), (TE))
#define IS_TUPLE(E, TE) enif_is_tuple((E), (TE))
#define IS_INTEGER(E, TE) esock_is_integer((E), (TE))
#define IS_ATOM(E, TE) enif_is_atom((E), (TE))
#define IS_BIN(E, TE) enif_is_binary((E), (TE))
#define IS_LIST(E, TE) enif_is_list((E), (TE))
#define IS_MAP(E, TE) enif_is_map((E), (TE))
#define IS_NUM(E, TE) enif_is_number((E), (TE))
#define IS_TUPLE(E, TE) enif_is_tuple((E), (TE))
#define IS_INTEGER(E, TE) esock_is_integer((E), (TE))

#define IS_PID_UNDEF(P) enif_is_pid_undefined((P))
#define SET_PID_UNDEF(P) enif_set_pid_undefined((P))
#define IS_PID_UNDEF(P) enif_is_pid_undefined((P))
#define SET_PID_UNDEF(P) enif_set_pid_undefined((P))

#define GET_ATOM_LEN(E, TE, LP) \
enif_get_atom_length((E), (TE), (LP), ERL_NIF_LATIN1)
#define GET_ATOM(E, TE, BP, MAX) \
enif_get_atom((E), (TE), (BP), (MAX), ERL_NIF_LATIN1)
#define GET_BIN(E, TE, BP) enif_inspect_iolist_as_binary((E), (TE), (BP))
#define GET_BIN(E, TE, BP) \
enif_inspect_iolist_as_binary((E), (TE), (BP))
#define GET_INT(E, TE, IP) enif_get_int((E), (TE), (IP))
#define GET_INT64(E, TE, IP) enif_get_int64((E), (TE), (IP))
#define GET_IOV(ME, EIOV, T, IOV) \
enif_inspect_iovec(NULL, (ME), (EIOV), (T), (IOV))
#define GET_LIST_ELEM(E, L, HP, TP) enif_get_list_cell((E), (L), (HP), (TP))
#define GET_LIST_LEN(E, L, LP) enif_get_list_length((E), (L), (LP))
#define GET_LONG(E, TE, LP) enif_get_long((E), (TE), (LP))
Expand Down
7 changes: 7 additions & 0 deletions erts/emulator/nifs/common/socket_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ typedef ERL_NIF_TERM (*ESockIOSendMsg)(ErlNifEnv* env,
ERL_NIF_TERM eIOV,
const ESockData* dataP);

typedef ERL_NIF_TERM (*ESockIOSendv)(ErlNifEnv* env,
ESockDescriptor* descP,
ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef,
ERL_NIF_TERM eIOV,
const ESockData* dataP);

typedef ERL_NIF_TERM (*ESockIOSendFileStart)(ErlNifEnv* env,
ESockDescriptor* descP,
ERL_NIF_TERM sockRef,
Expand Down
Loading

0 comments on commit 44a98ee

Please sign in to comment.