Skip to content

Commit

Permalink
Clean up internal protocol<->tuple mappings for correct epoch checking
Browse files Browse the repository at this point in the history
  • Loading branch information
slfritchie committed Dec 29, 2015
1 parent 5a65a16 commit e24acb7
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 104 deletions.
3 changes: 1 addition & 2 deletions src/machi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,7 @@ message Mpb_LL_TrimChunkResp {
// Low level API: checksum_list()

message Mpb_LL_ChecksumListReq {
required Mpb_EpochID epoch_id = 1;
required string file = 2;
required string file = 1;
}

message Mpb_LL_ChecksumListResp {
Expand Down
4 changes: 2 additions & 2 deletions src/machi_admin_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ verify_file_checksums_remote2(Sock1, EpochID, File) ->
end,
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk).

verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
verify_file_checksums_common(Sock1, _EpochID, File, ReadChunk) ->
try
case ?FLU_C:checksum_list(Sock1, EpochID, File) of
case ?FLU_C:checksum_list(Sock1, File) of
{ok, InfoBin} ->
Info = machi_csum_table:split_checksum_list_blob_decode(InfoBin),
Res = lists:foldl(verify_chunk_checksum(File, ReadChunk),
Expand Down
4 changes: 2 additions & 2 deletions src/machi_chain_repair.erl
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ make_repair_compare_fun(SrcFLU) ->
T_a =< T_b
end.

make_repair_directives(ConsistencyMode, RepairMode, File, Size, EpochID,
make_repair_directives(ConsistencyMode, RepairMode, File, Size, _EpochID,
Verb, Src, FLUs0, ProxiesDict, ETS) ->
true = (Size < ?MAX_OFFSET),
FLUs = lists:usort(FLUs0),
Expand All @@ -216,7 +216,7 @@ make_repair_directives(ConsistencyMode, RepairMode, File, Size, EpochID,
Proxy = orddict:fetch(FLU, ProxiesDict),
OffSzCs =
case machi_proxy_flu1_client:checksum_list(
Proxy, EpochID, File, ?LONG_TIMEOUT) of
Proxy, File, ?LONG_TIMEOUT) of
{ok, InfoBin} ->
machi_csum_table:split_checksum_list_blob_decode(InfoBin);
{error, no_such_file} ->
Expand Down
4 changes: 2 additions & 2 deletions src/machi_cr_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -895,9 +895,9 @@ do_checksum_list(File, Depth, STime, TO, #state{proj=P}=S) ->
end.

do_checksum_list2(File, Depth, STime, TO,
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
#state{proj=P, proxies_dict=PD}=S) ->
Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
case ?FLU_PC:checksum_list(Proxy, EpochID, File, ?TIMEOUT) of
case ?FLU_PC:checksum_list(Proxy, File, ?TIMEOUT) of
{ok, _}=OK ->
{reply, OK, S};
{error, Retry}
Expand Down
32 changes: 17 additions & 15 deletions src/machi_flu1_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
append_chunk/6, append_chunk/7,
append_chunk/8, append_chunk/9,
read_chunk/7, read_chunk/8,
checksum_list/3, checksum_list/4,
checksum_list/2, checksum_list/3,
list_files/2, list_files/3,
wedge_status/1, wedge_status/2,

Expand Down Expand Up @@ -180,12 +180,12 @@ io:format(user, "dbgyo ~s LINE ~p NSInfo0 ~p NSInfo ~p\n", [?MODULE, ?LINE, NSIn

%% @doc Fetch the list of chunk checksums for `File'.

-spec checksum_list(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) ->
-spec checksum_list(port_wrap(), machi_dt:file_name()) ->
{ok, binary()} |
{error, machi_dt:error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
checksum_list(Sock, EpochID, File) ->
checksum_list2(Sock, EpochID, File).
checksum_list(Sock, File) ->
checksum_list2(Sock, File).

%% @doc Fetch the list of chunk checksums for `File'.
%%
Expand All @@ -209,13 +209,13 @@ checksum_list(Sock, EpochID, File) ->
%% Details of the encoding used inside the `binary()' blog can be found
%% in the EDoc comments for {@link machi_flu1:decode_csum_file_entry/1}.

-spec checksum_list(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) ->
-spec checksum_list(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:file_name()) ->
{ok, binary()} |
{error, machi_dt:error_general() | 'no_such_file'} | {error, term()}.
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
checksum_list(Host, TcpPort, File) when is_integer(TcpPort) ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
checksum_list2(Sock, EpochID, File)
checksum_list2(Sock, File)
after
disconnect(Sock)
end.
Expand Down Expand Up @@ -567,9 +567,11 @@ append_chunk2(Sock, NSInfo, EpochID,
machi_util:unmake_tagged_csum(CSum0)
end,
#ns_info{version=NSVersion, name=NS, locator=NSLocator} = NSInfo,
%% NOTE: The tuple position of NSLocator is a bit odd, because EpochID
%% _must_ be in the 4th position (as NSV & NS must be in 2nd & 3rd).
Req = machi_pb_translate:to_pb_request(
ReqID,
{low_append_chunk, NSVersion, NS, NSLocator, EpochID,
{low_append_chunk, NSVersion, NS, EpochID, NSLocator,
Prefix, Chunk, CSum_tag, CSum, Opts}),
do_pb_request_common(Sock, ReqID, Req, true, Timeout).

Expand All @@ -594,37 +596,37 @@ write_chunk2(Sock, NSInfo, EpochID, File0, Offset, Chunk0) ->
list2(Sock, EpochID) ->
ReqID = <<"id">>,
Req = machi_pb_translate:to_pb_request(
ReqID, {low_list_files, EpochID}),
ReqID, {low_skip_wedge, {low_list_files, EpochID}}),
do_pb_request_common(Sock, ReqID, Req).

wedge_status2(Sock) ->
ReqID = <<"id">>,
Req = machi_pb_translate:to_pb_request(
ReqID, {low_wedge_status, undefined}),
ReqID, {low_skip_wedge, {low_wedge_status}}),
do_pb_request_common(Sock, ReqID, Req).

echo2(Sock, Message) ->
ReqID = <<"id">>,
Req = machi_pb_translate:to_pb_request(
ReqID, {low_echo, undefined, Message}),
ReqID, {low_skip_wedge, {low_echo, Message}}),
do_pb_request_common(Sock, ReqID, Req).

checksum_list2(Sock, EpochID, File) ->
checksum_list2(Sock, File) ->
ReqID = <<"id">>,
Req = machi_pb_translate:to_pb_request(
ReqID, {low_checksum_list, EpochID, File}),
ReqID, {low_skip_wedge, {low_checksum_list, File}}),
do_pb_request_common(Sock, ReqID, Req).

delete_migration2(Sock, EpochID, File) ->
ReqID = <<"id">>,
Req = machi_pb_translate:to_pb_request(
ReqID, {low_delete_migration, EpochID, File}),
ReqID, {low_skip_wedge, {low_delete_migration, EpochID, File}}),
do_pb_request_common(Sock, ReqID, Req).

trunc_hack2(Sock, EpochID, File) ->
ReqID = <<"id-trunc">>,
Req = machi_pb_translate:to_pb_request(
ReqID, {low_trunc_hack, EpochID, File}),
ReqID, {low_skip_wedge, {low_trunc_hack, EpochID, File}}),
do_pb_request_common(Sock, ReqID, Req).

get_latest_epochid2(Sock, ProjType) ->
Expand Down
46 changes: 18 additions & 28 deletions src/machi_flu1_net_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -209,31 +209,31 @@ do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) ->
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
do_pb_ll_request(PB_request, S) ->
Req = machi_pb_translate:from_pb_request(PB_request),
%% io:format(user, "[~w] do_pb_ll_request Req: ~w~n", [S#state.flu_name, Req]),
{ReqID, Cmd, Result, S2} =
case Req of
{RqID, {LowCmd, _}=Cmd0}
when LowCmd =:= low_proj;
LowCmd =:= low_wedge_status;
LowCmd =:= low_list_files ->
{RqID, {low_skip_wedge, LowSubCmd}=Cmd0} ->
%% Skip wedge check for these unprivileged commands
{Rs, NewS} = do_pb_ll_request3(LowSubCmd, S),
{RqID, Cmd0, Rs, NewS};
{RqID, {low_proj, _LowSubCmd}=Cmd0} ->
{Rs, NewS} = do_pb_ll_request3(Cmd0, S),
{RqID, Cmd0, Rs, NewS};
{RqID, Cmd0} ->
io:format(user, "TODO: epoch at pos 2 in tuple is probably broken now, whee: ~p\n", [Cmd0]),
EpochID = element(2, Cmd0), % by common convention
{Rs, NewS} = do_pb_ll_request2(EpochID, Cmd0, S),
%% All remaining must have NSVersion, NS, & EpochID at next pos
NSVersion = element(2, Cmd0),
NS = element(3, Cmd0),
EpochID = element(4, Cmd0),
{Rs, NewS} = do_pb_ll_request2(NSVersion, NS, EpochID, Cmd0, S),
{RqID, Cmd0, Rs, NewS}
end,
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.

do_pb_ll_request2(EpochID, CMD, S) ->
do_pb_ll_request2(NSVersion, NS, EpochID, CMD, S) ->
{Wedged_p, CurrentEpochID} = lookup_epoch(S),
%% io:format(user, "{Wedged_p, CurrentEpochID}: ~w~n", [{Wedged_p, CurrentEpochID}]),
if Wedged_p == true ->
if not is_tuple(EpochID) orelse tuple_size(EpochID) /= 2 ->
exit({bad_epoch_id, EpochID, for, CMD});
Wedged_p == true ->
{{error, wedged}, S#state{epoch_id=CurrentEpochID}};
is_tuple(EpochID)
andalso
EpochID /= CurrentEpochID ->
{Epoch, _} = EpochID,
{CurrentEpoch, _} = CurrentEpochID,
Expand All @@ -259,30 +259,20 @@ do_pb_ll_request2b(CMD, S) ->
do_pb_ll_request3(CMD, S).

%% Witness status does not matter below.
do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
do_pb_ll_request3({low_echo, Msg}, S) ->
{Msg, S};
do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) ->
do_pb_ll_request3({low_auth, _User, _Pass}, S) ->
{-6, S};
do_pb_ll_request3({low_wedge_status, _EpochID}, S) ->
do_pb_ll_request3({low_wedge_status}, S) ->
{do_server_wedge_status(S), S};
do_pb_ll_request3({low_proj, PCMD}, S) ->
{do_server_proj_request(PCMD, S), S};

%% Witness status *matters* below
do_pb_ll_request3({low_append_chunk, NSVersion, NS, NSLocator, EpochID,
do_pb_ll_request3({low_append_chunk, NSVersion, NS, EpochID, NSLocator,
Prefix, Chunk, CSum_tag,
CSum, Opts},
#state{witness=false}=S) ->
%% io:format(user, "
%% append_chunk namespace_version=~p
%% namespace=~p
%% locator=~p
%% epoch_id=~p
%% prefix=~p
%% chunk=~p
%% csum={~p,~p}
%% opts=~p\n",
%% [NSVersion, NS, NSLocator, EpochID, Prefix, Chunk, CSum_tag, CSum, Opts]),
NSInfo = #ns_info{version=NSVersion, name=NS, locator=NSLocator},
{do_server_append_chunk(NSInfo, EpochID,
Prefix, Chunk, CSum_tag, CSum,
Expand All @@ -297,7 +287,7 @@ do_pb_ll_request3({low_read_chunk, _NSVersion, _NS, _EpochID, File, Offset, Size
do_pb_ll_request3({low_trim_chunk, _NSVersion, _NS, _EpochID, File, Offset, Size, TriggerGC},
#state{witness=false}=S) ->
{do_server_trim_chunk(File, Offset, Size, TriggerGC, S), S};
do_pb_ll_request3({low_checksum_list, _EpochID, File},
do_pb_ll_request3({low_checksum_list, File},
#state{witness=false}=S) ->
{do_server_checksum_listing(File, S), S};
do_pb_ll_request3({low_list_files, _EpochID},
Expand Down
Loading

0 comments on commit e24acb7

Please sign in to comment.