diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-10-09 17:07:59 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-09 17:07:59 +0000 |
| commit | 394823bba7976c170ac58e53b5d80ad12e0f1690 (patch) | |
| tree | 9b952b30b1b1bcd54c6f1cc453a221328b57c53f /java/client/src | |
| parent | e78747f63bc73daa6e2035453358e6eaf3237b84 (diff) | |
| download | qpid-python-394823bba7976c170ac58e53b5d80ad12e0f1690.tar.gz | |
QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
20 files changed, 180 insertions, 1506 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 27294562e5..ebeb29af78 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -917,7 +917,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // adjust timeout timeout = adjustTimeout(timeout, startCloseTime); - _delegate.closeConneciton(timeout); + _delegate.closeConnection(timeout); //If the taskpool hasn't shutdown by now then give it shutdownNow. // This will interupt any running tasks. diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 7f36ec6e99..60b827a426 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -42,5 +42,5 @@ public interface AMQConnectionDelegate public void resubscribeSessions() throws JMSException, AMQException, FailoverException; - public void closeConneciton(long timeout) throws JMSException, AMQException; + public void closeConnection(long timeout) throws JMSException, AMQException; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index c723709d27..a7f04a2090 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -23,6 +23,7 @@ package org.apache.qpid.client; import java.io.IOException; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.XASession; @@ -33,15 +34,18 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.ClosedListener; import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionClose; +import org.apache.qpid.transport.ConnectionException; +import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.ProtocolVersionException; +import org.apache.qpid.transport.TransportException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ClosedListener +public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener { /** * This class logger. @@ -56,7 +60,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed /** * The QpidConeection instance that is mapped with thie JMS connection. */ - org.apache.qpid.nclient.Connection _qpidConnection; + org.apache.qpid.transport.Connection _qpidConnection; //--- constructor public AMQConnectionDelegate_0_10(AMQConnection conn) @@ -125,7 +129,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed */ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { - _qpidConnection = Client.createConnection(); + _qpidConnection = new Connection(); try { if (_logger.isDebugEnabled()) @@ -134,16 +138,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed .getHost() + " port: " + brokerDetail.getPort() + " virtualhost: " + _conn .getVirtualHost() + "user name: " + _conn.getUsername() + "password: " + _conn.getPassword()); } + _qpidConnection.setConnectionListener(this); _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), _conn.getUsername(), _conn.getPassword()); - _qpidConnection.setClosedListener(this); _conn._connected = true; } catch(ProtocolVersionException pe) { return new ProtocolVersion(pe.getMajor(), pe.getMinor()); } - catch (QpidException e) + catch (ConnectionException e) { throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e); } @@ -161,34 +165,42 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed } - public void closeConneciton(long timeout) throws JMSException, AMQException + public void closeConnection(long timeout) throws JMSException, AMQException { try { _qpidConnection.close(); } - catch (QpidException e) + catch (TransportException e) { - throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot close connection", e); + throw new AMQException(e.getMessage(), e); } - } - public void onClosed(ErrorCode errorCode, String reason, Throwable t) + public void opened(Connection conn) {} + + public void exception(Connection conn, ConnectionException exc) { - if (_logger.isDebugEnabled()) + ExceptionListener listener = _conn._exceptionListener; + if (listener == null) { - _logger.debug("Received a connection close from the broker: Error code : " + errorCode.getCode(), t); + _logger.error("connection exception: " + conn, exc); } - if (_conn._exceptionListener != null) + else { - JMSException ex = new JMSException(reason,String.valueOf(errorCode.getCode())); - if (t != null) + ConnectionClose close = exc.getClose(); + String code = null; + if (close != null) { - ex.initCause(t); + code = close.getReplyCode().toString(); } + JMSException ex = new JMSException(exc.getMessage(), code); + ex.initCause(exc); _conn._exceptionListener.onException(ex); } } + + public void closed(Connection conn) {} + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 2ec8737d16..8d42a2f201 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -58,7 +58,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate private AMQConnection _conn; - public void closeConneciton(long timeout) throws JMSException, AMQException + public void closeConnection(long timeout) throws JMSException, AMQException { _conn.getProtocolHandler().closeConnection(timeout); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 46a667419d..7829966315 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -28,20 +28,27 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.util.Serial; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; -import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Option; import org.apache.qpid.transport.ExchangeBoundResult; import org.apache.qpid.transport.Future; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.qpid.transport.Option.*; + import javax.jms.*; import javax.jms.IllegalStateException; @@ -53,6 +60,7 @@ import java.util.Map; * This is a 0.10 Session */ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10> + implements SessionListener { /** @@ -70,10 +78,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * The latest qpid Exception that has been reaised. */ private Object _currentExceptionLock = new Object(); - private QpidException _currentException; + private SessionException _currentException; // a ref on the qpid connection - protected org.apache.qpid.nclient.Connection _qpidConnection; + protected org.apache.qpid.transport.Connection _qpidConnection; private RangeSet unacked = new RangeSet(); private int unackedCount = 0; @@ -97,7 +105,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. * @param qpidConnection The qpid connection */ - AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId, + AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { @@ -108,7 +116,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // create the qpid session with an expiry <= 0 so that the session does not expire _qpidSession = qpidConnection.createSession(0); // set the exception listnere for this session - _qpidSession.setClosedListener(new QpidSessionExceptionListener()); + _qpidSession.setSessionListener(this); // set transacted if required if (_transacted) { @@ -127,7 +135,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. * @param qpidConnection The connection */ - AMQSession_0_10(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId, + AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) { @@ -195,12 +203,26 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (unackedCount > 0) { - getQpidSession().messageAcknowledge + messageAcknowledge (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); clearUnacked(); } } + void messageAcknowledge(RangeSet ranges, boolean accept) + { + Session ssn = getQpidSession(); + for (Range range : ranges) + { + ssn.processed(range); + } + ssn.flushProcessed(accept ? BATCH : NONE); + if (accept) + { + ssn.messageAccept(ranges); + } + } + /** * Bind a queue with an exchange. * @@ -416,11 +438,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic preAcquire = ( ! consumer.isNoConsume() && (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) ) || !(consumer.getDestination() instanceof AMQQueue); - getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag), - getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED, - preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE, - (BasicMessageConsumer_0_10) consumer, null, - consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + getQpidSession().messageSubscribe + (queueName.toString(), String.valueOf(tag), + getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, + preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, null, + consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } catch (JMSException e) { @@ -598,7 +620,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * * @return The associated Qpid Session. */ - protected org.apache.qpid.nclient.Session getQpidSession() + protected Session getQpidSession() { return _qpidSession; } @@ -615,31 +637,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (_currentException != null) { - QpidException toBeThrown = _currentException; + SessionException se = _currentException; _currentException = null; - throw new AMQException(AMQConstant.getConstant(toBeThrown.getErrorCode().getCode()), - toBeThrown.getMessage(), toBeThrown); + ExecutionException ee = se.getException(); + int code; + if (ee == null) + { + code = 0; + } + else + { + code = ee.getErrorCode().getValue(); + } + throw new AMQException + (AMQConstant.getConstant(code), se.getMessage(), se); } } } - //------ Inner classes - /** - * Lstener for qpid protocol exceptions - */ - private class QpidSessionExceptionListener implements org.apache.qpid.nclient.ClosedListener + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) { - public void onClosed(ErrorCode errorCode, String reason, Throwable t) + messageReceived(new UnprocessedMessage_0_10(xfr)); + } + + public void exception(Session ssn, SessionException exc) + { + synchronized (_currentExceptionLock) { - synchronized (_currentExceptionLock) - { - // todo check the error code for finding out if we need to notify the - // JMS connection exception listener - _currentException = new QpidException(reason, errorCode, t); - } + _currentException = exc; } } + public void closed(Session ssn) {} + protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, final boolean noLocal) throws AMQException @@ -776,7 +808,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0) { // send completed so consumer credits don't dry up - getQpidSession().messageAcknowledge(_txRangeSet, false); + messageAcknowledge(_txRangeSet, false); } } @@ -787,7 +819,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if( _txSize > 0 ) { - getQpidSession().messageAcknowledge(_txRangeSet, true); + messageAcknowledge(_txRangeSet, true); _txRangeSet.clear(); _txSize = 0; } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 2a37298a43..7d535643c0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean; * This is a 0.10 message consumer. */ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10> - implements org.apache.qpid.nclient.MessagePartListener { /** @@ -114,9 +113,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return _consumerTagString; } - - // ----- Interface org.apache.qpid.client.util.MessageListener - /** * * This is invoked by the session thread when emptying the session message queue. @@ -159,28 +155,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - - - /** - * This method is invoked by the transport layer when a message is delivered for this - * consumer. The message is transformed and pass to the session. - * @param xfr an 0.10 message transfer - */ - public void messageTransfer(MessageTransfer xfr) - - //public void onMessage(Message message) - { - int channelId = getSession().getChannelId(); - int consumerTag = getConsumerTag(); - - UnprocessedMessage_0_10 newMessage = - new UnprocessedMessage_0_10(consumerTag, xfr); - - - getSession().messageReceived(newMessage); - // else ignore this message - } - //----- overwritten methods /** @@ -304,8 +278,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { RangeSet ranges = new RangeSet(); ranges.add((int) message.getDeliveryTag()); - _0_10session.getQpidSession().messageAcknowledge(ranges, - _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE ); + _0_10session.messageAcknowledge + (ranges, + _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); _0_10session.getCurrentException(); } } @@ -425,10 +400,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM void postDeliver(AbstractJMSMessage msg) throws JMSException { super.postDeliver(msg); - if(_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery()) + if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery()) { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index ef8aeb58a1..4e5077f0cd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -36,7 +36,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.util.Strings; -import org.apache.qpid.nclient.util.ByteBufferMessage; import org.apache.qpid.njms.ExceptionHelper; import org.apache.qpid.transport.*; import static org.apache.qpid.transport.Option.*; diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index 6763c72ecd..35adda9348 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -88,8 +88,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr(e.getException().getErrorCode()); } checkStatus(result.getStatus()); } @@ -142,8 +141,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr(e.getException().getErrorCode()); } checkStatus(result.getStatus()); } @@ -171,8 +169,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr(e.getException().getErrorCode()); } } @@ -201,8 +198,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr(e.getException().getErrorCode()); } } return result; @@ -248,8 +244,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr(e.getException().getErrorCode()); } DtxXaStatus status = result.getStatus(); int outcome = XAResource.XA_OK; @@ -291,8 +286,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr( e.getException().getErrorCode()); } Xid[] result = new Xid[res.getInDoubt().size()]; int i = 0; @@ -329,8 +323,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr( e.getException().getErrorCode()); } checkStatus(result.getStatus()); } @@ -413,8 +406,7 @@ public class XAResourceImpl implements XAResource { // we need to restore the qpid session that has been closed _xaSession.createSession(); - // we should get a single exception - convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode()); + convertExecutionErrorToXAErr(e.getException().getErrorCode()); // TODO: The amqp spec does not allow to make the difference // between an already known XID and a wrong arguments (join and resume are set) // TODO: make sure amqp addresses that diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index 51b4b7899f..354b67cd35 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -17,7 +17,6 @@ */ package org.apache.qpid.client; -import org.apache.qpid.nclient.DtxSession; import org.apache.qpid.client.message.MessageFactoryRegistry; import javax.jms.*; @@ -36,7 +35,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic /** * This XASession Qpid DtxSession */ - private DtxSession _qpidDtxSession; + private org.apache.qpid.transport.Session _qpidDtxSession; /** * The standard session @@ -48,7 +47,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic /** * Create a JMS XASession */ - public XASessionImpl(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId, + public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, int defaultPrefetchHigh, int defaultPrefetchLow) { super(qpidConnection, con, channelId, false, // this is not a transacted session @@ -65,7 +64,9 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic */ public void createSession() { - _qpidDtxSession = _qpidConnection.createDTXSession(0); + _qpidDtxSession = _qpidConnection.createSession(0); + _qpidDtxSession.setSessionListener(this); + _qpidDtxSession.dtxSelect(); } @@ -126,7 +127,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic * * @return The associated Qpid Session. */ - protected org.apache.qpid.nclient.DtxSession getQpidSession() + protected org.apache.qpid.transport.Session getQpidSession() { return _qpidDtxSession; } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 8b4488f1f9..d064c27754 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -29,7 +29,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.qpid.AMQPInvalidClassException; -import org.apache.qpid.nclient.*; import org.apache.qpid.jms.Message; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.AMQBindingURL; @@ -138,7 +137,7 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate } - public static void updateExchangeTypeMapping(Header header, org.apache.qpid.nclient.Session session) + public static void updateExchangeTypeMapping(Header header, org.apache.qpid.transport.Session session) { DeliveryProperties deliveryProps = header.get(DeliveryProperties.class); if(deliveryProps != null) diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java index 6b1301a33f..f31bc88509 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java @@ -33,9 +33,9 @@ public class UnprocessedMessage_0_10 extends UnprocessedMessage { private MessageTransfer _transfer; - public UnprocessedMessage_0_10(int consumerTag, MessageTransfer xfr) + public UnprocessedMessage_0_10(MessageTransfer xfr) { - super(consumerTag); + super(Integer.parseInt(xfr.getDestination())); _transfer = xfr; } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Client.java b/java/client/src/main/java/org/apache/qpid/nclient/Client.java deleted file mode 100644 index af0e724e42..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/Client.java +++ /dev/null @@ -1,295 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.nclient; - -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.qpid.client.url.URLParser_0_10; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.url.QpidURL; -import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; -import org.apache.qpid.nclient.impl.ClientSession; -import org.apache.qpid.nclient.impl.ClientSessionDelegate; -import org.apache.qpid.transport.Channel; -import org.apache.qpid.transport.ClientDelegate; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionClose; -import org.apache.qpid.transport.ConnectionCloseCode; -import org.apache.qpid.transport.ConnectionCloseOk; -import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.ProtocolVersionException; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.network.io.IoTransport; -import org.apache.qpid.transport.network.mina.MinaHandler; -import org.apache.qpid.transport.network.nio.NioHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class Client implements org.apache.qpid.nclient.Connection -{ - private Connection _conn; - private ClosedListener _closedListner; - private final Lock _lock = new ReentrantLock(); - private static Logger _logger = LoggerFactory.getLogger(Client.class); - private Condition closeOk; - private boolean closed = false; - private long timeout = 60000; - - private ProtocolHeader header = null; - - /** - * - * @return returns a new connection to the broker. - */ - public static org.apache.qpid.nclient.Connection createConnection() - { - return new Client(); - } - - public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException - { - - final Condition negotiationComplete = _lock.newCondition(); - closeOk = _lock.newCondition(); - _lock.lock(); - - ClientDelegate connectionDelegate = new ClientDelegate() - { - private boolean receivedClose = false; - public SessionDelegate getSessionDelegate() - { - return new ClientSessionDelegate(); - } - - public void exception(Throwable t) - { - if (_closedListner != null) - { - _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t); - } - else - { - throw new RuntimeException("connection closed",t); - } - } - - public void closed() - { - if (_closedListner != null && !this.receivedClose) - { - _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null); - } - } - - @Override public void connectionCloseOk(Channel context, ConnectionCloseOk struct) - { - _lock.lock(); - try - { - closed = true; - this.receivedClose = true; - closeOk.signalAll(); - } - finally - { - _lock.unlock(); - } - } - - @Override public void connectionClose(Channel context, ConnectionClose connectionClose) - { - super.connectionClose(context, connectionClose); - ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode().getValue()); - if (_closedListner == null && errorCode != ErrorCode.NO_ERROR) - { - throw new RuntimeException - (new QpidException("Server closed the connection: Reason " + - connectionClose.getReplyText(), - errorCode, - null)); - } - else - { - _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null); - } - - this.receivedClose = true; - } - @Override public void init(Channel ch, ProtocolHeader hdr) - { - // TODO: once the merge is done we'll need to update this code - // for handling 0.8 protocol version type i.e. major=8 and mino - if (hdr.getMajor() != 0 || hdr.getMinor() != 10) - { - Client.this.header = hdr; - _lock.lock(); - negotiationComplete.signalAll(); - _lock.unlock(); - } - } - }; - - connectionDelegate.setCondition(_lock,negotiationComplete); - connectionDelegate.setUsername(username); - connectionDelegate.setPassword(password); - connectionDelegate.setVirtualHost(virtualHost); - - String transport = System.getProperty("transport","io"); - if (transport.equalsIgnoreCase("nio")) - { - _logger.info("using NIO Transport"); - _conn = NioHandler.connect(host, port,connectionDelegate); - } - else if (transport.equalsIgnoreCase("io")) - { - _logger.info("using Plain IO Transport"); - _conn = IoTransport.connect(host, port,connectionDelegate); - } - else - { - _logger.info("using MINA Transport"); - _conn = MinaHandler.connect(host, port,connectionDelegate); - // _conn = NativeHandler.connect(host, port,connectionDelegate); - } - - // XXX: hardcoded version numbers - _conn.send(new ProtocolHeader(1, 0, 10)); - - try - { - negotiationComplete.await(timeout, TimeUnit.MILLISECONDS); - if (header != null) - { - _conn.close(); - throw new ProtocolVersionException(header.getMajor(), header.getMinor()); - } - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - finally - { - _lock.unlock(); - } - } - - public void connect(String url)throws QpidException - { - URLParser_0_10 parser = null; - try - { - parser = new URLParser_0_10(url); - } - catch(Exception e) - { - throw new QpidException("Error parsing the URL",ErrorCode.UNDEFINED,e); - } - List<BrokerDetails> brokers = parser.getAllBrokerDetails(); - BrokerDetails brokerDetail = brokers.get(0); - connect(brokerDetail.getHost(), brokerDetail.getPort(), brokerDetail.getProperty("virtualhost"), - brokerDetail.getProperty("username")== null? "guest":brokerDetail.getProperty("username"), - brokerDetail.getProperty("password")== null? "guest":brokerDetail.getProperty("password")); - } - - /* - * Until the dust settles with the URL disucssion - * I am not going to implement this. - */ - public void connect(QpidURL url) throws QpidException - { - throw new UnsupportedOperationException("Not implemented"); - } - - /* { - // temp impl to tests - BrokerDetails details = url.getAllBrokerDetails().get(0); - connect(details.getHost(), - details.getPort(), - details.getVirtualHost(), - details.getUserName(), - details.getPassword()); - } -*/ - - public void close() throws QpidException - { - Channel ch = _conn.getChannel(0); - ch.connectionClose(ConnectionCloseCode.NORMAL, "client is closing"); - _lock.lock(); - try - { - try - { - long start = System.currentTimeMillis(); - long elapsed = 0; - while (!closed && elapsed < timeout) - { - closeOk.await(timeout - elapsed, TimeUnit.MILLISECONDS); - elapsed = System.currentTimeMillis() - start; - } - if(!closed) - { - throw new QpidException("Timed out when closing connection", ErrorCode.CONNECTION_ERROR, null); - } - } - catch (InterruptedException e) - { - throw new QpidException("Interrupted when closing connection", ErrorCode.CONNECTION_ERROR, null); - } - } - finally - { - _lock.unlock(); - } - _conn.close(); - } - - public Session createSession(long expiryInSeconds) - { - Channel ch = _conn.getChannel(); - ClientSession ssn = new ClientSession(UUID.randomUUID().toString().getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); - ssn.sessionRequestTimeout(expiryInSeconds); - return ssn; - } - - public DtxSession createDTXSession(int expiryInSeconds) - { - ClientSession clientSession = (ClientSession) createSession(expiryInSeconds); - clientSession.dtxSelect(); - return (DtxSession) clientSession; - } - - public void setClosedListener(ClosedListener closedListner) - { - - _closedListner = closedListner; - } - -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java b/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java deleted file mode 100644 index 4cf0cab1ec..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import org.apache.qpid.ErrorCode; - - -/** - * If the communication layer detects a serious problem with a <CODE>connection</CODE>, it - * informs the connection's ExceptionListener - */ -public interface ClosedListener -{ - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - * @param errorCode TODO - * @param reason TODO - * @param t TODO - * @see Connection - */ - public void onClosed(ErrorCode errorCode, String reason, Throwable t); -}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java deleted file mode 100644 index 2d5b50b33a..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import org.apache.qpid.QpidException; - -/** - * This represents a physical connection to a broker. - */ -public interface Connection -{ - /** - * Establish the connection using the given parameters - * - * @param host host name - * @param port port number - * @param virtualHost the virtual host name - * @param username user name - * @param password password - * @throws QpidException If the communication layer fails to establish the connection. - */ - public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException; - - /** - * Establish the connection with the broker identified by the URL. - * - * @param url Specifies the URL of the broker. - * @throws QpidException If the communication layer fails to connect with the broker, an exception is thrown. - */ - public void connect(String url) throws QpidException; - - /** - * Close this connection. - * - * @throws QpidException if the communication layer fails to close the connection. - */ - public void close() throws QpidException; - - /** - * Create a session for this connection. - * <p> The returned session is suspended - * (i.e. this session is not attached to an underlying channel) - * - * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than - * or equal to 0 then the session does not expire. - * @return A newly created (suspended) session. - */ - public Session createSession(long expiryInSeconds); - - /** - * Create a DtxSession for this connection. - * <p> A Dtx Session must be used when resources have to be manipulated as - * part of a global transaction. - * <p> The retuned DtxSession is suspended - * (i.e. this session is not attached with an underlying channel) - * - * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than or equal - * to 0 then the session does not expire. - * @return A newly created (suspended) DtxSession. - */ - public DtxSession createDTXSession(int expiryInSeconds); - - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ClosedListener - * - * @param exceptionListner The ClosedListener - */ - public void setClosedListener(ClosedListener exceptionListner); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java deleted file mode 100644 index 8a859f2d84..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import org.apache.qpid.transport.Future; -import org.apache.qpid.transport.GetTimeoutResult; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.RecoverResult; -import org.apache.qpid.transport.XaResult; -import org.apache.qpid.transport.Xid; - -/** - * The resources for this session are controlled under the scope of a distributed transaction. - */ -public interface DtxSession extends Session -{ - - /** - * This method is called when messages should be produced and consumed on behalf a transaction - * branch identified by xid. - * possible options are: - * <ul> - * <li> {@link Option#JOIN}: Indicate that the start applies to joining a transaction previously seen. - * <li> {@link Option#RESUME}: Indicate that the start applies to resuming a suspended transaction branch specified. - * </ul> - * - * @param xid Specifies the xid of the transaction branch to be started. - * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}. - * @return Confirms to the client that the transaction branch is started or specify the error condition. - */ - public Future<XaResult> dtxStart(Xid xid, Option... options); - - /** - * This method is called when the work done on behalf of a transaction branch finishes or needs to - * be suspended. - * possible options are: - * <ul> - * <li> {@link Option#FAIL}: indicates that this portion of work has failed; - * otherwise this portion of work has - * completed successfully. - * <li> {@link Option#SUSPEND}: Indicates that the transaction branch is - * temporarily suspended in an incomplete state. - * </ul> - * - * @param xid Specifies the xid of the transaction branch to be ended. - * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}. - * @return Confirms to the client that the transaction branch is ended or specifies the error condition. - */ - public Future<XaResult> dtxEnd(Xid xid, Option... options); - - /** - * Commit the work done on behalf of a transaction branch. This method commits the work associated - * with xid. Any produced messages are made available and any consumed messages are discarded. - * The only possible option is: - * <ul> - * <li> {@link Option#ONE_PHASE}: When set, one-phase commit optimization is used. - * </ul> - * - * @param xid Specifies the xid of the transaction branch to be committed. - * @param options Available option is: {@link Option#ONE_PHASE} - * @return Confirms to the client that the transaction branch is committed or specifies the error condition. - */ - public Future<XaResult> dtxCommit(Xid xid, Option... options); - - /** - * This method is called to forget about a heuristically completed transaction branch. - * - * @param xid Specifies the xid of the transaction branch to be forgotten. - */ - public void dtxForget(Xid xid, Option ... options); - - /** - * This method obtains the current transaction timeout value in seconds. If set-timeout was not - * used prior to invoking this method, the return value is the default timeout value; otherwise, the - * value used in the previous set-timeout call is returned. - * - * @param xid Specifies the xid of the transaction branch used for getting the timeout. - * @return The current transaction timeout value in seconds. - */ - public Future<GetTimeoutResult> dtxGetTimeout(Xid xid, Option ... options); - - /** - * This method prepares any message produced or consumed on behalf of xid, ready for commitment. - * - * @param xid Specifies the xid of the transaction branch to be prepared. - * @return The status of the prepare operation can be any one of: - * xa-ok: Normal execution. - * <p/> - * xa-rdonly: The transaction branch was read-only and has been committed. - * <p/> - * xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified - * reason. - * <p/> - * xa-rbtimeout: The work represented by this transaction branch took too long. - */ - public Future<XaResult> dtxPrepare(Xid xid, Option ... options); - - /** - * This method is called to obtain a list of transaction branches that are in a prepared or - * heuristically completed state. - * @return a array of xids to be recovered. - */ - public Future<RecoverResult> dtxRecover(Option ... options); - - /** - * This method rolls back the work associated with xid. Any produced messages are discarded and - * any consumed messages are re-queued. - * - * @param xid Specifies the xid of the transaction branch to be rolled back. - * @return Confirms to the client that the transaction branch is rolled back or specifies the error condition. - */ - public Future<XaResult> dtxRollback(Xid xid, Option ... options); - - /** - * Sets the specified transaction branch timeout value in seconds. - * - * @param xid Specifies the xid of the transaction branch for setting the timeout. - * @param timeout The transaction timeout value in seconds. - */ - public void dtxSetTimeout(Xid xid, long timeout, Option ... options); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java deleted file mode 100644 index 0d84394c7c..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java +++ /dev/null @@ -1,544 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Map; - -import org.apache.qpid.transport.*; -import org.apache.qpid.api.Message; - -/** - * <p>A session is associated with a connection. - * When it is created, a session is not associated with an underlying channel. - * The session is single threaded. </p> - * <p/> - * All the Session commands are asynchronous. Synchronous behavior is achieved through invoking the sync method. - * For example, <code>command1</code> will be synchronously invoked by using the following sequence: - * <ul> - * <li> <code>session.command1()</code> - * <li> <code>session.sync()</code> - * </ul> - */ -public interface Session -{ - public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 1; - public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 0; - public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 0; - public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 1; - public static final short MESSAGE_FLOW_MODE_CREDIT = 0; - public static final short MESSAGE_FLOW_MODE_WINDOW = 1; - public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0; - public static final short MESSAGE_FLOW_UNIT_BYTE = 1; - public static final long MESSAGE_FLOW_MAX_BYTES = 0xFFFFFFFF; - public static final short MESSAGE_REJECT_CODE_GENERIC = 0; - public static final short MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED = 1; - public static final short MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE = 0; - public static final short MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 1; - - //------------------------------------------------------ - // Session housekeeping methods - //------------------------------------------------------ - - /** - * Sync method will block the session until all outstanding commands - * are executed. - */ - public void sync(); - - public void close(); - - public void sessionDetach(byte[] name, Option ... options); - - public void sessionRequestTimeout(long expiry, Option ... options); - - public byte[] getName(); - - public void setAutoSync(boolean value); - - //------------------------------------------------------ - // Messaging methods - // Producer - //------------------------------------------------------ - /** - * Transfer a message to a specified exchange. - * <p/> - * <p>This transfer provides a complete message - * using a single method. The method is internally mapped to messageTransfer() and headers() followed - * by data() and endData(). - * <b><i>This method should only be used by small messages.</b></i></p> - * - * @param destination The exchange the message is being sent to. - * @param msg The Message to be sent. - * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation - * is not required. Once a message has been transferred in pre-acquire - * mode (or once acquire has been sent in no-acquire mode) the message is considered - * transferred. - * <p/> - * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message - * is not considered transferred until the original - * transfer is complete. A complete transfer is signaled by execution.complete. - * </ul> - * @param acquireMode <ul> - * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message - * must be explicitly acquired. - * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is - * acquired when the transfer starts. - * </ul> - * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown. - */ - public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) - throws IOException; - - - /** - * This command transfers a message between two peers. - * - * @param destination Specifies the destination to which the message is to be transferred. - * @param acceptMode Indicates whether message.accept, session.complete, - * or nothing at all is required to indicate successful transfer of the message. - * - * @param acquireMode Indicates whether or not the transferred message has been acquired. - */ - public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, - Header header, ByteBuffer body, Option ... options); - - /** - * This command transfers a message between two peers. - * - * @param destination Specifies the destination to which the message is to be transferred. - * @param acceptMode Indicates whether message.accept, session.complete, - * or nothing at all is required to indicate successful transfer of the message. - * - * @param acquireMode Indicates whether or not the transferred message has been acquired. - */ - public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, - Header header, byte[] body, Option ... options); - - /** - * This command transfers a message between two peers. - * - * @param destination Specifies the destination to which the message is to be transferred. - * @param acceptMode Indicates whether message.accept, session.complete, - * or nothing at all is required to indicate successful transfer of the message. - * - * @param acquireMode Indicates whether or not the transferred message has been acquired. - */ - public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, - Header header, String body, Option ... options); - - //------------------------------------------------------ - // Messaging methods - // Consumer - //------------------------------------------------------ - - /** - * Associate a message listener with a destination. - * <p> The destination is bound to a queue, and messages are filtered based - * on the provider filter map (message filtering is specific to the provider and in some cases might not be handled). - * <p> The valid options are: - * <ul> - * <li>{@link Option#EXCLUSIVE}: <p> Requests exclusive subscription access, so that only this - * subscription can access the queue. - * <li>{@link Option#NONE}: <p> This is an empty option, and has no effect. - * </ul> - * - * @param queue The queue that the receiver is receiving messages from. - * @param destination The destination, or delivery tag, for the subscriber. - * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation - * is not required. Once a message has been transferred in pre-acquire - * mode (or once acquire has been sent in no-acquire mode) the message is considered - * transferred. - * <p/> - * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message - * is not considered transferred until the original - * transfer is complete. A complete transfer is signaled by execution.complete. - * </ul> - * @param acquireMode <ul> - * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message must - * be explicitly acquired. - * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is - * acquired when the transfer starts. - * </ul> - * @param listener The listener for this destination. To transfer large messages - * use a {@link org.apache.qpid.nclient.MessagePartListener}. - * @param options Set of options. Valid options are {{@link Option#EXCLUSIVE} - * and {@link Option#NONE}. - * @param filter A set of filters for the subscription. The syntax and semantics of these filters varies - * according to the provider's implementation. - */ - public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, - MessagePartListener listener, Map<String, Object> filter, Option... options); - - /** - * This method cancels a consumer. The server will not send any more messages to the specified destination. - * This does not affect already delivered messages. - * The client may receive a - * number of messages in between sending the cancel method and receiving - * notification that the cancellation has been completed. - * - * @param destination The destination to be cancelled. - */ - public void messageCancel(String destination, Option ... options); - - /** - * Associate a message listener with a destination. - * <p> Only one listener is permitted for each destination. When a new listener is created, - * it replaces the previous message listener. To prevent message loss, this occurs only when the original listener - * has completed processing a message. - * - * @param destination The destination the listener is associated with. - * @param listener The new listener for this destination. - */ - public void setMessageListener(String destination, MessagePartListener listener); - - /** - * Sets the mode of flow control used for a given destination. - * <p> With credit based flow control, the broker continually maintains its current - * credit balance with the recipient. The credit balance consists of two values, a message - * count, and a byte count. Whenever message data is sent, both counts must be decremented. - * If either value reaches zero, the flow of message data must stop. Additional credit is - * received via the {@link Session#messageFlow} method. - * <p> Window based flow control is identical to credit based flow control, however message - * acknowledgment implicitly grants a single unit of message credit, and the size of the - * message in byte credits for each acknowledged message. - * - * @param destination The destination to set the flow mode on. - * @param mode <ul> <li>credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control - * <li> window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control</ul> - */ - public void messageSetFlowMode(String destination, MessageFlowMode mode, Option ... options); - - - /** - * This method controls the flow of message data to a given destination. It is used by the - * recipient of messages to dynamically match the incoming rate of message flow to its - * processing or forwarding capacity. Upon receipt of this method, the sender must add "value" - * number of the specified unit to the available credit balance for the specified destination. - * A value of 0 indicates an infinite amount of credit. This disables any limit for - * the given unit until the credit balance is zeroed with {@link Session#messageStop} - * or {@link Session#messageFlush}. - * - * @param destination The destination to set the flow. - * @param unit Specifies the unit of credit balance. - * <p/> - * One of: <ul> - * <li> message ({@link Session#MESSAGE_FLOW_UNIT_MESSAGE}) - * <li> byte ({@link Session#MESSAGE_FLOW_UNIT_BYTE}) - * </ul> - * @param value Number of credits, a value of 0 indicates an infinite amount of credit. - */ - public void messageFlow(String destination, MessageCreditUnit unit, long value, Option ... options); - - /** - * Forces the broker to exhaust its credit supply. - * <p> The credit on the broker will remain at zero once - * this method is completed. - * - * @param destination The destination on which the credit supply is to be exhausted. - */ - public void messageFlush(String destination, Option ... options); - - /** - * On receipt of this method, the brokers set credit to zero for a given - * destination. When confirmation of this method - * is issued credit is set to zero. No further messages will be sent until - * further credit is received. - * - * @param destination The destination on which to reset credit. - */ - public void messageStop(String destination, Option ... options); - - /** - * Acknowledge the receipt of a range of messages. - * <p>Messages must already be acquired, either by receiving them in - * pre-acquire mode or by explicitly acquiring them. - * - * @param ranges Range of messages to be acknowledged. - * @param accept pecify whether to send a message accept to the broker - */ - public void messageAcknowledge(RangeSet ranges, boolean accept); - - /** - * Reject a range of acquired messages. - * <p>The broker will deliver rejected messages to the - * alternate-exchange on the queue from which it came. If no alternate-exchange is - * defined for that queue the broker will discard the message. - * - * @param ranges Range of messages to be rejected. - * @param code The reject code must be one of {@link Session#MESSAGE_REJECT_CODE_GENERIC} or - * {@link Session#MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED} (immediate delivery was attempted but - * failed). - * @param text String describing the reason for a message transfer rejection. - */ - public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options); - - /** - * As it is possible that the broker does not manage to reject some messages, after completion of - * {@link Session#messageReject} this method will return the ranges of rejected messages. - * <p> Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the - * previously rejected messages this method must be invoked in conjunction with {@link Session#sync()}. - * <p> A recommended invocation sequence would be: - * <ul> - * <li> {@link Session#messageReject} - * <li> {@link Session#sync()} - * <li> {@link Session#getRejectedMessages()} - * </ul> - * - * @return The rejected message ranges - */ - public RangeSet getRejectedMessages(); - - /** - * Try to acquire ranges of messages hence releasing them form the queue. - * This means that once acknowledged, a message will not be delivered to any other receiver. - * <p> As those messages may have been consumed by another receivers hence, - * message acquisition can fail. - * The outcome of the acquisition is returned as an array of ranges of qcquired messages. - * <p> This method should only be called on non-acquired messages. - * - * @param ranges Ranges of messages to be acquired. - * @return Indicates the acquired messages - */ - public Future<Acquired> messageAcquire(RangeSet ranges, Option ... options); - - /** - * Give up responsibility for processing ranges of messages. - * <p> Released messages are re-enqueued. - * - * @param ranges Ranges of messages to be released. - * @param options Valid option is: {@link Option#SET_REDELIVERED}) - */ - public void messageRelease(RangeSet ranges, Option ... options); - - // ----------------------------------------------- - // Local transaction methods - // ---------------------------------------------- - /** - * Selects the session for local transaction support. - */ - public void txSelect(Option ... options); - - /** - * Commit the receipt and delivery of all messages exchanged by this session's resources. - * - * @throws IllegalStateException If this session is not transacted, an exception will be thrown. - */ - public void txCommit(Option ... options) throws IllegalStateException; - - /** - * Roll back the receipt and delivery of all messages exchanged by this session's resources. - * - * @throws IllegalStateException If this session is not transacted, an exception will be thrown. - */ - public void txRollback(Option ... options) throws IllegalStateException; - - //--------------------------------------------- - // Queue methods - //--------------------------------------------- - - /** - * Declare a queue with the given queueName - * <p> Following are the valid options: - * <ul> - * <li> {@link Option#AUTO_DELETE}: <p> If this field is set and the exclusive field is also set, - * then the queue is deleted when the connection closes. - * If this field is set and the exclusive field is not set the queue is deleted when all - * the consumers have finished using it. - * <li> {@link Option#DURABLE}: <p> If set when creating a new queue, - * the queue will be marked as durable. Durable queues - * remain active when a server restarts. Non-durable queues (transient queues) are purged - * if/when a server restarts. Note that durable queues do not necessarily hold persistent - * messages, although it does not make sense to send persistent messages to a transient - * queue. - * <li> {@link Option#EXCLUSIVE}: <p> Exclusive queues can only be used from one connection at a time. - * Once a connection declares an exclusive queue, that queue cannot be used by any other connections until the - * declaring connection closes. - * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue. - * This field allows the client to assert the presence of a queue without modifying the server state. - * <li>{@link Option#NONE}: <p> Has no effect as it represents an empty option. - * </ul> - * <p>In the absence of a particular option, the defaul value is false for each option - * - * @param queueName The name of the delcared queue. - * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message - * may be rejected by a queue for the following reasons: - * <oL> <li> The queue is deleted when it is not empty; - * <li> Immediate delivery of a message is requested, but there are no consumers connected to - * the queue. </ol> - * @param arguments Used for backward compatibility - * @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE}) - * @see Option - */ - public void queueDeclare(String queueName, String alternateExchange, Map<String, Object> arguments, - Option... options); - - /** - * Bind a queue with an exchange. - * - * @param queueName Specifies the name of the queue to bind. If the queue name is empty, refers to the current - * queue for the session, which is the last declared queue. - * @param exchangeName The exchange name. - * @param routingKey Specifies the routing key for the binding. The routing key is used for routing messages - * depending on the exchange configuration. Not all exchanges use a routing key - refer to - * the specific exchange documentation. If the queue name is empty, the server uses the last - * queue declared on the session. If the routing key is also empty, the server uses this - * queue name for the routing key as well. If the queue name is provided but the routing key - * is empty, the server does the binding with that empty routing key. The meaning of empty - * routing keys depends on the exchange implementation. - * @param arguments Used for backward compatibility - */ - public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments, - Option ... options); - - /** - * Unbind a queue from an exchange. - * - * @param queueName Specifies the name of the queue to unbind. - * @param exchangeName The name of the exchange to unbind from. - * @param routingKey Specifies the routing key of the binding to unbind. - */ - public void exchangeUnbind(String queueName, String exchangeName, String routingKey, Option ... options); - - /** - * This method removes all messages from a queue. It does not cancel consumers. Purged messages - * are deleted without any formal "undo" mechanism. - * - * @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the - * current queue for the session, which is the last declared queue. - */ - public void queuePurge(String queueName, Option ... options); - - /** - * This method deletes a queue. When a queue is deleted any pending messages are sent to a - * dead-letter queue if this is defined in the server configuration, and all consumers on the - * queue are cancelled. - * <p> Following are the valid options: - * <ul> - * <li> {@link Option#IF_EMPTY}: <p> If set, the server will only delete the queue if it has no messages. - * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers. - * If the queue has consumers the server does does not delete it but raises a channel exception instead. - * <li>{@link Option#NONE}: <p> Has no effect as it represents an empty option. - * </ul> - * </p> - * <p/> - * <p>In the absence of a particular option, the defaul value is false for each option</p> - * - * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the - * current queue for the session, which is the last declared queue. - * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED} - * and {@link Option#NONE}) - * @see Option - */ - public void queueDelete(String queueName, Option... options); - - - /** - * This method is used to request information on a particular queue. - * - * @param queueName The name of the queue for which information is requested. - * @return Information on the specified queue. - */ - public Future<QueueQueryResult> queueQuery(String queueName, Option ... options); - - - /** - * This method is used to request information on a particular binding. - * - * @param exchange The exchange name. - * @param queue The queue name. - * @param routingKey The routing key - * @param arguments bacward compatibilties params. - * @return Information on the specified binding. - */ - public Future<ExchangeBoundResult> exchangeBound(String exchange, String queue, String routingKey, - Map<String, Object> arguments, Option ... options); - - // -------------------------------------- - // exhcange methods - // -------------------------------------- - - /** - * This method creates an exchange. If the exchange already exists, - * the method verifies the class and checks the details are correct. - * <p>Valid options are: - * <ul> - * <li>{@link Option#AUTO_DELETE}: <p>If set, the exchange is deleted when all queues have finished using it. - * <li>{@link Option#DURABLE}: <p>If set, the exchange will - * be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient - * exchanges) are purged when a server restarts. - * <li>{@link Option#PASSIVE}: <p>If set, the server will not create the exchange. - * The client can use this to check whether an exchange exists without modifying the server state. - * <li> {@link Option#NONE}: <p>This option is an empty option, and has no effect. - * </ul> - * <p>In the absence of a particular option, the defaul value is false for each option</p> - * - * @param exchangeName The exchange name. - * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The - * exchange types define the functionality of the exchange - i.e. how messages are routed - * through it. It is not valid or meaningful to attempt to change the type of an existing - * exchange. Default exchange types are: direct, topic, headers and fanout. - * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which - * the message will be sent. - * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#PASSIVE}, {@link Option#NONE}) - * @param arguments Used for backward compatibility - * @see Option - */ - public void exchangeDeclare(String exchangeName, String type, String alternateExchange, - Map<String, Object> arguments, Option... options); - - /** - * This method deletes an exchange. When an exchange is deleted all queue bindings on the - * exchange are cancelled. - * <p> Following are the valid options: - * <ul> - * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the - * exchange has queue bindings the server does not delete it but raises a channel exception - * instead. - * <li> {@link Option#NONE}: <p> Has no effect as it represents an empty option. - * </ul> - * <p>Note that if an option is not set, it will default to false. - * - * @param exchangeName The name of exchange to be deleted. - * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}. - * @see Option - */ - public void exchangeDelete(String exchangeName, Option... options); - - - /** - * This method is used to request information about a particular exchange. - * - * @param exchangeName The name of the exchange about which information is requested. If not set, the method will - * return information about the default exchange. - * @return Information on the specified exchange. - */ - public Future<ExchangeQueryResult> exchangeQuery(String exchangeName, Option ... options); - - /** - * If the session receives a sessionClosed with an error code it - * informs the session's exceptionListener - * - * @param exceptionListner The exceptionListener - */ - public void setClosedListener(ClosedListener exceptionListner); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java deleted file mode 100644 index 869f974ae6..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java +++ /dev/null @@ -1,163 +0,0 @@ -package org.apache.qpid.nclient.impl; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.qpid.QpidException; -import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.MessagePartListener; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; - -import static org.apache.qpid.transport.Option.*; - -/** - * Implements a Qpid Sesion. - */ -public class ClientSession extends org.apache.qpid.transport.Session implements org.apache.qpid.nclient.DtxSession -{ - static - { - String max = "message_size_before_sync"; // KB's - try - { - MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max, "200000000")); - } - catch (NumberFormatException e) - { - // use default size - MAX_NOT_SYNC_DATA_LENGH = 200000000; - } - String flush = "message_size_before_flush"; - try - { - MAX_NOT_FLUSH_DATA_LENGH = new Long(System.getProperties().getProperty(flush, "2000000")); - } - catch (NumberFormatException e) - { - // use default size - MAX_NOT_FLUSH_DATA_LENGH = 20000000; - } - } - - private static long MAX_NOT_SYNC_DATA_LENGH; - private static long MAX_NOT_FLUSH_DATA_LENGH; - - private Map<String,MessagePartListener> _messageListeners = new ConcurrentHashMap<String,MessagePartListener>(); - private ClosedListener _exceptionListner; - private RangeSet _rejectedMessages; - private long _currentDataSizeNotSynced; - private long _currentDataSizeNotFlushed; - - public ClientSession(byte[] name) - { - super(name); - } - - public void messageAcknowledge(RangeSet ranges, boolean accept) - { - for (Range range : ranges) - { - super.processed(range); - } - super.flushProcessed(accept ? BATCH : NONE); - if (accept) - { - messageAccept(ranges); - } - } - - public void messageSubscribe(String queue, String destination, short acceptMode, short acquireMode, MessagePartListener listener, Map<String, Object> filter, Option... options) - { - setMessageListener(destination,listener); - super.messageSubscribe(queue, destination, MessageAcceptMode.get(acceptMode), - MessageAcquireMode.get(acquireMode), null, 0, filter, - options); - } - - public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException - { - DeliveryProperties dp = msg.getDeliveryProperties(); - MessageProperties mp = msg.getMessageProperties(); - ByteBuffer body = msg.readData(); - int size = body.remaining(); - super.messageTransfer - (destination, MessageAcceptMode.get(acceptMode), - MessageAcquireMode.get(acquireMode), - new Header(dp, mp), body); - _currentDataSizeNotSynced += size; - _currentDataSizeNotFlushed += size; - } - - public void sync() - { - super.sync(); - _currentDataSizeNotSynced = 0; - } - - public RangeSet getRejectedMessages() - { - return _rejectedMessages; - } - - public void setMessageListener(String destination, MessagePartListener listener) - { - if (listener == null) - { - throw new IllegalArgumentException("Cannot set message listener to null"); - } - _messageListeners.put(destination, listener); - } - - public void setClosedListener(ClosedListener exceptionListner) - { - _exceptionListner = exceptionListner; - } - - void setRejectedMessages(RangeSet rejectedMessages) - { - _rejectedMessages = rejectedMessages; - } - - void notifyException(QpidException ex) - { - _exceptionListner.onClosed(null, null, null); - } - - Map<String,MessagePartListener> getMessageListeners() - { - return _messageListeners; - } -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java deleted file mode 100644 index 6bcd4fbce5..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.apache.qpid.nclient.impl; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.nio.ByteBuffer; - -import org.apache.qpid.ErrorCode; - -import org.apache.qpid.nclient.MessagePartListener; - -import org.apache.qpid.QpidException; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageReject; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionDetached; -import org.apache.qpid.transport.SessionDelegate; - - -public class ClientSessionDelegate extends SessionDelegate -{ - - // -------------------------------------------- - // Message methods - // -------------------------------------------- - @Override public void messageTransfer(Session session, MessageTransfer xfr) - { - MessagePartListener listener = ((ClientSession)session).getMessageListeners() - .get(xfr.getDestination()); - listener.messageTransfer(xfr); - } - - @Override public void messageReject(Session session, MessageReject struct) - { - for (Range range : struct.getTransfers()) - { - for (long l = range.getLower(); l <= range.getUpper(); l++) - { - System.out.println("message rejected: " + - session.getCommand((int) l)); - } - } - ((ClientSession)session).setRejectedMessages(struct.getTransfers()); - ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null)); - session.processed(struct); - } - -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java index 155de2a678..a1dc48fcda 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java @@ -23,60 +23,55 @@ package org.apache.qpid.nclient.impl; import org.apache.qpid.ErrorCode; import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import java.nio.ByteBuffer; import java.util.UUID; public class DemoClient { - public static MessagePartListenerAdapter createAdapter() + public static class DemoListener implements SessionListener { - return new MessagePartListenerAdapter(new MessageListener() + public void opened(Session ssn) {} + + public void exception(Session ssn, SessionException exc) + { + System.out.println(exc); + } + + public void message(Session ssn, MessageTransfer m) { - public void onMessage(Message m) - { - System.out.println("\n================== Received Msg =================="); - System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); - System.out.println(m.toString()); - System.out.println("================== End Msg ==================\n"); - } - - }); + System.out.println("\n================== Received Msg =================="); + System.out.println("Message Id : " + m.getHeader().get(MessageProperties.class).getMessageId()); + System.out.println(m.toString()); + System.out.println("================== End Msg ==================\n"); + } + + public void closed(Session ssn) {} } public static final void main(String[] args) { - Connection conn = Client.createConnection(); - try{ - conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); - }catch(Exception e){ - e.printStackTrace(); - } + Connection conn = new Connection(); + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); Session ssn = conn.createSession(50000); - ssn.setClosedListener(new ClosedListener() - { - public void onClosed(ErrorCode errorCode, String reason, Throwable t) - { - System.out.println("ErrorCode : " + errorCode + " reason : " + reason); - } - }); + ssn.setSessionListener(new DemoListener()); ssn.queueDeclare("queue1", null, null); ssn.exchangeBind("queue1", "amq.direct", "queue1",null); ssn.sync(); - ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("queue1", "myDest", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); // queue ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, @@ -91,9 +86,12 @@ public class DemoClient ssn.sync(); // topic subs - ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null); - ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null); - ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic1", "myDest2", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + ssn.messageSubscribe("topic2", "myDest3", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + ssn.messageSubscribe("topic3", "myDest4", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); ssn.sync(); ssn.queueDeclare("topic1", null, null); diff --git a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java index dd4a78fa2b..6c6cc308e9 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java @@ -28,12 +28,6 @@ import java.util.Map; import org.apache.qpid.ErrorCode; import org.apache.qpid.QpidException; import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; @@ -41,9 +35,13 @@ import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; import org.apache.qpid.transport.MessageFlowMode; import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; -public class BasicInteropTest implements ClosedListener +public class BasicInteropTest implements SessionListener { private Session session; @@ -62,7 +60,7 @@ public class BasicInteropTest implements ClosedListener public void testCreateConnection(){ System.out.println("------- Creating connection--------"); - conn = Client.createConnection(); + conn = new Connection(); try{ conn.connect(host, 5672, "test", "guest", "guest"); }catch(Exception e){ @@ -116,23 +114,11 @@ public class BasicInteropTest implements ClosedListener public void testSubscribe() { System.out.println("------- Sending a subscribe --------"); + session.setSessionListener(this); session.messageSubscribe("testQueue", "myDest", - Session.TRANSFER_CONFIRM_MODE_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(new MessageListener(){ - - public void onMessage(Message message) - { - System.out.println("--------Message Received--------"); - System.out.println(message.toString()); - System.out.println("--------/Message Received--------"); - RangeSet ack = new RangeSet(); - ack.add(message.getMessageTransferId(),message.getMessageTransferId()); - session.messageAcknowledge(ack, true); - } - - }), - null); + MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); System.out.println("------- Setting Credit mode --------"); session.messageSetFlowMode("myDest", MessageFlowMode.WINDOW); @@ -141,20 +127,32 @@ public class BasicInteropTest implements ClosedListener session.messageFlow("myDest", MessageCreditUnit.BYTE, -1); } + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + System.out.println("--------Message Received--------"); + System.out.println(xfr.toString()); + System.out.println("--------/Message Received--------"); + ssn.processed(xfr); + ssn.flushProcessed(); + } + public void testMessageFlush() { session.messageFlush("myDest"); session.sync(); } - public void onClosed(ErrorCode errorCode, String reason, Throwable t) + public void exception(Session ssn, SessionException exc) { System.out.println("------- Broker Notified an error --------"); - System.out.println("------- " + errorCode + " --------"); - System.out.println("------- " + reason + " --------"); + System.out.println("------- " + exc + " --------"); System.out.println("------- /Broker Notified an error --------"); } + public void closed(Session ssn) {} + public static void main(String[] args) throws QpidException { String host = "0.0.0.0"; |
