diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-09 16:52:22 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-09 16:52:22 +0100 |
| commit | d4490aaa6a3944b0cd0b852a9a8fe1237c56d0a9 (patch) | |
| tree | eba1ecba071f432b1352540b5313fda9064cfe9c /src | |
| parent | ac042d70eaf31c10bec0692a55039508b20afd3c (diff) | |
| download | rabbitmq-server-git-d4490aaa6a3944b0cd0b852a9a8fe1237c56d0a9.tar.gz | |
Added ack to vq. Realised that ack no longer needs the msg itself, so there've been a few associated changes in queue_process.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 16 |
2 files changed, 25 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5789b1059c..06c6cd853a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -278,11 +278,10 @@ deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, State = #q { mixed_state = MS }) -> {{Msg, IsDelivered, AckTag, Remaining}, MS1} = rabbit_mixed_queue:fetch(MS), - AutoAcks1 = - case AckRequired of - true -> AutoAcks; - false -> [{Msg, AckTag} | AutoAcks] - end, + AutoAcks1 = case AckRequired of + true -> AutoAcks; + false -> [AckTag | AutoAcks] + end, {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1}, State #q { mixed_state = MS1 }}. @@ -348,8 +347,8 @@ deliver_or_requeue_n(MsgsWithAcks, State) -> deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) -> 0 < Len. deliver_or_requeue_msgs_deliver( - false, {Len, AcksAcc, [(MsgAckTag = {Msg, _}) | MsgsWithAcks]}, State) -> - {{Msg, true, noack}, {Len - 1, [MsgAckTag | AcksAcc], MsgsWithAcks}, State}; + false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> + {{Msg, true, noack}, {Len - 1, [AckTag | AcksAcc], MsgsWithAcks}, State}; deliver_or_requeue_msgs_deliver( true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> {{Msg, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}. @@ -620,7 +619,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, store_ch_record(C#cr{unacked_messages = NewUAM}), {ok, MS1}; false -> - rabbit_mixed_queue:ack([{Msg, AckTag}], MS1) + rabbit_mixed_queue:ack([AckTag], MS1) end, Message = {QName, self(), NextId, IsDelivered, Msg}, reply({ok, Remaining, Message}, @@ -764,8 +763,9 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> case Txn of none -> {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), - {ok, MS} = - rabbit_mixed_queue:ack(MsgWithAcks, State #q.mixed_state), + {ok, MS} = rabbit_mixed_queue:ack( + [AckTag || {_Msg, AckTag} <- MsgWithAcks], + State #q.mixed_state), store_ch_record(C#cr{unacked_messages = Remaining}), noreply(State #q { mixed_state = MS }); _ -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a6574d4790..5cf0893973 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,7 +32,7 @@ -module(rabbit_variable_queue). -export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1, - fetch/1, len/1, is_empty/1, maybe_start_prefetcher/1]). + fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1]). %%---------------------------------------------------------------------------- @@ -246,6 +246,20 @@ maybe_start_prefetcher(State = #vqstate { ram_msg_count = RamMsgCount, maybe_start_prefetcher(State) -> State. +ack(AckTags, State = #vqstate { index_state = IndexState }) -> + {MsgIds, SeqIds} = + lists:foldl( + fun (ack_not_on_disk, Acc) -> Acc; + ({ack_index_and_store, MsgId, SeqId}, {MsgIds, SeqIds}) -> + {[MsgId | MsgIds], [SeqId, SeqIds]} + end, {[], []}, AckTags), + IndexState1 = case SeqIds of + [] -> IndexState; + _ -> rabbit_queue_index:write_acks(SeqIds, IndexState) + end, + ok = rabbit_msg_store:remove(MsgIds), + State #vqstate { index_state = IndexState1 }. + %%---------------------------------------------------------------------------- publish(msg, Msg = #basic_message { guid = MsgId, |
