diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-04-28 23:22:54 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-04-28 23:22:54 +0100 |
| commit | 1a25579854e1ed17b0ff5689dde624622ea9109a (patch) | |
| tree | 3a075f6366737cfed86c5277190cf7cc08c24503 /src | |
| parent | 9f7900665c0442ae4fd2e1ada5dbe74ce1f5037e (diff) | |
| download | rabbitmq-server-git-1a25579854e1ed17b0ff5689dde624622ea9109a.tar.gz | |
refactoring
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 32 |
1 files changed, 14 insertions, 18 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a7e3f2bb0a..4a3cb28561 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -306,7 +306,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, - Content, State = #ch{ virtual_host = VHostPath}) -> + Content, State = #ch{ virtual_host = VHostPath, + transaction_id = TxnKey, + writer_pid = WriterPid}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -317,12 +319,16 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, true -> rabbit_guid:guid(); false -> none end, - {noreply, publish(Exchange, Mandatory, Immediate, + Handled = publish(Exchange, Mandatory, Immediate, TxnKey, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, persistent_key = PersistentKey}, - rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; + WriterPid), + {noreply, case TxnKey of + none -> State; + _ -> add_tx_participants(Handled, State) + end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -767,17 +773,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ok -> return_ok(State, NoWait, ReturnMethod) end. -publish(X, Mandatory, Immediate, Message, QPids, - State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> - Handled = deliver(X, QPids, Mandatory, Immediate, TxnKey, - Message, WriterPid), - case TxnKey of - none -> State; - _ -> add_tx_participants(Handled, State) - end. - -deliver(X, QPids, Mandatory, Immediate, Txn, Message, WriterPid) -> - case deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) of +publish(X, Mandatory, Immediate, Txn, + Message = #basic_message{routing_key = RK, content = C}, WriterPid) -> + case deliver(rabbit_exchange:route(X, RK, C), Mandatory, Immediate, + Txn, Message, WriterPid) of [] -> case lists:keysearch(<<"ume">>, 1, X#exchange.arguments) of {value, {_, longstr, UmeNameBin}} -> @@ -785,10 +784,7 @@ deliver(X, QPids, Mandatory, Immediate, Txn, Message, WriterPid) -> UmeName = rabbit_misc:r(XName, exchange, UmeNameBin), case rabbit_exchange:lookup(UmeName) of {ok, Ume} -> - #basic_message{routing_key = RK, content = C} = - Message, - deliver(Ume, rabbit_exchange:route(Ume, RK, C), - false, false, Txn, Message, WriterPid); + publish(Ume, false, false, Txn, Message, WriterPid); {error, not_found} -> rabbit_log:warning( "unroutable message exchange for ~s " |
