summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl2
-rw-r--r--include/rabbit_backing_queue_type_spec.hrl4
-rw-r--r--include/rabbit_msg_store.hrl2
-rw-r--r--include/rabbit_msg_store_index.hrl6
-rw-r--r--src/rabbit_msg_file.erl32
-rw-r--r--src/rabbit_msg_store.erl46
-rw-r--r--src/rabbit_msg_store_ets_index.erl2
-rw-r--r--src/rabbit_queue_index.erl34
-rw-r--r--src/rabbit_tests.erl28
-rw-r--r--src/rabbit_variable_queue.erl32
10 files changed, 94 insertions, 94 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 4b1be43cda..982d90e909 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -89,7 +89,7 @@
%% this is really an abstract type, but dialyzer does not support them
-type(guid() :: binary()).
--type(msg_id() :: guid()).
+-type(msg_id() :: non_neg_integer()).
-type(txn() :: guid()).
-type(pkey() :: guid()).
-type(r(Kind) ::
diff --git a/include/rabbit_backing_queue_type_spec.hrl b/include/rabbit_backing_queue_type_spec.hrl
index 54118ba640..ac47ccba3d 100644
--- a/include/rabbit_backing_queue_type_spec.hrl
+++ b/include/rabbit_backing_queue_type_spec.hrl
@@ -41,8 +41,8 @@
state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
-spec(tx_publish/2 :: (basic_message(), state()) -> state()).
--spec(tx_rollback/2 :: ([msg_id()], state()) -> state()).
--spec(tx_commit/4 :: ([msg_id()], [ack()], {pid(), any()}, state()) ->
+-spec(tx_rollback/2 :: ([guid()], state()) -> state()).
+-spec(tx_commit/4 :: ([guid()], [ack()], {pid(), any()}, state()) ->
{boolean(), state()}).
-spec(requeue/2 :: ([{basic_message(), ack()}], state()) -> state()).
-spec(len/1 :: (state()) -> non_neg_integer()).
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl
index 696ccf3cc6..d96fa758bd 100644
--- a/include/rabbit_msg_store.hrl
+++ b/include/rabbit_msg_store.hrl
@@ -38,4 +38,4 @@
-endif.
-record(msg_location,
- {msg_id, ref_count, file, offset, total_size}).
+ {guid, ref_count, file, offset, total_size}).
diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl
index 9b3332eed0..eb0ad5cb68 100644
--- a/include/rabbit_msg_store_index.hrl
+++ b/include/rabbit_msg_store_index.hrl
@@ -43,13 +43,13 @@
-spec(init/2 :: (('fresh'|'recover'), dir()) ->
{'fresh'|'recovered', index_state()}).
--spec(lookup/2 :: (msg_id(), index_state()) -> ('not_found' | keyvalue())).
+-spec(lookup/2 :: (guid(), index_state()) -> ('not_found' | keyvalue())).
-spec(insert/2 :: (keyvalue(), index_state()) -> 'ok').
-spec(update/2 :: (keyvalue(), index_state()) -> 'ok').
--spec(update_fields/3 :: (msg_id(), ({fieldpos(), fieldvalue()} |
+-spec(update_fields/3 :: (guid(), ({fieldpos(), fieldvalue()} |
[{fieldpos(), fieldvalue()}]),
index_state()) -> 'ok').
--spec(delete/2 :: (msg_id(), index_state()) -> 'ok').
+-spec(delete/2 :: (guid(), index_state()) -> 'ok').
-spec(delete_by_file/2 :: (fieldvalue(), index_state()) -> 'ok').
-spec(terminate/1 :: (index_state()) -> any()).
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 2c7ea89302..0edeb4698d 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -40,9 +40,9 @@
-define(WRITE_OK_SIZE_BITS, 8).
-define(WRITE_OK_MARKER, 255).
-define(FILE_PACKING_ADJUSTMENT, (1 + ?INTEGER_SIZE_BYTES)).
--define(MSG_ID_SIZE_BYTES, 16).
--define(MSG_ID_SIZE_BITS, (8 * ?MSG_ID_SIZE_BYTES)).
--define(SIZE_AND_MSG_ID_BYTES, (?MSG_ID_SIZE_BYTES + ?INTEGER_SIZE_BYTES)).
+-define(GUID_SIZE_BYTES, 16).
+-define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)).
+-define(SIZE_AND_GUID_BYTES, (?GUID_SIZE_BYTES + ?INTEGER_SIZE_BYTES)).
%%----------------------------------------------------------------------------
@@ -53,25 +53,25 @@
-type(position() :: non_neg_integer()).
-type(msg_size() :: non_neg_integer()).
--spec(append/3 :: (io_device(), msg_id(), msg()) ->
+-spec(append/3 :: (io_device(), guid(), msg()) ->
({'ok', msg_size()} | {'error', any()})).
-spec(read/2 :: (io_device(), msg_size()) ->
- ({'ok', {msg_id(), msg()}} | {'error', any()})).
+ ({'ok', {guid(), msg()}} | {'error', any()})).
-spec(scan/1 :: (io_device()) ->
- {'ok', [{msg_id(), msg_size(), position()}], position()}).
+ {'ok', [{guid(), msg_size(), position()}], position()}).
-endif.
%%----------------------------------------------------------------------------
append(FileHdl, MsgId, MsgBody)
- when is_binary(MsgId) andalso size(MsgId) =< ?MSG_ID_SIZE_BYTES ->
+ when is_binary(MsgId) andalso size(MsgId) =< ?GUID_SIZE_BYTES ->
MsgBodyBin = term_to_binary(MsgBody),
MsgBodyBinSize = size(MsgBodyBin),
- Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES,
+ Size = MsgBodyBinSize + ?GUID_SIZE_BYTES,
case file_handle_cache:append(FileHdl,
<<Size:?INTEGER_SIZE_BITS,
- MsgId:?MSG_ID_SIZE_BYTES/binary,
+ MsgId:?GUID_SIZE_BYTES/binary,
MsgBodyBin:MsgBodyBinSize/binary,
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
@@ -80,10 +80,10 @@ append(FileHdl, MsgId, MsgBody)
read(FileHdl, TotalSize) ->
Size = TotalSize - ?FILE_PACKING_ADJUSTMENT,
- BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
+ BodyBinSize = Size - ?GUID_SIZE_BYTES,
case file_handle_cache:read(FileHdl, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
- MsgId:?MSG_ID_SIZE_BYTES/binary,
+ MsgId:?GUID_SIZE_BYTES/binary,
MsgBodyBin:BodyBinSize/binary,
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
{ok, {MsgId, binary_to_term(MsgBodyBin)}};
@@ -105,26 +105,26 @@ scan(FileHdl, Offset, Acc) ->
end.
read_next(FileHdl, Offset) ->
- case file_handle_cache:read(FileHdl, ?SIZE_AND_MSG_ID_BYTES) of
+ case file_handle_cache:read(FileHdl, ?SIZE_AND_GUID_BYTES) of
%% Here we take option 5 from
%% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in which
%% we read the MsgId as a number, and then convert it back to
%% a binary in order to work around bugs in Erlang's GC.
- {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdNum:?MSG_ID_SIZE_BITS>>} ->
+ {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdNum:?GUID_SIZE_BITS>>} ->
case Size of
0 -> eof; %% Nothing we can do other than stop
_ ->
TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
ExpectedAbsPos = Offset + TotalSize - 1,
case file_handle_cache:position(
- FileHdl, {cur, Size - ?MSG_ID_SIZE_BYTES}) of
+ FileHdl, {cur, Size - ?GUID_SIZE_BYTES}) of
{ok, ExpectedAbsPos} ->
NextOffset = ExpectedAbsPos + 1,
case file_handle_cache:read(FileHdl, 1) of
{ok,
<<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} ->
- <<MsgId:?MSG_ID_SIZE_BYTES/binary>> =
- <<MsgIdNum:?MSG_ID_SIZE_BITS>>,
+ <<MsgId:?GUID_SIZE_BYTES/binary>> =
+ <<MsgIdNum:?GUID_SIZE_BITS>>,
{ok, {MsgId, TotalSize, NextOffset}};
{ok, _SomeOtherData} ->
{corrupted, NextOffset};
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 322cad87ce..1a7085a2bd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -121,16 +121,16 @@
-spec(start_link/5 ::
(atom(), file_path(), [binary()] | 'undefined',
- (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), A) ->
+ (fun ((A) -> 'finished' | {guid(), non_neg_integer(), A})), A) ->
{'ok', pid()} | 'ignore' | {'error', any()}).
--spec(write/4 :: (server(), msg_id(), msg(), client_msstate()) ->
+-spec(write/4 :: (server(), guid(), msg(), client_msstate()) ->
{'ok', client_msstate()}).
--spec(read/3 :: (server(), msg_id(), client_msstate()) ->
+-spec(read/3 :: (server(), guid(), client_msstate()) ->
{{'ok', msg()} | 'not_found', client_msstate()}).
--spec(contains/2 :: (server(), msg_id()) -> boolean()).
--spec(remove/2 :: (server(), [msg_id()]) -> 'ok').
--spec(release/2 :: (server(), [msg_id()]) -> 'ok').
--spec(sync/3 :: (server(), [msg_id()], fun (() -> any())) -> 'ok').
+-spec(contains/2 :: (server(), guid()) -> boolean()).
+-spec(remove/2 :: (server(), [guid()]) -> 'ok').
+-spec(release/2 :: (server(), [guid()]) -> 'ok').
+-spec(sync/3 :: (server(), [guid()], fun (() -> any())) -> 'ok').
-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(client_init/2 :: (server(), binary()) -> client_msstate()).
@@ -153,7 +153,7 @@
%% The components:
%%
%% MsgLocation: this is a mapping from MsgId to #msg_location{}:
-%% {MsgId, RefCount, File, Offset, TotalSize}
+%% {Guid, RefCount, File, Offset, TotalSize}
%% By default, it's in ets, but it's also pluggable.
%% FileSummary: this is an ets table which contains:
%% {File, ValidTotalSize, ContiguousTop, Left, Right}
@@ -393,7 +393,7 @@ add_to_cache(CurFileCacheEts, MsgId, Msg) ->
end
end.
-client_read1(Server, #msg_location { msg_id = MsgId, file = File } =
+client_read1(Server, #msg_location { guid = MsgId, file = File } =
MsgLocation, Defer, CState =
#client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
@@ -404,7 +404,7 @@ client_read1(Server, #msg_location { msg_id = MsgId, file = File } =
end.
client_read2(_Server, false, undefined,
- #msg_location { msg_id = MsgId, ref_count = RefCount }, Defer,
+ #msg_location { guid = MsgId, ref_count = RefCount }, Defer,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
dedup_cache_ets = DedupCacheEts }) ->
case ets:lookup(CurFileCacheEts, MsgId) of
@@ -420,7 +420,7 @@ client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) ->
%% the safest and simplest thing to do.
Defer();
client_read2(Server, false, _Right,
- #msg_location { msg_id = MsgId, ref_count = RefCount, file = File },
+ #msg_location { guid = MsgId, ref_count = RefCount, file = File },
Defer, CState =
#client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
@@ -631,7 +631,7 @@ handle_cast({write, MsgId, Msg},
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
{ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
ok = index_insert(#msg_location {
- msg_id = MsgId, ref_count = 1, file = CurFile,
+ guid = MsgId, ref_count = 1, file = CurFile,
offset = CurOffset, total_size = TotalSize },
State),
[#file_summary { valid_total_size = ValidTotalSize,
@@ -836,7 +836,7 @@ read_message(MsgId, From, State =
end
end.
-read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
+read_message1(From, #msg_location { guid = MsgId, ref_count = RefCount,
file = File, offset = Offset } = MsgLoc,
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
@@ -874,7 +874,7 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
end
end.
-read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
+read_from_disk(#msg_location { guid = MsgId, ref_count = RefCount,
file = File, offset = Offset,
total_size = TotalSize }, State,
DedupCacheEts) ->
@@ -888,7 +888,7 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
throw({error, {misread, [{old_state, State},
{file_num, File},
{offset, Offset},
- {msg_id, MsgId},
+ {guid, MsgId},
{read, Rest},
{proc_dict, get()}
]}})
@@ -1176,7 +1176,7 @@ count_msg_refs(Gen, Seed, State) ->
{MsgId, Delta, Next} ->
ok = case index_lookup(MsgId, State) of
not_found ->
- index_insert(#msg_location { msg_id = MsgId,
+ index_insert(#msg_location { guid = MsgId,
ref_count = Delta },
State);
StoreEntry = #msg_location { ref_count = RefCount } ->
@@ -1202,9 +1202,9 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
NonTmpRelatedFileName = filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
true = lists:member(NonTmpRelatedFileName, FileNames),
{ok, UncorruptedMessagesTmp, MsgIdsTmp} =
- scan_file_for_valid_messages_msg_ids(Dir, TmpFileName),
+ scan_file_for_valid_messages_guids(Dir, TmpFileName),
{ok, UncorruptedMessages, MsgIds} =
- scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFileName),
+ scan_file_for_valid_messages_guids(Dir, NonTmpRelatedFileName),
%% 1) It's possible that everything in the tmp file is also in the
%% main file such that the main file is (prefix ++
%% tmpfile). This means that compaction failed immediately
@@ -1282,7 +1282,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
ok = file_handle_cache:delete(TmpHdl),
{ok, _MainMessages, MsgIdsMain} =
- scan_file_for_valid_messages_msg_ids(
+ scan_file_for_valid_messages_guids(
Dir, NonTmpRelatedFileName),
%% check that everything in MsgIds1 is in MsgIdsMain
true = is_sublist(MsgIds1, MsgIdsMain),
@@ -1297,7 +1297,7 @@ is_sublist(SmallerL, BiggerL) ->
is_disjoint(SmallerL, BiggerL) ->
lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
-scan_file_for_valid_messages_msg_ids(Dir, FileName) ->
+scan_file_for_valid_messages_guids(Dir, FileName) ->
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, FileName),
{ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}.
@@ -1367,7 +1367,7 @@ build_index(Gatherer, Left, [File|Files], State) ->
build_index(Gatherer, File, Files, State).
build_index_worker(
- Gatherer, Guid, State = #msstate { dir = Dir }, Left, File, Files) ->
+ Gatherer, Ref, State = #msstate { dir = Dir }, Left, File, Files) ->
{ok, Messages, FileSize} =
scan_file_for_valid_messages(
Dir, filenum_to_name(File)),
@@ -1405,7 +1405,7 @@ build_index_worker(
contiguous_top = ContiguousTop, locked = false,
left = Left, right = Right, file_size = FileSize1,
readers = 0 }),
- ok = gatherer:finished(Gatherer, Guid).
+ ok = gatherer:finished(Gatherer, Ref).
%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation -- internal
@@ -1660,7 +1660,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->
{FinalOffset, BlockStart1, BlockEnd1} =
lists:foldl(
- fun (#msg_location { msg_id = MsgId, offset = Offset,
+ fun (#msg_location { guid = MsgId, offset = Offset,
total_size = TotalSize },
{CurOffset, BlockStart, BlockEnd}) ->
%% CurOffset is in the DestinationFile.
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
index d46212ba15..b4fb5ef174 100644
--- a/src/rabbit_msg_store_ets_index.erl
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -45,7 +45,7 @@
init(fresh, Dir) ->
file:delete(filename:join(Dir, ?FILENAME)),
- Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]),
+ Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.guid}]),
{fresh, #state { table = Tid, dir = Dir }};
init(recover, Dir) ->
Path = filename:join(Dir, ?FILENAME),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index a5583b8765..556c6968e4 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -143,10 +143,10 @@
-define(PUBLISH_PREFIX, 1).
-define(PUBLISH_PREFIX_BITS, 1).
--define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
--define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
+-define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
+-define(GUID_BITS, (?GUID_BYTES * 8)).
%% 16 bytes for md5sum + 2 for seq, bits and prefix
--define(PUBLISH_RECORD_LENGTH_BYTES, ?MSG_ID_BYTES + 2).
+-define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + 2).
%% 1 publish, 1 deliver, 1 ack per msg
-define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
@@ -199,14 +199,14 @@
{'undefined' | non_neg_integer(), binary(), binary(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(terminate_and_erase/1 :: (qistate()) -> qistate()).
--spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate())
+-spec(write_published/4 :: (guid(), seq_id(), boolean(), qistate())
-> qistate()).
-spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync_seq_ids/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush_journal/1 :: (qistate()) -> qistate()).
-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
- {[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}).
+ {[{guid(), seq_id(), boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(segment_size/0 :: () -> non_neg_integer()).
-spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) ->
@@ -328,7 +328,7 @@ terminate_and_erase(State) ->
State1.
write_published(MsgId, SeqId, IsPersistent, State) when is_binary(MsgId) ->
- ?MSG_ID_BYTES = size(MsgId),
+ ?GUID_BYTES = size(MsgId),
{JournalHdl, State1} = get_journal_handle(State),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
@@ -501,20 +501,20 @@ queue_index_walker({[QueueName | QueueNames], Gatherer}) ->
[QueueName, Gatherer, Child]}),
queue_index_walker({QueueNames, Gatherer}).
-queue_index_walker_reader(QueueName, Gatherer, Guid) ->
+queue_index_walker_reader(QueueName, Gatherer, Ref) ->
State = blank_state(QueueName),
State1 = load_journal(State),
SegNums = all_segment_nums(State1),
- queue_index_walker_reader(Gatherer, Guid, State1, SegNums).
+ queue_index_walker_reader(Gatherer, Ref, State1, SegNums).
-queue_index_walker_reader(Gatherer, Guid, State, []) ->
+queue_index_walker_reader(Gatherer, Ref, State, []) ->
_State = terminate(false, [], State),
- ok = gatherer:finished(Gatherer, Guid);
-queue_index_walker_reader(Gatherer, Guid, State, [Seg | SegNums]) ->
+ ok = gatherer:finished(Gatherer, Ref);
+queue_index_walker_reader(Gatherer, Ref, State, [Seg | SegNums]) ->
SeqId = reconstruct_seq_id(Seg, 0),
{Messages, State1} = read_segment_entries(SeqId, State),
State2 = queue_index_walker_reader1(Gatherer, State1, Messages),
- queue_index_walker_reader(Gatherer, Guid, State2, SegNums).
+ queue_index_walker_reader(Gatherer, Ref, State2, SegNums).
queue_index_walker_reader1(_Gatherer, State, []) ->
State;
@@ -775,7 +775,7 @@ load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) ->
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
%% because we specify /binary, and binaries are complete
%% bytes, the size spec is in bytes, not bits.
- {ok, MsgId} = file_handle_cache:read(Hdl, ?MSG_ID_BYTES),
+ {ok, MsgId} = file_handle_cache:read(Hdl, ?GUID_BYTES),
SegEntries1 =
array:set(RelSeq,
{{MsgId, 1 == IsPersistentNum}, no_del, no_ack},
@@ -836,13 +836,13 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
?ACK_JPREFIX ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
- case file_handle_cache:read(Hdl, ?MSG_ID_BYTES) of
- {ok, <<MsgIdNum:?MSG_ID_BITS>>} ->
+ case file_handle_cache:read(Hdl, ?GUID_BYTES) of
+ {ok, <<MsgIdNum:?GUID_BITS>>} ->
%% work around for binary data
%% fragmentation. See
%% rabbit_msg_file:read_next/2
- <<MsgId:?MSG_ID_BYTES/binary>> =
- <<MsgIdNum:?MSG_ID_BITS>>,
+ <<MsgId:?GUID_BYTES/binary>> =
+ <<MsgIdNum:?GUID_BITS>>,
Publish = {MsgId, case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 97590e663c..8eb129394d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1019,7 +1019,7 @@ stop_msg_store() ->
E -> {persistent, E}
end.
-msg_id_bin(X) ->
+guid_bin(X) ->
erlang:md5(term_to_binary(X)).
msg_store_contains(Atom, MsgIds) ->
@@ -1037,7 +1037,7 @@ msg_store_sync(MsgIds) ->
{sync, Ref} -> ok
after
10000 ->
- io:format("Sync from msg_store missing for msg_ids ~p~n", [MsgIds]),
+ io:format("Sync from msg_store missing for guids ~p~n", [MsgIds]),
throw(timeout)
end.
@@ -1060,7 +1060,7 @@ test_msg_store() ->
stop_msg_store(),
ok = start_msg_store_empty(),
Self = self(),
- MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
+ MsgIds = [guid_bin(M) || M <- lists:seq(1,100)],
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds),
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, MsgIds),
@@ -1094,7 +1094,7 @@ test_msg_store() ->
{sync, MsgId} -> ok
after
10000 ->
- io:format("Sync from msg_store missing (msg_id: ~p)~n",
+ io:format("Sync from msg_store missing (guid: ~p)~n",
[MsgId]),
throw(timeout)
end
@@ -1147,10 +1147,10 @@ test_msg_store() ->
ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIds1stHalf),
%% restart empty
ok = stop_msg_store(),
- ok = start_msg_store_empty(), %% now safe to reuse msg_ids
+ ok = start_msg_store_empty(), %% now safe to reuse guids
%% push a lot of msgs in...
BigCount = 100000,
- MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)],
+ MsgIdsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)],
Payload = << 0:65536 >>,
ok = rabbit_msg_store:client_terminate(
lists:foldl(
@@ -1170,19 +1170,19 @@ test_msg_store() ->
%% .., then 3s by 1...
ok = lists:foldl(
fun (MsgId, ok) ->
- rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [msg_id_bin(MsgId)])
+ rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(MsgId)])
end, ok, lists:seq(BigCount, 1, -3)),
%% .., then remove 3s by 2, from the young end first. This hits
%% GC (under 50% good data left, but no empty files. Must GC).
ok = lists:foldl(
fun (MsgId, ok) ->
- rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [msg_id_bin(MsgId)])
+ rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(MsgId)])
end, ok, lists:seq(BigCount-1, 1, -3)),
%% .., then remove 3s by 3, from the young end first. This hits
%% GC...
ok = lists:foldl(
fun (MsgId, ok) ->
- rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [msg_id_bin(MsgId)])
+ rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(MsgId)])
end, ok, lists:seq(BigCount-2, 1, -3)),
%% ensure empty
false = msg_store_contains(false, MsgIdsBig),
@@ -1212,12 +1212,12 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
{A, B, MSCStateEnd} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsMsgIdsAcc, MSCStateN}) ->
- MsgId = rabbit_guid:guid(),
- QiM = rabbit_queue_index:write_published(MsgId, SeqId, Persistent,
+ Guid = rabbit_guid:guid(),
+ QiM = rabbit_queue_index:write_published(Guid, SeqId, Persistent,
QiN),
- {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, MsgId,
- MsgId, MSCStateN),
- {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc], MSCStateM}
+ {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid,
+ Guid, MSCStateN),
+ {QiM, [{SeqId, Guid} | SeqIdsMsgIdsAcc], MSCStateM}
end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds),
ok = rabbit_msg_store:delete_client(MsgStore, Ref),
ok = rabbit_msg_store:client_terminate(MSCStateEnd),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1c29c19367..f2e9c19c56 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -165,7 +165,7 @@
-record(msg_status,
{ msg,
- msg_id,
+ guid,
seq_id,
is_persistent,
is_delivered,
@@ -197,7 +197,7 @@
-type(bpqueue() :: any()).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), atom() | pid()}
+-type(ack() :: {'ack_index_and_store', guid(), seq_id(), atom() | pid()}
| 'ack_not_on_disk').
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
@@ -225,7 +225,7 @@
avg_ingress_rate :: float(),
rate_timestamp :: {integer(), integer(), integer()},
len :: non_neg_integer(),
- on_sync :: {[[ack()]], [[msg_id()]], [{pid(), any()}]},
+ on_sync :: {[[ack()]], [[guid()]], [{pid(), any()}]},
msg_store_clients :: {{any(), binary()}, {any(), binary()}},
persistent_store :: pid() | atom(),
persistent_count :: non_neg_integer(),
@@ -233,7 +233,7 @@
}).
-spec(tx_commit_post_msg_store/5 ::
- (boolean(), [msg_id()], [ack()], {pid(), any()}, state()) ->
+ (boolean(), [guid()], [ack()], {pid(), any()}, state()) ->
{boolean(), state()}).
-spec(tx_commit_index/1 :: (state()) -> {boolean(), state()}).
@@ -331,7 +331,7 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
State1 = State #vqstate { out_counter = OutCount + 1,
in_counter = InCount + 1 },
MsgStatus = #msg_status {
- msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent,
+ msg = Msg, guid = MsgId, seq_id = SeqId, is_persistent = IsPersistent,
is_delivered = true, msg_on_disk = false, index_on_disk = false },
{MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false,
MsgStatus, MSCState),
@@ -411,7 +411,7 @@ fetch(State =
{empty, _Q4} ->
fetch_from_q3_or_delta(State);
{{value, #msg_status {
- msg = Msg, msg_id = MsgId, seq_id = SeqId,
+ msg = Msg, guid = MsgId, seq_id = SeqId,
is_persistent = IsPersistent, is_delivered = IsDelivered,
msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
Q4a} ->
@@ -572,7 +572,7 @@ tx_publish(Msg = #basic_message { is_persistent = true, guid = MsgId },
State = #vqstate { msg_store_clients = MSCState,
persistent_store = PersistentStore }) ->
MsgStatus = #msg_status {
- msg = Msg, msg_id = MsgId, seq_id = undefined, is_persistent = true,
+ msg = Msg, guid = MsgId, seq_id = undefined, is_persistent = true,
is_delivered = false, msg_on_disk = false, index_on_disk = false },
{#msg_status { msg_on_disk = true }, MSCState1} =
maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState),
@@ -581,7 +581,7 @@ tx_publish(_Msg, State) ->
State.
tx_rollback(Pubs, State = #vqstate { persistent_store = PersistentStore }) ->
- ok = case persistent_msg_ids(Pubs) of
+ ok = case persistent_guids(Pubs) of
[] -> ok;
PP -> rabbit_msg_store:remove(PersistentStore, PP)
end,
@@ -591,7 +591,7 @@ tx_commit(Pubs, AckTags, From, State =
#vqstate { persistent_store = PersistentStore }) ->
%% If we are a non-durable queue, or we have no persistent pubs,
%% we can skip the msg_store loop.
- PersistentMsgIds = persistent_msg_ids(Pubs),
+ PersistentMsgIds = persistent_guids(Pubs),
IsTransientPubs = [] == PersistentMsgIds,
case IsTransientPubs orelse
?TRANSIENT_MSG_STORE == PersistentStore of
@@ -699,7 +699,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
Avg = 1000000 * ((Count + OCount) / timer:now_diff(Now, OThen)),
{Avg, {Then, Count}}.
-persistent_msg_ids(Pubs) ->
+persistent_guids(Pubs) ->
[MsgId || Obj = #basic_message { guid = MsgId } <- Pubs,
Obj #basic_message.is_persistent].
@@ -722,7 +722,7 @@ betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) ->
case SeqId < SeqIdLimit of
true ->
{[#msg_status { msg = undefined,
- msg_id = MsgId,
+ guid = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -852,7 +852,7 @@ remove_queue_entries(PersistentStore, Fold, Q, IndexState) ->
{Count, IndexState2}.
remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId,
+ #msg_status { guid = MsgId, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
{PersistentStore, CountN, MsgIdsByStore, SeqIdsAcc, IndexStateN}) ->
@@ -889,7 +889,7 @@ fetch_from_q3_or_delta(State = #vqstate {
true = queue:is_empty(Q1), %% ASSERTION
{empty, State};
{{value, IndexOnDisk, MsgStatus = #msg_status {
- msg = undefined, msg_id = MsgId,
+ msg = undefined, guid = MsgId,
is_persistent = IsPersistent }}, Q3a} ->
{{ok, Msg = #basic_message { is_persistent = IsPersistent,
guid = MsgId }}, MSCState1} =
@@ -983,7 +983,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId },
#vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount,
persistent_count = PCount }) ->
MsgStatus = #msg_status {
- msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent,
+ msg = Msg, guid = MsgId, seq_id = SeqId, is_persistent = IsPersistent,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = false },
PCount1 = PCount + case IsPersistent of
@@ -1096,7 +1096,7 @@ maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus =
{MsgStatus, MSCState};
maybe_write_msg_to_disk(PersistentStore, Force,
MsgStatus = #msg_status {
- msg = Msg, msg_id = MsgId,
+ msg = Msg, guid = MsgId,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
{ok, MSCState1} =
@@ -1115,7 +1115,7 @@ maybe_write_index_to_disk(_Force, MsgStatus =
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- msg_id = MsgId, seq_id = SeqId,
+ guid = MsgId, seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered }, IndexState)
when Force orelse IsPersistent ->