diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 5 |
2 files changed, 7 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index bd33e955c9..6cfc13c765 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), - ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + ok = gm:broadcast(GM, {length, BQ:len(BQS), BQ:pending_ack(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -375,7 +375,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> Len = BQ:len(BQS), - ok = gm:broadcast(GM, {length, Len}), + ok = gm:broadcast(GM, {length, Len, BQ:pending_ack(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -406,7 +406,8 @@ length_fun() -> fun (?MODULE, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + ok = gm:broadcast( + GM, {length, BQ:len(BQS), BQ:pending_ack(BQS)}), State end) end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 58b9b64442..38bbf59f3c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -887,8 +887,9 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = pmon:demonitor(ChPid, KS) } end}; -process_instruction({length, Length}, State) -> - {ok, set_synchronised(Length, State)}; +process_instruction({length, Length, ExtPending}, State) -> + {ok, set_synchronised(Length, + State #state { external_pending = ExtPending })}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> |
