Skip to content

Commit

Permalink
Add surface of trim to scrub
Browse files Browse the repository at this point in the history
  • Loading branch information
kuenishi committed Oct 13, 2015
1 parent 4d0019f commit dfe953b
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 4 deletions.
24 changes: 20 additions & 4 deletions src/machi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ message Mpb_ErrorResp {
// append_chunk() : Mpb_AppendChunkReq and Mpb_AppendChunkResp
// write_chunk() : Mpb_WriteChunkReq and Mpb_WriteChunkResp
// read_chunk() : Mpb_ReadChunkReq and Mpb_ReadChunkResp
// trim_chunk() : Mpb_TrimCHunkReq and Mpb_TrimChunkResp
// checksum_list() : Mpb_ChecksumListReq and Mpb_ChecksumListResp
// list_files() : Mpb_ListFilesReq and Mpb_ListFilesResp
//
Expand Down Expand Up @@ -209,6 +210,19 @@ message Mpb_ReadChunkResp {
optional Mpb_ChunkCSum csum = 3;
}


// High level API: trim_chunk() request & response

message Mpb_TrimChunkReq {
required string file = 1;
required uint64 offset = 2;
required uint32 size = 3;
}

message Mpb_TrimChunkResp {
required Mpb_GeneralStatusCode status = 1;
}

// High level API: checksum_list() request & response

message Mpb_ChecksumListReq {
Expand Down Expand Up @@ -258,8 +272,9 @@ message Mpb_Request {
optional Mpb_AppendChunkReq append_chunk = 112;
optional Mpb_WriteChunkReq write_chunk = 113;
optional Mpb_ReadChunkReq read_chunk = 114;
optional Mpb_ChecksumListReq checksum_list = 115;
optional Mpb_ListFilesReq list_files = 116;
optional Mpb_TrimChunkReq trim_chunk = 115;
optional Mpb_ChecksumListReq checksum_list = 116;
optional Mpb_ListFilesReq list_files = 117;
}

message Mpb_Response {
Expand All @@ -281,8 +296,9 @@ message Mpb_Response {
optional Mpb_AppendChunkResp append_chunk = 12;
optional Mpb_WriteChunkResp write_chunk = 13;
optional Mpb_ReadChunkResp read_chunk = 14;
optional Mpb_ChecksumListResp checksum_list = 15;
optional Mpb_ListFilesResp list_files = 16;
optional Mpb_TrimChunkResp trim_chunk = 15;
optional Mpb_ChecksumListResp checksum_list = 16;
optional Mpb_ListFilesResp list_files = 17;
}

//////////////////////////////////////////
Expand Down
13 changes: 13 additions & 0 deletions src/machi_cr_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
append_chunk_extra/4, append_chunk_extra/5,
write_chunk/4, write_chunk/5,
read_chunk/4, read_chunk/5,
trim_chunk/4, trim_chunk/5,
checksum_list/2, checksum_list/3,
list_files/1, list_files/2,

Expand Down Expand Up @@ -209,6 +210,18 @@ read_chunk(PidSpec, File, Offset, Size, Timeout0) ->
gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, TO}},
Timeout).

%% @doc Trim a chunk of data of size `Size' from `File' at `Offset'.

trim_chunk(PidSpec, File, Offset, Size) ->
trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).

%% @doc Trim a chunk of data of size `Size' from `File' at `Offset'.

trim_chunk(PidSpec, File, Offset, Size, Timeout0) ->
{TO, Timeout} = timeout(Timeout0),
gen_server:call(PidSpec, {req, {trim_chunk, File, Offset, Size, TO}},
Timeout).

