diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-17 00:31:15 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-17 00:31:15 +0000 |
| commit | 91e9438fb63cab946e8dde6af72cdc2d81c9bb1b (patch) | |
| tree | b35d877f260a35aa8673592c90a0368c3373ec9a /src | |
| parent | faa35a799468acc38a014d781e535d08f1b97f35 (diff) | |
| download | rabbitmq-server-git-91e9438fb63cab946e8dde6af72cdc2d81c9bb1b.tar.gz | |
Support maybe_run_queue_via_backing_queue in the slaves, and add some comments about where to deal with confirmations. I think. Assuming my understanding of pubacks is right.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 25 |
2 files changed, 23 insertions, 4 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 2299c3d17b..0d64ab8e77 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -52,7 +52,7 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(#amqqueue { arguments = Args, durable = false } = Q, Recover) -> +init(#amqqueue { arguments = Args } = Q, Recover) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), {_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index a9429ab80f..ac49b10b29 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -86,7 +86,9 @@ sender_queues, %% :: Pid -> MsgQ guid_ack, %% :: Guid -> AckTag - instructions %% :: InstrQ + instructions, %% :: InstrQ + + guid_to_channel %% for confirms }). -define(RAM_DURATION_UPDATE_INTERVAL, 5000). @@ -138,7 +140,9 @@ init([#amqqueue { name = QueueName } = Q]) -> sender_queues = dict:new(), guid_ack = dict:new(), - instructions = queue:new() + instructions = queue:new(), + + guid_to_channel = dict:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}; @@ -172,8 +176,14 @@ handle_call({gm_deaths, Deaths}, From, gen_server2:reply(From, ok), ok = gm:broadcast(GM, heartbeat), noreply(State #state { master_node = MNode1 }) - end. + end; +handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> + reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). + + +handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> + noreply(maybe_run_queue_via_backing_queue(Fun, State)); handle_cast({gm, Instruction}, State = #state { instructions = InstrQ }) -> State1 = State #state { instructions = queue:in(Instruction, InstrQ) }, @@ -271,6 +281,12 @@ handle_msg([SPid], _From, Msg) -> %% Others %% --------------------------------------------------------------------------- +maybe_run_queue_via_backing_queue( + Fun, State = #state { backing_queue_state = BQS }) -> + %% TODO: some CONFIRM-like thing with these Guids + {_Guids, BQS1} = Fun(BQS), + State #state { backing_queue_state = BQS1 }. + handle_process_result({continue, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. @@ -380,6 +396,7 @@ process_instruction({publish, Deliver, Guid, MsgProps, ChPid}, {processed, case Deliver of false -> + %% RECORD CONFIRM - modify MsgProps BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), State1 #state {backing_queue_state = BQS1 }; {true, AckRequired} -> @@ -439,6 +456,7 @@ process_instruction({ack, Guids}, {AckTags, GA1} = guids_to_acktags(Guids, GA), {Guids1, BQS1} = BQ:ack(AckTags, BQS), [] = Guids1 -- Guids, %% ASSERTION + %% CONFIRM - persistent but delivered faster than disk sync {processed, State #state { guid_ack = GA1, backing_queue_state = BQS1 }}; process_instruction({requeue, MsgPropsFun, Guids}, @@ -457,6 +475,7 @@ process_instruction({requeue, MsgPropsFun, Guids}, %% GA {_Count, BQS1} = BQ:purge(BQS), {Guids, BQS2} = ack_all(BQ, GA, BQS1), + %% CONFIRM these Guids State #state { guid_ack = dict:new(), backing_queue_state = BQS2 } end}; |
