From f5997e515738fabd366a90d4352bfe7d9d60abb6 Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Fri, 24 Oct 2008 14:09:26 +0000 Subject: qpid-1378: applied qman_24102008_compound_message_handling.patch that adds compound message handling improvements. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707640 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/management/Messages.java | 94 +++++++++++++++ .../java/org/apache/qpid/management/Names.java | 1 + .../java/org/apache/qpid/management/Protocol.java | 2 - .../configuration/BrokerConnectionDataParser.java | 3 +- .../management/configuration/Configuration.java | 28 ++--- .../management/configuration/QpidDatasource.java | 15 ++- .../handler/impl/MethodResponseMessageHandler.java | 9 +- .../handler/impl/SchemaResponseMessageHandler.java | 40 +++++-- .../qpid/management/domain/model/JmxService.java | 10 +- .../qpid/management/domain/model/QpidArgument.java | 5 +- .../management/domain/model/QpidAttribute.java | 9 +- .../qpid/management/domain/model/QpidClass.java | 35 ++++-- .../qpid/management/domain/model/QpidEntity.java | 3 +- .../qpid/management/domain/model/QpidEvent.java | 20 ++-- .../domain/model/QpidFeatureBuilder.java | 5 +- .../qpid/management/domain/model/QpidProperty.java | 7 +- .../qpid/management/domain/model/type/Binary.java | 40 +++++-- .../domain/services/BrokerMessageListener.java | 113 +++++++++-------- .../domain/services/ManagementClient.java | 50 ++------ .../domain/services/MessageTokenizer.java | 133 +++++++++++++++++++++ .../qpid/management/domain/services/QMan.java | 71 ++++++----- .../management/domain/services/QpidService.java | 99 +++------------ .../domain/services/SequenceNumberGenerator.java | 7 +- .../apache/qpid/management/messages/AmqpCoDec.java | 21 ++++ .../messages/MethodInvocationRequestMessage.java | 110 ++++++++++++++++- .../management/messages/SchemaRequestMessage.java | 23 +++- .../management/domain/model/QpidClassTest.java | 8 +- .../domain/services/BrokerMessageListenerTest.java | 71 +++++++++++ .../domain/services/MessageTokenizerTest.java | 120 +++++++++++++++++++ 29 files changed, 840 insertions(+), 312 deletions(-) create mode 100644 java/management/client/src/main/java/org/apache/qpid/management/Messages.java create mode 100644 java/management/client/src/main/java/org/apache/qpid/management/domain/services/MessageTokenizer.java create mode 100644 java/management/client/src/test/java/org/apache/qpid/management/domain/services/MessageTokenizerTest.java (limited to 'java') diff --git a/java/management/client/src/main/java/org/apache/qpid/management/Messages.java b/java/management/client/src/main/java/org/apache/qpid/management/Messages.java new file mode 100644 index 0000000000..1448ab379e --- /dev/null +++ b/java/management/client/src/main/java/org/apache/qpid/management/Messages.java @@ -0,0 +1,94 @@ +package org.apache.qpid.management; + +/** + * Enumerative interfaces containing all QMan messages. + * + * @author Andrea Gazzarini + */ +public interface Messages +{ + // INFO + String QMAN_000001_STARTING_QMAN = " : Starting Q-Man..."; + String QMAN_000002_READING_CONFIGURATION = " : Reading Q-Man configuration..."; + String QMAN_000003_CREATING_MANAGEMENT_CLIENTS = " : Creating management client(s)..."; + String QMAN_000004_MANAGEMENT_CLIENT_CONNECTED = " : Management client for broker %s successfully connected."; + String QMAN_000005_TYPE_MAPPING_CONFIGURED = " : Type mapping : code = %s associated to %s (validator class is %s)"; + String QMAN_000006_ACCESS_MODE_MAPPING_CONFIGURED = " : Access Mode mapping : code = %s associated to %s"; + String QMAN_000007_MANAGEMENT_HANDLER_MAPPING_CONFIGURED = " : Management Queue Message Handler Mapping : opcode = %s associated with %s"; + String QMAN_000008_METHOD_REPLY_HANDLER_MAPPING_CONFIGURED = " : Method-Reply Queue Message Handler Mapping : opcode = %s associated with %s"; + String QMAN_000009_BROKER_DATA_CONFIGURED = " : Broker configuration %s: %s"; + String QMAN_000010_INCOMING_SCHEMA = " : Incoming schema for %s::%s.%s"; + String QMAN_000011_SHUTDOWN_INITIATED = " : The shutdown sequence has been initiated for management client connected with broker %s"; + String QMAN_000012_MANAGEMENT_CLIENT_SHUT_DOWN = " : Management client connected with broker %s shut down successfully."; + String QMAN_000013_METHOD_REPLY_CONSUMER_INSTALLED = " : Method-reply queue consumer has been successfully installed and bound on broker %s."; + String QMAN_000014_MANAGEMENT_CONSUMER_INSTALLED =" : Management queue consumer has been successfully installed and bound on broker %s."; + String QMAN_000015_MANAGEMENT_QUEUE_DECLARED = " : Management queue with name %s has been successfully declared and bound on broker %s."; + String QMAN_000016_METHOD_REPLY_QUEUE_DECLARED = " : Method-reply queue with name %s has been successfully declared and bound on broker %s."; + String QMAN_000017_CONSUMER_HAS_BEEN_REMOVED = " : Consumer %s has been removed from broker %s."; + String QMAN_000018_QUEUE_UNDECLARED = " : Queue %s has been removed from broker %s."; + String QMAN_000019_QMAN_STARTED = " : Q-Man open for e-business."; + String QMAN_000020_SHUTTING_DOWN_QMAN = " : Shutting down Q-Man..."; + String QMAN_000021_SHUT_DOWN = " : Q-Man shut down."; + + // DEBUG + String QMAN_200001_INCOMING_MESSAGE_HAS_BEEN_RECEIVED = " : New incoming message has been received. Message content is %s"; + String QMAN_200002_OPCODE_HANDLER_ASSOCIATION = " : \"%s\" opcode is associated to handler %s"; + String QMAN_200003_MESSAGE_FORWARDING = " : Incoming message with \"%s\" as opcode will be forwarded to %s for processing."; + String QMAN_200004_MANAGEMENT_QUEUE_NAME = " : Management queue name : %s"; + String QMAN_200005_METHOD_REPLY_QUEUE_NAME = " : Method-reply queue name : %s"; + String QMAN_200006_QPID_CONNECTION_RELEASED = " : Connection %s returned to the pool."; + String QMAN_200007_TEST_CONNECTION_ON_RESERVE = " : Test connection on reserve. Is valid? %s"; + String QMAN_200008_CONNECTION_DESTROYED = " : Connection has been destroyed."; + String QMAN_200009_CONNECTION_DESTROY_FAILURE = " : Unable to destroy a connection object."; + String QMAN_200010_EVENT_MBEAN_REGISTERED = " : Event instance %s::%s::%s successfully registered with MBean Server with name %s"; + String QMAN_200011_OBJECT_MBEAN_REGISTERED = " : Object instance %s::%s::%s:%s successfully registered with MBean Server with name %s"; + String QMAN_200012_OBJECT_MBEAN_UNREGISTERED = " : Object instance %s::%s::%s:%s successfully unregistered from MBean Server. Name was %s"; + String QMAN_200013_ARGUMENT_VALUE_ENCODED = " : Encoded value %S for argument %s. Type is %s"; + String QMAN_200014_INCOMING_INSTRUMENTATION_DATA = " : Incoming instrumentation data for %s::%s.%s.%s"; + String QMAN_200015_INCOMING_CONFIGURATION_DATA = " : Incoming configuration data for %s::%s.%s.%s"; + String QMAN_200016_PROPERTY_DEFINITION_HAS_BEEN_BUILT = " : Property definition for %s::%s.%s has been built."; + String QMAN_200017_STATISTIC_DEFINITION_HAS_BEEN_BUILT = " : Statistic definition for %s::%s.%s has been built."; + String QMAN_200018_OPTIONAL_PROPERTIES_INFO = " : Class %s::%s.%s has %s optional properties."; + String QMAN_200019_EVENT_ARGUMENT_DEFINITION_HAS_BEEN_BUILT = " : Event argument definition for %s::%s.%s has been built."; + String QMAN_200020_ENTITY_DEFINITION_HAS_BEEN_BUILT = " : Entity definition has been built (without schema) for %s::%s.%s"; + String QMAN_200021_INCOMING_EVENT_DATA = " : Incoming data for event %s::%s.%s"; + String QMAN_200022_VALIDATOR_INSTALLED = " : Validator %s for type %s successfully installed."; + String QMAN_200023_VALIDATOR_NOT_FOUND = " : No validator was found for type %s. The default (empty) validator will be used."; + String QMAN_200024_MANAGEMENT_MESSAGE_HAS_BEEN_SENT = " : Message has been sent to management exchange. Message content : %s"; + String QMAN_200025_SUBSCRIPTION_DECLARED = " : New subscription between queue %s and destination %s has been declared."; + String QMAN_200026_SUBSCRIPTION_REMOVED = " : Subscription named %s has been removed from remote broker."; + String QMAN_200027_QUEUE_DECLARED = " : New queue with name %s has been declared."; + String QMAN_200028_QUEUE_REMOVED= " : New queue with name %s has been undeclared."; + String QMAN_200029_BINDING_DECLARED = " : New binding with %s as routing key has been declared between queue %s and exchange %s."; + String QMAN_200030_BINDING_REMOVED = " : Binding with %s as routing key has been removed between queue %s and exchange %s."; + String QMAN_200031_COMPOUND_MESSAGE_CONTAINS = " : Incoming compound message contains %s message(s)."; + String QMAN_200032_COMMAND_MESSAGE_ROUTING_KEY = " : Command message routing key : %s"; + + // WARNING + String QMAN_300001_MESSAGE_DISCARDED = " : No handler has been configured for processing messages with \"%s\" as opcode. Message will be discarded."; + String QMAN_300002_UNKNOWN_SEQUENCE_NUMBER = " : Unable to deal with incoming message because it contains a unknown sequence number (%s)."; + + // ERROR + String QMAN_100001_BAD_MAGIC_NUMBER_FAILURE = " : Message processing failure : incoming message contains a bad magic number (%s) and therefore will be discaded."; + String QMAN_100002_MESSAGE_READ_FAILURE = " : Message I/O failure : unable to read byte message content and therefore it will be discarded."; + String QMAN_100003_MESSAGE_PROCESS_FAILURE = " : Message processing failure : unknown exception; see logs for more details."; + String QMAN_100004_HANDLER_INITIALIZATION_FAILURE = " : Message handler configured for opcode %s thrown an exception in initialization and therefore will be discarded."; + String QMAN_100005_CLASS_SCHEMA_PROCESSING_FAILURE = " : Q-Man was unable to process the schema response message."; + String QMAN_100006_EVENT_SCHEMA_PROCESSING_FAILURE = " : Q-Man was unable to process the schema response message."; + String QMAN_100007_UNABLE_TO_CONNECT_WITH_BROKER = " : Unable to connect with broker located on %s. This broker will be ignored."; + String QMAN_100008_MANAGEMENT_MESSAGE_HANDLER_NOT_AVAILABLE = " : Management Message Handler configured for opcode %s is not available and therefore will be discarded."; + String QMAN_100009_METHOD_REPLY_MESSAGE_HANDLER_NOT_AVAILABLE = " :Method-Reply Message Handler configured for opcode %s is not available and therefore will be discarded."; + String QMAN_100010_METHOD_INVOCATION_RESULT_FAILURE = " : an exception occurred while storing the result of a method invocation. Sequence number was %s"; + String QMAN_100011_UNKNOWN_CLASS_KIND = " : Unknwon class kind : %s)."; + String QMAN_100012_SCHEMA_MESSAGE_PROCESSING_FAILURE = " : Q-Man was unable to process the schema response message."; + String QMAN_100013_MBEAN_REGISTRATION_FAILURE = " : Unable to unregister object instance %s."; + String QMAN_100014_ATTRIBUTE_DECODING_FAILURE = " : Unable to decode value for attribute %s"; + String QMAN_100015_UNABLE_TO_SEND_SCHEMA_REQUEST = " : Unable to send a schema request schema for %s.%s"; + String QMAN_100016_UNABLE_TO_DECODE_FEATURE_VALUE = " : Unable to decode value for %s::%s::%s"; + String QMAN_100017_UNABLE_TO_CONNECT = ": Cannot connect to broker %s on %s"; + + // MESSAGES + String EVENT_SEVERITY_ATTRIBUTE_DESCRIPTION = "Severity level for this event."; + String EVENT_TIMESTAMP_ATTRIBUTE_DESCRIPTION = "Current timestamp of this event."; + String UNABLE_TO_STARTUP_CORRECTLY = " : Q-Man was unable to startup correctly : a configuration error occurred."; +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/Names.java b/java/management/client/src/main/java/org/apache/qpid/management/Names.java index dde7715509..4fabb40640 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/Names.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/Names.java @@ -49,4 +49,5 @@ public interface Names String CONFIGURATION_FILE_NAME = "/org/apache/qpid/management/config.xml"; String ARG_COUNT_PARAM_NAME = "argCount"; + String DEFAULT_PARAM_NAME ="default"; } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/Protocol.java b/java/management/client/src/main/java/org/apache/qpid/management/Protocol.java index 222b5d8aa6..a37332f1f0 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/Protocol.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/Protocol.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.management; -import com.sun.org.apache.xerces.internal.jaxp.validation.ErrorHandlerAdaptor; - /** * Protocol defined constants. * diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java index bbb5380d7d..368970af00 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java @@ -22,6 +22,7 @@ package org.apache.qpid.management.configuration; import java.util.UUID; +import org.apache.qpid.management.Messages; import org.apache.qpid.transport.util.Logger; /** @@ -118,7 +119,7 @@ class BrokerConnectionDataParser implements IParser Configuration.getInstance().addBrokerConnectionData(getUUId(),_connectionData); } catch(Exception exception) { - LOGGER.error(exception, "Unable to connect with broker located on %s and. Hence this broker will be ignored.", _connectionData); + LOGGER.error(exception, Messages.QMAN_100007_UNABLE_TO_CONNECT_WITH_BROKER, _connectionData); } _connectionData = new BrokerConnectionData(); break; diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java index 6b47c06510..2c27926f47 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Names; import org.apache.qpid.management.domain.handler.base.IMessageHandler; import org.apache.qpid.management.domain.handler.impl.InvocationResult; @@ -193,10 +194,7 @@ public final class Configuration result.put(opcode, (IMessageHandler)Class.forName(className).newInstance()); } catch(Exception exception) { - LOGGER.error( - exception, - " : Management Message Handler configured for opcode %s is not available and therefore will be discarded.", - opcode); + LOGGER.error(exception,Messages.QMAN_100008_MANAGEMENT_MESSAGE_HANDLER_NOT_AVAILABLE,opcode); } } return result; @@ -222,10 +220,7 @@ public final class Configuration result.put(opcode, (IMessageHandler)Class.forName(className).newInstance()); } catch(Exception exception) { - LOGGER.error( - exception, - " :Method-Reply Message Handler configured for opcode %s is not available and therefore will be discarded.", - opcode); + LOGGER.error(exception,Messages.QMAN_100009_METHOD_REPLY_MESSAGE_HANDLER_NOT_AVAILABLE,opcode); } } return result; @@ -273,7 +268,7 @@ public final class Configuration _typeMappings.put(code, type); _validators.put(type, validatorClassName); - LOGGER.info(" : Type mapping : code = %s associated to %s (validator class is %s)", code,type,validatorClassName); + LOGGER.info(Messages.QMAN_000005_TYPE_MAPPING_CONFIGURED, code,type,validatorClassName); } /** @@ -286,7 +281,7 @@ public final class Configuration AccessMode accessMode = mapping.getAccessMode(); _accessModes.put(code, accessMode); - LOGGER.info(" : Access Mode mapping : code = %s associated to %s", code,accessMode); + LOGGER.info(Messages.QMAN_000006_ACCESS_MODE_MAPPING_CONFIGURED, code,accessMode); } /** @@ -302,7 +297,7 @@ public final class Configuration String handlerClass = mapping.getMessageHandlerClass(); _managementQueueHandlers.put(opcode, handlerClass); - LOGGER.info(" : Management Queue Message Handler Mapping : opcode = %s associated with %s", opcode,handlerClass); + LOGGER.info(Messages.QMAN_000007_MANAGEMENT_HANDLER_MAPPING_CONFIGURED, opcode,handlerClass); } /** @@ -318,7 +313,7 @@ public final class Configuration String handlerClass = mapping.getMessageHandlerClass(); _methodReplyQueueHandlers.put(opcode, handlerClass); - LOGGER.info(" : Method-Reply Queue Message Handler Mapping : opcode = %s associated with %s", opcode,handlerClass); + LOGGER.info(Messages.QMAN_000008_METHOD_REPLY_HANDLER_MAPPING_CONFIGURED, opcode,handlerClass); } /** @@ -332,7 +327,8 @@ public final class Configuration { QpidDatasource.getInstance().addConnectionPool(brokerId, connectionData); _brokerConnectionInfos.put(brokerId,connectionData); - LOGGER.info(" : Broker Configuration %s: %s",brokerId,connectionData); + + LOGGER.info(Messages.QMAN_000009_BROKER_DATA_CONFIGURED,brokerId,connectionData); } /** @@ -358,7 +354,7 @@ public final class Configuration _managementQueueName = Names.MANAGEMENT_QUEUE_PREFIX+uuid; _methodReplyQueueName = Names.METHOD_REPLY_QUEUE_PREFIX+uuid; - LOGGER.debug(" : Management queue name : %s",_managementQueueName); - LOGGER.debug(" : Method-reply queue name : %s",_methodReplyQueueName); + LOGGER.debug(Messages.QMAN_200004_MANAGEMENT_QUEUE_NAME,_managementQueueName); + LOGGER.debug(Messages.QMAN_200005_METHOD_REPLY_QUEUE_NAME,_methodReplyQueueName); } -} \ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java index 1a51085ad3..21bdea59b5 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java @@ -28,6 +28,7 @@ import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.commons.pool.impl.GenericObjectPoolFactory; +import org.apache.qpid.management.Messages; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.util.Logger; @@ -87,7 +88,8 @@ public final class QpidDatasource try { pools.get(_brokerId).returnObject(this); - LOGGER.debug(" : Connection %s returned to the pool.", this); + + LOGGER.debug(Messages.QMAN_200006_QPID_CONNECTION_RELEASED, this); } catch (Exception e) { @@ -150,7 +152,9 @@ public final class QpidDatasource { PooledConnection connection = (PooledConnection) obj; boolean isValid = connection.isValid(); - LOGGER.debug(" : Test connection on reserve. Is valid? %s",isValid); + + LOGGER.debug(Messages.QMAN_200007_TEST_CONNECTION_ON_RESERVE,isValid); + return isValid; } @@ -164,10 +168,11 @@ public final class QpidDatasource { PooledConnection connection = (PooledConnection) obj; connection.reallyClose(); - LOGGER.debug(" : Connection has been destroyed."); - } catch (Exception e) + + LOGGER.debug(Messages.QMAN_200008_CONNECTION_DESTROYED); + } catch (Exception exception) { - LOGGER.debug(e, " : Unable to destroy a connection object"); + LOGGER.debug(exception, Messages.QMAN_200009_CONNECTION_DESTROY_FAILURE); } } } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/MethodResponseMessageHandler.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/MethodResponseMessageHandler.java index ba24a8d4e6..303708a010 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/MethodResponseMessageHandler.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/MethodResponseMessageHandler.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.domain.handler.base.BaseMessageHandler; import org.apache.qpid.management.domain.model.DomainModel; import org.apache.qpid.management.domain.model.InvocationEvent; @@ -80,11 +81,7 @@ public class MethodResponseMessageHandler extends BaseMessageHandler exchangeChannel.put(result); } catch (InterruptedException exception) { - LOGGER.error( - exception, - " : an exception occurred while storing the result of a method invocation. " + - "Sequence number was %s", - sequenceNumber); + LOGGER.error(exception,Messages.QMAN_100010_METHOD_INVOCATION_RESULT_FAILURE,sequenceNumber); } } else { @@ -106,4 +103,4 @@ public class MethodResponseMessageHandler extends BaseMessageHandler super.setDomainModel(domainModel); domainModel.setMethodInvocationListener(methodInvocationListener); } -} \ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java index fdee2f2f2d..add7fd073b 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Names; import org.apache.qpid.management.Protocol; import org.apache.qpid.management.domain.handler.base.BaseMessageHandler; @@ -39,12 +40,25 @@ import org.apache.qpid.transport.codec.ManagementDecoder; */ public class SchemaResponseMessageHandler extends BaseMessageHandler { + /** + * Behavioural interface for classes that are responsible to deal with schema messages. + * + * @author Andrea Gazzarini + */ interface IProcessor { + /** + * Processes the incoming message using the given decoder. + * + * @param decoder the decoder used for dealing with incoming message. + */ void process(ManagementDecoder decoder); } - final IProcessor classDefinitionProcessor = new IProcessor() + /** + * Processor responsible to deal with class schema related messages. + */ + final IProcessor _classSchemaProcessor = new IProcessor() { @Override public void process(ManagementDecoder decoder) @@ -69,13 +83,15 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler getMethods(decoder, howManyMethods)); } catch(Exception exception) { - _logger.error(exception," : Q-Man was unable to process the schema response message."); + _logger.error(exception,Messages.QMAN_100005_CLASS_SCHEMA_PROCESSING_FAILURE); } - } }; - final IProcessor eventDefinitionProcessor = new IProcessor() + /** + * Processor responsible to deal with class event related messages. + */ + final IProcessor _eventSchemaProcessor = new IProcessor() { @Override public void process(ManagementDecoder decoder) @@ -94,7 +110,7 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler getAttributes(decoder, howManyArguments)); } catch(Exception exception) { - _logger.error(exception," : Q-Man was unable to process the schema response message."); + _logger.error(exception,Messages.QMAN_100006_EVENT_SCHEMA_PROCESSING_FAILURE); } } }; @@ -115,22 +131,22 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler { case Protocol.CLASS : { - classDefinitionProcessor.process(decoder); + _classSchemaProcessor.process(decoder); break; } case Protocol.EVENT : { - eventDefinitionProcessor.process(decoder); + _eventSchemaProcessor.process(decoder); break; } default : { - _logger.error(" : Q-Man was unable to process the schema response message (reason : unknown class kind %s).",classKind); + _logger.error(Messages.QMAN_100011_UNKNOWN_CLASS_KIND,classKind); } } } catch(Exception exception) { - _logger.error(exception," : Q-Man was unable to process the schema response message."); + _logger.error(exception,Messages.QMAN_100012_SCHEMA_MESSAGE_PROCESSING_FAILURE); } } @@ -167,7 +183,8 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler int howManyArguments = (Integer) method.get(Names.ARG_COUNT_PARAM_NAME); List> arguments = new ArrayList>(howManyArguments); - for (int argIndex = 0; argIndex < howManyArguments; argIndex++){ + for (int argIndex = 0; argIndex < howManyArguments; argIndex++) + { arguments.add(decoder.readMap()); } result.add(new MethodOrEventDataTransferObject(method,arguments)); @@ -191,7 +208,8 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler int howManyArguments = (Integer) method.get(Names.ARG_COUNT_PARAM_NAME); List> arguments = new ArrayList>(howManyArguments); - for (int argIndex = 0; argIndex < howManyArguments; argIndex++){ + for (int argIndex = 0; argIndex < howManyArguments; argIndex++) + { arguments.add(decoder.readMap()); } result.add(new MethodOrEventDataTransferObject(method,arguments)); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/JmxService.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/JmxService.java index 09003fac9e..1e03815768 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/JmxService.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/JmxService.java @@ -28,6 +28,7 @@ import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Names; import org.apache.qpid.management.domain.model.QpidClass.QpidManagedObject; import org.apache.qpid.management.domain.model.QpidEvent.QpidManagedEvent; @@ -58,7 +59,7 @@ class JmxService _mxServer.registerMBean(eventInstance, name); LOGGER.debug( - " : Event instance %s::%s::%s successfully registered with MBean Server with name %s", + Messages.QMAN_200010_EVENT_MBEAN_REGISTERED, brokerId, packageName, eventClassName, @@ -95,7 +96,7 @@ class JmxService _mxServer.registerMBean(instance, name); LOGGER.debug( - " : Object instance %s::%s::%s:%s successfully registered with MBean Server with name %s", + Messages.QMAN_200011_OBJECT_MBEAN_REGISTERED, brokerId, packageName, className, @@ -130,8 +131,7 @@ class JmxService _mxServer.unregisterMBean(name); LOGGER.debug( - " : Object instance %s::%s::%s:%s successfully unregistered from MBean Server. " + - "Name was %s", + Messages.QMAN_200012_OBJECT_MBEAN_UNREGISTERED, brokerId, packageName, className, @@ -139,7 +139,7 @@ class JmxService name); } catch (Exception exception) { - LOGGER.error(exception," : Unable to unregister object instance %s.",name); + LOGGER.error(exception,Messages.QMAN_100013_MBEAN_REGISTRATION_FAILURE,name); } } } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidArgument.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidArgument.java index 94d532571a..db3ddb97e7 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidArgument.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidArgument.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.management.domain.model; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; import org.apache.qpid.transport.util.Logger; @@ -72,11 +73,11 @@ class QpidArgument extends QpidProperty public void encode(Object value,AmqpCoDec encoder) { _type.encode(value, encoder); - LOGGER.debug(" : Encoded value %S for argument %s. Type is %s",value,_name,_type); + LOGGER.debug(Messages.QMAN_200013_ARGUMENT_VALUE_ENCODED,value,_name,_type); } public Object decode(ManagementDecoder decoder) { return _type.decode(decoder); } -} \ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidAttribute.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidAttribute.java index d0bc470de9..69b5cb0565 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidAttribute.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidAttribute.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.management.domain.model; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.domain.model.type.Type; import org.apache.qpid.transport.codec.ManagementDecoder; import org.apache.qpid.transport.util.Logger; @@ -57,7 +58,6 @@ class QpidAttribute extends QpidFeature void setUnit (String unit) { this._unit = unit; - LOGGER.debug("Unit : %s", unit); } /** @@ -90,8 +90,9 @@ class QpidAttribute extends QpidFeature { try { return _type.decode(decoder); - } catch(RuntimeException exception) { - LOGGER.error(exception,"Unable to decode value for attribute %s",this); + } catch(RuntimeException exception) + { + LOGGER.error(exception,Messages.QMAN_100014_ATTRIBUTE_DECODING_FAILURE,this); throw exception; } } @@ -101,4 +102,4 @@ class QpidAttribute extends QpidFeature { return super.toString()+",type="+_type; } -} \ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java index a119b6c2b1..ec26ef819e 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java @@ -30,6 +30,7 @@ import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.management.Attribute; import javax.management.AttributeList; @@ -45,6 +46,7 @@ import javax.management.ObjectName; import javax.management.ReflectionException; import javax.management.RuntimeOperationsException; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.domain.handler.impl.IMethodInvocationListener; import org.apache.qpid.management.domain.handler.impl.InvocationResult; import org.apache.qpid.management.domain.handler.impl.MethodOrEventDataTransferObject; @@ -131,7 +133,7 @@ class QpidClass extends QpidEntity } catch (Exception e) { _logger.error( - " : Unable to send a schema request schema for %s.%s", + Messages.QMAN_100015_UNABLE_TO_SEND_SCHEMA_REQUEST, _parent.getName(), _name); } finally { @@ -156,7 +158,7 @@ class QpidClass extends QpidEntity } catch (Exception e) { _logger.error( - " : Unable to send a schema request schema for %s.%s", + Messages.QMAN_100015_UNABLE_TO_SEND_SCHEMA_REQUEST, _parent.getName(), _name); } finally { @@ -485,7 +487,8 @@ class QpidClass extends QpidEntity */ void addInstrumentationData (Binary objectId, byte[] rawData) { - _logger.debug(" : Incoming instrumentation data for %s::%s.%s.%s", + _logger.debug( + Messages.QMAN_200014_INCOMING_INSTRUMENTATION_DATA, _parent.getOwnerId(), _parent.getName(), _name, @@ -501,7 +504,8 @@ class QpidClass extends QpidEntity */ void addConfigurationData (Binary objectId, byte[] rawData) { - _logger.debug(" : Incoming configuration data for %s::%s.%s.%s", + _logger.debug( + Messages.QMAN_200015_INCOMING_CONFIGURATION_DATA, _parent.getOwnerId(), _parent.getName(), _name, @@ -523,7 +527,7 @@ class QpidClass extends QpidEntity List> statisticDefinitions, List methodDefinitions) throws UnableToBuildFeatureException { - _logger.info(" : Incoming schema for %s::%s.%s",_parent.getOwnerId(),_parent.getName(),_name); + _logger.info(Messages.QMAN_000010_INCOMING_SCHEMA,_parent.getOwnerId(),_parent.getName(),_name); _state.setSchema(propertyDefinitions, statisticDefinitions, methodDefinitions); } @@ -557,7 +561,7 @@ class QpidClass extends QpidEntity attributes[index++]=(MBeanAttributeInfo) builder.getManagementFeature(); _logger.debug( - " : Property definition for %s::%s.%s has been built.", + Messages.QMAN_200016_PROPERTY_DEFINITION_HAS_BEEN_BUILT, _parent.getName(), _name, property); @@ -566,7 +570,7 @@ class QpidClass extends QpidEntity _howManyPresenceBitMasks = (int)Math.ceil((double)howManyOptionalProperties / 8); _logger.debug( - " : Class %s::%s.%s has %s optional properties.", + Messages.QMAN_200018_OPTIONAL_PROPERTIES_INFO, _parent.getOwnerId(), _parent.getName(), _name, @@ -583,7 +587,7 @@ class QpidClass extends QpidEntity attributes[index++]=(MBeanAttributeInfo) builder.getManagementFeature(); _logger.debug( - " : Statistic definition for %s::%s.%s has been built.", + Messages.QMAN_200017_STATISTIC_DEFINITION_HAS_BEEN_BUILT, _parent.getName(), _name, statistic); @@ -656,10 +660,16 @@ class QpidClass extends QpidEntity int sequenceNumber = SequenceNumberGenerator.getNextSequenceNumber(); _methodInvocationListener.operationIsGoingToBeInvoked(new InvocationEvent(this,sequenceNumber,_exchangeChannelForMethodInvocations)); - _service.invoke(_parent.getName(), _name, _hash,objectId,parameters, method,sequenceNumber); - + _service.invoke(_parent.getName(), _name, _hash,objectId,parameters, method,sequenceNumber,objectId.getBankId(),objectId.getBrokerId()); + // TODO : Shoudl be configurable? InvocationResult result = _exchangeChannelForMethodInvocations.poll(5000,TimeUnit.MILLISECONDS); + + if (result == null) + { + throw new TimeoutException(); + } + Map output = method.decodeParameters(result.getOutputAndBidirectionalArgumentValues()); result.setOutputSection(output); @@ -694,7 +704,7 @@ class QpidClass extends QpidEntity Object value = property.decodeValue(decoder,presenceBitMasks); instance.createOrReplaceAttributeValue(property.getName(),value); } catch(Exception ignore) { - _logger.error("Unable to decode value for %s::%s::%s", _parent.getName(),_name,property.getName()); + _logger.error(Messages.QMAN_100016_UNABLE_TO_DECODE_FEATURE_VALUE, _parent.getName(),_name,property.getName()); } } } @@ -716,7 +726,7 @@ class QpidClass extends QpidEntity Object value = statistic.decodeValue(decoder); instance.createOrReplaceAttributeValue(statistic.getName(),value); } catch(Exception ignore) { - _logger.error("Unable to decode value for %s::%s::%s", _parent.getName(),_name,statistic.getName()); + _logger.error(Messages.QMAN_100016_UNABLE_TO_DECODE_FEATURE_VALUE, _parent.getName(),_name,statistic.getName()); } } } @@ -752,7 +762,6 @@ class QpidClass extends QpidEntity */ void releaseResources () { - // Chiamlo entityInstances e mettilo nella superclasse? _objectInstances.clear(); JMX_SERVICE.unregisterObjectInstances(); _service.close(); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEntity.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEntity.java index 62f469e53c..95db936329 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEntity.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEntity.java @@ -9,6 +9,7 @@ import javax.management.DynamicMBean; import javax.management.MBeanInfo; import javax.management.RuntimeOperationsException; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.domain.model.type.Binary; import org.apache.qpid.management.domain.services.QpidService; import org.apache.qpid.transport.util.Logger; @@ -110,7 +111,7 @@ public abstract class QpidEntity this._service = new QpidService(_parent.getOwnerId()); _logger.debug( - " : Entity definition has been built (without schema) for %s::%s.%s", + Messages.QMAN_200020_ENTITY_DEFINITION_HAS_BEEN_BUILT, _parent.getOwnerId(), _parent.getName(), _name); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEvent.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEvent.java index 98a05bf931..e6205bba76 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEvent.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEvent.java @@ -38,6 +38,7 @@ import javax.management.MBeanInfo; import javax.management.ReflectionException; import javax.management.RuntimeOperationsException; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.domain.model.type.Binary; import org.apache.qpid.transport.codec.ManagementDecoder; @@ -101,7 +102,7 @@ class QpidEvent extends QpidEntity } catch (Exception e) { _logger.error( - " : Unable to send a schema request schema for %s.%s", + Messages.QMAN_100015_UNABLE_TO_SEND_SCHEMA_REQUEST, _parent.getName(), _name); } finally { @@ -306,7 +307,8 @@ class QpidEvent extends QpidEntity */ void addEventData (byte[] rawData, long currentTimestamp, int severity) { - _logger.debug(" : Incoming data for event %s::%s.%s", + _logger.debug( + Messages.QMAN_200021_INCOMING_EVENT_DATA, _parent.getOwnerId(), _parent.getName(), _name); @@ -324,7 +326,7 @@ class QpidEvent extends QpidEntity */ void setSchema (List> argumentDefinitions) throws UnableToBuildFeatureException { - _logger.info(" : Incoming schema for %s::%s.%s",_parent.getOwnerId(),_parent.getName(),_name); + _logger.info(Messages.QMAN_000010_INCOMING_SCHEMA,_parent.getOwnerId(),_parent.getName(),_name); _state.setSchema(argumentDefinitions); } @@ -360,7 +362,7 @@ class QpidEvent extends QpidEntity attributes[index++]=(MBeanAttributeInfo) builder.getManagementFeature(); _logger.debug( - " : Event argument definition for %s::%s.%s has been built.", + Messages.QMAN_200019_EVENT_ARGUMENT_DEFINITION_HAS_BEEN_BUILT, _parent.getName(), _name, argument); @@ -369,7 +371,7 @@ class QpidEvent extends QpidEntity attributes[index++] = new MBeanAttributeInfo( SEVERITY_ATTR_NAME, Integer.class.getName(), - "Severity level for this event.", + Messages.EVENT_SEVERITY_ATTRIBUTE_DESCRIPTION, true, false, false); @@ -377,11 +379,10 @@ class QpidEvent extends QpidEntity attributes[index++] = new MBeanAttributeInfo( TIMESTAMP_ATTR_NAME, Date.class.getName(), - "Current timestamp of this event.", + Messages.EVENT_TIMESTAMP_ATTRIBUTE_DESCRIPTION, true, false, false); - } /** @@ -416,7 +417,7 @@ class QpidEvent extends QpidEntity Object value = property.decodeValue(decoder); instance.createOrReplaceAttributeValue(property.getName(),value); } catch(Exception ignore) { - _logger.error("Unable to decode value for %s::%s::%s", _parent.getName(),_name,property.getName()); + _logger.error(Messages.QMAN_100016_UNABLE_TO_DECODE_FEATURE_VALUE, _parent.getName(),_name,property.getName()); } } } @@ -448,7 +449,8 @@ class QpidEvent extends QpidEntity * * @return true if there is one or more managed instances. */ - boolean hasNoInstances() { + boolean hasNoInstances() + { return _eventInstances.isEmpty(); } } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidFeatureBuilder.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidFeatureBuilder.java index 23353bf37e..d0862c15ca 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidFeatureBuilder.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidFeatureBuilder.java @@ -34,6 +34,7 @@ import javax.management.MBeanParameterInfo; import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.configuration.UnknownTypeCodeException; import org.apache.qpid.management.domain.handler.impl.MethodOrEventDataTransferObject; +import org.apache.qpid.management.Names; /** * A builder used to parse incoming schema message and therefore to build a feature (property, statistic, method, event) @@ -240,7 +241,7 @@ class QpidFeatureBuilder for (Entry argumentAttribute : _featureDefinition.entrySet()) { String key = argumentAttribute.getKey(); - if ("default".equals(key)) + if (Names.DEFAULT_PARAM_NAME.equals(key)) { argument.setDefaultValue(argumentAttribute.getValue()); } else { @@ -307,7 +308,7 @@ class QpidFeatureBuilder throw new MissingFeatureAttributesException(_mandatoryAttributes); } - QpidMethod method = new QpidMethod((String)definition.get("name"),(String) definition.get("desc")); + QpidMethod method = new QpidMethod((String)definition.get(Attribute.name.name()),(String) definition.get(Attribute.desc.name())); List> args = _methodOrEventDefinition.getArgumentsDefinitions(); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidProperty.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidProperty.java index 8b2544af1c..6335a553ae 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidProperty.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidProperty.java @@ -22,6 +22,7 @@ package org.apache.qpid.management.domain.model; import java.lang.reflect.Constructor; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.domain.model.type.Type; import org.apache.qpid.transport.codec.ManagementDecoder; @@ -172,10 +173,10 @@ class QpidProperty extends QpidAttribute Class validatorClass = Class.forName(Configuration.getInstance().getValidatorClassName(type)); Constructor validatorConstructor = validatorClass.getDeclaredConstructor(QpidProperty.class); _validator = (IValidator) validatorConstructor.newInstance(this); - LOGGER.debug("Validator %s for type %s successfully installed." ,validatorClass.getName(), type); + LOGGER.debug(Messages.QMAN_200022_VALIDATOR_INSTALLED ,validatorClass.getName(), type); } catch(Exception exception) { _validator = EMPTY_VALIDATOR; - LOGGER.debug("No validator was found for type %s. The default (empty) validator will be used." , type); + LOGGER.debug(Messages.QMAN_200023_VALIDATOR_NOT_FOUND , type); } } @@ -292,4 +293,4 @@ class QpidProperty extends QpidAttribute { return _decoder == _optionalPropertyDecoder; } -} \ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Binary.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Binary.java index 4c0dd926d0..02b38d8499 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Binary.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Binary.java @@ -35,9 +35,6 @@ import org.apache.qpid.management.messages.AmqpCoDec; public final class Binary implements Serializable { private static final long serialVersionUID = -6865585077320637567L; - - // instance identifider. - private final UUID uuid; // Marker internal (empty) interface private interface State extends Serializable{} @@ -53,7 +50,7 @@ public final class Binary implements Serializable @Override public int hashCode () { - hashCode = Arrays.hashCode(bytes); + hashCode = Arrays.hashCode(_bytes); state = hashCodeAlreadyComputed; return hashCode; } @@ -73,8 +70,10 @@ public final class Binary implements Serializable return hashCode; } }; - - private final byte [] bytes; + + private final UUID uuid; + private final byte [] _bytes; + private long _first; private int hashCode; /** Current state (hashcode computation). */ @@ -87,7 +86,10 @@ public final class Binary implements Serializable */ public Binary(byte [] bytes) { - this.bytes = bytes; + this._bytes = bytes; + byte [] array = new byte [8]; + System.arraycopy(_bytes, 0, array, 0, 8); + _first = AmqpCoDec.unpack64(array); uuid = UUID.randomUUID(); } @@ -103,7 +105,7 @@ public final class Binary implements Serializable try { Binary binary = (Binary)obj; - return Arrays.equals(bytes, binary.bytes); + return Arrays.equals(_bytes, binary._bytes); } catch (Exception exception) { return false; @@ -117,7 +119,7 @@ public final class Binary implements Serializable */ public void encode(AmqpCoDec encoder) { - encoder.pack(bytes); + encoder.pack(_bytes); } @Override @@ -125,4 +127,24 @@ public final class Binary implements Serializable { return uuid.toString(); } + + /** + * Returns the bank identifier derived from this object identifier. + * + * @return the bank identifier derived from this object identifier. + */ + public long getBankId() + { + return _first & 0x000000000FFFFFFF; + } + + /** + * Returns the broker identifier derived from this object identifier. + * + * @return the broker identifier derived from this object identifier. + */ + public long getBrokerId() + { + return (_first & 281474708275200L) >> 28; + } } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java index d5fdce09fc..4be48d1436 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.qpid.api.Message; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Protocol; import org.apache.qpid.management.domain.handler.base.IMessageHandler; import org.apache.qpid.management.domain.model.DomainModel; @@ -56,10 +57,8 @@ class BrokerMessageListener implements MessageListener static void debugIncomingMessage(ByteBuffer message) { if (LOGGER.isDebugEnabled()) - { - LOGGER.debug( - " : New incoming message has been received. Message content : %s", - Arrays.toString(message.array())); + { + LOGGER.debug(Messages.QMAN_200001_INCOMING_MESSAGE_HAS_BEEN_RECEIVED, Arrays.toString(message.array())); } } @@ -70,7 +69,7 @@ class BrokerMessageListener implements MessageListener { for (Entry entry : _handlers.entrySet()) { - LOGGER.debug(" : \"%s\" opcode is associated to handler %s",entry.getKey(),entry.getValue()); + LOGGER.debug(Messages.QMAN_200002_OPCODE_HANDLER_ASSOCIATION,entry.getKey(),entry.getValue()); } } } @@ -98,57 +97,22 @@ class BrokerMessageListener implements MessageListener * * @param message the incoming message. */ - public void onMessage (Message message) + public void onMessage (Message compoundMessage) { - try - { - ByteBuffer buffer = message.readData(); - - // TODO : Should be better...! - String magicNumber = new String(new byte[] {buffer.get(),buffer.get(),buffer.get()}); - if (!Protocol.MAGIC_NUMBER.equals(magicNumber)) - { - LOGGER.error( - " : Message processing failure : incoming message contains a bad magic number (%s) " + - "and therefore will be discaded.", - magicNumber); - return; - } - - char opcode = (char)buffer.get(); - - IMessageHandler handler = _handlers.get(opcode); - if (handler != null) - { - ManagementDecoder decoder = new ManagementDecoder(); - decoder.init(buffer); - - LOGGER.debug( - " : Incoming message with \"%s\" as opcode will be forwarded to %s for processing.", - opcode, - handler); - - handler.process(decoder,decoder.readSequenceNo()); - } else - { - LOGGER.warn( - " : No handler has been configured for processing messages with \"%s\" as opcode. " + - "This message will be discarded.", - opcode); - - Log.debugConfiguredHandlers(_handlers); - } - } catch(IOException exception) + try + { + MessageTokenizer tokenizer = new MessageTokenizer(compoundMessage); + while (tokenizer.hasMoreElements()) + { + dispatch(tokenizer.nextElement()); + } + } catch(IOException exception) { - LOGGER.error( - exception, - " : Message I/O failure : unable to read byte message content and therefore it will be discarded."); + LOGGER.error(exception,Messages.QMAN_100002_MESSAGE_READ_FAILURE); } catch(Exception exception) - { - LOGGER.error( - exception, - " : Message processing failure : unknown exception; see logs for more details."); - } + { + LOGGER.error(exception,Messages.QMAN_100003_MESSAGE_PROCESS_FAILURE); + } } /** @@ -171,11 +135,44 @@ class BrokerMessageListener implements MessageListener handler.setDomainModel(_domainModel); _handlers.put(opcode, handler); } catch(Exception exception) { - LOGGER.error( - exception, - " : Message handler configured for opcode %s thrown an exception in initialization and therefore will be discarded.", - opcode); + LOGGER.error(exception,Messages.QMAN_100004_HANDLER_INITIALIZATION_FAILURE, opcode); } } } -} \ No newline at end of file + + /** + * Dispatches the given message to the appropriate handler. + * + * @param message the incoming message. + * @throws IOException when the message content cannot be read. + */ + private void dispatch(Message message) throws IOException + { + ByteBuffer buffer = message.readData(); + + // TODO : Should be better...! + String magicNumber = new String(new byte[] {buffer.get(),buffer.get(),buffer.get()}); + if (!Protocol.MAGIC_NUMBER.equals(magicNumber)) + { + LOGGER.error(Messages.QMAN_100001_BAD_MAGIC_NUMBER_FAILURE,magicNumber); + return; + } + + char opcode = (char)buffer.get(); + + IMessageHandler handler = _handlers.get(opcode); + if (handler != null) + { + ManagementDecoder decoder = new ManagementDecoder(); + decoder.init(buffer); + + LOGGER.debug(Messages.QMAN_200003_MESSAGE_FORWARDING,opcode,handler); + + handler.process(decoder,decoder.readSequenceNo()); + } else + { + LOGGER.warn(Messages.QMAN_300001_MESSAGE_DISCARDED,opcode); + Log.debugConfiguredHandlers(_handlers); + } + } +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java index d4535596e9..a3584571f3 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java @@ -23,6 +23,7 @@ package org.apache.qpid.management.domain.services; import java.util.UUID; import org.apache.qpid.QpidException; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Names; import org.apache.qpid.management.configuration.BrokerConnectionData; import org.apache.qpid.management.configuration.Configuration; @@ -102,9 +103,7 @@ final class ManagementClient */ void shutdown () { - LOGGER.info( - " : The shutdown sequence has been initiated for management client connected with broker %s", - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000011_SHUTDOWN_INITIATED,_domainModel.getBrokerId()); removeMethodReplyConsumer(); destroyAndUnbingMethodReplyQueue(); @@ -116,9 +115,7 @@ final class ManagementClient _service.close(); - LOGGER.info( - " : Management client connected with broker %s shut down successfully.", - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000012_MANAGEMENT_CLIENT_SHUT_DOWN,_domainModel.getBrokerId()); } /** @@ -129,9 +126,8 @@ final class ManagementClient BrokerMessageListener methodReplyChannelListener = new BrokerMessageListener(_domainModel); methodReplyChannelListener.setHandlers(Configuration.getInstance().getMethodReplyQueueHandlers()); _service.createSubscription(_methodReplyQueueName, _methodReplyQueueName, methodReplyChannelListener); - LOGGER.info( - " : Method-reply queue consumer has been successfully installed and bound on broker %s.", - _domainModel.getBrokerId()); + + LOGGER.info(Messages.QMAN_000013_METHOD_REPLY_CONSUMER_INSTALLED, _domainModel.getBrokerId()); } /** @@ -143,9 +139,7 @@ final class ManagementClient managementChannelListener.setHandlers(Configuration.getInstance().getManagementQueueHandlers()); _service.createSubscription(_managementQueueName, _managementQueueName, managementChannelListener); - LOGGER.info( - " : Management queue consumer has been successfully installed and bound on broker %s.", - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000014_MANAGEMENT_CONSUMER_INSTALLED, _domainModel.getBrokerId()); } /** @@ -159,10 +153,7 @@ final class ManagementClient Names.MANAGEMENT_EXCHANGE, Names.MANAGEMENT_ROUTING_KEY); - LOGGER.info( - " : Management queue with name %s has been successfully declared and bound on broker %s.", - _managementQueueName, - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000015_MANAGEMENT_QUEUE_DECLARED,_managementQueueName,_domainModel.getBrokerId()); } /** @@ -174,10 +165,7 @@ final class ManagementClient _service.declareQueue(_methodReplyQueueName); _service.declareBinding(_methodReplyQueueName, Names.AMQ_DIRECT_QUEUE, _methodReplyQueueName); - LOGGER.info( - " : Method-reply queue with name %s has been successfully declared and bound on broker %s.", - _methodReplyQueueName, - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000016_METHOD_REPLY_QUEUE_DECLARED,_methodReplyQueueName, _domainModel.getBrokerId()); } /** @@ -187,10 +175,7 @@ final class ManagementClient { _service.removeSubscription(_methodReplyQueueName); - LOGGER.info( - " : Consumer %s has been removed from broker %s.", - _methodReplyQueueName, - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000017_CONSUMER_HAS_BEEN_REMOVED,_methodReplyQueueName,_domainModel.getBrokerId()); } /** @@ -201,10 +186,7 @@ final class ManagementClient _service.declareUnbinding(_methodReplyQueueName, Names.AMQ_DIRECT_QUEUE, _methodReplyQueueName); _service.deleteQueue(_methodReplyQueueName); - LOGGER.info( - " : Queue %s has been removed from broker %s.", - _methodReplyQueueName, - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000018_QUEUE_UNDECLARED,_methodReplyQueueName,_domainModel.getBrokerId()); } /** @@ -214,10 +196,7 @@ final class ManagementClient { _service.removeSubscription(_managementQueueName); - LOGGER.info( - " : Consumer %s has been removed from broker %s.", - _managementQueueName, - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000017_CONSUMER_HAS_BEEN_REMOVED,_managementQueueName,_domainModel.getBrokerId()); } /** @@ -228,10 +207,7 @@ final class ManagementClient _service.declareUnbinding(_managementQueueName, Names.MANAGEMENT_EXCHANGE, Names.MANAGEMENT_ROUTING_KEY); _service.deleteQueue(_managementQueueName); - LOGGER.info( - " : Queue %s has been removed from broker %s.", - _managementQueueName, - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000018_QUEUE_UNDECLARED, _managementQueueName,_domainModel.getBrokerId()); } /** @@ -252,4 +228,4 @@ final class ManagementClient { _service.sync(); } -} \ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/MessageTokenizer.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/MessageTokenizer.java new file mode 100644 index 0000000000..623d74e737 --- /dev/null +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/MessageTokenizer.java @@ -0,0 +1,133 @@ +package org.apache.qpid.management.domain.services; + +import java.io.IOException; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.LinkedList; + +import org.apache.qpid.api.Message; +import org.apache.qpid.management.Messages; +import org.apache.qpid.management.Protocol; +import org.apache.qpid.nclient.util.ByteBufferMessage; +import org.apache.qpid.transport.codec.ManagementDecoder; +import org.apache.qpid.transport.util.Logger; + +/** + * The message tokenizer class allows a multi message listener to break a + * message into tokens where each token is itself a valid AMQP message. + * + * @author Andrea Gazzarini + * @see QPID-1368 + */ +class MessageTokenizer implements Enumeration +{ + private final static Logger LOGGER = Logger.get(MessageTokenizer.class); + + static byte [] MAGIC_NUMBER_BYTES; + + private LinkedList _messages = new LinkedList(); + private Iterator _iterator; + + static + { + try + { + MAGIC_NUMBER_BYTES = Protocol.MAGIC_NUMBER.getBytes("UTF-8"); + } catch(Exception exception) + { + throw new ExceptionInInitializerError(exception); + } + } + + /** + * Builds a new Message tokenizer with the given message. + * Note that if the given message is not a "compound" message this tokenizer will producer only one token; + * That is, the token is a message equals to the given message. + * + * @param compoundMessage the compound message + * @throws IOException when it's not possible to read the given message content. + */ + MessageTokenizer(Message compoundMessage) throws IOException + { + build(compoundMessage); + } + + @Override + public boolean hasMoreElements() + { + return _iterator.hasNext(); + } + + @Override + public Message nextElement() + { + return _iterator.next(); + } + + /** + * Retruns the number of the tokens produced by this tokenizer. + * + * @return the number of the tokens produced by this tokenizer. + */ + public int countTokens() + { + return _messages.size(); + } + + // Internal methods used for splitting the multi message byte array. + int indexOf(byte[] source, int startIndex) + { + int currentSourceIndex; + int currentExampleIndex; + + if (startIndex + 3 > source.length) + return -1; + + for (currentSourceIndex = startIndex; currentSourceIndex <= source.length - 3; currentSourceIndex++) + { + for (currentExampleIndex = 0; currentExampleIndex < 3; currentExampleIndex++) + { + if (source[currentSourceIndex + currentExampleIndex] != MAGIC_NUMBER_BYTES[currentExampleIndex]) + break; + } + + if (currentExampleIndex == 3) + return currentSourceIndex; + } + return -1; + } + + // Internal method used for building the tokens. + private void build(Message compoundMessage) throws IOException + { + int startIndex = 0; + int indexOfMagicNumber = 0; + + ManagementDecoder decoder = new ManagementDecoder(); + decoder.init(compoundMessage.readData()); + byte [] source = decoder.readReaminingBytes(); + + int howManyTokens = 1; + + while ((indexOfMagicNumber = indexOf(source, startIndex+1)) != -1) + { + addMessageToken(source, startIndex, (indexOfMagicNumber-startIndex)); + startIndex = indexOfMagicNumber; + howManyTokens++; + } + addMessageToken(source, startIndex, (source.length-startIndex)); + _iterator = _messages.iterator(); + + LOGGER.debug(Messages.QMAN_200031_COMPOUND_MESSAGE_CONTAINS,howManyTokens); + }; + + // Builds & adds a new "message" token + private void addMessageToken(byte [] source,int startIndex,int length) throws IOException + { + byte [] messageData = new byte[length]; + System.arraycopy(source, startIndex, messageData, 0, messageData.length); + Message message = new ByteBufferMessage(); + message.appendData(messageData); + _messages.add(message); + } +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QMan.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QMan.java index ce700aa283..f2a4ea54a0 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QMan.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QMan.java @@ -29,6 +29,7 @@ import java.util.UUID; import java.util.Map.Entry; import org.apache.log4j.xml.DOMConfigurator; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.configuration.BrokerConnectionData; import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.configuration.ConfigurationException; @@ -51,8 +52,8 @@ public class QMan */ void start() throws StartupFailureException { - LOGGER.info(" : Starting Q-Man..."); - LOGGER.info(" : Reading Q-Man configuration..."); + LOGGER.info(Messages.QMAN_000001_STARTING_QMAN); + LOGGER.info(Messages.QMAN_000002_READING_CONFIGURATION); addShutDownHook(); @@ -60,7 +61,7 @@ public class QMan try { configurator.configure(); - LOGGER.info(" : Creating management client(s)..."); + LOGGER.info(Messages.QMAN_000003_CREATING_MANAGEMENT_CLIENTS); for (Entry entry : Configuration.getInstance().getConnectionInfos()) { UUID brokerId = entry.getKey(); @@ -71,28 +72,16 @@ public class QMan managementClients.add(client); client.estabilishFirstConnectionWithBroker(); - LOGGER.info(" : Management client for broker %s successfully connected.",brokerId); + LOGGER.info(Messages.QMAN_000004_MANAGEMENT_CLIENT_CONNECTED,brokerId); } catch(StartupFailureException exception) { - LOGGER.error(exception, ": Cannot connect to broker %s on %s",brokerId,data); + LOGGER.error(exception, Messages.QMAN_100017_UNABLE_TO_CONNECT,brokerId,data); } } - LOGGER.info(" : Q-Man open for e-business."); - - // TODO : console enhancement (i.e. : connect another broker) - System.out.println("Type \"q\" to quit."); - BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); - while ( !"q".equals(reader.readLine()) ){ - } + LOGGER.info(Messages.QMAN_000019_QMAN_STARTED); } catch(ConfigurationException exception) { - LOGGER.error( - exception, - " : Q-Man was unable to startup correctly : a configuration error occurred."); - System.exit(1); + LOGGER.error(exception,Messages.UNABLE_TO_STARTUP_CORRECTLY); + throw new StartupFailureException(exception); } - catch(IOException exception) - { - throw new StartupFailureException(exception); - } } /** @@ -101,11 +90,12 @@ public class QMan private void addShutDownHook() { // SHUTDOWN HOOK - Runtime.getRuntime().addShutdownHook(new Thread(){ + Runtime.getRuntime().addShutdownHook(new Thread() + { @Override public void run () { - LOGGER.info(" : Shutting down Q-Man..."); + LOGGER.info(Messages.QMAN_000020_SHUTTING_DOWN_QMAN); try { for (ManagementClient client : managementClients) @@ -114,9 +104,8 @@ public class QMan } } catch(Exception exception) { - } - LOGGER.info(" : Q-Man shut down."); + LOGGER.info(Messages.QMAN_000021_SHUT_DOWN); } }); } @@ -134,18 +123,26 @@ public class QMan DOMConfigurator.configureAndWatch(logFileName,5000); } - new Thread() - { - public void run() - { - QMan qman = new QMan(); - try - { - qman.start(); - } catch (StartupFailureException exception) { - exception.printStackTrace(); - } - } - }.start(); + QMan qman = new QMan(); + try + { + qman.start(); + + // TODO : console enhancement (i.e. : connect another broker) + System.out.println("Type \"q\" to quit."); + BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); + while ( !"q".equals(reader.readLine()) ) + { + + } + System.exit(-1); + } catch (StartupFailureException exception) + { + exception.printStackTrace(); + System.exit(-1); + } catch (IOException exception) + { + System.exit(-1); + } } } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java index d9500ae2dd..a12993d40e 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java @@ -21,16 +21,14 @@ package org.apache.qpid.management.domain.services; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.QpidException; import org.apache.qpid.api.Message; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Names; -import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.configuration.QpidDatasource; import org.apache.qpid.management.domain.model.QpidMethod; import org.apache.qpid.management.domain.model.type.Binary; @@ -58,39 +56,6 @@ public class QpidService implements SessionListener { private final static Logger LOGGER = Logger.get(QpidService.class); - // Inner static class used for logging and avoid conditional logic (isDebugEnabled()) duplication. - private static class Log - { - /** - * Logs the content f the message. - * This will be written on log only if DEBUG level is enabled. - * - * @param messageContent the raw content of the message. - */ - static void logMessageContent(byte [] messageContent) - { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - " : Message has been sent to management exchange. Message content : %s", - Arrays.toString(messageContent)); - } - } - - /** - * Logs the content f the message. - * This will be written on log only if DEBUG level is enabled. - * - * @param messageContent the raw content of the message. - */ - static void logMessageContent(ByteBuffer messageContent) - { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - " : Message has been sent to management exchange."); - } - } - } - private UUID _brokerId; private Connection _connection; private Session _session; @@ -134,9 +99,10 @@ public class QpidService implements SessionListener } } + public void exception(Session ssn, SessionException exc) { - LOGGER.error(exc, "session %s exception", ssn); + } public void closed(Session ssn) {} @@ -194,10 +160,7 @@ public class QpidService implements SessionListener _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT); - LOGGER.debug( - " : New subscription between queue %s and destination %s has been declared.", - queueName, - destinationName); + LOGGER.debug(Messages.QMAN_200025_SUBSCRIPTION_DECLARED,queueName,destinationName); } /** @@ -209,9 +172,7 @@ public class QpidService implements SessionListener public void removeSubscription(String destinationName) { _session.messageCancel(destinationName); - LOGGER.debug( - " : Subscription named %s has been removed from remote broker.", - destinationName); + LOGGER.debug(Messages.QMAN_200026_SUBSCRIPTION_REMOVED,destinationName); } /** @@ -223,7 +184,7 @@ public class QpidService implements SessionListener public void declareQueue(String queueName) { _session.queueDeclare(queueName, null, null); - LOGGER.debug(" : New queue with name %s has been declared.",queueName); + LOGGER.debug(Messages.QMAN_200027_QUEUE_DECLARED,queueName); } /** @@ -235,7 +196,7 @@ public class QpidService implements SessionListener public void deleteQueue(String queueName) { _session.queueDelete(queueName); - LOGGER.debug(" : Queue with name %s has been removed.",queueName); + LOGGER.debug(Messages.QMAN_200028_QUEUE_REMOVED,queueName); } /** @@ -249,10 +210,7 @@ public class QpidService implements SessionListener public void declareBinding(String queueName, String exchangeName, String routingKey) { _session.exchangeBind(queueName, exchangeName, routingKey, null); - LOGGER.debug( - " : New binding with %s as routing key has been declared between queue %s and exchange %s.", - routingKey,queueName, - exchangeName); + LOGGER.debug(Messages.QMAN_200029_BINDING_DECLARED,routingKey,queueName,exchangeName); } /** @@ -265,27 +223,7 @@ public class QpidService implements SessionListener public void declareUnbinding(String queueName, String exchangeName, String routingKey) { _session.exchangeUnbind(queueName, exchangeName, routingKey); - LOGGER.debug( - " : Binding with %s as routing key has been removed between queue %s and exchange %s.", - routingKey,queueName, - exchangeName); - } - - /** - * Sends a command message with the given data on the management queue. - * - * @param messageData the command message content. - */ - public void sendCommandMessage(byte [] messageData) - { - _session.messageTransfer( - Names.MANAGEMENT_EXCHANGE, - MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - Configuration.getInstance().getCommandMessageHeader(), - messageData); - - Log.logMessageContent (messageData); + LOGGER.debug(Messages.QMAN_200030_BINDING_REMOVED,routingKey,queueName,exchangeName); } /** @@ -293,17 +231,6 @@ public class QpidService implements SessionListener * * @param messageData the command message content. */ - public void sendCommandMessage(ByteBuffer messageData) - { - _session.messageTransfer( - Names.MANAGEMENT_EXCHANGE, - MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - Configuration.getInstance().getCommandMessageHeader(), - messageData); - - Log.logMessageContent (messageData); - } /** * Requests a schema for the given package.class.hash. @@ -348,6 +275,8 @@ public class QpidService implements SessionListener * @param objectId the object instance identifier. * @param parameters the parameters for this invocation. * @param method the method (definition) invoked. + * @param bankId the object bank identifier. + * @param brokerId the broker identifier. * @return the sequence number used for this message. * @throws MethodInvocationException when the invoked method returns an error code. * @throws UnableToComplyException when it wasn't possibile to invoke the requested operation. @@ -359,9 +288,11 @@ public class QpidService implements SessionListener final Binary objectId, final Object[] parameters, final QpidMethod method, - final int sequenceNumber) throws MethodInvocationException, UnableToComplyException + final int sequenceNumber, + final long bankId, + final long brokerId) throws MethodInvocationException, UnableToComplyException { - Message message = new MethodInvocationRequestMessage() + Message message = new MethodInvocationRequestMessage(bankId, brokerId) { @Override diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/SequenceNumberGenerator.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/SequenceNumberGenerator.java index 9915d565ab..e6d99971cd 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/SequenceNumberGenerator.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/SequenceNumberGenerator.java @@ -29,8 +29,13 @@ public class SequenceNumberGenerator { private static int sequenceNumber; + /** + * Returns a valid sequence number. + * + * @return a sequence number. + */ public static synchronized int getNextSequenceNumber() { return sequenceNumber++; } -} \ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/messages/AmqpCoDec.java b/java/management/client/src/main/java/org/apache/qpid/management/messages/AmqpCoDec.java index c7ab9b7fd2..a56d5be7b1 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/messages/AmqpCoDec.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/messages/AmqpCoDec.java @@ -3,11 +3,19 @@ package org.apache.qpid.management.messages; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +/** + * AMQP Management messages codec. + * + * @author Andrea Gazzarini + */ public class AmqpCoDec { private byte [] _buffer; private int _position; + /** + * Builds a new codec. + */ AmqpCoDec() { _buffer = new byte [1000]; @@ -118,6 +126,19 @@ public class AmqpCoDec } } + public static final long unpack64(byte data[]) { + return ( + ((long) (data[0] & 0xff) << 56) | + ((long)(data[1] & 0xff) << 48) | + ((long)(data[2] & 0xff) << 40) | + ((long)(data[3] & 0xff) << 32) | + ((long)(data[4] & 0xff) << 24) | + ((long)(data[5] & 0xff) << 16) | + ((long)(data[6] & 0xff) << 8) | + (long) data[7] & 0xff); + } + + public void pack (byte[] bytes) { System.arraycopy(bytes, 0, _buffer, _position, bytes.length); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/messages/MethodInvocationRequestMessage.java b/java/management/client/src/main/java/org/apache/qpid/management/messages/MethodInvocationRequestMessage.java index 85dfe915bc..3021e2f3d9 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/messages/MethodInvocationRequestMessage.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/messages/MethodInvocationRequestMessage.java @@ -20,25 +20,131 @@ */ package org.apache.qpid.management.messages; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Protocol; +import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.domain.model.QpidMethod; import org.apache.qpid.management.domain.model.type.Binary; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.ReplyTo; +import org.apache.qpid.transport.util.Logger; +/** + * Abstract representation of a method invocation request message. + * Concrete subclasses must supply the values needed to build & encode the message. + * + * @author Andrea Gazzarini + */ public abstract class MethodInvocationRequestMessage extends ManagementMessage { + private final static Logger LOGGER = Logger.get(MethodInvocationRequestMessage.class); + + private DeliveryProperties _deliveryProperties; + private MessageProperties _messageProperties; + private Header _header; + + /** + * Builds a new method invocation request message with the given target identifiers. + * + * @param bankId the bank identifier. + * @param brokerId the broker identifier. + */ + public MethodInvocationRequestMessage(long bankId, long brokerId) + { + ReplyTo replyTo=new ReplyTo(); + replyTo.setRoutingKey(Configuration.getInstance().getMethodReplyQueueName()); + _messageProperties = new MessageProperties(); + _messageProperties.setReplyTo(replyTo); + + String routingKey = String.format("agent.%s.%s", brokerId,bankId); + + LOGGER.debug(Messages.QMAN_200032_COMMAND_MESSAGE_ROUTING_KEY, routingKey); + + _deliveryProperties = new DeliveryProperties(); + _deliveryProperties.setRoutingKey(routingKey); + _header = new Header(_deliveryProperties, _messageProperties); + } + @Override char opcode () { return Protocol.OPERATION_INVOCATION_REQUEST_OPCODE; } - + + /** + * Returns the package name. + * + * @return the package name. + */ protected abstract String packageName(); + + /** + * Returns the class name. + * + * @return the class name. + */ protected abstract String className(); + + /** + * Returns the schema hash. + * + * @return the schema hash. + */ protected abstract Binary schemaHash(); + + /** + * Returns the object identifier. + * + * @return the object identifier. + */ protected abstract Binary objectId(); + + /** + * Returns the method to be invoked. + * + * @return the method to be invoked. + */ protected abstract QpidMethod method(); + + /** + * Returns the parameters used for method invocation. + * + * @return the parameters used for method invocation. + */ protected abstract Object[] parameters(); + /** + * Returns the delivery properties of this message. + * + * @return the delivery properties of this message. + */ + public DeliveryProperties getDeliveryProperties () + { + return _deliveryProperties; + } + + /** + * Returns the header of this message. + * + * @return the header of this message. + */ + public Header getHeader () + { + return _header; + } + + /** + * Returns the messages header properties of this message. + * + * @return the message header properties of this message. + */ + public MessageProperties getMessageProperties () + { + return _messageProperties; + } + @Override void specificMessageEncoding () { @@ -51,4 +157,4 @@ public abstract class MethodInvocationRequestMessage extends ManagementMessage _codec.packStr8(method.getName()); method.encodeParameters(parameters(), _codec); } -} \ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/messages/SchemaRequestMessage.java b/java/management/client/src/main/java/org/apache/qpid/management/messages/SchemaRequestMessage.java index 6bbd97d9a4..aa596c8413 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/messages/SchemaRequestMessage.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/messages/SchemaRequestMessage.java @@ -23,6 +23,12 @@ package org.apache.qpid.management.messages; import org.apache.qpid.management.Protocol; import org.apache.qpid.management.domain.model.type.Binary; +/** + * Abstract representation of a schema request message. + * Concrete subclasses must supply the values needed to build & encode the message. + * + * @author Andrea Gazzarini + */ public abstract class SchemaRequestMessage extends ManagementMessage { @Override @@ -31,10 +37,25 @@ public abstract class SchemaRequestMessage extends ManagementMessage return Protocol.SCHEMA_REQUEST_OPCODE; } + /** + * Returns the package name. + * + * @return the package name. + */ protected abstract String packageName(); + /** + * Returns the class name. + * + * @return the class name. + */ protected abstract String className(); + /** + * Returns the schema hash. + * + * @return the schema hash. + */ protected abstract Binary schemaHash(); @Override @@ -44,4 +65,4 @@ public abstract class SchemaRequestMessage extends ManagementMessage _codec.packStr8(className()); schemaHash().encode(_codec); } -} \ No newline at end of file +} diff --git a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidClassTest.java b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidClassTest.java index 837810ea0d..fab35d4c59 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidClassTest.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidClassTest.java @@ -1,6 +1,5 @@ package org.apache.qpid.management.domain.model; - import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -19,6 +18,11 @@ import org.apache.qpid.management.configuration.StubConfigurator; import org.apache.qpid.management.domain.handler.impl.MethodOrEventDataTransferObject; import org.apache.qpid.management.domain.model.QpidClass.QpidManagedObject; +/** + * Test case for Qpid Class. + * + * @author Andrea Gazzarini + */ public class QpidClassTest extends TestCase { private QpidClass _class; @@ -197,8 +201,6 @@ public class QpidClassTest extends TestCase TestConstants._1, true, TestConstants._1)); - - List> statisticDefinitions = new ArrayList>(2); _class.setSchema(propertyDefinitions, TestConstants.EMPTY_STATISTICS_SCHEMA, TestConstants.EMPTY_METHODS_SCHEMA); diff --git a/java/management/client/src/test/java/org/apache/qpid/management/domain/services/BrokerMessageListenerTest.java b/java/management/client/src/test/java/org/apache/qpid/management/domain/services/BrokerMessageListenerTest.java index 533c98c973..c489f7d767 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/domain/services/BrokerMessageListenerTest.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/domain/services/BrokerMessageListenerTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.management.domain.services; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Random; import junit.framework.TestCase; @@ -167,4 +168,74 @@ public class BrokerMessageListenerTest extends TestCase _listener.onMessage(message); } + + /** + * Tests the execution of the onMessage() method when the incoming message is a compound message. + * + *
precondition : the incoming message is a compound message. + *
postcondition : each tokenized message is forwarded to the appropriate handler. + */ + public void testOnMessageOK_WithCompoundMessage() throws Exception + { + final Map handlersMap = new HashMap(); + char [] opcodes = {'a','b','c','d','e'}; + + class MockMessageHandler implements IMessageHandler + { + private final char _opcode; + + public MockMessageHandler(char opcode) + { + this._opcode = opcode; + } + + public void process (ManagementDecoder decoder, int sequenceNumber) + { + handlersMap.remove(_opcode); + } + + public void setDomainModel (DomainModel domainModel) + { + // Do nothing here. It's just a mock handler. + } + }; + + for (char opcode : opcodes) + { + handlersMap.put(opcode, new MockMessageHandler(opcode)); + } + + // Removes previously injected handlers (i.e. x & y) + _listener._handlers.clear(); + _listener.setHandlers(handlersMap); + + Message compoundMessage = createCompoundMessage(opcodes); + _listener.onMessage(compoundMessage); + + assertTrue(handlersMap.isEmpty()); + } + + // Creates a (non valid) compound message. + private Message createCompoundMessage(char[] opcodes) throws IOException { + byte [] compoundMessageData = new byte [12 * opcodes.length]; + Random randomizer = new Random(); + int position = 0; + + for (char opcode : opcodes) { + System.arraycopy(MessageTokenizer.MAGIC_NUMBER_BYTES, 0, compoundMessageData, position, MessageTokenizer.MAGIC_NUMBER_BYTES.length); + position+=MessageTokenizer.MAGIC_NUMBER_BYTES.length; + + compoundMessageData[position++] = (byte)opcode; + + for (int c = 4; c < 12; c++) + { + byte aByte = (byte)randomizer.nextInt(127); + compoundMessageData[position++] = aByte; + } + } + + Message compoundMessage = new ByteBufferMessage(); + compoundMessage.appendData(compoundMessageData); + return compoundMessage; + } } diff --git a/java/management/client/src/test/java/org/apache/qpid/management/domain/services/MessageTokenizerTest.java b/java/management/client/src/test/java/org/apache/qpid/management/domain/services/MessageTokenizerTest.java new file mode 100644 index 0000000000..754686bc2f --- /dev/null +++ b/java/management/client/src/test/java/org/apache/qpid/management/domain/services/MessageTokenizerTest.java @@ -0,0 +1,120 @@ +package org.apache.qpid.management.domain.services; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.*; +import java.util.Random; + +import junit.framework.TestCase; + +import org.apache.qpid.api.Message; +import org.apache.qpid.nclient.util.ByteBufferMessage; +import org.apache.qpid.transport.codec.ManagementDecoder; + +/** + * Tests case for messaeg tokenizer. + * + * @author Andrea Gazzarini + */ +public class MessageTokenizerTest extends TestCase { + + /** + * Tests the execution of the message tokenizer when the given message is not a valid AMQP message. + * + *
precondition : the incoming message is not a valid AMQP message. + *
postcondition : no exception is thrown and there will be exactly one token with the given message. + */ + public void testOK_WithNoMessage() throws IOException{ + byte [] noMessage = {2,10,120,23,23,23,4,10,11,12,2,1,3,-22}; + + Message multiMessage = new ByteBufferMessage(); + multiMessage.appendData(noMessage); + MessageTokenizer tokenizer = new MessageTokenizer(multiMessage); + + assertEquals(1, tokenizer.countTokens()); + assertEquals(tokenizer.nextElement(),noMessage); + assertFalse(tokenizer.hasMoreElements()); + } + + /** + * Tests the execution of the message tokenizer when the given message contains only one message. + * + *
precondition : the incoming message contains only one message. + *
postcondition : no exception is thrown and there will be exactly one token with the given message. + */ + public void testOK_WithOneMessage() throws IOException{ + byte [] oneEncodedMessage = {'A','M','2',23,23,23,4,10,11,12,2,1,3,-22}; + + Message multiMessage = new ByteBufferMessage(); + multiMessage.appendData(oneEncodedMessage); + MessageTokenizer tokenizer = new MessageTokenizer(multiMessage); + + assertEquals(1, tokenizer.countTokens()); + assertEquals(tokenizer.nextElement(),oneEncodedMessage); + assertFalse(tokenizer.hasMoreElements()); + } + + /** + * Tests the execution of the message tokenizer when the given message contains a random number of messages. + * + *
precondition : the incoming message contains a random number of messages. + *
postcondition : no exception is thrown and each built token is a valid message starting with right header. + */ + public void testOK_WithRandomNUmberOfMessages() throws IOException{ + Random randomizer = new Random(); + + int howManyLoops = randomizer.nextInt(10000); + byte [] compoundMessageData = new byte [12 * howManyLoops]; + + List messages = new ArrayList(howManyLoops); + + int position = 0; + for (int i = 0; i < howManyLoops; i++) + { + byte [] message = new byte[12]; + System.arraycopy(MessageTokenizer.MAGIC_NUMBER_BYTES, 0, compoundMessageData, position, MessageTokenizer.MAGIC_NUMBER_BYTES.length); + System.arraycopy(MessageTokenizer.MAGIC_NUMBER_BYTES, 0, message, 0, MessageTokenizer.MAGIC_NUMBER_BYTES.length); + position+=MessageTokenizer.MAGIC_NUMBER_BYTES.length; + + for (int c = 3; c < 12; c++) + { + byte aByte = (byte)randomizer.nextInt(127); + compoundMessageData[position++] = aByte; + message[c] = aByte; + } + messages.add(message); + } + + Message multiMessage = new ByteBufferMessage(); + multiMessage.appendData(compoundMessageData); + MessageTokenizer tokenizer = new MessageTokenizer(multiMessage); + + int howManyTokens = tokenizer.countTokens(); + assertEquals(howManyLoops, howManyTokens); + + int index = 0; + while (tokenizer.hasMoreElements()) + { + assertEquals(tokenizer.nextElement(),messages.get(index++)); + } + + assertEquals((index),howManyTokens); + } + + /** + * Internal method used for comparison of two messages. + * + * @param message the token message just built by the tokenizer. + * @param expected the expected result. + */ + private void assertEquals(Message message, byte [] expected) throws IOException + { + ByteBuffer messageContent = message.readData(); + ManagementDecoder decoder = new ManagementDecoder(); + decoder.init(messageContent); + byte [] content = decoder.readReaminingBytes(); + assertTrue(Arrays.equals(content, expected)); + } +} -- cgit v1.2.1