diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-02-25 01:08:57 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-02-25 01:08:57 +0000 |
commit | 2ea003c24ab3170dec118af6f9f8c128241cec65 (patch) | |
tree | 5d062e29ec7eceeda453c1402117978c211a2db8 /java | |
parent | b9f9c16645933e0e2f4c6c9b58e8cd1716434467 (diff) | |
download | qpid-python-2ea003c24ab3170dec118af6f9f8c128241cec65.tar.gz |
QPID-391 : Broker Refactoring - initial tidy... add some mechanisms for multi version
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@511389 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
35 files changed, 1166 insertions, 476 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 7ceb3a7eef..be2cee79ee 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -433,7 +433,10 @@ public class AMQChannel } - /** Called to resend all outstanding unacknowledged messages to this same channel. */ + /** Called to resend all outstanding unacknowledged messages to this same channel. + * @param session the session + * @param requeue if true then requeue, else resend + * @throws org.apache.qpid.AMQException */ public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException { final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); @@ -752,7 +755,9 @@ public class AMQChannel for (RequiredDeliveryException bouncedMessage : _returnMessages) { AMQMessage message = bouncedMessage.getAMQMessage(); - message.writeReturn(session, _channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage())); + session.getProtocolOutputConverter().writeReturn(message, _channelId, + bouncedMessage.getReplyCode().getCode(), + new AMQShortString(bouncedMessage.getMessage())); } _returnMessages.clear(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index 42fe8c5274..a48bc5df7f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -45,7 +45,7 @@ import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; @@ -59,7 +59,8 @@ import org.apache.qpid.url.URLSyntaxException; * Main entry point for AMQPD. * */ -public class Main implements ProtocolVersionList +@SuppressWarnings({"AccessStaticViaInstance"}) +public class Main { private static final Logger _logger = Logger.getLogger(Main.class); @@ -143,12 +144,21 @@ public class Main implements ProtocolVersionList else if (commandLine.hasOption("v")) { String ver = "Qpid 0.9.0.0"; - String protocol = "AMQP version(s) [major.minor]: "; - for (int i=0; i<pv.length; i++) + StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: "); + + boolean first = true; + for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions()) { - if (i > 0) - protocol += ", "; - protocol += pv[i][PROTOCOL_MAJOR] + "." + pv[i][PROTOCOL_MINOR]; + if(first) + { + first = false; + } + else + { + protocol.append(", "); + } + protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion()); + } System.out.println(ver + " (" + protocol + ")"); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index fdf087fdea..99cc60011a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -209,7 +209,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap if(consumerTag != null) { - msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag); + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channelId, deliveryTag, consumerTag); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java index 348bfa5e68..bdabcbf5be 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -25,7 +25,8 @@ import java.util.HashMap; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.CommonContentHeaderProperties; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQMessage; /** @@ -63,8 +64,9 @@ public class PropertyExpression implements Expression { try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; - return _properties.getReplyTo(); + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; + AMQShortString replyTo = _properties.getReplyTo(); + return replyTo == null ? null : replyTo.toString(); } catch (AMQException e) { @@ -83,8 +85,9 @@ public class PropertyExpression implements Expression { try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; - return _properties.getType(); + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; + AMQShortString type = _properties.getType(); + return type == null ? null : type.toString(); } catch (AMQException e) { @@ -126,7 +129,7 @@ public class PropertyExpression implements Expression { try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; return (int) _properties.getPriority(); } catch (AMQException e) @@ -147,8 +150,9 @@ public class PropertyExpression implements Expression try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; - return _properties.getMessageId(); + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; + AMQShortString messageId = _properties.getMessageId(); + return messageId == null ? null : messageId; } catch (AMQException e) { @@ -168,7 +172,7 @@ public class PropertyExpression implements Expression try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; return _properties.getTimestamp(); } catch (AMQException e) @@ -189,8 +193,9 @@ public class PropertyExpression implements Expression try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; - return _properties.getCorrelationId(); + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; + AMQShortString correlationId = _properties.getCorrelationId(); + return correlationId == null ? null : correlationId.toString(); } catch (AMQException e) { @@ -210,7 +215,7 @@ public class PropertyExpression implements Expression try { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; return _properties.getExpiration(); } catch (AMQException e) @@ -254,7 +259,7 @@ public class PropertyExpression implements Expression else { - BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + CommonContentHeaderProperties _properties = (CommonContentHeaderProperties) message.getContentHeaderBody().properties; if(_logger.isDebugEnabled()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java new file mode 100644 index 0000000000..e01c5aabbf --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java @@ -0,0 +1,57 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.AMQException;
+
+public interface ProtocolOutputConverter
+{
+ void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag);
+
+ interface Factory
+ {
+ ProtocolOutputConverter newInstance(AMQProtocolSession session);
+ }
+
+ void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException;
+
+ void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+
+ byte getProtocolMinorVersion();
+
+ byte getProtocolMajorVersion();
+
+ void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException;
+
+ void writeFrame(AMQDataBlock block);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java new file mode 100644 index 0000000000..8366c426dd --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java @@ -0,0 +1,62 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output;
+
+import org.apache.qpid.server.output.ProtocolOutputConverter.Factory;
+import org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+import java.util.Map;
+import java.util.HashMap;
+
+public class ProtocolOutputConverterRegistry
+{
+
+ private static final Map<Byte, Map<Byte, Factory>> _registry =
+ new HashMap<Byte, Map<Byte, Factory>>();
+
+
+ static
+ {
+ register((byte) 8, (byte) 0, ProtocolOutputConverterImpl.getInstanceFactory());
+ }
+
+ private static void register(byte major, byte minor, Factory converter)
+ {
+ if(!_registry.containsKey(major))
+ {
+ _registry.put(major, new HashMap<Byte, Factory>());
+ }
+ _registry.get(major).put(minor, converter);
+ }
+
+
+ public static ProtocolOutputConverter getConverter(AMQProtocolSession session)
+ {
+ return _registry.get(session.getProtocolMajorVersion()).get(session.getProtocolMinorVersion()).newInstance(session);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java new file mode 100644 index 0000000000..bd5bb632fe --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -0,0 +1,288 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ * 8-0
+ */
+package org.apache.qpid.server.output.amqp0_8;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Iterator;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final StoreContext storeContext = message.getStoreContext();
+ final long messageId = message.getMessageId();
+
+ final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
+
+ if(bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = messageHandle.getContentChunk(storeContext,messageId, i);
+ writeFrame(new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
+
+ }
+
+
+ public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+ final StoreContext storeContext = message.getStoreContext();
+ final long messageId = message.getMessageId();
+
+ ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
+ if(bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = messageHandle.getContentChunk(storeContext, messageId, i);
+ writeFrame(new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
+
+ }
+
+
+ private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ consumerTag,
+ deliveryTag, pb.getExchange(), messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+
+ ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ deliveryTag, pb.getExchange(),
+ queueSize,
+ messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
+ getOkFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ {
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ message.getMessagePublishInfo().getExchange(),
+ replyCode, replyText,
+ message.getMessagePublishInfo().getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
+ returnFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+ ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ message.getContentHeaderBody());
+
+ Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+ new AMQDataBlock[]{contentHeader});
+
+ writeFrame(compositeBlock);
+ }
+
+ //
+ // Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+ //
+ while (bodyFrameIterator.hasNext())
+ {
+ writeFrame(bodyFrameIterator.next());
+ }
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+
+ writeFrame(BasicCancelOkBody.createAMQFrame(channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ consumerTag // consumerTag
+ ));
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 2de32c2f0f..d71f6e3046 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -43,25 +43,14 @@ import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.MainRegistry; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; -import org.apache.qpid.framing.VersionSpecificRegistry; -import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.*; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -71,7 +60,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class AMQMinaProtocolSession implements AMQProtocolSession, - ProtocolVersionList, Managable { private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); @@ -111,12 +99,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private long _maxNoOfChannels = 1000; /* AMQP Version for this session */ - private byte _major = pv[pv.length - 1][PROTOCOL_MAJOR]; - private byte _minor = pv[pv.length - 1][PROTOCOL_MINOR]; + private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); + private FieldTable _clientProperties; private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); - private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length - 1][PROTOCOL_MAJOR], pv[pv.length - 1][PROTOCOL_MINOR]); + private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion); private List<Integer> _closingChannelsList = new ArrayList<Integer>(); + private ProtocolOutputConverter _protocolOutputConverter; public ManagedObject getManagedObject() @@ -195,86 +184,116 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _lastReceived = message; if (message instanceof ProtocolInitiation) { - ProtocolInitiation pi = (ProtocolInitiation) message; - // this ensures the codec never checks for a PI message again - ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); - try - { - pi.checkVersion(this); // Fails if not correct + protocolInitiationReceived((ProtocolInitiation) message); - // This sets the protocol version (and hence framing classes) for this session. - setProtocolVersion(pi.protocolMajor, pi.protocolMinor); + } + else if (message instanceof AMQFrame) + { + AMQFrame frame = (AMQFrame) message; + frameReceived(frame); - String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); + } + else + { + throw new UnknnownMessageTypeException(message); + } + } - String locales = "en_US"; + private void frameReceived(AMQFrame frame) + throws AMQException + { + int channelId = frame.getChannel(); + AMQBody body = frame.getBodyFrame(); - // Interfacing with generated code - be aware of possible changes to parameter order as versions change. - AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, - _major, _minor, // AMQP version (major, minor) - locales.getBytes(), // locales - mechanisms.getBytes(), // mechanisms - null, // serverProperties - (short) _major, // versionMajor - (short) _minor); // versionMinor - _minaProtocolSession.write(response); - } - catch (AMQException e) - { - _logger.error("Received incorrect protocol initiation", e); - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be used - here. */ - int i = pv.length - 1; - _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); - // TODO: Close connection (but how to wait until message is sent?) - // ritchiem 2006-12-04 will this not do? -// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); -// future.join(); -// close connection + if(_logger.isDebugEnabled()) + { + _logger.debug("Frame Received: " + frame); + } - } + if (body instanceof AMQMethodBody) + { + methodFrameReceived(channelId, (AMQMethodBody)body); + } + else if (body instanceof ContentHeaderBody) + { + contentHeaderReceived(channelId, (ContentHeaderBody)body); + } + else if (body instanceof ContentBody) + { + contentBodyReceived(channelId, (ContentBody)body); + } + else if (body instanceof HeartbeatBody) + { + // NO OP } else { - AMQFrame frame = (AMQFrame) message; - - if (frame.getBodyFrame() instanceof AMQMethodBody) - { - methodFrameReceived(frame); - } - else - { - contentFrameReceived(frame); - } + _logger.warn("Unrecognised frame " + frame.getClass().getName()); } } - private void methodFrameReceived(AMQFrame frame) + private void protocolInitiationReceived(ProtocolInitiation pi) { - if (_logger.isDebugEnabled()) + // this ensures the codec never checks for a PI message again + ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); + try { - _logger.debug("Method frame received: " + frame); + pi.checkVersion(); // Fails if not correct + + // This sets the protocol version (and hence framing classes) for this session. + setProtocolVersion(pi._protocolMajor, pi._protocolMinor); + + String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); + + String locales = "en_US"; + + // Interfacing with generated code - be aware of possible changes to parameter order as versions change. + AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + locales.getBytes(), // locales + mechanisms.getBytes(), // mechanisms + null, // serverProperties + (short) getProtocolMajorVersion(), // versionMajor + (short) getProtocolMinorVersion()); // versionMinor + _minaProtocolSession.write(response); + } + catch (AMQException e) + { + _logger.error("Received incorrect protocol initiation", e); + + _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + + // TODO: Close connection (but how to wait until message is sent?) + // ritchiem 2006-12-04 will this not do? +// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); +// future.join(); +// close connection + } + } + - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), - (AMQMethodBody) frame.getBodyFrame()); + private void methodFrameReceived(int channelId, AMQMethodBody methodBody) + { + + final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, + methodBody); //Check that this channel is not closing - if (channelAwaitingClosure(frame.getChannel())) + if (channelAwaitingClosure(channelId)) { if ((evt.getMethod() instanceof ChannelCloseOkBody)) { if (_logger.isInfoEnabled()) { - _logger.info("Channel[" + frame.getChannel() + "] awaiting closure - processing close-ok"); + _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); } } else { if (_logger.isInfoEnabled()) { - _logger.info("Channel[" + frame.getChannel() + "] awaiting closure ignoring"); + _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); } return; } @@ -298,19 +317,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } if (!wasAnyoneInterested) { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener on Broker."); + throw new AMQNoMethodHandlerException(evt); } } catch (AMQChannelException e) { - if (getChannel(frame.getChannel()) != null) + if (getChannel(channelId) != null) { if (_logger.isInfoEnabled()) { _logger.info("Closing channel due to: " + e.getMessage()); } - writeFrame(e.getCloseFrame(frame.getChannel())); - closeChannel(frame.getChannel()); + writeFrame(e.getCloseFrame(channelId)); + closeChannel(channelId); } else { @@ -328,7 +347,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, AMQConstant.CHANNEL_ERROR.getName().toString()); _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(ce.getCloseFrame(frame.getChannel())); + writeFrame(ce.getCloseFrame(channelId)); } } catch (AMQConnectionException e) @@ -339,7 +358,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } closeSession(); _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(frame.getChannel())); + writeFrame(e.getCloseFrame(channelId)); } } catch (Exception e) @@ -353,61 +372,21 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - private void contentFrameReceived(AMQFrame frame) throws AMQException - { - if (frame.getBodyFrame() instanceof ContentHeaderBody) - { - contentHeaderReceived(frame); - } - else if (frame.getBodyFrame() instanceof ContentBody) - { - contentBodyReceived(frame); - } - else if (frame.getBodyFrame() instanceof HeartbeatBody) - { - _logger.debug("Received heartbeat from client"); - } - else - { - _logger.warn("Unrecognised frame " + frame.getClass().getName()); - } - } - private void contentHeaderReceived(AMQFrame frame) throws AMQException + private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content header frame received: " + frame); - } - AMQChannel channel = getChannel(frame.getChannel()); + AMQChannel channel = getAndAssertChannel(channelId); + + channel.publishContentHeader(body); - if (channel == null) - { - throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel()); - } - else - { - channel.publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); - } } - private void contentBodyReceived(AMQFrame frame) throws AMQException + private void contentBodyReceived(int channelId, ContentBody body) throws AMQException { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content body frame received: " + frame); - } - AMQChannel channel = getChannel(frame.getChannel()); + AMQChannel channel = getAndAssertChannel(channelId); - if (channel == null) - { - throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel()); - } - else - { - channel.publishContentBody((ContentBody) frame.getBodyFrame(), this); - } + channel.publishContentBody(body, this); } /** @@ -437,6 +416,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return new ArrayList<AMQChannel>(_channelMap.values()); } + public AMQChannel getAndAssertChannel(int channelId) throws AMQException + { + AMQChannel channel = getChannel(channelId); + if (channel == null) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); + } + return channel; + } + public AMQChannel getChannel(int channelId) throws AMQException { if (channelAwaitingClosure(channelId)) @@ -685,24 +674,26 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private void setProtocolVersion(byte major, byte minor) { - _major = major; - _minor = minor; - _registry = MainRegistry.getVersionSpecificRegistry(major, minor); + _protocolVersion = new ProtocolVersion(major,minor); + + _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion); + + _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this); } public byte getProtocolMajorVersion() { - return _major; + return _protocolVersion.getMajorVersion(); } public byte getProtocolMinorVersion() { - return _minor; + return _protocolVersion.getMinorVersion(); } public boolean isProtocolVersion(byte major, byte minor) { - return _major == major && _minor == minor; + return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor; } public VersionSpecificRegistry getRegistry() @@ -739,5 +730,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _taskList.remove(task); } + public ProtocolOutputConverter getProtocolOutputConverter() + { + return _protocolOutputConverter; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java new file mode 100644 index 0000000000..16d74b6fc0 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java @@ -0,0 +1,34 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+
+public class AMQNoMethodHandlerException extends AMQException
+{
+
+ public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
+ {
+ super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 9d397505dc..756a8b5ebe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -39,7 +39,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.transport.ConnectorConfiguration; @@ -53,7 +53,7 @@ import org.apache.qpid.ssl.SSLContextFactory; * the state for the connection. * */ -public class AMQPFastProtocolHandler extends IoHandlerAdapter implements ProtocolVersionList +public class AMQPFastProtocolHandler extends IoHandlerAdapter { private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class); @@ -162,12 +162,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession); if (throwable instanceof AMQProtocolHeaderException) { - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be returned - here. */ - int i = pv.length - 1; - protocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); + + protocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + protocolSession.close(); + _logger.error("Error in protocol initiation " + session + ": " + throwable.getMessage(), throwable); } else if(throwable instanceof IOException) @@ -176,8 +175,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco } else { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. protocolSession.write(ConnectionCloseBody.createAMQFrame(0, session.getProtocolMajorVersion(), diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 503dc8b554..4cfee06850 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -162,4 +163,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession void removeSessionCloseTask(Task task); + public ProtocolOutputConverter getProtocolOutputConverter(); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index ea89136a62..d2a20cdf57 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -24,6 +24,7 @@ import javax.management.JMException; import javax.management.MBeanException; import javax.management.MBeanNotificationInfo; import javax.management.Notification; +import javax.management.NotCompliantMBeanException; import javax.management.monitor.MonitorNotification; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; @@ -65,7 +66,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed new AMQShortString("Broker Management Console has closed the connection."); @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") - public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws JMException + public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException { super(ManagedConnection.class, ManagedConnection.TYPE); _session = session; @@ -74,6 +75,8 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed _name = jmxEncode(new StringBuffer(remote), 0).toString(); init(); } + + static { try @@ -94,7 +97,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed { _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, - _channelAtttibuteNames, _channelAttributeTypes); + _channelAtttibuteNames, _channelAttributeTypes); _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java new file mode 100644 index 0000000000..45d09e8f3e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java @@ -0,0 +1,33 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.AMQException;
+
+public class UnknnownMessageTypeException extends AMQException
+{
+ public UnknnownMessageTypeException(AMQDataBlock message)
+ {
+ super("Unknown message type: " + message.getClass().getName() + ": " + message);
+
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index aa7ea16afc..dedea68d18 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -20,23 +20,34 @@ */ package org.apache.qpid.server.queue; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; -import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.txn.TransactionalContext; /** Combines the information that make up a deliverable message into a more manageable form. */ + +import org.apache.log4j.Logger; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Combines the information that make up a deliverable message into a more manageable form. + */ public class AMQMessage { private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -136,7 +147,7 @@ public class AMQMessage } } - private StoreContext getStoreContext() + public StoreContext getStoreContext() { return _txnContext.getStoreContext(); } @@ -579,6 +590,7 @@ public class AMQMessage } } +/* public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { @@ -746,6 +758,12 @@ public class AMQMessage protocolSession.writeFrame(bodyFrameIterator.next()); } } +*/ + + public AMQMessageHandle getMessageHandle() + { + return _messageHandle; + } public long getSize() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 4fd89f39da..c9329a244c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -41,9 +41,9 @@ import javax.management.openmbean.TabularType; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.CommonContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; @@ -344,12 +344,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que try { // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties; + CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; String mimeType = null, encoding = null; if (headerProperties != null) { - mimeType = headerProperties.getContentType(); - encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); + AMQShortString mimeTypeShortSting = headerProperties.getContentType(); + mimeType = mimeTypeShortSting == null ? null : mimeTypeShortSting.toString(); + encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding().toString(); } Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); @@ -382,7 +383,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que AMQMessage msg = list.get(i - 1); ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; + CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) headerBody.properties; String[] headerAttributes = headerProperties.toString().split(","); Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()}; CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 208a59516c..e70926736d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -270,7 +270,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue); } - msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount()); + protocolSession.getProtocolOutputConverter().writeGetOk(msg, channel.getChannelId(), + deliveryTag, _queue.getMessageCount()); _totalMessageSize.addAndGet(-msg.getSize()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index ede7731a06..0a2e73880c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -29,9 +29,9 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -258,7 +258,7 @@ public class SubscriptionImpl implements Subscription { channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); } - msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag); + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); } } @@ -294,7 +294,7 @@ public class SubscriptionImpl implements Subscription msg.decrementReference(storeContext); } - msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag); + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); } } @@ -466,13 +466,9 @@ public class SubscriptionImpl implements Subscription if (_autoClose && !_sentClose) { _logger.info("Closing autoclose subscription:" + this); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), - (byte) 8, (byte) 0, // AMQP version (major, minor) - consumerTag // consumerTag - )); + ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag); + _sentClose = true; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java deleted file mode 100644 index 90aa7bb998..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java +++ /dev/null @@ -1,57 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-
-import org.apache.mina.common.ByteBuffer;
-
-public class ContentChunkAdapter
-{
- public static ContentBody toConentBody(ContentChunk contentBodyChunk)
- {
- return new ContentBody(contentBodyChunk.getData());
- }
-
- public static ContentChunk toConentChunk(final ContentBody contentBodyChunk)
- {
- return new ContentChunk() {
-
- public int getSize()
- {
- return contentBodyChunk.getSize();
- }
-
- public ByteBuffer getData()
- {
- return contentBodyChunk.payload;
- }
-
- public void reduceToFit()
- {
- contentBodyChunk.reduceBufferToFit();
- }
- };
-
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java deleted file mode 100644 index 6ee2fa784d..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.apache.qpid.server.store;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-
-public class MessagePublishInfoAdapter
-{
- private final byte _majorVersion;
- private final byte _minorVersion;
- private final int _classId;
- private final int _methodId;
-
-
- public MessagePublishInfoAdapter(byte majorVersion, byte minorVersion)
- {
- _majorVersion = majorVersion;
- _minorVersion = minorVersion;
- _classId = BasicPublishBody.getClazz(majorVersion,minorVersion);
- _methodId = BasicPublishBody.getMethod(majorVersion,minorVersion);
- }
-
- public BasicPublishBody toMethodBody(MessagePublishInfo pubInfo)
- {
- return new BasicPublishBody(_majorVersion,
- _minorVersion,
- _classId,
- _methodId,
- pubInfo.getExchange(),
- pubInfo.isImmediate(),
- pubInfo.isMandatory(),
- pubInfo.getRoutingKey(),
- 0) ; // ticket
- }
-
- public MessagePublishInfo toMessagePublishInfo(final BasicPublishBody body)
- {
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return body.getExchange();
- }
-
- public boolean isImmediate()
- {
- return body.getImmediate();
- }
-
- public boolean isMandatory()
- {
- return body.getMandatory();
- }
-
- public AMQShortString getRoutingKey()
- {
- return body.getRoutingKey();
- }
- };
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 2aa2c1872b..f6ddfdc715 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -44,7 +44,7 @@ import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQMethodEvent; public class ConnectionStartMethodHandler implements StateAwareMethodListener @@ -69,28 +69,21 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener ConnectionStartBody body = (ConnectionStartBody) evt.getMethod(); - byte major = (byte) body.versionMajor; - byte minor = (byte) body.versionMinor; - boolean versionOk = false; + ProtocolVersion pv = new ProtocolVersion((byte) body.versionMajor,(byte) body.versionMinor); + // For the purposes of interop, we can make the client accept the broker's version string. // If it does, it then internally records the version as being the latest one that it understands. // It needs to do this since frame lookup is done by version. - if (Boolean.getBoolean("qpid.accept.broker.version")) - { - versionOk = true; - int lastIndex = ProtocolVersionList.pv.length - 1; - major = ProtocolVersionList.pv[lastIndex][ProtocolVersionList.PROTOCOL_MAJOR]; - minor = ProtocolVersionList.pv[lastIndex][ProtocolVersionList.PROTOCOL_MINOR]; - } - else + if (Boolean.getBoolean("qpid.accept.broker.version") && !pv.isSupported()) { - versionOk = checkVersionOK(major, minor); + + pv = ProtocolVersion.getLatestSupportedVersion(); } - if (versionOk) + if (pv.isSupported()) { - protocolSession.setProtocolVersion(major, minor); + protocolSession.setProtocolVersion(pv.getMajorVersion(), pv.getMinorVersion()); try { @@ -189,20 +182,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener } } - private boolean checkVersionOK(byte versionMajor, byte versionMinor) - { - byte[][] supportedVersions = ProtocolVersionList.pv; - boolean supported = false; - int i = supportedVersions.length; - while ((i-- != 0) && !supported) - { - supported = (supportedVersions[i][ProtocolVersionList.PROTOCOL_MAJOR] == versionMajor) - && (supportedVersions[i][ProtocolVersionList.PROTOCOL_MINOR] == versionMinor); - } - - return supported; - } - + private String getFullSystemInfo() { StringBuffer fullSystemInfo = new StringBuffer(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 36dd4d400c..66524edce3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -121,12 +121,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getJMSMessageID() throws JMSException { - if (getContentHeaderProperties().getMessageId() == null) + if (getContentHeaderProperties().getMessageIdAsString() == null) { getContentHeaderProperties().setMessageId("ID:" + _deliveryTag); } - return getContentHeaderProperties().getMessageId(); + return getContentHeaderProperties().getMessageIdAsString(); } public void setJMSMessageID(String messageId) throws JMSException @@ -146,7 +146,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte[] getJMSCorrelationIDAsBytes() throws JMSException { - return getContentHeaderProperties().getCorrelationId().getBytes(); + return getContentHeaderProperties().getCorrelationIdAsString().getBytes(); } public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException @@ -161,12 +161,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getJMSCorrelationID() throws JMSException { - return getContentHeaderProperties().getCorrelationId(); + return getContentHeaderProperties().getCorrelationIdAsString(); } public Destination getJMSReplyTo() throws JMSException { - String replyToEncoding = getContentHeaderProperties().getReplyTo(); + String replyToEncoding = getContentHeaderProperties().getReplyToAsString(); if (replyToEncoding == null) { return null; @@ -250,7 +250,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getJMSType() throws JMSException { - return getContentHeaderProperties().getType(); + return getContentHeaderProperties().getTypeAsString(); } public void setJMSType(String string) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index c05667902f..763af312f4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -116,7 +116,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text _data.limit(text.length()) ; //_data.sweep(); _data.setAutoExpand(true); - final String encoding = getContentHeaderProperties().getEncoding(); + final String encoding = getContentHeaderProperties().getEncodingAsString(); if (encoding == null) { _data.put(text.getBytes()); @@ -155,11 +155,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text { return null; } - if (getContentHeaderProperties().getEncoding() != null) + if (getContentHeaderProperties().getEncodingAsString() != null) { try { - _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncoding()).newDecoder()); + _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder()); } catch (CharacterCodingException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index e02771d8f5..c2015f9e7c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -92,14 +92,14 @@ public class MessageFactoryRegistry // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over // AMQP. When the type is null, it can only be assumed that the message is a byte message. - AMQShortString contentTypeShortString = properties.getContentTypeShortString(); + AMQShortString contentTypeShortString = properties.getContentType(); contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE) : contentTypeShortString; MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString); if (mf == null) { - throw new AMQException("Unsupport MIME type of " + properties.getContentType()); + throw new AMQException("Unsupport MIME type of " + properties.getContentTypeAsString()); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 055109d3be..19767b6575 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -45,8 +45,8 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MainRegistry; import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.VersionSpecificRegistry; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.protocol.AMQConstant; @@ -56,7 +56,7 @@ import org.apache.qpid.protocol.AMQConstant; * The underlying protocol session is still available but clients should not * use it to obtain session attributes. */ -public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareProtocolSession +public class AMQProtocolSession implements AMQVersionAwareProtocolSession { protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2; @@ -104,7 +104,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP private byte _protocolMinorVersion; private byte _protocolMajorVersion; - private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]); + private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion()); /** @@ -147,11 +147,8 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP { // start the process of setting up the connection. This is the first place that // data is written to the server. - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be used - here. */ - int i = pv.length - 1; - _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); + + _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); } public String getClientID() diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java new file mode 100644 index 0000000000..4d2737edce --- /dev/null +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java @@ -0,0 +1,32 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+
+public class AMQConnectionWaitException extends AMQException
+{
+ public AMQConnectionWaitException(String s, Throwable e)
+ {
+ super(s, e);
+
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java new file mode 100644 index 0000000000..22a94d3c75 --- /dev/null +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java @@ -0,0 +1,33 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+
+public class AMQUnexpectedBodyTypeException extends AMQException
+{
+
+ public AMQUnexpectedBodyTypeException(Class<? extends AMQBody> expectedClass, AMQBody body)
+ {
+ super("Unexpected body type. Expected: " + expectedClass.getName() + "; got: " + body.getClass().getName());
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java new file mode 100644 index 0000000000..721da24d53 --- /dev/null +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java @@ -0,0 +1,31 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+
+public class AMQUnexpectedFrameTypeException extends AMQException
+{
+ public AMQUnexpectedFrameTypeException(String s)
+ {
+ super(s);
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java index 401a54444b..b01ec491ec 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java @@ -39,7 +39,7 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ConnectionRedirectBody; import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.ProtocolVersion; import java.io.IOException; import java.net.InetSocketAddress; @@ -138,7 +138,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler } } - public void send(AMQDataBlock data) throws AMQException + public void send(AMQDataBlock data) throws AMQConnectionWaitException { if (_session == null) { @@ -146,9 +146,9 @@ public class MinaBrokerProxy extends Broker implements MethodHandler { _connectionMonitor.waitUntilOpen(); } - catch (Exception e) + catch (InterruptedException e) { - throw new AMQException("Failed to send " + data + ": " + e, e); + throw new AMQConnectionWaitException("Failed to send " + data + ": " + e, e); } } _session.write(data); @@ -207,7 +207,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler } else { - throw new AMQException("Client only expects method body, got: " + body); + throw new AMQUnexpectedBodyTypeException(AMQMethodBody.class, body); } } @@ -216,7 +216,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler return "MinaBrokerProxy[" + (_session == null ? super.toString() : _session.getRemoteAddress()) + "]"; } - private class MinaBinding extends IoHandlerAdapter implements ProtocolVersionList + private class MinaBinding extends IoHandlerAdapter { public void sessionCreated(IoSession session) throws Exception { @@ -228,8 +228,8 @@ public class MinaBrokerProxy extends Broker implements MethodHandler /* Find last protocol version in protocol version list. Make sure last protocol version listed in the build file (build-module.xml) is the latest version which will be used here. */ - int i = pv.length - 1; - session.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); + + session.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); } public void sessionOpened(IoSession session) throws Exception @@ -260,7 +260,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler } else { - throw new AMQException("Received message of unrecognised type: " + object); + throw new AMQUnexpectedFrameTypeException("Received message of unrecognised type: " + object); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 111d9a8f20..f2e91083ca 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -86,9 +86,9 @@ public abstract class AMQMethodBody extends AMQBody public String toString() { - StringBuffer buf = new StringBuffer(getClass().toString()); - buf.append(" Class: ").append(getClazz()); - buf.append(" Method: ").append(getMethod()); + StringBuffer buf = new StringBuffer(getClass().getName()); + buf.append("[ Class: ").append(getClazz()); + buf.append(" Method: ").append(getMethod()).append(']'); return buf.toString(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index 1045b02868..8b784fa3f7 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; -public class BasicContentHeaderProperties implements ContentHeaderProperties +public class BasicContentHeaderProperties implements CommonContentHeaderProperties { private static final Logger _logger = Logger.getLogger(BasicContentHeaderProperties.class); @@ -421,14 +421,14 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties } } - public AMQShortString getContentTypeShortString() + public AMQShortString getContentType() { decodeContentTypeIfNecessary(); return _contentType; } - public String getContentType() + public String getContentTypeAsString() { decodeContentTypeIfNecessary(); return _contentType == null ? null : _contentType.toString(); @@ -444,15 +444,19 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties public void setContentType(String contentType) { - clearEncodedForm(); - _propertyFlags |= (1 << 15); - _contentType = contentType == null ? null : new AMQShortString(contentType); + setContentType(contentType == null ? null : new AMQShortString(contentType)); + } + + public String getEncodingAsString() + { + + return getEncoding() == null ? null : getEncoding().toString(); } - public String getEncoding() + public AMQShortString getEncoding() { decodeIfNecessary(); - return _encoding == null ? null : _encoding.toString(); + return _encoding; } public void setEncoding(String encoding) @@ -462,6 +466,14 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties _encoding = encoding == null ? null : new AMQShortString(encoding); } + public void setEncoding(AMQShortString encoding) + { + clearEncodedForm(); + _propertyFlags |= (1 << 14); + _encoding = encoding; + } + + public FieldTable getHeaders() { decodeHeadersIfNecessary(); @@ -508,7 +520,13 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties _priority = priority; } - public String getCorrelationId() + public AMQShortString getCorrelationId() + { + decodeIfNecessary(); + return _correlationId; + } + + public String getCorrelationIdAsString() { decodeIfNecessary(); return _correlationId == null ? null : _correlationId.toString(); @@ -516,18 +534,23 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties public void setCorrelationId(String correlationId) { + setCorrelationId(correlationId == null ? null : new AMQShortString(correlationId)); + } + + public void setCorrelationId(AMQShortString correlationId) + { clearEncodedForm(); _propertyFlags |= (1 << 10); - _correlationId = correlationId == null ? null : new AMQShortString(correlationId); + _correlationId = correlationId; } - public String getReplyTo() + public String getReplyToAsString() { decodeIfNecessary(); return _replyTo == null ? null : _replyTo.toString(); } - public AMQShortString getReplyToAsShortString() + public AMQShortString getReplyTo() { decodeIfNecessary(); return _replyTo; @@ -561,7 +584,13 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties } - public String getMessageId() + public AMQShortString getMessageId() + { + decodeIfNecessary(); + return _messageId; + } + + public String getMessageIdAsString() { decodeIfNecessary(); return _messageId == null ? null : _messageId.toString(); @@ -574,6 +603,14 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties _messageId = messageId == null ? null : new AMQShortString(messageId); } + public void setMessageId(AMQShortString messageId) + { + clearEncodedForm(); + _propertyFlags |= (1 << 7); + _messageId = messageId; + } + + public long getTimestamp() { decodeIfNecessary(); @@ -587,56 +624,102 @@ public class BasicContentHeaderProperties implements ContentHeaderProperties _timestamp = timestamp; } - public String getType() + public String getTypeAsString() { decodeIfNecessary(); return _type == null ? null : _type.toString(); } + + public AMQShortString getType() + { + decodeIfNecessary(); + return _type; + } + + public void setType(String type) { + setType(type == null ? null : new AMQShortString(type)); + } + + public void setType(AMQShortString type) + { clearEncodedForm(); _propertyFlags |= (1 << 5); - _type = type == null ? null : new AMQShortString(type); + _type = type; } - public String getUserId() + public String getUserIdAsString() { decodeIfNecessary(); return _userId == null ? null : _userId.toString(); } + public AMQShortString getUserId() + { + decodeIfNecessary(); + return _userId; + } + public void setUserId(String userId) { + setUserId(userId == null ? null : new AMQShortString(userId)); + } + + public void setUserId(AMQShortString userId) + { clearEncodedForm(); _propertyFlags |= (1 << 4); - _userId = userId == null ? null : new AMQShortString(userId); + _userId = userId; } - public String getAppId() + public String getAppIdAsString() { decodeIfNecessary(); return _appId == null ? null : _appId.toString(); } + public AMQShortString getAppId() + { + decodeIfNecessary(); + return _appId; + } + public void setAppId(String appId) { + setAppId(appId == null ? null : new AMQShortString(appId)); + } + + public void setAppId(AMQShortString appId) + { clearEncodedForm(); _propertyFlags |= (1 << 3); - _appId = appId == null ? null : new AMQShortString(appId); + _appId = appId; } - public String getClusterId() + public String getClusterIdAsString() { decodeIfNecessary(); return _clusterId == null ? null : _clusterId.toString(); } + public AMQShortString getClusterId() + { + decodeIfNecessary(); + return _clusterId; + } + public void setClusterId(String clusterId) { + setClusterId(clusterId == null ? null : new AMQShortString(clusterId)); + } + + public void setClusterId(AMQShortString clusterId) + { clearEncodedForm(); _propertyFlags |= (1 << 2); - _clusterId = clusterId == null ? null : new AMQShortString(clusterId); + _clusterId = clusterId; } public String toString() diff --git a/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java new file mode 100644 index 0000000000..1641cbf4e8 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java @@ -0,0 +1,65 @@ +package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.log4j.Logger;
+
+public interface CommonContentHeaderProperties extends ContentHeaderProperties
+{
+
+ AMQShortString getContentType();
+
+ void setContentType(AMQShortString contentType);
+
+ FieldTable getHeaders();
+
+ void setHeaders(FieldTable headers);
+
+ byte getDeliveryMode();
+
+ void setDeliveryMode(byte deliveryMode);
+
+ byte getPriority();
+
+ void setPriority(byte priority);
+
+ AMQShortString getCorrelationId();
+
+ void setCorrelationId(AMQShortString correlationId);
+
+ AMQShortString getReplyTo();
+
+ void setReplyTo(AMQShortString replyTo);
+
+ long getExpiration();
+
+ void setExpiration(long expiration);
+
+ AMQShortString getMessageId();
+
+ void setMessageId(AMQShortString messageId);
+
+ long getTimestamp();
+
+ void setTimestamp(long timestamp);
+
+ AMQShortString getType();
+
+ void setType(AMQShortString type);
+
+ AMQShortString getUserId();
+
+ void setUserId(AMQShortString userId);
+
+ AMQShortString getAppId();
+
+ void setAppId(AMQShortString appId);
+
+ AMQShortString getClusterId();
+
+ void setClusterId(AMQShortString clusterId);
+
+ AMQShortString getEncoding();
+
+ void setEncoding(AMQShortString encoding);
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 246e5ebc90..a7544c5747 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -41,10 +41,14 @@ public class FieldTable private LinkedHashMap<AMQShortString, AMQTypedValue> _properties; private long _encodedSize; private static final int INITIAL_HASHMAP_CAPACITY = 16; + private static final int INITIAL_ENCODED_FORM_SIZE = 256; public FieldTable() { super(); +// _encodedForm = ByteBuffer.allocate(INITIAL_ENCODED_FORM_SIZE); +// _encodedForm.setAutoExpand(true); +// _encodedForm.limit(0); } /** @@ -109,11 +113,28 @@ public class FieldTable private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val) { initMapIfNecessary(); - _encodedForm = null; - if(val == null) + if(_properties.containsKey(key)) + { + _encodedForm = null; + + if(val == null) + { + return removeKey(key); + } + } + else if(_encodedForm != null && val != null) + { + EncodingUtils.writeShortStringBytes(_encodedForm, key); + val.writeToBuffer(_encodedForm); + + } + else if (val == null) { - return removeKey(key); + return null; } + + + AMQTypedValue oldVal = _properties.put(key,val); if(oldVal != null) { @@ -134,7 +155,7 @@ public class FieldTable { if(_properties == null) { - if(_encodedForm == null) + if(_encodedForm == null || _encodedSize == 0) { _properties = new LinkedHashMap<AMQShortString,AMQTypedValue>(); } @@ -655,6 +676,7 @@ public class FieldTable if (trace) { _logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "..."); + _logger.trace(_properties); } EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize()); @@ -701,6 +723,7 @@ public class FieldTable public void addAll(FieldTable fieldTable) { initMapIfNecessary(); + _encodedForm = null; _properties.putAll(fieldTable._properties); recalculateEncodedSize(); } @@ -836,7 +859,13 @@ public class FieldTable if(_encodedForm != null) { - buffer.put(_encodedForm); + + if(_encodedForm.position() != 0) + { + _encodedForm.flip(); + } +// _encodedForm.limit((int)getEncodedSize()); + buffer.put(_encodedForm); } else if(_properties != null) { @@ -924,4 +953,33 @@ public class FieldTable } } + public int hashCode() + { + initMapIfNecessary(); + return _properties.hashCode(); + } + + + public boolean equals(Object o) + { + if(o == this) + { + return true; + } + if(o == null) + { + return false; + } + if(!(o instanceof FieldTable)) + { + return false; + } + + initMapIfNecessary(); + + FieldTable f = (FieldTable) o; + f.initMapIfNecessary(); + + return _properties.equals(f._properties); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 697a0f4249..8b40fe72eb 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -25,25 +25,50 @@ import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.qpid.AMQException; +import java.io.UnsupportedEncodingException; + public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { - public char[] header = new char[]{'A','M','Q','P'}; + // TODO: generate these constants automatically from the xml protocol spec file + public static final byte[] AMQP_HEADER = new byte[]{(byte)'A',(byte)'M',(byte)'Q',(byte)'P'}; - private static byte CURRENT_PROTOCOL_CLASS = 1; - private static final int CURRENT_PROTOCOL_INSTANCE = 1; + private static final byte CURRENT_PROTOCOL_CLASS = 1; + private static final byte TCP_PROTOCOL_INSTANCE = 1; + + public final byte[] _protocolHeader; + public final byte _protocolClass; + public final byte _protocolInstance; + public final byte _protocolMajor; + public final byte _protocolMinor; - public byte protocolClass = CURRENT_PROTOCOL_CLASS; - public byte protocolInstance = CURRENT_PROTOCOL_INSTANCE; - public byte protocolMajor; - public byte protocolMinor; // public ProtocolInitiation() {} - public ProtocolInitiation(byte major, byte minor) + public ProtocolInitiation(byte[] protocolHeader, byte protocolClass, byte protocolInstance, byte protocolMajor, byte protocolMinor) + { + _protocolHeader = protocolHeader; + _protocolClass = protocolClass; + _protocolInstance = protocolInstance; + _protocolMajor = protocolMajor; + _protocolMinor = protocolMinor; + } + + public ProtocolInitiation(ProtocolVersion pv) { - protocolMajor = major; - protocolMinor = minor; + this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion()); + } + + + public ProtocolInitiation(ByteBuffer in) + { + _protocolHeader = new byte[4]; + in.get(_protocolHeader); + + _protocolClass = in.get(); + _protocolInstance = in.get(); + _protocolMajor = in.get(); + _protocolMinor = in.get(); } public long getSize() @@ -53,19 +78,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData public void writePayload(ByteBuffer buffer) { - for (int i = 0; i < header.length; i++) - { - buffer.put((byte) header[i]); - } - buffer.put(protocolClass); - buffer.put(protocolInstance); - buffer.put(protocolMajor); - buffer.put(protocolMinor); - } - public void populateFromBuffer(ByteBuffer buffer) throws AMQException - { - throw new AMQException("Method not implemented"); + buffer.put(_protocolHeader); + buffer.put(_protocolClass); + buffer.put(_protocolInstance); + buffer.put(_protocolMajor); + buffer.put(_protocolMinor); } public boolean equals(Object o) @@ -76,36 +94,36 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData } ProtocolInitiation pi = (ProtocolInitiation) o; - if (pi.header == null) + if (pi._protocolHeader == null) { return false; } - if (header.length != pi.header.length) + if (_protocolHeader.length != pi._protocolHeader.length) { return false; } - for (int i = 0; i < header.length; i++) + for (int i = 0; i < _protocolHeader.length; i++) { - if (header[i] != pi.header[i]) + if (_protocolHeader[i] != pi._protocolHeader[i]) { return false; } } - return (protocolClass == pi.protocolClass && - protocolInstance == pi.protocolInstance && - protocolMajor == pi.protocolMajor && - protocolMinor == pi.protocolMinor); + return (_protocolClass == pi._protocolClass && + _protocolInstance == pi._protocolInstance && + _protocolMajor == pi._protocolMajor && + _protocolMinor == pi._protocolMinor); } public static class Decoder //implements MessageDecoder { /** * - * @param session - * @param in + * @param session the session + * @param in input buffer * @return true if we have enough data to decode the PI frame fully, false if more * data is required */ @@ -115,63 +133,62 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData } public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) - throws Exception { - byte[] theHeader = new byte[4]; - in.get(theHeader); - ProtocolInitiation pi = new ProtocolInitiation((byte)0, (byte)0); - pi.header = new char[]{(char) theHeader[0],(char) theHeader[CURRENT_PROTOCOL_INSTANCE],(char) theHeader[2], (char) theHeader[3]}; - String stringHeader = new String(pi.header); - if (!"AMQP".equals(stringHeader)) - { - throw new AMQProtocolHeaderException("Invalid protocol header - read " + stringHeader); - } - pi.protocolClass = in.get(); - pi.protocolInstance = in.get(); - pi.protocolMajor = in.get(); - pi.protocolMinor = in.get(); + ProtocolInitiation pi = new ProtocolInitiation(in); out.write(pi); } } - public void checkVersion(ProtocolVersionList pvl) throws AMQException + public void checkVersion() throws AMQException { - if (protocolClass != CURRENT_PROTOCOL_CLASS) - { - throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " + - protocolClass); - } - if (protocolInstance != CURRENT_PROTOCOL_INSTANCE) + + if(_protocolHeader.length != 4) { - throw new AMQProtocolInstanceException("Protocol instance " + CURRENT_PROTOCOL_INSTANCE + " was expected; received " + - protocolInstance); + throw new AMQProtocolHeaderException("Protocol header should have exactly four octets"); } - - /* Look through list of available protocol versions */ - boolean found = false; - for (int i=0; i<pvl.pv.length; i++) + for(int i = 0; i < 4; i++) { - if (pvl.pv[i][pvl.PROTOCOL_MAJOR] == protocolMajor && - pvl.pv[i][pvl.PROTOCOL_MINOR] == protocolMinor) + if(_protocolHeader[i] != AMQP_HEADER[i]) { - found = true; + try + { + throw new AMQProtocolHeaderException("Protocol header is not correct: Got " + new String(_protocolHeader,"ISO-8859-1") + " should be: " + new String(AMQP_HEADER, "ISO-8859-1")); + } + catch (UnsupportedEncodingException e) + { + + } } } - if (!found) + if (_protocolClass != CURRENT_PROTOCOL_CLASS) + { + throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " + + _protocolClass); + } + if (_protocolInstance != TCP_PROTOCOL_INSTANCE) + { + throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " + + _protocolInstance); + } + + ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor); + + + if (!pv.isSupported()) { // TODO: add list of available versions in list to msg... throw new AMQProtocolVersionException("Protocol version " + - protocolMajor + "." + protocolMinor + " not found in protocol version list."); + _protocolMajor + "." + _protocolMinor + " not suppoerted by this version of the Qpid broker."); } } public String toString() { - StringBuffer buffer = new StringBuffer(new String(header)); - buffer.append(Integer.toHexString(protocolClass)); - buffer.append(Integer.toHexString(protocolInstance)); - buffer.append(Integer.toHexString(protocolMajor)); - buffer.append(Integer.toHexString(protocolMinor)); + StringBuffer buffer = new StringBuffer(new String(_protocolHeader)); + buffer.append(Integer.toHexString(_protocolClass)); + buffer.append(Integer.toHexString(_protocolInstance)); + buffer.append(Integer.toHexString(_protocolMajor)); + buffer.append(Integer.toHexString(_protocolMinor)); return buffer.toString(); } } diff --git a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java index 0f706ac553..4fd1f60d69 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java @@ -22,8 +22,6 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; -import java.util.HashMap; - import junit.framework.TestCase; @@ -94,14 +92,14 @@ public class BasicContentHeaderPropertiesTest extends TestCase { String contentType = "contentType"; _testProperties.setContentType(contentType); - assertEquals(contentType, _testProperties.getContentType()); + assertEquals(contentType, _testProperties.getContentTypeAsString()); } public void testSetGetEncoding() { String encoding = "encoding"; _testProperties.setEncoding(encoding); - assertEquals(encoding, _testProperties.getEncoding()); + assertEquals(encoding, _testProperties.getEncodingAsString()); } public void testSetGetHeaders() @@ -128,14 +126,14 @@ public class BasicContentHeaderPropertiesTest extends TestCase { String correlationId = "correlationId"; _testProperties.setCorrelationId(correlationId); - assertEquals(correlationId, _testProperties.getCorrelationId()); + assertEquals(correlationId, _testProperties.getCorrelationIdAsString()); } public void testSetGetReplyTo() { String replyTo = "replyTo"; _testProperties.setReplyTo(replyTo); - assertEquals(replyTo, _testProperties.getReplyTo()); + assertEquals(replyTo, _testProperties.getReplyToAsString()); } public void testSetGetExpiration() @@ -149,7 +147,7 @@ public class BasicContentHeaderPropertiesTest extends TestCase { String messageId = "messageId"; _testProperties.setMessageId(messageId); - assertEquals(messageId, _testProperties.getMessageId()); + assertEquals(messageId, _testProperties.getMessageIdAsString()); } public void testSetGetTimestamp() @@ -163,28 +161,28 @@ public class BasicContentHeaderPropertiesTest extends TestCase { String type = "type"; _testProperties.setType(type); - assertEquals(type, _testProperties.getType()); + assertEquals(type, _testProperties.getTypeAsString()); } public void testSetGetUserId() { String userId = "userId"; _testProperties.setUserId(userId); - assertEquals(userId, _testProperties.getUserId()); + assertEquals(userId, _testProperties.getUserIdAsString()); } public void testSetGetAppId() { String appId = "appId"; _testProperties.setAppId(appId); - assertEquals(appId, _testProperties.getAppId()); + assertEquals(appId, _testProperties.getAppIdAsString()); } public void testSetGetClusterId() { String clusterId = "clusterId"; _testProperties.setClusterId(clusterId); - assertEquals(clusterId, _testProperties.getClusterId()); + assertEquals(clusterId, _testProperties.getClusterIdAsString()); } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index bab7954d11..c334547b7f 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.VersionSpecificRegistry; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; @@ -170,6 +171,11 @@ public class MockProtocolSession implements AMQProtocolSession //To change body of implemented methods use File | Settings | File Templates. } + public ProtocolOutputConverter getProtocolOutputConverter() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public byte getProtocolMajorVersion() { return 8; //To change body of implemented methods use File | Settings | File Templates. |