diff options
| author | Rob Harrop <rharrop@vmware.com> | 2010-09-21 15:15:26 +0100 |
|---|---|---|
| committer | Rob Harrop <rharrop@vmware.com> | 2010-09-21 15:15:26 +0100 |
| commit | 4bce995542380040f1b376bf13aef92a5b954a47 (patch) | |
| tree | b13c40f0132e95e45285d16087cd7e719d0a2ca9 | |
| parent | 0f51b6de043eae6f2a505fd45242b7f6281c2d21 (diff) | |
| download | rabbitmq-server-git-4bce995542380040f1b376bf13aef92a5b954a47.tar.gz | |
cosmetic changes - line width
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 28 |
5 files changed, 47 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6c420ed85c..51ea4825ae 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -427,7 +427,8 @@ attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), MsgProperties = new_msg_properties(State), - {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}. + {true, State#q{backing_queue_state = + BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -436,7 +437,9 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {false, NewState} -> %% Txn is none and no unblocked channels with consumers MsgProperties = new_msg_properties(State), - BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state), + BQS = BQ:publish(Message, + MsgProperties, + State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. @@ -454,7 +457,8 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS, true -> fetch(AckRequired, State#q{backing_queue_state = BQS1}); false -> - {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} + {{Message, IsDelivered, AckTag, Remaining}, + State#q{backing_queue_state = BQS1}} end end. @@ -553,8 +557,10 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {AckTags, BQS1} = - BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, reset_msg_expiry_fun(State), BQS), + {AckTags, BQS1} = BQ:tx_commit(Txn, + fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(State), + BQS), %% ChPid must be known here because of the participant management %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 14afd76710..2847136141 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -89,8 +89,8 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( - fun ({#basic_message { is_persistent = false }, _MsgProps, _IsDelivered}, - Acc) -> + fun ({#basic_message { is_persistent = false }, + _MsgProps, _IsDelivered}, Acc) -> Acc; ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}, {AckTagsN, PAN}) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index ed04c1e145..1af8dd7674 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -261,7 +261,8 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State) end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, create_pub_record_body(Guid, MsgProperties)]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)). + maybe_flush_journal( + add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -457,7 +458,9 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, Segment1) -> + fun (RelSeq, + {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, + Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) end, @@ -510,7 +513,9 @@ queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName, false)), [ok = segment_entries_foldr( - fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, ok) -> + fun (_RelSeq, + {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, + ok) -> gatherer:in(Gatherer, {Guid, 1}); (_RelSeq, _Value, Acc) -> Acc @@ -789,7 +794,9 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, + {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, + Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProperties, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a5059f8738..08ae0d6caa 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1664,7 +1664,8 @@ test_queue_index_props() -> Guid = rabbit_guid:guid(), Props = #msg_properties{expiry=12345}, Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), - {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), + {[{Guid, 1, Props, _, _}], Qi2} = + rabbit_queue_index:read(1, 2, Qi1), Qi2 end), @@ -1877,7 +1878,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} = + rabbit_variable_queue:fetch(true, VQ1), publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). test_variable_queue_partial_segments_delta_thing(VQ0) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 5ffd6b61f7..e0cae48df9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -638,7 +638,8 @@ requeue(AckTags, MsgPropsFun, State) -> fun (#msg_status { msg = Msg, msg_properties = MsgProperties }, State1) -> {_SeqId, State2} = - publish(Msg, MsgPropsFun(MsgProperties), true, false, State1), + publish(Msg, MsgPropsFun(MsgProperties), true, + false, State1), State2; ({IsPersistent, Guid, MsgProperties}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, @@ -794,7 +795,8 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProperties) -> +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, + MsgProperties) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, msg_on_disk = false, index_on_disk = false, @@ -834,7 +836,8 @@ erase_tx(Txn) -> erase({txn, Txn}). persistent_guids(Pubs) -> [Guid || - {#basic_message { guid = Guid, is_persistent = true }, _MsgProps} <- Pubs]. + {#basic_message { guid = Guid, is_persistent = true }, + _MsgProps} <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = @@ -927,8 +930,9 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, case IsDurable of true -> [AckTag || AckTag <- AckTags, case dict:fetch(AckTag, PA) of - #msg_status {} -> false; - {IsPersistent, _Guid, _MsgProperties} -> IsPersistent + #msg_status {} -> false; + {IsPersistent, + _Guid, _MsgProps} -> IsPersistent end]; false -> [] end, @@ -960,10 +964,12 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Pubs = lists:append(lists:reverse(SPubs)), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( - fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, + fun ({Msg = #basic_message { is_persistent = IsPersistent }, + MsgProperties}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = publish(Msg, MsgProperties, false, IsPersistent1, State2), + {SeqId, State3} = + publish(Msg, MsgProperties, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -1094,7 +1100,9 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, msg_properties = MsgProperties } = MsgStatus, PA) -> + msg_on_disk = MsgOnDisk, + msg_properties = MsgProperties } = MsgStatus, + PA) -> AckEntry = case MsgOnDisk of true -> {IsPersistent, Guid, MsgProperties}; false -> MsgStatus @@ -1149,7 +1157,9 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, Acc) -> Acc; -accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProperties}, {SeqIdsAcc, Dict}) -> +accumulate_ack(SeqId, + {IsPersistent, Guid, _MsgProperties}, + {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. |
