diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qmf/engine/BrokerProxyImpl.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/engine/BrokerProxyImpl.cpp')
-rw-r--r-- | cpp/src/qmf/engine/BrokerProxyImpl.cpp | 827 |
1 files changed, 0 insertions, 827 deletions
diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/cpp/src/qmf/engine/BrokerProxyImpl.cpp deleted file mode 100644 index 5fc71979fd..0000000000 --- a/cpp/src/qmf/engine/BrokerProxyImpl.cpp +++ /dev/null @@ -1,827 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/BrokerProxyImpl.h" -#include "qmf/engine/ConsoleImpl.h" -#include "qmf/engine/Protocol.h" -#include "qpid/Address.h" -#include "qpid/sys/SystemInfo.h" -#include <qpid/log/Statement.h> -#include <qpid/StringUtils.h> -#include <string.h> -#include <iostream> -#include <fstream> - -using namespace std; -using namespace qmf::engine; -using namespace qpid::framing; -using namespace qpid::sys; - -namespace { - const char* QMF_EXCHANGE = "qpid.management"; - const char* DIR_EXCHANGE = "amq.direct"; - const char* BROKER_KEY = "broker"; - const char* BROKER_PACKAGE = "org.apache.qpid.broker"; - const char* AGENT_CLASS = "agent"; - const char* BROKER_AGENT_KEY = "agent.1.0"; -} - -const Object* QueryResponseImpl::getObject(uint32_t idx) const -{ - vector<ObjectPtr>::const_iterator iter = results.begin(); - - while (idx > 0) { - if (iter == results.end()) - return 0; - iter++; - idx--; - } - - return iter->get(); -} - -#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} - -BrokerEvent BrokerEventImpl::copy() -{ - BrokerEvent item; - - ::memset(&item, 0, sizeof(BrokerEvent)); - item.kind = kind; - - STRING_REF(name); - STRING_REF(exchange); - STRING_REF(bindingKey); - item.context = context; - item.queryResponse = queryResponse.get(); - item.methodResponse = methodResponse.get(); - - return item; -} - -BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, Console& _console) : publicObject(pub), console(_console) -{ - stringstream qn; - qpid::Address addr; - - SystemInfo::getLocalHostname(addr); - qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId(); - queueName = qn.str(); - - seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this))); -} - -void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/) -{ - Mutex::ScopedLock _lock(lock); - agentList.clear(); - eventQueue.clear(); - xmtQueue.clear(); - eventQueue.push_back(eventDeclareQueue(queueName)); - eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName)); - eventQueue.push_back(eventSetupComplete()); - - // TODO: Store session handle -} - -void BrokerProxyImpl::sessionClosed() -{ - Mutex::ScopedLock _lock(lock); - agentList.clear(); - eventQueue.clear(); - xmtQueue.clear(); -} - -void BrokerProxyImpl::startProtocol() -{ - AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker")); - { - Mutex::ScopedLock _lock(lock); - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - agentList[0] = agent; - - requestsOutstanding = 1; - topicBound = false; - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence); - } - - console.impl->eventAgentAdded(agent); -} - -void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) -{ - uint32_t length = buf.getPosition(); - MessageImpl::Ptr message(new MessageImpl); - - buf.reset(); - buf.getRawData(message->body, length); - message->destination = destination; - message->routingKey = routingKey; - message->replyExchange = DIR_EXCHANGE; - message->replyKey = queueName; - - xmtQueue.push_back(message); -} - -void BrokerProxyImpl::handleRcvMessage(Message& message) -{ - Buffer inBuffer(message.body, message.length); - uint8_t opcode; - uint32_t sequence; - - while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) - seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer); -} - -bool BrokerProxyImpl::getXmtMessage(Message& item) const -{ - Mutex::ScopedLock _lock(lock); - if (xmtQueue.empty()) - return false; - item = xmtQueue.front()->copy(); - return true; -} - -void BrokerProxyImpl::popXmt() -{ - Mutex::ScopedLock _lock(lock); - if (!xmtQueue.empty()) - xmtQueue.pop_front(); -} - -bool BrokerProxyImpl::getEvent(BrokerEvent& event) const -{ - Mutex::ScopedLock _lock(lock); - if (eventQueue.empty()) - return false; - event = eventQueue.front()->copy(); - return true; -} - -void BrokerProxyImpl::popEvent() -{ - Mutex::ScopedLock _lock(lock); - if (!eventQueue.empty()) - eventQueue.pop_front(); -} - -uint32_t BrokerProxyImpl::agentCount() const -{ - Mutex::ScopedLock _lock(lock); - return agentList.size(); -} - -const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const -{ - Mutex::ScopedLock _lock(lock); - for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin(); - iter != agentList.end(); iter++) - if (idx-- == 0) - return iter->second.get(); - return 0; -} - -void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent) -{ - SequenceContext::Ptr queryContext(new QueryContext(*this, context)); - Mutex::ScopedLock _lock(lock); - bool sent = false; - if (agent != 0) { - if (sendGetRequestLH(queryContext, query, agent)) - sent = true; - } else { - // TODO (optimization) only send queries to agents that have the requested class+package - for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin(); - iter != agentList.end(); iter++) { - if (sendGetRequestLH(queryContext, query, iter->second.get())) - sent = true; - } - } - - if (!sent) { - queryContext->reserve(); - queryContext->release(); - } -} - -bool BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent) -{ - if (query.impl->singleAgent()) { - if (query.impl->agentBank() != agent->getAgentBank()) - return false; - } - stringstream key; - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve(queryContext)); - agent->impl->addSequence(sequence); - - Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); - query.impl->encode(outBuffer); - key << "agent.1." << agent->impl->agentBank; - sendBufferLH(outBuffer, QMF_EXCHANGE, key.str()); - QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str()); - return true; -} - -string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const Value* argmap, Buffer& buffer) -{ - int argCount = schema->getArgumentCount(); - - if (argmap == 0 || !argmap->isMap()) - return string("Arguments must be in a map value"); - - for (int aIdx = 0; aIdx < argCount; aIdx++) { - const SchemaArgument* arg(schema->getArgument(aIdx)); - if (arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT) { - if (argmap->keyInMap(arg->getName())) { - const Value* argVal(argmap->byKey(arg->getName())); - if (argVal->getType() != arg->getType()) - return string("Argument is the wrong type: ") + arg->getName(); - argVal->impl->encode(buffer); - } else { - Value defaultValue(arg->getType()); - defaultValue.impl->encode(buffer); - } - } - } - - return string(); -} - -string BrokerProxyImpl::encodedSizeMethodArguments(const SchemaMethod* schema, const Value* argmap, uint32_t& size) -{ - int argCount = schema->getArgumentCount(); - - if (argmap == 0 || !argmap->isMap()) - return string("Arguments must be in a map value"); - - for (int aIdx = 0; aIdx < argCount; aIdx++) { - const SchemaArgument* arg(schema->getArgument(aIdx)); - if (arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT) { - if (argmap->keyInMap(arg->getName())) { - const Value* argVal(argmap->byKey(arg->getName())); - if (argVal->getType() != arg->getType()) - return string("Argument is the wrong type: ") + arg->getName(); - size += argVal->impl->encodedSize(); - } else { - Value defaultValue(arg->getType()); - size += defaultValue.impl->encodedSize(); - } - } - } - - return string(); -} - -void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, - const string& methodName, const Value* args, void* userContext) -{ - int methodCount = cls->getMethodCount(); - int idx; - for (idx = 0; idx < methodCount; idx++) { - const SchemaMethod* method = cls->getMethod(idx); - if (string(method->getName()) == methodName) { - Mutex::ScopedLock _lock(lock); - SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method)); - stringstream key; - char* buf(outputBuffer); - uint32_t bufLen(1024); - bool allocated(false); - - string argErrorString = encodedSizeMethodArguments(method, args, bufLen); - if (!argErrorString.empty()) { - MethodResponsePtr argError(MethodResponseImpl::factory(1, argErrorString)); - eventQueue.push_back(eventMethodResponse(userContext, argError)); - return; - } - - if (bufLen > MA_BUFFER_SIZE) { - buf = (char*) malloc(bufLen); - allocated = true; - } - - Buffer outBuffer(buf, bufLen); - uint32_t sequence(seqMgr.reserve(methodContext)); - - Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST, sequence); - oid->impl->encode(outBuffer); - cls->getClassKey()->impl->encode(outBuffer); - outBuffer.putShortString(methodName); - - encodeMethodArguments(method, args, outBuffer); - key << "agent.1." << oid->impl->getAgentBank(); - sendBufferLH(outBuffer, QMF_EXCHANGE, key.str()); - QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << " method=" << methodName << " key=" << key.str()); - - if (allocated) - free(buf); - - return; - } - } - - MethodResponsePtr error(MethodResponseImpl::factory(1, string("Unknown method: ") + methodName)); - Mutex::ScopedLock _lock(lock); - eventQueue.push_back(eventMethodResponse(userContext, error)); -} - -void BrokerProxyImpl::addBinding(const string& exchange, const string& key) -{ - Mutex::ScopedLock _lock(lock); - eventQueue.push_back(eventBind(exchange, queueName, key)); -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName) -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE)); - event->name = queueName; - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key) -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND)); - event->name = queue; - event->exchange = exchange; - event->bindingKey = key; - - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete() -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE)); - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventStable() -{ - QPID_LOG(trace, "Console Link to Broker Stable"); - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE)); - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponsePtr response) -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE)); - event->context = context; - event->queryResponse = response; - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponsePtr response) -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::METHOD_RESPONSE)); - event->context = context; - event->methodResponse = response; - return event; -} - -void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq) -{ - brokerId.decode(inBuffer); - QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId); - Mutex::ScopedLock _lock(lock); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - incOutstandingLH(); - Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT PackageRequest seq=" << sequence); -} - -void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq) -{ - string package; - - inBuffer.getShortString(package); - QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package); - console.impl->learnPackage(package); - - Mutex::ScopedLock _lock(lock); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - incOutstandingLH(); - Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence); - outBuffer.putShortString(package); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package); -} - -void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq) -{ - string text; - uint32_t code = inBuffer.getLong(); - inBuffer.getShortString(text); - QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text); -} - -void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq) -{ - uint8_t kind = inBuffer.getOctet(); - auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer)); - - QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey->impl->str()); - - if (!console.impl->haveClass(classKey.get())) { - Mutex::ScopedLock _lock(lock); - incOutstandingLH(); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence); - classKey->impl->encode(outBuffer); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey->impl->str()); - } -} - -MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema) -{ - MethodResponsePtr response(MethodResponseImpl::factory(inBuffer, schema)); - - QPID_LOG(trace, "RCVD MethodResponse seq=" << seq << " status=" << response->getStatus() << " text=" << - response->getException()->asString()); - - return response; -} - -void BrokerProxyImpl::handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq, const string& routingKey) -{ - vector<string> tokens = qpid::split(routingKey, "."); - uint32_t agentBank; - uint64_t timestamp; - - if (routingKey.empty() || tokens.size() != 4) - agentBank = 0; - else - agentBank = ::atoi(tokens[3].c_str()); - - timestamp = inBuffer.getLongLong(); - map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank); - if (iter != agentList.end()) { - console.impl->eventAgentHeartbeat(iter->second, timestamp); - } - QPID_LOG(trace, "RCVD HeartbeatIndication seq=" << seq << " agentBank=" << agentBank); -} - -void BrokerProxyImpl::handleEventIndication(Buffer& inBuffer, uint32_t seq) -{ - auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer)); - const SchemaEventClass *schema = console.impl->getEventClass(classKey.get()); - if (schema == 0) { - QPID_LOG(trace, "No Schema Found for EventIndication. seq=" << seq << " key=" << classKey->impl->str()); - return; - } - - EventPtr eptr(EventImpl::factory(schema, inBuffer)); - - console.impl->eventEventReceived(eptr); - QPID_LOG(trace, "RCVD EventIndication seq=" << seq << " key=" << classKey->impl->str()); -} - -void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) -{ - SchemaObjectClass* oClassPtr; - SchemaEventClass* eClassPtr; - uint8_t kind = inBuffer.getOctet(); - const SchemaClassKey* key; - if (kind == CLASS_OBJECT) { - oClassPtr = SchemaObjectClassImpl::factory(inBuffer); - console.impl->learnClass(oClassPtr); - key = oClassPtr->getClassKey(); - QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str()); - - // - // If we have just learned about the org.apache.qpid.broker:agent class, send a get - // request for the current list of agents so we can have it on-hand before we declare - // this session "stable". - // - if (key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE) { - Mutex::ScopedLock _lock(lock); - incOutstandingLH(); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); - FieldTable ft; - ft.setString("_class", AGENT_CLASS); - ft.setString("_package", BROKER_PACKAGE); - ft.encode(outBuffer); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY); - QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY); - } - } else if (kind == CLASS_EVENT) { - eClassPtr = SchemaEventClassImpl::factory(inBuffer); - console.impl->learnClass(eClassPtr); - key = eClassPtr->getClassKey(); - QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str()); - } - else { - QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind); - } -} - -ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat) -{ - auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer)); - QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str()); - - SchemaObjectClass* schema = console.impl->getSchema(classKey.get()); - if (schema == 0) { - QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str()); - return ObjectPtr(); - } - - ObjectPtr optr(ObjectImpl::factory(schema, this, inBuffer, prop, stat, true)); - if (prop && classKey->impl->getPackageName() == BROKER_PACKAGE && classKey->impl->getClassName() == AGENT_CLASS) { - // - // We've intercepted information about a remote agent... update the agent list accordingly - // - updateAgentList(optr); - } - return optr; -} - -void BrokerProxyImpl::updateAgentList(ObjectPtr obj) -{ - Value* value = obj->getValue("agentBank"); - Mutex::ScopedLock _lock(lock); - if (value != 0 && value->isUint()) { - uint32_t agentBank = value->asUint(); - if (obj->isDeleted()) { - map<uint32_t, AgentProxyPtr>::iterator iter = agentList.find(agentBank); - if (iter != agentList.end()) { - AgentProxyPtr agent(iter->second); - console.impl->eventAgentDeleted(agent); - agentList.erase(agentBank); - QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list"); - - // - // Release all sequence numbers for requests in-flight to this agent. - // Since the agent is no longer connected, these requests would not - // otherwise complete. - // - agent->impl->releaseInFlight(seqMgr); - } - } else { - Value* str = obj->getValue("label"); - string label; - if (str != 0 && str->isString()) - label = str->asString(); - map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank); - if (iter == agentList.end()) { - AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, agentBank, label)); - agentList[agentBank] = agent; - console.impl->eventAgentAdded(agent); - QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank); - } - } - } -} - -void BrokerProxyImpl::incOutstandingLH() -{ - requestsOutstanding++; -} - -void BrokerProxyImpl::decOutstanding() -{ - Mutex::ScopedLock _lock(lock); - requestsOutstanding--; - if (requestsOutstanding == 0 && !topicBound) { - topicBound = true; - for (vector<pair<string, string> >::const_iterator iter = console.impl->bindingList.begin(); - iter != console.impl->bindingList.end(); iter++) { - string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first); - string key(iter->second); - eventQueue.push_back(eventBind(exchange, queueName, key)); - } - eventQueue.push_back(eventStable()); - } -} - -MethodResponseImpl::MethodResponseImpl(const MethodResponseImpl& from) : - status(from.status), schema(from.schema) -{ - if (from.exception.get()) - exception.reset(new Value(*(from.exception))); - if (from.arguments.get()) - arguments.reset(new Value(*(from.arguments))); -} - -MethodResponseImpl::MethodResponseImpl(Buffer& buf, const SchemaMethod* s) : schema(s) -{ - string text; - - status = buf.getLong(); - buf.getMediumString(text); - exception.reset(new Value(TYPE_LSTR)); - exception->setString(text.c_str()); - - if (status != 0) - return; - - arguments.reset(new Value(TYPE_MAP)); - int argCount(schema->getArgumentCount()); - for (int idx = 0; idx < argCount; idx++) { - const SchemaArgument* arg = schema->getArgument(idx); - if (arg->getDirection() == DIR_OUT || arg->getDirection() == DIR_IN_OUT) { - Value* value(ValueImpl::factory(arg->getType(), buf)); - arguments->insert(arg->getName(), value); - } - } -} - -MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) : schema(0) -{ - status = s; - exception.reset(new Value(TYPE_LSTR)); - exception->setString(text.c_str()); -} - -MethodResponse* MethodResponseImpl::factory(Buffer& buf, const SchemaMethod* schema) -{ - MethodResponseImpl* impl(new MethodResponseImpl(buf, schema)); - return new MethodResponse(impl); -} - -MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string& text) -{ - MethodResponseImpl* impl(new MethodResponseImpl(status, text)); - return new MethodResponse(impl); -} - -bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& routingKey, Buffer& buffer) -{ - ObjectPtr object; - bool completeContext = false; - - if (opcode == Protocol::OP_BROKER_RESPONSE) { - broker.handleBrokerResponse(buffer, sequence); - completeContext = true; - } - else if (opcode == Protocol::OP_COMMAND_COMPLETE) { - broker.handleCommandComplete(buffer, sequence); - completeContext = true; - } - else if (opcode == Protocol::OP_SCHEMA_RESPONSE) { - broker.handleSchemaResponse(buffer, sequence); - completeContext = true; - } - else if (opcode == Protocol::OP_PACKAGE_INDICATION) - broker.handlePackageIndication(buffer, sequence); - else if (opcode == Protocol::OP_CLASS_INDICATION) - broker.handleClassIndication(buffer, sequence); - else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) - broker.handleHeartbeatIndication(buffer, sequence, routingKey); - else if (opcode == Protocol::OP_EVENT_INDICATION) - broker.handleEventIndication(buffer, sequence); - else if (opcode == Protocol::OP_PROPERTY_INDICATION) { - object = broker.handleObjectIndication(buffer, sequence, true, false); - broker.console.impl->eventObjectUpdate(object, true, false); - } - else if (opcode == Protocol::OP_STATISTIC_INDICATION) { - object = broker.handleObjectIndication(buffer, sequence, false, true); - broker.console.impl->eventObjectUpdate(object, false, true); - } - else if (opcode == Protocol::OP_OBJECT_INDICATION) { - object = broker.handleObjectIndication(buffer, sequence, true, true); - broker.console.impl->eventObjectUpdate(object, true, true); - } - else { - QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode); - completeContext = true; - } - - return completeContext; -} - -void QueryContext::reserve() -{ - Mutex::ScopedLock _lock(lock); - requestsOutstanding++; -} - -void QueryContext::release() -{ - { - Mutex::ScopedLock _lock(lock); - if (--requestsOutstanding > 0) - return; - } - - Mutex::ScopedLock _block(broker.lock); - broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse)); -} - -bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer) -{ - bool completeContext = false; - ObjectPtr object; - - if (opcode == Protocol::OP_COMMAND_COMPLETE) { - broker.handleCommandComplete(buffer, sequence); - completeContext = true; - - // - // Visit each agent and remove the sequence from that agent's in-flight list. - // This could be made more efficient because only one agent will have this sequence - // in its list. - // - map<uint32_t, AgentProxyPtr> copy; - { - Mutex::ScopedLock _block(broker.lock); - copy = broker.agentList; - } - for (map<uint32_t, AgentProxyPtr>::iterator iter = copy.begin(); iter != copy.end(); iter++) - iter->second->impl->delSequence(sequence); - } - else if (opcode == Protocol::OP_OBJECT_INDICATION) { - object = broker.handleObjectIndication(buffer, sequence, true, true); - if (object.get() != 0) - queryResponse->impl->results.push_back(object); - } - else { - QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode); - completeContext = true; - } - - return completeContext; -} - -void MethodContext::release() -{ - Mutex::ScopedLock _block(broker.lock); - broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse)); -} - -bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer) -{ - if (opcode == Protocol::OP_METHOD_RESPONSE) - methodResponse = broker.handleMethodResponse(buffer, sequence, schema); - else - QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode); - - return true; -} - - -//================================================================== -// Wrappers -//================================================================== - -AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {} -AgentProxy::AgentProxy(const AgentProxy& from) : impl(new AgentProxyImpl(*(from.impl))) {} -AgentProxy::~AgentProxy() { delete impl; } -const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); } -uint32_t AgentProxy::getBrokerBank() const { return impl->getBrokerBank(); } -uint32_t AgentProxy::getAgentBank() const { return impl->getAgentBank(); } - -BrokerProxy::BrokerProxy(Console& console) : impl(new BrokerProxyImpl(*this, console)) {} -BrokerProxy::~BrokerProxy() { delete impl; } -void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); } -void BrokerProxy::sessionClosed() { impl->sessionClosed(); } -void BrokerProxy::startProtocol() { impl->startProtocol(); } -void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } -bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } -void BrokerProxy::popXmt() { impl->popXmt(); } -bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); } -void BrokerProxy::popEvent() { impl->popEvent(); } -uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); } -const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); } -void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); } - -MethodResponse::MethodResponse(const MethodResponse& from) : impl(new MethodResponseImpl(*(from.impl))) {} -MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {} -MethodResponse::~MethodResponse() {} -uint32_t MethodResponse::getStatus() const { return impl->getStatus(); } -const Value* MethodResponse::getException() const { return impl->getException(); } -const Value* MethodResponse::getArgs() const { return impl->getArgs(); } - -QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {} -QueryResponse::~QueryResponse() {} -uint32_t QueryResponse::getStatus() const { return impl->getStatus(); } -const Value* QueryResponse::getException() const { return impl->getException(); } -uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); } -const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); } - |