diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-26 12:02:53 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-26 12:02:53 +0000 |
| commit | 49a8392111d5ce1e1b87e4d831a4f61f114ed9a5 (patch) | |
| tree | 51c1da64107ea9102cb145ec0770fc12e384c390 /src | |
| parent | 29cdb59f55490ee190a393b9fc03dd29fe08f142 (diff) | |
| download | rabbitmq-server-git-49a8392111d5ce1e1b87e4d831a4f61f114ed9a5.tar.gz | |
Add spec, fix API, refuse to run if there are pending acks, return status, don't throw away state.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 1 |
4 files changed, 28 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4d95778947..ad81ba032c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -173,6 +173,8 @@ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). -spec(stop_mirroring/1 :: (pid()) -> 'ok'). +-spec(sync_mirrors/1 :: (rabbit_types:amqqueue()) -> + 'ok' | error('queue_has_pending_acks') | error('queue_not_mirrored')). -endif. @@ -591,11 +593,7 @@ set_maximum_since_use(QPid, Age) -> start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring). stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). -sync_mirrors(Name) -> - case lookup(Name) of - {ok, #amqqueue{pid = QPid}} -> delegate_cast(QPid, sync_mirrors); - _ -> ok - end. +sync_mirrors(#amqqueue{pid = QPid}) -> delegate_call(QPid, sync_mirrors). on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 07f4c3b190..19872d8d0c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1171,6 +1171,24 @@ handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ, reply(ok, State#q{backing_queue = BQ1, backing_queue_state = BQS1}); +handle_call(sync_mirrors, From, + State = #q{q = #amqqueue{name = Name}, + backing_queue = rabbit_mirror_queue_master = BQ, + backing_queue_state = BQS}) -> + case BQ:depth(BQS) - BQ:len(BQS) of + 0 -> + {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(Name), + gen_server2:reply(From, ok), + noreply(rabbit_mirror_queue_master:sync_mirrors( + SPids -- SSPids, Name, BQS)); + _ -> + reply({error, queue_has_pending_acks}, State) + end; + +handle_call(sync_mirrors, _From, State) -> + reply({error, queue_not_mirrored}, State); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), @@ -1300,20 +1318,6 @@ handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) -> cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State) end; -handle_cast(sync_mirrors, - State = #q{q = #amqqueue{name = Name}, - backing_queue = BQ, - backing_queue_state = BQS}) -> - case BQ of - rabbit_mirror_queue_master -> - {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} = - rabbit_amqqueue:lookup(Name), - rabbit_mirror_queue_master:sync_mirrors(SPids -- SSPids, Name, BQS); - _ -> - ok - end, - noreply(State); - handle_cast(wake_up, State) -> noreply(State). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 1664260423..ea9e04fedf 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -127,12 +127,12 @@ stop_mirroring(State = #state { coordinator = CPid, stop_all_slaves(shutdown, State), {BQ, BQS}. -sync_mirrors([], Name, _State) -> +sync_mirrors([], Name, State) -> rabbit_log:info("Synchronising ~s: nothing to do~n", [rabbit_misc:rs(Name)]), - ok; -sync_mirrors(SPids, Name, #state { backing_queue = BQ, - backing_queue_state = BQS }) -> + State; +sync_mirrors(SPids, Name, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> rabbit_log:info("Synchronising ~s with slaves ~p~n", [rabbit_misc:rs(Name), SPids]), Ref = make_ref(), @@ -153,7 +153,7 @@ sync_mirrors(SPids, Name, #state { backing_queue = BQ, end], SPid1 =/= dead], [erlang:demonitor(MRef) || {_, MRef} <- SPidsMRefs], - {Total, _BQS} = + {Total, BQS1} = BQ:fold(fun ({Msg, MsgProps}, I) -> wait_for_credit(), [begin @@ -171,7 +171,7 @@ sync_mirrors(SPids, Name, #state { backing_queue = BQ, [SPid ! {sync_complete, Ref} || SPid <- SPids1], rabbit_log:info("Synchronising ~s: ~p messages; complete~n", [rabbit_misc:rs(Name), Total]), - ok. + State#state{backing_queue_state = BQS1}. wait_for_credit() -> case credit_flow:blocked() of diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index d408c56eda..f4130e0250 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -857,6 +857,7 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ, sync_loop(Ref, MRef, MPid, State); {sync_complete, Ref} -> erlang:demonitor(MRef), + %% We can only sync when there are no pending acks set_delta(0, State); {sync_message, Ref, Msg, Props0} -> credit_flow:ack(MPid, ?CREDIT_DISC_BOUND), |
