diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2011-09-27 18:00:00 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2011-09-27 18:00:00 +0100 |
| commit | 1e71f208683e5d505c4c0810751545c95391c20e (patch) | |
| tree | 43f765df4eac38101f4fa4a68ab0822c329c6fc1 /src | |
| parent | 96ce53bf319c4f8fec99580095f84be809516dbd (diff) | |
| download | rabbitmq-server-git-1e71f208683e5d505c4c0810751545c95391c20e.tar.gz | |
Refactor
Making use of the symmetry between q3 and q4
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_backing_queue_qc.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 153 |
2 files changed, 84 insertions, 79 deletions
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index d6b17771fc..a8cfe6ba85 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -383,10 +383,12 @@ qc_test_queue(Durable) -> rand_choice([]) -> []; rand_choice(List) -> rand_choice(List, [], random:uniform(length(List))). -rand_choice(_List, Selection, 0) -> Selection; -rand_choice(List, Selection, N) -> Picked = lists:nth(random:uniform(length(List)), List), - rand_choice(List -- [Picked], - [Picked | Selection], N - 1). +rand_choice(_List, Selection, 0) -> + Selection; +rand_choice(List, Selection, N) -> + Picked = lists:nth(random:uniform(length(List)), List), + rand_choice(List -- [Picked], [Picked | Selection], + N - 1). dropfun(Props) -> Expiry = eval({call, erlang, element, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f6446d9cf6..df8bb312da 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -270,6 +270,8 @@ end_seq_id %% end_seq_id is exclusive }). +-record(merge_funs, {join, out, in, publish}). + %% When we discover, on publish, that we should write some indices to %% disk for some betas, the IO_BATCH_SIZE sets the number of betas %% that we must be due to write indices for before we do any work at @@ -1307,12 +1309,12 @@ requeue_merge(SeqIdsSorted, MsgPropsFun, q4 = Q4, in_counter = InCounter, len = Len } = State) -> - {SeqIds1, MsgIds, Q4a, State1} = q4_merge(SeqIdsSorted, q3_least_key(Q3), - Q4, queue:new(), [], MsgPropsFun, - State), - {SeqIds2, MsgIds1, Q3a, State2} = q3_merge(SeqIds1, delta_least_key(Delta), - Q3, bpqueue:new(), MsgIds, - MsgPropsFun, State1), + {SeqIds1, MsgIds, Q4a, State1} = queue_merge(SeqIdsSorted, q3_limit(Q3), + Q4, queue:new(), [], + q4_funs(MsgPropsFun), State), + {SeqIds2, MsgIds1, Q3a, State2} = queue_merge(SeqIds1, delta_limit(Delta), + Q3, bpqueue:new(), MsgIds, + q3_funs(MsgPropsFun), State1), {MsgIds2, Delta1, State3} = delta_merge(SeqIds2, MsgIds1, Delta, MsgPropsFun, State2), MsgCount = length(MsgIds2), @@ -1322,81 +1324,82 @@ requeue_merge(SeqIdsSorted, MsgPropsFun, in_counter = InCounter + MsgCount, len = Len + MsgCount }}. -q4_merge([], _Limit, Q, Front, MsgIds, _MsgPropsFun, State) -> - {[], MsgIds, queue:join(Front, Q), State}; -q4_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds, MsgPropsFun, State) +queue_merge([], _Limit, Q, Front, MsgIds, #merge_funs { join = QJoin }, + State) -> + {[], MsgIds, QJoin(Front, Q), State}; +queue_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds, + #merge_funs { out = QOut, + in = QIn, + publish = QPublish } = Funs, State) when Limit == undefined orelse SeqId < Limit -> - case queue:out(Q) of + case QOut(Q) of {{value, #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1} -> case SeqId1 > SeqId of true -> {#msg_status { msg_id = MsgId } = MsgStatus1, State1} = - q4_publish(SeqId, MsgPropsFun, State), - q4_merge(Rest, Limit, Q, queue:in(MsgStatus1, Front), - [MsgId | MsgIds], MsgPropsFun, State1); - false -> q4_merge(SeqIds, Limit, Q1, - queue:in(MsgStatusHead, Front), MsgIds, - MsgPropsFun, State) + QPublish(SeqId, State), + queue_merge(Rest, Limit, Q, QIn(MsgStatus1, Front), + [MsgId | MsgIds], Funs, State1); + false -> queue_merge(SeqIds, Limit, Q1, + QIn(MsgStatusHead, Front), MsgIds, + Funs, State) end; {empty, _Q1} -> {#msg_status { msg_id = MsgId } = MsgStatus1, State1} = - q4_publish(SeqId, MsgPropsFun, State), - q4_merge(Rest, Limit, Q, queue:in(MsgStatus1, Front), - [MsgId | MsgIds], MsgPropsFun, State1) + QPublish(SeqId, State), + queue_merge(Rest, Limit, Q, QIn(MsgStatus1, Front), + [MsgId | MsgIds], Funs, State1) end; -q4_merge(SeqIds, _Limit, Q, Front, MsgIds, _MsgPropsFun, State) -> - {SeqIds, MsgIds, queue:join(Front, Q), State}. - -q4_publish(SeqId, MsgPropsFun, State) -> - {#msg_status { msg = Msg } = MsgStatus, - #vqstate { ram_msg_count = RamMsgCount } = State1} = - msg_from_pending_ack(SeqId, MsgPropsFun, State), - case Msg of - undefined -> read_msg(MsgStatus, State1); - #basic_message{} -> {MsgStatus, - State1 #vqstate {ram_msg_count = RamMsgCount + 1}} - end. +queue_merge(SeqIds, _Limit, Q, Front, MsgIds, #merge_funs { join = QJoin }, + State) -> + {SeqIds, MsgIds, QJoin(Front, Q), State}. + +q4_funs(MsgPropsFun) -> + #merge_funs { + join = fun queue:join/2, + out = fun queue:out/1, + in = fun queue:in/2, + publish = fun (SeqId, State) -> + {#msg_status { msg = Msg } = MsgStatus, + #vqstate { ram_msg_count = RamMsgCount } = State1} = + msg_from_pending_ack(SeqId, MsgPropsFun, State), + case Msg of + undefined -> + read_msg(MsgStatus, State1); + #basic_message{} -> {MsgStatus, + State1 #vqstate { ram_msg_count = + RamMsgCount + 1}} + end + end}. + +q3_funs(MsgPropsFun) -> + #merge_funs { + join = fun bpqueue:join/2, + out = fun (Q) -> + case bpqueue:out(Q) of + {{value, _IndexOnDisk, MsgStatus}, Q1} -> + {{value, MsgStatus}, Q1}; + {empty, _Q1} = X -> + X + end + end, + in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) -> + bpqueue:in(IOD, MsgStatus, Q) + end, + publish = fun (SeqId, State) -> + {#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, + State1} = msg_from_pending_ack(SeqId, MsgPropsFun, State), + {#msg_status { msg = Msg, + index_on_disk = IndexOnDisk1 } = MsgStatus1, + #vqstate { ram_index_count = RamIndexCount, + ram_msg_count = RamMsgCount } = State2} = + maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, + State1), + {MsgStatus1, + State2 #vqstate { + ram_index_count = RamIndexCount + one_if(not IndexOnDisk1), + ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }} + end}. -q3_merge([], _Limit, Q, Front, MsgIds, _MsgPropsFun, State) -> - {[], MsgIds, bpqueue:join(Front, Q), State}; -q3_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds, MsgPropsFun, State) - when Limit == undefined orelse SeqId < Limit -> - case bpqueue:out(Q) of - {{value, IndexOnDiskHead, - #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1} -> - case SeqId1 > SeqId of - true -> {#msg_status { msg_id = MsgId, - index_on_disk = IndexOnDisk1 } = - MsgStatus1, State1} = - q3_publish(SeqId, MsgPropsFun, State), - q3_merge(Rest, Limit, Q, - bpqueue:in(IndexOnDisk1, MsgStatus1, Front), - [MsgId | MsgIds], MsgPropsFun, State1); - false -> q3_merge(SeqIds, Limit, Q1, - bpqueue:in(IndexOnDiskHead, MsgStatusHead, - Front), MsgIds, MsgPropsFun, State) - end; - {empty, _Q1} -> - {#msg_status { msg_id = MsgId, - index_on_disk = IndexOnDisk1 } = MsgStatus1, - State1} = q3_publish(SeqId, MsgPropsFun, State), - q3_merge(Rest, Limit, Q, - bpqueue:in(IndexOnDisk1, MsgStatus1, Front), - [MsgId | MsgIds], MsgPropsFun, State1) - end; -q3_merge(SeqIds, _Limit, Q, Front, MsgIds, _MsgPropsFun, State) -> - {SeqIds, MsgIds, bpqueue:join(Front, Q), State}. - -q3_publish(SeqId, MsgPropsFun, State) -> - {#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State1} = - msg_from_pending_ack(SeqId, MsgPropsFun, State), - {#msg_status { msg = Msg, index_on_disk = IndexOnDisk1 } = MsgStatus1, - #vqstate { ram_index_count = RamIndexCount, - ram_msg_count = RamMsgCount } = State1} = - maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State), - {MsgStatus1, - State1 #vqstate { - ram_index_count = RamIndexCount + one_if(not IndexOnDisk1), - ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}. delta_merge([], MsgIds, Delta, _MsgPropsFun, State) -> {MsgIds, Delta, State}; @@ -1444,7 +1447,7 @@ msg_from_pending_ack(SeqId, MsgPropsFun, end, {MsgStatus1 #msg_status { msg_props = MsgPropsFun1(MsgProps1) }, State1}. -q3_least_key(BPQ) -> +q3_limit(BPQ) -> case bpqueue:is_empty(BPQ) of true -> undefined; false -> {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} = @@ -1452,8 +1455,8 @@ q3_least_key(BPQ) -> SeqId end. -delta_least_key(?BLANK_DELTA_PATTERN(_X)) -> undefined; -delta_least_key(#delta { start_seq_id = StartSeqId }) -> StartSeqId. +delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; +delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %%---------------------------------------------------------------------------- %% Phase changes |
