summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-19 15:35:44 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-19 15:35:44 +0100
commit656cbf75101146cac83dffee1ff708bc39d1498d (patch)
tree486faaabe135e613317adb78a8ce9ebc9db023ff
parent80cd8f686e4efc52c07703015ecd1f4c781dcbab (diff)
downloadrabbitmq-server-git-656cbf75101146cac83dffee1ff708bc39d1498d.tar.gz
Altered API so that the disk_queue understands about #basic_message. This means that the mixed_queue avoids unnecessary term_to_binary calls. Tests adjusted and whole test suite still passes
-rw-r--r--src/rabbit_basic.erl9
-rw-r--r--src/rabbit_disk_queue.erl67
-rw-r--r--src/rabbit_mixed_queue.erl46
-rw-r--r--src/rabbit_tests.erl66
4 files changed, 109 insertions, 79 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 0673bdd8d2..f9a8f488af 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, delivery/4]).
+-export([publish/1, message/4, message/5, delivery/4]).
%%----------------------------------------------------------------------------
@@ -44,6 +44,8 @@
-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) ->
message()).
+-spec(message/5 :: (exchange_name(), routing_key(), binary(), binary(), guid()) ->
+ message()).
-endif.
@@ -64,6 +66,9 @@ delivery(Mandatory, Immediate, Txn, Message) ->
sender = self(), message = Message}.
message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
+ message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, rabbit_guid:guid()).
+
+message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId) ->
{ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
Content = #content{class_id = ClassId,
properties = #'P_basic'{content_type = ContentTypeBin},
@@ -72,5 +77,5 @@ message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKeyBin,
content = Content,
- guid = rabbit_guid:guid(),
+ guid = MsgId,
is_persistent = false}.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 3370ef840d..b133f538ed 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -38,8 +38,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/4, deliver/1, phantom_deliver/1, ack/2,
- tx_publish/2, tx_commit/3, tx_cancel/1,
+-export([publish/3, deliver/1, phantom_deliver/1, ack/2,
+ tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1,
dump_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1
]).
@@ -235,21 +235,22 @@
-spec(start_link/0 :: () ->
({'ok', pid()} | 'ignore' | {'error', any()})).
--spec(publish/4 :: (queue_name(), msg_id(), binary(), bool()) -> 'ok').
+-spec(publish/3 :: (queue_name(), message(), bool()) -> 'ok').
-spec(deliver/1 :: (queue_name()) ->
- ('empty' | {msg_id(), binary(), non_neg_integer(),
+ ('empty' | {message(), non_neg_integer(),
bool(), {msg_id(), seq_id()}, non_neg_integer()})).
-spec(phantom_deliver/1 :: (queue_name()) ->
( 'empty' | {msg_id(), bool(), {msg_id(), seq_id()},
non_neg_integer()})).
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
--spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
+-spec(tx_publish/1 :: (message()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) ->
'ok').
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
-spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
--spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()},
- seq_id_or_next()}]) -> 'ok').
+-spec(requeue_with_seqs/2 ::
+ (queue_name(),
+ [{{msg_id(), seq_id()}, {seq_id_or_next(), bool()}}]) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
-spec(dump_queue/1 :: (queue_name()) ->
[{msg_id(), binary(), non_neg_integer(), bool(),
@@ -269,10 +270,10 @@ start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE,
[?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []).
-publish(Q, MsgId, Msg, false) when is_binary(Msg) ->
- gen_server2:cast(?SERVER, {publish, Q, MsgId, Msg});
-publish(Q, MsgId, Msg, true) when is_binary(Msg) ->
- gen_server2:call(?SERVER, {publish, Q, MsgId, Msg}, infinity).
+publish(Q, Message = #basic_message {}, false) ->
+ gen_server2:cast(?SERVER, {publish, Q, Message});
+publish(Q, Message = #basic_message {}, true) ->
+ gen_server2:call(?SERVER, {publish, Q, Message}, infinity).
deliver(Q) ->
gen_server2:call(?SERVER, {deliver, Q}, infinity).
@@ -286,8 +287,8 @@ ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
auto_ack_next_message(Q) ->
gen_server2:cast(?SERVER, {auto_ack_next_message, Q}).
-tx_publish(MsgId, Msg) when is_binary(Msg) ->
- gen_server2:cast(?SERVER, {tx_publish, MsgId, Msg}).
+tx_publish(Message = #basic_message {}) ->
+ gen_server2:cast(?SERVER, {tx_publish, Message}).
tx_commit(Q, PubMsgIds, AckSeqIds)
when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
@@ -403,9 +404,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
end,
{ok, State1 #dqstate { current_file_handle = FileHdl }}.
-handle_call({publish, Q, MsgId, MsgBody}, _From, State) ->
+handle_call({publish, Q, Message}, _From, State) ->
{ok, MsgSeqId, State1} =
- internal_publish(Q, MsgId, next, MsgBody, true, State),
+ internal_publish(Q, Message, next, true, State),
{reply, MsgSeqId, State1};
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, true, false, State),
@@ -470,9 +471,9 @@ handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
{ok, State1} = internal_delete_non_durable_queues(DurableQueues, State),
{reply, ok, State1}.
-handle_cast({publish, Q, MsgId, MsgBody}, State) ->
+handle_cast({publish, Q, Message}, State) ->
{ok, _MsgSeqId, State1} =
- internal_publish(Q, MsgId, next, MsgBody, false, State),
+ internal_publish(Q, Message, next, false, State),
{noreply, State1};
handle_cast({ack, Q, MsgSeqIds}, State) ->
{ok, State1} = internal_ack(Q, MsgSeqIds, State),
@@ -480,8 +481,8 @@ handle_cast({ack, Q, MsgSeqIds}, State) ->
handle_cast({auto_ack_next_message, Q}, State) ->
{ok, State1} = internal_auto_ack(Q, State),
{noreply, State1};
-handle_cast({tx_publish, MsgId, MsgBody}, State) ->
- {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
+handle_cast({tx_publish, Message = #basic_message { guid = MsgId }}, State) ->
+ {ok, State1} = internal_tx_publish(MsgId, Message, State),
{noreply, State1};
handle_cast({tx_cancel, MsgIds}, State) ->
{ok, State1} = internal_tx_cancel(MsgIds, State),
@@ -676,6 +677,13 @@ sequence_lookup(Sequences, Q) ->
{ReadSeqId, WriteSeqId, Length}
end.
+msg_to_bin(Msg = #basic_message { content = Content }) ->
+ ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
+ term_to_binary(Msg #basic_message { content = ClearedContent }).
+
+bin_to_msg(MsgBin) ->
+ binary_to_term(MsgBin).
+
%% ---- INTERNAL RAW FUNCTIONS ----
internal_deliver(Q, ReadMsg, FakeDeliver,
@@ -694,8 +702,8 @@ internal_deliver(Q, ReadMsg, FakeDeliver,
case Result of
{MsgId, Delivered, {MsgId, ReadSeqId}} ->
{MsgId, Delivered, {MsgId, ReadSeqId}, Remaining};
- {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}} ->
- {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId},
+ {Message, BodySize, Delivered, {MsgId, ReadSeqId}} ->
+ {Message, BodySize, Delivered, {MsgId, ReadSeqId},
Remaining}
end, State1}
end.
@@ -718,7 +726,8 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
{FileHdl, State1} = get_read_handle(File, State),
{ok, {MsgBody, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
- {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
+ Message = bin_to_msg(MsgBody),
+ {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}},
NextReadSeqId, State1};
false ->
{ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State}
@@ -783,7 +792,7 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
State1 = compact(Files, State),
{ok, State1}.
-internal_tx_publish(MsgId, MsgBody,
+internal_tx_publish(MsgId, Message,
State = #dqstate { current_file_handle = CurHdl,
current_file_name = CurName,
current_offset = CurOffset,
@@ -792,7 +801,8 @@ internal_tx_publish(MsgId, MsgBody,
case dets_ets_lookup(State, MsgId) of
[] ->
%% New message, lots to do
- {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
+ {ok, TotalSize} =
+ append_message(CurHdl, MsgId, msg_to_bin(Message)),
true = dets_ets_insert_new(State, {MsgId, 1, CurName,
CurOffset, TotalSize}),
[{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
@@ -882,9 +892,10 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
{ok, State1 #dqstate { current_dirty = IsDirty1 }}.
%% SeqId can be 'next'
-internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) ->
+internal_publish(Q, Message = #basic_message { guid = MsgId }, SeqId,
+ IsDelivered, State) ->
{ok, State1 = #dqstate { sequences = Sequences }} =
- internal_tx_publish(MsgId, MsgBody, State),
+ internal_tx_publish(MsgId, Message, State),
{ReadSeqId, WriteSeqId, Length} =
sequence_lookup(Sequences, Q),
ReadSeqId3 = determine_next_read_id(ReadSeqId, WriteSeqId, SeqId),
@@ -1023,12 +1034,12 @@ internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) ->
fun ({SeqId, _State1}) when SeqId == WriteSeq ->
false;
({SeqId, State1}) ->
- {ok, {MsgId, Msg, Size, Delivered, {MsgId, SeqId}},
+ {ok, {Message, Size, Delivered, {MsgId, SeqId}},
NextReadSeqId, State2} =
internal_read_message(Q, SeqId, true, true,
State1),
{true,
- {MsgId, Msg, Size, Delivered, {MsgId, SeqId}, SeqId},
+ {Message, Size, Delivered, {MsgId, SeqId}, SeqId},
{NextReadSeqId, State2}}
end, {ReadSeq, State}),
{lists:reverse(QList), State3}
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index e7ac171c6a..9b99ab7f90 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -117,7 +117,7 @@ to_disk_only_mode(State =
Q, lists:reverse(RQueueAcc))
end,
ok = rabbit_disk_queue:publish(
- Q, MsgId, msg_to_bin(Msg), false),
+ Q, Msg, false),
[]
end
end, [], Msgs),
@@ -136,9 +136,8 @@ to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) ->
QList = rabbit_disk_queue:dump_queue(Q),
{MsgBuf1, Length} =
lists:foldl(
- fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, _SeqId},
+ fun ({Msg, _Size, IsDelivered, _AckTag, _SeqId},
{Buf, L}) ->
- Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin),
{queue:in({Msg, IsDelivered, true}, Buf), L+1}
end, {queue:new(), 0}, QList),
{ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}.
@@ -162,9 +161,8 @@ purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) ->
case rabbit_disk_queue:deliver(Q) of
empty -> {Acks, Requeue, Length};
- {MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} ->
- #basic_message { guid = MsgId, is_persistent = IsPersistent } =
- bin_to_msg(MsgBin),
+ {#basic_message { is_persistent = IsPersistent },
+ _Size, IsDelivered, AckTag, _Remaining} ->
OnDisk = IsPersistent andalso IsDurable,
{Acks1, Requeue1, Length1} =
if OnDisk -> {Acks,
@@ -176,23 +174,15 @@ deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) ->
deliver_all_messages(Q, IsDurable, Acks1, Requeue1, Length1)
end.
-msg_to_bin(Msg = #basic_message { content = Content }) ->
- ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
- term_to_binary(Msg #basic_message { content = ClearedContent }).
-
-bin_to_msg(MsgBin) ->
- binary_to_term(MsgBin).
-
-publish(Msg = #basic_message { guid = MsgId },
- State = #mqstate { mode = disk, queue = Q, length = Length }) ->
- ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
+publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length }) ->
+ ok = rabbit_disk_queue:publish(Q, Msg, false),
{ok, State #mqstate { length = Length + 1 }};
-publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
+publish(Msg = #basic_message { is_persistent = IsPersistent },
State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
msg_buf = MsgBuf, length = Length }) ->
OnDisk = IsDurable andalso IsPersistent,
ok = if OnDisk ->
- rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false);
+ rabbit_disk_queue:publish(Q, Msg, false);
true -> ok
end,
{ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf),
@@ -205,7 +195,7 @@ publish_delivered(Msg =
State = #mqstate { mode = Mode, is_durable = IsDurable,
queue = Q, length = 0 })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
- rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
+ rabbit_disk_queue:publish(Q, Msg, false),
if IsDurable andalso IsPersistent ->
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
@@ -225,10 +215,9 @@ deliver(State = #mqstate { length = 0 }) ->
{empty, State};
deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable,
length = Length }) ->
- {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining}
+ {Msg = #basic_message { is_persistent = IsPersistent },
+ _Size, IsDelivered, AckTag, Remaining}
= rabbit_disk_queue:deliver(Q),
- #basic_message { guid = MsgId, is_persistent = IsPersistent } =
- Msg = bin_to_msg(MsgBin),
AckTag1 = if IsPersistent andalso IsDurable -> AckTag;
true -> ok = rabbit_disk_queue:ack(Q, [AckTag]),
noack
@@ -268,14 +257,13 @@ ack(Acks, State = #mqstate { queue = Q }) ->
{ok, State}
end.
-tx_publish(Msg = #basic_message { guid = MsgId },
- State = #mqstate { mode = disk }) ->
- ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
+tx_publish(Msg, State = #mqstate { mode = disk }) ->
+ ok = rabbit_disk_queue:tx_publish(Msg),
{ok, State};
-tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
+tx_publish(Msg = #basic_message { is_persistent = IsPersistent },
State = #mqstate { mode = mixed, is_durable = IsDurable })
when IsDurable andalso IsPersistent ->
- ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
+ ok = rabbit_disk_queue:tx_publish(Msg),
{ok, State};
tx_publish(_Msg, State = #mqstate { mode = mixed }) ->
%% this message will reappear in the tx_commit, so ignore for now
@@ -352,13 +340,13 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ)
when IsPersistent andalso IsDurable ->
[AckTag | RQ];
- ({Msg = #basic_message { guid = MsgId }, _AckTag}, RQ) ->
+ ({Msg, _AckTag}, RQ) ->
ok = if RQ == [] -> ok;
true -> rabbit_disk_queue:requeue(
Q, lists:reverse(RQ))
end,
_AckTag1 = rabbit_disk_queue:publish(
- Q, MsgId, msg_to_bin(Msg), true),
+ Q, Msg, true),
[]
end, [], MessagesWithAckTags),
ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f45a36bb48..bddb451a17 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -716,6 +716,15 @@ benchmark_disk_queue() ->
ok = control_action(start_app, []),
passed.
+rdq_message(MsgId, MsgBody) ->
+ rabbit_basic:message(x, <<>>, <<>>, MsgBody, MsgId).
+
+rdq_match_message(
+ #basic_message { guid = MsgId, content =
+ #content { payload_fragments_rev = [MsgBody] }},
+ MsgId, MsgBody, Size) when size(MsgBody) =:= Size ->
+ ok.
+
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
Startup = rdq_virgin(),
rdq_start(),
@@ -724,7 +733,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
List = lists:seq(1, MsgCount),
{Publish, ok} =
timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg)
+ [[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg))
|| N <- List, _ <- Qs] end,
fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, [])
|| Q <- Qs] end
@@ -735,8 +744,9 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
[[fun() -> [begin SeqIds =
[begin
Remaining = MsgCount - N,
- {N, Msg, MsgSizeBytes, false, SeqId,
+ {Message, _TSize, false, SeqId,
Remaining} = rabbit_disk_queue:deliver(Q),
+ ok = rdq_match_message(Message, N, Msg, MsgSizeBytes),
SeqId
end || N <- List],
ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds)
@@ -759,7 +769,7 @@ rdq_stress_gc(MsgCount) ->
MsgSizeBytes = 256*1024,
Msg = <<0:(8*MsgSizeBytes)>>, % 256KB
List = lists:seq(1, MsgCount),
- [rabbit_disk_queue:tx_publish(N, Msg) || N <- List],
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- List],
rabbit_disk_queue:tx_commit(q, List, []),
StartChunk = round(MsgCount / 20), % 5%
AckList =
@@ -780,8 +790,9 @@ rdq_stress_gc(MsgCount) ->
lists:foldl(
fun (MsgId, Acc) ->
Remaining = MsgCount - MsgId,
- {MsgId, Msg, MsgSizeBytes, false, SeqId, Remaining} =
+ {Message, _TSize, false, SeqId, Remaining} =
rabbit_disk_queue:deliver(q),
+ ok = rdq_match_message(Message, MsgId, Msg, MsgSizeBytes),
dict:store(MsgId, SeqId, Acc)
end, dict:new(), List),
%% we really do want to ack each of this individually
@@ -800,14 +811,16 @@ rdq_test_startup_with_queue_gaps() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- [rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All],
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
%% deliver first half
Seqs = [begin
Remaining = Total - N,
- {N, Msg, 256, false, SeqId, Remaining} =
- rabbit_disk_queue:deliver(q), SeqId
+ {Message, _TSize, false, SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
+ SeqId
end || N <- lists:seq(1,Half)],
io:format("Deliver first half done~n", []),
%% ack every other message we have delivered (starting at the _first_)
@@ -826,8 +839,9 @@ rdq_test_startup_with_queue_gaps() ->
%% lists:seq(2,500,2) already delivered
Seqs2 = [begin
Remaining = round(Total - ((Half + N)/2)),
- {N, Msg, 256, true, SeqId, Remaining} =
+ {Message, _TSize, true, SeqId, Remaining} =
rabbit_disk_queue:deliver(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(2,Half,2)],
rabbit_disk_queue:tx_commit(q, [], Seqs2),
@@ -835,8 +849,9 @@ rdq_test_startup_with_queue_gaps() ->
%% and now fetch the rest
Seqs3 = [begin
Remaining = Total - N,
- {N, Msg, 256, false, SeqId, Remaining} =
+ {Message, _TSize, false, SeqId, Remaining} =
rabbit_disk_queue:deliver(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(1 + Half,Total)],
rabbit_disk_queue:tx_commit(q, [], Seqs3),
@@ -852,14 +867,15 @@ rdq_test_redeliver() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- [rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All],
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
%% deliver first half
Seqs = [begin
Remaining = Total - N,
- {N, Msg, 256, false, SeqId, Remaining} =
+ {Message, _TSize, false, SeqId, Remaining} =
rabbit_disk_queue:deliver(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(1,Half)],
io:format("Deliver first half done~n", []),
@@ -878,15 +894,17 @@ rdq_test_redeliver() ->
%% every-other-from-the-first-half
Seqs2 = [begin
Remaining = round(Total - N + (Half/2)),
- {N, Msg, 256, false, SeqId, Remaining} =
+ {Message, _TSize, false, SeqId, Remaining} =
rabbit_disk_queue:deliver(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(1+Half, Total)],
rabbit_disk_queue:tx_commit(q, [], Seqs2),
Seqs3 = [begin
Remaining = round((Half - N) / 2) - 1,
- {N, Msg, 256, true, SeqId, Remaining} =
+ {Message, _TSize, true, SeqId, Remaining} =
rabbit_disk_queue:deliver(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(1, Half, 2)],
rabbit_disk_queue:tx_commit(q, [], Seqs3),
@@ -901,14 +919,15 @@ rdq_test_purge() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- [rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All],
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
%% deliver first half
Seqs = [begin
Remaining = Total - N,
- {N, Msg, 256, false, SeqId, Remaining} =
+ {Message, _TSize, false, SeqId, Remaining} =
rabbit_disk_queue:deliver(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(1,Half)],
io:format("Deliver first half done~n", []),
@@ -926,10 +945,13 @@ rdq_test_dump_queue() ->
Msg = <<0:(8*256)>>,
Total = 1000,
All = lists:seq(1,Total),
- [rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All],
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
- QList = [{N, Msg, 256, false, {N, (N-1)}, (N-1)} || N <- All],
+ QList = [begin Message = rdq_message(N, Msg),
+ Size = size(term_to_binary(Message)),
+ {Message, Size, false, {N, (N-1)}, (N-1)}
+ end || N <- All],
QList = rabbit_disk_queue:dump_queue(q),
rdq_stop(),
io:format("dump ok undelivered~n", []),
@@ -937,14 +959,18 @@ rdq_test_dump_queue() ->
lists:foreach(
fun (N) ->
Remaining = Total - N,
- {N, Msg, 256, false, _SeqId, Remaining} =
- rabbit_disk_queue:deliver(q)
+ {Message, _TSize, false, _SeqId, Remaining} =
+ rabbit_disk_queue:deliver(q),
+ ok = rdq_match_message(Message, N, Msg, 256)
end, All),
[] = rabbit_disk_queue:dump_queue(q),
rdq_stop(),
io:format("dump ok post delivery~n", []),
rdq_start(),
- QList2 = [{N, Msg, 256, true, {N, (N-1)}, (N-1)} || N <- All],
+ QList2 = [begin Message = rdq_message(N, Msg),
+ Size = size(term_to_binary(Message)),
+ {Message, Size, true, {N, (N-1)}, (N-1)}
+ end || N <- All],
QList2 = rabbit_disk_queue:dump_queue(q),
io:format("dump ok post delivery + restart~n", []),
rdq_stop(),