diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-02-01 20:38:02 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-02-01 20:38:02 +0000 |
| commit | d337e971ea1fc7cb6bedcc8661fd5e6c172dfdd4 (patch) | |
| tree | d159ea32eee32f8cf4a5be4c5b1462fd4cc28914 | |
| parent | ae44f419a09699504168ae47e8f5ce3780f19bfe (diff) | |
| download | rabbitmq-server-git-d337e971ea1fc7cb6bedcc8661fd5e6c172dfdd4.tar.gz | |
introduce channel registry
| -rw-r--r-- | src/rabbit_channel.erl | 23 |
1 files changed, 15 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f8e100979d..ab4fc6e3e0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,6 +37,7 @@ -export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). +-export([all/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). @@ -62,6 +63,7 @@ -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(all/0 :: () -> [pid()]). -endif. @@ -91,12 +93,16 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> conserve_memory(Pid, Conserve) -> gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}). +all() -> + pg_local:get_members(rabbit_channels). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + ok = pg_local:join(rabbit_channels, self()), {ok, #ch{state = starting, channel = Channel, reader_pid = ReaderPid, @@ -168,20 +174,16 @@ handle_info(timeout, State) -> ok = clear_permission_cache(), {noreply, State, hibernate}. -terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, - state = terminating}) -> - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid); +terminate(_Reason, State = #ch{state = terminating}) -> + terminate(State); -terminate(Reason, State = #ch{writer_pid = WriterPid, - limiter_pid = LimiterPid}) -> +terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of normal -> ok = Res; _ -> ok end, - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid). + terminate(State). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -951,3 +953,8 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, WriterPid, QPid, self(), M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. + +terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> + pg_local:leave(rabbit_channels, self()), + rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid). |
