diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-21 17:23:19 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-21 17:23:19 +0000 |
| commit | 38e5b687de76739b5419c1f0f6ddf0d8262ea16e (patch) | |
| tree | 1e8668f4f740f1e34f2baa20d038f910e7948fdc | |
| parent | 4735326a076ac5f00d11118bd223a6079b7262e7 (diff) | |
| download | rabbitmq-server-git-38e5b687de76739b5419c1f0f6ddf0d8262ea16e.tar.gz | |
Quick-and-dirty version of pipelined mandatory publishing
| -rw-r--r-- | src/rabbit_amqqueue.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 75 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_trace.erl | 6 |
6 files changed, 73 insertions, 49 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 282113a419..0fd945392f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -703,11 +703,12 @@ pseudo_queue(QueueName, Pid) -> pid = Pid, slave_pids = []}. -deliver([], #delivery{mandatory = false}, _Flow) -> +deliver([], _Delivery, _Flow) -> %% /dev/null optimisation - {routed, []}; + []; -deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> +deliver(Qs, Delivery, Flow) -> + %% TODO simplify? %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver %% will deliver the message to the queue process asynchronously, %% and return true, which means all the QPids will always be @@ -730,19 +731,7 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> SMsg = {deliver, Delivery, true, Flow}, delegate:cast(MPids, MMsg), delegate:cast(SPids, SMsg), - {routed, QPids}; - -deliver(Qs, Delivery, _Flow) -> - {MPids, SPids} = qpids(Qs), - %% see comment above - MMsg = {deliver, Delivery, false}, - SMsg = {deliver, Delivery, true}, - {MRouted, _} = delegate:call(MPids, MMsg), - {SRouted, _} = delegate:call(SPids, SMsg), - case MRouted ++ SRouted of - [] -> {unroutable, []}; - R -> {routed, [QPid || {QPid, ok} <- R]} - end. + QPids. qpids([]) -> {[], []}; %% optimisation qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> {[QPid], SPids}; %% opt diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 08509c96cc..44754788d9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -441,6 +441,13 @@ send_or_record_confirm(#delivery{sender = SenderPid, rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. +send_mandatory(#delivery{mandatory = false}) -> + ok; +send_mandatory(#delivery{mandatory = true, + sender = SenderPid, + msg_seq_no = MsgSeqNo}) -> + gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo, self()}). + discard(#delivery{sender = SenderPid, msg_seq_no = MsgSeqNo, message = #basic_message{id = MsgId}}, State) -> @@ -496,6 +503,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + send_mandatory(Delivery), {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State), {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), @@ -884,11 +892,6 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State = #q{consumers = Consumers}) -> reply(rabbit_queue_consumers:all(Consumers), State); -handle_call({deliver, Delivery, Delivered}, From, State) -> - %% Synchronous, "mandatory" deliver mode. - gen_server2:reply(From, ok), - noreply(deliver_or_enqueue(Delivery, Delivered, State)); - handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the @@ -1041,7 +1044,6 @@ handle_cast({run_backing_queue, Mod, Fun}, handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State = #q{senders = Senders}) -> - %% Asynchronous, non-"mandatory" deliver mode. Senders1 = case Flow of flow -> credit_flow:ack(Sender), pmon:monitor(Sender, Senders); diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 3d70be4bc3..3e94486787 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -107,8 +107,8 @@ publish(Delivery = #delivery{ publish(X, Delivery) -> Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)), - {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery), - {ok, RoutingRes, DeliveredQPids}. + DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery), + {ok, DeliveredQPids}. delivery(Mandatory, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, sender = self(), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b072941964..dba826fc08 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -39,7 +39,7 @@ queue_names, queue_monitors, consumer_mapping, blocking, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, - unconfirmed, confirmed, capabilities, trace_state}). + unconfirmed, confirmed, mandatory, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -221,6 +221,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, publish_seqno = 1, unconfirmed = dtree:empty(), confirmed = [], + mandatory = dtree:empty(), capabilities = Capabilities, trace_state = rabbit_trace:init(VHost)}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), @@ -239,8 +240,9 @@ prioritise_call(Msg, _From, _Len, _State) -> prioritise_cast(Msg, _Len, _State) -> case Msg of - {confirm, _MsgSeqNos, _QPid} -> 5; - _ -> 0 + {confirm, _MsgSeqNos, _QPid} -> 5; + {mandatory_received, _MsgSeqNo, _QPid} -> 5; + _ -> 0 end. prioritise_info(Msg, _Len, _State) -> @@ -346,6 +348,13 @@ handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), noreply(State); +%% TODO duplication? +handle_cast({mandatory_received, MsgSeqNo, From}, State) -> + State1 = #ch{mandatory = M} = handle_mandatory(MsgSeqNo, From, State), + Timeout = case M of [] -> hibernate; _ -> 0 end, + %% NB: don't call noreply/1 since we don't want to send confirms. + {noreply, ensure_stats_timer(State1), Timeout}; + handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), Timeout = case C of [] -> hibernate; _ -> 0 end, @@ -623,6 +632,10 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), record_confirms(MXs, State#ch{unconfirmed = UC1}). +handle_mandatory(MsgSeqNo, QPid, State = #ch{mandatory = UC}) -> + {_MXs, UC1} = dtree:take([MsgSeqNo], QPid, UC), + State#ch{mandatory = UC1}. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? State1 = State#ch{state = running}, @@ -696,7 +709,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_user_id_header(Props, State), check_expiration_header(Props), {MsgSeqNo, State1} = - case {Tx, ConfirmEnabled} of + case {Tx, ConfirmEnabled orelse Mandatory} of {none, false} -> {undefined, State}; {_, _} -> SeqNo = State#ch.publish_seqno, {SeqNo, State#ch{publish_seqno = SeqNo + 1}} @@ -1251,12 +1264,19 @@ monitor_delivering_queue(NoAck, QPid, QName, false -> sets:add_element(QPid, DQ) end}. -handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> +handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, + mandatory = Mand}) -> + %% TODO do we need take_all here? + {MMsgs, Mand1} = dtree:take(QPid, Mand), + io:format("returning ~p~n", [MMsgs]), + [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs], + State1 = State#ch{mandatory = Mand1}, case rabbit_misc:is_abnormal_exit(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), - send_nacks(MXs, State#ch{unconfirmed = UC1}); + send_nacks(MXs, State1#ch{unconfirmed = UC1}); false -> {MXs, UC1} = dtree:take(QPid, UC), - record_confirms(MXs, State#ch{unconfirmed = UC1}) + record_confirms(MXs, State1#ch{unconfirmed = UC1}) + end. handle_consuming_queue_down(QPid, @@ -1500,11 +1520,12 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, State; deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, + mandatory = Mandatory, msg_seq_no = MsgSeqNo}, DelQNames}, State = #ch{queue_names = QNames, queue_monitors = QMons}) -> Qs = rabbit_amqqueue:lookup(DelQNames), - {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery), + DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery), %% The pmon:monitor_all/2 monitors all queues to which we %% delivered. But we want to monitor even queues we didn't deliver %% to, since we need their 'DOWN' messages to clean @@ -1523,8 +1544,8 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ false -> dict:store(QPid, QName, QNames0) end, pmon:monitor(QPid, QMons0)} end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), - State1 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, + State1 = process_routing_result(DeliveredQPids, XName, Mandatory, MsgSeqNo, + Message, State#ch{queue_names = QNames1, queue_monitors = QMons1}), ?INCR_STATS([{exchange_stats, XName, 1} | @@ -1534,20 +1555,32 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ publish, State1), State1. -process_routing_result(routed, _, _, undefined, _, State) -> +%% TODO unbreak basic.return stats + +process_routing_result(_, _, _, undefined, _Msg, State) -> State; -process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> +process_routing_result([], XName, false, MsgSeqNo, _Msg, State) -> record_confirms([{MsgSeqNo, XName}], State); -process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, - State#ch.unconfirmed)}; -process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> +process_routing_result([], XName, true, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State), - case MsgSeqNo of - undefined -> State; - _ -> record_confirms([{MsgSeqNo, XName}], State) - end. + record_confirms([{MsgSeqNo, XName}], State); +process_routing_result(QPids, XName, Mandatory, MsgSeqNo, Msg, State) -> + MandatoryTree = case Mandatory of + false -> State#ch.mandatory; + true -> dtree:insert(MsgSeqNo, QPids, Msg, + State#ch.mandatory) + end, + State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, + State#ch.unconfirmed), + mandatory = MandatoryTree}. + +%% process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> +%% ok = basic_return(Msg, State, no_route), +%% ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State), +%% case MsgSeqNo of +%% undefined -> State; +%% _ -> record_confirms([{MsgSeqNo, XName}], State) +%% end. send_nacks([], State) -> State; diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index ab8c62fe57..447cd89317 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -87,7 +87,7 @@ publish1(RoutingKey, Format, Data, LogExch) -> %% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's %% second resolution, not millisecond. Timestamp = rabbit_misc:now_ms() div 1000, - {ok, _RoutingRes, _DeliveredQPids} = + {ok, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, #'P_basic'{content_type = <<"text/plain">>, timestamp = Timestamp}, diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index d0dcaa7185..b08a9a1c66 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -88,9 +88,9 @@ trace(#exchange{name = Name}, #basic_message{exchange_name = Name}, ok; trace(X, Msg = #basic_message{content = #content{payload_fragments_rev = PFR}}, RKPrefix, RKSuffix, Extra) -> - {ok, _, _} = rabbit_basic:publish( - X, <<RKPrefix/binary, ".", RKSuffix/binary>>, - #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR), + {ok, _} = rabbit_basic:publish( + X, <<RKPrefix/binary, ".", RKSuffix/binary>>, + #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR), ok. msg_to_table(#basic_message{exchange_name = #resource{name = XName}, |
