summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-10-09 17:07:59 +0000
committerRafael H. Schloming <rhs@apache.org>2008-10-09 17:07:59 +0000
commit394823bba7976c170ac58e53b5d80ad12e0f1690 (patch)
tree9b952b30b1b1bcd54c6f1cc453a221328b57c53f /java/client/src
parente78747f63bc73daa6e2035453358e6eaf3237b84 (diff)
downloadqpid-python-394823bba7976c170ac58e53b5d80ad12e0f1690.tar.gz
QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java98
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java37
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/Client.java295
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java39
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/Connection.java86
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java137
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/Session.java544
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java163
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java68
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java66
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java54
20 files changed, 180 insertions, 1506 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 27294562e5..ebeb29af78 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -917,7 +917,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// adjust timeout
timeout = adjustTimeout(timeout, startCloseTime);
- _delegate.closeConneciton(timeout);
+ _delegate.closeConnection(timeout);
//If the taskpool hasn't shutdown by now then give it shutdownNow.
// This will interupt any running tasks.
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 7f36ec6e99..60b827a426 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -42,5 +42,5 @@ public interface AMQConnectionDelegate
public void resubscribeSessions() throws JMSException, AMQException, FailoverException;
- public void closeConneciton(long timeout) throws JMSException, AMQException;
+ public void closeConnection(long timeout) throws JMSException, AMQException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index c723709d27..a7f04a2090 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client;
import java.io.IOException;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.XASession;
@@ -33,15 +34,18 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.ClosedListener;
import org.apache.qpid.ErrorCode;
-import org.apache.qpid.QpidException;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionClose;
+import org.apache.qpid.transport.ConnectionException;
+import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ProtocolVersionException;
+import org.apache.qpid.transport.TransportException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ClosedListener
+public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener
{
/**
* This class logger.
@@ -56,7 +60,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
/**
* The QpidConeection instance that is mapped with thie JMS connection.
*/
- org.apache.qpid.nclient.Connection _qpidConnection;
+ org.apache.qpid.transport.Connection _qpidConnection;
//--- constructor
public AMQConnectionDelegate_0_10(AMQConnection conn)
@@ -125,7 +129,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
*/
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
- _qpidConnection = Client.createConnection();
+ _qpidConnection = new Connection();
try
{
if (_logger.isDebugEnabled())
@@ -134,16 +138,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
.getHost() + " port: " + brokerDetail.getPort() + " virtualhost: " + _conn
.getVirtualHost() + "user name: " + _conn.getUsername() + "password: " + _conn.getPassword());
}
+ _qpidConnection.setConnectionListener(this);
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
_conn.getUsername(), _conn.getPassword());
- _qpidConnection.setClosedListener(this);
_conn._connected = true;
}
catch(ProtocolVersionException pe)
{
return new ProtocolVersion(pe.getMajor(), pe.getMinor());
}
- catch (QpidException e)
+ catch (ConnectionException e)
{
throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
}
@@ -161,34 +165,42 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
}
- public void closeConneciton(long timeout) throws JMSException, AMQException
+ public void closeConnection(long timeout) throws JMSException, AMQException
{
try
{
_qpidConnection.close();
}
- catch (QpidException e)
+ catch (TransportException e)
{
- throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot close connection", e);
+ throw new AMQException(e.getMessage(), e);
}
-
}
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
+ public void opened(Connection conn) {}
+
+ public void exception(Connection conn, ConnectionException exc)
{
- if (_logger.isDebugEnabled())
+ ExceptionListener listener = _conn._exceptionListener;
+ if (listener == null)
{
- _logger.debug("Received a connection close from the broker: Error code : " + errorCode.getCode(), t);
+ _logger.error("connection exception: " + conn, exc);
}
- if (_conn._exceptionListener != null)
+ else
{
- JMSException ex = new JMSException(reason,String.valueOf(errorCode.getCode()));
- if (t != null)
+ ConnectionClose close = exc.getClose();
+ String code = null;
+ if (close != null)
{
- ex.initCause(t);
+ code = close.getReplyCode().toString();
}
+ JMSException ex = new JMSException(exc.getMessage(), code);
+ ex.initCause(exc);
_conn._exceptionListener.onException(ex);
}
}
+
+ public void closed(Connection conn) {}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 2ec8737d16..8d42a2f201 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -58,7 +58,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
private AMQConnection _conn;
- public void closeConneciton(long timeout) throws JMSException, AMQException
+ public void closeConnection(long timeout) throws JMSException, AMQException
{
_conn.getProtocolHandler().closeConnection(timeout);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 46a667419d..7829966315 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -28,20 +28,27 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.util.Serial;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
-import org.apache.qpid.ErrorCode;
-import org.apache.qpid.QpidException;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.ExchangeBoundResult;
import org.apache.qpid.transport.Future;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.qpid.transport.Option.*;
+
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -53,6 +60,7 @@ import java.util.Map;
* This is a 0.10 Session
*/
public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
+ implements SessionListener
{
/**
@@ -70,10 +78,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* The latest qpid Exception that has been reaised.
*/
private Object _currentExceptionLock = new Object();
- private QpidException _currentException;
+ private SessionException _currentException;
// a ref on the qpid connection
- protected org.apache.qpid.nclient.Connection _qpidConnection;
+ protected org.apache.qpid.transport.Connection _qpidConnection;
private RangeSet unacked = new RangeSet();
private int unackedCount = 0;
@@ -97,7 +105,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
* @param qpidConnection The qpid connection
*/
- AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId,
+ AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
@@ -108,7 +116,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// create the qpid session with an expiry <= 0 so that the session does not expire
_qpidSession = qpidConnection.createSession(0);
// set the exception listnere for this session
- _qpidSession.setClosedListener(new QpidSessionExceptionListener());
+ _qpidSession.setSessionListener(this);
// set transacted if required
if (_transacted)
{
@@ -127,7 +135,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
* @param qpidConnection The connection
*/
- AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId,
+ AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
{
@@ -195,12 +203,26 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (unackedCount > 0)
{
- getQpidSession().messageAcknowledge
+ messageAcknowledge
(unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
clearUnacked();
}
}
+ void messageAcknowledge(RangeSet ranges, boolean accept)
+ {
+ Session ssn = getQpidSession();
+ for (Range range : ranges)
+ {
+ ssn.processed(range);
+ }
+ ssn.flushProcessed(accept ? BATCH : NONE);
+ if (accept)
+ {
+ ssn.messageAccept(ranges);
+ }
+ }
+
/**
* Bind a queue with an exchange.
*
@@ -416,11 +438,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
preAcquire = ( ! consumer.isNoConsume() &&
(consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) )
|| !(consumer.getDestination() instanceof AMQQueue);
- getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag),
- getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED,
- preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
- (BasicMessageConsumer_0_10) consumer, null,
- consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+ getQpidSession().messageSubscribe
+ (queueName.toString(), String.valueOf(tag),
+ getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+ preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, null,
+ consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
catch (JMSException e)
{
@@ -598,7 +620,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*
* @return The associated Qpid Session.
*/
- protected org.apache.qpid.nclient.Session getQpidSession()
+ protected Session getQpidSession()
{
return _qpidSession;
}
@@ -615,31 +637,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (_currentException != null)
{
- QpidException toBeThrown = _currentException;
+ SessionException se = _currentException;
_currentException = null;
- throw new AMQException(AMQConstant.getConstant(toBeThrown.getErrorCode().getCode()),
- toBeThrown.getMessage(), toBeThrown);
+ ExecutionException ee = se.getException();
+ int code;
+ if (ee == null)
+ {
+ code = 0;
+ }
+ else
+ {
+ code = ee.getErrorCode().getValue();
+ }
+ throw new AMQException
+ (AMQConstant.getConstant(code), se.getMessage(), se);
}
}
}
- //------ Inner classes
- /**
- * Lstener for qpid protocol exceptions
- */
- private class QpidSessionExceptionListener implements org.apache.qpid.nclient.ClosedListener
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
{
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
+ messageReceived(new UnprocessedMessage_0_10(xfr));
+ }
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ synchronized (_currentExceptionLock)
{
- synchronized (_currentExceptionLock)
- {
- // todo check the error code for finding out if we need to notify the
- // JMS connection exception listener
- _currentException = new QpidException(reason, errorCode, t);
- }
+ _currentException = exc;
}
}
+ public void closed(Session ssn) {}
+
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
final boolean noLocal)
throws AMQException
@@ -776,7 +808,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
_connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
{
// send completed so consumer credits don't dry up
- getQpidSession().messageAcknowledge(_txRangeSet, false);
+ messageAcknowledge(_txRangeSet, false);
}
}
@@ -787,7 +819,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if( _txSize > 0 )
{
- getQpidSession().messageAcknowledge(_txRangeSet, true);
+ messageAcknowledge(_txRangeSet, true);
_txRangeSet.clear();
_txSize = 0;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 2a37298a43..7d535643c0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
* This is a 0.10 message consumer.
*/
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>
- implements org.apache.qpid.nclient.MessagePartListener
{
/**
@@ -114,9 +113,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return _consumerTagString;
}
-
- // ----- Interface org.apache.qpid.client.util.MessageListener
-
/**
*
* This is invoked by the session thread when emptying the session message queue.
@@ -159,28 +155,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
-
-
- /**
- * This method is invoked by the transport layer when a message is delivered for this
- * consumer. The message is transformed and pass to the session.
- * @param xfr an 0.10 message transfer
- */
- public void messageTransfer(MessageTransfer xfr)
-
- //public void onMessage(Message message)
- {
- int channelId = getSession().getChannelId();
- int consumerTag = getConsumerTag();
-
- UnprocessedMessage_0_10 newMessage =
- new UnprocessedMessage_0_10(consumerTag, xfr);
-
-
- getSession().messageReceived(newMessage);
- // else ignore this message
- }
-
//----- overwritten methods
/**
@@ -304,8 +278,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
RangeSet ranges = new RangeSet();
ranges.add((int) message.getDeliveryTag());
- _0_10session.getQpidSession().messageAcknowledge(ranges,
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE );
+ _0_10session.messageAcknowledge
+ (ranges,
+ _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
_0_10session.getCurrentException();
}
}
@@ -425,10 +400,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
void postDeliver(AbstractJMSMessage msg) throws JMSException
{
super.postDeliver(msg);
- if(_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
+ if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index ef8aeb58a1..4e5077f0cd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -36,7 +36,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.util.Strings;
-import org.apache.qpid.nclient.util.ByteBufferMessage;
import org.apache.qpid.njms.ExceptionHelper;
import org.apache.qpid.transport.*;
import static org.apache.qpid.transport.Option.*;
diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
index 6763c72ecd..35adda9348 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
@@ -88,8 +88,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
checkStatus(result.getStatus());
}
@@ -142,8 +141,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
checkStatus(result.getStatus());
}
@@ -171,8 +169,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
}
@@ -201,8 +198,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
}
return result;
@@ -248,8 +244,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
}
DtxXaStatus status = result.getStatus();
int outcome = XAResource.XA_OK;
@@ -291,8 +286,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr( e.getException().getErrorCode());
}
Xid[] result = new Xid[res.getInDoubt().size()];
int i = 0;
@@ -329,8 +323,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr( e.getException().getErrorCode());
}
checkStatus(result.getStatus());
}
@@ -413,8 +406,7 @@ public class XAResourceImpl implements XAResource
{
// we need to restore the qpid session that has been closed
_xaSession.createSession();
- // we should get a single exception
- convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
+ convertExecutionErrorToXAErr(e.getException().getErrorCode());
// TODO: The amqp spec does not allow to make the difference
// between an already known XID and a wrong arguments (join and resume are set)
// TODO: make sure amqp addresses that
diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
index 51b4b7899f..354b67cd35 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
@@ -17,7 +17,6 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.nclient.DtxSession;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import javax.jms.*;
@@ -36,7 +35,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
/**
* This XASession Qpid DtxSession
*/
- private DtxSession _qpidDtxSession;
+ private org.apache.qpid.transport.Session _qpidDtxSession;
/**
* The standard session
@@ -48,7 +47,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
/**
* Create a JMS XASession
*/
- public XASessionImpl(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId,
+ public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
int defaultPrefetchHigh, int defaultPrefetchLow)
{
super(qpidConnection, con, channelId, false, // this is not a transacted session
@@ -65,7 +64,9 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
*/
public void createSession()
{
- _qpidDtxSession = _qpidConnection.createDTXSession(0);
+ _qpidDtxSession = _qpidConnection.createSession(0);
+ _qpidDtxSession.setSessionListener(this);
+ _qpidDtxSession.dtxSelect();
}
@@ -126,7 +127,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
*
* @return The associated Qpid Session.
*/
- protected org.apache.qpid.nclient.DtxSession getQpidSession()
+ protected org.apache.qpid.transport.Session getQpidSession()
{
return _qpidDtxSession;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 8b4488f1f9..d064c27754 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -29,7 +29,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.nclient.*;
import org.apache.qpid.jms.Message;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
@@ -138,7 +137,7 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
}
- public static void updateExchangeTypeMapping(Header header, org.apache.qpid.nclient.Session session)
+ public static void updateExchangeTypeMapping(Header header, org.apache.qpid.transport.Session session)
{
DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
if(deliveryProps != null)
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
index 6b1301a33f..f31bc88509 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
@@ -33,9 +33,9 @@ public class UnprocessedMessage_0_10 extends UnprocessedMessage
{
private MessageTransfer _transfer;
- public UnprocessedMessage_0_10(int consumerTag, MessageTransfer xfr)
+ public UnprocessedMessage_0_10(MessageTransfer xfr)
{
- super(consumerTag);
+ super(Integer.parseInt(xfr.getDestination()));
_transfer = xfr;
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Client.java b/java/client/src/main/java/org/apache/qpid/nclient/Client.java
deleted file mode 100644
index af0e724e42..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/Client.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.nclient;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.qpid.client.url.URLParser_0_10;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.url.QpidURL;
-import org.apache.qpid.ErrorCode;
-import org.apache.qpid.QpidException;
-import org.apache.qpid.nclient.impl.ClientSession;
-import org.apache.qpid.nclient.impl.ClientSessionDelegate;
-import org.apache.qpid.transport.Channel;
-import org.apache.qpid.transport.ClientDelegate;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionClose;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionCloseOk;
-import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.ProtocolVersionException;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.network.io.IoTransport;
-import org.apache.qpid.transport.network.mina.MinaHandler;
-import org.apache.qpid.transport.network.nio.NioHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class Client implements org.apache.qpid.nclient.Connection
-{
- private Connection _conn;
- private ClosedListener _closedListner;
- private final Lock _lock = new ReentrantLock();
- private static Logger _logger = LoggerFactory.getLogger(Client.class);
- private Condition closeOk;
- private boolean closed = false;
- private long timeout = 60000;
-
- private ProtocolHeader header = null;
-
- /**
- *
- * @return returns a new connection to the broker.
- */
- public static org.apache.qpid.nclient.Connection createConnection()
- {
- return new Client();
- }
-
- public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
- {
-
- final Condition negotiationComplete = _lock.newCondition();
- closeOk = _lock.newCondition();
- _lock.lock();
-
- ClientDelegate connectionDelegate = new ClientDelegate()
- {
- private boolean receivedClose = false;
- public SessionDelegate getSessionDelegate()
- {
- return new ClientSessionDelegate();
- }
-
- public void exception(Throwable t)
- {
- if (_closedListner != null)
- {
- _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t);
- }
- else
- {
- throw new RuntimeException("connection closed",t);
- }
- }
-
- public void closed()
- {
- if (_closedListner != null && !this.receivedClose)
- {
- _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null);
- }
- }
-
- @Override public void connectionCloseOk(Channel context, ConnectionCloseOk struct)
- {
- _lock.lock();
- try
- {
- closed = true;
- this.receivedClose = true;
- closeOk.signalAll();
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
- {
- super.connectionClose(context, connectionClose);
- ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode().getValue());
- if (_closedListner == null && errorCode != ErrorCode.NO_ERROR)
- {
- throw new RuntimeException
- (new QpidException("Server closed the connection: Reason " +
- connectionClose.getReplyText(),
- errorCode,
- null));
- }
- else
- {
- _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null);
- }
-
- this.receivedClose = true;
- }
- @Override public void init(Channel ch, ProtocolHeader hdr)
- {
- // TODO: once the merge is done we'll need to update this code
- // for handling 0.8 protocol version type i.e. major=8 and mino
- if (hdr.getMajor() != 0 || hdr.getMinor() != 10)
- {
- Client.this.header = hdr;
- _lock.lock();
- negotiationComplete.signalAll();
- _lock.unlock();
- }
- }
- };
-
- connectionDelegate.setCondition(_lock,negotiationComplete);
- connectionDelegate.setUsername(username);
- connectionDelegate.setPassword(password);
- connectionDelegate.setVirtualHost(virtualHost);
-
- String transport = System.getProperty("transport","io");
- if (transport.equalsIgnoreCase("nio"))
- {
- _logger.info("using NIO Transport");
- _conn = NioHandler.connect(host, port,connectionDelegate);
- }
- else if (transport.equalsIgnoreCase("io"))
- {
- _logger.info("using Plain IO Transport");
- _conn = IoTransport.connect(host, port,connectionDelegate);
- }
- else
- {
- _logger.info("using MINA Transport");
- _conn = MinaHandler.connect(host, port,connectionDelegate);
- // _conn = NativeHandler.connect(host, port,connectionDelegate);
- }
-
- // XXX: hardcoded version numbers
- _conn.send(new ProtocolHeader(1, 0, 10));
-
- try
- {
- negotiationComplete.await(timeout, TimeUnit.MILLISECONDS);
- if (header != null)
- {
- _conn.close();
- throw new ProtocolVersionException(header.getMajor(), header.getMinor());
- }
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public void connect(String url)throws QpidException
- {
- URLParser_0_10 parser = null;
- try
- {
- parser = new URLParser_0_10(url);
- }
- catch(Exception e)
- {
- throw new QpidException("Error parsing the URL",ErrorCode.UNDEFINED,e);
- }
- List<BrokerDetails> brokers = parser.getAllBrokerDetails();
- BrokerDetails brokerDetail = brokers.get(0);
- connect(brokerDetail.getHost(), brokerDetail.getPort(), brokerDetail.getProperty("virtualhost"),
- brokerDetail.getProperty("username")== null? "guest":brokerDetail.getProperty("username"),
- brokerDetail.getProperty("password")== null? "guest":brokerDetail.getProperty("password"));
- }
-
- /*
- * Until the dust settles with the URL disucssion
- * I am not going to implement this.
- */
- public void connect(QpidURL url) throws QpidException
- {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- /* {
- // temp impl to tests
- BrokerDetails details = url.getAllBrokerDetails().get(0);
- connect(details.getHost(),
- details.getPort(),
- details.getVirtualHost(),
- details.getUserName(),
- details.getPassword());
- }
-*/
-
- public void close() throws QpidException
- {
- Channel ch = _conn.getChannel(0);
- ch.connectionClose(ConnectionCloseCode.NORMAL, "client is closing");
- _lock.lock();
- try
- {
- try
- {
- long start = System.currentTimeMillis();
- long elapsed = 0;
- while (!closed && elapsed < timeout)
- {
- closeOk.await(timeout - elapsed, TimeUnit.MILLISECONDS);
- elapsed = System.currentTimeMillis() - start;
- }
- if(!closed)
- {
- throw new QpidException("Timed out when closing connection", ErrorCode.CONNECTION_ERROR, null);
- }
- }
- catch (InterruptedException e)
- {
- throw new QpidException("Interrupted when closing connection", ErrorCode.CONNECTION_ERROR, null);
- }
- }
- finally
- {
- _lock.unlock();
- }
- _conn.close();
- }
-
- public Session createSession(long expiryInSeconds)
- {
- Channel ch = _conn.getChannel();
- ClientSession ssn = new ClientSession(UUID.randomUUID().toString().getBytes());
- ssn.attach(ch);
- ssn.sessionAttach(ssn.getName());
- ssn.sessionRequestTimeout(expiryInSeconds);
- return ssn;
- }
-
- public DtxSession createDTXSession(int expiryInSeconds)
- {
- ClientSession clientSession = (ClientSession) createSession(expiryInSeconds);
- clientSession.dtxSelect();
- return (DtxSession) clientSession;
- }
-
- public void setClosedListener(ClosedListener closedListner)
- {
-
- _closedListner = closedListner;
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java b/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java
deleted file mode 100644
index 4cf0cab1ec..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.nclient;
-
-import org.apache.qpid.ErrorCode;
-
-
-/**
- * If the communication layer detects a serious problem with a <CODE>connection</CODE>, it
- * informs the connection's ExceptionListener
- */
-public interface ClosedListener
-{
- /**
- * If the communication layer detects a serious problem with a connection, it
- * informs the connection's ExceptionListener
- * @param errorCode TODO
- * @param reason TODO
- * @param t TODO
- * @see Connection
- */
- public void onClosed(ErrorCode errorCode, String reason, Throwable t);
-} \ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java
deleted file mode 100644
index 2d5b50b33a..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.nclient;
-
-import org.apache.qpid.QpidException;
-
-/**
- * This represents a physical connection to a broker.
- */
-public interface Connection
-{
- /**
- * Establish the connection using the given parameters
- *
- * @param host host name
- * @param port port number
- * @param virtualHost the virtual host name
- * @param username user name
- * @param password password
- * @throws QpidException If the communication layer fails to establish the connection.
- */
- public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException;
-
- /**
- * Establish the connection with the broker identified by the URL.
- *
- * @param url Specifies the URL of the broker.
- * @throws QpidException If the communication layer fails to connect with the broker, an exception is thrown.
- */
- public void connect(String url) throws QpidException;
-
- /**
- * Close this connection.
- *
- * @throws QpidException if the communication layer fails to close the connection.
- */
- public void close() throws QpidException;
-
- /**
- * Create a session for this connection.
- * <p> The returned session is suspended
- * (i.e. this session is not attached to an underlying channel)
- *
- * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than
- * or equal to 0 then the session does not expire.
- * @return A newly created (suspended) session.
- */
- public Session createSession(long expiryInSeconds);
-
- /**
- * Create a DtxSession for this connection.
- * <p> A Dtx Session must be used when resources have to be manipulated as
- * part of a global transaction.
- * <p> The retuned DtxSession is suspended
- * (i.e. this session is not attached with an underlying channel)
- *
- * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than or equal
- * to 0 then the session does not expire.
- * @return A newly created (suspended) DtxSession.
- */
- public DtxSession createDTXSession(int expiryInSeconds);
-
- /**
- * If the communication layer detects a serious problem with a connection, it
- * informs the connection's ClosedListener
- *
- * @param exceptionListner The ClosedListener
- */
- public void setClosedListener(ClosedListener exceptionListner);
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java
deleted file mode 100644
index 8a859f2d84..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.nclient;
-
-import org.apache.qpid.transport.Future;
-import org.apache.qpid.transport.GetTimeoutResult;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.RecoverResult;
-import org.apache.qpid.transport.XaResult;
-import org.apache.qpid.transport.Xid;
-
-/**
- * The resources for this session are controlled under the scope of a distributed transaction.
- */
-public interface DtxSession extends Session
-{
-
- /**
- * This method is called when messages should be produced and consumed on behalf a transaction
- * branch identified by xid.
- * possible options are:
- * <ul>
- * <li> {@link Option#JOIN}: Indicate that the start applies to joining a transaction previously seen.
- * <li> {@link Option#RESUME}: Indicate that the start applies to resuming a suspended transaction branch specified.
- * </ul>
- *
- * @param xid Specifies the xid of the transaction branch to be started.
- * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}.
- * @return Confirms to the client that the transaction branch is started or specify the error condition.
- */
- public Future<XaResult> dtxStart(Xid xid, Option... options);
-
- /**
- * This method is called when the work done on behalf of a transaction branch finishes or needs to
- * be suspended.
- * possible options are:
- * <ul>
- * <li> {@link Option#FAIL}: indicates that this portion of work has failed;
- * otherwise this portion of work has
- * completed successfully.
- * <li> {@link Option#SUSPEND}: Indicates that the transaction branch is
- * temporarily suspended in an incomplete state.
- * </ul>
- *
- * @param xid Specifies the xid of the transaction branch to be ended.
- * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}.
- * @return Confirms to the client that the transaction branch is ended or specifies the error condition.
- */
- public Future<XaResult> dtxEnd(Xid xid, Option... options);
-
- /**
- * Commit the work done on behalf of a transaction branch. This method commits the work associated
- * with xid. Any produced messages are made available and any consumed messages are discarded.
- * The only possible option is:
- * <ul>
- * <li> {@link Option#ONE_PHASE}: When set, one-phase commit optimization is used.
- * </ul>
- *
- * @param xid Specifies the xid of the transaction branch to be committed.
- * @param options Available option is: {@link Option#ONE_PHASE}
- * @return Confirms to the client that the transaction branch is committed or specifies the error condition.
- */
- public Future<XaResult> dtxCommit(Xid xid, Option... options);
-
- /**
- * This method is called to forget about a heuristically completed transaction branch.
- *
- * @param xid Specifies the xid of the transaction branch to be forgotten.
- */
- public void dtxForget(Xid xid, Option ... options);
-
- /**
- * This method obtains the current transaction timeout value in seconds. If set-timeout was not
- * used prior to invoking this method, the return value is the default timeout value; otherwise, the
- * value used in the previous set-timeout call is returned.
- *
- * @param xid Specifies the xid of the transaction branch used for getting the timeout.
- * @return The current transaction timeout value in seconds.
- */
- public Future<GetTimeoutResult> dtxGetTimeout(Xid xid, Option ... options);
-
- /**
- * This method prepares any message produced or consumed on behalf of xid, ready for commitment.
- *
- * @param xid Specifies the xid of the transaction branch to be prepared.
- * @return The status of the prepare operation can be any one of:
- * xa-ok: Normal execution.
- * <p/>
- * xa-rdonly: The transaction branch was read-only and has been committed.
- * <p/>
- * xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified
- * reason.
- * <p/>
- * xa-rbtimeout: The work represented by this transaction branch took too long.
- */
- public Future<XaResult> dtxPrepare(Xid xid, Option ... options);
-
- /**
- * This method is called to obtain a list of transaction branches that are in a prepared or
- * heuristically completed state.
- * @return a array of xids to be recovered.
- */
- public Future<RecoverResult> dtxRecover(Option ... options);
-
- /**
- * This method rolls back the work associated with xid. Any produced messages are discarded and
- * any consumed messages are re-queued.
- *
- * @param xid Specifies the xid of the transaction branch to be rolled back.
- * @return Confirms to the client that the transaction branch is rolled back or specifies the error condition.
- */
- public Future<XaResult> dtxRollback(Xid xid, Option ... options);
-
- /**
- * Sets the specified transaction branch timeout value in seconds.
- *
- * @param xid Specifies the xid of the transaction branch for setting the timeout.
- * @param timeout The transaction timeout value in seconds.
- */
- public void dtxSetTimeout(Xid xid, long timeout, Option ... options);
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java
deleted file mode 100644
index 0d84394c7c..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.nclient;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.qpid.transport.*;
-import org.apache.qpid.api.Message;
-
-/**
- * <p>A session is associated with a connection.
- * When it is created, a session is not associated with an underlying channel.
- * The session is single threaded. </p>
- * <p/>
- * All the Session commands are asynchronous. Synchronous behavior is achieved through invoking the sync method.
- * For example, <code>command1</code> will be synchronously invoked by using the following sequence:
- * <ul>
- * <li> <code>session.command1()</code>
- * <li> <code>session.sync()</code>
- * </ul>
- */
-public interface Session
-{
- public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 1;
- public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 0;
- public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 0;
- public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 1;
- public static final short MESSAGE_FLOW_MODE_CREDIT = 0;
- public static final short MESSAGE_FLOW_MODE_WINDOW = 1;
- public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0;
- public static final short MESSAGE_FLOW_UNIT_BYTE = 1;
- public static final long MESSAGE_FLOW_MAX_BYTES = 0xFFFFFFFF;
- public static final short MESSAGE_REJECT_CODE_GENERIC = 0;
- public static final short MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED = 1;
- public static final short MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE = 0;
- public static final short MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 1;
-
- //------------------------------------------------------
- // Session housekeeping methods
- //------------------------------------------------------
-
- /**
- * Sync method will block the session until all outstanding commands
- * are executed.
- */
- public void sync();
-
- public void close();
-
- public void sessionDetach(byte[] name, Option ... options);
-
- public void sessionRequestTimeout(long expiry, Option ... options);
-
- public byte[] getName();
-
- public void setAutoSync(boolean value);
-
- //------------------------------------------------------
- // Messaging methods
- // Producer
- //------------------------------------------------------
- /**
- * Transfer a message to a specified exchange.
- * <p/>
- * <p>This transfer provides a complete message
- * using a single method. The method is internally mapped to messageTransfer() and headers() followed
- * by data() and endData().
- * <b><i>This method should only be used by small messages.</b></i></p>
- *
- * @param destination The exchange the message is being sent to.
- * @param msg The Message to be sent.
- * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
- * is not required. Once a message has been transferred in pre-acquire
- * mode (or once acquire has been sent in no-acquire mode) the message is considered
- * transferred.
- * <p/>
- * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
- * is not considered transferred until the original
- * transfer is complete. A complete transfer is signaled by execution.complete.
- * </ul>
- * @param acquireMode <ul>
- * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message
- * must be explicitly acquired.
- * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is
- * acquired when the transfer starts.
- * </ul>
- * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown.
- */
- public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode)
- throws IOException;
-
-
- /**
- * This command transfers a message between two peers.
- *
- * @param destination Specifies the destination to which the message is to be transferred.
- * @param acceptMode Indicates whether message.accept, session.complete,
- * or nothing at all is required to indicate successful transfer of the message.
- *
- * @param acquireMode Indicates whether or not the transferred message has been acquired.
- */
- public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
- Header header, ByteBuffer body, Option ... options);
-
- /**
- * This command transfers a message between two peers.
- *
- * @param destination Specifies the destination to which the message is to be transferred.
- * @param acceptMode Indicates whether message.accept, session.complete,
- * or nothing at all is required to indicate successful transfer of the message.
- *
- * @param acquireMode Indicates whether or not the transferred message has been acquired.
- */
- public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
- Header header, byte[] body, Option ... options);
-
- /**
- * This command transfers a message between two peers.
- *
- * @param destination Specifies the destination to which the message is to be transferred.
- * @param acceptMode Indicates whether message.accept, session.complete,
- * or nothing at all is required to indicate successful transfer of the message.
- *
- * @param acquireMode Indicates whether or not the transferred message has been acquired.
- */
- public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
- Header header, String body, Option ... options);
-
- //------------------------------------------------------
- // Messaging methods
- // Consumer
- //------------------------------------------------------
-
- /**
- * Associate a message listener with a destination.
- * <p> The destination is bound to a queue, and messages are filtered based
- * on the provider filter map (message filtering is specific to the provider and in some cases might not be handled).
- * <p> The valid options are:
- * <ul>
- * <li>{@link Option#EXCLUSIVE}: <p> Requests exclusive subscription access, so that only this
- * subscription can access the queue.
- * <li>{@link Option#NONE}: <p> This is an empty option, and has no effect.
- * </ul>
- *
- * @param queue The queue that the receiver is receiving messages from.
- * @param destination The destination, or delivery tag, for the subscriber.
- * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
- * is not required. Once a message has been transferred in pre-acquire
- * mode (or once acquire has been sent in no-acquire mode) the message is considered
- * transferred.
- * <p/>
- * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
- * is not considered transferred until the original
- * transfer is complete. A complete transfer is signaled by execution.complete.
- * </ul>
- * @param acquireMode <ul>
- * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message must
- * be explicitly acquired.
- * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is
- * acquired when the transfer starts.
- * </ul>
- * @param listener The listener for this destination. To transfer large messages
- * use a {@link org.apache.qpid.nclient.MessagePartListener}.
- * @param options Set of options. Valid options are {{@link Option#EXCLUSIVE}
- * and {@link Option#NONE}.
- * @param filter A set of filters for the subscription. The syntax and semantics of these filters varies
- * according to the provider's implementation.
- */
- public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
- MessagePartListener listener, Map<String, Object> filter, Option... options);
-
- /**
- * This method cancels a consumer. The server will not send any more messages to the specified destination.
- * This does not affect already delivered messages.
- * The client may receive a
- * number of messages in between sending the cancel method and receiving
- * notification that the cancellation has been completed.
- *
- * @param destination The destination to be cancelled.
- */
- public void messageCancel(String destination, Option ... options);
-
- /**
- * Associate a message listener with a destination.
- * <p> Only one listener is permitted for each destination. When a new listener is created,
- * it replaces the previous message listener. To prevent message loss, this occurs only when the original listener
- * has completed processing a message.
- *
- * @param destination The destination the listener is associated with.
- * @param listener The new listener for this destination.
- */
- public void setMessageListener(String destination, MessagePartListener listener);
-
- /**
- * Sets the mode of flow control used for a given destination.
- * <p> With credit based flow control, the broker continually maintains its current
- * credit balance with the recipient. The credit balance consists of two values, a message
- * count, and a byte count. Whenever message data is sent, both counts must be decremented.
- * If either value reaches zero, the flow of message data must stop. Additional credit is
- * received via the {@link Session#messageFlow} method.
- * <p> Window based flow control is identical to credit based flow control, however message
- * acknowledgment implicitly grants a single unit of message credit, and the size of the
- * message in byte credits for each acknowledged message.
- *
- * @param destination The destination to set the flow mode on.
- * @param mode <ul> <li>credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control
- * <li> window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control</ul>
- */
- public void messageSetFlowMode(String destination, MessageFlowMode mode, Option ... options);
-
-
- /**
- * This method controls the flow of message data to a given destination. It is used by the
- * recipient of messages to dynamically match the incoming rate of message flow to its
- * processing or forwarding capacity. Upon receipt of this method, the sender must add "value"
- * number of the specified unit to the available credit balance for the specified destination.
- * A value of 0 indicates an infinite amount of credit. This disables any limit for
- * the given unit until the credit balance is zeroed with {@link Session#messageStop}
- * or {@link Session#messageFlush}.
- *
- * @param destination The destination to set the flow.
- * @param unit Specifies the unit of credit balance.
- * <p/>
- * One of: <ul>
- * <li> message ({@link Session#MESSAGE_FLOW_UNIT_MESSAGE})
- * <li> byte ({@link Session#MESSAGE_FLOW_UNIT_BYTE})
- * </ul>
- * @param value Number of credits, a value of 0 indicates an infinite amount of credit.
- */
- public void messageFlow(String destination, MessageCreditUnit unit, long value, Option ... options);
-
- /**
- * Forces the broker to exhaust its credit supply.
- * <p> The credit on the broker will remain at zero once
- * this method is completed.
- *
- * @param destination The destination on which the credit supply is to be exhausted.
- */
- public void messageFlush(String destination, Option ... options);
-
- /**
- * On receipt of this method, the brokers set credit to zero for a given
- * destination. When confirmation of this method
- * is issued credit is set to zero. No further messages will be sent until
- * further credit is received.
- *
- * @param destination The destination on which to reset credit.
- */
- public void messageStop(String destination, Option ... options);
-
- /**
- * Acknowledge the receipt of a range of messages.
- * <p>Messages must already be acquired, either by receiving them in
- * pre-acquire mode or by explicitly acquiring them.
- *
- * @param ranges Range of messages to be acknowledged.
- * @param accept pecify whether to send a message accept to the broker
- */
- public void messageAcknowledge(RangeSet ranges, boolean accept);
-
- /**
- * Reject a range of acquired messages.
- * <p>The broker will deliver rejected messages to the
- * alternate-exchange on the queue from which it came. If no alternate-exchange is
- * defined for that queue the broker will discard the message.
- *
- * @param ranges Range of messages to be rejected.
- * @param code The reject code must be one of {@link Session#MESSAGE_REJECT_CODE_GENERIC} or
- * {@link Session#MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED} (immediate delivery was attempted but
- * failed).
- * @param text String describing the reason for a message transfer rejection.
- */
- public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options);
-
- /**
- * As it is possible that the broker does not manage to reject some messages, after completion of
- * {@link Session#messageReject} this method will return the ranges of rejected messages.
- * <p> Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the
- * previously rejected messages this method must be invoked in conjunction with {@link Session#sync()}.
- * <p> A recommended invocation sequence would be:
- * <ul>
- * <li> {@link Session#messageReject}
- * <li> {@link Session#sync()}
- * <li> {@link Session#getRejectedMessages()}
- * </ul>
- *
- * @return The rejected message ranges
- */
- public RangeSet getRejectedMessages();
-
- /**
- * Try to acquire ranges of messages hence releasing them form the queue.
- * This means that once acknowledged, a message will not be delivered to any other receiver.
- * <p> As those messages may have been consumed by another receivers hence,
- * message acquisition can fail.
- * The outcome of the acquisition is returned as an array of ranges of qcquired messages.
- * <p> This method should only be called on non-acquired messages.
- *
- * @param ranges Ranges of messages to be acquired.
- * @return Indicates the acquired messages
- */
- public Future<Acquired> messageAcquire(RangeSet ranges, Option ... options);
-
- /**
- * Give up responsibility for processing ranges of messages.
- * <p> Released messages are re-enqueued.
- *
- * @param ranges Ranges of messages to be released.
- * @param options Valid option is: {@link Option#SET_REDELIVERED})
- */
- public void messageRelease(RangeSet ranges, Option ... options);
-
- // -----------------------------------------------
- // Local transaction methods
- // ----------------------------------------------
- /**
- * Selects the session for local transaction support.
- */
- public void txSelect(Option ... options);
-
- /**
- * Commit the receipt and delivery of all messages exchanged by this session's resources.
- *
- * @throws IllegalStateException If this session is not transacted, an exception will be thrown.
- */
- public void txCommit(Option ... options) throws IllegalStateException;
-
- /**
- * Roll back the receipt and delivery of all messages exchanged by this session's resources.
- *
- * @throws IllegalStateException If this session is not transacted, an exception will be thrown.
- */
- public void txRollback(Option ... options) throws IllegalStateException;
-
- //---------------------------------------------
- // Queue methods
- //---------------------------------------------
-
- /**
- * Declare a queue with the given queueName
- * <p> Following are the valid options:
- * <ul>
- * <li> {@link Option#AUTO_DELETE}: <p> If this field is set and the exclusive field is also set,
- * then the queue is deleted when the connection closes.
- * If this field is set and the exclusive field is not set the queue is deleted when all
- * the consumers have finished using it.
- * <li> {@link Option#DURABLE}: <p> If set when creating a new queue,
- * the queue will be marked as durable. Durable queues
- * remain active when a server restarts. Non-durable queues (transient queues) are purged
- * if/when a server restarts. Note that durable queues do not necessarily hold persistent
- * messages, although it does not make sense to send persistent messages to a transient
- * queue.
- * <li> {@link Option#EXCLUSIVE}: <p> Exclusive queues can only be used from one connection at a time.
- * Once a connection declares an exclusive queue, that queue cannot be used by any other connections until the
- * declaring connection closes.
- * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue.
- * This field allows the client to assert the presence of a queue without modifying the server state.
- * <li>{@link Option#NONE}: <p> Has no effect as it represents an empty option.
- * </ul>
- * <p>In the absence of a particular option, the defaul value is false for each option
- *
- * @param queueName The name of the delcared queue.
- * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message
- * may be rejected by a queue for the following reasons:
- * <oL> <li> The queue is deleted when it is not empty;
- * <li> Immediate delivery of a message is requested, but there are no consumers connected to
- * the queue. </ol>
- * @param arguments Used for backward compatibility
- * @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
- * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE})
- * @see Option
- */
- public void queueDeclare(String queueName, String alternateExchange, Map<String, Object> arguments,
- Option... options);
-
- /**
- * Bind a queue with an exchange.
- *
- * @param queueName Specifies the name of the queue to bind. If the queue name is empty, refers to the current
- * queue for the session, which is the last declared queue.
- * @param exchangeName The exchange name.
- * @param routingKey Specifies the routing key for the binding. The routing key is used for routing messages
- * depending on the exchange configuration. Not all exchanges use a routing key - refer to
- * the specific exchange documentation. If the queue name is empty, the server uses the last
- * queue declared on the session. If the routing key is also empty, the server uses this
- * queue name for the routing key as well. If the queue name is provided but the routing key
- * is empty, the server does the binding with that empty routing key. The meaning of empty
- * routing keys depends on the exchange implementation.
- * @param arguments Used for backward compatibility
- */
- public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments,
- Option ... options);
-
- /**
- * Unbind a queue from an exchange.
- *
- * @param queueName Specifies the name of the queue to unbind.
- * @param exchangeName The name of the exchange to unbind from.
- * @param routingKey Specifies the routing key of the binding to unbind.
- */
- public void exchangeUnbind(String queueName, String exchangeName, String routingKey, Option ... options);
-
- /**
- * This method removes all messages from a queue. It does not cancel consumers. Purged messages
- * are deleted without any formal "undo" mechanism.
- *
- * @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the
- * current queue for the session, which is the last declared queue.
- */
- public void queuePurge(String queueName, Option ... options);
-
- /**
- * This method deletes a queue. When a queue is deleted any pending messages are sent to a
- * dead-letter queue if this is defined in the server configuration, and all consumers on the
- * queue are cancelled.
- * <p> Following are the valid options:
- * <ul>
- * <li> {@link Option#IF_EMPTY}: <p> If set, the server will only delete the queue if it has no messages.
- * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers.
- * If the queue has consumers the server does does not delete it but raises a channel exception instead.
- * <li>{@link Option#NONE}: <p> Has no effect as it represents an empty option.
- * </ul>
- * </p>
- * <p/>
- * <p>In the absence of a particular option, the defaul value is false for each option</p>
- *
- * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the
- * current queue for the session, which is the last declared queue.
- * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED}
- * and {@link Option#NONE})
- * @see Option
- */
- public void queueDelete(String queueName, Option... options);
-
-
- /**
- * This method is used to request information on a particular queue.
- *
- * @param queueName The name of the queue for which information is requested.
- * @return Information on the specified queue.
- */
- public Future<QueueQueryResult> queueQuery(String queueName, Option ... options);
-
-
- /**
- * This method is used to request information on a particular binding.
- *
- * @param exchange The exchange name.
- * @param queue The queue name.
- * @param routingKey The routing key
- * @param arguments bacward compatibilties params.
- * @return Information on the specified binding.
- */
- public Future<ExchangeBoundResult> exchangeBound(String exchange, String queue, String routingKey,
- Map<String, Object> arguments, Option ... options);
-
- // --------------------------------------
- // exhcange methods
- // --------------------------------------
-
- /**
- * This method creates an exchange. If the exchange already exists,
- * the method verifies the class and checks the details are correct.
- * <p>Valid options are:
- * <ul>
- * <li>{@link Option#AUTO_DELETE}: <p>If set, the exchange is deleted when all queues have finished using it.
- * <li>{@link Option#DURABLE}: <p>If set, the exchange will
- * be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient
- * exchanges) are purged when a server restarts.
- * <li>{@link Option#PASSIVE}: <p>If set, the server will not create the exchange.
- * The client can use this to check whether an exchange exists without modifying the server state.
- * <li> {@link Option#NONE}: <p>This option is an empty option, and has no effect.
- * </ul>
- * <p>In the absence of a particular option, the defaul value is false for each option</p>
- *
- * @param exchangeName The exchange name.
- * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The
- * exchange types define the functionality of the exchange - i.e. how messages are routed
- * through it. It is not valid or meaningful to attempt to change the type of an existing
- * exchange. Default exchange types are: direct, topic, headers and fanout.
- * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
- * the message will be sent.
- * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
- * {@link Option#PASSIVE}, {@link Option#NONE})
- * @param arguments Used for backward compatibility
- * @see Option
- */
- public void exchangeDeclare(String exchangeName, String type, String alternateExchange,
- Map<String, Object> arguments, Option... options);
-
- /**
- * This method deletes an exchange. When an exchange is deleted all queue bindings on the
- * exchange are cancelled.
- * <p> Following are the valid options:
- * <ul>
- * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the
- * exchange has queue bindings the server does not delete it but raises a channel exception
- * instead.
- * <li> {@link Option#NONE}: <p> Has no effect as it represents an empty option.
- * </ul>
- * <p>Note that if an option is not set, it will default to false.
- *
- * @param exchangeName The name of exchange to be deleted.
- * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}.
- * @see Option
- */
- public void exchangeDelete(String exchangeName, Option... options);
-
-
- /**
- * This method is used to request information about a particular exchange.
- *
- * @param exchangeName The name of the exchange about which information is requested. If not set, the method will
- * return information about the default exchange.
- * @return Information on the specified exchange.
- */
- public Future<ExchangeQueryResult> exchangeQuery(String exchangeName, Option ... options);
-
- /**
- * If the session receives a sessionClosed with an error code it
- * informs the session's exceptionListener
- *
- * @param exceptionListner The exceptionListener
- */
- public void setClosedListener(ClosedListener exceptionListner);
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
deleted file mode 100644
index 869f974ae6..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
+++ /dev/null
@@ -1,163 +0,0 @@
-package org.apache.qpid.nclient.impl;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.qpid.QpidException;
-import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.MessagePartListener;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-
-import static org.apache.qpid.transport.Option.*;
-
-/**
- * Implements a Qpid Sesion.
- */
-public class ClientSession extends org.apache.qpid.transport.Session implements org.apache.qpid.nclient.DtxSession
-{
- static
- {
- String max = "message_size_before_sync"; // KB's
- try
- {
- MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max, "200000000"));
- }
- catch (NumberFormatException e)
- {
- // use default size
- MAX_NOT_SYNC_DATA_LENGH = 200000000;
- }
- String flush = "message_size_before_flush";
- try
- {
- MAX_NOT_FLUSH_DATA_LENGH = new Long(System.getProperties().getProperty(flush, "2000000"));
- }
- catch (NumberFormatException e)
- {
- // use default size
- MAX_NOT_FLUSH_DATA_LENGH = 20000000;
- }
- }
-
- private static long MAX_NOT_SYNC_DATA_LENGH;
- private static long MAX_NOT_FLUSH_DATA_LENGH;
-
- private Map<String,MessagePartListener> _messageListeners = new ConcurrentHashMap<String,MessagePartListener>();
- private ClosedListener _exceptionListner;
- private RangeSet _rejectedMessages;
- private long _currentDataSizeNotSynced;
- private long _currentDataSizeNotFlushed;
-
- public ClientSession(byte[] name)
- {
- super(name);
- }
-
- public void messageAcknowledge(RangeSet ranges, boolean accept)
- {
- for (Range range : ranges)
- {
- super.processed(range);
- }
- super.flushProcessed(accept ? BATCH : NONE);
- if (accept)
- {
- messageAccept(ranges);
- }
- }
-
- public void messageSubscribe(String queue, String destination, short acceptMode, short acquireMode, MessagePartListener listener, Map<String, Object> filter, Option... options)
- {
- setMessageListener(destination,listener);
- super.messageSubscribe(queue, destination, MessageAcceptMode.get(acceptMode),
- MessageAcquireMode.get(acquireMode), null, 0, filter,
- options);
- }
-
- public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException
- {
- DeliveryProperties dp = msg.getDeliveryProperties();
- MessageProperties mp = msg.getMessageProperties();
- ByteBuffer body = msg.readData();
- int size = body.remaining();
- super.messageTransfer
- (destination, MessageAcceptMode.get(acceptMode),
- MessageAcquireMode.get(acquireMode),
- new Header(dp, mp), body);
- _currentDataSizeNotSynced += size;
- _currentDataSizeNotFlushed += size;
- }
-
- public void sync()
- {
- super.sync();
- _currentDataSizeNotSynced = 0;
- }
-
- public RangeSet getRejectedMessages()
- {
- return _rejectedMessages;
- }
-
- public void setMessageListener(String destination, MessagePartListener listener)
- {
- if (listener == null)
- {
- throw new IllegalArgumentException("Cannot set message listener to null");
- }
- _messageListeners.put(destination, listener);
- }
-
- public void setClosedListener(ClosedListener exceptionListner)
- {
- _exceptionListner = exceptionListner;
- }
-
- void setRejectedMessages(RangeSet rejectedMessages)
- {
- _rejectedMessages = rejectedMessages;
- }
-
- void notifyException(QpidException ex)
- {
- _exceptionListner.onClosed(null, null, null);
- }
-
- Map<String,MessagePartListener> getMessageListeners()
- {
- return _messageListeners;
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
deleted file mode 100644
index 6bcd4fbce5..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.qpid.nclient.impl;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.ErrorCode;
-
-import org.apache.qpid.nclient.MessagePartListener;
-
-import org.apache.qpid.QpidException;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageReject;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionDetached;
-import org.apache.qpid.transport.SessionDelegate;
-
-
-public class ClientSessionDelegate extends SessionDelegate
-{
-
- // --------------------------------------------
- // Message methods
- // --------------------------------------------
- @Override public void messageTransfer(Session session, MessageTransfer xfr)
- {
- MessagePartListener listener = ((ClientSession)session).getMessageListeners()
- .get(xfr.getDestination());
- listener.messageTransfer(xfr);
- }
-
- @Override public void messageReject(Session session, MessageReject struct)
- {
- for (Range range : struct.getTransfers())
- {
- for (long l = range.getLower(); l <= range.getUpper(); l++)
- {
- System.out.println("message rejected: " +
- session.getCommand((int) l));
- }
- }
- ((ClientSession)session).setRejectedMessages(struct.getTransfers());
- ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null));
- session.processed(struct);
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
index 155de2a678..a1dc48fcda 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
@@ -23,60 +23,55 @@ package org.apache.qpid.nclient.impl;
import org.apache.qpid.ErrorCode;
import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessageListener;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
import java.nio.ByteBuffer;
import java.util.UUID;
public class DemoClient
{
- public static MessagePartListenerAdapter createAdapter()
+ public static class DemoListener implements SessionListener
{
- return new MessagePartListenerAdapter(new MessageListener()
+ public void opened(Session ssn) {}
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ System.out.println(exc);
+ }
+
+ public void message(Session ssn, MessageTransfer m)
{
- public void onMessage(Message m)
- {
- System.out.println("\n================== Received Msg ==================");
- System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
- System.out.println(m.toString());
- System.out.println("================== End Msg ==================\n");
- }
-
- });
+ System.out.println("\n================== Received Msg ==================");
+ System.out.println("Message Id : " + m.getHeader().get(MessageProperties.class).getMessageId());
+ System.out.println(m.toString());
+ System.out.println("================== End Msg ==================\n");
+ }
+
+ public void closed(Session ssn) {}
}
public static final void main(String[] args)
{
- Connection conn = Client.createConnection();
- try{
- conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
- }catch(Exception e){
- e.printStackTrace();
- }
+ Connection conn = new Connection();
+ conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
Session ssn = conn.createSession(50000);
- ssn.setClosedListener(new ClosedListener()
- {
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
- {
- System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
- }
- });
+ ssn.setSessionListener(new DemoListener());
ssn.queueDeclare("queue1", null, null);
ssn.exchangeBind("queue1", "amq.direct", "queue1",null);
ssn.sync();
- ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
+ ssn.messageSubscribe("queue1", "myDest", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
// queue
ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
@@ -91,9 +86,12 @@ public class DemoClient
ssn.sync();
// topic subs
- ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null);
- ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null);
- ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null);
+ ssn.messageSubscribe("topic1", "myDest2", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
+ ssn.messageSubscribe("topic2", "myDest3", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
+ ssn.messageSubscribe("topic3", "myDest4", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
ssn.sync();
ssn.queueDeclare("topic1", null, null);
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
index dd4a78fa2b..6c6cc308e9 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
@@ -28,12 +28,6 @@ import java.util.Map;
import org.apache.qpid.ErrorCode;
import org.apache.qpid.QpidException;
import org.apache.qpid.api.Message;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.Session;
-import org.apache.qpid.nclient.util.MessageListener;
-import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
@@ -41,9 +35,13 @@ import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
-public class BasicInteropTest implements ClosedListener
+public class BasicInteropTest implements SessionListener
{
private Session session;
@@ -62,7 +60,7 @@ public class BasicInteropTest implements ClosedListener
public void testCreateConnection(){
System.out.println("------- Creating connection--------");
- conn = Client.createConnection();
+ conn = new Connection();
try{
conn.connect(host, 5672, "test", "guest", "guest");
}catch(Exception e){
@@ -116,23 +114,11 @@ public class BasicInteropTest implements ClosedListener
public void testSubscribe()
{
System.out.println("------- Sending a subscribe --------");
+ session.setSessionListener(this);
session.messageSubscribe("testQueue", "myDest",
- Session.TRANSFER_CONFIRM_MODE_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
- new MessagePartListenerAdapter(new MessageListener(){
-
- public void onMessage(Message message)
- {
- System.out.println("--------Message Received--------");
- System.out.println(message.toString());
- System.out.println("--------/Message Received--------");
- RangeSet ack = new RangeSet();
- ack.add(message.getMessageTransferId(),message.getMessageTransferId());
- session.messageAcknowledge(ack, true);
- }
-
- }),
- null);
+ MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
System.out.println("------- Setting Credit mode --------");
session.messageSetFlowMode("myDest", MessageFlowMode.WINDOW);
@@ -141,20 +127,32 @@ public class BasicInteropTest implements ClosedListener
session.messageFlow("myDest", MessageCreditUnit.BYTE, -1);
}
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
+ {
+ System.out.println("--------Message Received--------");
+ System.out.println(xfr.toString());
+ System.out.println("--------/Message Received--------");
+ ssn.processed(xfr);
+ ssn.flushProcessed();
+ }
+
public void testMessageFlush()
{
session.messageFlush("myDest");
session.sync();
}
- public void onClosed(ErrorCode errorCode, String reason, Throwable t)
+ public void exception(Session ssn, SessionException exc)
{
System.out.println("------- Broker Notified an error --------");
- System.out.println("------- " + errorCode + " --------");
- System.out.println("------- " + reason + " --------");
+ System.out.println("------- " + exc + " --------");
System.out.println("------- /Broker Notified an error --------");
}
+ public void closed(Session ssn) {}
+
public static void main(String[] args) throws QpidException
{
String host = "0.0.0.0";