diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-19 15:35:44 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-19 15:35:44 +0100 |
| commit | 656cbf75101146cac83dffee1ff708bc39d1498d (patch) | |
| tree | 486faaabe135e613317adb78a8ce9ebc9db023ff | |
| parent | 80cd8f686e4efc52c07703015ecd1f4c781dcbab (diff) | |
| download | rabbitmq-server-git-656cbf75101146cac83dffee1ff708bc39d1498d.tar.gz | |
Altered API so that the disk_queue understands about #basic_message. This means that the mixed_queue avoids unnecessary term_to_binary calls. Tests adjusted and whole test suite still passes
| -rw-r--r-- | src/rabbit_basic.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 66 |
4 files changed, 109 insertions, 79 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 0673bdd8d2..f9a8f488af 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, delivery/4]). +-export([publish/1, message/4, message/5, delivery/4]). %%---------------------------------------------------------------------------- @@ -44,6 +44,8 @@ -spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) -> message()). +-spec(message/5 :: (exchange_name(), routing_key(), binary(), binary(), guid()) -> + message()). -endif. @@ -64,6 +66,9 @@ delivery(Mandatory, Immediate, Txn, Message) -> sender = self(), message = Message}. message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> + message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, rabbit_guid:guid()). + +message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId) -> {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), Content = #content{class_id = ClassId, properties = #'P_basic'{content_type = ContentTypeBin}, @@ -72,5 +77,5 @@ message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> #basic_message{exchange_name = ExchangeName, routing_key = RoutingKeyBin, content = Content, - guid = rabbit_guid:guid(), + guid = MsgId, is_persistent = false}. diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 3370ef840d..b133f538ed 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -38,8 +38,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/4, deliver/1, phantom_deliver/1, ack/2, - tx_publish/2, tx_commit/3, tx_cancel/1, +-export([publish/3, deliver/1, phantom_deliver/1, ack/2, + tx_publish/1, tx_commit/3, tx_cancel/1, requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1, dump_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1 ]). @@ -235,21 +235,22 @@ -spec(start_link/0 :: () -> ({'ok', pid()} | 'ignore' | {'error', any()})). --spec(publish/4 :: (queue_name(), msg_id(), binary(), bool()) -> 'ok'). +-spec(publish/3 :: (queue_name(), message(), bool()) -> 'ok'). -spec(deliver/1 :: (queue_name()) -> - ('empty' | {msg_id(), binary(), non_neg_integer(), + ('empty' | {message(), non_neg_integer(), bool(), {msg_id(), seq_id()}, non_neg_integer()})). -spec(phantom_deliver/1 :: (queue_name()) -> ( 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}, non_neg_integer()})). -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). --spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). +-spec(tx_publish/1 :: (message()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). -spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). --spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, - seq_id_or_next()}]) -> 'ok'). +-spec(requeue_with_seqs/2 :: + (queue_name(), + [{{msg_id(), seq_id()}, {seq_id_or_next(), bool()}}]) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). -spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), binary(), non_neg_integer(), bool(), @@ -269,10 +270,10 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []). -publish(Q, MsgId, Msg, false) when is_binary(Msg) -> - gen_server2:cast(?SERVER, {publish, Q, MsgId, Msg}); -publish(Q, MsgId, Msg, true) when is_binary(Msg) -> - gen_server2:call(?SERVER, {publish, Q, MsgId, Msg}, infinity). +publish(Q, Message = #basic_message {}, false) -> + gen_server2:cast(?SERVER, {publish, Q, Message}); +publish(Q, Message = #basic_message {}, true) -> + gen_server2:call(?SERVER, {publish, Q, Message}, infinity). deliver(Q) -> gen_server2:call(?SERVER, {deliver, Q}, infinity). @@ -286,8 +287,8 @@ ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> auto_ack_next_message(Q) -> gen_server2:cast(?SERVER, {auto_ack_next_message, Q}). -tx_publish(MsgId, Msg) when is_binary(Msg) -> - gen_server2:cast(?SERVER, {tx_publish, MsgId, Msg}). +tx_publish(Message = #basic_message {}) -> + gen_server2:cast(?SERVER, {tx_publish, Message}). tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) -> @@ -403,9 +404,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> end, {ok, State1 #dqstate { current_file_handle = FileHdl }}. -handle_call({publish, Q, MsgId, MsgBody}, _From, State) -> +handle_call({publish, Q, Message}, _From, State) -> {ok, MsgSeqId, State1} = - internal_publish(Q, MsgId, next, MsgBody, true, State), + internal_publish(Q, Message, next, true, State), {reply, MsgSeqId, State1}; handle_call({deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, true, false, State), @@ -470,9 +471,9 @@ handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State), {reply, ok, State1}. -handle_cast({publish, Q, MsgId, MsgBody}, State) -> +handle_cast({publish, Q, Message}, State) -> {ok, _MsgSeqId, State1} = - internal_publish(Q, MsgId, next, MsgBody, false, State), + internal_publish(Q, Message, next, false, State), {noreply, State1}; handle_cast({ack, Q, MsgSeqIds}, State) -> {ok, State1} = internal_ack(Q, MsgSeqIds, State), @@ -480,8 +481,8 @@ handle_cast({ack, Q, MsgSeqIds}, State) -> handle_cast({auto_ack_next_message, Q}, State) -> {ok, State1} = internal_auto_ack(Q, State), {noreply, State1}; -handle_cast({tx_publish, MsgId, MsgBody}, State) -> - {ok, State1} = internal_tx_publish(MsgId, MsgBody, State), +handle_cast({tx_publish, Message = #basic_message { guid = MsgId }}, State) -> + {ok, State1} = internal_tx_publish(MsgId, Message, State), {noreply, State1}; handle_cast({tx_cancel, MsgIds}, State) -> {ok, State1} = internal_tx_cancel(MsgIds, State), @@ -676,6 +677,13 @@ sequence_lookup(Sequences, Q) -> {ReadSeqId, WriteSeqId, Length} end. +msg_to_bin(Msg = #basic_message { content = Content }) -> + ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), + term_to_binary(Msg #basic_message { content = ClearedContent }). + +bin_to_msg(MsgBin) -> + binary_to_term(MsgBin). + %% ---- INTERNAL RAW FUNCTIONS ---- internal_deliver(Q, ReadMsg, FakeDeliver, @@ -694,8 +702,8 @@ internal_deliver(Q, ReadMsg, FakeDeliver, case Result of {MsgId, Delivered, {MsgId, ReadSeqId}} -> {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining}; - {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}} -> - {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, + {Message, BodySize, Delivered, {MsgId, ReadSeqId}} -> + {Message, BodySize, Delivered, {MsgId, ReadSeqId}, Remaining} end, State1} end. @@ -718,7 +726,8 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) -> {FileHdl, State1} = get_read_handle(File, State), {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), - {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, + Message = bin_to_msg(MsgBody), + {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State1}; false -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State} @@ -783,7 +792,7 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, State1 = compact(Files, State), {ok, State1}. -internal_tx_publish(MsgId, MsgBody, +internal_tx_publish(MsgId, Message, State = #dqstate { current_file_handle = CurHdl, current_file_name = CurName, current_offset = CurOffset, @@ -792,7 +801,8 @@ internal_tx_publish(MsgId, MsgBody, case dets_ets_lookup(State, MsgId) of [] -> %% New message, lots to do - {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody), + {ok, TotalSize} = + append_message(CurHdl, MsgId, msg_to_bin(Message)), true = dets_ets_insert_new(State, {MsgId, 1, CurName, CurOffset, TotalSize}), [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = @@ -882,9 +892,10 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, {ok, State1 #dqstate { current_dirty = IsDirty1 }}. %% SeqId can be 'next' -internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) -> +internal_publish(Q, Message = #basic_message { guid = MsgId }, SeqId, + IsDelivered, State) -> {ok, State1 = #dqstate { sequences = Sequences }} = - internal_tx_publish(MsgId, MsgBody, State), + internal_tx_publish(MsgId, Message, State), {ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q), ReadSeqId3 = determine_next_read_id(ReadSeqId, WriteSeqId, SeqId), @@ -1023,12 +1034,12 @@ internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) -> fun ({SeqId, _State1}) when SeqId == WriteSeq -> false; ({SeqId, State1}) -> - {ok, {MsgId, Msg, Size, Delivered, {MsgId, SeqId}}, + {ok, {Message, Size, Delivered, {MsgId, SeqId}}, NextReadSeqId, State2} = internal_read_message(Q, SeqId, true, true, State1), {true, - {MsgId, Msg, Size, Delivered, {MsgId, SeqId}, SeqId}, + {Message, Size, Delivered, {MsgId, SeqId}, SeqId}, {NextReadSeqId, State2}} end, {ReadSeq, State}), {lists:reverse(QList), State3} diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index e7ac171c6a..9b99ab7f90 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -117,7 +117,7 @@ to_disk_only_mode(State = Q, lists:reverse(RQueueAcc)) end, ok = rabbit_disk_queue:publish( - Q, MsgId, msg_to_bin(Msg), false), + Q, Msg, false), [] end end, [], Msgs), @@ -136,9 +136,8 @@ to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) -> QList = rabbit_disk_queue:dump_queue(Q), {MsgBuf1, Length} = lists:foldl( - fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, _SeqId}, + fun ({Msg, _Size, IsDelivered, _AckTag, _SeqId}, {Buf, L}) -> - Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin), {queue:in({Msg, IsDelivered, true}, Buf), L+1} end, {queue:new(), 0}, QList), {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}. @@ -162,9 +161,8 @@ purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) -> case rabbit_disk_queue:deliver(Q) of empty -> {Acks, Requeue, Length}; - {MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} -> - #basic_message { guid = MsgId, is_persistent = IsPersistent } = - bin_to_msg(MsgBin), + {#basic_message { is_persistent = IsPersistent }, + _Size, IsDelivered, AckTag, _Remaining} -> OnDisk = IsPersistent andalso IsDurable, {Acks1, Requeue1, Length1} = if OnDisk -> {Acks, @@ -176,23 +174,15 @@ deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) -> deliver_all_messages(Q, IsDurable, Acks1, Requeue1, Length1) end. -msg_to_bin(Msg = #basic_message { content = Content }) -> - ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), - term_to_binary(Msg #basic_message { content = ClearedContent }). - -bin_to_msg(MsgBin) -> - binary_to_term(MsgBin). - -publish(Msg = #basic_message { guid = MsgId }, - State = #mqstate { mode = disk, queue = Q, length = Length }) -> - ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false), +publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length }) -> + ok = rabbit_disk_queue:publish(Q, Msg, false), {ok, State #mqstate { length = Length + 1 }}; -publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, +publish(Msg = #basic_message { is_persistent = IsPersistent }, State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, msg_buf = MsgBuf, length = Length }) -> OnDisk = IsDurable andalso IsPersistent, ok = if OnDisk -> - rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false); + rabbit_disk_queue:publish(Q, Msg, false); true -> ok end, {ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf), @@ -205,7 +195,7 @@ publish_delivered(Msg = State = #mqstate { mode = Mode, is_durable = IsDurable, queue = Q, length = 0 }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> - rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false), + rabbit_disk_queue:publish(Q, Msg, false), if IsDurable andalso IsPersistent -> %% must call phantom_deliver otherwise the msg remains at %% the head of the queue. This is synchronous, but @@ -225,10 +215,9 @@ deliver(State = #mqstate { length = 0 }) -> {empty, State}; deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, length = Length }) -> - {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} + {Msg = #basic_message { is_persistent = IsPersistent }, + _Size, IsDelivered, AckTag, Remaining} = rabbit_disk_queue:deliver(Q), - #basic_message { guid = MsgId, is_persistent = IsPersistent } = - Msg = bin_to_msg(MsgBin), AckTag1 = if IsPersistent andalso IsDurable -> AckTag; true -> ok = rabbit_disk_queue:ack(Q, [AckTag]), noack @@ -268,14 +257,13 @@ ack(Acks, State = #mqstate { queue = Q }) -> {ok, State} end. -tx_publish(Msg = #basic_message { guid = MsgId }, - State = #mqstate { mode = disk }) -> - ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), +tx_publish(Msg, State = #mqstate { mode = disk }) -> + ok = rabbit_disk_queue:tx_publish(Msg), {ok, State}; -tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, +tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, State = #mqstate { mode = mixed, is_durable = IsDurable }) when IsDurable andalso IsPersistent -> - ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), + ok = rabbit_disk_queue:tx_publish(Msg), {ok, State}; tx_publish(_Msg, State = #mqstate { mode = mixed }) -> %% this message will reappear in the tx_commit, so ignore for now @@ -352,13 +340,13 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ) when IsPersistent andalso IsDurable -> [AckTag | RQ]; - ({Msg = #basic_message { guid = MsgId }, _AckTag}, RQ) -> + ({Msg, _AckTag}, RQ) -> ok = if RQ == [] -> ok; true -> rabbit_disk_queue:requeue( Q, lists:reverse(RQ)) end, _AckTag1 = rabbit_disk_queue:publish( - Q, MsgId, msg_to_bin(Msg), true), + Q, Msg, true), [] end, [], MessagesWithAckTags), ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f45a36bb48..bddb451a17 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -716,6 +716,15 @@ benchmark_disk_queue() -> ok = control_action(start_app, []), passed. +rdq_message(MsgId, MsgBody) -> + rabbit_basic:message(x, <<>>, <<>>, MsgBody, MsgId). + +rdq_match_message( + #basic_message { guid = MsgId, content = + #content { payload_fragments_rev = [MsgBody] }}, + MsgId, MsgBody, Size) when size(MsgBody) =:= Size -> + ok. + rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> Startup = rdq_virgin(), rdq_start(), @@ -724,7 +733,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> List = lists:seq(1, MsgCount), {Publish, ok} = timer:tc(?MODULE, rdq_time_commands, - [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) + [[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- List, _ <- Qs] end, fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, []) || Q <- Qs] end @@ -735,8 +744,9 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> [[fun() -> [begin SeqIds = [begin Remaining = MsgCount - N, - {N, Msg, MsgSizeBytes, false, SeqId, + {Message, _TSize, false, SeqId, Remaining} = rabbit_disk_queue:deliver(Q), + ok = rdq_match_message(Message, N, Msg, MsgSizeBytes), SeqId end || N <- List], ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds) @@ -759,7 +769,7 @@ rdq_stress_gc(MsgCount) -> MsgSizeBytes = 256*1024, Msg = <<0:(8*MsgSizeBytes)>>, % 256KB List = lists:seq(1, MsgCount), - [rabbit_disk_queue:tx_publish(N, Msg) || N <- List], + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- List], rabbit_disk_queue:tx_commit(q, List, []), StartChunk = round(MsgCount / 20), % 5% AckList = @@ -780,8 +790,9 @@ rdq_stress_gc(MsgCount) -> lists:foldl( fun (MsgId, Acc) -> Remaining = MsgCount - MsgId, - {MsgId, Msg, MsgSizeBytes, false, SeqId, Remaining} = + {Message, _TSize, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + ok = rdq_match_message(Message, MsgId, Msg, MsgSizeBytes), dict:store(MsgId, SeqId, Acc) end, dict:new(), List), %% we really do want to ack each of this individually @@ -800,14 +811,16 @@ rdq_test_startup_with_queue_gaps() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All], rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), %% deliver first half Seqs = [begin Remaining = Total - N, - {N, Msg, 256, false, SeqId, Remaining} = - rabbit_disk_queue:deliver(q), SeqId + {Message, _TSize, false, SeqId, Remaining} = + rabbit_disk_queue:deliver(q), + ok = rdq_match_message(Message, N, Msg, 256), + SeqId end || N <- lists:seq(1,Half)], io:format("Deliver first half done~n", []), %% ack every other message we have delivered (starting at the _first_) @@ -826,8 +839,9 @@ rdq_test_startup_with_queue_gaps() -> %% lists:seq(2,500,2) already delivered Seqs2 = [begin Remaining = round(Total - ((Half + N)/2)), - {N, Msg, 256, true, SeqId, Remaining} = + {Message, _TSize, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(2,Half,2)], rabbit_disk_queue:tx_commit(q, [], Seqs2), @@ -835,8 +849,9 @@ rdq_test_startup_with_queue_gaps() -> %% and now fetch the rest Seqs3 = [begin Remaining = Total - N, - {N, Msg, 256, false, SeqId, Remaining} = + {Message, _TSize, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(1 + Half,Total)], rabbit_disk_queue:tx_commit(q, [], Seqs3), @@ -852,14 +867,15 @@ rdq_test_redeliver() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All], rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), %% deliver first half Seqs = [begin Remaining = Total - N, - {N, Msg, 256, false, SeqId, Remaining} = + {Message, _TSize, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(1,Half)], io:format("Deliver first half done~n", []), @@ -878,15 +894,17 @@ rdq_test_redeliver() -> %% every-other-from-the-first-half Seqs2 = [begin Remaining = round(Total - N + (Half/2)), - {N, Msg, 256, false, SeqId, Remaining} = + {Message, _TSize, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(1+Half, Total)], rabbit_disk_queue:tx_commit(q, [], Seqs2), Seqs3 = [begin Remaining = round((Half - N) / 2) - 1, - {N, Msg, 256, true, SeqId, Remaining} = + {Message, _TSize, true, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(1, Half, 2)], rabbit_disk_queue:tx_commit(q, [], Seqs3), @@ -901,14 +919,15 @@ rdq_test_purge() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All], rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), %% deliver first half Seqs = [begin Remaining = Total - N, - {N, Msg, 256, false, SeqId, Remaining} = + {Message, _TSize, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q), + ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(1,Half)], io:format("Deliver first half done~n", []), @@ -926,10 +945,13 @@ rdq_test_dump_queue() -> Msg = <<0:(8*256)>>, Total = 1000, All = lists:seq(1,Total), - [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All], rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), - QList = [{N, Msg, 256, false, {N, (N-1)}, (N-1)} || N <- All], + QList = [begin Message = rdq_message(N, Msg), + Size = size(term_to_binary(Message)), + {Message, Size, false, {N, (N-1)}, (N-1)} + end || N <- All], QList = rabbit_disk_queue:dump_queue(q), rdq_stop(), io:format("dump ok undelivered~n", []), @@ -937,14 +959,18 @@ rdq_test_dump_queue() -> lists:foreach( fun (N) -> Remaining = Total - N, - {N, Msg, 256, false, _SeqId, Remaining} = - rabbit_disk_queue:deliver(q) + {Message, _TSize, false, _SeqId, Remaining} = + rabbit_disk_queue:deliver(q), + ok = rdq_match_message(Message, N, Msg, 256) end, All), [] = rabbit_disk_queue:dump_queue(q), rdq_stop(), io:format("dump ok post delivery~n", []), rdq_start(), - QList2 = [{N, Msg, 256, true, {N, (N-1)}, (N-1)} || N <- All], + QList2 = [begin Message = rdq_message(N, Msg), + Size = size(term_to_binary(Message)), + {Message, Size, true, {N, (N-1)}, (N-1)} + end || N <- All], QList2 = rabbit_disk_queue:dump_queue(q), io:format("dump ok post delivery + restart~n", []), rdq_stop(), |
