diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2011-08-15 17:42:09 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2011-08-15 17:42:09 +0100 |
| commit | 7c151ba0a8f3482d3651e0dff3244f5fb3a02cac (patch) | |
| tree | 9b1c7d33a7730fd37caf7cd167b5a0237e95825a | |
| parent | 43b0f15fc6ade39b98c31298fa9393ce5142b4af (diff) | |
| download | rabbitmq-server-git-7c151ba0a8f3482d3651e0dff3244f5fb3a02cac.tar.gz | |
Initial version of requeue preserving order
| -rw-r--r-- | src/rabbit_variable_queue.erl | 117 |
1 files changed, 91 insertions, 26 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f49b10aa87..203e3f8c92 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -560,27 +560,10 @@ ack(AckTags, State) -> {MsgIds, a(State1)}. requeue(AckTags, MsgPropsFun, State) -> - MsgPropsFun1 = fun (MsgProps) -> - (MsgPropsFun(MsgProps)) #message_properties { - needs_confirming = false } - end, - {MsgIds, State1} = - ack(fun (_, _, _) -> ok end, - fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> - {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), - true, false, State1), - State2; - ({IsPersistent, MsgId, MsgProps, _IndexOnDisk}, State1) -> - #vqstate { msg_store_clients = MSCState } = State1, - {{ok, Msg = #basic_message{}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, MsgPropsFun1(MsgProps), - true, true, State2), - State3 - end, - AckTags, State), - {MsgIds, a(reduce_memory_use(State1))}. + lists:foldl(fun (AckTag, {MsgIds, S}) -> + {MsgId, S1} = requeue_single(AckTag, MsgPropsFun, S), + {[MsgId | MsgIds], reduce_memory_use(S1)} + end, {[], State}, lists:sort(fun erlang:'>='/2, AckTags)). len(#vqstate { len = Len }) -> Len. @@ -1031,11 +1014,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of - {false, true, false, _} -> Rem(), IndexState1; - {false, true, true, _} -> Rem(), Ack(); - { true, true, true, false} -> Ack(); - _ -> IndexState1 + case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent, IsDelivered} of + {false, true, false, _, _} -> Rem(), IndexState1; + {false, true, true, _, false} -> Rem(), Ack(); + { true, true, true, false, false} -> Ack(); + _ -> IndexState1 end, %% 3. If an ack is required, add something sensible to PA @@ -1337,6 +1320,88 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> end). %%---------------------------------------------------------------------------- +%% Internal plumbing for requeue +%%---------------------------------------------------------------------------- + +requeue_single(AckTag, MsgPropsFun, #vqstate { pending_ack = PA, + ram_ack_index = RAI } = State) -> + MsgPropsFun1 = fun (MsgProps) -> + (MsgPropsFun(MsgProps)) #message_properties { + needs_confirming = false } + end, + State1 = State #vqstate { pending_ack = dict:erase(AckTag, PA), + ram_ack_index = gb_trees:delete_any(AckTag, RAI) }, + #msg_status { msg_id = MsgId1, + msg_props = MsgProps1 } = MsgStatus1 = + case dict:fetch(AckTag, PA) of + {IsPersistent, MsgId, MsgProps, IndexOnDisk} -> + #msg_status { seq_id = AckTag, + msg_id = MsgId, + msg = undefined, + is_persistent = IsPersistent, + is_delivered = true, + msg_on_disk = true, + index_on_disk = IndexOnDisk, + msg_props = MsgProps }; + #msg_status{} = MsgStatus0 -> MsgStatus0 + end, + {MsgId1, publish_r(MsgStatus1 #msg_status{ + msg_props = MsgPropsFun1(MsgProps1)}, State1)}. + +publish_r(MsgStatus = #msg_status { seq_id = SeqId, + msg = Msg, + index_on_disk = IndexOnDisk }, + State = #vqstate { q3 = Q3, + q4 = Q4, + delta = Delta, + len = Len, + in_counter = InCounter, + ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount }) -> + (case pick_store(SeqId, State) of + q4 -> case Msg of + undefined -> + {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = + read_msg(MsgStatus, State), + State1 #vqstate { q4 = gb_trees:insert( + SeqId, MsgStatus1, Q4a)}; + #basic_message{} -> + State #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus, + Q4), + ram_msg_count = RamMsgCount + 1 } + end; + q3 -> State #vqstate { + q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3), + ram_index_count = RamIndexCount + one_if(not IndexOnDisk), + ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }; + + delta -> #delta { start_seq_id = StartSeqId, + count = Count, + end_seq_id = EndSeqId} = Delta, + Delta1 = Delta #delta { start_seq_id = min(SeqId, StartSeqId), + count = Count + 1, + end_seq_id = max(SeqId + 1, EndSeqId)}, + State #vqstate { delta = Delta1 } + end) #vqstate { len = Len + 1, + in_counter = InCounter + 1 }. + +pick_store(SeqId, #vqstate { q3 = Q3, + delta = #delta { start_seq_id = DeltaLimit } + = Delta}) -> + case q3tree:is_empty(Q3) orelse + not q3tree:is_empty(Q3) andalso SeqId < q3tree:least_key(Q3) of + true -> q4; + false -> BlankDelta = case Delta of + ?BLANK_DELTA_PATTERN(X) -> true; + _ -> false + end, + case BlankDelta orelse SeqId < DeltaLimit of + true -> q3; + false -> delta + end + end. + +%%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |
