summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-19 14:31:40 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-19 14:31:40 +0100
commit4602409e598987b94bc9a519d0e64c1694a06e00 (patch)
treebeafd634cfaa4b605a13839032a74941031be55f /src
parent42b0b0f3695edeb479164738332dad172583082f (diff)
downloadrabbitmq-server-git-4602409e598987b94bc9a519d0e64c1694a06e00.tar.gz
Sort out handling of acktags in the queue_process. Use a set throughout - this avoids the unpleasant lists:usort on every ack (which is what ordsets:from_list does). Also, don't bother with the intersection - just assume that we're only given acks we really know about, which the channel should be able to guarantee. This results in a performance improvement from MulticastMain -s 0 -r 12750 to MulticastMain -s 0 -r 13900 : i.e. 9%
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl61
1 files changed, 28 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 720d390abf..a0bb3b0bd6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -69,7 +69,7 @@
ch_pid,
limiter_pid,
monitor_ref,
- unacked_messages,
+ acktags,
is_limit_active,
txn,
unsent_message_count}).
@@ -230,7 +230,7 @@ ch_record(ChPid) ->
C = #cr{consumer_count = 0,
ch_pid = ChPid,
monitor_ref = MonitorRef,
- unacked_messages = [],
+ acktags = sets:new(),
is_limit_active = false,
txn = none,
unsent_message_count = 0},
@@ -271,7 +271,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
ActiveConsumersTail} ->
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
- unacked_messages = UAM} = ch_record(ChPid),
+ acktags = ChAckTags} = ch_record(ChPid),
IsMsgReady = PredFun(FunAcc, State),
case (IsMsgReady andalso
rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
@@ -281,12 +281,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
- NewUAM = case AckRequired of
- true -> [AckTag|UAM];
- false -> UAM
- end,
+ ChAckTags1 = case AckRequired of
+ true -> sets:add_element(AckTag, ChAckTags);
+ false -> ChAckTags
+ end,
NewC = C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM},
+ acktags = ChAckTags1},
store_ch_record(NewC),
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
@@ -414,7 +414,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
not_found ->
{ok, State};
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
- unacked_messages = UAM} ->
+ acktags = ChAckTags} ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
State1 = State#q{
@@ -433,7 +433,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
_ -> rollback_transaction(Txn, ChPid,
State1)
end,
- {ok, requeue_and_run(UAM, State2)}
+ {ok, requeue_and_run(ChAckTags, State2)}
end
end.
@@ -472,25 +472,21 @@ commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
{AckTags, BQS1} = BQ:tx_commit(Txn, From, BQS),
%% ChPid must be known here because of the participant management
%% by the channel.
- C = #cr{unacked_messages = UAM} = lookup_ch(ChPid),
- Remaining = ordsets:to_list(ordsets:subtract(ordsets:from_list(UAM),
- ordsets:from_list(AckTags))),
- store_ch_record(C#cr{unacked_messages = Remaining, txn = none}),
+ C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
State#q{backing_queue_state = BQS1}.
rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS}) ->
{_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
%% Iff we removed acktags from the channel record on ack+txn then
%% we would add them back in here (would also require ChPid)
record_current_channel_tx(ChPid, none),
State#q{backing_queue_state = BQS1}.
-collect_messages(AckTags, UAM) ->
- AckTagsSet = ordsets:from_list(AckTags),
- UAMSet = ordsets:from_list(UAM),
- {ordsets:to_list(ordsets:intersection(AckTagsSet, UAMSet)),
- ordsets:to_list(ordsets:subtract(UAMSet, AckTagsSet))}.
+subtract_acks(A, B) when is_list(B) ->
+ lists:foldl(fun sets:del_element/2, A, B).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -515,8 +511,7 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
- lists:sum([ordsets:size(UAM) ||
- #cr{unacked_messages = UAM} <- all_ch_record()]);
+ lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
@@ -600,8 +595,9 @@ handle_call({basic_get, ChPid, NoAck}, _From,
{empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1});
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
case AckRequired of
- true -> C = #cr{unacked_messages = UAM} = ch_record(ChPid),
- store_ch_record(C#cr{unacked_messages = [AckTag|UAM]});
+ true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ store_ch_record(
+ C#cr{acktags = sets:add_element(AckTag, ChAckTags)});
false -> ok
end,
reply({ok, Remaining, {QName, self(), AckTag, IsDelivered, Message}},
@@ -753,13 +749,12 @@ handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue_state = BQS,
case lookup_ch(ChPid) of
not_found ->
noreply(State);
- C = #cr{unacked_messages = UAM} ->
- {AckTags1, Remaining} = collect_messages(AckTags, UAM),
+ C = #cr{acktags = ChAckTags} ->
{C1, BQS1} =
case Txn of
- none -> {C#cr{unacked_messages = Remaining},
- BQ:ack(AckTags1, BQS)};
- _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags1, BQS)}
+ none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)};
+ _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
end,
store_ch_record(C1),
noreply(State #q { backing_queue_state = BQS1 })
@@ -774,10 +769,10 @@ handle_cast({requeue, AckTags, ChPid}, State) ->
rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
[ChPid]),
noreply(State);
- C = #cr{unacked_messages = UAM} ->
- {AckTags1, Remaining} = collect_messages(AckTags, UAM),
- store_ch_record(C#cr{unacked_messages = Remaining}),
- noreply(requeue_and_run(AckTags1, State))
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ noreply(requeue_and_run(AckTags, State))
end;
handle_cast({unblock, ChPid}, State) ->