diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 15:41:54 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 15:41:54 +0100 |
| commit | 21c8e31bb846bc8ac7fa10938cd2d88d8af02412 (patch) | |
| tree | 823c71f9590f77f8a211370dacf7515cdbc5a55d | |
| parent | 7b7be8435a870ff8a2f4fe138427b8d1bd26ae10 (diff) | |
| download | rabbitmq-server-git-21c8e31bb846bc8ac7fa10938cd2d88d8af02412.tar.gz | |
no more redundant message passing
| -rw-r--r-- | src/rabbit_channel.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 18 |
2 files changed, 17 insertions, 27 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7a8e1a3157..1f8455cc25 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -287,13 +287,7 @@ handle_cast(flush_multiple_acks, confirm_tref = undefined}}; handle_cast({confirm, MsgSeqNo}, State) -> - {noreply, send_or_enqueue_ack(MsgSeqNo, State)}; - -handle_cast({msg_sent_to_queues, MsgSeqNo, QPids}, State) -> - {noreply, lists:foldl(fun (QPid, State0) -> - msg_sent_to_queues(MsgSeqNo, QPid, State0) - end, State, QPids)}. - + {noreply, send_or_enqueue_ack(MsgSeqNo, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{qpid_to_msgs = QTM}) -> @@ -484,7 +478,9 @@ send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) -> qpid_to_msgs = QTM1}) end). -msg_sent_to_queues(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) -> +msg_sent_to_queue(undefined, _QPid, State) -> + State; +msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) -> case dict:find(QPid, QTM) of {ok, Msgs} -> State#ch{ @@ -527,9 +523,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, - Content, State = #ch{virtual_host = VHostPath, - transaction_id = TxnKey, - writer_pid = WriterPid}) -> + Content, State = #ch{virtual_host = VHostPath, + transaction_id = TxnKey, + writer_pid = WriterPid, + confirm_enabled = ConfirmEnabled}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -538,7 +535,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), {MsgSeqNo, State1} - = case State#ch.confirm_enabled of + = case ConfirmEnabled of false -> {undefined, State}; true -> @@ -566,7 +563,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routed -> case {IsPersistent, DeliveredQPids} of {_, []} -> send_or_enqueue_ack(MsgSeqNo, State1); - {true, _} -> State1; + {true, _} -> + lists:foldl(fun (QPid, State0) -> + msg_sent_to_queue(MsgSeqNo, QPid, State0) + end, State1, DeliveredQPids); {false, _} -> send_or_enqueue_ack(MsgSeqNo, State1) end; %% Confirm after basic.returns diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index e5ffe863a9..707698b099 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -58,8 +58,7 @@ %%---------------------------------------------------------------------------- deliver(QPids, Delivery = #delivery{mandatory = false, - immediate = false, - msg_seq_no = MsgSeqNo}) -> + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver will deliver the message to the queue %% process asynchronously, and return true, which means all the @@ -69,10 +68,9 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% case below. delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), - maybe_inform_channel(MsgSeqNo, QPids), {routed, QPids}; -deliver(QPids, Delivery = #delivery{msg_seq_no = MsgSeqNo}) -> +deliver(QPids, Delivery) -> {Success, _} = delegate:invoke(QPids, fun (Pid) -> @@ -82,11 +80,8 @@ deliver(QPids, Delivery = #delivery{msg_seq_no = MsgSeqNo}) -> lists:foldl(fun fold_deliveries/2, {false, []}, Success), case check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, {Routed, Handled}) of - {routed, Qs} -> - maybe_inform_channel(MsgSeqNo, Qs), - {routed, Qs}; - O -> - O + {routed, Qs} -> {routed, Qs}; + O -> O end. %% TODO: Maybe this should be handled by a cursor instead. @@ -125,8 +120,3 @@ fold_deliveries({_, false},{_, Handled}) -> {true, Handled}. check_delivery(true, _ , {false, []}) -> {unroutable, []}; check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. - -maybe_inform_channel(undefined, _) -> - ok; -maybe_inform_channel(MsgSeqNo, QPids) -> - gen_server2:cast(self(), {msg_sent_to_queues, MsgSeqNo, QPids}). |
