diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-30 11:58:20 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-30 11:58:20 +0100 |
| commit | 90ff70a20162ca1c9fd8f20bb16d9c5ed416f682 (patch) | |
| tree | a916628a3a6dadffbd5a554db30360579c39e418 /src | |
| parent | 85586512a898bd0a2757a586d274e26137d185a8 (diff) | |
| parent | 5cca679cd5057a1962ef23ddbfb6e98b79b92129 (diff) | |
| download | rabbitmq-server-git-90ff70a20162ca1c9fd8f20bb16d9c5ed416f682.tar.gz | |
Merging bug21673 into bug22896
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 57 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 18 |
5 files changed, 49 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d337df294f..8649ecc7f1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -726,42 +726,33 @@ handle_method(#'queue.declare'{queue = QueueNameBin, end, %% We use this in both branches, because queue_declare may yet return an %% existing queue. - Finish = fun (#amqqueue{name = QueueName, - durable = Durable1, - auto_delete = AutoDelete1} = Q) - when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> - check_exclusive_access(Q, Owner, strict), - check_configure_permitted(QueueName, State), - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as the - %% connection shuts down. - case Owner of - none -> ok; - _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) - end, - Q; - %% non-equivalence trumps exclusivity arbitrarily - (#amqqueue{name = QueueName}) -> - rabbit_misc:protocol_error( - precondition_failed, - "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]) - end, - Q = case rabbit_amqqueue:with( - rabbit_misc:r(VHostPath, queue, QueueNameBin), - Finish) of - {error, not_found} -> - ActualNameBin = - case QueueNameBin of + ActualNameBin = case QueueNameBin of <<>> -> rabbit_guid:binstring_guid("amq.gen"); Other -> check_name('queue', Other) end, - QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, - Args, Owner)); - #amqqueue{} = Other -> - Other + QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner) of + #amqqueue{name = QueueName, + durable = Durable1, + auto_delete = AutoDelete1} = Q1 + when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> + check_exclusive_access(Q1, Owner, strict), + check_configure_permitted(QueueName, State), + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as the + %% connection shuts down. + case Owner of + none -> ok; + _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1) + end, + Q1; + %% non-equivalence trumps exclusivity arbitrarily + #amqqueue{name = QueueName} -> + rabbit_misc:protocol_error( + precondition_failed, "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]) end, return_queue_declare_ok(State, NoWait, Q); diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 706a7fae70..60c13372aa 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -112,10 +112,10 @@ index_module :: atom(), dir :: file_path(), gc_pid :: pid(), - file_handles_ets :: tid(), - file_summary_ets :: tid(), - dedup_cache_ets :: tid(), - cur_file_cache_ets :: tid() }). + file_handles_ets :: ets:tid(), + file_summary_ets :: ets:tid(), + dedup_cache_ets :: ets:tid(), + cur_file_cache_ets :: ets:tid() }). -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {guid(), non_neg_integer(), A})), A}). @@ -140,7 +140,7 @@ -spec(successfully_recovered_state/1 :: (server()) -> boolean()). -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), - {tid(), file_path(), atom(), any()}) -> + {ets:tid(), file_path(), atom(), any()}) -> 'concurrent_readers' | non_neg_integer()). -endif. diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 4b80d088d7..a02d137523 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -55,7 +55,7 @@ -ifdef(use_specs). --spec(start_link/4 :: (file_path(), any(), atom(), tid()) -> +-spec(start_link/4 :: (file_path(), any(), atom(), ets:tid()) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok'). -spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok'). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index c3d0b7b786..68ffc98ad7 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -102,7 +102,7 @@ boot_ssl() -> {ok, []} -> ok; {ok, SslListeners} -> - ok = rabbit_misc:start_applications([crypto, ssl]), + ok = rabbit_misc:start_applications([crypto, public_key, ssl]), {ok, SslOpts} = application:get_env(ssl_options), [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], ok diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 8ba5874061..f2a903dcca 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -103,6 +103,8 @@ %% heartbeat timeout -> *throw* %% closing: %% socket close -> *terminate* +%% receive connection.close -> send connection.close_ok, +%% *closing* %% receive frame -> ignore, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* @@ -119,6 +121,8 @@ %% start terminate_connection timer, *closed* %% closed: %% socket close -> *terminate* +%% receive connection.close -> send connection.close_ok, +%% *closed* %% receive connection.close_ok -> self() ! terminate_connection, %% *closed* %% receive frame -> ignore, *closed* @@ -486,10 +490,18 @@ handle_frame(Type, Channel, Payload, State) -> closing -> %% According to the spec, after sending a %% channel.close we must ignore all frames except + %% channel.close and channel.close_ok. In the + %% event of a channel.close, we should send back a %% channel.close_ok. case AnalyzedFrame of {method, 'channel.close_ok', _} -> erase({channel, Channel}); + {method, 'channel.close', _} -> + %% We're already closing this channel, so + %% there's no cleanup to do (notify + %% queues, etc.) + ok = rabbit_writer:send_command(State#v1.sock, + #'channel.close_ok'{}); _ -> ok end, State; @@ -666,6 +678,12 @@ handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); +handle_method0(#'connection.close'{}, State = #v1{connection_state = CS}) + when CS =:= closing; CS =:= closed -> + %% We're already closed or closing, so we don't need to cleanup + %% anything. + ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> self() ! terminate_connection, |
