summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-10-06 15:29:53 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-10-06 15:29:53 +0100
commit33d59cf16e7b83eb783de0a5b64f1509c9d6e588 (patch)
treec60f15e265bedb532557be235f994559af67c974 /src
parentc6f48bac39166c3a35f53a6c89e6717b129abe59 (diff)
downloadrabbitmq-server-git-33d59cf16e7b83eb783de0a5b64f1509c9d6e588.tar.gz
msg_properties -> message_properties in order to be consistent with message and basic_message (though within vq, we have plenty of msg_-prefixes so don't bother inside the msg_status record in there). Also, tidied up a lot of trailing whitespace
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl66
-rw-r--r--src/rabbit_persister.erl3
-rw-r--r--src/rabbit_queue_index.erl33
-rw-r--r--src/rabbit_tests.erl22
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_variable_queue.erl76
6 files changed, 104 insertions, 102 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 53b98490c2..91d3f586fb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,7 +39,7 @@
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
--define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}).
+-define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined}).
-export([start_link/1, info_keys/0]).
@@ -152,7 +152,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
init_queue_state(State) ->
- lists:foldl(fun(F, S) -> F(S) end, State,
+ lists:foldl(fun(F, S) -> F(S) end, State,
[fun init_expires/1, fun init_ttl/1]).
init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
@@ -413,7 +413,7 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3},
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State),
IsEmpty = BQ:is_empty(BQS),
{_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
@@ -424,17 +424,18 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
{AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Message,
- #msg_properties{}, BQS),
+ BQ:publish_delivered(AckRequired, Message,
+ #message_properties{}, BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
+attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
- {true, State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, #msg_properties{}, BQS)}}.
+ {true,
+ State#q{backing_queue_state =
+ BQ:tx_publish(Txn, Message, #message_properties{}, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
@@ -442,25 +443,25 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- BQS = BQ:publish(Message,
- msg_properties(State),
+ BQS = BQ:publish(Message,
+ message_properties(State),
State #q.backing_queue_state),
{false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
maybe_run_queue_via_backing_queue(
- fun (BQS) ->
+ fun (BQS) ->
BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS)
end, State).
-fetch(AckRequired, State = #q{backing_queue_state = BQS,
+fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} ->
+ {empty, BQS1} ->
{empty, State#q{backing_queue_state = BQS1}};
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
- {{Message, IsDelivered, AckTag, Remaining},
+ {{Message, IsDelivered, AckTag, Remaining},
State#q{backing_queue_state = BQS1}}
end.
@@ -559,9 +560,9 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {AckTags, BQS1} = BQ:tx_commit(Txn,
- fun () -> gen_server2:reply(From, ok) end,
- reset_msg_expiry_fun(State),
+ {AckTags, BQS1} = BQ:tx_commit(Txn,
+ fun () -> gen_server2:reply(From, ok) end,
+ reset_msg_expiry_fun(State),
BQS),
%% ChPid must be known here because of the participant management
%% by the channel.
@@ -583,38 +584,38 @@ subtract_acks(A, B) when is_list(B) ->
reset_msg_expiry_fun(State) ->
fun(MsgProps) ->
- MsgProps#msg_properties{expiry=calculate_msg_expiry(State)}
+ MsgProps#message_properties{expiry = calculate_msg_expiry(State)}
end.
-msg_properties(State) ->
- #msg_properties{expiry = calculate_msg_expiry(State)}.
+message_properties(State) ->
+ #message_properties{expiry = calculate_msg_expiry(State)}.
calculate_msg_expiry(_State = #q{ttl = undefined}) ->
undefined;
calculate_msg_expiry(_State = #q{ttl = TTL}) ->
- now_millis() + (TTL * 1000).
+ now_millis() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
-drop_expired_messages(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+drop_expired_messages(State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
Now = now_millis(),
BQS1 = BQ:dropwhile(
- fun (_MsgProperties = #msg_properties{expiry=Expiry}) ->
+ fun (_MsgProperties = #message_properties{expiry = Expiry}) ->
Now > Expiry
end, BQS),
ensure_ttl_timer(State #q{backing_queue_state = BQS1}).
-ensure_ttl_timer(State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL,
- ttl_timer_ref = undefined})
+ensure_ttl_timer(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL,
+ ttl_timer_ref = undefined})
when TTL =/= undefined->
case BQ:is_empty(BQS) of
true ->
State;
false ->
- State#q{ttl_timer_ref =
+ State#q{ttl_timer_ref =
timer:send_after(TTL, self(), drop_expired)}
end;
ensure_ttl_timer(State) ->
@@ -622,8 +623,7 @@ ensure_ttl_timer(State) ->
now_millis() ->
timer:now_diff(now(), {0,0,0}).
-
-
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
@@ -752,7 +752,7 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%%
%% we don't need an expiry here because messages are not being
- %% enqueued, so we use an empty msg_properties.
+ %% enqueued, so we use an empty message_properties.
{Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
reply(Delivered, NewState);
@@ -781,7 +781,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
case fetch(AckRequired, drop_expired_messages(State1)) of
- {empty, State2} ->
+ {empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag, Remaining}, State2} ->
case AckRequired of
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 513b14dff4..11056c8e12 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -69,7 +69,8 @@
-type(pmsg() :: {rabbit_amqqueue:name(), pkey()}).
-type(work_item() ::
- {publish, rabbit_types:message(), rabbit_types:msg_properties(), pmsg()} |
+ {publish,
+ rabbit_types:message(), rabbit_types:message_properties(), pmsg()} |
{deliver, pmsg()} |
{ack, pmsg()}).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6568aa705f..c5a3da5314 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -141,7 +141,7 @@
-define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
-%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
+%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
%% of md5sum msg id
-define(PUBLISH_PREFIX, 1).
-define(PUBLISH_PREFIX_BITS, 1).
@@ -205,15 +205,16 @@
{'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
--spec(publish/5 :: (rabbit_guid:guid(), seq_id(), rabbit_types:msg_properties(),
- boolean(), qistate()) -> qistate()).
+-spec(publish/5 :: (rabbit_guid:guid(), seq_id(),
+ rabbit_types:message_properties(), boolean(), qistate())
+ -> qistate()).
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
- {[{rabbit_guid:guid(), seq_id(),
- rabbit_types:msg_properties(),
+ {[{rabbit_guid:guid(), seq_id(),
+ rabbit_types:message_properties(),
boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
@@ -258,7 +259,7 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, MsgProperties, IsPersistent, State)
+publish(Guid, SeqId, MsgProperties, IsPersistent, State)
when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
{JournalHdl, State1} = get_journal_handle(State),
@@ -266,7 +267,7 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State)
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
- end):?JPREFIX_BITS,
+ end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
create_pub_record_body(Guid, MsgProperties)]),
maybe_flush_journal(
@@ -463,8 +464,8 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq,
- {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack},
+ fun (RelSeq,
+ {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack},
Segment1) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
Del, RelSeq, Segment1)
@@ -518,8 +519,8 @@ queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName)),
[ok = segment_entries_foldr(
- fun (_RelSeq,
- {{Guid, _MsgProps, true}, _IsDelivered, no_ack},
+ fun (_RelSeq,
+ {{Guid, _MsgProps, true}, _IsDelivered, no_ack},
ok) ->
gatherer:in(Gatherer, {Guid, 1});
(_RelSeq, _Value, Acc) ->
@@ -533,7 +534,7 @@ queue_index_walker_reader(QueueName, Gatherer) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(Guid, #msg_properties{expiry = Expiry}) ->
+create_pub_record_body(Guid, #message_properties{expiry = Expiry}) ->
[Guid, expiry_to_binary(Expiry)].
expiry_to_binary(undefined) ->
@@ -552,11 +553,11 @@ read_pub_record_body(Hdl) ->
?NO_EXPIRY -> undefined;
X -> X
end,
- {Guid, #msg_properties{expiry = Exp}};
+ {Guid, #message_properties{expiry = Exp}};
Error ->
Error
end.
-
+
%%----------------------------------------------------------------------------
%% journal manipulation
%%----------------------------------------------------------------------------
@@ -806,8 +807,8 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq,
- {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack},
+ fun (RelSeq,
+ {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack},
Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d2489685fd..638a45e117 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1639,7 +1639,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) ->
Guid = rabbit_guid:guid(),
QiM = rabbit_queue_index:publish(
- Guid, SeqId, #msg_properties{}, Persistent, QiN),
+ Guid, SeqId, #message_properties{}, Persistent, QiN),
{ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid,
Guid, MSCStateN),
{QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM}
@@ -1661,9 +1661,9 @@ test_queue_index_props() ->
with_empty_test_queue(
fun(Qi0) ->
Guid = rabbit_guid:guid(),
- Props = #msg_properties{expiry=12345},
+ Props = #message_properties{expiry=12345},
Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0),
- {[{Guid, 1, Props, _, _}], Qi2} =
+ {[{Guid, 1, Props, _, _}], Qi2} =
rabbit_queue_index:read(1, 2, Qi1),
Qi2
end),
@@ -1802,12 +1802,12 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
fun (_N, VQN) ->
rabbit_variable_queue:publish(
rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
+ rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
true -> 2;
false -> 1
- end}, <<>>),
- #msg_properties{}, VQN)
+ end}, <<>>),
+ #message_properties{}, VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -1853,14 +1853,14 @@ test_dropwhile(VQ0) ->
fun (N, VQN) ->
rabbit_variable_queue:publish(
rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{}, <<>>),
- #msg_properties{expiry = N}, VQN)
+ rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{}, <<>>),
+ #message_properties{expiry = N}, VQN)
end, VQ0, lists:seq(1, Count)),
%% drop the first 5 messages
VQ2 = rabbit_variable_queue:dropwhile(
- fun(#msg_properties { expiry = Expiry }) ->
+ fun(#message_properties { expiry = Expiry }) ->
Expiry =< 5
end, VQ1),
@@ -1875,7 +1875,7 @@ test_dropwhile(VQ0) ->
{empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3),
VQ4.
-
+
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 1db2388331..7671267c31 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -42,7 +42,7 @@
binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2,
ok_pid_or_error/0, channel_exit/0, connection_exit/0,
- msg_properties/0]).
+ message_properties/0]).
-type(channel_exit() :: no_return()).
-type(connection_exit() :: no_return()).
@@ -87,8 +87,8 @@
txn :: maybe(txn()),
sender :: pid(),
message :: message()}).
--type(msg_properties() ::
- #msg_properties{expiry :: pos_integer() | 'undefined'}).
+-type(message_properties() ::
+ #message_properties{expiry :: pos_integer() | 'undefined'}).
%% this is really an abstract type, but dialyzer does not support them
-type(txn() :: rabbit_guid:guid()).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 789587177e..72fa4aeb18 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -524,22 +524,22 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
in_counter = InCount + 1,
persistent_count = PCount1,
pending_ack = PA1 })}.
-
+
dropwhile(Pred, State) ->
case internal_queue_out(
fun(MsgStatus = #msg_status { msg_properties = MsgProps },
State1) ->
case Pred(MsgProps) of
true ->
- {_, State2} = internal_fetch(false,
+ {_, State2} = internal_fetch(false,
MsgStatus, State1),
- dropwhile(Pred, State2);
+ dropwhile(Pred, State2);
false ->
%% message needs to go back into Q4 (or
%% maybe go in for the first time if it was
%% loaded from Q3). Also the msg contents
%% might not be in RAM, so read them in now
- {MsgStatus1, State2 = #vqstate { q4 = Q4 }} =
+ {MsgStatus1, State2 = #vqstate { q4 = Q4 }} =
read_msg(MsgStatus, State1),
State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4)}
end
@@ -550,11 +550,11 @@ dropwhile(Pred, State) ->
fetch(AckRequired, State) ->
internal_queue_out(
- fun(MsgStatus, State1) ->
+ fun(MsgStatus, State1) ->
%% it's possible that the message wasn't read from disk
%% at this point, so read it in.
{MsgStatus1, State2} = read_msg(MsgStatus, State1),
- internal_fetch(AckRequired, MsgStatus1, State2)
+ internal_fetch(AckRequired, MsgStatus1, State2)
end, State).
internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
@@ -568,20 +568,20 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
Fun(MsgStatus, State #vqstate { q4 = Q4a })
end.
-read_msg(MsgStatus = #msg_status { msg = undefined,
- guid = Guid,
+read_msg(MsgStatus = #msg_status { msg = undefined,
+ guid = Guid,
index_on_disk = IndexOnDisk,
- is_persistent = IsPersistent },
- State = #vqstate { ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
+ is_persistent = IsPersistent },
+ State = #vqstate { ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
msg_store_clients = MSCState}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
+ {{ok, Msg = #basic_message {}}, MSCState1} =
read_from_msg_store(MSCState, IsPersistent, Guid),
RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
true = RamIndexCount1 >= 0, %% ASSERTION
- {MsgStatus #msg_status { msg = Msg },
+ {MsgStatus #msg_status { msg = Msg },
State #vqstate { ram_msg_count = RamMsgCount + 1,
ram_index_count = RamIndexCount1,
msg_store_clients = MSCState1 }};
@@ -590,12 +590,12 @@ read_msg(MsgStatus, State) ->
internal_fetch(AckRequired,
MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId,
+ msg = Msg, guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk },
- State = #vqstate {
- ram_msg_count = RamMsgCount, out_counter = OutCount,
- index_state = IndexState, len = Len, persistent_count = PCount,
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk },
+ State = #vqstate {
+ ram_msg_count = RamMsgCount, out_counter = OutCount,
+ index_state = IndexState, len = Len, persistent_count = PCount,
pending_ack = PA }) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
@@ -637,7 +637,7 @@ internal_fetch(AckRequired,
len = Len1,
persistent_count = PCount1,
pending_ack = PA1 })}.
-
+
ack(AckTags, State) ->
a(ack(fun rabbit_msg_store:remove/2,
fun (_AckEntry, State1) -> State1 end,
@@ -682,20 +682,20 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) ->
a(case IsDurable andalso HasPersistentPubs of
true -> ok = rabbit_msg_store:sync(
?PERSISTENT_MSG_STORE, PersistentGuids,
- msg_store_callback(PersistentGuids,Pubs, AckTags1,
+ msg_store_callback(PersistentGuids,Pubs, AckTags1,
Fun, MsgPropsFun)),
State;
- false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
+ false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
Fun, MsgPropsFun, State)
end)}.
requeue(AckTags, MsgPropsFun, State) ->
a(reduce_memory_use(
ack(fun rabbit_msg_store:release/2,
- fun (#msg_status { msg = Msg,
+ fun (#msg_status { msg = Msg,
msg_properties = MsgProperties }, State1) ->
- {_SeqId, State2} =
- publish(Msg, MsgPropsFun(MsgProperties), true,
+ {_SeqId, State2} =
+ publish(Msg, MsgPropsFun(MsgProperties), true,
false, State1),
State2;
({IsPersistent, Guid, MsgProperties}, State1) ->
@@ -852,7 +852,7 @@ one_if(false) -> 0.
cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
-msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
+msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
MsgProperties) ->
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
@@ -892,8 +892,8 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx).
erase_tx(Txn) -> erase({txn, Txn}).
persistent_guids(Pubs) ->
- [Guid ||
- {#basic_message { guid = Guid, is_persistent = true },
+ [Guid ||
+ {#basic_message { guid = Guid, is_persistent = true },
_MsgProps} <- Pubs].
betas_from_index_entries(List, TransientThreshold, IndexState) ->
@@ -963,7 +963,7 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
Self, fun (StateN) -> tx_commit_post_msg_store(
- true, Pubs, AckTags,
+ true, Pubs, AckTags,
Fun, MsgPropsFun, StateN)
end)
end,
@@ -989,7 +989,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
true -> [AckTag || AckTag <- AckTags,
case dict:fetch(AckTag, PA) of
#msg_status {} -> false;
- {IsPersistent,
+ {IsPersistent,
_Guid, _MsgProps} -> IsPersistent
end];
false -> []
@@ -1026,11 +1026,11 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Pubs = lists:append(lists:reverse(SPubs)),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
- fun ({Msg = #basic_message { is_persistent = IsPersistent },
+ fun ({Msg = #basic_message { is_persistent = IsPersistent },
MsgProperties},
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} =
+ {SeqId, State3} =
publish(Msg, MsgProperties, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
end, {PAcks, ack(Acks, State)}, Pubs),
@@ -1098,7 +1098,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
ram_msg_count = RamMsgCount }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties))
- #msg_status { is_delivered = IsDelivered,
+ #msg_status { is_delivered = IsDelivered,
msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case bpqueue:is_empty(Q3) of
@@ -1146,8 +1146,8 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
- IndexState1 = rabbit_queue_index:publish(Guid,
- SeqId,
+ IndexState1 = rabbit_queue_index:publish(Guid,
+ SeqId,
MsgProperties,
IsPersistent,
IndexState),
@@ -1172,8 +1172,8 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
- msg_properties = MsgProperties } = MsgStatus,
+ msg_on_disk = MsgOnDisk,
+ msg_properties = MsgProperties } = MsgStatus,
PA) ->
AckEntry = case MsgOnDisk of
true -> {IsPersistent, Guid, MsgProperties};
@@ -1227,8 +1227,8 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false }, Acc) ->
Acc;
-accumulate_ack(SeqId,
- {IsPersistent, Guid, _MsgProperties},
+accumulate_ack(SeqId,
+ {IsPersistent, Guid, _MsgProperties},
{SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.