diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-26 20:37:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-26 20:37:36 +0000 |
| commit | 2bdb82202071953aaaeadced1cbc94c2dec9a0a5 (patch) | |
| tree | 1caacbea8c780ba33daa55adde3d79ff9fe66d9f /java/client/src | |
| parent | b6eb88609aea82e676f33ae8ff68918b68b81d33 (diff) | |
| download | qpid-python-2bdb82202071953aaaeadced1cbc94c2dec9a0a5.tar.gz | |
Added AMQP 0-9-1 support
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@829944 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
6 files changed, 214 insertions, 2 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b57c834598..461f4d01c4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -308,7 +308,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); private static final long DEFAULT_TIMEOUT = 1000 * 30; - private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this + private ProtocolVersion _protocolVersion = ProtocolVersion.v0_91; // FIXME TGM, shouldn't need this protected AMQConnectionDelegate _delegate; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java new file mode 100755 index 0000000000..1bb93f66a3 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + + +public class AMQConnectionDelegate_9_1 extends AMQConnectionDelegate_8_0 +{ + + public AMQConnectionDelegate_9_1(AMQConnection conn) + { + super(conn); + } + +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index bc1453beaf..862e23385a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -36,6 +36,7 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; @@ -186,6 +187,11 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false); _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class); } + else if(getProtocolVersion().equals(ProtocolVersion.v0_91)) + { + BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false); + _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class); + } else { throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion()); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 0e3333940a..909ac8aae8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -79,6 +79,16 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher } }); + + _dispatcherFactories.put(ProtocolVersion.v0_91, + new DispatcherFactory() + { + public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session) + { + return new ClientMethodDispatcherImpl_0_91(session); + } + }); + } public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session) diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java new file mode 100644 index 0000000000..f15340ae00 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java @@ -0,0 +1,158 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQMethodNotImplementedException; +import org.apache.qpid.client.protocol.AMQProtocolSession; + +public class ClientMethodDispatcherImpl_0_91 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_91 +{ + public ClientMethodDispatcherImpl_0_91(AMQProtocolSession session) + { + super(session); + } + + public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException + { + return false; + } + +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 6500a82818..65d3fa9a76 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.qpid.AMQConnectionClosedException; @@ -124,6 +125,8 @@ public class AMQProtocolHandler implements ProtocolEngine private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol"); private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null); + private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000")); + /** * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection * instances and protocol handler instances. @@ -736,7 +739,10 @@ public class AMQProtocolHandler implements ProtocolEngine { if (_failoverLatch != null) { - _failoverLatch.await(); + if(!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS)) + { + + } } } } |
