diff options
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | include/rabbit_backing_queue_type_spec.hrl | 4 | ||||
| -rw-r--r-- | include/rabbit_msg_store.hrl | 2 | ||||
| -rw-r--r-- | include/rabbit_msg_store_index.hrl | 6 | ||||
| -rw-r--r-- | src/rabbit_msg_file.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_msg_store_ets_index.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 32 |
10 files changed, 94 insertions, 94 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 4b1be43cda..982d90e909 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -89,7 +89,7 @@ %% this is really an abstract type, but dialyzer does not support them -type(guid() :: binary()). --type(msg_id() :: guid()). +-type(msg_id() :: non_neg_integer()). -type(txn() :: guid()). -type(pkey() :: guid()). -type(r(Kind) :: diff --git a/include/rabbit_backing_queue_type_spec.hrl b/include/rabbit_backing_queue_type_spec.hrl index 54118ba640..ac47ccba3d 100644 --- a/include/rabbit_backing_queue_type_spec.hrl +++ b/include/rabbit_backing_queue_type_spec.hrl @@ -41,8 +41,8 @@ state()}). -spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/2 :: (basic_message(), state()) -> state()). --spec(tx_rollback/2 :: ([msg_id()], state()) -> state()). --spec(tx_commit/4 :: ([msg_id()], [ack()], {pid(), any()}, state()) -> +-spec(tx_rollback/2 :: ([guid()], state()) -> state()). +-spec(tx_commit/4 :: ([guid()], [ack()], {pid(), any()}, state()) -> {boolean(), state()}). -spec(requeue/2 :: ([{basic_message(), ack()}], state()) -> state()). -spec(len/1 :: (state()) -> non_neg_integer()). diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl index 696ccf3cc6..d96fa758bd 100644 --- a/include/rabbit_msg_store.hrl +++ b/include/rabbit_msg_store.hrl @@ -38,4 +38,4 @@ -endif. -record(msg_location, - {msg_id, ref_count, file, offset, total_size}). + {guid, ref_count, file, offset, total_size}). diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl index 9b3332eed0..eb0ad5cb68 100644 --- a/include/rabbit_msg_store_index.hrl +++ b/include/rabbit_msg_store_index.hrl @@ -43,13 +43,13 @@ -spec(init/2 :: (('fresh'|'recover'), dir()) -> {'fresh'|'recovered', index_state()}). --spec(lookup/2 :: (msg_id(), index_state()) -> ('not_found' | keyvalue())). +-spec(lookup/2 :: (guid(), index_state()) -> ('not_found' | keyvalue())). -spec(insert/2 :: (keyvalue(), index_state()) -> 'ok'). -spec(update/2 :: (keyvalue(), index_state()) -> 'ok'). --spec(update_fields/3 :: (msg_id(), ({fieldpos(), fieldvalue()} | +-spec(update_fields/3 :: (guid(), ({fieldpos(), fieldvalue()} | [{fieldpos(), fieldvalue()}]), index_state()) -> 'ok'). --spec(delete/2 :: (msg_id(), index_state()) -> 'ok'). +-spec(delete/2 :: (guid(), index_state()) -> 'ok'). -spec(delete_by_file/2 :: (fieldvalue(), index_state()) -> 'ok'). -spec(terminate/1 :: (index_state()) -> any()). diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 2c7ea89302..0edeb4698d 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -40,9 +40,9 @@ -define(WRITE_OK_SIZE_BITS, 8). -define(WRITE_OK_MARKER, 255). -define(FILE_PACKING_ADJUSTMENT, (1 + ?INTEGER_SIZE_BYTES)). --define(MSG_ID_SIZE_BYTES, 16). --define(MSG_ID_SIZE_BITS, (8 * ?MSG_ID_SIZE_BYTES)). --define(SIZE_AND_MSG_ID_BYTES, (?MSG_ID_SIZE_BYTES + ?INTEGER_SIZE_BYTES)). +-define(GUID_SIZE_BYTES, 16). +-define(GUID_SIZE_BITS, (8 * ?GUID_SIZE_BYTES)). +-define(SIZE_AND_GUID_BYTES, (?GUID_SIZE_BYTES + ?INTEGER_SIZE_BYTES)). %%---------------------------------------------------------------------------- @@ -53,25 +53,25 @@ -type(position() :: non_neg_integer()). -type(msg_size() :: non_neg_integer()). --spec(append/3 :: (io_device(), msg_id(), msg()) -> +-spec(append/3 :: (io_device(), guid(), msg()) -> ({'ok', msg_size()} | {'error', any()})). -spec(read/2 :: (io_device(), msg_size()) -> - ({'ok', {msg_id(), msg()}} | {'error', any()})). + ({'ok', {guid(), msg()}} | {'error', any()})). -spec(scan/1 :: (io_device()) -> - {'ok', [{msg_id(), msg_size(), position()}], position()}). + {'ok', [{guid(), msg_size(), position()}], position()}). -endif. %%---------------------------------------------------------------------------- append(FileHdl, MsgId, MsgBody) - when is_binary(MsgId) andalso size(MsgId) =< ?MSG_ID_SIZE_BYTES -> + when is_binary(MsgId) andalso size(MsgId) =< ?GUID_SIZE_BYTES -> MsgBodyBin = term_to_binary(MsgBody), MsgBodyBinSize = size(MsgBodyBin), - Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES, + Size = MsgBodyBinSize + ?GUID_SIZE_BYTES, case file_handle_cache:append(FileHdl, <<Size:?INTEGER_SIZE_BITS, - MsgId:?MSG_ID_SIZE_BYTES/binary, + MsgId:?GUID_SIZE_BYTES/binary, MsgBodyBin:MsgBodyBinSize/binary, ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; @@ -80,10 +80,10 @@ append(FileHdl, MsgId, MsgBody) read(FileHdl, TotalSize) -> Size = TotalSize - ?FILE_PACKING_ADJUSTMENT, - BodyBinSize = Size - ?MSG_ID_SIZE_BYTES, + BodyBinSize = Size - ?GUID_SIZE_BYTES, case file_handle_cache:read(FileHdl, TotalSize) of {ok, <<Size:?INTEGER_SIZE_BITS, - MsgId:?MSG_ID_SIZE_BYTES/binary, + MsgId:?GUID_SIZE_BYTES/binary, MsgBodyBin:BodyBinSize/binary, ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} -> {ok, {MsgId, binary_to_term(MsgBodyBin)}}; @@ -105,26 +105,26 @@ scan(FileHdl, Offset, Acc) -> end. read_next(FileHdl, Offset) -> - case file_handle_cache:read(FileHdl, ?SIZE_AND_MSG_ID_BYTES) of + case file_handle_cache:read(FileHdl, ?SIZE_AND_GUID_BYTES) of %% Here we take option 5 from %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in which %% we read the MsgId as a number, and then convert it back to %% a binary in order to work around bugs in Erlang's GC. - {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdNum:?MSG_ID_SIZE_BITS>>} -> + {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdNum:?GUID_SIZE_BITS>>} -> case Size of 0 -> eof; %% Nothing we can do other than stop _ -> TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, ExpectedAbsPos = Offset + TotalSize - 1, case file_handle_cache:position( - FileHdl, {cur, Size - ?MSG_ID_SIZE_BYTES}) of + FileHdl, {cur, Size - ?GUID_SIZE_BYTES}) of {ok, ExpectedAbsPos} -> NextOffset = ExpectedAbsPos + 1, case file_handle_cache:read(FileHdl, 1) of {ok, <<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} -> - <<MsgId:?MSG_ID_SIZE_BYTES/binary>> = - <<MsgIdNum:?MSG_ID_SIZE_BITS>>, + <<MsgId:?GUID_SIZE_BYTES/binary>> = + <<MsgIdNum:?GUID_SIZE_BITS>>, {ok, {MsgId, TotalSize, NextOffset}}; {ok, _SomeOtherData} -> {corrupted, NextOffset}; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 322cad87ce..1a7085a2bd 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -121,16 +121,16 @@ -spec(start_link/5 :: (atom(), file_path(), [binary()] | 'undefined', - (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), A) -> + (fun ((A) -> 'finished' | {guid(), non_neg_integer(), A})), A) -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(write/4 :: (server(), msg_id(), msg(), client_msstate()) -> +-spec(write/4 :: (server(), guid(), msg(), client_msstate()) -> {'ok', client_msstate()}). --spec(read/3 :: (server(), msg_id(), client_msstate()) -> +-spec(read/3 :: (server(), guid(), client_msstate()) -> {{'ok', msg()} | 'not_found', client_msstate()}). --spec(contains/2 :: (server(), msg_id()) -> boolean()). --spec(remove/2 :: (server(), [msg_id()]) -> 'ok'). --spec(release/2 :: (server(), [msg_id()]) -> 'ok'). --spec(sync/3 :: (server(), [msg_id()], fun (() -> any())) -> 'ok'). +-spec(contains/2 :: (server(), guid()) -> boolean()). +-spec(remove/2 :: (server(), [guid()]) -> 'ok'). +-spec(release/2 :: (server(), [guid()]) -> 'ok'). +-spec(sync/3 :: (server(), [guid()], fun (() -> any())) -> 'ok'). -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(client_init/2 :: (server(), binary()) -> client_msstate()). @@ -153,7 +153,7 @@ %% The components: %% %% MsgLocation: this is a mapping from MsgId to #msg_location{}: -%% {MsgId, RefCount, File, Offset, TotalSize} +%% {Guid, RefCount, File, Offset, TotalSize} %% By default, it's in ets, but it's also pluggable. %% FileSummary: this is an ets table which contains: %% {File, ValidTotalSize, ContiguousTop, Left, Right} @@ -393,7 +393,7 @@ add_to_cache(CurFileCacheEts, MsgId, Msg) -> end end. -client_read1(Server, #msg_location { msg_id = MsgId, file = File } = +client_read1(Server, #msg_location { guid = MsgId, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of @@ -404,7 +404,7 @@ client_read1(Server, #msg_location { msg_id = MsgId, file = File } = end. client_read2(_Server, false, undefined, - #msg_location { msg_id = MsgId, ref_count = RefCount }, Defer, + #msg_location { guid = MsgId, ref_count = RefCount }, Defer, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, dedup_cache_ets = DedupCacheEts }) -> case ets:lookup(CurFileCacheEts, MsgId) of @@ -420,7 +420,7 @@ client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) -> %% the safest and simplest thing to do. Defer(); client_read2(Server, false, _Right, - #msg_location { msg_id = MsgId, ref_count = RefCount, file = File }, + #msg_location { guid = MsgId, ref_count = RefCount, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, @@ -631,7 +631,7 @@ handle_cast({write, MsgId, Msg}, {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg), ok = index_insert(#msg_location { - msg_id = MsgId, ref_count = 1, file = CurFile, + guid = MsgId, ref_count = 1, file = CurFile, offset = CurOffset, total_size = TotalSize }, State), [#file_summary { valid_total_size = ValidTotalSize, @@ -836,7 +836,7 @@ read_message(MsgId, From, State = end end. -read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, +read_message1(From, #msg_location { guid = MsgId, ref_count = RefCount, file = File, offset = Offset } = MsgLoc, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, @@ -874,7 +874,7 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, end end. -read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, +read_from_disk(#msg_location { guid = MsgId, ref_count = RefCount, file = File, offset = Offset, total_size = TotalSize }, State, DedupCacheEts) -> @@ -888,7 +888,7 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, throw({error, {misread, [{old_state, State}, {file_num, File}, {offset, Offset}, - {msg_id, MsgId}, + {guid, MsgId}, {read, Rest}, {proc_dict, get()} ]}}) @@ -1176,7 +1176,7 @@ count_msg_refs(Gen, Seed, State) -> {MsgId, Delta, Next} -> ok = case index_lookup(MsgId, State) of not_found -> - index_insert(#msg_location { msg_id = MsgId, + index_insert(#msg_location { guid = MsgId, ref_count = Delta }, State); StoreEntry = #msg_location { ref_count = RefCount } -> @@ -1202,9 +1202,9 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> NonTmpRelatedFileName = filename:rootname(TmpFileName) ++ ?FILE_EXTENSION, true = lists:member(NonTmpRelatedFileName, FileNames), {ok, UncorruptedMessagesTmp, MsgIdsTmp} = - scan_file_for_valid_messages_msg_ids(Dir, TmpFileName), + scan_file_for_valid_messages_guids(Dir, TmpFileName), {ok, UncorruptedMessages, MsgIds} = - scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFileName), + scan_file_for_valid_messages_guids(Dir, NonTmpRelatedFileName), %% 1) It's possible that everything in the tmp file is also in the %% main file such that the main file is (prefix ++ %% tmpfile). This means that compaction failed immediately @@ -1282,7 +1282,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> ok = file_handle_cache:delete(TmpHdl), {ok, _MainMessages, MsgIdsMain} = - scan_file_for_valid_messages_msg_ids( + scan_file_for_valid_messages_guids( Dir, NonTmpRelatedFileName), %% check that everything in MsgIds1 is in MsgIdsMain true = is_sublist(MsgIds1, MsgIdsMain), @@ -1297,7 +1297,7 @@ is_sublist(SmallerL, BiggerL) -> is_disjoint(SmallerL, BiggerL) -> lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL). -scan_file_for_valid_messages_msg_ids(Dir, FileName) -> +scan_file_for_valid_messages_guids(Dir, FileName) -> {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName), {ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}. @@ -1367,7 +1367,7 @@ build_index(Gatherer, Left, [File|Files], State) -> build_index(Gatherer, File, Files, State). build_index_worker( - Gatherer, Guid, State = #msstate { dir = Dir }, Left, File, Files) -> + Gatherer, Ref, State = #msstate { dir = Dir }, Left, File, Files) -> {ok, Messages, FileSize} = scan_file_for_valid_messages( Dir, filenum_to_name(File)), @@ -1405,7 +1405,7 @@ build_index_worker( contiguous_top = ContiguousTop, locked = false, left = Left, right = Right, file_size = FileSize1, readers = 0 }), - ok = gatherer:finished(Gatherer, Guid). + ok = gatherer:finished(Gatherer, Ref). %%---------------------------------------------------------------------------- %% garbage collection / compaction / aggregation -- internal @@ -1660,7 +1660,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> {FinalOffset, BlockStart1, BlockEnd1} = lists:foldl( - fun (#msg_location { msg_id = MsgId, offset = Offset, + fun (#msg_location { guid = MsgId, offset = Offset, total_size = TotalSize }, {CurOffset, BlockStart, BlockEnd}) -> %% CurOffset is in the DestinationFile. diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl index d46212ba15..b4fb5ef174 100644 --- a/src/rabbit_msg_store_ets_index.erl +++ b/src/rabbit_msg_store_ets_index.erl @@ -45,7 +45,7 @@ init(fresh, Dir) -> file:delete(filename:join(Dir, ?FILENAME)), - Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]), + Tid = ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.guid}]), {fresh, #state { table = Tid, dir = Dir }}; init(recover, Dir) -> Path = filename:join(Dir, ?FILENAME), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index a5583b8765..556c6968e4 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -143,10 +143,10 @@ -define(PUBLISH_PREFIX, 1). -define(PUBLISH_PREFIX_BITS, 1). --define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes --define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). +-define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes +-define(GUID_BITS, (?GUID_BYTES * 8)). %% 16 bytes for md5sum + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?MSG_ID_BYTES + 2). +-define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + 2). %% 1 publish, 1 deliver, 1 ack per msg -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * @@ -199,14 +199,14 @@ {'undefined' | non_neg_integer(), binary(), binary(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(terminate_and_erase/1 :: (qistate()) -> qistate()). --spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate()) +-spec(write_published/4 :: (guid(), seq_id(), boolean(), qistate()) -> qistate()). -spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). -spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync_seq_ids/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush_journal/1 :: (qistate()) -> qistate()). -spec(read_segment_entries/2 :: (seq_id(), qistate()) -> - {[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}). + {[{guid(), seq_id(), boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(segment_size/0 :: () -> non_neg_integer()). -spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) -> @@ -328,7 +328,7 @@ terminate_and_erase(State) -> State1. write_published(MsgId, SeqId, IsPersistent, State) when is_binary(MsgId) -> - ?MSG_ID_BYTES = size(MsgId), + ?GUID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of @@ -501,20 +501,20 @@ queue_index_walker({[QueueName | QueueNames], Gatherer}) -> [QueueName, Gatherer, Child]}), queue_index_walker({QueueNames, Gatherer}). -queue_index_walker_reader(QueueName, Gatherer, Guid) -> +queue_index_walker_reader(QueueName, Gatherer, Ref) -> State = blank_state(QueueName), State1 = load_journal(State), SegNums = all_segment_nums(State1), - queue_index_walker_reader(Gatherer, Guid, State1, SegNums). + queue_index_walker_reader(Gatherer, Ref, State1, SegNums). -queue_index_walker_reader(Gatherer, Guid, State, []) -> +queue_index_walker_reader(Gatherer, Ref, State, []) -> _State = terminate(false, [], State), - ok = gatherer:finished(Gatherer, Guid); -queue_index_walker_reader(Gatherer, Guid, State, [Seg | SegNums]) -> + ok = gatherer:finished(Gatherer, Ref); +queue_index_walker_reader(Gatherer, Ref, State, [Seg | SegNums]) -> SeqId = reconstruct_seq_id(Seg, 0), {Messages, State1} = read_segment_entries(SeqId, State), State2 = queue_index_walker_reader1(Gatherer, State1, Messages), - queue_index_walker_reader(Gatherer, Guid, State2, SegNums). + queue_index_walker_reader(Gatherer, Ref, State2, SegNums). queue_index_walker_reader1(_Gatherer, State, []) -> State; @@ -775,7 +775,7 @@ load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) -> IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. - {ok, MsgId} = file_handle_cache:read(Hdl, ?MSG_ID_BYTES), + {ok, MsgId} = file_handle_cache:read(Hdl, ?GUID_BYTES), SegEntries1 = array:set(RelSeq, {{MsgId, 1 == IsPersistentNum}, no_del, no_ack}, @@ -836,13 +836,13 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> ?ACK_JPREFIX -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> - case file_handle_cache:read(Hdl, ?MSG_ID_BYTES) of - {ok, <<MsgIdNum:?MSG_ID_BITS>>} -> + case file_handle_cache:read(Hdl, ?GUID_BYTES) of + {ok, <<MsgIdNum:?GUID_BITS>>} -> %% work around for binary data %% fragmentation. See %% rabbit_msg_file:read_next/2 - <<MsgId:?MSG_ID_BYTES/binary>> = - <<MsgIdNum:?MSG_ID_BITS>>, + <<MsgId:?GUID_BYTES/binary>> = + <<MsgIdNum:?GUID_BITS>>, Publish = {MsgId, case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 97590e663c..8eb129394d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1019,7 +1019,7 @@ stop_msg_store() -> E -> {persistent, E} end. -msg_id_bin(X) -> +guid_bin(X) -> erlang:md5(term_to_binary(X)). msg_store_contains(Atom, MsgIds) -> @@ -1037,7 +1037,7 @@ msg_store_sync(MsgIds) -> {sync, Ref} -> ok after 10000 -> - io:format("Sync from msg_store missing for msg_ids ~p~n", [MsgIds]), + io:format("Sync from msg_store missing for guids ~p~n", [MsgIds]), throw(timeout) end. @@ -1060,7 +1060,7 @@ test_msg_store() -> stop_msg_store(), ok = start_msg_store_empty(), Self = self(), - MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], + MsgIds = [guid_bin(M) || M <- lists:seq(1,100)], {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, MsgIds), @@ -1094,7 +1094,7 @@ test_msg_store() -> {sync, MsgId} -> ok after 10000 -> - io:format("Sync from msg_store missing (msg_id: ~p)~n", + io:format("Sync from msg_store missing (guid: ~p)~n", [MsgId]), throw(timeout) end @@ -1147,10 +1147,10 @@ test_msg_store() -> ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIds1stHalf), %% restart empty ok = stop_msg_store(), - ok = start_msg_store_empty(), %% now safe to reuse msg_ids + ok = start_msg_store_empty(), %% now safe to reuse guids %% push a lot of msgs in... BigCount = 100000, - MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)], + MsgIdsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)], Payload = << 0:65536 >>, ok = rabbit_msg_store:client_terminate( lists:foldl( @@ -1170,19 +1170,19 @@ test_msg_store() -> %% .., then 3s by 1... ok = lists:foldl( fun (MsgId, ok) -> - rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [msg_id_bin(MsgId)]) + rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(MsgId)]) end, ok, lists:seq(BigCount, 1, -3)), %% .., then remove 3s by 2, from the young end first. This hits %% GC (under 50% good data left, but no empty files. Must GC). ok = lists:foldl( fun (MsgId, ok) -> - rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [msg_id_bin(MsgId)]) + rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(MsgId)]) end, ok, lists:seq(BigCount-1, 1, -3)), %% .., then remove 3s by 3, from the young end first. This hits %% GC... ok = lists:foldl( fun (MsgId, ok) -> - rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [msg_id_bin(MsgId)]) + rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(MsgId)]) end, ok, lists:seq(BigCount-2, 1, -3)), %% ensure empty false = msg_store_contains(false, MsgIdsBig), @@ -1212,12 +1212,12 @@ queue_index_publish(SeqIds, Persistent, Qi) -> {A, B, MSCStateEnd} = lists:foldl( fun (SeqId, {QiN, SeqIdsMsgIdsAcc, MSCStateN}) -> - MsgId = rabbit_guid:guid(), - QiM = rabbit_queue_index:write_published(MsgId, SeqId, Persistent, + Guid = rabbit_guid:guid(), + QiM = rabbit_queue_index:write_published(Guid, SeqId, Persistent, QiN), - {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, MsgId, - MsgId, MSCStateN), - {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc], MSCStateM} + {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, + Guid, MSCStateN), + {QiM, [{SeqId, Guid} | SeqIdsMsgIdsAcc], MSCStateM} end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds), ok = rabbit_msg_store:delete_client(MsgStore, Ref), ok = rabbit_msg_store:client_terminate(MSCStateEnd), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1c29c19367..f2e9c19c56 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -165,7 +165,7 @@ -record(msg_status, { msg, - msg_id, + guid, seq_id, is_persistent, is_delivered, @@ -197,7 +197,7 @@ -type(bpqueue() :: any()). -type(seq_id() :: non_neg_integer()). --type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), atom() | pid()} +-type(ack() :: {'ack_index_and_store', guid(), seq_id(), atom() | pid()} | 'ack_not_on_disk'). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), @@ -225,7 +225,7 @@ avg_ingress_rate :: float(), rate_timestamp :: {integer(), integer(), integer()}, len :: non_neg_integer(), - on_sync :: {[[ack()]], [[msg_id()]], [{pid(), any()}]}, + on_sync :: {[[ack()]], [[guid()]], [{pid(), any()}]}, msg_store_clients :: {{any(), binary()}, {any(), binary()}}, persistent_store :: pid() | atom(), persistent_count :: non_neg_integer(), @@ -233,7 +233,7 @@ }). -spec(tx_commit_post_msg_store/5 :: - (boolean(), [msg_id()], [ack()], {pid(), any()}, state()) -> + (boolean(), [guid()], [ack()], {pid(), any()}, state()) -> {boolean(), state()}). -spec(tx_commit_index/1 :: (state()) -> {boolean(), state()}). @@ -331,7 +331,7 @@ publish_delivered(Msg = #basic_message { guid = MsgId, State1 = State #vqstate { out_counter = OutCount + 1, in_counter = InCount + 1 }, MsgStatus = #msg_status { - msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, + msg = Msg, guid = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = true, msg_on_disk = false, index_on_disk = false }, {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), @@ -411,7 +411,7 @@ fetch(State = {empty, _Q4} -> fetch_from_q3_or_delta(State); {{value, #msg_status { - msg = Msg, msg_id = MsgId, seq_id = SeqId, + msg = Msg, guid = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, Q4a} -> @@ -572,7 +572,7 @@ tx_publish(Msg = #basic_message { is_persistent = true, guid = MsgId }, State = #vqstate { msg_store_clients = MSCState, persistent_store = PersistentStore }) -> MsgStatus = #msg_status { - msg = Msg, msg_id = MsgId, seq_id = undefined, is_persistent = true, + msg = Msg, guid = MsgId, seq_id = undefined, is_persistent = true, is_delivered = false, msg_on_disk = false, index_on_disk = false }, {#msg_status { msg_on_disk = true }, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), @@ -581,7 +581,7 @@ tx_publish(_Msg, State) -> State. tx_rollback(Pubs, State = #vqstate { persistent_store = PersistentStore }) -> - ok = case persistent_msg_ids(Pubs) of + ok = case persistent_guids(Pubs) of [] -> ok; PP -> rabbit_msg_store:remove(PersistentStore, PP) end, @@ -591,7 +591,7 @@ tx_commit(Pubs, AckTags, From, State = #vqstate { persistent_store = PersistentStore }) -> %% If we are a non-durable queue, or we have no persistent pubs, %% we can skip the msg_store loop. - PersistentMsgIds = persistent_msg_ids(Pubs), + PersistentMsgIds = persistent_guids(Pubs), IsTransientPubs = [] == PersistentMsgIds, case IsTransientPubs orelse ?TRANSIENT_MSG_STORE == PersistentStore of @@ -699,7 +699,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> Avg = 1000000 * ((Count + OCount) / timer:now_diff(Now, OThen)), {Avg, {Then, Count}}. -persistent_msg_ids(Pubs) -> +persistent_guids(Pubs) -> [MsgId || Obj = #basic_message { guid = MsgId } <- Pubs, Obj #basic_message.is_persistent]. @@ -722,7 +722,7 @@ betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) -> case SeqId < SeqIdLimit of true -> {[#msg_status { msg = undefined, - msg_id = MsgId, + guid = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, @@ -852,7 +852,7 @@ remove_queue_entries(PersistentStore, Fold, Q, IndexState) -> {Count, IndexState2}. remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, + #msg_status { guid = MsgId, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, {PersistentStore, CountN, MsgIdsByStore, SeqIdsAcc, IndexStateN}) -> @@ -889,7 +889,7 @@ fetch_from_q3_or_delta(State = #vqstate { true = queue:is_empty(Q1), %% ASSERTION {empty, State}; {{value, IndexOnDisk, MsgStatus = #msg_status { - msg = undefined, msg_id = MsgId, + msg = undefined, guid = MsgId, is_persistent = IsPersistent }}, Q3a} -> {{ok, Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }}, MSCState1} = @@ -983,7 +983,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, #vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount, persistent_count = PCount }) -> MsgStatus = #msg_status { - msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, + msg = Msg, guid = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = false }, PCount1 = PCount + case IsPersistent of @@ -1096,7 +1096,7 @@ maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus = {MsgStatus, MSCState}; maybe_write_msg_to_disk(PersistentStore, Force, MsgStatus = #msg_status { - msg = Msg, msg_id = MsgId, + msg = Msg, guid = MsgId, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> {ok, MSCState1} = @@ -1115,7 +1115,7 @@ maybe_write_index_to_disk(_Force, MsgStatus = true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - msg_id = MsgId, seq_id = SeqId, + guid = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered }, IndexState) when Force orelse IsPersistent -> |
