summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_backing_queue_spec.hrl7
-rw-r--r--src/rabbit_amqqueue_process.erl23
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_invariable_queue.erl12
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_variable_queue.erl51
6 files changed, 56 insertions, 43 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 005994f09f..c01e924688 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -43,9 +43,10 @@
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
--spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()).
--spec(publish_delivered/3 ::
- (ack_required(), rabbit_types:basic_message(), state()) -> {ack(), state()}).
+-spec(publish/3 :: (rabbit_types:basic_message(), boolean(), state()) -> state()).
+-spec(publish_delivered/4 ::
+ (ack_required(), rabbit_types:basic_message(), boolean(), state())
+ -> {ack(), state()}).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
-spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> state()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index eb34aeff47..bc2ffd173c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -427,28 +427,30 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
{_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
State1.
-attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
+attempt_delivery(none, _ChPid, Message, MsgSeqNo, State = #q{backing_queue = BQ}) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
{AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Message, BQS),
+ BQ:publish_delivered(AckRequired, Message,
+ MsgSeqNo =/= undefined, BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+attempt_delivery(Txn, ChPid, Message, _MSN, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
{true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
-deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
- case attempt_delivery(Txn, ChPid, Message, State) of
+deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State = #q{backing_queue = BQ}) ->
+ case attempt_delivery(Txn, ChPid, Message, MsgSeqNo, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- BQS = BQ:publish(Message, State #q.backing_queue_state),
+ BQS = BQ:publish(Message, MsgSeqNo =/= undefined,
+ State #q.backing_queue_state),
{false, NewState#q{backing_queue_state = BQS}}
end.
@@ -698,14 +700,13 @@ handle_call({deliver_immediately, Txn, Message, MsgSeqNo, ChPid}, _From, State)
%% queues discarding the message?
%%
State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State),
- {Delivered, State2} = attempt_delivery(Txn, ChPid, Message, State1
-),
+ {Delivered, State2} = attempt_delivery(Txn, ChPid, Message, MsgSeqNo, State1),
reply(Delivered, State2);
handle_call({deliver, Txn, Message, MsgSeqNo, ChPid}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State),
- {Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, State1),
+ {Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State1),
reply(Delivered, State2);
handle_call({commit, Txn, ChPid}, From, State) ->
@@ -855,7 +856,7 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
handle_cast({deliver, Txn, Message, MsgSeqNo, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State),
- {_Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, State1),
+ {_Delivered, State2} = deliver_or_enqueue(Txn, ChPid, Message, MsgSeqNo, State1),
noreply(State2);
handle_cast({ack, Txn, AckTags, ChPid},
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2230c507e9..32f9f15ab0 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -62,12 +62,12 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 2},
+ {publish, 3},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 3},
+ {publish_delivered, 4},
%% Produce the next message.
{fetch, 2},
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 4e0dad8422..664ef65399 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -31,8 +31,8 @@
-module(rabbit_invariable_queue).
--export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
- publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
+-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3,
+ publish_delivered/4, fetch/2, ack/2, tx_publish/3, tx_ack/3,
tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
idle_timeout/1, handle_pre_hibernate/1, status/1]).
@@ -99,14 +99,14 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
- len = Len }) ->
+publish(Msg, _, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
ok = persist_message(QName, IsDurable, none, Msg),
State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
-publish_delivered(false, _Msg, State) ->
+publish_delivered(false, _Msg, _, State) ->
{blank_ack, State};
-publish_delivered(true, Msg = #basic_message { guid = Guid },
+publish_delivered(true, Msg = #basic_message { guid = Guid }, _,
State = #iv_state { qname = QName, durable = IsDurable,
len = 0, pending_ack = PA }) ->
ok = persist_message(QName, IsDurable, none, Msg),
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 9e38a9766d..3cf2ec4fcb 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -634,7 +634,7 @@ handle_call({register_sync_callback, ClientRef, Fun}, _From,
reply(ok, State #msstate { client_ondisk_callback =
dict:store(ClientRef, Fun, CODC) });
-handle_call({client_terminate, CState = #client_msstate { client_ref = CRef }},
+handle_call({client_terminate, #client_msstate { client_ref = CRef }},
_From,
State = #msstate { client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8b4f55c5e1..83448c54b5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,7 +32,7 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/2, publish_delivered/3, fetch/2, ack/2,
+ purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
@@ -509,14 +509,15 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
ram_index_count = 0,
persistent_count = 0 })}.
-publish(Msg, State) ->
- {_SeqId, State1} = publish(Msg, false, false, State),
+publish(Msg, NeedsConfirming, State) ->
+ {_SeqId, State1} = publish(Msg, false, false, NeedsConfirming, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
+publish_delivered(false, _Msg, _NC, State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
+ NeedsConfirming,
State = #vqstate { len = 0,
next_seq_id = SeqId,
out_counter = OutCount,
@@ -530,12 +531,17 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
PA1 = record_pending_ack(m(MsgStatus1), PA),
PCount1 = PCount + one_if(IsPersistent1),
- {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- pending_ack = PA1,
- need_acking = gb_sets:insert(Guid, State1#vqstate.need_acking)})}.
+ {SeqId, a(State1 #vqstate {
+ next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ pending_ack = PA1,
+ need_acking =
+ case NeedsConfirming of
+ true -> gb_sets:insert(Guid, State1#vqstate.need_acking);
+ false -> State1#vqstate.need_acking
+ end })}.
fetch(AckRequired, State = #vqstate { q4 = Q4,
ram_msg_count = RamMsgCount,
@@ -649,14 +655,14 @@ requeue(AckTags, State) ->
a(reduce_memory_use(
ack(fun rabbit_msg_store:release/2,
fun (#msg_status { msg = Msg }, State1) ->
- {_SeqId, State2} = publish(Msg, true, false, State1),
+ {_SeqId, State2} = publish(Msg, true, false, false, State1),
State2;
({IsPersistent, Guid}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
{{ok, Msg = #basic_message{}}, MSCState1} =
read_from_msg_store(MSCState, IsPersistent, Guid),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, true, true, State2),
+ {_SeqId, State3} = publish(Msg, true, true, false, State2),
State3
end,
AckTags, State))).
@@ -974,7 +980,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
fun (Msg = #basic_message { is_persistent = IsPersistent },
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} = publish(Msg, false, IsPersistent1, State2),
+ {SeqId, State3} = publish(Msg, false, IsPersistent1, false, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
end, {PAcks, element(1, ack(Acks, State))}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
@@ -1022,7 +1028,7 @@ remove_queue_entries1(
publish(Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
- IsDelivered, MsgOnDisk,
+ IsDelivered, MsgOnDisk, NeedsConfirming,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
len = Len,
@@ -1039,12 +1045,17 @@ publish(Msg = #basic_message { is_persistent = IsPersistent,
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
- {SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1,
- need_acking = gb_sets:add(Guid, State2#vqstate.need_acking) }}.
+ {SeqId, State2 #vqstate {
+ next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ ram_msg_count = RamMsgCount + 1,
+ need_acking =
+ case NeedsConfirming of
+ true -> gb_sets:add(Guid, State2#vqstate.need_acking);
+ false -> State2#vqstate.need_acking
+ end }}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, MSCState) ->