diff options
Diffstat (limited to 'cpp/src/qpid/console')
-rw-r--r-- | cpp/src/qpid/console/Agent.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/console/Broker.cpp | 333 | ||||
-rw-r--r-- | cpp/src/qpid/console/ClassKey.cpp | 105 | ||||
-rw-r--r-- | cpp/src/qpid/console/Event.cpp | 205 | ||||
-rw-r--r-- | cpp/src/qpid/console/Object.cpp | 384 | ||||
-rw-r--r-- | cpp/src/qpid/console/ObjectId.cpp | 91 | ||||
-rw-r--r-- | cpp/src/qpid/console/Package.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/console/Schema.cpp | 165 | ||||
-rw-r--r-- | cpp/src/qpid/console/SequenceManager.cpp | 48 | ||||
-rw-r--r-- | cpp/src/qpid/console/SessionManager.cpp | 517 | ||||
-rw-r--r-- | cpp/src/qpid/console/Value.cpp | 171 |
11 files changed, 0 insertions, 2090 deletions
diff --git a/cpp/src/qpid/console/Agent.cpp b/cpp/src/qpid/console/Agent.cpp deleted file mode 100644 index fa76a13583..0000000000 --- a/cpp/src/qpid/console/Agent.cpp +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/console/Agent.h" - -std::ostream& qpid::console::operator<<(std::ostream& o, const Agent& agent) -{ - o << "Agent at bank " << agent.getBrokerBank() << "." << agent.getAgentBank() << - " (" << agent.getLabel() << ")"; - return o; -} - diff --git a/cpp/src/qpid/console/Broker.cpp b/cpp/src/qpid/console/Broker.cpp deleted file mode 100644 index 86a17d4a10..0000000000 --- a/cpp/src/qpid/console/Broker.cpp +++ /dev/null @@ -1,333 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/console/Broker.h" -#include "qpid/console/Object.h" -#include "qpid/console/Value.h" -#include "qpid/console/SessionManager.h" -#include "qpid/console/ConsoleListener.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/SystemInfo.h" - -using namespace qpid::client; -using namespace qpid::console; -using namespace qpid::framing; -using namespace qpid::sys; -using namespace std; - -Broker::Broker(SessionManager& sm, ConnectionSettings& settings) : - sessionManager(sm), connected(false), connectionSettings(settings), - reqsOutstanding(1), syncInFlight(false), topicBound(false), methodObject(0), - connThreadBody(*this), connThread(connThreadBody) -{ - string osName; - string nodeName; - string release; - string version; - string machine; - - sys::SystemInfo::getSystemId(osName, nodeName, release, version, machine); - uint32_t pid = sys::SystemInfo::getParentProcessId(); - - stringstream text; - - text << "qmfc-cpp-" << nodeName << "-" << pid; - amqpSessionId = string(text.str()); - - QPID_LOG(debug, "Broker::Broker: constructed, amqpSessionId=" << amqpSessionId); -} - -Broker::~Broker() -{ - connThreadBody.shutdown(); - connThread.join(); - resetAgents(); - // resetAgents() does not delete the broker agent... - for (AgentMap::iterator iter = agents.begin(); iter != agents.end(); iter++) { - delete iter->second; - } -} - -string Broker::getUrl() const -{ - stringstream url; - url << connectionSettings.host << ":" << connectionSettings.port; - return url.str(); -} - -void Broker::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) const -{ - buf.putOctet('A'); - buf.putOctet('M'); - buf.putOctet('2'); - buf.putOctet(opcode); - buf.putLong (seq); -} - -bool Broker::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) const -{ - if (buf.getSize() < 8) - return false; - - uint8_t h1 = buf.getOctet(); - uint8_t h2 = buf.getOctet(); - uint8_t h3 = buf.getOctet(); - - *opcode = buf.getOctet(); - *seq = buf.getLong(); - - return h1 == 'A' && h2 == 'M' && h3 == '2'; -} - -void Broker::received(qpid::client::Message& msg) -{ -#define QMF_HEADER_SIZE 8 - string data = msg.getData(); - Buffer inBuffer(const_cast<char*>(data.c_str()), data.size()); - uint8_t opcode; - uint32_t sequence; - - while (inBuffer.available() >= QMF_HEADER_SIZE) { - if (checkHeader(inBuffer, &opcode, &sequence)) { - QPID_LOG(trace, "Broker::received: opcode=" << opcode << " seq=" << sequence); - - if (opcode == 'b') sessionManager.handleBrokerResp(this, inBuffer, sequence); - else if (opcode == 'p') sessionManager.handlePackageInd(this, inBuffer, sequence); - else if (opcode == 'z') sessionManager.handleCommandComplete(this, inBuffer, sequence); - else if (opcode == 'q') sessionManager.handleClassInd(this, inBuffer, sequence); - else if (opcode == 'm') sessionManager.handleMethodResp(this, inBuffer, sequence); - else if (opcode == 'h') sessionManager.handleHeartbeatInd(this, inBuffer, sequence); - else if (opcode == 'e') sessionManager.handleEventInd(this, inBuffer, sequence); - else if (opcode == 's') sessionManager.handleSchemaResp(this, inBuffer, sequence); - else if (opcode == 'c') sessionManager.handleContentInd(this, inBuffer, sequence, true, false); - else if (opcode == 'i') sessionManager.handleContentInd(this, inBuffer, sequence, false, true); - else if (opcode == 'g') sessionManager.handleContentInd(this, inBuffer, sequence, true, true); - } else - return; - } -} - -void Broker::resetAgents() -{ - for (AgentMap::iterator iter = agents.begin(); iter != agents.end(); iter++) { - if (sessionManager.listener != 0) - sessionManager.listener->delAgent(*(iter->second)); - delete iter->second; - } - - agents.clear(); - agents[0x0000000100000000LL] = new Agent(this, 0, "BrokerAgent"); -} - -void Broker::updateAgent(const Object& object) -{ - uint32_t brokerBank = object.attrUint("brokerBank"); - uint32_t agentBank = object.attrUint("agentBank"); - uint64_t agentKey = ((uint64_t) brokerBank << 32) | (uint64_t) agentBank; - AgentMap::iterator iter = agents.find(agentKey); - - if (object.isDeleted()) { - if (iter != agents.end()) { - if (sessionManager.listener != 0) - sessionManager.listener->delAgent(*(iter->second)); - delete iter->second; - agents.erase(iter); - } - } else { - if (iter == agents.end()) { - Agent* agent = new Agent(this, agentBank, object.attrString("label")); - agents[agentKey] = agent; - if (sessionManager.listener != 0) - sessionManager.listener->newAgent(*agent); - } - } -} - -void Broker::ConnectionThread::run() -{ - static const int delayMin(1); - static const int delayMax(128); - static const int delayFactor(2); - int delay(delayMin); - string dest("qmfc"); - - sessionId.generate(); - queueName << "qmfc-" << sessionId; - - while (true) { - try { - broker.topicBound = false; - broker.reqsOutstanding = 1; - connection.open(broker.connectionSettings); - session = connection.newSession(queueName.str()); - subscriptions = new client::SubscriptionManager(session); - - session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true, - arg::exclusive=true); - session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), - arg::bindingKey=queueName.str()); - - subscriptions->setAcceptMode(ACCEPT_MODE_NONE); - subscriptions->setAcquireMode(ACQUIRE_MODE_PRE_ACQUIRED); - subscriptions->subscribe(broker, queueName.str(), dest); - subscriptions->setFlowControl(dest, FlowControl::unlimited()); - { - Mutex::ScopedLock _lock(connLock); - if (shuttingDown) - return; - operational = true; - broker.resetAgents(); - broker.connected = true; - broker.sessionManager.handleBrokerConnect(&broker); - broker.sessionManager.startProtocol(&broker); - try { - Mutex::ScopedUnlock _unlock(connLock); - subscriptions->run(); - } catch (std::exception) {} - - operational = false; - broker.connected = false; - broker.sessionManager.handleBrokerDisconnect(&broker); - } - delay = delayMin; - connection.close(); - delete subscriptions; - subscriptions = 0; - } catch (std::exception &e) { - QPID_LOG(debug, " outer exception: " << e.what()); - if (delay < delayMax) - delay *= delayFactor; - } - - { - Mutex::ScopedLock _lock(connLock); - if (shuttingDown) - return; - { - Mutex::ScopedUnlock _unlock(connLock); - ::sleep(delay); - } - if (shuttingDown) - return; - } - } -} - -Broker::ConnectionThread::~ConnectionThread() -{ - if (subscriptions != 0) { - delete subscriptions; - } -} - -void Broker::ConnectionThread::sendBuffer(Buffer& buf, uint32_t length, - const string& exchange, const string& routingKey) -{ - { - Mutex::ScopedLock _lock(connLock); - if (!operational) - return; - } - - client::Message msg; - string data; - - buf.getRawData(data, length); - msg.getDeliveryProperties().setRoutingKey(routingKey); - msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); - msg.setData(data); - try { - session.messageTransfer(arg::content=msg, arg::destination=exchange); - } catch(std::exception&) {} -} - -void Broker::ConnectionThread::bindExchange(const std::string& exchange, const std::string& key) -{ - { - Mutex::ScopedLock _lock(connLock); - if (!operational) - return; - } - - QPID_LOG(debug, "Broker::ConnectionThread::bindExchange: exchange=" << exchange << " key=" << key); - session.exchangeBind(arg::exchange=exchange, arg::queue=queueName.str(), - arg::bindingKey=key); -} - -void Broker::ConnectionThread::shutdown() -{ - { - Mutex::ScopedLock _lock(connLock); - shuttingDown = true; - } - if (subscriptions) - subscriptions->stop(); -} - -void Broker::waitForStable() -{ - Mutex::ScopedLock l(lock); - if (reqsOutstanding == 0) - return; - syncInFlight = true; - while (reqsOutstanding != 0) { - bool result = cond.wait(lock, AbsTime(now(), TIME_SEC * sessionManager.settings.getTimeout)); - if (!result) - throw(Exception("Timed out waiting for broker to synchronize")); - } -} - -void Broker::incOutstanding() -{ - Mutex::ScopedLock l(lock); - reqsOutstanding++; -} - -void Broker::decOutstanding() -{ - Mutex::ScopedLock l(lock); - reqsOutstanding--; - if (reqsOutstanding == 0) { - if (!topicBound) { - topicBound = true; - for (vector<string>::const_iterator iter = sessionManager.bindingKeyList.begin(); - iter != sessionManager.bindingKeyList.end(); iter++) - connThreadBody.bindExchange("qpid.management", *iter); - } - if (syncInFlight) { - syncInFlight = false; - cond.notify(); - } - } -} - -void Broker::appendAgents(Agent::Vector& agentlist) const -{ - for (AgentMap::const_iterator iter = agents.begin(); iter != agents.end(); iter++) { - agentlist.push_back(iter->second); - } -} - -ostream& qpid::console::operator<<(ostream& o, const Broker& k) -{ - o << "Broker: " << k.connectionSettings.host << ":" << k.connectionSettings.port; - return o; -} diff --git a/cpp/src/qpid/console/ClassKey.cpp b/cpp/src/qpid/console/ClassKey.cpp deleted file mode 100644 index 7a16113bae..0000000000 --- a/cpp/src/qpid/console/ClassKey.cpp +++ /dev/null @@ -1,105 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/console/ClassKey.h" -#include <string.h> -#include <cstdio> - -using namespace std; -using namespace qpid::console; - -ClassKey::ClassKey(const string& _package, const string& _name, const uint8_t* _hash) : - package(_package), name(_name) -{ - ::memcpy(hash, _hash, HASH_SIZE); -} - -string ClassKey::getHashString() const -{ - char cstr[36]; - ::sprintf(cstr, "%02x%02x%02x%02x-%02x%02x%02x%02x-%02x%02x%02x%02x-%02x%02x%02x%02x", - hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7], - hash[8], hash[9], hash[10], hash[11], hash[12], hash[13], hash[14], hash[15]); - return string(cstr); -} - -string ClassKey::str() const -{ - string result(package + ":" + name + "(" + getHashString() + ")"); - return result; -} - -bool ClassKey::operator==(const ClassKey& other) const -{ - return ::memcmp(hash, other.hash, HASH_SIZE) == 0 && - name == other.name && - package == other.package; -} - -bool ClassKey::operator!=(const ClassKey& other) const -{ - return !(*this == other); -} - -bool ClassKey::operator<(const ClassKey& other) const -{ - int cmp = ::memcmp(hash, other.hash, HASH_SIZE); - if (cmp != 0) - return cmp < 0; - cmp = name.compare(other.name); - if (cmp != 0) - return cmp < 0; - return package < other.package; -} - -bool ClassKey::operator>(const ClassKey& other) const -{ - int cmp = ::memcmp(hash, other.hash, HASH_SIZE); - if (cmp != 0) - return cmp > 0; - cmp = name.compare(other.name); - if (cmp != 0) - return cmp > 0; - return package > other.package; -} - -bool ClassKey::operator<=(const ClassKey& other) const -{ - return !(*this > other); -} - -bool ClassKey::operator>=(const ClassKey& other) const -{ - return !(*this < other); -} - -void ClassKey::encode(qpid::framing::Buffer& buffer) const -{ - buffer.putShortString(package); - buffer.putShortString(name); - buffer.putBin128(const_cast<uint8_t*>(hash)); -} - -ostream& qpid::console::operator<<(ostream& o, const ClassKey& k) -{ - o << k.str(); - return o; -} diff --git a/cpp/src/qpid/console/Event.cpp b/cpp/src/qpid/console/Event.cpp deleted file mode 100644 index 3e14804b35..0000000000 --- a/cpp/src/qpid/console/Event.cpp +++ /dev/null @@ -1,205 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/console/Broker.h" -#include "qpid/console/ClassKey.h" -#include "qpid/console/Schema.h" -#include "qpid/console/Event.h" -#include "qpid/console/Value.h" -#include "qpid/sys/Time.h" -#include "qpid/framing/Buffer.h" - -using namespace qpid::console; -using namespace std; -using qpid::framing::Uuid; -using qpid::framing::FieldTable; - -Event::Event(Broker* _broker, SchemaClass* _schema, qpid::framing::Buffer& buffer) : - broker(_broker), schema(_schema) -{ - timestamp = buffer.getLongLong(); - severity = (Severity) buffer.getOctet(); - for (vector<SchemaArgument*>::const_iterator aIter = schema->arguments.begin(); - aIter != schema->arguments.end(); aIter++) { - SchemaArgument* argument = *aIter; - attributes[argument->name] = argument->decodeValue(buffer); - } -} - -const ClassKey& Event::getClassKey() const -{ - return schema->getClassKey(); -} - -string Event::getSeverityString() const -{ - switch (severity) { - case SEV_EMERGENCY : return string("EMER"); - case SEV_ALERT : return string("ALERT"); - case SEV_CRITICAL : return string("CRIT"); - case SEV_ERROR : return string("ERROR"); - case SEV_WARNING : return string("WARN"); - case SEV_NOTICE : return string("NOTIC"); - case SEV_INFO : return string("INFO"); - case SEV_DEBUG : return string("DEBUG"); - } - return string("<UNKNOWN>"); -} - -ObjectId Event::attrRef(const string& key) const -{ - Object::AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return ObjectId(); - Value::Ptr val = iter->second; - if (!val->isObjectId()) - return ObjectId(); - return val->asObjectId(); -} - -uint32_t Event::attrUint(const string& key) const -{ - Object::AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return 0; - Value::Ptr val = iter->second; - if (!val->isUint()) - return 0; - return val->asUint(); -} - -int32_t Event::attrInt(const string& key) const -{ - Object::AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return 0; - Value::Ptr val = iter->second; - if (!val->isInt()) - return 0; - return val->asInt(); -} - -uint64_t Event::attrUint64(const string& key) const -{ - Object::AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return 0; - Value::Ptr val = iter->second; - if (!val->isUint64()) - return 0; - return val->asUint64(); -} - -int64_t Event::attrInt64(const string& key) const -{ - Object::AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return 0; - Value::Ptr val = iter->second; - if (!val->isInt64()) - return 0; - return val->asInt64(); -} - -string Event::attrString(const string& key) const -{ - Object::AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return string(); - Value::Ptr val = iter->second; - if (!val->isString()) - return string(); - return val->asString(); -} - -bool Event::attrBool(const string& key) const -{ - Object::AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return false; - Value::Ptr val = iter->second; - if (!val->isBool()) - return false; - return val->asBool(); -} - -float Event::attrFloat(const string& key) const -{ - Object::AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return 0.0; - Value::Ptr val = iter->second; - if (!val->isFloat()) - return 0.0; - return val->asFloat(); -} - -double Event::attrDouble(const string& key) const -{ - Object::AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return 0.0; - Value::Ptr val = iter->second; - if (!val->isDouble()) - return 0.0; - return val->asDouble(); -} - -Uuid Event::attrUuid(const string& key) const -{ - Object::AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return Uuid(); - Value::Ptr val = iter->second; - if (!val->isUuid()) - return Uuid(); - return val->asUuid(); -} - -FieldTable Event::attrMap(const string& key) const -{ - Object::AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return FieldTable(); - Value::Ptr val = iter->second; - if (!val->isMap()) - return FieldTable(); - return val->asMap(); -} - - -std::ostream& qpid::console::operator<<(std::ostream& o, const Event& event) -{ - const ClassKey& key = event.getClassKey(); - sys::AbsTime aTime(sys::AbsTime(), sys::Duration(event.getTimestamp())); - o << aTime << " " << event.getSeverityString() << " " << - key.getPackageName() << ":" << key.getClassName() << - " broker=" << event.getBroker()->getUrl(); - - const Object::AttributeMap& attributes = event.getAttributes(); - for (Object::AttributeMap::const_iterator iter = attributes.begin(); - iter != attributes.end(); iter++) { - o << " " << iter->first << "=" << iter->second->str(); - } - return o; -} - - diff --git a/cpp/src/qpid/console/Object.cpp b/cpp/src/qpid/console/Object.cpp deleted file mode 100644 index 6570e293ab..0000000000 --- a/cpp/src/qpid/console/Object.cpp +++ /dev/null @@ -1,384 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/console/SessionManager.h" -#include "qpid/console/Broker.h" -#include "qpid/console/Object.h" -#include "qpid/console/Schema.h" -#include "qpid/console/ClassKey.h" -#include "qpid/console/Value.h" -#include "qpid/framing/Buffer.h" -#include "qpid/sys/Mutex.h" - -using namespace qpid::console; -using namespace qpid::sys; -using namespace qpid; -using namespace std; -using qpid::framing::Uuid; -using qpid::framing::FieldTable; - -void Object::AttributeMap::addRef(const string& key, const ObjectId& val) -{ - (*this)[key] = Value::Ptr(new RefValue(val)); -} - -void Object::AttributeMap::addUint(const string& key, uint32_t val) -{ - (*this)[key] = Value::Ptr(new UintValue(val)); -} - -void Object::AttributeMap::addInt(const string& key, int32_t val) -{ - (*this)[key] = Value::Ptr(new IntValue(val)); -} - -void Object::AttributeMap::addUint64(const string& key, uint64_t val) -{ - (*this)[key] = Value::Ptr(new Uint64Value(val)); -} - -void Object::AttributeMap::addInt64(const string& key, int64_t val) -{ - (*this)[key] = Value::Ptr(new Int64Value(val)); -} - -void Object::AttributeMap::addString(const string& key, const string& val) -{ - (*this)[key] = Value::Ptr(new StringValue(val)); -} - -void Object::AttributeMap::addBool(const string& key, bool val) -{ - (*this)[key] = Value::Ptr(new BoolValue(val)); -} - -void Object::AttributeMap::addFloat(const string& key, float val) -{ - (*this)[key] = Value::Ptr(new FloatValue(val)); -} - -void Object::AttributeMap::addDouble(const string& key, double val) -{ - (*this)[key] = Value::Ptr(new DoubleValue(val)); -} - -void Object::AttributeMap::addUuid(const string& key, const Uuid& val) -{ - (*this)[key] = Value::Ptr(new UuidValue(val)); -} - -void Object::AttributeMap::addMap(const string& key, const FieldTable& val) -{ - (*this)[key] = Value::Ptr(new MapValue(val)); -} - -Object::Object(Broker* b, SchemaClass* s, framing::Buffer& buffer, bool prop, bool stat) : - broker(b), schema(s), pendingMethod(0) -{ - currentTime = buffer.getLongLong(); - createTime = buffer.getLongLong(); - deleteTime = buffer.getLongLong(); - objectId.decode(buffer); - - if (prop) { - set<string> excludes; - parsePresenceMasks(buffer, excludes); - for (vector<SchemaProperty*>::const_iterator pIter = schema->properties.begin(); - pIter != schema->properties.end(); pIter++) { - SchemaProperty* property = *pIter; - if (excludes.count(property->name) != 0) { - attributes[property->name] = Value::Ptr(new NullValue()); - } else { - attributes[property->name] = property->decodeValue(buffer); - } - } - } - - if (stat) { - for (vector<SchemaStatistic*>::const_iterator sIter = schema->statistics.begin(); - sIter != schema->statistics.end(); sIter++) { - SchemaStatistic* statistic = *sIter; - attributes[statistic->name] = statistic->decodeValue(buffer); - } - } -} - -Object::~Object() {} - -const ClassKey& Object::getClassKey() const -{ - return schema->getClassKey(); -} - -string Object::getIndex() const -{ - string result; - - for (vector<SchemaProperty*>::const_iterator pIter = schema->properties.begin(); - pIter != schema->properties.end(); pIter++) { - SchemaProperty* property = *pIter; - if (property->isIndex) { - AttributeMap::const_iterator vIter = attributes.find(property->name); - if (vIter != attributes.end()) { - if (!result.empty()) - result += ":"; - result += vIter->second->str(); - } - } - } - return result; -} - -void Object::mergeUpdate(const Object& /*updated*/) -{ - // TODO -} - -void Object::invokeMethod(const string name, const AttributeMap& args, MethodResponse& result) -{ - for (vector<SchemaMethod*>::const_iterator iter = schema->methods.begin(); - iter != schema->methods.end(); iter++) { - if ((*iter)->name == name) { - SchemaMethod* method = *iter; - char rawbuffer[65536]; - framing::Buffer buffer(rawbuffer, 65536); - uint32_t sequence = broker->sessionManager.sequenceManager.reserve("method"); - pendingMethod = method; - broker->methodObject = this; - broker->encodeHeader(buffer, 'M', sequence); - objectId.encode(buffer); - schema->key.encode(buffer); - buffer.putShortString(name); - - for (vector<SchemaArgument*>::const_iterator aIter = method->arguments.begin(); - aIter != method->arguments.end(); aIter++) { - SchemaArgument* arg = *aIter; - if (arg->dirInput) { - AttributeMap::const_iterator attr = args.find(arg->name); - if (attr != args.end()) { - ValueFactory::encodeValue(arg->typeCode, attr->second, buffer); - } else { - // TODO Use the default value instead of throwing - throw Exception("Missing arguments in method call"); - } - } - } - - uint32_t length = buffer.getPosition(); - buffer.reset(); - stringstream routingKey; - routingKey << "agent." << objectId.getBrokerBank() << "." << objectId.getAgentBank(); - broker->connThreadBody.sendBuffer(buffer, length, "qpid.management", routingKey.str()); - - { - Mutex::ScopedLock l(broker->lock); - bool ok = true; - while (pendingMethod != 0 && ok) { - ok = broker->cond.wait(broker->lock, AbsTime(now(), broker->sessionManager.settings.methodTimeout * TIME_SEC)); - } - - if (!ok) { - result.code = 0x1001; - result.text.assign("Method call timed out"); - result.arguments.clear(); - } else { - result = methodResponse; - } - } - } - } -} - -void Object::handleMethodResp(framing::Buffer& buffer, uint32_t sequence) -{ - broker->sessionManager.sequenceManager.release(sequence); - methodResponse.code = buffer.getLong(); - buffer.getMediumString(methodResponse.text); - methodResponse.arguments.clear(); - - for (vector<SchemaArgument*>::const_iterator aIter = pendingMethod->arguments.begin(); - aIter != pendingMethod->arguments.end(); aIter++) { - SchemaArgument* arg = *aIter; - if (arg->dirOutput) { - methodResponse.arguments[arg->name] = arg->decodeValue(buffer); - } - } - - { - Mutex::ScopedLock l(broker->lock); - pendingMethod = 0; - broker->cond.notify(); - } -} - -ObjectId Object::attrRef(const string& key) const -{ - AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return ObjectId(); - Value::Ptr val = iter->second; - if (!val->isObjectId()) - return ObjectId(); - return val->asObjectId(); -} - -uint32_t Object::attrUint(const string& key) const -{ - AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return 0; - Value::Ptr val = iter->second; - if (!val->isUint()) - return 0; - return val->asUint(); -} - -int32_t Object::attrInt(const string& key) const -{ - AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return 0; - Value::Ptr val = iter->second; - if (!val->isInt()) - return 0; - return val->asInt(); -} - -uint64_t Object::attrUint64(const string& key) const -{ - AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return 0; - Value::Ptr val = iter->second; - if (!val->isUint64()) - return 0; - return val->asUint64(); -} - -int64_t Object::attrInt64(const string& key) const -{ - AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return 0; - Value::Ptr val = iter->second; - if (!val->isInt64()) - return 0; - return val->asInt64(); -} - -string Object::attrString(const string& key) const -{ - AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return string(); - Value::Ptr val = iter->second; - if (!val->isString()) - return string(); - return val->asString(); -} - -bool Object::attrBool(const string& key) const -{ - AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return false; - Value::Ptr val = iter->second; - if (!val->isBool()) - return false; - return val->asBool(); -} - -float Object::attrFloat(const string& key) const -{ - AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return 0.0; - Value::Ptr val = iter->second; - if (!val->isFloat()) - return 0.0; - return val->asFloat(); -} - -double Object::attrDouble(const string& key) const -{ - AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return 0.0; - Value::Ptr val = iter->second; - if (!val->isDouble()) - return 0.0; - return val->asDouble(); -} - -Uuid Object::attrUuid(const string& key) const -{ - AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return Uuid(); - Value::Ptr val = iter->second; - if (!val->isUuid()) - return Uuid(); - return val->asUuid(); -} - -FieldTable Object::attrMap(const string& key) const -{ - AttributeMap::const_iterator iter = attributes.find(key); - if (iter == attributes.end()) - return FieldTable(); - Value::Ptr val = iter->second; - if (!val->isMap()) - return FieldTable(); - return val->asMap(); -} - -void Object::parsePresenceMasks(framing::Buffer& buffer, set<string>& excludeList) -{ - excludeList.clear(); - uint8_t bit = 0; - uint8_t mask = 0; - - for (vector<SchemaProperty*>::const_iterator pIter = schema->properties.begin(); - pIter != schema->properties.end(); pIter++) { - SchemaProperty* property = *pIter; - if (property->isOptional) { - if (bit == 0) { - mask = buffer.getOctet(); - bit = 1; - } - if ((mask & bit) == 0) - excludeList.insert(property->name); - if (bit == 0x80) - bit = 0; - else - bit = bit << 1; - } - } -} - -ostream& qpid::console::operator<<(ostream& o, const Object& object) -{ - const ClassKey& key = object.getClassKey(); - o << key.getPackageName() << ":" << key.getClassName() << "[" << object.getObjectId() << "] " << - object.getIndex(); - return o; -} - diff --git a/cpp/src/qpid/console/ObjectId.cpp b/cpp/src/qpid/console/ObjectId.cpp deleted file mode 100644 index fbaad20d57..0000000000 --- a/cpp/src/qpid/console/ObjectId.cpp +++ /dev/null @@ -1,91 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/console/ObjectId.h" -#include "qpid/framing/Buffer.h" - -using namespace qpid::console; -using namespace qpid; -using namespace std; - -ObjectId::ObjectId(framing::Buffer& buffer) -{ - decode(buffer); -} - -void ObjectId::decode(framing::Buffer& buffer) -{ - first = buffer.getLongLong(); - second = buffer.getLongLong(); -} - -void ObjectId::encode(framing::Buffer& buffer) -{ - buffer.putLongLong(first); - buffer.putLongLong(second); -} - -bool ObjectId::operator==(const ObjectId& other) const -{ - return second == other.second && first == other.first; -} - -bool ObjectId::operator!=(const ObjectId& other) const -{ - return !(*this == other); -} - -bool ObjectId::operator<(const ObjectId& other) const -{ - if (first < other.first) - return true; - if (first > other.first) - return false; - return second < other.second; -} - -bool ObjectId::operator>(const ObjectId& other) const -{ - if (first > other.first) - return true; - if (first < other.first) - return false; - return second > other.second; -} - -bool ObjectId::operator<=(const ObjectId& other) const -{ - return !(*this > other); -} - -bool ObjectId::operator>=(const ObjectId& other) const -{ - return !(*this < other); -} - -ostream& qpid::console::operator<<(ostream& o, const ObjectId& id) -{ - o << (int) id.getFlags() << "-" << id.getSequence() << "-" << id.getBrokerBank() << "-" << - id.getAgentBank() << "-" << id.getObject(); - return o; -} - - diff --git a/cpp/src/qpid/console/Package.cpp b/cpp/src/qpid/console/Package.cpp deleted file mode 100644 index e5d6fa29fd..0000000000 --- a/cpp/src/qpid/console/Package.cpp +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/console/Package.h" - -using namespace qpid::console; - -SchemaClass* Package::getClass(const std::string& className, uint8_t* hash) -{ - NameHash key(className, hash); - ClassMap::iterator iter = classes.find(key); - if (iter != classes.end()) - return iter->second; - return 0; -} - -void Package::addClass(const std::string& className, uint8_t* hash, SchemaClass* schemaClass) -{ - NameHash key(className, hash); - ClassMap::iterator iter = classes.find(key); - if (iter == classes.end()) - classes[key] = schemaClass; -} diff --git a/cpp/src/qpid/console/Schema.cpp b/cpp/src/qpid/console/Schema.cpp deleted file mode 100644 index a3dbd91201..0000000000 --- a/cpp/src/qpid/console/Schema.cpp +++ /dev/null @@ -1,165 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/console/Schema.h" -#include "qpid/console/Value.h" -#include "qpid/framing/FieldTable.h" - -using namespace qpid::console; -using namespace qpid; -using std::string; -using std::vector; - -SchemaArgument::SchemaArgument(framing::Buffer& buffer, bool forMethod) -{ - framing::FieldTable map; - map.decode(buffer); - - name = map.getAsString("name"); - typeCode = map.getAsInt("type"); - unit = map.getAsString("unit"); - min = map.getAsInt("min"); - max = map.getAsInt("max"); - maxLen = map.getAsInt("maxlen"); - desc = map.getAsString("desc"); - - dirInput = false; - dirOutput = false; - if (forMethod) { - string dir(map.getAsString("dir")); - if (dir.find('I') != dir.npos || dir.find('i') != dir.npos) - dirInput = true; - if (dir.find('O') != dir.npos || dir.find('o') != dir.npos) - dirOutput = true; - } -} - -Value::Ptr SchemaArgument::decodeValue(framing::Buffer& buffer) -{ - return ValueFactory::newValue(typeCode, buffer); -} - -SchemaProperty::SchemaProperty(framing::Buffer& buffer) -{ - framing::FieldTable map; - map.decode(buffer); - - name = map.getAsString("name"); - typeCode = map.getAsInt("type"); - accessCode = map.getAsInt("access"); - isIndex = map.getAsInt("index") != 0; - isOptional = map.getAsInt("optional") != 0; - unit = map.getAsString("unit"); - min = map.getAsInt("min"); - max = map.getAsInt("max"); - maxLen = map.getAsInt("maxlen"); - desc = map.getAsString("desc"); -} - -Value::Ptr SchemaProperty::decodeValue(framing::Buffer& buffer) -{ - return ValueFactory::newValue(typeCode, buffer); -} - -SchemaStatistic::SchemaStatistic(framing::Buffer& buffer) -{ - framing::FieldTable map; - map.decode(buffer); - - name = map.getAsString("name"); - typeCode = map.getAsInt("type"); - unit = map.getAsString("unit"); - desc = map.getAsString("desc"); -} - -Value::Ptr SchemaStatistic::decodeValue(framing::Buffer& buffer) -{ - return ValueFactory::newValue(typeCode, buffer); -} - -SchemaMethod::SchemaMethod(framing::Buffer& buffer) -{ - framing::FieldTable map; - map.decode(buffer); - - name = map.getAsString("name"); - desc = map.getAsString("desc"); - int argCount = map.getAsInt("argCount"); - - for (int i = 0; i < argCount; i++) - arguments.push_back(new SchemaArgument(buffer, true)); -} - -SchemaMethod::~SchemaMethod() -{ - for (vector<SchemaArgument*>::iterator iter = arguments.begin(); - iter != arguments.end(); iter++) - delete *iter; -} - -SchemaClass::SchemaClass(const uint8_t _kind, const ClassKey& _key, framing::Buffer& buffer) : - kind(_kind), key(_key) -{ - if (kind == KIND_TABLE) { - uint8_t hasSupertype = 0; //buffer.getOctet(); - uint16_t propCount = buffer.getShort(); - uint16_t statCount = buffer.getShort(); - uint16_t methodCount = buffer.getShort(); - - if (hasSupertype) { - string unused; - buffer.getShortString(unused); - buffer.getShortString(unused); - buffer.getLongLong(); - buffer.getLongLong(); - } - - for (uint16_t idx = 0; idx < propCount; idx++) - properties.push_back(new SchemaProperty(buffer)); - for (uint16_t idx = 0; idx < statCount; idx++) - statistics.push_back(new SchemaStatistic(buffer)); - for (uint16_t idx = 0; idx < methodCount; idx++) - methods.push_back(new SchemaMethod(buffer)); - - } else if (kind == KIND_EVENT) { - uint16_t argCount = buffer.getShort(); - - for (uint16_t idx = 0; idx < argCount; idx++) - arguments.push_back(new SchemaArgument(buffer)); - } -} - -SchemaClass::~SchemaClass() -{ - for (vector<SchemaProperty*>::iterator iter = properties.begin(); - iter != properties.end(); iter++) - delete *iter; - for (vector<SchemaStatistic*>::iterator iter = statistics.begin(); - iter != statistics.end(); iter++) - delete *iter; - for (vector<SchemaMethod*>::iterator iter = methods.begin(); - iter != methods.end(); iter++) - delete *iter; - for (vector<SchemaArgument*>::iterator iter = arguments.begin(); - iter != arguments.end(); iter++) - delete *iter; -} - diff --git a/cpp/src/qpid/console/SequenceManager.cpp b/cpp/src/qpid/console/SequenceManager.cpp deleted file mode 100644 index 86ea829749..0000000000 --- a/cpp/src/qpid/console/SequenceManager.cpp +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/console/SequenceManager.h" - -using namespace qpid::console; -using namespace qpid::sys; -using std::string; -using std::cout; -using std::endl; - -uint32_t SequenceManager::reserve(const std::string& context) -{ - Mutex::ScopedLock l(lock); - uint32_t result = sequence++; - pending[result] = context; - return result; -} - -std::string SequenceManager::release(uint32_t seq) -{ - Mutex::ScopedLock l(lock); - std::map<uint32_t, string>::iterator iter = pending.find(seq); - if (iter == pending.end()) - return string(); - string result(iter->second); - pending.erase(iter); - return result; -} - diff --git a/cpp/src/qpid/console/SessionManager.cpp b/cpp/src/qpid/console/SessionManager.cpp deleted file mode 100644 index 80c5959417..0000000000 --- a/cpp/src/qpid/console/SessionManager.cpp +++ /dev/null @@ -1,517 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/console/SessionManager.h" -#include "qpid/console/Schema.h" -#include "qpid/console/Agent.h" -#include "qpid/console/ConsoleListener.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/Time.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/Uuid.h" -#include "qpid/framing/FieldTable.h" - -using namespace qpid::console; -using namespace qpid::sys; -using namespace qpid; -using namespace std; -using qpid::framing::Buffer; -using qpid::framing::FieldTable; - -SessionManager::SessionManager(ConsoleListener* _listener, Settings _settings) : - listener(_listener), settings(_settings) -{ - bindingKeys(); -} - -SessionManager::~SessionManager() -{ - for (vector<Broker*>::iterator iter = brokers.begin(); - iter != brokers.end(); iter++) - delete *iter; - - for (map<string, Package*>::iterator iter = packages.begin(); - iter != packages.end(); iter++) { - for (Package::ClassMap::iterator citer = iter->second->classes.begin(); - citer != iter->second->classes.end(); - citer++) - delete citer->second; - delete iter->second; - } -} - -Broker* SessionManager::addBroker(client::ConnectionSettings& settings) -{ - Broker* broker(new Broker(*this, settings)); - { - Mutex::ScopedLock l(brokerListLock); - brokers.push_back(broker); - } - return broker; -} - -void SessionManager::delBroker(Broker* broker) -{ - Mutex::ScopedLock l(brokerListLock); - for (vector<Broker*>::iterator iter = brokers.begin(); - iter != brokers.end(); iter++) - if (*iter == broker) { - brokers.erase(iter); - delete broker; - return; - } -} - -void SessionManager::getPackages(NameVector& packageNames) -{ - allBrokersStable(); - packageNames.clear(); - { - Mutex::ScopedLock l(lock); - for (map<string, Package*>::iterator iter = packages.begin(); - iter != packages.end(); iter++) - packageNames.push_back(iter->first); - } -} - -void SessionManager::getClasses(KeyVector& classKeys, const std::string& packageName) -{ - allBrokersStable(); - classKeys.clear(); - map<string, Package*>::iterator iter = packages.find(packageName); - if (iter == packages.end()) - return; - - Package& package = *(iter->second); - for (Package::ClassMap::const_iterator piter = package.classes.begin(); - piter != package.classes.end(); piter++) { - ClassKey key(piter->second->getClassKey()); - classKeys.push_back(key); - } -} - -SchemaClass& SessionManager::getSchema(const ClassKey& classKey) -{ - allBrokersStable(); - map<string, Package*>::iterator iter = packages.find(classKey.getPackageName()); - if (iter == packages.end()) - throw Exception("Unknown package"); - - Package& package = *(iter->second); - Package::NameHash key(classKey.getClassName(), classKey.getHash()); - Package::ClassMap::iterator cIter = package.classes.find(key); - if (cIter == package.classes.end()) - throw Exception("Unknown class"); - - return *(cIter->second); -} - -void SessionManager::bindPackage(const std::string& packageName) -{ - stringstream key; - key << "console.obj.*.*." << packageName << ".#"; - bindingKeyList.push_back(key.str()); - for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) - (*iter)->addBinding(key.str()); -} - -void SessionManager::bindClass(const ClassKey& classKey) -{ - bindClass(classKey.getPackageName(), classKey.getClassName()); -} - -void SessionManager::bindClass(const std::string& packageName, const std::string& className) -{ - stringstream key; - key << "console.obj.*.*." << packageName << "." << className << ".#"; - bindingKeyList.push_back(key.str()); - for (vector<Broker*>::iterator iter = brokers.begin(); - iter != brokers.end(); iter++) - (*iter)->addBinding(key.str()); -} - - -void SessionManager::bindEvent(const ClassKey& classKey) -{ - bindEvent(classKey.getPackageName(), classKey.getClassName()); -} - - -void SessionManager::bindEvent(const std::string& packageName, const std::string& eventName) -{ - if (!settings.userBindings) throw Exception("Session not configured for userBindings."); - if (settings.rcvEvents) throw Exception("Session already configured to receive all events."); - - stringstream key; - key << "console.event.*.*." << packageName; - if (eventName.length()) { - key << "." << eventName << ".#"; - } else { - key << ".#"; - } - - bindingKeyList.push_back(key.str()); - for (vector<Broker*>::iterator iter = brokers.begin(); - iter != brokers.end(); iter++) - (*iter)->addBinding(key.str()); -} - - -void SessionManager::getAgents(Agent::Vector& agents, Broker* broker) -{ - agents.clear(); - if (broker != 0) { - broker->appendAgents(agents); - } else { - for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) { - (*iter)->appendAgents(agents); - } - } -} - -void SessionManager::getObjects(Object::Vector& objects, const std::string& className, - Broker* _broker, Agent* _agent) -{ - Agent::Vector agentList; - - if (_agent != 0) { - agentList.push_back(_agent); - _agent->getBroker()->waitForStable(); - } else { - if (_broker != 0) { - _broker->appendAgents(agentList); - _broker->waitForStable(); - } else { - allBrokersStable(); - Mutex::ScopedLock _lock(brokerListLock); - for (vector<Broker*>::iterator iter = brokers.begin(); iter != brokers.end(); iter++) { - (*iter)->appendAgents(agentList); - } - } - } - - FieldTable ft; - uint32_t sequence; - ft.setString("_class", className); - - getResult.clear(); - syncSequenceList.clear(); - error = string(); - - if (agentList.empty()) { - objects = getResult; - return; - } - - for (Agent::Vector::iterator iter = agentList.begin(); iter != agentList.end(); iter++) { - Agent* agent = *iter; - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - stringstream routingKey; - routingKey << "agent." << agent->getBrokerBank() << "." << agent->getAgentBank(); - { - Mutex::ScopedLock _lock(lock); - sequence = sequenceManager.reserve("multiget"); - syncSequenceList.insert(sequence); - } - agent->getBroker()->encodeHeader(buffer, 'G', sequence); - ft.encode(buffer); - uint32_t length = buffer.getPosition(); - buffer.reset(); - agent->getBroker()->connThreadBody.sendBuffer(buffer, length, "qpid.management", routingKey.str()); - } - - { - Mutex::ScopedLock _lock(lock); - sys::AbsTime startTime = sys::now(); - while (!syncSequenceList.empty() && error.empty()) { - cv.wait(lock, AbsTime(now(), settings.getTimeout * TIME_SEC)); - sys::AbsTime currTime = sys::now(); - if (sys::Duration(startTime, currTime) > settings.getTimeout * TIME_SEC) - break; - } - } - - objects = getResult; -} - -void SessionManager::bindingKeys() -{ - bindingKeyList.push_back("schema.#"); - if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) { - bindingKeyList.push_back("console.#"); - } else { - if (settings.rcvObjects && !settings.userBindings) - bindingKeyList.push_back("console.obj.#"); - else - bindingKeyList.push_back("console.obj.*.*.org.apache.qpid.broker.agent"); - if (settings.rcvEvents) - bindingKeyList.push_back("console.event.#"); - if (settings.rcvHeartbeats) - bindingKeyList.push_back("console.heartbeat"); - } -} - -void SessionManager::allBrokersStable() -{ - Mutex::ScopedLock l(brokerListLock); - for (vector<Broker*>::iterator iter = brokers.begin(); - iter != brokers.end(); iter++) - if ((*iter)->isConnected()) - (*iter)->waitForStable(); -} - -void SessionManager::startProtocol(Broker* broker) -{ - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - broker->encodeHeader(buffer, 'B'); - uint32_t length = 512 - buffer.available(); - buffer.reset(); - broker->connThreadBody.sendBuffer(buffer, length); -} - - -void SessionManager::handleBrokerResp(Broker* broker, Buffer& inBuffer, uint32_t) -{ - framing::Uuid brokerId; - - brokerId.decode(inBuffer); - broker->setBrokerId(brokerId); - - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - uint32_t sequence = sequenceManager.reserve("startup"); - broker->encodeHeader(buffer, 'P', sequence); - uint32_t length = 512 - buffer.available(); - buffer.reset(); - broker->connThreadBody.sendBuffer(buffer, length); - - if (listener != 0) { - listener->brokerInfo(*broker); - } -} - -void SessionManager::handlePackageInd(Broker* broker, Buffer& inBuffer, uint32_t) -{ - string packageName; - inBuffer.getShortString(packageName); - - { - Mutex::ScopedLock l(lock); - map<string, Package*>::iterator iter = packages.find(packageName); - if (iter == packages.end()) { - packages[packageName] = new Package(packageName); - if (listener != 0) - listener->newPackage(packageName); - } - } - - broker->incOutstanding(); - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - uint32_t sequence = sequenceManager.reserve("startup"); - broker->encodeHeader(buffer, 'Q', sequence); - buffer.putShortString(packageName); - uint32_t length = 512 - buffer.available(); - buffer.reset(); - broker->connThreadBody.sendBuffer(buffer, length); -} - -void SessionManager::handleCommandComplete(Broker* broker, Buffer& inBuffer, uint32_t sequence) -{ - Mutex::ScopedLock l(lock); - uint32_t resultCode = inBuffer.getLong(); - string resultText; - inBuffer.getShortString(resultText); - string context = sequenceManager.release(sequence); - if (resultCode != 0) - QPID_LOG(debug, "Received error in completion: " << resultCode << " " << resultText); - if (context == "startup") { - broker->decOutstanding(); - } else if (context == "multiget") { - if (syncSequenceList.count(sequence) == 1) { - syncSequenceList.erase(sequence); - if (syncSequenceList.empty()) { - cv.notify(); - } - } - } - // TODO: Other context cases -} - -void SessionManager::handleClassInd(Broker* broker, Buffer& inBuffer, uint32_t) -{ - uint8_t kind; - string packageName; - string className; - uint8_t hash[16]; - - kind = inBuffer.getOctet(); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - - { - Mutex::ScopedLock l(lock); - map<string, Package*>::iterator pIter = packages.find(packageName); - if (pIter == packages.end() || pIter->second->getClass(className, hash)) - return; - } - - broker->incOutstanding(); - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - uint32_t sequence = sequenceManager.reserve("startup"); - broker->encodeHeader(buffer, 'S', sequence); - buffer.putShortString(packageName); - buffer.putShortString(className); - buffer.putBin128(hash); - uint32_t length = 512 - buffer.available(); - buffer.reset(); - broker->connThreadBody.sendBuffer(buffer, length); -} - -void SessionManager::handleMethodResp(Broker* broker, Buffer& buffer, uint32_t sequence) -{ - if (broker->methodObject) { - broker->methodObject->handleMethodResp(buffer, sequence); - } -} - -void SessionManager::handleHeartbeatInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/) -{ -} - -void SessionManager::handleEventInd(Broker* broker, Buffer& buffer, uint32_t /*sequence*/) -{ - string packageName; - string className; - uint8_t hash[16]; - SchemaClass* schemaClass; - - buffer.getShortString(packageName); - buffer.getShortString(className); - buffer.getBin128(hash); - - { - Mutex::ScopedLock l(lock); - map<string, Package*>::iterator pIter = packages.find(packageName); - if (pIter == packages.end()) - return; - schemaClass = pIter->second->getClass(className, hash); - if (schemaClass == 0) - return; - } - - Event event(broker, schemaClass, buffer); - - if (listener) - listener->event(event); -} - -void SessionManager::handleSchemaResp(Broker* broker, Buffer& inBuffer, uint32_t sequence) -{ - uint8_t kind; - string packageName; - string className; - uint8_t hash[16]; - - kind = inBuffer.getOctet(); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - - { - Mutex::ScopedLock l(lock); - map<string, Package*>::iterator pIter = packages.find(packageName); - if (pIter != packages.end() && !pIter->second->getClass(className, hash)) { - ClassKey key(packageName, className, hash); - SchemaClass* schemaClass(new SchemaClass(kind, key, inBuffer)); - pIter->second->addClass(className, hash, schemaClass); - if (listener != 0) { - listener->newClass(schemaClass->getClassKey()); - } - } - } - - sequenceManager.release(sequence); - broker->decOutstanding(); -} - -void SessionManager::handleContentInd(Broker* broker, Buffer& buffer, uint32_t sequence, bool prop, bool stat) -{ - string packageName; - string className; - uint8_t hash[16]; - SchemaClass* schemaClass; - - buffer.getShortString(packageName); - buffer.getShortString(className); - buffer.getBin128(hash); - - { - Mutex::ScopedLock l(lock); - map<string, Package*>::iterator pIter = packages.find(packageName); - if (pIter == packages.end()) - return; - schemaClass = pIter->second->getClass(className, hash); - if (schemaClass == 0) - return; - } - - Object object(broker, schemaClass, buffer, prop, stat); - - if (prop && className == "agent" && packageName == "org.apache.qpid.broker") - broker->updateAgent(object); - - { - Mutex::ScopedLock l(lock); - if (syncSequenceList.count(sequence) == 1) { - if (!object.isDeleted()) - getResult.push_back(object); - return; - } - } - - if (listener) { - if (prop) - listener->objectProps(*broker, object); - if (stat) - listener->objectStats(*broker, object); - } -} - -void SessionManager::handleBrokerConnect(Broker* broker) -{ - if (listener != 0) - listener->brokerConnected(*broker); -} - -void SessionManager::handleBrokerDisconnect(Broker* broker) -{ - if (listener != 0) - listener->brokerDisconnected(*broker); -} - diff --git a/cpp/src/qpid/console/Value.cpp b/cpp/src/qpid/console/Value.cpp deleted file mode 100644 index 47c6a4ce57..0000000000 --- a/cpp/src/qpid/console/Value.cpp +++ /dev/null @@ -1,171 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/console/Value.h" -#include "qpid/framing/Buffer.h" - -#include <sstream> - -using namespace qpid; -using namespace qpid::console; -using namespace std; - -string NullValue::str() const -{ - return "<Null>"; -} - -RefValue::RefValue(framing::Buffer& buffer) -{ - uint64_t first = buffer.getLongLong(); - uint64_t second = buffer.getLongLong(); - value.setValue(first, second); -} - -string RefValue::str() const -{ - stringstream s; - s << value; - return s.str(); -} - -string UintValue::str() const -{ - stringstream s; - s << value; - return s.str(); -} - -string IntValue::str() const -{ - stringstream s; - s << value; - return s.str(); -} - -string Uint64Value::str() const -{ - stringstream s; - s << value; - return s.str(); -} - -string Int64Value::str() const -{ - stringstream s; - s << value; - return s.str(); -} - -StringValue::StringValue(framing::Buffer& buffer, int tc) -{ - if (tc == 6) - buffer.getShortString(value); - else - buffer.getMediumString(value); -} - -string BoolValue::str() const -{ - return value ? "T" : "F"; -} - -string FloatValue::str() const -{ - stringstream s; - s << value; - return s.str(); -} - -string DoubleValue::str() const -{ - stringstream s; - s << value; - return s.str(); -} - -UuidValue::UuidValue(framing::Buffer& buffer) -{ - value.decode(buffer); -} - -string MapValue::str() const -{ - stringstream s; - s << value; - return s.str(); -} - -MapValue::MapValue(framing::Buffer& buffer) -{ - value.decode(buffer); -} - - -Value::Ptr ValueFactory::newValue(int typeCode, framing::Buffer& buffer) -{ - switch (typeCode) { - case 1: return Value::Ptr(new UintValue(buffer.getOctet())); // U8 - case 2: return Value::Ptr(new UintValue(buffer.getShort())); // U16 - case 3: return Value::Ptr(new UintValue(buffer.getLong())); // U32 - case 4: return Value::Ptr(new Uint64Value(buffer.getLongLong())); // U64 - case 6: return Value::Ptr(new StringValue(buffer, 6)); // SSTR - case 7: return Value::Ptr(new StringValue(buffer, 7)); // LSTR - case 8: return Value::Ptr(new Int64Value(buffer.getLongLong())); // ABSTIME - case 9: return Value::Ptr(new Uint64Value(buffer.getLongLong())); // DELTATIME - case 10: return Value::Ptr(new RefValue(buffer)); // REF - case 11: return Value::Ptr(new BoolValue(buffer.getOctet())); // BOOL - case 12: return Value::Ptr(new FloatValue(buffer.getFloat())); // FLOAT - case 13: return Value::Ptr(new DoubleValue(buffer.getDouble())); // DOUBLE - case 14: return Value::Ptr(new UuidValue(buffer)); // UUID - case 15: return Value::Ptr(new MapValue(buffer)); // MAP - case 16: return Value::Ptr(new IntValue(buffer.getOctet())); // S8 - case 17: return Value::Ptr(new IntValue(buffer.getShort())); // S16 - case 18: return Value::Ptr(new IntValue(buffer.getLong())); // S32 - case 19: return Value::Ptr(new Int64Value(buffer.getLongLong())); // S64 - } - - return Value::Ptr(); -} - -void ValueFactory::encodeValue(int typeCode, Value::Ptr value, framing::Buffer& buffer) -{ - switch (typeCode) { - case 1: buffer.putOctet(value->asUint()); return; // U8 - case 2: buffer.putShort(value->asUint()); return; // U16 - case 3: buffer.putLong(value->asUint()); return; // U32 - case 4: buffer.putLongLong(value->asUint64()); return; // U64 - case 6: buffer.putShortString(value->asString()); return; // SSTR - case 7: buffer.putMediumString(value->asString()); return; // LSTR - case 8: buffer.putLongLong(value->asInt64()); return; // ABSTIME - case 9: buffer.putLongLong(value->asUint64()); return; // DELTATIME - case 10: value->asObjectId().encode(buffer); return; // REF - case 11: buffer.putOctet(value->asBool() ? 1 : 0); return; // BOOL - case 12: buffer.putFloat(value->asFloat()); return; // FLOAT - case 13: buffer.putDouble(value->asDouble()); return; // DOUBLE - case 14: value->asUuid().encode(buffer); return; // UUID - case 15: value->asMap().encode(buffer); return; // MAP - case 16: buffer.putOctet(value->asInt()); return; // S8 - case 17: buffer.putShort(value->asInt()); return; // S16 - case 18: buffer.putLong(value->asInt()); return; // S32 - case 19: buffer.putLongLong(value->asInt64()); return; // S64 - } -} |