summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-04-26 18:44:10 +0100
committerMatthias Radestock <matthias@lshift.net>2010-04-26 18:44:10 +0100
commit9f1cd2e1905daba7e15642f542fc23a22b32307a (patch)
tree8ce985c068c262afbe1a6b15035445f48233b131
parent61474916926f6ea04457aa43b7e09ae6efb2b651 (diff)
downloadrabbitmq-server-git-9f1cd2e1905daba7e15642f542fc23a22b32307a.tar.gz
consistency, consistency
-rw-r--r--src/rabbit_amqqueue_process.erl44
1 files changed, 21 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 10e1193f02..4579c3b530 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -138,8 +138,8 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-terminate_shutdown(Fun, State =
- #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+terminate_shutdown(Fun, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
ok = rabbit_memory_monitor:deregister(self()),
case BQS of
undefined -> State;
@@ -164,8 +164,7 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, NewState1, Timeout}.
-next_state(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
next_state1(ensure_rate_timer(State), BQ:sync_callback(BQS)).
next_state1(State = #q{sync_timer_ref = undefined}, Fun)
@@ -206,8 +205,8 @@ stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{sync_timer_ref = undefined, backing_queue_timeout_fun = undefined}.
-assert_invariant(#q{active_consumers = AC, backing_queue_state = BQS,
- backing_queue = BQ}) ->
+assert_invariant(#q{active_consumers = AC,
+ backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
lookup_ch(ChPid) ->
@@ -321,15 +320,14 @@ deliver_from_queue_pred(IsEmpty, _State) ->
not IsEmpty.
deliver_from_queue_deliver(AckRequired, false,
- State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{{Message, IsDelivered, AckTag, Remaining}, BQS1} =
BQ:fetch(AckRequired, BQS),
{{Message, IsDelivered, AckTag}, 0 == Remaining,
State #q { backing_queue_state = BQS1 }}.
-run_message_queue(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Funs = {fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3},
IsEmpty = BQ:is_empty(BQS),
@@ -676,8 +674,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- backing_queue_state = BQS,
backing_queue = BQ,
+ backing_queue_state = BQS,
active_consumers = ActiveConsumers}) ->
reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
@@ -695,9 +693,10 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
{stop, normal, {ok, Length}, State}
end;
-handle_call(purge, _From, State = #q{backing_queue = BQ}) ->
- {Count, BQS} = BQ:purge(State#q.backing_queue_state),
- reply({ok, Count}, State#q{backing_queue_state = BQS});
+handle_call(purge, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {Count, BQS1} = BQ:purge(BQS),
+ reply({ok, Count}, State#q{backing_queue_state = BQS1});
handle_call({claim_queue, ReaderPid}, _From,
State = #q{owner = Owner, exclusive_consumer = Holder}) ->
@@ -727,7 +726,7 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
handle_cast(init_backing_queue,
State = #q{q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue_state = undefined, backing_queue = BQ}) ->
+ backing_queue = BQ, backing_queue_state = undefined}) ->
noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable)});
handle_cast(init_backing_queue, State) ->
@@ -738,8 +737,8 @@ handle_cast({deliver, Txn, Message, ChPid}, State) ->
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
noreply(NewState);
-handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+handle_cast({ack, Txn, AckTags, ChPid},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -801,8 +800,8 @@ handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
noreply(State);
-handle_cast(update_ram_duration, State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+handle_cast(update_ram_duration, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
@@ -811,8 +810,7 @@ handle_cast(update_ram_duration, State = #q{backing_queue_state = BQS,
backing_queue_state = BQS2});
handle_cast({set_ram_duration_target, Duration},
- State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
noreply(State#q{backing_queue_state = BQS1});
@@ -853,8 +851,8 @@ handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
-handle_pre_hibernate(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+handle_pre_hibernate(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
BQS1 = BQ:handle_pre_hibernate(BQS),
%% no activity for a while == 0 egress and ingress rates
DesiredDuration =