diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-10-24 14:09:26 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-10-24 14:09:26 +0000 |
| commit | f5997e515738fabd366a90d4352bfe7d9d60abb6 (patch) | |
| tree | a0ae54a95164846050fbab232c3a67648b47d56a | |
| parent | 38c31043f2dd69abba0b3d05e307a4b6d85ffd52 (diff) | |
| download | qpid-python-f5997e515738fabd366a90d4352bfe7d9d60abb6.tar.gz | |
qpid-1378: applied qman_24102008_compound_message_handling.patch that adds compound message handling improvements.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707640 13f79535-47bb-0310-9956-ffa450edef68
29 files changed, 840 insertions, 312 deletions
diff --git a/java/management/client/src/main/java/org/apache/qpid/management/Messages.java b/java/management/client/src/main/java/org/apache/qpid/management/Messages.java new file mode 100644 index 0000000000..1448ab379e --- /dev/null +++ b/java/management/client/src/main/java/org/apache/qpid/management/Messages.java @@ -0,0 +1,94 @@ +package org.apache.qpid.management;
+
+/**
+ * Enumerative interfaces containing all QMan messages.
+ *
+ * @author Andrea Gazzarini
+ */
+public interface Messages
+{
+ // INFO
+ String QMAN_000001_STARTING_QMAN = "<QMAN-000001> : Starting Q-Man...";
+ String QMAN_000002_READING_CONFIGURATION = "<QMAN-000002> : Reading Q-Man configuration...";
+ String QMAN_000003_CREATING_MANAGEMENT_CLIENTS = "<QMAN-000003> : Creating management client(s)...";
+ String QMAN_000004_MANAGEMENT_CLIENT_CONNECTED = "<QMAN-000004> : Management client for broker %s successfully connected.";
+ String QMAN_000005_TYPE_MAPPING_CONFIGURED = "<QMAN-000005> : Type mapping : code = %s associated to %s (validator class is %s)";
+ String QMAN_000006_ACCESS_MODE_MAPPING_CONFIGURED = "<QMAN-000006> : Access Mode mapping : code = %s associated to %s";
+ String QMAN_000007_MANAGEMENT_HANDLER_MAPPING_CONFIGURED = "<QMAN-000007> : Management Queue Message Handler Mapping : opcode = %s associated with %s";
+ String QMAN_000008_METHOD_REPLY_HANDLER_MAPPING_CONFIGURED = "<QMAN-000008> : Method-Reply Queue Message Handler Mapping : opcode = %s associated with %s";
+ String QMAN_000009_BROKER_DATA_CONFIGURED = "<QMAN-000009> : Broker configuration %s: %s";
+ String QMAN_000010_INCOMING_SCHEMA = "<QMAN-000010> : Incoming schema for %s::%s.%s";
+ String QMAN_000011_SHUTDOWN_INITIATED = "<QMAN-000011> : The shutdown sequence has been initiated for management client connected with broker %s";
+ String QMAN_000012_MANAGEMENT_CLIENT_SHUT_DOWN = "<QMAN-000012> : Management client connected with broker %s shut down successfully.";
+ String QMAN_000013_METHOD_REPLY_CONSUMER_INSTALLED = "<QMAN-000013> : Method-reply queue consumer has been successfully installed and bound on broker %s.";
+ String QMAN_000014_MANAGEMENT_CONSUMER_INSTALLED ="<QMAN-000014> : Management queue consumer has been successfully installed and bound on broker %s.";
+ String QMAN_000015_MANAGEMENT_QUEUE_DECLARED = "<QMAN-000015> : Management queue with name %s has been successfully declared and bound on broker %s.";
+ String QMAN_000016_METHOD_REPLY_QUEUE_DECLARED = "<QMAN-000016> : Method-reply queue with name %s has been successfully declared and bound on broker %s.";
+ String QMAN_000017_CONSUMER_HAS_BEEN_REMOVED = "<QMAN-000017> : Consumer %s has been removed from broker %s.";
+ String QMAN_000018_QUEUE_UNDECLARED = "<QMAN-000018> : Queue %s has been removed from broker %s.";
+ String QMAN_000019_QMAN_STARTED = "<QMAN-000019> : Q-Man open for e-business.";
+ String QMAN_000020_SHUTTING_DOWN_QMAN = "<QMAN-000020> : Shutting down Q-Man...";
+ String QMAN_000021_SHUT_DOWN = "<QMAN-000021> : Q-Man shut down.";
+
+ // DEBUG
+ String QMAN_200001_INCOMING_MESSAGE_HAS_BEEN_RECEIVED = "<QMAN-200001> : New incoming message has been received. Message content is %s";
+ String QMAN_200002_OPCODE_HANDLER_ASSOCIATION = "<QMAN-200002> : \"%s\" opcode is associated to handler %s";
+ String QMAN_200003_MESSAGE_FORWARDING = "<QMAN-200003> : Incoming message with \"%s\" as opcode will be forwarded to %s for processing.";
+ String QMAN_200004_MANAGEMENT_QUEUE_NAME = "<QMAN-200004> : Management queue name : %s";
+ String QMAN_200005_METHOD_REPLY_QUEUE_NAME = "<QMAN-200005> : Method-reply queue name : %s";
+ String QMAN_200006_QPID_CONNECTION_RELEASED = "<QMAN-200006> : Connection %s returned to the pool.";
+ String QMAN_200007_TEST_CONNECTION_ON_RESERVE = "<QMAN-200007> : Test connection on reserve. Is valid? %s";
+ String QMAN_200008_CONNECTION_DESTROYED = "<QMAN-200008> : Connection has been destroyed.";
+ String QMAN_200009_CONNECTION_DESTROY_FAILURE = "<QMAN-200009> : Unable to destroy a connection object.";
+ String QMAN_200010_EVENT_MBEAN_REGISTERED = "<QMAN-200010> : Event instance %s::%s::%s successfully registered with MBean Server with name %s";
+ String QMAN_200011_OBJECT_MBEAN_REGISTERED = "<QMAN-200011> : Object instance %s::%s::%s:%s successfully registered with MBean Server with name %s";
+ String QMAN_200012_OBJECT_MBEAN_UNREGISTERED = "<QMAN-200012> : Object instance %s::%s::%s:%s successfully unregistered from MBean Server. Name was %s";
+ String QMAN_200013_ARGUMENT_VALUE_ENCODED = "<QMAN-200013> : Encoded value %S for argument %s. Type is %s";
+ String QMAN_200014_INCOMING_INSTRUMENTATION_DATA = "<QMAN-200014> : Incoming instrumentation data for %s::%s.%s.%s";
+ String QMAN_200015_INCOMING_CONFIGURATION_DATA = "<QMAN-200015> : Incoming configuration data for %s::%s.%s.%s";
+ String QMAN_200016_PROPERTY_DEFINITION_HAS_BEEN_BUILT = "<QMAN-200016> : Property definition for %s::%s.%s has been built.";
+ String QMAN_200017_STATISTIC_DEFINITION_HAS_BEEN_BUILT = "<QMAN-200017> : Statistic definition for %s::%s.%s has been built.";
+ String QMAN_200018_OPTIONAL_PROPERTIES_INFO = "<QMAN-200018> : Class %s::%s.%s has %s optional properties.";
+ String QMAN_200019_EVENT_ARGUMENT_DEFINITION_HAS_BEEN_BUILT = "<QMAN-200019> : Event argument definition for %s::%s.%s has been built.";
+ String QMAN_200020_ENTITY_DEFINITION_HAS_BEEN_BUILT = "<QMAN-200020> : Entity definition has been built (without schema) for %s::%s.%s";
+ String QMAN_200021_INCOMING_EVENT_DATA = "<QMAN-200021> : Incoming data for event %s::%s.%s";
+ String QMAN_200022_VALIDATOR_INSTALLED = "<QMAN-200022> : Validator %s for type %s successfully installed.";
+ String QMAN_200023_VALIDATOR_NOT_FOUND = "<QMAN-200023> : No validator was found for type %s. The default (empty) validator will be used.";
+ String QMAN_200024_MANAGEMENT_MESSAGE_HAS_BEEN_SENT = "<QMAN-200024> : Message has been sent to management exchange. Message content : %s";
+ String QMAN_200025_SUBSCRIPTION_DECLARED = "<QMAN-200025> : New subscription between queue %s and destination %s has been declared.";
+ String QMAN_200026_SUBSCRIPTION_REMOVED = "<QMAN-200026> : Subscription named %s has been removed from remote broker.";
+ String QMAN_200027_QUEUE_DECLARED = "<QMAN-200027> : New queue with name %s has been declared.";
+ String QMAN_200028_QUEUE_REMOVED= "<QMAN-200028> : New queue with name %s has been undeclared.";
+ String QMAN_200029_BINDING_DECLARED = "<QMAN-200029> : New binding with %s as routing key has been declared between queue %s and exchange %s.";
+ String QMAN_200030_BINDING_REMOVED = "<QMAN-200030> : Binding with %s as routing key has been removed between queue %s and exchange %s.";
+ String QMAN_200031_COMPOUND_MESSAGE_CONTAINS = "<QMAN-200031> : Incoming compound message contains %s message(s).";
+ String QMAN_200032_COMMAND_MESSAGE_ROUTING_KEY = "<QMAN-200032> : Command message routing key : %s";
+
+ // WARNING
+ String QMAN_300001_MESSAGE_DISCARDED = "<QMAN-300001> : No handler has been configured for processing messages with \"%s\" as opcode. Message will be discarded.";
+ String QMAN_300002_UNKNOWN_SEQUENCE_NUMBER = "<QMAN-300002> : Unable to deal with incoming message because it contains a unknown sequence number (%s).";
+
+ // ERROR
+ String QMAN_100001_BAD_MAGIC_NUMBER_FAILURE = "<QMAN-100001> : Message processing failure : incoming message contains a bad magic number (%s) and therefore will be discaded.";
+ String QMAN_100002_MESSAGE_READ_FAILURE = "<QMAN-100002> : Message I/O failure : unable to read byte message content and therefore it will be discarded.";
+ String QMAN_100003_MESSAGE_PROCESS_FAILURE = "<QMAN-100003> : Message processing failure : unknown exception; see logs for more details.";
+ String QMAN_100004_HANDLER_INITIALIZATION_FAILURE = "<QMAN-100004> : Message handler configured for opcode %s thrown an exception in initialization and therefore will be discarded.";
+ String QMAN_100005_CLASS_SCHEMA_PROCESSING_FAILURE = "<QMAN-100005> : Q-Man was unable to process the schema response message.";
+ String QMAN_100006_EVENT_SCHEMA_PROCESSING_FAILURE = "<QMAN-100006> : Q-Man was unable to process the schema response message.";
+ String QMAN_100007_UNABLE_TO_CONNECT_WITH_BROKER = "<QMAN-100007> : Unable to connect with broker located on %s. This broker will be ignored.";
+ String QMAN_100008_MANAGEMENT_MESSAGE_HANDLER_NOT_AVAILABLE = "<QMAN-100008> : Management Message Handler configured for opcode %s is not available and therefore will be discarded.";
+ String QMAN_100009_METHOD_REPLY_MESSAGE_HANDLER_NOT_AVAILABLE = "<QMAN-100009> :Method-Reply Message Handler configured for opcode %s is not available and therefore will be discarded.";
+ String QMAN_100010_METHOD_INVOCATION_RESULT_FAILURE = "<QMAN-100010> : an exception occurred while storing the result of a method invocation. Sequence number was %s";
+ String QMAN_100011_UNKNOWN_CLASS_KIND = "<QMAN-100011> : Unknwon class kind : %s).";
+ String QMAN_100012_SCHEMA_MESSAGE_PROCESSING_FAILURE = "<QMAN-100012> : Q-Man was unable to process the schema response message.";
+ String QMAN_100013_MBEAN_REGISTRATION_FAILURE = "<QMAN-100013> : Unable to unregister object instance %s.";
+ String QMAN_100014_ATTRIBUTE_DECODING_FAILURE = "<QMAN-100014> : Unable to decode value for attribute %s";
+ String QMAN_100015_UNABLE_TO_SEND_SCHEMA_REQUEST = "<QMAN-100015> : Unable to send a schema request schema for %s.%s";
+ String QMAN_100016_UNABLE_TO_DECODE_FEATURE_VALUE = "<QMAN-100016> : Unable to decode value for %s::%s::%s";
+ String QMAN_100017_UNABLE_TO_CONNECT = "<QMAN-100017>: Cannot connect to broker %s on %s";
+
+ // MESSAGES
+ String EVENT_SEVERITY_ATTRIBUTE_DESCRIPTION = "Severity level for this event.";
+ String EVENT_TIMESTAMP_ATTRIBUTE_DESCRIPTION = "Current timestamp of this event.";
+ String UNABLE_TO_STARTUP_CORRECTLY = "<QMAN-100002> : Q-Man was unable to startup correctly : a configuration error occurred.";
+}
diff --git a/java/management/client/src/main/java/org/apache/qpid/management/Names.java b/java/management/client/src/main/java/org/apache/qpid/management/Names.java index dde7715509..4fabb40640 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/Names.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/Names.java @@ -49,4 +49,5 @@ public interface Names String CONFIGURATION_FILE_NAME = "/org/apache/qpid/management/config.xml"; String ARG_COUNT_PARAM_NAME = "argCount"; + String DEFAULT_PARAM_NAME ="default"; } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/Protocol.java b/java/management/client/src/main/java/org/apache/qpid/management/Protocol.java index 222b5d8aa6..a37332f1f0 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/Protocol.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/Protocol.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.management; -import com.sun.org.apache.xerces.internal.jaxp.validation.ErrorHandlerAdaptor; - /** * Protocol defined constants. * diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java index bbb5380d7d..368970af00 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/BrokerConnectionDataParser.java @@ -22,6 +22,7 @@ package org.apache.qpid.management.configuration; import java.util.UUID; +import org.apache.qpid.management.Messages; import org.apache.qpid.transport.util.Logger; /** @@ -118,7 +119,7 @@ class BrokerConnectionDataParser implements IParser Configuration.getInstance().addBrokerConnectionData(getUUId(),_connectionData); } catch(Exception exception) { - LOGGER.error(exception, "Unable to connect with broker located on %s and. Hence this broker will be ignored.", _connectionData); + LOGGER.error(exception, Messages.QMAN_100007_UNABLE_TO_CONNECT_WITH_BROKER, _connectionData); } _connectionData = new BrokerConnectionData(); break; diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java index 6b47c06510..2c27926f47 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Names; import org.apache.qpid.management.domain.handler.base.IMessageHandler; import org.apache.qpid.management.domain.handler.impl.InvocationResult; @@ -193,10 +194,7 @@ public final class Configuration result.put(opcode, (IMessageHandler)Class.forName(className).newInstance()); } catch(Exception exception) { - LOGGER.error( - exception, - "<QMAN-100020> : Management Message Handler configured for opcode %s is not available and therefore will be discarded.", - opcode); + LOGGER.error(exception,Messages.QMAN_100008_MANAGEMENT_MESSAGE_HANDLER_NOT_AVAILABLE,opcode); } } return result; @@ -222,10 +220,7 @@ public final class Configuration result.put(opcode, (IMessageHandler)Class.forName(className).newInstance()); } catch(Exception exception) { - LOGGER.error( - exception, - "<QMAN-100021> :Method-Reply Message Handler configured for opcode %s is not available and therefore will be discarded.", - opcode); + LOGGER.error(exception,Messages.QMAN_100009_METHOD_REPLY_MESSAGE_HANDLER_NOT_AVAILABLE,opcode); } } return result; @@ -273,7 +268,7 @@ public final class Configuration _typeMappings.put(code, type); _validators.put(type, validatorClassName); - LOGGER.info("<QMAN-000020> : Type mapping : code = %s associated to %s (validator class is %s)", code,type,validatorClassName); + LOGGER.info(Messages.QMAN_000005_TYPE_MAPPING_CONFIGURED, code,type,validatorClassName); } /** @@ -286,7 +281,7 @@ public final class Configuration AccessMode accessMode = mapping.getAccessMode(); _accessModes.put(code, accessMode); - LOGGER.info("<QMAN-000021> : Access Mode mapping : code = %s associated to %s", code,accessMode); + LOGGER.info(Messages.QMAN_000006_ACCESS_MODE_MAPPING_CONFIGURED, code,accessMode); } /** @@ -302,7 +297,7 @@ public final class Configuration String handlerClass = mapping.getMessageHandlerClass(); _managementQueueHandlers.put(opcode, handlerClass); - LOGGER.info("<QMAN-000022> : Management Queue Message Handler Mapping : opcode = %s associated with %s", opcode,handlerClass); + LOGGER.info(Messages.QMAN_000007_MANAGEMENT_HANDLER_MAPPING_CONFIGURED, opcode,handlerClass); } /** @@ -318,7 +313,7 @@ public final class Configuration String handlerClass = mapping.getMessageHandlerClass(); _methodReplyQueueHandlers.put(opcode, handlerClass); - LOGGER.info("<QMAN-000023> : Method-Reply Queue Message Handler Mapping : opcode = %s associated with %s", opcode,handlerClass); + LOGGER.info(Messages.QMAN_000008_METHOD_REPLY_HANDLER_MAPPING_CONFIGURED, opcode,handlerClass); } /** @@ -332,7 +327,8 @@ public final class Configuration { QpidDatasource.getInstance().addConnectionPool(brokerId, connectionData); _brokerConnectionInfos.put(brokerId,connectionData); - LOGGER.info("<QMAN-000024> : Broker Configuration %s: %s",brokerId,connectionData); + + LOGGER.info(Messages.QMAN_000009_BROKER_DATA_CONFIGURED,brokerId,connectionData); } /** @@ -358,7 +354,7 @@ public final class Configuration _managementQueueName = Names.MANAGEMENT_QUEUE_PREFIX+uuid; _methodReplyQueueName = Names.METHOD_REPLY_QUEUE_PREFIX+uuid; - LOGGER.debug("<QMAN-200021> : Management queue name : %s",_managementQueueName); - LOGGER.debug("<QMAN-000022> : Method-reply queue name : %s",_methodReplyQueueName); + LOGGER.debug(Messages.QMAN_200004_MANAGEMENT_QUEUE_NAME,_managementQueueName); + LOGGER.debug(Messages.QMAN_200005_METHOD_REPLY_QUEUE_NAME,_methodReplyQueueName); } -}
\ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java index 1a51085ad3..21bdea59b5 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java @@ -28,6 +28,7 @@ import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.commons.pool.impl.GenericObjectPoolFactory; +import org.apache.qpid.management.Messages; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.util.Logger; @@ -87,7 +88,8 @@ public final class QpidDatasource try { pools.get(_brokerId).returnObject(this); - LOGGER.debug("<QMAN-200012> : Connection %s returned to the pool.", this); + + LOGGER.debug(Messages.QMAN_200006_QPID_CONNECTION_RELEASED, this); } catch (Exception e) { @@ -150,7 +152,9 @@ public final class QpidDatasource { PooledConnection connection = (PooledConnection) obj; boolean isValid = connection.isValid(); - LOGGER.debug("<QMAN-200013> : Test connection on reserve. Is valid? %s",isValid); + + LOGGER.debug(Messages.QMAN_200007_TEST_CONNECTION_ON_RESERVE,isValid); + return isValid; } @@ -164,10 +168,11 @@ public final class QpidDatasource { PooledConnection connection = (PooledConnection) obj; connection.reallyClose(); - LOGGER.debug("<QMAN-200014> : Connection has been destroyed."); - } catch (Exception e) + + LOGGER.debug(Messages.QMAN_200008_CONNECTION_DESTROYED); + } catch (Exception exception) { - LOGGER.debug(e, "<QMAN-200015> : Unable to destroy a connection object"); + LOGGER.debug(exception, Messages.QMAN_200009_CONNECTION_DESTROY_FAILURE); } } } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/MethodResponseMessageHandler.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/MethodResponseMessageHandler.java index ba24a8d4e6..303708a010 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/MethodResponseMessageHandler.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/MethodResponseMessageHandler.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.domain.handler.base.BaseMessageHandler; import org.apache.qpid.management.domain.model.DomainModel; import org.apache.qpid.management.domain.model.InvocationEvent; @@ -80,11 +81,7 @@ public class MethodResponseMessageHandler extends BaseMessageHandler exchangeChannel.put(result); } catch (InterruptedException exception) { - LOGGER.error( - exception, - "<QMAN-100044> : an exception occurred while storing the result of a method invocation. " + - "Sequence number was %s", - sequenceNumber); + LOGGER.error(exception,Messages.QMAN_100010_METHOD_INVOCATION_RESULT_FAILURE,sequenceNumber); } } else { @@ -106,4 +103,4 @@ public class MethodResponseMessageHandler extends BaseMessageHandler super.setDomainModel(domainModel); domainModel.setMethodInvocationListener(methodInvocationListener); } -}
\ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java index fdee2f2f2d..add7fd073b 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Names; import org.apache.qpid.management.Protocol; import org.apache.qpid.management.domain.handler.base.BaseMessageHandler; @@ -39,12 +40,25 @@ import org.apache.qpid.transport.codec.ManagementDecoder; */ public class SchemaResponseMessageHandler extends BaseMessageHandler { + /** + * Behavioural interface for classes that are responsible to deal with schema messages. + * + * @author Andrea Gazzarini + */ interface IProcessor { + /** + * Processes the incoming message using the given decoder. + * + * @param decoder the decoder used for dealing with incoming message. + */ void process(ManagementDecoder decoder); } - final IProcessor classDefinitionProcessor = new IProcessor() + /** + * Processor responsible to deal with class schema related messages. + */ + final IProcessor _classSchemaProcessor = new IProcessor() { @Override public void process(ManagementDecoder decoder) @@ -69,13 +83,15 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler getMethods(decoder, howManyMethods)); } catch(Exception exception) { - _logger.error(exception,"<QMAN-100007> : Q-Man was unable to process the schema response message."); + _logger.error(exception,Messages.QMAN_100005_CLASS_SCHEMA_PROCESSING_FAILURE); } - } }; - final IProcessor eventDefinitionProcessor = new IProcessor() + /** + * Processor responsible to deal with class event related messages. + */ + final IProcessor _eventSchemaProcessor = new IProcessor() { @Override public void process(ManagementDecoder decoder) @@ -94,7 +110,7 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler getAttributes(decoder, howManyArguments)); } catch(Exception exception) { - _logger.error(exception,"<QMAN-100007> : Q-Man was unable to process the schema response message."); + _logger.error(exception,Messages.QMAN_100006_EVENT_SCHEMA_PROCESSING_FAILURE); } } }; @@ -115,22 +131,22 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler { case Protocol.CLASS : { - classDefinitionProcessor.process(decoder); + _classSchemaProcessor.process(decoder); break; } case Protocol.EVENT : { - eventDefinitionProcessor.process(decoder); + _eventSchemaProcessor.process(decoder); break; } default : { - _logger.error("<QMAN-100035> : Q-Man was unable to process the schema response message (reason : unknown class kind %s).",classKind); + _logger.error(Messages.QMAN_100011_UNKNOWN_CLASS_KIND,classKind); } } } catch(Exception exception) { - _logger.error(exception,"<QMAN-100007> : Q-Man was unable to process the schema response message."); + _logger.error(exception,Messages.QMAN_100012_SCHEMA_MESSAGE_PROCESSING_FAILURE); } } @@ -167,7 +183,8 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler int howManyArguments = (Integer) method.get(Names.ARG_COUNT_PARAM_NAME); List<Map<String,Object>> arguments = new ArrayList<Map<String,Object>>(howManyArguments); - for (int argIndex = 0; argIndex < howManyArguments; argIndex++){ + for (int argIndex = 0; argIndex < howManyArguments; argIndex++) + { arguments.add(decoder.readMap()); } result.add(new MethodOrEventDataTransferObject(method,arguments)); @@ -191,7 +208,8 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler int howManyArguments = (Integer) method.get(Names.ARG_COUNT_PARAM_NAME); List<Map<String,Object>> arguments = new ArrayList<Map<String,Object>>(howManyArguments); - for (int argIndex = 0; argIndex < howManyArguments; argIndex++){ + for (int argIndex = 0; argIndex < howManyArguments; argIndex++) + { arguments.add(decoder.readMap()); } result.add(new MethodOrEventDataTransferObject(method,arguments)); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/JmxService.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/JmxService.java index 09003fac9e..1e03815768 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/JmxService.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/JmxService.java @@ -28,6 +28,7 @@ import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Names; import org.apache.qpid.management.domain.model.QpidClass.QpidManagedObject; import org.apache.qpid.management.domain.model.QpidEvent.QpidManagedEvent; @@ -58,7 +59,7 @@ class JmxService _mxServer.registerMBean(eventInstance, name); LOGGER.debug( - "<QMAN-200026> : Event instance %s::%s::%s successfully registered with MBean Server with name %s", + Messages.QMAN_200010_EVENT_MBEAN_REGISTERED, brokerId, packageName, eventClassName, @@ -95,7 +96,7 @@ class JmxService _mxServer.registerMBean(instance, name); LOGGER.debug( - "<QMAN-200026> : Object instance %s::%s::%s:%s successfully registered with MBean Server with name %s", + Messages.QMAN_200011_OBJECT_MBEAN_REGISTERED, brokerId, packageName, className, @@ -130,8 +131,7 @@ class JmxService _mxServer.unregisterMBean(name); LOGGER.debug( - "<QMAN-200012> : Object instance %s::%s::%s:%s successfully unregistered from MBean Server. " + - "Name was %s", + Messages.QMAN_200012_OBJECT_MBEAN_UNREGISTERED, brokerId, packageName, className, @@ -139,7 +139,7 @@ class JmxService name); } catch (Exception exception) { - LOGGER.error(exception,"<QMAN-100010> : Unable to unregister object instance %s.",name); + LOGGER.error(exception,Messages.QMAN_100013_MBEAN_REGISTRATION_FAILURE,name); } } } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidArgument.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidArgument.java index 94d532571a..db3ddb97e7 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidArgument.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidArgument.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.management.domain.model; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.messages.AmqpCoDec; import org.apache.qpid.transport.codec.ManagementDecoder; import org.apache.qpid.transport.util.Logger; @@ -72,11 +73,11 @@ class QpidArgument extends QpidProperty public void encode(Object value,AmqpCoDec encoder) { _type.encode(value, encoder); - LOGGER.debug("<QMAN-200012> : Encoded value %S for argument %s. Type is %s",value,_name,_type); + LOGGER.debug(Messages.QMAN_200013_ARGUMENT_VALUE_ENCODED,value,_name,_type); } public Object decode(ManagementDecoder decoder) { return _type.decode(decoder); } -}
\ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidAttribute.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidAttribute.java index d0bc470de9..69b5cb0565 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidAttribute.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidAttribute.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.management.domain.model; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.domain.model.type.Type; import org.apache.qpid.transport.codec.ManagementDecoder; import org.apache.qpid.transport.util.Logger; @@ -57,7 +58,6 @@ class QpidAttribute extends QpidFeature void setUnit (String unit) { this._unit = unit; - LOGGER.debug("Unit : %s", unit); } /** @@ -90,8 +90,9 @@ class QpidAttribute extends QpidFeature { try { return _type.decode(decoder); - } catch(RuntimeException exception) { - LOGGER.error(exception,"Unable to decode value for attribute %s",this); + } catch(RuntimeException exception) + { + LOGGER.error(exception,Messages.QMAN_100014_ATTRIBUTE_DECODING_FAILURE,this); throw exception; } } @@ -101,4 +102,4 @@ class QpidAttribute extends QpidFeature { return super.toString()+",type="+_type; } -}
\ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java index a119b6c2b1..ec26ef819e 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java @@ -30,6 +30,7 @@ import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.management.Attribute; import javax.management.AttributeList; @@ -45,6 +46,7 @@ import javax.management.ObjectName; import javax.management.ReflectionException; import javax.management.RuntimeOperationsException; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.domain.handler.impl.IMethodInvocationListener; import org.apache.qpid.management.domain.handler.impl.InvocationResult; import org.apache.qpid.management.domain.handler.impl.MethodOrEventDataTransferObject; @@ -131,7 +133,7 @@ class QpidClass extends QpidEntity } catch (Exception e) { _logger.error( - "<QMAN-100012> : Unable to send a schema request schema for %s.%s", + Messages.QMAN_100015_UNABLE_TO_SEND_SCHEMA_REQUEST, _parent.getName(), _name); } finally { @@ -156,7 +158,7 @@ class QpidClass extends QpidEntity } catch (Exception e) { _logger.error( - "<QMAN-100012> : Unable to send a schema request schema for %s.%s", + Messages.QMAN_100015_UNABLE_TO_SEND_SCHEMA_REQUEST, _parent.getName(), _name); } finally { @@ -485,7 +487,8 @@ class QpidClass extends QpidEntity */ void addInstrumentationData (Binary objectId, byte[] rawData) { - _logger.debug("<QMAN-200015> : Incoming instrumentation data for %s::%s.%s.%s", + _logger.debug( + Messages.QMAN_200014_INCOMING_INSTRUMENTATION_DATA, _parent.getOwnerId(), _parent.getName(), _name, @@ -501,7 +504,8 @@ class QpidClass extends QpidEntity */ void addConfigurationData (Binary objectId, byte[] rawData) { - _logger.debug("<QMAN-200016> : Incoming configuration data for %s::%s.%s.%s", + _logger.debug( + Messages.QMAN_200015_INCOMING_CONFIGURATION_DATA, _parent.getOwnerId(), _parent.getName(), _name, @@ -523,7 +527,7 @@ class QpidClass extends QpidEntity List<Map<String, Object>> statisticDefinitions, List<MethodOrEventDataTransferObject> methodDefinitions) throws UnableToBuildFeatureException { - _logger.info("<QMAN-000012> : Incoming schema for %s::%s.%s",_parent.getOwnerId(),_parent.getName(),_name); + _logger.info(Messages.QMAN_000010_INCOMING_SCHEMA,_parent.getOwnerId(),_parent.getName(),_name); _state.setSchema(propertyDefinitions, statisticDefinitions, methodDefinitions); } @@ -557,7 +561,7 @@ class QpidClass extends QpidEntity attributes[index++]=(MBeanAttributeInfo) builder.getManagementFeature(); _logger.debug( - "<QMAN-200017> : Property definition for %s::%s.%s has been built.", + Messages.QMAN_200016_PROPERTY_DEFINITION_HAS_BEEN_BUILT, _parent.getName(), _name, property); @@ -566,7 +570,7 @@ class QpidClass extends QpidEntity _howManyPresenceBitMasks = (int)Math.ceil((double)howManyOptionalProperties / 8); _logger.debug( - "<QMAN-200018> : Class %s::%s.%s has %s optional properties.", + Messages.QMAN_200018_OPTIONAL_PROPERTIES_INFO, _parent.getOwnerId(), _parent.getName(), _name, @@ -583,7 +587,7 @@ class QpidClass extends QpidEntity attributes[index++]=(MBeanAttributeInfo) builder.getManagementFeature(); _logger.debug( - "<QMAN-200019> : Statistic definition for %s::%s.%s has been built.", + Messages.QMAN_200017_STATISTIC_DEFINITION_HAS_BEEN_BUILT, _parent.getName(), _name, statistic); @@ -656,10 +660,16 @@ class QpidClass extends QpidEntity int sequenceNumber = SequenceNumberGenerator.getNextSequenceNumber(); _methodInvocationListener.operationIsGoingToBeInvoked(new InvocationEvent(this,sequenceNumber,_exchangeChannelForMethodInvocations)); - _service.invoke(_parent.getName(), _name, _hash,objectId,parameters, method,sequenceNumber); - + _service.invoke(_parent.getName(), _name, _hash,objectId,parameters, method,sequenceNumber,objectId.getBankId(),objectId.getBrokerId()); + // TODO : Shoudl be configurable? InvocationResult result = _exchangeChannelForMethodInvocations.poll(5000,TimeUnit.MILLISECONDS); + + if (result == null) + { + throw new TimeoutException(); + } + Map<String, Object> output = method.decodeParameters(result.getOutputAndBidirectionalArgumentValues()); result.setOutputSection(output); @@ -694,7 +704,7 @@ class QpidClass extends QpidEntity Object value = property.decodeValue(decoder,presenceBitMasks); instance.createOrReplaceAttributeValue(property.getName(),value); } catch(Exception ignore) { - _logger.error("Unable to decode value for %s::%s::%s", _parent.getName(),_name,property.getName()); + _logger.error(Messages.QMAN_100016_UNABLE_TO_DECODE_FEATURE_VALUE, _parent.getName(),_name,property.getName()); } } } @@ -716,7 +726,7 @@ class QpidClass extends QpidEntity Object value = statistic.decodeValue(decoder); instance.createOrReplaceAttributeValue(statistic.getName(),value); } catch(Exception ignore) { - _logger.error("Unable to decode value for %s::%s::%s", _parent.getName(),_name,statistic.getName()); + _logger.error(Messages.QMAN_100016_UNABLE_TO_DECODE_FEATURE_VALUE, _parent.getName(),_name,statistic.getName()); } } } @@ -752,7 +762,6 @@ class QpidClass extends QpidEntity */ void releaseResources () { - // Chiamlo entityInstances<QpidManagedEntity> e mettilo nella superclasse? _objectInstances.clear(); JMX_SERVICE.unregisterObjectInstances(); _service.close(); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEntity.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEntity.java index 62f469e53c..95db936329 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEntity.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEntity.java @@ -9,6 +9,7 @@ import javax.management.DynamicMBean; import javax.management.MBeanInfo;
import javax.management.RuntimeOperationsException;
+import org.apache.qpid.management.Messages;
import org.apache.qpid.management.domain.model.type.Binary;
import org.apache.qpid.management.domain.services.QpidService;
import org.apache.qpid.transport.util.Logger;
@@ -110,7 +111,7 @@ public abstract class QpidEntity this._service = new QpidService(_parent.getOwnerId());
_logger.debug(
- "<QMAN-200017> : Entity definition has been built (without schema) for %s::%s.%s",
+ Messages.QMAN_200020_ENTITY_DEFINITION_HAS_BEEN_BUILT,
_parent.getOwnerId(),
_parent.getName(),
_name);
diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEvent.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEvent.java index 98a05bf931..e6205bba76 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEvent.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidEvent.java @@ -38,6 +38,7 @@ import javax.management.MBeanInfo; import javax.management.ReflectionException; import javax.management.RuntimeOperationsException; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.domain.model.type.Binary; import org.apache.qpid.transport.codec.ManagementDecoder; @@ -101,7 +102,7 @@ class QpidEvent extends QpidEntity } catch (Exception e) { _logger.error( - "<QMAN-100012> : Unable to send a schema request schema for %s.%s", + Messages.QMAN_100015_UNABLE_TO_SEND_SCHEMA_REQUEST, _parent.getName(), _name); } finally { @@ -306,7 +307,8 @@ class QpidEvent extends QpidEntity */ void addEventData (byte[] rawData, long currentTimestamp, int severity) { - _logger.debug("<QMAN-200015> : Incoming data for event %s::%s.%s", + _logger.debug( + Messages.QMAN_200021_INCOMING_EVENT_DATA, _parent.getOwnerId(), _parent.getName(), _name); @@ -324,7 +326,7 @@ class QpidEvent extends QpidEntity */ void setSchema (List<Map<String, Object>> argumentDefinitions) throws UnableToBuildFeatureException { - _logger.info("<QMAN-000012> : Incoming schema for %s::%s.%s",_parent.getOwnerId(),_parent.getName(),_name); + _logger.info(Messages.QMAN_000010_INCOMING_SCHEMA,_parent.getOwnerId(),_parent.getName(),_name); _state.setSchema(argumentDefinitions); } @@ -360,7 +362,7 @@ class QpidEvent extends QpidEntity attributes[index++]=(MBeanAttributeInfo) builder.getManagementFeature(); _logger.debug( - "<QMAN-200017> : Event argument definition for %s::%s.%s has been built.", + Messages.QMAN_200019_EVENT_ARGUMENT_DEFINITION_HAS_BEEN_BUILT, _parent.getName(), _name, argument); @@ -369,7 +371,7 @@ class QpidEvent extends QpidEntity attributes[index++] = new MBeanAttributeInfo( SEVERITY_ATTR_NAME, Integer.class.getName(), - "Severity level for this event.", + Messages.EVENT_SEVERITY_ATTRIBUTE_DESCRIPTION, true, false, false); @@ -377,11 +379,10 @@ class QpidEvent extends QpidEntity attributes[index++] = new MBeanAttributeInfo( TIMESTAMP_ATTR_NAME, Date.class.getName(), - "Current timestamp of this event.", + Messages.EVENT_TIMESTAMP_ATTRIBUTE_DESCRIPTION, true, false, false); - } /** @@ -416,7 +417,7 @@ class QpidEvent extends QpidEntity Object value = property.decodeValue(decoder); instance.createOrReplaceAttributeValue(property.getName(),value); } catch(Exception ignore) { - _logger.error("Unable to decode value for %s::%s::%s", _parent.getName(),_name,property.getName()); + _logger.error(Messages.QMAN_100016_UNABLE_TO_DECODE_FEATURE_VALUE, _parent.getName(),_name,property.getName()); } } } @@ -448,7 +449,8 @@ class QpidEvent extends QpidEntity * * @return true if there is one or more managed instances. */ - boolean hasNoInstances() { + boolean hasNoInstances() + { return _eventInstances.isEmpty(); } } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidFeatureBuilder.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidFeatureBuilder.java index 23353bf37e..d0862c15ca 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidFeatureBuilder.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidFeatureBuilder.java @@ -34,6 +34,7 @@ import javax.management.MBeanParameterInfo; import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.configuration.UnknownTypeCodeException; import org.apache.qpid.management.domain.handler.impl.MethodOrEventDataTransferObject; +import org.apache.qpid.management.Names; /** * A builder used to parse incoming schema message and therefore to build a feature (property, statistic, method, event) @@ -240,7 +241,7 @@ class QpidFeatureBuilder for (Entry<String, Object> argumentAttribute : _featureDefinition.entrySet()) { String key = argumentAttribute.getKey(); - if ("default".equals(key)) + if (Names.DEFAULT_PARAM_NAME.equals(key)) { argument.setDefaultValue(argumentAttribute.getValue()); } else { @@ -307,7 +308,7 @@ class QpidFeatureBuilder throw new MissingFeatureAttributesException(_mandatoryAttributes); } - QpidMethod method = new QpidMethod((String)definition.get("name"),(String) definition.get("desc")); + QpidMethod method = new QpidMethod((String)definition.get(Attribute.name.name()),(String) definition.get(Attribute.desc.name())); List<Map<String,Object>> args = _methodOrEventDefinition.getArgumentsDefinitions(); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidProperty.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidProperty.java index 8b2544af1c..6335a553ae 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidProperty.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidProperty.java @@ -22,6 +22,7 @@ package org.apache.qpid.management.domain.model; import java.lang.reflect.Constructor; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.domain.model.type.Type; import org.apache.qpid.transport.codec.ManagementDecoder; @@ -172,10 +173,10 @@ class QpidProperty extends QpidAttribute Class<?> validatorClass = Class.forName(Configuration.getInstance().getValidatorClassName(type)); Constructor<?> validatorConstructor = validatorClass.getDeclaredConstructor(QpidProperty.class); _validator = (IValidator) validatorConstructor.newInstance(this); - LOGGER.debug("Validator %s for type %s successfully installed." ,validatorClass.getName(), type); + LOGGER.debug(Messages.QMAN_200022_VALIDATOR_INSTALLED ,validatorClass.getName(), type); } catch(Exception exception) { _validator = EMPTY_VALIDATOR; - LOGGER.debug("No validator was found for type %s. The default (empty) validator will be used." , type); + LOGGER.debug(Messages.QMAN_200023_VALIDATOR_NOT_FOUND , type); } } @@ -292,4 +293,4 @@ class QpidProperty extends QpidAttribute { return _decoder == _optionalPropertyDecoder; } -}
\ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Binary.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Binary.java index 4c0dd926d0..02b38d8499 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Binary.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/model/type/Binary.java @@ -35,9 +35,6 @@ import org.apache.qpid.management.messages.AmqpCoDec; public final class Binary implements Serializable { private static final long serialVersionUID = -6865585077320637567L; - - // instance identifider. - private final UUID uuid; // Marker internal (empty) interface private interface State extends Serializable{} @@ -53,7 +50,7 @@ public final class Binary implements Serializable @Override public int hashCode () { - hashCode = Arrays.hashCode(bytes); + hashCode = Arrays.hashCode(_bytes); state = hashCodeAlreadyComputed; return hashCode; } @@ -73,8 +70,10 @@ public final class Binary implements Serializable return hashCode; } }; - - private final byte [] bytes; + + private final UUID uuid; + private final byte [] _bytes; + private long _first; private int hashCode; /** Current state (hashcode computation). */ @@ -87,7 +86,10 @@ public final class Binary implements Serializable */ public Binary(byte [] bytes) { - this.bytes = bytes; + this._bytes = bytes; + byte [] array = new byte [8]; + System.arraycopy(_bytes, 0, array, 0, 8); + _first = AmqpCoDec.unpack64(array); uuid = UUID.randomUUID(); } @@ -103,7 +105,7 @@ public final class Binary implements Serializable try { Binary binary = (Binary)obj; - return Arrays.equals(bytes, binary.bytes); + return Arrays.equals(_bytes, binary._bytes); } catch (Exception exception) { return false; @@ -117,7 +119,7 @@ public final class Binary implements Serializable */ public void encode(AmqpCoDec encoder) { - encoder.pack(bytes); + encoder.pack(_bytes); } @Override @@ -125,4 +127,24 @@ public final class Binary implements Serializable { return uuid.toString(); } + + /** + * Returns the bank identifier derived from this object identifier. + * + * @return the bank identifier derived from this object identifier. + */ + public long getBankId() + { + return _first & 0x000000000FFFFFFF; + } + + /** + * Returns the broker identifier derived from this object identifier. + * + * @return the broker identifier derived from this object identifier. + */ + public long getBrokerId() + { + return (_first & 281474708275200L) >> 28; + } } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java index d5fdce09fc..4be48d1436 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.qpid.api.Message; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Protocol; import org.apache.qpid.management.domain.handler.base.IMessageHandler; import org.apache.qpid.management.domain.model.DomainModel; @@ -56,10 +57,8 @@ class BrokerMessageListener implements MessageListener static void debugIncomingMessage(ByteBuffer message) { if (LOGGER.isDebugEnabled()) - { - LOGGER.debug( - "<QMAN-200006> : New incoming message has been received. Message content : %s", - Arrays.toString(message.array())); + { + LOGGER.debug(Messages.QMAN_200001_INCOMING_MESSAGE_HAS_BEEN_RECEIVED, Arrays.toString(message.array())); } } @@ -70,7 +69,7 @@ class BrokerMessageListener implements MessageListener { for (Entry<Character, IMessageHandler> entry : _handlers.entrySet()) { - LOGGER.debug("<QMAN-200007> : \"%s\" opcode is associated to handler %s",entry.getKey(),entry.getValue()); + LOGGER.debug(Messages.QMAN_200002_OPCODE_HANDLER_ASSOCIATION,entry.getKey(),entry.getValue()); } } } @@ -98,57 +97,22 @@ class BrokerMessageListener implements MessageListener * * @param message the incoming message. */ - public void onMessage (Message message) + public void onMessage (Message compoundMessage) { - try - { - ByteBuffer buffer = message.readData(); - - // TODO : Should be better...! - String magicNumber = new String(new byte[] {buffer.get(),buffer.get(),buffer.get()}); - if (!Protocol.MAGIC_NUMBER.equals(magicNumber)) - { - LOGGER.error( - "<QMAN-100003> : Message processing failure : incoming message contains a bad magic number (%s) " + - "and therefore will be discaded.", - magicNumber); - return; - } - - char opcode = (char)buffer.get(); - - IMessageHandler handler = _handlers.get(opcode); - if (handler != null) - { - ManagementDecoder decoder = new ManagementDecoder(); - decoder.init(buffer); - - LOGGER.debug( - "<QMAN-200008> : Incoming message with \"%s\" as opcode will be forwarded to %s for processing.", - opcode, - handler); - - handler.process(decoder,decoder.readSequenceNo()); - } else - { - LOGGER.warn( - "<QMAN-300001> : No handler has been configured for processing messages with \"%s\" as opcode. " + - "This message will be discarded.", - opcode); - - Log.debugConfiguredHandlers(_handlers); - } - } catch(IOException exception) + try + { + MessageTokenizer tokenizer = new MessageTokenizer(compoundMessage); + while (tokenizer.hasMoreElements()) + { + dispatch(tokenizer.nextElement()); + } + } catch(IOException exception) { - LOGGER.error( - exception, - "<QMAN-100004> : Message I/O failure : unable to read byte message content and therefore it will be discarded."); + LOGGER.error(exception,Messages.QMAN_100002_MESSAGE_READ_FAILURE); } catch(Exception exception) - { - LOGGER.error( - exception, - "<QMAN-100005> : Message processing failure : unknown exception; see logs for more details."); - } + { + LOGGER.error(exception,Messages.QMAN_100003_MESSAGE_PROCESS_FAILURE); + } } /** @@ -171,11 +135,44 @@ class BrokerMessageListener implements MessageListener handler.setDomainModel(_domainModel); _handlers.put(opcode, handler); } catch(Exception exception) { - LOGGER.error( - exception, - "<QMAN-100006> : Message handler configured for opcode %s thrown an exception in initialization and therefore will be discarded.", - opcode); + LOGGER.error(exception,Messages.QMAN_100004_HANDLER_INITIALIZATION_FAILURE, opcode); } } } -}
\ No newline at end of file + + /** + * Dispatches the given message to the appropriate handler. + * + * @param message the incoming message. + * @throws IOException when the message content cannot be read. + */ + private void dispatch(Message message) throws IOException + { + ByteBuffer buffer = message.readData(); + + // TODO : Should be better...! + String magicNumber = new String(new byte[] {buffer.get(),buffer.get(),buffer.get()}); + if (!Protocol.MAGIC_NUMBER.equals(magicNumber)) + { + LOGGER.error(Messages.QMAN_100001_BAD_MAGIC_NUMBER_FAILURE,magicNumber); + return; + } + + char opcode = (char)buffer.get(); + + IMessageHandler handler = _handlers.get(opcode); + if (handler != null) + { + ManagementDecoder decoder = new ManagementDecoder(); + decoder.init(buffer); + + LOGGER.debug(Messages.QMAN_200003_MESSAGE_FORWARDING,opcode,handler); + + handler.process(decoder,decoder.readSequenceNo()); + } else + { + LOGGER.warn(Messages.QMAN_300001_MESSAGE_DISCARDED,opcode); + Log.debugConfiguredHandlers(_handlers); + } + } +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java index d4535596e9..a3584571f3 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/ManagementClient.java @@ -23,6 +23,7 @@ package org.apache.qpid.management.domain.services; import java.util.UUID; import org.apache.qpid.QpidException; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Names; import org.apache.qpid.management.configuration.BrokerConnectionData; import org.apache.qpid.management.configuration.Configuration; @@ -102,9 +103,7 @@ final class ManagementClient */ void shutdown () { - LOGGER.info( - "<QMAN-000033> : The shutdown sequence has been initiated for management client connected with broker %s", - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000011_SHUTDOWN_INITIATED,_domainModel.getBrokerId()); removeMethodReplyConsumer(); destroyAndUnbingMethodReplyQueue(); @@ -116,9 +115,7 @@ final class ManagementClient _service.close(); - LOGGER.info( - "<QMAN-000034> : Management client connected with broker %s shut down successfully.", - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000012_MANAGEMENT_CLIENT_SHUT_DOWN,_domainModel.getBrokerId()); } /** @@ -129,9 +126,8 @@ final class ManagementClient BrokerMessageListener methodReplyChannelListener = new BrokerMessageListener(_domainModel); methodReplyChannelListener.setHandlers(Configuration.getInstance().getMethodReplyQueueHandlers()); _service.createSubscription(_methodReplyQueueName, _methodReplyQueueName, methodReplyChannelListener); - LOGGER.info( - "<QMAN-000039> : Method-reply queue consumer has been successfully installed and bound on broker %s.", - _domainModel.getBrokerId()); + + LOGGER.info(Messages.QMAN_000013_METHOD_REPLY_CONSUMER_INSTALLED, _domainModel.getBrokerId()); } /** @@ -143,9 +139,7 @@ final class ManagementClient managementChannelListener.setHandlers(Configuration.getInstance().getManagementQueueHandlers()); _service.createSubscription(_managementQueueName, _managementQueueName, managementChannelListener); - LOGGER.info( - "<QMAN-000038> : Management queue consumer has been successfully installed and bound on broker %s.", - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000014_MANAGEMENT_CONSUMER_INSTALLED, _domainModel.getBrokerId()); } /** @@ -159,10 +153,7 @@ final class ManagementClient Names.MANAGEMENT_EXCHANGE, Names.MANAGEMENT_ROUTING_KEY); - LOGGER.info( - "<QMAN-000037> : Management queue with name %s has been successfully declared and bound on broker %s.", - _managementQueueName, - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000015_MANAGEMENT_QUEUE_DECLARED,_managementQueueName,_domainModel.getBrokerId()); } /** @@ -174,10 +165,7 @@ final class ManagementClient _service.declareQueue(_methodReplyQueueName); _service.declareBinding(_methodReplyQueueName, Names.AMQ_DIRECT_QUEUE, _methodReplyQueueName); - LOGGER.info( - "<QMAN-000037> : Method-reply queue with name %s has been successfully declared and bound on broker %s.", - _methodReplyQueueName, - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000016_METHOD_REPLY_QUEUE_DECLARED,_methodReplyQueueName, _domainModel.getBrokerId()); } /** @@ -187,10 +175,7 @@ final class ManagementClient { _service.removeSubscription(_methodReplyQueueName); - LOGGER.info( - "<QMAN-000034> : Consumer %s has been removed from broker %s.", - _methodReplyQueueName, - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000017_CONSUMER_HAS_BEEN_REMOVED,_methodReplyQueueName,_domainModel.getBrokerId()); } /** @@ -201,10 +186,7 @@ final class ManagementClient _service.declareUnbinding(_methodReplyQueueName, Names.AMQ_DIRECT_QUEUE, _methodReplyQueueName); _service.deleteQueue(_methodReplyQueueName); - LOGGER.info( - "<QMAN-000035> : Queue %s has been removed from broker %s.", - _methodReplyQueueName, - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000018_QUEUE_UNDECLARED,_methodReplyQueueName,_domainModel.getBrokerId()); } /** @@ -214,10 +196,7 @@ final class ManagementClient { _service.removeSubscription(_managementQueueName); - LOGGER.info( - "<QMAN-000036> : Consumer %s has been removed from broker %s.", - _managementQueueName, - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000017_CONSUMER_HAS_BEEN_REMOVED,_managementQueueName,_domainModel.getBrokerId()); } /** @@ -228,10 +207,7 @@ final class ManagementClient _service.declareUnbinding(_managementQueueName, Names.MANAGEMENT_EXCHANGE, Names.MANAGEMENT_ROUTING_KEY); _service.deleteQueue(_managementQueueName); - LOGGER.info( - "<QMAN-000037> : Queue %s has been removed from broker %s.", - _managementQueueName, - _domainModel.getBrokerId()); + LOGGER.info(Messages.QMAN_000018_QUEUE_UNDECLARED, _managementQueueName,_domainModel.getBrokerId()); } /** @@ -252,4 +228,4 @@ final class ManagementClient { _service.sync(); } -}
\ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/MessageTokenizer.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/MessageTokenizer.java new file mode 100644 index 0000000000..623d74e737 --- /dev/null +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/MessageTokenizer.java @@ -0,0 +1,133 @@ +package org.apache.qpid.management.domain.services;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.qpid.api.Message;
+import org.apache.qpid.management.Messages;
+import org.apache.qpid.management.Protocol;
+import org.apache.qpid.nclient.util.ByteBufferMessage;
+import org.apache.qpid.transport.codec.ManagementDecoder;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * The message tokenizer class allows a multi message listener to break a
+ * message into tokens where each token is itself a valid AMQP message.
+ *
+ * @author Andrea Gazzarini
+ * @see QPID-1368
+ */
+class MessageTokenizer implements Enumeration<Message>
+{
+ private final static Logger LOGGER = Logger.get(MessageTokenizer.class);
+
+ static byte [] MAGIC_NUMBER_BYTES;
+
+ private LinkedList<Message> _messages = new LinkedList<Message>();
+ private Iterator<Message> _iterator;
+
+ static
+ {
+ try
+ {
+ MAGIC_NUMBER_BYTES = Protocol.MAGIC_NUMBER.getBytes("UTF-8");
+ } catch(Exception exception)
+ {
+ throw new ExceptionInInitializerError(exception);
+ }
+ }
+
+ /**
+ * Builds a new Message tokenizer with the given message.
+ * Note that if the given message is not a "compound" message this tokenizer will producer only one token;
+ * That is, the token is a message equals to the given message.
+ *
+ * @param compoundMessage the compound message
+ * @throws IOException when it's not possible to read the given message content.
+ */
+ MessageTokenizer(Message compoundMessage) throws IOException
+ {
+ build(compoundMessage);
+ }
+
+ @Override
+ public boolean hasMoreElements()
+ {
+ return _iterator.hasNext();
+ }
+
+ @Override
+ public Message nextElement()
+ {
+ return _iterator.next();
+ }
+
+ /**
+ * Retruns the number of the tokens produced by this tokenizer.
+ *
+ * @return the number of the tokens produced by this tokenizer.
+ */
+ public int countTokens()
+ {
+ return _messages.size();
+ }
+
+ // Internal methods used for splitting the multi message byte array.
+ int indexOf(byte[] source, int startIndex)
+ {
+ int currentSourceIndex;
+ int currentExampleIndex;
+
+ if (startIndex + 3 > source.length)
+ return -1;
+
+ for (currentSourceIndex = startIndex; currentSourceIndex <= source.length - 3; currentSourceIndex++)
+ {
+ for (currentExampleIndex = 0; currentExampleIndex < 3; currentExampleIndex++)
+ {
+ if (source[currentSourceIndex + currentExampleIndex] != MAGIC_NUMBER_BYTES[currentExampleIndex])
+ break;
+ }
+
+ if (currentExampleIndex == 3)
+ return currentSourceIndex;
+ }
+ return -1;
+ }
+
+ // Internal method used for building the tokens.
+ private void build(Message compoundMessage) throws IOException
+ {
+ int startIndex = 0;
+ int indexOfMagicNumber = 0;
+
+ ManagementDecoder decoder = new ManagementDecoder();
+ decoder.init(compoundMessage.readData());
+ byte [] source = decoder.readReaminingBytes();
+
+ int howManyTokens = 1;
+
+ while ((indexOfMagicNumber = indexOf(source, startIndex+1)) != -1)
+ {
+ addMessageToken(source, startIndex, (indexOfMagicNumber-startIndex));
+ startIndex = indexOfMagicNumber;
+ howManyTokens++;
+ }
+ addMessageToken(source, startIndex, (source.length-startIndex));
+ _iterator = _messages.iterator();
+
+ LOGGER.debug(Messages.QMAN_200031_COMPOUND_MESSAGE_CONTAINS,howManyTokens);
+ };
+
+ // Builds & adds a new "message" token
+ private void addMessageToken(byte [] source,int startIndex,int length) throws IOException
+ {
+ byte [] messageData = new byte[length];
+ System.arraycopy(source, startIndex, messageData, 0, messageData.length);
+ Message message = new ByteBufferMessage();
+ message.appendData(messageData);
+ _messages.add(message);
+ }
+}
diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QMan.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QMan.java index ce700aa283..f2a4ea54a0 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QMan.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QMan.java @@ -29,6 +29,7 @@ import java.util.UUID; import java.util.Map.Entry; import org.apache.log4j.xml.DOMConfigurator; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.configuration.BrokerConnectionData; import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.configuration.ConfigurationException; @@ -51,8 +52,8 @@ public class QMan */ void start() throws StartupFailureException { - LOGGER.info("<QMAN-000001> : Starting Q-Man..."); - LOGGER.info("<QMAN-000002> : Reading Q-Man configuration..."); + LOGGER.info(Messages.QMAN_000001_STARTING_QMAN); + LOGGER.info(Messages.QMAN_000002_READING_CONFIGURATION); addShutDownHook(); @@ -60,7 +61,7 @@ public class QMan try { configurator.configure(); - LOGGER.info("<QMAN-000003> : Creating management client(s)..."); + LOGGER.info(Messages.QMAN_000003_CREATING_MANAGEMENT_CLIENTS); for (Entry<UUID, BrokerConnectionData> entry : Configuration.getInstance().getConnectionInfos()) { UUID brokerId = entry.getKey(); @@ -71,28 +72,16 @@ public class QMan managementClients.add(client); client.estabilishFirstConnectionWithBroker(); - LOGGER.info("<QMAN-000004> : Management client for broker %s successfully connected.",brokerId); + LOGGER.info(Messages.QMAN_000004_MANAGEMENT_CLIENT_CONNECTED,brokerId); } catch(StartupFailureException exception) { - LOGGER.error(exception, "<QMAN-100001>: Cannot connect to broker %s on %s",brokerId,data); + LOGGER.error(exception, Messages.QMAN_100017_UNABLE_TO_CONNECT,brokerId,data); } } - LOGGER.info("<QMAN-000004> : Q-Man open for e-business."); - - // TODO : console enhancement (i.e. : connect another broker) - System.out.println("Type \"q\" to quit."); - BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); - while ( !"q".equals(reader.readLine()) ){ - } + LOGGER.info(Messages.QMAN_000019_QMAN_STARTED); } catch(ConfigurationException exception) { - LOGGER.error( - exception, - "<QMAN-100002> : Q-Man was unable to startup correctly : a configuration error occurred."); - System.exit(1); + LOGGER.error(exception,Messages.UNABLE_TO_STARTUP_CORRECTLY); + throw new StartupFailureException(exception); } - catch(IOException exception) - { - throw new StartupFailureException(exception); - } } /** @@ -101,11 +90,12 @@ public class QMan private void addShutDownHook() { // SHUTDOWN HOOK - Runtime.getRuntime().addShutdownHook(new Thread(){ + Runtime.getRuntime().addShutdownHook(new Thread() + { @Override public void run () { - LOGGER.info("<QMAN-000006> : Shutting down Q-Man..."); + LOGGER.info(Messages.QMAN_000020_SHUTTING_DOWN_QMAN); try { for (ManagementClient client : managementClients) @@ -114,9 +104,8 @@ public class QMan } } catch(Exception exception) { - } - LOGGER.info("<QMAN-000007> : Q-Man shut down."); + LOGGER.info(Messages.QMAN_000021_SHUT_DOWN); } }); } @@ -134,18 +123,26 @@ public class QMan DOMConfigurator.configureAndWatch(logFileName,5000); } - new Thread() - { - public void run() - { - QMan qman = new QMan(); - try - { - qman.start(); - } catch (StartupFailureException exception) { - exception.printStackTrace(); - } - } - }.start(); + QMan qman = new QMan(); + try + { + qman.start(); + + // TODO : console enhancement (i.e. : connect another broker) + System.out.println("Type \"q\" to quit."); + BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); + while ( !"q".equals(reader.readLine()) ) + { + + } + System.exit(-1); + } catch (StartupFailureException exception) + { + exception.printStackTrace(); + System.exit(-1); + } catch (IOException exception) + { + System.exit(-1); + } } } diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java index d9500ae2dd..a12993d40e 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java @@ -21,16 +21,14 @@ package org.apache.qpid.management.domain.services; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.QpidException; import org.apache.qpid.api.Message; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Names; -import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.configuration.QpidDatasource; import org.apache.qpid.management.domain.model.QpidMethod; import org.apache.qpid.management.domain.model.type.Binary; @@ -58,39 +56,6 @@ public class QpidService implements SessionListener { private final static Logger LOGGER = Logger.get(QpidService.class); - // Inner static class used for logging and avoid conditional logic (isDebugEnabled()) duplication. - private static class Log - { - /** - * Logs the content f the message. - * This will be written on log only if DEBUG level is enabled. - * - * @param messageContent the raw content of the message. - */ - static void logMessageContent(byte [] messageContent) - { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "<QMAN-200001> : Message has been sent to management exchange. Message content : %s", - Arrays.toString(messageContent)); - } - } - - /** - * Logs the content f the message. - * This will be written on log only if DEBUG level is enabled. - * - * @param messageContent the raw content of the message. - */ - static void logMessageContent(ByteBuffer messageContent) - { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "<QMAN-200002> : Message has been sent to management exchange."); - } - } - } - private UUID _brokerId; private Connection _connection; private Session _session; @@ -134,9 +99,10 @@ public class QpidService implements SessionListener } } + public void exception(Session ssn, SessionException exc) { - LOGGER.error(exc, "session %s exception", ssn); + } public void closed(Session ssn) {} @@ -194,10 +160,7 @@ public class QpidService implements SessionListener _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT); - LOGGER.debug( - "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.", - queueName, - destinationName); + LOGGER.debug(Messages.QMAN_200025_SUBSCRIPTION_DECLARED,queueName,destinationName); } /** @@ -209,9 +172,7 @@ public class QpidService implements SessionListener public void removeSubscription(String destinationName) { _session.messageCancel(destinationName); - LOGGER.debug( - "<QMAN-200026> : Subscription named %s has been removed from remote broker.", - destinationName); + LOGGER.debug(Messages.QMAN_200026_SUBSCRIPTION_REMOVED,destinationName); } /** @@ -223,7 +184,7 @@ public class QpidService implements SessionListener public void declareQueue(String queueName) { _session.queueDeclare(queueName, null, null); - LOGGER.debug("<QMAN-200004> : New queue with name %s has been declared.",queueName); + LOGGER.debug(Messages.QMAN_200027_QUEUE_DECLARED,queueName); } /** @@ -235,7 +196,7 @@ public class QpidService implements SessionListener public void deleteQueue(String queueName) { _session.queueDelete(queueName); - LOGGER.debug("<QMAN-2000025> : Queue with name %s has been removed.",queueName); + LOGGER.debug(Messages.QMAN_200028_QUEUE_REMOVED,queueName); } /** @@ -249,10 +210,7 @@ public class QpidService implements SessionListener public void declareBinding(String queueName, String exchangeName, String routingKey) { _session.exchangeBind(queueName, exchangeName, routingKey, null); - LOGGER.debug( - "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.", - routingKey,queueName, - exchangeName); + LOGGER.debug(Messages.QMAN_200029_BINDING_DECLARED,routingKey,queueName,exchangeName); } /** @@ -265,27 +223,7 @@ public class QpidService implements SessionListener public void declareUnbinding(String queueName, String exchangeName, String routingKey) { _session.exchangeUnbind(queueName, exchangeName, routingKey); - LOGGER.debug( - "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.", - routingKey,queueName, - exchangeName); - } - - /** - * Sends a command message with the given data on the management queue. - * - * @param messageData the command message content. - */ - public void sendCommandMessage(byte [] messageData) - { - _session.messageTransfer( - Names.MANAGEMENT_EXCHANGE, - MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - Configuration.getInstance().getCommandMessageHeader(), - messageData); - - Log.logMessageContent (messageData); + LOGGER.debug(Messages.QMAN_200030_BINDING_REMOVED,routingKey,queueName,exchangeName); } /** @@ -293,17 +231,6 @@ public class QpidService implements SessionListener * * @param messageData the command message content. */ - public void sendCommandMessage(ByteBuffer messageData) - { - _session.messageTransfer( - Names.MANAGEMENT_EXCHANGE, - MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, - Configuration.getInstance().getCommandMessageHeader(), - messageData); - - Log.logMessageContent (messageData); - } /** * Requests a schema for the given package.class.hash. @@ -348,6 +275,8 @@ public class QpidService implements SessionListener * @param objectId the object instance identifier. * @param parameters the parameters for this invocation. * @param method the method (definition) invoked. + * @param bankId the object bank identifier. + * @param brokerId the broker identifier. * @return the sequence number used for this message. * @throws MethodInvocationException when the invoked method returns an error code. * @throws UnableToComplyException when it wasn't possibile to invoke the requested operation. @@ -359,9 +288,11 @@ public class QpidService implements SessionListener final Binary objectId, final Object[] parameters, final QpidMethod method, - final int sequenceNumber) throws MethodInvocationException, UnableToComplyException + final int sequenceNumber, + final long bankId, + final long brokerId) throws MethodInvocationException, UnableToComplyException { - Message message = new MethodInvocationRequestMessage() + Message message = new MethodInvocationRequestMessage(bankId, brokerId) { @Override diff --git a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/SequenceNumberGenerator.java b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/SequenceNumberGenerator.java index 9915d565ab..e6d99971cd 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/domain/services/SequenceNumberGenerator.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/domain/services/SequenceNumberGenerator.java @@ -29,8 +29,13 @@ public class SequenceNumberGenerator { private static int sequenceNumber; + /** + * Returns a valid sequence number. + * + * @return a sequence number. + */ public static synchronized int getNextSequenceNumber() { return sequenceNumber++; } -}
\ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/messages/AmqpCoDec.java b/java/management/client/src/main/java/org/apache/qpid/management/messages/AmqpCoDec.java index c7ab9b7fd2..a56d5be7b1 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/messages/AmqpCoDec.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/messages/AmqpCoDec.java @@ -3,11 +3,19 @@ package org.apache.qpid.management.messages; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +/** + * AMQP Management messages codec. + * + * @author Andrea Gazzarini + */ public class AmqpCoDec { private byte [] _buffer; private int _position; + /** + * Builds a new codec. + */ AmqpCoDec() { _buffer = new byte [1000]; @@ -118,6 +126,19 @@ public class AmqpCoDec } } + public static final long unpack64(byte data[]) { + return ( + ((long) (data[0] & 0xff) << 56) | + ((long)(data[1] & 0xff) << 48) | + ((long)(data[2] & 0xff) << 40) | + ((long)(data[3] & 0xff) << 32) | + ((long)(data[4] & 0xff) << 24) | + ((long)(data[5] & 0xff) << 16) | + ((long)(data[6] & 0xff) << 8) | + (long) data[7] & 0xff); + } + + public void pack (byte[] bytes) { System.arraycopy(bytes, 0, _buffer, _position, bytes.length); diff --git a/java/management/client/src/main/java/org/apache/qpid/management/messages/MethodInvocationRequestMessage.java b/java/management/client/src/main/java/org/apache/qpid/management/messages/MethodInvocationRequestMessage.java index 85dfe915bc..3021e2f3d9 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/messages/MethodInvocationRequestMessage.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/messages/MethodInvocationRequestMessage.java @@ -20,25 +20,131 @@ */ package org.apache.qpid.management.messages; +import org.apache.qpid.management.Messages; import org.apache.qpid.management.Protocol; +import org.apache.qpid.management.configuration.Configuration; import org.apache.qpid.management.domain.model.QpidMethod; import org.apache.qpid.management.domain.model.type.Binary; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.ReplyTo; +import org.apache.qpid.transport.util.Logger; +/** + * Abstract representation of a method invocation request message. + * Concrete subclasses must supply the values needed to build & encode the message. + * + * @author Andrea Gazzarini + */ public abstract class MethodInvocationRequestMessage extends ManagementMessage { + private final static Logger LOGGER = Logger.get(MethodInvocationRequestMessage.class); + + private DeliveryProperties _deliveryProperties; + private MessageProperties _messageProperties; + private Header _header; + + /** + * Builds a new method invocation request message with the given target identifiers. + * + * @param bankId the bank identifier. + * @param brokerId the broker identifier. + */ + public MethodInvocationRequestMessage(long bankId, long brokerId) + { + ReplyTo replyTo=new ReplyTo(); + replyTo.setRoutingKey(Configuration.getInstance().getMethodReplyQueueName()); + _messageProperties = new MessageProperties(); + _messageProperties.setReplyTo(replyTo); + + String routingKey = String.format("agent.%s.%s", brokerId,bankId); + + LOGGER.debug(Messages.QMAN_200032_COMMAND_MESSAGE_ROUTING_KEY, routingKey); + + _deliveryProperties = new DeliveryProperties(); + _deliveryProperties.setRoutingKey(routingKey); + _header = new Header(_deliveryProperties, _messageProperties); + } + @Override char opcode () { return Protocol.OPERATION_INVOCATION_REQUEST_OPCODE; } - + + /** + * Returns the package name. + * + * @return the package name. + */ protected abstract String packageName(); + + /** + * Returns the class name. + * + * @return the class name. + */ protected abstract String className(); + + /** + * Returns the schema hash. + * + * @return the schema hash. + */ protected abstract Binary schemaHash(); + + /** + * Returns the object identifier. + * + * @return the object identifier. + */ protected abstract Binary objectId(); + + /** + * Returns the method to be invoked. + * + * @return the method to be invoked. + */ protected abstract QpidMethod method(); + + /** + * Returns the parameters used for method invocation. + * + * @return the parameters used for method invocation. + */ protected abstract Object[] parameters(); + /** + * Returns the delivery properties of this message. + * + * @return the delivery properties of this message. + */ + public DeliveryProperties getDeliveryProperties () + { + return _deliveryProperties; + } + + /** + * Returns the header of this message. + * + * @return the header of this message. + */ + public Header getHeader () + { + return _header; + } + + /** + * Returns the messages header properties of this message. + * + * @return the message header properties of this message. + */ + public MessageProperties getMessageProperties () + { + return _messageProperties; + } + @Override void specificMessageEncoding () { @@ -51,4 +157,4 @@ public abstract class MethodInvocationRequestMessage extends ManagementMessage _codec.packStr8(method.getName()); method.encodeParameters(parameters(), _codec); } -}
\ No newline at end of file +} diff --git a/java/management/client/src/main/java/org/apache/qpid/management/messages/SchemaRequestMessage.java b/java/management/client/src/main/java/org/apache/qpid/management/messages/SchemaRequestMessage.java index 6bbd97d9a4..aa596c8413 100644 --- a/java/management/client/src/main/java/org/apache/qpid/management/messages/SchemaRequestMessage.java +++ b/java/management/client/src/main/java/org/apache/qpid/management/messages/SchemaRequestMessage.java @@ -23,6 +23,12 @@ package org.apache.qpid.management.messages; import org.apache.qpid.management.Protocol; import org.apache.qpid.management.domain.model.type.Binary; +/** + * Abstract representation of a schema request message. + * Concrete subclasses must supply the values needed to build & encode the message. + * + * @author Andrea Gazzarini + */ public abstract class SchemaRequestMessage extends ManagementMessage { @Override @@ -31,10 +37,25 @@ public abstract class SchemaRequestMessage extends ManagementMessage return Protocol.SCHEMA_REQUEST_OPCODE; } + /** + * Returns the package name. + * + * @return the package name. + */ protected abstract String packageName(); + /** + * Returns the class name. + * + * @return the class name. + */ protected abstract String className(); + /** + * Returns the schema hash. + * + * @return the schema hash. + */ protected abstract Binary schemaHash(); @Override @@ -44,4 +65,4 @@ public abstract class SchemaRequestMessage extends ManagementMessage _codec.packStr8(className()); schemaHash().encode(_codec); } -}
\ No newline at end of file +} diff --git a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidClassTest.java b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidClassTest.java index 837810ea0d..fab35d4c59 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidClassTest.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/domain/model/QpidClassTest.java @@ -1,6 +1,5 @@ package org.apache.qpid.management.domain.model; - import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -19,6 +18,11 @@ import org.apache.qpid.management.configuration.StubConfigurator; import org.apache.qpid.management.domain.handler.impl.MethodOrEventDataTransferObject; import org.apache.qpid.management.domain.model.QpidClass.QpidManagedObject; +/** + * Test case for Qpid Class. + * + * @author Andrea Gazzarini + */ public class QpidClassTest extends TestCase { private QpidClass _class; @@ -197,8 +201,6 @@ public class QpidClassTest extends TestCase TestConstants._1, true, TestConstants._1)); - - List<Map<String,Object>> statisticDefinitions = new ArrayList<Map<String,Object>>(2); _class.setSchema(propertyDefinitions, TestConstants.EMPTY_STATISTICS_SCHEMA, TestConstants.EMPTY_METHODS_SCHEMA); diff --git a/java/management/client/src/test/java/org/apache/qpid/management/domain/services/BrokerMessageListenerTest.java b/java/management/client/src/test/java/org/apache/qpid/management/domain/services/BrokerMessageListenerTest.java index 533c98c973..c489f7d767 100644 --- a/java/management/client/src/test/java/org/apache/qpid/management/domain/services/BrokerMessageListenerTest.java +++ b/java/management/client/src/test/java/org/apache/qpid/management/domain/services/BrokerMessageListenerTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.management.domain.services; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Random; import junit.framework.TestCase; @@ -167,4 +168,74 @@ public class BrokerMessageListenerTest extends TestCase _listener.onMessage(message); } + + /** + * Tests the execution of the onMessage() method when the incoming message is a compound message. + * + * <br>precondition : the incoming message is a compound message. + * <br>postcondition : each tokenized message is forwarded to the appropriate handler. + */ + public void testOnMessageOK_WithCompoundMessage() throws Exception + { + final Map<Character,IMessageHandler> handlersMap = new HashMap<Character,IMessageHandler>(); + char [] opcodes = {'a','b','c','d','e'}; + + class MockMessageHandler implements IMessageHandler + { + private final char _opcode; + + public MockMessageHandler(char opcode) + { + this._opcode = opcode; + } + + public void process (ManagementDecoder decoder, int sequenceNumber) + { + handlersMap.remove(_opcode); + } + + public void setDomainModel (DomainModel domainModel) + { + // Do nothing here. It's just a mock handler. + } + }; + + for (char opcode : opcodes) + { + handlersMap.put(opcode, new MockMessageHandler(opcode)); + } + + // Removes previously injected handlers (i.e. x & y) + _listener._handlers.clear(); + _listener.setHandlers(handlersMap); + + Message compoundMessage = createCompoundMessage(opcodes); + _listener.onMessage(compoundMessage); + + assertTrue(handlersMap.isEmpty()); + } + + // Creates a (non valid) compound message. + private Message createCompoundMessage(char[] opcodes) throws IOException { + byte [] compoundMessageData = new byte [12 * opcodes.length]; + Random randomizer = new Random(); + int position = 0; + + for (char opcode : opcodes) { + System.arraycopy(MessageTokenizer.MAGIC_NUMBER_BYTES, 0, compoundMessageData, position, MessageTokenizer.MAGIC_NUMBER_BYTES.length); + position+=MessageTokenizer.MAGIC_NUMBER_BYTES.length; + + compoundMessageData[position++] = (byte)opcode; + + for (int c = 4; c < 12; c++) + { + byte aByte = (byte)randomizer.nextInt(127); + compoundMessageData[position++] = aByte; + } + } + + Message compoundMessage = new ByteBufferMessage(); + compoundMessage.appendData(compoundMessageData); + return compoundMessage; + } } diff --git a/java/management/client/src/test/java/org/apache/qpid/management/domain/services/MessageTokenizerTest.java b/java/management/client/src/test/java/org/apache/qpid/management/domain/services/MessageTokenizerTest.java new file mode 100644 index 0000000000..754686bc2f --- /dev/null +++ b/java/management/client/src/test/java/org/apache/qpid/management/domain/services/MessageTokenizerTest.java @@ -0,0 +1,120 @@ +package org.apache.qpid.management.domain.services;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.*;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.api.Message;
+import org.apache.qpid.nclient.util.ByteBufferMessage;
+import org.apache.qpid.transport.codec.ManagementDecoder;
+
+/**
+ * Tests case for messaeg tokenizer.
+ *
+ * @author Andrea Gazzarini
+ */
+public class MessageTokenizerTest extends TestCase {
+
+ /**
+ * Tests the execution of the message tokenizer when the given message is not a valid AMQP message.
+ *
+ * <br>precondition : the incoming message is not a valid AMQP message.
+ * <br>postcondition : no exception is thrown and there will be exactly one token with the given message.
+ */
+ public void testOK_WithNoMessage() throws IOException{
+ byte [] noMessage = {2,10,120,23,23,23,4,10,11,12,2,1,3,-22};
+
+ Message multiMessage = new ByteBufferMessage();
+ multiMessage.appendData(noMessage);
+ MessageTokenizer tokenizer = new MessageTokenizer(multiMessage);
+
+ assertEquals(1, tokenizer.countTokens());
+ assertEquals(tokenizer.nextElement(),noMessage);
+ assertFalse(tokenizer.hasMoreElements());
+ }
+
+ /**
+ * Tests the execution of the message tokenizer when the given message contains only one message.
+ *
+ * <br>precondition : the incoming message contains only one message.
+ * <br>postcondition : no exception is thrown and there will be exactly one token with the given message.
+ */
+ public void testOK_WithOneMessage() throws IOException{
+ byte [] oneEncodedMessage = {'A','M','2',23,23,23,4,10,11,12,2,1,3,-22};
+
+ Message multiMessage = new ByteBufferMessage();
+ multiMessage.appendData(oneEncodedMessage);
+ MessageTokenizer tokenizer = new MessageTokenizer(multiMessage);
+
+ assertEquals(1, tokenizer.countTokens());
+ assertEquals(tokenizer.nextElement(),oneEncodedMessage);
+ assertFalse(tokenizer.hasMoreElements());
+ }
+
+ /**
+ * Tests the execution of the message tokenizer when the given message contains a random number of messages.
+ *
+ * <br>precondition : the incoming message contains a random number of messages.
+ * <br>postcondition : no exception is thrown and each built token is a valid message starting with right header.
+ */
+ public void testOK_WithRandomNUmberOfMessages() throws IOException{
+ Random randomizer = new Random();
+
+ int howManyLoops = randomizer.nextInt(10000);
+ byte [] compoundMessageData = new byte [12 * howManyLoops];
+
+ List<byte []> messages = new ArrayList<byte[]>(howManyLoops);
+
+ int position = 0;
+ for (int i = 0; i < howManyLoops; i++)
+ {
+ byte [] message = new byte[12];
+ System.arraycopy(MessageTokenizer.MAGIC_NUMBER_BYTES, 0, compoundMessageData, position, MessageTokenizer.MAGIC_NUMBER_BYTES.length);
+ System.arraycopy(MessageTokenizer.MAGIC_NUMBER_BYTES, 0, message, 0, MessageTokenizer.MAGIC_NUMBER_BYTES.length);
+ position+=MessageTokenizer.MAGIC_NUMBER_BYTES.length;
+
+ for (int c = 3; c < 12; c++)
+ {
+ byte aByte = (byte)randomizer.nextInt(127);
+ compoundMessageData[position++] = aByte;
+ message[c] = aByte;
+ }
+ messages.add(message);
+ }
+
+ Message multiMessage = new ByteBufferMessage();
+ multiMessage.appendData(compoundMessageData);
+ MessageTokenizer tokenizer = new MessageTokenizer(multiMessage);
+
+ int howManyTokens = tokenizer.countTokens();
+ assertEquals(howManyLoops, howManyTokens);
+
+ int index = 0;
+ while (tokenizer.hasMoreElements())
+ {
+ assertEquals(tokenizer.nextElement(),messages.get(index++));
+ }
+
+ assertEquals((index),howManyTokens);
+ }
+
+ /**
+ * Internal method used for comparison of two messages.
+ *
+ * @param message the token message just built by the tokenizer.
+ * @param expected the expected result.
+ */
+ private void assertEquals(Message message, byte [] expected) throws IOException
+ {
+ ByteBuffer messageContent = message.readData();
+ ManagementDecoder decoder = new ManagementDecoder();
+ decoder.init(messageContent);
+ byte [] content = decoder.readReaminingBytes();
+ assertTrue(Arrays.equals(content, expected));
+ }
+}
|
