From 160dfd1259052babfd5bbee9384670cebbc796ff Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Mon, 22 Jun 2009 17:43:54 +0000 Subject: QPID-1933 - Patch from Bryan Kearney QMF Java Console git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@787325 13f79535-47bb-0310-9956-ffa450edef68 --- java/build.deps | 1 + java/build.xml | 2 +- java/management/console/build.xml | 27 + .../org/apache/qpid/console/AbstractConsole.java | 81 ++ .../main/java/org/apache/qpid/console/Agent.java | 116 +++ .../main/java/org/apache/qpid/console/Broker.java | 505 ++++++++++ .../java/org/apache/qpid/console/ClassKey.java | 142 +++ .../main/java/org/apache/qpid/console/Console.java | 51 + .../org/apache/qpid/console/ConsoleException.java | 48 + .../org/apache/qpid/console/EventSeverity.java | 37 + .../java/org/apache/qpid/console/MethodResult.java | 88 ++ .../java/org/apache/qpid/console/ObjectID.java | 93 ++ .../java/org/apache/qpid/console/QMFEvent.java | 108 +++ .../java/org/apache/qpid/console/QMFObject.java | 423 ++++++++ .../org/apache/qpid/console/SchemaArgument.java | 65 ++ .../java/org/apache/qpid/console/SchemaClass.java | 251 +++++ .../java/org/apache/qpid/console/SchemaMethod.java | 125 +++ .../org/apache/qpid/console/SchemaProperty.java | 81 ++ .../org/apache/qpid/console/SchemaStatistic.java | 88 ++ .../org/apache/qpid/console/SchemaVariable.java | 185 ++++ .../org/apache/qpid/console/SequenceManager.java | 57 ++ .../main/java/org/apache/qpid/console/Session.java | 1007 ++++++++++++++++++++ .../main/java/org/apache/qpid/console/Util.java | 184 ++++ .../main/java/org/apache/qpid/console/XMLUtil.java | 103 ++ 24 files changed, 3867 insertions(+), 1 deletion(-) create mode 100644 java/management/console/build.xml create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/AbstractConsole.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/Agent.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/Broker.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/Console.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/ConsoleException.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/EventSeverity.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/MethodResult.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/ObjectID.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/QMFEvent.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/SchemaArgument.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/SchemaMethod.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/SchemaProperty.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/SchemaStatistic.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/SchemaVariable.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/SequenceManager.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/Session.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/Util.java create mode 100644 java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java (limited to 'java') diff --git a/java/build.deps b/java/build.deps index 57f9e0484e..9ee1f47463 100644 --- a/java/build.deps +++ b/java/build.deps @@ -87,6 +87,7 @@ broker-plugins.libs=${common.libs} ${felix.libs} management-client.libs=${jsp.libs} ${log4j} ${slf4j-log4j} ${slf4j-api} ${commons-pool} ${geronimo-servlet} ${muse.libs} ${javassist} ${xalan} ${mina-core} ${mina-filter-ssl} management-agent.libs=${client.libs} ${commons-logging} ${geronimo-jms} +management-console.libs=${client.libs} ${commons-logging} ${geronimo-jms} junit-toolkit.libs=${log4j} ${junit} ${slf4j-api} test.libs=${slf4j-log4j} ${junit-toolkit.libs} diff --git a/java/build.xml b/java/build.xml index beaaf7fed7..db3ba05d29 100644 --- a/java/build.xml +++ b/java/build.xml @@ -25,7 +25,7 @@ - + + + + + + + + diff --git a/java/management/console/src/main/java/org/apache/qpid/console/AbstractConsole.java b/java/management/console/src/main/java/org/apache/qpid/console/AbstractConsole.java new file mode 100644 index 0000000000..d95003b1cc --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/AbstractConsole.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +public class AbstractConsole implements Console +{ + public AbstractConsole() + { + } + + public void agentRemoved(Agent agent) + { + } + + public void brokerConnected(Broker broker) + { + } + + public void brokerDisconnected(Broker broker) + { + } + + public void brokerInformation(Broker broker) + { + } + + public void eventRecieved(Broker broker, QMFEvent anEvent) + { + } + + public void hearbeatRecieved(Agent agent, long timestamp) + { + } + + public void methodResponse(Broker broker, long seq, MethodResult response) + { + } + + public void newAgent(Agent agent) + { + } + + public void newClass(short kind, ClassKey key) + { + } + + public void newPackage(String packageName) + { + } + + public void objectProperties(Broker broker, QMFObject obj) + { + } + + public void objectStatistics(Broker broker, QMFObject obj) + { + } + + public Class typeMapping(ClassKey key) + { + return QMFObject.class; + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/Agent.java b/java/management/console/src/main/java/org/apache/qpid/console/Agent.java new file mode 100644 index 0000000000..e1887d82ea --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/Agent.java @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Agent +{ + private static Logger log = LoggerFactory.getLogger(Agent.class); + + public static String AgentKey(long AgentBank, long BrokerBank) + { + return String.format("%s:%s", AgentBank, BrokerBank); + } + + public static long getAgentBank(String routingKey) + { + String delim = "."; + return Long.parseLong(routingKey.split(java.util.regex.Pattern + .quote(delim))[3]); + } + + public static long getBrokerBank(String routingKey) + { + String delim = "."; + return Long.parseLong(routingKey.split(java.util.regex.Pattern + .quote(delim))[2]); + } + + public static String routingCode(long AgentBank, long BrokerBank) + { + return String.format("agent.%s.%s", BrokerBank, AgentBank); + } + + private long agentBank; + private Broker broker; + private long brokerBank; + private String label; + + public Agent(Broker broker, long agentBank, String label) + { + this.setBroker(broker); + this.setBrokerBank(broker.brokerBank()); + this.setAgentBank(agentBank); + this.setlabel(label); + } + + public final String agentKey() + { + return Agent.AgentKey(getAgentBank(), getBrokerBank()); + } + + public final long getAgentBank() + { + return agentBank; + } + + public final Broker getBroker() + { + return broker; + } + + public final long getBrokerBank() + { + return brokerBank; + } + + public final String getlabel() + { + return label; + } + + public final String routingCode() + { + return Agent.routingCode(getAgentBank(), getBrokerBank()); + } + + public final void setAgentBank(long value) + { + agentBank = value; + } + + public final void setBroker(Broker value) + { + broker = value; + } + + public final void setBrokerBank(long value) + { + brokerBank = value; + } + + public final void setlabel(String value) + { + label = value; + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/Broker.java b/java/management/console/src/main/java/org/apache/qpid/console/Broker.java new file mode 100644 index 0000000000..a672beb548 --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/Broker.java @@ -0,0 +1,505 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.UUID; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.transport.codec.Decoder; +import org.apache.qpid.transport.codec.Encoder; + +public class Broker implements MessageListener +{ + class HeaderInfo + { + boolean valid; + long sequence; + char opcode; + + public String toString() + { + return String.format("%s Header with opcode %s and sequence %s", + (valid ? "Valid" : "Invalid"), opcode, sequence); + } + } + private static Logger log = LoggerFactory.getLogger(Broker.class); + public static int SYNC_TIME = 60000; + // JMS Stuff + private javax.jms.Session session; + boolean sessionTransacted = false; + private String replyName; + private String topicName; + private MessageProducer prod; + private ArrayList consumers = new ArrayList(); + private Queue reply; + private Queue topic; + private int acknowledgeMode = javax.jms.Session.AUTO_ACKNOWLEDGE; + // QMF Stuff + AMQConnection connection; + public String url; + public java.util.HashMap Agents = new java.util.HashMap(); + private Session consoleSession; + private boolean connected = false; + private boolean syncInFlight = false; + private boolean topicBound = false; + private int reqsOutstanding = 0; + private Object lockObject = new Object(); + + UUID brokerId = UUID.randomUUID(); + + public Broker(org.apache.qpid.console.Session session, String url) + { + log.debug("Creating a new Broker for url " + url); + this.url = url; + consoleSession = session; + this.tryToConnect(); + } + + public int brokerBank() + { + return 1; + } + + protected HeaderInfo CheckHeader(Decoder decoder) + { + HeaderInfo returnValue = new HeaderInfo(); + returnValue.opcode = 'x'; + returnValue.sequence = -1; + returnValue.valid = false; + if (decoder.hasRemaining()) + { + char character = (char) decoder.readUint8(); + if (character != 'A') + { + return returnValue; + } + character = (char) decoder.readUint8(); + if (character != 'M') + { + return returnValue; + } + character = (char) decoder.readUint8(); + if (character != '3') + { + return returnValue; + } + returnValue.valid = true; + returnValue.opcode = (char) decoder.readUint8(); + returnValue.sequence = decoder.readUint32(); + } + return returnValue; + } + + public Encoder createEncoder(char opcode, long sequence) + { + return setHeader(new BBEncoder(1024), opcode, sequence); + } + + public Message createMessage(Encoder enc) + { + try + { + byte[] buf = new byte[1024]; + byte[] body = new byte[1024]; + BBEncoder bbenc = (BBEncoder) enc; + BytesMessage msg = session.createBytesMessage(); + ByteBuffer slice = bbenc.buffer(); + while (slice.hasRemaining()) + { + int n = Math.min(buf.length, slice.remaining()); + slice.get(buf, 0, n); + msg.writeBytes(buf, 0, n); + } + return msg; + } catch (JMSException e) + { + throw new ConsoleException(e); + } + } + + public void decrementOutstanding() + { + synchronized (lockObject) + { + this.reqsOutstanding -= 1; + if ((reqsOutstanding == 0) & !topicBound) + { + for (String key : consoleSession.bindingKeys()) + { + try + { + // this.clientSession.exchangeBind(topicName, + // "qpid.mannagement", key) ; + log.debug("Setting Topic Binding " + key); + // topicName = "management://qpid.management//" + key; + String rk = String.format("&routingkey='%s'", key); + Queue aQueue = session.createQueue(topicName + rk); + MessageConsumer cons = session.createConsumer(aQueue); + cons.setMessageListener(this); + consumers.add(cons); + } catch (JMSException e) + { + throw new ConsoleException(e); + } + } + topicBound = true; + } + if ((reqsOutstanding == 0) & syncInFlight) + { + syncInFlight = false; + lockObject.notifyAll(); + } + } + } + + private byte[] ensure(int capacity, byte[] body, int size) + { + if (capacity > body.length) + { + byte[] copy = new byte[capacity]; + System.arraycopy(body, 0, copy, 0, size); + body = copy; + } + return body; + } + + protected void finalize() + { + if (connected) + { + this.shutdown(); + } + } + + public boolean getSyncInFlight() + { + return syncInFlight; + } + + public void incrementOutstanding() + { + synchronized (lockObject) + { + this.reqsOutstanding += 1; + } + } + + public boolean isConnected() + { + return connected; + } + + public void onMessage(Message msg) + { + Decoder decoder = readBody(msg); + HeaderInfo headerInfo = this.CheckHeader(decoder); + // log.debug(headerInfo.toString()); + while (headerInfo.valid) + { + long seq = headerInfo.sequence; + switch (headerInfo.opcode) + { + case 'b': + consoleSession.handleBrokerResponse(this, decoder, seq); + break; + case 'p': + consoleSession.handlePackageIndicator(this, decoder, seq); + break; + case 'z': + consoleSession.handleCommandComplete(this, decoder, seq); + break; + case 'q': + consoleSession.handleClassIndicator(this, decoder, seq); + break; + case 'm': + consoleSession.handleMethodResponse(this, decoder, seq); + break; + case 'h': + consoleSession + .handleHeartbeatIndicator(this, decoder, seq, msg); + break; + case 'e': + consoleSession.handleEventIndicator(this, decoder, seq); + break; + case 's': + consoleSession.handleSchemaResponse(this, decoder, seq); + break; + case 'c': + consoleSession.handleContentIndicator(this, decoder, seq, true, + false); + break; + case 'i': + consoleSession.handleContentIndicator(this, decoder, seq, + false, true); + break; + case 'g': + consoleSession.handleContentIndicator(this, decoder, seq, true, + true); + break; + default: + log.error("Invalid message type recieved with opcode " + + headerInfo.opcode); + break; + } + headerInfo = this.CheckHeader(decoder); + } + } + + private Decoder readBody(Message message) + { + BytesMessage msg = (BytesMessage) message; + BBDecoder dec = new BBDecoder(); + byte[] buf = new byte[1024]; + byte[] body = new byte[1024]; + int size = 0; + int n; + try + { + while ((n = msg.readBytes(buf)) > 0) + { + body = ensure(size + n, body, size); + System.arraycopy(buf, 0, body, size, n); + size += n; + } + } catch (JMSException e) + { + throw new ConsoleException(e); + } + dec.init(ByteBuffer.wrap(body, 0, size)); + return dec; + } + + public void send(Encoder enc) + { + this.send(this.createMessage(enc), "broker"); + } + + public void send(Message msg) + { + this.send(msg, "broker", -1); + } + + public void send(Message msg, String routingKey) + { + this.send(msg, routingKey, -1); + } + + public void send(Message msg, String routingKey, int ttl) + { + synchronized (lockObject) + { + try + { + log.debug(String.format("Sending message to routing key '%s'", + routingKey)); + String destName = String.format( + "management://qpid.management//?routingkey='%s'", + routingKey); + log.debug(destName); + Queue dest = session.createQueue(destName); + // Queue jmsReply = session + // createQueue("direct://amq.direct//?routingkey='reply-" + // + brokerId + "'"); + if (ttl != -1) + { + msg.setJMSExpiration(ttl); + } + msg.setJMSReplyTo(reply); + prod.send(dest, msg); + } catch (Exception e) + { + throw new ConsoleException(e); + } + } + } + + protected Encoder setHeader(Encoder enc, char opcode, long sequence) + { + enc.writeUint8((short) 'A'); + enc.writeUint8((short) 'M'); + enc.writeUint8((short) '3'); + enc.writeUint8((short) opcode); + enc.writeUint32(sequence); + return enc; + } + + public void setSyncInFlight(boolean inFlight) + { + synchronized (lockObject) + { + syncInFlight = inFlight; + lockObject.notifyAll(); + } + } + + public void shutdown() + { + if (connected) + { + this.waitForStable(); + try + { + session.close(); + for (MessageConsumer cons : consumers) + { + cons.close(); + } + connection.close(); + } catch (Exception e) + { + throw new ConsoleException(e); + } finally + { + this.connected = false; + } + } + } + + protected void tryToConnect() + { + try + { + reqsOutstanding = 1; + Agent newAgent = new Agent(this, 0, "BrokerAgent"); + Agents.put(newAgent.agentKey(), newAgent); + connection = new AMQConnection(url); + session = connection.createSession(sessionTransacted, + acknowledgeMode); + replyName = String + .format( + "direct://amq.direct//reply-%s?exclusive='True'&autodelete='True'", + brokerId); + topicName = String + .format( + "management://qpid.management//topic-%s?exclusive='True'&autodelete='True'", + brokerId); + reply = session.createQueue(replyName); + MessageConsumer cons = session.createConsumer(reply); + cons.setMessageListener(this); + consumers.add(cons); + prod = session.createProducer(null); + topic = session.createQueue(topicName); + cons = session.createConsumer(topic); + cons.setMessageListener(this); + consumers.add(cons); + connection.start(); + // Rest of the topic is bound later. Start er up + } catch (Exception e) + { + throw new ConsoleException(e); + } + connected = true; + consoleSession.handleBrokerConnect(this); + Encoder Encoder = createEncoder('B', 0); + this.send(Encoder); + } + + public void updateAgent(QMFObject obj) + { + long agentBank = (Long) obj.GetProperty("agentBank"); + long brokerBank = (Long) obj.GetProperty("brokerBank"); + String key = Agent.AgentKey(agentBank, brokerBank); + if (obj.isDeleted()) + { + if (Agents.containsKey(key)) + { + Agent agent = Agents.get(key); + Agents.remove(key); + consoleSession.handleAgentRemoved(agent); + } + } else + { + if (!Agents.containsKey(key)) + { + Agent newAgent = new Agent(this, agentBank, (String) obj + .GetProperty("label")); + Agents.put(key, newAgent); + consoleSession.handleNewAgent(newAgent); + } + } + } + + public void waitForStable() + { + synchronized (lockObject) + { + if (connected) + { + long start = System.currentTimeMillis(); + syncInFlight = true; + while (reqsOutstanding != 0) + { + log.debug("Waiting to recieve messages"); + try + { + lockObject.wait(SYNC_TIME); + } catch (Exception e) + { + throw new ConsoleException(e); + } + long duration = System.currentTimeMillis() - start; + if (duration > SYNC_TIME) + { + throw new ConsoleException( + "Timeout waiting for Broker to Sync"); + } + } + } + } + } + + public void waitForSync(int timeout) + { + synchronized (lockObject) + { + long start = System.currentTimeMillis(); + while (syncInFlight) + { + try + { + lockObject.wait(SYNC_TIME); + } catch (Exception e) + { + throw new ConsoleException(e); + } + } + long duration = System.currentTimeMillis() - start; + if (duration > timeout) + { + throw new ConsoleException("Timeout waiting for Broker to Sync"); + } + } + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java b/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java new file mode 100644 index 0000000000..fc32697fb5 --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/ClassKey.java @@ -0,0 +1,142 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import org.apache.qpid.transport.codec.Decoder; +import org.apache.qpid.transport.codec.Encoder; + +public class ClassKey +{ + private String packageName; + private String className; + private long[] hash = new long[4]; + + public ClassKey(Decoder dec) + { + setPackageName(dec.readStr8()); + setClassName(dec.readStr8()); + hash[0] = dec.readUint32(); + hash[1] = dec.readUint32(); + hash[2] = dec.readUint32(); + hash[3] = dec.readUint32(); + } + + public ClassKey(String keyString) + { + String delims = ":()"; + String[] parts = keyString.split(java.util.regex.Pattern.quote(delims)); + if (parts.length < 3) + { + throw new ConsoleException( + "Invalid class key format. Format should be package:class(bytes)"); + } + setPackageName(parts[0]); + setClassName(parts[1]); + delims = "-"; + String[] bytes = parts[2].split(java.util.regex.Pattern.quote(delims)); + if (bytes.length != 4) + { + throw new ConsoleException( + "Invalid class key format. Bytes should be in the format HEX-HEX-HEX-HEX"); + } + hash[0] = Long.parseLong(bytes[0], 16); + hash[1] = Long.parseLong(bytes[1], 16); + hash[2] = Long.parseLong(bytes[2], 16); + hash[3] = Long.parseLong(bytes[3], 16); + } + + public void encode(Encoder enc) + { + enc.writeStr8(getPackageName()); + enc.writeStr8(getClassName()); + enc.writeUint32(hash[0]); + enc.writeUint32(hash[1]); + enc.writeUint32(hash[2]); + enc.writeUint32(hash[3]); + } + + @Override + public boolean equals(Object obj) + { + if (obj.getClass().equals(this.getClass())) + { + ClassKey other = (ClassKey) obj; + return (other.getKeyString().equals(this.getKeyString())); + } else + { + return false; + } + } + + public final String getClassName() + { + return className; + } + + public long[] getHash() + { + return hash; + } + + public String getHashString() { + return String.format("%08x-%08x-%08x-%08x", hash[0], hash[1], + hash[2], hash[3]); + } + + public String getKeyString() + { + String hashString = this.getHashString() ; + return String.format("%s:%s(%s)", getPackageName(), getClassName(), + hashString); + } + + public String getPackageName() + { + return packageName; + } + + @Override + public int hashCode() + { + return getKeyString().hashCode(); + } + + public void setClassName(String value) + { + className = value; + } + + public void setHash(long[] hash) + { + this.hash = hash; + } + + public void setPackageName(String value) + { + packageName = value; + } + + @Override + public String toString() + { + return String.format("ClassKey: %s", getKeyString()); + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/Console.java b/java/management/console/src/main/java/org/apache/qpid/console/Console.java new file mode 100644 index 0000000000..11b381032a --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/Console.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +public interface Console +{ + void agentRemoved(Agent agent); + + void brokerConnected(Broker broker); + + void brokerDisconnected(Broker broker); + + void brokerInformation(Broker broker); + + void eventRecieved(Broker broker, QMFEvent anEvent); + + void hearbeatRecieved(Agent agent, long timestamp); + + void methodResponse(Broker broker, long seq, MethodResult response); + + void newAgent(Agent agent); + + void newClass(short kind, ClassKey key); + + void newPackage(String packageName); + + void objectProperties(Broker broker, QMFObject obj); + + void objectStatistics(Broker broker, QMFObject obj); + + @SuppressWarnings("unchecked") + Class typeMapping(ClassKey key); +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/ConsoleException.java b/java/management/console/src/main/java/org/apache/qpid/console/ConsoleException.java new file mode 100644 index 0000000000..3176da70a6 --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/ConsoleException.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.lang.RuntimeException; + +public class ConsoleException extends RuntimeException +{ + private static final long serialVersionUID = 1L; + + public ConsoleException() + { + super(); + } + + public ConsoleException(String message) + { + super(message); + } + + public ConsoleException(String message, Throwable cause) + { + super(message, cause); + } + + public ConsoleException(Throwable cause) + { + super(cause); + } +} diff --git a/java/management/console/src/main/java/org/apache/qpid/console/EventSeverity.java b/java/management/console/src/main/java/org/apache/qpid/console/EventSeverity.java new file mode 100644 index 0000000000..d40d41b196 --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/EventSeverity.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +public enum EventSeverity +{ + EMER(0), ALERT(1), CRIT(2), ERROR(3), WARN(4), NOTIC(5), INFO(6), DEBUG(7); + private int intValue; + + private EventSeverity(int value) + { + intValue = value; + } + + public int getValue() + { + return intValue; + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/MethodResult.java b/java/management/console/src/main/java/org/apache/qpid/console/MethodResult.java new file mode 100644 index 0000000000..34980b50e1 --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/MethodResult.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.util.HashMap; + +public class MethodResult +{ + private long returnCode; + private String text; + protected java.util.HashMap returnValues; + + public MethodResult(long aCode, String aMsg, + java.util.HashMap args) + { + setReturnCode(aCode); + setText(aMsg); + returnValues = args; + } + + public long getReturnCode() + { + return returnCode; + } + + public Object getReturnValue(String name) + { + Object returnValue = null; + if (returnValues.containsKey(name)) + { + returnValue = returnValues.get(name); + } + return returnValue; + } + + public HashMap getReturnValues() + { + return returnValues; + } + + public String getText() + { + return text; + } + + public void setReturnCode(long value) + { + returnCode = value; + } + + public void setText(String value) + { + text = value; + } + + @Override + public String toString() + { + String returnString = ""; + for (java.util.Map.Entry pair : returnValues.entrySet()) + { + returnString = returnString + + String.format("(Key: '%s' Value: '%s')", pair.getKey(), + pair.getValue()); + } + return String.format( + "MethodResult: ReturnCode=%s, Text=%s Values=[%s]", + getReturnCode(), getText(), returnString); + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/ObjectID.java b/java/management/console/src/main/java/org/apache/qpid/console/ObjectID.java new file mode 100644 index 0000000000..6cf5301de5 --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/ObjectID.java @@ -0,0 +1,93 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import org.apache.qpid.transport.codec.*; + +public class ObjectID +{ + protected long first; + protected long second; + + public ObjectID() + { + } + + public ObjectID(Decoder dec) + { + first = dec.readUint64(); + second = dec.readUint64(); + } + + public ObjectID(long first, long second) + { + this.first = first; + this.second = second; + } + + public long agentBank() + { + return (this.first & 0x000000000FFFFFFF); + } + + public long brokerBank() + { + return (this.first & 0x0000FFFFF0000000L) >> 28; + } + + public void encode(Encoder enc) + { + enc.writeUint64(first); + enc.writeUint64(second); + } + + public long flags() + { + return (this.first & 0xF000000000000000L) >> 60; + } + + public boolean isDurable() + { + return sequence() == 0; + } + + public long objectNum() + { + return second; + } + + public String routingCode() + { + return Agent.routingCode(agentBank(), brokerBank()); + } + + public long sequence() + { + return (this.first & 0x0FFF000000000000L) >> 48; + } + + @Override + public String toString() + { + return "" + flags() + "-" + sequence() + "-" + brokerBank() + "-" + + agentBank() + "-" + objectNum(); + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/QMFEvent.java b/java/management/console/src/main/java/org/apache/qpid/console/QMFEvent.java new file mode 100644 index 0000000000..116387acfc --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/QMFEvent.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.util.HashMap; + +import org.apache.qpid.transport.codec.*; + +public class QMFEvent +{ + private java.util.HashMap arguments; + private ClassKey classKey; + private Session session; + private EventSeverity severity; + // FIXME time? + private long timestamp; + + public QMFEvent(Session session, Decoder dec) + { + setSession(session); + setClassKey(new ClassKey(dec)); + setTimestamp(dec.readInt64()); + setSeverity(EventSeverity.values()[dec.readUint8()]); + SchemaClass sClass = getSession().getSchema(getClassKey()); + setArguments(new java.util.HashMap()); + if (sClass != null) + { + for (SchemaArgument arg : sClass.arguments) + { + getArguments().put(arg.getName(), + getSession().decodeValue(dec, arg.getType())); + } + } + } + + public final Object GetArgument(String argName) + { + return getArguments().get(argName); + } + + public final HashMap getArguments() + { + return arguments; + } + + public final ClassKey getClassKey() + { + return classKey; + } + + public final Session getSession() + { + return session; + } + + public final EventSeverity getSeverity() + { + return severity; + } + + public final long getTimestamp() + { + return timestamp; + } + + public final void setArguments(java.util.HashMap value) + { + arguments = value; + } + + public final void setClassKey(ClassKey value) + { + classKey = value; + } + + public final void setSession(Session value) + { + session = value; + } + + public final void setSeverity(EventSeverity value) + { + severity = value; + } + + public final void setTimestamp(long value) + { + timestamp = value; + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java b/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java new file mode 100644 index 0000000000..f60270d58a --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/QMFObject.java @@ -0,0 +1,423 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.qpid.transport.codec.Decoder; +import org.apache.qpid.transport.codec.Encoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.emory.mathcs.backport.java.util.Arrays; + +public class QMFObject +{ + private static Logger log = LoggerFactory.getLogger(QMFObject.class); + protected SchemaClass schema; + private java.util.Date createTime; + private java.util.Date currentTime; + private java.util.Date deleteTime; + private ObjectID objectID; + private Session session; + private boolean managed; + public java.util.HashMap properties = new java.util.HashMap(); + public java.util.HashMap statistics = new java.util.HashMap(); + + // This constructor is the "naked" constructor which creates + // an object without a session or a schema. It is used by + // subclasses which are auto generated + public QMFObject() + { + } + + public QMFObject(QMFObject source) + { + this.setSession(source.getSession()); + this.setSchema(source.getSchema()); + this.managed = source.managed; + this.setCurrentTime(source.getCurrentTime()); + this.setCreateTime(source.getCreateTime()); + this.setDeleteTime(source.getDeleteTime()); + this.setObjectID(source.getObjectID()); + this.properties = source.properties; + this.statistics = source.statistics; + } + + // This constructor is used by a session make object call to + // create a blank object from a schema. + public QMFObject(Session session, SchemaClass schema, + boolean hasProperties, boolean hasStats, boolean isManaged) + { + setSession(session); + setSchema(schema); + managed = isManaged; + if (hasProperties) + { + for (SchemaProperty prop : getSchema().getAllProperties()) + { + Object propValue = null; + if (!prop.getOptional()) + { + propValue = Util.defaultValue(prop.getType()); + } + this.setProperty(prop.getName(), propValue); + } + } + if (hasStats) + { + for (SchemaStatistic stat : getSchema().statistics) + { + setStatistic(stat.getName(), Util.defaultValue(stat.getType())); + } + } + } + + // This constructor is used by the session to create an object based on a + // data + // stream by the agent. + public QMFObject(Session session, SchemaClass schema, Decoder dec, + boolean hasProperties, boolean hasStats, boolean isManaged) + { + setSession(session); + setSchema(schema); + managed = isManaged; + if (managed) + { + // FIXME DateTime or Uint64?? + setCurrentTime(new java.util.Date(dec.readDatetime())); + setCreateTime(new java.util.Date(dec.readDatetime())); + setDeleteTime(new java.util.Date(dec.readDatetime())); + setObjectID(new ObjectID(dec)); + } + if (hasProperties) + { + java.util.ArrayList excluded = processPresenceMasks(dec, + getSchema()); + for (SchemaProperty prop : getSchema().getAllProperties()) + { + if (excluded.contains(prop.getName())) + { + // log.Debug(String.Format("Setting Property Default {0}", + // prop.Name)) ; + safeAddProperty(prop.getName(), null); + } else + { + // log.Debug(String.Format("Setting Property {0}", + // prop.Name)) ; + safeAddProperty(prop.getName(), session.decodeValue(dec, + prop.getType())); + } + } + } + if (hasStats) + { + for (SchemaStatistic stat : getSchema().getAllStatistics()) + { + // log.Debug(String.Format("Setting Statistic {0}", stat.Name)) + // ; + statistics.put(stat.getName(), session.decodeValue(dec, stat + .getType())); + } + } + } + + public final long agentBank() + { + return getObjectID().agentBank(); + } + + public final long brokerBank() + { + return getObjectID().brokerBank(); + } + + public final void encode(Encoder enc) + { + int mask = 0; + int bit = 0; + java.util.ArrayList propsToEncode = new java.util.ArrayList(); + log.debug(String.format("Encoding class %s:%s", getSchema() + .getPackageName(), getSchema().getClassName())); + enc.writeUint8((short) 20); + getSchema().getKey().encode(enc); + for (SchemaProperty prop : getSchema().getAllProperties()) + { + if (prop.getOptional()) + { + if (bit == 0) + { + bit = 1; + } + if ((properties.containsKey(prop.getName())) + && (properties.get(prop.getName()) != null)) + { + mask |= bit; + propsToEncode.add(prop); + } else + { + } + bit = bit << 1; + if (bit == 256) + { + bit = 0; + enc.writeUint8((short) mask); + mask = 0; + } + } else + { + propsToEncode.add(prop); + } + } + if (bit != 0) + { + enc.writeUint8((short) mask); + } + for (SchemaProperty prop : propsToEncode) + { + Object obj = properties.get(prop.getName()); + // log.Debug(String.Format("Encoding property {0}", prop.Name)) ; + getSession().encodeValue(enc, prop.getType(), obj); + } + for (SchemaStatistic stat : getSchema().statistics) + { + Object obj = statistics.get(stat.getName()); + getSession().encodeValue(enc, stat.getType(), obj); + } + log.debug("Done"); + } + + public final Date getCreateTime() + { + return createTime; + } + + public final Date getCurrentTime() + { + return currentTime; + } + + public final Date getDeleteTime() + { + return deleteTime; + } + + protected final ArrayList getMethods() + { + return getSchema().getAllMethods(); + } + + public final ObjectID getObjectID() + { + return objectID; + } + + public final Object GetProperty(String attributeName) + { + return properties.get(attributeName); + } + + public SchemaClass getSchema() + { + return schema; + } + + public final Session getSession() + { + return session; + } + + protected final MethodResult internalInvokeMethod(String name, + List args, boolean synchronous, int timeToLive) + { + if (!managed) + { + throw new ConsoleException("Object is not Managed"); + } + if (getSchema().getMethod(name) == null) + { + throw new ConsoleException(String.format( + "Method named '%s' does not exist", name)); + } + return getSession().invokeMethod(this, name, args, synchronous, + timeToLive); + } + + public final MethodResult invokeMethod(String name, boolean synchronous, + int timeToLive, Object... args) + { + return this.internalInvokeMethod(name, Arrays.asList(args), + synchronous, timeToLive); + } + + public final MethodResult invokeMethod(String name, boolean synchronous, + Object... args) + { + return this.internalInvokeMethod(name, Arrays.asList(args), + synchronous, Broker.SYNC_TIME); + } + + public final MethodResult invokeMethod(String name, int timeToLive, + Object... args) + { + return this.internalInvokeMethod(name, Arrays.asList(args), true, + timeToLive); + } + + public final MethodResult invokeMethod(String name, Object... args) + { + return this.internalInvokeMethod(name, Arrays.asList(args), true, + Broker.SYNC_TIME); + } + + public final boolean isDeleted() + { + return !getDeleteTime().equals(new java.util.Date(0)); + } + + protected final ArrayList processPresenceMasks(Decoder dec, + SchemaClass schema) + { + java.util.ArrayList excludes = new java.util.ArrayList(); + short bit = 0; + short mask = 0; + for (SchemaProperty prop : getSchema().getAllProperties()) + { + if (prop.getOptional()) + { + // log.Debug(String.Format("Property named {0} is optional", + // prop.Name)) ; + if (bit == 0) + { + mask = dec.readUint8(); + bit = 1; + } + if ((mask & bit) == 0) + { + // log.Debug(String.Format("Property named {0} is not present", + // prop.Name)) ; + excludes.add(prop.getName()); + } + bit *= 2; + if (bit == 256) + { + bit = 0; + } + } + } + return excludes; + } + + public final String routingKey() + { + return getObjectID().routingCode(); + } + + protected final void safeAddProperty(String propName, Object value) + { + if (properties.containsKey(propName)) + { + properties.put(propName, value); + } else + { + properties.put(propName, value); + } + } + + public final void setCreateTime(java.util.Date value) + { + createTime = value; + } + + public final void setCurrentTime(java.util.Date value) + { + currentTime = value; + } + + public final void setDeleteTime(java.util.Date value) + { + deleteTime = value; + } + + public final void setObjectID(ObjectID value) + { + objectID = value; + } + + public final void setProperty(String attributeName, Object newValue) + { + properties.put(attributeName, newValue); + } + + public void setSchema(SchemaClass value) + { + schema = value; + } + + public final void setSession(Session value) + { + session = value; + } + + protected final void setStatistic(String attributeName, Object newValue) + { + statistics.put(attributeName, newValue); + } + + @Override + public String toString() + { + String propertyString = ""; + for (Entry pair : properties.entrySet()) + { + propertyString = propertyString + + String.format("(Name: '%0$s' Value: '%1$s')", pair + .getKey(), pair.getValue()); + } + String statsString = ""; + for (Entry sPair : statistics.entrySet()) + { + statsString = statsString + + String.format("(Name: '%0$s' Value: '%1$s')", sPair + .getKey(), sPair.getValue()); + } + if (managed) + { + return String + .format( + "Managed QMFObject %0$s:%1$s(%2$s) Properties: [%3$s] Statistics: [%4$s])", + getSchema().getPackageName(), getSchema() + .getClassName(), getObjectID(), + propertyString, statsString); + } else + { + return String + .format( + "QMFObject %0$s:%1$s Properties: [%2$s] Statistics: [%3$s]", + getSchema().getPackageName(), getSchema() + .getClassName(), propertyString, + statsString); + } + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/SchemaArgument.java b/java/management/console/src/main/java/org/apache/qpid/console/SchemaArgument.java new file mode 100644 index 0000000000..7e83b1b447 --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/SchemaArgument.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.util.Map; + +import org.apache.qpid.transport.codec.Decoder; + +public class SchemaArgument extends SchemaVariable +{ + private String direction; + + public SchemaArgument(Decoder dec, boolean methodArg) + { + Map map = dec.readMap(); + super.populateData(map); + if (map.containsKey("dir")) + { + setDirection((String) map.get("dir")); + } + } + + public String getDirection() + { + return direction; + } + + public boolean isBidirectional() + { + return getDirection().equals("IO"); + } + + public boolean isInput() + { + return getDirection().equals("I") | getDirection().equals("IO"); + } + + public boolean isOutput() + { + return getDirection().equals("O") | getDirection().equals("IO"); + } + + public void setDirection(String value) + { + direction = value; + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java b/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java new file mode 100644 index 0000000000..66e3fc3feb --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/SchemaClass.java @@ -0,0 +1,251 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console;// + +import java.util.ArrayList; + +import org.apache.qpid.transport.codec.*; + +public class SchemaClass +{ + public static int CLASS_KIND_EVENT = 2; + public static int CLASS_KIND_TABLE = 1; + public ArrayList arguments = new ArrayList(); + private ClassKey classKey; + private int kind; + private Session session; + private ClassKey superType; + public ArrayList methods = new ArrayList(); + public ArrayList properties = new ArrayList(); + public ArrayList statistics = new ArrayList(); + + public SchemaClass(int kind, ClassKey key, Decoder dec, Session session) + { + // System.Console.WriteLine(key.ClassName) ; + setKind(kind); + setSession(session); + this.setKey(key); + boolean hasSupertype = dec.readUint8() != 0; + if (kind == CLASS_KIND_TABLE) + { + int propCount = dec.readUint16(); + int statCount = dec.readUint16(); + int methodCount = dec.readUint16(); + if (hasSupertype) + { + setSuperType(new ClassKey(dec)); + } + for (int x = 0; x < propCount; x++) + { + properties.add(new SchemaProperty(dec)); + } + for (int x = 0; x < statCount; x++) + { + statistics.add(new SchemaStatistic(dec)); + } + for (int x = 0; x < methodCount; x++) + { + methods.add(new SchemaMethod(dec)); + } + } + if (kind == CLASS_KIND_EVENT) + { + int argCount = dec.readUint16(); + if (hasSupertype) + { + setSuperType(new ClassKey(dec)); + } + for (int x = 0; x < argCount; x++) + { + arguments.add(new SchemaArgument(dec, false)); + } + } + } + + public ArrayList getAllMethods() + { + if (getSuperType() == null) + { + return methods; + } else + { + ArrayList allMethods = new ArrayList( + methods); + allMethods.addAll(getSession().getSchema(getSuperType()) + .getAllMethods()); + return allMethods; + } + } + + public ArrayList getAllProperties() + { + if (getSuperType() == null) + { + return properties; + } else + { + ArrayList allProperties = new ArrayList( + properties); + allProperties.addAll(getSession().getSchema(getSuperType()) + .getAllProperties()); + return allProperties; + } + } + + public ArrayList getAllStatistics() + { + if (getSuperType() == null) + { + return statistics; + } else + { + ArrayList allStats = new ArrayList( + statistics); + allStats.addAll(getSession().getSchema(getSuperType()) + .getAllStatistics()); + return allStats; + } + } + + public String getClassKeyString() + { + return getKey().getKeyString(); + } + + public String getClassName() + { + return getKey().getClassName(); + } + + public ClassKey getKey() + { + return classKey; + } + + public int getKind() + { + return kind; + } + + public SchemaMethod getMethod(String name) + { + SchemaMethod returnValue = null; + for (SchemaMethod method : methods) + { + if (method.getName().equals(name)) + { + returnValue = method; + break; + } + } + return returnValue; + } + + public String getPackageName() + { + return getKey().getPackageName(); + } + + protected Session getSession() + { + return session; + } + + public ClassKey getSuperType() + { + return superType; + } + + public boolean hasSuperType() + { + return getSuperType() != null; + } + + public void setKey(ClassKey value) + { + classKey = value; + } + + public void setKind(int value) + { + kind = value; + } + + protected void setSession(Session value) + { + session = value; + } + + public void setSuperType(ClassKey value) + { + superType = value; + } + + public ArrayList getProperties() + { + return properties; + } + + public void setProperties(ArrayList properties) + { + this.properties = properties; + } + + public ArrayList getMethods() + { + return methods; + } + + public void setMethods(ArrayList methods) + { + this.methods = methods; + } + + public ArrayList getStatistics() + { + return statistics; + } + + public void setStatistics(ArrayList statistics) + { + this.statistics = statistics; + } + + public ArrayList getArguments() + { + return arguments; + } + + public void setArguments(ArrayList arguments) + { + this.arguments = arguments; + } + + public ClassKey getClassKey() + { + return classKey; + } + + public void setClassKey(ClassKey classKey) + { + this.classKey = classKey; + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/SchemaMethod.java b/java/management/console/src/main/java/org/apache/qpid/console/SchemaMethod.java new file mode 100644 index 0000000000..1c20ae55bb --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/SchemaMethod.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console;// + +import java.util.ArrayList; +import java.util.Map; + +import org.apache.qpid.transport.codec.*; + +public class SchemaMethod +{ + public ArrayList Arguments = new ArrayList(); + private int m_ArgCount; + private int m_BidirectionalArgCount; + private String m_Description; + private int m_InputArgCount; + private String m_Name; + private int m_OutputArgCount; + + public SchemaMethod(Decoder dec) + { + Map map = dec.readMap(); + setName((String) map.get("name")); + setArgCount((Integer) map.get("argCount")); + if (map.containsKey("desc")) + { + setDescription((String) map.get("desc")); + } + for (int x = 0; x < getArgCount(); x++) + { + SchemaArgument arg = new SchemaArgument(dec, true); + Arguments.add(arg); + if (arg.isInput()) + { + setInputArgCount(getInputArgCount() + 1); + } + if (arg.isOutput()) + { + setOutputArgCount(getOutputArgCount() + 1); + } + if (arg.isBidirectional()) + { + setBidirectionalArgCount(getBidirectionalArgCount() + 1); + } + } + } + + public final int getArgCount() + { + return m_ArgCount; + } + + public final int getBidirectionalArgCount() + { + return m_BidirectionalArgCount; + } + + public final String getDescription() + { + return m_Description; + } + + public final int getInputArgCount() + { + return m_InputArgCount; + } + + public final String getName() + { + return m_Name; + } + + public final int getOutputArgCount() + { + return m_OutputArgCount; + } + + public final void setArgCount(int value) + { + m_ArgCount = value; + } + + public final void setBidirectionalArgCount(int value) + { + m_BidirectionalArgCount = value; + } + + public final void setDescription(String value) + { + m_Description = value; + } + + public final void setInputArgCount(int value) + { + m_InputArgCount = value; + } + + public final void setName(String value) + { + m_Name = value; + } + + public final void setOutputArgCount(int value) + { + m_OutputArgCount = value; + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/SchemaProperty.java b/java/management/console/src/main/java/org/apache/qpid/console/SchemaProperty.java new file mode 100644 index 0000000000..8e278ff70d --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/SchemaProperty.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.util.Map; + +import org.apache.qpid.transport.codec.*; + +public class SchemaProperty extends SchemaVariable +{ + private int access; + private boolean index; + private boolean optional; + + public SchemaProperty(Decoder dec) + { + Map map = dec.readMap(); + super.populateData(map); + setName((String) map.get("name")); + if (map.containsKey("optional")) + { + setOptional((Integer) map.get("optional") != 0); + } + if (map.containsKey("index")) + { + setIndex((Integer) map.get("index") != 0); + } + if (map.containsKey("access")) + { + setAccess((Integer) map.get("access")); + } + } + + public int getAccess() + { + return access; + } + + public boolean getIndex() + { + return index; + } + + public boolean getOptional() + { + return optional; + } + + public void setAccess(int value) + { + access = value; + } + + public void setIndex(boolean value) + { + index = value; + } + + public void setOptional(boolean value) + { + optional = value; + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/SchemaStatistic.java b/java/management/console/src/main/java/org/apache/qpid/console/SchemaStatistic.java new file mode 100644 index 0000000000..18bce86423 --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/SchemaStatistic.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console;// + +import java.util.Map; + +import org.apache.qpid.transport.codec.*; + +public class SchemaStatistic +{ + private String description; + private String name; + private short type; + private String unit; + + public SchemaStatistic(Decoder dec) + { + Map map = dec.readMap(); + setName((String) map.get("name")); + setType(Short.parseShort("" + map.get("type"))); + if (map.containsKey("unit")) + { + setUnit((String) map.get("unit")); + } + if (map.containsKey("description")) + { + setDescription((String) map.get("description")); + } + } + + public String getDescription() + { + return description; + } + + public String getName() + { + return name; + } + + public short getType() + { + return type; + } + + public String getUnit() + { + return unit; + } + + public void setDescription(String value) + { + description = value; + } + + public void setName(String value) + { + name = value; + } + + public void setType(short value) + { + type = value; + } + + public void setUnit(String value) + { + unit = value; + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/SchemaVariable.java b/java/management/console/src/main/java/org/apache/qpid/console/SchemaVariable.java new file mode 100644 index 0000000000..483a17d0de --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/SchemaVariable.java @@ -0,0 +1,185 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.util.Map; + +public abstract class SchemaVariable +{ + private String defaultVariable; + private String description; + private Integer max; + private Integer maxLength; + private Integer min; + private String name; + private String refClass; + private String refPackage; + private short type; + private String unit; + + public SchemaVariable() + { + } + + public String getDefault() + { + return defaultVariable; + } + + public String getDescription() + { + return description; + } + + public Integer getMax() + { + return max; + } + + public Integer getMaxLength() + { + return maxLength; + } + + public Integer getMin() + { + return min; + } + + public String getName() + { + return name; + } + + public String getRefClass() + { + return refClass; + } + + public String getRefPackage() + { + return refPackage; + } + + public short getType() + { + return type; + } + + public String getUnit() + { + return unit; + } + + protected void populateData(Map map) + { + if (map.containsKey("name")) + { + setName((String) map.get("name")); + } + if (map.containsKey("type")) + { + setType(Short.parseShort(("" + map.get("type")))); + } + if (map.containsKey("unit")) + { + setUnit((String) map.get("unit")); + } + if (map.containsKey("min")) + { + setMin((Integer) map.get("min")); + } + if (map.containsKey("max")) + { + setMax((Integer) map.get("max")); + } + if (map.containsKey("maxlen")) + { + setMaxLength((Integer) map.get("maxlen")); + } + if (map.containsKey("description")) + { + setDescription((String) map.get("description")); + } + if (map.containsKey("refClass")) + { + setRefClass((String) map.get("refClass")); + } + if (map.containsKey("refPackage")) + { + setRefPackage((String) map.get("refPackage")); + } + if (map.containsKey("Default")) + { + setDefault((String) map.get("default")); + } + } + + public void setDefault(String value) + { + defaultVariable = value; + } + + public void setDescription(String value) + { + description = value; + } + + public void setMax(Integer value) + { + max = value; + } + + public void setMaxLength(Integer value) + { + maxLength = value; + } + + public void setMin(Integer value) + { + min = value; + } + + public void setName(String value) + { + name = value; + } + + public void setRefClass(String value) + { + refClass = value; + } + + public void setRefPackage(String value) + { + refPackage = value; + } + + public void setType(short value) + { + type = value; + } + + public void setUnit(String value) + { + unit = value; + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/SequenceManager.java b/java/management/console/src/main/java/org/apache/qpid/console/SequenceManager.java new file mode 100644 index 0000000000..4c5fcc7355 --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/SequenceManager.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.util.HashMap; + +public class SequenceManager +{ + private long sequence = 0; + private HashMap pending = new HashMap(); + private Object lockObject = new Object(); + + public SequenceManager() + { + } + + public Object release(long seq) + { + Object returnValue = null; + synchronized (lockObject) + { + returnValue = pending.get(seq); + pending.remove(seq); + } + return returnValue; + } + + public long reserve(Object data) + { + long returnValue = 0; + synchronized (lockObject) + { + returnValue = sequence; + sequence += 1; + pending.put(returnValue, data); + } + return returnValue; + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/Session.java b/java/management/console/src/main/java/org/apache/qpid/console/Session.java new file mode 100644 index 0000000000..424750ea57 --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/Session.java @@ -0,0 +1,1007 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console;// + +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import javax.jms.Message; + +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.transport.codec.Decoder; +import org.apache.qpid.transport.codec.Encoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Session +{ + private static Logger log = LoggerFactory.getLogger(Session.class); + public static int CONTEXT_SYNC = 1; + public static int CONTEXT_STARTUP = 2; + public static int CONTEXT_MULTIGET = 3; + public static int DEFAULT_GET_WAIT_TIME = 60000; + public boolean recieveObjects = true; + public boolean recieveEvents = true; + public boolean recieveHeartbeat = true; + public boolean userBindings = false; + public Console console; + protected HashMap> packages = new HashMap>(); + protected ArrayList brokers = new ArrayList(); + protected SequenceManager sequenceManager = new SequenceManager(); + protected Object lockObject = new Object(); + protected ArrayList syncSequenceList = new ArrayList(); + protected ArrayList getResult; + protected Object syncResult; + + public Session() + { + } + + public Session(Console console) + { + this.console = console; + } + + public void addBroker(String url) + { + Broker broker = new Broker(this, url); + brokers.add(broker); + java.util.HashMap args = new java.util.HashMap(); + args.put("_class", "agent"); + args.put("_broker", broker); + this.getObjects(args); + } + + public ArrayList bindingKeys() + { + ArrayList bindings = new ArrayList(); + bindings.add("schema.#"); + if (recieveObjects & recieveEvents & recieveHeartbeat & !userBindings) + { + bindings.add("console.#"); + } else + { + if (recieveObjects & !userBindings) + { + bindings.add("console.obj.#"); + } else + { + bindings.add("console.obj.*.*.org.apache.qpid.broker.agent"); + } + if (recieveEvents) + { + bindings.add("console.event.#"); + } + if (recieveHeartbeat) + { + bindings.add("console.heartbeat.#"); + } + } + return bindings; + } + + public void close() + { + for (Broker broker : brokers.toArray(new Broker[0])) + { + this.removeBroker(broker); + } + } + + protected QMFObject createQMFObject(SchemaClass schema, + boolean hasProperties, boolean hasStats, boolean isManaged) + { + Class realClass = QMFObject.class; + if (console != null) + { + realClass = console.typeMapping(schema.getKey()); + } + Class[] types = new Class[] + { Session.class, SchemaClass.class, boolean.class, boolean.class, + boolean.class }; + Object[] args = new Object[] + { this, schema, hasProperties, hasStats, isManaged }; + try + { + Constructor ci = realClass.getConstructor(types); + return (QMFObject) ci.newInstance(args); + } catch (Exception e) + { + throw new ConsoleException(e); + } + } + + protected QMFObject createQMFObject(SchemaClass schema, Decoder dec, + boolean hasProperties, boolean hasStats, boolean isManaged) + { + Class realClass = QMFObject.class; + if (console != null) + { + realClass = console.typeMapping(schema.getKey()); + } + Class[] types = new Class[] + { Session.class, SchemaClass.class, Decoder.class, boolean.class, + boolean.class, boolean.class }; + Object[] args = new Object[] + { this, schema, dec, hasProperties, hasStats, isManaged }; + try + { + Constructor ci = realClass.getConstructor(types); + return (QMFObject) ci.newInstance(args); + } catch (Exception e) + { + throw new ConsoleException(e); + } + } + + public Object decodeValue(Decoder dec, short type) + { + switch (type) + { + case 1: // U8 + return dec.readUint8(); + case 2: // U16 + return dec.readUint16(); + case 3: // U32 + return dec.readUint32(); + case 4: // U64 + return dec.readUint64(); + case 6: // SSTR + return dec.readStr8(); + case 7: // LSTR + return dec.readStr16(); + case 8: // ABSTIME + return dec.readDatetime(); + case 9: // DELTATIME + return dec.readUint32(); + case 10: // ref + return new ObjectID(dec); + case 11: // bool + return dec.readUint8() != 0; + case 12: // float + return dec.readFloat(); + case 13: // double + return dec.readDouble(); + case 14: // UUID + return dec.readUuid(); + case 15: // Ftable + java.util.HashMap ftable = new java.util.HashMap(); + BBDecoder sc = new BBDecoder(); + sc.init(ByteBuffer.wrap(dec.readVbin32())); + if (sc.hasRemaining()) + { + long count = sc.readUint32(); + while (count > 0) + { + String key = sc.readStr8(); + short code = sc.readUint8(); + Object newValue = this.decodeValue(sc, code); + ftable.put(key, newValue); + count -= 1; + } + } + return ftable; + case 16: // int8 + return dec.readInt8(); + case 17: // int16 + return dec.readInt16(); + case 18: // int32 + return dec.readInt32(); + case 19: // int64 + return dec.readInt64(); + case 20: // Object + // Peek into the inner type code, make sure + // it is actually an object + Object returnValue = null; + short innerTypeCode = dec.readUint8(); + if (innerTypeCode != 20) + { + returnValue = this.decodeValue(dec, innerTypeCode); + } else + { + ClassKey classKey = new ClassKey(dec); + synchronized (lockObject) + { + SchemaClass sClass = getSchema(classKey); + if (sClass != null) + { + returnValue = this.createQMFObject(sClass, dec, true, + true, false); + } + } + } + return returnValue; + case 21: // List + BBDecoder lDec = new BBDecoder(); + lDec.init(ByteBuffer.wrap(dec.readVbin32())); + long count = lDec.readUint32(); + ArrayList newList = new ArrayList(); + while (count > 0) + { + short innerType = lDec.readUint8(); + newList.add(this.decodeValue(lDec, innerType)); + count -= 1; + } + return newList; + case 22: // Array + BBDecoder aDec = new BBDecoder(); + aDec.init(ByteBuffer.wrap(dec.readVbin32())); + long cnt = aDec.readUint32(); + short innerType = aDec.readUint8(); + ArrayList aList = new ArrayList(); + while (cnt > 0) + { + aList.add(this.decodeValue(aDec, innerType)); + cnt -= 1; + } + return aList; + default: + throw new ConsoleException(String.format("Invalid Type Code: %s", + type)); + } + } + + public void encodeValue(Encoder enc, short type, Object val) + { + try + { + switch (type) + { + case 1: // U8 + enc.writeUint8(((Short) val).shortValue()); + break; + case 2: // U16 + enc.writeUint16(((Integer) val).intValue()); + break; + case 3: // U32 + enc.writeUint32(((Long) val).longValue()); + break; + case 4: // U64 + enc.writeUint64(((Long) val).longValue()); + break; + case 6: // SSTR + enc.writeStr8((String) val); + break; + case 7: // LSTR + enc.writeStr16((String) val); + break; + case 8: // ABSTIME + enc.writeDatetime(((Long) val).longValue()); + break; + case 9: // DELTATIME + enc.writeUint32(((Long) val).longValue()); + break; + case 10: // ref + ((ObjectID) val).encode(enc); + break; + case 11: + if (((Boolean) val).booleanValue()) + { + enc.writeUint8((short) 1); + } else + { + enc.writeUint8((short) 0); + } + break; + case 12: // FLOAT + enc.writeFloat(((Float) val).floatValue()); + break; + case 13: // DOUBLE + enc.writeDouble(((Double) val).doubleValue()); + break; + case 14: // UUID + enc.writeUuid((UUID) val); + break; + case 15: // Ftable + Map ftable = (Map) val; + BBEncoder sc = new BBEncoder(1); + sc.init(); + sc.writeUint32(ftable.size()); + for (String key : ftable.keySet()) + { + Object obj = ftable.get(key); + short innerType = Util.qmfType(obj); + sc.writeStr8(key); + sc.writeUint8(innerType); + this.encodeValue(sc, innerType, obj); + } + byte[] bytes = sc.segment().array(); + enc.writeVbin32(bytes); + break; + case 16: // int8 + enc.writeInt8((Byte) val); + break; + case 17: // int16 + enc.writeInt16((Short) val); + break; + case 18: // int32 + enc.writeInt32((Integer) val); + break; + case 19: // int64 + enc.writeInt64((Long) val); + break; + case 20: // Object + // Check that the object has a session, if not + // take ownership of it + QMFObject qObj = (QMFObject) val; + if (qObj.getSession() == null) + { + qObj.setSession(this); + } + qObj.encode(enc); + break; + case 21: // List + ArrayList items = (ArrayList) val; + BBEncoder lEnc = new BBEncoder(1); + lEnc.init(); + lEnc.writeUint32(items.size()); + for (Object obj : items) + { + short innerType = Util.qmfType(obj); + lEnc.writeUint8(innerType); + this.encodeValue(lEnc, innerType, obj); + } + enc.writeVbin32(lEnc.segment().array()); + break; + case 22: // Array + ArrayList aItems = (ArrayList) val; + BBEncoder aEnc = new BBEncoder(1); + aEnc.init(); + long aCount = aItems.size(); + aEnc.writeUint32(aCount); + if (aCount > 0) + { + Object anObj = aItems.get(0); + short innerType = Util.qmfType(anObj); + aEnc.writeUint8(innerType); + for (Object obj : aItems) + { + this.encodeValue(aEnc, innerType, obj); + } + } + enc.writeVbin32(aEnc.segment().array()); + break; + default: + throw new ConsoleException(String.format( + "Invalid Type Code: %s", type)); + } + } catch (ClassCastException e) + { + String msg = String.format( + "Class cast exception for typecode %s, type %s ", type, val + .getClass()); + log.error(msg); + throw new ConsoleException(msg + type, e); + } + } + + public Broker getBroker(long BrokerBank) + { + Broker returnValue = null; + for (Broker broker : brokers) + { + if (broker.brokerBank() == BrokerBank) + { + returnValue = broker; + break; + } + } + return returnValue; + } + + public ArrayList getClasses(String packageName) + { + ArrayList returnValue = new ArrayList(); + this.waitForStable(); + if (packages.containsKey(packageName)) + { + for (SchemaClass sClass : packages.get(packageName).values()) + { + returnValue.add(sClass.getKey()); + } + } + return returnValue; + } + + public ArrayList getObjects( + java.util.HashMap args) + { + ArrayList brokerList = null; + ArrayList agentList = new ArrayList(); + if (args.containsKey("_broker")) + { + brokerList = new ArrayList(); + brokerList.add((Broker) args.get("_broker")); + } else + { + brokerList = this.brokers; + } + for (Broker broker : brokerList) + { + broker.waitForStable(); + } + if (args.containsKey("_agent")) + { + Agent agent = (Agent) args.get("_agent"); + if (brokerList.contains(agent.getBroker())) + { + agentList.add(agent); + } else + { + throw new ConsoleException( + "Agent is not managed by this console or the supplied broker"); + } + } else + { + if (args.containsKey("_objectId")) + { + ObjectID oid = (ObjectID) args.get("_objectId"); + for (Broker broker : brokers) + { + for (Agent agent : broker.Agents.values()) + { + if ((agent.getAgentBank() == oid.agentBank()) + && (agent.getBrokerBank() == oid.brokerBank())) + { + agentList.add(agent); + } + } + } + } else + { + for (Broker broker : brokerList) + { + for (Agent agent : broker.Agents.values()) + { + if (agent.getBroker().isConnected()) + { + agentList.add(agent); + } + } + } + } + } + getResult = new ArrayList(); + if (agentList.size() > 0) + { + // FIXME Add a bunch of other suff too + for (Agent agent : agentList) + { + HashMap getParameters = new HashMap(); + Broker broker = agent.getBroker(); + long seq = -1; + synchronized (lockObject) + { + seq = sequenceManager.reserve(Session.CONTEXT_MULTIGET); + syncSequenceList.add(seq); + } + String packageName = (String) args.get("_package"); + String className = (String) args.get("_class"); + ClassKey key = (ClassKey) args.get("_key"); + Object sClass = args.get("_schema"); + Object oid = args.get("_objectID"); + long[] hash = (long[]) args.get("_hash"); + if ((className == null) && (oid == null) && (oid == null)) + { + throw new ConsoleException( + "No class supplied, use '_schema', '_key', '_class', or '_objectId' argument"); + } + if (oid != null) + { + getParameters.put("_objectID", oid); + } else + { + if (sClass != null) + { + key = (key != null) ? key : ((SchemaClass) sClass) + .getKey(); + } + if (key != null) + { + className = (className != null) ? className : key + .getClassName(); + packageName = (packageName != null) ? packageName : key + .getPackageName(); + hash = (hash != null) ? hash : key.getHash(); + } + if (packageName != null) + { + getParameters.put("_package", packageName); + } + if (className != null) + { + getParameters.put("_class", className); + } + if (hash != null) + { + getParameters.put("_hash", hash); + } + for (java.util.Map.Entry pair : args + .entrySet()) + { + if (!pair.getKey().startsWith("_")) + { + getParameters.put(pair.getKey(), pair.getValue()); + } + } + } + Encoder enc = broker.createEncoder('G', seq); + enc.writeMap(getParameters); + String routingKey = agent.routingCode(); + Message msg = broker.createMessage(enc); + log.debug("Get Object Keys: "); + for (String pKey : getParameters.keySet()) + { + log.debug(String.format("\tKey: '%s' Value: '%s'", pKey, + getParameters.get(pKey))); + } + broker.send(msg, routingKey); + } + int waittime = DEFAULT_GET_WAIT_TIME; + boolean timeout = false; + if (args.containsKey("_timeout")) + { + waittime = (Integer) args.get("_timeout"); + } + long start = System.currentTimeMillis(); + synchronized (lockObject) + { + // FIXME ERROR + while (syncSequenceList.size() > 0) + { + try + { + lockObject.wait(waittime); + } catch (InterruptedException e) + { + throw new ConsoleException(e); + } + long duration = System.currentTimeMillis() - start; + if (duration > waittime) + { + for (long pendingSeq : syncSequenceList) + { + sequenceManager.release(pendingSeq); + } + syncSequenceList.clear(); + timeout = true; + } + } + } + // FIXME Add the error logic + if ((getResult.isEmpty()) && timeout) + { + throw new ConsoleException("Get Request timed out"); + } + } + return getResult; + } + + public ArrayList getPackages() + { + this.waitForStable(); + ArrayList returnValue = new ArrayList(); + for (String name : packages.keySet()) + { + returnValue.add(name); + } + return returnValue; + } + + public SchemaClass getSchema(ClassKey key) + { + return getSchema(key, true); + } + + protected SchemaClass getSchema(ClassKey key, boolean waitForStable) + { + if (waitForStable) + { + this.waitForStable(); + } + SchemaClass returnValue = null; + returnValue = packages.get(key.getPackageName()) + .get(key.getKeyString()); + return returnValue; + } + + public void handleAgentRemoved(Agent agent) + { + if (console != null) + { + console.agentRemoved(agent); + } + } + + public void handleBrokerConnect(Broker broker) + { + if (console != null) + { + console.brokerConnected(broker); + } + } + + public void handleBrokerDisconnect(Broker broker) + { + if (console != null) + { + console.brokerDisconnected(broker); + } + } + + public void handleBrokerResponse(Broker broker, Decoder decoder, + long sequence) + { + if (console != null) + { + console.brokerInformation(broker); + } + long seq = sequenceManager.reserve(CONTEXT_STARTUP); + Encoder encoder = broker.createEncoder('P', seq); + broker.send(encoder); + } + + public void handleClassIndicator(Broker broker, Decoder decoder, + long sequence) + { + short kind = decoder.readUint8(); + ClassKey classKey = new ClassKey(decoder); + boolean unknown = false; + synchronized (lockObject) + { + if (packages.containsKey(classKey.getPackageName())) + { + if (!packages.get(classKey.getPackageName()).containsKey( + classKey.getKeyString())) + { + unknown = true; + } + } + } + if (unknown) + { + broker.incrementOutstanding(); + long seq = sequenceManager.reserve(Session.CONTEXT_STARTUP); + Encoder enc = broker.createEncoder('S', seq); + classKey.encode(enc); + broker.send(enc); + } + } + + public void handleCommandComplete(Broker broker, Decoder decoder, + long sequence) + { + long code = decoder.readUint32(); + String text = decoder.readStr8(); + Object context = this.sequenceManager.release(sequence); + if (context.equals(CONTEXT_STARTUP)) + { + broker.decrementOutstanding(); + } else + { + if ((context.equals(CONTEXT_SYNC)) & broker.getSyncInFlight()) + { + broker.setSyncInFlight(false); + } else + { + if (context.equals(CONTEXT_MULTIGET) + && syncSequenceList.contains(sequence)) + { + synchronized (lockObject) + { + syncSequenceList.remove(sequence); + if (syncSequenceList.isEmpty()) + { + lockObject.notifyAll(); + } + } + } + } + } + } + + public void handleContentIndicator(Broker broker, Decoder decoder, + long sequence, boolean hasProperties, boolean hasStatistics) + { + ClassKey key = new ClassKey(decoder); + SchemaClass sClass = null; + ; + synchronized (lockObject) + { + sClass = getSchema(key, false); + } + if (sClass != null) + { + QMFObject obj = this.createQMFObject(sClass, decoder, + hasProperties, hasStatistics, true); + if (key.getPackageName().equals("org.apache.qpid.broker") + && key.getClassName().equals("agent") && hasProperties) + { + broker.updateAgent(obj); + } + synchronized (lockObject) + { + if (syncSequenceList.contains(sequence)) + { + if (!obj.isDeleted() && this.selectMatch(obj)) + { + getResult.add(obj); + } + } + } + if (console != null) + { + if (hasProperties) + { + console.objectProperties(broker, obj); + } + if (hasStatistics) + { + console.objectStatistics(broker, obj); + } + } + } + } + + public void handleEventIndicator(Broker broker, Decoder decoder, + long sequence) + { + if (console != null) + { + QMFEvent newEvent = new QMFEvent(this, decoder); + console.eventRecieved(broker, newEvent); + } + } + + public void handleHeartbeatIndicator(Broker broker, Decoder decoder, + long sequence, Message msg) + { + if (console != null) + { + long brokerBank = 1; + long agentBank = 0; + try + { + // FIXME HOW DO WE GET THE ROUTING KEY + // String routingKey = msg.DeliveryProperties.getRoutingKey(); + String routingKey = null; + if (routingKey != null) + { + agentBank = Agent.getBrokerBank(routingKey); + brokerBank = Agent.getBrokerBank(routingKey); + } + } catch (Throwable e) + { + log.warn("Internal QPID error", e); + } + String agentKey = Agent.AgentKey(agentBank, brokerBank); + long timestamp = decoder.readUint64(); + if (broker.Agents.containsKey(agentKey)) + { + Agent agent = broker.Agents.get(agentKey); + console.hearbeatRecieved(agent, timestamp); + } + } + } + + public void handleMethodResponse(Broker broker, Decoder decoder, + long sequence) + { + long code = decoder.readUint32(); + String text = decoder.readStr16(); + java.util.HashMap outArgs = new java.util.HashMap(); + Object obj = sequenceManager.release(sequence); + if (obj == null) + { + return; + } + Object[] pair = (Object[]) obj; + if (code == 0) + { + for (SchemaArgument arg : ((SchemaMethod) pair[0]).Arguments) + { + if (arg.isOutput()) + { + outArgs.put(arg.getName(), this.decodeValue(decoder, arg + .getType())); + } + } + } + MethodResult result = new MethodResult(code, text, outArgs); + if ((Boolean) pair[1]) + { + this.syncResult = result; + broker.setSyncInFlight(false); + } + if (console != null) + { + console.methodResponse(broker, sequence, result); + } + } + + // Callback Methods + public void handleNewAgent(Agent agent) + { + if (console != null) + { + console.newAgent(agent); + } + } + + public void handlePackageIndicator(Broker broker, Decoder decoder, + long sequence) + { + String packageName = decoder.readStr8(); + boolean notify = false; + if (!packages.containsKey(packageName)) + { + synchronized (lockObject) + { + packages.put(packageName, + new java.util.HashMap()); + notify = true; + } + } + if (notify && console != null) + { + console.newPackage(packageName); + } + broker.incrementOutstanding(); + long seq = sequenceManager.reserve(Session.CONTEXT_STARTUP); + Encoder enc = broker.createEncoder('Q', seq); + enc.writeStr8(packageName); + broker.send(enc); + } + + public void handleSchemaResponse(Broker broker, Decoder decoder, + long sequence) + { + short kind = decoder.readUint8(); + ClassKey classKey = new ClassKey(decoder); + SchemaClass sClass = new SchemaClass(kind, classKey, decoder, this); + synchronized (lockObject) + { + java.util.HashMap classMappings = packages + .get(sClass.getPackageName()); + classMappings.remove(sClass.getClassKeyString()); + classMappings.put(sClass.getClassKeyString(), sClass); + log.debug(classKey.toString()); + } + sequenceManager.release(sequence); + broker.decrementOutstanding(); + if (console != null) + { + this.console.newClass(kind, classKey); + } + } + + public MethodResult invokeMethod(QMFObject obj, String name, + List args, boolean synchronous, int timeToLive) + { + Broker aBroker = this.getBroker(obj.brokerBank()); + long seq = this.sendMethodRequest(obj, aBroker, name, args, + synchronous, timeToLive); + if (seq != 0) + { + if (!synchronous) + { + return null; + } + try + { + aBroker.waitForSync(timeToLive); + } catch (Throwable e) + { + sequenceManager.release(seq); + throw new ConsoleException(e); + } + // FIXME missing error logic in the broker + return (MethodResult) syncResult; + } + return null; + } + + public QMFObject makeObject(ClassKey key) + { + SchemaClass sClass = this.getSchema(key); + if (sClass == null) + { + throw new ConsoleException("No schema found for class " + + key.toString()); + } + return this.createQMFObject(sClass, true, true, false); + } + + public QMFObject makeObject(String keyString) + { + return this.makeObject(new ClassKey(keyString)); + } + + public void removeBroker(Broker broker) + { + if (brokers.contains(broker)) + { + brokers.remove(broker); + } + broker.shutdown(); + } + + public boolean selectMatch(QMFObject obj) + { + return true; + } + + protected long sendMethodRequest(QMFObject obj, Broker aBroker, + String name, List args, boolean synchronous, int timeToLive) + { + SchemaMethod method = obj.getSchema().getMethod(name); + if (args == null) + { + args = new ArrayList(); + } + long seq = 0; + if (method != null) + { + Object[] pair = + { method, synchronous }; + seq = sequenceManager.reserve(pair); + Encoder enc = aBroker.createEncoder('M', seq); + obj.getObjectID().encode(enc); + obj.getSchema().getKey().encode(enc); + enc.writeStr8(name); + if (args.size() < method.getInputArgCount()) + { + throw new ConsoleException(String.format( + "Incorrect number of arguments: expected %s, got %s", + method.getInputArgCount(), args.size())); + } + int argIndex = 0; + for (SchemaArgument arg : method.Arguments) + { + if (arg.isInput()) + { + this.encodeValue(enc, arg.getType(), args.get(argIndex)); + argIndex += 1; + } + } + Message msg = aBroker.createMessage(enc); + if (synchronous) + { + aBroker.setSyncInFlight(true); + } + aBroker.send(msg, obj.routingKey(), timeToLive); + } + return seq; + } + + protected void waitForStable() + { + for (Broker broker : brokers) + { + broker.waitForStable(); + } + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/Util.java b/java/management/console/src/main/java/org/apache/qpid/console/Util.java new file mode 100644 index 0000000000..a9e4d68601 --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/Util.java @@ -0,0 +1,184 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.console; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.UUID; + +public class Util +{ + private static HashMap ENCODINGS = new HashMap(); + static + { + ENCODINGS.put(String.class, (short) 7); + ENCODINGS.put(Short.class, (short) 1); + ENCODINGS.put(Float.class, (short) 13); + ENCODINGS.put(QMFObject.class, (short) 20); + ENCODINGS.put(Integer.class, (short) 17); + ENCODINGS.put(Long.class, (short) 18); + ENCODINGS.put(ArrayList.class, (short) 21); + } + + public static String accessName(int type) + { + switch (type) + { + // case 0: return "UNKNOWN" ; + case 1: + return "RC"; + case 2: + return "RW"; + case 3: + return "RO"; + } + throw new ConsoleException(String.format("Invalid Access Code: %s", + type)); + } + + public static String byteString(byte[] bytes) + { + return new String(bytes, Charset.forName("UTF-8")); + } + + public static Object defaultValue(short type) + { + switch (type) + { + // case 0: return "UNKNOWN" ; + case 1: + return 0; + case 2: + return 0; + case 3: + return 0l; + case 4: + return 0l; + case 5: + return false; + case 6: + return ""; + case 7: + return ""; + case 8: + return 0l; + case 9: + return 0l; + case 10: + return new ObjectID(); + case 11: + return false; + case 12: + return 0f; + case 13: + return 0d; + case 14: + return new UUID(0, 0); + case 15: + return new HashMap(); + case 16: + return 0; + case 17: + return 0; + case 18: + return 0l; + case 19: + return 0l; + case 20: + return null; + case 21: + return new java.util.ArrayList(); + case 22: + return new java.util.ArrayList(); + } + throw new ConsoleException(String.format("Invalid Type Code: %s", type)); + } + + public static short qmfType(Object obj) + { + if (ENCODINGS.containsKey(obj.getClass())) + { + return ENCODINGS.get(obj.getClass()); + } else + { + throw new ConsoleException(String.format("Unkown Type of %s", obj + .getClass())); + } + } + + public static String typeName(short type) + { + switch (type) + { + // case 0: return "UNKNOWN" ; + case 1: + return "uint8"; + case 2: + return "uint16"; + case 3: + return "uint32"; + case 4: + return "uint64"; + case 5: + return "bool"; + case 6: + return "short-string"; + case 7: + return "long-string"; + case 8: + return "abs-time"; + case 9: + return "delta-time"; + case 10: + return "reference"; + case 11: + return "boolean"; + case 12: + return "float"; + case 13: + return "double"; + case 14: + return "uuid"; + case 15: + return "field-table"; + case 16: + return "int8"; + case 17: + return "int16"; + case 18: + return "int32"; + case 19: + return "int64"; + case 20: + return "object"; + case 21: + return "list"; + case 22: + return "array"; + } + throw new ConsoleException(String.format("Invalid Type Code: %s", type)); + } + + protected Util() + { + } +} \ No newline at end of file diff --git a/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java b/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java new file mode 100644 index 0000000000..5904d57e64 --- /dev/null +++ b/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java @@ -0,0 +1,103 @@ +package org.apache.qpid.console; + + +public class XMLUtil +{ + + public static String commonAttributes(SchemaVariable var) { + String returnString = "" ; + if (var.getDescription() != null){ + returnString = returnString + String.format(" desc='%s'", var.getDescription()) ; + } + + if (var.getRefPackage() != null){ + returnString = returnString + String.format(" refPackage='%s'", var.getRefPackage()) ; + } + + if (var.getRefClass() != null){ + returnString = returnString + String.format(" refClass='%s'", var.getRefClass()) ; + } + + if (var.getUnit() != null){ + returnString = returnString + String.format(" unit='%s'", var.getUnit()) ; + } + + if (var.getMin() != null){ + returnString = returnString + String.format(" min='%s'", var.getMin()) ; + } + if (var.getMax() != null){ + returnString = returnString + String.format(" max='%s'", var.getMax()) ; + } + if (var.getMaxLength() != null){ + returnString = returnString + String.format(" maxLength='%s'", var.getMaxLength()) ; + } + + return returnString ; + } + + public static String schemaXML(Session sess, String packageName) { + String returnValue = String.format("\n", packageName) ; + for (ClassKey key : sess.getClasses(packageName)) { + SchemaClass schema = sess.getSchema(key) ; + if (schema.getKind() == 1) { + if (schema.getSuperType() == null) { + returnValue += String.format("\t\n", key.getClassName(), key.getHashString()) ; + } + else { + returnValue += String.format("\t\n", key.getClassName(), key.getHashString(), schema.getSuperType().getKeyString()) ; + } + for (SchemaProperty prop : schema.getProperties()) { + Object[] attributes = new Object[5] ; + attributes[0] = prop.getName() ; + attributes[1] = Util.typeName(prop.getType()) ; + attributes[2] = Util.accessName(prop.getAccess()) ; + attributes[3] = prop.getOptional()? "True" : "False "; + attributes[4] = XMLUtil.commonAttributes(prop); + returnValue += String.format("\t\t\n", attributes) ; + } + for (SchemaMethod meth : schema.getMethods()) { + returnValue += String.format("\t\t\n", meth.getName()) ; + for (SchemaArgument arg : meth.Arguments) { + Object[] attributes = new Object[4] ; + attributes[0] = arg.getName() ; + attributes[1] = arg.getDirection() ; + attributes[2] = Util.typeName(arg.getType()) ; + attributes[3] = XMLUtil.commonAttributes(arg); + returnValue += String.format("\t\t\t\n", attributes) ; + } + returnValue += String.format("\t\t\n") ; + } + returnValue += String.format("\t\n") ; + } else { + returnValue += String.format("\t\n", key.getClassName(), key.getHashString()) ; + for (SchemaArgument arg : schema.getArguments()) { + Object[] attributes = new Object[4] ; + attributes[0] = arg.getName() ; + attributes[1] = Util.typeName(arg.getType()) ; + attributes[2] = XMLUtil.commonAttributes(arg); + returnValue += String.format("\t\t\t\n", attributes) ; + } + returnValue += String.format("\t\n") ; + } + } + returnValue += String.format("\n") ; + + return returnValue ; + } + + public static String schemaXML(Session sess, String[] packageNames) { + String returnValue = "\n" ; + for (String pack : packageNames) { + returnValue += XMLUtil.schemaXML(sess, pack) ; + returnValue += "\n" ; + } + returnValue += "\n" ; + return returnValue ; + } + + protected XMLUtil() + { + } +} + + -- cgit v1.2.1