From 394823bba7976c170ac58e53b5d80ad12e0f1690 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Thu, 9 Oct 2008 17:07:59 +0000 Subject: QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703208 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 2 +- .../apache/qpid/client/AMQConnectionDelegate.java | 2 +- .../qpid/client/AMQConnectionDelegate_0_10.java | 50 +- .../qpid/client/AMQConnectionDelegate_8_0.java | 2 +- .../org/apache/qpid/client/AMQSession_0_10.java | 98 ++-- .../qpid/client/BasicMessageConsumer_0_10.java | 37 +- .../qpid/client/BasicMessageProducer_0_10.java | 1 - .../org/apache/qpid/client/XAResourceImpl.java | 24 +- .../java/org/apache/qpid/client/XASessionImpl.java | 11 +- .../client/message/AMQMessageDelegate_0_10.java | 3 +- .../client/message/UnprocessedMessage_0_10.java | 4 +- .../main/java/org/apache/qpid/nclient/Client.java | 295 ----------- .../org/apache/qpid/nclient/ClosedListener.java | 39 -- .../java/org/apache/qpid/nclient/Connection.java | 86 ---- .../java/org/apache/qpid/nclient/DtxSession.java | 137 ------ .../main/java/org/apache/qpid/nclient/Session.java | 544 --------------------- .../apache/qpid/nclient/impl/ClientSession.java | 163 ------ .../qpid/nclient/impl/ClientSessionDelegate.java | 68 --- .../org/apache/qpid/nclient/impl/DemoClient.java | 66 ++- .../qpid/nclient/interop/BasicInteropTest.java | 54 +- 20 files changed, 180 insertions(+), 1506 deletions(-) delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/Client.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/Connection.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/Session.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java delete mode 100644 java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 27294562e5..ebeb29af78 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -917,7 +917,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // adjust timeout timeout = adjustTimeout(timeout, startCloseTime); - _delegate.closeConneciton(timeout); + _delegate.closeConnection(timeout); //If the taskpool hasn't shutdown by now then give it shutdownNow. // This will interupt any running tasks. diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 7f36ec6e99..60b827a426 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -42,5 +42,5 @@ public interface AMQConnectionDelegate public void resubscribeSessions() throws JMSException, AMQException, FailoverException; - public void closeConneciton(long timeout) throws JMSException, AMQException; + public void closeConnection(long timeout) throws JMSException, AMQException; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index c723709d27..a7f04a2090 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -23,6 +23,7 @@ package org.apache.qpid.client; import java.io.IOException; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.XASession; @@ -33,15 +34,18 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.ClosedListener; import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionClose; +import org.apache.qpid.transport.ConnectionException; +import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.ProtocolVersionException; +import org.apache.qpid.transport.TransportException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ClosedListener +public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener { /** * This class logger. @@ -56,7 +60,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed /** * The QpidConeection instance that is mapped with thie JMS connection. */ - org.apache.qpid.nclient.Connection _qpidConnection; + org.apache.qpid.transport.Connection _qpidConnection; //--- constructor public AMQConnectionDelegate_0_10(AMQConnection conn) @@ -125,7 +129,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed */ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { - _qpidConnection = Client.createConnection(); + _qpidConnection = new Connection(); try { if (_logger.isDebugEnabled()) @@ -134,16 +138,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed .getHost() + " port: " + brokerDetail.getPort() + " virtualhost: " + _conn .getVirtualHost() + "user name: " + _conn.getUsername() + "password: " + _conn.getPassword()); } + _qpidConnection.setConnectionListener(this); _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), _conn.getUsername(), _conn.getPassword()); - _qpidConnection.setClosedListener(this); _conn._connected = true; } catch(ProtocolVersionException pe) { return new ProtocolVersion(pe.getMajor(), pe.getMinor()); } - catch (QpidException e) + catch (ConnectionException e) { throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e); } @@ -161,34 +165,42 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed } - public void closeConneciton(long timeout) throws JMSException, AMQException + public void closeConnection(long timeout) throws JMSException, AMQException { try { _qpidConnection.close(); } - catch (QpidException e) + catch (TransportException e) { - throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot close connection", e); + throw new AMQException(e.getMessage(), e); } - } - public void onClosed(ErrorCode errorCode, String reason, Throwable t) + public void opened(Connection conn) {} + + public void exception(Connection conn, ConnectionException exc) { - if (_logger.isDebugEnabled()) + ExceptionListener listener = _conn._exceptionListener; + if (listener == null) { - _logger.debug("Received a connection close from the broker: Error code : " + errorCode.getCode(), t); + _logger.error("connection exception: " + conn, exc); } - if (_conn._exceptionListener != null) + else { - JMSException ex = new JMSException(reason,String.valueOf(errorCode.getCode())); - if (t != null) + ConnectionClose close = exc.getClose(); + String code = null; + if (close != null) { - ex.initCause(t); + code = close.getReplyCode().toString(); } + JMSException ex = new JMSException(exc.getMessage(), code); + ex.initCause(exc); _conn._exceptionListener.onException(ex); } } + + public void closed(Connection conn) {} + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 2ec8737d16..8d42a2f201 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -58,7 +58,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate private AMQConnection _conn; - public void closeConneciton(long timeout) throws JMSException, AMQException + public void closeConnection(long timeout) throws JMSException, AMQException { _conn.getProtocolHandler().closeConnection(timeout); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 46a667419d..7829966315 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -28,20 +28,27 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.util.Serial; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; -import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Option; import org.apache.qpid.transport.ExchangeBoundResult; import org.apache.qpid.transport.Future; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.qpid.transport.Option.*; + import javax.jms.*; import javax.jms.IllegalStateException; @@ -53,6 +60,7 @@ import java.util.Map; * This is a 0.10 Session */ public class AMQSession_0_10 extends AMQSession + implements SessionListener { /** @@ -70,10 +78,10 @@ public class AMQSession_0_10 extends AMQSession 0) { - getQpidSession().messageAcknowledge + messageAcknowledge (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); clearUnacked(); } } + void messageAcknowledge(RangeSet ranges, boolean accept) + { + Session ssn = getQpidSession(); + for (Range range : ranges) + { + ssn.processed(range); + } + ssn.flushProcessed(accept ? BATCH : NONE); + if (accept) + { + ssn.messageAccept(ranges); + } + } + /** * Bind a queue with an exchange. * @@ -416,11 +438,11 @@ public class AMQSession_0_10 extends AMQSession 0 ) { - getQpidSession().messageAcknowledge(_txRangeSet, true); + messageAcknowledge(_txRangeSet, true); _txRangeSet.clear(); _txSize = 0; } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 2a37298a43..7d535643c0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean; * This is a 0.10 message consumer. */ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer - implements org.apache.qpid.nclient.MessagePartListener { /** @@ -114,9 +113,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer brokers = parser.getAllBrokerDetails(); - BrokerDetails brokerDetail = brokers.get(0); - connect(brokerDetail.getHost(), brokerDetail.getPort(), brokerDetail.getProperty("virtualhost"), - brokerDetail.getProperty("username")== null? "guest":brokerDetail.getProperty("username"), - brokerDetail.getProperty("password")== null? "guest":brokerDetail.getProperty("password")); - } - - /* - * Until the dust settles with the URL disucssion - * I am not going to implement this. - */ - public void connect(QpidURL url) throws QpidException - { - throw new UnsupportedOperationException("Not implemented"); - } - - /* { - // temp impl to tests - BrokerDetails details = url.getAllBrokerDetails().get(0); - connect(details.getHost(), - details.getPort(), - details.getVirtualHost(), - details.getUserName(), - details.getPassword()); - } -*/ - - public void close() throws QpidException - { - Channel ch = _conn.getChannel(0); - ch.connectionClose(ConnectionCloseCode.NORMAL, "client is closing"); - _lock.lock(); - try - { - try - { - long start = System.currentTimeMillis(); - long elapsed = 0; - while (!closed && elapsed < timeout) - { - closeOk.await(timeout - elapsed, TimeUnit.MILLISECONDS); - elapsed = System.currentTimeMillis() - start; - } - if(!closed) - { - throw new QpidException("Timed out when closing connection", ErrorCode.CONNECTION_ERROR, null); - } - } - catch (InterruptedException e) - { - throw new QpidException("Interrupted when closing connection", ErrorCode.CONNECTION_ERROR, null); - } - } - finally - { - _lock.unlock(); - } - _conn.close(); - } - - public Session createSession(long expiryInSeconds) - { - Channel ch = _conn.getChannel(); - ClientSession ssn = new ClientSession(UUID.randomUUID().toString().getBytes()); - ssn.attach(ch); - ssn.sessionAttach(ssn.getName()); - ssn.sessionRequestTimeout(expiryInSeconds); - return ssn; - } - - public DtxSession createDTXSession(int expiryInSeconds) - { - ClientSession clientSession = (ClientSession) createSession(expiryInSeconds); - clientSession.dtxSelect(); - return (DtxSession) clientSession; - } - - public void setClosedListener(ClosedListener closedListner) - { - - _closedListner = closedListner; - } - -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java b/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java deleted file mode 100644 index 4cf0cab1ec..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/ClosedListener.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import org.apache.qpid.ErrorCode; - - -/** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - */ -public interface ClosedListener -{ - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ExceptionListener - * @param errorCode TODO - * @param reason TODO - * @param t TODO - * @see Connection - */ - public void onClosed(ErrorCode errorCode, String reason, Throwable t); -} \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java b/java/client/src/main/java/org/apache/qpid/nclient/Connection.java deleted file mode 100644 index 2d5b50b33a..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/Connection.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import org.apache.qpid.QpidException; - -/** - * This represents a physical connection to a broker. - */ -public interface Connection -{ - /** - * Establish the connection using the given parameters - * - * @param host host name - * @param port port number - * @param virtualHost the virtual host name - * @param username user name - * @param password password - * @throws QpidException If the communication layer fails to establish the connection. - */ - public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException; - - /** - * Establish the connection with the broker identified by the URL. - * - * @param url Specifies the URL of the broker. - * @throws QpidException If the communication layer fails to connect with the broker, an exception is thrown. - */ - public void connect(String url) throws QpidException; - - /** - * Close this connection. - * - * @throws QpidException if the communication layer fails to close the connection. - */ - public void close() throws QpidException; - - /** - * Create a session for this connection. - *

