diff options
| -rw-r--r-- | include/rabbit.hrl | 5 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 44 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 4 |
4 files changed, 30 insertions, 30 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 44e1368460..6212d4f327 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -62,7 +62,7 @@ -record(listener, {node, protocol, host, port}). --record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(basic_message, {exchange_name, routing_key, content, guid, is_persistent}). -record(dq_msg_loc, {queue_and_seq_id, is_delivered, msg_id}). @@ -134,7 +134,8 @@ #basic_message{exchange_name :: exchange_name(), routing_key :: routing_key(), content :: content(), - persistent_key :: maybe(pkey())}). + guid :: guid(), + is_persistent :: bool()}). -type(message() :: basic_message()). %% this really should be an abstract type -type(msg_id() :: non_neg_integer()). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c390b2b7e4..69edb64fbc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -203,15 +203,15 @@ attempt_delivery(none, Message, State) -> {offered, false, State1} -> {true, State1}; {offered, true, State1} -> - persist_message(none, qname(State), Message), - persist_delivery(qname(State), Message, false), + persist_message(none, qname(State), Message), %% DQ HERE + persist_delivery(qname(State), Message, false), %% DQ HERE {true, State1}; {not_offered, State1} -> {false, State1} end; attempt_delivery(Txn, Message, State) -> - persist_message(Txn, qname(State), Message), - record_pending_message(Txn, Message), + persist_message(Txn, qname(State), Message), %% DQ tx_commit and store msgid in txn map + record_pending_message(Txn, Message), %% DQ seems to be done here! {true, State}. deliver_or_enqueue(Txn, Message, State) -> @@ -219,8 +219,8 @@ deliver_or_enqueue(Txn, Message, State) -> {true, NewState} -> {true, NewState}; {false, NewState} -> - persist_message(Txn, qname(State), Message), - NewMB = queue:in({Message, false}, NewState#q.message_buffer), + persist_message(Txn, qname(State), Message), %% DQ Txn must be false here + NewMB = queue:in({Message, false}, NewState#q.message_buffer), %% DQ magic here {false, NewState#q{message_buffer = NewMB}} end. @@ -302,7 +302,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, case check_auto_delete( deliver_or_enqueue_n( [{Message, true} || - {_Messsage_id, Message} <- dict:to_list(UAM)], + {_Messsage_id, Message} <- dict:to_list(UAM)], %% DQ alter all this stuff? State#q{ exclusive_consumer = case Holder of {ChPid, _} -> none; @@ -343,10 +343,10 @@ run_poke_burst(MessageBuffer, State) -> {{value, {Message, Delivered}}, BufferTail} -> case deliver_immediately(Message, Delivered, State) of {offered, true, NewState} -> - persist_delivery(qname(State), Message, Delivered), + persist_delivery(qname(State), Message, Delivered), %% DQ ack needed run_poke_burst(BufferTail, NewState); {offered, false, NewState} -> - persist_auto_ack(qname(State), Message), + persist_auto_ack(qname(State), Message), %% DQ record? We don't persist acks anyway now... run_poke_burst(BufferTail, NewState); {not_offered, NewState} -> NewState#q{message_buffer = MessageBuffer} @@ -371,7 +371,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{persistent_key = none}) -> +persist_message(_Txn, _QName, #basic_message{is_persistent = false}) -> %% DQ ok; persist_message(Txn, QName, Message) -> M = Message#basic_message{ @@ -379,29 +379,29 @@ persist_message(Txn, QName, Message) -> content = rabbit_binary_parser:clear_decoded_content( Message#basic_message.content)}, persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.persistent_key}}]). + [{publish, M, {QName, M#basic_message.guid}}]). -persist_delivery(_QName, _Message, +persist_delivery(_QName, _Message, %% DQ true) -> ok; -persist_delivery(_QName, #basic_message{persistent_key = none}, +persist_delivery(_QName, #basic_message{is_persistent = false}, %% DQ _Delivered) -> ok; -persist_delivery(QName, #basic_message{persistent_key = PKey}, +persist_delivery(QName, #basic_message{guid = MsgId}, %% DQ _Delivered) -> - persist_work(none, QName, [{deliver, {QName, PKey}}]). + persist_work(none, QName, [{deliver, {QName, MsgId}}]). -persist_acks(Txn, QName, Messages) -> +persist_acks(Txn, QName, Messages) -> %% DQ persist_work(Txn, QName, - [{ack, {QName, PKey}} || - #basic_message{persistent_key = PKey} <- Messages, - PKey =/= none]). + [{ack, {QName, MsgId}} || + #basic_message{guid = MsgId, is_persistent = P} <- Messages, + P]). -persist_auto_ack(_QName, #basic_message{persistent_key = none}) -> +persist_auto_ack(_QName, #basic_message{is_persistent = false}) -> ok; -persist_auto_ack(QName, #basic_message{persistent_key = PKey}) -> +persist_auto_ack(QName, #basic_message{is_persistent = true, guid = MsgId}) -> %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, PKey}}]). + rabbit_persister:dirty_work([{ack, {QName, MsgId}}]). persist_work(_Txn,_QName, []) -> ok; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7574cd673a..aeb15bd1b9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -313,15 +313,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - PersistentKey = case is_message_persistent(DecodedContent) of - true -> rabbit_guid:guid(); - false -> none - end, {noreply, publish(Mandatory, Immediate, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, - persistent_key = PersistentKey}, + guid = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent)}, rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a57e8076bf..9b3bbb1851 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -204,7 +204,9 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKeyBin, content = Content, - persistent_key = none}, + is_persistent = false, + guid = rabbit_guid:guid() + }, simple_publish(Mandatory, Immediate, Message). %% Usable by Erlang code that wants to publish messages. |
