summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl30
-rw-r--r--src/rabbit_channel.erl51
2 files changed, 53 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d7cd9fb1c7..4341c3d6d8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -352,7 +352,7 @@ ch_record(ChPid) ->
undefined -> MonitorRef = erlang:monitor(process, ChPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
- acktags = sets:new(),
+ acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
is_limit_active = false,
@@ -366,9 +366,9 @@ ch_record(ChPid) ->
update_ch_record(C = #cr{consumer_count = ConsumerCount,
acktags = ChAckTags,
unsent_message_count = UnsentMessageCount}) ->
- case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
- {0, 0, 0} -> ok = erase_ch_record(C);
- _ -> ok = store_ch_record(C)
+ case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ {true, 0, 0} -> ok = erase_ch_record(C);
+ _ -> ok = store_ch_record(C)
end,
C.
@@ -451,7 +451,7 @@ deliver_msg_to_consumer(DeliverFun,
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
- true -> sets:add_element(AckTag, ChAckTags);
+ true -> queue:in(AckTag, ChAckTags);
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
@@ -638,7 +638,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
- false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
+ false -> {ok, requeue_and_run(queue:to_list(ChAckTags),
ensure_expiry_timer(State1))}
end
end.
@@ -677,11 +677,21 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
not_found ->
State;
C = #cr{acktags = ChAckTags} ->
- update_ch_record(C#cr{acktags = lists:foldl(fun sets:del_element/2,
- ChAckTags, AckTags)}),
+ update_ch_record(
+ C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
Fun(State)
end.
+subtract_acks([], [], AckQ) ->
+ AckQ;
+subtract_acks([], Prefix, AckQ) ->
+ queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
+subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
+ case queue:out(AckQ) of
+ {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
+ {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ end.
+
message_properties(Message, Confirm, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
needs_confirming = Confirm == eventually}.
@@ -886,7 +896,7 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
- lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]);
+ lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
@@ -1042,7 +1052,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- ChAckTags1 = sets:add_element(AckTag, ChAckTags),
+ ChAckTags1 = queue:in(AckTag, ChAckTags),
update_ch_record(C#cr{acktags = ChAckTags1}),
State2;
false -> State2
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2686d76d9c..831058db31 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -40,7 +40,14 @@
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
--record(tx, {msgs, acks, nacks}).
+
+-record(tx, {msgs, acks}). %% (1)
+%% (1) acks looks s.t. like this:
+%% [{true,[[6,7,8],[5]]},{ack,[[4],[1,2,3]]}, ...]
+%%
+%% Each element is a pair consisting of a tag and a list of lists of
+%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true'
+%% (reject w requeue), 'false' (reject w/o requeue).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -647,7 +654,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
case Tx of
none -> ack(Acked, State1),
State1;
- #tx{acks = Acks} -> State1#ch{tx = Tx#tx{acks = Acked ++ Acks}}
+ #tx{acks = Acks} -> Acks1 = ack_cons(ack, Acked, Acks),
+ State1#ch{tx = Tx#tx{acks = Acks1}}
end};
handle_method(#'basic.get'{queue = QueueNameBin,
@@ -1032,24 +1040,23 @@ handle_method(#'tx.select'{}, _, State) ->
handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
-handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs,
- acks = Acks,
- nacks = Nacks},
+handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs,
+ acks = Acks},
limiter = Limiter}) ->
State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
- ack(Acks, State1),
lists:foreach(
- fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, Nacks),
+ fun ({ack, A}) -> ack(append_reverse(A), State1);
+ ({Requeue, A}) -> reject(Requeue, append_reverse(A), Limiter)
+ end, lists:reverse(Acks)),
{noreply, maybe_complete_tx(State1#ch{tx = committing})};
handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- tx = #tx{acks = Acks,
- nacks = Nacks}}) ->
- NacksL = lists:append([L || {_, L} <- Nacks]),
- UAMQ1 = queue:from_list(lists:usort(Acks ++ NacksL ++ queue:to_list(UAMQ))),
+ tx = #tx{acks = Acks}}) ->
+ AcksL = append_reverse([append_reverse(L) || {_, L} <- Acks]),
+ UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
tx = new_tx()}};
@@ -1206,10 +1213,10 @@ reject(DeliveryTag, Requeue, Multiple,
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
case Tx of
- none -> reject(Requeue, Acked, State1#ch.limiter),
- State1;
- #tx{nacks = Nacks} -> Nacks1 = [{Requeue, Acked} | Nacks],
- State1#ch{tx = Tx#tx{nacks = Nacks1}}
+ none -> reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ #tx{acks = Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks),
+ State1#ch{tx = Tx#tx{acks = Acks1}}
end}.
reject(Requeue, Acked, Limiter) ->
@@ -1252,7 +1259,10 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
- {[UnackedMsg | ToAcc],
+ {case ToAcc of
+ [] -> [UnackedMsg];
+ _ -> lists:reverse([UnackedMsg | ToAcc])
+ end,
case PrefixAcc of
[] -> QTail;
_ -> queue:join(
@@ -1282,7 +1292,7 @@ ack(Acked, State = #ch{queue_names = QNames}) ->
end, Acked),
ok = notify_limiter(State#ch.limiter, Acked).
-new_tx() -> #tx{msgs = queue:new(), acks = [], nacks = []}.
+new_tx() -> #tx{msgs = queue:new(), acks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1299,7 +1309,7 @@ foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
foreach_per_queue(F, UAL) ->
T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons(QPid, MsgId, T)
- end, gb_trees:empty(), UAL),
+ end, gb_trees:empty(), lists:reverse(UAL)),
rabbit_misc:gb_trees_foreach(F, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
@@ -1440,6 +1450,11 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
+append_reverse(L) -> lists:append(lists:reverse(L)).
+
+ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, [Acked | Acks]} | L];
+ack_cons(Tag, Acked, Acks) -> [{Tag, [Acked]} | Acks].
+
maybe_complete_tx(State = #ch{tx = #tx{}}) ->
State;
maybe_complete_tx(State = #ch{unconfirmed = UC}) ->