The returned session is suspended - * (i.e. this session is not attached to an underlying channel) - * - * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than - * or equal to 0 then the session does not expire. - * @return A newly created (suspended) session. - */ - public Session createSession(long expiryInSeconds); - - /** - * Create a DtxSession for this connection. - *

A Dtx Session must be used when resources have to be manipulated as - * part of a global transaction. - *

The retuned DtxSession is suspended - * (i.e. this session is not attached with an underlying channel) - * - * @param expiryInSeconds Expiry time expressed in seconds, if the value is less than or equal - * to 0 then the session does not expire. - * @return A newly created (suspended) DtxSession. - */ - public DtxSession createDTXSession(int expiryInSeconds); - - /** - * If the communication layer detects a serious problem with a connection, it - * informs the connection's ClosedListener - * - * @param exceptionListner The ClosedListener - */ - public void setClosedListener(ClosedListener exceptionListner); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java b/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java deleted file mode 100644 index 8a859f2d84..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import org.apache.qpid.transport.Future; -import org.apache.qpid.transport.GetTimeoutResult; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.RecoverResult; -import org.apache.qpid.transport.XaResult; -import org.apache.qpid.transport.Xid; - -/** - * The resources for this session are controlled under the scope of a distributed transaction. - */ -public interface DtxSession extends Session -{ - - /** - * This method is called when messages should be produced and consumed on behalf a transaction - * branch identified by xid. - * possible options are: - *

    - *
  • {@link Option#JOIN}: Indicate that the start applies to joining a transaction previously seen. - *
  • {@link Option#RESUME}: Indicate that the start applies to resuming a suspended transaction branch specified. - *
- * - * @param xid Specifies the xid of the transaction branch to be started. - * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}. - * @return Confirms to the client that the transaction branch is started or specify the error condition. - */ - public Future dtxStart(Xid xid, Option... options); - - /** - * This method is called when the work done on behalf of a transaction branch finishes or needs to - * be suspended. - * possible options are: - *
    - *
  • {@link Option#FAIL}: indicates that this portion of work has failed; - * otherwise this portion of work has - * completed successfully. - *
  • {@link Option#SUSPEND}: Indicates that the transaction branch is - * temporarily suspended in an incomplete state. - *
- * - * @param xid Specifies the xid of the transaction branch to be ended. - * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}. - * @return Confirms to the client that the transaction branch is ended or specifies the error condition. - */ - public Future dtxEnd(Xid xid, Option... options); - - /** - * Commit the work done on behalf of a transaction branch. This method commits the work associated - * with xid. Any produced messages are made available and any consumed messages are discarded. - * The only possible option is: - *
    - *
  • {@link Option#ONE_PHASE}: When set, one-phase commit optimization is used. - *
- * - * @param xid Specifies the xid of the transaction branch to be committed. - * @param options Available option is: {@link Option#ONE_PHASE} - * @return Confirms to the client that the transaction branch is committed or specifies the error condition. - */ - public Future dtxCommit(Xid xid, Option... options); - - /** - * This method is called to forget about a heuristically completed transaction branch. - * - * @param xid Specifies the xid of the transaction branch to be forgotten. - */ - public void dtxForget(Xid xid, Option ... options); - - /** - * This method obtains the current transaction timeout value in seconds. If set-timeout was not - * used prior to invoking this method, the return value is the default timeout value; otherwise, the - * value used in the previous set-timeout call is returned. - * - * @param xid Specifies the xid of the transaction branch used for getting the timeout. - * @return The current transaction timeout value in seconds. - */ - public Future dtxGetTimeout(Xid xid, Option ... options); - - /** - * This method prepares any message produced or consumed on behalf of xid, ready for commitment. - * - * @param xid Specifies the xid of the transaction branch to be prepared. - * @return The status of the prepare operation can be any one of: - * xa-ok: Normal execution. - *

- * xa-rdonly: The transaction branch was read-only and has been committed. - *

- * xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified - * reason. - *

- * xa-rbtimeout: The work represented by this transaction branch took too long. - */ - public Future dtxPrepare(Xid xid, Option ... options); - - /** - * This method is called to obtain a list of transaction branches that are in a prepared or - * heuristically completed state. - * @return a array of xids to be recovered. - */ - public Future dtxRecover(Option ... options); - - /** - * This method rolls back the work associated with xid. Any produced messages are discarded and - * any consumed messages are re-queued. - * - * @param xid Specifies the xid of the transaction branch to be rolled back. - * @return Confirms to the client that the transaction branch is rolled back or specifies the error condition. - */ - public Future dtxRollback(Xid xid, Option ... options); - - /** - * Sets the specified transaction branch timeout value in seconds. - * - * @param xid Specifies the xid of the transaction branch for setting the timeout. - * @param timeout The transaction timeout value in seconds. - */ - public void dtxSetTimeout(Xid xid, long timeout, Option ... options); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java deleted file mode 100644 index 0d84394c7c..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java +++ /dev/null @@ -1,544 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.nclient; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Map; - -import org.apache.qpid.transport.*; -import org.apache.qpid.api.Message; - -/** - *

A session is associated with a connection. - * When it is created, a session is not associated with an underlying channel. - * The session is single threaded.

- *

- * All the Session commands are asynchronous. Synchronous behavior is achieved through invoking the sync method. - * For example, command1 will be synchronously invoked by using the following sequence: - *

    - *
  • session.command1() - *
  • session.sync() - *
- */ -public interface Session -{ - public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 1; - public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 0; - public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 0; - public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 1; - public static final short MESSAGE_FLOW_MODE_CREDIT = 0; - public static final short MESSAGE_FLOW_MODE_WINDOW = 1; - public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0; - public static final short MESSAGE_FLOW_UNIT_BYTE = 1; - public static final long MESSAGE_FLOW_MAX_BYTES = 0xFFFFFFFF; - public static final short MESSAGE_REJECT_CODE_GENERIC = 0; - public static final short MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED = 1; - public static final short MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE = 0; - public static final short MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 1; - - //------------------------------------------------------ - // Session housekeeping methods - //------------------------------------------------------ - - /** - * Sync method will block the session until all outstanding commands - * are executed. - */ - public void sync(); - - public void close(); - - public void sessionDetach(byte[] name, Option ... options); - - public void sessionRequestTimeout(long expiry, Option ... options); - - public byte[] getName(); - - public void setAutoSync(boolean value); - - //------------------------------------------------------ - // Messaging methods - // Producer - //------------------------------------------------------ - /** - * Transfer a message to a specified exchange. - *

- *

This transfer provides a complete message - * using a single method. The method is internally mapped to messageTransfer() and headers() followed - * by data() and endData(). - * This method should only be used by small messages.

- * - * @param destination The exchange the message is being sent to. - * @param msg The Message to be sent. - * @param confirmMode
    off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation - * is not required. Once a message has been transferred in pre-acquire - * mode (or once acquire has been sent in no-acquire mode) the message is considered - * transferred. - *

    - *

  • on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message - * is not considered transferred until the original - * transfer is complete. A complete transfer is signaled by execution.complete. - *
- * @param acquireMode
    - *
  • no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message - * must be explicitly acquired. - *
  • pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is - * acquired when the transfer starts. - *
- * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown. - */ - public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) - throws IOException; - - - /** - * This command transfers a message between two peers. - * - * @param destination Specifies the destination to which the message is to be transferred. - * @param acceptMode Indicates whether message.accept, session.complete, - * or nothing at all is required to indicate successful transfer of the message. - * - * @param acquireMode Indicates whether or not the transferred message has been acquired. - */ - public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, - Header header, ByteBuffer body, Option ... options); - - /** - * This command transfers a message between two peers. - * - * @param destination Specifies the destination to which the message is to be transferred. - * @param acceptMode Indicates whether message.accept, session.complete, - * or nothing at all is required to indicate successful transfer of the message. - * - * @param acquireMode Indicates whether or not the transferred message has been acquired. - */ - public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, - Header header, byte[] body, Option ... options); - - /** - * This command transfers a message between two peers. - * - * @param destination Specifies the destination to which the message is to be transferred. - * @param acceptMode Indicates whether message.accept, session.complete, - * or nothing at all is required to indicate successful transfer of the message. - * - * @param acquireMode Indicates whether or not the transferred message has been acquired. - */ - public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, - Header header, String body, Option ... options); - - //------------------------------------------------------ - // Messaging methods - // Consumer - //------------------------------------------------------ - - /** - * Associate a message listener with a destination. - *

