[master] HELP-36065: don't repeat CDRs headers in csv (2600hz#4733)
* resp_data could be undefined, be careful

* check if db list is empty; set chunking_finished to true when starting query

* do not change chunking_started if resp_data is empty

* use fold to keep the context updated so the csv header is not added every time (see the sketch after this list)

* i don't like tofu

* one more check for undefined resp_data

* include erts for dev release (master version of 2600hz#4731)
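The core of the CSV fix is sketched below: thread a "header already sent" flag through the fold's accumulator so the header line is emitted exactly once. This is a minimal, self-contained illustration under invented names (csv_header_once, render/1, row/2), not the Kazoo code, which threads the flag through cb_context across chunks instead.

%% csv_header_once.erl -- illustrative sketch only
-module(csv_header_once).
-export([render/1]).

%% Render rows as CSV lines; only the first row triggers the header.
-spec render([map()]) -> [iodata()].
render(Rows) ->
    {_HeaderSent, Lines} = lists:foldl(fun row/2, {'false', []}, Rows),
    lists:reverse(Lines).

%% First row: push the header, then the data line, and flip the flag.
row(Row, {'false', Acc}) ->
    {'true', [data_line(Row), header_line() | Acc]};
%% Every later row sees the flag set and emits only its data line.
row(Row, {'true', Acc}) ->
    {'true', [data_line(Row) | Acc]}.

header_line() -> <<"id,duration\r\n">>.

data_line(#{id := Id, duration := Secs}) ->
    [Id, $,, integer_to_binary(Secs), <<"\r\n">>].

For example, csv_header_once:render([#{id => <<"a">>, duration => 3}, #{id => <<"b">>, duration => 9}]) produces one header line followed by two data lines. Before this commit the per-row renderer rendered every row against the same unmodified context, so the 'chunking_started' check saw 'false' each time and the header was re-emitted.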
icehess authored and k-anderson committed Mar 29, 2018
1 parent 110f56b commit 95db9cc
Showing 5 changed files with 74 additions and 23 deletions.
34 changes: 27 additions & 7 deletions applications/crossbar/src/api_resource.erl
@@ -676,11 +676,15 @@ does_request_validate(Req, Context0) ->
             {'false', Req, Context2};
         'false' ->
             lager:debug("failed to validate resource"),
-            Msg = case cb_context:resp_error_msg(Context2) of
-                      'undefined' ->
-                          Data = cb_context:resp_data(Context2),
+            Msg = case {cb_context:resp_error_msg(Context2)
+                       ,cb_context:resp_data(Context2)
+                       }
+                  of
+                      {'undefined', 'undefined'} ->
+                          <<"validation failed">>;
+                      {'undefined', Data} ->
                           kz_json:get_value(<<"message">>, Data, <<"validation failed">>);
-                      Message -> Message
+                      {Message, _} -> Message
                   end,
             api_util:stop(Req, cb_context:set_resp_error_msg(Context2, Msg))
     end.
@@ -839,7 +843,11 @@ to_binary(Req, Context, 'undefined') ->
     Event = api_util:create_event_name(Context, <<"to_binary">>),
     _ = crossbar_bindings:map(Event, {Req, Context}),
     %% Handle HTTP range header
-    case cb_context:req_header(Context, <<"range">>) of
+    case kz_term:is_ne_binary(RespData)
+        andalso cb_context:req_header(Context, <<"range">>)
+    of
+        'false' ->
+            {<<>>, api_util:set_resp_headers(Req, Context), Context};
         'undefined' ->
             {RespData, api_util:set_resp_headers(Req, Context), Context};
         RangeHeader ->
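The kz_term:is_ne_binary(RespData) andalso ... guard above leans on an Erlang detail worth spelling out: 'false' andalso Expr short-circuits to 'false', while 'true' andalso Expr evaluates to Expr. One case expression can therefore separate "no body", "body but no Range header" ('undefined'), and "ranged request". A self-contained sketch of the same idiom, with hypothetical names:

-module(range_guard).
-export([classify/2]).

%% Returns 'empty_body' when there is nothing to serve, 'whole_body' when
%% there is data but no Range header, and {'partial_body', Range} otherwise.
classify(RespData, Headers) ->
    case is_binary(RespData) andalso RespData =/= <<>>
        andalso maps:get(<<"range">>, Headers, 'undefined')
    of
        'false' -> 'empty_body';
        'undefined' -> 'whole_body';
        Range when is_binary(Range) -> {'partial_body', Range}
    end.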
@@ -944,8 +952,10 @@ to_pdf(Req, Context) ->
     {Req1, Context1} = crossbar_bindings:fold(Event, {Req, Context}),
     to_pdf(Req1, Context1, cb_context:resp_data(Context1)).
 
--spec to_pdf(cowboy_req:req(), cb_context:context(), binary()) ->
+-spec to_pdf(cowboy_req:req(), cb_context:context(), kz_term:api_binary()) ->
           {binary(), cowboy_req:req(), cb_context:context()}.
+to_pdf(Req, Context, 'undefined') ->
+    to_pdf(Req, Context, kz_pdf:error_empty());
 to_pdf(Req, Context, <<>>) ->
     to_pdf(Req, Context, kz_pdf:error_empty());
 to_pdf(Req, Context, RespData) ->
@@ -1047,12 +1057,22 @@ process_chunk(#{context := Context
     of
         'false' ->
             finish_chunked_response(ChunkMap#{context => reset_context_between_chunks(Context, IsStarted)});
+        0 ->
+            next_chunk_fold(ChunkMap#{context => reset_context_between_chunks(Context, IsStarted)
+                                     ,chunking_started => IsStarted
+                                     }
+                           );
         SentLength when is_integer(SentLength) ->
             next_chunk_fold(ChunkMap#{context => reset_context_between_chunks(Context, 'true')
                                      ,chunking_started => 'true'
                                      ,previous_chunk_length => SentLength
                                      }
                            );
+        [] ->
+            next_chunk_fold(ChunkMap#{context => reset_context_between_chunks(Context, IsStarted)
+                                     ,chunking_started => IsStarted
+                                     }
+                           );
         Resp when is_list(Resp) ->
             {StartedChunk, Req1} = send_chunk_response(ToFun, Req, Context),
             next_chunk_fold(ChunkMap#{context => reset_context_between_chunks(Context, StartedChunk)
@@ -1063,7 +1083,7 @@ process_chunk(#{context := Context
                            );
         _Other ->
             lager:debug("event ~s returned unsupported chunk response, stopping here", [EventName]),
-            finish_chunked_response(ChunkMap#{context => reset_context_between_chunks(Context, IsStarted)}) %% TOFU: stop
+            finish_chunked_response(ChunkMap#{context => reset_context_between_chunks(Context, IsStarted)})
     end.
 
 -spec reset_context_between_chunks(cb_context:context(), boolean()) -> cb_context:context().
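The new 0 and [] clauses implement the commit note "do not change chunking_started if resp_data is empty": an empty chunk must leave the started flag exactly as it was, because forcing it to 'true' would make the next real chunk skip the header it still owes, and forcing it to 'false' would re-send headers mid-stream. Reduced to its essence (illustrative module, not the Kazoo code):

-module(chunk_state).
-export([after_chunk/2]).

%% Empty chunks preserve the flag; a successfully sent chunk forces 'true'.
-spec after_chunk(boolean(), 'empty' | {'sent', non_neg_integer()}) -> boolean().
after_chunk(IsStarted, 'empty') -> IsStarted;
after_chunk(_IsStarted, {'sent', _Length}) -> 'true'.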
20 changes: 15 additions & 5 deletions applications/crossbar/src/api_util.erl
@@ -1218,8 +1218,10 @@ create_json_chunk_response(Req, Context) ->
     JObjs = cb_context:resp_data(Context),
     create_json_chunk_response(Req, JObjs, cb_context:fetch(Context, 'chunking_started', 'false')).
 
--spec create_json_chunk_response(cowboy_req:req(), kz_json:objects(), boolean()) ->
+-spec create_json_chunk_response(cowboy_req:req(), kz_json:api_objects(), boolean()) ->
           {boolean(), cowboy_req:req()}.
+create_json_chunk_response(Req, 'undefined', StartedChunk) ->
+    {StartedChunk, Req};
 create_json_chunk_response(Req, [], StartedChunk) ->
     {StartedChunk, Req};
 create_json_chunk_response(Req, JObjs, StartedChunk) ->
@@ -1249,12 +1251,20 @@ do_encode_to_json(JObjs) ->
 -spec create_csv_chunk_response(cowboy_req:req(), cb_context:context()) ->
           {boolean(), cowboy_req:req()}.
 create_csv_chunk_response(Req, Context) ->
-    CSVs = cb_context:resp_data(Context),
-    case cb_context:fetch(Context, 'chunking_started', 'false') of
-        'true' ->
+    case {cb_context:resp_data(Context)
+         ,cb_context:fetch(Context, 'chunking_started', 'false')
+         }
+    of
+        {'undefined', IsStarted} ->
+            {IsStarted, Req};
+        {<<>>, IsStarted} ->
+            {IsStarted, Req};
+        {[], IsStarted} ->
+            {IsStarted, Req};
+        {CSVs, 'true'} ->
             'ok' = cowboy_req:stream_body(CSVs, 'nofin', Req),
             {'true', Req};
-        'false' ->
+        {CSVs, 'false'} ->
             FileName = csv_file_name(Context, ?DEFAULT_CSV_FILE_NAME),
             Req1 = init_chunk_stream(Req, <<"to_csv">>, FileName),
             'ok' = cowboy_req:stream_body(CSVs, 'nofin', Req1),
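Matching on the {RespData, ChunkingStarted} pair makes the control flow explicit: undefined or empty data falls through without streaming anything, while non-empty data either continues an already-open stream or first opens one (sending the filename headers). A toy model of that dispatch, with invented return atoms standing in for the cowboy_req calls:

-module(chunk_dispatch).
-export([dispatch/2]).

-spec dispatch('undefined' | binary() | list(), boolean()) -> {boolean(), term()}.
dispatch(RespData, Started) ->
    case {RespData, Started} of
        {'undefined', S} -> {S, 'nothing_sent'};     %% no data: flag unchanged
        {<<>>, S}        -> {S, 'nothing_sent'};
        {[], S}          -> {S, 'nothing_sent'};
        {Data, 'true'}   -> {'true', {'streamed', Data}};            %% stream already open
        {Data, 'false'}  -> {'true', {'opened_then_streamed', Data}} %% open stream first
    end.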
16 changes: 11 additions & 5 deletions applications/crossbar/src/crossbar_view.erl
@@ -621,7 +621,7 @@ next_chunk(#{options := #{last_key := OldLastKey}=LoadMap
                                   ,context => Context
                                   ,last_key => LastKey
                                   }));
-%% just query next_db if there are any databases left
+%% only one database is left and it has no more results to give, so the request is completed
 next_chunk(#{options := #{databases := [_]}
            ,previous_chunk_length := PrevLength
            ,total_queried := TotalQueried
@@ -630,6 +630,7 @@ next_chunk(#{options := #{databases := [_]}
     ChunkMap#{total_queried => TotalQueried + PrevLength
              ,chunking_finished => 'true'
              };
+%% just query next_db
 next_chunk(#{options := #{databases := [_|RestDbs], last_key := LastKey}=LoadMap
            ,total_queried := TotalQueried
            ,previous_chunk_length := PrevLength
@@ -643,11 +644,16 @@ next_chunk(#{options := #{databases := [_|RestDbs], last_key := LastKey}=LoadMap
                                   }));
 %% starting chunked query
 next_chunk(#{context := Context}=ChunkMap) ->
-    LoadMap = cb_context:fetch(Context, 'load_view_opts'),
     lager:debug("(chunked) starting chunked query"),
-    chunk_map_roll_in(ChunkMap
-                     ,get_results(LoadMap#{context => cb_context:store(Context, 'load_view_opts', 'undefined')})
-                     ).
+    case cb_context:fetch(Context, 'load_view_opts') of
+        #{databases := []} ->
+            lager:debug("(chunked) databases exhausted"),
+            ChunkMap#{chunking_finished => 'true'};
+        #{}=LoadMap ->
+            chunk_map_roll_in(ChunkMap
+                             ,get_results(LoadMap#{context => cb_context:store(Context, 'load_view_opts', 'undefined')})
+                             )
+    end.
 
 -spec chunk_map_roll_in(map(), load_params()) -> map().
 chunk_map_roll_in(#{last_key := OldLastKey}=ChunkMap
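The new first clause of the case in next_chunk/1 covers a request that starts with nothing to query: when the stored view options carry an empty database list, chunking is marked finished up front instead of attempting a query, per the commit note. Schematically (the map shape here is illustrative; the real code keeps databases in the load options and chunking_finished in the chunk map):

-module(start_chunk).
-export([start/1]).

start(#{databases := []}=ChunkMap) ->
    %% nothing to query: complete the chunked response immediately
    ChunkMap#{chunking_finished => 'true'};
start(#{databases := [_|_]}=ChunkMap) ->
    %% at least one database left: proceed with the first query
    ChunkMap#{status => 'querying'}.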
23 changes: 17 additions & 6 deletions applications/crossbar/src/modules/cb_cdrs.erl
@@ -322,8 +322,14 @@ maybe_add_stale_to_options('false') ->[].
 %%------------------------------------------------------------------------------
 -spec load_chunked_cdrs(cb_context:context(), kz_term:ne_binary()) -> cb_context:context().
 load_chunked_cdrs(Context, RespType) ->
+    load_chunked_cdrs(Context, RespType, cb_context:resp_data(Context)).
+
+-spec load_chunked_cdrs(cb_context:context(), kz_term:ne_binary(), resp_data()) -> cb_context:context().
+load_chunked_cdrs(Context, _, 'undefined') ->
+    Context;
+load_chunked_cdrs(Context, RespType, RespData) ->
     Fun = fun(JObj, Acc) -> split_to_modbs(cb_context:account_id(Context), kz_doc:id(JObj), Acc) end,
-    MapIds = lists:foldl(Fun, #{}, cb_context:resp_data(Context)),
+    MapIds = lists:foldl(Fun, #{}, RespData),
     C1 = cb_context:set_resp_data(Context, []),
     maps:fold(fun(Db, Ids, C) -> load_chunked_cdr_ids(C, RespType, Db, Ids) end, C1, MapIds).

@@ -363,10 +369,12 @@ load_chunked_cdr_ids(Context, RespType, Db, Ids) ->
             crossbar_doc:handle_datamgr_errors(Reason, <<"load_cdrs">>, Context)
     end.
 
+-spec normalize_cdrs(cb_context:context(), kz_term:ne_binary(), kz_json:objects()) -> kz_json:objects() | kz_term:binaries().
 normalize_cdrs(Context, <<"json">>, JObjs) ->
     [normalize_cdr_to_jobj(JObj, Context) || JObj <- JObjs];
 normalize_cdrs(Context, <<"csv">>, JObjs) ->
-    [normalize_cdr_to_csv(JObj, Context) || JObj <- JObjs].
+    {_, Data} = lists:foldl(fun(J, {C, Acc}) -> normalize_cdr_to_csv(J, C, Acc) end, {Context, []}, JObjs),
+    lists:reverse(Data).
 
 %%------------------------------------------------------------------------------
 %% @doc Normalize CDR in JSON
@@ -382,15 +390,18 @@ normalize_cdr_to_jobj(JObj, Context) ->
 %% @doc Normalize CDR in CSV
 %% @end
 %%------------------------------------------------------------------------------
--spec normalize_cdr_to_csv(kz_json:object(), cb_context:context()) -> binary().
-normalize_cdr_to_csv(JObj, Context) ->
+-spec normalize_cdr_to_csv(kz_json:object(), cb_context:context(), kz_term:binaries()) -> {cb_context:context(), kz_term:binaries()}.
+normalize_cdr_to_csv(JObj, Context, Acc) ->
     Timestamp = kz_json:get_integer_value(<<"timestamp">>, JObj, 0),
     CSV = kz_binary:join([F(JObj, Timestamp) || {_, F} <- csv_rows(Context)], <<",">>),
     case cb_context:fetch(Context, 'chunking_started') of
-        'true' -> <<CSV/binary, "\r\n">>;
+        'true' ->
+            {Context, [<<CSV/binary, "\r\n">> | Acc]};
         'false' ->
             CSVHeader = kz_binary:join([K || {K, _Fun} <- csv_rows(Context)], <<",">>),
-            <<CSVHeader/binary, "\r\n", CSV/binary, "\r\n">>
+            {cb_context:store(Context, chunking_started, 'true')
+            ,[<<CSVHeader/binary, "\r\n", CSV/binary, "\r\n">> | Acc]
+            }
     end.
4 changes: 4 additions & 0 deletions rel/dev.sys.config
@@ -6,6 +6,10 @@
       ,{formatter, lager_default_formatter}
       ,{formatter_config, [time, " ", color, "[",severity,"] ", "|", {callid, <<"0000000000">>}, "|", module, ":", line, "(",pid, ") ", {log_prefix, ["[", log_prefix, "] "], ""}, "\e[0m", message, "\n"]}
       ]}
+     ,{lager_file_backend, [{file, "log/debug.log"}, {level, debug}, {size, 10485760}, {date, "$D0"}, {count, 5}
+                           ,{formatter, lager_default_formatter}
+                           ,{formatter_config, [time," [",severity,"] ", "|", {callid, <<"0000000000">>}, "|", module, ":", line, "(",pid, ") ", {log_prefix, ["[", log_prefix, "] "], ""}, message, "\n"]}
+                           ]}
      ]}
 ,{extra_sinks, [{data_lager_event,[]}
                ,{amqp_lager_event,[]}
