summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-24 15:42:30 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-24 15:42:30 +0100
commit0a02dd9cd09af2132c4bf72e82a2d33daeb34cc1 (patch)
tree50cb665f03d72e5ab6794988bfc82211c3be5239 /src
parent681a2deadac15035531000daba40964e67c45ae7 (diff)
downloadrabbitmq-server-git-0a02dd9cd09af2132c4bf72e82a2d33daeb34cc1.tar.gz
Changed reports so that we get bytes gained and lost since the last report.
Also, the sync version of publish is unnecessary as we were only ever using it in one place where we threw away the result. Thus even when publishing a message and marking it delivered in one up (as opposed to publish_delivered, which is quite different ;) ), we can make it cast, not call, as we don't need the acktag. Also, the memory accounting was wrong for requeue in mixed_queue because requeue doesn't actually change the memory sizes (memory goes up on (tx_)publish, and down on ack/tx_cancel. Requeue has no effect. Nor does deliver.).
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_disk_queue.erl14
-rw-r--r--src/rabbit_mixed_queue.erl184
-rw-r--r--src/rabbit_queue_mode_manager.erl9
-rw-r--r--src/rabbit_tests.erl20
5 files changed, 136 insertions, 99 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6b65a5a5d4..b6353beff6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -545,7 +545,8 @@ i(Item, _) ->
report_memory(State = #q { old_memory_report = {OldMem, Then},
mixed_state = MS }) ->
- MSize = rabbit_mixed_queue:estimate_queue_memory(MS),
+ {MSize, Gain, Loss} =
+ rabbit_mixed_queue:estimate_queue_memory(MS),
NewMem = case MSize of
0 -> 1; %% avoid / 0
N -> N
@@ -555,8 +556,9 @@ report_memory(State = #q { old_memory_report = {OldMem, Then},
case ((NewMem / OldMem) > 1.1 orelse (OldMem / NewMem) > 1.1) andalso
(?MEMORY_REPORT_TIME_INTERVAL < timer:now_diff(Now, Then)) of
true ->
- rabbit_queue_mode_manager:report_memory(self(), NewMem),
- State1 #q { old_memory_report = {NewMem, Now} };
+ rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss),
+ State1 #q { old_memory_report = {NewMem, Now},
+ mixed_state = rabbit_mixed_queue:reset_counters(MS) };
false -> State1
end.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index d161a09349..db1b314a74 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -279,10 +279,8 @@ start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE,
[?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []).
-publish(Q, Message = #basic_message {}, false) ->
- gen_server2:cast(?SERVER, {publish, Q, Message});
-publish(Q, Message = #basic_message {}, true) ->
- gen_server2:call(?SERVER, {publish, Q, Message}, infinity).
+publish(Q, Message = #basic_message {}, IsDelivered) ->
+ gen_server2:cast(?SERVER, {publish, Q, Message, IsDelivered}).
deliver(Q) ->
gen_server2:call(?SERVER, {deliver, Q}, infinity).
@@ -427,10 +425,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
end,
{ok, State1 #dqstate { current_file_handle = FileHdl }}.
-handle_call({publish, Q, Message}, _From, State) ->
- {ok, MsgSeqId, State1} =
- internal_publish(Q, Message, next, true, State),
- reply(MsgSeqId, State1);
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, true, false, State),
reply(Result, State1);
@@ -478,9 +472,9 @@ handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
handle_call(cache_info, _From, State = #dqstate { message_cache = Cache }) ->
reply(ets:info(Cache), State).
-handle_cast({publish, Q, Message}, State) ->
+handle_cast({publish, Q, Message, IsDelivered}, State) ->
{ok, _MsgSeqId, State1} =
- internal_publish(Q, Message, next, false, State),
+ internal_publish(Q, Message, next, IsDelivered, State),
noreply(State1);
handle_cast({ack, Q, MsgSeqIds}, State) ->
{ok, State1} = internal_ack(Q, MsgSeqIds, State),
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 88077f100d..12fede1728 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -40,14 +40,16 @@
length/1, is_empty/1, delete_queue/1]).
-export([to_disk_only_mode/2, to_mixed_mode/2, estimate_queue_memory/1,
- info/1]).
+ reset_counters/1, info/1]).
-record(mqstate, { mode,
msg_buf,
queue,
is_durable,
length,
- memory_size
+ memory_size,
+ memory_gain,
+ memory_loss
}
).
@@ -59,7 +61,9 @@
queue :: queue_name(),
is_durable :: bool(),
length :: non_neg_integer(),
- memory_size :: non_neg_integer()
+ memory_size :: non_neg_integer(),
+ memory_gain :: non_neg_integer(),
+ memory_loss :: non_neg_integer()
}).
-type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })).
-type(okmqs() :: {'ok', mqstate()}).
@@ -86,7 +90,9 @@
-spec(to_disk_only_mode/2 :: ([message()], mqstate()) -> okmqs()).
-spec(to_mixed_mode/2 :: ([message()], mqstate()) -> okmqs()).
--spec(estimate_queue_memory/1 :: (mqstate()) -> non_neg_integer).
+-spec(estimate_queue_memory/1 :: (mqstate()) ->
+ {non_neg_integer, non_neg_integer, non_neg_integer}).
+-spec(reset_counters/1 :: (mqstate()) -> (mqstate())).
-spec(info/1 :: (mqstate()) -> mode()).
-endif.
@@ -94,7 +100,8 @@
init(Queue, IsDurable, disk) ->
purge_non_persistent_messages(
#mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
- is_durable = IsDurable, length = 0, memory_size = 0 });
+ is_durable = IsDurable, length = 0, memory_size = 0,
+ memory_gain = 0, memory_loss = 0 });
init(Queue, IsDurable, mixed) ->
{ok, State} = init(Queue, IsDurable, disk),
to_mixed_mode([], State).
@@ -217,21 +224,24 @@ deliver_all_messages(Q, IsDurable, Acks, Requeue, Length, QSize) ->
end.
publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length,
- memory_size = Size }) ->
+ memory_size = QSize, memory_gain = Gain }) ->
ok = rabbit_disk_queue:publish(Q, Msg, false),
- Size1 = Size + size_of_message(Msg),
- {ok, State #mqstate { length = Length + 1, memory_size = Size1 }};
+ MsgSize = size_of_message(Msg),
+ {ok, State #mqstate { length = Length + 1, memory_size = QSize + MsgSize,
+ memory_gain = Gain + MsgSize }};
publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
#mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
- msg_buf = MsgBuf, length = Length, memory_size = Size }) ->
+ msg_buf = MsgBuf, length = Length, memory_size = QSize,
+ memory_gain = Gain }) ->
OnDisk = IsDurable andalso IsPersistent,
ok = if OnDisk ->
rabbit_disk_queue:publish(Q, Msg, false);
true -> ok
end,
- Size1 = Size + size_of_message(Msg),
+ MsgSize = size_of_message(Msg),
{ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf),
- length = Length + 1, memory_size = Size1 }}.
+ length = Length + 1, memory_size = QSize + MsgSize,
+ memory_gain = Gain + MsgSize }}.
%% Assumption here is that the queue is empty already (only called via
%% attempt_immediate_delivery).
@@ -239,10 +249,13 @@ publish_delivered(Msg =
#basic_message { guid = MsgId, is_persistent = IsPersistent},
State =
#mqstate { mode = Mode, is_durable = IsDurable,
- queue = Q, length = 0, memory_size = QSize })
+ queue = Q, length = 0, memory_size = QSize,
+ memory_gain = Gain })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
rabbit_disk_queue:publish(Q, Msg, false),
- State1 = State #mqstate { memory_size = QSize + size_of_message(Msg) },
+ MsgSize = size_of_message(Msg),
+ State1 = State #mqstate { memory_size = QSize + MsgSize,
+ memory_gain = Gain + MsgSize },
if IsDurable andalso IsPersistent ->
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
@@ -256,8 +269,11 @@ publish_delivered(Msg =
{ok, noack, State1}
end;
publish_delivered(Msg, State =
- #mqstate { mode = mixed, length = 0, memory_size = QSize }) ->
- {ok, noack, State #mqstate { memory_size = QSize + size_of_message(Msg) }}.
+ #mqstate { mode = mixed, length = 0, memory_size = QSize,
+ memory_gain = Gain }) ->
+ MsgSize = size_of_message(Msg),
+ {ok, noack, State #mqstate { memory_size = QSize + MsgSize,
+ memory_gain = Gain + MsgSize }}.
deliver(State = #mqstate { length = 0 }) ->
{empty, State};
@@ -304,43 +320,56 @@ remove_noacks(MsgsWithAcks) ->
end, {[], 0}, MsgsWithAcks),
{AckTags, ASize}.
-ack(MsgsWithAcks, State = #mqstate { queue = Q, memory_size = QSize }) ->
- case remove_noacks(MsgsWithAcks) of
- {[], ASize} -> {ok, State #mqstate { memory_size = QSize - ASize }};
- {AckTags, ASize} -> ok = rabbit_disk_queue:ack(Q, AckTags),
- {ok, State #mqstate { memory_size = QSize - ASize }}
- end.
+ack(MsgsWithAcks, State = #mqstate { queue = Q, memory_size = QSize,
+ memory_loss = Loss }) ->
+ ASize = case remove_noacks(MsgsWithAcks) of
+ {[], ASize1} -> ASize1;
+ {AckTags, ASize1} -> rabbit_disk_queue:ack(Q, AckTags),
+ ASize1
+ end,
+ State1 = State #mqstate { memory_size = QSize - ASize,
+ memory_loss = Loss + ASize },
+ {ok, State1}.
-tx_publish(Msg, State = #mqstate { mode = disk, memory_size = QSize }) ->
+tx_publish(Msg, State = #mqstate { mode = disk, memory_size = QSize,
+ memory_gain = Gain }) ->
ok = rabbit_disk_queue:tx_publish(Msg),
- {ok, State #mqstate { memory_size = QSize + size_of_message(Msg) }};
+ MsgSize = size_of_message(Msg),
+ {ok, State #mqstate { memory_size = QSize + MsgSize,
+ memory_gain = Gain + MsgSize }};
tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
#mqstate { mode = mixed, is_durable = IsDurable,
- memory_size = QSize })
+ memory_size = QSize, memory_gain = Gain })
when IsDurable andalso IsPersistent ->
ok = rabbit_disk_queue:tx_publish(Msg),
- {ok, State #mqstate { memory_size = QSize + size_of_message(Msg) }};
-tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize }) ->
+ MsgSize = size_of_message(Msg),
+ {ok, State #mqstate { memory_size = QSize + MsgSize,
+ memory_gain = Gain + MsgSize }};
+tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize,
+ memory_gain = Gain }) ->
%% this message will reappear in the tx_commit, so ignore for now
- {ok, State #mqstate { memory_size = QSize + size_of_message(Msg) }}.
+ MsgSize = size_of_message(Msg),
+ {ok, State #mqstate { memory_size = QSize + MsgSize,
+ memory_gain = Gain + MsgSize }}.
only_msg_ids(Pubs) ->
lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs).
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = disk, queue = Q, length = Length,
- memory_size = QSize }) ->
+ memory_size = QSize, memory_loss = Loss }) ->
{RealAcks, ASize} = remove_noacks(MsgsWithAcks),
ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok;
true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes),
RealAcks)
end,
{ok, State #mqstate { length = Length + erlang:length(Publishes),
- memory_size = QSize - ASize }};
+ memory_size = QSize - ASize,
+ memory_loss = Loss + ASize }};
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
is_durable = IsDurable, length = Length,
- memory_size = QSize }) ->
+ memory_size = QSize, memory_loss = Loss }) ->
{PersistentPubs, MsgBuf1} =
lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
{Acc, MsgBuf2}) ->
@@ -360,20 +389,23 @@ tx_commit(Publishes, MsgsWithAcks,
rabbit_disk_queue:tx_commit(
Q, lists:reverse(PersistentPubs), RealAcks)
end,
- {ok, State #mqstate { msg_buf = MsgBuf1,
+ {ok, State #mqstate { msg_buf = MsgBuf1, memory_size = QSize - ASize,
length = Length + erlang:length(Publishes),
- memory_size = QSize - ASize }}.
+ memory_loss = Loss + ASize }}.
-tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = QSize }) ->
+tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = QSize,
+ memory_loss = Loss }) ->
{MsgIds, CSize} =
lists:foldl(
fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) ->
{[MsgId | MsgIdsAcc], CSizeAcc + size_of_message(Msg)}
end, {[], 0}, Publishes),
ok = rabbit_disk_queue:tx_cancel(MsgIds),
- {ok, State #mqstate { memory_size = QSize - CSize }};
+ {ok, State #mqstate { memory_size = QSize - CSize,
+ memory_loss = Loss + CSize }};
tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable,
- memory_size = QSize }) ->
+ memory_size = QSize,
+ memory_loss = Loss }) ->
{PersistentPubs, CSize} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent,
@@ -389,74 +421,78 @@ tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable,
rabbit_disk_queue:tx_cancel(PersistentPubs);
true -> ok
end,
- {ok, State #mqstate { memory_size = QSize - CSize }}.
+ {ok, State #mqstate { memory_size = QSize - CSize,
+ memory_loss = Loss + CSize }}.
%% [{Msg, AckTag}]
requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
is_durable = IsDurable,
- length = Length,
- memory_size = QSize
+ length = Length
}) ->
%% here, we may have messages with no ack tags, because of the
%% fact they are not persistent, but nevertheless we want to
%% requeue them. This means publishing them delivered.
- {Requeue, CSize}
+ Requeue
= lists:foldl(
- fun ({Msg = #basic_message { is_persistent = IsPersistent },
- AckTag}, {RQ, CSizeAcc})
- when IsPersistent andalso IsDurable ->
- {[AckTag | RQ], CSizeAcc + size_of_message(Msg)};
- ({Msg, _AckTag}, {RQ, CSizeAcc}) ->
- ok = if RQ == [] -> ok;
- true -> rabbit_disk_queue:requeue(
- Q, lists:reverse(RQ))
+ fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ)
+ when IsDurable andalso IsPersistent ->
+ [AckTag | RQ];
+ ({Msg, _AckTag}, RQ) ->
+ ok = case RQ == [] of
+ true -> ok;
+ false -> rabbit_disk_queue:requeue(
+ Q, lists:reverse(RQ))
end,
- _AckTag1 = rabbit_disk_queue:publish(
- Q, Msg, true),
- {[], CSizeAcc + size_of_message(Msg)}
- end, {[], 0}, MessagesWithAckTags),
+ ok = rabbit_disk_queue:publish(Q, Msg, true),
+ []
+ end, [], MessagesWithAckTags),
ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
- {ok, State #mqstate { length = Length + erlang:length(MessagesWithAckTags),
- memory_size = QSize + CSize
- }};
+ {ok,
+ State #mqstate { length = Length + erlang:length(MessagesWithAckTags) }};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
is_durable = IsDurable,
- length = Length,
- memory_size = QSize
+ length = Length
}) ->
- {PersistentPubs, MsgBuf1, CSize} =
+ {PersistentPubs, MsgBuf1} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
- {Acc, MsgBuf2, CSizeAcc}) ->
+ {Acc, MsgBuf2}) ->
OnDisk = IsDurable andalso IsPersistent,
Acc1 =
if OnDisk -> [AckTag | Acc];
true -> Acc
end,
- CSizeAcc1 = CSizeAcc + size_of_message(Msg),
- {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2), CSizeAcc1}
- end, {[], MsgBuf, 0}, MessagesWithAckTags),
+ {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2)}
+ end, {[], MsgBuf}, MessagesWithAckTags),
ok = if [] == PersistentPubs -> ok;
true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs))
end,
- {ok, State #mqstate {msg_buf = MsgBuf1, memory_size = QSize + CSize,
+ {ok, State #mqstate {msg_buf = MsgBuf1,
length = Length + erlang:length(MessagesWithAckTags)}}.
-purge(State = #mqstate { queue = Q, mode = disk, length = Count }) ->
+purge(State = #mqstate { queue = Q, mode = disk, length = Count,
+ memory_loss = Loss, memory_size = QSize }) ->
Count = rabbit_disk_queue:purge(Q),
- {Count, State #mqstate { length = 0, memory_size = 0 }};
-purge(State = #mqstate { queue = Q, mode = mixed, length = Length }) ->
+ {Count, State #mqstate { length = 0, memory_size = 0,
+ memory_loss = Loss + QSize }};
+purge(State = #mqstate { queue = Q, mode = mixed, length = Length,
+ memory_loss = Loss, memory_size = QSize }) ->
rabbit_disk_queue:purge(Q),
{Length,
- State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0 }}.
+ State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0,
+ memory_loss = Loss + QSize }}.
-delete_queue(State = #mqstate { queue = Q, mode = disk }) ->
+delete_queue(State = #mqstate { queue = Q, mode = disk, memory_size = QSize,
+ memory_loss = Loss }) ->
rabbit_disk_queue:delete_queue(Q),
- {ok, State #mqstate { length = 0, memory_size = 0 }};
-delete_queue(State = #mqstate { queue = Q, mode = mixed }) ->
+ {ok, State #mqstate { length = 0, memory_size = 0,
+ memory_loss = Loss + QSize }};
+delete_queue(State = #mqstate { queue = Q, mode = mixed, memory_size = QSize,
+ memory_loss = Loss }) ->
rabbit_disk_queue:delete_queue(Q),
- {ok, State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0 }}.
+ {ok, State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0,
+ memory_loss = Loss + QSize }}.
length(#mqstate { length = Length }) ->
Length.
@@ -464,8 +500,12 @@ length(#mqstate { length = Length }) ->
is_empty(#mqstate { length = Length }) ->
0 == Length.
-estimate_queue_memory(#mqstate { memory_size = Size }) ->
- 2 * Size. %% Magic number. Will probably need playing with.
+estimate_queue_memory(#mqstate { memory_size = Size, memory_gain = Gain,
+ memory_loss = Loss }) ->
+ {Size, Gain, Loss}.
+
+reset_counters(State) ->
+ State #mqstate { memory_gain = 0, memory_loss = 0 }.
info(#mqstate { mode = Mode }) ->
Mode.
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index 50f66063d8..5a3b464d02 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([register/1, report_memory/2]).
+-export([register/1, report_memory/4]).
-define(SERVER, ?MODULE).
@@ -49,7 +49,8 @@
-spec(start_link/0 :: () ->
({'ok', pid()} | 'ignore' | {'error', any()})).
-spec(register/1 :: (pid()) -> {'ok', queue_mode()}).
--spec(report_memory/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(report_memory/4 :: (pid(), non_neg_integer(),
+ non_neg_integer(), non_neg_integer()) -> 'ok').
-endif.
@@ -63,8 +64,8 @@ start_link() ->
register(Pid) ->
gen_server2:call(?SERVER, {register, Pid}).
-report_memory(Pid, Memory) ->
- gen_server2:cast(?SERVER, {report_memory, Pid, Memory}).
+report_memory(Pid, Memory, Gain, Loss) ->
+ gen_server2:cast(?SERVER, {report_memory, Pid, Memory, Gain, Loss}).
init([]) ->
process_flag(trap_exit, true),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7d74968b9c..34a4fcb5f3 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1003,15 +1003,15 @@ rdq_test_mixed_queue_modes() ->
end, MS4, lists:seq(1,10)),
30 = rabbit_mixed_queue:length(MS6),
io:format("Published a mixture of messages; ~w~n",
- [rabbit_mixed_queue:estimate_extra_memory(MS6)]),
+ [rabbit_mixed_queue:estimate_queue_memory(MS6)]),
{ok, MS7} = rabbit_mixed_queue:to_disk_only_mode([], MS6),
30 = rabbit_mixed_queue:length(MS7),
io:format("Converted to disk only mode; ~w~n",
- [rabbit_mixed_queue:estimate_extra_memory(MS7)]),
+ [rabbit_mixed_queue:estimate_queue_memory(MS7)]),
{ok, MS8} = rabbit_mixed_queue:to_mixed_mode([], MS7),
30 = rabbit_mixed_queue:length(MS8),
io:format("Converted to mixed mode; ~w~n",
- [rabbit_mixed_queue:estimate_extra_memory(MS8)]),
+ [rabbit_mixed_queue:estimate_queue_memory(MS8)]),
MS10 =
lists:foldl(
fun (N, MS9) ->
@@ -1035,10 +1035,10 @@ rdq_test_mixed_queue_modes() ->
lists:foldl(
fun (N, {MS13, AcksAcc}) ->
Rem = 10 - N,
- {{#basic_message { is_persistent = true },
+ {{Msg = #basic_message { is_persistent = true },
false, AckTag, Rem},
MS13a} = rabbit_mixed_queue:deliver(MS13),
- {MS13a, [AckTag | AcksAcc]}
+ {MS13a, [{Msg, AckTag} | AcksAcc]}
end, {MS12, []}, lists:seq(1,10)),
0 = rabbit_mixed_queue:length(MS14),
{ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14),
@@ -1050,7 +1050,7 @@ rdq_test_mixed_queue_modes() ->
rdq_start(),
{ok, MS17} = rabbit_mixed_queue:init(q, true, mixed),
0 = rabbit_mixed_queue:length(MS17),
- 0 = rabbit_mixed_queue:estimate_extra_memory(MS17),
+ {0,0,0} = rabbit_mixed_queue:estimate_queue_memory(MS17),
io:format("Recovered queue~n"),
rdq_stop(),
passed.
@@ -1120,10 +1120,10 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc
Rem = Len1 - (Msg #basic_message.guid) - 1,
{{Msg, false, AckTag, Rem}, MS7a} =
rabbit_mixed_queue:deliver(MS7),
- {[AckTag | Acc], MS7a}
+ {[{Msg, AckTag} | Acc], MS7a}
end, {[], MS6}, MsgsA ++ MsgsB),
0 = rabbit_mixed_queue:length(MS8),
- rabbit_mixed_queue:ack(lists:reverse(AckTags), MS8);
+ rabbit_mixed_queue:ack(AckTags, MS8);
cancel ->
{ok, MS6} = rabbit_mixed_queue:tx_cancel(MsgsB, MS5),
Len0 = rabbit_mixed_queue:length(MS6),
@@ -1133,10 +1133,10 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc
Rem = Len0 - (Msg #basic_message.guid) - 1,
{{Msg, false, AckTag, Rem}, MS7a} =
rabbit_mixed_queue:deliver(MS7),
- {[AckTag | Acc], MS7a}
+ {[{Msg, AckTag} | Acc], MS7a}
end, {[], MS6}, MsgsA),
0 = rabbit_mixed_queue:length(MS8),
- rabbit_mixed_queue:ack(lists:reverse(AckTags), MS8)
+ rabbit_mixed_queue:ack(AckTags, MS8)
end,
0 = rabbit_mixed_queue:length(MS9),
passed.