The destination is bound to a queue, and messages are filtered based - * on the provider filter map (message filtering is specific to the provider and in some cases might not be handled). - *

The valid options are: - *

    - *
  • {@link Option#EXCLUSIVE}:

    Requests exclusive subscription access, so that only this - * subscription can access the queue. - *

  • {@link Option#NONE}:

    This is an empty option, and has no effect. - *

- * - * @param queue The queue that the receiver is receiving messages from. - * @param destination The destination, or delivery tag, for the subscriber. - * @param confirmMode
    off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation - * is not required. Once a message has been transferred in pre-acquire - * mode (or once acquire has been sent in no-acquire mode) the message is considered - * transferred. - *

    - *

  • on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message - * is not considered transferred until the original - * transfer is complete. A complete transfer is signaled by execution.complete. - *
- * @param acquireMode
    - *
  • no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message must - * be explicitly acquired. - *
  • pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is - * acquired when the transfer starts. - *
- * @param listener The listener for this destination. To transfer large messages - * use a {@link org.apache.qpid.nclient.MessagePartListener}. - * @param options Set of options. Valid options are {{@link Option#EXCLUSIVE} - * and {@link Option#NONE}. - * @param filter A set of filters for the subscription. The syntax and semantics of these filters varies - * according to the provider's implementation. - */ - public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, - MessagePartListener listener, Map filter, Option... options); - - /** - * This method cancels a consumer. The server will not send any more messages to the specified destination. - * This does not affect already delivered messages. - * The client may receive a - * number of messages in between sending the cancel method and receiving - * notification that the cancellation has been completed. - * - * @param destination The destination to be cancelled. - */ - public void messageCancel(String destination, Option ... options); - - /** - * Associate a message listener with a destination. - *

Only one listener is permitted for each destination. When a new listener is created, - * it replaces the previous message listener. To prevent message loss, this occurs only when the original listener - * has completed processing a message. - * - * @param destination The destination the listener is associated with. - * @param listener The new listener for this destination. - */ - public void setMessageListener(String destination, MessagePartListener listener); - - /** - * Sets the mode of flow control used for a given destination. - *

With credit based flow control, the broker continually maintains its current - * credit balance with the recipient. The credit balance consists of two values, a message - * count, and a byte count. Whenever message data is sent, both counts must be decremented. - * If either value reaches zero, the flow of message data must stop. Additional credit is - * received via the {@link Session#messageFlow} method. - *

Window based flow control is identical to credit based flow control, however message - * acknowledgment implicitly grants a single unit of message credit, and the size of the - * message in byte credits for each acknowledged message. - * - * @param destination The destination to set the flow mode on. - * @param mode

  • credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control - *
  • window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control
- */ - public void messageSetFlowMode(String destination, MessageFlowMode mode, Option ... options); - - - /** - * This method controls the flow of message data to a given destination. It is used by the - * recipient of messages to dynamically match the incoming rate of message flow to its - * processing or forwarding capacity. Upon receipt of this method, the sender must add "value" - * number of the specified unit to the available credit balance for the specified destination. - * A value of 0 indicates an infinite amount of credit. This disables any limit for - * the given unit until the credit balance is zeroed with {@link Session#messageStop} - * or {@link Session#messageFlush}. - * - * @param destination The destination to set the flow. - * @param unit Specifies the unit of credit balance. - *

- * One of:

    - *
  • message ({@link Session#MESSAGE_FLOW_UNIT_MESSAGE}) - *
  • byte ({@link Session#MESSAGE_FLOW_UNIT_BYTE}) - *
- * @param value Number of credits, a value of 0 indicates an infinite amount of credit. - */ - public void messageFlow(String destination, MessageCreditUnit unit, long value, Option ... options); - - /** - * Forces the broker to exhaust its credit supply. - *

The credit on the broker will remain at zero once - * this method is completed. - * - * @param destination The destination on which the credit supply is to be exhausted. - */ - public void messageFlush(String destination, Option ... options); - - /** - * On receipt of this method, the brokers set credit to zero for a given - * destination. When confirmation of this method - * is issued credit is set to zero. No further messages will be sent until - * further credit is received. - * - * @param destination The destination on which to reset credit. - */ - public void messageStop(String destination, Option ... options); - - /** - * Acknowledge the receipt of a range of messages. - *

Messages must already be acquired, either by receiving them in - * pre-acquire mode or by explicitly acquiring them. - * - * @param ranges Range of messages to be acknowledged. - * @param accept pecify whether to send a message accept to the broker - */ - public void messageAcknowledge(RangeSet ranges, boolean accept); - - /** - * Reject a range of acquired messages. - *

The broker will deliver rejected messages to the - * alternate-exchange on the queue from which it came. If no alternate-exchange is - * defined for that queue the broker will discard the message. - * - * @param ranges Range of messages to be rejected. - * @param code The reject code must be one of {@link Session#MESSAGE_REJECT_CODE_GENERIC} or - * {@link Session#MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED} (immediate delivery was attempted but - * failed). - * @param text String describing the reason for a message transfer rejection. - */ - public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options); - - /** - * As it is possible that the broker does not manage to reject some messages, after completion of - * {@link Session#messageReject} this method will return the ranges of rejected messages. - *

Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the - * previously rejected messages this method must be invoked in conjunction with {@link Session#sync()}. - *

A recommended invocation sequence would be: - *

    - *
  • {@link Session#messageReject} - *
  • {@link Session#sync()} - *
  • {@link Session#getRejectedMessages()} - *
- * - * @return The rejected message ranges - */ - public RangeSet getRejectedMessages(); - - /** - * Try to acquire ranges of messages hence releasing them form the queue. - * This means that once acknowledged, a message will not be delivered to any other receiver. - *

As those messages may have been consumed by another receivers hence, - * message acquisition can fail. - * The outcome of the acquisition is returned as an array of ranges of qcquired messages. - *

This method should only be called on non-acquired messages. - * - * @param ranges Ranges of messages to be acquired. - * @return Indicates the acquired messages - */ - public Future messageAcquire(RangeSet ranges, Option ... options); - - /** - * Give up responsibility for processing ranges of messages. - *

Released messages are re-enqueued. - * - * @param ranges Ranges of messages to be released. - * @param options Valid option is: {@link Option#SET_REDELIVERED}) - */ - public void messageRelease(RangeSet ranges, Option ... options); - - // ----------------------------------------------- - // Local transaction methods - // ---------------------------------------------- - /** - * Selects the session for local transaction support. - */ - public void txSelect(Option ... options); - - /** - * Commit the receipt and delivery of all messages exchanged by this session's resources. - * - * @throws IllegalStateException If this session is not transacted, an exception will be thrown. - */ - public void txCommit(Option ... options) throws IllegalStateException; - - /** - * Roll back the receipt and delivery of all messages exchanged by this session's resources. - * - * @throws IllegalStateException If this session is not transacted, an exception will be thrown. - */ - public void txRollback(Option ... options) throws IllegalStateException; - - //--------------------------------------------- - // Queue methods - //--------------------------------------------- - - /** - * Declare a queue with the given queueName - *

Following are the valid options: - *

    - *
  • {@link Option#AUTO_DELETE}:

    If this field is set and the exclusive field is also set, - * then the queue is deleted when the connection closes. - * If this field is set and the exclusive field is not set the queue is deleted when all - * the consumers have finished using it. - *

  • {@link Option#DURABLE}:

    If set when creating a new queue, - * the queue will be marked as durable. Durable queues - * remain active when a server restarts. Non-durable queues (transient queues) are purged - * if/when a server restarts. Note that durable queues do not necessarily hold persistent - * messages, although it does not make sense to send persistent messages to a transient - * queue. - *

  • {@link Option#EXCLUSIVE}:

    Exclusive queues can only be used from one connection at a time. - * Once a connection declares an exclusive queue, that queue cannot be used by any other connections until the - * declaring connection closes. - *

  • {@link Option#PASSIVE}:

    If set, the server will not create the queue. - * This field allows the client to assert the presence of a queue without modifying the server state. - *

  • {@link Option#NONE}:

    Has no effect as it represents an empty option. - *

- *

In the absence of a particular option, the defaul value is false for each option - * - * @param queueName The name of the delcared queue. - * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message - * may be rejected by a queue for the following reasons: - *

  1. The queue is deleted when it is not empty; - *
  2. Immediate delivery of a message is requested, but there are no consumers connected to - * the queue.
- * @param arguments Used for backward compatibility - * @param options Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NONE}) - * @see Option - */ - public void queueDeclare(String queueName, String alternateExchange, Map arguments, - Option... options); - - /** - * Bind a queue with an exchange. - * - * @param queueName Specifies the name of the queue to bind. If the queue name is empty, refers to the current - * queue for the session, which is the last declared queue. - * @param exchangeName The exchange name. - * @param routingKey Specifies the routing key for the binding. The routing key is used for routing messages - * depending on the exchange configuration. Not all exchanges use a routing key - refer to - * the specific exchange documentation. If the queue name is empty, the server uses the last - * queue declared on the session. If the routing key is also empty, the server uses this - * queue name for the routing key as well. If the queue name is provided but the routing key - * is empty, the server does the binding with that empty routing key. The meaning of empty - * routing keys depends on the exchange implementation. - * @param arguments Used for backward compatibility - */ - public void exchangeBind(String queueName, String exchangeName, String routingKey, Map arguments, - Option ... options); - - /** - * Unbind a queue from an exchange. - * - * @param queueName Specifies the name of the queue to unbind. - * @param exchangeName The name of the exchange to unbind from. - * @param routingKey Specifies the routing key of the binding to unbind. - */ - public void exchangeUnbind(String queueName, String exchangeName, String routingKey, Option ... options); - - /** - * This method removes all messages from a queue. It does not cancel consumers. Purged messages - * are deleted without any formal "undo" mechanism. - * - * @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the - * current queue for the session, which is the last declared queue. - */ - public void queuePurge(String queueName, Option ... options); - - /** - * This method deletes a queue. When a queue is deleted any pending messages are sent to a - * dead-letter queue if this is defined in the server configuration, and all consumers on the - * queue are cancelled. - *

Following are the valid options: - *

    - *
  • {@link Option#IF_EMPTY}:

    If set, the server will only delete the queue if it has no messages. - *

  • {@link Option#IF_UNUSED}:

    If set, the server will only delete the queue if it has no consumers. - * If the queue has consumers the server does does not delete it but raises a channel exception instead. - *

  • {@link Option#NONE}:

    Has no effect as it represents an empty option. - *

- *

- *

- *

In the absence of a particular option, the defaul value is false for each option

- * - * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the - * current queue for the session, which is the last declared queue. - * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED} - * and {@link Option#NONE}) - * @see Option - */ - public void queueDelete(String queueName, Option... options); - - - /** - * This method is used to request information on a particular queue. - * - * @param queueName The name of the queue for which information is requested. - * @return Information on the specified queue. - */ - public Future queueQuery(String queueName, Option ... options); - - - /** - * This method is used to request information on a particular binding. - * - * @param exchange The exchange name. - * @param queue The queue name. - * @param routingKey The routing key - * @param arguments bacward compatibilties params. - * @return Information on the specified binding. - */ - public Future exchangeBound(String exchange, String queue, String routingKey, - Map arguments, Option ... options); - - // -------------------------------------- - // exhcange methods - // -------------------------------------- - - /** - * This method creates an exchange. If the exchange already exists, - * the method verifies the class and checks the details are correct. - *

Valid options are: - *

    - *
  • {@link Option#AUTO_DELETE}:

    If set, the exchange is deleted when all queues have finished using it. - *

  • {@link Option#DURABLE}:

    If set, the exchange will - * be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient - * exchanges) are purged when a server restarts. - *

  • {@link Option#PASSIVE}:

    If set, the server will not create the exchange. - * The client can use this to check whether an exchange exists without modifying the server state. - *

  • {@link Option#NONE}:

    This option is an empty option, and has no effect. - *

- *

In the absence of a particular option, the defaul value is false for each option

- * - * @param exchangeName The exchange name. - * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The - * exchange types define the functionality of the exchange - i.e. how messages are routed - * through it. It is not valid or meaningful to attempt to change the type of an existing - * exchange. Default exchange types are: direct, topic, headers and fanout. - * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which - * the message will be sent. - * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE}, - * {@link Option#PASSIVE}, {@link Option#NONE}) - * @param arguments Used for backward compatibility - * @see Option - */ - public void exchangeDeclare(String exchangeName, String type, String alternateExchange, - Map arguments, Option... options); - - /** - * This method deletes an exchange. When an exchange is deleted all queue bindings on the - * exchange are cancelled. - *

Following are the valid options: - *

    - *
  • {@link Option#IF_UNUSED}:

    If set, the server will only delete the exchange if it has no queue bindings. If the - * exchange has queue bindings the server does not delete it but raises a channel exception - * instead. - *

  • {@link Option#NONE}:

    Has no effect as it represents an empty option. - *

- *

Note that if an option is not set, it will default to false. - * - * @param exchangeName The name of exchange to be deleted. - * @param options Set of options. Valid options are: {@link Option#IF_UNUSED}, {@link Option#NONE}. - * @see Option - */ - public void exchangeDelete(String exchangeName, Option... options); - - - /** - * This method is used to request information about a particular exchange. - * - * @param exchangeName The name of the exchange about which information is requested. If not set, the method will - * return information about the default exchange. - * @return Information on the specified exchange. - */ - public Future exchangeQuery(String exchangeName, Option ... options); - - /** - * If the session receives a sessionClosed with an error code it - * informs the session's exceptionListener - * - * @param exceptionListner The exceptionListener - */ - public void setClosedListener(ClosedListener exceptionListner); -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java deleted file mode 100644 index 869f974ae6..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java +++ /dev/null @@ -1,163 +0,0 @@ -package org.apache.qpid.nclient.impl; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.qpid.QpidException; -import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.MessagePartListener; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; - -import static org.apache.qpid.transport.Option.*; - -/** - * Implements a Qpid Sesion. - */ -public class ClientSession extends org.apache.qpid.transport.Session implements org.apache.qpid.nclient.DtxSession -{ - static - { - String max = "message_size_before_sync"; // KB's - try - { - MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max, "200000000")); - } - catch (NumberFormatException e) - { - // use default size - MAX_NOT_SYNC_DATA_LENGH = 200000000; - } - String flush = "message_size_before_flush"; - try - { - MAX_NOT_FLUSH_DATA_LENGH = new Long(System.getProperties().getProperty(flush, "2000000")); - } - catch (NumberFormatException e) - { - // use default size - MAX_NOT_FLUSH_DATA_LENGH = 20000000; - } - } - - private static long MAX_NOT_SYNC_DATA_LENGH; - private static long MAX_NOT_FLUSH_DATA_LENGH; - - private Map _messageListeners = new ConcurrentHashMap(); - private ClosedListener _exceptionListner; - private RangeSet _rejectedMessages; - private long _currentDataSizeNotSynced; - private long _currentDataSizeNotFlushed; - - public ClientSession(byte[] name) - { - super(name); - } - - public void messageAcknowledge(RangeSet ranges, boolean accept) - { - for (Range range : ranges) - { - super.processed(range); - } - super.flushProcessed(accept ? BATCH : NONE); - if (accept) - { - messageAccept(ranges); - } - } - - public void messageSubscribe(String queue, String destination, short acceptMode, short acquireMode, MessagePartListener listener, Map filter, Option... options) - { - setMessageListener(destination,listener); - super.messageSubscribe(queue, destination, MessageAcceptMode.get(acceptMode), - MessageAcquireMode.get(acquireMode), null, 0, filter, - options); - } - - public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException - { - DeliveryProperties dp = msg.getDeliveryProperties(); - MessageProperties mp = msg.getMessageProperties(); - ByteBuffer body = msg.readData(); - int size = body.remaining(); - super.messageTransfer - (destination, MessageAcceptMode.get(acceptMode), - MessageAcquireMode.get(acquireMode), - new Header(dp, mp), body); - _currentDataSizeNotSynced += size; - _currentDataSizeNotFlushed += size; - } - - public void sync() - { - super.sync(); - _currentDataSizeNotSynced = 0; - } - - public RangeSet getRejectedMessages() - { - return _rejectedMessages; - } - - public void setMessageListener(String destination, MessagePartListener listener) - { - if (listener == null) - { - throw new IllegalArgumentException("Cannot set message listener to null"); - } - _messageListeners.put(destination, listener); - } - - public void setClosedListener(ClosedListener exceptionListner) - { - _exceptionListner = exceptionListner; - } - - void setRejectedMessages(RangeSet rejectedMessages) - { - _rejectedMessages = rejectedMessages; - } - - void notifyException(QpidException ex) - { - _exceptionListner.onClosed(null, null, null); - } - - Map getMessageListeners() - { - return _messageListeners; - } -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java deleted file mode 100644 index 6bcd4fbce5..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.apache.qpid.nclient.impl; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.nio.ByteBuffer; - -import org.apache.qpid.ErrorCode; - -import org.apache.qpid.nclient.MessagePartListener; - -import org.apache.qpid.QpidException; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageReject; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionDetached; -import org.apache.qpid.transport.SessionDelegate; - - -public class ClientSessionDelegate extends SessionDelegate -{ - - // -------------------------------------------- - // Message methods - // -------------------------------------------- - @Override public void messageTransfer(Session session, MessageTransfer xfr) - { - MessagePartListener listener = ((ClientSession)session).getMessageListeners() - .get(xfr.getDestination()); - listener.messageTransfer(xfr); - } - - @Override public void messageReject(Session session, MessageReject struct) - { - for (Range range : struct.getTransfers()) - { - for (long l = range.getLower(); l <= range.getUpper(); l++) - { - System.out.println("message rejected: " + - session.getCommand((int) l)); - } - } - ((ClientSession)session).setRejectedMessages(struct.getTransfers()); - ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null)); - session.processed(struct); - } - -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java index 155de2a678..a1dc48fcda 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java @@ -23,60 +23,55 @@ package org.apache.qpid.nclient.impl; import org.apache.qpid.ErrorCode; import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import java.nio.ByteBuffer; import java.util.UUID; public class DemoClient { - public static MessagePartListenerAdapter createAdapter() + public static class DemoListener implements SessionListener { - return new MessagePartListenerAdapter(new MessageListener() + public void opened(Session ssn) {} + + public void exception(Session ssn, SessionException exc) + { + System.out.println(exc); + } + + public void message(Session ssn, MessageTransfer m) { - public void onMessage(Message m) - { - System.out.println("\n================== Received Msg =================="); - System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); - System.out.println(m.toString()); - System.out.println("================== End Msg ==================\n"); - } - - }); + System.out.println("\n================== Received Msg =================="); + System.out.println("Message Id : " + m.getHeader().get(MessageProperties.class).getMessageId()); + System.out.println(m.toString()); + System.out.println("================== End Msg ==================\n"); + } + + public void closed(Session ssn) {} } public static final void main(String[] args) { - Connection conn = Client.createConnection(); - try{ - conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); - }catch(Exception e){ - e.printStackTrace(); - } + Connection conn = new Connection(); + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); Session ssn = conn.createSession(50000); - ssn.setClosedListener(new ClosedListener() - { - public void onClosed(ErrorCode errorCode, String reason, Throwable t) - { - System.out.println("ErrorCode : " + errorCode + " reason : " + reason); - } - }); + ssn.setSessionListener(new DemoListener()); ssn.queueDeclare("queue1", null, null); ssn.exchangeBind("queue1", "amq.direct", "queue1",null); ssn.sync(); - ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("queue1", "myDest", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); // queue ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, @@ -91,9 +86,12 @@ public class DemoClient ssn.sync(); // topic subs - ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null); - ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null); - ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic1", "myDest2", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + ssn.messageSubscribe("topic2", "myDest3", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + ssn.messageSubscribe("topic3", "myDest4", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); ssn.sync(); ssn.queueDeclare("topic1", null, null); diff --git a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java index dd4a78fa2b..6c6cc308e9 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java @@ -28,12 +28,6 @@ import java.util.Map; import org.apache.qpid.ErrorCode; import org.apache.qpid.QpidException; import org.apache.qpid.api.Message; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.Session; -import org.apache.qpid.nclient.util.MessageListener; -import org.apache.qpid.nclient.util.MessagePartListenerAdapter; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; @@ -41,9 +35,13 @@ import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; import org.apache.qpid.transport.MessageFlowMode; import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; -public class BasicInteropTest implements ClosedListener +public class BasicInteropTest implements SessionListener { private Session session; @@ -62,7 +60,7 @@ public class BasicInteropTest implements ClosedListener public void testCreateConnection(){ System.out.println("------- Creating connection--------"); - conn = Client.createConnection(); + conn = new Connection(); try{ conn.connect(host, 5672, "test", "guest", "guest"); }catch(Exception e){ @@ -116,23 +114,11 @@ public class BasicInteropTest implements ClosedListener public void testSubscribe() { System.out.println("------- Sending a subscribe --------"); + session.setSessionListener(this); session.messageSubscribe("testQueue", "myDest", - Session.TRANSFER_CONFIRM_MODE_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(new MessageListener(){ - - public void onMessage(Message message) - { - System.out.println("--------Message Received--------"); - System.out.println(message.toString()); - System.out.println("--------/Message Received--------"); - RangeSet ack = new RangeSet(); - ack.add(message.getMessageTransferId(),message.getMessageTransferId()); - session.messageAcknowledge(ack, true); - } - - }), - null); + MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); System.out.println("------- Setting Credit mode --------"); session.messageSetFlowMode("myDest", MessageFlowMode.WINDOW); @@ -141,20 +127,32 @@ public class BasicInteropTest implements ClosedListener session.messageFlow("myDest", MessageCreditUnit.BYTE, -1); } + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + System.out.println("--------Message Received--------"); + System.out.println(xfr.toString()); + System.out.println("--------/Message Received--------"); + ssn.processed(xfr); + ssn.flushProcessed(); + } + public void testMessageFlush() { session.messageFlush("myDest"); session.sync(); } - public void onClosed(ErrorCode errorCode, String reason, Throwable t) + public void exception(Session ssn, SessionException exc) { System.out.println("------- Broker Notified an error --------"); - System.out.println("------- " + errorCode + " --------"); - System.out.println("------- " + reason + " --------"); + System.out.println("------- " + exc + " --------"); System.out.println("------- /Broker Notified an error --------"); } + public void closed(Session ssn) {} + public static void main(String[] args) throws QpidException { String host = "0.0.0.0"; -- cgit v1.2.1