forked from sky-big/RabbitMQ
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rabbit_msg_file.erl
139 lines (121 loc) · 5.8 KB
/
rabbit_msg_file.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
%%
-module(rabbit_msg_file).
-export([append/3, read/2, scan/4]).
%%----------------------------------------------------------------------------
-include("rabbit_msg_store.hrl").
%% Width, in bytes, of the size prefix written at the start of each record.
-define(INTEGER_SIZE_BYTES, 8).
%% The same width in bits, for use in bit-syntax segments.
-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
%% Width, in bits, of the marker written at the end of each record.
-define(WRITE_OK_SIZE_BITS, 8).
%% Value of the trailing marker; scanner/4 skips records whose marker differs.
-define(WRITE_OK_MARKER, 255).
%% Per-record overhead in bytes: one marker byte plus the size prefix.
-define(FILE_PACKING_ADJUSTMENT, (1 + ?INTEGER_SIZE_BYTES)).
%% Length of a message id in bytes; append/3 only accepts ids of this length.
-define(MSG_ID_SIZE_BYTES, 16).
%% The message id length in bits, for use in bit-syntax segments.
-define(MSG_ID_SIZE_BITS, (8 * ?MSG_ID_SIZE_BYTES)).
%% Upper bound on the number of bytes scan/4 requests from the file per read.
-define(SCAN_BLOCK_SIZE, 4194304). %% 4MB
%%----------------------------------------------------------------------------
%% Type declarations and function specs; only compiled when the use_specs
%% macro is defined.
-ifdef(use_specs).
%% Handle passed through to file_handle_cache; opaque to this module.
-type(io_device() :: any()).
%% Byte offset of a record within the file being scanned.
-type(position() :: non_neg_integer()).
%% Total on-disk size of one record, in bytes.
-type(msg_size() :: non_neg_integer()).
%% Size of the file handed to scan/4, in bytes.
-type(file_size() :: non_neg_integer()).
%% Fold callback used by scan/4. It is called with
%% {MsgId, TotalSize, Offset, EncodedBody} and the current accumulator, and
%% returns the new accumulator.
-type(message_accumulator(A) ::
fun (({rabbit_types:msg_id(), msg_size(), position(), binary()}, A) ->
A)).
%% NOTE(review): msg() is not declared in this module; presumably it comes
%% from rabbit_msg_store.hrl -- confirm.
-spec(append/3 :: (io_device(), rabbit_types:msg_id(), msg()) ->
rabbit_types:ok_or_error2(msg_size(), any())).
-spec(read/2 :: (io_device(), msg_size()) ->
rabbit_types:ok_or_error2({rabbit_types:msg_id(), msg()},
any())).
-spec(scan/4 :: (io_device(), file_size(), message_accumulator(A), A) ->
{'ok', A, position()}).
-endif.
%%----------------------------------------------------------------------------
%% Append one message record to the file behind FileHdl.
%%
%% The record written is, in order:
%%   * a ?INTEGER_SIZE_BITS-bit integer holding Size, the byte length of the
%%     message id plus the encoded body;
%%   * the ?MSG_ID_SIZE_BYTES-byte message id;
%%   * the body, encoded with term_to_binary/1;
%%   * a ?WRITE_OK_SIZE_BITS-bit trailer holding ?WRITE_OK_MARKER.
%%
%% MsgId must be a binary of exactly ?MSG_ID_SIZE_BYTES bytes; no clause
%% matches anything else.
%%
%% Returns {ok, TotalSize} when the write succeeds, where TotalSize is the
%% number of bytes the whole record occupies (Size plus the size prefix and
%% the marker byte). Any other result of file_handle_cache:append/2 is
%% returned unchanged.
append(FileHdl, MsgId, MsgBody)
  when is_binary(MsgId) andalso byte_size(MsgId) =:= ?MSG_ID_SIZE_BYTES ->
    Payload = term_to_binary(MsgBody),
    PayloadLen = byte_size(Payload),
    %% Size covers only the id and the encoded body, not the framing.
    Size = PayloadLen + ?MSG_ID_SIZE_BYTES,
    Record = <<Size:?INTEGER_SIZE_BITS,
               MsgId:?MSG_ID_SIZE_BYTES/binary,
               Payload:PayloadLen/binary,
               ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>,
    case file_handle_cache:append(FileHdl, Record) of
        ok ->
            %% Report the full on-disk footprint, framing included.
            {ok, Size + ?FILE_PACKING_ADJUSTMENT};
        Failure ->
            Failure
    end.
