summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl349
-rw-r--r--src/rabbit_misc.erl10
-rw-r--r--src/rabbit_mixed_queue.erl9
3 files changed, 174 insertions, 194 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 69edb64fbc..417c3f02b0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -51,8 +51,8 @@
owner,
exclusive_consumer,
has_had_consumers,
+ mixed_state,
next_msg_id,
- message_buffer,
round_robin}).
-record(consumer, {tag, ack_required}).
@@ -96,16 +96,18 @@ init(Q) ->
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
+ mixed_state = rabbit_mixed_queue:start_link(qname(Q), mixed), %% TODO, CHANGE ME
next_msg_id = 1,
- message_buffer = queue:new(),
round_robin = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
QName = qname(State),
- lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end,
- all_tx()),
- ok = purge_message_buffer(QName, State#q.message_buffer),
+ NewState =
+ lists:foldl(fun (Txn, State1) ->
+ rollback_transaction(Txn, State1)
+ end, State, all_tx()),
+ rabbit_mixed_queue:purge(NewState #q.mixed_state),
ok = rabbit_amqqueue:internal_delete(QName).
code_change(_OldVsn, State, _Extra) ->
@@ -156,11 +158,10 @@ ch_record_state_transition(OldCR, NewCR) ->
true -> ok
end.
-deliver_immediately(Message, Delivered,
- State = #q{q = #amqqueue{name = QName},
- round_robin = RoundRobin,
- next_msg_id = NextId}) ->
- ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
+deliver_queue(Fun,
+ State = #q{q = #amqqueue{name = QName},
+ round_robin = RoundRobin,
+ next_msg_id = NextId}) ->
case queue:out(RoundRobin) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
@@ -171,62 +172,103 @@ deliver_immediately(Message, Delivered,
case not(AckRequired) orelse rabbit_limiter:can_send(
LimiterPid, self()) of
true ->
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
- NewC = C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM},
- store_ch_record(NewC),
- NewConsumers =
- case ch_record_state_transition(C, NewC) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block -> block_consumers(ChPid, RoundRobinTail)
- end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId + 1}};
+ case Fun(State) of
+ {empty, State2} ->
+ {empty, State2};
+ {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, State2} ->
+ rabbit_channel:deliver(
+ ChPid, ConsumerTag, AckRequired,
+ {QName, self(), NextId, Delivered, Message}), %% TODO FIXME
+ NewUAM = case AckRequired of
+ true -> dict:store(NextId, Message, UAM);
+ false -> UAM
+ end,
+ NewC = C#cr{unsent_message_count = Count + 1,
+ unacked_messages = NewUAM},
+ store_ch_record(NewC),
+ NewConsumers =
+ case ch_record_state_transition(C, NewC) of
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block -> block_consumers(ChPid, RoundRobinTail)
+ end,
+ State3 = State2 #q { round_robin = NewConsumers,
+ next_msg_id = NextId + 1
+ },
+ if Remaining == 0 -> {offered, AckRequired, State3};
+ true -> deliver_queue(Fun, State3)
+ end
+ end;
false ->
store_ch_record(C#cr{is_limit_active = true}),
NewConsumers = block_consumers(ChPid, RoundRobinTail),
- deliver_immediately(Message, Delivered,
- State#q{round_robin = NewConsumers})
+ deliver_queue(Fun, State#q{round_robin = NewConsumers})
end;
{empty, _} ->
{not_offered, State}
end.
+deliver_from_queue(State = #q { mixed_state = MS }) ->
+ {Res, MS2} = rabbit_mixed_queue:deliver(MS),
+ {Res, State #q { mixed_state = MS2 }}.
+
+run_message_queue(State) ->
+ case deliver_queue(deliver_from_queue/1, State) of
+ {not_offered, State2} ->
+ State2;
+ {empty, State2} ->
+ State2;
+ {offered, _AckRequired, State2} ->
+ State2
+ end.
+
attempt_delivery(none, Message, State) ->
- case deliver_immediately(Message, false, State) of
+ Fun = fun (State2) -> {{MsgId, Message, MsgSize, false, AckTag, 0}, State2} end, %% TODO FIX ME
+ case deliver_queue(Fun, State) of
{offered, false, State1} ->
{true, State1};
{offered, true, State1} ->
- persist_message(none, qname(State), Message), %% DQ HERE
- persist_delivery(qname(State), Message, false), %% DQ HERE
- {true, State1};
+ MS = rabbit_mixed_queue:publish_delivered(Message, State1 #q.mixed_state), %% TODO API CHANGE
+ {true, State1 #q { mixed_state = MS }};
{not_offered, State1} ->
{false, State1}
end;
attempt_delivery(Txn, Message, State) ->
- persist_message(Txn, qname(State), Message), %% DQ tx_commit and store msgid in txn map
- record_pending_message(Txn, Message), %% DQ seems to be done here!
- {true, State}.
+ MS = rabbit_mixed_queue:tx_publish(Message, State #q.mixed_state), %% TODO API CHANGE
+ record_pending_message(Txn, Message),
+ {true, State #q { mixed_state = MS }}.
deliver_or_enqueue(Txn, Message, State) ->
case attempt_delivery(Txn, Message, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
- persist_message(Txn, qname(State), Message), %% DQ Txn must be false here
- NewMB = queue:in({Message, false}, NewState#q.message_buffer), %% DQ magic here
- {false, NewState#q{message_buffer = NewMB}}
+ %% Txn is none
+ MS = rabbit_mixed_queue:publish(Message, State #q.mixed_state), %% TODO API CHANGE
+ {false, NewState #q { mixed_state = MS }}
+ end.
+
+%% all these messages have already been delivered at least once and
+%% not ack'd, but need to be either redelivered or requeued
+deliver_or_requeue_n(Messages, State) ->
+ {AutoAcks, Remaining} =
+ dropwhilefoldl(deliver_or_requeue_msg/2, {[], State}, Messages),
+ {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks), State #q.mixed_state), %% TODO FIXME
+ case Remaining of
+ [] -> run_message_queue(State #q { mixed_state = MS });
+ _ -> {ok, MS2} = rabbit_mixed_queue:requeue(Remaining, MS), %% TODO FIXME
+ State #q { mixed_state = MS2 }
end.
-deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
- run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)),
- State).
+deliver_or_requeue_msg(Message, {AcksAcc, State}) ->
+ Fun = fun (State2) -> {{MsgId, Message, MsgSize, true, AckTag, 0}, State2} end, %% TODO FIX ME
+ case deliver_queue(Fun, State) of
+ {offered, true, State1} ->
+ {true, {AcksAcc, State1}};
+ {offered, false, State1} ->
+ {true, {[AckTag|AcksAcc], State1}}; %% TODO FIXME where does AckTag come from?!
+ {not_offered, State1} ->
+ {false, {AcksAcc, State1}}
+ end.
block_consumers(ChPid, RoundRobin) ->
%%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]),
@@ -257,7 +299,7 @@ possibly_unblock(State, ChPid, Update) ->
unblock -> NewRR = unblock_consumers(ChPid,
NewC#cr.consumers,
State#q.round_robin),
- run_poke_burst(State#q{round_robin = NewRR})
+ run_message_queue(State#q{round_robin = NewRR})
end
end.
@@ -300,9 +342,9 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
case check_auto_delete(
- deliver_or_enqueue_n(
- [{Message, true} ||
- {_Messsage_id, Message} <- dict:to_list(UAM)], %% DQ alter all this stuff?
+ deliver_or_requeue_n(
+ [Message ||
+ {_Messsage_id, Message} <- dict:to_list(UAM)],
State#q{
exclusive_consumer = case Holder of
{ChPid, _} -> none;
@@ -335,26 +377,6 @@ check_exclusive_access(none, true) ->
false -> in_use
end.
-run_poke_burst(State = #q{message_buffer = MessageBuffer}) ->
- run_poke_burst(MessageBuffer, State).
-
-run_poke_burst(MessageBuffer, State) ->
- case queue:out(MessageBuffer) of
- {{value, {Message, Delivered}}, BufferTail} ->
- case deliver_immediately(Message, Delivered, State) of
- {offered, true, NewState} ->
- persist_delivery(qname(State), Message, Delivered), %% DQ ack needed
- run_poke_burst(BufferTail, NewState);
- {offered, false, NewState} ->
- persist_auto_ack(qname(State), Message), %% DQ record? We don't persist acks anyway now...
- run_poke_burst(BufferTail, NewState);
- {not_offered, NewState} ->
- NewState#q{message_buffer = MessageBuffer}
- end;
- {empty, _} ->
- State#q{message_buffer = MessageBuffer}
- end.
-
is_unused() ->
is_unused1(get()).
@@ -371,62 +393,6 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-persist_message(_Txn, _QName, #basic_message{is_persistent = false}) -> %% DQ
- ok;
-persist_message(Txn, QName, Message) ->
- M = Message#basic_message{
- %% don't persist any recoverable decoded properties, rebuild from properties_bin on restore
- content = rabbit_binary_parser:clear_decoded_content(
- Message#basic_message.content)},
- persist_work(Txn, QName,
- [{publish, M, {QName, M#basic_message.guid}}]).
-
-persist_delivery(_QName, _Message, %% DQ
- true) ->
- ok;
-persist_delivery(_QName, #basic_message{is_persistent = false}, %% DQ
- _Delivered) ->
- ok;
-persist_delivery(QName, #basic_message{guid = MsgId}, %% DQ
- _Delivered) ->
- persist_work(none, QName, [{deliver, {QName, MsgId}}]).
-
-persist_acks(Txn, QName, Messages) -> %% DQ
- persist_work(Txn, QName,
- [{ack, {QName, MsgId}} ||
- #basic_message{guid = MsgId, is_persistent = P} <- Messages,
- P]).
-
-persist_auto_ack(_QName, #basic_message{is_persistent = false}) ->
- ok;
-persist_auto_ack(QName, #basic_message{is_persistent = true, guid = MsgId}) ->
- %% auto-acks are always non-transactional
- rabbit_persister:dirty_work([{ack, {QName, MsgId}}]).
-
-persist_work(_Txn,_QName, []) ->
- ok;
-persist_work(none, _QName, WorkList) ->
- rabbit_persister:dirty_work(WorkList);
-persist_work(Txn, QName, WorkList) ->
- mark_tx_persistent(Txn),
- rabbit_persister:extend_transaction({Txn, QName}, WorkList).
-
-commit_work(Txn, QName) ->
- do_if_persistent(fun rabbit_persister:commit_transaction/1,
- Txn, QName).
-
-rollback_work(Txn, QName) ->
- do_if_persistent(fun rabbit_persister:rollback_transaction/1,
- Txn, QName).
-
-%% optimisation: don't do unnecessary work
-%% it would be nice if this was handled by the persister
-do_if_persistent(F, Txn, QName) ->
- case is_tx_persistent(Txn) of
- false -> ok;
- true -> ok = F({Txn, QName})
- end.
-
lookup_tx(Txn) ->
case get({txn, Txn}) of
undefined -> #tx{ch_pid = none,
@@ -448,54 +414,52 @@ all_tx_record() ->
all_tx() ->
[Txn || {{txn, Txn}, _} <- get()].
-mark_tx_persistent(Txn) ->
- Tx = lookup_tx(Txn),
- store_tx(Txn, Tx#tx{is_persistent = true}).
-
is_tx_persistent(Txn) ->
#tx{is_persistent = Res} = lookup_tx(Txn),
Res.
-record_pending_message(Txn, Message) ->
- Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
- store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}).
+record_pending_message(Txn, Message = #basic_message { is_persistent = IsPersistent }) ->
+ Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = lookup_tx(Txn),
+ store_tx(Txn, Tx #tx { pending_messages = [{Message, false} | Pending],
+ is_persistent = IsPersistentTxn orelse IsPersistent
+ }).
record_pending_acks(Txn, ChPid, MsgIds) ->
Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}).
-process_pending(Txn, State) ->
- #tx{ch_pid = ChPid,
- pending_messages = PendingMessages,
- pending_acks = PendingAcks} = lookup_tx(Txn),
+commit_transaction(Txn, State) ->
+ #tx { ch_pid = ChPid,
+ pending_messages = PendingMessages,
+ pending_acks = PendingAcks
+ } = lookup_tx(Txn),
+ PendingMessagesOrdered = lists:reverse(PendingMessages),
+ PendingAcksOrdered = lists:append(lists:reverse(PendingAcks)),
case lookup_ch(ChPid) of
- not_found -> ok;
- C = #cr{unacked_messages = UAM} ->
- {_Acked, Remaining} =
- collect_messages(lists:append(PendingAcks), UAM),
- store_ch_record(C#cr{unacked_messages = Remaining})
- end,
- deliver_or_enqueue_n(lists:reverse(PendingMessages), State).
+ not_found -> State;
+ C = #cr { unacked_messages = UAM } ->
+ {Acked, Remaining} =
+ collect_messages(PendingAcksAppended, UAM),
+ store_ch_record(C#cr{unacked_messages = Remaining}),
+ MS = rabbit_mixed_queue:tx_commit(PendingMessagesOrdered,
+ Acked,
+ State #q.mixed_state),
+ State #q { mixed_state = MS }
+ end.
+rollback_transaction(Txn, State) ->
+ #tx { pending_messages = PendingMessages
+ } = lookup_tx(Txn),
+ MS = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages), State #q.mixed_state),
+ State #q { mixed_state = MS }.
+
+%% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C
+%% err, A = C `intersect` D , via projection through the dict that is A
collect_messages(MsgIds, UAM) ->
lists:mapfoldl(
fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end,
UAM, MsgIds).
-purge_message_buffer(QName, MessageBuffer) ->
- Messages =
- [[Message || {Message, _Delivered} <-
- queue:to_list(MessageBuffer)] |
- lists:map(
- fun (#cr{unacked_messages = UAM}) ->
- [Message || {_MessageId, Message} <- dict:to_list(UAM)]
- end,
- all_ch_record())],
- %% the simplest, though certainly not the most obvious or
- %% efficient, way to purge messages from the persister is to
- %% artifically ack them.
- persist_acks(none, QName, lists:append(Messages)).
-
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
@@ -564,12 +528,11 @@ handle_call({deliver, Txn, Message}, _From, State) ->
reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
- ok = commit_work(Txn, qname(State)),
+ NewState = commit_transaction(Txn, State),
%% optimisation: we reply straight away so the sender can continue
gen_server2:reply(From, ok),
- NewState = process_pending(Txn, State),
erase_tx(Txn),
- noreply(NewState);
+ noreply(run_message_queue(NewState));
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
@@ -579,23 +542,25 @@ handle_call({notify_down, ChPid}, From, State) ->
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName},
next_msg_id = NextId,
- message_buffer = MessageBuffer}) ->
- case queue:out(MessageBuffer) of
- {{value, {Message, Delivered}}, BufferTail} ->
+ mixed_state = MS
+ }) ->
+ case rabbit_mixed_queue:deliver(MS) of
+ {empty, MS2} -> reply(empty, State #q { mixed_state = MS2 });
+ {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, MS2} ->
AckRequired = not(NoAck),
- case AckRequired of
- true ->
- persist_delivery(QName, Message, Delivered),
- C = #cr{unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = dict:store(NextId, Message, UAM),
- store_ch_record(C#cr{unacked_messages = NewUAM});
- false ->
- persist_auto_ack(QName, Message)
- end,
- Msg = {QName, self(), NextId, Delivered, Message},
- reply({ok, queue:len(BufferTail), Msg},
- State#q{message_buffer = BufferTail,
- next_msg_id = NextId + 1});
+ MS3 =
+ case AckRequired of
+ true ->
+ C = #cr{unacked_messages = UAM} = ch_record(ChPid),
+ NewUAM = dict:store(NextId, Message, UAM),
+ store_ch_record(C#cr{unacked_messages = NewUAM}),
+ MS2;
+ false ->
+ rabbit_mixed_queue:ack([AckTag], MS2)
+ end,
+ Message = {QName, self(), NextId, IsDelivered, Msg}, %% TODO, FIX UP
+ reply({ok, Remaining, Message},
+ State#q{next_msg_id = NextId + 1});
{empty, _} ->
reply(empty, State)
end;
@@ -630,7 +595,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
end,
round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
ok = maybe_send_reply(ChPid, OkMsg),
- reply(ok, run_poke_burst(State1))
+ reply(ok, run_message_queue(State1))
end
end;
@@ -667,27 +632,29 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- message_buffer = MessageBuffer,
+ mixed_state = MS,
round_robin = RoundRobin}) ->
- reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
+ {Length, MS2} = rabbit_mixed_queue:length(MS),
+ reply({ok, Name, Length, queue:len(RoundRobin)}, State #q { mixed_state = MS2 });
handle_call({delete, IfUnused, IfEmpty}, _From,
- State = #q{message_buffer = MessageBuffer}) ->
+ State = #q{message_buffer = MessageBuffer, mixed_state = MS}) ->
IsEmpty = queue:is_empty(MessageBuffer),
IsUnused = is_unused(),
+ {Length, MS2} = rabbit_mixed_queue:length(MS),
if
IfEmpty and not(IsEmpty) ->
reply({error, not_empty}, State);
IfUnused and not(IsUnused) ->
reply({error, in_use}, State);
true ->
- {stop, normal, {ok, queue:len(MessageBuffer)}, State}
+ {stop, normal, {ok, Length}, State #q { mixed_state = MS2 }}
end;
-handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
- ok = purge_message_buffer(qname(State), MessageBuffer),
- reply({ok, queue:len(MessageBuffer)},
- State#q{message_buffer = queue:new()});
+handle_call(purge, _From, State) ->
+ {Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state),
+ reply({ok, Count},
+ State #q { mixed_state = MS });
handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
exclusive_consumer = Holder}) ->
@@ -722,23 +689,24 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
noreply(State);
C = #cr{unacked_messages = UAM} ->
{Acked, Remaining} = collect_messages(MsgIds, UAM),
- persist_acks(Txn, qname(State), Acked),
case Txn of
none ->
- store_ch_record(C#cr{unacked_messages = Remaining});
+ MS = rabbit_mixed_queue:ack(Acked, State #q.mixed_state), %% TODO API
+ store_ch_record(C#cr{unacked_messages = Remaining}),
+ noreply(State #q { mixed_state = MS });
_ ->
- record_pending_acks(Txn, ChPid, MsgIds)
+ record_pending_acks(Txn, ChPid, MsgIds),
+ noreply(State)
end,
- noreply(State)
end;
handle_cast({rollback, Txn}, State) ->
- ok = rollback_work(Txn, qname(State)),
+ NewState = rollback_transaction(Txn, State),
erase_tx(Txn),
- noreply(State);
+ noreply(State2);
handle_cast({redeliver, Messages}, State) ->
- noreply(deliver_or_enqueue_n(Messages, State));
+ noreply(ok); %% TODO - probably remove - only used by the old persister
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
@@ -749,8 +717,7 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
C = #cr{unacked_messages = UAM} ->
{Messages, NewUAM} = collect_messages(MsgIds, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- noreply(deliver_or_enqueue_n(
- [{Message, true} || Message <- Messages], State))
+ noreply(deliver_or_requeue_n(Messages, State))
end;
handle_cast({unblock, ChPid}, State) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index de7bc010b2..f90abe3ffb 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -51,6 +51,7 @@
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
+-export([dropwhilefoldl/3]).
-import(mnesia).
-import(lists).
@@ -407,3 +408,12 @@ stop_applications(Apps) ->
cannot_stop_application,
Apps).
+dropwhilefoldl(_PredFun, Acc0, []) ->
+ {Acc0, []};
+dropwhilefoldl(PredFun, Acc0, [E|List]) ->
+ case PredFun(E, Acc0) of
+ {true, Acc1} ->
+ dropwhilefoldl(PredFun, Acc1, List);
+ {false, Acc1} ->
+ {Acc1, List}
+ end.
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 811d140a4e..790f4b756e 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -87,9 +87,12 @@ deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, next_write
remove_noacks(Acks) ->
lists:filter(fun (A) -> A /= noack end, Acks).
-ack(Acks, State = #mqstate { queue = Q }) ->
- ok = rabbit_disk_queue:ack(Q, remove_noacks(Acks)),
- {ok, State}.
+ack(Acks, State = #mqstate { queue = Q }) ->
+ case remove_noacks(Acks) of
+ [] -> {ok, State};
+ AckTags -> ok = rabbit_disk_queue:ack(Q, AckTags),
+ {ok, State}
+ end.
tx_publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk }) ->
ok = rabbit_disk_queue:tx_publish(MsgId, Msg),