diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-24 15:42:30 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-24 15:42:30 +0100 |
| commit | 0a02dd9cd09af2132c4bf72e82a2d33daeb34cc1 (patch) | |
| tree | 50cb665f03d72e5ab6794988bfc82211c3be5239 /src | |
| parent | 681a2deadac15035531000daba40964e67c45ae7 (diff) | |
| download | rabbitmq-server-git-0a02dd9cd09af2132c4bf72e82a2d33daeb34cc1.tar.gz | |
Changed reports so that we get bytes gained and lost since the last report.
Also, the sync version of publish is unnecessary as we were only ever using it in one place where we threw away the result. Thus even when publishing a message and marking it delivered in one up (as opposed to publish_delivered, which is quite different ;) ), we can make it cast, not call, as we don't need the acktag.
Also, the memory accounting was wrong for requeue in mixed_queue because requeue doesn't actually change the memory sizes (memory goes up on (tx_)publish, and down on ack/tx_cancel. Requeue has no effect. Nor does deliver.).
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 184 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 20 |
5 files changed, 136 insertions, 99 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6b65a5a5d4..b6353beff6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -545,7 +545,8 @@ i(Item, _) -> report_memory(State = #q { old_memory_report = {OldMem, Then}, mixed_state = MS }) -> - MSize = rabbit_mixed_queue:estimate_queue_memory(MS), + {MSize, Gain, Loss} = + rabbit_mixed_queue:estimate_queue_memory(MS), NewMem = case MSize of 0 -> 1; %% avoid / 0 N -> N @@ -555,8 +556,9 @@ report_memory(State = #q { old_memory_report = {OldMem, Then}, case ((NewMem / OldMem) > 1.1 orelse (OldMem / NewMem) > 1.1) andalso (?MEMORY_REPORT_TIME_INTERVAL < timer:now_diff(Now, Then)) of true -> - rabbit_queue_mode_manager:report_memory(self(), NewMem), - State1 #q { old_memory_report = {NewMem, Now} }; + rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss), + State1 #q { old_memory_report = {NewMem, Now}, + mixed_state = rabbit_mixed_queue:reset_counters(MS) }; false -> State1 end. diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index d161a09349..db1b314a74 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -279,10 +279,8 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []). -publish(Q, Message = #basic_message {}, false) -> - gen_server2:cast(?SERVER, {publish, Q, Message}); -publish(Q, Message = #basic_message {}, true) -> - gen_server2:call(?SERVER, {publish, Q, Message}, infinity). +publish(Q, Message = #basic_message {}, IsDelivered) -> + gen_server2:cast(?SERVER, {publish, Q, Message, IsDelivered}). deliver(Q) -> gen_server2:call(?SERVER, {deliver, Q}, infinity). @@ -427,10 +425,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> end, {ok, State1 #dqstate { current_file_handle = FileHdl }}. -handle_call({publish, Q, Message}, _From, State) -> - {ok, MsgSeqId, State1} = - internal_publish(Q, Message, next, true, State), - reply(MsgSeqId, State1); handle_call({deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, true, false, State), reply(Result, State1); @@ -478,9 +472,9 @@ handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> handle_call(cache_info, _From, State = #dqstate { message_cache = Cache }) -> reply(ets:info(Cache), State). -handle_cast({publish, Q, Message}, State) -> +handle_cast({publish, Q, Message, IsDelivered}, State) -> {ok, _MsgSeqId, State1} = - internal_publish(Q, Message, next, false, State), + internal_publish(Q, Message, next, IsDelivered, State), noreply(State1); handle_cast({ack, Q, MsgSeqIds}, State) -> {ok, State1} = internal_ack(Q, MsgSeqIds, State), diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 88077f100d..12fede1728 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -40,14 +40,16 @@ length/1, is_empty/1, delete_queue/1]). -export([to_disk_only_mode/2, to_mixed_mode/2, estimate_queue_memory/1, - info/1]). + reset_counters/1, info/1]). -record(mqstate, { mode, msg_buf, queue, is_durable, length, - memory_size + memory_size, + memory_gain, + memory_loss } ). @@ -59,7 +61,9 @@ queue :: queue_name(), is_durable :: bool(), length :: non_neg_integer(), - memory_size :: non_neg_integer() + memory_size :: non_neg_integer(), + memory_gain :: non_neg_integer(), + memory_loss :: non_neg_integer() }). -type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })). -type(okmqs() :: {'ok', mqstate()}). @@ -86,7 +90,9 @@ -spec(to_disk_only_mode/2 :: ([message()], mqstate()) -> okmqs()). -spec(to_mixed_mode/2 :: ([message()], mqstate()) -> okmqs()). --spec(estimate_queue_memory/1 :: (mqstate()) -> non_neg_integer). +-spec(estimate_queue_memory/1 :: (mqstate()) -> + {non_neg_integer, non_neg_integer, non_neg_integer}). +-spec(reset_counters/1 :: (mqstate()) -> (mqstate())). -spec(info/1 :: (mqstate()) -> mode()). -endif. @@ -94,7 +100,8 @@ init(Queue, IsDurable, disk) -> purge_non_persistent_messages( #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue, - is_durable = IsDurable, length = 0, memory_size = 0 }); + is_durable = IsDurable, length = 0, memory_size = 0, + memory_gain = 0, memory_loss = 0 }); init(Queue, IsDurable, mixed) -> {ok, State} = init(Queue, IsDurable, disk), to_mixed_mode([], State). @@ -217,21 +224,24 @@ deliver_all_messages(Q, IsDurable, Acks, Requeue, Length, QSize) -> end. publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length, - memory_size = Size }) -> + memory_size = QSize, memory_gain = Gain }) -> ok = rabbit_disk_queue:publish(Q, Msg, false), - Size1 = Size + size_of_message(Msg), - {ok, State #mqstate { length = Length + 1, memory_size = Size1 }}; + MsgSize = size_of_message(Msg), + {ok, State #mqstate { length = Length + 1, memory_size = QSize + MsgSize, + memory_gain = Gain + MsgSize }}; publish(Msg = #basic_message { is_persistent = IsPersistent }, State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, - msg_buf = MsgBuf, length = Length, memory_size = Size }) -> + msg_buf = MsgBuf, length = Length, memory_size = QSize, + memory_gain = Gain }) -> OnDisk = IsDurable andalso IsPersistent, ok = if OnDisk -> rabbit_disk_queue:publish(Q, Msg, false); true -> ok end, - Size1 = Size + size_of_message(Msg), + MsgSize = size_of_message(Msg), {ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf), - length = Length + 1, memory_size = Size1 }}. + length = Length + 1, memory_size = QSize + MsgSize, + memory_gain = Gain + MsgSize }}. %% Assumption here is that the queue is empty already (only called via %% attempt_immediate_delivery). @@ -239,10 +249,13 @@ publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent}, State = #mqstate { mode = Mode, is_durable = IsDurable, - queue = Q, length = 0, memory_size = QSize }) + queue = Q, length = 0, memory_size = QSize, + memory_gain = Gain }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> rabbit_disk_queue:publish(Q, Msg, false), - State1 = State #mqstate { memory_size = QSize + size_of_message(Msg) }, + MsgSize = size_of_message(Msg), + State1 = State #mqstate { memory_size = QSize + MsgSize, + memory_gain = Gain + MsgSize }, if IsDurable andalso IsPersistent -> %% must call phantom_deliver otherwise the msg remains at %% the head of the queue. This is synchronous, but @@ -256,8 +269,11 @@ publish_delivered(Msg = {ok, noack, State1} end; publish_delivered(Msg, State = - #mqstate { mode = mixed, length = 0, memory_size = QSize }) -> - {ok, noack, State #mqstate { memory_size = QSize + size_of_message(Msg) }}. + #mqstate { mode = mixed, length = 0, memory_size = QSize, + memory_gain = Gain }) -> + MsgSize = size_of_message(Msg), + {ok, noack, State #mqstate { memory_size = QSize + MsgSize, + memory_gain = Gain + MsgSize }}. deliver(State = #mqstate { length = 0 }) -> {empty, State}; @@ -304,43 +320,56 @@ remove_noacks(MsgsWithAcks) -> end, {[], 0}, MsgsWithAcks), {AckTags, ASize}. -ack(MsgsWithAcks, State = #mqstate { queue = Q, memory_size = QSize }) -> - case remove_noacks(MsgsWithAcks) of - {[], ASize} -> {ok, State #mqstate { memory_size = QSize - ASize }}; - {AckTags, ASize} -> ok = rabbit_disk_queue:ack(Q, AckTags), - {ok, State #mqstate { memory_size = QSize - ASize }} - end. +ack(MsgsWithAcks, State = #mqstate { queue = Q, memory_size = QSize, + memory_loss = Loss }) -> + ASize = case remove_noacks(MsgsWithAcks) of + {[], ASize1} -> ASize1; + {AckTags, ASize1} -> rabbit_disk_queue:ack(Q, AckTags), + ASize1 + end, + State1 = State #mqstate { memory_size = QSize - ASize, + memory_loss = Loss + ASize }, + {ok, State1}. -tx_publish(Msg, State = #mqstate { mode = disk, memory_size = QSize }) -> +tx_publish(Msg, State = #mqstate { mode = disk, memory_size = QSize, + memory_gain = Gain }) -> ok = rabbit_disk_queue:tx_publish(Msg), - {ok, State #mqstate { memory_size = QSize + size_of_message(Msg) }}; + MsgSize = size_of_message(Msg), + {ok, State #mqstate { memory_size = QSize + MsgSize, + memory_gain = Gain + MsgSize }}; tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, State = #mqstate { mode = mixed, is_durable = IsDurable, - memory_size = QSize }) + memory_size = QSize, memory_gain = Gain }) when IsDurable andalso IsPersistent -> ok = rabbit_disk_queue:tx_publish(Msg), - {ok, State #mqstate { memory_size = QSize + size_of_message(Msg) }}; -tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize }) -> + MsgSize = size_of_message(Msg), + {ok, State #mqstate { memory_size = QSize + MsgSize, + memory_gain = Gain + MsgSize }}; +tx_publish(Msg, State = #mqstate { mode = mixed, memory_size = QSize, + memory_gain = Gain }) -> %% this message will reappear in the tx_commit, so ignore for now - {ok, State #mqstate { memory_size = QSize + size_of_message(Msg) }}. + MsgSize = size_of_message(Msg), + {ok, State #mqstate { memory_size = QSize + MsgSize, + memory_gain = Gain + MsgSize }}. only_msg_ids(Pubs) -> lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs). tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = disk, queue = Q, length = Length, - memory_size = QSize }) -> + memory_size = QSize, memory_loss = Loss }) -> {RealAcks, ASize} = remove_noacks(MsgsWithAcks), ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok; true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), RealAcks) end, {ok, State #mqstate { length = Length + erlang:length(Publishes), - memory_size = QSize - ASize }}; + memory_size = QSize - ASize, + memory_loss = Loss + ASize }}; tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, length = Length, - memory_size = QSize }) -> + memory_size = QSize, memory_loss = Loss }) -> {PersistentPubs, MsgBuf1} = lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, {Acc, MsgBuf2}) -> @@ -360,20 +389,23 @@ tx_commit(Publishes, MsgsWithAcks, rabbit_disk_queue:tx_commit( Q, lists:reverse(PersistentPubs), RealAcks) end, - {ok, State #mqstate { msg_buf = MsgBuf1, + {ok, State #mqstate { msg_buf = MsgBuf1, memory_size = QSize - ASize, length = Length + erlang:length(Publishes), - memory_size = QSize - ASize }}. + memory_loss = Loss + ASize }}. -tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = QSize }) -> +tx_cancel(Publishes, State = #mqstate { mode = disk, memory_size = QSize, + memory_loss = Loss }) -> {MsgIds, CSize} = lists:foldl( fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) -> {[MsgId | MsgIdsAcc], CSizeAcc + size_of_message(Msg)} end, {[], 0}, Publishes), ok = rabbit_disk_queue:tx_cancel(MsgIds), - {ok, State #mqstate { memory_size = QSize - CSize }}; + {ok, State #mqstate { memory_size = QSize - CSize, + memory_loss = Loss + CSize }}; tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable, - memory_size = QSize }) -> + memory_size = QSize, + memory_loss = Loss }) -> {PersistentPubs, CSize} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent, @@ -389,74 +421,78 @@ tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable, rabbit_disk_queue:tx_cancel(PersistentPubs); true -> ok end, - {ok, State #mqstate { memory_size = QSize - CSize }}. + {ok, State #mqstate { memory_size = QSize - CSize, + memory_loss = Loss + CSize }}. %% [{Msg, AckTag}] requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, - length = Length, - memory_size = QSize + length = Length }) -> %% here, we may have messages with no ack tags, because of the %% fact they are not persistent, but nevertheless we want to %% requeue them. This means publishing them delivered. - {Requeue, CSize} + Requeue = lists:foldl( - fun ({Msg = #basic_message { is_persistent = IsPersistent }, - AckTag}, {RQ, CSizeAcc}) - when IsPersistent andalso IsDurable -> - {[AckTag | RQ], CSizeAcc + size_of_message(Msg)}; - ({Msg, _AckTag}, {RQ, CSizeAcc}) -> - ok = if RQ == [] -> ok; - true -> rabbit_disk_queue:requeue( - Q, lists:reverse(RQ)) + fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ) + when IsDurable andalso IsPersistent -> + [AckTag | RQ]; + ({Msg, _AckTag}, RQ) -> + ok = case RQ == [] of + true -> ok; + false -> rabbit_disk_queue:requeue( + Q, lists:reverse(RQ)) end, - _AckTag1 = rabbit_disk_queue:publish( - Q, Msg, true), - {[], CSizeAcc + size_of_message(Msg)} - end, {[], 0}, MessagesWithAckTags), + ok = rabbit_disk_queue:publish(Q, Msg, true), + [] + end, [], MessagesWithAckTags), ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)), - {ok, State #mqstate { length = Length + erlang:length(MessagesWithAckTags), - memory_size = QSize + CSize - }}; + {ok, + State #mqstate { length = Length + erlang:length(MessagesWithAckTags) }}; requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, - length = Length, - memory_size = QSize + length = Length }) -> - {PersistentPubs, MsgBuf1, CSize} = + {PersistentPubs, MsgBuf1} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, - {Acc, MsgBuf2, CSizeAcc}) -> + {Acc, MsgBuf2}) -> OnDisk = IsDurable andalso IsPersistent, Acc1 = if OnDisk -> [AckTag | Acc]; true -> Acc end, - CSizeAcc1 = CSizeAcc + size_of_message(Msg), - {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2), CSizeAcc1} - end, {[], MsgBuf, 0}, MessagesWithAckTags), + {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2)} + end, {[], MsgBuf}, MessagesWithAckTags), ok = if [] == PersistentPubs -> ok; true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs)) end, - {ok, State #mqstate {msg_buf = MsgBuf1, memory_size = QSize + CSize, + {ok, State #mqstate {msg_buf = MsgBuf1, length = Length + erlang:length(MessagesWithAckTags)}}. -purge(State = #mqstate { queue = Q, mode = disk, length = Count }) -> +purge(State = #mqstate { queue = Q, mode = disk, length = Count, + memory_loss = Loss, memory_size = QSize }) -> Count = rabbit_disk_queue:purge(Q), - {Count, State #mqstate { length = 0, memory_size = 0 }}; -purge(State = #mqstate { queue = Q, mode = mixed, length = Length }) -> + {Count, State #mqstate { length = 0, memory_size = 0, + memory_loss = Loss + QSize }}; +purge(State = #mqstate { queue = Q, mode = mixed, length = Length, + memory_loss = Loss, memory_size = QSize }) -> rabbit_disk_queue:purge(Q), {Length, - State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0 }}. + State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0, + memory_loss = Loss + QSize }}. -delete_queue(State = #mqstate { queue = Q, mode = disk }) -> +delete_queue(State = #mqstate { queue = Q, mode = disk, memory_size = QSize, + memory_loss = Loss }) -> rabbit_disk_queue:delete_queue(Q), - {ok, State #mqstate { length = 0, memory_size = 0 }}; -delete_queue(State = #mqstate { queue = Q, mode = mixed }) -> + {ok, State #mqstate { length = 0, memory_size = 0, + memory_loss = Loss + QSize }}; +delete_queue(State = #mqstate { queue = Q, mode = mixed, memory_size = QSize, + memory_loss = Loss }) -> rabbit_disk_queue:delete_queue(Q), - {ok, State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0 }}. + {ok, State #mqstate { msg_buf = queue:new(), length = 0, memory_size = 0, + memory_loss = Loss + QSize }}. length(#mqstate { length = Length }) -> Length. @@ -464,8 +500,12 @@ length(#mqstate { length = Length }) -> is_empty(#mqstate { length = Length }) -> 0 == Length. -estimate_queue_memory(#mqstate { memory_size = Size }) -> - 2 * Size. %% Magic number. Will probably need playing with. +estimate_queue_memory(#mqstate { memory_size = Size, memory_gain = Gain, + memory_loss = Loss }) -> + {Size, Gain, Loss}. + +reset_counters(State) -> + State #mqstate { memory_gain = 0, memory_loss = 0 }. info(#mqstate { mode = Mode }) -> Mode. diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index 50f66063d8..5a3b464d02 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/1, report_memory/2]). +-export([register/1, report_memory/4]). -define(SERVER, ?MODULE). @@ -49,7 +49,8 @@ -spec(start_link/0 :: () -> ({'ok', pid()} | 'ignore' | {'error', any()})). -spec(register/1 :: (pid()) -> {'ok', queue_mode()}). --spec(report_memory/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(report_memory/4 :: (pid(), non_neg_integer(), + non_neg_integer(), non_neg_integer()) -> 'ok'). -endif. @@ -63,8 +64,8 @@ start_link() -> register(Pid) -> gen_server2:call(?SERVER, {register, Pid}). -report_memory(Pid, Memory) -> - gen_server2:cast(?SERVER, {report_memory, Pid, Memory}). +report_memory(Pid, Memory, Gain, Loss) -> + gen_server2:cast(?SERVER, {report_memory, Pid, Memory, Gain, Loss}). init([]) -> process_flag(trap_exit, true), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7d74968b9c..34a4fcb5f3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1003,15 +1003,15 @@ rdq_test_mixed_queue_modes() -> end, MS4, lists:seq(1,10)), 30 = rabbit_mixed_queue:length(MS6), io:format("Published a mixture of messages; ~w~n", - [rabbit_mixed_queue:estimate_extra_memory(MS6)]), + [rabbit_mixed_queue:estimate_queue_memory(MS6)]), {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode([], MS6), 30 = rabbit_mixed_queue:length(MS7), io:format("Converted to disk only mode; ~w~n", - [rabbit_mixed_queue:estimate_extra_memory(MS7)]), + [rabbit_mixed_queue:estimate_queue_memory(MS7)]), {ok, MS8} = rabbit_mixed_queue:to_mixed_mode([], MS7), 30 = rabbit_mixed_queue:length(MS8), io:format("Converted to mixed mode; ~w~n", - [rabbit_mixed_queue:estimate_extra_memory(MS8)]), + [rabbit_mixed_queue:estimate_queue_memory(MS8)]), MS10 = lists:foldl( fun (N, MS9) -> @@ -1035,10 +1035,10 @@ rdq_test_mixed_queue_modes() -> lists:foldl( fun (N, {MS13, AcksAcc}) -> Rem = 10 - N, - {{#basic_message { is_persistent = true }, + {{Msg = #basic_message { is_persistent = true }, false, AckTag, Rem}, MS13a} = rabbit_mixed_queue:deliver(MS13), - {MS13a, [AckTag | AcksAcc]} + {MS13a, [{Msg, AckTag} | AcksAcc]} end, {MS12, []}, lists:seq(1,10)), 0 = rabbit_mixed_queue:length(MS14), {ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14), @@ -1050,7 +1050,7 @@ rdq_test_mixed_queue_modes() -> rdq_start(), {ok, MS17} = rabbit_mixed_queue:init(q, true, mixed), 0 = rabbit_mixed_queue:length(MS17), - 0 = rabbit_mixed_queue:estimate_extra_memory(MS17), + {0,0,0} = rabbit_mixed_queue:estimate_queue_memory(MS17), io:format("Recovered queue~n"), rdq_stop(), passed. @@ -1120,10 +1120,10 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc Rem = Len1 - (Msg #basic_message.guid) - 1, {{Msg, false, AckTag, Rem}, MS7a} = rabbit_mixed_queue:deliver(MS7), - {[AckTag | Acc], MS7a} + {[{Msg, AckTag} | Acc], MS7a} end, {[], MS6}, MsgsA ++ MsgsB), 0 = rabbit_mixed_queue:length(MS8), - rabbit_mixed_queue:ack(lists:reverse(AckTags), MS8); + rabbit_mixed_queue:ack(AckTags, MS8); cancel -> {ok, MS6} = rabbit_mixed_queue:tx_cancel(MsgsB, MS5), Len0 = rabbit_mixed_queue:length(MS6), @@ -1133,10 +1133,10 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCanc Rem = Len0 - (Msg #basic_message.guid) - 1, {{Msg, false, AckTag, Rem}, MS7a} = rabbit_mixed_queue:deliver(MS7), - {[AckTag | Acc], MS7a} + {[{Msg, AckTag} | Acc], MS7a} end, {[], MS6}, MsgsA), 0 = rabbit_mixed_queue:length(MS8), - rabbit_mixed_queue:ack(lists:reverse(AckTags), MS8) + rabbit_mixed_queue:ack(AckTags, MS8) end, 0 = rabbit_mixed_queue:length(MS9), passed. |
