diff options
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 7 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 51 |
6 files changed, 56 insertions, 43 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 005994f09f..c01e924688 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -43,9 +43,10 @@ -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()). --spec(publish_delivered/3 :: - (ack_required(), rabbit_types:basic_message(), state()) -> {ack(), state()}). +-spec(publish/3 :: (rabbit_types:basic_message(), boolean(), state()) -> state()). +-spec(publish_delivered/4 :: + (ack_required(), rabbit_types:basic_message(), boolean(), state()) + -> {ack(), state()}). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> state()). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index eb34aeff47..bc2ffd173c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -427,28 +427,30 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. -attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> +attempt_delivery(none, _ChPid, Message, MsgSeqNo, State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, BQS), + BQ:publish_delivered(AckRequired, Message, + MsgSeqNo =/= undefined, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +attempt_delivery(Txn, ChPid, Message, _MSN, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. -deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> - case attempt_delivery(Txn, ChPid, Message, State) of +deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State = #q{backing_queue = BQ}) -> + case attempt_delivery(Txn, ChPid, Message, MsgSeqNo, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, State #q.backing_queue_state), + BQS = BQ:publish(Message, MsgSeqNo =/= undefined, + State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. @@ -698,14 +700,13 @@ handle_call({deliver_immediately, Txn, Message, MsgSeqNo, ChPid}, _From, State) %% queues discarding the message? %% State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), - {Delivered, State2} = attempt_delivery(Txn, ChPid, Message, State1 -), + {Delivered, State2} = attempt_delivery(Txn, ChPid, Message, MsgSeqNo, State1), reply(Delivered, State2); handle_call({deliver, Txn, Message, MsgSeqNo, ChPid}, _From, State) -> %% Synchronous, "mandatory" delivery mode State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), - {Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, State1), + {Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State1), reply(Delivered, State2); handle_call({commit, Txn, ChPid}, From, State) -> @@ -855,7 +856,7 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> handle_cast({deliver, Txn, Message, MsgSeqNo, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), - {_Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, State1), + {_Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State1), noreply(State2); handle_cast({ack, Txn, AckTags, ChPid}, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2230c507e9..32f9f15ab0 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -62,12 +62,12 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 2}, + {publish, 3}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 3}, + {publish_delivered, 4}, %% Produce the next message. {fetch, 2}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 4e0dad8422..664ef65399 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -31,8 +31,8 @@ -module(rabbit_invariable_queue). --export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, - publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, +-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, + publish_delivered/4, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -99,14 +99,14 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, - len = Len }) -> +publish(Msg, _, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> ok = persist_message(QName, IsDurable, none, Msg), State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. -publish_delivered(false, _Msg, State) -> +publish_delivered(false, _Msg, _, State) -> {blank_ack, State}; -publish_delivered(true, Msg = #basic_message { guid = Guid }, +publish_delivered(true, Msg = #basic_message { guid = Guid }, _, State = #iv_state { qname = QName, durable = IsDurable, len = 0, pending_ack = PA }) -> ok = persist_message(QName, IsDurable, none, Msg), diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 9e38a9766d..3cf2ec4fcb 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -634,7 +634,7 @@ handle_call({register_sync_callback, ClientRef, Fun}, _From, reply(ok, State #msstate { client_ondisk_callback = dict:store(ClientRef, Fun, CODC) }); -handle_call({client_terminate, CState = #client_msstate { client_ref = CRef }}, +handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From, State = #msstate { client_ondisk_callback = CODC, cref_to_guids = CTG }) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8b4f55c5e1..83448c54b5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,7 +32,7 @@ -module(rabbit_variable_queue). -export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, + purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, @@ -509,14 +509,15 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> ram_index_count = 0, persistent_count = 0 })}. -publish(Msg, State) -> - {_SeqId, State1} = publish(Msg, false, false, State), +publish(Msg, NeedsConfirming, State) -> + {_SeqId, State1} = publish(Msg, false, false, NeedsConfirming, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> +publish_delivered(false, _Msg, _NC, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, + NeedsConfirming, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, @@ -530,12 +531,17 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1, - need_acking = gb_sets:insert(Guid, State1#vqstate.need_acking)})}. + {SeqId, a(State1 #vqstate { + next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + pending_ack = PA1, + need_acking = + case NeedsConfirming of + true -> gb_sets:insert(Guid, State1#vqstate.need_acking); + false -> State1#vqstate.need_acking + end })}. fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, @@ -649,14 +655,14 @@ requeue(AckTags, State) -> a(reduce_memory_use( ack(fun rabbit_msg_store:release/2, fun (#msg_status { msg = Msg }, State1) -> - {_SeqId, State2} = publish(Msg, true, false, State1), + {_SeqId, State2} = publish(Msg, true, false, false, State1), State2; ({IsPersistent, Guid}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, true, true, State2), + {_SeqId, State3} = publish(Msg, true, true, false, State2), State3 end, AckTags, State))). @@ -974,7 +980,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { fun (Msg = #basic_message { is_persistent = IsPersistent }, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = publish(Msg, false, IsPersistent1, State2), + {SeqId, State3} = publish(Msg, false, IsPersistent1, false, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, element(1, ack(Acks, State))}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -1022,7 +1028,7 @@ remove_queue_entries1( publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, - IsDelivered, MsgOnDisk, + IsDelivered, MsgOnDisk, NeedsConfirming, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, @@ -1039,12 +1045,17 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), - {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1, - need_acking = gb_sets:add(Guid, State2#vqstate.need_acking) }}. + {SeqId, State2 #vqstate { + next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + ram_msg_count = RamMsgCount + 1, + need_acking = + case NeedsConfirming of + true -> gb_sets:add(Guid, State2#vqstate.need_acking); + false -> State2#vqstate.need_acking + end }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> |
