From 123fc382cb0eac7edf8345ecfab9cb66997392cf Mon Sep 17 00:00:00 2001 From: Bhupendra Bhusman Bhardwaj Date: Fri, 23 Mar 2007 16:02:51 +0000 Subject: QPID-420 (merged from trunk) And r518998:518999 and r520846:520850 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@521782 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/AMQBrokerManagerMBean.java | 4 +-- .../server/protocol/AMQMinaProtocolSession.java | 18 ++++++++++--- .../server/protocol/AMQProtocolSessionMBean.java | 17 +++++++++++- .../qpid/server/protocol/ManagedConnection.java | 9 +++++++ .../qpid/client/message/TestMessageFactory.java | 2 +- .../apache/qpid/requestreply/PingPongProducer.java | 30 +++++++++++++++++----- .../main/java/org/apache/qpid/topic/Config.java | 15 +++++++++++ 7 files changed, 81 insertions(+), 14 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 386b0e6c45..23c32aceab 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -171,7 +171,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr } catch (AMQException ex) { - throw new MBeanException(ex,"Error in creating queue " + queueName); + throw new MBeanException(new JMException(ex.getMessage()),"Error in creating queue " + queueName); } } @@ -202,7 +202,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr } catch (AMQException ex) { - throw new MBeanException(ex, ex.toString()); + throw new MBeanException(new JMException(ex.getMessage()), "Error in deleting queue " + queueName); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 1c741ead1e..fd8fb2d5cb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -74,6 +74,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private AMQShortString _contextKey; + private AMQShortString _clientVersion = null; + private VirtualHost _virtualHost; private final Map _channelMap = new HashMap(); @@ -667,9 +669,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void setClientProperties(FieldTable clientProperties) { _clientProperties = clientProperties; - if ((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)) + if (_clientProperties != null) { - setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE))); + if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null) + { + setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE))); + } + if (_clientProperties.getString(ClientProperties.version.toString()) != null) + { + _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString())); + } } } @@ -745,5 +754,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { return _authorizedID; } - + public String getClientVersion() + { + return _clientVersion == null ? null : _clientVersion.toString(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index d2a20cdf57..5eebd4c524 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -56,6 +56,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed { private AMQMinaProtocolSession _session = null; private String _name = null; + //openmbean data types for representing the channel attributes private final static String[] _channelAtttibuteNames = {"Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"}; private final static String[] _indexNames = {_channelAtttibuteNames[0]}; @@ -95,12 +96,26 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed */ private static void init() throws OpenDataException { - _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, _channelAtttibuteNames, _channelAttributeTypes); _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames); } + public String getClientId() + { + return _session.getContextKey() == null ? null : _session.getContextKey().toString(); + } + + public String getAuthorizedId() + { + return _session.getAuthorizedID(); + } + + public String getVersion() + { + return _session.getClientVersion() == null ? null : _session.getClientVersion().toString(); + } + public Date getLastIoTime() { return new Date(_session.getIOSession().getLastIoTime()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java index f9a0c4d18f..990c4c0794 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java @@ -41,6 +41,15 @@ public interface ManagedConnection { static final String TYPE = "Connection"; + @MBeanAttribute(name = "ClientId", description = "Client Id") + String getClientId(); + + @MBeanAttribute(name = "AuthorizedId", description = "User Name") + String getAuthorizedId(); + + @MBeanAttribute(name = "Version", description = "Client Version") + String getVersion(); + /** * Tells the remote address of this connection. * @return remote address diff --git a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java index c0f236b833..eeb4021f34 100644 --- a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java +++ b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java @@ -103,7 +103,7 @@ public class TestMessageFactory { StringBuffer buf = new StringBuffer(size); int count = 0; - while (count < size) + while (count <= (size - MESSAGE_DATA_BYTES.length())) { buf.append(MESSAGE_DATA_BYTES); count += MESSAGE_DATA_BYTES.length(); diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 2a3aff4692..c4074806b5 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -497,11 +497,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis boolean transacted = config.isTransacted(); boolean persistent = config.usePersistentMessages(); int messageSize = (config.getPayload() != 0) ? config.getPayload() : MESSAGE_SIZE_DEAFULT; - // int messageCount = config.getMessages(); + int messageCount = config.getMessages(); int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DESTINATION_COUNT_DEFAULT; int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : TX_BATCH_SIZE_DEFAULT; int rate = (config.getRate() != 0) ? config.getRate() : RATE_DEFAULT; boolean pubsub = config.isPubSub(); + long timeout = (config.getTimeout() != 0) ? config.getTimeout() : TIMEOUT_DEFAULT; String destName = config.getDestination(); if (destName == null) @@ -561,10 +562,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. pingProducer.getConnection().setExceptionListener(pingProducer); - // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. - Thread pingThread = new Thread(pingProducer); - pingThread.run(); - pingThread.join(); + // If messageount is 0, then continue sending + if (messageCount == 0) + { + // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. + Thread pingThread = new Thread(pingProducer); + pingThread.start(); + pingThread.join(); + } + else + { + // This feature is needed, when we want to send fix no of messages + pingProducer.pingLoop(messageCount, timeout); + } + pingProducer.close(); } /** @@ -963,7 +974,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each. */ - public void pingLoop() + public void pingLoop(int pingCount, long timeout) { try { @@ -972,7 +983,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); // Send the message and wait for a reply. - pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT); + pingAndWaitForReply(msg, pingCount, timeout); } catch (JMSException e) { @@ -986,6 +997,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } } + public void pingLoop() + { + pingLoop(TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT); + } + public Destination getReplyDestination() { return getReplyDestinations().get(0); diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java index 60aa9f3930..342b28ca17 100644 --- a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java +++ b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java @@ -51,6 +51,7 @@ public class Config extends AbstractConfig implements ConnectorConfig private int batchSize; private int rate; private boolean ispubsub; + private long timeout; public Config() { @@ -161,6 +162,16 @@ public class Config extends AbstractConfig implements ConnectorConfig this.delay = delay; } + public long getTimeout() + { + return timeout; + } + + public void setTimeout(long time) + { + this.timeout = time; + } + public String getClientId() { return clientId; @@ -285,6 +296,10 @@ public class Config extends AbstractConfig implements ConnectorConfig { destinationName = value; } + else if("-timeout".equalsIgnoreCase(key)) + { + setTimeout(parseLong("Bad timeout data", value)); + } else { System.out.println("Ignoring unrecognised option: " + key); -- cgit v1.2.1