summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-17 12:05:43 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-17 12:05:43 +0000
commit13f55320029362d956ee84109e2448cbb5eea567 (patch)
treef9b4a772e920c1afe8f5db11c00d21fd5a013cc9 /src
parent71809f4e35e7c5c98735d13e970e7c19296d0345 (diff)
downloadrabbitmq-server-git-13f55320029362d956ee84109e2448cbb5eea567.tar.gz
That might just be enough to support confirms
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_slave.erl64
1 files changed, 48 insertions, 16 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 7fb13c5c23..d4623bf5fc 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -282,10 +282,34 @@ handle_msg([SPid], _From, Msg) ->
%% ---------------------------------------------------------------------------
maybe_run_queue_via_backing_queue(
- Fun, State = #state { backing_queue_state = BQS }) ->
- %% TODO: some CONFIRM-like thing with these Guids
- {_Guids, BQS1} = Fun(BQS),
- State #state { backing_queue_state = BQS1 }.
+ Fun, State = #state { backing_queue_state = BQS,
+ guid_to_channel = GTC }) ->
+ {Guids, BQS1} = Fun(BQS),
+ GTC1 = lists:foldl(fun maybe_confirm_message/2, GTC, Guids),
+ State #state { backing_queue_state = BQS1,
+ guid_to_channel = GTC1 }.
+
+record_confirm_or_confirm(#delivery { msg_seq_no = undefined }, _Q, GTC) ->
+ GTC;
+record_confirm_or_confirm(
+ #delivery { sender = ChPid,
+ message = #basic_message { is_persistent = true,
+ guid = Guid },
+ msg_seq_no = MsgSeqNo }, #amqqueue { durable = true }, GTC) ->
+ dict:store(Guid, {ChPid, MsgSeqNo}, GTC);
+record_confirm_or_confirm(#delivery { sender = ChPid, msg_seq_no = MsgSeqNo },
+ _Q, GTC) ->
+ ok = rabbit_channel:confirm(ChPid, MsgSeqNo),
+ GTC.
+
+maybe_confirm_message(Guid, GTC) ->
+ case dict:find(Guid, GTC) of
+ {ok, {ChPid, MsgSeqNo}} when MsgSeqNo =/= undefined ->
+ ok = rabbit_channel:confirm(ChPid, MsgSeqNo),
+ dict:erase(Guid, GTC);
+ error ->
+ GTC
+ end.
handle_process_result({continue, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
@@ -361,7 +385,7 @@ enqueue_message(Delivery = #delivery { sender = ChPid },
false -> {continue, State1}
end.
-process_instructions(State = #state { instructions = InstrQ }) ->
+process_instructions(State = #state { instructions = InstrQ }) ->
case queue:out(InstrQ) of
{empty, _InstrQ} ->
{continue, State};
@@ -378,10 +402,12 @@ process_instructions(State = #state { instructions = InstrQ }) ->
end.
process_instruction({publish, Deliver, Guid, MsgProps, ChPid},
- State = #state { sender_queues = SQ,
+ State = #state { q = Q,
+ sender_queues = SQ,
backing_queue = BQ,
backing_queue_state = BQS,
- guid_ack = GA }) ->
+ guid_ack = GA,
+ guid_to_channel = GTC }) ->
case dict:find(ChPid, SQ) of
error ->
blocked;
@@ -389,26 +415,32 @@ process_instruction({publish, Deliver, Guid, MsgProps, ChPid},
case queue:out(Q) of
{empty, _Q} ->
blocked;
- {{value, #delivery {
- message = Msg = #basic_message { guid = Guid } }}, Q1} ->
+ {{value, Delivery = #delivery {
+ message = Msg = #basic_message { guid = Guid } }},
+ Q1} ->
State1 = State #state { sender_queues =
dict:store(ChPid, Q1, SQ) },
+ GTC1 = record_confirm_or_confirm(Delivery, Q, GTC),
{processed,
case Deliver of
false ->
- %% RECORD CONFIRM - modify MsgProps
BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State1 #state {backing_queue_state = BQS1 };
+ State1 #state { backing_queue_state = BQS1,
+ guid_to_channel = GTC1 };
{true, AckRequired} ->
{AckTag, BQS1} = BQ:publish_delivered(
AckRequired, Msg, MsgProps,
ChPid, BQS),
- GA1 = case AckRequired of
- true -> dict:store(Guid, AckTag, GA);
- false -> GA
- end,
+ {GA1, GTC2} =
+ case AckRequired of
+ true ->
+ {dict:store(Guid, AckTag, GA), GTC1};
+ false ->
+ {GA, maybe_confirm_message(Guid, GTC1)}
+ end,
State1 #state { backing_queue_state = BQS1,
- guid_ack = GA1 }
+ guid_ack = GA1,
+ guid_to_channel = GTC2 }
end};
{{value, #delivery {}}, _Q1} ->
%% throw away the instruction: we'll never receive