diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-17 12:05:43 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-17 12:05:43 +0000 |
| commit | 13f55320029362d956ee84109e2448cbb5eea567 (patch) | |
| tree | f9b4a772e920c1afe8f5db11c00d21fd5a013cc9 /src | |
| parent | 71809f4e35e7c5c98735d13e970e7c19296d0345 (diff) | |
| download | rabbitmq-server-git-13f55320029362d956ee84109e2448cbb5eea567.tar.gz | |
That might just be enough to support confirms
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 64 |
1 files changed, 48 insertions, 16 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 7fb13c5c23..d4623bf5fc 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -282,10 +282,34 @@ handle_msg([SPid], _From, Msg) -> %% --------------------------------------------------------------------------- maybe_run_queue_via_backing_queue( - Fun, State = #state { backing_queue_state = BQS }) -> - %% TODO: some CONFIRM-like thing with these Guids - {_Guids, BQS1} = Fun(BQS), - State #state { backing_queue_state = BQS1 }. + Fun, State = #state { backing_queue_state = BQS, + guid_to_channel = GTC }) -> + {Guids, BQS1} = Fun(BQS), + GTC1 = lists:foldl(fun maybe_confirm_message/2, GTC, Guids), + State #state { backing_queue_state = BQS1, + guid_to_channel = GTC1 }. + +record_confirm_or_confirm(#delivery { msg_seq_no = undefined }, _Q, GTC) -> + GTC; +record_confirm_or_confirm( + #delivery { sender = ChPid, + message = #basic_message { is_persistent = true, + guid = Guid }, + msg_seq_no = MsgSeqNo }, #amqqueue { durable = true }, GTC) -> + dict:store(Guid, {ChPid, MsgSeqNo}, GTC); +record_confirm_or_confirm(#delivery { sender = ChPid, msg_seq_no = MsgSeqNo }, + _Q, GTC) -> + ok = rabbit_channel:confirm(ChPid, MsgSeqNo), + GTC. + +maybe_confirm_message(Guid, GTC) -> + case dict:find(Guid, GTC) of + {ok, {ChPid, MsgSeqNo}} when MsgSeqNo =/= undefined -> + ok = rabbit_channel:confirm(ChPid, MsgSeqNo), + dict:erase(Guid, GTC); + error -> + GTC + end. handle_process_result({continue, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. @@ -361,7 +385,7 @@ enqueue_message(Delivery = #delivery { sender = ChPid }, false -> {continue, State1} end. -process_instructions(State = #state { instructions = InstrQ }) -> +process_instructions(State = #state { instructions = InstrQ }) -> case queue:out(InstrQ) of {empty, _InstrQ} -> {continue, State}; @@ -378,10 +402,12 @@ process_instructions(State = #state { instructions = InstrQ }) -> end. process_instruction({publish, Deliver, Guid, MsgProps, ChPid}, - State = #state { sender_queues = SQ, + State = #state { q = Q, + sender_queues = SQ, backing_queue = BQ, backing_queue_state = BQS, - guid_ack = GA }) -> + guid_ack = GA, + guid_to_channel = GTC }) -> case dict:find(ChPid, SQ) of error -> blocked; @@ -389,26 +415,32 @@ process_instruction({publish, Deliver, Guid, MsgProps, ChPid}, case queue:out(Q) of {empty, _Q} -> blocked; - {{value, #delivery { - message = Msg = #basic_message { guid = Guid } }}, Q1} -> + {{value, Delivery = #delivery { + message = Msg = #basic_message { guid = Guid } }}, + Q1} -> State1 = State #state { sender_queues = dict:store(ChPid, Q1, SQ) }, + GTC1 = record_confirm_or_confirm(Delivery, Q, GTC), {processed, case Deliver of false -> - %% RECORD CONFIRM - modify MsgProps BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State1 #state {backing_queue_state = BQS1 }; + State1 #state { backing_queue_state = BQS1, + guid_to_channel = GTC1 }; {true, AckRequired} -> {AckTag, BQS1} = BQ:publish_delivered( AckRequired, Msg, MsgProps, ChPid, BQS), - GA1 = case AckRequired of - true -> dict:store(Guid, AckTag, GA); - false -> GA - end, + {GA1, GTC2} = + case AckRequired of + true -> + {dict:store(Guid, AckTag, GA), GTC1}; + false -> + {GA, maybe_confirm_message(Guid, GTC1)} + end, State1 #state { backing_queue_state = BQS1, - guid_ack = GA1 } + guid_ack = GA1, + guid_to_channel = GTC2 } end}; {{value, #delivery {}}, _Q1} -> %% throw away the instruction: we'll never receive |
