summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/console
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qpid/console
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-rajith_jms_client.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/console')
-rw-r--r--cpp/src/qpid/console/Agent.cpp30
-rw-r--r--cpp/src/qpid/console/Broker.cpp333
-rw-r--r--cpp/src/qpid/console/ClassKey.cpp105
-rw-r--r--cpp/src/qpid/console/Event.cpp205
-rw-r--r--cpp/src/qpid/console/Object.cpp384
-rw-r--r--cpp/src/qpid/console/ObjectId.cpp91
-rw-r--r--cpp/src/qpid/console/Package.cpp41
-rw-r--r--cpp/src/qpid/console/Schema.cpp165
-rw-r--r--cpp/src/qpid/console/SequenceManager.cpp48
-rw-r--r--cpp/src/qpid/console/SessionManager.cpp517
-rw-r--r--cpp/src/qpid/console/Value.cpp171
11 files changed, 0 insertions, 2090 deletions
diff --git a/cpp/src/qpid/console/Agent.cpp b/cpp/src/qpid/console/Agent.cpp
deleted file mode 100644
index fa76a13583..0000000000
--- a/cpp/src/qpid/console/Agent.cpp
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/console/Agent.h"
-
-std::ostream& qpid::console::operator<<(std::ostream& o, const Agent& agent)
-{
- o << "Agent at bank " << agent.getBrokerBank() << "." << agent.getAgentBank() <<
- " (" << agent.getLabel() << ")";
- return o;
-}
-
diff --git a/cpp/src/qpid/console/Broker.cpp b/cpp/src/qpid/console/Broker.cpp
deleted file mode 100644
index 86a17d4a10..0000000000
--- a/cpp/src/qpid/console/Broker.cpp
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/console/Broker.h"
-#include "qpid/console/Object.h"
-#include "qpid/console/Value.h"
-#include "qpid/console/SessionManager.h"
-#include "qpid/console/ConsoleListener.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/SystemInfo.h"
-
-using namespace qpid::client;
-using namespace qpid::console;
-using namespace qpid::framing;
-using namespace qpid::sys;
-using namespace std;
-
-Broker::Broker(SessionManager& sm, ConnectionSettings& settings) :
- sessionManager(sm), connected(false), connectionSettings(settings),
- reqsOutstanding(1), syncInFlight(false), topicBound(false), methodObject(0),
- connThreadBody(*this), connThread(connThreadBody)
-{
- string osName;
- string nodeName;
- string release;
- string version;
- string machine;
-
- sys::SystemInfo::getSystemId(osName, nodeName, release, version, machine);
- uint32_t pid = sys::SystemInfo::getParentProcessId();
-
- stringstream text;
-
- text << "qmfc-cpp-" << nodeName << "-" << pid;
- amqpSessionId = string(text.str());
-
- QPID_LOG(debug, "Broker::Broker: constructed, amqpSessionId=" << amqpSessionId);
-}
-
-Broker::~Broker()
-{
- connThreadBody.shutdown();
- connThread.join();
- resetAgents();
- // resetAgents() does not delete the broker agent...
- for (AgentMap::iterator iter = agents.begin(); iter != agents.end(); iter++) {
- delete iter->second;
- }
-}
-
-string Broker::getUrl() const
-{
- stringstream url;
- url << connectionSettings.host << ":" << connectionSettings.port;
- return url.str();
-}
-
-void Broker::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) const
-{
- buf.putOctet('A');
- buf.putOctet('M');
- buf.putOctet('2');
- buf.putOctet(opcode);
- buf.putLong (seq);
-}
-
-bool Broker::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) const
-{
- if (buf.getSize() < 8)
- return false;
-
- uint8_t h1 = buf.getOctet();
- uint8_t h2 = buf.getOctet();
- uint8_t h3 = buf.getOctet();
-
- *opcode = buf.getOctet();
- *seq = buf.getLong();
-
- return h1 == 'A' && h2 == 'M' && h3 == '2';
-}
-
-void Broker::received(qpid::client::Message& msg)
-{
-#define QMF_HEADER_SIZE 8
- string data = msg.getData();
- Buffer inBuffer(const_cast<char*>(data.c_str()), data.size());
- uint8_t opcode;
- uint32_t sequence;
-
- while (inBuffer.available() >= QMF_HEADER_SIZE) {
- if (checkHeader(inBuffer, &opcode, &sequence)) {
- QPID_LOG(trace, "Broker::received: opcode=" << opcode << " seq=" << sequence);
-
- if (opcode == 'b') sessionManager.handleBrokerResp(this, inBuffer, sequence);
- else if (opcode == 'p') sessionManager.handlePackageInd(this, inBuffer, sequence);
- else if (opcode == 'z') sessionManager.handleCommandComplete(this, inBuffer, sequence);
- else if (opcode == 'q') sessionManager.handleClassInd(this, inBuffer, sequence);
- else if (opcode == 'm') sessionManager.handleMethodResp(this, inBuffer, sequence);
- else if (opcode == 'h') sessionManager.handleHeartbeatInd(this, inBuffer, sequence);
- else if (opcode == 'e') sessionManager.handleEventInd(this, inBuffer, sequence);
- else if (opcode == 's') sessionManager.handleSchemaResp(this, inBuffer, sequence);
- else if (opcode == 'c') sessionManager.handleContentInd(this, inBuffer, sequence, true, false);
- else if (opcode == 'i') sessionManager.handleContentInd(this, inBuffer, sequence, false, true);
- else if (opcode == 'g') sessionManager.handleContentInd(this, inBuffer, sequence, true, true);
- } else
- return;
- }
-}
-
-void Broker::resetAgents()
-{
- for (AgentMap::iterator iter = agents.begin(); iter != agents.end(); iter++) {
- if (sessionManager.listener != 0)
- sessionManager.listener->delAgent(*(iter->second));
- delete iter->second;
- }
-
- agents.clear();
- agents[0x0000000100000000LL] = new Agent(this, 0, "BrokerAgent");
-}
-
-void Broker::updateAgent(const Object& object)
-{
- uint32_t brokerBank = object.attrUint("brokerBank");
- uint32_t agentBank = object.attrUint("agentBank");
- uint64_t agentKey = ((uint64_t) brokerBank << 32) | (uint64_t) agentBank;
- AgentMap::iterator iter = agents.find(agentKey);
-
- if (object.isDeleted()) {
- if (iter != agents.end()) {
- if (sessionManager.listener != 0)
- sessionManager.listener->delAgent(*(iter->second));
- delete iter->second;
- agents.erase(iter);
- }
- } else {
- if (iter == agents.end()) {
- Agent* agent = new Agent(this, agentBank, object.attrString("label"));
- agents[agentKey] = agent;
- if (sessionManager.listener != 0)
- sessionManager.listener->newAgent(*agent);
- }
- }
-}
-
-void Broker::ConnectionThread::run()
-{
- static const int delayMin(1);
- static const int delayMax(128);
- static const int delayFactor(2);
- int delay(delayMin);
- string dest("qmfc");
-
- sessionId.generate();
- queueName << "qmfc-" << sessionId;
-
- while (true) {
- try {
- broker.topicBound = false;
- broker.reqsOutstanding = 1;
- connection.open(broker.connectionSettings);
- session = connection.newSession(queueName.str());
- subscriptions = new client::SubscriptionManager(session);
-
- session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true,
- arg::exclusive=true);
- session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
- arg::bindingKey=queueName.str());
-
- subscriptions->setAcceptMode(ACCEPT_MODE_NONE);
- subscriptions->setAcquireMode(ACQUIRE_MODE_PRE_ACQUIRED);
- subscriptions->subscribe(broker, queueName.str(), dest);
- subscriptions->setFlowControl(dest, FlowControl::unlimited());
- {
- Mutex::ScopedLock _lock(connLock);
- if (shuttingDown)
- return;
- operational = true;
- broker.resetAgents();
- broker.connected = true;
- broker.sessionManager.handleBrokerConnect(&broker);
- broker.sessionManager.startProtocol(&broker);
- try {
- Mutex::ScopedUnlock _unlock(connLock);
- subscriptions->run();
- } catch (std::exception) {}
-
- operational = false;
- broker.connected = false;
- broker.sessionManager.handleBrokerDisconnect(&broker);
- }
- delay = delayMin;
- connection.close();
- delete subscriptions;
- subscriptions = 0;
- } catch (std::exception &e) {
- QPID_LOG(debug, " outer exception: " << e.what());
- if (delay < delayMax)
- delay *= delayFactor;
- }
-
- {
- Mutex::ScopedLock _lock(connLock);
- if (shuttingDown)
- return;
- {
- Mutex::ScopedUnlock _unlock(connLock);
- ::sleep(delay);
- }
- if (shuttingDown)
- return;
- }
- }
-}
-
-Broker::ConnectionThread::~ConnectionThread()
-{
- if (subscriptions != 0) {
- delete subscriptions;
- }
-}
-
-void Broker::ConnectionThread::sendBuffer(Buffer& buf, uint32_t length,
- const string& exchange, const string& routingKey)
-{
- {
- Mutex::ScopedLock _lock(connLock);
- if (!operational)
- return;
- }
-
- client::Message msg;
- string data;
-
- buf.getRawData(data, length);
- msg.getDeliveryProperties().setRoutingKey(routingKey);
- msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
- msg.setData(data);
- try {
- session.messageTransfer(arg::content=msg, arg::destination=exchange);
- } catch(std::exception&) {}
-}
-
-void Broker::ConnectionThread::bindExchange(const std::string& exchange, const std::string& key)
-{
- {
- Mutex::ScopedLock _lock(connLock);
- if (!operational)
- return;
- }
-
- QPID_LOG(debug, "Broker::ConnectionThread::bindExchange: exchange=" << exchange << " key=" << key);
- session.exchangeBind(arg::exchange=exchange, arg::queue=queueName.str(),
- arg::bindingKey=key);
-}
-
-void Broker::ConnectionThread::shutdown()
-{
- {
- Mutex::ScopedLock _lock(connLock);
- shuttingDown = true;
- }
- if (subscriptions)
- subscriptions->stop();
-}
-
-void Broker::waitForStable()
-{
- Mutex::ScopedLock l(lock);
- if (reqsOutstanding == 0)
- return;
- syncInFlight = true;
- while (reqsOutstanding != 0) {
- bool result = cond.wait(lock, AbsTime(now(), TIME_SEC * sessionManager.settings.getTimeout));
- if (!result)
- throw(Exception("Timed out waiting for broker to synchronize"));
- }
-}
-
-void Broker::incOutstanding()
-{
- Mutex::ScopedLock l(lock);
- reqsOutstanding++;
-}
-
-void Broker::decOutstanding()
-{
- Mutex::ScopedLock l(lock);
- reqsOutstanding--;
- if (reqsOutstanding == 0) {
- if (!topicBound) {
- topicBound = true;
- for (vector<string>::const_iterator iter = sessionManager.bindingKeyList.begin();
- iter != sessionManager.bindingKeyList.end(); iter++)
- connThreadBody.bindExchange("qpid.management", *iter);
- }
- if (syncInFlight) {
- syncInFlight = false;
- cond.notify();
- }
- }
-}
-
-void Broker::appendAgents(Agent::Vector& agentlist) const
-{
- for (AgentMap::const_iterator iter = agents.begin(); iter != agents.end(); iter++) {
- agentlist.push_back(iter->second);
- }
-}
-
-ostream& qpid::console::operator<<(ostream& o, const Broker& k)
-{
- o << "Broker: " << k.connectionSettings.host << ":" << k.connectionSettings.port;
- return o;
-}
diff --git a/cpp/src/qpid/console/ClassKey.cpp b/cpp/src/qpid/console/ClassKey.cpp
deleted file mode 100644
index 7a16113bae..0000000000
--- a/cpp/src/qpid/console/ClassKey.cpp
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/console/ClassKey.h"
-#include <string.h>
-#include <cstdio>
-
-using namespace std;
-using namespace qpid::console;
-
-ClassKey::ClassKey(const string& _package, const string& _name, const uint8_t* _hash) :
- package(_package), name(_name)
-{
- ::memcpy(hash, _hash, HASH_SIZE);
-}
-
-string ClassKey::getHashString() const
-{
- char cstr[36];
- ::sprintf(cstr, "%02x%02x%02x%02x-%02x%02x%02x%02x-%02x%02x%02x%02x-%02x%02x%02x%02x",
- hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7],
- hash[8], hash[9], hash[10], hash[11], hash[12], hash[13], hash[14], hash[15]);
- return string(cstr);
-}
-
-string ClassKey::str() const
-{
- string result(package + ":" + name + "(" + getHashString() + ")");
- return result;
-}
-
-bool ClassKey::operator==(const ClassKey& other) const
-{
- return ::memcmp(hash, other.hash, HASH_SIZE) == 0 &&
- name == other.name &&
- package == other.package;
-}
-
-bool ClassKey::operator!=(const ClassKey& other) const
-{
- return !(*this == other);
-}
-
-bool ClassKey::operator<(const ClassKey& other) const
-{
- int cmp = ::memcmp(hash, other.hash, HASH_SIZE);
- if (cmp != 0)
- return cmp < 0;
- cmp = name.compare(other.name);
- if (cmp != 0)
- return cmp < 0;
- return package < other.package;
-}
-
-bool ClassKey::operator>(const ClassKey& other) const
-{
- int cmp = ::memcmp(hash, other.hash, HASH_SIZE);
- if (cmp != 0)
- return cmp > 0;
- cmp = name.compare(other.name);
- if (cmp != 0)
- return cmp > 0;
- return package > other.package;
-}
-
-bool ClassKey::operator<=(const ClassKey& other) const
-{
- return !(*this > other);
-}
-
-bool ClassKey::operator>=(const ClassKey& other) const
-{
- return !(*this < other);
-}
-
-void ClassKey::encode(qpid::framing::Buffer& buffer) const
-{
- buffer.putShortString(package);
- buffer.putShortString(name);
- buffer.putBin128(const_cast<uint8_t*>(hash));
-}
-
-ostream& qpid::console::operator<<(ostream& o, const ClassKey& k)
-{
- o << k.str();
- return o;
-}
diff --git a/cpp/src/qpid/console/Event.cpp b/cpp/src/qpid/console/Event.cpp
deleted file mode 100644
index 3e14804b35..0000000000
--- a/cpp/src/qpid/console/Event.cpp
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/console/Broker.h"
-#include "qpid/console/ClassKey.h"
-#include "qpid/console/Schema.h"
-#include "qpid/console/Event.h"
-#include "qpid/console/Value.h"
-#include "qpid/sys/Time.h"
-#include "qpid/framing/Buffer.h"
-
-using namespace qpid::console;
-using namespace std;
-using qpid::framing::Uuid;
-using qpid::framing::FieldTable;
-
-Event::Event(Broker* _broker, SchemaClass* _schema, qpid::framing::Buffer& buffer) :
- broker(_broker), schema(_schema)
-{
- timestamp = buffer.getLongLong();
- severity = (Severity) buffer.getOctet();
- for (vector<SchemaArgument*>::const_iterator aIter = schema->arguments.begin();
- aIter != schema->arguments.end(); aIter++) {
- SchemaArgument* argument = *aIter;
- attributes[argument->name] = argument->decodeValue(buffer);
- }
-}
-
-const ClassKey& Event::getClassKey() const
-{
- return schema->getClassKey();
-}
-
-string Event::getSeverityString() const
-{
- switch (severity) {
- case SEV_EMERGENCY : return string("EMER");
- case SEV_ALERT : return string("ALERT");
- case SEV_CRITICAL : return string("CRIT");
- case SEV_ERROR : return string("ERROR");
- case SEV_WARNING : return string("WARN");
- case SEV_NOTICE : return string("NOTIC");
- case SEV_INFO : return string("INFO");
- case SEV_DEBUG : return string("DEBUG");
- }
- return string("<UNKNOWN>");
-}
-
-ObjectId Event::attrRef(const string& key) const
-{
- Object::AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return ObjectId();
- Value::Ptr val = iter->second;
- if (!val->isObjectId())
- return ObjectId();
- return val->asObjectId();
-}
-
-uint32_t Event::attrUint(const string& key) const
-{
- Object::AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return 0;
- Value::Ptr val = iter->second;
- if (!val->isUint())
- return 0;
- return val->asUint();
-}
-
-int32_t Event::attrInt(const string& key) const
-{
- Object::AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return 0;
- Value::Ptr val = iter->second;
- if (!val->isInt())
- return 0;
- return val->asInt();
-}
-
-uint64_t Event::attrUint64(const string& key) const
-{
- Object::AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return 0;
- Value::Ptr val = iter->second;
- if (!val->isUint64())
- return 0;
- return val->asUint64();
-}
-
-int64_t Event::attrInt64(const string& key) const
-{
- Object::AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return 0;
- Value::Ptr val = iter->second;
- if (!val->isInt64())
- return 0;
- return val->asInt64();
-}
-
-string Event::attrString(const string& key) const
-{
- Object::AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return string();
- Value::Ptr val = iter->second;
- if (!val->isString())
- return string();
- return val->asString();
-}
-
-bool Event::attrBool(const string& key) const
-{
- Object::AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return false;
- Value::Ptr val = iter->second;
- if (!val->isBool())
- return false;
- return val->asBool();
-}
-
-float Event::attrFloat(const string& key) const
-{
- Object::AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return 0.0;
- Value::Ptr val = iter->second;
- if (!val->isFloat())
- return 0.0;
- return val->asFloat();
-}
-
-double Event::attrDouble(const string& key) const
-{
- Object::AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return 0.0;
- Value::Ptr val = iter->second;
- if (!val->isDouble())
- return 0.0;
- return val->asDouble();
-}
-
-Uuid Event::attrUuid(const string& key) const
-{
- Object::AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return Uuid();
- Value::Ptr val = iter->second;
- if (!val->isUuid())
- return Uuid();
- return val->asUuid();
-}
-
-FieldTable Event::attrMap(const string& key) const
-{
- Object::AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return FieldTable();
- Value::Ptr val = iter->second;
- if (!val->isMap())
- return FieldTable();
- return val->asMap();
-}
-
-
-std::ostream& qpid::console::operator<<(std::ostream& o, const Event& event)
-{
- const ClassKey& key = event.getClassKey();
- sys::AbsTime aTime(sys::AbsTime(), sys::Duration(event.getTimestamp()));
- o << aTime << " " << event.getSeverityString() << " " <<
- key.getPackageName() << ":" << key.getClassName() <<
- " broker=" << event.getBroker()->getUrl();
-
- const Object::AttributeMap& attributes = event.getAttributes();
- for (Object::AttributeMap::const_iterator iter = attributes.begin();
- iter != attributes.end(); iter++) {
- o << " " << iter->first << "=" << iter->second->str();
- }
- return o;
-}
-
-
diff --git a/cpp/src/qpid/console/Object.cpp b/cpp/src/qpid/console/Object.cpp
deleted file mode 100644
index 6570e293ab..0000000000
--- a/cpp/src/qpid/console/Object.cpp
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/console/SessionManager.h"
-#include "qpid/console/Broker.h"
-#include "qpid/console/Object.h"
-#include "qpid/console/Schema.h"
-#include "qpid/console/ClassKey.h"
-#include "qpid/console/Value.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/sys/Mutex.h"
-
-using namespace qpid::console;
-using namespace qpid::sys;
-using namespace qpid;
-using namespace std;
-using qpid::framing::Uuid;
-using qpid::framing::FieldTable;
-
-void Object::AttributeMap::addRef(const string& key, const ObjectId& val)
-{
- (*this)[key] = Value::Ptr(new RefValue(val));
-}
-
-void Object::AttributeMap::addUint(const string& key, uint32_t val)
-{
- (*this)[key] = Value::Ptr(new UintValue(val));
-}
-
-void Object::AttributeMap::addInt(const string& key, int32_t val)
-{
- (*this)[key] = Value::Ptr(new IntValue(val));
-}
-
-void Object::AttributeMap::addUint64(const string& key, uint64_t val)
-{
- (*this)[key] = Value::Ptr(new Uint64Value(val));
-}
-
-void Object::AttributeMap::addInt64(const string& key, int64_t val)
-{
- (*this)[key] = Value::Ptr(new Int64Value(val));
-}
-
-void Object::AttributeMap::addString(const string& key, const string& val)
-{
- (*this)[key] = Value::Ptr(new StringValue(val));
-}
-
-void Object::AttributeMap::addBool(const string& key, bool val)
-{
- (*this)[key] = Value::Ptr(new BoolValue(val));
-}
-
-void Object::AttributeMap::addFloat(const string& key, float val)
-{
- (*this)[key] = Value::Ptr(new FloatValue(val));
-}
-
-void Object::AttributeMap::addDouble(const string& key, double val)
-{
- (*this)[key] = Value::Ptr(new DoubleValue(val));
-}
-
-void Object::AttributeMap::addUuid(const string& key, const Uuid& val)
-{
- (*this)[key] = Value::Ptr(new UuidValue(val));
-}
-
-void Object::AttributeMap::addMap(const string& key, const FieldTable& val)
-{
- (*this)[key] = Value::Ptr(new MapValue(val));
-}
-
-Object::Object(Broker* b, SchemaClass* s, framing::Buffer& buffer, bool prop, bool stat) :
- broker(b), schema(s), pendingMethod(0)
-{
- currentTime = buffer.getLongLong();
- createTime = buffer.getLongLong();
- deleteTime = buffer.getLongLong();
- objectId.decode(buffer);
-
- if (prop) {
- set<string> excludes;
- parsePresenceMasks(buffer, excludes);
- for (vector<SchemaProperty*>::const_iterator pIter = schema->properties.begin();
- pIter != schema->properties.end(); pIter++) {
- SchemaProperty* property = *pIter;
- if (excludes.count(property->name) != 0) {
- attributes[property->name] = Value::Ptr(new NullValue());
- } else {
- attributes[property->name] = property->decodeValue(buffer);
- }
- }
- }
-
- if (stat) {
- for (vector<SchemaStatistic*>::const_iterator sIter = schema->statistics.begin();
- sIter != schema->statistics.end(); sIter++) {
- SchemaStatistic* statistic = *sIter;
- attributes[statistic->name] = statistic->decodeValue(buffer);
- }
- }
-}
-
-Object::~Object() {}
-
-const ClassKey& Object::getClassKey() const
-{
- return schema->getClassKey();
-}
-
-string Object::getIndex() const
-{
- string result;
-
- for (vector<SchemaProperty*>::const_iterator pIter = schema->properties.begin();
- pIter != schema->properties.end(); pIter++) {
- SchemaProperty* property = *pIter;
- if (property->isIndex) {
- AttributeMap::const_iterator vIter = attributes.find(property->name);
- if (vIter != attributes.end()) {
- if (!result.empty())
- result += ":";
- result += vIter->second->str();
- }
- }
- }
- return result;
-}
-
-void Object::mergeUpdate(const Object& /*updated*/)
-{
- // TODO
-}
-
-void Object::invokeMethod(const string name, const AttributeMap& args, MethodResponse& result)
-{
- for (vector<SchemaMethod*>::const_iterator iter = schema->methods.begin();
- iter != schema->methods.end(); iter++) {
- if ((*iter)->name == name) {
- SchemaMethod* method = *iter;
- char rawbuffer[65536];
- framing::Buffer buffer(rawbuffer, 65536);
- uint32_t sequence = broker->sessionManager.sequenceManager.reserve("method");
- pendingMethod = method;
- broker->methodObject = this;
- broker->encodeHeader(buffer, 'M', sequence);
- objectId.encode(buffer);
- schema->key.encode(buffer);
- buffer.putShortString(name);
-
- for (vector<SchemaArgument*>::const_iterator aIter = method->arguments.begin();
- aIter != method->arguments.end(); aIter++) {
- SchemaArgument* arg = *aIter;
- if (arg->dirInput) {
- AttributeMap::const_iterator attr = args.find(arg->name);
- if (attr != args.end()) {
- ValueFactory::encodeValue(arg->typeCode, attr->second, buffer);
- } else {
- // TODO Use the default value instead of throwing
- throw Exception("Missing arguments in method call");
- }
- }
- }
-
- uint32_t length = buffer.getPosition();
- buffer.reset();
- stringstream routingKey;
- routingKey << "agent." << objectId.getBrokerBank() << "." << objectId.getAgentBank();
- broker->connThreadBody.sendBuffer(buffer, length, "qpid.management", routingKey.str());
-
- {
- Mutex::ScopedLock l(broker->lock);
- bool ok = true;
- while (pendingMethod != 0 && ok) {
- ok = broker->cond.wait(broker->lock, AbsTime(now(), broker->sessionManager.settings.methodTimeout * TIME_SEC));
- }
-
- if (!ok) {
- result.code = 0x1001;
- result.text.assign("Method call timed out");
- result.arguments.clear();
- } else {
- result = methodResponse;
- }
- }
- }
- }
-}
-
-void Object::handleMethodResp(framing::Buffer& buffer, uint32_t sequence)
-{
- broker->sessionManager.sequenceManager.release(sequence);
- methodResponse.code = buffer.getLong();
- buffer.getMediumString(methodResponse.text);
- methodResponse.arguments.clear();
-
- for (vector<SchemaArgument*>::const_iterator aIter = pendingMethod->arguments.begin();
- aIter != pendingMethod->arguments.end(); aIter++) {
- SchemaArgument* arg = *aIter;
- if (arg->dirOutput) {
- methodResponse.arguments[arg->name] = arg->decodeValue(buffer);
- }
- }
-
- {
- Mutex::ScopedLock l(broker->lock);
- pendingMethod = 0;
- broker->cond.notify();
- }
-}
-
-ObjectId Object::attrRef(const string& key) const
-{
- AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return ObjectId();
- Value::Ptr val = iter->second;
- if (!val->isObjectId())
- return ObjectId();
- return val->asObjectId();
-}
-
-uint32_t Object::attrUint(const string& key) const
-{
- AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return 0;
- Value::Ptr val = iter->second;
- if (!val->isUint())
- return 0;
- return val->asUint();
-}
-
-int32_t Object::attrInt(const string& key) const
-{
- AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return 0;
- Value::Ptr val = iter->second;
- if (!val->isInt())
- return 0;
- return val->asInt();
-}
-
-uint64_t Object::attrUint64(const string& key) const
-{
- AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return 0;
- Value::Ptr val = iter->second;
- if (!val->isUint64())
- return 0;
- return val->asUint64();
-}
-
-int64_t Object::attrInt64(const string& key) const
-{
- AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return 0;
- Value::Ptr val = iter->second;
- if (!val->isInt64())
- return 0;
- return val->asInt64();
-}
-
-string Object::attrString(const string& key) const
-{
- AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return string();
- Value::Ptr val = iter->second;
- if (!val->isString())
- return string();
- return val->asString();
-}
-
-bool Object::attrBool(const string& key) const
-{
- AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return false;
- Value::Ptr val = iter->second;
- if (!val->isBool())
- return false;
- return val->asBool();
-}
-
-float Object::attrFloat(const string& key) const
-{
- AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return 0.0;
- Value::Ptr val = iter->second;
- if (!val->isFloat())
- return 0.0;
- return val->asFloat();
-}
-
-double Object::attrDouble(const string& key) const
-{
- AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return 0.0;
- Value::Ptr val = iter->second;
- if (!val->isDouble())
- return 0.0;
- return val->asDouble();
-}
-
-Uuid Object::attrUuid(const string& key) const
-{
- AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return Uuid();
- Value::Ptr val = iter->second;
- if (!val->isUuid())
- return Uuid();
- return val->asUuid();
-}
-
-FieldTable Object::attrMap(const string& key) const
-{
- AttributeMap::const_iterator iter = attributes.find(key);
- if (iter == attributes.end())
- return FieldTable();
- Value::Ptr val = iter->second;
- if (!val->isMap())
- return FieldTable();
- return val->asMap();
-}
-
-void Object::parsePresenceMasks(framing::Buffer& buffer, set<string>& excludeList)
-{
- excludeList.clear();
- uint8_t bit = 0;
- uint8_t mask = 0;
-
- for (vector<SchemaProperty*>::const_iterator pIter = schema->properties.begin();
- pIter != schema->properties.end(); pIter++) {
- SchemaProperty* property = *pIter;
- if (property->isOptional) {
- if (bit == 0) {
- mask = buffer.getOctet();
- bit = 1;
- }
- if ((mask & bit) == 0)
- excludeList.insert(property->name);
- if (bit == 0x80)
- bit = 0;
- else
- bit = bit << 1;
- }
- }
-}
-
-ostream& qpid::console::operator<<(ostream& o, const Object& object)
-{
- const ClassKey& key = object.getClassKey();
- o << key.getPackageName() << ":" << key.getClassName() << "[" << object.getObjectId() << "] " <<
- object.getIndex();
- return o;
-}
-
diff --git a/cpp/src/qpid/console/ObjectId.cpp b/cpp/src/qpid/console/ObjectId.cpp
deleted file mode 100644
index fbaad20d57..0000000000
--- a/cpp/src/qpid/console/ObjectId.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/console/ObjectId.h"
-#include "qpid/framing/Buffer.h"
-
-using namespace qpid::console;
-using namespace qpid;
-using namespace std;
-
-ObjectId::ObjectId(framing::Buffer& buffer)
-{
- decode(buffer);
-}
-
-void ObjectId::decode(framing::Buffer& buffer)
-{
- first = buffer.getLongLong();
- second = buffer.getLongLong();
-}
-
-void ObjectId::encode(framing::Buffer& buffer)
-{
- buffer.putLongLong(first);
- buffer.putLongLong(second);
-}
-
-bool ObjectId::operator==(const ObjectId& other) const
-{
- return second == other.second && first == other.first;
-}
-
-bool ObjectId::operator!=(const ObjectId& other) const
-{
- return !(*this == other);
-}
-
-bool ObjectId::operator<(const ObjectId& other) const
-{
- if (first < other.first)
- return true;
- if (first > other.first)
- return false;
- return second < other.second;
-}
-
-bool ObjectId::operator>(const ObjectId& other) const
-{
- if (first > other.first)
- return true;
- if (first < other.first)
- return false;
- return second > other.second;
-}
-
-bool ObjectId::operator<=(const ObjectId& other) const
-{
- return !(*this > other);
-}
-
-bool ObjectId::operator>=(const ObjectId& other) const
-{
- return !(*this < other);
-}
-
-ostream& qpid::console::operator<<(ostream& o, const ObjectId& id)
-{
- o << (int) id.getFlags() << "-" << id.getSequence() << "-" << id.getBrokerBank() << "-" <<
- id.getAgentBank() << "-" << id.getObject();
- return o;
-}
-
-
diff --git a/cpp/src/qpid/console/Package.cpp b/cpp/src/qpid/console/Package.cpp
deleted file mode 100644
index e5d6fa29fd..0000000000
--- a/cpp/src/qpid/console/Package.cpp
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/console/Package.h"
-
-using namespace qpid::console;
-
-SchemaClass* Package::getClass(const std::string& className, uint8_t* hash)
-{
- NameHash key(className, hash);
- ClassMap::iterator iter = classes.find(key);
- if (iter != classes.end())
- return iter->second;
- return 0;
-}
-
-void Package::addClass(const std::string& className, uint8_t* hash, SchemaClass* schemaClass)
-{
- NameHash key(className, hash);
- ClassMap::iterator iter = classes.find(key);
- if (iter == classes.end())
- classes[key] = schemaClass;
-}
diff --git a/cpp/src/qpid/console/Schema.cpp b/cpp/src/qpid/console/Schema.cpp
deleted file mode 100644
index a3dbd91201..0000000000
--- a/cpp/src/qpid/console/Schema.cpp
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/console/Schema.h"
-#include "qpid/console/Value.h"
-#include "qpid/framing/FieldTable.h"
-
-using namespace qpid::console;
-using namespace qpid;
-using std::string;
-using std::vector;
-
-SchemaArgument::SchemaArgument(framing::Buffer& buffer, bool forMethod)
-{
- framing::FieldTable map;
- map.decode(buffer);
-
- name = map.getAsString("name");
- typeCode = map.getAsInt("type");
- unit = map.getAsString("unit");
- min = map.getAsInt("min");
- max = map.getAsInt("max");
- maxLen = map.getAsInt("maxlen");
- desc = map.getAsString("desc");
-
- dirInput = false;
- dirOutput = false;
- if (forMethod) {
- string dir(map.getAsString("dir"));
- if (dir.find('I') != dir.npos || dir.find('i') != dir.npos)
- dirInput = true;
- if (dir.find('O') != dir.npos || dir.find('o') != dir.npos)
- dirOutput = true;
- }
-}
-
-Value::Ptr SchemaArgument::decodeValue(framing::Buffer& buffer)
-{
- return ValueFactory::newValue(typeCode, buffer);
-}
-
-SchemaProperty::SchemaProperty(framing::Buffer& buffer)
-{
- framing::FieldTable map;
- map.decode(buffer);
-
- name = map.getAsString("name");
- typeCode = map.getAsInt("type");
- accessCode = map.getAsInt("access");
- isIndex = map.getAsInt("index") != 0;
- isOptional = map.getAsInt("optional") != 0;
- unit = map.getAsString("unit");
- min = map.getAsInt("min");
- max = map.getAsInt("max");
- maxLen = map.getAsInt("maxlen");
- desc = map.getAsString("desc");
-}
-
-Value::Ptr SchemaProperty::decodeValue(framing::Buffer& buffer)
-{
- return ValueFactory::newValue(typeCode, buffer);
-}
-
-SchemaStatistic::SchemaStatistic(framing::Buffer& buffer)
-{
- framing::FieldTable map;
- map.decode(buffer);
-
- name = map.getAsString("name");
- typeCode = map.getAsInt("type");
- unit = map.getAsString("unit");
- desc = map.getAsString("desc");
-}
-
-Value::Ptr SchemaStatistic::decodeValue(framing::Buffer& buffer)
-{
- return ValueFactory::newValue(typeCode, buffer);
-}
-
-SchemaMethod::SchemaMethod(framing::Buffer& buffer)
-{
- framing::FieldTable map;
- map.decode(buffer);
-
- name = map.getAsString("name");
- desc = map.getAsString("desc");
- int argCount = map.getAsInt("argCount");
-
- for (int i = 0; i < argCount; i++)
- arguments.push_back(new SchemaArgument(buffer, true));
-}
-
-SchemaMethod::~SchemaMethod()
-{
- for (vector<SchemaArgument*>::iterator iter = arguments.begin();
- iter != arguments.end(); iter++)
- delete *iter;
-}
-
-SchemaClass::SchemaClass(const uint8_t _kind, const ClassKey& _key, framing::Buffer& buffer) :
- kind(_kind), key(_key)
-{
- if (kind == KIND_TABLE) {
- uint8_t hasSupertype = 0; //buffer.getOctet();
- uint16_t propCount = buffer.getShort();
- uint16_t statCount = buffer.getShort();
- uint16_t methodCount = buffer.getShort();
-
- if (hasSupertype) {
- string unused;
- buffer.getShortString(unused);
- buffer.getShortString(unused);
- buffer.getLongLong();
- buffer.getLongLong();
- }
-
- for (uint16_t idx = 0; idx < propCount; idx++)
- properties.push_back(new SchemaProperty(buffer));
- for (uint16_t idx = 0; idx < statCount; idx++)
- statistics.push_back(new SchemaStatistic(buffer));
- for (uint16_t idx = 0; idx < methodCount; idx++)
- methods.push_back(new SchemaMethod(buffer));
-
- } else if (kind == KIND_EVENT) {
- uint16_t argCount = buffer.getShort();
-
- for (uint16_t idx = 0; idx < argCount; idx++)
- arguments.push_back(new SchemaArgument(buffer));
- }
-}
-
-SchemaClass::~SchemaClass()
-{
- for (vector<SchemaProperty*>::iterator iter = properties.begin();
- iter != properties.end(); iter++)
- delete *iter;
- for (vector<SchemaStatistic*>::iterator iter = statistics.begin();
- iter != statistics.end(); iter++)
- delete *iter;
- for (vector<SchemaMethod*>::iterator iter = methods.begin();
- iter != methods.end(); iter++)
- delete *iter;
- for (vector<SchemaArgument*>::iterator iter = arguments.begin();
- iter != arguments.end(); iter++)
- delete *iter;
-}
-
diff --git a/cpp/src/qpid/console/SequenceManager.cpp b/cpp/src/qpid/console/SequenceManager.cpp
deleted file mode 100644
index 86ea829749..0000000000
--- a/cpp/src/qpid/console/SequenceManager.cpp
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/console/SequenceManager.h"
-
-using namespace qpid::console;
-using namespace qpid::sys;
-using std::string;
-using std::cout;
-using std::endl;
-
-uint32_t SequenceManager::reserve(const std::string& context)
-{
- Mutex::ScopedLock l(lock);
- uint32_t result = sequence++;
- pending[result] = context;
- return result;
-}
-
-std::string SequenceManager::release(uint32_t seq)
-{
- Mutex::ScopedLock l(lock);
- std::map<uint32_t, string>::iterator iter = pending.find(seq);
- if (iter == pending.end())
- return string();
- string result(iter->second);
- pending.erase(iter);
- return result;
-}
-
diff --git a/cpp/src/qpid/console/SessionManager.cpp b/cpp/src/qpid/console/SessionManager.cpp
deleted file mode 100644
index 80c5959417..0000000000
--- a/cpp/src/qpid/console/SessionManager.cpp
+++ /dev/null
@@ -1,517 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/console/SessionManager.h"
-#include "qpid/console/Schema.h"
-#include "qpid/console/Agent.h"
-#include "qpid/console/ConsoleListener.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/Time.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/Uuid.h"
-#include "qpid/framing/FieldTable.h"
-
-using namespace qpid::console;
-using namespace qpid::sys;
-using namespace qpid;
-using namespace std;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-
-SessionManager::SessionManager(ConsoleListener* _listener, Settings _settings) :
- listener(_listener), settings(_settings)
-{
- bindingKeys();
-}
-
-SessionManager::~SessionManager()
-{
- for (vector<Broker*>::iterator iter = brokers.begin();
- iter != brokers.end(); iter++)
- delete *iter;
-
- for (map<string, Package*>::iterator iter = packages.begin();
- iter != packages.end(); iter++) {
- for (Package::ClassMap::iterator citer = iter->second->classes.begin();
- citer != iter->second->classes.end();
- citer++)
- delete citer->second;
- delete iter->second;
- }
-}
-
-Broker* SessionManager::addBroker(client::ConnectionSettings& settings)
-{
- Broker* broker(new Broker(*this, settings));
- {
- Mutex::ScopedLock l(brokerListLock);
- brokers.push_back(broker);
- }
- return broker;
-}
-
-void SessionManager::delBroker(Broker* broker)
-{
- Mutex::ScopedLock l(brokerListLock);
- for (vector<Broker*>::iterator iter = brokers.begin();
- iter != brokers.end(); iter++)
- if (*iter == broker) {
- brokers.erase(iter);
- delete broker;
- return;
- }
-}
-
-void SessionManager::getPackages(NameVector& packageNames)
-{
- allBrokersStable();
- packageNames.clear();
- {
- Mutex::ScopedLock l(lock);
- for (map<string, Package*>::iterator iter = packages.begin();
- iter != packages.end(); iter++)
- packageNames.push_back(iter->first);
- }
-}
-
-void SessionManager::getClasses(KeyVector& classKeys, const std::string& packageName)
-{
- allBrokersStable();
- classKeys.clear();
- map<string, Package*>::iterator iter = packages.find(packageName);
- if (iter == packages.end())
- return;
-
- Package& package = *(iter->second);
- for (Package::ClassMap::const_iterator piter = package.classes.begin();
- piter != package.classes.end(); piter++) {
- ClassKey key(piter->second->getClassKey());
- classKeys.push_back(key);
- }
-}
-
-SchemaClass& SessionManager::getSchema(const ClassKey& classKey)
-{
- allBrokersStable();
- map<string, Package*>::iterator iter = packages.find(classKey.getPackageName());
- if (iter == packages.end())
- throw Exception("Unknown package");
-
- Package& package = *(iter->second);
- Package::NameHash key(classKey.getClassName(), classKey.getHash());
- Package::ClassMap::iterator cIter = package.classes.find(key);
- if (cIter == package.classes.end())
- throw Exception("Unknown class");
-
- return *(cIter->second);
-}
-
-void SessionManager::bindPackage(const std::string& packageName)
-{
- stringstream key;
- key << "console.obj.*.*." << packageName << ".#";
- bindingKeyList.push_back(key.str());
- for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++)
- (*iter)->addBinding(key.str());
-}
-
-void SessionManager::bindClass(const ClassKey& classKey)
-{
- bindClass(classKey.getPackageName(), classKey.getClassName());
-}
-
-void SessionManager::bindClass(const std::string& packageName, const std::string& className)
-{
- stringstream key;
- key << "console.obj.*.*." << packageName << "." << className << ".#";
- bindingKeyList.push_back(key.str());
- for (vector<Broker*>::iterator iter = brokers.begin();
- iter != brokers.end(); iter++)
- (*iter)->addBinding(key.str());
-}
-
-
-void SessionManager::bindEvent(const ClassKey& classKey)
-{
- bindEvent(classKey.getPackageName(), classKey.getClassName());
-}
-
-
-void SessionManager::bindEvent(const std::string& packageName, const std::string& eventName)
-{
- if (!settings.userBindings) throw Exception("Session not configured for userBindings.");
- if (settings.rcvEvents) throw Exception("Session already configured to receive all events.");
-
- stringstream key;
- key << "console.event.*.*." << packageName;
- if (eventName.length()) {
- key << "." << eventName << ".#";
- } else {
- key << ".#";
- }
-
- bindingKeyList.push_back(key.str());
- for (vector<Broker*>::iterator iter = brokers.begin();
- iter != brokers.end(); iter++)
- (*iter)->addBinding(key.str());
-}
-
-
-void SessionManager::getAgents(Agent::Vector& agents, Broker* broker)
-{
- agents.clear();
- if (broker != 0) {
- broker->appendAgents(agents);
- } else {
- for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) {
- (*iter)->appendAgents(agents);
- }
- }
-}
-
-void SessionManager::getObjects(Object::Vector& objects, const std::string& className,
- Broker* _broker, Agent* _agent)
-{
- Agent::Vector agentList;
-
- if (_agent != 0) {
- agentList.push_back(_agent);
- _agent->getBroker()->waitForStable();
- } else {
- if (_broker != 0) {
- _broker->appendAgents(agentList);
- _broker->waitForStable();
- } else {
- allBrokersStable();
- Mutex::ScopedLock _lock(brokerListLock);
- for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) {
- (*iter)->appendAgents(agentList);
- }
- }
- }
-
- FieldTable ft;
- uint32_t sequence;
- ft.setString("_class", className);
-
- getResult.clear();
- syncSequenceList.clear();
- error = string();
-
- if (agentList.empty()) {
- objects = getResult;
- return;
- }
-
- for (Agent::Vector::iterator iter = agentList.begin(); iter != agentList.end(); iter++) {
- Agent* agent = *iter;
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
- stringstream routingKey;
- routingKey << "agent." << agent->getBrokerBank() << "." << agent->getAgentBank();
- {
- Mutex::ScopedLock _lock(lock);
- sequence = sequenceManager.reserve("multiget");
- syncSequenceList.insert(sequence);
- }
- agent->getBroker()->encodeHeader(buffer, 'G', sequence);
- ft.encode(buffer);
- uint32_t length = buffer.getPosition();
- buffer.reset();
- agent->getBroker()->connThreadBody.sendBuffer(buffer, length, "qpid.management", routingKey.str());
- }
-
- {
- Mutex::ScopedLock _lock(lock);
- sys::AbsTime startTime = sys::now();
- while (!syncSequenceList.empty() && error.empty()) {
- cv.wait(lock, AbsTime(now(), settings.getTimeout * TIME_SEC));
- sys::AbsTime currTime = sys::now();
- if (sys::Duration(startTime, currTime) > settings.getTimeout * TIME_SEC)
- break;
- }
- }
-
- objects = getResult;
-}
-
-void SessionManager::bindingKeys()
-{
- bindingKeyList.push_back("schema.#");
- if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
- bindingKeyList.push_back("console.#");
- } else {
- if (settings.rcvObjects && !settings.userBindings)
- bindingKeyList.push_back("console.obj.#");
- else
- bindingKeyList.push_back("console.obj.*.*.org.apache.qpid.broker.agent");
- if (settings.rcvEvents)
- bindingKeyList.push_back("console.event.#");
- if (settings.rcvHeartbeats)
- bindingKeyList.push_back("console.heartbeat");
- }
-}
-
-void SessionManager::allBrokersStable()
-{
- Mutex::ScopedLock l(brokerListLock);
- for (vector<Broker*>::iterator iter = brokers.begin();
- iter != brokers.end(); iter++)
- if ((*iter)->isConnected())
- (*iter)->waitForStable();
-}
-
-void SessionManager::startProtocol(Broker* broker)
-{
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- broker->encodeHeader(buffer, 'B');
- uint32_t length = 512 - buffer.available();
- buffer.reset();
- broker->connThreadBody.sendBuffer(buffer, length);
-}
-
-
-void SessionManager::handleBrokerResp(Broker* broker, Buffer& inBuffer, uint32_t)
-{
- framing::Uuid brokerId;
-
- brokerId.decode(inBuffer);
- broker->setBrokerId(brokerId);
-
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- uint32_t sequence = sequenceManager.reserve("startup");
- broker->encodeHeader(buffer, 'P', sequence);
- uint32_t length = 512 - buffer.available();
- buffer.reset();
- broker->connThreadBody.sendBuffer(buffer, length);
-
- if (listener != 0) {
- listener->brokerInfo(*broker);
- }
-}
-
-void SessionManager::handlePackageInd(Broker* broker, Buffer& inBuffer, uint32_t)
-{
- string packageName;
- inBuffer.getShortString(packageName);
-
- {
- Mutex::ScopedLock l(lock);
- map<string, Package*>::iterator iter = packages.find(packageName);
- if (iter == packages.end()) {
- packages[packageName] = new Package(packageName);
- if (listener != 0)
- listener->newPackage(packageName);
- }
- }
-
- broker->incOutstanding();
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- uint32_t sequence = sequenceManager.reserve("startup");
- broker->encodeHeader(buffer, 'Q', sequence);
- buffer.putShortString(packageName);
- uint32_t length = 512 - buffer.available();
- buffer.reset();
- broker->connThreadBody.sendBuffer(buffer, length);
-}
-
-void SessionManager::handleCommandComplete(Broker* broker, Buffer& inBuffer, uint32_t sequence)
-{
- Mutex::ScopedLock l(lock);
- uint32_t resultCode = inBuffer.getLong();
- string resultText;
- inBuffer.getShortString(resultText);
- string context = sequenceManager.release(sequence);
- if (resultCode != 0)
- QPID_LOG(debug, "Received error in completion: " << resultCode << " " << resultText);
- if (context == "startup") {
- broker->decOutstanding();
- } else if (context == "multiget") {
- if (syncSequenceList.count(sequence) == 1) {
- syncSequenceList.erase(sequence);
- if (syncSequenceList.empty()) {
- cv.notify();
- }
- }
- }
- // TODO: Other context cases
-}
-
-void SessionManager::handleClassInd(Broker* broker, Buffer& inBuffer, uint32_t)
-{
- uint8_t kind;
- string packageName;
- string className;
- uint8_t hash[16];
-
- kind = inBuffer.getOctet();
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(className);
- inBuffer.getBin128(hash);
-
- {
- Mutex::ScopedLock l(lock);
- map<string, Package*>::iterator pIter = packages.find(packageName);
- if (pIter == packages.end() || pIter->second->getClass(className, hash))
- return;
- }
-
- broker->incOutstanding();
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- uint32_t sequence = sequenceManager.reserve("startup");
- broker->encodeHeader(buffer, 'S', sequence);
- buffer.putShortString(packageName);
- buffer.putShortString(className);
- buffer.putBin128(hash);
- uint32_t length = 512 - buffer.available();
- buffer.reset();
- broker->connThreadBody.sendBuffer(buffer, length);
-}
-
-void SessionManager::handleMethodResp(Broker* broker, Buffer& buffer, uint32_t sequence)
-{
- if (broker->methodObject) {
- broker->methodObject->handleMethodResp(buffer, sequence);
- }
-}
-
-void SessionManager::handleHeartbeatInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/)
-{
-}
-
-void SessionManager::handleEventInd(Broker* broker, Buffer& buffer, uint32_t /*sequence*/)
-{
- string packageName;
- string className;
- uint8_t hash[16];
- SchemaClass* schemaClass;
-
- buffer.getShortString(packageName);
- buffer.getShortString(className);
- buffer.getBin128(hash);
-
- {
- Mutex::ScopedLock l(lock);
- map<string, Package*>::iterator pIter = packages.find(packageName);
- if (pIter == packages.end())
- return;
- schemaClass = pIter->second->getClass(className, hash);
- if (schemaClass == 0)
- return;
- }
-
- Event event(broker, schemaClass, buffer);
-
- if (listener)
- listener->event(event);
-}
-
-void SessionManager::handleSchemaResp(Broker* broker, Buffer& inBuffer, uint32_t sequence)
-{
- uint8_t kind;
- string packageName;
- string className;
- uint8_t hash[16];
-
- kind = inBuffer.getOctet();
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(className);
- inBuffer.getBin128(hash);
-
- {
- Mutex::ScopedLock l(lock);
- map<string, Package*>::iterator pIter = packages.find(packageName);
- if (pIter != packages.end() && !pIter->second->getClass(className, hash)) {
- ClassKey key(packageName, className, hash);
- SchemaClass* schemaClass(new SchemaClass(kind, key, inBuffer));
- pIter->second->addClass(className, hash, schemaClass);
- if (listener != 0) {
- listener->newClass(schemaClass->getClassKey());
- }
- }
- }
-
- sequenceManager.release(sequence);
- broker->decOutstanding();
-}
-
-void SessionManager::handleContentInd(Broker* broker, Buffer& buffer, uint32_t sequence, bool prop, bool stat)
-{
- string packageName;
- string className;
- uint8_t hash[16];
- SchemaClass* schemaClass;
-
- buffer.getShortString(packageName);
- buffer.getShortString(className);
- buffer.getBin128(hash);
-
- {
- Mutex::ScopedLock l(lock);
- map<string, Package*>::iterator pIter = packages.find(packageName);
- if (pIter == packages.end())
- return;
- schemaClass = pIter->second->getClass(className, hash);
- if (schemaClass == 0)
- return;
- }
-
- Object object(broker, schemaClass, buffer, prop, stat);
-
- if (prop && className == "agent" && packageName == "org.apache.qpid.broker")
- broker->updateAgent(object);
-
- {
- Mutex::ScopedLock l(lock);
- if (syncSequenceList.count(sequence) == 1) {
- if (!object.isDeleted())
- getResult.push_back(object);
- return;
- }
- }
-
- if (listener) {
- if (prop)
- listener->objectProps(*broker, object);
- if (stat)
- listener->objectStats(*broker, object);
- }
-}
-
-void SessionManager::handleBrokerConnect(Broker* broker)
-{
- if (listener != 0)
- listener->brokerConnected(*broker);
-}
-
-void SessionManager::handleBrokerDisconnect(Broker* broker)
-{
- if (listener != 0)
- listener->brokerDisconnected(*broker);
-}
-
diff --git a/cpp/src/qpid/console/Value.cpp b/cpp/src/qpid/console/Value.cpp
deleted file mode 100644
index 47c6a4ce57..0000000000
--- a/cpp/src/qpid/console/Value.cpp
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/console/Value.h"
-#include "qpid/framing/Buffer.h"
-
-#include <sstream>
-
-using namespace qpid;
-using namespace qpid::console;
-using namespace std;
-
-string NullValue::str() const
-{
- return "<Null>";
-}
-
-RefValue::RefValue(framing::Buffer& buffer)
-{
- uint64_t first = buffer.getLongLong();
- uint64_t second = buffer.getLongLong();
- value.setValue(first, second);
-}
-
-string RefValue::str() const
-{
- stringstream s;
- s << value;
- return s.str();
-}
-
-string UintValue::str() const
-{
- stringstream s;
- s << value;
- return s.str();
-}
-
-string IntValue::str() const
-{
- stringstream s;
- s << value;
- return s.str();
-}
-
-string Uint64Value::str() const
-{
- stringstream s;
- s << value;
- return s.str();
-}
-
-string Int64Value::str() const
-{
- stringstream s;
- s << value;
- return s.str();
-}
-
-StringValue::StringValue(framing::Buffer& buffer, int tc)
-{
- if (tc == 6)
- buffer.getShortString(value);
- else
- buffer.getMediumString(value);
-}
-
-string BoolValue::str() const
-{
- return value ? "T" : "F";
-}
-
-string FloatValue::str() const
-{
- stringstream s;
- s << value;
- return s.str();
-}
-
-string DoubleValue::str() const
-{
- stringstream s;
- s << value;
- return s.str();
-}
-
-UuidValue::UuidValue(framing::Buffer& buffer)
-{
- value.decode(buffer);
-}
-
-string MapValue::str() const
-{
- stringstream s;
- s << value;
- return s.str();
-}
-
-MapValue::MapValue(framing::Buffer& buffer)
-{
- value.decode(buffer);
-}
-
-
-Value::Ptr ValueFactory::newValue(int typeCode, framing::Buffer& buffer)
-{
- switch (typeCode) {
- case 1: return Value::Ptr(new UintValue(buffer.getOctet())); // U8
- case 2: return Value::Ptr(new UintValue(buffer.getShort())); // U16
- case 3: return Value::Ptr(new UintValue(buffer.getLong())); // U32
- case 4: return Value::Ptr(new Uint64Value(buffer.getLongLong())); // U64
- case 6: return Value::Ptr(new StringValue(buffer, 6)); // SSTR
- case 7: return Value::Ptr(new StringValue(buffer, 7)); // LSTR
- case 8: return Value::Ptr(new Int64Value(buffer.getLongLong())); // ABSTIME
- case 9: return Value::Ptr(new Uint64Value(buffer.getLongLong())); // DELTATIME
- case 10: return Value::Ptr(new RefValue(buffer)); // REF
- case 11: return Value::Ptr(new BoolValue(buffer.getOctet())); // BOOL
- case 12: return Value::Ptr(new FloatValue(buffer.getFloat())); // FLOAT
- case 13: return Value::Ptr(new DoubleValue(buffer.getDouble())); // DOUBLE
- case 14: return Value::Ptr(new UuidValue(buffer)); // UUID
- case 15: return Value::Ptr(new MapValue(buffer)); // MAP
- case 16: return Value::Ptr(new IntValue(buffer.getOctet())); // S8
- case 17: return Value::Ptr(new IntValue(buffer.getShort())); // S16
- case 18: return Value::Ptr(new IntValue(buffer.getLong())); // S32
- case 19: return Value::Ptr(new Int64Value(buffer.getLongLong())); // S64
- }
-
- return Value::Ptr();
-}
-
-void ValueFactory::encodeValue(int typeCode, Value::Ptr value, framing::Buffer& buffer)
-{
- switch (typeCode) {
- case 1: buffer.putOctet(value->asUint()); return; // U8
- case 2: buffer.putShort(value->asUint()); return; // U16
- case 3: buffer.putLong(value->asUint()); return; // U32
- case 4: buffer.putLongLong(value->asUint64()); return; // U64
- case 6: buffer.putShortString(value->asString()); return; // SSTR
- case 7: buffer.putMediumString(value->asString()); return; // LSTR
- case 8: buffer.putLongLong(value->asInt64()); return; // ABSTIME
- case 9: buffer.putLongLong(value->asUint64()); return; // DELTATIME
- case 10: value->asObjectId().encode(buffer); return; // REF
- case 11: buffer.putOctet(value->asBool() ? 1 : 0); return; // BOOL
- case 12: buffer.putFloat(value->asFloat()); return; // FLOAT
- case 13: buffer.putDouble(value->asDouble()); return; // DOUBLE
- case 14: value->asUuid().encode(buffer); return; // UUID
- case 15: value->asMap().encode(buffer); return; // MAP
- case 16: buffer.putOctet(value->asInt()); return; // S8
- case 17: buffer.putShort(value->asInt()); return; // S16
- case 18: buffer.putLong(value->asInt()); return; // S32
- case 19: buffer.putLongLong(value->asInt64()); return; // S64
- }
-}