summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-12-11 17:09:37 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-12-11 17:09:37 +0100
commit8317600c5f65b42149827849f5c5fcaf766a5512 (patch)
treeaae47583893eb28a9054f5f280128d48da0d158d /src
parenteb3bb7ebb7a6491a09843165c64f6f6ac6bc5dea (diff)
parentc80289849404a153f141ff96208abcd9c65f0153 (diff)
downloadrabbitmq-server-git-8317600c5f65b42149827849f5c5fcaf766a5512.tar.gz
Merge branch 'master' into rabbitmq-server-1743-exclusive-consumer
Diffstat (limited to 'src')
-rw-r--r--src/lqueue.erl8
-rw-r--r--src/rabbit_amqqueue.erl29
-rw-r--r--src/rabbit_amqqueue_process.erl63
-rw-r--r--src/rabbit_basic.erl10
-rw-r--r--src/rabbit_channel.erl222
-rw-r--r--src/rabbit_connection_sup.erl4
-rw-r--r--src/rabbit_fifo.erl365
-rw-r--r--src/rabbit_fifo_client.erl52
-rw-r--r--src/rabbit_fifo_index.erl52
-rw-r--r--src/rabbit_lager.erl30
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_networking.erl41
-rw-r--r--src/rabbit_queue_consumers.erl18
-rw-r--r--src/rabbit_quorum_memory_manager.erl76
-rw-r--r--src/rabbit_quorum_queue.erl62
-rw-r--r--src/rabbit_reader.erl18
-rw-r--r--src/rabbit_vhost.erl11
-rw-r--r--src/rabbit_vm.erl16
18 files changed, 784 insertions, 299 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 0652061075..1abe4e0b82 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -21,7 +21,7 @@
%% is an O(1) operation, in contrast with queue:len/1 which is O(n).
-export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2,
- foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]).
+ foldl/3, foldr/3, from_list/1, drop/1, to_list/1, peek/1, peek_r/1]).
-define(QUEUE, queue).
@@ -32,6 +32,7 @@
-type result() :: 'empty' | {'value', value()}.
-spec new() -> ?MODULE().
+-spec drop(?MODULE()) -> ?MODULE().
-spec is_empty(?MODULE()) -> boolean().
-spec len(?MODULE()) -> non_neg_integer().
-spec in(value(), ?MODULE()) -> ?MODULE().
@@ -48,6 +49,8 @@
new() -> {0, ?QUEUE:new()}.
+drop({L, Q}) -> {L - 1, ?QUEUE:drop(Q)}.
+
is_empty({0, _Q}) -> true;
is_empty(_) -> false.
@@ -81,7 +84,8 @@ foldr(Fun, Init, Q) ->
{{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1)
end.
-len({L, _Q}) -> L.
+len({L, _}) -> L.
+
peek({ 0, _Q}) -> empty;
peek({_L, Q}) -> ?QUEUE:peek(Q).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 64cb3cf790..0f7d569a00 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -44,6 +44,7 @@
-export([list_local_followers/0]).
-export([ensure_rabbit_queue_record_is_initialized/1]).
-export([format/1]).
+-export([delete_immediately_by_resource/1]).
-export([pid_of/1, pid_of/2]).
-export([mark_local_durable_queues_stopped/1]).
@@ -266,6 +267,13 @@ filter_per_type(Queues) ->
filter_pid_per_type(QPids) ->
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
+filter_resource_per_type(Resources) ->
+ Queues = [begin
+ {ok, #amqqueue{pid = QPid}} = lookup(Resource),
+ {Resource, QPid}
+ end || Resource <- Resources],
+ lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
+
stop(VHost) ->
%% Classic queues
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
@@ -912,8 +920,11 @@ list_local(VHostPath) ->
[ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath),
State =/= crashed, is_local_to_node(QPid, node()) ].
-notify_policy_changed(#amqqueue{pid = QPid}) ->
- gen_server2:cast(QPid, policy_changed).
+notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) ->
+ gen_server2:cast(QPid, policy_changed);
+notify_policy_changed(#amqqueue{pid = QPid,
+ name = QName}) when ?IS_QUORUM(QPid) ->
+ rabbit_quorum_queue:policy_changed(QName, QPid).
consumers(#amqqueue{ pid = QPid }) ->
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).
@@ -961,7 +972,16 @@ delete_exclusive(QPids, ConnId) ->
delete_immediately(QPids) ->
{Classic, Quorum} = filter_pid_per_type(QPids),
[gen_server2:cast(QPid, delete_immediately) || QPid <- Classic],
- [rabbit_quorum_queue:delete_immediately(QPid) || QPid <- Quorum],
+ case Quorum of
+ [] -> ok;
+ _ -> {error, cannot_delete_quorum_queues, Quorum}
+ end.
+
+delete_immediately_by_resource(Resources) ->
+ {Classic, Quorum} = filter_resource_per_type(Resources),
+ [gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic],
+ [rabbit_quorum_queue:delete_immediately(Resource, QPid)
+ || {Resource, QPid} <- Quorum],
ok.
delete(#amqqueue{ type = quorum} = Q,
@@ -1098,7 +1118,8 @@ notify_down_all(QPids, ChPid, Timeout) ->
Error -> {error, Error}
end.
-activate_limit_all(QPids, ChPid) ->
+activate_limit_all(QRefs, ChPid) ->
+ QPids = [P || P <- QRefs, ?IS_CLASSIC(P)],
delegate:invoke_no_result(QPids, {gen_server2, cast,
[{activate_limit, ChPid}]}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 606e62af11..5f56955ccc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -669,44 +669,53 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
State#q{consumers = Consumers})}
end.
-maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
+maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
+ Delivered,
+ State = #q{overflow = Overflow,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
send_reject_publish(Delivery, Delivered, State);
_ ->
- %% Enqueue and maybe drop head later
- deliver_or_enqueue(Delivery, Delivered, State)
+ {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case IsDuplicate of
+ true -> State1;
+ {true, drop} -> State1;
+ %% Drop publish and nack to publisher
+ {true, reject} ->
+ send_reject_publish(Delivery, Delivered, State1);
+ %% Enqueue and maybe drop head later
+ false ->
+ deliver_or_enqueue(Delivery, Delivered, State1)
+ end
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid,
flow = Flow},
- Delivered, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ Delivered,
+ State = #q{backing_queue = BQ}) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State1),
- {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
- State2 = State1#q{backing_queue_state = BQS1},
- case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
- State2) of
- true ->
+ case attempt_delivery(Delivery, Props, Delivered, State1) of
+ {delivered, State2} ->
State2;
- {delivered, State3} ->
- State3;
%% The next one is an optimisation
- {undelivered, State3 = #q{ttl = 0, dlx = undefined,
- backing_queue_state = BQS2,
+ {undelivered, State2 = #q{ttl = 0, dlx = undefined,
+ backing_queue_state = BQS,
msg_id_to_channel = MTC}} ->
- {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
- State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
- {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
-
- BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
- {Dropped, State4 = #q{backing_queue_state = BQS4}} =
- maybe_drop_head(State3#q{backing_queue_state = BQS3}),
- QLen = BQ:len(BQS4),
+ {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC),
+ State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
+ {undelivered, State2 = #q{backing_queue_state = BQS}} ->
+
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS),
+ {Dropped, State3 = #q{backing_queue_state = BQS2}} =
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
%% we only do that if a new message that might have an
@@ -715,9 +724,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
- {false, false, _} -> State4;
- {true, true, undefined} -> State4;
- {_, _, _} -> drop_expired_msgs(State4)
+ {false, false, _} -> State3;
+ {true, true, undefined} -> State3;
+ {_, _, _} -> drop_expired_msgs(State3)
end
end.
@@ -1683,7 +1692,3 @@ update_ha_mode(State) ->
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
ok = rabbit_mirror_queue_misc:update_mirrors(Q),
State.
-
-
-
-
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index d474e9cad3..22ceefb85f 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -23,6 +23,7 @@
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
header_routes/1, parse_expiration/1, header/2, header/3]).
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
+-export([add_header/4]).
%%----------------------------------------------------------------------------
@@ -300,3 +301,12 @@ maybe_gc_large_msg(Content) ->
msg_size(Content) ->
rabbit_writer:msg_size(Content).
+
+add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) ->
+ Content = rabbit_basic:map_headers(
+ fun(undefined) ->
+ rabbit_misc:set_table_value([], Name, Type, Value);
+ (Headers) ->
+ rabbit_misc:set_table_value(Headers, Name, Type, Value)
+ end, Content0),
+ Msg#basic_message{content = Content}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1b74b655f5..f749d9f30e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -110,7 +110,7 @@
%% when queue.bind's queue field is empty,
%% this name will be used instead
most_recently_declared_queue,
- %% a map of queue pid to queue name
+ %% a map of queue ref to queue name
queue_names,
%% queue processes are monitored to update
%% queue names
@@ -161,6 +161,7 @@
queue_cleanup_timer
}).
+-define(QUEUE, lqueue).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -339,12 +340,29 @@ list_local() ->
info_keys() -> ?INFO_KEYS.
info(Pid) ->
- gen_server2:call(Pid, info, infinity).
+ {Timeout, Deadline} = get_operation_timeout_and_deadline(),
+ try
+ case gen_server2:call(Pid, {info, Deadline}, Timeout) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end
+ catch
+ exit:{timeout, _} ->
+ rabbit_log:error("Timed out getting channel ~p info", [Pid]),
+ throw(timeout)
+ end.
info(Pid, Items) ->
- case gen_server2:call(Pid, {info, Items}, infinity) of
- {ok, Res} -> Res;
- {error, Error} -> throw(Error)
+ {Timeout, Deadline} = get_operation_timeout_and_deadline(),
+ try
+ case gen_server2:call(Pid, {{info, Items}, Deadline}, Timeout) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end
+ catch
+ exit:{timeout, _} ->
+ rabbit_log:error("Timed out getting channel ~p info", [Pid]),
+ throw(timeout)
end.
info_all() ->
@@ -433,7 +451,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
limiter = Limiter,
tx = none,
next_tag = 1,
- unacked_message_q = queue:new(),
+ unacked_message_q = ?QUEUE:new(),
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -493,13 +511,20 @@ prioritise_info(Msg, _Len, _State) ->
handle_call(flush, _From, State) ->
reply(ok, State);
-handle_call(info, _From, State) ->
- reply(infos(?INFO_KEYS, State), State);
+handle_call({info, Deadline}, _From, State) ->
+ try
+ reply({ok, infos(?INFO_KEYS, Deadline, State)}, State)
+ catch
+ Error ->
+ reply({error, Error}, State)
+ end;
-handle_call({info, Items}, _From, State) ->
+handle_call({{info, Items}, Deadline}, _From, State) ->
try
- reply({ok, infos(Items, State)}, State)
- catch Error -> reply({error, Error}, State)
+ reply({ok, infos(Items, Deadline, State)}, State)
+ catch
+ Error ->
+ reply({error, Error}, State)
end;
handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
@@ -649,8 +674,9 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
State = lists:foldl(
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ Msg1 = add_delivery_count_header(MsgHeader, Msg),
handle_deliver(CTag, AckRequired,
- {QName, From, MsgId, IsDelivered, Msg},
+ {QName, From, MsgId, IsDelivered, Msg1},
Acc)
end, State0#ch{queue_states = maps:put(Name, QState2, QueueStates)}, Msgs),
noreply(State);
@@ -669,19 +695,20 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
#'basic.credit_drained'{consumer_tag = CTag,
credit_drained = Credit})
end, Actions),
- noreply_coalesce(confirm(MsgSeqNos, From, State));
+ noreply_coalesce(confirm(MsgSeqNos, Name, State));
eol ->
- State1 = handle_consuming_queue_down_or_eol(From, State0),
- State2 = handle_delivering_queue_down(From, State1),
- {MXs, UC1} = dtree:take(From, State2#ch.unconfirmed),
+ State1 = handle_consuming_queue_down_or_eol(Name, State0),
+ State2 = handle_delivering_queue_down(Name, State1),
+ %% TODO: this should use dtree:take/3
+ {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
- case maps:find(From, QNames) of
+ case maps:find(Name, QNames) of
{ok, QName} -> erase_queue_stats(QName);
error -> ok
end,
noreply_coalesce(
State3#ch{queue_states = maps:remove(Name, QueueStates),
- queue_names = maps:remove(From, QNames)})
+ queue_names = maps:remove(Name, QNames)})
end;
_ ->
%% the assumption here is that the queue state has been cleaned up and
@@ -1179,7 +1206,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
DQ = {Delivery#delivery{flow = Flow}, QNames},
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);
- {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
+ {Msgs, Acks} -> Msgs1 = ?QUEUE:in(DQ, Msgs),
State1#ch{tx = {Msgs1, Acks}}
end};
{error, Reason} ->
@@ -1334,13 +1361,14 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
return_ok(State, NoWait, OkMsg);
{ok, {Q = #amqqueue{pid = QPid}, _CParams}} ->
ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping),
+ QRef = qpid_to_ref(QPid),
QCons1 =
- case maps:find(QPid, QCons) of
+ case maps:find(QRef, QCons) of
error -> QCons;
{ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags),
case gb_sets:is_empty(CTags1) of
- true -> maps:remove(QPid, QCons);
- false -> maps:put(QPid, CTags1, QCons)
+ true -> maps:remove(QRef, QCons);
+ false -> maps:put(QRef, CTags1, QCons)
end
end,
NewState = State#ch{consumer_mapping = ConsumerMapping1,
@@ -1386,14 +1414,14 @@ handle_method(#'basic.qos'{global = true,
handle_method(#'basic.qos'{global = true,
prefetch_count = PrefetchCount},
_, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
- %% TODO queue:len(UAMQ) is not strictly right since that counts
+ %% TODO ?QUEUE:len(UAMQ) is not strictly right since that counts
%% unacked messages from basic.get too. Pretty obscure though.
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
- PrefetchCount, queue:len(UAMQ)),
+ PrefetchCount, ?QUEUE:len(UAMQ)),
case ((not rabbit_limiter:is_active(Limiter)) andalso
rabbit_limiter:is_active(Limiter1)) of
true -> rabbit_amqqueue:activate_limit_all(
- consumer_queues(State#ch.consumer_mapping), self());
+ consumer_queue_refs(State#ch.consumer_mapping), self());
false -> ok
end,
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
@@ -1402,7 +1430,7 @@ handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ, limiter = Limiter,
queue_states = QueueStates0}) ->
OkFun = fun () -> ok end,
- UAMQL = queue:to_list(UAMQ),
+ UAMQL = ?QUEUE:to_list(UAMQ),
QueueStates =
foreach_per_queue(
fun ({QPid, CTag}, MsgIds, Acc0) ->
@@ -1416,7 +1444,7 @@ handle_method(#'basic.recover_async'{requeue = true},
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
- {noreply, State#ch{unacked_message_q = queue:new(),
+ {noreply, State#ch{unacked_message_q = ?QUEUE:new(),
queue_states = QueueStates}};
handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
@@ -1525,7 +1553,7 @@ handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
limiter = Limiter}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
+ State1 = queue_fold(fun deliver_to_queues/2, State, Msgs),
Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
State2 = lists:foldl(fun ({ack, A}, Acc) ->
ack(Rev(A), Acc);
@@ -1540,7 +1568,7 @@ handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
tx = {_Msgs, Acks}}) ->
AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
- UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
+ UAMQ1 = ?QUEUE:from_list(lists:usort(AcksL ++ ?QUEUE:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
tx = new_tx()}};
@@ -1639,25 +1667,26 @@ consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
queue_consumers = QCons}) ->
- {#amqqueue{pid = QPid}, _} =
- maps:get(ConsumerTag, ConsumerMapping),
- CTags1 = case maps:find(QPid, QCons) of
+ {#amqqueue{pid = QPid}, _} = maps:get(ConsumerTag, ConsumerMapping),
+ QRef = qpid_to_ref(QPid),
+ CTags1 = case maps:find(QRef, QCons) of
{ok, CTags} -> gb_sets:insert(ConsumerTag, CTags);
error -> gb_sets:singleton(ConsumerTag)
end,
- QCons1 = maps:put(QPid, CTags1, QCons),
- State#ch{queue_monitors = maybe_monitor(QPid, QMons),
+ QCons1 = maps:put(QRef, CTags1, QCons),
+ State#ch{queue_monitors = maybe_monitor(QRef, QMons),
queue_consumers = QCons1}.
track_delivering_queue(NoAck, QPid, QName,
State = #ch{queue_names = QNames,
queue_monitors = QMons,
delivering_queues = DQ}) ->
- State#ch{queue_names = maps:put(QPid, QName, QNames),
- queue_monitors = maybe_monitor(QPid, QMons),
+ QRef = qpid_to_ref(QPid),
+ State#ch{queue_names = maps:put(QRef, QName, QNames),
+ queue_monitors = maybe_monitor(QRef, QMons),
delivering_queues = case NoAck of
true -> DQ;
- false -> sets:add_element(QPid, DQ)
+ false -> sets:add_element(QRef, DQ)
end}.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
@@ -1676,16 +1705,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
error(quorum_queues_should_never_be_monitored).
-handle_consuming_queue_down_or_eol(QPid,
- State = #ch{queue_consumers = QCons,
- queue_names = QNames}) ->
- ConsumerTags = case maps:find(QPid, QCons) of
+handle_consuming_queue_down_or_eol(QRef,
+ State = #ch{queue_consumers = QCons,
+ queue_names = QNames}) ->
+ ConsumerTags = case maps:find(QRef, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
gb_sets:fold(
fun (CTag, StateN = #ch{consumer_mapping = CMap}) ->
- QName = maps:get(QPid, QNames),
+ QName = maps:get(QRef, QNames),
case queue_down_consumer_action(CTag, CMap) of
remove ->
cancel_consumer(CTag, QName, StateN);
@@ -1697,7 +1726,7 @@ handle_consuming_queue_down_or_eol(QPid,
_ -> cancel_consumer(CTag, QName, StateN)
end
end
- end, State#ch{queue_consumers = maps:remove(QPid, QCons)}, ConsumerTags).
+ end, State#ch{queue_consumers = maps:remove(QRef, QCons)}, ConsumerTags).
%% [0] There is a slight danger here that if a queue is deleted and
%% then recreated again the reconsume will succeed even though it was
@@ -1724,8 +1753,8 @@ queue_down_consumer_action(CTag, CMap) ->
_ -> {recover, ConsumeSpec}
end.
-handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
- State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
+ State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
RoutingKey, Arguments, VHostPath, ConnPid,
@@ -1831,28 +1860,28 @@ record_sent(Type, Tag, AckRequired,
end,
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
UAMQ1 = case AckRequired of
- true -> queue:in({DeliveryTag, Tag, {QPid, MsgId}},
- UAMQ);
+ true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}},
+ UAMQ);
false -> UAMQ
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
%% NB: returns acks in youngest-first order
collect_acks(Q, 0, true) ->
- {lists:reverse(queue:to_list(Q)), queue:new()};
+ {lists:reverse(?QUEUE:to_list(Q)), ?QUEUE:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
collect_acks([], [], Q, DeliveryTag, Multiple).
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
- case queue:out(Q) of
+ case ?QUEUE:out(Q) of
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
{[UnackedMsg | ToAcc],
case PrefixAcc of
[] -> QTail;
- _ -> queue:join(
- queue:from_list(lists:reverse(PrefixAcc)),
+ _ -> ?QUEUE:join(
+ ?QUEUE:from_list(lists:reverse(PrefixAcc)),
QTail)
end};
Multiple ->
@@ -1880,7 +1909,7 @@ ack(Acked, State = #ch{queue_names = QNames,
State#ch{queue_states = QueueStates}.
incr_queue_stats(QPid, QNames, MsgIds, State) ->
- case maps:find(QPid, QNames) of
+ case maps:find(qpid_to_ref(QPid), QNames) of
{ok, QName} -> Count = length(MsgIds),
?INCR_STATS(queue_stats, QName, Count, ack, State);
error -> ok
@@ -1898,16 +1927,16 @@ incr_queue_stats(QPid, QNames, MsgIds, State) ->
%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
%% well as the list overall, are in "most-recent (generally youngest)
%% ack first" order.
-new_tx() -> {queue:new(), []}.
+new_tx() -> {?QUEUE:new(), []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
notify_queues(State = #ch{consumer_mapping = Consumers,
delivering_queues = DQ }) ->
- QPids0 = sets:to_list(
- sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
+ QRefs0 = sets:to_list(
+ sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)),
%% filter to only include pids to avoid trying to notify quorum queues
- QPids = [P || P <- QPids0, ?IS_CLASSIC(P)],
+ QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)],
Timeout = get_operation_timeout(),
{rabbit_amqqueue:notify_down_all(QPids, self(), Timeout),
State#ch{state = closing}}.
@@ -1923,8 +1952,8 @@ foreach_per_queue(F, UAL, Acc) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T).
-consumer_queues(Consumers) ->
- lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}}
+consumer_queue_refs(Consumers) ->
+ lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}}
<- maps:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
@@ -1967,7 +1996,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
Qs = rabbit_amqqueue:lookup(DelQNames),
{DeliveredQPids, DeliveredQQPids, QueueStates} =
rabbit_amqqueue:deliver(Qs, Delivery, QueueStates0),
- AllDeliveredQPids = DeliveredQPids ++ DeliveredQQPids,
+ AllDeliveredQRefs = DeliveredQPids ++ [N || {N, _} <- DeliveredQQPids],
%% The maybe_monitor_all/2 monitors all queues to which we
%% delivered. But we want to monitor even queues we didn't deliver
%% to, since we need their 'DOWN' messages to clean
@@ -1981,49 +2010,50 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
{QNames1, QMons1} =
lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
{QNames0, QMons0}) ->
- {case maps:is_key(QPid, QNames0) of
+ QRef = qpid_to_ref(QPid),
+ {case maps:is_key(QRef, QNames0) of
true -> QNames0;
- false -> maps:put(QPid, QName, QNames0)
+ false -> maps:put(QRef, QName, QNames0)
end, maybe_monitor(QPid, QMons0)}
end, {QNames, maybe_monitor_all(DeliveredQPids, QMons)}, Qs),
State1 = State#ch{queue_names = QNames1,
queue_monitors = QMons1},
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
- State2 = process_routing_mandatory(Mandatory, AllDeliveredQPids, MsgSeqNo,
+ State2 = process_routing_mandatory(Mandatory, AllDeliveredQRefs , MsgSeqNo,
Message, State1),
- State3 = process_routing_confirm( Confirm, AllDeliveredQPids, MsgSeqNo,
- XName, State2),
+ State3 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo,
+ XName, State2),
case rabbit_event:stats_level(State3, #ch.stats_timer) of
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) ||
- QPid <- AllDeliveredQPids,
- {ok, QName} <- [maps:find(QPid, QNames1)]];
+ QRef <- AllDeliveredQRefs,
+ {ok, QName} <- [maps:find(QRef, QNames1)]];
_ ->
ok
end,
State3#ch{queue_states = QueueStates}.
-process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) ->
+process_routing_mandatory(false, _, _, _, State) ->
State;
-process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) ->
+process_routing_mandatory(true, [], _, Msg, State) ->
ok = basic_return(Msg, State, no_route),
State;
-process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) ->
- State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg,
+process_routing_mandatory(true, QRefs, MsgSeqNo, Msg, State) ->
+ State#ch{mandatory = dtree:insert(MsgSeqNo, QRefs, Msg,
State#ch.mandatory)}.
-process_routing_confirm(false, _, _MsgSeqNo, _XName, State) ->
+process_routing_confirm(false, _, _, _, State) ->
State;
-process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
+process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
record_confirms([{MsgSeqNo, XName}], State);
-process_routing_confirm(true, QPids, MsgSeqNo, XName, State) ->
- State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
+process_routing_confirm(true, QRefs, MsgSeqNo, XName, State) ->
+ State#ch{unconfirmed = dtree:insert(MsgSeqNo, QRefs, XName,
State#ch.unconfirmed)}.
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(MsgSeqNos, QRef, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
record_confirms(MXs, State#ch{unconfirmed = UC1}).
@@ -2129,6 +2159,17 @@ complete_tx(State = #ch{tx = failed}) ->
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+infos(Items, Deadline, State) ->
+ [begin
+ Now = now_millis(),
+ if
+ Now > Deadline ->
+ throw(timeout);
+ true ->
+ {Item, i(Item, State)}
+ end
+ end || Item <- Items].
+
i(pid, _) -> self();
i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
@@ -2140,8 +2181,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
-i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> ?QUEUE:len(UAMQ);
+i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
@@ -2488,3 +2529,28 @@ maybe_monitor(_, QMons) ->
maybe_monitor_all([], S) -> S; %% optimisation
maybe_monitor_all([Item], S) -> maybe_monitor(Item, S); %% optimisation
maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items).
+
+add_delivery_count_header(MsgHeader, Msg) ->
+ Count = maps:get(delivery_count, MsgHeader, 0),
+ rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg).
+
+qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
+qpid_to_ref({Name, _}) -> Name;
+%% assume it already is a ref
+qpid_to_ref(Ref) -> Ref.
+
+now_millis() ->
+ erlang:monotonic_time(millisecond).
+
+get_operation_timeout_and_deadline() ->
+ % NB: can't use get_operation_timeout because
+ % this code may not be running via the channel Pid
+ Timeout = ?CHANNEL_OPERATION_TIMEOUT,
+ Deadline = now_millis() + Timeout,
+ {Timeout, Deadline}.
+
+queue_fold(Fun, Init, Q) ->
+ case ?QUEUE:out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ end.
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 4dcfa8dc8a..7ec5262448 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -42,7 +42,7 @@
%%--------------------------------------------------------------------------
-start_link(Ref, Sock, _Transport, _Opts) ->
+start_link(Ref, _Sock, _Transport, _Opts) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
%% We need to get channels in the hierarchy here so they get shut
%% down after the reader, so the reader gets a chance to terminate
@@ -62,7 +62,7 @@ start_link(Ref, Sock, _Transport, _Opts) ->
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
- {reader, {rabbit_reader, start_link, [HelperSup, Ref, Sock]},
+ {reader, {rabbit_reader, start_link, [HelperSup, Ref]},
intrinsic, ?WORKER_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 79d4a3effc..00d0db0b8a 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -1,3 +1,19 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
-module(rabbit_fifo).
-behaviour(ra_machine).
@@ -106,7 +122,7 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call
--define(SHADOW_COPY_INTERVAL, 4096).
+-define(SHADOW_COPY_INTERVAL, 4096 * 4).
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
@@ -147,7 +163,7 @@
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from
% this list first before taking low_msg_num
- returns = queue:new() :: queue:queue(msg_in_id()),
+ returns = lqueue:new() :: lqueue:queue(msg_in_id() | '$prefix_msg'),
% a counter of enqueues - used to trigger shadow copy points
enqueue_count = 0 :: non_neg_integer(),
% a map containing all the live processes that have ever enqueued
@@ -180,7 +196,8 @@
%% it instead takes messages from the `messages' map.
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
- prefix_msg_count = 0 :: non_neg_integer()
+ prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
+ PrefixMsgs :: non_neg_integer()}
}).
-opaque state() :: #state{}.
@@ -207,19 +224,19 @@
-spec init(config()) -> {state(), ra_machine:effects()}.
init(#{name := Name} = Conf) ->
+ update_state(Conf, #state{name = Name}).
+
+update_state(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
CCH = maps:get(cancel_consumer_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
MH = maps:get(metrics_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
- #state{name = Name,
- dead_letter_handler = DLH,
- cancel_consumer_handler = CCH,
- become_leader_handler = BLH,
- metrics_handler = MH,
- shadow_copy_interval = SHI}.
-
-
+ State#state{dead_letter_handler = DLH,
+ cancel_consumer_handler = CCH,
+ become_leader_handler = BLH,
+ metrics_handler = MH,
+ shadow_copy_interval = SHI}.
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
@@ -228,9 +245,9 @@ init(#{name := Name} = Conf) ->
{state(), ra_machine:effects(), Reply :: term()}.
apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
- {ok, State0, Effects} ->
- State = append_to_master_index(RaftIdx, State0),
- checkout(State, Effects);
+ {ok, State0, Effects1} ->
+ {State, Effects, ok} = checkout(State0, Effects1),
+ {append_to_master_index(RaftIdx, State), Effects, ok};
{duplicate, State, Effects} ->
{State, Effects, ok}
end;
@@ -263,7 +280,7 @@ apply(_, {return, MsgIds, ConsumerId}, Effects0,
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
Checked = maps:without(MsgIds, Checked0),
Returned = maps:with(MsgIds, Checked0),
- MsgNumMsgs = [M || M <- maps:values(Returned)],
+ MsgNumMsgs = maps:values(Returned),
return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, State);
_ ->
{State, Effects0, ok}
@@ -313,7 +330,8 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0,
end;
apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0,
#state{messages = M,
- prefix_msg_count = 0} = State0) when map_size(M) == 0 ->
+ prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 ->
+ %% FIXME: also check if there are returned messages
%% TODO do we need metric visibility of empty get requests?
{State0, Effects0, {dequeue, empty}};
apply(Meta, {checkout, {dequeue, settled}, ConsumerId},
@@ -357,7 +375,7 @@ apply(#{index := RaftIdx}, purge, Effects0,
{StateAcc0, EffectsAcc0, ok}) ->
MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}}
<- maps:values(Checked0)],
- complete(ConsumerId, MsgRaftIdxs, C,
+ complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C,
#{}, EffectsAcc0, StateAcc0)
end, {State0, Effects0, ok}, Cons0),
{State, Effects, _} =
@@ -365,7 +383,7 @@ apply(#{index := RaftIdx}, purge, Effects0,
RaftIdx, Indexes,
State1#state{ra_indexes = rabbit_fifo_index:empty(),
messages = #{},
- returns = queue:new(),
+ returns = lqueue:new(),
low_msg_num = undefined}, Effects1),
{State, [garbage_collection | Effects], {purge, Total}};
apply(_, {down, ConsumerPid, noconnection},
@@ -374,10 +392,16 @@ apply(_, {down, ConsumerPid, noconnection},
Node = node(ConsumerPid),
% mark all consumers and enqueuers as suspect
% and monitor the node
- Cons = maps:map(fun({_, P}, C) when node(P) =:= Node ->
- C#consumer{suspected_down = true};
- (_, C) -> C
- end, Cons0),
+ {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C,
+ {Co, St0}) when node(P) =:= Node ->
+ St = return_all(St0, Checked0),
+ {maps:put(K, C#consumer{suspected_down = true,
+ checked_out = #{}},
+ Co),
+ St};
+ (K, C, {Co, St}) ->
+ {maps:put(K, C, Co), St}
+ end, {#{}, State0}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
@@ -388,7 +412,7 @@ apply(_, {down, ConsumerPid, noconnection},
_ ->
[{monitor, node, Node} | Effects0]
end,
- {State0#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
+ {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
apply(_, {down, Pid, _Info}, Effects0,
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
@@ -411,7 +435,8 @@ apply(_, {down, Pid, _Info}, Effects0,
checkout(State2, Effects1);
apply(_, {nodeup, Node}, Effects0,
#state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -427,26 +452,48 @@ apply(_, {nodeup, Node}, Effects0,
(_, _, Acc) -> Acc
end, [], Enqs0),
Monitors = [{monitor, process, P} || P <- Cons ++ Enqs],
+ Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{suspected_down = false};
+ (_, E) -> E
+ end, Enqs0),
+ {Cons1, SQ, Effects} =
+ maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ when node(P) =:= Node ->
+ update_or_remove_sub(
+ ConsumerId, C#consumer{suspected_down = false},
+ CAcc, SQAcc, EAcc);
+ (_, _, Acc) ->
+ Acc
+ end, {Cons0, SQ0, Effects0}, Cons0),
% TODO: avoid list concat
- {State0, Monitors ++ Effects0, ok};
+ checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ}, Monitors ++ Effects);
apply(_, {nodedown, _Node}, Effects, State) ->
- {State, Effects, ok}.
+ {State, Effects, ok};
+apply(_, {update_state, Conf}, Effects, State) ->
+ {update_state(Conf, State), Effects, ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
-state_enter(leader, #state{consumers = Custs,
+state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
name = Name,
+ prefix_msg_counts = {0, 0},
become_leader_handler = BLH}) ->
% return effects to monitor all current consumers and enqueuers
- ConMons = [{monitor, process, P} || {_, P} <- maps:keys(Custs)],
- EnqMons = [{monitor, process, P} || P <- maps:keys(Enqs)],
- Effects = ConMons ++ EnqMons,
+ Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]),
+ Mons = [{monitor, process, P} || P <- Pids],
+ Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
+ Effects = Mons ++ Nots,
case BLH of
undefined ->
Effects;
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
+state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
+ when PrefixMsgCounts =/= {0, 0} ->
+ %% TODO: remove assertion?
+ exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
[{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))];
@@ -583,9 +630,7 @@ cancel_consumer(ConsumerId,
{Effects0, #state{consumers = C0, name = Name} = S0}) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
- S = maps:fold(fun (_, {MsgNum, Msg}, S) ->
- return_one(MsgNum, Msg, S)
- end, S0, Checked0),
+ S = return_all(S0, Checked0),
Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0),
case maps:size(Cons) of
0 ->
@@ -678,8 +723,8 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
end,
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun('$prefix_msg', #state{prefix_msg_count = MsgCount} = S0) ->
- S0#state{prefix_msg_count = MsgCount + 1};
+ State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) ->
+ return_one(0, Msg, S0);
({MsgNum, Msg}, S0) ->
return_one(MsgNum, Msg, S0)
end, State0, MsgNumMsgs),
@@ -688,14 +733,14 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
Effects).
% used to processes messages that are finished
-complete(ConsumerId, MsgRaftIdxs,
+complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
Con0, Checked, Effects0,
#state{consumers = Cons0, service_queue = SQ0,
ra_indexes = Indexes0} = State0) ->
- %% credit_mode = simple_prefetch should automatically top-up credit as messages
- %% are simple_prefetch or otherwise returned
+ %% credit_mode = simple_prefetch should automatically top-up credit
+ %% as messages are simple_prefetch or otherwise returned
Con = Con0#consumer{checked_out = Checked,
- credit = increase_credit(Con0, length(MsgRaftIdxs))},
+ credit = increase_credit(Con0, NumDiscarded)},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs),
@@ -721,7 +766,10 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
Checked = maps:without(MsgIds, Checked0),
Discarded = maps:with(MsgIds, Checked0),
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
+ %% need to pass the length of discarded as $prefix_msgs would be filtered
+ %% by the above list comprehension
{State1, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
+ maps:size(Discarded),
Con0, Checked, Effects0, State0),
{State, Effects, _} = checkout(State1, Effects1),
% settle metrics are incremented separately
@@ -749,6 +797,7 @@ cancel_consumer_effects(Pid, Name,
update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
#state{ra_indexes = Indexes,
+ % prefix_msg_count = 0,
messages = Messages} = State, Effects) ->
case rabbit_fifo_index:size(Indexes) of
0 when map_size(Messages) =:= 0 ->
@@ -777,6 +826,9 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
end.
% TODO update message then update messages and returns in single operations
+return_one(0, '$prefix_msg',
+ #state{returns = Returns} = State0) ->
+ State0#state{returns = lqueue:in('$prefix_msg', Returns)};
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
#state{messages = Messages,
returns = Returns} = State0) ->
@@ -786,8 +838,14 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
Msg = {RaftId, {Header, RawMsg}},
% this should not affect the release cursor in any way
State0#state{messages = maps:put(MsgNum, Msg, Messages),
- returns = queue:in(MsgNum, Returns)}.
+ returns = lqueue:in(MsgNum, Returns)}.
+return_all(State, Checked) ->
+ maps:fold(fun (_, '$prefix_msg', S) ->
+ return_one(0, '$prefix_msg', S);
+ (_, {MsgNum, Msg}, S) ->
+ return_one(MsgNum, Msg, S)
+ end, State, Checked).
checkout(State, Effects) ->
checkout0(checkout_one(State), Effects, #{}).
@@ -813,13 +871,20 @@ append_send_msg_effects(Effects0, AccMap) ->
end, Effects0, AccMap),
[{aux, active} | Effects].
+next_checkout_message(#state{prefix_msg_counts = {PReturns, P}} = State)
+ when PReturns > 0 ->
+ %% there are prefix returns, these should be served first
+ {'$prefix_msg', State#state{prefix_msg_counts = {PReturns - 1, P}}};
next_checkout_message(#state{returns = Returns,
low_msg_num = Low0,
+ prefix_msg_counts = {R, P},
next_msg_num = NextMsgNum} = State) ->
%% use peek rather than out there as the most likely case is an empty
%% queue
- case queue:peek(Returns) of
- empty ->
+ case lqueue:peek(Returns) of
+ {value, Next} ->
+ {Next, State#state{returns = lqueue:drop(Returns)}};
+ empty when P == 0 ->
case Low0 of
undefined ->
{undefined, State};
@@ -832,25 +897,32 @@ next_checkout_message(#state{returns = Returns,
{Low0, State#state{low_msg_num = Low}}
end
end;
- {value, Next} ->
- {Next, State#state{returns = queue:drop(Returns)}}
+ empty ->
+ %% There are prefix msgs
+ {'$prefix_msg', State#state{prefix_msg_counts = {R, P - 1}}}
end.
-take_next_msg(#state{prefix_msg_count = 0,
- messages = Messages0} = State0) ->
- {NextMsgInId, State} = next_checkout_message(State0),
- %% messages are available
- case maps:take(NextMsgInId, Messages0) of
- {IdxMsg, Messages} ->
- {{NextMsgInId, IdxMsg}, State, Messages, 0};
- error ->
- error
- end;
-take_next_msg(#state{prefix_msg_count = MsgCount,
- messages = Messages} = State) ->
- %% there is still a prefix message count for the consumer
- %% "fake" a '$prefix_msg' message
- {'$prefix_msg', State, Messages, MsgCount - 1}.
+%% next message is determined as follows:
+%% First we check if there are prefix returns
+%% Then we check if there are current returns
+%% then we check prefix msgs
+%% then we check current messages
+%%
+%% When we return it is always done to the current return queue
+%% for both prefix messages and current messages
+take_next_msg(#state{messages = Messages0} = State0) ->
+ case next_checkout_message(State0) of
+ {'$prefix_msg', State} ->
+ {'$prefix_msg', State, Messages0};
+ {NextMsgInId, State} ->
+ %% messages are available
+ case maps:take(NextMsgInId, Messages0) of
+ {IdxMsg, Messages} ->
+ {{NextMsgInId, IdxMsg}, State, Messages};
+ error ->
+ error
+ end
+ end.
send_msg_effect({CTag, CPid}, Msgs) ->
{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}.
@@ -861,7 +933,7 @@ checkout_one(#state{service_queue = SQ0,
case queue:peek(SQ0) of
{value, ConsumerId} ->
case take_next_msg(InitState) of
- {ConsumerMsg, State0, Messages, PrefMsgC} ->
+ {ConsumerMsg, State0, Messages} ->
SQ1 = queue:drop(SQ0),
%% there are consumers waiting to be serviced
%% process consumer checkout
@@ -871,6 +943,8 @@ checkout_one(#state{service_queue = SQ0,
%% can happen when draining
%% recurse without consumer on queue
checkout_one(InitState#state{service_queue = SQ1});
+ {ok, #consumer{suspected_down = true}} ->
+ checkout_one(InitState#state{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -885,7 +959,6 @@ checkout_one(#state{service_queue = SQ0,
Cons0, SQ1, []),
State = State0#state{service_queue = SQ,
messages = Messages,
- prefix_msg_count = PrefMsgC,
consumers = Cons},
Msg = case ConsumerMsg of
'$prefix_msg' -> '$prefix_msg';
@@ -976,17 +1049,30 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
%% creates a dehydrated version of the current state to be cached and
%% potentially used to for a snaphot at a later point
-dehydrate_state(#state{messages = Messages0,
+dehydrate_state(#state{messages = Messages,
consumers = Consumers,
- prefix_msg_count = MsgCount} = State) ->
+ returns = Returns,
+ prefix_msg_counts = {PrefRetCnt, MsgCount}} = State) ->
+ %% TODO: optimise to avoid having to iterate the queue to get the number
+ %% of current returned messages
+ RetLen = lqueue:len(Returns), % O(1)
+ CurReturns = length([R || R <- lqueue:to_list(Returns),
+ R =/= '$prefix_msg']),
+ PrefixMsgCnt = MsgCount + maps:size(Messages) - CurReturns,
State#state{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
low_msg_num = undefined,
consumers = maps:map(fun (_, C) ->
- C#consumer{checked_out = #{}}
+ dehydrate_consumer(C)
end, Consumers),
- returns = queue:new(),
- prefix_msg_count = maps:size(Messages0) + MsgCount}.
+ returns = lqueue:new(),
+ %% messages include returns
+ prefix_msg_counts = {RetLen + PrefRetCnt,
+ PrefixMsgCnt}}.
+
+dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
+ Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0),
+ Con#consumer{checked_out = Checked}.
-ifdef(TEST).
@@ -1289,6 +1375,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
ok.
+down_with_noconnection_returns_unack_test() ->
+ Pid = spawn(fun() -> ok end),
+ Cid = {<<"down_with_noconnect">>, Pid},
+ {State0, _} = enq(1, 1, second, test_init(test)),
+ ?assertEqual(1, maps:size(State0#state.messages)),
+ ?assertEqual(0, lqueue:len(State0#state.returns)),
+ {State1, {_, _}} = deq(2, Cid, unsettled, State0),
+ ?assertEqual(0, maps:size(State1#state.messages)),
+ ?assertEqual(0, lqueue:len(State1#state.returns)),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
+ ?assertEqual(1, maps:size(State2a#state.messages)),
+ ?assertEqual(1, lqueue:len(State2a#state.returns)),
+ ok.
+
down_with_noproc_enqueuer_is_cleaned_up_test() ->
State00 = test_init(test),
Pid = spawn(fun() -> ok end),
@@ -1351,7 +1451,7 @@ tick_test() ->
ok.
enq_deq_snapshot_recover_test() ->
- Tag = <<"release_cursor_snapshot_state_test">>,
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
% OthPid = spawn(fun () -> ok end),
% Oth = {<<"oth">>, OthPid},
@@ -1408,20 +1508,49 @@ snapshot_recover_test() ->
],
run_snapshot_test(?FUNCTION_NAME, Commands).
-enq_deq_return_snapshot_recover_test() ->
+enq_deq_return_settle_snapshot_test() ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {enqueue, self(), 1, one}, %% to Cid
+ {checkout, {auto, 1, simple_prefetch}, Cid},
+ {return, [0], Cid}, %% should be re-delivered to Cid
+ {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2
+ {settle, [1], Cid},
+ {settle, [2], Cid}
+ ],
+ run_snapshot_test(?FUNCTION_NAME, Commands).
+
+return_prefix_msg_count_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
- OthPid = spawn(fun () -> ok end),
- Oth = {<<"oth">>, OthPid},
Commands = [
{enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {checkout, {dequeue, unsettled}, Oth},
- {checkout, {dequeue, unsettled}, Cid},
- {settle, [0], Oth},
- {return, [0], Cid},
+ {checkout, {auto, 1, simple_prefetch}, Cid},
+ {checkout, cancel, Cid},
+ {enqueue, self(), 2, two} %% Cid prefix_msg_count: 2
+ ],
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ {State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
+ ?debugFmt("return_prefix_msg_count_test state ~n~p~n", [State]),
+ ok.
+
+
+return_settle_snapshot_test() ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {enqueue, self(), 1, one}, %% to Cid
+ {checkout, {auto, 1, simple_prefetch}, Cid},
+                {return, [0], Cid}, %% should be re-delivered to Cid
+ {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2
+ {settle, [1], Cid},
+ {return, [2], Cid},
+ {settle, [3], Cid},
{enqueue, self(), 3, three},
- purge
+ purge,
+ {enqueue, self(), 4, four}
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1436,17 +1565,67 @@ enq_check_settle_snapshot_recover_test() ->
{settle, [0], Cid},
{enqueue, self(), 3, three},
{settle, [2], Cid}
+ ],
+ % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
+ run_snapshot_test(?FUNCTION_NAME, Commands).
+enq_check_settle_snapshot_purge_test() ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {checkout, {auto, 2, simple_prefetch}, Cid},
+ {enqueue, self(), 1, one},
+ {enqueue, self(), 2, two},
+ {settle, [1], Cid},
+ {settle, [0], Cid},
+ {enqueue, self(), 3, three},
+ purge
+ ],
+ % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
+ run_snapshot_test(?FUNCTION_NAME, Commands).
+
+enq_check_settle_duplicate_test() ->
+ %% duplicate settle commands are likely
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {checkout, {auto, 2, simple_prefetch}, Cid},
+ {enqueue, self(), 1, one}, %% 0
+ {enqueue, self(), 2, two}, %% 0
+ {settle, [0], Cid},
+ {settle, [1], Cid},
+ {settle, [1], Cid},
+ {enqueue, self(), 3, three},
+ {settle, [2], Cid}
],
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
+multi_return_snapshot_test() ->
+ %% this was discovered using property testing
+ C1 = {<<>>, c:pid(0,6723,1)},
+ C2 = {<<0>>,c:pid(0,6723,1)},
+ E = c:pid(0,6720,1),
+ Commands = [
+ {checkout,{auto,2,simple_prefetch},C1},
+ {enqueue,E,1,msg},
+ {enqueue,E,2,msg},
+ {checkout,cancel,C1}, %% both on returns queue
+               {checkout,{auto,1,simple_prefetch},C2}, % one on returns, one on C2
+ {return,[0],C2}, %% E1 in returns, E2 with C2
+ {return,[1],C2}, %% E2 in returns E1 with C2
+ {settle,[2],C2} %% E2 with C2
+ ],
+ run_snapshot_test(?FUNCTION_NAME, Commands),
+ ok.
+
+
run_snapshot_test(Name, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
[begin
- % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
+ ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
run_snapshot_test0(Name, C)
end || C <- prefixes(Commands, 1, [])].
@@ -1459,6 +1638,7 @@ run_snapshot_test0(Name, Commands) ->
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
+ ?debugFmt("running from snapshot: ~b", [SnapIdx]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
% ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n",
@@ -1474,7 +1654,7 @@ prefixes(Source, N, Acc) ->
prefixes(Source, N+1, [X | Acc]).
delivery_query_returns_deliveries_test() ->
- Tag = <<"release_cursor_snapshot_state_test">>,
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
{checkout, {auto, 5, simple_prefetch}, Cid},
@@ -1514,19 +1694,30 @@ state_enter_test() ->
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
ok.
-leader_monitors_on_state_enter_test() ->
- Cid = {<<"cid">>, self()},
- {State0, [_, _]} = enq(1, 1, first, test_init(test)),
- {State1, _} = check_auto(Cid, 2, State0),
+state_enter_montors_and_notifications_test() ->
+ Oth = spawn(fun () -> ok end),
+ {State0, _} = enq(1, 1, first, test_init(test)),
+ Cid = {<<"adf">>, self()},
+ OthCid = {<<"oth">>, Oth},
+ {State1, _} = check(Cid, 2, State0),
+ {State, _} = check(OthCid, 3, State1),
Self = self(),
- %% as we have an enqueuer _and_ a consumer we chould
- %% get two monitor effects in total, even if they are for the same
- %% processs
+ Effects = state_enter(leader, State),
+
+ %% monitor all enqueuers and consumers
[{monitor, process, Self},
- {monitor, process, Self}] = state_enter(leader, State1),
+ {monitor, process, Oth}] =
+ lists:filter(fun ({monitor, process, _}) -> true;
+ (_) -> false
+ end, Effects),
+ [{send_msg, Self, leader_change, ra_event},
+ {send_msg, Oth, leader_change, ra_event}] =
+ lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true;
+ (_) -> false
+ end, Effects),
+ ?ASSERT_EFF({monitor, process, _}, Effects),
ok.
-
purge_test() ->
Cid = {<<"purge_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index c087e35fb2..635d85be4a 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -1,3 +1,19 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
%% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.}
%% state machine implementation running inside a `ra' raft system.
%%
@@ -21,7 +37,8 @@
handle_ra_event/3,
untracked_enqueue/2,
purge/1,
- cluster_name/1
+ cluster_name/1,
+ update_machine_state/2
]).
-include_lib("ra/include/ra.hrl").
@@ -375,6 +392,14 @@ purge(Node) ->
cluster_name(#state{cluster_name = ClusterName}) ->
ClusterName.
+update_machine_state(Node, Conf) ->
+ case ra:process_command(Node, {update_state, Conf}) of
+ {ok, ok, _} ->
+ ok;
+ Err ->
+ Err
+ end.
+
%% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping"
%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such
%% as message deliveries. All ra events need to be handled by {@module}
@@ -438,7 +463,8 @@ handle_ra_event(From, {applied, Seqs},
fun (Cid, {Settled, Returns, Discards}, Acc) ->
add_command(Cid, settle, Settled,
add_command(Cid, return, Returns,
- add_command(Cid, discard, Discards, Acc)))
+ add_command(Cid, discard,
+ Discards, Acc)))
end, [], State1#state.unsent_commands),
Node = pick_node(State2),
%% send all the settlements and returns
@@ -456,10 +482,21 @@ handle_ra_event(From, {applied, Seqs},
end;
handle_ra_event(Leader, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
handle_delivery(Leader, Del, State0);
+handle_ra_event(Leader, {machine, leader_change},
+ #state{leader = Leader} = State) ->
+ %% leader already known
+ {internal, [], [], State};
+handle_ra_event(Leader, {machine, leader_change}, State0) ->
+ %% we need to update leader
+ %% and resend any pending commands
+ State = resend_all_pending(State0#state{leader = Leader}),
+ {internal, [], [], State};
handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
% TODO: how should these be handled? re-sent on timer or try random
{internal, [], [], State0};
handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
+ % ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n",
+ % [Seq, From, Leader]),
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
@@ -517,7 +554,9 @@ seq_applied({Seq, MaybeAction},
last_applied = Seq}};
error ->
% must have already been resent or removed for some other reason
- {Corrs, Actions, State}
+ % still need to update last_applied or we may inadvertently resend
+ % stuff later
+ {Corrs, Actions, State#state{last_applied = Seq}}
end;
seq_applied(_Seq, Acc) ->
Acc.
@@ -541,7 +580,7 @@ maybe_add_action(Action, Acc, State) ->
{[Action | Acc], State}.
do_resends(From, To, State) when From =< To ->
- ?INFO("doing resends From ~w To ~w~n", [From, To]),
+ % ?INFO("rabbit_fifo_client: doing resends From ~w To ~w~n", [From, To]),
lists:foldl(fun resend/2, State, lists:seq(From, To));
do_resends(_, _, State) ->
State.
@@ -556,6 +595,10 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
State
end.
+resend_all_pending(#state{pending = Pend} = State) ->
+ Seqs = lists:sort(maps:keys(Pend)),
+ lists:foldl(fun resend/2, State, Seqs).
+
handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
#state{consumer_deliveries = CDels0} = State0) ->
{LastId, _} = lists:last(IdMsgs),
@@ -610,6 +653,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
Missing.
pick_node(#state{leader = undefined, servers = [N | _]}) ->
+    %% TODO: pick random rather than first?
N;
pick_node(#state{leader = Leader}) ->
Leader.
diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl
index e1848862fe..345a99a03c 100644
--- a/src/rabbit_fifo_index.erl
+++ b/src/rabbit_fifo_index.erl
@@ -15,83 +15,83 @@
-include_lib("ra/include/ra.hrl").
-compile({no_auto_import, [size/1]}).
--record(state, {data = #{} :: #{integer() => term()},
- smallest :: undefined | non_neg_integer(),
- largest :: undefined | non_neg_integer()
- }).
+-record(?MODULE, {data = #{} :: #{integer() => term()},
+ smallest :: undefined | non_neg_integer(),
+ largest :: undefined | non_neg_integer()
+ }).
--opaque state() :: #state{}.
+-opaque state() :: #?MODULE{}.
-export_type([state/0]).
-spec empty() -> state().
empty() ->
- #state{}.
+ #?MODULE{}.
-spec fetch(integer(), state()) -> undefined | term().
-fetch(Key, #state{data = Data}) ->
+fetch(Key, #?MODULE{data = Data}) ->
maps:get(Key, Data, undefined).
% only integer keys are supported
-spec append(integer(), term(), state()) -> state().
append(Key, Value,
- #state{data = Data,
+ #?MODULE{data = Data,
smallest = Smallest,
largest = Largest} = State)
when Key > Largest orelse Largest =:= undefined ->
- State#state{data = maps:put(Key, Value, Data),
+ State#?MODULE{data = maps:put(Key, Value, Data),
smallest = ra_lib:default(Smallest, Key),
largest = Key}.
-spec return(integer(), term(), state()) -> state().
-return(Key, Value, #state{data = Data, smallest = Smallest} = State)
+return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State)
when is_integer(Key) andalso Key < Smallest ->
% TODO: this could potentially result in very large gaps which would
% result in poor performance of smallest/1
% We could try to persist a linked list of "smallests" to make it quicker
% to skip from one to the other - needs measurement
- State#state{data = maps:put(Key, Value, Data),
+ State#?MODULE{data = maps:put(Key, Value, Data),
smallest = Key};
-return(Key, Value, #state{data = Data} = State)
+return(Key, Value, #?MODULE{data = Data} = State)
when is_integer(Key) ->
- State#state{data = maps:put(Key, Value, Data)}.
+ State#?MODULE{data = maps:put(Key, Value, Data)}.
-spec delete(integer(), state()) -> state().
-delete(Smallest, #state{data = Data0,
+delete(Smallest, #?MODULE{data = Data0,
largest = Largest,
smallest = Smallest} = State) ->
Data = maps:remove(Smallest, Data0),
case find_next(Smallest + 1, Largest, Data) of
undefined ->
- State#state{data = Data,
+ State#?MODULE{data = Data,
smallest = undefined,
largest = undefined};
Next ->
- State#state{data = Data, smallest = Next}
+ State#?MODULE{data = Data, smallest = Next}
end;
-delete(Key, #state{data = Data} = State) ->
- State#state{data = maps:remove(Key, Data)}.
+delete(Key, #?MODULE{data = Data} = State) ->
+ State#?MODULE{data = maps:remove(Key, Data)}.
-spec size(state()) -> non_neg_integer().
-size(#state{data = Data}) ->
+size(#?MODULE{data = Data}) ->
maps:size(Data).
-spec smallest(state()) -> undefined | {integer(), term()}.
-smallest(#state{smallest = undefined}) ->
+smallest(#?MODULE{smallest = undefined}) ->
undefined;
-smallest(#state{smallest = Smallest, data = Data}) ->
+smallest(#?MODULE{smallest = Smallest, data = Data}) ->
{Smallest, maps:get(Smallest, Data)}.
-spec next_key_after(non_neg_integer(), state()) -> undefined | integer().
-next_key_after(_Idx, #state{smallest = undefined}) ->
+next_key_after(_Idx, #?MODULE{smallest = undefined}) ->
% map must be empty
undefined;
-next_key_after(Idx, #state{smallest = Smallest,
+next_key_after(Idx, #?MODULE{smallest = Smallest,
largest = Largest})
when Idx+1 < Smallest orelse Idx+1 > Largest ->
undefined;
-next_key_after(Idx, #state{data = Data} = State) ->
+next_key_after(Idx, #?MODULE{data = Data} = State) ->
Next = Idx+1,
case maps:is_key(Next, Data) of
true ->
@@ -101,8 +101,8 @@ next_key_after(Idx, #state{data = Data} = State) ->
end.
-spec map(fun(), state()) -> state().
-map(F, #state{data = Data} = State) ->
- State#state{data = maps:map(F, Data)}.
+map(F, #?MODULE{data = Data} = State) ->
+ State#?MODULE{data = maps:map(F, Data)}.
%% internal
diff --git a/src/rabbit_lager.erl b/src/rabbit_lager.erl
index 21934b5941..da40f207de 100644
--- a/src/rabbit_lager.erl
+++ b/src/rabbit_lager.erl
@@ -20,7 +20,7 @@
%% API
-export([start_logger/0, log_locations/0, fold_sinks/2,
- broker_is_started/0]).
+ broker_is_started/0, set_log_level/1]).
%% For test purposes
-export([configure_lager/0]).
@@ -56,6 +56,34 @@ broker_is_started() ->
ok
end.
+set_log_level(Level) ->
+ IsValidLevel = lists:member(Level, lager_util:levels()),
+ set_log_level(IsValidLevel, Level).
+
+set_log_level(true, Level) ->
+ SinksAndHandlers = [{Sink, gen_event:which_handlers(Sink)} ||
+ Sink <- lager:list_all_sinks()],
+ set_sink_log_level(SinksAndHandlers, Level);
+set_log_level(_, Level) ->
+ {error, {invalid_log_level, Level}}.
+
+set_sink_log_level([], _Level) ->
+ ok;
+set_sink_log_level([{Sink, Handlers}|Rest], Level) ->
+ set_sink_handler_log_level(Sink, Handlers, Level),
+ set_sink_log_level(Rest, Level).
+
+set_sink_handler_log_level(_Sink, [], _Level) ->
+ ok;
+set_sink_handler_log_level(Sink, [Handler|Rest], Level) when is_atom(Handler) ->
+ ok = lager:set_loglevel(Sink, Handler, undefined, Level),
+ set_sink_handler_log_level(Sink, Rest, Level);
+set_sink_handler_log_level(Sink, [{Handler, Id}|Rest], Level) ->
+ ok = lager:set_loglevel(Sink, Handler, Id, Level),
+ set_sink_handler_log_level(Sink, Rest, Level);
+set_sink_handler_log_level(Sink, [_|Rest], Level) ->
+ set_sink_handler_log_level(Sink, Rest, Level).
+
log_locations() ->
ensure_lager_configured(),
DefaultHandlers = application:get_env(lager, handlers, []),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a3050c570f..04353423cc 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -462,7 +462,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {true, State #state { seen_status = maps:remove(MsgId, SS) }};
+ {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS) }};
{ok, Disposition}
when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
@@ -477,8 +477,8 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Message was discarded while we were a slave. Confirm now.
%% As above, amqqueue_process will have the entry for the
%% msg_id_to_channel mapping.
- {true, State #state { seen_status = maps:remove(MsgId, SS),
- confirmed = [MsgId | Confirmed] }}
+ {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }}
end.
set_queue_mode(Mode, State = #state { gm = GM,
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 256d424740..cf431ee04f 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -36,7 +36,7 @@
connection_info_all/0, connection_info_all/1,
emit_connection_info_all/4, emit_connection_info_local/3,
close_connection/2, accept_ack/2,
- tcp_host/1]).
+ handshake/2, tcp_host/1]).
%% Used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1, tcp_listener_spec/9,
@@ -121,7 +121,6 @@ boot() ->
%% Failures will throw exceptions
_ = boot_listeners(fun boot_tcp/1, application:get_env(rabbit, num_tcp_acceptors, 10), "TCP"),
_ = boot_listeners(fun boot_tls/1, application:get_env(rabbit, num_ssl_acceptors, 10), "TLS"),
- _ = maybe_start_proxy_protocol(),
ok.
boot_listeners(Fun, NumAcceptors, Type) ->
@@ -190,12 +189,6 @@ log_poodle_fail(Context) ->
"'rabbit' section of your configuration file.~n",
[rabbit_misc:otp_release(), Context]).
-maybe_start_proxy_protocol() ->
- case application:get_env(rabbit, proxy_protocol, false) of
- false -> ok;
- true -> application:start(ranch_proxy_protocol)
- end.
-
fix_ssl_options(Config) ->
rabbit_ssl_options:fix(Config).
@@ -263,12 +256,9 @@ start_listener0(Address, NumAcceptors, Protocol, Label, Opts) ->
end.
transport(Protocol) ->
- ProxyProtocol = application:get_env(rabbit, proxy_protocol, false),
- case {Protocol, ProxyProtocol} of
- {amqp, false} -> ranch_tcp;
- {amqp, true} -> ranch_proxy;
- {'amqp/ssl', false} -> ranch_ssl;
- {'amqp/ssl', true} -> ranch_proxy_ssl
+ case Protocol of
+ amqp -> ranch_tcp;
+ 'amqp/ssl' -> ranch_ssl
end.
@@ -368,16 +358,31 @@ close_connection(Pid, Explanation) ->
ok
end.
+handshake(Ref, ProxyProtocol) ->
+ case ProxyProtocol of
+ true ->
+ {ok, ProxyInfo} = ranch:recv_proxy_header(Ref, 1000),
+ {ok, Sock} = ranch:handshake(Ref),
+ tune_buffer_size(Sock),
+ ok = file_handle_cache:obtain(),
+ {ok, {rabbit_proxy_socket, Sock, ProxyInfo}};
+ false ->
+ ranch:handshake(Ref)
+ end.
+
accept_ack(Ref, Sock) ->
ok = ranch:accept_ack(Ref),
- case tune_buffer_size(Sock) of
+ tune_buffer_size(Sock),
+ ok = file_handle_cache:obtain().
+
+tune_buffer_size(Sock) ->
+ case tune_buffer_size1(Sock) of
ok -> ok;
{error, _} -> rabbit_net:fast_close(Sock),
exit(normal)
- end,
- ok = file_handle_cache:obtain().
+ end.
-tune_buffer_size(Sock) ->
+tune_buffer_size1(Sock) ->
case rabbit_net:getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
rabbit_net:setopts(Sock, [{buffer, BufSz}]);
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index f32d261d20..bbbeb3ce44 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -26,6 +26,8 @@
%%----------------------------------------------------------------------------
+-define(QUEUE, lqueue).
+
-define(UNSENT_MESSAGE_LIMIT, 200).
%% Utilisation average calculations are all in μs.
@@ -128,7 +130,7 @@ consumers(Consumers, Acc) ->
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
- lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
+ lists:sum([?QUEUE:len(C#cr.acktags) || C <- all_ch_record()]).
add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
Username, State = #state{consumers = Consumers,
@@ -185,7 +187,7 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
All = priority_queue:join(Consumers, BlockedQ),
ok = erase_ch_record(C),
Filtered = priority_queue:filter(chan_pred(ChPid, true), All),
- {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)],
+ {[AckTag || {AckTag, _CTag} <- ?QUEUE:to_list(ChAckTags)],
tags(priority_queue:to_list(Filtered)),
State#state{consumers = remove_consumers(ChPid, Consumers)}}
end.
@@ -267,7 +269,7 @@ deliver_to_consumer(FetchFun,
rabbit_channel:deliver(ChPid, CTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
- true -> queue:in({AckTag, CTag}, ChAckTags);
+ true -> ?QUEUE:in({AckTag, CTag}, ChAckTags);
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
@@ -280,7 +282,7 @@ is_blocked(Consumer = {ChPid, _C}) ->
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
- update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}),
+ update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}),
ok.
subtract_acks(ChPid, AckTags, State) ->
@@ -308,9 +310,9 @@ subtract_acks(ChPid, AckTags, State) ->
subtract_acks([], [], CTagCounts, AckQ) ->
{CTagCounts, AckQ};
subtract_acks([], Prefix, CTagCounts, AckQ) ->
- {CTagCounts, queue:join(queue:from_list(lists:reverse(Prefix)), AckQ)};
+ {CTagCounts, ?QUEUE:join(?QUEUE:from_list(lists:reverse(Prefix)), AckQ)};
subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
- case queue:out(AckQ) of
+ case ?QUEUE:out(AckQ) of
{{value, {T, CTag}}, QTail} ->
subtract_acks(TL, Prefix,
maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail);
@@ -437,7 +439,7 @@ ch_record(ChPid, LimiterPid) ->
Limiter = rabbit_limiter:client(LimiterPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
- acktags = queue:new(),
+ acktags = ?QUEUE:new(),
consumer_count = 0,
blocked_consumers = priority_queue:new(),
limiter = Limiter,
@@ -450,7 +452,7 @@ ch_record(ChPid, LimiterPid) ->
update_ch_record(C = #cr{consumer_count = ConsumerCount,
acktags = ChAckTags,
unsent_message_count = UnsentMessageCount}) ->
- case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ case {?QUEUE:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
{true, 0, 0} -> ok = erase_ch_record(C);
_ -> ok = store_ch_record(C)
end,
diff --git a/src/rabbit_quorum_memory_manager.erl b/src/rabbit_quorum_memory_manager.erl
new file mode 100644
index 0000000000..347f7f205e
--- /dev/null
+++ b/src/rabbit_quorum_memory_manager.erl
@@ -0,0 +1,76 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+-module(rabbit_quorum_memory_manager).
+
+-include("rabbit.hrl").
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+-export([register/0, unregister/0]).
+
+-record(state, {last_roll_over,
+ interval}).
+
+-rabbit_boot_step({rabbit_quorum_memory_manager,
+ [{description, "quorum memory manager"},
+ {mfa, {?MODULE, register, []}},
+ {cleanup, {?MODULE, unregister, []}},
+ {requires, rabbit_event},
+ {enables, recovery}]}).
+
+register() ->
+ gen_event:add_handler(rabbit_alarm, ?MODULE, []).
+
+unregister() ->
+ gen_event:delete_handler(rabbit_alarm, ?MODULE, []).
+
+init([]) ->
+ {ok, #state{interval = interval()}}.
+
+handle_call( _, State) ->
+ {ok, ok, State}.
+
+handle_event({set_alarm, {{resource_limit, memory, Node}, []}},
+ #state{last_roll_over = undefined} = State) when Node == node() ->
+ {ok, force_roll_over(State)};
+handle_event({set_alarm, {{resource_limit, memory, Node}, []}},
+ #state{last_roll_over = Last, interval = Interval } = State)
+ when Node == node() ->
+ Now = erlang:system_time(millisecond),
+ case Now > (Last + Interval) of
+ true ->
+ {ok, force_roll_over(State)};
+ false ->
+ {ok, State}
+ end;
+handle_event(_, State) ->
+ {ok, State}.
+
+handle_info(_, State) ->
+ {ok, State}.
+
+terminate(_, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+force_roll_over(State) ->
+ ra_log_wal:force_roll_over(ra_log_wal),
+ State#state{last_roll_over = erlang:system_time(millisecond)}.
+
+interval() ->
+ application:get_env(rabbit, min_wal_roll_over_interval, 20000).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index e24b907600..5e854a4657 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -17,7 +17,7 @@
-module(rabbit_quorum_queue).
-export([init_state/2, handle_event/2]).
--export([declare/1, recover/1, stop/1, delete/4, delete_immediately/1]).
+-export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]).
-export([info/1, info/2, stat/1, infos/1]).
-export([ack/3, reject/4, basic_get/4, basic_consume/9, basic_cancel/4]).
-export([credit/4]).
@@ -34,6 +34,7 @@
-export([add_member/3]).
-export([delete_member/3]).
-export([requeue/3]).
+-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -74,7 +75,7 @@
-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
-spec stat(rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}.
-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
--spec status(rabbit_types:vhost(), Name :: atom()) -> rabbit_types:infos() | {error, term()}.
+-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}.
-define(STATISTICS_KEYS,
[policy,
@@ -93,14 +94,17 @@
%%----------------------------------------------------------------------------
-spec init_state(ra_server_id(), rabbit_types:r('queue')) ->
- rabbit_fifo_client:state().
+ rabbit_fifo_client:state().
init_state({Name, _}, QName) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
- {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes0}} =
+ %% This lookup could potentially return an {error, not_found}, but we do not
+ %% know what to do if the queue has `disappeared`. Let it crash.
+ {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes}} =
rabbit_amqqueue:lookup(QName),
%% Ensure the leader is listed first
- Nodes = [Leader | lists:delete(Leader, Nodes0)],
- rabbit_fifo_client:init(QName, Nodes, SoftLimit,
+ Servers0 = [{Name, N} || N <- Nodes],
+ Servers = [Leader | lists:delete(Leader, Servers0)],
+ rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit,
fun() -> credit_flow:block(Name), ok end,
fun() -> credit_flow:unblock(Name), ok end).
@@ -148,12 +152,14 @@ declare(#amqqueue{name = QName,
-ra_machine(Q = #amqqueue{name = QName}) ->
- {module, rabbit_fifo,
- #{dead_letter_handler => dlx_mfa(Q),
- cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
- become_leader_handler => {?MODULE, become_leader, [QName]},
- metrics_handler => {?MODULE, update_metrics, [QName]}}}.
+ra_machine(Q) ->
+ {module, rabbit_fifo, ra_machine_config(Q)}.
+
+ra_machine_config(Q = #amqqueue{name = QName}) ->
+ #{dead_letter_handler => dlx_mfa(Q),
+ cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
+ become_leader_handler => {?MODULE, become_leader, [QName]},
+ metrics_handler => {?MODULE, update_metrics, [QName]}}.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) ->
Node = node(ChPid),
@@ -272,9 +278,10 @@ stop(VHost) ->
delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes},
_IfUnused, _IfEmpty, ActingUser) ->
%% TODO Quorum queue needs to support consumer tracking for IfUnused
+ Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
Msgs = quorum_messages(Name),
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
- case ra:delete_cluster([{Name, Node} || Node <- QNodes], 120000) of
+ case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
@@ -300,11 +307,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q
end
end.
-delete_immediately({Name, _} = QPid) ->
- QName = queue_name(Name),
- _ = rabbit_amqqueue:internal_delete(QName, ?INTERNAL_USER),
- ok = ra:delete_cluster([QPid]),
- rabbit_core_metrics:queue_deleted(QName),
+delete_immediately(Resource, {_Name, _} = QPid) ->
+ _ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
+ {ok, _} = ra:delete_cluster([QPid]),
+ rabbit_core_metrics:queue_deleted(Resource),
ok.
ack(CTag, MsgIds, QState) ->
@@ -330,8 +336,10 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of
{ok, empty, QState} ->
{ok, empty, QState};
- {ok, {MsgId, {MsgHeader, Msg}}, QState} ->
- IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ {ok, {MsgId, {MsgHeader, Msg0}}, QState} ->
+ Count = maps:get(delivery_count, MsgHeader, 0),
+ IsDelivered = Count > 0,
+ Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
{ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState};
{timeout, _} ->
{error, timeout}
@@ -411,6 +419,10 @@ maybe_delete_data_dir(UId) ->
ok
end.
+policy_changed(QName, Node) ->
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)).
+
cluster_state(Name) ->
case whereis(Name) of
undefined -> down;
@@ -547,9 +559,13 @@ args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) ->
dead_letter_publish(undefined, _, _, _) ->
ok;
dead_letter_publish(X, RK, QName, ReasonMsgs) ->
- {ok, Exchange} = rabbit_exchange:lookup(X),
- [rabbit_dead_letter:publish(Msg, Reason, Exchange, RK, QName)
- || {Reason, Msg} <- ReasonMsgs].
+ case rabbit_exchange:lookup(X) of
+ {ok, Exchange} ->
+ [rabbit_dead_letter:publish(Msg, Reason, Exchange, RK, QName)
+ || {Reason, Msg} <- ReasonMsgs];
+ {error, not_found} ->
+ ok
+ end.
%% TODO escape hack
qname_to_rname(#resource{virtual_host = <<"/">>, name = Name}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 91002d0b94..8370d08ed9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -57,12 +57,12 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/3, info_keys/0, info/1, info/2,
+-export([start_link/2, info_keys/0, info/1, info/2,
shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/4, recvloop/4]).
+-export([init/3, mainloop/4, recvloop/4]).
-export([conserve_resources/3, server_properties/1]).
@@ -157,7 +157,7 @@
%%--------------------------------------------------------------------------
--spec start_link(pid(), any(), rabbit_net:socket()) -> rabbit_types:ok(pid()).
+-spec start_link(pid(), any()) -> rabbit_types:ok(pid()).
-spec info_keys() -> rabbit_types:info_keys().
-spec info(pid()) -> rabbit_types:infos().
-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
@@ -170,7 +170,7 @@
rabbit_framing:amqp_table().
%% These specs only exists to add no_return() to keep dialyzer happy
--spec init(pid(), pid(), any(), rabbit_net:socket()) -> no_return().
+-spec init(pid(), pid(), any()) -> no_return().
-spec start_connection(pid(), pid(), any(), rabbit_net:socket()) ->
no_return().
@@ -181,18 +181,18 @@
%%--------------------------------------------------------------------------
-start_link(HelperSup, Ref, Sock) ->
- Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, Ref, Sock]),
+start_link(HelperSup, Ref) ->
+ Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, Ref]),
{ok, Pid}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, HelperSup, Ref, Sock) ->
+init(Parent, HelperSup, Ref) ->
?LG_PROCESS_TYPE(reader),
- RealSocket = rabbit_net:unwrap_socket(Sock),
- rabbit_networking:accept_ack(Ref, RealSocket),
+ {ok, Sock} = rabbit_networking:handshake(Ref,
+ application:get_env(rabbit, proxy_protocol, false)),
Deb = sys:debug_options([]),
start_connection(Parent, HelperSup, Deb, Sock).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index c460b02e5b..e462fc6bc0 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -208,7 +208,16 @@ delete_storage(VHost) ->
VhostDir = msg_store_dir_path(VHost),
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
%% Message store should be closed when vhost supervisor is closed.
- ok = rabbit_file:recursive_delete([VhostDir]).
+ case rabbit_file:recursive_delete([VhostDir]) of
+ ok -> ok;
+ {error, {_, enoent}} ->
+ %% a concurrent delete did the job for us
+ rabbit_log:warning("Tried to delete storage directories for vhost '~s', it failed with an ENOENT", [VHost]),
+ ok;
+ Other ->
+ rabbit_log:warning("Tried to delete storage directories for vhost '~s': ~p", [VHost, Other]),
+ Other
+ end.
assert_benign(ok, _) -> ok;
assert_benign({ok, _}, _) -> ok;
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 265bcb45e3..e495ab8677 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -35,7 +35,7 @@ memory() ->
{Sums, _Other} = sum_processes(
lists:append(All), distinguishers(), [memory]),
- [Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
+ [Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
MsgIndexProc, MgmtDbProc, Plugins] =
[aggregate(Names, Sums, memory, fun (X) -> X end)
|| Names <- distinguished_interesting_sups()],
@@ -69,7 +69,7 @@ memory() ->
OtherProc = Processes
- ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
- - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
+ - Qs - QsSlave - Qqs - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
[
%% Connections
@@ -81,6 +81,7 @@ memory() ->
%% Queues
{queue_procs, Qs},
{queue_slave_procs, QsSlave},
+ {quorum_queue_procs, Qqs},
%% Processes
{plugins, Plugins},
@@ -124,7 +125,7 @@ binary() ->
sets:add_element({Ptr, Sz}, Acc0)
end, Acc, Info)
end, distinguishers(), [{binary, sets:new()}]),
- [Other, Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
+ [Other, Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
MsgIndexProc, MgmtDbProc, Plugins] =
[aggregate(Names, [{other, Rest} | Sums], binary, fun sum_binary/1)
|| Names <- [[other] | distinguished_interesting_sups()]],
@@ -134,6 +135,7 @@ binary() ->
{connection_other, ConnsOther},
{queue_procs, Qs},
{queue_slave_procs, QsSlave},
+ {quorum_queue_procs, Qqs},
{plugins, Plugins},
{mgmt_db, MgmtDbProc},
{msg_index, MsgIndexProc},
@@ -173,11 +175,16 @@ bytes(Words) -> try
end.
interesting_sups() ->
- [queue_sups(), conn_sups() | interesting_sups0()].
+ [queue_sups(), quorum_sups(), conn_sups() | interesting_sups0()].
queue_sups() ->
all_vhosts_children(rabbit_amqqueue_sup_sup).
+quorum_sups() ->
+ %% TODO: in the future not all ra servers may be queues and we need
+ %% some way to filter this
+ [ra_server_sup].
+
msg_stores() ->
all_vhosts_children(msg_store_transient)
++
@@ -229,6 +236,7 @@ distinguished_interesting_sups() ->
[
with(queue_sups(), master),
with(queue_sups(), slave),
+ quorum_sups(),
with(conn_sups(), reader),
with(conn_sups(), writer),
with(conn_sups(), channel),