diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-24 18:06:15 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-24 18:06:15 +0100 |
| commit | 183ba68a125927970814f70c275d4e49c59be5d9 (patch) | |
| tree | d62913bf4ef63663af9a3a88aa699262766775f6 /src | |
| parent | 186ae813835218bf5c3c5fb1aff2658a745a29b6 (diff) | |
| download | rabbitmq-server-git-183ba68a125927970814f70c275d4e49c59be5d9.tar.gz | |
Removed is_persistent from tx_tracking. Removed auto_ack_next_message as it wasn't needed as the one case where it was being used in mq was wrong. And some cosmetic stuff too.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 35 |
3 files changed, 22 insertions, 55 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6d742b7ad9..c65c65edbc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -63,7 +63,7 @@ -record(consumer, {tag, ack_required}). --record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). +-record(tx, {ch_pid, pending_messages, pending_acks}). %% These are held in our process dictionary -record(cr, {consumer_count, @@ -453,7 +453,6 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName. lookup_tx(Txn) -> case get({txn, Txn}) of undefined -> #tx{ch_pid = none, - is_persistent = false, pending_messages = [], pending_acks = []}; V -> V @@ -471,14 +470,10 @@ all_tx_record() -> all_tx() -> [Txn || {{txn, Txn}, _} <- get()]. -record_pending_message(Txn, ChPid, Message = - #basic_message { is_persistent = IsPersistent }) -> - Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = - lookup_tx(Txn), +record_pending_message(Txn, ChPid, Message) -> + Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx #tx { pending_messages = [Message | Pending], - is_persistent = IsPersistentTxn orelse IsPersistent - }). + store_tx(Txn, Tx #tx { pending_messages = [Message | Pending] }). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index b4e6b8b106..70d448451a 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -39,11 +39,10 @@ terminate/2, code_change/3]). -export([handle_pre_hibernate/1]). --export([publish/3, fetch/1, phantom_fetch/1, ack/2, - tx_publish/1, tx_commit/3, tx_cancel/1, - requeue/2, purge/1, delete_queue/1, - delete_non_durable_queues/1, auto_ack_next_message/1, - requeue_next_n/2, len/1, foldl/3, prefetch/1 +-export([publish/3, fetch/1, phantom_fetch/1, ack/2, tx_publish/1, tx_commit/3, + tx_cancel/1, requeue/2, purge/1, delete_queue/1, + delete_non_durable_queues/1, requeue_next_n/2, len/1, foldl/3, + prefetch/1 ]). -export([filesync/0, cache_info/0]). @@ -264,7 +263,6 @@ {msg_id(), boolean(), boolean(), ack_tag(), non_neg_integer()})). -spec(prefetch/1 :: (queue_name()) -> 'ok'). -spec(ack/2 :: (queue_name(), [ack_tag()]) -> 'ok'). --spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok'). -spec(tx_publish/1 :: (message()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean()}], [ack_tag()]) -> 'ok'). @@ -308,9 +306,6 @@ prefetch(Q) -> ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> gen_server2:cast(?SERVER, {ack, Q, MsgSeqIds}). -auto_ack_next_message(Q) -> - gen_server2:cast(?SERVER, {auto_ack_next_message, Q}). - tx_publish(Message = #basic_message {}) -> gen_server2:cast(?SERVER, {tx_publish, Message}). @@ -510,9 +505,6 @@ handle_cast({publish, Q, Message, IsDelivered}, State) -> handle_cast({ack, Q, MsgSeqIds}, State) -> {ok, State1} = internal_ack(Q, MsgSeqIds, State), noreply(State1); -handle_cast({auto_ack_next_message, Q}, State) -> - {ok, State1} = internal_auto_ack(Q, State), - noreply(State1); handle_cast({tx_publish, Message}, State) -> {ok, State1} = internal_tx_publish(Message, State), noreply(State1); @@ -940,15 +932,6 @@ internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) -> Acc1 = Fun(Message, AckTag, IsDelivered, Acc), internal_foldl(Q, WriteSeqId, Fun, State1, Acc1, ReadSeqId + 1). -internal_auto_ack(Q, State) -> - case internal_fetch_attributes(Q, ignore_delivery, pop_queue, State) of - {ok, empty, State1} -> - {ok, State1}; - {ok, {_MsgId, _IsPersistent, _IsDelivered, AckTag, _Remaining}, - State1} -> - remove_messages(Q, [AckTag], true, State1) - end. - internal_ack(Q, MsgSeqIds, State) -> remove_messages(Q, MsgSeqIds, true, State). @@ -1950,8 +1933,8 @@ read_next_file_entry(FileHdl, Offset) -> eof; KO -> KO end; - KO -> KO + Other -> Other end end; - KO -> KO + Other -> Other end. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 771a920f87..9ead773de9 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -311,32 +311,21 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, State = %% Assumption here is that the queue is empty already (only called via %% attempt_immediate_delivery). -publish_delivered(Msg = - #basic_message { guid = MsgId, is_persistent = IsPersistent}, - State = - #mqstate { mode = Mode, is_durable = IsDurable, - queue = Q, length = 0 }) - when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> +publish_delivered(Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent}, + State = #mqstate { is_durable = IsDurable, queue = Q, + length = 0 }) + when IsDurable andalso IsPersistent -> ok = rabbit_disk_queue:publish(Q, Msg, true), MsgSize = size_of_message(Msg), State1 = gain_memory(MsgSize, State), - case IsDurable andalso IsPersistent of - true -> - %% must call phantom_fetch otherwise the msg remains at - %% the head of the queue. This is synchronous, but - %% unavoidable as we need the AckTag - {MsgId, IsPersistent, true, AckTag, 0} = - rabbit_disk_queue:phantom_fetch(Q), - {ok, AckTag, State1}; - false -> - %% in this case, we don't actually care about the ack, so - %% auto ack it (asynchronously). - ok = rabbit_disk_queue:auto_ack_next_message(Q), - {ok, noack, State1} - end; -publish_delivered(Msg, State = #mqstate { mode = mixed, length = 0 }) -> - MsgSize = size_of_message(Msg), - {ok, noack, gain_memory(MsgSize, State)}. + %% must call phantom_fetch otherwise the msg remains at the head + %% of the queue. This is synchronous, but unavoidable as we need + %% the AckTag + {MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q), + {ok, AckTag, State1}; +publish_delivered(Msg, State = #mqstate { length = 0 }) -> + {ok, noack, gain_memory(size_of_message(Msg), State)}. fetch(State = #mqstate { length = 0 }) -> {empty, State}; |
