diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
6 files changed, 40 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ec25a871d4..60e3709444 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -311,7 +311,7 @@ check_declare_arguments(QueueName, Args) -> "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}, - {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], + {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], ok. check_expires_argument(undefined) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 429f664498..6c420ed85c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -446,16 +446,16 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ}) -> case BQ:fetch(AckRequired, BQS) of {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> - case msg_expired(MsgProperties) of - true -> - fetch(AckRequired, State#q{backing_queue_state = BQS1}); - false -> - {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} - end + case msg_expired(MsgProperties) of + true -> + fetch(AckRequired, State#q{backing_queue_state = BQS1}); + false -> + {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} + end end. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -581,7 +581,7 @@ msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) -> reset_msg_expiry_fun(State) -> fun(MsgProps) -> - MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} + MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} end. new_msg_properties(State) -> @@ -591,7 +591,7 @@ calculate_msg_expiry(_State = #q{ttl = undefined}) -> undefined; calculate_msg_expiry(_State = #q{ttl = Ttl}) -> Now = timer:now_diff(now(), {0,0,0}), - Now + (Ttl * 1000). + Now + (Ttl * 1000). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 3eea7becce..14afd76710 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -111,7 +111,7 @@ publish(Msg, MsgProps, State = #iv_state { queue = Q, publish_delivered(false, _Msg, _MsgProps, State) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid }, - MsgProps, + MsgProps, State = #iv_state { qname = QName, durable = IsDurable, len = 0, pending_ack = PA }) -> ok = persist_message(QName, IsDurable, none, Msg), @@ -165,7 +165,7 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) -> {lists:flatten(AckTags), State}. tx_commit(Txn, Fun, MsgPropsFun, - State = #iv_state { qname = QName, pending_ack = PA, + State = #iv_state { qname = QName, pending_ack = PA, queue = Q, len = Len }) -> #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, @@ -175,8 +175,8 @@ tx_commit(Txn, Fun, MsgPropsFun, AckTags1 = lists:flatten(AckTags), PA1 = remove_acks(AckTags1, PA), {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> - MsgProps1 = MsgPropsFun(MsgProps), - QN = enqueue(Msg, MsgProps1, false, Q), + MsgProps1 = MsgPropsFun(MsgProps), + QN = enqueue(Msg, MsgProps1, false, Q), {QN, LenN + 1} end, {Q, Len}, PubsRev), {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. @@ -195,8 +195,8 @@ requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q, {Q1, Len1} = lists:foldl( fun (Guid, {QN, LenN}) -> {ok, {Msg = #basic_message {}, MsgProps}} - = dict:find(Guid, PA), - MsgProps1 = MsgPropsFun(MsgProps), + = dict:find(Guid, PA), + MsgProps1 = MsgPropsFun(MsgProps), {enqueue(Msg, MsgProps1, true, QN), LenN + 1} end, {Q, Len}, AckTags), PA1 = remove_acks(AckTags, PA), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index b5e92dcaad..ed04c1e145 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -204,14 +204,14 @@ -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/5 :: (rabbit_guid:guid(), seq_id(), msg_properties(), - boolean(), qistate()) -> qistate()). + boolean(), qistate()) -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> {[{rabbit_guid:guid(), seq_id(), msg_properties(), - boolean(), boolean()}], qistate()}). + boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). @@ -259,8 +259,8 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State) true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, - SeqId:?SEQ_BITS>>, - create_pub_record_body(Guid, MsgProperties)]), + SeqId:?SEQ_BITS>>, + create_pub_record_body(Guid, MsgProperties)]), maybe_flush_journal(add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)). deliver(SeqIds, State) -> @@ -540,7 +540,7 @@ read_pub_record_body(Hdl) -> X -> X end, {Guid, #msg_properties{expiry = Exp}}. - + %%---------------------------------------------------------------------------- %% journal manipulation %%---------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index fdead8f9e0..a5059f8738 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1661,11 +1661,11 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> test_queue_index_props() -> with_empty_test_queue( fun(Qi0) -> - Guid = rabbit_guid:guid(), - Props = #msg_properties{expiry=12345}, - Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), - {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), - Qi2 + Guid = rabbit_guid:guid(), + Props = #msg_properties{expiry=12345}, + Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), + {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), + Qi2 end), ok = rabbit_variable_queue:stop(), @@ -1814,7 +1814,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, {{#basic_message { is_persistent = IsPersistent }, - _Props, IsDelivered, AckTagN, Rem}, VQM} = + _Props, IsDelivered, AckTagN, Rem}, VQM} = rabbit_variable_queue:fetch(true, VQN), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ddcf958f8c..5ffd6b61f7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -498,7 +498,7 @@ publish(Msg, MsgProperties, State) -> publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, - MsgProps, + MsgProps, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, @@ -636,9 +636,9 @@ requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( ack(fun rabbit_msg_store:release/2, fun (#msg_status { msg = Msg, - msg_properties = MsgProperties }, State1) -> + msg_properties = MsgProperties }, State1) -> {_SeqId, State2} = - publish(Msg, MsgPropsFun(MsgProperties), true, false, State1), + publish(Msg, MsgPropsFun(MsgProperties), true, false, State1), State2; ({IsPersistent, Guid, MsgProperties}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, @@ -646,7 +646,7 @@ requeue(AckTags, MsgPropsFun, State) -> read_from_msg_store(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties), - true, true, State2), + true, true, State2), State3 end, AckTags, State))). @@ -798,7 +798,7 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgPropert #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, msg_on_disk = false, index_on_disk = false, - msg_properties = MsgProperties }. + msg_properties = MsgProperties }. find_msg_store(true) -> ?PERSISTENT_MSG_STORE; find_msg_store(false) -> ?TRANSIENT_MSG_STORE. @@ -1010,7 +1010,7 @@ remove_queue_entries1( %%---------------------------------------------------------------------------- publish(Msg = #basic_message { is_persistent = IsPersistent }, - MsgProperties, IsDelivered, MsgOnDisk, + MsgProperties, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, @@ -1021,7 +1021,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties)) #msg_status { is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk}, + msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; @@ -1060,18 +1060,18 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, + guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_properties = MsgProperties}, - IndexState) + msg_properties = MsgProperties}, + IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION IndexState1 = rabbit_queue_index:publish(Guid, - SeqId, - MsgProperties, - IsPersistent, + SeqId, + MsgProperties, + IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; |
