summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-08-15 17:42:09 +0100
committerEmile Joubert <emile@rabbitmq.com>2011-08-15 17:42:09 +0100
commit7c151ba0a8f3482d3651e0dff3244f5fb3a02cac (patch)
tree9b1c7d33a7730fd37caf7cd167b5a0237e95825a
parent43b0f15fc6ade39b98c31298fa9393ce5142b4af (diff)
downloadrabbitmq-server-git-7c151ba0a8f3482d3651e0dff3244f5fb3a02cac.tar.gz
Initial version of requeue preserving order
-rw-r--r--src/rabbit_variable_queue.erl117
1 files changed, 91 insertions, 26 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f49b10aa87..203e3f8c92 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -560,27 +560,10 @@ ack(AckTags, State) ->
{MsgIds, a(State1)}.
requeue(AckTags, MsgPropsFun, State) ->
- MsgPropsFun1 = fun (MsgProps) ->
- (MsgPropsFun(MsgProps)) #message_properties {
- needs_confirming = false }
- end,
- {MsgIds, State1} =
- ack(fun (_, _, _) -> ok end,
- fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
- {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
- true, false, State1),
- State2;
- ({IsPersistent, MsgId, MsgProps, _IndexOnDisk}, State1) ->
- #vqstate { msg_store_clients = MSCState } = State1,
- {{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, MsgId),
- State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, MsgPropsFun1(MsgProps),
- true, true, State2),
- State3
- end,
- AckTags, State),
- {MsgIds, a(reduce_memory_use(State1))}.
+ lists:foldl(fun (AckTag, {MsgIds, S}) ->
+ {MsgId, S1} = requeue_single(AckTag, MsgPropsFun, S),
+ {[MsgId | MsgIds], reduce_memory_use(S1)}
+ end, {[], State}, lists:sort(fun erlang:'>='/2, AckTags)).
len(#vqstate { len = Len }) -> Len.
@@ -1031,11 +1014,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
- {false, true, false, _} -> Rem(), IndexState1;
- {false, true, true, _} -> Rem(), Ack();
- { true, true, true, false} -> Ack();
- _ -> IndexState1
+ case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent, IsDelivered} of
+ {false, true, false, _, _} -> Rem(), IndexState1;
+ {false, true, true, _, false} -> Rem(), Ack();
+ { true, true, true, false, false} -> Ack();
+ _ -> IndexState1
end,
%% 3. If an ack is required, add something sensible to PA
@@ -1337,6 +1320,88 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
+%% Internal plumbing for requeue
+%%----------------------------------------------------------------------------
+
+requeue_single(AckTag, MsgPropsFun, #vqstate { pending_ack = PA,
+ ram_ack_index = RAI } = State) ->
+ MsgPropsFun1 = fun (MsgProps) ->
+ (MsgPropsFun(MsgProps)) #message_properties {
+ needs_confirming = false }
+ end,
+ State1 = State #vqstate { pending_ack = dict:erase(AckTag, PA),
+ ram_ack_index = gb_trees:delete_any(AckTag, RAI) },
+ #msg_status { msg_id = MsgId1,
+ msg_props = MsgProps1 } = MsgStatus1 =
+ case dict:fetch(AckTag, PA) of
+ {IsPersistent, MsgId, MsgProps, IndexOnDisk} ->
+ #msg_status { seq_id = AckTag,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = true,
+ msg_on_disk = true,
+ index_on_disk = IndexOnDisk,
+ msg_props = MsgProps };
+ #msg_status{} = MsgStatus0 -> MsgStatus0
+ end,
+ {MsgId1, publish_r(MsgStatus1 #msg_status{
+ msg_props = MsgPropsFun1(MsgProps1)}, State1)}.
+
+publish_r(MsgStatus = #msg_status { seq_id = SeqId,
+ msg = Msg,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate { q3 = Q3,
+ q4 = Q4,
+ delta = Delta,
+ len = Len,
+ in_counter = InCounter,
+ ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount }) ->
+ (case pick_store(SeqId, State) of
+ q4 -> case Msg of
+ undefined ->
+ {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
+ read_msg(MsgStatus, State),
+ State1 #vqstate { q4 = gb_trees:insert(
+ SeqId, MsgStatus1, Q4a)};
+ #basic_message{} ->
+ State #vqstate { q4 = gb_trees:insert(SeqId, MsgStatus,
+ Q4),
+ ram_msg_count = RamMsgCount + 1 }
+ end;
+ q3 -> State #vqstate {
+ q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3),
+ ram_index_count = RamIndexCount + one_if(not IndexOnDisk),
+ ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) };
+
+ delta -> #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId} = Delta,
+ Delta1 = Delta #delta { start_seq_id = min(SeqId, StartSeqId),
+ count = Count + 1,
+ end_seq_id = max(SeqId + 1, EndSeqId)},
+ State #vqstate { delta = Delta1 }
+ end) #vqstate { len = Len + 1,
+ in_counter = InCounter + 1 }.
+
+pick_store(SeqId, #vqstate { q3 = Q3,
+ delta = #delta { start_seq_id = DeltaLimit }
+ = Delta}) ->
+ case q3tree:is_empty(Q3) orelse
+ not q3tree:is_empty(Q3) andalso SeqId < q3tree:least_key(Q3) of
+ true -> q4;
+ false -> BlankDelta = case Delta of
+ ?BLANK_DELTA_PATTERN(X) -> true;
+ _ -> false
+ end,
+ case BlankDelta orelse SeqId < DeltaLimit of
+ true -> q3;
+ false -> delta
+ end
+ end.
+
+%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------