summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-14 11:47:26 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-14 11:47:26 +0000
commitea94de35ecb237762c5c0d0ff33af8c42ff1da95 (patch)
tree7a156cd9bdff08090a3d0c3f5eef4f418d1e4f3d /src
parentd8d0420bbecd508ad00655b38fa671b3c9ebea09 (diff)
downloadrabbitmq-server-git-ea94de35ecb237762c5c0d0ff33af8c42ff1da95.tar.gz
refactoring
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl32
-rw-r--r--src/rabbit_misc.erl13
2 files changed, 25 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d5d68c7aa0..76b5defb68 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -771,18 +771,9 @@ dead_letter_msg(Msg, AckTag, Reason,
_ -> State3 =
lists:foldl(
fun(QPid, State0 = #q{unconfirmed_qm = UQM}) ->
- case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:insert(MsgSeqNo,
- MsgSeqNos),
- UQM1 = gb_trees:update(QPid, MsgSeqNos1,
- UQM),
- State0#q{unconfirmed_qm = UQM1};
- none ->
- S = gb_sets:singleton(MsgSeqNo),
- UQM1 = gb_trees:insert(QPid, S, UQM),
- State0#q{unconfirmed_qm = UQM1}
- end
+ UQM1 = rabbit_misc:gb_trees_set_insert(
+ QPid, MsgSeqNo, UQM),
+ State0#q{unconfirmed_qm = UQM1}
end, State2, QPids),
noreply(State3#q{
unconfirmed_mq =
@@ -807,18 +798,21 @@ demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
end.
handle_queue_down(QPid, State = #q{queue_monitors = QMons,
- unconfirmed_mq = UMQ}) ->
+ unconfirmed_qm = UQM}) ->
case dict:find(QPid, QMons) of
error ->
noreply(State);
{ok, _} ->
#resource{name = QName} = qname(State),
rabbit_log:info("DLQ ~p (for ~p) died~n", [QPid, QName]),
- MsgSeqNos = [MsgSeqNo ||
- {MsgSeqNo, {QPids, _}} <- gb_trees:to_list(UMQ),
- gb_sets:is_member(QPid, QPids)],
- handle_confirm(MsgSeqNos, QPid,
- State#q{queue_monitors = dict:erase(QPid, QMons)})
+ case gb_trees:lookup(QPid, UQM) of
+ none ->
+ noreply(State);
+ {value, MsgSeqNosSet} ->
+ handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid,
+ State#q{queue_monitors =
+ dict:erase(QPid, QMons)})
+ end
end.
handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
@@ -866,7 +860,7 @@ cleanup_after_confirm(State = #q{blocked_op = Op,
end,
{stop, normal, State1};
_ ->
- noreply(State1#q{blocked_op = Op})
+ noreply(State)
end.
already_been_here(#delivery{message = #basic_message{content = Content}},
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index afe136043c..f8c8d48233 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -45,7 +45,8 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
+-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3,
+ gb_trees_set_insert/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
@@ -172,6 +173,7 @@
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
+-spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()).
-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A).
-spec(gb_trees_foreach/2 ::
(fun ((any(), any()) -> any()), gb_tree()) -> 'ok').
@@ -704,6 +706,15 @@ gb_trees_cons(Key, Value, Tree) ->
none -> gb_trees:insert(Key, [Value], Tree)
end.
+gb_trees_set_insert(Key, Value, Tree) ->
+ case gb_trees:lookup(Key, Tree) of
+ {value, Values} ->
+ Values1 = gb_sets:insert(Value, Values),
+ gb_trees:update(Key, Values1, Tree);
+ none ->
+ gb_trees:insert(Key, gb_sets:singleton(Value), Tree)
+ end.
+
gb_trees_fold(Fun, Acc, Tree) ->
gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))).