Skip to content

Commit

Permalink
Make new link protocol mandatory
Browse files Browse the repository at this point in the history
  • Loading branch information
rickard-green committed Jun 13, 2022
1 parent c3516b5 commit 92584b8
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 421 deletions.
379 changes: 170 additions & 209 deletions erts/doc/src/erl_dist_protocol.xml

Large diffs are not rendered by default.

46 changes: 13 additions & 33 deletions erts/emulator/beam/dist.c
Original file line number Diff line number Diff line change
Expand Up @@ -1331,28 +1331,14 @@ erts_dsig_send_unlink(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, Uint6
Eterm big_heap[ERTS_MAX_UINT64_HEAP_SIZE];
Eterm unlink_id;
Eterm ctl;
if (ctx->dflags & DFLAG_UNLINK_ID) {
if (IS_USMALL(0, id))
unlink_id = make_small(id);
else {
Eterm *hp = &big_heap[0];
unlink_id = erts_uint64_to_big(id, &hp);
}
ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_UNLINK_ID),
unlink_id, local, remote);
}
if (IS_USMALL(0, id))
unlink_id = make_small(id);
else {
/*
* A node that isn't capable of talking the new link protocol.
*
* Send an old unlink op, and send ourselves an unlink-ack. We may
* end up in an inconsistent state as we could before the new link
* protocol was introduced...
*/
erts_proc_sig_send_dist_unlink_ack(ctx->dep, ctx->connection_id,
remote, local, id);
ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_UNLINK), local, remote);
Eterm *hp = &big_heap[0];
unlink_id = erts_uint64_to_big(id, &hp);
}
ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_UNLINK_ID),
unlink_id, local, remote);
return dsig_send_ctl(ctx, ctl);
}

Expand All @@ -1363,11 +1349,6 @@ erts_dsig_send_unlink_ack(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, U
Eterm unlink_id;
Eterm ctl;

if (!(ctx->dflags & DFLAG_UNLINK_ID)) {
/* Receiving node does not understand it, so drop it... */
return ERTS_DSIG_SEND_OK;
}

if (IS_USMALL(0, id))
unlink_id = make_small(id);
else {
Expand Down Expand Up @@ -2136,6 +2117,13 @@ int erts_net_message(Port *prt,
break;
}

case DOP_UNLINK:
/*
* DOP_UNLINK should never be passed. The new link protocol is
* mandatory as of OTP 26.
*/
goto invalid_message;

case DOP_UNLINK_ID: {
Eterm *element;
Uint64 id;
Expand All @@ -2149,14 +2137,6 @@ int erts_net_message(Port *prt,
if (id == 0)
goto invalid_message;

if (0) {
case DOP_UNLINK:
if (tuple_arity != 3)
goto invalid_message;
element = &tuple[2];
id = 0;
}

from = *(element++);
to = *element;
if (is_not_external_pid(from))
Expand Down
7 changes: 3 additions & 4 deletions erts/emulator/beam/dist.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@
| DFLAG_HANDSHAKE_23)

/* New mandatory flags for distribution in OTP 26 */
#define DFLAG_DIST_MANDATORY_26 (DFLAG_V4_NC)
#define DFLAG_DIST_MANDATORY_26 (DFLAG_V4_NC \
| DFLAG_UNLINK_ID)

/* Mandatory flags for distribution. */
#define DFLAG_DIST_MANDATORY (DFLAG_DIST_MANDATORY_25 \
Expand All @@ -99,8 +100,7 @@
#define DFLAG_DIST_HOPEFULLY (DFLAG_DIST_MONITOR \
| DFLAG_DIST_MONITOR_NAME \
| DFLAG_SPAWN \
| DFLAG_ALIAS \
| DFLAG_UNLINK_ID)
| DFLAG_ALIAS)

/* Our preferred set of flags. Used for connection setup handshake */
#define DFLAG_DIST_DEFAULT (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY \
Expand All @@ -113,7 +113,6 @@
| DFLAG_FRAGMENTS \
| DFLAG_SPAWN \
| DFLAG_ALIAS \
| DFLAG_UNLINK_ID \
| DFLAG_MANDATORY_25_DIGEST)

/* Flags addable by local distr implementations */
Expand Down
112 changes: 0 additions & 112 deletions erts/emulator/beam/external.c
Original file line number Diff line number Diff line change
Expand Up @@ -5968,118 +5968,6 @@ Sint transcode_dist_obuf(ErtsDistOutputBuf* ob,
return 0;
return reds;
}

