summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-06-24 16:55:14 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-06-24 16:55:14 +0100
commit54e7807e288b84fc0a574a43bc784a090e2341f2 (patch)
tree8f0dbdfcf7cf9874d9cafa850df62e66a7d5985c /src
parent91aee115be7cfbff56ada0bfe979ce7598325875 (diff)
downloadrabbitmq-server-git-54e7807e288b84fc0a574a43bc784a090e2341f2.tar.gz
Codegen up two different versions of rabbit_framing and provide a dispatcher to choose. The codegen now builds header files that are the union of 0-8 and 0-9-1.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_basic.erl8
-rw-r--r--src/rabbit_binary_generator.erl14
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_framing.erl102
-rw-r--r--src/rabbit_framing_channel.erl22
-rw-r--r--src/rabbit_reader.erl26
-rw-r--r--src/rabbit_tests.erl2
7 files changed, 129 insertions, 51 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 4ab7a2a0b1..5e2d7d9abd 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -80,7 +80,9 @@ delivery(Mandatory, Immediate, Txn, Message) ->
sender = self(), message = Message}.
build_content(Properties, BodyBin) ->
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ %% TODO - is this ever used? If so remove hard coded amqp_0_9_1
+ {ClassId, _MethodId} =
+ rabbit_framing:method_id('basic.publish', amqp_0_9_1),
#content{class_id = ClassId,
properties = Properties,
properties_bin = none,
@@ -91,7 +93,9 @@ from_content(Content) ->
properties = Props,
payload_fragments_rev = FragmentsRev} =
rabbit_binary_parser:ensure_content_decoded(Content),
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ %% TODO - is this ever used? If so remove hard coded amqp_0_9_1
+ {ClassId, _MethodId} =
+ rabbit_framing:method_id('basic.publish', amqp_0_9_1),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 28f34e7cae..04251d11fc 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -72,19 +72,11 @@
%%----------------------------------------------------------------------------
build_simple_method_frame(ChannelInt, MethodRecord, Protocol) ->
- MethodFields = rabbit_framing:encode_method_fields(MethodRecord),
- MethodName = adjust_close(rabbit_misc:method_record_type(MethodRecord),
- Protocol),
- {ClassId, MethodId} = rabbit_framing:method_id(MethodName),
+ MethodFields = rabbit_framing:encode_method_fields(MethodRecord, Protocol),
+ MethodName = rabbit_misc:method_record_type(MethodRecord),
+ {ClassId, MethodId} = rabbit_framing:method_id(MethodName, Protocol),
create_frame(1, ChannelInt, [<<ClassId:16, MethodId:16>>, MethodFields]).
-adjust_close('connection.close', amqp_0_8) ->
- 'connection.close08';
-adjust_close('connection.close_ok', amqp_0_8) ->
- 'connection.close08_ok';
-adjust_close(MethodName, _Protocol) ->
- MethodName.
-
build_simple_content_frames(ChannelInt,
#content{class_id = ClassId,
properties = ContentProperties,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7c0b94d488..d337df294f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -499,7 +499,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
Content),
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
- {reply, #'basic.get_empty'{deprecated_cluster_id = <<>>}, State}
+ {reply, #'basic.get_empty'{cluster_id = <<>>}, State}
end;
handle_method(#'basic.consume'{queue = QueueNameBin,
@@ -656,8 +656,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = false,
durable = Durable,
- deprecated_auto_delete = AutoDelete,
- deprecated_internal = false,
+ auto_delete = AutoDelete,
+ internal = false,
nowait = NoWait,
arguments = Args},
_, State = #ch{ virtual_host = VHostPath }) ->
diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl
new file mode 100644
index 0000000000..2d4d1ce411
--- /dev/null
+++ b/src/rabbit_framing.erl
@@ -0,0 +1,102 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-module(rabbit_framing).
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+
+-export([encode_method_fields/2]).
+-export([decode_method_fields/3]).
+-export([lookup_method_name/2]).
+-export([method_id/2]).
+
+-export([method_has_content/1]).
+-export([is_method_synchronous/1]).
+-export([method_record/1]).
+-export([method_fieldnames/1]).
+-export([decode_properties/2]).
+-export([encode_properties/1]).
+-export([lookup_amqp_exception/1]).
+-export([amqp_exception/1]).
+
+%% Method signatures
+-ifdef(use_specs).
+-spec(encode_method_fields/2 :: (amqp_method_record(), protocol()) -> binary()).
+-spec(decode_method_fields/3 :: (amqp_method_name(), binary(), protocol()) ->
+ amqp_method_record()).
+-spec(lookup_method_name/2 :: (amqp_method(), protocol()) ->
+ amqp_method_name()).
+-spec(method_id/2 :: (amqp_method_name(), protocol()) -> amqp_method()).
+
+-spec(method_has_content/1 :: (amqp_method_name()) -> boolean()).
+-spec(is_method_synchronous/1 :: (amqp_method_record()) -> boolean()).
+-spec(method_record/1 :: (amqp_method_name()) -> amqp_method_record()).
+-spec(method_fieldnames/1 :: (amqp_method_name()) ->
+ [amqp_method_field_name()]).
+-spec(decode_properties/2 :: (non_neg_integer(), binary()) ->
+ amqp_property_record()).
+-spec(encode_properties/1 :: (amqp_method_record()) -> binary()).
+-spec(lookup_amqp_exception/1 ::
+ (amqp_exception()) -> {boolean(), amqp_exception_code(), binary()}).
+-spec(amqp_exception/1 :: (amqp_exception_code()) -> amqp_exception()).
+-endif. % use_specs
+
+encode_method_fields(MethodRecord, amqp_0_9_1) ->
+ rabbit_framing_amqp_0_9_1:encode_method_fields(MethodRecord);
+encode_method_fields(MethodRecord, amqp_0_8) ->
+ rabbit_framing_amqp_0_8:encode_method_fields(MethodRecord).
+
+decode_method_fields(MethodName, FieldsBin, amqp_0_9_1) ->
+ rabbit_framing_amqp_0_9_1:decode_method_fields(MethodName, FieldsBin);
+decode_method_fields(MethodName, FieldsBin, amqp_0_8) ->
+ rabbit_framing_amqp_0_8:decode_method_fields(MethodName, FieldsBin).
+
+lookup_method_name(ClassMethod, amqp_0_9_1) ->
+ rabbit_framing_amqp_0_9_1:lookup_method_name(ClassMethod);
+lookup_method_name(ClassMethod, amqp_0_8) ->
+ rabbit_framing_amqp_0_8:lookup_method_name(ClassMethod).
+
+method_id(MethodName, amqp_0_9_1) ->
+ rabbit_framing_amqp_0_9_1:method_id(MethodName);
+method_id(MethodName, amqp_0_8) ->
+ rabbit_framing_amqp_0_8:method_id(MethodName).
+
+
+
+%% These ones don't make any difference, let's just use 0-9-1.
+method_has_content(X) -> rabbit_framing_amqp_0_9_1:method_has_content(X).
+method_record(X) -> rabbit_framing_amqp_0_9_1:method_record(X).
+method_fieldnames(X) -> rabbit_framing_amqp_0_9_1:method_fieldnames(X).
+encode_properties(X) -> rabbit_framing_amqp_0_9_1:encode_properties(X).
+amqp_exception(X) -> rabbit_framing_amqp_0_9_1:amqp_exception(X).
+lookup_amqp_exception(X) -> rabbit_framing_amqp_0_9_1:lookup_amqp_exception(X).
+is_method_synchronous(X) -> rabbit_framing_amqp_0_9_1:is_method_synchronous(X).
+decode_properties(X, Y) -> rabbit_framing_amqp_0_9_1:decode_properties(X, Y).
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 648a3fdd0d..c30cf45143 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -74,7 +74,8 @@ read_frame(ChannelPid) ->
mainloop(ChannelPid, Protocol) ->
{method, MethodName, FieldsBin} = read_frame(ChannelPid),
- Method = decode_method_fields(MethodName, FieldsBin, Protocol),
+ Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin,
+ Protocol),
case rabbit_framing:method_has_content(MethodName) of
true -> rabbit_channel:do(ChannelPid, Method,
collect_content(ChannelPid, MethodName));
@@ -82,24 +83,9 @@ mainloop(ChannelPid, Protocol) ->
end,
?MODULE:mainloop(ChannelPid, Protocol).
-%% Handle 0-8 version of channel.open-ok. In 0-9-1 it gained a longstr
-%% "deprecated_channel_id".
-decode_method_fields('channel.open_ok', FieldsBin, amqp_0_8) ->
- Len = 0,
- rabbit_framing:decode_method_fields(
- 'channel.open_ok', <<FieldsBin/binary, Len:32/unsigned>>);
-%% Handle 0-8 version of basic.consume. In 0-9-1 it gained a table
-%% "filter".
-decode_method_fields('basic.consume', FieldsBin, amqp_0_8) ->
- T = rabbit_binary_generator:generate_table([]),
- TLen = size(T),
- rabbit_framing:decode_method_fields(
- 'basic.consume', <<FieldsBin/binary, TLen:32/unsigned, T:TLen/binary>>);
-decode_method_fields(MethodName, FieldsBin, _Protocol) ->
- rabbit_framing:decode_method_fields(MethodName, FieldsBin).
-
collect_content(ChannelPid, MethodName) ->
- {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
+ %% Protocol does not matter as we only want the class ID to match
+ {ClassId, _MethodId} = rabbit_framing:method_id(MethodName, amqp_0_9_1),
case read_frame(ChannelPid) of
{content_header, HeaderClassId, 0, BodySize, PropertiesBin} ->
if HeaderClassId == ClassId ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 98b4d64710..c324d008a4 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -508,9 +508,7 @@ handle_frame(Type, Channel, Payload,
analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>,
Protocol) ->
- {method, adjust_close(
- rabbit_framing:lookup_method_name({ClassId, MethodId}),
- Protocol),
+ {method, rabbit_framing:lookup_method_name({ClassId, MethodId}, Protocol),
MethodFields};
analyze_frame(?FRAME_HEADER,
<<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
@@ -523,13 +521,6 @@ analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
analyze_frame(_Type, _Body, _Protocol) ->
error.
-adjust_close('connection.close08', amqp_0_8) ->
- 'connection.close';
-adjust_close('connection.close08_ok', amqp_0_8) ->
- 'connection.close_ok';
-adjust_close(MethodName, _Protocol) ->
- MethodName.
-
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
%%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]),
{State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1};
@@ -589,10 +580,11 @@ check_version(ClientVersion, ServerVersion) ->
%%--------------------------------------------------------------------------
-handle_method0(MethodName, FieldsBin, State) ->
+handle_method0(MethodName, FieldsBin,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
try
handle_method0(rabbit_framing:decode_method_fields(
- MethodName, FieldsBin),
+ MethodName, FieldsBin, Protocol),
State)
catch exit:Reason ->
CompleteReason = case Reason of
@@ -653,7 +645,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(
Sock,
- #'connection.open_ok'{deprecated_known_hosts = <<>>},
+ #'connection.open_ok'{known_hosts = <<>>},
Protocol),
State#v1{connection_state = running,
connection = NewConnection};
@@ -755,7 +747,8 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) ->
send_exception(State = #v1{connection = #connection{protocol = Protocol}},
Channel, Reason) ->
- {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason),
+ {ShouldClose, CloseChannel, CloseMethod} =
+ map_exception(Channel, Reason, Protocol),
NewState = case ShouldClose of
true -> terminate_channels(),
close_connection(State);
@@ -765,14 +758,15 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}},
NewState#v1.sock, CloseChannel, CloseMethod, Protocol),
NewState.
-map_exception(Channel, Reason) ->
+map_exception(Channel, Reason, Protocol) ->
{SuggestedClose, ReplyCode, ReplyText, FailedMethod} =
lookup_amqp_exception(Reason),
ShouldClose = SuggestedClose or (Channel == 0),
{ClassId, MethodId} = case FailedMethod of
{_, _} -> FailedMethod;
none -> {0, 0};
- _ -> rabbit_framing:method_id(FailedMethod)
+ _ -> rabbit_framing:method_id(FailedMethod,
+ Protocol)
end,
{CloseChannel, CloseMethod} =
case ShouldClose of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ecc2613d1b..fc7beedda1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -913,7 +913,7 @@ test_memory_pressure() ->
%% if we publish at this point, the channel should die
Content = #content{class_id = element(1, rabbit_framing:method_id(
- 'basic.publish')),
+ 'basic.publish', amqp_0_9_1)),
properties = none,
properties_bin = <<>>,
payload_fragments_rev = []},