diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-02-10 09:01:48 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-02-10 09:01:48 +0000 |
| commit | 2db11990abb85ce83c114dda76961b9e244b1af7 (patch) | |
| tree | 2bd0bacb2fd26f61c8c3c27957a356fdc7429f9f | |
| parent | 2bce382b64f7f666079236453eb80808ea86326c (diff) | |
| download | rabbitmq-server-git-2db11990abb85ce83c114dda76961b9e244b1af7.tar.gz | |
cosmetic
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 33 |
1 files changed, 14 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b6650ddd2c..996fe52b39 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -111,18 +111,16 @@ init(Q = #amqqueue { name = QName }) -> ok = rabbit_memory_monitor:register (self(), {rabbit_amqqueue, set_queue_duration, [self()]}), VQS = rabbit_variable_queue:init(QName), - State = #q{q = Q, - owner = none, - exclusive_consumer = none, - has_had_consumers = false, - variable_queue_state = VQS, - next_msg_id = 1, - active_consumers = queue:new(), - blocked_consumers = queue:new(), - sync_timer_ref = undefined, - rate_timer_ref = undefined - }, - {ok, State, hibernate, + {ok, #q{q = Q, + owner = none, + exclusive_consumer = none, + has_had_consumers = false, + variable_queue_state = VQS, + next_msg_id = 1, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + sync_timer_ref = undefined, + rate_timer_ref = undefined}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, #q{variable_queue_state = VQS}) -> @@ -293,11 +291,10 @@ deliver_msgs_to_consumers( {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State2 = State1 #q { + State2 = State1#q{ active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers, - next_msg_id = NextId + 1 - }, + next_msg_id = NextId + 1}, deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> @@ -519,8 +516,8 @@ all_tx_record() -> record_pending_message(Txn, ChPid, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx #tx { pending_messages = [Message | Pending], - ch_pid = ChPid }). + store_tx(Txn, Tx#tx{pending_messages = [Message | Pending], + ch_pid = ChPid}). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), @@ -557,8 +554,6 @@ rollback_transaction(Txn, State) -> erase_tx(Txn), State #q { variable_queue_state = VQS }. -%% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C -%% err, A = C `intersect` D , via projection through the dict that is C collect_messages(MsgIds, UAM) -> lists:mapfoldl( fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end, |
