summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-10 09:01:48 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-10 09:01:48 +0000
commit2db11990abb85ce83c114dda76961b9e244b1af7 (patch)
tree2bd0bacb2fd26f61c8c3c27957a356fdc7429f9f
parent2bce382b64f7f666079236453eb80808ea86326c (diff)
downloadrabbitmq-server-git-2db11990abb85ce83c114dda76961b9e244b1af7.tar.gz
cosmetic
-rw-r--r--src/rabbit_amqqueue_process.erl33
1 files changed, 14 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b6650ddd2c..996fe52b39 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -111,18 +111,16 @@ init(Q = #amqqueue { name = QName }) ->
ok = rabbit_memory_monitor:register
(self(), {rabbit_amqqueue, set_queue_duration, [self()]}),
VQS = rabbit_variable_queue:init(QName),
- State = #q{q = Q,
- owner = none,
- exclusive_consumer = none,
- has_had_consumers = false,
- variable_queue_state = VQS,
- next_msg_id = 1,
- active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- sync_timer_ref = undefined,
- rate_timer_ref = undefined
- },
- {ok, State, hibernate,
+ {ok, #q{q = Q,
+ owner = none,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ variable_queue_state = VQS,
+ next_msg_id = 1,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ sync_timer_ref = undefined,
+ rate_timer_ref = undefined}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, #q{variable_queue_state = VQS}) ->
@@ -293,11 +291,10 @@ deliver_msgs_to_consumers(
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- State2 = State1 #q {
+ State2 = State1#q{
active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers,
- next_msg_id = NextId + 1
- },
+ next_msg_id = NextId + 1},
deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
@@ -519,8 +516,8 @@ all_tx_record() ->
record_pending_message(Txn, ChPid, Message) ->
Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx #tx { pending_messages = [Message | Pending],
- ch_pid = ChPid }).
+ store_tx(Txn, Tx#tx{pending_messages = [Message | Pending],
+ ch_pid = ChPid}).
record_pending_acks(Txn, ChPid, MsgIds) ->
Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
@@ -557,8 +554,6 @@ rollback_transaction(Txn, State) ->
erase_tx(Txn),
State #q { variable_queue_state = VQS }.
-%% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C
-%% err, A = C `intersect` D , via projection through the dict that is C
collect_messages(MsgIds, UAM) ->
lists:mapfoldl(
fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end,