summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-17 15:12:04 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-17 15:12:04 +0100
commit3e16b75ac21e8dabd5c8bcfe4ac11cd67a4f19c7 (patch)
tree044bb3c52465f7f11b9e424ff7a9b56d22dfadee
parentb1f86bacbee9d1d3a61dd8f03d7ea3b47484ed52 (diff)
downloadrabbitmq-server-git-3e16b75ac21e8dabd5c8bcfe4ac11cd67a4f19c7.tar.gz
part 2 done. The mixed_queue is back to using only one queue. Start up time isn't too bad with big queues, and memory use is stable. In disk_queue, when iterating through the mnesia table, do the normal limited batching for removal of non-persistent messages.
-rw-r--r--src/rabbit_disk_queue.erl57
-rw-r--r--src/rabbit_mixed_queue.erl189
-rw-r--r--src/rabbit_tests.erl20
3 files changed, 114 insertions, 152 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 4eef884f72..95ed8adf78 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -1560,25 +1560,46 @@ load_from_disk(State) ->
State1 = load_messages(undefined, Files, State),
%% Finally, check there is nothing in mnesia which we haven't
%% loaded
- {atomic, true} = mnesia:transaction(
- fun() ->
- ok = mnesia:read_lock_table(rabbit_disk_queue),
- mnesia:foldl(
- fun (#dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = {Q, SeqId} },
- true) ->
- case erlang:length
- (dets_ets_lookup(State1, MsgId)) of
- 0 -> ok == mnesia:delete(rabbit_disk_queue,
- {Q, SeqId}, write);
- 1 -> true
- end
- end,
- true, rabbit_disk_queue)
- end),
- State2 = extract_sequence_numbers(State1),
+ {atomic, State2} =
+ mnesia:transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ {State6, FinalQ, MsgSeqIds2, _Len} =
+ mnesia:foldl(
+ fun (#dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = {Q, SeqId} },
+ {State3, OldQ, MsgSeqIds, Len}) ->
+ {State4, MsgSeqIds1, Len1} =
+ case {OldQ == Q, MsgSeqIds} of
+ {true, _} when Len < 10000 ->
+ {State3, MsgSeqIds, Len};
+ {false, []} -> {State3, MsgSeqIds, Len};
+ {_, _} ->
+ {ok, State5} =
+ remove_messages(Q, MsgSeqIds,
+ txn, State3),
+ {State5, [], 0}
+ end,
+ case dets_ets_lookup(State4, MsgId) of
+ [] -> ok = mnesia:delete(rabbit_disk_queue,
+ {Q, SeqId}, write),
+ {State4, Q, MsgSeqIds1, Len1};
+ [{MsgId, _RefCount, _File, _Offset,
+ _TotalSize, true}] ->
+ {State4, Q, MsgSeqIds1, Len1};
+ [{MsgId, _RefCount, _File, _Offset,
+ _TotalSize, false}] ->
+ {State4, Q,
+ [{MsgId, SeqId} | MsgSeqIds1], Len1+1}
+ end
+ end, {State1, undefined, [], 0}, rabbit_disk_queue),
+ {ok, State7} =
+ remove_messages(FinalQ, MsgSeqIds2, txn, State6),
+ State7
+ end),
+ State8 = extract_sequence_numbers(State2),
ok = del_index(),
- {ok, State2}.
+ {ok, State8}.
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
{atomic, true} = mnesia:transaction(
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 425d776377..3b86596b6a 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -101,10 +101,10 @@
init(Queue, IsDurable) ->
Len = rabbit_disk_queue:length(Queue),
- ok = rabbit_disk_queue:delete_queue(transient_queue(Queue)),
MsgBuf = inc_queue_length(Queue, queue:new(), Len),
Size = rabbit_disk_queue:foldl(
- fun ({Msg, _Size, _IsDelivered, _AckTag}, Acc) ->
+ fun ({Msg = #basic_message { is_persistent = true },
+ _Size, _IsDelivered, _AckTag}, Acc) ->
Acc + size_of_message(Msg)
end, 0, Queue),
{ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue,
@@ -130,10 +130,8 @@ to_disk_only_mode(TxnMessages, State =
%% message on disk.
%% Note we also batch together messages on disk so that we minimise
%% the calls to requeue.
- TransQ = transient_queue(Q),
{ok, MsgBuf1} =
- send_messages_to_disk(IsDurable, Q, TransQ, MsgBuf, 0, 0, [],
- queue:new()),
+ send_messages_to_disk(IsDurable, Q, MsgBuf, 0, 0, [], queue:new()),
%% tx_publish txn messages. Some of these will have been already
%% published if they really are durable and persistent which is
%% why we can't just use our own tx_publish/2 function (would end
@@ -148,47 +146,42 @@ to_disk_only_mode(TxnMessages, State =
garbage_collect(),
{ok, State #mqstate { mode = disk, msg_buf = MsgBuf1 }}.
-send_messages_to_disk(IsDurable, Q, TransQ, Queue, PublishCount, RequeueCount,
+send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
Commit, MsgBuf) ->
case queue:out(Queue) of
{empty, Queue} ->
- ok = flush_messages_to_disk_queue(TransQ, Commit),
- [] = flush_requeue_to_disk_queue(TransQ, RequeueCount, []),
+ ok = flush_messages_to_disk_queue(Q, Commit),
+ [] = flush_requeue_to_disk_queue(Q, RequeueCount, []),
{ok, MsgBuf};
{{value, {Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
_IsDelivered}}, Queue1} ->
case IsDurable andalso IsPersistent of
- true -> %% it's already in the persistent Q
+ true -> %% it's already in the Q
send_messages_to_disk(
- IsDurable, Q, TransQ, Queue1, PublishCount, RequeueCount,
+ IsDurable, Q, Queue1, PublishCount, RequeueCount,
Commit, inc_queue_length(Q, MsgBuf, 1));
false ->
Commit1 = flush_requeue_to_disk_queue
- (TransQ, RequeueCount, Commit),
+ (Q, RequeueCount, Commit),
ok = rabbit_disk_queue:tx_publish(Msg),
case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
true ->
- ok = flush_messages_to_disk_queue(TransQ, Commit1),
+ ok = flush_messages_to_disk_queue(Q, Commit1),
send_messages_to_disk(
- IsDurable, Q, TransQ, Queue1, 1, 0, [MsgId],
- inc_queue_length(TransQ, MsgBuf, 1));
+ IsDurable, Q, Queue1, 1, 0, [MsgId],
+ inc_queue_length(Q, MsgBuf, 1));
false ->
send_messages_to_disk(
- IsDurable, Q, TransQ, Queue1, PublishCount + 1, 0,
+ IsDurable, Q, Queue1, PublishCount + 1, 0,
[MsgId | Commit1],
- inc_queue_length(TransQ, MsgBuf, 1))
+ inc_queue_length(Q, MsgBuf, 1))
end
end;
{{value, {Q, Count}}, Queue1} ->
- send_messages_to_disk(IsDurable, Q, TransQ, Queue1, PublishCount,
+ send_messages_to_disk(IsDurable, Q, Queue1, PublishCount,
RequeueCount, Commit,
- inc_queue_length(Q, MsgBuf, Count));
- {{value, {TransQ, Count}}, Queue1} ->
- ok = flush_messages_to_disk_queue(TransQ, Commit),
- send_messages_to_disk(IsDurable, Q, TransQ, Queue1, 0,
- RequeueCount + Count, [],
- inc_queue_length(TransQ, MsgBuf, Count))
+ inc_queue_length(Q, MsgBuf, Count))
end.
flush_messages_to_disk_queue(Q, Commit) ->
@@ -235,9 +228,6 @@ to_mixed_mode(TxnMessages, State =
garbage_collect(),
{ok, State #mqstate { mode = mixed }}.
-transient_queue(Queue) ->
- {Queue, transient}.
-
inc_queue_length(_Queue, MsgBuf, 0) ->
MsgBuf;
inc_queue_length(Queue, MsgBuf, Count) ->
@@ -259,17 +249,11 @@ dec_queue_length(Mode, MsgBuf) ->
end,
{Queue, MsgBuf2}.
-publish(Msg = #basic_message { is_persistent = IsPersistent },
- State = #mqstate { mode = disk, queue = Q, length = Length,
- is_durable = IsDurable, msg_buf = MsgBuf,
- memory_size = QSize, memory_gain = Gain }) ->
- Persist = IsDurable andalso IsPersistent,
- PubQ = case Persist of
- true -> Q;
- false -> transient_queue(Q)
- end,
- MsgBuf1 = inc_queue_length(PubQ, MsgBuf, 1),
- ok = rabbit_disk_queue:publish(PubQ, Msg, false),
+publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length,
+ msg_buf = MsgBuf, memory_size = QSize,
+ memory_gain = Gain }) ->
+ MsgBuf1 = inc_queue_length(Q, MsgBuf, 1),
+ ok = rabbit_disk_queue:publish(Q, Msg, false),
MsgSize = size_of_message(Msg),
{ok, State #mqstate { memory_gain = Gain + MsgSize,
memory_size = QSize + MsgSize,
@@ -298,11 +282,7 @@ publish_delivered(Msg =
memory_size = QSize, memory_gain = Gain })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
Persist = IsDurable andalso IsPersistent,
- PubQ = case Persist of
- true -> Q;
- false -> transient_queue(Q)
- end,
- rabbit_disk_queue:publish(PubQ, Msg, false),
+ rabbit_disk_queue:publish(Q, Msg, false),
MsgSize = size_of_message(Msg),
State1 = State #mqstate { memory_size = QSize + MsgSize,
memory_gain = Gain + MsgSize },
@@ -312,12 +292,12 @@ publish_delivered(Msg =
%% the head of the queue. This is synchronous, but
%% unavoidable as we need the AckTag
{MsgId, IsPersistent, false, AckTag, 0} =
- rabbit_disk_queue:phantom_deliver(PubQ),
+ rabbit_disk_queue:phantom_deliver(Q),
{ok, AckTag, State1};
false ->
%% in this case, we don't actually care about the ack, so
%% auto ack it (asynchronously).
- ok = rabbit_disk_queue:auto_ack_next_message(PubQ),
+ ok = rabbit_disk_queue:auto_ack_next_message(Q),
{ok, noack, State1}
end;
publish_delivered(Msg, State =
@@ -350,16 +330,16 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
ok = maybe_prefetch(Mode, MsgBuf1),
{Msg1, IsDelivered1, AckTag1, MsgBuf1};
_ ->
- {ReadQ, MsgBuf3} = dec_queue_length(Mode, MsgBuf),
+ {Q, MsgBuf3} = dec_queue_length(Mode, MsgBuf),
{Msg1 = #basic_message { is_persistent = IsPersistent },
_Size, IsDelivered1, AckTag1, _PersistRem}
- = rabbit_disk_queue:deliver(ReadQ),
+ = rabbit_disk_queue:deliver(Q),
AckTag2 =
case IsDurable andalso IsPersistent of
true ->
AckTag1;
false ->
- ok = rabbit_disk_queue:ack(ReadQ, [AckTag1]),
+ ok = rabbit_disk_queue:ack(Q, [AckTag1]),
noack
end,
{Msg1, IsDelivered1, AckTag2, MsgBuf3}
@@ -368,7 +348,7 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
{{Msg, IsDelivered, AckTag, Rem},
State #mqstate { msg_buf = MsgBuf2, length = Rem }}.
-maybe_prefetch(disk, MsgBuf) ->
+maybe_prefetch(disk, _MsgBuf) ->
ok;
maybe_prefetch(mixed, MsgBuf) ->
case queue:peek(MsgBuf) of
@@ -417,64 +397,19 @@ tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize,
only_msg_ids(Pubs) ->
lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs).
-%% The last 2 params are accumulators. We work through the publishes,
-%% sorting out our msgbuf as we go. Finally, when no more work to do,
-%% we commit first transient, and the persistent msgs. This is safe
-%% because in case of failure, transient messages will be lost on
-%% restart anyway.
-commit_to_queues(_IsDurable, _Q, _TransQ, MsgBuf, [], [], [], []) ->
- MsgBuf;
-commit_to_queues(_IsDurable, Q, _TransQ, MsgBuf, AckTags, [],
- PersistMsgIds, []) ->
- MsgIds = lists:flatten(lists:reverse(PersistMsgIds)),
- ok = rabbit_disk_queue:tx_commit(Q, MsgIds, AckTags),
- MsgBuf;
-commit_to_queues(IsDurable, Q, TransQ, MsgBuf, AckTags, [],
- PersistMsgIds, TransMsgIds) ->
- MsgIds = lists:flatten(lists:reverse(TransMsgIds)),
- ok = rabbit_disk_queue:tx_commit(TransQ, MsgIds, []),
- commit_to_queues(IsDurable, Q, TransQ, MsgBuf, AckTags, [],
- PersistMsgIds, []);
-commit_to_queues(false, Q, TransQ, MsgBuf, AckTags, Publishes, [], []) ->
- MsgIds = only_msg_ids(Publishes),
- MsgBuf1 = inc_queue_length(TransQ, MsgBuf, erlang:length(MsgIds)),
- commit_to_queues(false, Q, TransQ, MsgBuf1, AckTags, [], [], [MsgIds]);
-commit_to_queues(true, Q, TransQ, MsgBuf, AckTags, Publishes =
- [#basic_message { is_persistent = true } | _],
- PersistAcc, TransAcc) ->
- {Persist, Publishes1} = lists:splitwith(fun is_persistent/1, Publishes),
- MsgIds = only_msg_ids(Persist),
- MsgBuf1 = inc_queue_length(Q, MsgBuf, erlang:length(MsgIds)),
- commit_to_queues(true, Q, TransQ, MsgBuf1, AckTags, Publishes1,
- [MsgIds | PersistAcc], TransAcc);
-commit_to_queues(true, Q, TransQ, MsgBuf, AckTags, Publishes,
- PersistAcc, TransAcc) ->
- %% not persistent
- {Trans, Publishes1} = lists:splitwith(fun is_not_persistent/1, Publishes),
- MsgIds = only_msg_ids(Trans),
- MsgBuf1 = inc_queue_length(TransQ, MsgBuf, erlang:length(MsgIds)),
- commit_to_queues(true, Q, TransQ, MsgBuf1, AckTags, Publishes1,
- PersistAcc, [MsgIds | TransAcc]).
-
-is_persistent(#basic_message { is_persistent = IsPersistent }) ->
- IsPersistent.
-
-is_not_persistent(#basic_message { is_persistent = IsPersistent }) ->
- not IsPersistent.
-
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = disk, queue = Q, length = Length,
memory_size = QSize, memory_loss = Loss,
- is_durable = IsDurable, msg_buf = MsgBuf }) ->
+ msg_buf = MsgBuf }) ->
{RealAcks, ASize} = remove_noacks(MsgsWithAcks),
- MsgBuf1 = case ([] == Publishes) andalso ([] == RealAcks) of
- true -> MsgBuf;
- false -> commit_to_queues
- (IsDurable, Q, transient_queue(Q), MsgBuf,
- RealAcks, Publishes, [], [])
- end,
- {ok, State #mqstate { length = Length + erlang:length(Publishes),
- msg_buf = MsgBuf1, memory_size = QSize - ASize,
+ ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok;
+ true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes),
+ RealAcks)
+ end,
+ Len = erlang:length(Publishes),
+ {ok, State #mqstate { length = Length + Len,
+ msg_buf = inc_queue_length(Q, MsgBuf, Len),
+ memory_size = QSize - ASize,
memory_loss = Loss + ASize }};
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
@@ -493,9 +428,8 @@ tx_commit(Publishes, MsgsWithAcks,
{RealAcks, ASize} = remove_noacks(MsgsWithAcks),
ok = case ([] == PersistentPubs) andalso ([] == RealAcks) of
true -> ok;
- false ->
- rabbit_disk_queue:tx_commit(
- Q, lists:reverse(PersistentPubs), RealAcks)
+ false -> rabbit_disk_queue:tx_commit(
+ Q, lists:reverse(PersistentPubs), RealAcks)
end,
{ok, State #mqstate { msg_buf = MsgBuf1, memory_size = QSize - ASize,
length = Length + erlang:length(Publishes),
@@ -540,20 +474,24 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
%% here, we may have messages with no ack tags, because of the
%% fact they are not persistent, but nevertheless we want to
%% requeue them. This means publishing them delivered.
- TransQ = transient_queue(Q),
- {MsgBuf1, PersistRQ}
+ Requeue
= lists:foldl(
- fun ({#basic_message { is_persistent = IsPersistent }, AckTag},
- {MB, PRQ})
+ fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ)
when IsDurable andalso IsPersistent ->
- {inc_queue_length(Q, MB, 1), [{AckTag, true} | PRQ]};
- ({Msg, noack}, {MB, PRQ}) ->
- ok = rabbit_disk_queue:publish(TransQ, Msg, true),
- {inc_queue_length(TransQ, MB, 1), PRQ}
- end, {MsgBuf, []}, MessagesWithAckTags),
- ok = rabbit_disk_queue:requeue(Q, lists:reverse(PersistRQ)),
- {ok, State #mqstate { length = Length + erlang:length(MessagesWithAckTags),
- msg_buf = MsgBuf1 }};
+ [{AckTag, true} | RQ];
+ ({Msg, noack}, RQ) ->
+ ok = case RQ == [] of
+ true -> ok;
+ false -> rabbit_disk_queue:requeue(
+ Q, lists:reverse(RQ))
+ end,
+ ok = rabbit_disk_queue:publish(Q, Msg, true),
+ []
+ end, [], MessagesWithAckTags),
+ ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
+ Len = erlang:length(MessagesWithAckTags),
+ {ok, State #mqstate { length = Length + Len,
+ msg_buf = inc_queue_length(Q, MsgBuf, Len) }};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
is_durable = IsDurable,
@@ -577,18 +515,21 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
{ok, State #mqstate {msg_buf = MsgBuf1,
length = Length + erlang:length(MessagesWithAckTags)}}.
-purge(State = #mqstate { queue = Q, length = Count,
+purge(State = #mqstate { queue = Q, mode = disk, length = Count,
+ memory_loss = Loss, memory_size = QSize }) ->
+ Count = rabbit_disk_queue:purge(Q),
+ {Count, State #mqstate { length = 0, memory_size = 0,
+ memory_loss = Loss + QSize }};
+purge(State = #mqstate { queue = Q, mode = mixed, length = Length,
memory_loss = Loss, memory_size = QSize }) ->
- Len1 = rabbit_disk_queue:purge(Q),
- Len2 = rabbit_disk_queue:purge(transient_queue(Q)),
- true = Count >= Len1 + Len2,
- {Count, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(),
- memory_loss = Loss + QSize }}.
+ rabbit_disk_queue:purge(Q),
+ {Length,
+ State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0,
+ memory_loss = Loss + QSize }}.
delete_queue(State = #mqstate { queue = Q, memory_size = QSize,
memory_loss = Loss }) ->
ok = rabbit_disk_queue:delete_queue(Q),
- ok = rabbit_disk_queue:delete_queue(transient_queue(Q)),
{ok, State #mqstate { length = 0, memory_size = 0, msg_buf = queue:new(),
memory_loss = Loss + QSize }}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 58a9d0cddd..b9777337ed 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -767,8 +767,8 @@ benchmark_disk_queue() ->
ok = control_action(start_app, []),
passed.
-rdq_message(MsgId, MsgBody) ->
- rabbit_basic:message(x, <<>>, [], MsgBody, MsgId).
+rdq_message(MsgId, MsgBody, IsPersistent) ->
+ rabbit_basic:message(x, <<>>, [], MsgBody, MsgId, IsPersistent).
rdq_match_message(
#basic_message { guid = MsgId, content =
@@ -784,7 +784,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
List = lists:seq(1, MsgCount),
{Publish, ok} =
timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg))
+ [[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false))
|| N <- List, _ <- Qs] end,
fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, [])
|| Q <- Qs] end
@@ -820,7 +820,7 @@ rdq_stress_gc(MsgCount) ->
MsgSizeBytes = 256*1024,
Msg = <<0:(8*MsgSizeBytes)>>, % 256KB
List = lists:seq(1, MsgCount),
- [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- List],
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List],
rabbit_disk_queue:tx_commit(q, List, []),
StartChunk = round(MsgCount / 20), % 5%
AckList =
@@ -862,7 +862,7 @@ rdq_test_startup_with_queue_gaps() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All],
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, true)) || N <- All],
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
%% deliver first half
@@ -918,7 +918,7 @@ rdq_test_redeliver() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All],
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All],
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
%% deliver first half
@@ -970,7 +970,7 @@ rdq_test_purge() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All],
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All],
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
%% deliver first half
@@ -1161,7 +1161,7 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc
rabbit_mixed_queue:ack(AckTags, MS8)
end,
0 = rabbit_mixed_queue:length(MS9),
- Msg = rdq_message(0, <<0:256>>),
+ Msg = rdq_message(0, <<0:256>>, false),
{ok, AckTag, MS10} = rabbit_mixed_queue:publish_delivered(Msg, MS9),
{ok,MS11} = rabbit_mixed_queue:ack([{Msg, AckTag}], MS10),
0 = rabbit_mixed_queue:length(MS11),
@@ -1174,12 +1174,12 @@ rdq_test_disk_queue_modes() ->
Total = 1000,
Half1 = lists:seq(1,round(Total/2)),
Half2 = lists:seq(1 + round(Total/2), Total),
- [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- Half1],
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half1],
ok = rabbit_disk_queue:tx_commit(q, Half1, []),
io:format("Publish done~n", []),
ok = rabbit_disk_queue:to_disk_only_mode(),
io:format("To Disk Only done~n", []),
- [rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- Half2],
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half2],
ok = rabbit_disk_queue:tx_commit(q, Half2, []),
Seqs = [begin
Remaining = Total - N,