summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-21 15:15:26 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-21 15:15:26 +0100
commit4bce995542380040f1b376bf13aef92a5b954a47 (patch)
treeb13c40f0132e95e45285d16087cd7e719d0a2ca9
parent0f51b6de043eae6f2a505fd45242b7f6281c2d21 (diff)
downloadrabbitmq-server-git-4bce995542380040f1b376bf13aef92a5b954a47.tar.gz
cosmetic changes - line width
-rw-r--r--src/rabbit_amqqueue_process.erl16
-rw-r--r--src/rabbit_invariable_queue.erl4
-rw-r--r--src/rabbit_queue_index.erl15
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_variable_queue.erl28
5 files changed, 47 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6c420ed85c..51ea4825ae 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -427,7 +427,8 @@ attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
MsgProperties = new_msg_properties(State),
- {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}.
+ {true, State#q{backing_queue_state =
+ BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
@@ -436,7 +437,9 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
MsgProperties = new_msg_properties(State),
- BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state),
+ BQS = BQ:publish(Message,
+ MsgProperties,
+ State #q.backing_queue_state),
{false, NewState#q{backing_queue_state = BQS}}
end.
@@ -454,7 +457,8 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS,
true ->
fetch(AckRequired, State#q{backing_queue_state = BQS1});
false ->
- {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}}
+ {{Message, IsDelivered, AckTag, Remaining},
+ State#q{backing_queue_state = BQS1}}
end
end.
@@ -553,8 +557,10 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {AckTags, BQS1} =
- BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, reset_msg_expiry_fun(State), BQS),
+ {AckTags, BQS1} = BQ:tx_commit(Txn,
+ fun () -> gen_server2:reply(From, ok) end,
+ reset_msg_expiry_fun(State),
+ BQS),
%% ChPid must be known here because of the participant management
%% by the channel.
C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 14afd76710..2847136141 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -89,8 +89,8 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
%% We do not purge messages pending acks.
{AckTags, PA} =
rabbit_misc:queue_fold(
- fun ({#basic_message { is_persistent = false }, _MsgProps, _IsDelivered},
- Acc) ->
+ fun ({#basic_message { is_persistent = false },
+ _MsgProps, _IsDelivered}, Acc) ->
Acc;
({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered},
{AckTagsN, PAN}) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index ed04c1e145..1af8dd7674 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -261,7 +261,8 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State)
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
create_pub_record_body(Guid, MsgProperties)]),
- maybe_flush_journal(add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)).
+ maybe_flush_journal(
+ add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -457,7 +458,9 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, Segment1) ->
+ fun (RelSeq,
+ {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack},
+ Segment1) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
Del, RelSeq, Segment1)
end,
@@ -510,7 +513,9 @@ queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName, false)),
[ok = segment_entries_foldr(
- fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, ok) ->
+ fun (_RelSeq,
+ {{Guid, _MsgProps, true}, _IsDelivered, no_ack},
+ ok) ->
gatherer:in(Gatherer, {Guid, 1});
(_RelSeq, _Value, Acc) ->
Acc
@@ -789,7 +794,9 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq, {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, Acc)
+ fun (RelSeq,
+ {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack},
+ Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
[ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProperties,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a5059f8738..08ae0d6caa 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1664,7 +1664,8 @@ test_queue_index_props() ->
Guid = rabbit_guid:guid(),
Props = #msg_properties{expiry=12345},
Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0),
- {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1),
+ {[{Guid, 1, Props, _, _}], Qi2} =
+ rabbit_queue_index:read(1, 2, Qi1),
Qi2
end),
@@ -1877,7 +1878,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
VQ0;
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} =
+ rabbit_variable_queue:fetch(true, VQ1),
publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 5ffd6b61f7..e0cae48df9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -638,7 +638,8 @@ requeue(AckTags, MsgPropsFun, State) ->
fun (#msg_status { msg = Msg,
msg_properties = MsgProperties }, State1) ->
{_SeqId, State2} =
- publish(Msg, MsgPropsFun(MsgProperties), true, false, State1),
+ publish(Msg, MsgPropsFun(MsgProperties), true,
+ false, State1),
State2;
({IsPersistent, Guid, MsgProperties}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
@@ -794,7 +795,8 @@ one_if(false) -> 0.
cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
-msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProperties) ->
+msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
+ MsgProperties) ->
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
msg_on_disk = false, index_on_disk = false,
@@ -834,7 +836,8 @@ erase_tx(Txn) -> erase({txn, Txn}).
persistent_guids(Pubs) ->
[Guid ||
- {#basic_message { guid = Guid, is_persistent = true }, _MsgProps} <- Pubs].
+ {#basic_message { guid = Guid, is_persistent = true },
+ _MsgProps} <- Pubs].
betas_from_index_entries(List, TransientThreshold, IndexState) ->
{Filtered, Delivers, Acks} =
@@ -927,8 +930,9 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
case IsDurable of
true -> [AckTag || AckTag <- AckTags,
case dict:fetch(AckTag, PA) of
- #msg_status {} -> false;
- {IsPersistent, _Guid, _MsgProperties} -> IsPersistent
+ #msg_status {} -> false;
+ {IsPersistent,
+ _Guid, _MsgProps} -> IsPersistent
end];
false -> []
end,
@@ -960,10 +964,12 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Pubs = lists:append(lists:reverse(SPubs)),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
- fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties},
+ fun ({Msg = #basic_message { is_persistent = IsPersistent },
+ MsgProperties},
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} = publish(Msg, MsgProperties, false, IsPersistent1, State2),
+ {SeqId, State3} =
+ publish(Msg, MsgProperties, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
@@ -1094,7 +1100,9 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk, msg_properties = MsgProperties } = MsgStatus, PA) ->
+ msg_on_disk = MsgOnDisk,
+ msg_properties = MsgProperties } = MsgStatus,
+ PA) ->
AckEntry = case MsgOnDisk of
true -> {IsPersistent, Guid, MsgProperties};
false -> MsgStatus
@@ -1149,7 +1157,9 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false }, Acc) ->
Acc;
-accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProperties}, {SeqIdsAcc, Dict}) ->
+accumulate_ack(SeqId,
+ {IsPersistent, Guid, _MsgProperties},
+ {SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.