summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-21 17:17:33 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-21 17:17:33 +0100
commitb6356f82afd07d7c5533d6184d9a26672ab0dc9f (patch)
treed61d66dd47d9e683e4aff2f8fd040a3f68b2d8e6 /src
parentde44a2b9771e37956dcaf30b3860287e5adda331 (diff)
downloadrabbitmq-server-git-b6356f82afd07d7c5533d6184d9a26672ab0dc9f.tar.gz
cosmetic changes, alignment etc
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl18
-rw-r--r--src/rabbit_invariable_queue.erl42
2 files changed, 33 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1df0c054cd..f895385210 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -160,8 +160,8 @@ init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) ->
case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of
- {_Type, Ttl} -> State#q{ttl=Ttl};
- undefined -> State
+ {_Type, Ttl} -> State#q{ttl = Ttl};
+ undefined -> State
end.
declare(Recover, From,
@@ -444,9 +444,10 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
- MsgPropsFun = reset_msg_expiry_fun(State),
maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State).
+ fun (BQS) ->
+ BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS)
+ end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
@@ -454,7 +455,7 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS,
{empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}};
{{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} ->
case msg_expired(MsgProperties) of
- true ->
+ true ->
fetch(AckRequired, State#q{backing_queue_state = BQS1});
false ->
{{Message, IsDelivered, AckTag, Remaining},
@@ -748,11 +749,12 @@ handle_call({basic_get, ChPid, NoAck}, _From,
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
case fetch(AckRequired, State1) of
- {empty, State2} -> reply(empty, State2);
+ {empty, State2} ->
+ reply(empty, State2);
{{Message, IsDelivered, AckTag, Remaining}, State2} ->
case AckRequired of
- true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- store_ch_record(
+ true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ store_ch_record(
C#cr{acktags = sets:add_element(AckTag, ChAckTags)});
false -> ok
end,
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 2993b325ce..d317b55c68 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -100,10 +100,10 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, MsgProps, State = #iv_state { queue = Q,
- qname = QName,
- durable = IsDurable,
- len = Len }) ->
+publish(Msg, MsgProps, State = #iv_state { queue = Q,
+ qname = QName,
+ durable = IsDurable,
+ len = Len }) ->
ok = persist_message(QName, IsDurable, none, Msg),
Q1 = enqueue(Msg, MsgProps, false, Q),
State #iv_state { queue = Q1, len = Len + 1 }.
@@ -120,8 +120,10 @@ publish_delivered(true, Msg = #basic_message { guid = Guid },
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
-fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
- durable = IsDurable,
+fetch(AckRequired, State = #iv_state { len = Len,
+ queue = Q,
+ qname = QName,
+ durable = IsDurable,
pending_ack = PA }) ->
{{value, {Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}},
Q1} = queue:out(Q),
@@ -164,9 +166,10 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) ->
erase_tx(Txn),
{lists:flatten(AckTags), State}.
-tx_commit(Txn, Fun, MsgPropsFun,
- State = #iv_state { qname = QName, pending_ack = PA,
- queue = Q, len = Len }) ->
+tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName,
+ pending_ack = PA,
+ queue = Q,
+ len = Len }) ->
#tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn),
ok = do_if_persistent(fun rabbit_persister:commit_transaction/1,
Txn, QName),
@@ -175,14 +178,15 @@ tx_commit(Txn, Fun, MsgPropsFun,
AckTags1 = lists:flatten(AckTags),
PA1 = remove_acks(AckTags1, PA),
{Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) ->
- MsgProps1 = MsgPropsFun(MsgProps),
- QM = enqueue(Msg, MsgProps1, false, QN),
- {QM, LenN + 1}
+ {enqueue(Msg, MsgPropsFun(MsgProps),
+ false, QN),
+ LenN + 1}
end, {Q, Len}, PubsRev),
{AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.
-requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q,
- len = Len }) ->
+requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA,
+ queue = Q,
+ len = Len }) ->
%% We don't need to touch the persister here - the persister will
%% already have these messages published and delivered as
%% necessary. The complication is that the persister's seq_id will
@@ -194,10 +198,10 @@ requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q,
%% order to the last known state of our queue, prior to shutdown.
{Q1, Len1} = lists:foldl(
fun (Guid, {QN, LenN}) ->
- {ok, {Msg = #basic_message {}, MsgProps}}
- = dict:find(Guid, PA),
- MsgProps1 = MsgPropsFun(MsgProps),
- {enqueue(Msg, MsgProps1, true, QN), LenN + 1}
+ {Msg = #basic_message {}, MsgProps}
+ = dict:fetch(Guid, PA),
+ {enqueue(Msg, MsgPropsFun(MsgProps), true, QN),
+ LenN + 1}
end, {Q, Len}, AckTags),
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }.
@@ -225,7 +229,7 @@ status(_State) -> [].
remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags).
-store_ack(Msg = #basic_message { guid = Guid}, MsgProps, PA) ->
+store_ack(Msg = #basic_message { guid = Guid }, MsgProps, PA) ->
dict:store(Guid, {Msg, MsgProps}, PA).
%%----------------------------------------------------------------------------