diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-10-13 08:16:03 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-10-13 08:16:03 +0000 |
| commit | fcf918276ab87f573f07590a3f3e1f4c9bedbba7 (patch) | |
| tree | 43a864977ae14cde0ceb29ba9499d2d586cb408c /java/management/client/src | |
| parent | 493993c4b8851ba1a1170abb928d2f37d8060e83 (diff) | |
| download | qpid-python-fcf918276ab87f573f07590a3f3e1f4c9bedbba7.tar.gz | |
qpid-1284: on behalf Adnrea (latest posted patch)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703961 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/management/client/src')
44 files changed, 750 insertions, 555 deletions
diff --git a/java/management/client/src/main/java/org/apache/qpid/management/Names.java b/java/management/client/src/main/java/org/apache/qpid/management/Names.java index dde7715509..8ab9e13d4f 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/Names.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/Names.java @@ -29,13 +29,13 @@ public interface Names { /** Name of the qpid management exchange. */ String MANAGEMENT_EXCHANGE = "qpid.management"; - String MANAGEMENT_ROUTING_KEY = "console.#"; + String MANAGEMENT_ROUTING_KEY = "mgmt.#"; String MANAGEMENT_QUEUE_PREFIX = "management."; String METHOD_REPLY_QUEUE_PREFIX = "reply."; String AMQ_DIRECT_QUEUE = "amq.direct"; - String AGENT_ROUTING_KEY = "agent.1.0"; + String AGENT_ROUTING_KEY = "agent.0"; String BROKER_ROUTING_KEY = "broker"; @@ -49,4 +49,4 @@ public interface Names String CONFIGURATION_FILE_NAME = "/org/apache/qpid/management/config.xml"; String ARG_COUNT_PARAM_NAME = "argCount"; -} +}
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/Protocol.java b/java/management/client/src/main/java/org/apache/qpid/management/Protocol.java index 185f417448..169e28e26e 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/Protocol.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/Protocol.java @@ -27,8 +27,15 @@ package org.apache.qpid.management; */ public interface Protocol { - String MAGIC_NUMBER = "AM2"; + String MAGIC_NUMBER = "AM1"; - byte [] METHOD_REQUEST_FIRST_FOUR_BYTES = (MAGIC_NUMBER+"M").getBytes(); - byte [] SCHEMA_REQUEST_FIRST_FOUR_BYTES = (MAGIC_NUMBER+"S").getBytes(); -} + char SCHEMA_REQUEST_OPCODE = 'S'; + char SCHEMA_RESPONSE_OPCODE = Character.toLowerCase(SCHEMA_REQUEST_OPCODE); + + char OPERATION_INVOCATION_REQUEST_OPCODE = 'M'; + char OPERATION_INVOCATION_RESPONSE_OPCODE = Character.toLowerCase(OPERATION_INVOCATION_REQUEST_OPCODE); + + char INSTRUMENTATION_CONTENT_RESPONSE_OPCODE = 'i'; + char CONFIGURATION_CONTENT_RESPONSE_OPCDE = 'c'; + char INSTR_AND_CONFIG_CONTENT_RESPONSE_OPCODE = 'g'; +}
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/config.xml b/java/management/client/src/main/java/org/apache/qpid/management/config.xml index 590e33a0f7..636ef9cb05 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/config.xml +++ b/java/management/client/src/main/java/org/apache/qpid/management/config.xml @@ -1,26 +1,4 @@ <configuration> - <message-handlers> - <management-queue> - <handler> - <opcode>i</opcode> - <class-name>org.apache.qpid.management.domain.handler.impl.InstrumentationMessageHandler</class-name> - </handler> - <handler> - <opcode>c</opcode> - <class-name>org.apache.qpid.management.domain.handler.impl.ConfigurationMessageHandler</class-name> - </handler> - </management-queue> - <method-reply-queue> - <handler> - <opcode>m</opcode> - <class-name>org.apache.qpid.management.domain.handler.impl.MethodResponseMessageHandler</class-name> - </handler> - <handler> - <opcode>s</opcode> - <class-name>org.apache.qpid.management.domain.handler.impl.SchemaResponseMessageHandler</class-name> - </handler> - </method-reply-queue> - </message-handlers> <type-mappings> <mapping> <code>1</code> @@ -99,7 +77,7 @@ <user>guest</user> <password>guest</password> <max-pool-capacity>4</max-pool-capacity> - <initial-pool-capacity>4</initial-pool-capacity> + <initial-pool-capacity>0</initial-pool-capacity> <max-wait-timeout>-1</max-wait-timeout> </broker> </brokers> diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java index d76baed07f..bbb5380d7d 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java @@ -29,13 +29,16 @@ import org.apache.qpid.transport.util.Logger; * For each access-mode-mappings/mapping element found in the configuration file, a new access mode mapping * is built and injected into the bridge configuration. * - * <broker> - <host>192.168.61.130</host> - <port>5673</port> - <virtual-host>test</virtual-host> - <user>andrea</user> - <password>andrea</password> - </broker> + <broker> + <host>192.168.148.131</host> + <port>5672</port> + <virtual-host>test</virtual-host> + <user>guest</user> + <password>guest</password> + <max-pool-capacity>4</max-pool-capacity> + <initial-pool-capacity>4</initial-pool-capacity> + <max-wait-timeout>-1</max-wait-timeout> + </broker> * * @author Andrea Gazzarini */ diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java index cd39d330bb..6b47c06510 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java @@ -25,9 +25,12 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.Map.Entry; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.SynchronousQueue; import org.apache.qpid.management.Names; import org.apache.qpid.management.domain.handler.base.IMessageHandler; +import org.apache.qpid.management.domain.handler.impl.InvocationResult; import org.apache.qpid.management.domain.model.AccessMode; import org.apache.qpid.management.domain.model.type.Type; import org.apache.qpid.transport.DeliveryProperties; @@ -60,6 +63,9 @@ public final class Configuration private String _methodReplyQueueName; private Header _headerForCommandMessages; + private DeliveryProperties _deliveryProperties = new DeliveryProperties(); + private MessageProperties _messageProperties = new MessageProperties(); + public BlockingQueue<InvocationResult> _resultExchangeChannel = new SynchronousQueue<InvocationResult>(); // Private constructor. private Configuration() @@ -236,6 +242,26 @@ public final class Configuration } /** + * Returns the command message properties. + * + * @return the command message properties. + */ + public MessageProperties getCommandMessageProperties () + { + return _messageProperties; + } + + /** + * Returns the command message delivery properties. + * + * @return the command message delivery properties. + */ + public DeliveryProperties getCommandDeliveryProperties () + { + return _deliveryProperties; + } + + /** * Adds a new type mapping to this configuration. * * @param mapping the type mapping that will be added. @@ -314,16 +340,11 @@ public final class Configuration */ private void createHeaderForCommandMessages () { - MessageProperties messageProperties = new MessageProperties(); - ReplyTo replyTo=new ReplyTo(); replyTo.setRoutingKey(_methodReplyQueueName); - messageProperties.setReplyTo(replyTo); - - DeliveryProperties deliveryProperties = new DeliveryProperties(); - deliveryProperties.setRoutingKey(Names.AGENT_ROUTING_KEY); - - _headerForCommandMessages = new Header(deliveryProperties, messageProperties); + _messageProperties.setReplyTo(replyTo); + _deliveryProperties.setRoutingKey(Names.AGENT_ROUTING_KEY); + _headerForCommandMessages = new Header(_deliveryProperties, _messageProperties); } /** @@ -339,5 +360,5 @@ public final class Configuration LOGGER.debug("<QMAN-200021> : Management queue name : %s",_managementQueueName); LOGGER.debug("<QMAN-000022> : Method-reply queue name : %s",_methodReplyQueueName); - } + } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java index fb33622f4d..3cd76c06e9 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configurator.java @@ -27,6 +27,11 @@ import javax.xml.parsers.SAXParser; import javax.xml.parsers.SAXParserFactory; import org.apache.qpid.management.Names; +import org.apache.qpid.management.Protocol; +import org.apache.qpid.management.domain.handler.impl.ConfigurationMessageHandler; +import org.apache.qpid.management.domain.handler.impl.InstrumentationMessageHandler; +import org.apache.qpid.management.domain.handler.impl.MethodResponseMessageHandler; +import org.apache.qpid.management.domain.handler.impl.SchemaResponseMessageHandler; import org.xml.sax.Attributes; import org.xml.sax.InputSource; import org.xml.sax.SAXException; @@ -57,9 +62,6 @@ public class Configurator extends DefaultHandler IParser _typeMappingParser = new TypeMappingParser(); IParser _accessModeMappingParser = new AccessModeMappingParser(); IParser _brokerConfigurationParser = new BrokerConnectionDataParser(); - IParser _managementQueueHandlerParser = new ManagementQueueMessageListenerParser(); - IParser _methodReplyQueueHandlerParser = new MethodReplyQueueMessageListenerParser(); - IParser _currentParser = DEFAULT_PARSER; /** @@ -95,16 +97,6 @@ public class Configurator extends DefaultHandler _currentParser = _brokerConfigurationParser; break; } - case MANAGEMENT_QUEUE: - { - _currentParser = _managementQueueHandlerParser; - break; - } - case METHOD_REPLY_QUEUE: - { - _currentParser = _methodReplyQueueHandlerParser; - break; - } } } @@ -127,6 +119,11 @@ public class Configurator extends DefaultHandler BufferedReader reader = new BufferedReader(new InputStreamReader(getClass().getResourceAsStream(getConfigurationFileName()),"UTF8")); InputSource source = new InputSource(reader); parser.parse(source, this); + + // Hard-coded configuration for message handlers : we need that because those handler mustn't be configurable. + // QMan is not able to work without them! + addMandatoryManagementMessageHandlers(); + addMandatoryMethodReplyMessageHandlers(); } catch (Exception exception) { throw new ConfigurationException(exception); @@ -134,6 +131,38 @@ public class Configurator extends DefaultHandler } /** + * Configures the mandatory management message handlers. + */ + private void addMandatoryMethodReplyMessageHandlers () + { + Configuration.getInstance().addMethodReplyMessageHandlerMapping( + new MessageHandlerMapping( + Protocol.OPERATION_INVOCATION_RESPONSE_OPCODE, + MethodResponseMessageHandler.class.getName())); + + Configuration.getInstance().addMethodReplyMessageHandlerMapping( + new MessageHandlerMapping( + Protocol.SCHEMA_RESPONSE_OPCODE, + SchemaResponseMessageHandler.class.getName())); + } + + /** + * Configures the mandatory management message handlers. + */ + private void addMandatoryManagementMessageHandlers () + { + Configuration.getInstance().addManagementMessageHandlerMapping( + new MessageHandlerMapping( + Protocol.INSTRUMENTATION_CONTENT_RESPONSE_OPCODE, + InstrumentationMessageHandler.class.getName())); + + Configuration.getInstance().addManagementMessageHandlerMapping( + new MessageHandlerMapping( + Protocol.CONFIGURATION_CONTENT_RESPONSE_OPCDE, + ConfigurationMessageHandler.class.getName())); + } + + /** * Returns the name of the configuration file. * * @return the name of the configuration file. diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/MessageHandlerMapping.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/MessageHandlerMapping.java index 7ce3baae49..5e38346a2d 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/MessageHandlerMapping.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/MessageHandlerMapping.java @@ -31,6 +31,24 @@ class MessageHandlerMapping private String _handlerClass; /** + * Builds an empty message handler mapping. + */ + MessageHandlerMapping() + { + } + + /** + * Builds a new mapping with the given opcode and handler class. + * + * @param opcode the opcode. + * @param handlerClass the handler class. + */ + MessageHandlerMapping(Character opcode, String handlerClass) { + this._opcode = opcode; + this._handlerClass = handlerClass; + } + + /** * Returns the opcode of this mapping. * * @return the code of this mapping. diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java index 2c36fb3d65..5a77675824 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java @@ -28,95 +28,158 @@ import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.commons.pool.impl.GenericObjectPoolFactory; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionException; +import org.apache.qpid.ErrorCode; +import org.apache.qpid.QpidException; +import org.apache.qpid.nclient.Client; +import org.apache.qpid.nclient.ClosedListener; +import org.apache.qpid.nclient.Connection; +import org.apache.qpid.nclient.DtxSession; +import org.apache.qpid.nclient.Session; import org.apache.qpid.transport.util.Logger; /** * Qpid datasource. - * Basically it is a connection pool manager used for optimizing broker connections usage. - * + * Basically it is a connection pool manager used for optimizing broker connections usage. + * * @author Andrea Gazzarini */ -public final class QpidDatasource +public final class QpidDatasource { private final static Logger LOGGER = Logger.get(QpidDatasource.class); - + /** * A connection decorator used for adding pool interaction behaviour to an existing connection. - * + * * @author Andrea Gazzarini */ - class PooledConnection extends Connection + public class ConnectionDecorator implements Connection,ClosedListener { + private final Connection _decoratee; private final UUID _brokerId; private boolean _valid; - + private BrokerConnectionData _connectionData; + /** * Builds a new decorator with the given connection. - * + * * @param brokerId the broker identifier. * @param decoratee the underlying connection. + * @param connectionData connection details used for logging purposes. */ - private PooledConnection(UUID brokerId) + private ConnectionDecorator(UUID brokerId, Connection decoratee, BrokerConnectionData connectionData) { + this._decoratee = decoratee; this._brokerId = brokerId; + this._decoratee.setClosedListener(this); + this._connectionData = connectionData; _valid = true; + LOGGER.debug("<QMAN-200045> : Connection %s for pool %s created.",this,_connectionData); } - + /** * Returns true if the underlying connection is still valid and can be used. - * + * * @return true if the underlying connection is still valid and can be used. */ boolean isValid() { + if (_valid) + { + LOGGER.debug("<QMAN-200013> : Pooled connection %s for %s seems to be valid.",this,_connectionData); + } else + { + LOGGER.debug("<QMAN-200013> : Pooled connection %s for %s has been marked as invalid.",this,_connectionData); + } return _valid; } - - void reallyClose() - { - super.close(); - } - + /** * Returns the connection to the pool. That is, marks this connections as available. * After that, this connection will be available for further operations. */ - public void close() + public void close () throws QpidException { try { pools.get(_brokerId).returnObject(this); - LOGGER.debug("<QMAN-200012> : Connection %s returned to the pool.", this); - } - catch (Exception e) + LOGGER.debug("<QMAN-200012> : <Connection for pool %s released.", _connectionData); + } catch (Exception exception) { - throw new ConnectionException(e); - } + throw new QpidException("<QMAN-100203> : Error while releasing pooled connection.",ErrorCode.CONNECTION_ERROR,exception); + } } - public void exception(Throwable t) + /** + * Do nothing : underlying connection is already connected. + */ + public void connect (String host, int port, String virtualHost, String username, String password) + throws QpidException { - super.exception(t); - _valid = false; + // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED. + } + + /** + * Do nothing : underlying connection is already connected. + */ + public void connect (String url) throws QpidException + { + // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED. + } + + /** + * @see Connection#createDTXSession(int) + */ + public DtxSession createDTXSession (int expiryInSeconds) + { + return _decoratee.createDTXSession(expiryInSeconds); } - } + /** + * @see Connection#createSession(long) + */ + public Session createSession (long expiryInSeconds) + { + return _decoratee.createSession(expiryInSeconds); + } + + /** + * Do nothing : closed listener has been already injected. + */ + public void setClosedListener (ClosedListener exceptionListner) + { + } + + /** + * Callback method used for error notifications while underlying connection is closing. + */ + public void onClosed (ErrorCode errorCode, String reason, Throwable throwable) + { + _valid = false; + LOGGER.error( + throwable, + "<QMAN-100012> : Error on closing connection. Reason is : %s, error code is %s. Connection %s to %s will be " + + "marked as invalid and therefore will be purged..", + reason, + errorCode.getCode(), + this, + _connectionData); + } + }; + /** - * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of + * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of * the broker connection(s). - * + * * @author Andrea Gazzarini */ class QpidConnectionFactory extends BasePoolableObjectFactory - { + { private final BrokerConnectionData _connectionData; private final UUID _brokerId; - + /** * Builds a new connection factory with the given parameters. - * + * * @param brokerId the broker identifier. * @param connectionData the connecton data. */ @@ -125,67 +188,68 @@ public final class QpidDatasource this._connectionData = connectionData; this._brokerId = brokerId; } - + /** * Creates a new underlying connection. */ @Override public Connection makeObject () throws Exception { - PooledConnection connection = new PooledConnection(_brokerId); + Connection connection = Client.createConnection(); connection.connect( - _connectionData.getHost(), - _connectionData.getPort(), - _connectionData.getVirtualHost(), - _connectionData.getUsername(), + _connectionData.getHost(), + _connectionData.getPort(), + _connectionData.getVirtualHost(), + _connectionData.getUsername(), _connectionData.getPassword()); - return connection; + return new ConnectionDecorator(_brokerId,connection,_connectionData); } - + /** * Validates the underlying connection. */ @Override public boolean validateObject (Object obj) { - PooledConnection connection = (PooledConnection) obj; - boolean isValid = connection.isValid(); - LOGGER.debug("<QMAN-200013> : Test connection on reserve. Is valid? %s",isValid); - return isValid; + ConnectionDecorator connection = (ConnectionDecorator) obj; + return connection.isValid(); } - + /** * Closes the underlying connection. */ @Override public void destroyObject (Object obj) throws Exception { + ConnectionDecorator connection = (ConnectionDecorator) obj; try { - PooledConnection connection = (PooledConnection) obj; - connection.reallyClose(); - LOGGER.debug("<QMAN-200014> : Connection has been destroyed."); - } catch (Exception e) + connection._decoratee.close(); + LOGGER.debug("<QMAN-200014> : Connection for %s has been closed.",connection._connectionData); + } catch (Exception exception) { - LOGGER.debug(e, "<QMAN-200015> : Unable to destroy a connection object"); + LOGGER.debug( + exception, + "<QMAN-200015> : Unable to close an underlying qpid connection (target address is %s) .", + connection._connectionData); } } } - + // Singleton instance. private static QpidDatasource instance = new QpidDatasource(); // Each entry contains a connection pool for a specific broker. private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>(); - + // Private constructor. private QpidDatasource() { } - + /** * Gets an available connection from the pool of the given broker. - * + * * @param brokerId the broker identifier. * @return a valid connection to the broker associated with the given identifier. */ @@ -193,20 +257,20 @@ public final class QpidDatasource { return (Connection) pools.get(brokerId).borrowObject(); } - + /** * Entry point method for retrieving the singleton instance of this datasource. - * + * * @return the qpid datasource singleton instance. */ - public static QpidDatasource getInstance() + public static QpidDatasource getInstance() { return instance; } - + /** * Adds a connection pool to this datasource. - * + * * @param brokerId the broker identifier that will be associated with the new connection pool. * @param connectionData the broker connection data. * @throws Exception when the pool cannot be created. @@ -221,9 +285,20 @@ public final class QpidDatasource false); ObjectPool pool = factory.createPool(); - for (int i = 0; i < connectionData.getInitialPoolCapacity(); i++) + // Open connections at startup according to initial capacity param value. + int howManyConnectionAtStartup = connectionData.getInitialPoolCapacity(); + Object [] openStartupList = new Object[howManyConnectionAtStartup]; + + // Open... + for (int index = 0; index < howManyConnectionAtStartup; index++) + { + openStartupList[index] = pool.borrowObject(); + } + + // ...and immediately return them to pool. In this way the pooled connection has been opened. + for (int index = 0; index < howManyConnectionAtStartup; index++) { - pool.returnObject(pool.borrowObject()); + pool.returnObject(openStartupList[index]); } pools.put(brokerId,pool); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java index 9b3a4a3f6f..fed926e214 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/Tag.java @@ -26,9 +26,7 @@ public enum Tag { VIRTUAL_HOST { @Override public String toString() { return "virtual-host"; }}, USER { @Override public String toString() { return "user"; }}, PASSWORD { @Override public String toString() { return "password"; }}, - BROKERS { @Override public String toString() { return "brokers"; }}, - MANAGEMENT_QUEUE { @Override public String toString() { return "management-queue"; }}, - METHOD_REPLY_QUEUE { @Override public String toString() { return "method-reply-queue"; }}; + BROKERS { @Override public String toString() { return "brokers"; }}; /** * Returns the enum entry associated to the given tag name. diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/base/ContentIndicationMessageHandler.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/base/ContentIndicationMessageHandler.java index 21a3281fa9..a5b30b382a 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/base/ContentIndicationMessageHandler.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/base/ContentIndicationMessageHandler.java @@ -46,8 +46,6 @@ public abstract class ContentIndicationMessageHandler extends BaseMessageHandler long timeObjectWasCreated = decoder.readDatetime(); long timeObjectWasDeleted = decoder.readDatetime(); - - Binary objectId = new Binary(decoder.readBin128()); if (objectHasBeenRemoved(timeObjectWasDeleted, timeStampOfCurrentSample)) diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/MethodResponseMessageHandler.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/MethodResponseMessageHandler.java index 24db2e963d..ba24a8d4e6 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/MethodResponseMessageHandler.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/MethodResponseMessageHandler.java @@ -20,22 +20,90 @@ */ package org.apache.qpid.management.domain.handler.impl; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + import org.apache.qpid.management.domain.handler.base.BaseMessageHandler; +import org.apache.qpid.management.domain.model.DomainModel; +import org.apache.qpid.management.domain.model.InvocationEvent; import org.apache.qpid.transport.codec.ManagementDecoder; import org.apache.qpid.transport.util.Logger; +/** + * Message handler for method response messages. + * This handler is installed on domain model as a method invocation result listener. + * When a method is going to be invoked this listener is notified with the exchange channel that will be used between it and + * the event (method invocation) source object. + * + * @author Andrea Gazzarini + * + */ public class MethodResponseMessageHandler extends BaseMessageHandler { private final static Logger LOGGER = Logger.get(MethodResponseMessageHandler.class); + + private Map<Integer, BlockingQueue<InvocationResult>> _exchangeChannels = new HashMap<Integer, BlockingQueue<InvocationResult>>(); - public void process (ManagementDecoder decoder, int sequenceNumber) + /** + * This is the listener installed on domain model for method invocations. + */ + private final IMethodInvocationListener methodInvocationListener = new IMethodInvocationListener() { - LOGGER.debug("<QMAN-200009> : Incoming method response message."); - - long statusCode = decoder.readUint32(); - String statusText = decoder.readStr8(); - - LOGGER.debug("<QMAN-200010> : Status code : %s", statusCode); - LOGGER.debug("<QMAN-200011> : Status text : %s", statusText); + /** + * Event source callback. + * A method is going to be invoked and this method lets this listener take the exchange channel that will be used + * with the event source for synchronous communication. + * + * @param event the operation invocation event. + */ + public void operationIsGoingToBeInvoked (InvocationEvent event) + { + _exchangeChannels.put(event.getSequenceNumber(), event.getExchangeChannel()); + } + }; + + /** + * Processes the incoming message. + * + * @param decoder the decoder used for parsing incoming data. + * @param sequenceNumber the sequence number of the incoming message. + */ + public void process (ManagementDecoder decoder, int sequenceNumber) + { + InvocationResult result = new InvocationResult(decoder.readUint32(), decoder.readStr8(),decoder.readReaminingBytes()); + BlockingQueue<InvocationResult> exchangeChannel = _exchangeChannels.remove(sequenceNumber); + if (exchangeChannel != null) + { + try + { + exchangeChannel.put(result); + } catch (InterruptedException exception) + { + LOGGER.error( + exception, + "<QMAN-100044> : an exception occurred while storing the result of a method invocation. " + + "Sequence number was %s", + sequenceNumber); + } + } else + { + LOGGER.warn( + "Unable to deal with incoming message because it contains a unknown sequence number (%s).", + sequenceNumber); + } } + + /** + * Sets the domain model on this handler. + * In addiction, this handler registers a method invocation listener on the domain model. + * + * @param domainModel the managed broker domain model. + */ + @Override + public void setDomainModel (DomainModel domainModel) + { + super.setDomainModel(domainModel); + domainModel.setMethodInvocationListener(methodInvocationListener); + } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java index 497e264581..7a0ee556d2 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java @@ -49,10 +49,6 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler { try { - int classKind = decoder.readUint8(); - if (classKind != 1) { - return; - } String packageName = decoder.readStr8(); String className = decoder.readStr8(); @@ -61,7 +57,7 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler int howManyProperties = decoder.readUint16(); int howManyStatistics = decoder.readUint16(); int howManyMethods = decoder.readUint16(); - int howManyEvents = 0; + int howManyEvents = decoder.readUint16(); // FIXME : Divide between schema error and raw data conversion error!!!! _domainModel.addSchema( @@ -159,4 +155,4 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler } return result; } - } + }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/DomainModel.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/DomainModel.java index b24228d122..8ebd667893 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/DomainModel.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/DomainModel.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.qpid.management.domain.handler.impl.IMethodInvocationListener; import org.apache.qpid.management.domain.handler.impl.MethodOrEventDataTransferObject; import org.apache.qpid.management.domain.model.type.Binary; @@ -40,6 +41,8 @@ public class DomainModel /** Here the known packages of the remote broker are stored. */ Map<String,QpidPackage> _packages = new HashMap<String, QpidPackage>(); + + private IMethodInvocationListener _methodInvocationListener; /** * Builds a new domain model with the given broker identifier. @@ -171,4 +174,14 @@ public class DomainModel qpidPackage.releaseResources(); } } + + public void setMethodInvocationListener(IMethodInvocationListener listener) + { + this._methodInvocationListener = listener; + } + + IMethodInvocationListener getMethodInvocationListener () + { + return _methodInvocationListener; + } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidArgument.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidArgument.java index e38ad4cee0..94d532571a 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidArgument.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidArgument.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.management.domain.model; -import org.apache.qpid.transport.codec.ManagementEncoder; +import org.apache.qpid.management.messages.AmqpCoDec; +import org.apache.qpid.transport.codec.ManagementDecoder; import org.apache.qpid.transport.util.Logger; class QpidArgument extends QpidProperty @@ -68,10 +69,14 @@ class QpidArgument extends QpidProperty .toString(); } - public void validateAndEncode (Object value,ManagementEncoder encoder) throws ValidationException + public void encode(Object value,AmqpCoDec encoder) { - validate(value); _type.encode(value, encoder); - LOGGER.debug("Encoded value %S for argument %s. Type is %s",value,_name,_type); + LOGGER.debug("<QMAN-200012> : Encoded value %S for argument %s. Type is %s",value,_name,_type); + } + + public Object decode(ManagementDecoder decoder) + { + return _type.decode(decoder); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java index 262e17db9a..9ef83f1d4f 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java @@ -27,6 +27,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; import javax.management.Attribute; import javax.management.AttributeList; @@ -43,12 +46,13 @@ import javax.management.ObjectName; import javax.management.ReflectionException; import javax.management.RuntimeOperationsException; -import org.apache.qpid.management.Protocol; +import org.apache.qpid.management.domain.handler.impl.IMethodInvocationListener; +import org.apache.qpid.management.domain.handler.impl.InvocationResult; import org.apache.qpid.management.domain.handler.impl.MethodOrEventDataTransferObject; import org.apache.qpid.management.domain.model.type.Binary; import org.apache.qpid.management.domain.services.QpidService; +import org.apache.qpid.management.domain.services.SequenceNumberGenerator; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; import org.apache.qpid.transport.util.Logger; /** @@ -124,10 +128,20 @@ class QpidClass */ public synchronized void addConfigurationData (Binary objectId, byte[] rawData) { - schemaRequest(); - QpidManagedObject instance = getObjectInstance(objectId,false); - instance._rawConfigurationData.add(rawData); - _state = _schemaRequestedButNotYetInjected; + try + { + requestSchema(); + _state = _schemaRequestedButNotYetInjected; + } catch (Exception e) + { + LOGGER.error( + "<QMAN-100012> : Unable to send a schema request schema for %s.%s", + _parent.getName(), + _name); + } finally { + QpidManagedObject instance = getObjectInstance(objectId,false); + instance._rawConfigurationData.add(rawData); + } } /** @@ -139,10 +153,21 @@ class QpidClass */ public synchronized void addInstrumentationData (Binary objectId, byte[] rawData) { - schemaRequest(); - QpidManagedObject instance = getObjectInstance(objectId,false); - instance._rawConfigurationData.add(rawData); - _state = _schemaRequestedButNotYetInjected; + try + { + requestSchema(); + _state = _schemaRequestedButNotYetInjected; + } catch (Exception e) + { + LOGGER.error( + "<QMAN-100012> : Unable to send a schema request schema for %s.%s", + _parent.getName(), + _name); + } finally { + QpidManagedObject instance = getObjectInstance(objectId,false); + instance._rawConfigurationData.add(rawData); + _state = _schemaRequestedButNotYetInjected; + } } /** @@ -394,9 +419,9 @@ class QpidClass { try { - methodRequest(_objectId, method, params); - return null; - } catch (ValidationException exception) + method.validate(params); + return invokeMethod(_objectId, method, params); + } catch (Exception exception) { throw new MBeanException(exception); } @@ -495,10 +520,23 @@ class QpidClass private final QpidService _service; private int _howManyPresenceBitMasks; + private BlockingQueue<InvocationResult> _exchangeChannelForMethodInvocations; + private final IMethodInvocationListener _methodInvocationListener; Map<Binary, QpidManagedObject> _objectInstances = new HashMap<Binary, QpidManagedObject>(); State _state = _schemaNotRequested;; + private final static class Log + { + final static void logMethodInvocationResult(InvocationResult result) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(String.valueOf(result)); + } + } + } + /** * Builds a new class with the given name and package as parent. * @@ -512,6 +550,9 @@ class QpidClass this._parent = parentPackage; this._hash = hash; this._service = new QpidService(_parent.getOwnerId()); + this._methodInvocationListener = _parent.getMethodInvocationListener(); + this._exchangeChannelForMethodInvocations = new SynchronousQueue<InvocationResult>(); + LOGGER.debug( "<QMAN-200017> : Class definition has been built (without schema) for %s::%s.%s", _parent.getOwnerId(), @@ -688,29 +729,18 @@ class QpidClass } } - private void schemaRequest() - { - ByteBuffer buffer = ByteBuffer.allocate(100); - ManagementEncoder encoder = new ManagementEncoder(buffer); - buffer.put(Protocol.SCHEMA_REQUEST_FIRST_FOUR_BYTES); - - // TODO - encoder.writeSequenceNo(1000); - encoder.writeStr8(_parent.getName()); - encoder.writeStr8(_name); - _hash.encode(encoder); - buffer.rewind(); + /** + * Internal method used to send a schema request for this class. + * + * @throws Exception when the request cannot be sent. + */ + private void requestSchema() throws Exception + { try { _service.connect(); - _service.sendCommandMessage(buffer); + _service.requestSchema(_parent.getName(), _name, _hash); _service.sync(); - } catch (Exception exception) - { - exception.printStackTrace(); - // TODO - // Log.logSchemaRequestNotSent(exception, - // _parent.getOwnerId(),_parent.getName(), _name); } finally { _service.close(); @@ -730,33 +760,30 @@ class QpidClass * @param objectId * @param method * @param parameters - * @throws ValidationException + * @throws Exception */ - private void methodRequest(Binary objectId,QpidMethod method,Object [] parameters) throws ValidationException + private InvocationResult invokeMethod(Binary objectId,QpidMethod method,Object [] parameters) throws Exception { - ByteBuffer buffer = ByteBuffer.allocate(1000); - ManagementEncoder encoder = new ManagementEncoder(buffer); - buffer.put(Protocol.METHOD_REQUEST_FIRST_FOUR_BYTES); - encoder.writeSequenceNo(0); - objectId.encode(encoder); - encoder.writeStr8(_parent.getName()); - encoder.writeStr8(_name); - _hash.encode(encoder); - encoder.writeStr8(method.getName()); - method.encodeParameters(parameters,encoder); - - buffer.rewind(); - try + try { _service.connect(); - _service.sendCommandMessage(buffer); - //_service.sync(); - } catch (Exception exception) - { - exception.printStackTrace(); - // TODO - // Log.logSchemaRequestNotSent(exception, - // _parent.getOwnerId(),_parent.getName(), _name); + + int sequenceNumber = SequenceNumberGenerator.getNextSequenceNumber(); + _methodInvocationListener.operationIsGoingToBeInvoked(new InvocationEvent(this,sequenceNumber,_exchangeChannelForMethodInvocations)); + _service.invoke(_parent.getName(), _name, _hash,objectId,parameters, method,sequenceNumber); + + // TODO : Shoudl be configurable? + InvocationResult result = _exchangeChannelForMethodInvocations.poll(5000,TimeUnit.MILLISECONDS); + Map<String, Object> output = method.decodeParameters(result.getOutputAndBidirectionalArgumentValues()); + result.setOutputSection(output); + + Log.logMethodInvocationResult(result); + + if (result.isException()) + { + result.createAndThrowException(); + } + return result; } finally { _service.close(); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidFeature.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidFeature.java index 5fccb0a858..e1ca5a4d42 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidFeature.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidFeature.java @@ -60,7 +60,7 @@ abstract class QpidFeature * * @return the name of the feature. */ - String getName () + public String getName () { return _name; } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidMethod.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidMethod.java index 4d4918e45f..d1011420ef 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidMethod.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidMethod.java @@ -20,10 +20,15 @@ */ package org.apache.qpid.management.domain.model; +import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; + +import org.apache.qpid.management.messages.AmqpCoDec; +import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; /** * Qpid method definition. @@ -31,7 +36,7 @@ import org.apache.qpid.transport.codec.ManagementEncoder; * * @author Andrea Gazzarini */ -class QpidMethod extends QpidFeature +public class QpidMethod extends QpidFeature { /** Argument list */ List<QpidArgument> arguments = new LinkedList<QpidArgument>(); @@ -82,22 +87,61 @@ class QpidMethod extends QpidFeature /** * Encodes the given parameter values according to this method arguments definitions. - * Also provide a validation of the given values according to the invariants defined for each argument. - * Note that only Input/Output and Output parameters are encoded. + * Note that only Input/Output and Input parameters are encoded. * * @param parameters the parameters values. * @param encoder the encoder used for encoding. - * @throws ValidationException when one of the given values is violating an argument invariant. */ - void encodeParameters (Object[] parameters, ManagementEncoder encoder) throws ValidationException + public void encodeParameters (Object[] parameters, AmqpCoDec encoder) { int index = 0; for (QpidArgument argument : arguments) { if (argument.getDirection() != Direction.O) { - argument.validateAndEncode(parameters[index++],encoder); + argument.encode(parameters[index++],encoder); + } + } + } + + /** + * Decodes the given input raw according to this method arguments definitions. + * Note that only Input/Output and Output parameters are encoded. + * + * @param parameters the parameters values. + * @param encoder the encoder used for encoding. + */ + public Map<String, Object> decodeParameters (byte [] values) + { + ManagementDecoder decoder = new ManagementDecoder(); + decoder.init(ByteBuffer.wrap(values)); + Map<String, Object> result = new HashMap<String, Object>(); + + for (QpidArgument argument : arguments) + { + if (argument.getDirection() != Direction.I) + { + result.put(argument.getName(),argument.decode(decoder)); } } + return result; + } + + /** + * Validates the given array of parameters against the constraint defined on this method's arguments. + * + * @param parameters the parameters (values) to be validated. + * @throws ValidationException when one of the supplied values is violating some constraint. + */ + public void validate (Object[] parameters) throws ValidationException + { + int index = 0; + for (QpidArgument argument : arguments) + { + if (argument.getDirection() != Direction.O) + { + argument.validate(parameters[index++]); + } + } } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidPackage.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidPackage.java index 524121afe5..1bc3d63129 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidPackage.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidPackage.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.qpid.management.domain.handler.impl.IMethodInvocationListener; import org.apache.qpid.management.domain.handler.impl.MethodOrEventDataTransferObject; import org.apache.qpid.management.domain.model.type.Binary; @@ -224,4 +225,14 @@ final class QpidPackage qpidClass.releaseResources(); } } + + /** + * Returns the method invocation listener of the corresponing parent domain model. + * + * @return the method invocation listener of the corresponing parent domain model. + */ + IMethodInvocationListener getMethodInvocationListener () + { + return _parent.getMethodInvocationListener(); + } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/AbsTime.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/AbsTime.java index 27bfa62e2b..dd49636a5c 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/AbsTime.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/AbsTime.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.management.domain.model.type; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; public class AbsTime extends Type { @@ -37,8 +37,8 @@ public class AbsTime extends Type } @Override - public void encode (Object value, ManagementEncoder encoder) + public void encode (Object value, AmqpCoDec encoder) { - encoder.writeUint64((Long)value); + encoder.pack64((Long)value); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Binary.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Binary.java index d6d25534d8..3fe134ebe7 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Binary.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Binary.java @@ -24,7 +24,8 @@ import java.io.Serializable; import java.util.Arrays; import java.util.UUID; -import org.apache.qpid.transport.codec.ManagementEncoder; +import org.apache.qpid.management.messages.AmqpCoDec; + /** * It is a simple wrapper for a byte array (for example a 128bin). @@ -115,9 +116,9 @@ public final class Binary implements Serializable * * @param encoder the encoder used to encode instance content. */ - public void encode(ManagementEncoder encoder) + public void encode(AmqpCoDec encoder) { - encoder.writeBin128(bytes); + encoder.pack(bytes); } // TODO diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Boolean.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Boolean.java index 89867bd4c4..d9407fd200 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Boolean.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Boolean.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.management.domain.model.type; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; public class Boolean extends Type { @@ -37,8 +37,8 @@ public class Boolean extends Type } @Override - public void encode (Object value, ManagementEncoder encoder) + public void encode (Object value, AmqpCoDec encoder) { - encoder.writeUint8( ((java.lang.Boolean)value) ? (short)1 : (short)0); + encoder.pack8( ((java.lang.Boolean)value) ? 1 : 0); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/DeltaTime.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/DeltaTime.java index 85724f34c8..a05bd3fe58 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/DeltaTime.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/DeltaTime.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.management.domain.model.type; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; public class DeltaTime extends Type { @@ -37,8 +37,8 @@ public class DeltaTime extends Type } @Override - public void encode (Object value, ManagementEncoder encoder) + public void encode (Object value, AmqpCoDec encoder) { - encoder.writeUint64((Long)value); + encoder.pack64((Long)value); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Map.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Map.java index f7d6101d3f..645df0d9ac 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Map.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Map.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.management.domain.model.type; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; public class Map extends Type { @@ -37,8 +37,9 @@ public class Map extends Type } @Override - public void encode (Object value, ManagementEncoder encoder) + public void encode (Object value, AmqpCoDec encoder) { - encoder.writeMap((java.util.Map<String, Object>)value); + throw new RuntimeException("encode not yet supported for AMQP Map type."); + //encoder.writeMap((java.util.Map<String, Object>)value); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/ObjectReference.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/ObjectReference.java index 784571e0c0..3285c3c37e 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/ObjectReference.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/ObjectReference.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.management.domain.model.type; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; public class ObjectReference extends Type { @@ -37,7 +37,7 @@ public class ObjectReference extends Type } @Override - public void encode (Object value, ManagementEncoder encoder) + public void encode (Object value, AmqpCoDec encoder) { ((Binary)value).encode(encoder); } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Str16.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Str16.java index 7ab1c667f0..0bffaba9fe 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Str16.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Str16.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.management.domain.model.type; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; public class Str16 extends Type { @@ -37,8 +37,8 @@ public class Str16 extends Type } @Override - public void encode (Object value, ManagementEncoder encoder) + public void encode (Object value, AmqpCoDec encoder) { - encoder.writeStr16((String)value); + encoder.packStr16((String)value); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Str8.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Str8.java index f8f48bc8ec..730d5b4c78 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Str8.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Str8.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.management.domain.model.type; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; public class Str8 extends Type { @@ -37,8 +37,8 @@ public class Str8 extends Type } @Override - public void encode (Object value, ManagementEncoder encoder) + public void encode (Object value, AmqpCoDec encoder) { - encoder.writeStr8((String)value); + encoder.packStr8((String)value); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Type.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Type.java index 786f8d9957..2242b986e0 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Type.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Type.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.management.domain.model.type; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; /** * Layer supertype for all management "types". @@ -91,5 +91,5 @@ public abstract class Type return getClass().hashCode(); } - public abstract void encode (Object value, ManagementEncoder encoder); + public abstract void encode (Object value, AmqpCoDec encoder); }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint16.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint16.java index 31aeab954d..045924157d 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint16.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint16.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.management.domain.model.type; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; public class Uint16 extends Type { @@ -37,8 +37,8 @@ public class Uint16 extends Type } @Override - public void encode (Object value, ManagementEncoder encoder) + public void encode (Object value, AmqpCoDec encoder) { - encoder.writeUint16((Integer)value); + encoder.pack16((Integer)value); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint32.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint32.java index deb9772953..fa62e32d5e 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint32.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint32.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.management.domain.model.type; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; public class Uint32 extends Type { @@ -37,8 +37,8 @@ public class Uint32 extends Type } @Override - public void encode (Object value, ManagementEncoder encoder) + public void encode (Object value, AmqpCoDec encoder) { - encoder.writeUint32((Integer)value); + encoder.pack32((Integer)value); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint64.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint64.java index 9d414cf225..bd35500400 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint64.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint64.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.management.domain.model.type; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; public class Uint64 extends Type { @@ -37,8 +37,8 @@ public class Uint64 extends Type } @Override - public void encode (Object value, ManagementEncoder encoder) + public void encode (Object value, AmqpCoDec encoder) { - encoder.writeUint64((Integer)value); + encoder.pack64((Long)value); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint8.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint8.java index 681537c48f..c71648ae85 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint8.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uint8.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.management.domain.model.type; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; public class Uint8 extends Type { @@ -37,8 +37,8 @@ public class Uint8 extends Type } @Override - public void encode (Object value, ManagementEncoder encoder) + public void encode (Object value, AmqpCoDec encoder) { - encoder.writeUint8((Short)value); + encoder.pack8((Short)value); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uuid.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uuid.java index 74e0337173..3b22449ea4 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uuid.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Uuid.java @@ -22,8 +22,8 @@ package org.apache.qpid.management.domain.model.type; import java.util.UUID; +import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; public class Uuid extends Type { @@ -39,8 +39,9 @@ public class Uuid extends Type } @Override - public void encode (Object value, ManagementEncoder encoder) + public void encode (Object value, AmqpCoDec encoder) { - encoder.writeUuid((UUID)value); + throw new RuntimeException("Not yet implemented encode for UUID type."); +// encoder.writeUuid((UUID)value); } }
\ No newline at end of file diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java index b2f1b19a6c..d4535596e9 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java @@ -129,7 +129,6 @@ final class ManagementClient BrokerMessageListener methodReplyChannelListener = new BrokerMessageListener(_domainModel); methodReplyChannelListener.setHandlers(Configuration.getInstance().getMethodReplyQueueHandlers()); _service.createSubscription(_methodReplyQueueName, _methodReplyQueueName, methodReplyChannelListener); - LOGGER.info( "<QMAN-000039> : Method-reply queue consumer has been successfully installed and bound on broker %s.", _domainModel.getBrokerId()); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QMan.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QMan.java index 00dbec261d..e2808213f1 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QMan.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QMan.java @@ -57,9 +57,15 @@ public class QMan public void run () { LOGGER.info("<QMAN-000006> : Shutting down Q-Man..."); - for (ManagementClient client : managementClients) - { - client.shutdown(); + try + { + for (ManagementClient client : managementClients) + { + client.shutdown(); + } + } catch(Exception exception) + { + } LOGGER.info("<QMAN-000007> : Q-Man shut down."); } @@ -85,7 +91,7 @@ public class QMan LOGGER.info("<QMAN-000004> : Management client for broker %s successfully connected.",brokerId); } catch(StartupFailureException exception) { - LOGGER.error(exception, "<QMAN-100001>: Cannot connect to broker %s on %s:%s",brokerId,data.getHost(),data.getPort()); + LOGGER.error(exception, "<QMAN-100001>: Cannot connect to broker %s on %s",brokerId,data); } } LOGGER.info("<QMAN-000004> : Q-Man open for e-business."); @@ -96,7 +102,9 @@ public class QMan while ( !"q".equals(reader.readLine()) ){ } } catch(ConfigurationException exception) { - LOGGER.error(exception, "<QMAN-100002> : Q-Man was unable to startup correctly : a configuration error occurred."); + LOGGER.error( + exception, + "<QMAN-100002> : Q-Man was unable to startup correctly : a configuration error occurred."); System.exit(1); } } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java index 4bda450315..fa2d1c4021 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java @@ -20,131 +20,70 @@ */ package org.apache.qpid.management.domain.services; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Map; +import java.io.IOException; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.QpidException; -import org.apache.qpid.management.Constants; +import org.apache.qpid.api.Message; import org.apache.qpid.management.Names; -import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.configuration.QpidDatasource; +import org.apache.qpid.management.domain.model.QpidMethod; +import org.apache.qpid.management.domain.model.type.Binary; +import org.apache.qpid.management.messages.MethodInvocationRequestMessage; +import org.apache.qpid.management.messages.SchemaRequestMessage; +import org.apache.qpid.nclient.Connection; +import org.apache.qpid.nclient.Session; import org.apache.qpid.nclient.util.MessageListener; import org.apache.qpid.nclient.util.MessagePartListenerAdapter; -import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; import org.apache.qpid.transport.util.Logger; /** * Qpid Broker facade. - * + * * @author Andrea Gazzarini */ -public class QpidService implements SessionListener +public class QpidService { private final static Logger LOGGER = Logger.get(QpidService.class); - - // Inner static class used for logging and avoid conditional logic (isDebugEnabled()) duplication. - private static class Log - { - /** - * Logs the content f the message. - * This will be written on log only if DEBUG level is enabled. - * - * @param messageContent the raw content of the message. - */ - static void logMessageContent(byte [] messageContent) - { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "<QMAN-200001> : Message has been sent to management exchange. Message content : %s", - Arrays.toString(messageContent)); - } - } - - /** - * Logs the content f the message. - * This will be written on log only if DEBUG level is enabled. - * - * @param messageContent the raw content of the message. - */ - static void logMessageContent(ByteBuffer messageContent) - { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "<QMAN-200002> : Message has been sent to management exchange."); - } - } - } - + private UUID _brokerId; private Connection _connection; private Session _session; - private Map<String,MessagePartListenerAdapter> _listeners; - + /** * Builds a new service with the given connection data. - * + * * @param connectionData the connection data of the broker. */ - public QpidService(UUID brokerId) + public QpidService(UUID brokerId) { this._brokerId = brokerId; } - + /** * Estabilishes a connection with the broker. - * + * * @throws QpidException in case of connection failure. */ public void connect() throws Exception { _connection = QpidDatasource.getInstance().getConnection(_brokerId); - _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>(); - _session = _connection.createSession(Constants.NO_EXPIRATION); - _session.setSessionListener(this); - } - - public void opened(Session ssn) {} - - public void message(Session ssn, MessageTransfer xfr) - { - MessagePartListenerAdapter l = _listeners.get(xfr.getDestination()); - if (l == null) - { - LOGGER.error("unhandled message: %s", xfr); - } - else - { - l.messageTransfer(xfr); - } + _session = _connection.createSession(0); } - - public void exception(Session ssn, SessionException exc) - { - LOGGER.error(exc, "session %s exception", ssn); - } - - public void closed(Session ssn) {} - + /** - * All the previously entered outstanding commands are asynchronous. + * All the previously entered outstanding commands are asynchronous. * Synchronous behavior is achieved through invoking this method. */ - public void sync() + public void sync() { _session.sync(); } - + /** * Closes communication with broker. */ @@ -152,52 +91,51 @@ public class QpidService implements SessionListener { try { + _session.close(); _session = null; - _listeners = null; } catch (Exception e) { } try { - _connection.close(); + _connection.close(); _connection = null; } catch (Exception e) { } } - + /** * Associate a message listener with a destination therefore creating a new subscription. - * + * * @param queueName The name of the queue that the subscriber is receiving messages from. * @param destinationName the name of the destination, or delivery tag, for the subscriber. - * @param listener the listener for this destination. - * + * @param listener the listener for this destination. + * * @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...) */ - public void createSubscription(String queueName, String destinationName, MessageListener listener) + public void createSubscription(String queueName, String destinationName,MessageListener listener) { - _listeners.put(destinationName, new MessagePartListenerAdapter(listener)); - _session.messageSubscribe - (queueName, - destinationName, - MessageAcceptMode.NONE, - MessageAcquireMode.PRE_ACQUIRED, - null, 0, null); - - _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); - _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT); - + _session.messageSubscribe( + queueName, + destinationName, + Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, + Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, + new MessagePartListenerAdapter(listener), null); + + _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Integer.MAX_VALUE); + LOGGER.debug( - "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.", + "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.", queueName, destinationName); } - + /** * Removes a previously declared consumer from the broker. - * + * * @param destinationName the name of the destination, or delivery tag, for the subscriber. * @see Session#messageCancel(String, Option...) */ @@ -205,10 +143,10 @@ public class QpidService implements SessionListener { _session.messageCancel(destinationName); LOGGER.debug( - "<QMAN-200026> : Subscription named %s has been removed from remote broker.", + "<QMAN-200026> : Subscription named %s has been removed from remote broker.", destinationName); - } - + } + /** * Declares a queue on the broker with the given name. * @@ -232,27 +170,27 @@ public class QpidService implements SessionListener _session.queueDelete(queueName); LOGGER.debug("<QMAN-2000025> : Queue with name %s has been removed.",queueName); } - + /** * Binds (on the broker) a queue with an exchange. * - * @param queueName the name of the queue to bind. + * @param queueName the name of the queue to bind. * @param exchangeName the exchange name. - * @param routingKey the routing key used for the binding. + * @param routingKey the routing key used for the binding. * @see Session#exchangeBind(String, String, String, java.util.Map, Option...) */ public void declareBinding(String queueName, String exchangeName, String routingKey) { _session.exchangeBind(queueName, exchangeName, routingKey, null); LOGGER.debug( - "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.", + "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.", routingKey,queueName, exchangeName); } - + /** * Removes a previously declare binding between an exchange and a queue. - * + * * @param queueName the name of the queue. * @param exchangeName the name of the exchange. * @param routingKey the routing key used for binding. @@ -261,42 +199,141 @@ public class QpidService implements SessionListener { _session.exchangeUnbind(queueName, exchangeName, routingKey); LOGGER.debug( - "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.", + "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.", routingKey,queueName, exchangeName); } - + /** - * Sends a command message with the given data on the management queue. - * - * @param messageData the command message content. + * Sends a command message. + * + * @param message the command message. + * @throws IOException when the message cannot be sent. */ - public void sendCommandMessage(byte [] messageData) + public void sendMessage(Message message) throws IOException { _session.messageTransfer( Names.MANAGEMENT_EXCHANGE, - MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - Configuration.getInstance().getCommandMessageHeader(), - messageData); + message, + MessageAcceptMode.EXPLICIT.getValue(), + MessageAcquireMode.PRE_ACQUIRED.getValue()); + } + + /** + * Requests a schema for the given package.class.hash. + * + * @param packageName the package name. + * @param className the class name. + * @param schemaHash the schema hash. + * @throws IOException when the schema request cannot be sent. + */ + public void requestSchema(final String packageName, final String className, final Binary schemaHash) throws IOException + { + Message message = new SchemaRequestMessage() + { + @Override + protected String className () + { + return className; + } - Log.logMessageContent (messageData); - } + @Override + protected String packageName () + { + return packageName; + } + @Override + protected Binary schemaHash () + { + return schemaHash; + } + }; + + sendMessage(message); + } + /** - * Sends a command message with the given data on the management queue. - * - * @param messageData the command message content. + * Invokes an operation on a broker object instance. + * + * @param packageName the package name. + * @param className the class name. + * @param schemaHash the schema hash of the corresponding class. + * @param objectId the object instance identifier. + * @param parameters the parameters for this invocation. + * @param method the method (definition) invoked. + * @return the sequence number used for this message. + * @throws MethodInvocationException when the invoked method returns an error code. + * @throws UnableToComplyException when it wasn't possibile to invoke the requested operation. */ - public void sendCommandMessage(ByteBuffer messageData) + public void invoke( + final String packageName, + final String className, + final Binary schemaHash, + final Binary objectId, + final Object[] parameters, + final QpidMethod method, + final int sequenceNumber) throws MethodInvocationException, UnableToComplyException { - _session.messageTransfer( - Names.MANAGEMENT_EXCHANGE, - MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - Configuration.getInstance().getCommandMessageHeader(), - messageData); + Message message = new MethodInvocationRequestMessage() + { + + @Override + protected int sequenceNumber () + { + return sequenceNumber; + } + + protected Binary objectId() { + return objectId; + } + + protected String packageName() + { + return packageName; + } + + protected String className() + { + return className; + } + + @Override + protected QpidMethod method () + { + return method; + } - Log.logMessageContent (messageData); - } + @Override + protected Object[] parameters () + { + return parameters; + } + + @Override + protected Binary schemaHash () + { + return schemaHash; + } + }; + + try { + sendMessage(message); + sync(); +// ReturnValueObject invocationResult = Configuration.getInstance()._resultExchangeChannel.poll(2000,TimeUnit.MILLISECONDS); +// if (invocationResult == null) { +// return null; +// } +// if (invocationResult.isException()) +// { +// invocationResult.createAndThrowException(); +// } +// return invocationResult; +// } catch(MethodInvocationException exception) +// { +// throw exception; + } catch(Exception exception) { + throw new UnableToComplyException(exception); + } + } }
\ No newline at end of file diff --git a/java/management/client/src/test/java/org/apache/qpid/management/configuration/ConfiguratorTest.java b/java/management/client/src/test/java/org/apache/qpid/management/configuration/ConfiguratorTest.java index c3fa0c13f3..639fe224ec 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/configuration/ConfiguratorTest.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/configuration/ConfiguratorTest.java @@ -86,12 +86,6 @@ public class ConfiguratorTest extends TestCase configurator.startElement(null, null, Tag.BROKERS.toString(), null); assertSame(configurator._brokerConfigurationParser,configurator._currentParser); - - configurator.startElement(null, null, Tag.MANAGEMENT_QUEUE.toString(), null); - assertSame(configurator._managementQueueHandlerParser,configurator._currentParser); - - configurator.startElement(null, null, Tag.METHOD_REPLY_QUEUE.toString(), null); - assertSame(configurator._methodReplyQueueHandlerParser,configurator._currentParser); } /** * Create a stub configurator which returns the given datafile path. diff --git a/java/management/client/src/test/java/org/apache/qpid/management/configuration/MappingParsersTest.java b/java/management/client/src/test/java/org/apache/qpid/management/configuration/MappingParsersTest.java index 9755c62991..4c8885dbc8 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/configuration/MappingParsersTest.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/configuration/MappingParsersTest.java @@ -20,16 +20,11 @@ */ package org.apache.qpid.management.configuration; -import java.util.Map; import java.util.UUID; import junit.framework.TestCase; import org.apache.qpid.management.TestConstants; -import org.apache.qpid.management.domain.handler.base.IMessageHandler; -import org.apache.qpid.management.domain.handler.impl.ConfigurationMessageHandler; -import org.apache.qpid.management.domain.handler.impl.InstrumentationMessageHandler; -import org.apache.qpid.management.domain.handler.impl.SchemaResponseMessageHandler; import org.apache.qpid.management.domain.model.AccessMode; import org.apache.qpid.management.domain.model.type.Type; import org.apache.qpid.management.domain.model.type.Uint8; @@ -103,73 +98,6 @@ public class MappingParsersTest extends TestCase assertEquals(username,result.getUsername()); assertEquals(password,result.getPassword()); } - - /** - * Tests the execution of the management queue handler mapping parser. - * - * <br>precondition: Two managenent queue handlers mapping are built by the parser; - * <br>postcondition: the corresponding management handlers are available on the configuration. - */ - public void testManagementQueueMessageListenerParser() - { - String instrOpcode = "i"; - String configOpCode = "c"; - - ManagementQueueMessageListenerParser parser = new ManagementQueueMessageListenerParser(); - - parser.setCurrrentAttributeValue(instrOpcode); - parser.setCurrentAttributeName(Tag.OPCODE.toString()); - parser.setCurrrentAttributeValue(InstrumentationMessageHandler.class.getName()); - parser.setCurrentAttributeName(Tag.CLASS_NAME.toString()); - parser.setCurrentAttributeName(Tag.HANDLER.toString()); - - parser.setCurrrentAttributeValue(configOpCode); - parser.setCurrentAttributeName(Tag.OPCODE.toString()); - parser.setCurrrentAttributeValue(ConfigurationMessageHandler.class.getName()); - parser.setCurrentAttributeName(Tag.CLASS_NAME.toString()); - parser.setCurrentAttributeName(Tag.HANDLER.toString()); - - - Map<Character,IMessageHandler> result = Configuration.getInstance().getManagementQueueHandlers(); - - assertEquals(2,result.size()); - - assertEquals(InstrumentationMessageHandler.class,result.get(instrOpcode.charAt(0)).getClass()); - assertEquals(ConfigurationMessageHandler.class,result.get(configOpCode.charAt(0)).getClass()); - } - - /** - * Tests the execution of the method-reply queue handler mapping parser. - * - * <br>precondition: two method-reply queue handler mappings are built by the parser; - * <br>postcondition: the corresponding method-reply handlers are available on the configuration. - */ - public void testMethodReplyQueueMessageListenerParser() - { - String schemaOpcode = "s"; - String configOpCode = "c"; - - MethodReplyQueueMessageListenerParser parser = new MethodReplyQueueMessageListenerParser(); - - parser.setCurrrentAttributeValue(schemaOpcode); - parser.setCurrentAttributeName(Tag.OPCODE.toString()); - parser.setCurrrentAttributeValue(SchemaResponseMessageHandler.class.getName()); - parser.setCurrentAttributeName(Tag.CLASS_NAME.toString()); - parser.setCurrentAttributeName(Tag.HANDLER.toString()); - - parser.setCurrrentAttributeValue(configOpCode); - parser.setCurrentAttributeName(Tag.OPCODE.toString()); - parser.setCurrrentAttributeValue(ConfigurationMessageHandler.class.getName()); - parser.setCurrentAttributeName(Tag.CLASS_NAME.toString()); - parser.setCurrentAttributeName(Tag.HANDLER.toString()); - - Map<Character,IMessageHandler> result = Configuration.getInstance().getMethodReplyQueueHandlers(); - - assertEquals(2,result.size()); - - assertEquals(SchemaResponseMessageHandler.class,result.get(schemaOpcode.charAt(0)).getClass()); - assertEquals(ConfigurationMessageHandler.class,result.get(configOpCode.charAt(0)).getClass()); - } /** * Tests the execution of the type mapping parser. diff --git a/java/management/client/src/test/java/org/apache/qpid/management/configuration/StubConfigurator.java b/java/management/client/src/test/java/org/apache/qpid/management/configuration/StubConfigurator.java index 6d92e3b6f8..426a1a29f1 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/configuration/StubConfigurator.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/configuration/StubConfigurator.java @@ -46,6 +46,15 @@ public class StubConfigurator extends Configurator Configuration.getInstance().addTypeMapping(mapping); } + public void addTypeMapping(String code,String clazzName,String validatorClassName) + { + TypeMapping mapping = new TypeMapping(); + mapping.setCode(code); + mapping.setType(clazzName); + mapping.setValidatorClassName(validatorClassName); + Configuration.getInstance().addTypeMapping(mapping); + } + public void addAccessModeMapping(String code, String value) { AccessModeMapping mapping = new AccessModeMapping(); diff --git a/java/management/client/src/test/java/org/apache/qpid/management/domain/handler/base/ContentIndicationMessageHandlerTest.java b/java/management/client/src/test/java/org/apache/qpid/management/domain/handler/base/ContentIndicationMessageHandlerTest.java index 6be4484f5c..d6b51b64fc 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/domain/handler/base/ContentIndicationMessageHandlerTest.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/domain/handler/base/ContentIndicationMessageHandlerTest.java @@ -20,14 +20,9 @@ */ package org.apache.qpid.management.domain.handler.base; -import java.nio.ByteBuffer; +import junit.framework.TestCase; -import org.apache.qpid.management.TestConstants; import org.apache.qpid.management.domain.model.type.Binary; -import org.apache.qpid.transport.codec.ManagementDecoder; -import org.apache.qpid.transport.codec.ManagementEncoder; - -import junit.framework.TestCase; /** * Test case for Content indication message handler (base class). @@ -37,62 +32,6 @@ import junit.framework.TestCase; public class ContentIndicationMessageHandlerTest extends TestCase { /** - * Tests the execution of the process method when the message is processed correctly. - */ - public void testProcessOk() { - final String expectedPackageName = "org.apache.qpid.broker"; - final String expectedClassName ="connection"; - final long expectedMessageTimestamp = System.currentTimeMillis(); - final long expectedCreationTime = expectedMessageTimestamp - 1000; - final long expectedDeletionTime = 0; - final Binary expectedClassHash = new Binary(new byte[]{9,9,9,9,8,8,8,8,7,7,7,7,6,6,6,6}); - final Binary expectedObjectId = new Binary(new byte[]{1,2,3,4,5,6,7,8,9,0,11,12,13,14,15,16}); - final Binary expectedBody = new Binary(new byte[]{1,1,1,1,2,2,2,2,3,3,3,3,4,4,4,4}); - - ContentIndicationMessageHandler mockHandler = new ContentIndicationMessageHandler() - { - @Override - protected void updateDomainModel (String packageName, String className, Binary classHash, Binary objectId, - long timeStampOfCurrentSample, long timeObjectWasCreated, long timeObjectWasDeleted, byte[] contentData) - { - assertEquals(expectedPackageName,packageName); - assertEquals(expectedClassName,className); - assertEquals(expectedClassHash,classHash); - assertEquals(expectedMessageTimestamp,timeStampOfCurrentSample); - assertEquals(expectedCreationTime,timeObjectWasCreated); - assertEquals(expectedDeletionTime,timeObjectWasDeleted); - assertEquals(expectedObjectId,objectId); - assertEquals(expectedBody,new Binary(contentData)); - } - - @Override - void removeObjectInstance (String packageName, String className, Binary classHash, Binary objectId) - { - fail("The object shouldn't be deleted because deletion time was set to 0!"); - } - }; - mockHandler.setDomainModel(TestConstants.DOMAIN_MODEL); - - ByteBuffer buffer = ByteBuffer.allocate(1000); - ManagementEncoder encoder = new ManagementEncoder(buffer); - - encoder.writeStr8(expectedPackageName); - encoder.writeStr8(expectedClassName); - expectedClassHash.encode(encoder); - encoder.writeDatetime(expectedMessageTimestamp); - encoder.writeDatetime(expectedCreationTime); - encoder.writeDatetime(expectedDeletionTime); - expectedObjectId.encode(encoder); - expectedBody.encode(encoder); - - buffer.flip(); - ManagementDecoder decoder = new ManagementDecoder(); - decoder.init(buffer); - - mockHandler.process(decoder, 1); - } - - /** * Tests the behaviour of the objectHasBeenRemoved method(). */ public void testObjectHasBeenRemoved() diff --git a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/DomainModelTest.java b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/DomainModelTest.java index 185302c182..578fa36bc7 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/DomainModelTest.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/DomainModelTest.java @@ -36,7 +36,6 @@ public class DomainModelTest extends BaseDomainModelTestCase @Override protected void setUp () throws Exception { - super.setUp(); _model = new DomainModel(UUID.randomUUID()); } diff --git a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidMethodBuilderTest.java b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidMethodBuilderTest.java index 6032721d1b..53320e5c28 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidMethodBuilderTest.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidMethodBuilderTest.java @@ -25,7 +25,6 @@ import static org.apache.qpid.management.domain.model.QpidFeatureBuilder.Attribu import static org.apache.qpid.management.domain.model.QpidFeatureBuilder.Attribute.type; import static org.apache.qpid.management.domain.model.QpidFeatureBuilder.Attribute.unit; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -36,7 +35,6 @@ import javax.management.MBeanOperationInfo; import org.apache.qpid.management.Names; import org.apache.qpid.management.domain.handler.impl.MethodOrEventDataTransferObject; import org.apache.qpid.management.domain.model.QpidFeatureBuilder.Attribute; -import org.apache.qpid.transport.codec.ManagementEncoder; /** * Test case for Qpid Statistic builder. @@ -146,21 +144,4 @@ public class QpidMethodBuilderTest extends BaseQpidFeatureBuilderTestCase assertEquals(method.getDescription(),info.getDescription()); assertEquals(method.getName(),info.getName()); } - - public void testEncodeParameters() throws ValidationException, UnableToBuildFeatureException { - _builder.build(); - - Object [] parameters = new Object[]{new Integer(1), new Integer(2),new Integer(3)}; - - ManagementEncoder encoder = new ManagementEncoder(ByteBuffer.allocate(1)){ - @Override - public void writeUint16 (int s) - { - assertTrue(s == 1 || s == 2); - } - }; - - QpidMethod method = (QpidMethod) _builder.getQpidFeature(); - method.encodeParameters(parameters, encoder); - } }
\ No newline at end of file diff --git a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidNumberPropertyTest.java b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidNumberPropertyTest.java index 2611923f71..55a9403bc8 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidNumberPropertyTest.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidNumberPropertyTest.java @@ -20,9 +20,12 @@ */ package org.apache.qpid.management.domain.model; +import junit.framework.TestCase; + +import org.apache.qpid.management.configuration.StubConfigurator; import org.apache.qpid.management.domain.model.type.Uint64; -public class QpidNumberPropertyTest extends BaseDomainModelTestCase +public class QpidNumberPropertyTest extends TestCase { private QpidProperty _property; private Long _value = 55432L; @@ -30,7 +33,8 @@ public class QpidNumberPropertyTest extends BaseDomainModelTestCase @Override protected void setUp () throws Exception { - super.setUp(); + StubConfigurator configurator = new StubConfigurator(); + configurator.addTypeMapping("1", Uint64.class.getName(),QpidProperty.NumberValidator.class.getName()); _property = new QpidProperty(); _property.setName("average"); _property.setAccessMode(AccessMode.RW); diff --git a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidPackageTest.java b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidPackageTest.java index 530b526bec..b7eb9055ba 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidPackageTest.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidPackageTest.java @@ -34,7 +34,6 @@ public class QpidPackageTest extends BaseDomainModelTestCase @Override protected void setUp () throws Exception { - super.setUp(); _qpidPackage = new QpidPackage(TestConstants.QPID_PACKAGE_NAME, TestConstants.DOMAIN_MODEL); } diff --git a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidStringPropertyTest.java b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidStringPropertyTest.java index 263e4209a6..534a019503 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidStringPropertyTest.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidStringPropertyTest.java @@ -20,9 +20,12 @@ */ package org.apache.qpid.management.domain.model; +import junit.framework.TestCase; + +import org.apache.qpid.management.configuration.StubConfigurator; import org.apache.qpid.management.domain.model.type.Str16; -public class QpidStringPropertyTest extends BaseDomainModelTestCase +public class QpidStringPropertyTest extends TestCase { private QpidProperty _property; private final String _5LettersString = "12345"; @@ -30,7 +33,8 @@ public class QpidStringPropertyTest extends BaseDomainModelTestCase @Override protected void setUp () throws Exception { - super.setUp(); + StubConfigurator configurator = new StubConfigurator(); + configurator.addTypeMapping("1", Str16.class.getName(),QpidProperty.StringValidator.class.getName()); _property = new QpidProperty(); _property.setName("name"); _property.setAccessMode(AccessMode.RW); |
