summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-14 22:02:34 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-14 22:02:34 +0000
commit50a345df8f8d165e443ad5c25f57b118f7c4b139 (patch)
tree54e750d9073f37674186ca3357def6a48a91b6f2
parenta9f617968b7b0cd3464ce30fd007340e919f4ff4 (diff)
parent917b43d758f0df8f786177a2b7d31be89b3b3347 (diff)
downloadrabbitmq-server-git-50a345df8f8d165e443ad5c25f57b118f7c4b139.tar.gz
merge default into bug20337
-rw-r--r--src/rabbit_amqqueue_process.erl109
-rw-r--r--src/rabbit_node_monitor.erl21
-rw-r--r--src/rabbit_variable_queue.erl14
3 files changed, 63 insertions, 81 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 76b5defb68..68032d7641 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -229,24 +229,24 @@ bq_init(BQ, Q, Recover) ->
end).
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
- lists:foldl(fun({Arg, Fun}, State1) ->
- case rabbit_misc:table_lookup(Arguments, Arg) of
- {_Type, Val} -> Fun(Val, State1);
- undefined -> State1
- end
- end, State, [{<<"x-expires">>, fun init_expires/2},
- {<<"x-message-ttl">>, fun init_ttl/2},
- {<<"x-dead-letter-exchange">>, fun init_dlx/2},
- {<<"x-dead-letter-routing-key">>,
- fun init_dlx_routing_key/2}]).
+ lists:foldl(
+ fun({Arg, Fun}, State1) ->
+ case rabbit_misc:table_lookup(Arguments, Arg) of
+ {_Type, Val} -> Fun(Val, State1);
+ undefined -> State1
+ end
+ end, State,
+ [{<<"x-expires">>, fun init_expires/2},
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-dead-letter-exchange">>, fun init_dlx/2},
+ {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
-init_dlx(DLX, State = #q{q = #amqqueue{name = #resource{
- virtual_host = VHostPath}}}) ->
- State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLX)}.
+init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
+ State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}.
init_dlx_routing_key(RoutingKey, State) ->
State#q{dlx_routing_key = RoutingKey}.
@@ -863,6 +863,8 @@ cleanup_after_confirm(State = #q{blocked_op = Op,
noreply(State)
end.
+already_been_here(_Delivery, #q{dlx = undefined}) ->
+ false;
already_been_here(#delivery{message = #basic_message{content = Content}},
State) ->
#content{properties = #'P_basic'{headers = Headers}} =
@@ -951,15 +953,12 @@ infos(Items, State) ->
|| Item <- (Items1 -- [synchronised_slave_pids])].
slaves_status(#q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} =
- rabbit_amqqueue:lookup(Name),
- case MNodes of
- undefined ->
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, #amqqueue{mirror_nodes = undefined}} ->
[{slave_pids, ''}, {synchronised_slave_pids, ''}];
- _ ->
+ {ok, #amqqueue{slave_pids = SPids}} ->
{Results, _Bad} =
- delegate:invoke(
- SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
+ delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1),
{SPids1, SSPids} =
lists:foldl(
fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
@@ -1003,11 +1002,9 @@ i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{mirror_nodes = MNodes,
- slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
- case MNodes of
- undefined -> [];
- _ -> SPids
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, #amqqueue{mirror_nodes = undefined}} -> [];
+ {ok, #amqqueue{slave_pids = SPids}} -> SPids
end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
@@ -1234,12 +1231,9 @@ handle_call({delete, IfUnused, IfEmpty}, From,
IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
- IfEmpty and not(IsEmpty) ->
- reply({error, not_empty}, State);
- IfUnused and not(IsUnused) ->
- reply({error, in_use}, State);
- true ->
- dead_letter_deleted_queue(From, State)
+ IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
+ IfUnused and not(IsUnused) -> reply({error, in_use}, State);
+ true -> dead_letter_deleted_queue(From, State)
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
@@ -1250,10 +1244,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
handle_call(purge, From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- BQS1 = BQ:dropwhile(
- fun (_) -> true end,
- mk_dead_letter_fun(queue_purged, State),
- BQS),
+ BQS1 = BQ:dropwhile(fun (_) -> true end,
+ mk_dead_letter_fun(queue_purged, State),
+ BQS),
case BQ:len(BQS) of
0 -> reply({ok, 0}, State#q{backing_queue_state = BQS1});
_ -> noreply(
@@ -1271,37 +1264,23 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
handle_cast({deliver, Delivery = #delivery{sender = Sender,
- msg_seq_no = MsgSeqNo},
- Flow}, State = #q{dlx = DLX}) ->
+ msg_seq_no = MsgSeqNo}, Flow},
+ State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- ShouldDeliver =
- case DLX of
- undefined ->
- true;
- _ ->
- case already_been_here(Delivery, State) of
- false -> true;
- Qs -> log_cycle_once(Qs),
- rabbit_misc:confirm_to_sender(Sender,
- [MsgSeqNo]),
- false
- end
- end,
- case ShouldDeliver of
- false -> noreply(State);
- true -> case Flow of
- flow ->
- Key = {ch_publisher, Sender},
- case get(Key) of
- undefined -> put(Key, erlang:monitor(process,
- Sender));
- _ -> ok
- end,
- credit_flow:ack(Sender);
- noflow ->
- ok
- end,
- noreply(deliver_or_enqueue(Delivery, State))
+ case Flow of
+ flow -> Key = {ch_publisher, Sender},
+ case get(Key) of
+ undefined -> put(Key, erlang:monitor(process, Sender));
+ _ -> ok
+ end,
+ credit_flow:ack(Sender);
+ noflow -> ok
+ end,
+ case already_been_here(Delivery, State) of
+ false -> noreply(deliver_or_enqueue(Delivery, State));
+ Qs -> log_cycle_once(Qs),
+ rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
+ noreply(State)
end;
handle_cast({ack, AckTags, ChPid}, State) ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 54a7add2e6..323cf0ce9e 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -61,23 +61,26 @@ notify_cluster() ->
%%--------------------------------------------------------------------
init([]) ->
- {ok, no_state}.
+ {ok, ordsets:new()}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({rabbit_running_on, Node}, State) ->
- rabbit_log:info("rabbit on ~p up~n", [Node]),
- erlang:monitor(process, {rabbit, Node}),
- ok = handle_live_rabbit(Node),
- {noreply, State};
+handle_cast({rabbit_running_on, Node}, Nodes) ->
+ case ordsets:is_element(Node, Nodes) of
+ true -> {noreply, Nodes};
+ false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
+ erlang:monitor(process, {rabbit, Node}),
+ ok = handle_live_rabbit(Node),
+ {noreply, ordsets:add_element(Node, Nodes)}
+ end;
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
- rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) ->
+ rabbit_log:info("rabbit on node ~p down~n", [Node]),
ok = handle_dead_rabbit(Node),
- {noreply, State};
+ {noreply, ordsets:del_element(Node, Nodes)};
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a4a2d655cb..95b47d8343 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -593,8 +593,8 @@ dropwhile(Pred, MsgFun, State) ->
{true, _} ->
{{_, _, AckTag, _}, State2} =
internal_fetch(true, MsgStatus, State1),
- State3 = MsgFun(read_msg_callback(MsgStatus), AckTag, State2),
- dropwhile(Pred, MsgFun, State3);
+ dropwhile(Pred, MsgFun, MsgFun(read_msg_callback(MsgStatus),
+ AckTag, State2));
{false, _} ->
a(in_r(MsgStatus, State1))
end
@@ -648,11 +648,11 @@ ack(AckTags, undefined, State) ->
ack_out_counter = AckOutCount + length(AckTags) })};
ack(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) ->
- State2 = lists:foldl(fun(SeqId, State1) ->
- AckEntry = gb_trees:get(SeqId, PA),
- MsgFun(read_msg_callback(AckEntry), SeqId, State1)
- end, State, AckTags),
- {[], State2}.
+ {[], lists:foldl(
+ fun(SeqId, State1) ->
+ AckEntry = gb_trees:get(SeqId, PA),
+ MsgFun(read_msg_callback(AckEntry), SeqId, State1)
+ end, State, AckTags)}.
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,