diff options
| author | Rob Harrop <rharrop@vmware.com> | 2010-09-21 17:17:33 +0100 |
|---|---|---|
| committer | Rob Harrop <rharrop@vmware.com> | 2010-09-21 17:17:33 +0100 |
| commit | b6356f82afd07d7c5533d6184d9a26672ab0dc9f (patch) | |
| tree | d61d66dd47d9e683e4aff2f8fd040a3f68b2d8e6 /src | |
| parent | de44a2b9771e37956dcaf30b3860287e5adda331 (diff) | |
| download | rabbitmq-server-git-b6356f82afd07d7c5533d6184d9a26672ab0dc9f.tar.gz | |
cosmetic changes, alignment etc
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 42 |
2 files changed, 33 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1df0c054cd..f895385210 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -160,8 +160,8 @@ init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of - {_Type, Ttl} -> State#q{ttl=Ttl}; - undefined -> State + {_Type, Ttl} -> State#q{ttl = Ttl}; + undefined -> State end. declare(Recover, From, @@ -444,9 +444,10 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> - MsgPropsFun = reset_msg_expiry_fun(State), maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State). + fun (BQS) -> + BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS) + end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -454,7 +455,7 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS, {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> case msg_expired(MsgProperties) of - true -> + true -> fetch(AckRequired, State#q{backing_queue_state = BQS1}); false -> {{Message, IsDelivered, AckTag, Remaining}, @@ -748,11 +749,12 @@ handle_call({basic_get, ChPid, NoAck}, _From, AckRequired = not NoAck, State1 = ensure_expiry_timer(State), case fetch(AckRequired, State1) of - {empty, State2} -> reply(empty, State2); + {empty, State2} -> + reply(empty, State2); {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + store_ch_record( C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); false -> ok end, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 2993b325ce..d317b55c68 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -100,10 +100,10 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, MsgProps, State = #iv_state { queue = Q, - qname = QName, - durable = IsDurable, - len = Len }) -> +publish(Msg, MsgProps, State = #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = Len }) -> ok = persist_message(QName, IsDurable, none, Msg), Q1 = enqueue(Msg, MsgProps, false, Q), State #iv_state { queue = Q1, len = Len + 1 }. @@ -120,8 +120,10 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, - durable = IsDurable, +fetch(AckRequired, State = #iv_state { len = Len, + queue = Q, + qname = QName, + durable = IsDurable, pending_ack = PA }) -> {{value, {Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}}, Q1} = queue:out(Q), @@ -164,9 +166,10 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) -> erase_tx(Txn), {lists:flatten(AckTags), State}. -tx_commit(Txn, Fun, MsgPropsFun, - State = #iv_state { qname = QName, pending_ack = PA, - queue = Q, len = Len }) -> +tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName, + pending_ack = PA, + queue = Q, + len = Len }) -> #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, Txn, QName), @@ -175,14 +178,15 @@ tx_commit(Txn, Fun, MsgPropsFun, AckTags1 = lists:flatten(AckTags), PA1 = remove_acks(AckTags1, PA), {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> - MsgProps1 = MsgPropsFun(MsgProps), - QM = enqueue(Msg, MsgProps1, false, QN), - {QM, LenN + 1} + {enqueue(Msg, MsgPropsFun(MsgProps), + false, QN), + LenN + 1} end, {Q, Len}, PubsRev), {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. -requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q, - len = Len }) -> +requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, + queue = Q, + len = Len }) -> %% We don't need to touch the persister here - the persister will %% already have these messages published and delivered as %% necessary. The complication is that the persister's seq_id will @@ -194,10 +198,10 @@ requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q, %% order to the last known state of our queue, prior to shutdown. {Q1, Len1} = lists:foldl( fun (Guid, {QN, LenN}) -> - {ok, {Msg = #basic_message {}, MsgProps}} - = dict:find(Guid, PA), - MsgProps1 = MsgPropsFun(MsgProps), - {enqueue(Msg, MsgProps1, true, QN), LenN + 1} + {Msg = #basic_message {}, MsgProps} + = dict:fetch(Guid, PA), + {enqueue(Msg, MsgPropsFun(MsgProps), true, QN), + LenN + 1} end, {Q, Len}, AckTags), PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }. @@ -225,7 +229,7 @@ status(_State) -> []. remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags). -store_ack(Msg = #basic_message { guid = Guid}, MsgProps, PA) -> +store_ack(Msg = #basic_message { guid = Guid }, MsgProps, PA) -> dict:store(Guid, {Msg, MsgProps}, PA). %%---------------------------------------------------------------------------- |
