summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-10-08 14:28:49 +0100
committerRob Harrop <rob@rabbitmq.com>2010-10-08 14:28:49 +0100
commit9a2ec18d5ca5b946d381450cdc5af5df2eb822e0 (patch)
tree1ce52dd424c10944170b0e3534bb23f5311d5c32
parent2477d9e1780d81529b1734f853c310725081457d (diff)
downloadrabbitmq-server-git-9a2ec18d5ca5b946d381450cdc5af5df2eb822e0.tar.gz
tweaked RamIndexCount accounting again, now it doesn't get decremented twice when reinserting into the queue. Also moved transactional msgpropsfun application to tx_commit_index
-rw-r--r--src/rabbit_variable_queue.erl57
1 files changed, 31 insertions, 26 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 05e540c0bf..3cfebb50ef 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -620,12 +620,8 @@ internal_fetch(AckRequired,
Len1 = Len - 1,
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
- true = RamIndexCount1 >= 0, %% ASSERTION
-
{{Msg, IsDelivered, AckTag, Len1},
a(State #vqstate { ram_msg_count = RamMsgCount1,
- ram_index_count = RamIndexCount1,
out_counter = OutCount + 1,
index_state = IndexState2,
len = Len1,
@@ -987,22 +983,20 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
end];
false -> []
end,
- PubsOrdered = lists:foldl(
- fun ({Msg, MsgProps}, Acc) ->
- [{Msg, MsgPropsFun(MsgProps)} | Acc]
- end, [], Pubs),
case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of
- true -> State #vqstate { on_sync = #sync {
- acks_persistent = [PersistentAcks | SPAcks],
- acks_all = [AckTags | SAcks],
- pubs = [PubsOrdered | SPubs],
- funs = [Fun | SFuns] }};
+ true -> State #vqstate {
+ on_sync = #sync {
+ acks_persistent = [PersistentAcks | SPAcks],
+ acks_all = [AckTags | SAcks],
+ pubs = [{MsgPropsFun, Pubs} | SPubs],
+ funs = [Fun | SFuns] }};
false -> State1 = tx_commit_index(
- State #vqstate { on_sync = #sync {
- acks_persistent = [],
- acks_all = [AckTags],
- pubs = [PubsOrdered],
- funs = [Fun] } }),
+ State #vqstate {
+ on_sync = #sync {
+ acks_persistent = [],
+ acks_all = [AckTags],
+ pubs = [{MsgPropsFun, Pubs}],
+ funs = [Fun] } }),
State1 #vqstate { on_sync = OnSync }
end.
@@ -1016,7 +1010,13 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
- Pubs = lists:append(lists:reverse(SPubs)),
+ Pubs = lists:foldl(
+ fun({Fun, PubsN}, OuterAcc) ->
+ lists:foldl(
+ fun({Msg, MsgProps}, Acc) ->
+ [{Msg, Fun(MsgProps)} | Acc]
+ end, OuterAcc, PubsN)
+ end, [], SPubs),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent },
@@ -1325,16 +1325,21 @@ chunk_size(Current, Permitted) ->
lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
fetch_from_q3(State = #vqstate {
- q1 = Q1,
- q2 = Q2,
- delta = #delta { count = DeltaCount },
- q3 = Q3,
- q4 = Q4 }) ->
+ q1 = Q1,
+ q2 = Q2,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3,
+ q4 = Q4,
+ ram_index_count = RamIndexCount}) ->
case bpqueue:out(Q3) of
{empty, _Q3} ->
{empty, State};
- {{value, _IndexOnDisk, MsgStatus}, Q3a} ->
- State1 = State #vqstate { q3 = Q3a },
+ {{value, IndexOnDisk, MsgStatus}, Q3a} ->
+ RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
+ true = RamIndexCount1 >= 0, %% ASSERTION
+
+ State1 = State #vqstate { q3 = Q3a,
+ ram_index_count = RamIndexCount1 },
State2 =
case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
{true, true} ->