diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-31 15:44:20 +0100 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-31 15:44:20 +0100 |
| commit | bc2b4fe3fe292479da2a0a2f348ddad137499b5c (patch) | |
| tree | e093d00636f75f50c19905f508665c33fa9d6c80 | |
| parent | 1e9b9d5cc27eb7d8aa451274ea56bada1f2c56fc (diff) | |
| download | rabbitmq-server-git-bc2b4fe3fe292479da2a0a2f348ddad137499b5c.tar.gz | |
get the external pending acks at the beginning
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 5 |
2 files changed, 7 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index bd33e955c9..6cfc13c765 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), - ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + ok = gm:broadcast(GM, {length, BQ:len(BQS), BQ:pending_ack(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -375,7 +375,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> Len = BQ:len(BQS), - ok = gm:broadcast(GM, {length, Len}), + ok = gm:broadcast(GM, {length, Len, BQ:pending_ack(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -406,7 +406,8 @@ length_fun() -> fun (?MODULE, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + ok = gm:broadcast( + GM, {length, BQ:len(BQS), BQ:pending_ack(BQS)}), State end) end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 58b9b64442..38bbf59f3c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -887,8 +887,9 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = pmon:demonitor(ChPid, KS) } end}; -process_instruction({length, Length}, State) -> - {ok, set_synchronised(Length, State)}; +process_instruction({length, Length, ExtPending}, State) -> + {ok, set_synchronised(Length, + State #state { external_pending = ExtPending })}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> |
