summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-21 17:23:19 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-21 17:23:19 +0000
commit38e5b687de76739b5419c1f0f6ddf0d8262ea16e (patch)
tree1e8668f4f740f1e34f2baa20d038f910e7948fdc
parent4735326a076ac5f00d11118bd223a6079b7262e7 (diff)
downloadrabbitmq-server-git-38e5b687de76739b5419c1f0f6ddf0d8262ea16e.tar.gz
Quick-and-dirty version of pipelined mandatory publishing
-rw-r--r--src/rabbit_amqqueue.erl21
-rw-r--r--src/rabbit_amqqueue_process.erl14
-rw-r--r--src/rabbit_basic.erl4
-rw-r--r--src/rabbit_channel.erl75
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_trace.erl6
6 files changed, 73 insertions, 49 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 282113a419..0fd945392f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -703,11 +703,12 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid,
slave_pids = []}.
-deliver([], #delivery{mandatory = false}, _Flow) ->
+deliver([], _Delivery, _Flow) ->
%% /dev/null optimisation
- {routed, []};
+ [];
-deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
+deliver(Qs, Delivery, Flow) ->
+ %% TODO simplify?
%% optimisation: when Mandatory = false, rabbit_amqqueue:deliver
%% will deliver the message to the queue process asynchronously,
%% and return true, which means all the QPids will always be
@@ -730,19 +731,7 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
SMsg = {deliver, Delivery, true, Flow},
delegate:cast(MPids, MMsg),
delegate:cast(SPids, SMsg),
- {routed, QPids};
-
-deliver(Qs, Delivery, _Flow) ->
- {MPids, SPids} = qpids(Qs),
- %% see comment above
- MMsg = {deliver, Delivery, false},
- SMsg = {deliver, Delivery, true},
- {MRouted, _} = delegate:call(MPids, MMsg),
- {SRouted, _} = delegate:call(SPids, SMsg),
- case MRouted ++ SRouted of
- [] -> {unroutable, []};
- R -> {routed, [QPid || {QPid, ok} <- R]}
- end.
+ QPids.
qpids([]) -> {[], []}; %% optimisation
qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> {[QPid], SPids}; %% opt
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 08509c96cc..44754788d9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -441,6 +441,13 @@ send_or_record_confirm(#delivery{sender = SenderPid,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
+send_mandatory(#delivery{mandatory = false}) ->
+ ok;
+send_mandatory(#delivery{mandatory = true,
+ sender = SenderPid,
+ msg_seq_no = MsgSeqNo}) ->
+ gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo, self()}).
+
discard(#delivery{sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message{id = MsgId}}, State) ->
@@ -496,6 +503,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ send_mandatory(Delivery),
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
@@ -884,11 +892,6 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State = #q{consumers = Consumers}) ->
reply(rabbit_queue_consumers:all(Consumers), State);
-handle_call({deliver, Delivery, Delivered}, From, State) ->
- %% Synchronous, "mandatory" deliver mode.
- gen_server2:reply(From, ok),
- noreply(deliver_or_enqueue(Delivery, Delivered, State));
-
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
@@ -1041,7 +1044,6 @@ handle_cast({run_backing_queue, Mod, Fun},
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->
- %% Asynchronous, non-"mandatory" deliver mode.
Senders1 = case Flow of
flow -> credit_flow:ack(Sender),
pmon:monitor(Sender, Senders);
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 3d70be4bc3..3e94486787 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -107,8 +107,8 @@ publish(Delivery = #delivery{
publish(X, Delivery) ->
Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
- {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery),
- {ok, RoutingRes, DeliveredQPids}.
+ DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),
+ {ok, DeliveredQPids}.
delivery(Mandatory, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, sender = self(),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b072941964..dba826fc08 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -39,7 +39,7 @@
queue_names, queue_monitors, consumer_mapping,
blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
- unconfirmed, confirmed, capabilities, trace_state}).
+ unconfirmed, confirmed, mandatory, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -221,6 +221,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
publish_seqno = 1,
unconfirmed = dtree:empty(),
confirmed = [],
+ mandatory = dtree:empty(),
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost)},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
@@ -239,8 +240,9 @@ prioritise_call(Msg, _From, _Len, _State) ->
prioritise_cast(Msg, _Len, _State) ->
case Msg of
- {confirm, _MsgSeqNos, _QPid} -> 5;
- _ -> 0
+ {confirm, _MsgSeqNos, _QPid} -> 5;
+ {mandatory_received, _MsgSeqNo, _QPid} -> 5;
+ _ -> 0
end.
prioritise_info(Msg, _Len, _State) ->
@@ -346,6 +348,13 @@ handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
+%% TODO duplication?
+handle_cast({mandatory_received, MsgSeqNo, From}, State) ->
+ State1 = #ch{mandatory = M} = handle_mandatory(MsgSeqNo, From, State),
+ Timeout = case M of [] -> hibernate; _ -> 0 end,
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ {noreply, ensure_stats_timer(State1), Timeout};
+
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
Timeout = case C of [] -> hibernate; _ -> 0 end,
@@ -623,6 +632,10 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
{MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
record_confirms(MXs, State#ch{unconfirmed = UC1}).
+handle_mandatory(MsgSeqNo, QPid, State = #ch{mandatory = UC}) ->
+ {_MXs, UC1} = dtree:take([MsgSeqNo], QPid, UC),
+ State#ch{mandatory = UC1}.
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
%% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
State1 = State#ch{state = running},
@@ -696,7 +709,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_user_id_header(Props, State),
check_expiration_header(Props),
{MsgSeqNo, State1} =
- case {Tx, ConfirmEnabled} of
+ case {Tx, ConfirmEnabled orelse Mandatory} of
{none, false} -> {undefined, State};
{_, _} -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
@@ -1251,12 +1264,19 @@ monitor_delivering_queue(NoAck, QPid, QName,
false -> sets:add_element(QPid, DQ)
end}.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
+ mandatory = Mand}) ->
+ %% TODO do we need take_all here?
+ {MMsgs, Mand1} = dtree:take(QPid, Mand),
+ io:format("returning ~p~n", [MMsgs]),
+ [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs],
+ State1 = State#ch{mandatory = Mand1},
case rabbit_misc:is_abnormal_exit(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
- send_nacks(MXs, State#ch{unconfirmed = UC1});
+ send_nacks(MXs, State1#ch{unconfirmed = UC1});
false -> {MXs, UC1} = dtree:take(QPid, UC),
- record_confirms(MXs, State#ch{unconfirmed = UC1})
+ record_confirms(MXs, State1#ch{unconfirmed = UC1})
+
end.
handle_consuming_queue_down(QPid,
@@ -1500,11 +1520,12 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
+ mandatory = Mandatory,
msg_seq_no = MsgSeqNo},
DelQNames}, State = #ch{queue_names = QNames,
queue_monitors = QMons}) ->
Qs = rabbit_amqqueue:lookup(DelQNames),
- {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery),
%% The pmon:monitor_all/2 monitors all queues to which we
%% delivered. But we want to monitor even queues we didn't deliver
%% to, since we need their 'DOWN' messages to clean
@@ -1523,8 +1544,8 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
false -> dict:store(QPid, QName, QNames0)
end, pmon:monitor(QPid, QMons0)}
end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
- State1 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message,
+ State1 = process_routing_result(DeliveredQPids, XName, Mandatory, MsgSeqNo,
+ Message,
State#ch{queue_names = QNames1,
queue_monitors = QMons1}),
?INCR_STATS([{exchange_stats, XName, 1} |
@@ -1534,20 +1555,32 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
publish, State1),
State1.
-process_routing_result(routed, _, _, undefined, _, State) ->
+%% TODO unbreak basic.return stats
+
+process_routing_result(_, _, _, undefined, _Msg, State) ->
State;
-process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+process_routing_result([], XName, false, MsgSeqNo, _Msg, State) ->
record_confirms([{MsgSeqNo, XName}], State);
-process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
- State#ch.unconfirmed)};
-process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+process_routing_result([], XName, true, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State),
- case MsgSeqNo of
- undefined -> State;
- _ -> record_confirms([{MsgSeqNo, XName}], State)
- end.
+ record_confirms([{MsgSeqNo, XName}], State);
+process_routing_result(QPids, XName, Mandatory, MsgSeqNo, Msg, State) ->
+ MandatoryTree = case Mandatory of
+ false -> State#ch.mandatory;
+ true -> dtree:insert(MsgSeqNo, QPids, Msg,
+ State#ch.mandatory)
+ end,
+ State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
+ State#ch.unconfirmed),
+ mandatory = MandatoryTree}.
+
+%% process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+%% ok = basic_return(Msg, State, no_route),
+%% ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State),
+%% case MsgSeqNo of
+%% undefined -> State;
+%% _ -> record_confirms([{MsgSeqNo, XName}], State)
+%% end.
send_nacks([], State) ->
State;
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index ab8c62fe57..447cd89317 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -87,7 +87,7 @@ publish1(RoutingKey, Format, Data, LogExch) ->
%% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's
%% second resolution, not millisecond.
Timestamp = rabbit_misc:now_ms() div 1000,
- {ok, _RoutingRes, _DeliveredQPids} =
+ {ok, _DeliveredQPids} =
rabbit_basic:publish(LogExch, RoutingKey,
#'P_basic'{content_type = <<"text/plain">>,
timestamp = Timestamp},
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index d0dcaa7185..b08a9a1c66 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -88,9 +88,9 @@ trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
ok;
trace(X, Msg = #basic_message{content = #content{payload_fragments_rev = PFR}},
RKPrefix, RKSuffix, Extra) ->
- {ok, _, _} = rabbit_basic:publish(
- X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
- #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
+ {ok, _} = rabbit_basic:publish(
+ X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
+ #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
ok.
msg_to_table(#basic_message{exchange_name = #resource{name = XName},