if ((~dflags & DFLAG_UNLINK_ID)
&& ep[0] == SMALL_TUPLE_EXT
&& ep[1] == 4
&& ep[2] == SMALL_INTEGER_EXT
&& (ep[3] == DOP_UNLINK_ID_ACK || ep[3] == DOP_UNLINK_ID)) {

if (ep[3] == DOP_UNLINK_ID_ACK) {
/* Drop DOP_UNLINK_ID_ACK signal... */
int i;
for (i = 1; i < ob->eiov->vsize; i++) {
if (ob->eiov->binv[i])
driver_free_binary(ob->eiov->binv[i]);
}
ob->eiov->vsize = 1;
ob->eiov->size = 0;
}
else {
Eterm ctl_msg, remote, local, *tp;
ErtsTranscodeDecodeState tds;
Uint64 id;
byte *ptr;
ASSERT(ep[3] == DOP_UNLINK_ID);
/*
* Rewrite the DOP_UNLINK_ID signal into a
* DOP_UNLINK signal and send an unlink ack
* to the local sender.
*/

/*
* decode control message so we get info
* needed for unlink ack signal to send...
*/
ASSERT(get_int32(hdr + 4) == 0); /* No payload */
ctl_msg = transcode_decode_ctl_msg(&tds, iov, eiov->vsize);

ASSERT(is_tuple_arity(ctl_msg, 4));

tp = tuple_val(ctl_msg);
ASSERT(tp[1] == make_small(DOP_UNLINK_ID));

if (!term_to_Uint64(tp[2], &id))
ERTS_INTERNAL_ERROR("Invalid encoding of DOP_UNLINK_ID signal");

local = tp[3];
remote = tp[4];

ASSERT(is_internal_pid(local));
ASSERT(is_external_pid(remote));

/*
* Rewrite buffer to an unlink signal by removing
* second element and change first element to
* DOP_UNLINK. That is, to: {DOP_UNLINK, local, remote}
*/

ptr = &ep[4];
switch (*ptr) {
case SMALL_INTEGER_EXT:
ptr += 1;
break;
case INTEGER_EXT:
ptr += 4;
break;
case SMALL_BIG_EXT:
ptr += 1;
ASSERT(*ptr <= 8);
ptr += *ptr + 1;
break;
default:
ERTS_INTERNAL_ERROR("Invalid encoding of DOP_UNLINK_ID signal");
break;
}

ASSERT((ptr - ep) <= 16);
ASSERT((ptr - ep) <= iov[2].iov_len);

*(ptr--) = DOP_UNLINK;
*(ptr--) = SMALL_INTEGER_EXT;
*(ptr--) = 3;
*ptr = SMALL_TUPLE_EXT;

iov[2].iov_base = ptr;
iov[2].iov_len -= (ptr - ep);

#ifdef DEBUG
{
ErtsTranscodeDecodeState dbg_tds;
Eterm new_ctl_msg = transcode_decode_ctl_msg(&dbg_tds,
iov,
eiov->vsize);
ASSERT(is_tuple_arity(new_ctl_msg, 3));
tp = tuple_val(new_ctl_msg);
ASSERT(tp[1] == make_small(DOP_UNLINK));
ASSERT(tp[2] == local);
ASSERT(eq(tp[3], remote));
transcode_decode_state_destroy(&dbg_tds);
}
#endif

/* Send unlink ack to local sender... */
erts_proc_sig_send_dist_unlink_ack(dep, dep->connection_id,
remote, local, id);

transcode_decode_state_destroy(&tds);

reds -= 5;
}
if (reds < 0)
return 0;
return reds;
}

start_r = r = reds*ERTS_TRANSCODE_REDS_FACT;

Expand Down
3 changes: 1 addition & 2 deletions lib/erl_interface/src/connect/ei_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -2243,8 +2243,7 @@ static DistFlags preferred_flags(void)
DFLAG_MANDATORY_25_DIGEST
| DFLAG_DIST_MANDATORY
| DFLAG_DIST_MONITOR
| DFLAG_SMALL_ATOM_TAGS
| DFLAG_UNLINK_ID;
| DFLAG_SMALL_ATOM_TAGS;
return flags;
}

Expand Down
3 changes: 2 additions & 1 deletion lib/erl_interface/src/connect/ei_connect_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ typedef EI_ULONGLONG DistFlags;
| DFLAG_HANDSHAKE_23)

/* New mandatory flags for distribution in OTP 26. */
#define DFLAG_DIST_MANDATORY_26 (DFLAG_V4_NC)
#define DFLAG_DIST_MANDATORY_26 (DFLAG_V4_NC \
| DFLAG_UNLINK_ID)

/* Mandatory flags for distribution. */