%% @doc Fetch the list of chunk checksums for `File'.

checksum_list(PidSpec, File) ->
Expand Down
5 changes: 5 additions & 0 deletions src/machi_flu1.erl
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,11 @@ do_pb_hl_request2({high_read_chunk, File, Offset, Size},
#state{high_clnt=Clnt}=S) ->
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size),
{Res, S};
do_pb_hl_request2({high_trim_chunk, File, Offset, Size},
#state{high_clnt=Clnt}=S) ->
%% Res = machi_cr_client:trim_chunk(Clnt, File, Offset, Size),
%% {Res, S};
{{error, bad_joss}, S};
do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) ->
Res = machi_cr_client:checksum_list(Clnt, File),
{Res, S};
Expand Down
36 changes: 36 additions & 0 deletions src/machi_pb_high_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
append_chunk/6, append_chunk/7,
write_chunk/5, write_chunk/6,
read_chunk/4, read_chunk/5,
trim_chunk/4, trim_chunk/5,
checksum_list/2, checksum_list/3,
list_files/1, list_files/2
]).
Expand Down Expand Up @@ -99,6 +100,12 @@ read_chunk(PidSpec, File, Offset, Size) ->
read_chunk(PidSpec, File, Offset, Size, Timeout) ->
send_sync(PidSpec, {read_chunk, File, Offset, Size}, Timeout).

trim_chunk(PidSpec, File, Offset, Size) ->
trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).

trim_chunk(PidSpec, File, Offset, Size, Timeout) ->
send_sync(PidSpec, {trim_chunk, File, Offset, Size}, Timeout).

checksum_list(PidSpec, File) ->
checksum_list(PidSpec, File, ?DEFAULT_TIMEOUT).

Expand Down Expand Up @@ -298,6 +305,30 @@ do_send_sync2({read_chunk, File, Offset, Size},
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}}
end;
do_send_sync2({trim_chunk, File, Offset, Size},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
ReqID = <<Index:64/big, Count:64/big>>,
Req = #mpb_trimchunkreq{file=File,
offset=Offset,
size=Size},
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
trim_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a),
ok = gen_tcp:send(Sock, Bin1a),
{ok, Bin1B} = gen_tcp:recv(Sock, 0),
case (catch machi_pb:decode_mpb_response(Bin1B)) of
#mpb_response{req_id=ReqID, trim_chunk=R} when R /= undefined ->
Result = convert_trim_chunk_resp(R),
{Result, S#state{count=Count+1}};
#mpb_response{req_id=ReqID, generic=G} when G /= undefined ->
#mpb_errorresp{code=Code, msg=Msg, extra=Extra} = G,
{{error, {Code, Msg, Extra}}, S#state{count=Count+1}}
end
catch X:Y ->
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}}
end;
do_send_sync2({checksum_list, File},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
Expand Down Expand Up @@ -389,6 +420,11 @@ convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunk=Chunk}) ->
convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
convert_general_status_code(Status).

convert_trim_chunk_resp(#mpb_trimchunkresp{status='OK'}) ->
ok;
convert_trim_chunk_resp(#mpb_trimchunkresp{status=Status}) ->
convert_general_status_code(Status).

convert_checksum_list_resp(#mpb_checksumlistresp{status='OK', chunk=Chunk}) ->
{ok, Chunk};
convert_checksum_list_resp(#mpb_checksumlistresp{status=Status}) ->
Expand Down
18 changes: 18 additions & 0 deletions src/machi_pb_translate.erl
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ from_pb_request(#mpb_request{req_id=ReqID,
offset=Offset,
size=Size} = IR,
{ReqID, {high_read_chunk, File, Offset, Size}};
from_pb_request(#mpb_request{req_id=ReqID,
trim_chunk=IR=#mpb_trimchunkreq{}}) ->
#mpb_trimchunkreq{file=File,
offset=Offset,
size=Size} = IR,
{ReqID, {high_trim_chunk, File, Offset, Size}};
from_pb_request(#mpb_request{req_id=ReqID,
checksum_list=IR=#mpb_checksumlistreq{}}) ->
#mpb_checksumlistreq{file=File} = IR,
Expand Down Expand Up @@ -643,6 +649,18 @@ to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb_response(ReqID, {high_trim_chunk, _File, _Offset, _Size}, Resp) ->
case Resp of
ok ->
#mpb_response{req_id=ReqID,
trim_chunk=#mpb_trimchunkresp{status='OK'}};
{error, _}=Error ->
Status = conv_from_status(Error),
#mpb_response{req_id=ReqID,
trim_chunk=#mpb_trimchunkresp{status=Status}};
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb_response(ReqID, {high_checksum_list, _File}, Resp) ->
case Resp of
{ok, Chunk} ->
Expand Down
10 changes: 10 additions & 0 deletions test/machi_pb_high_client_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ smoke_test2() ->
{ok, [{File1Size,File1}]} = ?C:list_files(Clnt),
true = is_integer(File1Size),

[begin
%% ok = ?C:trim_chunk(Clnt, Fl, Off, Sz)
?assertMatch({bummer,
{throw,
{error, bad_joss_taipan_fixme},
_Boring_stack_trace}},
?C:trim_chunk(Clnt, Fl, Off, Sz))
end || {Ch, Fl, Off, Sz} <- Reads],
?debugVal(?C:list_files(Clnt)),

ok
after
(catch ?C:quit(Clnt))
Expand Down

0 comments on commit dfe953b

Please sign in to comment.