%% Read one record of TotalSize bytes through FileHdl and decode it.
%%
%% TotalSize is the full on-disk size of the record, i.e. the value append/3
%% returns. The expected size prefix and body length are derived from it.
%%
%% Returns {ok, {MsgId, Msg}} when the bytes read form a record whose size
%% prefix equals TotalSize minus the framing overhead and whose trailer is
%% ?WRITE_OK_MARKER. In every other case the result of
%% file_handle_cache:read/2 is returned unchanged; this includes an
%% {ok, Binary} whose contents did not match the expected layout.
read(FileHdl, TotalSize) ->
    %% Strip the size prefix and the marker byte to get id + body length.
    ExpectedSize = TotalSize - ?FILE_PACKING_ADJUSTMENT,
    %% What is left after the id is the encoded body.
    BodyLen = ExpectedSize - ?MSG_ID_SIZE_BYTES,
    case file_handle_cache:read(FileHdl, TotalSize) of
        {ok, <<ExpectedSize:?INTEGER_SIZE_BITS,
               MsgId:?MSG_ID_SIZE_BYTES/binary,
               EncodedBody:BodyLen/binary,
               ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
            {ok, {MsgId, binary_to_term(EncodedBody)}};
        Other ->
            Other
    end.
%% Scan a message store file and fold Fun over the records found in it.
%%
%% Reads up to FileSize bytes through FileHdl and calls
%% Fun({MsgId, TotalSize, Offset, EncodedBody}, Acc) for every complete record
%% whose trailing marker is ?WRITE_OK_MARKER. Returns {ok, FinalAcc, Offset},
%% where Offset is the scan position reached when scanning stopped.
scan(FileHdl, FileSize, Fun, Acc0) when 0 =< FileSize ->
    %% Start with an empty carry-over buffer and both offsets at zero.
    scan(FileHdl, FileSize, <<>>, 0, 0, Fun, Acc0).
%% Worker loop behind scan/4.
%%
%% Data is the undecoded tail carried over from the previous chunk,
%% ReadOffset counts the bytes read from the file so far and ScanOffset is
%% the file offset of the next record to decode.
scan(_FileHdl, FileSize, _Data, ReadOffset, ScanOffset, _Fun, Acc)
  when ReadOffset =:= FileSize ->
    %% Every byte up to FileSize has been read: finished.
    {ok, Acc, ScanOffset};
scan(FileHdl, FileSize, Data, ReadOffset, ScanOffset, Fun, Acc) ->
    %% Never request more than one block, nor more than what is left.
    ChunkSize = min(?SCAN_BLOCK_SIZE, FileSize - ReadOffset),
    case file_handle_cache:read(FileHdl, ChunkSize) of
        {ok, Chunk} ->
            %% Put the carried-over tail in front of the fresh bytes so a
            %% record split across two chunks can be decoded.
            Buffer = <<Data/binary, Chunk/binary>>,
            {Leftover, NextAcc, NextScanOffset} =
                scanner(Buffer, ScanOffset, Fun, Acc),
            scan(FileHdl, FileSize, Leftover,
                 ReadOffset + byte_size(Chunk), NextScanOffset, Fun, NextAcc);
        _ReadFailed ->
            %% Any non-{ok, _} read result ends the scan with what has been
            %% accumulated so far; the failure itself is not reported.
            {ok, Acc, ScanOffset}
    end.
%% Decode as many complete records as possible from the front of a buffer.
%%
%% Offset is the file offset of the first byte of the buffer. Fun is called
%% as Fun({MsgId, TotalSize, Offset, EncodedBody}, Acc) for each record whose
%% trailing marker is ?WRITE_OK_MARKER; records with any other marker are
%% skipped but still advance the offset.
%%
%% Returns {Remaining, Acc, Offset}: the bytes that could not be decoded
%% yet, the updated accumulator, and the offset of the first such byte.
scanner(<<>>, Offset, _Fun, Acc) ->
    {<<>>, Acc, Offset};
scanner(<<0:?INTEGER_SIZE_BITS, _/binary>>, Offset, _Fun, Acc) ->
    %% A zero size prefix: nothing to do other than stop. The rest of the
    %% buffer is dropped.
    {<<>>, Acc, Offset};
scanner(<<Size:?INTEGER_SIZE_BITS, MsgIdAndMsg:Size/binary,
          ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS, Rest/binary>>,
        Offset, Fun, Acc) ->
    %% TotalSize = id (16 bytes) + encoded body + 8-byte size prefix
    %%             + 1 trailing marker byte.
    TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
    %% Here we take option 5 from
    %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
    %% which we read the MsgId as a number, and then convert it
    %% back to a binary in order to work around bugs in
    %% Erlang's GC. Keep these two matches exactly as written.
    <<MsgIdNum:?MSG_ID_SIZE_BITS, Msg/binary>> =
        <<MsgIdAndMsg:Size/binary>>,
    <<MsgId:?MSG_ID_SIZE_BYTES/binary>> =
        <<MsgIdNum:?MSG_ID_SIZE_BITS>>,
    scanner(Rest, Offset + TotalSize, Fun,
            Fun({MsgId, TotalSize, Offset, Msg}, Acc));
scanner(<<Size:?INTEGER_SIZE_BITS, _Skipped:Size/binary,
          _OtherMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>,
        Offset, Fun, Acc) ->
    %% Complete record without the expected marker: step over it without
    %% calling Fun.
    TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
    scanner(Rest, Offset + TotalSize, Fun, Acc);
scanner(Data, Offset, _Fun, Acc) ->
    %% Not enough bytes for a whole record: hand the tail back to the caller.
    {Data, Acc, Offset}.