diff options
| author | Ted Ross <tross@apache.org> | 2009-06-26 17:40:01 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2009-06-26 17:40:01 +0000 |
| commit | 3851ab7c8e7668638d4e0fb4627db96013a753c6 (patch) | |
| tree | 843af86708a86678534295a4c09188d906ad0913 /java | |
| parent | 77cd3793e12416dc017ace43f6ef5331fb0602c4 (diff) | |
| download | qpid-python-3851ab7c8e7668638d4e0fb4627db96013a753c6.tar.gz | |
QPID-1948 - Changes to the java consle as a result of a code generated front end.
Patch from Bryan Kearney
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@788782 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
8 files changed, 674 insertions, 598 deletions
diff --git a/java/build.deps b/java/build.deps index 9ee1f47463..7f7d5d94ea 100644 --- a/java/build.deps +++ b/java/build.deps @@ -167,6 +167,7 @@ client-example.test.libs=${test.libs} tools.test.libs=${client.test.libs} testkit.test.libs=${test.libs} management-client.test.libs=${muse.libs} ${test.libs} ${log4j} ${javassist} ${geronimo-servlet} ${commons-pool} +management-console.test.libs=${junit4} ${slf4j-log4j} ${log4j} ${client.libs} management-eclipse-plugin.test.libs=${systests.libs} broker-plugins.test.libs=${test.libs} management-tools-qpid-cli.test.libs=${junit4} ${slf4j-log4j} ${log4j} ${client.libs} diff --git a/java/management/console/src/main/java/org/apache/qpid/console/Broker.java b/java/management/console/src/main/java/org/apache/qpid/console/Broker.java index a672beb548..8c71925f4e 100644 --- a/java/management/console/src/main/java/org/apache/qpid/console/Broker.java +++ b/java/management/console/src/main/java/org/apache/qpid/console/Broker.java @@ -1,505 +1,505 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.console;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.UUID;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.transport.codec.BBDecoder;
-import org.apache.qpid.transport.codec.BBEncoder;
-import org.apache.qpid.transport.codec.Decoder;
-import org.apache.qpid.transport.codec.Encoder;
-
-public class Broker implements MessageListener
-{
- class HeaderInfo
- {
- boolean valid;
- long sequence;
- char opcode;
-
- public String toString()
- {
- return String.format("%s Header with opcode %s and sequence %s",
- (valid ? "Valid" : "Invalid"), opcode, sequence);
- }
- }
- private static Logger log = LoggerFactory.getLogger(Broker.class);
- public static int SYNC_TIME = 60000;
- // JMS Stuff
- private javax.jms.Session session;
- boolean sessionTransacted = false;
- private String replyName;
- private String topicName;
- private MessageProducer prod;
- private ArrayList<MessageConsumer> consumers = new ArrayList<MessageConsumer>();
- private Queue reply;
- private Queue topic;
- private int acknowledgeMode = javax.jms.Session.AUTO_ACKNOWLEDGE;
- // QMF Stuff
- AMQConnection connection;
- public String url;
- public java.util.HashMap<String, Agent> Agents = new java.util.HashMap<String, Agent>();
- private Session consoleSession;
- private boolean connected = false;
- private boolean syncInFlight = false;
- private boolean topicBound = false;
- private int reqsOutstanding = 0;
- private Object lockObject = new Object();
-
- UUID brokerId = UUID.randomUUID();
-
- public Broker(org.apache.qpid.console.Session session, String url)
- {
- log.debug("Creating a new Broker for url " + url);
- this.url = url;
- consoleSession = session;
- this.tryToConnect();
- }
-
- public int brokerBank()
- {
- return 1;
- }
-
- protected HeaderInfo CheckHeader(Decoder decoder)
- {
- HeaderInfo returnValue = new HeaderInfo();
- returnValue.opcode = 'x';
- returnValue.sequence = -1;
- returnValue.valid = false;
- if (decoder.hasRemaining())
- {
- char character = (char) decoder.readUint8();
- if (character != 'A')
- {
- return returnValue;
- }
- character = (char) decoder.readUint8();
- if (character != 'M')
- {
- return returnValue;
- }
- character = (char) decoder.readUint8();
- if (character != '3')
- {
- return returnValue;
- }
- returnValue.valid = true;
- returnValue.opcode = (char) decoder.readUint8();
- returnValue.sequence = decoder.readUint32();
- }
- return returnValue;
- }
-
- public Encoder createEncoder(char opcode, long sequence)
- {
- return setHeader(new BBEncoder(1024), opcode, sequence);
- }
-
- public Message createMessage(Encoder enc)
- {
- try
- {
- byte[] buf = new byte[1024];
- byte[] body = new byte[1024];
- BBEncoder bbenc = (BBEncoder) enc;
- BytesMessage msg = session.createBytesMessage();
- ByteBuffer slice = bbenc.buffer();
- while (slice.hasRemaining())
- {
- int n = Math.min(buf.length, slice.remaining());
- slice.get(buf, 0, n);
- msg.writeBytes(buf, 0, n);
- }
- return msg;
- } catch (JMSException e)
- {
- throw new ConsoleException(e);
- }
- }
-
- public void decrementOutstanding()
- {
- synchronized (lockObject)
- {
- this.reqsOutstanding -= 1;
- if ((reqsOutstanding == 0) & !topicBound)
- {
- for (String key : consoleSession.bindingKeys())
- {
- try
- {
- // this.clientSession.exchangeBind(topicName,
- // "qpid.mannagement", key) ;
- log.debug("Setting Topic Binding " + key);
- // topicName = "management://qpid.management//" + key;
- String rk = String.format("&routingkey='%s'", key);
- Queue aQueue = session.createQueue(topicName + rk);
- MessageConsumer cons = session.createConsumer(aQueue);
- cons.setMessageListener(this);
- consumers.add(cons);
- } catch (JMSException e)
- {
- throw new ConsoleException(e);
- }
- }
- topicBound = true;
- }
- if ((reqsOutstanding == 0) & syncInFlight)
- {
- syncInFlight = false;
- lockObject.notifyAll();
- }
- }
- }
-
- private byte[] ensure(int capacity, byte[] body, int size)
- {
- if (capacity > body.length)
- {
- byte[] copy = new byte[capacity];
- System.arraycopy(body, 0, copy, 0, size);
- body = copy;
- }
- return body;
- }
-
- protected void finalize()
- {
- if (connected)
- {
- this.shutdown();
- }
- }
-
- public boolean getSyncInFlight()
- {
- return syncInFlight;
- }
-
- public void incrementOutstanding()
- {
- synchronized (lockObject)
- {
- this.reqsOutstanding += 1;
- }
- }
-
- public boolean isConnected()
- {
- return connected;
- }
-
- public void onMessage(Message msg)
- {
- Decoder decoder = readBody(msg);
- HeaderInfo headerInfo = this.CheckHeader(decoder);
- // log.debug(headerInfo.toString());
- while (headerInfo.valid)
- {
- long seq = headerInfo.sequence;
- switch (headerInfo.opcode)
- {
- case 'b':
- consoleSession.handleBrokerResponse(this, decoder, seq);
- break;
- case 'p':
- consoleSession.handlePackageIndicator(this, decoder, seq);
- break;
- case 'z':
- consoleSession.handleCommandComplete(this, decoder, seq);
- break;
- case 'q':
- consoleSession.handleClassIndicator(this, decoder, seq);
- break;
- case 'm':
- consoleSession.handleMethodResponse(this, decoder, seq);
- break;
- case 'h':
- consoleSession
- .handleHeartbeatIndicator(this, decoder, seq, msg);
- break;
- case 'e':
- consoleSession.handleEventIndicator(this, decoder, seq);
- break;
- case 's':
- consoleSession.handleSchemaResponse(this, decoder, seq);
- break;
- case 'c':
- consoleSession.handleContentIndicator(this, decoder, seq, true,
- false);
- break;
- case 'i':
- consoleSession.handleContentIndicator(this, decoder, seq,
- false, true);
- break;
- case 'g':
- consoleSession.handleContentIndicator(this, decoder, seq, true,
- true);
- break;
- default:
- log.error("Invalid message type recieved with opcode "
- + headerInfo.opcode);
- break;
- }
- headerInfo = this.CheckHeader(decoder);
- }
- }
-
- private Decoder readBody(Message message)
- {
- BytesMessage msg = (BytesMessage) message;
- BBDecoder dec = new BBDecoder();
- byte[] buf = new byte[1024];
- byte[] body = new byte[1024];
- int size = 0;
- int n;
- try
- {
- while ((n = msg.readBytes(buf)) > 0)
- {
- body = ensure(size + n, body, size);
- System.arraycopy(buf, 0, body, size, n);
- size += n;
- }
- } catch (JMSException e)
- {
- throw new ConsoleException(e);
- }
- dec.init(ByteBuffer.wrap(body, 0, size));
- return dec;
- }
-
- public void send(Encoder enc)
- {
- this.send(this.createMessage(enc), "broker");
- }
-
- public void send(Message msg)
- {
- this.send(msg, "broker", -1);
- }
-
- public void send(Message msg, String routingKey)
- {
- this.send(msg, routingKey, -1);
- }
-
- public void send(Message msg, String routingKey, int ttl)
- {
- synchronized (lockObject)
- {
- try
- {
- log.debug(String.format("Sending message to routing key '%s'",
- routingKey));
- String destName = String.format(
- "management://qpid.management//?routingkey='%s'",
- routingKey);
- log.debug(destName);
- Queue dest = session.createQueue(destName);
- // Queue jmsReply = session
- // createQueue("direct://amq.direct//?routingkey='reply-"
- // + brokerId + "'");
- if (ttl != -1)
- {
- msg.setJMSExpiration(ttl);
- }
- msg.setJMSReplyTo(reply);
- prod.send(dest, msg);
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- }
- }
- }
-
- protected Encoder setHeader(Encoder enc, char opcode, long sequence)
- {
- enc.writeUint8((short) 'A');
- enc.writeUint8((short) 'M');
- enc.writeUint8((short) '3');
- enc.writeUint8((short) opcode);
- enc.writeUint32(sequence);
- return enc;
- }
-
- public void setSyncInFlight(boolean inFlight)
- {
- synchronized (lockObject)
- {
- syncInFlight = inFlight;
- lockObject.notifyAll();
- }
- }
-
- public void shutdown()
- {
- if (connected)
- {
- this.waitForStable();
- try
- {
- session.close();
- for (MessageConsumer cons : consumers)
- {
- cons.close();
- }
- connection.close();
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- } finally
- {
- this.connected = false;
- }
- }
- }
-
- protected void tryToConnect()
- {
- try
- {
- reqsOutstanding = 1;
- Agent newAgent = new Agent(this, 0, "BrokerAgent");
- Agents.put(newAgent.agentKey(), newAgent);
- connection = new AMQConnection(url);
- session = connection.createSession(sessionTransacted,
- acknowledgeMode);
- replyName = String
- .format(
- "direct://amq.direct//reply-%s?exclusive='True'&autodelete='True'",
- brokerId);
- topicName = String
- .format(
- "management://qpid.management//topic-%s?exclusive='True'&autodelete='True'",
- brokerId);
- reply = session.createQueue(replyName);
- MessageConsumer cons = session.createConsumer(reply);
- cons.setMessageListener(this);
- consumers.add(cons);
- prod = session.createProducer(null);
- topic = session.createQueue(topicName);
- cons = session.createConsumer(topic);
- cons.setMessageListener(this);
- consumers.add(cons);
- connection.start();
- // Rest of the topic is bound later. Start er up
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- }
- connected = true;
- consoleSession.handleBrokerConnect(this);
- Encoder Encoder = createEncoder('B', 0);
- this.send(Encoder);
- }
-
- public void updateAgent(QMFObject obj)
- {
- long agentBank = (Long) obj.GetProperty("agentBank");
- long brokerBank = (Long) obj.GetProperty("brokerBank");
- String key = Agent.AgentKey(agentBank, brokerBank);
- if (obj.isDeleted())
- {
- if (Agents.containsKey(key))
- {
- Agent agent = Agents.get(key);
- Agents.remove(key);
- consoleSession.handleAgentRemoved(agent);
- }
- } else
- {
- if (!Agents.containsKey(key))
- {
- Agent newAgent = new Agent(this, agentBank, (String) obj
- .GetProperty("label"));
- Agents.put(key, newAgent);
- consoleSession.handleNewAgent(newAgent);
- }
- }
- }
-
- public void waitForStable()
- {
- synchronized (lockObject)
- {
- if (connected)
- {
- long start = System.currentTimeMillis();
- syncInFlight = true;
- while (reqsOutstanding != 0)
- {
- log.debug("Waiting to recieve messages");
- try
- {
- lockObject.wait(SYNC_TIME);
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- }
- long duration = System.currentTimeMillis() - start;
- if (duration > SYNC_TIME)
- {
- throw new ConsoleException(
- "Timeout waiting for Broker to Sync");
- }
- }
- }
- }
- }
-
- public void waitForSync(int timeout)
- {
- synchronized (lockObject)
- {
- long start = System.currentTimeMillis();
- while (syncInFlight)
- {
- try
- {
- lockObject.wait(SYNC_TIME);
- } catch (Exception e)
- {
- throw new ConsoleException(e);
- }
- }
- long duration = System.currentTimeMillis() - start;
- if (duration > timeout)
- {
- throw new ConsoleException("Timeout waiting for Broker to Sync");
- }
- }
- }
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.UUID; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.transport.codec.Decoder; +import org.apache.qpid.transport.codec.Encoder; + +public class Broker implements MessageListener +{ + class HeaderInfo + { + boolean valid; + long sequence; + char opcode; + + public String toString() + { + return String.format("%s Header with opcode %s and sequence %s", + (valid ? "Valid" : "Invalid"), opcode, sequence); + } + } + + private static Logger log = LoggerFactory.getLogger(Broker.class); + public static int SYNC_TIME = 60000; + // JMS Stuff + private javax.jms.Session session; + boolean sessionTransacted = false; + private String replyName; + private String topicName; + private MessageProducer prod; + private ArrayList<MessageConsumer> consumers = new ArrayList<MessageConsumer>(); + private Queue reply; + private Queue topic; + private int acknowledgeMode = javax.jms.Session.AUTO_ACKNOWLEDGE; + // QMF Stuff + AMQConnection connection; + public String url; + public java.util.HashMap<String, Agent> Agents = new java.util.HashMap<String, Agent>(); + private Session consoleSession; + private boolean connected = false; + private boolean syncInFlight = false; + private boolean topicBound = false; + private int reqsOutstanding = 0; + private Object lockObject = new Object(); + UUID brokerId = UUID.randomUUID(); + + public Broker(org.apache.qpid.console.Session session, String url) + { + log.debug("Creating a new Broker for url " + url); + this.url = url; + consoleSession = session; + this.tryToConnect(); + } + + public int brokerBank() + { + return 1; + } + + protected HeaderInfo CheckHeader(Decoder decoder) + { + HeaderInfo returnValue = new HeaderInfo(); + returnValue.opcode = 'x'; + returnValue.sequence = -1; + returnValue.valid = false; + if (decoder.hasRemaining()) + { + char character = (char) decoder.readUint8(); + if (character != 'A') + { + return returnValue; + } + character = (char) decoder.readUint8(); + if (character != 'M') + { + return returnValue; + } + character = (char) decoder.readUint8(); + if (character != '3') + { + return returnValue; + } + returnValue.valid = true; + returnValue.opcode = (char) decoder.readUint8(); + returnValue.sequence = decoder.readUint32(); + } + return returnValue; + } + + public Encoder createEncoder(char opcode, long sequence) + { + return setHeader(new BBEncoder(1024), opcode, sequence); + } + + public Message createMessage(Encoder enc) + { + try + { + byte[] buf = new byte[1024]; + byte[] body = new byte[1024]; + BBEncoder bbenc = (BBEncoder) enc; + BytesMessage msg = session.createBytesMessage(); + ByteBuffer slice = bbenc.buffer(); + while (slice.hasRemaining()) + { + int n = Math.min(buf.length, slice.remaining()); + slice.get(buf, 0, n); + msg.writeBytes(buf, 0, n); + } + return msg; + } catch (JMSException e) + { + throw new ConsoleException(e); + } + } + + public void decrementOutstanding() + { + synchronized (lockObject) + { + this.reqsOutstanding -= 1; + if ((reqsOutstanding == 0) & !topicBound) + { + for (String key : consoleSession.bindingKeys()) + { + try + { + // this.clientSession.exchangeBind(topicName, + // "qpid.mannagement", key) ; + log.debug("Setting Topic Binding " + key); + // topicName = "management://qpid.management//" + key; + String rk = String.format("&routingkey='%s'", key); + Queue aQueue = session.createQueue(topicName + rk); + MessageConsumer cons = session.createConsumer(aQueue); + cons.setMessageListener(this); + consumers.add(cons); + } catch (JMSException e) + { + throw new ConsoleException(e); + } + } + topicBound = true; + } + if ((reqsOutstanding == 0) & syncInFlight) + { + syncInFlight = false; + lockObject.notifyAll(); + } + } + } + + private byte[] ensure(int capacity, byte[] body, int size) + { + if (capacity > body.length) + { + byte[] copy = new byte[capacity]; + System.arraycopy(body, 0, copy, 0, size); + body = copy; + } + return body; + } + + protected void finalize() + { + if (connected) + { + this.shutdown(); + } + } + + public boolean getSyncInFlight() + { + return syncInFlight; + } + + public void incrementOutstanding() + { + synchronized (lockObject) + { + this.reqsOutstanding += 1; + } + } + + public boolean isConnected() + { + return connected; + } + + public void onMessage(Message msg) + { + Decoder decoder = readBody(msg); + HeaderInfo headerInfo = this.CheckHeader(decoder); + // log.debug(headerInfo.toString()); + while (headerInfo.valid) + { + long seq = headerInfo.sequence; + switch (headerInfo.opcode) + { + case 'b': + consoleSession.handleBrokerResponse(this, decoder, seq); + break; + case 'p': + consoleSession.handlePackageIndicator(this, decoder, seq); + break; + case 'z': + consoleSession.handleCommandComplete(this, decoder, seq); + break; + case 'q': + consoleSession.handleClassIndicator(this, decoder, seq); + break; + case 'm': + consoleSession.handleMethodResponse(this, decoder, seq); + break; + case 'h': + consoleSession + .handleHeartbeatIndicator(this, decoder, seq, msg); + break; + case 'e': + consoleSession.handleEventIndicator(this, decoder, seq); + break; + case 's': + consoleSession.handleSchemaResponse(this, decoder, seq); + break; + case 'c': + consoleSession.handleContentIndicator(this, decoder, seq, true, + false); + break; + case 'i': + consoleSession.handleContentIndicator(this, decoder, seq, + false, true); + break; + case 'g': + consoleSession.handleContentIndicator(this, decoder, seq, true, + true); + break; + default: + log.error("Invalid message type recieved with opcode " + + headerInfo.opcode); + break; + } + headerInfo = this.CheckHeader(decoder); + } + } + + private Decoder readBody(Message message) + { + BytesMessage msg = (BytesMessage) message; + BBDecoder dec = new BBDecoder(); + byte[] buf = new byte[1024]; + byte[] body = new byte[1024]; + int size = 0; + int n; + try + { + while ((n = msg.readBytes(buf)) > 0) + { + body = ensure(size + n, body, size); + System.arraycopy(buf, 0, body, size, n); + size += n; + } + } catch (JMSException e) + { + throw new ConsoleException(e); + } + dec.init(ByteBuffer.wrap(body, 0, size)); + return dec; + } + + public void send(Encoder enc) + { + this.send(this.createMessage(enc), "broker"); + } + + public void send(Message msg) + { + this.send(msg, "broker", -1); + } + + public void send(Message msg, String routingKey) + { + this.send(msg, routingKey, -1); + } + + public void send(Message msg, String routingKey, int ttl) + { + synchronized (lockObject) + { + try + { + log.debug(String.format("Sending message to routing key '%s'", + routingKey)); + String destName = String.format( + "management://qpid.management//?routingkey='%s'", + routingKey); + log.debug(destName); + Queue dest = session.createQueue(destName); + // Queue jmsReply = session + // createQueue("direct://amq.direct//?routingkey='reply-" + // + brokerId + "'"); + if (ttl != -1) + { + msg.setJMSExpiration(ttl); + } + msg.setJMSReplyTo(reply); + prod.send(dest, msg); + } catch (Exception e) + { + throw new ConsoleException(e); + } + } + } + + protected Encoder setHeader(Encoder enc, char opcode, long sequence) + { + enc.writeUint8((short) 'A'); + enc.writeUint8((short) 'M'); + enc.writeUint8((short) '3'); + enc.writeUint8((short) opcode); + enc.writeUint32(sequence); + return enc; + } + + public void setSyncInFlight(boolean inFlight) + { + synchronized (lockObject) + { + syncInFlight = inFlight; + lockObject.notifyAll(); + } + } + + public void shutdown() + { + if (connected) + { + this.waitForStable(); + try + { + session.close(); + for (MessageConsumer cons : consumers) + { + cons.close(); + } + connection.close(); + } catch (Exception e) + { + throw new ConsoleException(e); + } finally + { + this.connected = false; + } + } + } + + protected void tryToConnect() + { + try + { + reqsOutstanding = 1; + Agent newAgent = new Agent(this, 0, "BrokerAgent"); + Agents.put(newAgent.agentKey(), newAgent); + connection = new AMQConnection(url); + session = connection.createSession(sessionTransacted, + acknowledgeMode); + replyName = String + .format( + "direct://amq.direct//reply-%s?exclusive='True'&autodelete='True'", + brokerId); + topicName = String + .format( + "management://qpid.management//topic-%s?exclusive='True'&autodelete='True'", + brokerId); + reply = session.createQueue(replyName); + MessageConsumer cons = session.createConsumer(reply); + cons.setMessageListener(this); + consumers.add(cons); + prod = session.createProducer(null); + topic = session.createQueue(topicName); + cons = session.createConsumer(topic); + cons.setMessageListener(this); + consumers.add(cons); + connection.start(); + // Rest of the topic is bound later. Start er up + } catch (Exception e) + { + throw new ConsoleException(e); + } + connected = true; + consoleSession.handleBrokerConnect(this); + Encoder Encoder = createEncoder('B', 0); + this.send(Encoder); + } + + public void updateAgent(QMFObject obj) + { + long agentBank = (Long) obj.getProperty("agentBank"); + long brokerBank = (Long) obj.getProperty("brokerBank"); + String key = Agent.AgentKey(agentBank, brokerBank); + if (obj.isDeleted()) + { + if (Agents.containsKey(key)) + { + Agent agent = Agents.get(key); + Agents.remove(key); + consoleSession.handleAgentRemoved(agent); + } + } else + { + if (!Agents.containsKey(key)) + { + Agent newAgent = new Agent(this, agentBank, (String) obj + .getProperty("label")); + Agents.put(key, newAgent); + consoleSession.handleNewAgent(newAgent); + } + } + } + + public void waitForStable() + { + synchronized (lockObject) + { + if (connected) + { + long start = System.currentTimeMillis(); + syncInFlight = true; + while (reqsOutstanding != 0) + { + log.debug("Waiting to recieve messages"); + try + { + lockObject.wait(SYNC_TIME); + } catch (Exception e) + { + throw new ConsoleException(e); + } + long duration = System.currentTimeMillis() - start; + if (duration > SYNC_TIME) + { + throw new ConsoleException( + "Timeout waiting for Broker to Sync"); + } + } + } + } + } + + public void waitForSync(int timeout) + { + synchronized (lockObject) + { + long start = System.currentTimeMillis(); + while (syncInFlight) + { + try + { + lockObject.wait(SYNC_TIME); + } catch (Exception e) + { + throw new ConsoleException(e); + } + } + long duration = System.currentTimeMillis() - start; + if (duration > timeout) + { + throw new ConsoleException("Timeout waiting for Broker to Sync"); + } + } + } }
\ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java b/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java index fc32697fb5..9eac8942cb 100644 --- a/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java +++ b/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java @@ -22,9 +22,12 @@ package org.apache.qpid.console; import org.apache.qpid.transport.codec.Decoder;
import org.apache.qpid.transport.codec.Encoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClassKey
{
+ private static Logger log = LoggerFactory.getLogger(ClassKey.class);
private String packageName;
private String className;
private long[] hash = new long[4];
@@ -41,8 +44,8 @@ public class ClassKey public ClassKey(String keyString)
{
- String delims = ":()";
- String[] parts = keyString.split(java.util.regex.Pattern.quote(delims));
+ String delims = "[*:*(*)]";
+ String[] parts = keyString.split(delims);
if (parts.length < 3)
{
throw new ConsoleException(
@@ -51,7 +54,7 @@ public class ClassKey setPackageName(parts[0]);
setClassName(parts[1]);
delims = "-";
- String[] bytes = parts[2].split(java.util.regex.Pattern.quote(delims));
+ String[] bytes = parts[2].split(delims);
if (bytes.length != 4)
{
throw new ConsoleException(
@@ -95,15 +98,16 @@ public class ClassKey {
return hash;
}
-
- public String getHashString() {
- return String.format("%08x-%08x-%08x-%08x", hash[0], hash[1],
- hash[2], hash[3]);
+
+ public String getHashString()
+ {
+ return String.format("%08x-%08x-%08x-%08x", hash[0], hash[1], hash[2],
+ hash[3]);
}
public String getKeyString()
{
- String hashString = this.getHashString() ;
+ String hashString = this.getHashString();
return String.format("%s:%s(%s)", getPackageName(), getClassName(),
hashString);
}
diff --git a/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java b/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java index f60270d58a..1919bac411 100644 --- a/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java +++ b/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java @@ -233,7 +233,7 @@ public class QMFObject return objectID;
}
- public final Object GetProperty(String attributeName)
+ public final Object getProperty(String attributeName)
{
return properties.get(attributeName);
}
diff --git a/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java b/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java index 66e3fc3feb..783a4b4b86 100644 --- a/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java +++ b/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java @@ -198,7 +198,7 @@ public class SchemaClass {
superType = value;
}
-
+
public ArrayList<SchemaProperty> getProperties()
{
return properties;
diff --git a/java/management/console/src/main/java/org/apache/qpid/console/Session.java b/java/management/console/src/main/java/org/apache/qpid/console/Session.java index 424750ea57..cb2e39c15f 100644 --- a/java/management/console/src/main/java/org/apache/qpid/console/Session.java +++ b/java/management/console/src/main/java/org/apache/qpid/console/Session.java @@ -149,6 +149,7 @@ public class Session { this, schema, dec, hasProperties, hasStats, isManaged };
try
{
+ log.debug("" + realClass);
Constructor ci = realClass.getConstructor(types);
return (QMFObject) ci.newInstance(args);
} catch (Exception e)
@@ -277,7 +278,7 @@ public class Session enc.writeUint16(((Integer) val).intValue());
break;
case 3: // U32
- enc.writeUint32(((Long) val).longValue());
+ enc.writeUint32(((Integer) val).longValue());
break;
case 4: // U64
enc.writeUint64(((Long) val).longValue());
diff --git a/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java b/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java index 5904d57e64..1ab93de6a2 100644 --- a/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java +++ b/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java @@ -1,103 +1,134 @@ package org.apache.qpid.console; - public class XMLUtil { - - public static String commonAttributes(SchemaVariable var) { - String returnString = "" ; - if (var.getDescription() != null){ - returnString = returnString + String.format(" desc='%s'", var.getDescription()) ; - } - - if (var.getRefPackage() != null){ - returnString = returnString + String.format(" refPackage='%s'", var.getRefPackage()) ; - } - - if (var.getRefClass() != null){ - returnString = returnString + String.format(" refClass='%s'", var.getRefClass()) ; - } - - if (var.getUnit() != null){ - returnString = returnString + String.format(" unit='%s'", var.getUnit()) ; - } - - if (var.getMin() != null){ - returnString = returnString + String.format(" min='%s'", var.getMin()) ; - } - if (var.getMax() != null){ - returnString = returnString + String.format(" max='%s'", var.getMax()) ; - } - if (var.getMaxLength() != null){ - returnString = returnString + String.format(" maxLength='%s'", var.getMaxLength()) ; + public static String commonAttributes(SchemaVariable var) + { + String returnString = ""; + if (var.getDescription() != null) + { + returnString = returnString + + String.format(" desc='%s'", var.getDescription()); + } + if (var.getRefPackage() != null) + { + returnString = returnString + + String.format(" refPackage='%s'", var.getRefPackage()); + } + if (var.getRefClass() != null) + { + returnString = returnString + + String.format(" refClass='%s'", var.getRefClass()); + } + if (var.getUnit() != null) + { + returnString = returnString + + String.format(" unit='%s'", var.getUnit()); } - - return returnString ; - } + if (var.getMin() != null) + { + returnString = returnString + + String.format(" min='%s'", var.getMin()); + } + if (var.getMax() != null) + { + returnString = returnString + + String.format(" max='%s'", var.getMax()); + } + if (var.getMaxLength() != null) + { + returnString = returnString + + String.format(" maxLength='%s'", var.getMaxLength()); + } + return returnString; + } - public static String schemaXML(Session sess, String packageName) { - String returnValue = String.format("<schema package='%s'>\n", packageName) ; - for (ClassKey key : sess.getClasses(packageName)) { - SchemaClass schema = sess.getSchema(key) ; - if (schema.getKind() == 1) { - if (schema.getSuperType() == null) { - returnValue += String.format("\t<class name='%s' hash='%s'>\n", key.getClassName(), key.getHashString()) ; - } - else { - returnValue += String.format("\t<class name='%s' hash='%s' extends='%s'>\n", key.getClassName(), key.getHashString(), schema.getSuperType().getKeyString()) ; + public static String schemaXML(Session sess, String packageName) + { + String returnValue = String.format("<schema package='%s'>\n", + packageName); + for (ClassKey key : sess.getClasses(packageName)) + { + SchemaClass schema = sess.getSchema(key); + if (schema.getKind() == 1) + { + if (schema.getSuperType() == null) + { + returnValue += String.format( + "\t<class name='%s' hash='%s'>\n", key + .getClassName(), key.getHashString()); + } else + { + returnValue += String.format( + "\t<class name='%s' hash='%s' extends='%s'>\n", key + .getClassName(), key.getHashString(), + schema.getSuperType().getKeyString()); } - for (SchemaProperty prop : schema.getProperties()) { - Object[] attributes = new Object[5] ; - attributes[0] = prop.getName() ; - attributes[1] = Util.typeName(prop.getType()) ; - attributes[2] = Util.accessName(prop.getAccess()) ; - attributes[3] = prop.getOptional()? "True" : "False "; + for (SchemaProperty prop : schema.getProperties()) + { + Object[] attributes = new Object[5]; + attributes[0] = prop.getName(); + attributes[1] = Util.typeName(prop.getType()); + attributes[2] = Util.accessName(prop.getAccess()); + attributes[3] = prop.getOptional() ? "True" : "False "; attributes[4] = XMLUtil.commonAttributes(prop); - returnValue += String.format("\t\t<property name='%s' type='%s' access='%s' optional='%s'%s/>\n", attributes) ; + returnValue += String + .format( + "\t\t<property name='%s' type='%s' access='%s' optional='%s'%s/>\n", + attributes); } - for (SchemaMethod meth : schema.getMethods()) { - returnValue += String.format("\t\t<method name='%s'/>\n", meth.getName()) ; - for (SchemaArgument arg : meth.Arguments) { - Object[] attributes = new Object[4] ; - attributes[0] = arg.getName() ; - attributes[1] = arg.getDirection() ; - attributes[2] = Util.typeName(arg.getType()) ; - attributes[3] = XMLUtil.commonAttributes(arg); - returnValue += String.format("\t\t\t<arg name='%s' dir='%s' type='%s'%s/>\n", attributes) ; + for (SchemaMethod meth : schema.getMethods()) + { + returnValue += String.format("\t\t<method name='%s'/>\n", + meth.getName()); + for (SchemaArgument arg : meth.Arguments) + { + Object[] attributes = new Object[4]; + attributes[0] = arg.getName(); + attributes[1] = arg.getDirection(); + attributes[2] = Util.typeName(arg.getType()); + attributes[3] = XMLUtil.commonAttributes(arg); + returnValue += String + .format( + "\t\t\t<arg name='%s' dir='%s' type='%s'%s/>\n", + attributes); } - returnValue += String.format("\t\t</method>\n") ; + returnValue += String.format("\t\t</method>\n"); } - returnValue += String.format("\t</class>\n") ; - } else { - returnValue += String.format("\t<event name='%s' hash='%s'>\n", key.getClassName(), key.getHashString()) ; - for (SchemaArgument arg : schema.getArguments()) { - Object[] attributes = new Object[4] ; - attributes[0] = arg.getName() ; - attributes[1] = Util.typeName(arg.getType()) ; - attributes[2] = XMLUtil.commonAttributes(arg); - returnValue += String.format("\t\t\t<arg name='%s' type='%s'%s/>\n", attributes) ; + returnValue += String.format("\t</class>\n"); + } else + { + returnValue += String.format("\t<event name='%s' hash='%s'>\n", + key.getClassName(), key.getHashString()); + for (SchemaArgument arg : schema.getArguments()) + { + Object[] attributes = new Object[4]; + attributes[0] = arg.getName(); + attributes[1] = Util.typeName(arg.getType()); + attributes[2] = XMLUtil.commonAttributes(arg); + returnValue += String.format( + "\t\t\t<arg name='%s' type='%s'%s/>\n", attributes); } - returnValue += String.format("\t</event>\n") ; + returnValue += String.format("\t</event>\n"); } } - returnValue += String.format("</schema>\n") ; - - return returnValue ; - } - - public static String schemaXML(Session sess, String[] packageNames) { - String returnValue = "<schemas>\n" ; - for (String pack : packageNames) { - returnValue += XMLUtil.schemaXML(sess, pack) ; - returnValue += "\n" ; + returnValue += String.format("</schema>\n"); + return returnValue; + } + + public static String schemaXML(Session sess, String[] packageNames) + { + String returnValue = "<schemas>\n"; + for (String pack : packageNames) + { + returnValue += XMLUtil.schemaXML(sess, pack); + returnValue += "\n"; } - returnValue += "</schemas>\n" ; - return returnValue ; + returnValue += "</schemas>\n"; + return returnValue; } - + protected XMLUtil() { } } - - diff --git a/java/management/console/src/test/java/org/apache/qpid/console/ClassKeyTest.java b/java/management/console/src/test/java/org/apache/qpid/console/ClassKeyTest.java new file mode 100644 index 0000000000..dc16aaac5b --- /dev/null +++ b/java/management/console/src/test/java/org/apache/qpid/console/ClassKeyTest.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import junit.framework.TestCase; + +public class ClassKeyTest extends TestCase +{ + public void testCreation() + { + ClassKey key = new ClassKey( + "some.package:Class(00000001-00000002-00000003-00000004)"); + assertEquals("some.package", key.getPackageName()); + assertEquals("Class", key.getClassName()); + assertEquals("00000001-00000002-00000003-00000004", key.getHashString()); + assertEquals(1, key.getHash()[0]); + assertEquals(2, key.getHash()[1]); + assertEquals(3, key.getHash()[2]); + assertEquals(4, key.getHash()[3]); + } +} |
