diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-04-26 18:44:10 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-04-26 18:44:10 +0100 |
| commit | 9f1cd2e1905daba7e15642f542fc23a22b32307a (patch) | |
| tree | 8ce985c068c262afbe1a6b15035445f48233b131 /src | |
| parent | 61474916926f6ea04457aa43b7e09ae6efb2b651 (diff) | |
| download | rabbitmq-server-git-9f1cd2e1905daba7e15642f542fc23a22b32307a.tar.gz | |
consistency, consistency
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 44 |
1 files changed, 21 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 10e1193f02..4579c3b530 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -138,8 +138,8 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -terminate_shutdown(Fun, State = - #q{backing_queue = BQ, backing_queue_state = BQS}) -> +terminate_shutdown(Fun, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> ok = rabbit_memory_monitor:deregister(self()), case BQS of undefined -> State; @@ -164,8 +164,7 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, NewState1, Timeout}. -next_state(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> +next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> next_state1(ensure_rate_timer(State), BQ:sync_callback(BQS)). next_state1(State = #q{sync_timer_ref = undefined}, Fun) @@ -206,8 +205,8 @@ stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{sync_timer_ref = undefined, backing_queue_timeout_fun = undefined}. -assert_invariant(#q{active_consumers = AC, backing_queue_state = BQS, - backing_queue = BQ}) -> +assert_invariant(#q{active_consumers = AC, + backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). lookup_ch(ChPid) -> @@ -321,15 +320,14 @@ deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, false, - State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> {{Message, IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. -run_message_queue(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> +run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, IsEmpty = BQ:is_empty(BQS), @@ -676,8 +674,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - backing_queue_state = BQS, backing_queue = BQ, + backing_queue_state = BQS, active_consumers = ActiveConsumers}) -> reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State); @@ -695,9 +693,10 @@ handle_call({delete, IfUnused, IfEmpty}, _From, {stop, normal, {ok, Length}, State} end; -handle_call(purge, _From, State = #q{backing_queue = BQ}) -> - {Count, BQS} = BQ:purge(State#q.backing_queue_state), - reply({ok, Count}, State#q{backing_queue_state = BQS}); +handle_call(purge, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {Count, BQS1} = BQ:purge(BQS), + reply({ok, Count}, State#q{backing_queue_state = BQS1}); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -727,7 +726,7 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> handle_cast(init_backing_queue, State = #q{q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue_state = undefined, backing_queue = BQ}) -> + backing_queue = BQ, backing_queue_state = undefined}) -> noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable)}); handle_cast(init_backing_queue, State) -> @@ -738,8 +737,8 @@ handle_cast({deliver, Txn, Message, ChPid}, State) -> {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), noreply(NewState); -handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> +handle_cast({ack, Txn, AckTags, ChPid}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case lookup_ch(ChPid) of not_found -> noreply(State); @@ -801,8 +800,8 @@ handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), noreply(State); -handle_cast(update_ram_duration, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> +handle_cast(update_ram_duration, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), @@ -811,8 +810,7 @@ handle_cast(update_ram_duration, State = #q{backing_queue_state = BQS, backing_queue_state = BQS2}); handle_cast({set_ram_duration_target, Duration}, - State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), noreply(State#q{backing_queue_state = BQS1}); @@ -853,8 +851,8 @@ handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. -handle_pre_hibernate(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> +handle_pre_hibernate(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> BQS1 = BQ:handle_pre_hibernate(BQS), %% no activity for a while == 0 egress and ingress rates DesiredDuration = |
