diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-10-13 09:29:54 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-10-13 09:29:54 +0000 |
| commit | 35e4d6fca145a6ab5261d132af0d88b962cd50dd (patch) | |
| tree | e7b43cc1b670724614148a81951232f22c7cb94f | |
| parent | 7be40701f012f945373e234f3c3df75b041340bc (diff) | |
| download | qpid-python-35e4d6fca145a6ab5261d132af0d88b962cd50dd.tar.gz | |
qpid-1284: on behalf Adnrea: revert to previous revision so to include latest changes from rhs
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703989 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 201 insertions, 313 deletions
diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java index 5a77675824..2c36fb3d65 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java @@ -28,158 +28,95 @@ import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.commons.pool.impl.GenericObjectPoolFactory; -import org.apache.qpid.ErrorCode; -import org.apache.qpid.QpidException; -import org.apache.qpid.nclient.Client; -import org.apache.qpid.nclient.ClosedListener; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.DtxSession; -import org.apache.qpid.nclient.Session; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.util.Logger; /** * Qpid datasource. - * Basically it is a connection pool manager used for optimizing broker connections usage. - * + * Basically it is a connection pool manager used for optimizing broker connections usage. + * * @author Andrea Gazzarini */ -public final class QpidDatasource +public final class QpidDatasource { private final static Logger LOGGER = Logger.get(QpidDatasource.class); - + /** * A connection decorator used for adding pool interaction behaviour to an existing connection. - * + * * @author Andrea Gazzarini */ - public class ConnectionDecorator implements Connection,ClosedListener + class PooledConnection extends Connection { - private final Connection _decoratee; private final UUID _brokerId; private boolean _valid; - private BrokerConnectionData _connectionData; - + /** * Builds a new decorator with the given connection. - * + * * @param brokerId the broker identifier. * @param decoratee the underlying connection. - * @param connectionData connection details used for logging purposes. */ - private ConnectionDecorator(UUID brokerId, Connection decoratee, BrokerConnectionData connectionData) + private PooledConnection(UUID brokerId) { - this._decoratee = decoratee; this._brokerId = brokerId; - this._decoratee.setClosedListener(this); - this._connectionData = connectionData; _valid = true; - LOGGER.debug("<QMAN-200045> : Connection %s for pool %s created.",this,_connectionData); } - + /** * Returns true if the underlying connection is still valid and can be used. - * + * * @return true if the underlying connection is still valid and can be used. */ boolean isValid() { - if (_valid) - { - LOGGER.debug("<QMAN-200013> : Pooled connection %s for %s seems to be valid.",this,_connectionData); - } else - { - LOGGER.debug("<QMAN-200013> : Pooled connection %s for %s has been marked as invalid.",this,_connectionData); - } return _valid; } - + + void reallyClose() + { + super.close(); + } + /** * Returns the connection to the pool. That is, marks this connections as available. * After that, this connection will be available for further operations. */ - public void close () throws QpidException + public void close() { try { pools.get(_brokerId).returnObject(this); - LOGGER.debug("<QMAN-200012> : <Connection for pool %s released.", _connectionData); - } catch (Exception exception) + LOGGER.debug("<QMAN-200012> : Connection %s returned to the pool.", this); + } + catch (Exception e) { - throw new QpidException("<QMAN-100203> : Error while releasing pooled connection.",ErrorCode.CONNECTION_ERROR,exception); - } - } - - /** - * Do nothing : underlying connection is already connected. - */ - public void connect (String host, int port, String virtualHost, String username, String password) - throws QpidException - { - // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED. - } - - /** - * Do nothing : underlying connection is already connected. - */ - public void connect (String url) throws QpidException - { - // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED. - } - - /** - * @see Connection#createDTXSession(int) - */ - public DtxSession createDTXSession (int expiryInSeconds) - { - return _decoratee.createDTXSession(expiryInSeconds); + throw new ConnectionException(e); + } } - /** - * @see Connection#createSession(long) - */ - public Session createSession (long expiryInSeconds) + public void exception(Throwable t) { - return _decoratee.createSession(expiryInSeconds); + super.exception(t); + _valid = false; } + } - /** - * Do nothing : closed listener has been already injected. - */ - public void setClosedListener (ClosedListener exceptionListner) - { - } - - /** - * Callback method used for error notifications while underlying connection is closing. - */ - public void onClosed (ErrorCode errorCode, String reason, Throwable throwable) - { - _valid = false; - LOGGER.error( - throwable, - "<QMAN-100012> : Error on closing connection. Reason is : %s, error code is %s. Connection %s to %s will be " + - "marked as invalid and therefore will be purged..", - reason, - errorCode.getCode(), - this, - _connectionData); - } - }; - /** - * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of + * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of * the broker connection(s). - * + * * @author Andrea Gazzarini */ class QpidConnectionFactory extends BasePoolableObjectFactory - { + { private final BrokerConnectionData _connectionData; private final UUID _brokerId; - + /** * Builds a new connection factory with the given parameters. - * + * * @param brokerId the broker identifier. * @param connectionData the connecton data. */ @@ -188,68 +125,67 @@ public final class QpidDatasource this._connectionData = connectionData; this._brokerId = brokerId; } - + /** * Creates a new underlying connection. */ @Override public Connection makeObject () throws Exception { - Connection connection = Client.createConnection(); + PooledConnection connection = new PooledConnection(_brokerId); connection.connect( - _connectionData.getHost(), - _connectionData.getPort(), - _connectionData.getVirtualHost(), - _connectionData.getUsername(), + _connectionData.getHost(), + _connectionData.getPort(), + _connectionData.getVirtualHost(), + _connectionData.getUsername(), _connectionData.getPassword()); - return new ConnectionDecorator(_brokerId,connection,_connectionData); + return connection; } - + /** * Validates the underlying connection. */ @Override public boolean validateObject (Object obj) { - ConnectionDecorator connection = (ConnectionDecorator) obj; - return connection.isValid(); + PooledConnection connection = (PooledConnection) obj; + boolean isValid = connection.isValid(); + LOGGER.debug("<QMAN-200013> : Test connection on reserve. Is valid? %s",isValid); + return isValid; } - + /** * Closes the underlying connection. */ @Override public void destroyObject (Object obj) throws Exception { - ConnectionDecorator connection = (ConnectionDecorator) obj; try { - connection._decoratee.close(); - LOGGER.debug("<QMAN-200014> : Connection for %s has been closed.",connection._connectionData); - } catch (Exception exception) + PooledConnection connection = (PooledConnection) obj; + connection.reallyClose(); + LOGGER.debug("<QMAN-200014> : Connection has been destroyed."); + } catch (Exception e) { - LOGGER.debug( - exception, - "<QMAN-200015> : Unable to close an underlying qpid connection (target address is %s) .", - connection._connectionData); + LOGGER.debug(e, "<QMAN-200015> : Unable to destroy a connection object"); } } } - + // Singleton instance. private static QpidDatasource instance = new QpidDatasource(); // Each entry contains a connection pool for a specific broker. private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>(); - + // Private constructor. private QpidDatasource() { } - + /** * Gets an available connection from the pool of the given broker. - * + * * @param brokerId the broker identifier. * @return a valid connection to the broker associated with the given identifier. */ @@ -257,20 +193,20 @@ public final class QpidDatasource { return (Connection) pools.get(brokerId).borrowObject(); } - + /** * Entry point method for retrieving the singleton instance of this datasource. - * + * * @return the qpid datasource singleton instance. */ - public static QpidDatasource getInstance() + public static QpidDatasource getInstance() { return instance; } - + /** * Adds a connection pool to this datasource. - * + * * @param brokerId the broker identifier that will be associated with the new connection pool. * @param connectionData the broker connection data. * @throws Exception when the pool cannot be created. @@ -285,20 +221,9 @@ public final class QpidDatasource false); ObjectPool pool = factory.createPool(); - // Open connections at startup according to initial capacity param value. - int howManyConnectionAtStartup = connectionData.getInitialPoolCapacity(); - Object [] openStartupList = new Object[howManyConnectionAtStartup]; - - // Open... - for (int index = 0; index < howManyConnectionAtStartup; index++) - { - openStartupList[index] = pool.borrowObject(); - } - - // ...and immediately return them to pool. In this way the pooled connection has been opened. - for (int index = 0; index < howManyConnectionAtStartup; index++) + for (int i = 0; i < connectionData.getInitialPoolCapacity(); i++) { - pool.returnObject(openStartupList[index]); + pool.returnObject(pool.borrowObject()); } pools.put(brokerId,pool); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java index 9ef83f1d4f..58578c407d 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java @@ -739,7 +739,7 @@ class QpidClass try { _service.connect(); - _service.requestSchema(_parent.getName(), _name, _hash); + // _service.requestSchema(_parent.getName(), _name, _hash); _service.sync(); } finally { @@ -770,7 +770,7 @@ class QpidClass int sequenceNumber = SequenceNumberGenerator.getNextSequenceNumber(); _methodInvocationListener.operationIsGoingToBeInvoked(new InvocationEvent(this,sequenceNumber,_exchangeChannelForMethodInvocations)); - _service.invoke(_parent.getName(), _name, _hash,objectId,parameters, method,sequenceNumber); + // _service.invoke(_parent.getName(), _name, _hash,objectId,parameters, method,sequenceNumber); // TODO : Shoudl be configurable? InvocationResult result = _exchangeChannelForMethodInvocations.poll(5000,TimeUnit.MILLISECONDS); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java index fa2d1c4021..4bda450315 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java @@ -20,70 +20,131 @@ */ package org.apache.qpid.management.domain.services; -import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.QpidException; -import org.apache.qpid.api.Message; +import org.apache.qpid.management.Constants; import org.apache.qpid.management.Names; +import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.configuration.QpidDatasource; -import org.apache.qpid.management.domain.model.QpidMethod; -import org.apache.qpid.management.domain.model.type.Binary; -import org.apache.qpid.management.messages.MethodInvocationRequestMessage; -import org.apache.qpid.management.messages.SchemaRequestMessage; -import org.apache.qpid.nclient.Connection; -import org.apache.qpid.nclient.Session; import org.apache.qpid.nclient.util.MessageListener; import org.apache.qpid.nclient.util.MessagePartListenerAdapter; +import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; import org.apache.qpid.transport.util.Logger; /** * Qpid Broker facade. - * + * * @author Andrea Gazzarini */ -public class QpidService +public class QpidService implements SessionListener { private final static Logger LOGGER = Logger.get(QpidService.class); - + + // Inner static class used for logging and avoid conditional logic (isDebugEnabled()) duplication. + private static class Log + { + /** + * Logs the content f the message. + * This will be written on log only if DEBUG level is enabled. + * + * @param messageContent the raw content of the message. + */ + static void logMessageContent(byte [] messageContent) + { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "<QMAN-200001> : Message has been sent to management exchange. Message content : %s", + Arrays.toString(messageContent)); + } + } + + /** + * Logs the content f the message. + * This will be written on log only if DEBUG level is enabled. + * + * @param messageContent the raw content of the message. + */ + static void logMessageContent(ByteBuffer messageContent) + { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "<QMAN-200002> : Message has been sent to management exchange."); + } + } + } + private UUID _brokerId; private Connection _connection; private Session _session; - + private Map<String,MessagePartListenerAdapter> _listeners; + /** * Builds a new service with the given connection data. - * + * * @param connectionData the connection data of the broker. */ - public QpidService(UUID brokerId) + public QpidService(UUID brokerId) { this._brokerId = brokerId; } - + /** * Estabilishes a connection with the broker. - * + * * @throws QpidException in case of connection failure. */ public void connect() throws Exception { _connection = QpidDatasource.getInstance().getConnection(_brokerId); - _session = _connection.createSession(0); + _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>(); + _session = _connection.createSession(Constants.NO_EXPIRATION); + _session.setSessionListener(this); + } + + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + MessagePartListenerAdapter l = _listeners.get(xfr.getDestination()); + if (l == null) + { + LOGGER.error("unhandled message: %s", xfr); + } + else + { + l.messageTransfer(xfr); + } } - + + public void exception(Session ssn, SessionException exc) + { + LOGGER.error(exc, "session %s exception", ssn); + } + + public void closed(Session ssn) {} + /** - * All the previously entered outstanding commands are asynchronous. + * All the previously entered outstanding commands are asynchronous. * Synchronous behavior is achieved through invoking this method. */ - public void sync() + public void sync() { _session.sync(); } - + /** * Closes communication with broker. */ @@ -91,51 +152,52 @@ public class QpidService { try { - _session.close(); _session = null; + _listeners = null; } catch (Exception e) { } try { - _connection.close(); + _connection.close(); _connection = null; } catch (Exception e) { } } - + /** * Associate a message listener with a destination therefore creating a new subscription. - * + * * @param queueName The name of the queue that the subscriber is receiving messages from. * @param destinationName the name of the destination, or delivery tag, for the subscriber. - * @param listener the listener for this destination. - * + * @param listener the listener for this destination. + * * @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...) */ - public void createSubscription(String queueName, String destinationName,MessageListener listener) + public void createSubscription(String queueName, String destinationName, MessageListener listener) { - _session.messageSubscribe( - queueName, - destinationName, - Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(listener), null); - - _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); - _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Integer.MAX_VALUE); - + _listeners.put(destinationName, new MessagePartListenerAdapter(listener)); + _session.messageSubscribe + (queueName, + destinationName, + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + + _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); + _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT); + LOGGER.debug( - "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.", + "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.", queueName, destinationName); } - + /** * Removes a previously declared consumer from the broker. - * + * * @param destinationName the name of the destination, or delivery tag, for the subscriber. * @see Session#messageCancel(String, Option...) */ @@ -143,10 +205,10 @@ public class QpidService { _session.messageCancel(destinationName); LOGGER.debug( - "<QMAN-200026> : Subscription named %s has been removed from remote broker.", + "<QMAN-200026> : Subscription named %s has been removed from remote broker.", destinationName); - } - + } + /** * Declares a queue on the broker with the given name. * @@ -170,27 +232,27 @@ public class QpidService _session.queueDelete(queueName); LOGGER.debug("<QMAN-2000025> : Queue with name %s has been removed.",queueName); } - + /** * Binds (on the broker) a queue with an exchange. * - * @param queueName the name of the queue to bind. + * @param queueName the name of the queue to bind. * @param exchangeName the exchange name. - * @param routingKey the routing key used for the binding. + * @param routingKey the routing key used for the binding. * @see Session#exchangeBind(String, String, String, java.util.Map, Option...) */ public void declareBinding(String queueName, String exchangeName, String routingKey) { _session.exchangeBind(queueName, exchangeName, routingKey, null); LOGGER.debug( - "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.", + "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.", routingKey,queueName, exchangeName); } - + /** * Removes a previously declare binding between an exchange and a queue. - * + * * @param queueName the name of the queue. * @param exchangeName the name of the exchange. * @param routingKey the routing key used for binding. @@ -199,141 +261,42 @@ public class QpidService { _session.exchangeUnbind(queueName, exchangeName, routingKey); LOGGER.debug( - "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.", + "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.", routingKey,queueName, exchangeName); } - + /** - * Sends a command message. - * - * @param message the command message. - * @throws IOException when the message cannot be sent. + * Sends a command message with the given data on the management queue. + * + * @param messageData the command message content. */ - public void sendMessage(Message message) throws IOException + public void sendCommandMessage(byte [] messageData) { _session.messageTransfer( Names.MANAGEMENT_EXCHANGE, - message, - MessageAcceptMode.EXPLICIT.getValue(), - MessageAcquireMode.PRE_ACQUIRED.getValue()); - } - - /** - * Requests a schema for the given package.class.hash. - * - * @param packageName the package name. - * @param className the class name. - * @param schemaHash the schema hash. - * @throws IOException when the schema request cannot be sent. - */ - public void requestSchema(final String packageName, final String className, final Binary schemaHash) throws IOException - { - Message message = new SchemaRequestMessage() - { - @Override - protected String className () - { - return className; - } - - @Override - protected String packageName () - { - return packageName; - } + MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + Configuration.getInstance().getCommandMessageHeader(), + messageData); - @Override - protected Binary schemaHash () - { - return schemaHash; - } - }; - - sendMessage(message); + Log.logMessageContent (messageData); } - + /** - * Invokes an operation on a broker object instance. - * - * @param packageName the package name. - * @param className the class name. - * @param schemaHash the schema hash of the corresponding class. - * @param objectId the object instance identifier. - * @param parameters the parameters for this invocation. - * @param method the method (definition) invoked. - * @return the sequence number used for this message. - * @throws MethodInvocationException when the invoked method returns an error code. - * @throws UnableToComplyException when it wasn't possibile to invoke the requested operation. + * Sends a command message with the given data on the management queue. + * + * @param messageData the command message content. */ - public void invoke( - final String packageName, - final String className, - final Binary schemaHash, - final Binary objectId, - final Object[] parameters, - final QpidMethod method, - final int sequenceNumber) throws MethodInvocationException, UnableToComplyException + public void sendCommandMessage(ByteBuffer messageData) { - Message message = new MethodInvocationRequestMessage() - { - - @Override - protected int sequenceNumber () - { - return sequenceNumber; - } - - protected Binary objectId() { - return objectId; - } - - protected String packageName() - { - return packageName; - } - - protected String className() - { - return className; - } - - @Override - protected QpidMethod method () - { - return method; - } - - @Override - protected Object[] parameters () - { - return parameters; - } + _session.messageTransfer( + Names.MANAGEMENT_EXCHANGE, + MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, + Configuration.getInstance().getCommandMessageHeader(), + messageData); - @Override - protected Binary schemaHash () - { - return schemaHash; - } - }; - - try { - sendMessage(message); - sync(); -// ReturnValueObject invocationResult = Configuration.getInstance()._resultExchangeChannel.poll(2000,TimeUnit.MILLISECONDS); -// if (invocationResult == null) { -// return null; -// } -// if (invocationResult.isException()) -// { -// invocationResult.createAndThrowException(); -// } -// return invocationResult; -// } catch(MethodInvocationException exception) -// { -// throw exception; - } catch(Exception exception) { - throw new UnableToComplyException(exception); - } - } + Log.logMessageContent (messageData); + } }
\ No newline at end of file |
