summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-28 12:29:37 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-28 12:29:37 +0000
commit4435c748294379add22aca9e3ee7273113daee18 (patch)
tree260b85749a269e21313ff119a07ed3510a333f68 /src
parent168c060308e2ccdba8ef6f8e06fe2e87400bc3b4 (diff)
parent5e78aacf560b504834559e7e2b5a1fc9118983bd (diff)
downloadrabbitmq-server-git-4435c748294379add22aca9e3ee7273113daee18.tar.gz
merge bug24650 into default (Move names for connections and channels from mgmt to broker)
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl18
-rw-r--r--src/rabbit_amqqueue.erl25
-rw-r--r--src/rabbit_amqqueue_process.erl349
-rw-r--r--src/rabbit_backing_queue.erl14
-rw-r--r--src/rabbit_basic.erl33
-rw-r--r--src/rabbit_channel.erl17
-rw-r--r--src/rabbit_mirror_queue_master.erl22
-rw-r--r--src/rabbit_mirror_queue_slave.erl13
-rw-r--r--src/rabbit_misc.erl26
-rw-r--r--src/rabbit_ssl.erl41
-rw-r--r--src/rabbit_tests.erl7
-rw-r--r--src/rabbit_variable_queue.erl43
12 files changed, 506 insertions, 102 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 0a0ca90a63..dd5fb89ce4 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -212,14 +212,13 @@
-type(file_suffix() :: binary()).
%% this really should be an abstract type
-type(log_location() :: 'tty' | 'undefined' | file:filename()).
+-type(param() :: atom()).
-spec(maybe_hipe_compile/0 :: () -> 'ok').
-spec(prepare/0 :: () -> 'ok').
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_halt/0 :: () -> no_return()).
--spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
--spec(force_event_refresh/0 :: () -> 'ok').
-spec(status/0 ::
() -> [{pid, integer()} |
{running_applications, [{atom(), string(), string()}]} |
@@ -228,12 +227,11 @@
{memory, any()}]).
-spec(is_running/0 :: () -> boolean()).
-spec(is_running/1 :: (node()) -> boolean()).
--spec(environment/0 :: () -> [{atom() | term()}]).
--spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
+-spec(environment/0 :: () -> [{param() | term()}]).
+-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
+-spec(force_event_refresh/0 :: () -> 'ok').
--spec(maybe_insert_default_data/0 :: () -> 'ok').
--spec(boot_delegate/0 :: () -> 'ok').
--spec(recover/0 :: () -> 'ok').
+-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
-spec(start/2 :: ('normal',[]) ->
{'error',
@@ -243,6 +241,10 @@
{'ok',pid()}).
-spec(stop/1 :: (_) -> 'ok').
+-spec(maybe_insert_default_data/0 :: () -> 'ok').
+-spec(boot_delegate/0 :: () -> 'ok').
+-spec(recover/0 :: () -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
@@ -712,6 +714,6 @@ config_files() ->
case init:get_argument(config) of
{ok, Files} -> [filename:absname(
filename:rootname(File, ".config") ++ ".config") ||
- File <- Files];
+ [File] <- Files];
error -> []
end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a7dfd535c8..c95efa1447 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -334,12 +334,23 @@ check_declare_arguments(QueueName, Args) ->
precondition_failed,
"invalid arg '~s' for ~s: ~255p",
[Key, rabbit_misc:rs(QueueName), Error])
- end || {Key, Fun} <-
+ end ||
+ {Key, Fun} <-
[{<<"x-expires">>, fun check_integer_argument/2},
{<<"x-message-ttl">>, fun check_integer_argument/2},
- {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]],
+ {<<"x-ha-policy">>, fun check_ha_policy_argument/2},
+ {<<"x-dead-letter-exchange">>, fun check_string_argument/2},
+ {<<"x-dead-letter-routing-key">>,
+ fun check_dlxrk_argument/2}]],
ok.
+check_string_argument(undefined, _Args) ->
+ ok;
+check_string_argument({longstr, _}, _Args) ->
+ ok;
+check_string_argument({Type, _}, _) ->
+ {error, {unacceptable_type, Type}}.
+
check_integer_argument(undefined, _Args) ->
ok;
check_integer_argument({Type, Val}, _Args) when Val > 0 ->
@@ -350,6 +361,16 @@ check_integer_argument({Type, Val}, _Args) when Val > 0 ->
check_integer_argument({_Type, Val}, _Args) ->
{error, {value_zero_or_less, Val}}.
+check_dlxrk_argument(undefined, _Args) ->
+ ok;
+check_dlxrk_argument({longstr, _}, Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
+ undefined -> {error, routing_key_but_no_dlx_defined};
+ _ -> ok
+ end;
+check_dlxrk_argument({Type, _}, _Args) ->
+ {error, {unacceptable_type, Type}}.
+
check_ha_policy_argument(undefined, _Args) ->
ok;
check_ha_policy_argument({longstr, <<"all">>}, _Args) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 12cd0c93ff..fd2d7214d2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -49,7 +49,14 @@
stats_timer,
msg_id_to_channel,
ttl,
- ttl_timer_ref
+ ttl_timer_ref,
+ publish_seqno,
+ unconfirmed_mq,
+ unconfirmed_qm,
+ delayed_stop,
+ queue_monitors,
+ dlx,
+ dlx_routing_key
}).
-record(consumer, {tag, ack_required}).
@@ -128,6 +135,13 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
ttl = undefined,
+ dlx = undefined,
+ dlx_routing_key = undefined,
+ publish_seqno = 1,
+ unconfirmed_mq = gb_trees:empty(),
+ unconfirmed_qm = gb_trees:empty(),
+ delayed_stop = undefined,
+ queue_monitors = dict:new(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -149,6 +163,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rate_timer_ref = RateTRef,
expiry_timer_ref = undefined,
ttl = undefined,
+ publish_seqno = 1,
+ unconfirmed_mq = gb_trees:empty(),
+ unconfirmed_qm = gb_trees:empty(),
+ delayed_stop = undefined,
+ queue_monitors = dict:new(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
@@ -210,18 +229,28 @@ bq_init(BQ, Q, Recover) ->
end).
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
- lists:foldl(fun({Arg, Fun}, State1) ->
- case rabbit_misc:table_lookup(Arguments, Arg) of
- {_Type, Val} -> Fun(Val, State1);
- undefined -> State1
- end
- end, State, [{<<"x-expires">>, fun init_expires/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ lists:foldl(
+ fun({Arg, Fun}, State1) ->
+ case rabbit_misc:table_lookup(Arguments, Arg) of
+ {_Type, Val} -> Fun(Val, State1);
+ undefined -> State1
+ end
+ end, State,
+ [{<<"x-expires">>, fun init_expires/2},
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-dead-letter-exchange">>, fun init_dlx/2},
+ {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
+ State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}.
+
+init_dlx_routing_key(RoutingKey, State) ->
+ State#q{dlx_routing_key = RoutingKey}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -449,34 +478,36 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
case gb_trees:lookup(MsgId, MTC0) of
- {value, {ChPid, MsgSeqNo}} ->
- {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMs),
+ {value, {SenderPid, MsgSeqNo}} ->
+ {rabbit_misc:gb_trees_cons(SenderPid,
+ MsgSeqNo, CMs),
gb_trees:delete(MsgId, MTC0)};
none ->
{CMs, MTC0}
end
end, {gb_trees:empty(), MTC}, MsgIds),
- rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
+ rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State#q{msg_id_to_channel = MTC1}.
should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
never;
-should_confirm_message(#delivery{sender = ChPid,
+should_confirm_message(#delivery{sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
id = MsgId}},
#q{q = #amqqueue{durable = true}}) ->
- {eventually, ChPid, MsgSeqNo, MsgId};
+ {eventually, SenderPid, MsgSeqNo, MsgId};
should_confirm_message(_Delivery, _State) ->
immediately.
needs_confirming({eventually, _, _, _}) -> true;
needs_confirming(_) -> false.
-maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId},
+maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
State = #q{msg_id_to_channel = MTC}) ->
- State#q{msg_id_to_channel = gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC)};
+ State#q{msg_id_to_channel =
+ gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
maybe_record_confirm_message(_Confirm, State) ->
State.
@@ -488,13 +519,13 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(Delivery = #delivery{sender = ChPid,
+attempt_delivery(Delivery = #delivery{sender = SenderPid,
message = Message,
msg_seq_no = MsgSeqNo},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Confirm = should_confirm_message(Delivery, State),
case Confirm of
- immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]);
_ -> ok
end,
case BQ:is_duplicate(Message, BQS) of
@@ -509,7 +540,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
needs_confirming = needs_confirming(Confirm)},
- ChPid, BQS2),
+ SenderPid, BQS2),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS3}}
end,
@@ -530,7 +561,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = ChPid}, State) ->
+ sender = SenderPid}, State) ->
{Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
@@ -538,7 +569,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
true -> State2;
false -> Props = (message_properties(State)) #message_properties{
needs_confirming = needs_confirming(Confirm)},
- BQS1 = BQ:publish(Message, Props, ChPid, BQS),
+ BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
@@ -659,11 +690,11 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
Fun(State)
end.
-discard_delivery(#delivery{sender = ChPid,
+discard_delivery(#delivery{sender = SenderPid,
message = Message},
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}.
+ State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.
@@ -674,10 +705,11 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ backing_queue = BQ}) ->
Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ dead_letter_fun(expired, State),
BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
@@ -694,6 +726,199 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
+dead_letter_fun(_Reason, #q{dlx = undefined}) ->
+ undefined;
+dead_letter_fun(Reason, _State) ->
+ fun(Msg, AckTag) ->
+ gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
+ end.
+
+dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
+ case rabbit_exchange:lookup(DLX) of
+ {error, not_found} ->
+ noreply(State);
+ _ ->
+ dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State)
+ end.
+
+dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
+ State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed_mq = UMQ,
+ dlx = DLX,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {ok, _, QPids} =
+ rabbit_basic:publish(
+ rabbit_basic:delivery(
+ false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
+ MsgSeqNo)),
+ State1 = lists:foldl(fun monitor_queue/2, State, QPids),
+ State2 = State1#q{publish_seqno = MsgSeqNo + 1},
+ case QPids of
+ [] -> {_, BQS1} = BQ:ack([AckTag], BQS),
+ cleanup_after_confirm(State2#q{backing_queue_state = BQS1});
+ _ -> State3 =
+ lists:foldl(
+ fun(QPid, State0 = #q{unconfirmed_qm = UQM}) ->
+ UQM1 = rabbit_misc:gb_trees_set_insert(
+ QPid, MsgSeqNo, UQM),
+ State0#q{unconfirmed_qm = UQM1}
+ end, State2, QPids),
+ noreply(State3#q{
+ unconfirmed_mq =
+ gb_trees:insert(
+ MsgSeqNo, {gb_sets:from_list(QPids),
+ AckTag}, UMQ)})
+ end.
+
+monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
+ case dict:is_key(QPid, QMons) of
+ true -> State;
+ false -> State#q{queue_monitors =
+ dict:store(QPid, erlang:monitor(process, QPid),
+ QMons)}
+ end.
+
+demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
+ case dict:find(QPid, QMons) of
+ {ok, MRef} -> erlang:demonitor(MRef),
+ State#q{queue_monitors = dict:erase(QPid, QMons)};
+ error -> State
+ end.
+
+handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
+ unconfirmed_qm = UQM}) ->
+ case dict:find(QPid, QMons) of
+ error ->
+ noreply(State);
+ {ok, _} ->
+ #resource{name = QName} = qname(State),
+ rabbit_log:info("DLQ ~p (for ~p) died~n", [QPid, QName]),
+ case gb_trees:lookup(QPid, UQM) of
+ none ->
+ noreply(State);
+ {value, MsgSeqNosSet} ->
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> rabbit_log:warning(
+ "Dead queue lost ~p messages~n",
+ [gb_sets:size(MsgSeqNosSet)]);
+ false -> ok
+ end,
+ handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid,
+ State#q{queue_monitors =
+ dict:erase(QPid, QMons)})
+ end
+ end.
+
+handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {AckTags1, UMQ3} =
+ lists:foldl(
+ fun (MsgSeqNo, {AckTags, UMQ1}) ->
+ {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1),
+ QPids1 = gb_sets:delete(QPid, QPids),
+ case gb_sets:is_empty(QPids1) of
+ true -> {[AckTag | AckTags],
+ gb_trees:delete(MsgSeqNo, UMQ1)};
+ false -> {AckTags, gb_trees:update(
+ MsgSeqNo, {QPids1, AckTag}, UMQ1)}
+ end
+ end, {[], UMQ}, MsgSeqNos),
+ {_Guids, BQS1} = BQ:ack(AckTags1, BQS),
+ MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM),
+ gb_sets:from_list(MsgSeqNos)),
+ State1 = case gb_sets:is_empty(MsgSeqNos1) of
+ false -> State#q{
+ unconfirmed_qm =
+ gb_trees:update(QPid, MsgSeqNos1, UQM)};
+ true -> demonitor_queue(
+ QPid, State#q{
+ unconfirmed_qm =
+ gb_trees:delete(QPid, UQM)})
+ end,
+ cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
+ backing_queue_state = BQS1}).
+
+stop_later(Reason, State) ->
+ stop_later(Reason, undefined, noreply, State).
+
+stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
+ case {gb_trees:is_empty(UMQ), Reply} of
+ {true, noreply} ->
+ {stop, Reason, State};
+ {true, _} ->
+ {stop, Reason, Reply, State};
+ {false, _} ->
+ noreply(State#q{delayed_stop = {Reason, {From, Reply}}})
+ end.
+
+cleanup_after_confirm(State = #q{delayed_stop = DS,
+ unconfirmed_mq = UMQ}) ->
+ case gb_trees:is_empty(UMQ) andalso DS =/= undefined of
+ true -> case DS of
+ {_, {_, noreply}} -> ok;
+ {_, {From, Reply}} -> gen_server2:reply(From, Reply)
+ end,
+ {Reason, _} = DS,
+ {stop, Reason, State};
+ false -> noreply(State)
+ end.
+
+already_been_here(_Delivery, #q{dlx = undefined}) ->
+ false;
+already_been_here(#delivery{message = #basic_message{content = Content}},
+ State) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ #resource{name = QueueName} = qname(State),
+ case Headers of
+ undefined ->
+ false;
+ _ ->
+ case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
+ {array, DeathTables} ->
+ OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
+ {table, D} <- DeathTables],
+ OldQueues1 = [QName || {longstr, QName} <- OldQueues],
+ case lists:member(QueueName, OldQueues1) of
+ true -> [QueueName | OldQueues1];
+ _ -> false
+ end;
+ _ ->
+ false
+ end
+ end.
+
+make_dead_letter_msg(DLX, Reason,
+ Msg = #basic_message{content = Content,
+ exchange_name = Exchange,
+ routing_keys = RoutingKeys},
+ State = #q{dlx_routing_key = DlxRoutingKey}) ->
+ Headers = rabbit_basic:extract_headers(Content),
+ #resource{name = QName} = qname(State),
+ %% The first routing key is the one specified in the
+ %% basic.publish; all others are CC or BCC keys.
+ RoutingKeys1 = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
+ Info = [{<<"reason">>, longstr, list_to_binary(atom_to_list(Reason))},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array,
+ [{longstr, Key} || Key <- RoutingKeys1]}],
+ Headers1 = rabbit_basic:append_table_header(<<"x-death">>, Info, Headers),
+ {DeathRoutingKeys, Headers2} =
+ case DlxRoutingKey of
+ undefined -> {RoutingKeys, Headers1};
+ _ -> {[DlxRoutingKey],
+ lists:keydelete(<<"CC">>, 1, Headers1)}
+ end,
+ Content1 = rabbit_basic:replace_headers(Headers2, Content),
+ Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(),
+ routing_keys = DeathRoutingKeys, content = Content1}.
+
+
now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) ->
@@ -835,6 +1060,9 @@ prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
_ -> 0
end.
+handle_call(_, _, State = #q{delayed_stop = DS}) when DS =/= undefined ->
+ noreply(State);
+
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -891,15 +1119,15 @@ handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) ->
gen_server2:reply(From, true),
noreply(deliver_or_enqueue(Delivery, State));
-handle_call({notify_down, ChPid}, _From, State) ->
+handle_call({notify_down, ChPid}, From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
%% client. The queue is ultimately deleted in terminate/2; if we
%% return stop with a reply, terminate/2 will be called by
%% gen_server2 *before* the reply is sent.
case handle_ch_down(ChPid, State) of
- {ok, NewState} -> reply(ok, NewState);
- {stop, NewState} -> {stop, normal, ok, NewState}
+ {ok, State1} -> reply(ok, State1);
+ {stop, State1} -> stop_later(normal, From, ok, State1)
end;
handle_call({basic_get, ChPid, NoAck}, _From,
@@ -954,7 +1182,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
reply(ok, State2)
end;
-handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
+handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
State = #q{exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
case lookup_ch(ChPid) of
@@ -974,7 +1202,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true -> {stop, normal, ok, State1}
+ true -> stop_later(normal, From, ok, State1)
end
end;
@@ -983,20 +1211,18 @@ handle_call(stat, _From, State) ->
drop_expired_messages(ensure_expiry_timer(State)),
reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
-handle_call({delete, IfUnused, IfEmpty}, _From,
+handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
- IfEmpty and not(IsEmpty) ->
- reply({error, not_empty}, State);
- IfUnused and not(IsUnused) ->
- reply({error, in_use}, State);
- true ->
- {stop, normal, {ok, BQ:len(BQS)}, State}
+ IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
+ IfUnused and not(IsUnused) -> reply({error, in_use}, State);
+ true -> stop_later(normal, From,
+ {ok, BQ:len(BQS)}, State)
end;
-handle_call(purge, _From, State = #q{backing_queue = BQ,
+handle_call(purge, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
@@ -1007,10 +1233,18 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
ChPid, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end)).
+handle_cast({confirm, MsgSeqNos, QPid}, State) ->
+ handle_confirm(MsgSeqNos, QPid, State);
+
+handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
+ noreply(State);
+
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender,
+ msg_seq_no = MsgSeqNo}, Flow},
+ State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
case Flow of
flow -> Key = {ch_publisher, Sender},
@@ -1021,7 +1255,12 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
credit_flow:ack(Sender);
noflow -> ok
end,
- noreply(deliver_or_enqueue(Delivery, State));
+ case already_been_here(Delivery, State) of
+ false -> noreply(deliver_or_enqueue(Delivery, State));
+ Qs -> log_cycle_once(Qs),
+ rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
+ noreply(State)
+ end;
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
@@ -1039,13 +1278,14 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
backing_queue_state = BQS}) ->
case Requeue of
true -> requeue_and_run(AckTags, State1);
- false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ false -> Fun = dead_letter_fun(rejected, State),
+ BQS1 = BQ:fold(Fun, BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end
end));
handle_cast(delete_immediately, State) ->
- {stop, normal, State};
+ stop_later(normal, State);
handle_cast({unblock, ChPid}, State) ->
noreply(
@@ -1096,11 +1336,17 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
emit_consumer_created(Ch, CTag, true, AckRequired)
end,
- noreply(State).
+ noreply(State);
+
+handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
+ dead_letter_msg(Msg, AckTag, Reason, State).
+
+handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
+ noreply(State);
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> {stop, normal, State};
+ true -> stop_later(normal, State);
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1122,11 +1368,11 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% match what people expect (see bug 21824). However we need this
%% monitor-and-async- delete in case the connection goes away
%% unexpectedly.
- {stop, normal, State};
-handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
+ stop_later(normal, State);
+handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
case handle_ch_down(DownPid, State) of
- {ok, NewState} -> noreply(NewState);
- {stop, NewState} -> {stop, normal, NewState}
+ {ok, State1} -> handle_queue_down(DownPid, Reason, State1);
+ {stop, State1} -> stop_later(normal, State1)
end;
handle_info(update_ram_duration, State = #q{backing_queue = BQ,
@@ -1171,3 +1417,14 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
{hibernate, stop_rate_timer(State1)}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
+log_cycle_once(Queues) ->
+ Key = {queue_cycle, Queues},
+ case get(Key) of
+ true -> ok;
+ undefined -> rabbit_log:warning(
+ "Message dropped. Dead-letter queues cycle detected" ++
+ ": ~p~nThis cycle will NOT be reported again.~n",
+ [Queues]),
+ put(Key, true)
+ end.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 364eb8f646..42627aae7e 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -95,16 +95,24 @@ behaviour_info(callbacks) ->
{drain_confirmed, 1},
%% Drop messages from the head of the queue while the supplied
- %% predicate returns true.
- {dropwhile, 2},
+ %% predicate returns true. A callback function is supplied
+ %% allowing callers access to messages that are about to be
+ %% dropped.
+ {dropwhile, 3},
%% Produce the next message.
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
- %% about. Must return 1 msg_id per Ack, in the same order as Acks.
+ %% about. Must return 1 msg_id per Ack, in the same order as
+ %% Acks.
{ack, 2},
+ %% Acktags supplied are for messages which should be
+ %% processed. The provided callback function is called with each
+ %% message.
+ {fold, 3},
+
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
{requeue, 2},
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index b8211d4332..25485ca0b0 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -19,7 +19,8 @@
-include("rabbit_framing.hrl").
-export([publish/4, publish/6, publish/1,
- message/3, message/4, properties/1, delivery/4]).
+ message/3, message/4, properties/1, append_table_header/3,
+ extract_headers/1, replace_headers/2, delivery/4, header_routes/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -31,6 +32,7 @@
-type(publish_result() ::
({ok, rabbit_amqqueue:routing_result(), [pid()]}
| rabbit_types:error('not_found'))).
+-type(headers() :: rabbit_framing:amqp_table() | 'undefined').
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
-type(body_input() :: (binary() | [binary()])).
@@ -55,6 +57,17 @@
rabbit_types:ok_or_error2(rabbit_types:message(), any())).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
+
+-spec(append_table_header/3 ::
+ (binary(), rabbit_framing:amqp_table(), headers()) -> headers()).
+
+-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
+
+-spec(replace_headers/2 :: (headers(), rabbit_types:content())
+ -> rabbit_types:content()).
+
+-spec(header_routes/1 ::
+ (undefined | rabbit_framing:amqp_table()) -> [string()]).
-spec(build_content/2 :: (rabbit_framing:amqp_property_record(),
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
@@ -166,6 +179,24 @@ properties(P) when is_list(P) ->
end
end, #'P_basic'{}, P).
+append_table_header(Name, Info, undefined) ->
+ append_table_header(Name, Info, []);
+append_table_header(Name, Info, Headers) ->
+ Prior = case rabbit_misc:table_lookup(Headers, Name) of
+ undefined -> [];
+ {array, Existing} -> Existing
+ end,
+ rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
+
+extract_headers(Content) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Headers.
+
+replace_headers(Headers, Content = #content{properties = Props}) ->
+ rabbit_binary_generator:clear_encoded_content(
+ Content#content{properties = Props#'P_basic'{headers = Headers}}).
+
indexof(L, Element) -> indexof(L, Element, 1).
indexof([], _Element, _N) -> 0;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 24cfbcf890..d5cba91b2e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,7 +21,7 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2, confirm/2]).
+-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
@@ -89,7 +89,6 @@
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -136,9 +135,6 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-confirm(Pid, MsgSeqNos) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
-
list() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
rabbit_channel, list_local, []).
@@ -1166,14 +1162,9 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
%% the set one by one which which would be inefficient
State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
{Nack, SendFun} =
- case Reason of
- Reason when Reason =:= noproc; Reason =:= noconnection;
- Reason =:= normal; Reason =:= shutdown ->
- {false, fun record_confirms/2};
- {shutdown, _} ->
- {false, fun record_confirms/2};
- _ ->
- {true, fun send_nacks/2}
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {true, fun send_nacks/2};
+ false -> {false, fun record_confirms/2}
end,
{MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
SendFun(MXs, State2).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 64a4a7371e..bfdab487f3 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,10 +18,10 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
+ requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3]).
+ status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
-export([start/1, stop/0]).
@@ -172,12 +172,13 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 })}.
-dropwhile(Fun, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = SetDelivered }) ->
+dropwhile(Pred, MsgFun,
+ State = #state{gm = GM,
+ backing_queue = BQ,
+ set_delivered = SetDelivered,
+ backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- BQS1 = BQ:dropwhile(Fun, BQS),
+ BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
Dropped = Len - BQ:len(BQS1),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
@@ -248,6 +249,13 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
+fold(MsgFun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS}, AckTags) ->
+ BQS1 = BQ:fold(MsgFun, BQS, AckTags),
+ ok = gm:broadcast(GM, {fold, MsgFun, AckTags}),
+ State #state { backing_queue_state = BQS1 }.
+
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9bf89bce3f..98a80a2619 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -430,7 +430,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
Acc
end
end, {gb_trees:empty(), MS}, MsgIds),
- rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
+ rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State #state { msg_id_status = MS1 }.
handle_process_result({ok, State}) -> noreply(State);
@@ -665,7 +665,7 @@ maybe_enqueue_message(
{ok, {confirmed, ChPid}} ->
%% BQ has confirmed it but we didn't know what the
%% msg_seq_no was at the time. We do now!
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { sender_queues = SQ1,
msg_id_status = dict:erase(MsgId, MS) };
@@ -682,7 +682,7 @@ maybe_enqueue_message(
msg_id_status =
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
immediately ->
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 }
@@ -744,7 +744,7 @@ process_instruction(
{MQ2, PendingCh,
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
immediately ->
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
{MQ2, PendingCh, MS}
end;
{{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
@@ -843,6 +843,11 @@ process_instruction({ack, MsgIds},
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
+process_instruction({fold, MsgFun, AckTags},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQS1 = BQ:fold(AckTags, MsgFun, BQS),
+ {ok, State #state { backing_queue_state = BQS1 }};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index b6d38172b5..dca3bead75 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -28,7 +28,9 @@
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
-export([start_cover/1]).
+-export([confirm_to_sender/2]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
+-export([is_abnormal_termination/1]).
-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([execute_mnesia_transaction/2]).
@@ -44,7 +46,8 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
+-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3,
+ gb_trees_set_insert/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
@@ -108,7 +111,6 @@
(rabbit_framing:amqp_table(), binary(),
rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value())
-> rabbit_framing:amqp_table()).
-
-spec(r/2 :: (rabbit_types:vhost(), K)
-> rabbit_types:r3(rabbit_types:vhost(), K, '_')
when is_subtype(K, atom())).
@@ -131,6 +133,7 @@
(atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
+-spec(is_abnormal_termination/1 :: (any()) -> boolean()).
-spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 ::
(rabbit_types:username(), rabbit_types:vhost(), thunk(A))
@@ -172,6 +175,7 @@
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
+-spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()).
-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A).
-spec(gb_trees_foreach/2 ::
(fun ((any(), any()) -> any()), gb_tree()) -> 'ok').
@@ -372,6 +376,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) ->
end,
Mod]).
+confirm_to_sender(Pid, MsgSeqNos) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
+
throw_on_error(E, Thunk) ->
case Thunk() of
{error, Reason} -> throw({E, Reason});
@@ -397,6 +404,12 @@ filter_exit_map(F, L) ->
fun () -> Ref end,
fun () -> F(I) end) || I <- L]).
+is_abnormal_termination(Reason)
+ when Reason =:= noproc; Reason =:= noconnection;
+ Reason =:= normal; Reason =:= shutdown -> false;
+is_abnormal_termination({shutdown, _}) -> false;
+is_abnormal_termination(_) -> true.
+
with_user(Username, Thunk) ->
fun () ->
case mnesia:read({rabbit_user, Username}) of
@@ -701,6 +714,15 @@ gb_trees_cons(Key, Value, Tree) ->
none -> gb_trees:insert(Key, [Value], Tree)
end.
+gb_trees_set_insert(Key, Value, Tree) ->
+ case gb_trees:lookup(Key, Tree) of
+ {value, Values} ->
+ Values1 = gb_sets:insert(Value, Values),
+ gb_trees:update(Key, Values1, Tree);
+ none ->
+ gb_trees:insert(Key, gb_sets:singleton(Value), Tree)
+ end.
+
gb_trees_fold(Fun, Acc, Tree) ->
gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))).
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 3025d981d4..22ff555ff0 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -21,7 +21,7 @@
-include_lib("public_key/include/public_key.hrl").
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
--export([peer_cert_subject_items/2]).
+-export([peer_cert_subject_items/2, peer_cert_auth_name/1]).
%%--------------------------------------------------------------------------
@@ -36,6 +36,8 @@
-spec(peer_cert_validity/1 :: (certificate()) -> string()).
-spec(peer_cert_subject_items/2 ::
(certificate(), tuple()) -> [string()] | 'not_found').
+-spec(peer_cert_auth_name/1 ::
+ (certificate()) -> binary() | 'not_found' | 'unsafe').
-endif.
@@ -76,6 +78,43 @@ peer_cert_validity(Cert) ->
format_asn1_value(End)])
end, Cert).
+%% Extract a username from the certificate
+peer_cert_auth_name(Cert) ->
+ {ok, Mode} = application:get_env(rabbit, ssl_cert_login_from),
+ peer_cert_auth_name(Mode, Cert).
+
+peer_cert_auth_name(distinguished_name, Cert) ->
+ case auth_config_sane() of
+ true -> iolist_to_binary(peer_cert_subject(Cert));
+ false -> unsafe
+ end;
+
+peer_cert_auth_name(common_name, Cert) ->
+ %% If there is more than one CN then we join them with "," in a
+ %% vaguely DN-like way. But this is more just so we do something
+ %% more intelligent than crashing, if you actually want to escape
+ %% things properly etc, use DN mode.
+ case auth_config_sane() of
+ true -> case peer_cert_subject_items(Cert, ?'id-at-commonName') of
+ not_found -> not_found;
+ CNs -> list_to_binary(string:join(CNs, ","))
+ end;
+ false -> unsafe
+ end.
+
+auth_config_sane() ->
+ {ok, Opts} = application:get_env(rabbit, ssl_options),
+ case {proplists:get_value(fail_if_no_peer_cert, Opts),
+ proplists:get_value(verify, Opts)} of
+ {true, verify_peer} ->
+ true;
+ {F, V} ->
+ rabbit_log:warning("SSL certificate authentication disabled, "
+ "fail_if_no_peer_cert=~p; "
+ "verify=~p~n", [F, V]),
+ false
+ end.
+
%%--------------------------------------------------------------------------
cert_info(F, Cert) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 29e0428de2..f7e3baa706 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2366,7 +2366,7 @@ test_dropwhile(VQ0) ->
VQ2 = rabbit_variable_queue:dropwhile(
fun(#message_properties { expiry = Expiry }) ->
Expiry =< 5
- end, VQ1),
+ end, undefined, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2383,10 +2383,11 @@ test_dropwhile(VQ0) ->
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- VQ3 = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ2),
+ VQ3 = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, undefined, VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ5).
+ rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5).
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 52eb168a42..1b32d21197 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,11 +18,11 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- dropwhile/2, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
+ dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3,
- multiple_routing_keys/0]).
+ multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -581,15 +581,23 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, State) ->
+dropwhile(Pred, MsgFun, State) ->
case queue_out(State) of
{empty, State1} ->
a(State1);
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case Pred(MsgProps) of
- true -> {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, State2);
- false -> a(in_r(MsgStatus, State1))
+ case {Pred(MsgProps), MsgFun} of
+ {true, undefined} ->
+ {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile(Pred, MsgFun, State2);
+ {true, _} ->
+ {{_, _, AckTag, _}, State2} =
+ internal_fetch(true, MsgStatus, State1),
+ {MsgStatus, State3} = read_msg(MsgStatus, State2),
+ MsgFun(MsgStatus#msg_status.msg, AckTag),
+ dropwhile(Pred, MsgFun, State3);
+ {false, _} ->
+ a(in_r(MsgStatus, State1))
end
end.
@@ -628,11 +636,22 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-requeue(AckTags, #vqstate { delta = Delta,
- q3 = Q3,
- q4 = Q4,
- in_counter = InCounter,
- len = Len } = State) ->
+fold(undefined, State, _AckTags) ->
+ State;
+fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
+ lists:foldl(
+ fun(SeqId, State1) ->
+ {MsgStatus, State2} =
+ read_msg(gb_trees:get(SeqId, PA), State1),
+ MsgFun(MsgStatus#msg_status.msg, SeqId),
+ State2
+ end, State, AckTags).
+
+requeue(AckTags, #vqstate { delta = Delta,
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len } = State) ->
{SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
beta_limit(Q3),
fun publish_alpha/2, State),