summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/credit_flow.erl4
-rw-r--r--src/rabbit_amqqueue.erl21
-rw-r--r--src/rabbit_amqqueue_process.erl63
-rw-r--r--src/rabbit_basic.erl2
-rw-r--r--src/rabbit_channel.erl146
-rw-r--r--src/rabbit_exchange_type_topic.erl3
-rw-r--r--src/rabbit_guid.erl81
-rw-r--r--src/rabbit_misc.erl27
-rw-r--r--src/rabbit_nodes.erl6
-rw-r--r--src/rabbit_reader.erl45
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl5
-rw-r--r--src/rabbit_writer.erl3
13 files changed, 248 insertions, 174 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index bdcd892a9c..072f4d9dc5 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -38,7 +38,7 @@
-ifdef(use_specs).
-opaque(bump_msg() :: {pid(), non_neg_integer()}).
--opaque(credit_spec() :: {non_neg_integer(), non_neg_integer()}).
+-type(credit_spec() :: {non_neg_integer(), non_neg_integer()}).
-spec(send/1 :: (pid()) -> 'ok').
-spec(send/2 :: (pid(), credit_spec()) -> 'ok').
@@ -124,4 +124,4 @@ get(Key, Default) ->
Value -> Value
end.
-update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))).
+update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fb4540a30b..a7dfd535c8 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -25,7 +25,7 @@
-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, unblock/2, flush_all/2]).
+-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([store_queue/1]).
@@ -40,6 +40,8 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
+-define(MORE_CONSUMER_CREDIT_AFTER, 50).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -137,6 +139,7 @@
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
@@ -461,7 +464,21 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ Key = {consumer_credit_to, QPid},
+ put(Key, case get(Key) of
+ 1 -> gen_server2:cast(
+ QPid, {notify_sent, ChPid,
+ ?MORE_CONSUMER_CREDIT_AFTER}),
+ ?MORE_CONSUMER_CREDIT_AFTER;
+ undefined -> erlang:monitor(process, QPid),
+ ?MORE_CONSUMER_CREDIT_AFTER - 1;
+ C -> C - 1
+ end),
+ ok.
+
+notify_sent_queue_down(QPid) ->
+ erase({consumer_credit_to, QPid}),
+ ok.
unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c492151085..b3a620fae6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 100).
+-define(UNSENT_MESSAGE_LIMIT, 200).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
@@ -388,34 +388,32 @@ ch_record_state_transition(OldCR, NewCR) ->
{_, _} -> ok
end.
-deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
+deliver_msgs_to_consumers(_DeliverFun, true, State) ->
+ {true, State};
+deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
- case PredFun(FunAcc, State) of
- false -> {FunAcc, State};
- true -> case queue:out(ActiveConsumers) of
- {empty, _} ->
- {FunAcc, State};
- {{value, QEntry}, Tail} ->
- {FunAcc1, State1} =
- deliver_msg_to_consumer(
+ case queue:out(ActiveConsumers) of
+ {empty, _} ->
+ {false, State};
+ {{value, QEntry}, Tail} ->
+ {Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry,
- FunAcc, State#q{active_consumers = Tail}),
- deliver_msgs_to_consumers(Funs, FunAcc1, State1)
- end
+ State#q{active_consumers = Tail}),
+ deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) ->
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
C = ch_record(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
- {FunAcc, State};
+ {false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
Consumer#consumer.ack_required) of
false -> block_consumer(C#cr{is_limit_active = true}, E),
- {FunAcc, State};
+ {false, State};
true -> AC1 = queue:in(E, State#q.active_consumers),
deliver_msg_to_consumer(
- DeliverFun, Consumer, C, FunAcc,
+ DeliverFun, Consumer, C,
State#q{active_consumers = AC1})
end
end.
@@ -426,9 +424,9 @@ deliver_msg_to_consumer(DeliverFun,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
- FunAcc, State = #q{q = #amqqueue{name = QName}}) ->
- {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
- DeliverFun(AckRequired, FunAcc, State),
+ State = #q{q = #amqqueue{name = QName}}) ->
+ {{Message, IsDelivered, AckTag}, Stop, State1} =
+ DeliverFun(AckRequired, State),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
@@ -437,11 +435,9 @@ deliver_msg_to_consumer(DeliverFun,
end,
update_ch_record(C#cr{acktags = ChAckTags1,
unsent_message_count = Count + 1}),
- {FunAcc1, State1}.
-
-deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
+ {Stop, State1}.
-deliver_from_queue_deliver(AckRequired, false, State) ->
+deliver_from_queue_deliver(AckRequired, State) ->
{{Message, IsDelivered, AckTag, Remaining}, State1} =
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
@@ -485,12 +481,11 @@ maybe_record_confirm_message(_Confirm, State) ->
State.
run_message_queue(State) ->
- Funs = {fun deliver_from_queue_pred/2,
- fun deliver_from_queue_deliver/3},
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State),
- IsEmpty = BQ:is_empty(BQS),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
+ {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+ fun deliver_from_queue_deliver/2,
+ BQ:is_empty(BQS), State1),
State2.
attempt_delivery(Delivery = #delivery{sender = ChPid,
@@ -504,10 +499,8 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
end,
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
- PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
- fun (AckRequired, false,
- State1 = #q{backing_queue_state = BQS2}) ->
+ fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
%% we don't need an expiry here because
%% messages are not being enqueued, so we use
%% an empty message_properties.
@@ -521,7 +514,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
State1#q{backing_queue_state = BQS3}}
end,
{Delivered, State2} =
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false,
+ deliver_msgs_to_consumers(DeliverFun, false,
State#q{backing_queue_state = BQS1}),
{Delivered, Confirm, State2};
{Duplicate, BQS1} ->
@@ -830,7 +823,7 @@ prioritise_cast(Msg, _State) ->
{set_maximum_since_use, _Age} -> 8;
{ack, _AckTags, _ChPid} -> 7;
{reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
+ {notify_sent, _ChPid, _Credit} -> 7;
{unblock, _ChPid} -> 7;
{run_backing_queue, _Mod, _Fun} -> 6;
_ -> 0
@@ -1064,11 +1057,11 @@ handle_cast({unblock, ChPid}, State) ->
possibly_unblock(State, ChPid,
fun (C) -> C#cr{is_limit_active = false} end));
-handle_cast({notify_sent, ChPid}, State) ->
+handle_cast({notify_sent, ChPid, Credit}, State) ->
noreply(
possibly_unblock(State, ChPid,
fun (C = #cr{unsent_message_count = Count}) ->
- C#cr{unsent_message_count = Count - 1}
+ C#cr{unsent_message_count = Count - Credit}
end));
handle_cast({limit, ChPid, Limiter}, State) ->
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index e645a9ee55..b8211d4332 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -139,7 +139,7 @@ message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
{ok, #basic_message{
exchange_name = XName,
content = strip_header(DecodedContent, ?DELETED_HEADER),
- id = rabbit_guid:guid(),
+ id = rabbit_guid:gen(),
is_persistent = is_message_persistent(DecodedContent),
routing_keys = [RoutingKey |
header_routes(Props#'P_basic'.headers)]}}
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f17f98cad0..a101886f26 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,9 +33,9 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter, tx_status, next_tag,
- unacked_message_q, uncommitted_message_q, uncommitted_acks,
- user, virtual_host, most_recently_declared_queue, queue_monitors,
+ limiter, tx_status, next_tag, unacked_message_q,
+ uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
+ virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
unconfirmed_qm, confirmed, capabilities, trace_state}).
@@ -191,6 +191,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
uncommitted_acks = [],
+ uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -295,29 +296,19 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
- State1 = lock_message(AckRequired,
- ack_record(DeliveryTag, ConsumerTag, Msg),
- State),
-
- M = #'basic.deliver'{consumer_tag = ConsumerTag,
- delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
- maybe_incr_redeliver_stats(Redelivered, QPid, State1),
- rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State1#ch{next_tag = DeliveryTag + 1});
-
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
+ ok = rabbit_writer:send_command_and_notify(
+ WriterPid, QPid, self(),
+ #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ Content),
+ noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -695,38 +686,28 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- conn_pid = ConnPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
+ _, State = #ch{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}} ->
- State1 = lock_message(not(NoAck),
- ack_record(DeliveryTag, none, Msg),
- State),
- maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
- maybe_incr_redeliver_stats(Redelivered, QPid, State1),
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}} ->
ok = rabbit_writer:send_command(
WriterPid,
- #'basic.get_ok'{delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey,
+ #'basic.get_ok'{delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State1#ch{next_tag = DeliveryTag + 1}};
+ {noreply, record_sent(none, not(NoAck), Msg, State)};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -746,7 +727,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
- <<>> -> rabbit_guid:binstring_guid("amq.ctag");
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.ctag");
Other -> Other
end,
@@ -975,7 +957,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
false -> none
end,
ActualNameBin = case QueueNameBin of
- <<>> -> rabbit_guid:binstring_guid("amq.gen");
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.gen");
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
@@ -1083,10 +1066,15 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL}) ->
+handle_method(#'tx.commit'{}, _,
+ State = #ch{uncommitted_message_q = TMQ,
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL,
+ limiter = Limiter}) ->
State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
ack(TAL, State1),
+ lists:foreach(
+ fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL),
{noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
@@ -1094,8 +1082,10 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL}) ->
- UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))),
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL}) ->
+ TNL1 = lists:append([L || {_, L} <- TNL]),
+ UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
@@ -1259,18 +1249,46 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
-reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+reject(DeliveryTag, Requeue, Multiple,
+ State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply,
+ case TxStatus of
+ none ->
+ reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ in_progress ->
+ State1#ch{uncommitted_nacks =
+ [{Requeue, Acked} | State1#ch.uncommitted_nacks]}
+ end}.
+
+reject(Requeue, Acked, Limiter) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
end, ok, Acked),
- ok = notify_limiter(State#ch.limiter, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}}.
-
-ack_record(DeliveryTag, ConsumerTag,
- _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
- {DeliveryTag, ConsumerTag, {QPid, MsgId}}.
+ ok = notify_limiter(Limiter, Acked).
+
+record_sent(ConsumerTag, AckRequired,
+ Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ State = #ch{unacked_message_q = UAMQ,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
+ maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
+ UAMQ1 = case AckRequired of
+ true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
+ UAMQ);
+ false -> UAMQ
+ end,
+ State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
collect_acks(Q, 0, true) ->
{queue:to_list(Q), queue:new()};
@@ -1305,7 +1323,8 @@ ack(Acked, State) ->
maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = []}.
+ uncommitted_acks = [],
+ uncommitted_nacks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1395,11 +1414,6 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
end
end, State#ch{unconfirmed_mq = UMQ1}, QPids).
-lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
- State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
-lock_message(false, _MsgStruct, State) ->
- State.
-
send_nacks([], State) ->
State;
send_nacks(MXs, State = #ch{tx_status = none}) ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 3ac6ae7438..84f4f8a9de 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -250,7 +250,7 @@ remove_all(Table, Pattern) ->
mnesia:match_object(Table, Pattern, write)).
new_node_id() ->
- rabbit_guid:guid().
+ rabbit_guid:gen().
split_topic_key(Key) ->
split_topic_key(Key, [], []).
@@ -263,4 +263,3 @@ split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
-
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 70772ccd2c..f4c425ca20 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -19,7 +19,7 @@
-behaviour(gen_server).
-export([start_link/0]).
--export([guid/0, string_guid/1, binstring_guid/1]).
+-export([gen/0, gen_secure/0, string/2, binary/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -38,9 +38,10 @@
-type(guid() :: binary()).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(guid/0 :: () -> guid()).
--spec(string_guid/1 :: (any()) -> string()).
--spec(binstring_guid/1 :: (any()) -> binary()).
+-spec(gen/0 :: () -> guid()).
+-spec(gen_secure/0 :: () -> guid()).
+-spec(string/2 :: (guid(), any()) -> string()).
+-spec(binary/2 :: (guid(), any()) -> binary()).
-endif.
@@ -65,11 +66,8 @@ update_disk_serial() ->
end,
Serial.
-%% generate a GUID.
-%%
-%% The id is only unique within a single cluster and as long as the
-%% serial store hasn't been deleted.
-guid() ->
+%% Generate an un-hashed guid.
+fresh() ->
%% We don't use erlang:now() here because a) it may return
%% duplicates when the system clock has been rewound prior to a
%% restart, or ids were generated at a high rate (which causes
@@ -78,29 +76,74 @@ guid() ->
%%
%% A persisted serial number, the node, and a unique reference
%% (per node incarnation) uniquely identifies a process in space
- %% and time. We combine that with a process-local counter to give
- %% us a GUID.
- G = case get(guid) of
- undefined -> Serial = gen_server:call(?SERVER, serial, infinity),
- {{Serial, node(), make_ref()}, 0};
+ %% and time.
+ Serial = gen_server:call(?SERVER, serial, infinity),
+ {Serial, node(), make_ref()}.
+
+advance_blocks({B1, B2, B3, B4}, I) ->
+ %% To produce a new set of blocks, we create a new 32bit block
+ %% hashing {B5, I}. The new hash is used as last block, and the
+ %% other three blocks are XORed with it.
+ %%
+ %% Doing this is convenient because it avoids cascading conflits,
+ %% while being very fast. The conflicts are avoided by propagating
+ %% the changes through all the blocks at each round by XORing, so
+ %% the only occasion in which a collision will take place is when
+ %% all 4 blocks are the same and the counter is the same.
+ %%
+ %% The range (2^32) is provided explicitly since phash uses 2^27
+ %% by default.
+ B5 = erlang:phash2({B1, I}, 4294967296),
+ {{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}.
+
+blocks_to_binary({B1, B2, B3, B4}) -> <<B1:32, B2:32, B3:32, B4:32>>.
+
+%% generate a GUID. This function should be used when performance is a
+%% priority and predictability is not an issue. Otherwise use
+%% gen_secure/0.
+gen() ->
+ %% We hash a fresh GUID with md5, split it in 4 blocks, and each
+ %% time we need a new guid we rotate them producing a new hash
+ %% with the aid of the counter. Look at the comments in
+ %% advance_blocks/2 for details.
+ {BS, I} = case get(guid) of
+ undefined -> <<B1:32, B2:32, B3:32, B4:32>> =
+ erlang:md5(term_to_binary(fresh())),
+ {{B1,B2,B3,B4}, 0};
+ {BS0, I0} -> advance_blocks(BS0, I0)
+ end,
+ put(guid, {BS, I}),
+ blocks_to_binary(BS).
+
+%% generate a non-predictable GUID.
+%%
+%% The id is only unique within a single cluster and as long as the
+%% serial store hasn't been deleted.
+%%
+%% If you are not concerned with predictability, gen/0 is faster.
+gen_secure() ->
+ %% Here instead of hashing once we hash the GUID and the counter
+ %% each time, so that the GUID is not predictable.
+ G = case get(guid_secure) of
+ undefined -> {fresh(), 0};
{S, I} -> {S, I+1}
end,
- put(guid, G),
+ put(guid_secure, G),
erlang:md5(term_to_binary(G)).
%% generate a readable string representation of a GUID.
%%
%% employs base64url encoding, which is safer in more contexts than
%% plain base64.
-string_guid(Prefix) ->
+string(G, Prefix) ->
Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
($\/, Acc) -> [$\_ | Acc];
($\=, Acc) -> Acc;
(Chr, Acc) -> [Chr | Acc]
- end, [], base64:encode_to_string(guid())).
+ end, [], base64:encode_to_string(G)).
-binstring_guid(Prefix) ->
- list_to_binary(string_guid(Prefix)).
+binary(G, Prefix) ->
+ list_to_binary(string(G, Prefix)).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 798c21e6e8..9a6879b13d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -152,7 +152,7 @@
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
-> 'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()).
--spec(format/2 :: (string(), [any()]) -> 'ok').
+-spec(format/2 :: (string(), [any()]) -> string()).
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(with_local_io/1 :: (fun (() -> A)) -> A).
-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
@@ -414,16 +414,25 @@ execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
%% elsewhere and get a consistent result even when that read
%% executes on a different node.
- case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of
- {atomic, Result} -> case mnesia:is_transaction() of
- true -> ok;
- false -> mnesia_sync:sync()
- end,
- Result;
- {aborted, Reason} -> throw({error, Reason})
+ case worker_pool:submit(
+ fun () ->
+ case mnesia:is_transaction() of
+ false -> DiskLogBefore = mnesia_dumper:get_log_writes(),
+ Res = mnesia:sync_transaction(TxFun),
+ DiskLogAfter = mnesia_dumper:get_log_writes(),
+ case DiskLogAfter == DiskLogBefore of
+ true -> Res;
+ false -> {sync, Res}
+ end;
+ true -> mnesia:sync_transaction(TxFun)
+ end
+ end) of
+ {sync, {atomic, Result}} -> mnesia_sync:sync(), Result;
+ {sync, {aborted, Reason}} -> throw({error, Reason});
+ {atomic, Result} -> Result;
+ {aborted, Reason} -> throw({error, Reason})
end.
-
%% Like execute_mnesia_transaction/1 with additional Pre- and Post-
%% commit function
execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index ba695b8e78..329c07dc9f 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -78,10 +78,8 @@ diagnostics_host(Host) ->
{Name, Port} <- NamePorts]]}
end.
-make({Prefix, Suffix}) ->
- list_to_atom(lists:append([Prefix, "@", Suffix]));
-make(NodeStr) ->
- make(parts(NodeStr)).
+make({Prefix, Suffix}) -> list_to_atom(lists:append([Prefix, "@", Suffix]));
+make(NodeStr) -> make(parts(NodeStr)).
parts(Node) when is_atom(Node) ->
parts(atom_to_list(Node));
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 1602cc2b83..908a279c0c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -499,23 +499,21 @@ handle_frame(Type, Channel, Payload,
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- AnalyzedFrame ->
- case get({channel, Channel}) of
- {ChPid, AState} ->
- NewAState = process_channel_frame(
- AnalyzedFrame, Channel, ChPid, AState),
- put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(AnalyzedFrame, ChPid,
- control_throttle(State));
- undefined ->
- case ?IS_RUNNING(State) of
- true -> send_to_new_channel(
- Channel, AnalyzedFrame, State);
- false -> throw({channel_frame_while_starting,
- Channel, State#v1.connection_state,
- AnalyzedFrame})
- end
- end
+ AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State)
+ end.
+
+process_frame(Frame, Channel, State) ->
+ case get({channel, Channel}) of
+ {ChPid, AState} ->
+ NewAState = process_channel_frame(Frame, Channel, ChPid, AState),
+ put({channel, Channel}, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ undefined when ?IS_RUNNING(State) ->
+ ok = create_channel(Channel, State),
+ process_frame(Frame, Channel, State);
+ undefined ->
+ throw({channel_frame_while_starting,
+ Channel, State#v1.connection_state, Frame})
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
@@ -526,11 +524,11 @@ post_process_frame({method, MethodName, _}, _ChPid,
protocol = Protocol}}) ->
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
- maybe_block(State);
- false -> State
+ maybe_block(control_throttle(State));
+ false -> control_throttle(State)
end;
post_process_frame(_Frame, _ChPid, State) ->
- State.
+ control_throttle(State).
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
@@ -895,7 +893,7 @@ cert_info(F, Sock) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame, State) ->
+create_channel(Channel, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
connection = #connection{protocol = Protocol,
@@ -908,10 +906,9 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
VHost, Capabilities, Collector}),
MRef = erlang:monitor(process, ChPid),
- NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState),
- put({channel, Channel}, {ChPid, NewAState}),
put({ch_pid, ChPid}, {Channel, MRef}),
- State.
+ put({channel, Channel}, {ChPid, AState}),
+ ok.
process_channel_frame(Frame, Channel, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c645a4364b..7a96af26d9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1786,10 +1786,10 @@ test_msg_store() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds),
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
{Cap, MSCState} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref),
- Ref2 = rabbit_guid:guid(),
+ Ref2 = rabbit_guid:gen(),
{Cap2, MSC2State} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref2),
%% check we don't contain any of the msgs we're about to publish
@@ -1941,7 +1941,7 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) ->
passed.
test_msg_store_confirm_timer() ->
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MsgId = msg_id_bin(1),
Self = self(),
MSCState = rabbit_msg_store:client_init(
@@ -1970,7 +1970,7 @@ msg_store_keep_busy_until_confirm(MsgIds, MSCState) ->
test_msg_store_client_delete_and_terminate() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)],
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
ok = msg_store_write(MsgIds, MSCState),
%% test the 'dying client' fast path for writes
@@ -1986,7 +1986,7 @@ test_queue() ->
init_test_queue() ->
TestQueue = test_queue(),
Terms = rabbit_queue_index:shutdown_terms(TestQueue),
- PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()),
+ PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
TestQueue, Terms, false,
@@ -2020,7 +2020,7 @@ restart_app() ->
rabbit:start().
queue_index_publish(SeqIds, Persistent, Qi) ->
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MsgStore = case Persistent of
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE
@@ -2029,7 +2029,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
{A, B = [{_SeqId, LastMsgIdWritten} | _]} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) ->
- MsgId = rabbit_guid:guid(),
+ MsgId = rabbit_guid:gen(),
QiM = rabbit_queue_index:publish(
MsgId, SeqId, #message_properties{}, Persistent, QiN),
ok = rabbit_msg_store:write(MsgId, MsgId, MSCState),
@@ -2052,7 +2052,7 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
test_queue_index_props() ->
with_empty_test_queue(
fun(Qi0) ->
- MsgId = rabbit_guid:guid(),
+ MsgId = rabbit_guid:gen(),
Props = #message_properties{expiry=12345},
Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0),
{[{MsgId, 1, Props, _, _}], Qi2} =
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ea7f0c786c..52eb168a42 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -434,7 +434,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, Terms1} =
case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:guid(), []};
+ undefined -> {rabbit_guid:gen(), []};
PRef1 -> {PRef1, Terms}
end,
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
@@ -860,7 +860,8 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
Res.
msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
- msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback).
+ msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
+ Callback).
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 269128df17..dc74b2f5b0 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -129,6 +129,9 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
ok = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
+handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ rabbit_amqqueue:notify_sent_queue_down(QPid),
+ State;
handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->