summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-09 16:52:22 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-09 16:52:22 +0100
commitd4490aaa6a3944b0cd0b852a9a8fe1237c56d0a9 (patch)
treeeba1ecba071f432b1352540b5313fda9064cfe9c /src
parentac042d70eaf31c10bec0692a55039508b20afd3c (diff)
downloadrabbitmq-server-git-d4490aaa6a3944b0cd0b852a9a8fe1237c56d0a9.tar.gz
Added ack to vq. Realised that ack no longer needs the msg itself, so there've been a few associated changes in queue_process.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl20
-rw-r--r--src/rabbit_variable_queue.erl16
2 files changed, 25 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5789b1059c..06c6cd853a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -278,11 +278,10 @@ deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
State = #q { mixed_state = MS }) ->
{{Msg, IsDelivered, AckTag, Remaining}, MS1} =
rabbit_mixed_queue:fetch(MS),
- AutoAcks1 =
- case AckRequired of
- true -> AutoAcks;
- false -> [{Msg, AckTag} | AutoAcks]
- end,
+ AutoAcks1 = case AckRequired of
+ true -> AutoAcks;
+ false -> [AckTag | AutoAcks]
+ end,
{{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1},
State #q { mixed_state = MS1 }}.
@@ -348,8 +347,8 @@ deliver_or_requeue_n(MsgsWithAcks, State) ->
deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) ->
0 < Len.
deliver_or_requeue_msgs_deliver(
- false, {Len, AcksAcc, [(MsgAckTag = {Msg, _}) | MsgsWithAcks]}, State) ->
- {{Msg, true, noack}, {Len - 1, [MsgAckTag | AcksAcc], MsgsWithAcks}, State};
+ false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
+ {{Msg, true, noack}, {Len - 1, [AckTag | AcksAcc], MsgsWithAcks}, State};
deliver_or_requeue_msgs_deliver(
true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
{{Msg, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}.
@@ -620,7 +619,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
store_ch_record(C#cr{unacked_messages = NewUAM}),
{ok, MS1};
false ->
- rabbit_mixed_queue:ack([{Msg, AckTag}], MS1)
+ rabbit_mixed_queue:ack([AckTag], MS1)
end,
Message = {QName, self(), NextId, IsDelivered, Msg},
reply({ok, Remaining, Message},
@@ -764,8 +763,9 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
case Txn of
none ->
{MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
- {ok, MS} =
- rabbit_mixed_queue:ack(MsgWithAcks, State #q.mixed_state),
+ {ok, MS} = rabbit_mixed_queue:ack(
+ [AckTag || {_Msg, AckTag} <- MsgWithAcks],
+ State #q.mixed_state),
store_ch_record(C#cr{unacked_messages = Remaining}),
noreply(State #q { mixed_state = MS });
_ ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a6574d4790..5cf0893973 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,7 +32,7 @@
-module(rabbit_variable_queue).
-export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1,
- fetch/1, len/1, is_empty/1, maybe_start_prefetcher/1]).
+ fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1]).
%%----------------------------------------------------------------------------
@@ -246,6 +246,20 @@ maybe_start_prefetcher(State = #vqstate { ram_msg_count = RamMsgCount,
maybe_start_prefetcher(State) ->
State.
+ack(AckTags, State = #vqstate { index_state = IndexState }) ->
+ {MsgIds, SeqIds} =
+ lists:foldl(
+ fun (ack_not_on_disk, Acc) -> Acc;
+ ({ack_index_and_store, MsgId, SeqId}, {MsgIds, SeqIds}) ->
+ {[MsgId | MsgIds], [SeqId, SeqIds]}
+ end, {[], []}, AckTags),
+ IndexState1 = case SeqIds of
+ [] -> IndexState;
+ _ -> rabbit_queue_index:write_acks(SeqIds, IndexState)
+ end,
+ ok = rabbit_msg_store:remove(MsgIds),
+ State #vqstate { index_state = IndexState1 }.
+
%%----------------------------------------------------------------------------
publish(msg, Msg = #basic_message { guid = MsgId,