From 0f9044243547ded8521af0c8d0ff81d791d8048d Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Tue, 27 Mar 2007 16:31:05 +0000 Subject: This is the initial checkup for the new client git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@522988 13f79535-47bb-0310-9956-ffa450edef68 --- java/newclient/src/main/java/client.log4j | 28 ++ .../org/apache/qpid/nclient/amqp/AMQPCallBack.java | 23 ++ .../qpid/nclient/amqp/AMQPCallBackSupport.java | 64 ++++ .../org/apache/qpid/nclient/amqp/AMQPChannel.java | 284 ++++++++++++++ .../apache/qpid/nclient/amqp/AMQPConnection.java | 348 +++++++++++++++++ .../org/apache/qpid/nclient/amqp/AMQPExchange.java | 88 +++++ .../org/apache/qpid/nclient/amqp/AMQPMessage.java | 232 ++++++++++++ .../qpid/nclient/amqp/AMQPMessageCallBack.java | 78 ++++ .../org/apache/qpid/nclient/amqp/AMQPQueue.java | 117 ++++++ .../org/apache/qpid/nclient/amqp/EventManager.java | 129 +++++++ .../qpid/nclient/amqp/sample/SecurityHelper.java | 54 +++ .../qpid/nclient/amqp/sample/TestClient.java | 93 +++++ .../apache/qpid/nclient/amqp/state/AMQPState.java | 59 +++ .../qpid/nclient/amqp/state/AMQPStateListener.java | 8 + .../qpid/nclient/amqp/state/AMQPStateMachine.java | 26 ++ .../qpid/nclient/amqp/state/AMQPStateManager.java | 11 + .../qpid/nclient/amqp/state/AMQPStateType.java | 50 +++ .../state/IllegalStateTransitionException.java | 47 +++ .../qpid/nclient/config/ClientConfiguration.java | 84 +++++ .../java/org/apache/qpid/nclient/config/client.xml | 86 +++++ .../apache/qpid/nclient/core/AMQPException.java | 55 +++ .../apache/qpid/nclient/core/AbstractPhase.java | 97 +++++ .../qpid/nclient/core/DefaultPhaseContext.java | 20 + .../java/org/apache/qpid/nclient/core/Phase.java | 38 ++ .../org/apache/qpid/nclient/core/PhaseContext.java | 35 ++ .../org/apache/qpid/nclient/core/PhaseFactory.java | 60 +++ .../apache/qpid/nclient/core/QpidConstants.java | 66 ++++ .../qpid/nclient/core/TransientPhaseContext.java | 19 + .../qpid/nclient/execution/ExecutionPhase.java | 151 ++++++++ .../qpid/nclient/execution/RequestManager.java | 129 +++++++ .../qpid/nclient/execution/ResponseManager.java | 240 ++++++++++++ .../nclient/message/AMQPApplicationMessage.java | 124 +++++++ .../apache/qpid/nclient/message/MessagePhase.java | 64 ++++ .../apache/qpid/nclient/message/MessageStore.java | 17 + .../nclient/message/TransientMessageStore.java | 40 ++ .../apache/qpid/nclient/model/AMQPMethodEvent.java | 64 ++++ .../qpid/nclient/model/AMQPMethodListener.java | 11 + .../org/apache/qpid/nclient/model/ModelPhase.java | 133 +++++++ .../qpid/nclient/security/AMQPCallbackHandler.java | 30 ++ .../nclient/security/CallbackHandlerRegistry.java | 98 +++++ .../nclient/security/DynamicSaslRegistrar.java | 75 ++++ .../apache/qpid/nclient/security/JCAProvider.java | 46 +++ .../security/UsernamePasswordCallbackHandler.java | 60 +++ .../qpid/nclient/transport/AMQPBrokerDetails.java | 344 +++++++++++++++++ .../qpid/nclient/transport/AMQPConnectionURL.java | 412 +++++++++++++++++++++ .../qpid/nclient/transport/BrokerDetails.java | 73 ++++ .../qpid/nclient/transport/ConnectionURL.java | 77 ++++ .../qpid/nclient/transport/TCPConnection.java | 82 ++++ .../nclient/transport/TransportConnection.java | 34 ++ .../transport/TransportConnectionFactory.java | 44 +++ .../qpid/nclient/transport/TransportPhase.java | 263 +++++++++++++ .../qpid/nclient/transport/VMConnection.java | 133 +++++++ .../apache/qpid/nclient/util/AMQPValidator.java | 14 + 53 files changed, 5057 insertions(+) create mode 100644 java/newclient/src/main/java/client.log4j create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java create mode 100644 java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java (limited to 'java/newclient/src') diff --git a/java/newclient/src/main/java/client.log4j b/java/newclient/src/main/java/client.log4j new file mode 100644 index 0000000000..525433e9a9 --- /dev/null +++ b/java/newclient/src/main/java/client.log4j @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +log4j.rootLogger=${root.logging.level} + + +log4j.logger.org.apache.qpid=${amqj.logging.level}, console +log4j.additivity.org.apache.qpid=false + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=all +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java new file mode 100644 index 0000000000..49575be8d2 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java @@ -0,0 +1,23 @@ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; + +public abstract class AMQPCallBack +{ + private boolean _isComplete = false; + + public abstract void brokerResponded(AMQMethodBody body); + + public abstract void brokerRespondedWithError(AMQException e); + + public void setIsComplete(boolean isComplete) + { + _isComplete = isComplete; + } + + public boolean isComplete() + { + return _isComplete; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java new file mode 100644 index 0000000000..890b0dd6eb --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java @@ -0,0 +1,64 @@ +package org.apache.qpid.nclient.amqp; + +import java.security.SecureRandom; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.model.AMQPMethodEvent; + +public abstract class AMQPCallBackSupport +{ + private SecureRandom _localCorrelationIdGenerator = new SecureRandom(); + protected ConcurrentHashMap _cbMap = new ConcurrentHashMap(); + + //the channelId assigned for this instance + protected int _channelId; + + public AMQPCallBackSupport(int channelId) + { + _channelId = channelId; + } + + private long getNextCorrelationId() + { + return _localCorrelationIdGenerator.nextLong(); + } + + + // For methods that still use nowait, hopefully they will remove nowait + protected AMQPMethodEvent handleNoWait(boolean noWait,AMQMethodBody methodBody,AMQPCallBack cb) + { + if(noWait) + { + // u only need to register if u are expecting a response + long localCorrelationId = getNextCorrelationId(); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId); + _cbMap.put(localCorrelationId, cb); + return msg; + } + else + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID); + return msg; + } + } + + protected AMQPMethodEvent handleAsynchronousCall(AMQMethodBody methodBody,AMQPCallBack cb) + { + long localCorrelationId = getNextCorrelationId(); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId); + _cbMap.put(localCorrelationId, cb); + return msg; + } + + protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody) + { + if(_cbMap.contains(localCorrelationId)) + { + AMQPCallBack cb = (AMQPCallBack)_cbMap.get(localCorrelationId); + cb.brokerResponded(methodBody); + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java new file mode 100644 index 0000000000..d86f948e28 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java @@ -0,0 +1,284 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.amqp; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ChannelOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ChannelResumeBody; +import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.model.AMQPMethodListener; +import org.apache.qpid.nclient.util.AMQPValidator; + +/** + * This represents the Channel class defined in the AMQP protocol. + * This class is a finite state machine and is thread safe by design. + * Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. + * Only one thread can enter the methods that change state, at a given time. + * The AMQP protocol recommends one thread per channel by design. + * + * A JMS Session can wrap an instance of this class. + */ + +public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener +{ +private static final Logger _logger = Logger.getLogger(AMQPChannel.class); + + //the channelId assigned for this channel + private int _channelId; + private Phase _phase; + private AMQPState _currentState; + private final AMQPState[] _validCloseStates = new AMQPState[]{AMQPState.CHANNEL_OPENED,AMQPState.CHANNEL_SUSPEND}; + private final AMQPState[] _validResumeStates = new AMQPState[]{AMQPState.CHANNEL_CLOSED,AMQPState.CHANNEL_NOT_OPENED}; + + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; + private final Lock _lock = new ReentrantLock(); + private final Condition _channelNotOpend = _lock.newCondition(); + private final Condition _channelNotClosed = _lock.newCondition(); + private final Condition _channelFlowNotResponded = _lock.newCondition(); + private final Condition _channelNotResumed = _lock.newCondition(); + + private ChannelOpenOkBody _channelOpenOkBody; + private ChannelCloseOkBody _channelCloseOkBody; + private ChannelFlowOkBody _channelFlowOkBody; + private ChannelOkBody _channelOkBody; + + public AMQPChannel(int channelId) + { + _channelId = channelId; + _currentState = AMQPState.CHANNEL_NOT_OPENED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + } + + /**------------------------------------------- + * API Methods + *-------------------------------------------- + */ + + /** + * Opens the channel + */ + public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException + { + _lock.lock(); + try { + _channelOpenOkBody = null; + checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED,_currentState,AMQPState.CHANNEL_OPENED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelOpenBody,QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time"); + _currentState = AMQPState.CHANNEL_OPENED; + return _channelOpenOkBody; + } + catch(Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + /** + * Close the channel + */ + public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException + { + _lock.lock(); + try { + _channelCloseOkBody = null; + checkIfValidStateTransition(_validCloseStates,_currentState,AMQPState.CHANNEL_CLOSED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelCloseBody,QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time"); + _currentState = AMQPState.CHANNEL_CLOSED; + return _channelCloseOkBody; + } + catch(Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + /** + * Channel Flow + */ + public ChannelFlowOkBody close(ChannelFlowBody channelFlowBody) throws AMQPException + { + _lock.lock(); + try { + _channelFlowOkBody = null; + if(channelFlowBody.active) + { + checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND,_currentState,AMQPState.CHANNEL_OPENED); + } + else + { + checkIfValidStateTransition(AMQPState.CHANNEL_OPENED,_currentState,AMQPState.CHANNEL_SUSPEND); + } + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelFlowBody,QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time"); + handleChannelFlowState(_channelFlowOkBody.active); + return _channelFlowOkBody; + } + catch(Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + /** + * Close the channel + */ + public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException + { + _lock.lock(); + try { + _channelOkBody = null; + checkIfValidStateTransition(_validResumeStates,_currentState,AMQPState.CHANNEL_OPENED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelResumeBody,QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_channelOkBody, "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time"); + _currentState = AMQPState.CHANNEL_OPENED; + return _channelOkBody; + } + catch(Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public boolean methodReceived(AMQPMethodEvent evt) throws AMQPException + { + if (evt.getMethod() instanceof ChannelOpenOkBody) + { + _channelOpenOkBody = (ChannelOpenOkBody)evt.getMethod(); + _channelNotOpend.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelCloseOkBody) + { + _channelCloseOkBody = (ChannelCloseOkBody)evt.getMethod(); + _channelNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelCloseBody) + { + handleChannelClose((ChannelCloseBody)evt.getMethod()); + return true; + } + else if (evt.getMethod() instanceof ChannelFlowOkBody) + { + _channelFlowOkBody = (ChannelFlowOkBody)evt.getMethod(); + _channelFlowNotResponded.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelFlowBody) + { + handleChannelFlow((ChannelFlowBody)evt.getMethod()); + return true; + } + else if (evt.getMethod() instanceof ChannelOkBody) + { + _channelOkBody = (ChannelOkBody)evt.getMethod(); + //In this case the only method expecting channel-ok is channel-resume + // haven't implemented ping and pong. + _channelNotResumed.signal(); + return true; + } + else + { + return false; + } + } + + private void handleChannelClose(ChannelCloseBody channelCloseBody) + { + try + { + _lock.lock(); + _currentState = AMQPState.CHANNEL_CLOSED; + } + finally + { + _lock.unlock(); + } + } + + private void handleChannelFlow(ChannelFlowBody channelFlowBody) + { + _lock.lock(); + try + { + handleChannelFlowState(channelFlowBody.active); + } + finally + { + _lock.unlock(); + } + } + + private void handleChannelFlowState(boolean flow) + { + _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java new file mode 100644 index 0000000000..9c9e913cd3 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java @@ -0,0 +1,348 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.model.AMQPMethodListener; +import org.apache.qpid.nclient.transport.TransportConnection; +import org.apache.qpid.nclient.util.AMQPValidator; + +/** + * This maps directly to the Connection class defined in the AMQP protocol This class is a finite state machine and is + * thread safe by design A particular method (state changing) can only be invoked once and only in sequence or else an + * IllegalStateTransitionException will be thrown Also only one thread can enter those methods at a given time. + */ +public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListener +{ + private static final Logger _logger = Logger.getLogger(AMQPConnection.class); + + private Phase _phase; + + private TransportConnection _connection; + + private long _correlationId; + + private AMQPState _currentState; + + private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, + AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, + AMQPState.CONNECTION_OPEN, }; + + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; + + private final Lock _lock = new ReentrantLock(); + + private final Condition _connectionNotStarted = _lock.newCondition(); + + private final Condition _connectionNotSecure = _lock.newCondition(); + + private final Condition _connectionNotTuned = _lock.newCondition(); + + private final Condition _connectionNotOpened = _lock.newCondition(); + + private final Condition _connectionNotClosed = _lock.newCondition(); + + private ConnectionStartBody _connectionStartBody; + + private ConnectionSecureBody _connectionSecureBody; + + private ConnectionTuneBody _connectionTuneBody; + + private ConnectionOpenOkBody _connectionOpenOkBody; + + private ConnectionCloseOkBody _connectionCloseOkBody; + + public AMQPConnection(TransportConnection connection) + { + _connection = connection; + _currentState = AMQPState.CONNECTION_UNDEFINED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + } + + /** + * ------------------------------------------- API Methods -------------------------------------------- + */ + + /** + * Opens the TCP connection and let the formalities begin. + */ + public ConnectionStartBody openTCPConnection() throws AMQPException + { + _lock.lock(); + // open the TCP connection + try + { + _connectionStartBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED); + _phase = _connection.connect(); + + // waiting for ConnectionStartBody or error in connection + _connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_connectionStartBody, + "The broker didn't send the ConnectionStartBody in time"); + _currentState = AMQPState.CONNECTION_NOT_STARTED; + return _connectionStartBody; + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + public ConnectionSecureBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionSecureBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, + AMQPState.CONNECTION_NOT_SECURE); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); + _phase.messageSent(msg); + _connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_connectionSecureBody, + "The broker didn't send the ConnectionSecureBody in time"); + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + /** + * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could + * issue a new challenge + */ + public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionTuneBody = null; + _connectionSecureBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); + _connectionSecureBody = null; // The server could send a fresh challenge + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, + _correlationId); + _phase.messageSent(msg); + _connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); + if (_connectionTuneBody != null) + { + _currentState = AMQPState.CONNECTION_NOT_TUNED; + return _connectionTuneBody; + } + else if (_connectionSecureBody != null) + { // oops the server sent another challenge + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + else + { + throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); + } + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException + { + _lock.lock(); + try + { + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED); + _connectionSecureBody = null; + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId); + _phase.messageSent(msg); + _currentState = AMQPState.CONNECTION_NOT_OPENED; + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionOpenOkBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, + QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, + "The broker didn't send the ConnectionOpenOkBody in time"); + _currentState = AMQPState.CONNECTION_OPEN; + return _connectionOpenOkBody; + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionCloseOkBody = null; + checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, + QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, + "The broker didn't send the ConnectionCloseOkBody in time"); + _currentState = AMQPState.CONNECTION_CLOSED; + return _connectionCloseOkBody; + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + + /** + * ------------------------------------------- AMQMethodListener methods + * -------------------------------------------- + */ + public boolean methodReceived(AMQPMethodEvent evt) throws AMQPException + { + _correlationId = evt.getCorrelationId(); + + if (evt.getMethod() instanceof ConnectionStartBody) + { + _connectionStartBody = (ConnectionStartBody) evt.getMethod(); + _connectionNotStarted.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionSecureBody) + { + _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); + _connectionNotSecure.signal(); + _connectionNotTuned.signal(); // in case the server has sent another chanllenge + return true; + } + else if (evt.getMethod() instanceof ConnectionTuneBody) + { + _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); + _connectionNotTuned.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionOpenOkBody) + { + _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); + _connectionNotOpened.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseOkBody) + { + _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); + _connectionNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseBody) + { + handleClose(); + return true; + } + else + { + return false; + } + } + + public void handleClose() throws AMQPException + { + _lock.lock(); + try + { + checkIfValidStateTransition(AMQPState.CONNECTION_OPEN, _currentState, AMQPState.CONNECTION_CLOSING); + _currentState = AMQPState.CONNECTION_CLOSING; + // do the required cleanup and send a ConnectionCloseOkBody + } + catch (Exception e) + { + throw new AMQPException("XXX"); + } + finally + { + _lock.unlock(); + } + } + +} \ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java new file mode 100644 index 0000000000..5315f7f318 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.ExchangeDeleteBody; +import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.model.AMQPMethodListener; + +/** + * + * This class represents the Exchange class defined in AMQP. + * Each method takes an @see AMQPCallBack object if it wants to know + * the response from the broker to particular method. + * Clients can handle the reponse asynchronously or block for a response + * using AMQPCallBack.isComplete() periodically using a loop. + */ +public class AMQPExchange extends AMQPCallBackSupport implements AMQPMethodListener +{ + private Phase _phase; + + public AMQPExchange(int channelId,Phase phase) + { + super(channelId); + _phase = phase; + } + + /** + * ----------------------------------------------- + * API Methods + * ----------------------------------------------- + */ + public void declare(ExchangeDeclareBody exchangeDeclareBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(exchangeDeclareBody.nowait,exchangeDeclareBody,cb); + _phase.messageSent(msg); + } + + public void delete(ExchangeDeleteBody exchangeDeleteBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(exchangeDeleteBody.nowait,exchangeDeleteBody,cb); + _phase.messageSent(msg); + } + + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public boolean methodReceived(AMQPMethodEvent evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof ExchangeDeclareOkBody || methodBody instanceof ExchangeDeleteOkBody) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else + { + return false; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java new file mode 100644 index 0000000000..e3ad9d6306 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java @@ -0,0 +1,232 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageEmptyBody; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.framing.MessageOffsetBody; +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageQosBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageRejectBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.model.AMQPMethodListener; + +/** + * This class represents the AMQP Message class. + * You need an instance of this class per channel. + * A @see AMQPMessageCallBack class is taken as an argument in the constructor. + * A client can use this class to issue Message class methods on the broker. + * When the broker issues Message class methods on the client, the client is notified + * via the AMQPMessageCallBack interface. + * + * A JMS Message producer implementation can wrap an instance if this and map + * JMS method calls to the appropriate AMQP methods. + * + * AMQPMessageCallBack can be implemented by the JMS MessageConsumer implementation. + * + */ +public class AMQPMessage extends AMQPCallBackSupport implements AMQPMethodListener +{ + private Phase _phase; + private AMQPMessageCallBack _messageCb; + + public AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb) + { + super(channelId); + _phase = phase; + _messageCb = messageCb; + } + + /** + * ----------------------------------------------- + * API Methods + * ----------------------------------------------- + */ + + public void transfer(MessageTransferBody messageTransferBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageTransferBody,cb); + _phase.messageSent(msg); + } + + public void consume(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb); + _phase.messageSent(msg); + } + + public void cancel(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb); + _phase.messageSent(msg); + } + + public void get(MessageGetBody messageGetBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageGetBody,cb); + _phase.messageSent(msg); + } + + public void recover(MessageRecoverBody messageRecoverBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageRecoverBody,cb); + _phase.messageSent(msg); + } + + public void open(MessageOpenBody messageOpenBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageOpenBody,cb); + _phase.messageSent(msg); + } + + public void close(MessageCloseBody messageCloseBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCloseBody,cb); + _phase.messageSent(msg); + } + + public void append(MessageAppendBody messageAppendBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageAppendBody,cb); + _phase.messageSent(msg); + } + + public void checkpoint(MessageCheckpointBody messageCheckpointBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCheckpointBody,cb); + _phase.messageSent(msg); + } + + public void resume(MessageResumeBody messageResumeBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageResumeBody,cb); + _phase.messageSent(msg); + } + + public void qos(MessageQosBody messageQosBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageQosBody,cb); + _phase.messageSent(msg); + } + + /** + * The correlationId from the request. + * For example if a message.transfer is sent with correlationId "ABCD" + * then u need to pass that in. This correlation id is used by the execution layer + * to handle the correlation of method requests and responses + */ + public void ok(MessageOkBody messageOkBody,long correlationId) throws AMQPException + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOkBody,correlationId); + _phase.messageSent(msg); + } + + /** + * The correlationId from the request. + * For example if a message.transfer is sent with correlationId "ABCD" + * then u need to pass that in. This correlation id is used by the execution layer + * to handle the correlation of method requests and responses + */ + public void reject(MessageRejectBody messageRejectBody,long correlationId) throws AMQPException + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageRejectBody,correlationId); + _phase.messageSent(msg); + } + + /** + * The correlationId from the request. + * For example if a message.resume is sent with correlationId "ABCD" + * then u need to pass that in. This correlation id is used by the execution layer + * to handle the correlation of method requests and responses + */ + public void offset(MessageOffsetBody messageOffsetBody,long correlationId) throws AMQPException + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOffsetBody,correlationId); + _phase.messageSent(msg); + } + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public boolean methodReceived(AMQPMethodEvent evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof MessageOkBody || + methodBody instanceof MessageRejectBody || + methodBody instanceof MessageEmptyBody) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else if (methodBody instanceof MessageTransferBody) + { + _messageCb.transfer((MessageTransferBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageAppendBody) + { + _messageCb.append((MessageAppendBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageOpenBody) + { + _messageCb.open((MessageOpenBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageCloseBody) + { + _messageCb.close((MessageCloseBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageCheckpointBody) + { + _messageCb.checkpoint((MessageCheckpointBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageRecoverBody) + { + _messageCb.recover((MessageRecoverBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageResumeBody) + { + _messageCb.resume((MessageResumeBody)methodBody, evt.getCorrelationId()); + return true; + } + else + { + return false; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java new file mode 100644 index 0000000000..183ed9dba8 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.core.AMQPException; + +/** + * This class also represents the AMQP Message class. + * You need an instance per channel. + * This is passed in as an argument in the constructor of an AMQPMessage instance. + * A client who implements this interface is notified When the broker issues + * Message class methods on the client. + * + * A Client should use the AMQPMessage class when it wants to issue Message class + * methods on the broker. + * + * A JMS MessageConsumer implementation can implement this interface and map + * AMQP Method notifications to the appropriate JMS methods. + * + * Simillarly a JMS MessageProducer implementation can wrap an AMQPMessage instance. + * + */ + +public interface AMQPMessageCallBack +{ + /** + * ----------------------------------------------------------------------- + * This provides Notifications for broker initiated Message class methods. + * All methods have a correlationId that u need to pass into + * the corresponding Message methods when responding to the broker. + * + * For example the correlationID passed in from Message.trasnfer + * should be passed back when u call Message.ok in AMQPMessage + * ----------------------------------------------------------------------- + */ + + + public void transfer(MessageTransferBody messageTransferBody,long correlationId) throws AMQPException; + + public void recover(MessageRecoverBody messageRecoverBody,long correlationId) throws AMQPException; + + public void open(MessageOpenBody messageOpenBody,long correlationId) throws AMQPException ; + + public void close(MessageCloseBody messageCloseBody,long correlationId) throws AMQPException; + + public void append(MessageAppendBody messageAppendBody,long correlationId) throws AMQPException; + + public void checkpoint(MessageCheckpointBody messageCheckpointBody,long correlationId) throws AMQPException; + + public void resume(MessageResumeBody messageResumeBody,long correlationId) throws AMQPException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java new file mode 100644 index 0000000000..a5fe6de298 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.QueuePurgeBody; +import org.apache.qpid.framing.QueuePurgeOkBody; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.model.AMQPMethodListener; + +/** + * + * This class represents the Queue class defined in AMQP. + * Each method takes an @see AMQPCallBack object if it wants to know + * the response from the broker to a particular method. + * Clients can handle the reponse asynchronously or block for a response + * using AMQPCallBack.isComplete() periodically using a loop. + */ +public class AMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener +{ + private Phase _phase; + + public AMQPQueue(int channelId,Phase phase) + { + super(channelId); + _phase = phase; + } + + /** + * ----------------------------------------------- + * API Methods + * ----------------------------------------------- + */ + public void declare(QueueDeclareBody queueDeclareBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queueDeclareBody.nowait,queueDeclareBody,cb); + _phase.messageSent(msg); + } + + public void bind(QueueBindBody queueBindBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queueBindBody.nowait,queueBindBody,cb); + _phase.messageSent(msg); + } + + // Queue.unbind doesn't have nowait + public void unbind(QueueUnbindBody queueUnbindBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(queueUnbindBody,cb); + _phase.messageSent(msg); + } + + public void purge(QueuePurgeBody queuePurgeBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queuePurgeBody.nowait,queuePurgeBody,cb); + _phase.messageSent(msg); + } + + public void delete(QueueDeleteBody queueDeleteBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queueDeleteBody.nowait,queueDeleteBody,cb); + _phase.messageSent(msg); + } + + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public boolean methodReceived(AMQPMethodEvent evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof QueueDeclareOkBody || + methodBody instanceof QueueBindOkBody || + methodBody instanceof QueueUnbindOkBody || + methodBody instanceof QueuePurgeOkBody || + methodBody instanceof QueueDeleteOkBody + ) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else + { + return false; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java new file mode 100644 index 0000000000..38421cfca3 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.model.AMQPMethodEvent; +import org.apache.qpid.nclient.model.AMQPMethodListener; + +/** + * This class registeres with the ModelPhase as a AMQMethodListener, + * to receive method events and then it distributes methods to other listerners + * using a filtering criteria. The criteria is channel id and method body class. + * The method listeners are added and removed dynamically + * + *

+ */ +public class EventManager implements AMQPMethodListener +{ + private static final Logger _logger = Logger.getLogger(EventManager.class); + + private Map _channelMap = new ConcurrentHashMap(); + + /** + * ------------------------------------------------ + * methods introduced by AMQPMethodEventManager + * ------------------------------------------------ + */ + public void addMethodEventListener(int channelId,Class clazz,AMQPMethodListener l) + { + Map _methodListenerMap; + if (_channelMap.containsKey(channelId)) + { + _methodListenerMap = _channelMap.get(channelId); + + } + else + { + _methodListenerMap = new ConcurrentHashMap(); + _channelMap.put(channelId, _methodListenerMap); + } + + List _listeners; + if (_methodListenerMap.containsKey(clazz)) + { + _listeners = _methodListenerMap.get(clazz); + + } + else + { + _listeners = new ArrayList(); + _methodListenerMap.put(clazz, _listeners); + } + + _listeners.add(l); + + } + + public void removeMethodEventListener(int channelId,Class clazz,AMQPMethodListener l) + { + if (_channelMap.containsKey(channelId)) + { + Map _methodListenerMap = _channelMap.get(channelId); + + if (_methodListenerMap.containsKey(clazz)) + { + List _listeners = _methodListenerMap.get(clazz); + _listeners.remove(l); + } + + } + } + + + /** + * ------------------------------------------------ + * methods introduced by AMQMethodListener + * ------------------------------------------------ + */ + /* (non-Javadoc) + * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent) + */ + public boolean methodReceived(AMQPMethodEvent evt) throws AMQPException + { + if (_channelMap.containsKey(evt.getChannelId())) + { + Map _methodListenerMap = _channelMap.get(evt.getChannelId()); + + if (_methodListenerMap.containsKey(evt.getMethod().getClass())) + { + + List _listeners = _methodListenerMap.get(evt.getMethod().getClass()); + for (AMQPMethodListener l:_listeners) + { + l.methodReceived(evt); + } + + return (_listeners.size()>0); + } + + } + + return false; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java new file mode 100644 index 0000000000..8c4cb56971 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java @@ -0,0 +1,54 @@ +package org.apache.qpid.nclient.amqp.sample; + +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.StringTokenizer; + +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.security.AMQPCallbackHandler; +import org.apache.qpid.nclient.security.CallbackHandlerRegistry; +import org.apache.qpid.nclient.transport.ConnectionURL; + +public class SecurityHelper +{ + public static String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException + { + final String mechanisms = new String(availableMechanisms, "utf8"); + StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); + HashSet mechanismSet = new HashSet(); + while (tokenizer.hasMoreTokens()) + { + mechanismSet.add(tokenizer.nextToken()); + } + + String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); + StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " "); + while (prefTokenizer.hasMoreTokens()) + { + String mech = prefTokenizer.nextToken(); + if (mechanismSet.contains(mech)) + { + return mech; + } + } + return null; + } + + public static AMQPCallbackHandler createCallbackHandler(String mechanism, ConnectionURL url) + throws AMQPException + { + Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism); + try + { + Object instance = mechanismClass.newInstance(); + AMQPCallbackHandler cbh = (AMQPCallbackHandler) instance; + cbh.initialise(url); + return cbh; + } + catch (Exception e) + { + throw new AMQPException("Unable to create callback handler: " + e, e); + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java new file mode 100644 index 0000000000..69d2564112 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java @@ -0,0 +1,93 @@ +package org.apache.qpid.nclient.amqp.sample; + +import java.util.StringTokenizer; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; + +import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.nclient.amqp.AMQPConnection; +import org.apache.qpid.nclient.transport.AMQPConnectionURL; +import org.apache.qpid.nclient.transport.ConnectionURL; +import org.apache.qpid.nclient.transport.TransportConnection; +import org.apache.qpid.nclient.transport.TransportConnectionFactory; +import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; + +/** + * This class illustrates the usage of the API + * Notes this is just a simple demo. + * + * I have used Helper classes to keep the code cleaner. + */ +public class TestClient +{ + private byte major; + private byte minor; + private ConnectionURL _url; + + public AMQPConnection openConnection() throws Exception + { + _url = new AMQPConnectionURL(""); + TransportConnection conn = TransportConnectionFactory.createTransportConnection(_url, ConnectionType.VM); + return new AMQPConnection(conn); + } + + public void handleProtocolNegotiation(AMQPConnection con) throws Exception + { + // ConnectionStartBody + ConnectionStartBody connectionStartBody = con.openTCPConnection(); + major = connectionStartBody.getMajor(); + minor = connectionStartBody.getMajor(); + + FieldTable clientProperties = FieldTableFactory.newFieldTable(); + clientProperties.put(new AMQShortString(ClientProperties.instance.toString()),"Test"); // setting only the client id + + final String locales = new String(connectionStartBody.getLocales(), "utf8"); + final StringTokenizer tokenizer = new StringTokenizer(locales, " "); + + final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms()); + + SaslClient sc = Sasl.createSaslClient(new String[]{mechanism}, + null, "AMQP", "localhost", + null, SecurityHelper.createCallbackHandler(mechanism,_url)); + + ConnectionStartOkBody connectionStartOkBody = + ConnectionStartOkBody.createMethodBody(major, minor, clientProperties, + new AMQShortString(tokenizer.nextToken()), + new AMQShortString(mechanism), + (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null)); + // ConnectionSecureBody + ConnectionSecureBody connectionSecureBody = con.startOk(connectionStartOkBody); + + ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody( + major,minor,sc.evaluateChallenge(connectionSecureBody.getChallenge())); + + // Assuming the server is not going to send another challenge + ConnectionTuneBody connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody); + + // Using broker supplied values + ConnectionTuneOkBody connectionTuneOkBody = + ConnectionTuneOkBody.createMethodBody(major,minor, + connectionTuneBody.getChannelMax(), + connectionTuneBody.getFrameMax(), + connectionTuneBody.getHeartbeat()); + con.tuneOk(connectionTuneOkBody); + } + + public static void main(String[] args) + { + TestClient test = new TestClient(); + AMQPConnection con = test.openConnection(); + + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java new file mode 100644 index 0000000000..3555704c4f --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.state; + +/** + * States used in the AMQ protocol. Used by the finite state machine to determine + * valid responses. + */ +public class AMQPState +{ + private final int _id; + + private final String _name; + + private AMQPState(int id, String name) + { + _id = id; + _name = name; + } + + public String toString() + { + return "AMQState: id = " + _id + " name: " + _name; + } + + // Connection state + public static final AMQPState CONNECTION_UNDEFINED = new AMQPState(0, "CONNECTION_UNDEFINED"); + public static final AMQPState CONNECTION_NOT_STARTED = new AMQPState(1, "CONNECTION_NOT_STARTED"); + public static final AMQPState CONNECTION_NOT_SECURE = new AMQPState(2, "CONNECTION_NOT_SECURE"); + public static final AMQPState CONNECTION_NOT_TUNED = new AMQPState(2, "CONNECTION_NOT_TUNED"); + public static final AMQPState CONNECTION_NOT_OPENED = new AMQPState(3, "CONNECTION_NOT_OPENED"); + public static final AMQPState CONNECTION_OPEN = new AMQPState(4, "CONNECTION_OPEN"); + public static final AMQPState CONNECTION_CLOSING = new AMQPState(5, "CONNECTION_CLOSING"); + public static final AMQPState CONNECTION_CLOSED = new AMQPState(6, "CONNECTION_CLOSED"); + + // Channel state + public static final AMQPState CHANNEL_NOT_OPENED = new AMQPState(10, "CHANNEL_NOT_OPENED"); + public static final AMQPState CHANNEL_OPENED = new AMQPState(11, "CHANNEL_OPENED"); + public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, "CHANNEL_CLOSED"); + public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, "CHANNEL_SUSPEND"); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java new file mode 100644 index 0000000000..67f854caf9 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java @@ -0,0 +1,8 @@ +package org.apache.qpid.nclient.amqp.state; + +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPStateListener +{ + public void stateChanged(AMQPState oldState, AMQPState newState) throws AMQPException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java new file mode 100644 index 0000000000..c1fde7181d --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java @@ -0,0 +1,26 @@ +package org.apache.qpid.nclient.amqp.state; + +import org.apache.qpid.nclient.core.AMQPException; + +public class AMQPStateMachine +{ + protected void checkIfValidStateTransition(AMQPState correctState,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException + { + if (currentState != correctState) + { + throw new IllegalStateTransitionException(currentState,requiredState); + } + } + + protected void checkIfValidStateTransition(AMQPState[] correctStates,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException + { + for(AMQPState correctState :correctStates) + { + if (currentState == correctState) + { + return; + } + } + throw new IllegalStateTransitionException(currentState,requiredState); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java new file mode 100644 index 0000000000..2956a19e66 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java @@ -0,0 +1,11 @@ +package org.apache.qpid.nclient.amqp.state; + +import org.apache.qpid.AMQException; + +public interface AMQPStateManager +{ + + public void addListener(AMQPStateListener l)throws AMQException; + + public void removeListener(AMQPStateListener l)throws AMQException; +} \ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java new file mode 100644 index 0000000000..cbae4aafee --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.state; + +/** + * The Type of States used in the AMQ protocol. + * This allows to partition listeners by the type of states they want + * to listen rather than all. + * For example an Object might only be interested in Channel state + */ +public class AMQPStateType +{ + private final int _typeId; + + private final String _typeName; + + private AMQPStateType(int id, String name) + { + _typeId = id; + _typeName = name; + } + + public String toString() + { + return "AMQState: id = " + _typeId + " name: " + _typeName; + } + + // Connection state + public static final AMQPStateType CONNECTION_STATE = new AMQPStateType(0, "CONNECTION_STATE"); + public static final AMQPStateType CHANNEL_STATE = new AMQPStateType(1, "CHANNEL_STATE"); + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java new file mode 100644 index 0000000000..fdc24d1d2f --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.state; + +import org.apache.qpid.nclient.core.AMQPException; + +public class IllegalStateTransitionException extends AMQPException +{ + private AMQPState _currentState; + private AMQPState _desiredState; + + public IllegalStateTransitionException(AMQPState currentState, AMQPState desiredState) + { + super("No valid state transition defined from state " + currentState + + " to state " + desiredState); + _currentState = currentState; + _desiredState = desiredState; + } + + public AMQPState getCurrentState() + { + return _currentState; + } + + public AMQPState getDesiredState() + { + return _desiredState; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java new file mode 100644 index 0000000000..1c3bb788a0 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java @@ -0,0 +1,84 @@ +package org.apache.qpid.nclient.config; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.configuration.CombinedConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.SystemConfiguration; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.core.QpidConstants; + +/** + * Loads a properties file from classpath. + * These values can be overwritten using system properties + */ +public class ClientConfiguration extends CombinedConfiguration { + + private static final Logger _logger = Logger.getLogger(ClientConfiguration.class); + private static ClientConfiguration _instance = new ClientConfiguration(); + + ClientConfiguration() + { + super(); + addConfiguration(new SystemConfiguration()); + try + { + XMLConfiguration config = new XMLConfiguration(); + config.load(getInputStream()); + addConfiguration(config); + } + catch (ConfigurationException e) + { + _logger.warn("Client Properties missing, using defaults",e); + } + } + + public static ClientConfiguration get() + { + return _instance; + } + + private InputStream getInputStream() + { + if (System.getProperty(QpidConstants.CONFIG_FILE_PATH) != null) + { + try + { + return new FileInputStream((String)System.getProperty(QpidConstants.CONFIG_FILE_PATH)); + } + catch(Exception e) + { + return this.getClass().getResourceAsStream("client.xml"); + } + } + else + { + return this.getClass().getResourceAsStream("client.xml"); + } + + } + + public static void main(String[] args) + { + System.out.println(ClientConfiguration.get().getString(QpidConstants.USE_SHARED_READ_WRITE_POOL)); + + //System.out.println(ClientConfiguration.get().getString("methodListeners.methodListener(1).[@class]")); + int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER); + System.out.println(count); + + for(int i=0 ;i list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS); + for(String s:list) + { + System.out.println(s); + } + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml new file mode 100644 index 0000000000..587271acd1 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml @@ -0,0 +1,86 @@ + + + + + + org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory + + + org.apache.qpid.client.security.UsernamePasswordCallbackHandler + org.apache.qpid.client.security.UsernamePasswordCallbackHandler + + + + +true +true +false +true +32 +32 +org.apache.qpid.server.protocol.AMQPFastProtocolHandler + + +20 + + +1000 +20 + + + + + + + + + + + + + org.apache.qpid.framing.ConnectionStartBody + org.apache.qpid.framing.ConnectionSecureBody + org.apache.qpid.framing.ConnectionTuneBody + org.apache.qpid.framing.ConnectionOpenOkBody + org.apache.qpid.framing.ConnectionCloseBody + org.apache.qpid.framing.ConnectionCloseOkBody + + org.apache.qpid.framing.ChannelOpenOkBody + org.apache.qpid.framing.ChannelCloseBody + org.apache.qpid.framing.ChannelCloseOkBody + org.apache.qpid.framing.ChannelFlowBody + org.apache.qpid.framing.ChannelFlowOkBody + org.apache.qpid.framing.ChannelOkBody + + org.apache.qpid.framing.ExchangeDeclareOkBody + org.apache.qpid.framing.ExchangeDeleteOkBody + + org.apache.qpid.framing.QueueDeclareOkBody + org.apache.qpid.framing.QueueBindOkBody + org.apache.qpid.framing.QueueUnbindOkBody + org.apache.qpid.framing.QueuePurgeOkBody + org.apache.qpid.framing.QueueDeleteOkBody + + org.apache.qpid.framing.MessageAppendBody + org.apache.qpid.framing.MessageCancelBody + org.apache.qpid.framing.MessageCheckpointBody + org.apache.qpid.framing.MessageCloseBody + org.apache.qpid.framing.MessageGetBody + org.apache.qpid.framing.MessageOffsetBody + org.apache.qpid.framing.MessageOkBody + org.apache.qpid.framing.MessageOpenBody + org.apache.qpid.framing.MessageQosBody + org.apache.qpid.framing.MessageRecoverBody + org.apache.qpid.framing.MessageRejectBody + org.apache.qpid.framing.MessageResumeBody + org.apache.qpid.framing.MessageTransferBody + + + + + org.apache.qpid.nclient.transport.TransportPhase + org.apache.qpid.nclient.execution.ExecutionPhase + org.apache.qpid.nclient.model.ModelPhase + + + \ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java new file mode 100644 index 0000000000..a029f7d4ff --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java @@ -0,0 +1,55 @@ +package org.apache.qpid.nclient.core; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; + + +public class AMQPException extends Exception +{ + private int _errorCode; + + public AMQPException(String message) + { + super(message); + } + + public AMQPException(String msg, Throwable t) + { + super(msg, t); + } + + public AMQPException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public AMQPException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public AMQPException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public AMQPException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public AMQPException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } + } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java new file mode 100644 index 0000000000..bf6a19b920 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.core; + +public abstract class AbstractPhase implements Phase { + + protected PhaseContext _ctx; + protected Phase _nextInFlowPhase; + protected Phase _nextOutFlowPhase; + + + /** + * ------------------------------------------------ + * Phase - method introduced by Phase + * ------------------------------------------------ + */ + public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase) { + _nextInFlowPhase = nextInFlowPhase; + _nextOutFlowPhase = nextOutFlowPhase; + _ctx = ctx; + } + + /** + * The start is called from the top + * of the pipe and is propogated the + * bottom. + * + * Each phase can override this to do + * any phase specific logic related + * pipe.start() + */ + public void start()throws AMQPException + { + if(_nextOutFlowPhase != null) + { + _nextOutFlowPhase.start(); + } + } + + /** + * Each phase can override this to do + * any phase specific cleanup + */ + public void close()throws AMQPException + { + + } + + public void messageReceived(Object frame) throws AMQPException + { + if(_nextInFlowPhase != null) + { + _nextInFlowPhase.messageReceived(frame); + } + } + + public void messageSent(Object frame) throws AMQPException + { + if (_nextOutFlowPhase != null) + { + _nextOutFlowPhase.messageSent(frame); + } + } + + public PhaseContext getPhaseContext() + { + return _ctx; + } + + public Phase getNextInFlowPhase() { + return _nextInFlowPhase; + } + + public Phase getNextOutFlowPhase() { + return _nextOutFlowPhase; + } + + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java new file mode 100644 index 0000000000..a3455cdacd --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java @@ -0,0 +1,20 @@ +package org.apache.qpid.nclient.core; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class DefaultPhaseContext implements PhaseContext +{ + public Map _props = new ConcurrentHashMap(); + + public Object getProperty(String name) + { + return _props.get(name); + } + + public void setProperty(String name, Object value) + { + _props.put(name, value); + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java new file mode 100644 index 0000000000..7aa10b77ff --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java @@ -0,0 +1,38 @@ +package org.apache.qpid.nclient.core; + + +public interface Phase +{ + + /** + * This method is used to initialize a phase + * + * @param ctx + * @param nextInFlowPhase + * @param nextOutFlowPhase + */ + public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase); + + /** + * + * Implement logic related to physical opening + * of the pipe + */ + public void start()throws AMQPException; + + /** + * Implement cleanup in this method. + * This indicates the pipe is closing + */ + public void close()throws AMQPException; + + public void messageReceived(Object msg) throws AMQPException; + + public void messageSent(Object msg) throws AMQPException; + + public PhaseContext getPhaseContext(); + + public Phase getNextOutFlowPhase(); + + public Phase getNextInFlowPhase(); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java new file mode 100644 index 0000000000..d5942fd785 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.core; + +/** + * This can be thought of as a session context associated + * with the pipe. This is transient and is scoped by the + * duration of the physical connection. + * + */ +public interface PhaseContext { + + public Object getProperty(String name); + + public void setProperty(String name, Object value); + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java new file mode 100644 index 0000000000..21602fec8e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java @@ -0,0 +1,60 @@ +package org.apache.qpid.nclient.core; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.nclient.config.ClientConfiguration; + +public class PhaseFactory +{ + /** + * This method will create the pipe and return a reference + * to the top of the pipeline. + * + * The application can then use this (top most) phase and all + * calls will propogated down the pipe. + * + * Simillar calls orginating at the bottom of the pipeline + * will be propogated to the top. + * + * @param ctx + * @return + * @throws AMQPException + */ + public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException + { + Map phaseMap = new HashMap(); + List list = ClientConfiguration.get().getList(QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE); + for(String s:list) + { + try + { + Phase temp = (Phase)Class.forName(ClientConfiguration.get().getString(s)).newInstance(); + phaseMap.put(ClientConfiguration.get().getInt(s + "." + QpidConstants.INDEX),temp) ; + } + catch(Exception e) + { + throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e); + } + } + + Phase current = null; + Phase prev = null; + Phase next = null; + //Lets build the phase pipe. + for (int i=0; i events; + try + { + events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody)bodyFrame); + for (AMQPMethodEvent event: events) + { + super.messageReceived(event); + } + } + catch (Exception e) + { + _logger.error("Error handling response",e); + } + } + } + + /** + * Need to figure out if the message is a request or a response + * that needs to be sent and then delegate it to the Request or response manager + * to prepare it. + */ + public void messageSent(Object msg) throws AMQPException + { + AMQPMethodEvent evt = (AMQPMethodEvent)msg; + if(evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID) + { + // This is a request + AMQFrame frame = handleRequest(evt); + super.messageSent(frame); + } + else + { +// This is a response + List frames = handleResponse(evt); + for(AMQFrame frame: frames) + { + super.messageSent(frame); + } + } + } + + /** + * ------------------------------------------------ + * Methods to handle request response + * ----------------------------------------------- + */ + private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Request frame received: " + requestBody); + } + ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelId); + if (responseManager == null) + throw new AMQException("Unable to find ResponseManager for channel " + channelId); + return responseManager.requestReceived(requestBody); + } + + private List messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) throws Exception + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Response frame received: " + responseBody); + } + RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelId); + if (requestManager == null) + throw new AMQException("Unable to find RequestManager for channel " + channelId); + return requestManager.responseReceived(responseBody); + } + + private AMQFrame handleRequest(AMQPMethodEvent evt) + { + RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(evt.getChannelId()); + return requestManager.sendRequest(evt); + } + + private List handleResponse(AMQPMethodEvent evt) throws AMQPException + { + ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(evt.getChannelId()); + try + { + return responseManager.sendResponse(evt); + } + catch(Exception e) + { + throw new AMQPException("Error handling response",e); + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java new file mode 100644 index 0000000000..761ec1b050 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.execution; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; +import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.nclient.model.AMQPMethodEvent; + +public class RequestManager +{ + private static final Logger logger = Logger.getLogger(RequestManager.class); + + private int channel; + + /** + * Used for logging and debugging only - allows the context of this instance + * to be known. + */ + private boolean serverFlag; + private long connectionId; + + /** + * Request and response frames must have a requestID and responseID which + * indepenedently increment from 0 on a per-channel basis. These are the + * counters, and contain the value of the next (not yet used) frame. + */ + private long requestIdCount; + + /** + * These keep track of the last requestId and responseId to be received. + */ + private long lastProcessedResponseId; + + private ConcurrentHashMap requestSentMap; + + public RequestManager(long connectionId, int channel, boolean serverFlag) + { + this.channel = channel; + this.serverFlag = serverFlag; + this.connectionId = connectionId; + requestIdCount = 1L; + lastProcessedResponseId = 0L; + requestSentMap = new ConcurrentHashMap(); + } + + // *** Functions to originate a request *** + + public AMQFrame sendRequest(AMQPMethodEvent evt) + { + long requestId = getNextRequestId(); // Get new request ID + AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, + lastProcessedResponseId, evt.getMethod()); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + + "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod()); + } + requestSentMap.put(requestId, evt.getCorrelationId()); + return requestFrame; + } + + public List responseReceived(AMQResponseBody responseBody) + throws Exception + { + long requestIdStart = responseBody.getRequestId(); + long requestIdStop = requestIdStart + responseBody.getBatchOffset(); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " + + responseBody + "; " + responseBody.getMethodPayload()); + } + + List events = new ArrayList(); + for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) + { + if (requestSentMap.get(requestId) == null) + { + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in requestSentMap."); + } + long localCorrelationId = requestSentMap.get(requestId); + AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(), + requestId,localCorrelationId); + events.add(methodEvent); + requestSentMap.remove(requestId); + } + lastProcessedResponseId = responseBody.getResponseId(); + return events; + } + + // *** Management functions *** + + public int requestsMapSize() + { + return requestSentMap.size(); + } + + // *** Private helper functions *** + + private long getNextRequestId() + { + return requestIdCount++; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java new file mode 100644 index 0000000000..97d9576a4e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java @@ -0,0 +1,240 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.execution; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; +import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.model.AMQPMethodEvent; + +public class ResponseManager +{ + private static final Logger logger = Logger.getLogger(ResponseManager.class); + + private int channel; + + /** + * Used for logging and debugging only - allows the context of this instance + * to be known. + */ + private boolean serverFlag; + private long connectionId; + + private int maxAccumulatedResponses = 20; // Default + + /** + * Request and response frames must have a requestID and responseID which + * indepenedently increment from 0 on a per-channel basis. These are the + * counters, and contain the value of the next (not yet used) frame. + */ + private long responseIdCount; + + /** + * These keep track of the last requestId and responseId to be received. + */ + private long lastReceivedRequestId; + + /** + * Last requestID sent in a response (for batching) + */ + private long lastSentRequestId; + + private class ResponseStatus implements Comparable + { + private long requestId; + private AMQMethodBody responseMethodBody; + + public ResponseStatus(long requestId) + { + this.requestId = requestId; + responseMethodBody = null; + } + + public int compareTo(ResponseStatus o) + { + return (int)(requestId - o.requestId); + } + + public String toString() + { + // Need to define this + return ""; + } + } + + private ConcurrentHashMap responseMap; + + public ResponseManager(long connectionId, int channel, boolean serverFlag) + { + this.channel = channel; + this.serverFlag = serverFlag; + this.connectionId = connectionId; + responseIdCount = 1L; + lastReceivedRequestId = 0L; + maxAccumulatedResponses = ClientConfiguration.get().getInt(QpidConstants.MAX_ACCUMILATED_RESPONSES); + responseMap = new ConcurrentHashMap(); + } + + // *** Functions to handle an incoming request *** + + public AMQPMethodEvent requestReceived(AMQRequestBody requestBody) throws Exception + { + long requestId = requestBody.getRequestId(); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " + + requestBody + "; " + requestBody.getMethodPayload()); + } + long responseMark = requestBody.getResponseMark(); + lastReceivedRequestId = requestId; + responseMap.put(requestId, new ResponseStatus(requestId)); + + AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, + requestBody.getMethodPayload(), requestId); + + return methodEvent; + } + + public List sendResponse(AMQPMethodEvent evt) + throws RequestResponseMappingException + { + long requestId = evt.getCorrelationId(); + AMQMethodBody responseMethodBody = evt.getMethod(); + + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + + "] TX RES: Res[# " + requestId + "]; " + responseMethodBody); + } + + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus == null) + { + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in responseMap." + responseMap); + } + if (responseStatus.responseMethodBody != null) + { + throw new RequestResponseMappingException(requestId, "RequestId " + + requestId + " already has a response in responseMap."); + } + responseStatus.responseMethodBody = responseMethodBody; + return doBatches(); + } + + // *** Management functions *** + + /** + * Sends batched responses - i.e. all those members of responseMap that have + * received a response. + */ + public synchronized List doBatches() + { + long startRequestId = 0; + int numAdditionalRequestIds = 0; + Class responseMethodBodyClass = null; + List frames = new ArrayList(); + Iterator lItr = responseMap.keySet().iterator(); + while (lItr.hasNext()) + { + long requestId = lItr.next(); + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus.responseMethodBody != null) + { + frames.add(sendResponseBatchFrame(requestId, 0, responseStatus.responseMethodBody)); + lItr.remove(); + } + } + + return frames; + } + + /** + * Total number of entries in the responseMap - including both those that + * are outstanding (i.e. no response has been received) and those that are + * batched (those for which responses have been received but have not yet + * been collected together and sent). + */ + public int responsesMapSize() + { + return responseMap.size(); + } + + /** + * As the responseMap may contain both outstanding responses (those with + * ResponseStatus.responseMethodBody still null) and responses waiting to + * be batched (those with ResponseStatus.responseMethodBody not null), we + * need to count only those in the map with responseMethodBody null. + */ + public int outstandingResponses() + { + int cnt = 0; + for (Long requestId : responseMap.keySet()) + { + if (responseMap.get(requestId).responseMethodBody == null) + cnt++; + } + return cnt; + } + + /** + * As the responseMap may contain both outstanding responses (those with + * ResponseStatus.responseMethodBody still null) and responses waiting to + * be batched (those with ResponseStatus.responseMethodBody not null), we + * need to count only those in the map with responseMethodBody not null. + */ + public int batchedResponses() + { + int cnt = 0; + for (Long requestId : responseMap.keySet()) + { + if (responseMap.get(requestId).responseMethodBody != null) + cnt++; + } + return cnt; + } + + // *** Private helper functions *** + + private long getNextResponseId() + { + return responseIdCount++; + } + + private AMQFrame sendResponseBatchFrame(long firstRequestId, int numAdditionalRequests, + AMQMethodBody responseMethodBody) + { + long responseId = getNextResponseId(); // Get new response ID + AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, + firstRequestId, numAdditionalRequests, responseMethodBody); + return responseFrame; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java new file mode 100644 index 0000000000..e7b762b77a --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.message; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.qpid.client.message.MessageHeaders; + +public class AMQPApplicationMessage { + + private int bytesReceived = 0; + private int channelId; + private byte[] referenceId; + private List contents = new LinkedList(); + private long deliveryTag; + private boolean redeliveredFlag; + private MessageHeaders messageHeaders; + + public AMQPApplicationMessage(int channelId, byte[] referenceId) + { + this.channelId = channelId; + this.referenceId = referenceId; + } + + public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag) + { + this.channelId = channelId; + this.deliveryTag = deliveryTag; + this.messageHeaders = messageHeaders; + this.redeliveredFlag = redeliveredFlag; + } + + public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag) + { + this.channelId = channelId; + this.deliveryTag = deliveryTag; + this.messageHeaders = messageHeaders; + this.redeliveredFlag = redeliveredFlag; + addContent(content); + } + + public void addContent(byte[] content) + { + contents.add(content); + bytesReceived += content.length; + } + + public int getBytesReceived() + { + return bytesReceived; + } + + public int getChannelId() + { + return channelId; + } + + public byte[] getReferenceId() + { + return referenceId; + } + + public List getContents() + { + return contents; + } + + public long getDeliveryTag() + { + return deliveryTag; + } + + public boolean getRedeliveredFlag() + { + return redeliveredFlag; + } + + public MessageHeaders getMessageHeaders() + { + return messageHeaders; + } + + public String toString() + { + return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" + + deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" + + new String(contents.get(0)); + } + + public void setDeliveryTag(long deliveryTag) + { + this.deliveryTag = deliveryTag; + } + + public void setMessageHeaders(MessageHeaders messageHeaders) + { + this.messageHeaders = messageHeaders; + } + + public void setRedeliveredFlag(boolean redeliveredFlag) + { + this.redeliveredFlag = redeliveredFlag; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java new file mode 100644 index 0000000000..53a2142718 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.message; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.AbstractPhase; +import org.apache.qpid.nclient.model.ModelPhase; + +public class MessagePhase extends AbstractPhase { + + private final BlockingQueue _queue = new LinkedBlockingQueue(); + private static final Logger _logger = Logger.getLogger(ModelPhase.class); + + public void messageReceived(Object msg) throws AMQPException + { + try + { + _queue.put((AMQPApplicationMessage)msg); + } + catch (InterruptedException e) + { + _logger.error("Error adding message to queue", e); + } + super.messageReceived(msg); + } + + public void messageSent(Object msg) throws AMQPException + { + super.messageSent(msg); + } + + public AMQPApplicationMessage getNextMessage() + { + return _queue.poll(); + } + + public AMQPApplicationMessage getNextMessage(long timeout, TimeUnit tu) throws InterruptedException + { + return _queue.poll(timeout, tu); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java new file mode 100644 index 0000000000..93eecdc0cc --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java @@ -0,0 +1,17 @@ +package org.apache.qpid.nclient.message; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.MessageHeaders; + +public interface MessageStore { + + public void removeMessage(String identifier); + + public void storeContentBodyChunk(String identifier,byte[] contentBody) throws AMQException; + + public void storeMessageMetaData(String identifier, MessageHeaders messageHeaders) throws AMQException; + + public AMQPApplicationMessage getMessage(String identifier) throws AMQException; + + public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java new file mode 100644 index 0000000000..26cf2327d7 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java @@ -0,0 +1,40 @@ +package org.apache.qpid.nclient.message; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.MessageHeaders; + +public class TransientMessageStore implements MessageStore { + + private Map messageMap = new ConcurrentHashMap(); + + public AMQPApplicationMessage getMessage(String identifier) + throws AMQException + { + return messageMap.get(identifier); + } + + public void removeMessage(String identifier) + { + messageMap.remove(identifier); + } + + public void storeContentBodyChunk(String identifier, byte[] contentBody) + throws AMQException + { + + } + + public void storeMessageMetaData(String identifier, + MessageHeaders messageHeaders) throws AMQException + { + + } + + public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException + { + + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java new file mode 100644 index 0000000000..c33c087da8 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java @@ -0,0 +1,64 @@ +package org.apache.qpid.nclient.model; + +import org.apache.qpid.framing.AMQMethodBody; + +/** + * This class is exactly the same as the AMQMethod event. + * Except I renamed requestId to corelationId, so I could use it both ways. + * + * I didn't want to modify anything in common so that there is no + * impact on the existing code. + * + */ +public class AMQPMethodEvent { + + private final M _method; + private final int _channelId; + private final long _correlationId; + private long _localCorrletionId = 0; + + public AMQPMethodEvent(int channelId, M method, long correlationId,long localCorrletionId) + { + _channelId = channelId; + _method = method; + _correlationId = correlationId; + _localCorrletionId = localCorrletionId; + } + + public AMQPMethodEvent(int channelId, M method, long correlationId) + { + _channelId = channelId; + _method = method; + _correlationId = correlationId; + } + + public M getMethod() + { + return _method; + } + + public int getChannelId() + { + return _channelId; + } + + public long getCorrelationId() + { + return _correlationId; + } + + public long getLocalCorrelationId() + { + return _localCorrletionId; + } + + public String toString() + { + StringBuilder buf = new StringBuilder("Method event: \n"); + buf.append("Channel id: \n").append(_channelId); + buf.append("Method: \n").append(_method); + buf.append("Request Id: ").append(_correlationId); + buf.append("Local Correlation Id: ").append(_localCorrletionId); + return buf.toString(); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java new file mode 100644 index 0000000000..52b9f6de91 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java @@ -0,0 +1,11 @@ +package org.apache.qpid.nclient.model; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPMethodListener +{ + + public boolean methodReceived(AMQPMethodEvent evt) throws AMQPException; + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java new file mode 100644 index 0000000000..77003b4d21 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java @@ -0,0 +1,133 @@ +package org.apache.qpid.nclient.model; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.AbstractPhase; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.PhaseContext; +import org.apache.qpid.nclient.core.QpidConstants; + +/** + * This Phase handles Layer 3 functionality of the AMQP spec. + * This class acts as the interface between the API and the pipeline + */ +public class ModelPhase extends AbstractPhase { + + private static final Logger _logger = Logger.getLogger(ModelPhase.class); + + private Map _methodListners = new HashMap(); + + /** + * ------------------------------------------------ + * Phase - methods introduced by Phase + * ------------------------------------------------ + */ + public void init(PhaseContext ctx, Phase nextInFlowPhase, Phase nextOutFlowPhase) + { + super.init(ctx, nextInFlowPhase, nextOutFlowPhase); + try + { + loadMethodListeners(); + } + catch(Exception e) + { + _logger.fatal("Error loading method listeners", e); + } + } + + public void messageReceived(Object msg) throws AMQPException + { + notifyMethodListerners((AMQPMethodEvent)msg); + + // not doing super.methodReceived here, as this is the end of + // the pipeline + //super.messageReceived(msg); + } + + /** + * This method should only except and pass messages + * of Type @see AMQPMethodEvent + */ + public void messageSent(Object msg) throws AMQPException + { + super.messageSent(msg); + } + + /** + * ------------------------------------------------ + * Event Handling + * ------------------------------------------------ + */ + + public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException + { + if (_methodListners.containsKey(event.getMethod().getClass())) + { + List listeners = _methodListners.get(event.getMethod().getClass()); + + if(listeners.size()>0) + { + throw new AMQPException("There are no registered listeners for this method"); + } + + for(AMQPMethodListener l : listeners) + { + try + { + l.methodReceived(event); + } + catch (Exception e) + { + _logger.error("Error handling method event " + event, e); + } + } + } + } + + /** + * ------------------------------------------------ + * Configuration + * ------------------------------------------------ + */ + + /** + * This method loads method listeners from the client.xml file + * For each method class there is a list of listeners + */ + private void loadMethodListeners() throws Exception + { + int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER); + System.out.println(count); + + for(int i=0 ;i list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS); + for(String s:list) + { + List listeners; + Class methodClass = Class.forName(s); + if (_methodListners.containsKey(methodClass)) + { + listeners = _methodListners.get(methodClass); + } + else + { + listeners = new ArrayList(); + _methodListners.put(methodClass,listeners); + } + listeners.add(listenerClass); + } + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java new file mode 100644 index 0000000000..fc5878f6ef --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import javax.security.auth.callback.CallbackHandler; + +import org.apache.qpid.nclient.transport.ConnectionURL; + +public interface AMQPCallbackHandler extends CallbackHandler +{ + void initialise(ConnectionURL url); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java new file mode 100644 index 0000000000..28ba2e355c --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.QpidConstants; + +public class CallbackHandlerRegistry +{ + private static final Logger _logger = Logger.getLogger(CallbackHandlerRegistry.class); + + private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry(); + + private Map _mechanismToHandlerClassMap = new HashMap(); + + private String _mechanisms; + + public static CallbackHandlerRegistry getInstance() + { + return _instance; + } + + public Class getCallbackHandlerClass(String mechanism) + { + return _mechanismToHandlerClassMap.get(mechanism); + } + + public String getMechanisms() + { + return _mechanisms; + } + + private CallbackHandlerRegistry() + { + // first we register any Sasl client factories + DynamicSaslRegistrar.registerSaslProviders(); + parseProperties(); + } + + private void parseProperties() + { + List mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_MECHANISMS); + + for (String mechanism : mechanisms) + { + String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER + "_" + mechanism); + Class clazz = null; + try + { + clazz = Class.forName(className); + if (!AMQPCallbackHandler.class.isAssignableFrom(clazz)) + { + _logger.warn("SASL provider " + clazz + " does not implement " + AMQPCallbackHandler.class + + ". Skipping"); + continue; + } + _mechanismToHandlerClassMap.put(mechanism, clazz); + if (_mechanisms == null) + { + _mechanisms = mechanism; + } + else + { + // one time cost + _mechanisms = _mechanisms + " " + mechanism; + } + } + catch (ClassNotFoundException ex) + { + _logger.warn("Unable to load class " + className + ". Skipping that SASL provider"); + continue; + } + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java new file mode 100644 index 0000000000..958c6c4782 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import java.security.Security; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import javax.security.sasl.SaslClientFactory; + +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.QpidConstants; + +public class DynamicSaslRegistrar +{ + private static final Logger _logger = Logger.getLogger(DynamicSaslRegistrar.class); + + public static void registerSaslProviders() + { + Map> factories = parseProperties(); + if (factories.size() > 0) + { + Security.addProvider(new JCAProvider(factories)); + _logger.debug("Dynamic SASL provider added as a security provider"); + } + } + + private static Map> parseProperties() + { + List mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES); + TreeMap> factoriesToRegister = + new TreeMap>(); + for (String mechanism: mechanisms) + { + String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY + "_" + mechanism); + try + { + Class clazz = Class.forName(className); + if (!(SaslClientFactory.class.isAssignableFrom(clazz))) + { + _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping"); + continue; + } + factoriesToRegister.put(mechanism, (Class) clazz); + } + catch (Exception ex) + { + _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping"); + } + } + return factoriesToRegister; + } + + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java new file mode 100644 index 0000000000..10ccb88821 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import javax.security.sasl.SaslClientFactory; +import java.security.Provider; +import java.security.Security; +import java.util.Map; + +public class JCAProvider extends Provider +{ + public JCAProvider(Map> providerMap) + { + super("AMQSASLProvider", 1.0, "A JCA provider that registers all " + + "AMQ SASL providers that want to be registered"); + register(providerMap); + Security.addProvider(this); + } + + private void register(Map> providerMap) + { + for (Map.Entry> me : + providerMap.entrySet()) + { + put("SaslClientFactory." + me.getKey(), me.getValue().getName()); + } + } +} \ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java new file mode 100644 index 0000000000..7297d07134 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import java.io.IOException; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; + +import org.apache.qpid.nclient.transport.ConnectionURL; + +public class UsernamePasswordCallbackHandler implements AMQPCallbackHandler +{ + private ConnectionURL _url; + + public void initialise(ConnectionURL url) + { + _url = url; + } + + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException + { + for (int i = 0; i < callbacks.length; i++) + { + Callback cb = callbacks[i]; + if (cb instanceof NameCallback) + { + ((NameCallback)cb).setName(_url.getUsername()); + } + else if (cb instanceof PasswordCallback) + { + ((PasswordCallback)cb).setPassword((_url.getPassword()).toCharArray()); + } + else + { + throw new UnsupportedCallbackException(cb); + } + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java new file mode 100644 index 0000000000..9e878fb839 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java @@ -0,0 +1,344 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + +import java.util.HashMap; +import java.net.URISyntaxException; +import java.net.URI; + +public class AMQPBrokerDetails implements BrokerDetails +{ + private String _host; + + private int _port; + + private String _transport; + + private HashMap _options; + + public AMQPBrokerDetails() + { + _options = new HashMap(); + } + + public AMQPBrokerDetails(String url) throws URLSyntaxException + { + this(); + // URL should be of format tcp://host:port?option='value',option='value' + try + { + URI connection = new URI(url); + + String transport = connection.getScheme(); + + // Handles some defaults to minimise changes to existing broker URLS e.g. localhost + if (transport != null) + { + //todo this list of valid transports should be enumerated somewhere + if ((!(transport.equalsIgnoreCase("vm") || transport.equalsIgnoreCase("tcp")))) + { + if (transport.equalsIgnoreCase("localhost")) + { + connection = new URI(DEFAULT_TRANSPORT + "://" + url); + transport = connection.getScheme(); + } + else + { + if (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/') + { + //Then most likely we have a host:port value + connection = new URI(DEFAULT_TRANSPORT + "://" + url); + transport = connection.getScheme(); + } + else + { + URLHelper.parseError(0, transport.length(), "Unknown transport", url); + } + } + } + } + else + { + //Default the transport + connection = new URI(DEFAULT_TRANSPORT + "://" + url); + transport = connection.getScheme(); + } + + if (transport == null) + { + URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + " In broker URL:'" + url + + "' Format: " + URL_FORMAT_EXAMPLE, ""); + } + + setTransport(transport); + + String host = connection.getHost(); + + // Fix for Java 1.5 + if (host == null) + { + host = ""; + } + + setHost(host); + + int port = connection.getPort(); + + if (port == -1) + { + // Fix for when there is port data but it is not automatically parseable by getPort(). + String auth = connection.getAuthority(); + + if (auth != null && auth.contains(":")) + { + int start = auth.indexOf(":") + 1; + int end = start; + boolean looking = true; + boolean found = false; + //Walk the authority looking for a port value. + while (looking) + { + try + { + end++; + Integer.parseInt(auth.substring(start, end)); + + if (end >= auth.length()) + { + looking = false; + found = true; + } + } + catch (NumberFormatException nfe) + { + looking = false; + } + + } + if (found) + { + setPort(Integer.parseInt(auth.substring(start, end))); + } + else + { + URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1, + "Illegal character in port number", connection.toString()); + } + + } + else + { + setPort(DEFAULT_PORT); + } + } + else + { + setPort(port); + } + + String queryString = connection.getQuery(); + + URLHelper.parseOptions(_options, queryString); + + //Fragment is #string (not used) + } + catch (URISyntaxException uris) + { + if (uris instanceof URLSyntaxException) + { + throw (URLSyntaxException) uris; + } + + URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + } + } + + public AMQPBrokerDetails(String host, int port, boolean useSSL) + { + _host = host; + _port = port; + + if (useSSL) + { + setOption(OPTIONS_SSL, "true"); + } + } + + public String getHost() + { + return _host; + } + + public void setHost(String _host) + { + this._host = _host; + } + + public int getPort() + { + return _port; + } + + public void setPort(int _port) + { + this._port = _port; + } + + public String getTransport() + { + return _transport; + } + + public void setTransport(String _transport) + { + this._transport = _transport; + } + + public String getOption(String key) + { + return _options.get(key); + } + + public void setOption(String key, String value) + { + _options.put(key, value); + } + + public long getTimeout() + { + if (_options.containsKey(OPTIONS_CONNECT_TIMEOUT)) + { + try + { + return Long.parseLong(_options.get(OPTIONS_CONNECT_TIMEOUT)); + } + catch (NumberFormatException nfe) + { + //Do nothing as we will use the default below. + } + } + + return BrokerDetails.DEFAULT_CONNECT_TIMEOUT; + } + + public void setTimeout(long timeout) + { + setOption(OPTIONS_CONNECT_TIMEOUT, Long.toString(timeout)); + } + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(_transport); + sb.append("://"); + + if (!(_transport.equalsIgnoreCase("vm"))) + { + sb.append(_host); + } + + sb.append(':'); + sb.append(_port); + + sb.append(printOptionsURL()); + + return sb.toString(); + } + + public boolean equals(Object o) + { + if (!(o instanceof BrokerDetails)) + { + return false; + } + + BrokerDetails bd = (BrokerDetails) o; + + return _host.equalsIgnoreCase(bd.getHost()) && (_port == bd.getPort()) + && _transport.equalsIgnoreCase(bd.getTransport()) && (useSSL() == bd.useSSL()); + + //todo do we need to compare all the options as well? + } + + private String printOptionsURL() + { + StringBuffer optionsURL = new StringBuffer(); + + optionsURL.append('?'); + + if (!(_options.isEmpty())) + { + + for (String key : _options.keySet()) + { + optionsURL.append(key); + + optionsURL.append("='"); + + optionsURL.append(_options.get(key)); + + optionsURL.append("'"); + + optionsURL.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + } + + //removeKey the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options + optionsURL.deleteCharAt(optionsURL.length() - 1); + + return optionsURL.toString(); + } + + public boolean useSSL() + { + // To be friendly to users we should be case insensitive. + // or simply force users to conform to OPTIONS_SSL + // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL + + if (_options.containsKey(OPTIONS_SSL)) + { + return _options.get(OPTIONS_SSL).equalsIgnoreCase("true"); + } + + return USE_SSL_DEFAULT; + } + + public void useSSL(boolean ssl) + { + setOption(OPTIONS_SSL, Boolean.toString(ssl)); + } + + public static String checkTransport(String broker) + { + if ((!broker.contains("://"))) + { + return "tcp://" + broker; + } + else + { + return broker; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java new file mode 100644 index 0000000000..d0259830c6 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java @@ -0,0 +1,412 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + +import java.util.*; +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +public class AMQPConnectionURL implements ConnectionURL +{ + private String _url; + private String _failoverMethod; + private HashMap _failoverOptions; + private HashMap _options; + private List _brokers; + private String _clientName; + private String _username; + private String _password; + private String _virtualHost; + + public AMQPConnectionURL(String fullURL) throws URLSyntaxException + { + _url = fullURL; + _options = new HashMap(); + _brokers = new LinkedList(); + _failoverOptions = new HashMap(); + + try + { + URI connection = new URI(fullURL); + + if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL))) + { + throw new URISyntaxException(fullURL, "Not an AMQP URL"); + } + + if (connection.getHost() == null || connection.getHost().equals("")) + { + String uid = getUniqueClientID(); + if (uid == null) + { + URLHelper.parseError(-1, "Client Name not specified", fullURL); + } + else + { + setClientName(uid); + } + + } + else + { + setClientName(connection.getHost()); + } + + String userInfo = connection.getUserInfo(); + + if (userInfo == null) + { + //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs + userInfo = connection.getAuthority(); + + if (userInfo != null) + { + int atIndex = userInfo.indexOf('@'); + + if (atIndex != -1) + { + userInfo = userInfo.substring(0, atIndex); + } + else + { + userInfo = null; + } + } + + } + + if (userInfo == null) + { + URLHelper.parseError(AMQ_PROTOCOL.length() + 3, + "User information not found on url", fullURL); + } + else + { + parseUserInfo(userInfo); + } + String virtualHost = connection.getPath(); + + if (virtualHost != null && (!virtualHost.equals(""))) + { + setVirtualHost(virtualHost); + } + else + { + int authLength = connection.getAuthority().length(); + int start = AMQ_PROTOCOL.length() + 3; + int testIndex = start + authLength; + if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?') + { + URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL); + } + else + { + URLHelper.parseError(-1, "Virtual host not specified", fullURL); + } + + } + + + URLHelper.parseOptions(_options, connection.getQuery()); + + processOptions(); + + //Fragment is #string (not used) + //System.out.println(connection.getFragment()); + + } + catch (URISyntaxException uris) + { + if (uris instanceof URLSyntaxException) + { + throw (URLSyntaxException) uris; + } + + int slash = fullURL.indexOf("\\"); + + if (slash == -1) + { + URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + } + else + { + if (slash != 0 && fullURL.charAt(slash - 1) == ':') + { + URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); + } + else + { + URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL); + } + } + + } + } + + private String getUniqueClientID() + { + try + { + InetAddress addr = InetAddress.getLocalHost(); + return addr.getHostName() + System.currentTimeMillis(); + } + catch (UnknownHostException e) + { + return null; + } + } + + private void parseUserInfo(String userinfo) throws URLSyntaxException + { + //user info = user:pass + + int colonIndex = userinfo.indexOf(':'); + + if (colonIndex == -1) + { + URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), + "Null password in user information not allowed.", _url); + } + else + { + setUsername(userinfo.substring(0, colonIndex)); + setPassword(userinfo.substring(colonIndex + 1)); + } + + } + + private void processOptions() throws URLSyntaxException + { + if (_options.containsKey(OPTIONS_BROKERLIST)) + { + String brokerlist = _options.get(OPTIONS_BROKERLIST); + + //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value' + StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR); + + while (st.hasMoreTokens()) + { + String broker = st.nextToken(); + + _brokers.add(new AMQPBrokerDetails(broker)); + } + + _options.remove(OPTIONS_BROKERLIST); + } + + if (_options.containsKey(OPTIONS_FAILOVER)) + { + String failover = _options.get(OPTIONS_FAILOVER); + + // failover='method?option='value',option='value'' + + int methodIndex = failover.indexOf('?'); + + if (methodIndex > -1) + { + _failoverMethod = failover.substring(0, methodIndex); + URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1)); + } + else + { + _failoverMethod = failover; + } + + _options.remove(OPTIONS_FAILOVER); + } + } + + public String getURL() + { + return _url; + } + + public String getFailoverMethod() + { + return _failoverMethod; + } + + public String getFailoverOption(String key) + { + return _failoverOptions.get(key); + } + + public void setFailoverOption(String key, String value) + { + _failoverOptions.put(key, value); + } + + public int getBrokerCount() + { + return _brokers.size(); + } + + public BrokerDetails getBrokerDetails(int index) + { + if (index < _brokers.size()) + { + return _brokers.get(index); + } + else + { + return null; + } + } + + public void addBrokerDetails(BrokerDetails broker) + { + if (!(_brokers.contains(broker))) + { + _brokers.add(broker); + } + } + + public List getAllBrokerDetails() + { + return _brokers; + } + + public String getClientName() + { + return _clientName; + } + + public void setClientName(String clientName) + { + _clientName = clientName; + } + + public String getUsername() + { + return _username; + } + + public void setUsername(String username) + { + _username = username; + } + + public String getPassword() + { + return _password; + } + + public void setPassword(String password) + { + _password = password; + } + + public String getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(String virtuaHost) + { + _virtualHost = virtuaHost; + } + + public String getOption(String key) + { + return _options.get(key); + } + + public void setOption(String key, String value) + { + _options.put(key, value); + } + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(AMQ_PROTOCOL); + sb.append("://"); + + if (_username != null) + { + sb.append(_username); + + if (_password != null) + { + sb.append(':'); + sb.append(_password); + } + + sb.append('@'); + } + + sb.append(_clientName); + + sb.append(_virtualHost); + + sb.append(optionsToString()); + + return sb.toString(); + } + + private String optionsToString() + { + StringBuffer sb = new StringBuffer(); + + sb.append("?" + OPTIONS_BROKERLIST + "='"); + + for (BrokerDetails service : _brokers) + { + sb.append(service.toString()); + sb.append(';'); + } + + sb.deleteCharAt(sb.length() - 1); + sb.append("'"); + + if (_failoverMethod != null) + { + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + sb.append(OPTIONS_FAILOVER + "='"); + sb.append(_failoverMethod); + sb.append(URLHelper.printOptions(_failoverOptions)); + sb.append("'"); + } + + return sb.toString(); + } + + + public static void main(String[] args) throws URLSyntaxException + { + + String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'"; + //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''"; + + //ConnectionURL connectionurl2 = new AMQConnectionURL(url2); + + System.out.println(url2); + //System.out.println(connectionurl2); + + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java new file mode 100644 index 0000000000..fe20a1e8dd --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +public interface BrokerDetails +{ + + /* + * Known URL Options + * @see ConnectionURL + */ + public static final String OPTIONS_RETRY = "retries"; + public static final String OPTIONS_SSL = ConnectionURL.OPTIONS_SSL; + public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; + public static final int DEFAULT_PORT = 5672; + + public static final String TCP = "tcp"; + public static final String VM = "vm"; + + public static final String DEFAULT_TRANSPORT = TCP; + + public static final String URL_FORMAT_EXAMPLE = + "://[:][?