summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-09-27 18:00:00 +0100
committerEmile Joubert <emile@rabbitmq.com>2011-09-27 18:00:00 +0100
commit1e71f208683e5d505c4c0810751545c95391c20e (patch)
tree43f765df4eac38101f4fa4a68ab0822c329c6fc1
parent96ce53bf319c4f8fec99580095f84be809516dbd (diff)
downloadrabbitmq-server-git-1e71f208683e5d505c4c0810751545c95391c20e.tar.gz
Refactor
Making use of the symmetry between q3 and q4
-rw-r--r--src/rabbit_backing_queue_qc.erl10
-rw-r--r--src/rabbit_variable_queue.erl153
2 files changed, 84 insertions, 79 deletions
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index d6b17771fc..a8cfe6ba85 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -383,10 +383,12 @@ qc_test_queue(Durable) ->
rand_choice([]) -> [];
rand_choice(List) -> rand_choice(List, [], random:uniform(length(List))).
-rand_choice(_List, Selection, 0) -> Selection;
-rand_choice(List, Selection, N) -> Picked = lists:nth(random:uniform(length(List)), List),
- rand_choice(List -- [Picked],
- [Picked | Selection], N - 1).
+rand_choice(_List, Selection, 0) ->
+ Selection;
+rand_choice(List, Selection, N) ->
+ Picked = lists:nth(random:uniform(length(List)), List),
+ rand_choice(List -- [Picked], [Picked | Selection],
+ N - 1).
dropfun(Props) ->
Expiry = eval({call, erlang, element,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f6446d9cf6..df8bb312da 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -270,6 +270,8 @@
end_seq_id %% end_seq_id is exclusive
}).
+-record(merge_funs, {join, out, in, publish}).
+
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
%% that we must be due to write indices for before we do any work at
@@ -1307,12 +1309,12 @@ requeue_merge(SeqIdsSorted, MsgPropsFun,
q4 = Q4,
in_counter = InCounter,
len = Len } = State) ->
- {SeqIds1, MsgIds, Q4a, State1} = q4_merge(SeqIdsSorted, q3_least_key(Q3),
- Q4, queue:new(), [], MsgPropsFun,
- State),
- {SeqIds2, MsgIds1, Q3a, State2} = q3_merge(SeqIds1, delta_least_key(Delta),
- Q3, bpqueue:new(), MsgIds,
- MsgPropsFun, State1),
+ {SeqIds1, MsgIds, Q4a, State1} = queue_merge(SeqIdsSorted, q3_limit(Q3),
+ Q4, queue:new(), [],
+ q4_funs(MsgPropsFun), State),
+ {SeqIds2, MsgIds1, Q3a, State2} = queue_merge(SeqIds1, delta_limit(Delta),
+ Q3, bpqueue:new(), MsgIds,
+ q3_funs(MsgPropsFun), State1),
{MsgIds2, Delta1, State3} = delta_merge(SeqIds2, MsgIds1, Delta,
MsgPropsFun, State2),
MsgCount = length(MsgIds2),
@@ -1322,81 +1324,82 @@ requeue_merge(SeqIdsSorted, MsgPropsFun,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }}.
-q4_merge([], _Limit, Q, Front, MsgIds, _MsgPropsFun, State) ->
- {[], MsgIds, queue:join(Front, Q), State};
-q4_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds, MsgPropsFun, State)
+queue_merge([], _Limit, Q, Front, MsgIds, #merge_funs { join = QJoin },
+ State) ->
+ {[], MsgIds, QJoin(Front, Q), State};
+queue_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds,
+ #merge_funs { out = QOut,
+ in = QIn,
+ publish = QPublish } = Funs, State)
when Limit == undefined orelse SeqId < Limit ->
- case queue:out(Q) of
+ case QOut(Q) of
{{value, #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1} ->
case SeqId1 > SeqId of
true -> {#msg_status { msg_id = MsgId } = MsgStatus1, State1} =
- q4_publish(SeqId, MsgPropsFun, State),
- q4_merge(Rest, Limit, Q, queue:in(MsgStatus1, Front),
- [MsgId | MsgIds], MsgPropsFun, State1);
- false -> q4_merge(SeqIds, Limit, Q1,
- queue:in(MsgStatusHead, Front), MsgIds,
- MsgPropsFun, State)
+ QPublish(SeqId, State),
+ queue_merge(Rest, Limit, Q, QIn(MsgStatus1, Front),
+ [MsgId | MsgIds], Funs, State1);
+ false -> queue_merge(SeqIds, Limit, Q1,
+ QIn(MsgStatusHead, Front), MsgIds,
+ Funs, State)
end;
{empty, _Q1} ->
{#msg_status { msg_id = MsgId } = MsgStatus1, State1} =
- q4_publish(SeqId, MsgPropsFun, State),
- q4_merge(Rest, Limit, Q, queue:in(MsgStatus1, Front),
- [MsgId | MsgIds], MsgPropsFun, State1)
+ QPublish(SeqId, State),
+ queue_merge(Rest, Limit, Q, QIn(MsgStatus1, Front),
+ [MsgId | MsgIds], Funs, State1)
end;
-q4_merge(SeqIds, _Limit, Q, Front, MsgIds, _MsgPropsFun, State) ->
- {SeqIds, MsgIds, queue:join(Front, Q), State}.
-
-q4_publish(SeqId, MsgPropsFun, State) ->
- {#msg_status { msg = Msg } = MsgStatus,
- #vqstate { ram_msg_count = RamMsgCount } = State1} =
- msg_from_pending_ack(SeqId, MsgPropsFun, State),
- case Msg of
- undefined -> read_msg(MsgStatus, State1);
- #basic_message{} -> {MsgStatus,
- State1 #vqstate {ram_msg_count = RamMsgCount + 1}}
- end.
+queue_merge(SeqIds, _Limit, Q, Front, MsgIds, #merge_funs { join = QJoin },
+ State) ->
+ {SeqIds, MsgIds, QJoin(Front, Q), State}.
+
+q4_funs(MsgPropsFun) ->
+ #merge_funs {
+ join = fun queue:join/2,
+ out = fun queue:out/1,
+ in = fun queue:in/2,
+ publish = fun (SeqId, State) ->
+ {#msg_status { msg = Msg } = MsgStatus,
+ #vqstate { ram_msg_count = RamMsgCount } = State1} =
+ msg_from_pending_ack(SeqId, MsgPropsFun, State),
+ case Msg of
+ undefined ->
+ read_msg(MsgStatus, State1);
+ #basic_message{} -> {MsgStatus,
+ State1 #vqstate { ram_msg_count =
+ RamMsgCount + 1}}
+ end
+ end}.
+
+q3_funs(MsgPropsFun) ->
+ #merge_funs {
+ join = fun bpqueue:join/2,
+ out = fun (Q) ->
+ case bpqueue:out(Q) of
+ {{value, _IndexOnDisk, MsgStatus}, Q1} ->
+ {{value, MsgStatus}, Q1};
+ {empty, _Q1} = X ->
+ X
+ end
+ end,
+ in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) ->
+ bpqueue:in(IOD, MsgStatus, Q)
+ end,
+ publish = fun (SeqId, State) ->
+ {#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus,
+ State1} = msg_from_pending_ack(SeqId, MsgPropsFun, State),
+ {#msg_status { msg = Msg,
+ index_on_disk = IndexOnDisk1 } = MsgStatus1,
+ #vqstate { ram_index_count = RamIndexCount,
+ ram_msg_count = RamMsgCount } = State2} =
+ maybe_write_to_disk(not MsgOnDisk, false, MsgStatus,
+ State1),
+ {MsgStatus1,
+ State2 #vqstate {
+ ram_index_count = RamIndexCount + one_if(not IndexOnDisk1),
+ ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}
+ end}.
-q3_merge([], _Limit, Q, Front, MsgIds, _MsgPropsFun, State) ->
- {[], MsgIds, bpqueue:join(Front, Q), State};
-q3_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds, MsgPropsFun, State)
- when Limit == undefined orelse SeqId < Limit ->
- case bpqueue:out(Q) of
- {{value, IndexOnDiskHead,
- #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1} ->
- case SeqId1 > SeqId of
- true -> {#msg_status { msg_id = MsgId,
- index_on_disk = IndexOnDisk1 } =
- MsgStatus1, State1} =
- q3_publish(SeqId, MsgPropsFun, State),
- q3_merge(Rest, Limit, Q,
- bpqueue:in(IndexOnDisk1, MsgStatus1, Front),
- [MsgId | MsgIds], MsgPropsFun, State1);
- false -> q3_merge(SeqIds, Limit, Q1,
- bpqueue:in(IndexOnDiskHead, MsgStatusHead,
- Front), MsgIds, MsgPropsFun, State)
- end;
- {empty, _Q1} ->
- {#msg_status { msg_id = MsgId,
- index_on_disk = IndexOnDisk1 } = MsgStatus1,
- State1} = q3_publish(SeqId, MsgPropsFun, State),
- q3_merge(Rest, Limit, Q,
- bpqueue:in(IndexOnDisk1, MsgStatus1, Front),
- [MsgId | MsgIds], MsgPropsFun, State1)
- end;
-q3_merge(SeqIds, _Limit, Q, Front, MsgIds, _MsgPropsFun, State) ->
- {SeqIds, MsgIds, bpqueue:join(Front, Q), State}.
-
-q3_publish(SeqId, MsgPropsFun, State) ->
- {#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State1} =
- msg_from_pending_ack(SeqId, MsgPropsFun, State),
- {#msg_status { msg = Msg, index_on_disk = IndexOnDisk1 } = MsgStatus1,
- #vqstate { ram_index_count = RamIndexCount,
- ram_msg_count = RamMsgCount } = State1} =
- maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State),
- {MsgStatus1,
- State1 #vqstate {
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk1),
- ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}.
delta_merge([], MsgIds, Delta, _MsgPropsFun, State) ->
{MsgIds, Delta, State};
@@ -1444,7 +1447,7 @@ msg_from_pending_ack(SeqId, MsgPropsFun,
end,
{MsgStatus1 #msg_status { msg_props = MsgPropsFun1(MsgProps1) }, State1}.
-q3_least_key(BPQ) ->
+q3_limit(BPQ) ->
case bpqueue:is_empty(BPQ) of
true -> undefined;
false -> {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} =
@@ -1452,8 +1455,8 @@ q3_least_key(BPQ) ->
SeqId
end.
-delta_least_key(?BLANK_DELTA_PATTERN(_X)) -> undefined;
-delta_least_key(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
+delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%%----------------------------------------------------------------------------
%% Phase changes