summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-06-30 11:58:20 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-06-30 11:58:20 +0100
commit90ff70a20162ca1c9fd8f20bb16d9c5ed416f682 (patch)
treea916628a3a6dadffbd5a554db30360579c39e418 /src
parent85586512a898bd0a2757a586d274e26137d185a8 (diff)
parent5cca679cd5057a1962ef23ddbfb6e98b79b92129 (diff)
downloadrabbitmq-server-git-90ff70a20162ca1c9fd8f20bb16d9c5ed416f682.tar.gz
Merging bug21673 into bug22896
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl57
-rw-r--r--src/rabbit_msg_store.erl10
-rw-r--r--src/rabbit_msg_store_gc.erl2
-rw-r--r--src/rabbit_networking.erl2
-rw-r--r--src/rabbit_reader.erl18
5 files changed, 49 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d337df294f..8649ecc7f1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -726,42 +726,33 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
end,
%% We use this in both branches, because queue_declare may yet return an
%% existing queue.
- Finish = fun (#amqqueue{name = QueueName,
- durable = Durable1,
- auto_delete = AutoDelete1} = Q)
- when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
- check_exclusive_access(Q, Owner, strict),
- check_configure_permitted(QueueName, State),
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as the
- %% connection shuts down.
- case Owner of
- none -> ok;
- _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
- end,
- Q;
- %% non-equivalence trumps exclusivity arbitrarily
- (#amqqueue{name = QueueName}) ->
- rabbit_misc:protocol_error(
- precondition_failed,
- "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)])
- end,
- Q = case rabbit_amqqueue:with(
- rabbit_misc:r(VHostPath, queue, QueueNameBin),
- Finish) of
- {error, not_found} ->
- ActualNameBin =
- case QueueNameBin of
+ ActualNameBin = case QueueNameBin of
<<>> -> rabbit_guid:binstring_guid("amq.gen");
Other -> check_name('queue', Other)
end,
- QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
- Args, Owner));
- #amqqueue{} = Other ->
- Other
+ QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner) of
+ #amqqueue{name = QueueName,
+ durable = Durable1,
+ auto_delete = AutoDelete1} = Q1
+ when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
+ check_exclusive_access(Q1, Owner, strict),
+ check_configure_permitted(QueueName, State),
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as the
+ %% connection shuts down.
+ case Owner of
+ none -> ok;
+ _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1)
+ end,
+ Q1;
+ %% non-equivalence trumps exclusivity arbitrarily
+ #amqqueue{name = QueueName} ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)])
end,
return_queue_declare_ok(State, NoWait, Q);
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 706a7fae70..60c13372aa 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -112,10 +112,10 @@
index_module :: atom(),
dir :: file_path(),
gc_pid :: pid(),
- file_handles_ets :: tid(),
- file_summary_ets :: tid(),
- dedup_cache_ets :: tid(),
- cur_file_cache_ets :: tid() }).
+ file_handles_ets :: ets:tid(),
+ file_summary_ets :: ets:tid(),
+ dedup_cache_ets :: ets:tid(),
+ cur_file_cache_ets :: ets:tid() }).
-type(startup_fun_state() ::
{(fun ((A) -> 'finished' | {guid(), non_neg_integer(), A})), A}).
@@ -140,7 +140,7 @@
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
-spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
- {tid(), file_path(), atom(), any()}) ->
+ {ets:tid(), file_path(), atom(), any()}) ->
'concurrent_readers' | non_neg_integer()).
-endif.
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 4b80d088d7..a02d137523 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -55,7 +55,7 @@
-ifdef(use_specs).
--spec(start_link/4 :: (file_path(), any(), atom(), tid()) ->
+-spec(start_link/4 :: (file_path(), any(), atom(), ets:tid()) ->
{'ok', pid()} | 'ignore' | {'error', any()}).
-spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok').
-spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok').
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index c3d0b7b786..68ffc98ad7 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -102,7 +102,7 @@ boot_ssl() ->
{ok, []} ->
ok;
{ok, SslListeners} ->
- ok = rabbit_misc:start_applications([crypto, ssl]),
+ ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
{ok, SslOpts} = application:get_env(ssl_options),
[start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
ok
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 8ba5874061..f2a903dcca 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -103,6 +103,8 @@
%% heartbeat timeout -> *throw*
%% closing:
%% socket close -> *terminate*
+%% receive connection.close -> send connection.close_ok,
+%% *closing*
%% receive frame -> ignore, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
@@ -119,6 +121,8 @@
%% start terminate_connection timer, *closed*
%% closed:
%% socket close -> *terminate*
+%% receive connection.close -> send connection.close_ok,
+%% *closed*
%% receive connection.close_ok -> self() ! terminate_connection,
%% *closed*
%% receive frame -> ignore, *closed*
@@ -486,10 +490,18 @@ handle_frame(Type, Channel, Payload, State) ->
closing ->
%% According to the spec, after sending a
%% channel.close we must ignore all frames except
+ %% channel.close and channel.close_ok. In the
+ %% event of a channel.close, we should send back a
%% channel.close_ok.
case AnalyzedFrame of
{method, 'channel.close_ok', _} ->
erase({channel, Channel});
+ {method, 'channel.close', _} ->
+ %% We're already closing this channel, so
+ %% there's no cleanup to do (notify
+ %% queues, etc.)
+ ok = rabbit_writer:send_command(State#v1.sock,
+ #'channel.close_ok'{});
_ -> ok
end,
State;
@@ -666,6 +678,12 @@ handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
+handle_method0(#'connection.close'{}, State = #v1{connection_state = CS})
+ when CS =:= closing; CS =:= closed ->
+ %% We're already closed or closing, so we don't need to cleanup
+ %% anything.
+ ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
self() ! terminate_connection,