diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-10-09 17:07:59 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-09 17:07:59 +0000 |
| commit | a09ed43cc8ed5862996e684b924f3405e09734c3 (patch) | |
| tree | 975ceb9d20c055ca4daa5875fdf28932e9ee8eef /qpid/java/management/client/src | |
| parent | 8ccc685eaf36dae9fe73d50d8410a062a015943b (diff) | |
| download | qpid-python-a09ed43cc8ed5862996e684b924f3405e09734c3.tar.gz | |
QPID-1339: refactor of low level client API to permit connections to exist in a disconnected state as well as to provide a central point from which to track session state
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@703208 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/management/client/src')
2 files changed, 150 insertions, 163 deletions
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java index f4428cb1e2..2c36fb3d65 100644 --- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java +++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java @@ -28,140 +28,95 @@ import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.commons.pool.impl.GenericObjectPoolFactory; -import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.DtxSession; -import org.apache.qpid.nclient.Session; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.util.Logger; /** * Qpid datasource. - * Basically it is a connection pool manager used for optimizing broker connections usage. - * + * Basically it is a connection pool manager used for optimizing broker connections usage. + * * @author Andrea Gazzarini */ -public final class QpidDatasource +public final class QpidDatasource { private final static Logger LOGGER = Logger.get(QpidDatasource.class); - + /** * A connection decorator used for adding pool interaction behaviour to an existing connection. - * + * * @author Andrea Gazzarini */ - public class ConnectionDecorator implements Connection,ClosedListener + class PooledConnection extends Connection { - private final Connection _decoratee; private final UUID _brokerId; private boolean _valid; - + /** * Builds a new decorator with the given connection. - * + * * @param brokerId the broker identifier. * @param decoratee the underlying connection. */ - private ConnectionDecorator(UUID brokerId, Connection decoratee) + private PooledConnection(UUID brokerId) { - this._decoratee = decoratee; this._brokerId = brokerId; - _decoratee.setClosedListener(this); _valid = true; } - + /** * Returns true if the underlying connection is still valid and can be used. - * + * * @return true if the underlying connection is still valid and can be used. */ boolean isValid() { return _valid; } - + + void reallyClose() + { + super.close(); + } + /** * Returns the connection to the pool. That is, marks this connections as available. * After that, this connection will be available for further operations. */ - public void close () throws QpidException + public void close() { try { pools.get(_brokerId).returnObject(this); LOGGER.debug("<QMAN-200012> : Connection %s returned to the pool.", this); - } catch (Exception exception) + } + catch (Exception e) { - throw new QpidException("Error while closing connection.",ErrorCode.CONNECTION_ERROR,exception); - } - } - - /** - * Do nothing : underlying connection is already connected. - */ - public void connect (String host, int port, String virtualHost, String username, String password) - throws QpidException - { - // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED. - } - - /** - * Do nothing : underlying connection is already connected. - */ - public void connect (String url) throws QpidException - { - // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED. - } - - /** - * @see Connection#createDTXSession(int) - */ - public DtxSession createDTXSession (int expiryInSeconds) - { - return _decoratee.createDTXSession(expiryInSeconds); + throw new ConnectionException(e); + } } - /** - * @see Connection#createSession(long) - */ - public Session createSession (long expiryInSeconds) + public void exception(Throwable t) { - return _decoratee.createSession(expiryInSeconds); + super.exception(t); + _valid = false; } + } - /** - * Do nothing : closed listener has been already injected. - */ - public void setClosedListener (ClosedListener exceptionListner) - { - } - - /** - * Callback method used for error notifications while underlying connection is closing. - */ - public void onClosed (ErrorCode errorCode, String reason, Throwable t) - { - _valid = false; - LOGGER.error(t,"<QMAN-100012> : Error on closing connection. Reason is : %s, error code is %s",reason,errorCode.getCode()); - } - }; - /** - * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of + * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of * the broker connection(s). - * + * * @author Andrea Gazzarini */ class QpidConnectionFactory extends BasePoolableObjectFactory - { + { private final BrokerConnectionData _connectionData; private final UUID _brokerId; - + /** * Builds a new connection factory with the given parameters. - * + * * @param brokerId the broker identifier. * @param connectionData the connecton data. */ @@ -170,35 +125,35 @@ public final class QpidDatasource this._connectionData = connectionData; this._brokerId = brokerId; } - + /** * Creates a new underlying connection. */ @Override public Connection makeObject () throws Exception { - Connection connection = Client.createConnection(); + PooledConnection connection = new PooledConnection(_brokerId); connection.connect( - _connectionData.getHost(), - _connectionData.getPort(), - _connectionData.getVirtualHost(), - _connectionData.getUsername(), + _connectionData.getHost(), + _connectionData.getPort(), + _connectionData.getVirtualHost(), + _connectionData.getUsername(), _connectionData.getPassword()); - return new ConnectionDecorator(_brokerId,connection); + return connection; } - + /** * Validates the underlying connection. */ @Override public boolean validateObject (Object obj) { - ConnectionDecorator connection = (ConnectionDecorator) obj; + PooledConnection connection = (PooledConnection) obj; boolean isValid = connection.isValid(); LOGGER.debug("<QMAN-200013> : Test connection on reserve. Is valid? %s",isValid); return isValid; } - + /** * Closes the underlying connection. */ @@ -207,8 +162,8 @@ public final class QpidDatasource { try { - ConnectionDecorator connection = (ConnectionDecorator) obj; - connection._decoratee.close(); + PooledConnection connection = (PooledConnection) obj; + connection.reallyClose(); LOGGER.debug("<QMAN-200014> : Connection has been destroyed."); } catch (Exception e) { @@ -216,21 +171,21 @@ public final class QpidDatasource } } } - + // Singleton instance. private static QpidDatasource instance = new QpidDatasource(); // Each entry contains a connection pool for a specific broker. private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>(); - + // Private constructor. private QpidDatasource() { } - + /** * Gets an available connection from the pool of the given broker. - * + * * @param brokerId the broker identifier. * @return a valid connection to the broker associated with the given identifier. */ @@ -238,20 +193,20 @@ public final class QpidDatasource { return (Connection) pools.get(brokerId).borrowObject(); } - + /** * Entry point method for retrieving the singleton instance of this datasource. - * + * * @return the qpid datasource singleton instance. */ - public static QpidDatasource getInstance() + public static QpidDatasource getInstance() { return instance; } - + /** * Adds a connection pool to this datasource. - * + * * @param brokerId the broker identifier that will be associated with the new connection pool. * @param connectionData the broker connection data. * @throws Exception when the pool cannot be created. @@ -265,12 +220,12 @@ public final class QpidDatasource true, false); ObjectPool pool = factory.createPool(); - + for (int i = 0; i < connectionData.getInitialPoolCapacity(); i++) { pool.returnObject(pool.borrowObject()); } - + pools.put(brokerId,pool); } }
\ No newline at end of file diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java index 92689eba52..4bda450315 100644 --- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java +++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java @@ -22,42 +22,47 @@ package org.apache.qpid.management.domain.services; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.QpidException; import org.apache.qpid.management.Constants; import org.apache.qpid.management.Names; import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.configuration.QpidDatasource; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; import org.apache.qpid.nclient.util.MessageListener; import org.apache.qpid.nclient.util.MessagePartListenerAdapter; +import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import org.apache.qpid.transport.util.Logger; /** * Qpid Broker facade. - * + * * @author Andrea Gazzarini */ -public class QpidService +public class QpidService implements SessionListener { private final static Logger LOGGER = Logger.get(QpidService.class); // Inner static class used for logging and avoid conditional logic (isDebugEnabled()) duplication. - private static class Log - { + private static class Log + { /** * Logs the content f the message. * This will be written on log only if DEBUG level is enabled. - * + * * @param messageContent the raw content of the message. */ - static void logMessageContent(byte [] messageContent) + static void logMessageContent(byte [] messageContent) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( @@ -65,56 +70,81 @@ public class QpidService Arrays.toString(messageContent)); } } - + /** * Logs the content f the message. * This will be written on log only if DEBUG level is enabled. - * + * * @param messageContent the raw content of the message. */ - static void logMessageContent(ByteBuffer messageContent) + static void logMessageContent(ByteBuffer messageContent) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( "<QMAN-200002> : Message has been sent to management exchange."); } - } + } } - + private UUID _brokerId; private Connection _connection; private Session _session; - + private Map<String,MessagePartListenerAdapter> _listeners; + /** * Builds a new service with the given connection data. - * + * * @param connectionData the connection data of the broker. */ - public QpidService(UUID brokerId) + public QpidService(UUID brokerId) { this._brokerId = brokerId; } - + /** * Estabilishes a connection with the broker. - * + * * @throws QpidException in case of connection failure. */ public void connect() throws Exception { _connection = QpidDatasource.getInstance().getConnection(_brokerId); + _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>(); _session = _connection.createSession(Constants.NO_EXPIRATION); + _session.setSessionListener(this); + } + + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + MessagePartListenerAdapter l = _listeners.get(xfr.getDestination()); + if (l == null) + { + LOGGER.error("unhandled message: %s", xfr); + } + else + { + l.messageTransfer(xfr); + } } - + + public void exception(Session ssn, SessionException exc) + { + LOGGER.error(exc, "session %s exception", ssn); + } + + public void closed(Session ssn) {} + /** - * All the previously entered outstanding commands are asynchronous. + * All the previously entered outstanding commands are asynchronous. * Synchronous behavior is achieved through invoking this method. */ - public void sync() + public void sync() { _session.sync(); } - + /** * Closes communication with broker. */ @@ -124,48 +154,50 @@ public class QpidService { _session.close(); _session = null; + _listeners = null; } catch (Exception e) { } try { - _connection.close(); + _connection.close(); _connection = null; } catch (Exception e) { } } - + /** * Associate a message listener with a destination therefore creating a new subscription. - * + * * @param queueName The name of the queue that the subscriber is receiving messages from. * @param destinationName the name of the destination, or delivery tag, for the subscriber. - * @param listener the listener for this destination. - * + * @param listener the listener for this destination. + * * @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...) */ - public void createSubscription(String queueName, String destinationName,MessageListener listener) + public void createSubscription(String queueName, String destinationName, MessageListener listener) { - _session.messageSubscribe( - queueName, - destinationName, - Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(listener), null); - - _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); - _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Integer.MAX_VALUE); - + _listeners.put(destinationName, new MessagePartListenerAdapter(listener)); + _session.messageSubscribe + (queueName, + destinationName, + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + + _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); + _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT); + LOGGER.debug( - "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.", + "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.", queueName, destinationName); } - + /** * Removes a previously declared consumer from the broker. - * + * * @param destinationName the name of the destination, or delivery tag, for the subscriber. * @see Session#messageCancel(String, Option...) */ @@ -173,10 +205,10 @@ public class QpidService { _session.messageCancel(destinationName); LOGGER.debug( - "<QMAN-200026> : Subscription named %s has been removed from remote broker.", + "<QMAN-200026> : Subscription named %s has been removed from remote broker.", destinationName); - } - + } + /** * Declares a queue on the broker with the given name. * @@ -200,27 +232,27 @@ public class QpidService _session.queueDelete(queueName); LOGGER.debug("<QMAN-2000025> : Queue with name %s has been removed.",queueName); } - + /** * Binds (on the broker) a queue with an exchange. * - * @param queueName the name of the queue to bind. + * @param queueName the name of the queue to bind. * @param exchangeName the exchange name. - * @param routingKey the routing key used for the binding. + * @param routingKey the routing key used for the binding. * @see Session#exchangeBind(String, String, String, java.util.Map, Option...) */ public void declareBinding(String queueName, String exchangeName, String routingKey) { _session.exchangeBind(queueName, exchangeName, routingKey, null); LOGGER.debug( - "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.", + "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.", routingKey,queueName, exchangeName); } - + /** * Removes a previously declare binding between an exchange and a queue. - * + * * @param queueName the name of the queue. * @param exchangeName the name of the exchange. * @param routingKey the routing key used for binding. @@ -229,42 +261,42 @@ public class QpidService { _session.exchangeUnbind(queueName, exchangeName, routingKey); LOGGER.debug( - "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.", + "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.", routingKey,queueName, exchangeName); } - + /** * Sends a command message with the given data on the management queue. - * + * * @param messageData the command message content. */ - public void sendCommandMessage(byte [] messageData) + public void sendCommandMessage(byte [] messageData) { _session.messageTransfer( Names.MANAGEMENT_EXCHANGE, MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, Configuration.getInstance().getCommandMessageHeader(), - messageData); - + messageData); + Log.logMessageContent (messageData); } - + /** * Sends a command message with the given data on the management queue. - * + * * @param messageData the command message content. */ - public void sendCommandMessage(ByteBuffer messageData) + public void sendCommandMessage(ByteBuffer messageData) { _session.messageTransfer( Names.MANAGEMENT_EXCHANGE, MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, Configuration.getInstance().getCommandMessageHeader(), - messageData); - + messageData); + Log.logMessageContent (messageData); - } + } }
\ No newline at end of file |
