summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-08 15:17:27 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-08 15:17:27 +0100
commit6b4377154da24d33d1258798171f1201cbe0027b (patch)
tree71907aed5cb57941a55060a7357ed95ee22d96c9 /src
parenta750351082950358a41189f7a00dca1c6203a587 (diff)
parentebdd43e4c7c0fc19be9e99e71ae6b6a71a44a771 (diff)
downloadrabbitmq-server-git-6b4377154da24d33d1258798171f1201cbe0027b.tar.gz
merge default into bug21673
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_invariable_queue.erl3
-rw-r--r--src/rabbit_queue_collector.erl (renamed from src/rabbit_reader_queue_collector.erl)30
-rw-r--r--src/rabbit_reader.erl6
4 files changed, 19 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index fbbc50b9a1..da8225debc 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -735,7 +735,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% the connection shuts down.
ok = case Owner of
none -> ok;
- _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ _ -> rabbit_queue_collector:register(CollectorPid, Q)
end,
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
{existing, _Q} ->
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 0ae6ddb9d2..cf8bc8f77e 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -242,8 +242,7 @@ do_if_persistent(F, Txn, QName) ->
persist_message(QName, true, Txn, Msg = #basic_message {
is_persistent = true }) ->
Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties,
- %% rebuild from properties_bin on restore
+ %% don't persist any recoverable decoded properties
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
persist_work(Txn, QName,
diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_queue_collector.erl
index a9117e9c17..ea3768d4b4 100644
--- a/src/rabbit_reader_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -29,16 +29,16 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_reader_queue_collector).
+-module(rabbit_queue_collector).
-behaviour(gen_server).
--export([start_link/0, register_exclusive_queue/2, delete_all/1, shutdown/1]).
+-export([start_link/0, register/2, delete_all/1, shutdown/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {exclusive_queues}).
+-record(state, {queues}).
-include("rabbit.hrl").
@@ -47,7 +47,7 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok(pid())).
--spec(register_exclusive_queue/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
+-spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
-endif.
@@ -57,8 +57,8 @@
start_link() ->
gen_server:start_link(?MODULE, [], []).
-register_exclusive_queue(CollectorPid, Q) ->
- gen_server:call(CollectorPid, {register_exclusive_queue, Q}, infinity).
+register(CollectorPid, Q) ->
+ gen_server:call(CollectorPid, {register, Q}, infinity).
delete_all(CollectorPid) ->
gen_server:call(CollectorPid, delete_all, infinity).
@@ -69,25 +69,24 @@ shutdown(CollectorPid) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state{exclusive_queues = dict:new()}}.
+ {ok, #state{queues = dict:new()}}.
%%--------------------------------------------------------------------------
-handle_call({register_exclusive_queue, Q}, _From,
- State = #state{exclusive_queues = Queues}) ->
+handle_call({register, Q}, _From,
+ State = #state{queues = Queues}) ->
MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
{reply, ok,
- State#state{exclusive_queues = dict:store(MonitorRef, Q, Queues)}};
+ State#state{queues = dict:store(MonitorRef, Q, Queues)}};
-handle_call(delete_all, _From,
- State = #state{exclusive_queues = ExclusiveQueues}) ->
+handle_call(delete_all, _From, State = #state{queues = Queues}) ->
[rabbit_misc:with_exit_handler(
fun () -> ok end,
fun () ->
erlang:demonitor(MonitorRef),
rabbit_amqqueue:delete(Q, false, false)
end)
- || {MonitorRef, Q} <- dict:to_list(ExclusiveQueues)],
+ || {MonitorRef, Q} <- dict:to_list(Queues)],
{reply, ok, State};
handle_call(shutdown, _From, State) ->
@@ -97,9 +96,8 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
- State = #state{exclusive_queues = ExclusiveQueues}) ->
- {noreply, State#state{exclusive_queues =
- dict:erase(MonitorRef, ExclusiveQueues)}}.
+ State = #state{queues = Queues}) ->
+ {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}.
terminate(_Reason, _State) ->
ok.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e355cd2632..b5514c822a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -240,7 +240,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
- {ok, Collector} = rabbit_reader_queue_collector:start_link(),
+ {ok, Collector} = rabbit_queue_collector:start_link(),
try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
@@ -272,7 +272,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%%
%% gen_tcp:close(ClientSock),
teardown_profiling(ProfilingValue),
- rabbit_reader_queue_collector:shutdown(Collector),
+ rabbit_queue_collector:shutdown(Collector),
rabbit_misc:unlink_and_capture_exit(Collector)
end,
done.
@@ -444,7 +444,7 @@ maybe_close(State = #v1{connection_state = closing,
%% connection, and are deleted when that connection closes."
%% This does not strictly imply synchrony, but in practice it seems
%% to be what people assume.
- rabbit_reader_queue_collector:delete_all(Collector),
+ rabbit_queue_collector:delete_all(Collector),
ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
close_connection(State);
_ -> State