summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-06-26 17:40:01 +0000
committerTed Ross <tross@apache.org>2009-06-26 17:40:01 +0000
commit3851ab7c8e7668638d4e0fb4627db96013a753c6 (patch)
tree843af86708a86678534295a4c09188d906ad0913 /java
parent77cd3793e12416dc017ace43f6ef5331fb0602c4 (diff)
downloadqpid-python-3851ab7c8e7668638d4e0fb4627db96013a753c6.tar.gz
QPID-1948 - Changes to the java consle as a result of a code generated front end.
Patch from Bryan Kearney git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@788782 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/build.deps1
-rw-r--r--java/management/console/src/main/java/org/apache/qpid/console/Broker.java1008
-rw-r--r--java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java20
-rw-r--r--java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java2
-rw-r--r--java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java2
-rw-r--r--java/management/console/src/main/java/org/apache/qpid/console/Session.java3
-rw-r--r--java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java197
-rw-r--r--java/management/console/src/test/java/org/apache/qpid/console/ClassKeyTest.java39
8 files changed, 674 insertions, 598 deletions
diff --git a/java/build.deps b/java/build.deps
index 9ee1f47463..7f7d5d94ea 100644
--- a/java/build.deps
+++ b/java/build.deps
@@ -167,6 +167,7 @@ client-example.test.libs=${test.libs}
tools.test.libs=${client.test.libs}
testkit.test.libs=${test.libs}
management-client.test.libs=${muse.libs} ${test.libs} ${log4j} ${javassist} ${geronimo-servlet} ${commons-pool}
+management-console.test.libs=${junit4} ${slf4j-log4j} ${log4j} ${client.libs}
management-eclipse-plugin.test.libs=${systests.libs}
broker-plugins.test.libs=${test.libs}
management-tools-qpid-cli.test.libs=${junit4} ${slf4j-log4j} ${log4j} ${client.libs}
diff --git a/java/management/console/src/main/java/org/apache/qpid/console/Broker.java b/java/management/console/src/main/java/org/apache/qpid/console/Broker.java
index a672beb548..8c71925f4e 100644
--- a/java/management/console/src/main/java/org/apache/qpid/console/Broker.java
+++ b/java/management/console/src/main/java/org/apache/qpid/console/Broker.java
@@ -1,505 +1,505 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.console;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.UUID;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.transport.codec.BBDecoder;
-import org.apache.qpid.transport.codec.BBEncoder;
-import org.apache.qpid.transport.codec.Decoder;
-import org.apache.qpid.transport.codec.Encoder;
-
-public class Broker implements MessageListener
-{
- class HeaderInfo
- {
- boolean valid;
- long sequence;
- char opcode;
-
- public String toString()
- {
- return String.format("%s Header with opcode %s and sequence %s",
- (valid ? "Valid" : "Invalid"), opcode, sequence);
- }
- }
- private static Logger log = LoggerFactory.getLogger(Broker.class);
- public static int SYNC_TIME = 60000;
- // JMS Stuff
- private javax.jms.Session session;
- boolean sessionTransacted = false;
- private String replyName;
- private String topicName;
- private MessageProducer prod;
- private ArrayList<MessageConsumer> consumers = new ArrayList<MessageConsumer>();
- private Queue reply;
- private Queue topic;
- private int acknowledgeMode = javax.jms.Session.AUTO_ACKNOWLEDGE;
- // QMF Stuff
- AMQConnection connection;
- public String url;
- public java.util.HashMap<String, Agent> Agents = new java.util.HashMap<String, Agent>();
- private Session consoleSession;
- private boolean connected = false;
- private boolean syncInFlight = false;
- private boolean topicBound = false;
- private int reqsOutstanding = 0;
- private Object lockObject = new Object();
-
- UUID brokerId = UUID.randomUUID();
-
- public Broker(org.apache.qpid.console.Session session, String url)
- {
- log.debug("Creating a new Broker for url " + url);
- this.url = url;
- consoleSession = session;
- this.tryToConnect();
- }
-
- public int brokerBank()
- {
- return 1;
- }
-
- protected HeaderInfo CheckHeader(Decoder decoder)
- {
- HeaderInfo returnValue = new HeaderInfo();
- returnValue.opcode = 'x';
- returnValue.sequence = -1;
- returnValue.valid = false;
- if (decoder.hasRemaining())
- {
- char character = (char) decoder.readUint8();
- if (character != 'A')
- {
- return returnValue;
- }
- character = (char) decoder.readUint8();
- if (character != 'M')
- {
- return returnValue;
- }
- character = (char) decoder.readUint8();
- if (character != '3')
- {
- return returnValue;
- }
- returnValue.valid = true;
- returnValue.opcode = (char) decoder.readUint8();
- returnValue.sequence = decoder.readUint32();
- }
- return returnValue;
- }
-
- public Encoder createEncoder(char opcode, long sequence)
- {
- return setHeader(new BBEncoder(1024), opcode, sequence);
- }
-
- public Message createMessage(Encoder enc)
- {
- try
- {
- byte[] buf = new byte[1024];
- byte[] body = new byte[1024];
- BBEncoder bbenc = (BBEncoder) enc;
- BytesMessage msg = session.createBytesMessage();
- ByteBuffer slice = bbenc.buffer();
- while (slice.hasRemaining())
- {
- int n = Math.min(buf.length, slice.remaining());
- slice.get(buf, 0, n);
- msg.writeBytes(buf, 0, n);
- }
- return msg;
- } catch (JMSException e)
- {
- throw new ConsoleException(e);
- }
- }
-
- public void decrementOutstanding()
- {
- synchronized (lockObject)
- {
- this.reqsOutstanding -= 1;
- if ((reqsOutstanding == 0) & !topicBound)
- {
- for (String key : consoleSession.bindingKeys())
- {
- try
- {
- // this.clientSession.exchangeBind(topicName,
- // "qpid.mannagement", key) ;
- log.debug("Setting Topic Binding " + key);
- // topicName = "management://qpid.management//" + key;
- String rk = String.format("&routingkey='%s'", key);
- Queue aQueue = session.createQueue(topicName + rk);
- MessageConsumer cons = session.createConsumer(aQueue);
- cons.setMessageListener(this);
- consumers.add(cons);
- } catch (JMSException e)
- {
- throw new ConsoleException(e);
- }
- }
- topicBound = true;
- }
- if ((reqsOutstanding == 0) & syncInFlight)
- {
- syncInFlight = false;
- lockObject.notifyAll();
- }
- }
- }
-
- private byte[] ensure(int capacity, byte[] body, int size)
- {
- if (capacity > body.length)
- {
- byte[] copy = new byte[capacity];
- System.arraycopy(body, 0, copy, 0, size);
- body = copy;
- }
- return body;
- }
-
- protected void finalize()
- {
- if (connected)
- {
- this.shutdown();
- }
- }
-
- public boolean getSyncInFlight()
- {
- return syncInFlight;
- }
-
- public void incrementOutstanding()
- {
- synchronized (lockObject)
- {
- this.reqsOutstanding += 1;
- }
- }
-
- public boolean isConnected()
- {
- return connected;
- }
-
- public void onMessage(Message msg)
- {
- Decoder decoder = readBody(msg);
- HeaderInfo headerInfo = this.CheckHeader(decoder);
- // log.debug(headerInfo.toString());
- while (headerInfo.valid)
- {
- long seq = headerInfo.sequence;
- switch (headerInfo.opcode)
- {
- case 'b':
- consoleSession.handleBrokerResponse(this, decoder, seq);
- break;
- case 'p':
- consoleSession.handlePackageIndicator(this, decoder, seq);
- break;
- case 'z':
- consoleSession.handleCommandComplete(this, decoder, seq);
- break;
- case 'q':
- consoleSession.handleClassIndicator(this, decoder, seq);
- break;
- case 'm':
- consoleSession.handleMethodResponse(this, decoder, seq);
- break;
- case 'h':
- consoleSession
- .handleHeartbeatIndicator(this, decoder, seq, msg);
- break;
- case 'e':
- consoleSession.handleEventIndicator(this, decoder, seq);
- break;
- case 's':
- consoleSession.handleSchemaResponse(this, decoder, seq);
- break;
- case 'c':
- consoleSession.handleContentIndicator(this, decoder, seq, true,
- false);
- break;
- case 'i':
- consoleSession.handleContentIndicator(this, decoder, seq,
- false, true);
- break;
- case 'g':
- consoleSession.handleContentIndicator(this, decoder, seq, true,
- true);
- break;
- default:
- log.error("Invalid message type recieved with opcode "
- + headerInfo.opcode);
- break;
- }
- headerInfo = this.CheckHeader(decoder);
- }
- }
-
- private Decoder readBody(Message message)
- {
- BytesMessage msg = (BytesMessage) message;
- BBDecoder dec = new BBDecoder();
- byte[] buf = new byte[1024];
- byte[] body = new byte[1024];
- int size = 0;
- int n;
- try
- {
- while ((n = msg.readBytes(buf)) > 0)
- {
- body = ensure(size + n, body, size);
- System.arraycopy(buf, 0, body, size, n);
- size += n;
- }
- } catch (JMSException e)
- {
- throw new ConsoleException(e);
- }
- dec.init(ByteBuffer.wrap(body, 0, size));
- return dec;
- }
-
- public void send(Encoder enc)
- {
- this.send(this.createMessage(enc), "broker");
- }
-
- public void send(Message msg)
- {
- this.send(msg, "broker", -1);
- }
-
- public void send(Message msg, String routingKey)
- {
- this.send(msg, routingKey, -1);
- }
-
- public void send(Message msg, String routingKey, int ttl)
- {
- synchronized (lockObject)
- {
- try
- {
- log.debug(String.format("Sending message to routing key '%s'",
- routingKey));
- String destName = String.format(
- "management://qpid.management//?routingkey='%s'",
- routingKey);
- log.debug(destName);
- Queue dest = session.createQueue(destName);
- // Queue jmsReply = session
- // createQueue("direct://amq.direct//?routingkey='reply-"
- // + brokerId + "'");
- if (ttl != -1)
- {
- msg.setJMSExpiration(ttl);
- }
- msg.setJMSReplyTo(reply);
- prod.send(dest, msg);
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- }
- }
- }
-
- protected Encoder setHeader(Encoder enc, char opcode, long sequence)
- {
- enc.writeUint8((short) 'A');
- enc.writeUint8((short) 'M');
- enc.writeUint8((short) '3');
- enc.writeUint8((short) opcode);
- enc.writeUint32(sequence);
- return enc;
- }
-
- public void setSyncInFlight(boolean inFlight)
- {
- synchronized (lockObject)
- {
- syncInFlight = inFlight;
- lockObject.notifyAll();
- }
- }
-
- public void shutdown()
- {
- if (connected)
- {
- this.waitForStable();
- try
- {
- session.close();
- for (MessageConsumer cons : consumers)
- {
- cons.close();
- }
- connection.close();
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- } finally
- {
- this.connected = false;
- }
- }
- }
-
- protected void tryToConnect()
- {
- try
- {
- reqsOutstanding = 1;
- Agent newAgent = new Agent(this, 0, "BrokerAgent");
- Agents.put(newAgent.agentKey(), newAgent);
- connection = new AMQConnection(url);
- session = connection.createSession(sessionTransacted,
- acknowledgeMode);
- replyName = String
- .format(
- "direct://amq.direct//reply-%s?exclusive='True'&autodelete='True'",
- brokerId);
- topicName = String
- .format(
- "management://qpid.management//topic-%s?exclusive='True'&autodelete='True'",
- brokerId);
- reply = session.createQueue(replyName);
- MessageConsumer cons = session.createConsumer(reply);
- cons.setMessageListener(this);
- consumers.add(cons);
- prod = session.createProducer(null);
- topic = session.createQueue(topicName);
- cons = session.createConsumer(topic);
- cons.setMessageListener(this);
- consumers.add(cons);
- connection.start();
- // Rest of the topic is bound later. Start er up
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- }
- connected = true;
- consoleSession.handleBrokerConnect(this);
- Encoder Encoder = createEncoder('B', 0);
- this.send(Encoder);
- }
-
- public void updateAgent(QMFObject obj)
- {
- long agentBank = (Long) obj.GetProperty("agentBank");
- long brokerBank = (Long) obj.GetProperty("brokerBank");
- String key = Agent.AgentKey(agentBank, brokerBank);
- if (obj.isDeleted())
- {
- if (Agents.containsKey(key))
- {
- Agent agent = Agents.get(key);
- Agents.remove(key);
- consoleSession.handleAgentRemoved(agent);
- }
- } else
- {
- if (!Agents.containsKey(key))
- {
- Agent newAgent = new Agent(this, agentBank, (String) obj
- .GetProperty("label"));
- Agents.put(key, newAgent);
- consoleSession.handleNewAgent(newAgent);
- }
- }
- }
-
- public void waitForStable()
- {
- synchronized (lockObject)
- {
- if (connected)
- {
- long start = System.currentTimeMillis();
- syncInFlight = true;
- while (reqsOutstanding != 0)
- {
- log.debug("Waiting to recieve messages");
- try
- {
- lockObject.wait(SYNC_TIME);
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- }
- long duration = System.currentTimeMillis() - start;
- if (duration > SYNC_TIME)
- {
- throw new ConsoleException(
- "Timeout waiting for Broker to Sync");
- }
- }
- }
- }
- }
-
- public void waitForSync(int timeout)
- {
- synchronized (lockObject)
- {
- long start = System.currentTimeMillis();
- while (syncInFlight)
- {
- try
- {
- lockObject.wait(SYNC_TIME);
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- }
- }
- long duration = System.currentTimeMillis() - start;
- if (duration > timeout)
- {
- throw new ConsoleException("Timeout waiting for Broker to Sync");
- }
- }
- }
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.console;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.UUID;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Decoder;
+import org.apache.qpid.transport.codec.Encoder;
+
+public class Broker implements MessageListener
+{
+ class HeaderInfo
+ {
+ boolean valid;
+ long sequence;
+ char opcode;
+
+ public String toString()
+ {
+ return String.format("%s Header with opcode %s and sequence %s",
+ (valid ? "Valid" : "Invalid"), opcode, sequence);
+ }
+ }
+
+ private static Logger log = LoggerFactory.getLogger(Broker.class);
+ public static int SYNC_TIME = 60000;
+ // JMS Stuff
+ private javax.jms.Session session;
+ boolean sessionTransacted = false;
+ private String replyName;
+ private String topicName;
+ private MessageProducer prod;
+ private ArrayList<MessageConsumer> consumers = new ArrayList<MessageConsumer>();
+ private Queue reply;
+ private Queue topic;
+ private int acknowledgeMode = javax.jms.Session.AUTO_ACKNOWLEDGE;
+ // QMF Stuff
+ AMQConnection connection;
+ public String url;
+ public java.util.HashMap<String, Agent> Agents = new java.util.HashMap<String, Agent>();
+ private Session consoleSession;
+ private boolean connected = false;
+ private boolean syncInFlight = false;
+ private boolean topicBound = false;
+ private int reqsOutstanding = 0;
+ private Object lockObject = new Object();
+ UUID brokerId = UUID.randomUUID();
+
+ public Broker(org.apache.qpid.console.Session session, String url)
+ {
+ log.debug("Creating a new Broker for url " + url);
+ this.url = url;
+ consoleSession = session;
+ this.tryToConnect();
+ }
+
+ public int brokerBank()
+ {
+ return 1;
+ }
+
+ protected HeaderInfo CheckHeader(Decoder decoder)
+ {
+ HeaderInfo returnValue = new HeaderInfo();
+ returnValue.opcode = 'x';
+ returnValue.sequence = -1;
+ returnValue.valid = false;
+ if (decoder.hasRemaining())
+ {
+ char character = (char) decoder.readUint8();
+ if (character != 'A')
+ {
+ return returnValue;
+ }
+ character = (char) decoder.readUint8();
+ if (character != 'M')
+ {
+ return returnValue;
+ }
+ character = (char) decoder.readUint8();
+ if (character != '3')
+ {
+ return returnValue;
+ }
+ returnValue.valid = true;
+ returnValue.opcode = (char) decoder.readUint8();
+ returnValue.sequence = decoder.readUint32();
+ }
+ return returnValue;
+ }
+
+ public Encoder createEncoder(char opcode, long sequence)
+ {
+ return setHeader(new BBEncoder(1024), opcode, sequence);
+ }
+
+ public Message createMessage(Encoder enc)
+ {
+ try
+ {
+ byte[] buf = new byte[1024];
+ byte[] body = new byte[1024];
+ BBEncoder bbenc = (BBEncoder) enc;
+ BytesMessage msg = session.createBytesMessage();
+ ByteBuffer slice = bbenc.buffer();
+ while (slice.hasRemaining())
+ {
+ int n = Math.min(buf.length, slice.remaining());
+ slice.get(buf, 0, n);
+ msg.writeBytes(buf, 0, n);
+ }
+ return msg;
+ } catch (JMSException e)
+ {
+ throw new ConsoleException(e);
+ }
+ }
+
+ public void decrementOutstanding()
+ {
+ synchronized (lockObject)
+ {
+ this.reqsOutstanding -= 1;
+ if ((reqsOutstanding == 0) & !topicBound)
+ {
+ for (String key : consoleSession.bindingKeys())
+ {
+ try
+ {
+ // this.clientSession.exchangeBind(topicName,
+ // "qpid.mannagement", key) ;
+ log.debug("Setting Topic Binding " + key);
+ // topicName = "management://qpid.management//" + key;
+ String rk = String.format("&routingkey='%s'", key);
+ Queue aQueue = session.createQueue(topicName + rk);
+ MessageConsumer cons = session.createConsumer(aQueue);
+ cons.setMessageListener(this);
+ consumers.add(cons);
+ } catch (JMSException e)
+ {
+ throw new ConsoleException(e);
+ }
+ }
+ topicBound = true;
+ }
+ if ((reqsOutstanding == 0) & syncInFlight)
+ {
+ syncInFlight = false;
+ lockObject.notifyAll();
+ }
+ }
+ }
+
+ private byte[] ensure(int capacity, byte[] body, int size)
+ {
+ if (capacity > body.length)
+ {
+ byte[] copy = new byte[capacity];
+ System.arraycopy(body, 0, copy, 0, size);
+ body = copy;
+ }
+ return body;
+ }
+
+ protected void finalize()
+ {
+ if (connected)
+ {
+ this.shutdown();
+ }
+ }
+
+ public boolean getSyncInFlight()
+ {
+ return syncInFlight;
+ }
+
+ public void incrementOutstanding()
+ {
+ synchronized (lockObject)
+ {
+ this.reqsOutstanding += 1;
+ }
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public void onMessage(Message msg)
+ {
+ Decoder decoder = readBody(msg);
+ HeaderInfo headerInfo = this.CheckHeader(decoder);
+ // log.debug(headerInfo.toString());
+ while (headerInfo.valid)
+ {
+ long seq = headerInfo.sequence;
+ switch (headerInfo.opcode)
+ {
+ case 'b':
+ consoleSession.handleBrokerResponse(this, decoder, seq);
+ break;
+ case 'p':
+ consoleSession.handlePackageIndicator(this, decoder, seq);
+ break;
+ case 'z':
+ consoleSession.handleCommandComplete(this, decoder, seq);
+ break;
+ case 'q':
+ consoleSession.handleClassIndicator(this, decoder, seq);
+ break;
+ case 'm':
+ consoleSession.handleMethodResponse(this, decoder, seq);
+ break;
+ case 'h':
+ consoleSession
+ .handleHeartbeatIndicator(this, decoder, seq, msg);
+ break;
+ case 'e':
+ consoleSession.handleEventIndicator(this, decoder, seq);
+ break;
+ case 's':
+ consoleSession.handleSchemaResponse(this, decoder, seq);
+ break;
+ case 'c':
+ consoleSession.handleContentIndicator(this, decoder, seq, true,
+ false);
+ break;
+ case 'i':
+ consoleSession.handleContentIndicator(this, decoder, seq,
+ false, true);
+ break;
+ case 'g':
+ consoleSession.handleContentIndicator(this, decoder, seq, true,
+ true);
+ break;
+ default:
+ log.error("Invalid message type recieved with opcode "
+ + headerInfo.opcode);
+ break;
+ }
+ headerInfo = this.CheckHeader(decoder);
+ }
+ }
+
+ private Decoder readBody(Message message)
+ {
+ BytesMessage msg = (BytesMessage) message;
+ BBDecoder dec = new BBDecoder();
+ byte[] buf = new byte[1024];
+ byte[] body = new byte[1024];
+ int size = 0;
+ int n;
+ try
+ {
+ while ((n = msg.readBytes(buf)) > 0)
+ {
+ body = ensure(size + n, body, size);
+ System.arraycopy(buf, 0, body, size, n);
+ size += n;
+ }
+ } catch (JMSException e)
+ {
+ throw new ConsoleException(e);
+ }
+ dec.init(ByteBuffer.wrap(body, 0, size));
+ return dec;
+ }
+
+ public void send(Encoder enc)
+ {
+ this.send(this.createMessage(enc), "broker");
+ }
+
+ public void send(Message msg)
+ {
+ this.send(msg, "broker", -1);
+ }
+
+ public void send(Message msg, String routingKey)
+ {
+ this.send(msg, routingKey, -1);
+ }
+
+ public void send(Message msg, String routingKey, int ttl)
+ {
+ synchronized (lockObject)
+ {
+ try
+ {
+ log.debug(String.format("Sending message to routing key '%s'",
+ routingKey));
+ String destName = String.format(
+ "management://qpid.management//?routingkey='%s'",
+ routingKey);
+ log.debug(destName);
+ Queue dest = session.createQueue(destName);
+ // Queue jmsReply = session
+ // createQueue("direct://amq.direct//?routingkey='reply-"
+ // + brokerId + "'");
+ if (ttl != -1)
+ {
+ msg.setJMSExpiration(ttl);
+ }
+ msg.setJMSReplyTo(reply);
+ prod.send(dest, msg);
+ } catch (Exception e)
+ {
+ throw new ConsoleException(e);
+ }
+ }
+ }
+
+ protected Encoder setHeader(Encoder enc, char opcode, long sequence)
+ {
+ enc.writeUint8((short) 'A');
+ enc.writeUint8((short) 'M');
+ enc.writeUint8((short) '3');
+ enc.writeUint8((short) opcode);
+ enc.writeUint32(sequence);
+ return enc;
+ }
+
+ public void setSyncInFlight(boolean inFlight)
+ {
+ synchronized (lockObject)
+ {
+ syncInFlight = inFlight;
+ lockObject.notifyAll();
+ }
+ }
+
+ public void shutdown()
+ {
+ if (connected)
+ {
+ this.waitForStable();
+ try
+ {
+ session.close();
+ for (MessageConsumer cons : consumers)
+ {
+ cons.close();
+ }
+ connection.close();
+ } catch (Exception e)
+ {
+ throw new ConsoleException(e);
+ } finally
+ {
+ this.connected = false;
+ }
+ }
+ }
+
+ protected void tryToConnect()
+ {
+ try
+ {
+ reqsOutstanding = 1;
+ Agent newAgent = new Agent(this, 0, "BrokerAgent");
+ Agents.put(newAgent.agentKey(), newAgent);
+ connection = new AMQConnection(url);
+ session = connection.createSession(sessionTransacted,
+ acknowledgeMode);
+ replyName = String
+ .format(
+ "direct://amq.direct//reply-%s?exclusive='True'&autodelete='True'",
+ brokerId);
+ topicName = String
+ .format(
+ "management://qpid.management//topic-%s?exclusive='True'&autodelete='True'",
+ brokerId);
+ reply = session.createQueue(replyName);
+ MessageConsumer cons = session.createConsumer(reply);
+ cons.setMessageListener(this);
+ consumers.add(cons);
+ prod = session.createProducer(null);
+ topic = session.createQueue(topicName);
+ cons = session.createConsumer(topic);
+ cons.setMessageListener(this);
+ consumers.add(cons);
+ connection.start();
+ // Rest of the topic is bound later. Start er up
+ } catch (Exception e)
+ {
+ throw new ConsoleException(e);
+ }
+ connected = true;
+ consoleSession.handleBrokerConnect(this);
+ Encoder Encoder = createEncoder('B', 0);
+ this.send(Encoder);
+ }
+
+ public void updateAgent(QMFObject obj)
+ {
+ long agentBank = (Long) obj.getProperty("agentBank");
+ long brokerBank = (Long) obj.getProperty("brokerBank");
+ String key = Agent.AgentKey(agentBank, brokerBank);
+ if (obj.isDeleted())
+ {
+ if (Agents.containsKey(key))
+ {
+ Agent agent = Agents.get(key);
+ Agents.remove(key);
+ consoleSession.handleAgentRemoved(agent);
+ }
+ } else
+ {
+ if (!Agents.containsKey(key))
+ {
+ Agent newAgent = new Agent(this, agentBank, (String) obj
+ .getProperty("label"));
+ Agents.put(key, newAgent);
+ consoleSession.handleNewAgent(newAgent);
+ }
+ }
+ }
+
+ public void waitForStable()
+ {
+ synchronized (lockObject)
+ {
+ if (connected)
+ {
+ long start = System.currentTimeMillis();
+ syncInFlight = true;
+ while (reqsOutstanding != 0)
+ {
+ log.debug("Waiting to recieve messages");
+ try
+ {
+ lockObject.wait(SYNC_TIME);
+ } catch (Exception e)
+ {
+ throw new ConsoleException(e);
+ }
+ long duration = System.currentTimeMillis() - start;
+ if (duration > SYNC_TIME)
+ {
+ throw new ConsoleException(
+ "Timeout waiting for Broker to Sync");
+ }
+ }
+ }
+ }
+ }
+
+ public void waitForSync(int timeout)
+ {
+ synchronized (lockObject)
+ {
+ long start = System.currentTimeMillis();
+ while (syncInFlight)
+ {
+ try
+ {
+ lockObject.wait(SYNC_TIME);
+ } catch (Exception e)
+ {
+ throw new ConsoleException(e);
+ }
+ }
+ long duration = System.currentTimeMillis() - start;
+ if (duration > timeout)
+ {
+ throw new ConsoleException("Timeout waiting for Broker to Sync");
+ }
+ }
+ }
} \ No newline at end of file
diff --git a/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java b/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java
index fc32697fb5..9eac8942cb 100644
--- a/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java
+++ b/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java
@@ -22,9 +22,12 @@ package org.apache.qpid.console;
import org.apache.qpid.transport.codec.Decoder;
import org.apache.qpid.transport.codec.Encoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClassKey
{
+ private static Logger log = LoggerFactory.getLogger(ClassKey.class);
private String packageName;
private String className;
private long[] hash = new long[4];
@@ -41,8 +44,8 @@ public class ClassKey
public ClassKey(String keyString)
{
- String delims = ":()";
- String[] parts = keyString.split(java.util.regex.Pattern.quote(delims));
+ String delims = "[*:*(*)]";
+ String[] parts = keyString.split(delims);
if (parts.length < 3)
{
throw new ConsoleException(
@@ -51,7 +54,7 @@ public class ClassKey
setPackageName(parts[0]);
setClassName(parts[1]);
delims = "-";
- String[] bytes = parts[2].split(java.util.regex.Pattern.quote(delims));
+ String[] bytes = parts[2].split(delims);
if (bytes.length != 4)
{
throw new ConsoleException(
@@ -95,15 +98,16 @@ public class ClassKey
{
return hash;
}
-
- public String getHashString() {
- return String.format("%08x-%08x-%08x-%08x", hash[0], hash[1],
- hash[2], hash[3]);
+
+ public String getHashString()
+ {
+ return String.format("%08x-%08x-%08x-%08x", hash[0], hash[1], hash[2],
+ hash[3]);
}
public String getKeyString()
{
- String hashString = this.getHashString() ;
+ String hashString = this.getHashString();
return String.format("%s:%s(%s)", getPackageName(), getClassName(),
hashString);
}
diff --git a/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java b/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java
index f60270d58a..1919bac411 100644
--- a/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java
+++ b/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java
@@ -233,7 +233,7 @@ public class QMFObject
return objectID;
}
- public final Object GetProperty(String attributeName)
+ public final Object getProperty(String attributeName)
{
return properties.get(attributeName);
}
diff --git a/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java b/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java
index 66e3fc3feb..783a4b4b86 100644
--- a/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java
+++ b/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java
@@ -198,7 +198,7 @@ public class SchemaClass
{
superType = value;
}
-
+
public ArrayList<SchemaProperty> getProperties()
{
return properties;
diff --git a/java/management/console/src/main/java/org/apache/qpid/console/Session.java b/java/management/console/src/main/java/org/apache/qpid/console/Session.java
index 424750ea57..cb2e39c15f 100644
--- a/java/management/console/src/main/java/org/apache/qpid/console/Session.java
+++ b/java/management/console/src/main/java/org/apache/qpid/console/Session.java
@@ -149,6 +149,7 @@ public class Session
{ this, schema, dec, hasProperties, hasStats, isManaged };
try
{
+ log.debug("" + realClass);
Constructor ci = realClass.getConstructor(types);
return (QMFObject) ci.newInstance(args);
} catch (Exception e)
@@ -277,7 +278,7 @@ public class Session
enc.writeUint16(((Integer) val).intValue());
break;
case 3: // U32
- enc.writeUint32(((Long) val).longValue());
+ enc.writeUint32(((Integer) val).longValue());
break;
case 4: // U64
enc.writeUint64(((Long) val).longValue());
diff --git a/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java b/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java
index 5904d57e64..1ab93de6a2 100644
--- a/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java
+++ b/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java
@@ -1,103 +1,134 @@
package org.apache.qpid.console;
-
public class XMLUtil
{
-
- public static String commonAttributes(SchemaVariable var) {
- String returnString = "" ;
- if (var.getDescription() != null){
- returnString = returnString + String.format(" desc='%s'", var.getDescription()) ;
- }
-
- if (var.getRefPackage() != null){
- returnString = returnString + String.format(" refPackage='%s'", var.getRefPackage()) ;
- }
-
- if (var.getRefClass() != null){
- returnString = returnString + String.format(" refClass='%s'", var.getRefClass()) ;
- }
-
- if (var.getUnit() != null){
- returnString = returnString + String.format(" unit='%s'", var.getUnit()) ;
- }
-
- if (var.getMin() != null){
- returnString = returnString + String.format(" min='%s'", var.getMin()) ;
- }
- if (var.getMax() != null){
- returnString = returnString + String.format(" max='%s'", var.getMax()) ;
- }
- if (var.getMaxLength() != null){
- returnString = returnString + String.format(" maxLength='%s'", var.getMaxLength()) ;
+ public static String commonAttributes(SchemaVariable var)
+ {
+ String returnString = "";
+ if (var.getDescription() != null)
+ {
+ returnString = returnString
+ + String.format(" desc='%s'", var.getDescription());
+ }
+ if (var.getRefPackage() != null)
+ {
+ returnString = returnString
+ + String.format(" refPackage='%s'", var.getRefPackage());
+ }
+ if (var.getRefClass() != null)
+ {
+ returnString = returnString
+ + String.format(" refClass='%s'", var.getRefClass());
+ }
+ if (var.getUnit() != null)
+ {
+ returnString = returnString
+ + String.format(" unit='%s'", var.getUnit());
}
-
- return returnString ;
- }
+ if (var.getMin() != null)
+ {
+ returnString = returnString
+ + String.format(" min='%s'", var.getMin());
+ }
+ if (var.getMax() != null)
+ {
+ returnString = returnString
+ + String.format(" max='%s'", var.getMax());
+ }
+ if (var.getMaxLength() != null)
+ {
+ returnString = returnString
+ + String.format(" maxLength='%s'", var.getMaxLength());
+ }
+ return returnString;
+ }
- public static String schemaXML(Session sess, String packageName) {
- String returnValue = String.format("<schema package='%s'>\n", packageName) ;
- for (ClassKey key : sess.getClasses(packageName)) {
- SchemaClass schema = sess.getSchema(key) ;
- if (schema.getKind() == 1) {
- if (schema.getSuperType() == null) {
- returnValue += String.format("\t<class name='%s' hash='%s'>\n", key.getClassName(), key.getHashString()) ;
- }
- else {
- returnValue += String.format("\t<class name='%s' hash='%s' extends='%s'>\n", key.getClassName(), key.getHashString(), schema.getSuperType().getKeyString()) ;
+ public static String schemaXML(Session sess, String packageName)
+ {
+ String returnValue = String.format("<schema package='%s'>\n",
+ packageName);
+ for (ClassKey key : sess.getClasses(packageName))
+ {
+ SchemaClass schema = sess.getSchema(key);
+ if (schema.getKind() == 1)
+ {
+ if (schema.getSuperType() == null)
+ {
+ returnValue += String.format(
+ "\t<class name='%s' hash='%s'>\n", key
+ .getClassName(), key.getHashString());
+ } else
+ {
+ returnValue += String.format(
+ "\t<class name='%s' hash='%s' extends='%s'>\n", key
+ .getClassName(), key.getHashString(),
+ schema.getSuperType().getKeyString());
}
- for (SchemaProperty prop : schema.getProperties()) {
- Object[] attributes = new Object[5] ;
- attributes[0] = prop.getName() ;
- attributes[1] = Util.typeName(prop.getType()) ;
- attributes[2] = Util.accessName(prop.getAccess()) ;
- attributes[3] = prop.getOptional()? "True" : "False ";
+ for (SchemaProperty prop : schema.getProperties())
+ {
+ Object[] attributes = new Object[5];
+ attributes[0] = prop.getName();
+ attributes[1] = Util.typeName(prop.getType());
+ attributes[2] = Util.accessName(prop.getAccess());
+ attributes[3] = prop.getOptional() ? "True" : "False ";
attributes[4] = XMLUtil.commonAttributes(prop);
- returnValue += String.format("\t\t<property name='%s' type='%s' access='%s' optional='%s'%s/>\n", attributes) ;
+ returnValue += String
+ .format(
+ "\t\t<property name='%s' type='%s' access='%s' optional='%s'%s/>\n",
+ attributes);
}
- for (SchemaMethod meth : schema.getMethods()) {
- returnValue += String.format("\t\t<method name='%s'/>\n", meth.getName()) ;
- for (SchemaArgument arg : meth.Arguments) {
- Object[] attributes = new Object[4] ;
- attributes[0] = arg.getName() ;
- attributes[1] = arg.getDirection() ;
- attributes[2] = Util.typeName(arg.getType()) ;
- attributes[3] = XMLUtil.commonAttributes(arg);
- returnValue += String.format("\t\t\t<arg name='%s' dir='%s' type='%s'%s/>\n", attributes) ;
+ for (SchemaMethod meth : schema.getMethods())
+ {
+ returnValue += String.format("\t\t<method name='%s'/>\n",
+ meth.getName());
+ for (SchemaArgument arg : meth.Arguments)
+ {
+ Object[] attributes = new Object[4];
+ attributes[0] = arg.getName();
+ attributes[1] = arg.getDirection();
+ attributes[2] = Util.typeName(arg.getType());
+ attributes[3] = XMLUtil.commonAttributes(arg);
+ returnValue += String
+ .format(
+ "\t\t\t<arg name='%s' dir='%s' type='%s'%s/>\n",
+ attributes);
}
- returnValue += String.format("\t\t</method>\n") ;
+ returnValue += String.format("\t\t</method>\n");
}
- returnValue += String.format("\t</class>\n") ;
- } else {
- returnValue += String.format("\t<event name='%s' hash='%s'>\n", key.getClassName(), key.getHashString()) ;
- for (SchemaArgument arg : schema.getArguments()) {
- Object[] attributes = new Object[4] ;
- attributes[0] = arg.getName() ;
- attributes[1] = Util.typeName(arg.getType()) ;
- attributes[2] = XMLUtil.commonAttributes(arg);
- returnValue += String.format("\t\t\t<arg name='%s' type='%s'%s/>\n", attributes) ;
+ returnValue += String.format("\t</class>\n");
+ } else
+ {
+ returnValue += String.format("\t<event name='%s' hash='%s'>\n",
+ key.getClassName(), key.getHashString());
+ for (SchemaArgument arg : schema.getArguments())
+ {
+ Object[] attributes = new Object[4];
+ attributes[0] = arg.getName();
+ attributes[1] = Util.typeName(arg.getType());
+ attributes[2] = XMLUtil.commonAttributes(arg);
+ returnValue += String.format(
+ "\t\t\t<arg name='%s' type='%s'%s/>\n", attributes);
}
- returnValue += String.format("\t</event>\n") ;
+ returnValue += String.format("\t</event>\n");
}
}
- returnValue += String.format("</schema>\n") ;
-
- return returnValue ;
- }
-
- public static String schemaXML(Session sess, String[] packageNames) {
- String returnValue = "<schemas>\n" ;
- for (String pack : packageNames) {
- returnValue += XMLUtil.schemaXML(sess, pack) ;
- returnValue += "\n" ;
+ returnValue += String.format("</schema>\n");
+ return returnValue;
+ }
+
+ public static String schemaXML(Session sess, String[] packageNames)
+ {
+ String returnValue = "<schemas>\n";
+ for (String pack : packageNames)
+ {
+ returnValue += XMLUtil.schemaXML(sess, pack);
+ returnValue += "\n";
}
- returnValue += "</schemas>\n" ;
- return returnValue ;
+ returnValue += "</schemas>\n";
+ return returnValue;
}
-
+
protected XMLUtil()
{
}
}
-
-
diff --git a/java/management/console/src/test/java/org/apache/qpid/console/ClassKeyTest.java b/java/management/console/src/test/java/org/apache/qpid/console/ClassKeyTest.java
new file mode 100644
index 0000000000..dc16aaac5b
--- /dev/null
+++ b/java/management/console/src/test/java/org/apache/qpid/console/ClassKeyTest.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.console;
+
+import junit.framework.TestCase;
+
+public class ClassKeyTest extends TestCase
+{
+ public void testCreation()
+ {
+ ClassKey key = new ClassKey(
+ "some.package:Class(00000001-00000002-00000003-00000004)");
+ assertEquals("some.package", key.getPackageName());
+ assertEquals("Class", key.getClassName());
+ assertEquals("00000001-00000002-00000003-00000004", key.getHashString());
+ assertEquals(1, key.getHash()[0]);
+ assertEquals(2, key.getHash()[1]);
+ assertEquals(3, key.getHash()[2]);
+ assertEquals(4, key.getHash()[3]);
+ }
+}