summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-04-13 10:56:26 -0700
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-04-13 10:56:26 -0700
commitefb2d4151e4a8dec1918e8d71434457f9925566c (patch)
treeb97c0d88ccbeb7129a730cddbc2fc6684dc5bdc6
parent40cc23ce4796a38bfddaac1d11e568ca1fc282c0 (diff)
downloadrabbitmq-server-git-efb2d4151e4a8dec1918e8d71434457f9925566c.tar.gz
Initial 0-9-1 port.
- exchange auto-delete no longer supported - access.request no longer supported - connection redirection no longer supported - trace frames no longer supported
-rw-r--r--Makefile2
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_channel.erl10
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl36
-rw-r--r--src/rabbit_reader.erl48
7 files changed, 19 insertions, 84 deletions
diff --git a/Makefile b/Makefile
index 47aa586c8c..bec10a2b10 100644
--- a/Makefile
+++ b/Makefile
@@ -31,7 +31,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
+AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index c707112f87..7515e71407 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -49,7 +49,7 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, arguments}).
+-record(exchange, {name, type, durable, arguments}).
-record(amqqueue, {name, durable, auto_delete, arguments, pid}).
@@ -105,7 +105,6 @@
#exchange{name :: exchange_name(),
type :: exchange_type(),
durable :: bool(),
- auto_delete :: bool(),
arguments :: amqp_table()}).
-type(binding() ::
#binding{exchange_name :: exchange_name(),
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 54348d9a1c..fed2fd5bb8 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -234,7 +234,7 @@ add_vhost(VHostPath) ->
write),
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, []) ||
+ Type, true, []) ||
{Name,Type} <-
[{<<"">>, direct},
{<<"amq.direct">>, direct},
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7574cd673a..80dde26bec 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -299,9 +299,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
stop;
-handle_method(#'access.request'{},_, State) ->
- {reply, #'access.request_ok'{ticket = 1}, State};
-
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
@@ -372,7 +369,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
Content),
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
- {reply, #'basic.get_empty'{cluster_id = <<>>}, State}
+ {reply, #'basic.get_empty'{deprecated_cluster_id = <<>>}, State}
end;
handle_method(#'basic.consume'{queue = QueueNameBin,
@@ -539,8 +536,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = false,
durable = Durable,
- auto_delete = AutoDelete,
- internal = false,
+ deprecated_auto_delete = false, %% 0-9-1: true not supported
+ deprecated_internal = false, %% 0-9-1: true not supported
nowait = NoWait,
arguments = Args},
_, State = #ch{ virtual_host = VHostPath }) ->
@@ -554,7 +551,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
- AutoDelete,
Args)
end,
ok = rabbit_exchange:assert_type(X, CheckedType),
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index dc5824f1c9..625bea90db 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -41,7 +41,7 @@
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, []),
+ topic, true, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a57e8076bf..7ce6949da3 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -34,7 +34,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/5, lookup/1, lookup_or_die/1,
+-export([recover/0, declare/4, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
simple_publish/6, simple_publish/3,
route/3]).
@@ -62,8 +62,7 @@
-type(bind_res() :: 'ok' |
{'error', 'queue_not_found' | 'exchange_not_found'}).
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
- amqp_table()) -> exchange()).
+-spec(declare/4 :: (exchange_name(), exchange_type(), bool(), amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
-spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
@@ -100,7 +99,7 @@
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
+-define(INFO_KEYS, [name, type, durable, arguments].
recover() ->
ok = rabbit_misc:table_foreach(
@@ -115,11 +114,10 @@ recover() ->
ReverseRoute, write)
end, rabbit_durable_route).
-declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
+declare(ExchangeName, Type, Durable, Args) ->
Exchange = #exchange{name = ExchangeName,
type = Type,
durable = Durable,
- auto_delete = AutoDelete,
arguments = Args},
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -181,7 +179,6 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, #exchange{name = Name}) -> Name;
i(type, #exchange{type = Type}) -> Type;
i(durable, #exchange{durable = Durable}) -> Durable;
-i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
@@ -306,7 +303,6 @@ delete_bindings_for_exchange(ExchangeName) ->
ok.
delete_bindings_for_queue(QueueName) ->
- Exchanges = exchanges_for_queue(QueueName),
[begin
ok = delete_forward_routes(reverse_route(Route)),
ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
@@ -316,25 +312,12 @@ delete_bindings_for_queue(QueueName) ->
#route{binding = #binding{queue_name = QueueName,
_ = '_'}}),
write)],
- [begin
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- ok = maybe_auto_delete(X)
- end || ExchangeName <- Exchanges],
ok.
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
ok = mnesia:delete_object(rabbit_durable_route, Route, write).
-exchanges_for_queue(QueueName) ->
- MatchHead = reverse_route(
- #route{binding = #binding{exchange_name = '$1',
- queue_name = QueueName,
- _ = '_'}}),
- sets:to_list(
- sets:from_list(
- mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
-
has_bindings(ExchangeName) ->
MatchHead = #route{binding = #binding{exchange_name = ExchangeName,
_ = '_'}},
@@ -386,11 +369,10 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
call_with_exchange_and_queue(
ExchangeName, QueueName,
- fun (X, Q) ->
+ fun (_X, Q) ->
ok = sync_binding(
ExchangeName, QueueName, RoutingKey, Arguments,
- Q#amqqueue.durable, fun mnesia:delete_object/3),
- maybe_auto_delete(X)
+ Q#amqqueue.durable, fun mnesia:delete_object/3)
end).
sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
@@ -545,12 +527,6 @@ delete(ExchangeName, _IfUnused = true) ->
delete(ExchangeName, _IfUnused = false) ->
call_with_exchange(ExchangeName, fun unconditional_delete/1).
-maybe_auto_delete(#exchange{auto_delete = false}) ->
- ok;
-maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
- conditional_delete(Exchange),
- ok.
-
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
case has_bindings(ExchangeName) of
false -> unconditional_delete(Exchange);
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ef8038e7aa..3760f5d65c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -453,7 +453,6 @@ handle_frame(Type, 0, Payload, State) ->
case analyze_frame(Type, Payload) of
error -> throw({unknown_frame, Type, Payload});
heartbeat -> State;
- trace -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
Other -> throw({unexpected_frame_on_channel0, Other})
@@ -462,7 +461,6 @@ handle_frame(Type, Channel, Payload, State) ->
case analyze_frame(Type, Payload) of
error -> throw({unknown_frame, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
@@ -487,8 +485,6 @@ analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/bi
{content_header, ClassId, Weight, BodySize, Properties};
analyze_frame(?FRAME_BODY, Body) ->
{content_body, Body};
-analyze_frame(?FRAME_TRACE, _Body) ->
- trace;
analyze_frame(?FRAME_HEARTBEAT, <<>>) ->
heartbeat;
analyze_frame(_Type, _Body) ->
@@ -606,35 +602,18 @@ handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
frame_max = FrameMax}};
-handle_method0(#'connection.open'{virtual_host = VHostPath,
- insist = Insist},
+handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
connection = Connection = #connection{
user = User},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- KnownHosts = format_listeners(rabbit_networking:active_listeners()),
- Redirects = compute_redirects(Insist),
- if Redirects == [] ->
- ok = send_on_channel0(
- Sock,
- #'connection.open_ok'{known_hosts = KnownHosts}),
- State#v1{connection_state = running,
- connection = NewConnection};
- true ->
- %% FIXME: 'host' is supposed to only contain one
- %% address; but which one do we pick? This is
- %% really a problem with the spec.
- Host = format_listeners(Redirects),
- rabbit_log:info("connection ~p redirecting to ~p~n",
- [self(), Host]),
- ok = send_on_channel0(
- Sock,
- #'connection.redirect'{host = Host,
- known_hosts = KnownHosts}),
- close_connection(State#v1{connection = NewConnection})
- end;
+ ok = send_on_channel0(
+ Sock,
+ #'connection.open_ok'{deprecated_known_hosts = <<>>}),
+ State#v1{connection_state = running,
+ connection = NewConnection};
handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -653,21 +632,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
send_on_channel0(Sock, Method) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method).
-format_listeners(Listeners) ->
- list_to_binary(
- rabbit_misc:intersperse(
- $,,
- [io_lib:format("~s:~w", [Host, Port]) ||
- #listener{host = Host, port = Port} <- Listeners])).
-
-compute_redirects(true) -> [];
-compute_redirects(false) ->
- Node = node(),
- LNode = rabbit_load:pick(),
- if Node == LNode -> [];
- true -> rabbit_networking:node_listeners(LNode)
- end.
-
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].