summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl18
-rw-r--r--src/rabbit_invariable_queue.erl12
-rw-r--r--src/rabbit_queue_index.erl10
-rw-r--r--src/rabbit_tests.erl12
-rw-r--r--src/rabbit_variable_queue.erl26
6 files changed, 40 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ec25a871d4..60e3709444 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -311,7 +311,7 @@ check_declare_arguments(QueueName, Args) ->
"invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1},
- {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]],
+ {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]],
ok.
check_expires_argument(undefined) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 429f664498..6c420ed85c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -446,16 +446,16 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ backing_queue = BQ}) ->
case BQ:fetch(AckRequired, BQS) of
{empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}};
{{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} ->
- case msg_expired(MsgProperties) of
- true ->
- fetch(AckRequired, State#q{backing_queue_state = BQS1});
- false ->
- {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}}
- end
+ case msg_expired(MsgProperties) of
+ true ->
+ fetch(AckRequired, State#q{backing_queue_state = BQS1});
+ false ->
+ {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}}
+ end
end.
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
@@ -581,7 +581,7 @@ msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) ->
reset_msg_expiry_fun(State) ->
fun(MsgProps) ->
- MsgProps#msg_properties{expiry=calculate_msg_expiry(State)}
+ MsgProps#msg_properties{expiry=calculate_msg_expiry(State)}
end.
new_msg_properties(State) ->
@@ -591,7 +591,7 @@ calculate_msg_expiry(_State = #q{ttl = undefined}) ->
undefined;
calculate_msg_expiry(_State = #q{ttl = Ttl}) ->
Now = timer:now_diff(now(), {0,0,0}),
- Now + (Ttl * 1000).
+ Now + (Ttl * 1000).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 3eea7becce..14afd76710 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -111,7 +111,7 @@ publish(Msg, MsgProps, State = #iv_state { queue = Q,
publish_delivered(false, _Msg, _MsgProps, State) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
- MsgProps,
+ MsgProps,
State = #iv_state { qname = QName, durable = IsDurable,
len = 0, pending_ack = PA }) ->
ok = persist_message(QName, IsDurable, none, Msg),
@@ -165,7 +165,7 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) ->
{lists:flatten(AckTags), State}.
tx_commit(Txn, Fun, MsgPropsFun,
- State = #iv_state { qname = QName, pending_ack = PA,
+ State = #iv_state { qname = QName, pending_ack = PA,
queue = Q, len = Len }) ->
#tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn),
ok = do_if_persistent(fun rabbit_persister:commit_transaction/1,
@@ -175,8 +175,8 @@ tx_commit(Txn, Fun, MsgPropsFun,
AckTags1 = lists:flatten(AckTags),
PA1 = remove_acks(AckTags1, PA),
{Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) ->
- MsgProps1 = MsgPropsFun(MsgProps),
- QN = enqueue(Msg, MsgProps1, false, Q),
+ MsgProps1 = MsgPropsFun(MsgProps),
+ QN = enqueue(Msg, MsgProps1, false, Q),
{QN, LenN + 1}
end, {Q, Len}, PubsRev),
{AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.
@@ -195,8 +195,8 @@ requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q,
{Q1, Len1} = lists:foldl(
fun (Guid, {QN, LenN}) ->
{ok, {Msg = #basic_message {}, MsgProps}}
- = dict:find(Guid, PA),
- MsgProps1 = MsgPropsFun(MsgProps),
+ = dict:find(Guid, PA),
+ MsgProps1 = MsgPropsFun(MsgProps),
{enqueue(Msg, MsgProps1, true, QN), LenN + 1}
end, {Q, Len}, AckTags),
PA1 = remove_acks(AckTags, PA),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index b5e92dcaad..ed04c1e145 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -204,14 +204,14 @@
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
-spec(publish/5 :: (rabbit_guid:guid(), seq_id(), msg_properties(),
- boolean(), qistate()) -> qistate()).
+ boolean(), qistate()) -> qistate()).
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
{[{rabbit_guid:guid(), seq_id(), msg_properties(),
- boolean(), boolean()}], qistate()}).
+ boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
@@ -259,8 +259,8 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State)
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
- SeqId:?SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProperties)]),
+ SeqId:?SEQ_BITS>>,
+ create_pub_record_body(Guid, MsgProperties)]),
maybe_flush_journal(add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)).
deliver(SeqIds, State) ->
@@ -540,7 +540,7 @@ read_pub_record_body(Hdl) ->
X -> X
end,
{Guid, #msg_properties{expiry = Exp}}.
-
+
%%----------------------------------------------------------------------------
%% journal manipulation
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index fdead8f9e0..a5059f8738 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1661,11 +1661,11 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
test_queue_index_props() ->
with_empty_test_queue(
fun(Qi0) ->
- Guid = rabbit_guid:guid(),
- Props = #msg_properties{expiry=12345},
- Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0),
- {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1),
- Qi2
+ Guid = rabbit_guid:guid(),
+ Props = #msg_properties{expiry=12345},
+ Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0),
+ {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1),
+ Qi2
end),
ok = rabbit_variable_queue:stop(),
@@ -1814,7 +1814,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
Rem = Len - N,
{{#basic_message { is_persistent = IsPersistent },
- _Props, IsDelivered, AckTagN, Rem}, VQM} =
+ _Props, IsDelivered, AckTagN, Rem}, VQM} =
rabbit_variable_queue:fetch(true, VQN),
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ddcf958f8c..5ffd6b61f7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -498,7 +498,7 @@ publish(Msg, MsgProperties, State) ->
publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
- MsgProps,
+ MsgProps,
State = #vqstate { len = 0,
next_seq_id = SeqId,
out_counter = OutCount,
@@ -636,9 +636,9 @@ requeue(AckTags, MsgPropsFun, State) ->
a(reduce_memory_use(
ack(fun rabbit_msg_store:release/2,
fun (#msg_status { msg = Msg,
- msg_properties = MsgProperties }, State1) ->
+ msg_properties = MsgProperties }, State1) ->
{_SeqId, State2} =
- publish(Msg, MsgPropsFun(MsgProperties), true, false, State1),
+ publish(Msg, MsgPropsFun(MsgProperties), true, false, State1),
State2;
({IsPersistent, Guid, MsgProperties}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
@@ -646,7 +646,7 @@ requeue(AckTags, MsgPropsFun, State) ->
read_from_msg_store(MSCState, IsPersistent, Guid),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
{_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties),
- true, true, State2),
+ true, true, State2),
State3
end,
AckTags, State))).
@@ -798,7 +798,7 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgPropert
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
msg_on_disk = false, index_on_disk = false,
- msg_properties = MsgProperties }.
+ msg_properties = MsgProperties }.
find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
@@ -1010,7 +1010,7 @@ remove_queue_entries1(
%%----------------------------------------------------------------------------
publish(Msg = #basic_message { is_persistent = IsPersistent },
- MsgProperties, IsDelivered, MsgOnDisk,
+ MsgProperties, IsDelivered, MsgOnDisk,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
len = Len,
@@ -1021,7 +1021,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties))
#msg_status { is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk},
+ msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case bpqueue:is_empty(Q3) of
false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
@@ -1060,18 +1060,18 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- guid = Guid,
+ guid = Guid,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_properties = MsgProperties},
- IndexState)
+ msg_properties = MsgProperties},
+ IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
IndexState1 = rabbit_queue_index:publish(Guid,
- SeqId,
- MsgProperties,
- IsPersistent,
+ SeqId,
+ MsgProperties,
+ IsPersistent,
IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};