diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-20 11:11:08 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-20 11:11:08 +0100 |
| commit | 06c83c2e075826a0391b627d8f09b3968a4200fc (patch) | |
| tree | cc58e65706b136606a953458437bf62e85dac289 /src | |
| parent | e4d5badbb083140a759c5cd8096a047d4be1640a (diff) | |
| download | rabbitmq-server-git-06c83c2e075826a0391b627d8f09b3968a4200fc.tar.gz | |
> You should just *replace* to_{mixed,disk_only_mode}_mode with set_mode, not make the latter a wrapper for the former.
Done.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 67 |
1 files changed, 30 insertions, 37 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 2b25ab0fac..2e67735f1b 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -121,15 +121,9 @@ size_of_message( set_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) -> {ok, State}; -set_mode(disk, TxnMessages, State) -> - to_disk_only_mode(TxnMessages, State); -set_mode(mixed, TxnMessages, State) -> - to_mixed_mode(TxnMessages, State). - -to_disk_only_mode(TxnMessages, State = - #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, - is_durable = IsDurable, prefetcher = Prefetcher - }) -> +set_mode(disk, TxnMessages, State = + #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, + is_durable = IsDurable, prefetcher = Prefetcher }) -> rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]), State1 = State #mqstate { mode = disk }, {MsgBuf1, State2} = @@ -164,7 +158,33 @@ to_disk_only_mode(TxnMessages, State = end end, TxnMessages), garbage_collect(), - {ok, State2 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }}. + {ok, State2 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }}; +set_mode(mixed, TxnMessages, State = #mqstate { mode = disk, queue = Q, + is_durable = IsDurable }) -> + rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]), + %% The queue has a token just saying how many msgs are on disk + %% (this is already built for us when in disk mode). + %% Don't actually do anything to the disk + %% Don't start prefetcher just yet because the queue maybe busy - + %% wait for hibernate timeout in the amqqueue_process. + + %% Remove txn messages from disk which are neither persistent and + %% durable. This is necessary to avoid leaks. This is also pretty + %% much the inverse behaviour of our own tx_cancel/2 which is why + %% we're not using it. + Cancel = + lists:foldl( + fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) -> + case IsDurable andalso IsPersistent of + true -> Acc; + false -> [Msg #basic_message.guid | Acc] + end + end, [], TxnMessages), + ok = if Cancel == [] -> ok; + true -> rabbit_disk_queue:tx_cancel(Cancel) + end, + garbage_collect(), + {ok, State #mqstate { mode = mixed }}. send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, Commit, MsgBuf) -> @@ -223,33 +243,6 @@ flush_requeue_to_disk_queue(Q, RequeueCount, Commit) -> ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount), []. -to_mixed_mode(TxnMessages, State = #mqstate { mode = disk, queue = Q, - is_durable = IsDurable }) -> - rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]), - %% The queue has a token just saying how many msgs are on disk - %% (this is already built for us when in disk mode). - %% Don't actually do anything to the disk - %% Don't start prefetcher just yet because the queue maybe busy - - %% wait for hibernate timeout in the amqqueue_process. - - %% Remove txn messages from disk which are neither persistent and - %% durable. This is necessary to avoid leaks. This is also pretty - %% much the inverse behaviour of our own tx_cancel/2 which is why - %% we're not using it. - Cancel = - lists:foldl( - fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) -> - case IsDurable andalso IsPersistent of - true -> Acc; - false -> [Msg #basic_message.guid | Acc] - end - end, [], TxnMessages), - ok = if Cancel == [] -> ok; - true -> rabbit_disk_queue:tx_cancel(Cancel) - end, - garbage_collect(), - {ok, State #mqstate { mode = mixed }}. - gain_memory(Inc, State = #mqstate { memory_size = QSize, memory_gain = Gain }) -> State #mqstate { memory_size = QSize + Inc, |