Expand Down
6 changes: 4 additions & 2 deletions lib/erl_interface/test/ei_tmo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ end_per_testcase(_Case, _Config) ->
-define(DFLAG_MAP_TAG, 16#20000).
-define(DFLAG_BIG_CREATION, 16#40000).
-define(DFLAG_HANDSHAKE_23, 16#1000000).
-define(DFLAG_UNLINK_ID, 16#2000000).
-define(DFLAG_MANDATORY_25_DIGEST, 16#4000000).
-define(DFLAG_V4_NC, 16#400000000).

Expand All @@ -100,10 +101,11 @@ end_per_testcase(_Case, _Config) ->
?DFLAG_BIT_BINARIES bor
?DFLAG_HANDSHAKE_23)).

%% From OTP 26 V4_NC is compulsory.
%% From OTP 26 V4_NC, and UNLINK_ID are compulsory.

-define(DFLAGS_MANDATORY_26,
(?DFLAG_V4_NC)).
(?DFLAG_V4_NC bor
?DFLAG_UNLINK_ID)).

-define(COMPULSORY_DFLAGS,
(?DFLAGS_MANDATORY_25 bor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,29 +425,12 @@ protected void sendUnlink(final OtpErlangPid from, final OtpErlangPid dest,
header.write1(passThrough);
header.write1(version);

if ((peer.flags & AbstractNode.dFlagUnlinkId) != 0) {
// header
header.write_tuple_head(4);
header.write_long(unlinkIdTag);
header.write_long(unlink_id);
header.write_any(from);
header.write_any(dest);
}
else {
/*
* A node that isn't capable of talking the new link protocol.
*
* Send an old unlink op, and send ourselves an unlink-ack. We may
* end up in an inconsistent state as we could before the new link
* protocol was introduced...
*/
// header
header.write_tuple_head(3);
header.write_long(unlinkTag);
header.write_any(from);
header.write_any(dest);
deliver(new OtpMsg(unlinkIdAckTag, dest, from, unlink_id));
}
// header
header.write_tuple_head(4);
header.write_long(unlinkIdTag);
header.write_long(unlink_id);
header.write_any(from);
header.write_any(dest);

// fix up length in preamble
header.poke4BE(0, header.size() - 4);
Expand All @@ -471,26 +454,25 @@ protected void sendUnlinkAck(final OtpErlangPid from, final OtpErlangPid dest,
if (!connected) {
throw new IOException("Not connected");
}
if ((peer.flags & AbstractNode.dFlagUnlinkId) != 0) {
@SuppressWarnings("resource")

@SuppressWarnings("resource")
final OtpOutputStream header = new OtpOutputStream(headerLen);

// preamble: 4 byte length + "passthrough" tag
header.write4BE(0); // reserve space for length
header.write1(passThrough);
header.write1(version);
// preamble: 4 byte length + "passthrough" tag
header.write4BE(0); // reserve space for length
header.write1(passThrough);
header.write1(version);

// header
header.write_tuple_head(4);
header.write_long(unlinkIdAckTag);
header.write_long(unlink_id);
header.write_any(from);
header.write_any(dest);
// fix up length in preamble
header.poke4BE(0, header.size() - 4);
// header
header.write_tuple_head(4);
header.write_long(unlinkIdAckTag);
header.write_long(unlink_id);
header.write_any(from);
header.write_any(dest);
// fix up length in preamble
header.poke4BE(0, header.size() - 4);

do_send(header);
}
do_send(header);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public class AbstractNode implements OtpTransportFactory {
| dFlagHandshake23;

/* New mandatory flags in OTP 26 */
static final long mandatoryFlags26 = dFlagV4PidsRefs;
static final long mandatoryFlags26 = dFlagV4PidsRefs
| dFlagUnlinkId;

/* Mandatory flags for distribution. Keep them in sync with
DFLAG_DIST_MANDATORY in erts/emulator/beam/dist.h. */
Expand All @@ -125,7 +126,6 @@ public class AbstractNode implements OtpTransportFactory {
int distLow = 6; // Cannot talk to nodes before OTP 23
private int creation = 0x710000;
long flags = mandatoryFlags
| dFlagUnlinkId
| dFlagMandatory25Digest;

/* initialize hostname and default cookie */
Expand Down
3 changes: 2 additions & 1 deletion lib/kernel/include/dist.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@
?DFLAG_HANDSHAKE_23)).

%% New mandatory flags in OTP 26
-define(MANDATORY_DFLAGS_26, (?DFLAG_V4_NC)).
-define(MANDATORY_DFLAGS_26, (?DFLAG_V4_NC bor
?DFLAG_UNLINK_ID)).

%% All mandatory flags
-define(DFLAGS_MANDATORY, (?MANDATORY_DFLAGS_25 bor
Expand Down
6 changes: 4 additions & 2 deletions lib/kernel/test/erl_distribution_wb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
-define(DFLAG_MAP_TAG, 16#20000).
-define(DFLAG_BIG_CREATION, 16#40000).
-define(DFLAG_HANDSHAKE_23, 16#1000000).
-define(DFLAG_UNLINK_ID, 16#2000000).
-define(DFLAG_MANDATORY_25_DIGEST, 16#4000000).
-define(DFLAG_V4_NC, 16#400000000).

Expand All @@ -79,10 +80,11 @@
?DFLAG_BIT_BINARIES bor
?DFLAG_HANDSHAKE_23)).

%% From OTP 26 V4_NC is compulsory.
%% From OTP 26 V4_NC and UNLINK_ID are compulsory.

-define(DFLAGS_MANDATORY_26,
(?DFLAG_V4_NC)).
(?DFLAG_V4_NC bor
?DFLAG_UNLINK_ID)).

-define(COMPULSORY_DFLAGS,
(?DFLAGS_MANDATORY_25 bor
Expand Down
Loading

0 comments on commit 92584b8

Please sign in to comment.