diff options
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 49 |
1 files changed, 27 insertions, 22 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 45899d5020..02212ef9a5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -49,6 +49,8 @@ -define(CREATION_EVENT_KEYS, [pid, + name, + master_pid, is_synchronised ]). @@ -108,31 +110,32 @@ init([#amqqueue { name = QueueName } = Q]) -> end), erlang:monitor(process, MPid), ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), + rabbit_amqqueue, set_maximum_since_use, [Self]), ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), BQS = bq_init(BQ, Q, false), + State = #state { q = Q, + gm = GM, + master_pid = MPid, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + ack_num = 0, + + msg_id_status = dict:new(), + known_senders = dict:new(), + + synchronised = false + }, rabbit_event:notify(queue_slave_created, - [{name, QueueName}, {pid, self()}, {master_pid, MPid}]), + infos(?CREATION_EVENT_KEYS, State)), ok = gm:broadcast(GM, request_length), - {ok, #state { q = Q, - gm = GM, - master_pid = MPid, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = undefined, - sync_timer_ref = undefined, - - sender_queues = dict:new(), - msg_id_ack = dict:new(), - ack_num = 0, - - msg_id_status = dict:new(), - known_senders = dict:new(), - - synchronised = false - }, hibernate, + {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> @@ -340,8 +343,10 @@ inform_deaths(SPid, Deaths) -> infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(pid, _State) -> self(); -i(is_synchronised, State) -> State #state.synchronised; +i(pid, _State) -> self(); +i(name, #state { q = #amqqueue { name = Name } }) -> Name; +i(master_pid, #state { master_pid = MPid }) -> MPid; +i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised; i(Item, _State) -> throw({bad_argument, Item}). bq_init(BQ, Q, Recover) -> |
