summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-02-25 01:08:57 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-02-25 01:08:57 +0000
commit2ea003c24ab3170dec118af6f9f8c128241cec65 (patch)
tree5d062e29ec7eceeda453c1402117978c211a2db8 /java
parentb9f9c16645933e0e2f4c6c9b58e8cd1716434467 (diff)
downloadqpid-python-2ea003c24ab3170dec118af6f9f8c128241cec65.tar.gz
QPID-391 : Broker Refactoring - initial tidy... add some mechanisms for multi version
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@511389 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java24
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java57
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java62
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java288
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java254
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java33
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java40
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java57
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java62
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java38
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java13
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java32
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java33
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java31
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java18
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java125
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java65
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTable.java68
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java155
-rw-r--r--java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java20
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java6
35 files changed, 1166 insertions, 476 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 7ceb3a7eef..be2cee79ee 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -433,7 +433,10 @@ public class AMQChannel
}
- /** Called to resend all outstanding unacknowledged messages to this same channel. */
+ /** Called to resend all outstanding unacknowledged messages to this same channel.
+ * @param session the session
+ * @param requeue if true then requeue, else resend
+ * @throws org.apache.qpid.AMQException */
public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
{
final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
@@ -752,7 +755,9 @@ public class AMQChannel
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
AMQMessage message = bouncedMessage.getAMQMessage();
- message.writeReturn(session, _channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage()));
+ session.getProtocolOutputConverter().writeReturn(message, _channelId,
+ bouncedMessage.getReplyCode().getCode(),
+ new AMQShortString(bouncedMessage.getMessage()));
}
_returnMessages.clear();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 42fe8c5274..a48bc5df7f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -45,7 +45,7 @@ import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
@@ -59,7 +59,8 @@ import org.apache.qpid.url.URLSyntaxException;
* Main entry point for AMQPD.
*
*/
-public class Main implements ProtocolVersionList
+@SuppressWarnings({"AccessStaticViaInstance"})
+public class Main
{
private static final Logger _logger = Logger.getLogger(Main.class);
@@ -143,12 +144,21 @@ public class Main implements ProtocolVersionList
else if (commandLine.hasOption("v"))
{
String ver = "Qpid 0.9.0.0";
- String protocol = "AMQP version(s) [major.minor]: ";
- for (int i=0; i<pv.length; i++)
+ StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
+
+ boolean first = true;
+ for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
{
- if (i > 0)
- protocol += ", ";
- protocol += pv[i][PROTOCOL_MAJOR] + "." + pv[i][PROTOCOL_MINOR];
+ if(first)
+ {
+ first = false;
+ }
+ else
+ {
+ protocol.append(", ");
+ }
+ protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
+
}
System.out.println(ver + " (" + protocol + ")");
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index fdf087fdea..99cc60011a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -209,7 +209,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
if(consumerTag != null)
{
- msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag);
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg, channelId, deliveryTag, consumerTag);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
index 348bfa5e68..bdabcbf5be 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
@@ -25,7 +25,8 @@ import java.util.HashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQMessage;
/**
@@ -63,8 +64,9 @@ public class PropertyExpression implements Expression
{
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
- return _properties.getReplyTo();
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
+ AMQShortString replyTo = _properties.getReplyTo();
+ return replyTo == null ? null : replyTo.toString();
}
catch (AMQException e)
{
@@ -83,8 +85,9 @@ public class PropertyExpression implements Expression
{
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
- return _properties.getType();
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
+ AMQShortString type = _properties.getType();
+ return type == null ? null : type.toString();
}
catch (AMQException e)
{
@@ -126,7 +129,7 @@ public class PropertyExpression implements Expression
{
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
return (int) _properties.getPriority();
}
catch (AMQException e)
@@ -147,8 +150,9 @@ public class PropertyExpression implements Expression
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
- return _properties.getMessageId();
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
+ AMQShortString messageId = _properties.getMessageId();
+ return messageId == null ? null : messageId;
}
catch (AMQException e)
{
@@ -168,7 +172,7 @@ public class PropertyExpression implements Expression
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
return _properties.getTimestamp();
}
catch (AMQException e)
@@ -189,8 +193,9 @@ public class PropertyExpression implements Expression
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
- return _properties.getCorrelationId();
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
+ AMQShortString correlationId = _properties.getCorrelationId();
+ return correlationId == null ? null : correlationId.toString();
}
catch (AMQException e)
{
@@ -210,7 +215,7 @@ public class PropertyExpression implements Expression
try
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
return _properties.getExpiration();
}
catch (AMQException e)
@@ -254,7 +259,7 @@ public class PropertyExpression implements Expression
else
{
- BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
if(_logger.isDebugEnabled())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
new file mode 100644
index 0000000000..e01c5aabbf
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.AMQException;
+
+public interface ProtocolOutputConverter
+{
+ void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag);
+
+ interface Factory
+ {
+ ProtocolOutputConverter newInstance(AMQProtocolSession session);
+ }
+
+ void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException;
+
+ void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+
+ byte getProtocolMinorVersion();
+
+ byte getProtocolMajorVersion();
+
+ void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException;
+
+ void writeFrame(AMQDataBlock block);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
new file mode 100644
index 0000000000..8366c426dd
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output;
+
+import org.apache.qpid.server.output.ProtocolOutputConverter.Factory;
+import org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+import java.util.Map;
+import java.util.HashMap;
+
+public class ProtocolOutputConverterRegistry
+{
+
+ private static final Map<Byte, Map<Byte, Factory>> _registry =
+ new HashMap<Byte, Map<Byte, Factory>>();
+
+
+ static
+ {
+ register((byte) 8, (byte) 0, ProtocolOutputConverterImpl.getInstanceFactory());
+ }
+
+ private static void register(byte major, byte minor, Factory converter)
+ {
+ if(!_registry.containsKey(major))
+ {
+ _registry.put(major, new HashMap<Byte, Factory>());
+ }
+ _registry.get(major).put(minor, converter);
+ }
+
+
+ public static ProtocolOutputConverter getConverter(AMQProtocolSession session)
+ {
+ return _registry.get(session.getProtocolMajorVersion()).get(session.getProtocolMinorVersion()).newInstance(session);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
new file mode 100644
index 0000000000..bd5bb632fe
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -0,0 +1,288 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output.amqp0_8;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Iterator;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final StoreContext storeContext = message.getStoreContext();
+ final long messageId = message.getMessageId();
+
+ final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
+
+ if(bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = messageHandle.getContentChunk(storeContext,messageId, i);
+ writeFrame(new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
+
+ }
+
+
+ public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final StoreContext storeContext = message.getStoreContext();
+ final long messageId = message.getMessageId();
+
+ ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
+ if(bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = messageHandle.getContentChunk(storeContext, messageId, i);
+ writeFrame(new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
+
+ }
+
+
+ private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ consumerTag,
+ deliveryTag, pb.getExchange(), messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+
+ ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ deliveryTag, pb.getExchange(),
+ queueSize,
+ messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
+ getOkFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ {
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ message.getMessagePublishInfo().getExchange(),
+ replyCode, replyText,
+ message.getMessagePublishInfo().getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
+ returnFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+ ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+ new AMQDataBlock[]{contentHeader});
+
+ writeFrame(compositeBlock);
+ }
+
+ //
+ // Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+ //
+ while (bodyFrameIterator.hasNext())
+ {
+ writeFrame(bodyFrameIterator.next());
+ }
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+
+ writeFrame(BasicCancelOkBody.createAMQFrame(channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ consumerTag // consumerTag
+ ));
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 2de32c2f0f..d71f6e3046 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -43,25 +43,14 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MainRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
-import org.apache.qpid.framing.VersionSpecificRegistry;
-import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -71,7 +60,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
public class AMQMinaProtocolSession implements AMQProtocolSession,
- ProtocolVersionList,
Managable
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
@@ -111,12 +99,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private long _maxNoOfChannels = 1000;
/* AMQP Version for this session */
- private byte _major = pv[pv.length - 1][PROTOCOL_MAJOR];
- private byte _minor = pv[pv.length - 1][PROTOCOL_MINOR];
+ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
+
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length - 1][PROTOCOL_MAJOR], pv[pv.length - 1][PROTOCOL_MINOR]);
+ private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion);
private List<Integer> _closingChannelsList = new ArrayList<Integer>();
+ private ProtocolOutputConverter _protocolOutputConverter;
public ManagedObject getManagedObject()
@@ -195,86 +184,116 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_lastReceived = message;
if (message instanceof ProtocolInitiation)
{
- ProtocolInitiation pi = (ProtocolInitiation) message;
- // this ensures the codec never checks for a PI message again
- ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
- try
- {
- pi.checkVersion(this); // Fails if not correct
+ protocolInitiationReceived((ProtocolInitiation) message);
- // This sets the protocol version (and hence framing classes) for this session.
- setProtocolVersion(pi.protocolMajor, pi.protocolMinor);
+ }
+ else if (message instanceof AMQFrame)
+ {
+ AMQFrame frame = (AMQFrame) message;
+ frameReceived(frame);
- String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
+ }
+ else
+ {
+ throw new UnknnownMessageTypeException(message);
+ }
+ }
- String locales = "en_US";
+ private void frameReceived(AMQFrame frame)
+ throws AMQException
+ {
+ int channelId = frame.getChannel();
+ AMQBody body = frame.getBodyFrame();
- // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
- AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
- _major, _minor, // AMQP version (major, minor)
- locales.getBytes(), // locales
- mechanisms.getBytes(), // mechanisms
- null, // serverProperties
- (short) _major, // versionMajor
- (short) _minor); // versionMinor
- _minaProtocolSession.write(response);
- }
- catch (AMQException e)
- {
- _logger.error("Received incorrect protocol initiation", e);
- /* Find last protocol version in protocol version list. Make sure last protocol version
- listed in the build file (build-module.xml) is the latest version which will be used
- here. */
- int i = pv.length - 1;
- _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
- // TODO: Close connection (but how to wait until message is sent?)
- // ritchiem 2006-12-04 will this not do?
-// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
-// future.join();
-// close connection
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Frame Received: " + frame);
+ }
- }
+ if (body instanceof AMQMethodBody)
+ {
+ methodFrameReceived(channelId, (AMQMethodBody)body);
+ }
+ else if (body instanceof ContentHeaderBody)
+ {
+ contentHeaderReceived(channelId, (ContentHeaderBody)body);
+ }
+ else if (body instanceof ContentBody)
+ {
+ contentBodyReceived(channelId, (ContentBody)body);
+ }
+ else if (body instanceof HeartbeatBody)
+ {
+ // NO OP
}
else
{
- AMQFrame frame = (AMQFrame) message;
-
- if (frame.getBodyFrame() instanceof AMQMethodBody)
- {
- methodFrameReceived(frame);
- }
- else
- {
- contentFrameReceived(frame);
- }
+ _logger.warn("Unrecognised frame " + frame.getClass().getName());
}
}
- private void methodFrameReceived(AMQFrame frame)
+ private void protocolInitiationReceived(ProtocolInitiation pi)
{
- if (_logger.isDebugEnabled())
+ // this ensures the codec never checks for a PI message again
+ ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ try
{
- _logger.debug("Method frame received: " + frame);
+ pi.checkVersion(); // Fails if not correct
+
+ // This sets the protocol version (and hence framing classes) for this session.
+ setProtocolVersion(pi._protocolMajor, pi._protocolMinor);
+
+ String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
+
+ String locales = "en_US";
+
+ // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ locales.getBytes(), // locales
+ mechanisms.getBytes(), // mechanisms
+ null, // serverProperties
+ (short) getProtocolMajorVersion(), // versionMajor
+ (short) getProtocolMinorVersion()); // versionMinor
+ _minaProtocolSession.write(response);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Received incorrect protocol initiation", e);
+
+ _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+
+ // TODO: Close connection (but how to wait until message is sent?)
+ // ritchiem 2006-12-04 will this not do?
+// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
+// future.join();
+// close connection
+
}
+ }
+
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(),
- (AMQMethodBody) frame.getBodyFrame());
+ private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
+ {
+
+ final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId,
+ methodBody);
//Check that this channel is not closing
- if (channelAwaitingClosure(frame.getChannel()))
+ if (channelAwaitingClosure(channelId))
{
if ((evt.getMethod() instanceof ChannelCloseOkBody))
{
if (_logger.isInfoEnabled())
{
- _logger.info("Channel[" + frame.getChannel() + "] awaiting closure - processing close-ok");
+ _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
}
}
else
{
if (_logger.isInfoEnabled())
{
- _logger.info("Channel[" + frame.getChannel() + "] awaiting closure ignoring");
+ _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
}
return;
}
@@ -298,19 +317,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
if (!wasAnyoneInterested)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+ throw new AMQNoMethodHandlerException(evt);
}
}
catch (AMQChannelException e)
{
- if (getChannel(frame.getChannel()) != null)
+ if (getChannel(channelId) != null)
{
if (_logger.isInfoEnabled())
{
_logger.info("Closing channel due to: " + e.getMessage());
}
- writeFrame(e.getCloseFrame(frame.getChannel()));
- closeChannel(frame.getChannel());
+ writeFrame(e.getCloseFrame(channelId));
+ closeChannel(channelId);
}
else
{
@@ -328,7 +347,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
AMQConstant.CHANNEL_ERROR.getName().toString());
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(ce.getCloseFrame(frame.getChannel()));
+ writeFrame(ce.getCloseFrame(channelId));
}
}
catch (AMQConnectionException e)
@@ -339,7 +358,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
closeSession();
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(frame.getChannel()));
+ writeFrame(e.getCloseFrame(channelId));
}
}
catch (Exception e)
@@ -353,61 +372,21 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
- private void contentFrameReceived(AMQFrame frame) throws AMQException
- {
- if (frame.getBodyFrame() instanceof ContentHeaderBody)
- {
- contentHeaderReceived(frame);
- }
- else if (frame.getBodyFrame() instanceof ContentBody)
- {
- contentBodyReceived(frame);
- }
- else if (frame.getBodyFrame() instanceof HeartbeatBody)
- {
- _logger.debug("Received heartbeat from client");
- }
- else
- {
- _logger.warn("Unrecognised frame " + frame.getClass().getName());
- }
- }
- private void contentHeaderReceived(AMQFrame frame) throws AMQException
+ private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Content header frame received: " + frame);
- }
- AMQChannel channel = getChannel(frame.getChannel());
+ AMQChannel channel = getAndAssertChannel(channelId);
+
+ channel.publishContentHeader(body);
- if (channel == null)
- {
- throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel());
- }
- else
- {
- channel.publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
- }
}
- private void contentBodyReceived(AMQFrame frame) throws AMQException
+ private void contentBodyReceived(int channelId, ContentBody body) throws AMQException
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Content body frame received: " + frame);
- }
- AMQChannel channel = getChannel(frame.getChannel());
+ AMQChannel channel = getAndAssertChannel(channelId);
- if (channel == null)
- {
- throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel());
- }
- else
- {
- channel.publishContentBody((ContentBody) frame.getBodyFrame(), this);
- }
+ channel.publishContentBody(body, this);
}
/**
@@ -437,6 +416,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return new ArrayList<AMQChannel>(_channelMap.values());
}
+ public AMQChannel getAndAssertChannel(int channelId) throws AMQException
+ {
+ AMQChannel channel = getChannel(channelId);
+ if (channel == null)
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
+ }
+ return channel;
+ }
+
public AMQChannel getChannel(int channelId) throws AMQException
{
if (channelAwaitingClosure(channelId))
@@ -685,24 +674,26 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private void setProtocolVersion(byte major, byte minor)
{
- _major = major;
- _minor = minor;
- _registry = MainRegistry.getVersionSpecificRegistry(major, minor);
+ _protocolVersion = new ProtocolVersion(major,minor);
+
+ _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion);
+
+ _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
}
public byte getProtocolMajorVersion()
{
- return _major;
+ return _protocolVersion.getMajorVersion();
}
public byte getProtocolMinorVersion()
{
- return _minor;
+ return _protocolVersion.getMinorVersion();
}
public boolean isProtocolVersion(byte major, byte minor)
{
- return _major == major && _minor == minor;
+ return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor;
}
public VersionSpecificRegistry getRegistry()
@@ -739,5 +730,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_taskList.remove(task);
}
+ public ProtocolOutputConverter getProtocolOutputConverter()
+ {
+ return _protocolOutputConverter;
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
new file mode 100644
index 0000000000..16d74b6fc0
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+
+public class AMQNoMethodHandlerException extends AMQException
+{
+
+ public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
+ {
+ super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 9d397505dc..756a8b5ebe 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -39,7 +39,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ConnectorConfiguration;
@@ -53,7 +53,7 @@ import org.apache.qpid.ssl.SSLContextFactory;
* the state for the connection.
*
*/
-public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList
+public class AMQPFastProtocolHandler extends IoHandlerAdapter
{
private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
@@ -162,12 +162,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
if (throwable instanceof AMQProtocolHeaderException)
{
- /* Find last protocol version in protocol version list. Make sure last protocol version
- listed in the build file (build-module.xml) is the latest version which will be returned
- here. */
- int i = pv.length - 1;
- protocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+
+ protocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+
protocolSession.close();
+
_logger.error("Error in protocol initiation " + session + ": " + throwable.getMessage(), throwable);
}
else if(throwable instanceof IOException)
@@ -176,8 +175,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
}
else
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
session.getProtocolMajorVersion(),
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 503dc8b554..4cfee06850 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -162,4 +163,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
void removeSessionCloseTask(Task task);
+ public ProtocolOutputConverter getProtocolOutputConverter();
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index ea89136a62..d2a20cdf57 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -24,6 +24,7 @@ import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
+import javax.management.NotCompliantMBeanException;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
@@ -65,7 +66,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
new AMQShortString("Broker Management Console has closed the connection.");
@MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
- public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws JMException
+ public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException
{
super(ManagedConnection.class, ManagedConnection.TYPE);
_session = session;
@@ -74,6 +75,8 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
_name = jmxEncode(new StringBuffer(remote), 0).toString();
init();
}
+
+
static
{
try
@@ -94,7 +97,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
{
_channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
- _channelAtttibuteNames, _channelAttributeTypes);
+ _channelAtttibuteNames, _channelAttributeTypes);
_channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
new file mode 100644
index 0000000000..45d09e8f3e
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.AMQException;
+
+public class UnknnownMessageTypeException extends AMQException
+{
+ public UnknnownMessageTypeException(AMQDataBlock message)
+ {
+ super("Unknown message type: " + message.getClass().getName() + ": " + message);
+
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index aa7ea16afc..dedea68d18 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -20,23 +20,34 @@
*/
package org.apache.qpid.server.queue;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.txn.TransactionalContext;
/** Combines the information that make up a deliverable message into a more manageable form. */
+
+import org.apache.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Combines the information that make up a deliverable message into a more manageable form.
+ */
public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -136,7 +147,7 @@ public class AMQMessage
}
}
- private StoreContext getStoreContext()
+ public StoreContext getStoreContext()
{
return _txnContext.getStoreContext();
}
@@ -579,6 +590,7 @@ public class AMQMessage
}
}
+/*
public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
@@ -746,6 +758,12 @@ public class AMQMessage
protocolSession.writeFrame(bodyFrameIterator.next());
}
}
+*/
+
+ public AMQMessageHandle getMessageHandle()
+ {
+ return _messageHandle;
+ }
public long getSize()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 4fd89f39da..c9329a244c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -41,9 +41,9 @@ import javax.management.openmbean.TabularType;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -344,12 +344,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
try
{
// Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
+ CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
String mimeType = null, encoding = null;
if (headerProperties != null)
{
- mimeType = headerProperties.getContentType();
- encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+ AMQShortString mimeTypeShortSting = headerProperties.getContentType();
+ mimeType = mimeTypeShortSting == null ? null : mimeTypeShortSting.toString();
+ encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding().toString();
}
Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
@@ -382,7 +383,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
AMQMessage msg = list.get(i - 1);
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+ CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) headerBody.properties;
String[] headerAttributes = headerProperties.toString().split(",");
Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()};
CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 208a59516c..e70926736d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -270,7 +270,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue);
}
- msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount());
+ protocolSession.getProtocolOutputConverter().writeGetOk(msg, channel.getChannelId(),
+ deliveryTag, _queue.getMessageCount());
_totalMessageSize.addAndGet(-msg.getSize());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index ede7731a06..0a2e73880c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -29,9 +29,9 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -258,7 +258,7 @@ public class SubscriptionImpl implements Subscription
{
channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
}
- msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
}
}
@@ -294,7 +294,7 @@ public class SubscriptionImpl implements Subscription
msg.decrementReference(storeContext);
}
- msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
}
}
@@ -466,13 +466,9 @@ public class SubscriptionImpl implements Subscription
if (_autoClose && !_sentClose)
{
_logger.info("Closing autoclose subscription:" + this);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- consumerTag // consumerTag
- ));
+ ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
+
_sentClose = true;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java
deleted file mode 100644
index 90aa7bb998..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-
-import org.apache.mina.common.ByteBuffer;
-
-public class ContentChunkAdapter
-{
- public static ContentBody toConentBody(ContentChunk contentBodyChunk)
- {
- return new ContentBody(contentBodyChunk.getData());
- }
-
- public static ContentChunk toConentChunk(final ContentBody contentBodyChunk)
- {
- return new ContentChunk() {
-
- public int getSize()
- {
- return contentBodyChunk.getSize();
- }
-
- public ByteBuffer getData()
- {
- return contentBodyChunk.payload;
- }
-
- public void reduceToFit()
- {
- contentBodyChunk.reduceBufferToFit();
- }
- };
-
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java
deleted file mode 100644
index 6ee2fa784d..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-
-public class MessagePublishInfoAdapter
-{
- private final byte _majorVersion;
- private final byte _minorVersion;
- private final int _classId;
- private final int _methodId;
-
-
- public MessagePublishInfoAdapter(byte majorVersion, byte minorVersion)
- {
- _majorVersion = majorVersion;
- _minorVersion = minorVersion;
- _classId = BasicPublishBody.getClazz(majorVersion,minorVersion);
- _methodId = BasicPublishBody.getMethod(majorVersion,minorVersion);
- }
-
- public BasicPublishBody toMethodBody(MessagePublishInfo pubInfo)
- {
- return new BasicPublishBody(_majorVersion,
- _minorVersion,
- _classId,
- _methodId,
- pubInfo.getExchange(),
- pubInfo.isImmediate(),
- pubInfo.isMandatory(),
- pubInfo.getRoutingKey(),
- 0) ; // ticket
- }
-
- public MessagePublishInfo toMessagePublishInfo(final BasicPublishBody body)
- {
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return body.getExchange();
- }
-
- public boolean isImmediate()
- {
- return body.getImmediate();
- }
-
- public boolean isMandatory()
- {
- return body.getMandatory();
- }
-
- public AMQShortString getRoutingKey()
- {
- return body.getRoutingKey();
- }
- };
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index 2aa2c1872b..f6ddfdc715 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -44,7 +44,7 @@ import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionStartMethodHandler implements StateAwareMethodListener
@@ -69,28 +69,21 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
- byte major = (byte) body.versionMajor;
- byte minor = (byte) body.versionMinor;
- boolean versionOk = false;
+ ProtocolVersion pv = new ProtocolVersion((byte) body.versionMajor,(byte) body.versionMinor);
+
// For the purposes of interop, we can make the client accept the broker's version string.
// If it does, it then internally records the version as being the latest one that it understands.
// It needs to do this since frame lookup is done by version.
- if (Boolean.getBoolean("qpid.accept.broker.version"))
- {
- versionOk = true;
- int lastIndex = ProtocolVersionList.pv.length - 1;
- major = ProtocolVersionList.pv[lastIndex][ProtocolVersionList.PROTOCOL_MAJOR];
- minor = ProtocolVersionList.pv[lastIndex][ProtocolVersionList.PROTOCOL_MINOR];
- }
- else
+ if (Boolean.getBoolean("qpid.accept.broker.version") && !pv.isSupported())
{
- versionOk = checkVersionOK(major, minor);
+
+ pv = ProtocolVersion.getLatestSupportedVersion();
}
- if (versionOk)
+ if (pv.isSupported())
{
- protocolSession.setProtocolVersion(major, minor);
+ protocolSession.setProtocolVersion(pv.getMajorVersion(), pv.getMinorVersion());
try
{
@@ -189,20 +182,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
}
}
- private boolean checkVersionOK(byte versionMajor, byte versionMinor)
- {
- byte[][] supportedVersions = ProtocolVersionList.pv;
- boolean supported = false;
- int i = supportedVersions.length;
- while ((i-- != 0) && !supported)
- {
- supported = (supportedVersions[i][ProtocolVersionList.PROTOCOL_MAJOR] == versionMajor)
- && (supportedVersions[i][ProtocolVersionList.PROTOCOL_MINOR] == versionMinor);
- }
-
- return supported;
- }
-
+
private String getFullSystemInfo()
{
StringBuffer fullSystemInfo = new StringBuffer();
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 36dd4d400c..66524edce3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -121,12 +121,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public String getJMSMessageID() throws JMSException
{
- if (getContentHeaderProperties().getMessageId() == null)
+ if (getContentHeaderProperties().getMessageIdAsString() == null)
{
getContentHeaderProperties().setMessageId("ID:" + _deliveryTag);
}
- return getContentHeaderProperties().getMessageId();
+ return getContentHeaderProperties().getMessageIdAsString();
}
public void setJMSMessageID(String messageId) throws JMSException
@@ -146,7 +146,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public byte[] getJMSCorrelationIDAsBytes() throws JMSException
{
- return getContentHeaderProperties().getCorrelationId().getBytes();
+ return getContentHeaderProperties().getCorrelationIdAsString().getBytes();
}
public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
@@ -161,12 +161,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public String getJMSCorrelationID() throws JMSException
{
- return getContentHeaderProperties().getCorrelationId();
+ return getContentHeaderProperties().getCorrelationIdAsString();
}
public Destination getJMSReplyTo() throws JMSException
{
- String replyToEncoding = getContentHeaderProperties().getReplyTo();
+ String replyToEncoding = getContentHeaderProperties().getReplyToAsString();
if (replyToEncoding == null)
{
return null;
@@ -250,7 +250,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public String getJMSType() throws JMSException
{
- return getContentHeaderProperties().getType();
+ return getContentHeaderProperties().getTypeAsString();
}
public void setJMSType(String string) throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index c05667902f..763af312f4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -116,7 +116,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
_data.limit(text.length()) ;
//_data.sweep();
_data.setAutoExpand(true);
- final String encoding = getContentHeaderProperties().getEncoding();
+ final String encoding = getContentHeaderProperties().getEncodingAsString();
if (encoding == null)
{
_data.put(text.getBytes());
@@ -155,11 +155,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
{
return null;
}
- if (getContentHeaderProperties().getEncoding() != null)
+ if (getContentHeaderProperties().getEncodingAsString() != null)
{
try
{
- _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncoding()).newDecoder());
+ _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder());
}
catch (CharacterCodingException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index e02771d8f5..c2015f9e7c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -92,14 +92,14 @@ public class MessageFactoryRegistry
// Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
// AMQP. When the type is null, it can only be assumed that the message is a byte message.
- AMQShortString contentTypeShortString = properties.getContentTypeShortString();
+ AMQShortString contentTypeShortString = properties.getContentType();
contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE)
: contentTypeShortString;
MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString);
if (mf == null)
{
- throw new AMQException("Unsupport MIME type of " + properties.getContentType());
+ throw new AMQException("Unsupport MIME type of " + properties.getContentTypeAsString());
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 055109d3be..19767b6575 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -45,8 +45,8 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MainRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.VersionSpecificRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.protocol.AMQConstant;
@@ -56,7 +56,7 @@ import org.apache.qpid.protocol.AMQConstant;
* The underlying protocol session is still available but clients should not
* use it to obtain session attributes.
*/
-public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareProtocolSession
+public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
@@ -104,7 +104,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP
private byte _protocolMinorVersion;
private byte _protocolMajorVersion;
- private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
+ private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
/**
@@ -147,11 +147,8 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
- /* Find last protocol version in protocol version list. Make sure last protocol version
- listed in the build file (build-module.xml) is the latest version which will be used
- here. */
- int i = pv.length - 1;
- _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+
+ _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
}
public String getClientID()
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java
new file mode 100644
index 0000000000..4d2737edce
--- /dev/null
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+
+public class AMQConnectionWaitException extends AMQException
+{
+ public AMQConnectionWaitException(String s, Throwable e)
+ {
+ super(s, e);
+
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java
new file mode 100644
index 0000000000..22a94d3c75
--- /dev/null
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+
+public class AMQUnexpectedBodyTypeException extends AMQException
+{
+
+ public AMQUnexpectedBodyTypeException(Class<? extends AMQBody> expectedClass, AMQBody body)
+ {
+ super("Unexpected body type. Expected: " + expectedClass.getName() + "; got: " + body.getClass().getName());
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java
new file mode 100644
index 0000000000..721da24d53
--- /dev/null
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+
+public class AMQUnexpectedFrameTypeException extends AMQException
+{
+ public AMQUnexpectedFrameTypeException(String s)
+ {
+ super(s);
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
index 401a54444b..b01ec491ec 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
@@ -39,7 +39,7 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ConnectionRedirectBody;
import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ProtocolVersion;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -138,7 +138,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
}
}
- public void send(AMQDataBlock data) throws AMQException
+ public void send(AMQDataBlock data) throws AMQConnectionWaitException
{
if (_session == null)
{
@@ -146,9 +146,9 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
{
_connectionMonitor.waitUntilOpen();
}
- catch (Exception e)
+ catch (InterruptedException e)
{
- throw new AMQException("Failed to send " + data + ": " + e, e);
+ throw new AMQConnectionWaitException("Failed to send " + data + ": " + e, e);
}
}
_session.write(data);
@@ -207,7 +207,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
}
else
{
- throw new AMQException("Client only expects method body, got: " + body);
+ throw new AMQUnexpectedBodyTypeException(AMQMethodBody.class, body);
}
}
@@ -216,7 +216,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
return "MinaBrokerProxy[" + (_session == null ? super.toString() : _session.getRemoteAddress()) + "]";
}
- private class MinaBinding extends IoHandlerAdapter implements ProtocolVersionList
+ private class MinaBinding extends IoHandlerAdapter
{
public void sessionCreated(IoSession session) throws Exception
{
@@ -228,8 +228,8 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
/* Find last protocol version in protocol version list. Make sure last protocol version
listed in the build file (build-module.xml) is the latest version which will be used
here. */
- int i = pv.length - 1;
- session.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+
+ session.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
}
public void sessionOpened(IoSession session) throws Exception
@@ -260,7 +260,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
}
else
{
- throw new AMQException("Received message of unrecognised type: " + object);
+ throw new AMQUnexpectedFrameTypeException("Received message of unrecognised type: " + object);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
index 111d9a8f20..f2e91083ca 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -86,9 +86,9 @@ public abstract class AMQMethodBody extends AMQBody
public String toString()
{
- StringBuffer buf = new StringBuffer(getClass().toString());
- buf.append(" Class: ").append(getClazz());
- buf.append(" Method: ").append(getMethod());
+ StringBuffer buf = new StringBuffer(getClass().getName());
+ buf.append("[ Class: ").append(getClazz());
+ buf.append(" Method: ").append(getMethod()).append(']');
return buf.toString();
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index 1045b02868..8b784fa3f7 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
-public class BasicContentHeaderProperties implements ContentHeaderProperties
+public class BasicContentHeaderProperties implements CommonContentHeaderProperties
{
private static final Logger _logger = Logger.getLogger(BasicContentHeaderProperties.class);
@@ -421,14 +421,14 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
}
}
- public AMQShortString getContentTypeShortString()
+ public AMQShortString getContentType()
{
decodeContentTypeIfNecessary();
return _contentType;
}
- public String getContentType()
+ public String getContentTypeAsString()
{
decodeContentTypeIfNecessary();
return _contentType == null ? null : _contentType.toString();
@@ -444,15 +444,19 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
public void setContentType(String contentType)
{
- clearEncodedForm();
- _propertyFlags |= (1 << 15);
- _contentType = contentType == null ? null : new AMQShortString(contentType);
+ setContentType(contentType == null ? null : new AMQShortString(contentType));
+ }
+
+ public String getEncodingAsString()
+ {
+
+ return getEncoding() == null ? null : getEncoding().toString();
}
- public String getEncoding()
+ public AMQShortString getEncoding()
{
decodeIfNecessary();
- return _encoding == null ? null : _encoding.toString();
+ return _encoding;
}
public void setEncoding(String encoding)
@@ -462,6 +466,14 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
_encoding = encoding == null ? null : new AMQShortString(encoding);
}
+ public void setEncoding(AMQShortString encoding)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 14);
+ _encoding = encoding;
+ }
+
+
public FieldTable getHeaders()
{
decodeHeadersIfNecessary();
@@ -508,7 +520,13 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
_priority = priority;
}
- public String getCorrelationId()
+ public AMQShortString getCorrelationId()
+ {
+ decodeIfNecessary();
+ return _correlationId;
+ }
+
+ public String getCorrelationIdAsString()
{
decodeIfNecessary();
return _correlationId == null ? null : _correlationId.toString();
@@ -516,18 +534,23 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
public void setCorrelationId(String correlationId)
{
+ setCorrelationId(correlationId == null ? null : new AMQShortString(correlationId));
+ }
+
+ public void setCorrelationId(AMQShortString correlationId)
+ {
clearEncodedForm();
_propertyFlags |= (1 << 10);
- _correlationId = correlationId == null ? null : new AMQShortString(correlationId);
+ _correlationId = correlationId;
}
- public String getReplyTo()
+ public String getReplyToAsString()
{
decodeIfNecessary();
return _replyTo == null ? null : _replyTo.toString();
}
- public AMQShortString getReplyToAsShortString()
+ public AMQShortString getReplyTo()
{
decodeIfNecessary();
return _replyTo;
@@ -561,7 +584,13 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
}
- public String getMessageId()
+ public AMQShortString getMessageId()
+ {
+ decodeIfNecessary();
+ return _messageId;
+ }
+
+ public String getMessageIdAsString()
{
decodeIfNecessary();
return _messageId == null ? null : _messageId.toString();
@@ -574,6 +603,14 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
_messageId = messageId == null ? null : new AMQShortString(messageId);
}
+ public void setMessageId(AMQShortString messageId)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 7);
+ _messageId = messageId;
+ }
+
+
public long getTimestamp()
{
decodeIfNecessary();
@@ -587,56 +624,102 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties
_timestamp = timestamp;
}
- public String getType()
+ public String getTypeAsString()
{
decodeIfNecessary();
return _type == null ? null : _type.toString();
}
+
+ public AMQShortString getType()
+ {
+ decodeIfNecessary();
+ return _type;
+ }
+
+
public void setType(String type)
{
+ setType(type == null ? null : new AMQShortString(type));
+ }
+
+ public void setType(AMQShortString type)
+ {
clearEncodedForm();
_propertyFlags |= (1 << 5);
- _type = type == null ? null : new AMQShortString(type);
+ _type = type;
}
- public String getUserId()
+ public String getUserIdAsString()
{
decodeIfNecessary();
return _userId == null ? null : _userId.toString();
}
+ public AMQShortString getUserId()
+ {
+ decodeIfNecessary();
+ return _userId;
+ }
+
public void setUserId(String userId)
{
+ setUserId(userId == null ? null : new AMQShortString(userId));
+ }
+
+ public void setUserId(AMQShortString userId)
+ {
clearEncodedForm();
_propertyFlags |= (1 << 4);
- _userId = userId == null ? null : new AMQShortString(userId);
+ _userId = userId;
}
- public String getAppId()
+ public String getAppIdAsString()
{
decodeIfNecessary();
return _appId == null ? null : _appId.toString();
}
+ public AMQShortString getAppId()
+ {
+ decodeIfNecessary();
+ return _appId;
+ }
+
public void setAppId(String appId)
{
+ setAppId(appId == null ? null : new AMQShortString(appId));
+ }
+
+ public void setAppId(AMQShortString appId)
+ {
clearEncodedForm();
_propertyFlags |= (1 << 3);
- _appId = appId == null ? null : new AMQShortString(appId);
+ _appId = appId;
}
- public String getClusterId()
+ public String getClusterIdAsString()
{
decodeIfNecessary();
return _clusterId == null ? null : _clusterId.toString();
}
+ public AMQShortString getClusterId()
+ {
+ decodeIfNecessary();
+ return _clusterId;
+ }
+
public void setClusterId(String clusterId)
{
+ setClusterId(clusterId == null ? null : new AMQShortString(clusterId));
+ }
+
+ public void setClusterId(AMQShortString clusterId)
+ {
clearEncodedForm();
_propertyFlags |= (1 << 2);
- _clusterId = clusterId == null ? null : new AMQShortString(clusterId);
+ _clusterId = clusterId;
}
public String toString()
diff --git a/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java
new file mode 100644
index 0000000000..1641cbf4e8
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java
@@ -0,0 +1,65 @@
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.log4j.Logger;
+
+public interface CommonContentHeaderProperties extends ContentHeaderProperties
+{
+
+ AMQShortString getContentType();
+
+ void setContentType(AMQShortString contentType);
+
+ FieldTable getHeaders();
+
+ void setHeaders(FieldTable headers);
+
+ byte getDeliveryMode();
+
+ void setDeliveryMode(byte deliveryMode);
+
+ byte getPriority();
+
+ void setPriority(byte priority);
+
+ AMQShortString getCorrelationId();
+
+ void setCorrelationId(AMQShortString correlationId);
+
+ AMQShortString getReplyTo();
+
+ void setReplyTo(AMQShortString replyTo);
+
+ long getExpiration();
+
+ void setExpiration(long expiration);
+
+ AMQShortString getMessageId();
+
+ void setMessageId(AMQShortString messageId);
+
+ long getTimestamp();
+
+ void setTimestamp(long timestamp);
+
+ AMQShortString getType();
+
+ void setType(AMQShortString type);
+
+ AMQShortString getUserId();
+
+ void setUserId(AMQShortString userId);
+
+ AMQShortString getAppId();
+
+ void setAppId(AMQShortString appId);
+
+ AMQShortString getClusterId();
+
+ void setClusterId(AMQShortString clusterId);
+
+ AMQShortString getEncoding();
+
+ void setEncoding(AMQShortString encoding);
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index 246e5ebc90..a7544c5747 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -41,10 +41,14 @@ public class FieldTable
private LinkedHashMap<AMQShortString, AMQTypedValue> _properties;
private long _encodedSize;
private static final int INITIAL_HASHMAP_CAPACITY = 16;
+ private static final int INITIAL_ENCODED_FORM_SIZE = 256;
public FieldTable()
{
super();
+// _encodedForm = ByteBuffer.allocate(INITIAL_ENCODED_FORM_SIZE);
+// _encodedForm.setAutoExpand(true);
+// _encodedForm.limit(0);
}
/**
@@ -109,11 +113,28 @@ public class FieldTable
private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val)
{
initMapIfNecessary();
- _encodedForm = null;
- if(val == null)
+ if(_properties.containsKey(key))
+ {
+ _encodedForm = null;
+
+ if(val == null)
+ {
+ return removeKey(key);
+ }
+ }
+ else if(_encodedForm != null && val != null)
+ {
+ EncodingUtils.writeShortStringBytes(_encodedForm, key);
+ val.writeToBuffer(_encodedForm);
+
+ }
+ else if (val == null)
{
- return removeKey(key);
+ return null;
}
+
+
+
AMQTypedValue oldVal = _properties.put(key,val);
if(oldVal != null)
{
@@ -134,7 +155,7 @@ public class FieldTable
{
if(_properties == null)
{
- if(_encodedForm == null)
+ if(_encodedForm == null || _encodedSize == 0)
{
_properties = new LinkedHashMap<AMQShortString,AMQTypedValue>();
}
@@ -655,6 +676,7 @@ public class FieldTable
if (trace)
{
_logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "...");
+ _logger.trace(_properties);
}
EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize());
@@ -701,6 +723,7 @@ public class FieldTable
public void addAll(FieldTable fieldTable)
{
initMapIfNecessary();
+ _encodedForm = null;
_properties.putAll(fieldTable._properties);
recalculateEncodedSize();
}
@@ -836,7 +859,13 @@ public class FieldTable
if(_encodedForm != null)
{
- buffer.put(_encodedForm);
+
+ if(_encodedForm.position() != 0)
+ {
+ _encodedForm.flip();
+ }
+// _encodedForm.limit((int)getEncodedSize());
+ buffer.put(_encodedForm);
}
else if(_properties != null)
{
@@ -924,4 +953,33 @@ public class FieldTable
}
}
+ public int hashCode()
+ {
+ initMapIfNecessary();
+ return _properties.hashCode();
+ }
+
+
+ public boolean equals(Object o)
+ {
+ if(o == this)
+ {
+ return true;
+ }
+ if(o == null)
+ {
+ return false;
+ }
+ if(!(o instanceof FieldTable))
+ {
+ return false;
+ }
+
+ initMapIfNecessary();
+
+ FieldTable f = (FieldTable) o;
+ f.initMapIfNecessary();
+
+ return _properties.equals(f._properties);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index 697a0f4249..8b40fe72eb 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -25,25 +25,50 @@ import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.qpid.AMQException;
+import java.io.UnsupportedEncodingException;
+
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
{
- public char[] header = new char[]{'A','M','Q','P'};
+
// TODO: generate these constants automatically from the xml protocol spec file
+ public static final byte[] AMQP_HEADER = new byte[]{(byte)'A',(byte)'M',(byte)'Q',(byte)'P'};
- private static byte CURRENT_PROTOCOL_CLASS = 1;
- private static final int CURRENT_PROTOCOL_INSTANCE = 1;
+ private static final byte CURRENT_PROTOCOL_CLASS = 1;
+ private static final byte TCP_PROTOCOL_INSTANCE = 1;
+
+ public final byte[] _protocolHeader;
+ public final byte _protocolClass;
+ public final byte _protocolInstance;
+ public final byte _protocolMajor;
+ public final byte _protocolMinor;
- public byte protocolClass = CURRENT_PROTOCOL_CLASS;
- public byte protocolInstance = CURRENT_PROTOCOL_INSTANCE;
- public byte protocolMajor;
- public byte protocolMinor;
// public ProtocolInitiation() {}
- public ProtocolInitiation(byte major, byte minor)
+ public ProtocolInitiation(byte[] protocolHeader, byte protocolClass, byte protocolInstance, byte protocolMajor, byte protocolMinor)
+ {
+ _protocolHeader = protocolHeader;
+ _protocolClass = protocolClass;
+ _protocolInstance = protocolInstance;
+ _protocolMajor = protocolMajor;
+ _protocolMinor = protocolMinor;
+ }
+
+ public ProtocolInitiation(ProtocolVersion pv)
{
- protocolMajor = major;
- protocolMinor = minor;
+ this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
+ }
+
+
+ public ProtocolInitiation(ByteBuffer in)
+ {
+ _protocolHeader = new byte[4];
+ in.get(_protocolHeader);
+
+ _protocolClass = in.get();
+ _protocolInstance = in.get();
+ _protocolMajor = in.get();
+ _protocolMinor = in.get();
}
public long getSize()
@@ -53,19 +78,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
public void writePayload(ByteBuffer buffer)
{
- for (int i = 0; i < header.length; i++)
- {
- buffer.put((byte) header[i]);
- }
- buffer.put(protocolClass);
- buffer.put(protocolInstance);
- buffer.put(protocolMajor);
- buffer.put(protocolMinor);
- }
- public void populateFromBuffer(ByteBuffer buffer) throws AMQException
- {
- throw new AMQException("Method not implemented");
+ buffer.put(_protocolHeader);
+ buffer.put(_protocolClass);
+ buffer.put(_protocolInstance);
+ buffer.put(_protocolMajor);
+ buffer.put(_protocolMinor);
}
public boolean equals(Object o)
@@ -76,36 +94,36 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
}
ProtocolInitiation pi = (ProtocolInitiation) o;
- if (pi.header == null)
+ if (pi._protocolHeader == null)
{
return false;
}
- if (header.length != pi.header.length)
+ if (_protocolHeader.length != pi._protocolHeader.length)
{
return false;
}
- for (int i = 0; i < header.length; i++)
+ for (int i = 0; i < _protocolHeader.length; i++)
{
- if (header[i] != pi.header[i])
+ if (_protocolHeader[i] != pi._protocolHeader[i])
{
return false;
}
}
- return (protocolClass == pi.protocolClass &&
- protocolInstance == pi.protocolInstance &&
- protocolMajor == pi.protocolMajor &&
- protocolMinor == pi.protocolMinor);
+ return (_protocolClass == pi._protocolClass &&
+ _protocolInstance == pi._protocolInstance &&
+ _protocolMajor == pi._protocolMajor &&
+ _protocolMinor == pi._protocolMinor);
}
public static class Decoder //implements MessageDecoder
{
/**
*
- * @param session
- * @param in
+ * @param session the session
+ * @param in input buffer
* @return true if we have enough data to decode the PI frame fully, false if more
* data is required
*/
@@ -115,63 +133,62 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
}
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
- throws Exception
{
- byte[] theHeader = new byte[4];
- in.get(theHeader);
- ProtocolInitiation pi = new ProtocolInitiation((byte)0, (byte)0);
- pi.header = new char[]{(char) theHeader[0],(char) theHeader[CURRENT_PROTOCOL_INSTANCE],(char) theHeader[2], (char) theHeader[3]};
- String stringHeader = new String(pi.header);
- if (!"AMQP".equals(stringHeader))
- {
- throw new AMQProtocolHeaderException("Invalid protocol header - read " + stringHeader);
- }
- pi.protocolClass = in.get();
- pi.protocolInstance = in.get();
- pi.protocolMajor = in.get();
- pi.protocolMinor = in.get();
+ ProtocolInitiation pi = new ProtocolInitiation(in);
out.write(pi);
}
}
- public void checkVersion(ProtocolVersionList pvl) throws AMQException
+ public void checkVersion() throws AMQException
{
- if (protocolClass != CURRENT_PROTOCOL_CLASS)
- {
- throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " +
- protocolClass);
- }
- if (protocolInstance != CURRENT_PROTOCOL_INSTANCE)
+
+ if(_protocolHeader.length != 4)
{
- throw new AMQProtocolInstanceException("Protocol instance " + CURRENT_PROTOCOL_INSTANCE + " was expected; received " +
- protocolInstance);
+ throw new AMQProtocolHeaderException("Protocol header should have exactly four octets");
}
-
- /* Look through list of available protocol versions */
- boolean found = false;
- for (int i=0; i<pvl.pv.length; i++)
+ for(int i = 0; i < 4; i++)
{
- if (pvl.pv[i][pvl.PROTOCOL_MAJOR] == protocolMajor &&
- pvl.pv[i][pvl.PROTOCOL_MINOR] == protocolMinor)
+ if(_protocolHeader[i] != AMQP_HEADER[i])
{
- found = true;
+ try
+ {
+ throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,"ISO-8859-1") + " should be: " + new String(AMQP_HEADER, "ISO-8859-1"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+
+ }
}
}
- if (!found)
+ if (_protocolClass != CURRENT_PROTOCOL_CLASS)
+ {
+ throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " +
+ _protocolClass);
+ }
+ if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
+ {
+ throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " +
+ _protocolInstance);
+ }
+
+ ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
+
+
+ if (!pv.isSupported())
{
// TODO: add list of available versions in list to msg...
throw new AMQProtocolVersionException("Protocol version " +
- protocolMajor + "." + protocolMinor + " not found in protocol version list.");
+ _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker.");
}
}
public String toString()
{
- StringBuffer buffer = new StringBuffer(new String(header));
- buffer.append(Integer.toHexString(protocolClass));
- buffer.append(Integer.toHexString(protocolInstance));
- buffer.append(Integer.toHexString(protocolMajor));
- buffer.append(Integer.toHexString(protocolMinor));
+ StringBuffer buffer = new StringBuffer(new String(_protocolHeader));
+ buffer.append(Integer.toHexString(_protocolClass));
+ buffer.append(Integer.toHexString(_protocolInstance));
+ buffer.append(Integer.toHexString(_protocolMajor));
+ buffer.append(Integer.toHexString(_protocolMinor));
return buffer.toString();
}
}
diff --git a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
index 0f706ac553..4fd1f60d69 100644
--- a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
+++ b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
@@ -22,8 +22,6 @@ package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-import java.util.HashMap;
-
import junit.framework.TestCase;
@@ -94,14 +92,14 @@ public class BasicContentHeaderPropertiesTest extends TestCase
{
String contentType = "contentType";
_testProperties.setContentType(contentType);
- assertEquals(contentType, _testProperties.getContentType());
+ assertEquals(contentType, _testProperties.getContentTypeAsString());
}
public void testSetGetEncoding()
{
String encoding = "encoding";
_testProperties.setEncoding(encoding);
- assertEquals(encoding, _testProperties.getEncoding());
+ assertEquals(encoding, _testProperties.getEncodingAsString());
}
public void testSetGetHeaders()
@@ -128,14 +126,14 @@ public class BasicContentHeaderPropertiesTest extends TestCase
{
String correlationId = "correlationId";
_testProperties.setCorrelationId(correlationId);
- assertEquals(correlationId, _testProperties.getCorrelationId());
+ assertEquals(correlationId, _testProperties.getCorrelationIdAsString());
}
public void testSetGetReplyTo()
{
String replyTo = "replyTo";
_testProperties.setReplyTo(replyTo);
- assertEquals(replyTo, _testProperties.getReplyTo());
+ assertEquals(replyTo, _testProperties.getReplyToAsString());
}
public void testSetGetExpiration()
@@ -149,7 +147,7 @@ public class BasicContentHeaderPropertiesTest extends TestCase
{
String messageId = "messageId";
_testProperties.setMessageId(messageId);
- assertEquals(messageId, _testProperties.getMessageId());
+ assertEquals(messageId, _testProperties.getMessageIdAsString());
}
public void testSetGetTimestamp()
@@ -163,28 +161,28 @@ public class BasicContentHeaderPropertiesTest extends TestCase
{
String type = "type";
_testProperties.setType(type);
- assertEquals(type, _testProperties.getType());
+ assertEquals(type, _testProperties.getTypeAsString());
}
public void testSetGetUserId()
{
String userId = "userId";
_testProperties.setUserId(userId);
- assertEquals(userId, _testProperties.getUserId());
+ assertEquals(userId, _testProperties.getUserIdAsString());
}
public void testSetGetAppId()
{
String appId = "appId";
_testProperties.setAppId(appId);
- assertEquals(appId, _testProperties.getAppId());
+ assertEquals(appId, _testProperties.getAppIdAsString());
}
public void testSetGetClusterId()
{
String clusterId = "clusterId";
_testProperties.setClusterId(clusterId);
- assertEquals(clusterId, _testProperties.getClusterId());
+ assertEquals(clusterId, _testProperties.getClusterIdAsString());
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index bab7954d11..c334547b7f 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
@@ -170,6 +171,11 @@ public class MockProtocolSession implements AMQProtocolSession
//To change body of implemented methods use File | Settings | File Templates.
}
+ public ProtocolOutputConverter getProtocolOutputConverter()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public byte getProtocolMajorVersion()
{
return 8; //To change body of implemented methods use File | Settings | File Templates.