summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_disk_queue.erl29
-rw-r--r--src/rabbit_mixed_queue.erl35
3 files changed, 22 insertions, 55 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6d742b7ad9..c65c65edbc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -63,7 +63,7 @@
-record(consumer, {tag, ack_required}).
--record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}).
+-record(tx, {ch_pid, pending_messages, pending_acks}).
%% These are held in our process dictionary
-record(cr, {consumer_count,
@@ -453,7 +453,6 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName.
lookup_tx(Txn) ->
case get({txn, Txn}) of
undefined -> #tx{ch_pid = none,
- is_persistent = false,
pending_messages = [],
pending_acks = []};
V -> V
@@ -471,14 +470,10 @@ all_tx_record() ->
all_tx() ->
[Txn || {{txn, Txn}, _} <- get()].
-record_pending_message(Txn, ChPid, Message =
- #basic_message { is_persistent = IsPersistent }) ->
- Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } =
- lookup_tx(Txn),
+record_pending_message(Txn, ChPid, Message) ->
+ Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx #tx { pending_messages = [Message | Pending],
- is_persistent = IsPersistentTxn orelse IsPersistent
- }).
+ store_tx(Txn, Tx #tx { pending_messages = [Message | Pending] }).
record_pending_acks(Txn, ChPid, MsgIds) ->
Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index b4e6b8b106..70d448451a 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -39,11 +39,10 @@
terminate/2, code_change/3]).
-export([handle_pre_hibernate/1]).
--export([publish/3, fetch/1, phantom_fetch/1, ack/2,
- tx_publish/1, tx_commit/3, tx_cancel/1,
- requeue/2, purge/1, delete_queue/1,
- delete_non_durable_queues/1, auto_ack_next_message/1,
- requeue_next_n/2, len/1, foldl/3, prefetch/1
+-export([publish/3, fetch/1, phantom_fetch/1, ack/2, tx_publish/1, tx_commit/3,
+ tx_cancel/1, requeue/2, purge/1, delete_queue/1,
+ delete_non_durable_queues/1, requeue_next_n/2, len/1, foldl/3,
+ prefetch/1
]).
-export([filesync/0, cache_info/0]).
@@ -264,7 +263,6 @@
{msg_id(), boolean(), boolean(), ack_tag(), non_neg_integer()})).
-spec(prefetch/1 :: (queue_name()) -> 'ok').
-spec(ack/2 :: (queue_name(), [ack_tag()]) -> 'ok').
--spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok').
-spec(tx_publish/1 :: (message()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean()}], [ack_tag()]) ->
'ok').
@@ -308,9 +306,6 @@ prefetch(Q) ->
ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
gen_server2:cast(?SERVER, {ack, Q, MsgSeqIds}).
-auto_ack_next_message(Q) ->
- gen_server2:cast(?SERVER, {auto_ack_next_message, Q}).
-
tx_publish(Message = #basic_message {}) ->
gen_server2:cast(?SERVER, {tx_publish, Message}).
@@ -510,9 +505,6 @@ handle_cast({publish, Q, Message, IsDelivered}, State) ->
handle_cast({ack, Q, MsgSeqIds}, State) ->
{ok, State1} = internal_ack(Q, MsgSeqIds, State),
noreply(State1);
-handle_cast({auto_ack_next_message, Q}, State) ->
- {ok, State1} = internal_auto_ack(Q, State),
- noreply(State1);
handle_cast({tx_publish, Message}, State) ->
{ok, State1} = internal_tx_publish(Message, State),
noreply(State1);
@@ -940,15 +932,6 @@ internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) ->
Acc1 = Fun(Message, AckTag, IsDelivered, Acc),
internal_foldl(Q, WriteSeqId, Fun, State1, Acc1, ReadSeqId + 1).
-internal_auto_ack(Q, State) ->
- case internal_fetch_attributes(Q, ignore_delivery, pop_queue, State) of
- {ok, empty, State1} ->
- {ok, State1};
- {ok, {_MsgId, _IsPersistent, _IsDelivered, AckTag, _Remaining},
- State1} ->
- remove_messages(Q, [AckTag], true, State1)
- end.
-
internal_ack(Q, MsgSeqIds, State) ->
remove_messages(Q, MsgSeqIds, true, State).
@@ -1950,8 +1933,8 @@ read_next_file_entry(FileHdl, Offset) ->
eof;
KO -> KO
end;
- KO -> KO
+ Other -> Other
end
end;
- KO -> KO
+ Other -> Other
end.
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 771a920f87..9ead773de9 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -311,32 +311,21 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
%% Assumption here is that the queue is empty already (only called via
%% attempt_immediate_delivery).
-publish_delivered(Msg =
- #basic_message { guid = MsgId, is_persistent = IsPersistent},
- State =
- #mqstate { mode = Mode, is_durable = IsDurable,
- queue = Q, length = 0 })
- when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
+publish_delivered(Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent},
+ State = #mqstate { is_durable = IsDurable, queue = Q,
+ length = 0 })
+ when IsDurable andalso IsPersistent ->
ok = rabbit_disk_queue:publish(Q, Msg, true),
MsgSize = size_of_message(Msg),
State1 = gain_memory(MsgSize, State),
- case IsDurable andalso IsPersistent of
- true ->
- %% must call phantom_fetch otherwise the msg remains at
- %% the head of the queue. This is synchronous, but
- %% unavoidable as we need the AckTag
- {MsgId, IsPersistent, true, AckTag, 0} =
- rabbit_disk_queue:phantom_fetch(Q),
- {ok, AckTag, State1};
- false ->
- %% in this case, we don't actually care about the ack, so
- %% auto ack it (asynchronously).
- ok = rabbit_disk_queue:auto_ack_next_message(Q),
- {ok, noack, State1}
- end;
-publish_delivered(Msg, State = #mqstate { mode = mixed, length = 0 }) ->
- MsgSize = size_of_message(Msg),
- {ok, noack, gain_memory(MsgSize, State)}.
+ %% must call phantom_fetch otherwise the msg remains at the head
+ %% of the queue. This is synchronous, but unavoidable as we need
+ %% the AckTag
+ {MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q),
+ {ok, AckTag, State1};
+publish_delivered(Msg, State = #mqstate { length = 0 }) ->
+ {ok, noack, gain_memory(size_of_message(Msg), State)}.
fetch(State = #mqstate { length = 0 }) ->
{empty, State};