diff options
| author | Ted Ross <tross@apache.org> | 2009-09-18 20:15:15 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2009-09-18 20:15:15 +0000 |
| commit | c8fa5fa308f6ad9be22612568ace703777fbb6d9 (patch) | |
| tree | 148275652ac2572ef4bb0863736a337a5d1617e2 /qpid/cpp/src/qmf/BrokerProxyImpl.cpp | |
| parent | d3c07faea48e2dbd57cf27fac2d9940ca6456a69 (diff) | |
| download | qpid-python-c8fa5fa308f6ad9be22612568ace703777fbb6d9.tar.gz | |
Refactored the QMF engine to adhere to the following rules regarding
the pimpl (Pointer to Implementation) pattern:
1) Impl classes have constructors matching the public constructors
2) Additional Impl constructors are accessed through a static factory function
3) All linkages to objects are to the public object
4) If a back-link (from Impl to public) is needed, the Impl class must be
derived from boost::noncopyable
5) All public classes have non-default copy constructors that make a copy of the
Impl class
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@816770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qmf/BrokerProxyImpl.cpp')
| -rw-r--r-- | qpid/cpp/src/qmf/BrokerProxyImpl.cpp | 135 |
1 files changed, 73 insertions, 62 deletions
diff --git a/qpid/cpp/src/qmf/BrokerProxyImpl.cpp b/qpid/cpp/src/qmf/BrokerProxyImpl.cpp index 47dca309d7..29e51566b3 100644 --- a/qpid/cpp/src/qmf/BrokerProxyImpl.cpp +++ b/qpid/cpp/src/qmf/BrokerProxyImpl.cpp @@ -43,7 +43,7 @@ namespace { const Object* QueryResponseImpl::getObject(uint32_t idx) const { - vector<ObjectImpl::Ptr>::const_iterator iter = results.begin(); + vector<ObjectPtr>::const_iterator iter = results.begin(); while (idx > 0) { if (iter == results.end()) @@ -52,7 +52,7 @@ const Object* QueryResponseImpl::getObject(uint32_t idx) const idx--; } - return (*iter)->envelope; + return iter->get(); } #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} @@ -68,14 +68,13 @@ BrokerEvent BrokerEventImpl::copy() STRING_REF(exchange); STRING_REF(bindingKey); item.context = context; - item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0; - item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0; + item.queryResponse = queryResponse.get(); + item.methodResponse = methodResponse.get(); return item; } -BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) : - envelope(e), console(_console.impl) +BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, ConsoleEngine& _console) : publicObject(pub), console(_console) { stringstream qn; qpid::TcpAddress addr; @@ -114,7 +113,7 @@ void BrokerProxyImpl::startProtocol() char rawbuffer[512]; Buffer buffer(rawbuffer, 512); - agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker"))); + agentList.push_back(AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"))); requestsOutstanding = 1; topicBound = false; @@ -190,10 +189,10 @@ uint32_t BrokerProxyImpl::agentCount() const const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const { Mutex::ScopedLock _lock(lock); - for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin(); + for (vector<AgentProxyPtr>::const_iterator iter = agentList.begin(); iter != agentList.end(); iter++) if (idx-- == 0) - return (*iter)->envelope; + return iter->get(); return 0; } @@ -202,17 +201,17 @@ void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentPr SequenceContext::Ptr queryContext(new QueryContext(*this, context)); Mutex::ScopedLock _lock(lock); if (agent != 0) { - sendGetRequestLH(queryContext, query, agent->impl); + sendGetRequestLH(queryContext, query, agent); } else { // TODO (optimization) only send queries to agents that have the requested class+package - for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin(); + for (vector<AgentProxyPtr>::const_iterator iter = agentList.begin(); iter != agentList.end(); iter++) { sendGetRequestLH(queryContext, query, (*iter).get()); } } } -void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent) +void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent) { stringstream key; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); @@ -220,7 +219,7 @@ void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); query.impl->encode(outBuffer); - key << "agent.1." << agent->agentBank; + key << "agent.1." << agent->impl->agentBank; sendBufferLH(outBuffer, QMF_EXCHANGE, key.str()); QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str()); } @@ -250,7 +249,7 @@ string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const return string(); } -void BrokerProxyImpl::sendMethodRequest(ObjectIdImpl* oid, const SchemaObjectClass* cls, +void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const string& methodName, const Value* args, void* userContext) { int methodCount = cls->getMethodCount(); @@ -259,30 +258,30 @@ void BrokerProxyImpl::sendMethodRequest(ObjectIdImpl* oid, const SchemaObjectCla const SchemaMethod* method = cls->getMethod(idx); if (string(method->getName()) == methodName) { Mutex::ScopedLock _lock(lock); - SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method->impl)); + SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method)); stringstream key; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t sequence(seqMgr.reserve(methodContext)); Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST, sequence); - oid->encode(outBuffer); + oid->impl->encode(outBuffer); cls->getClassKey()->impl->encode(outBuffer); outBuffer.putShortString(methodName); string argErrorString = encodeMethodArguments(method, args, outBuffer); if (argErrorString.empty()) { - key << "agent.1." << oid->getAgentBank(); + key << "agent.1." << oid->impl->getAgentBank(); sendBufferLH(outBuffer, QMF_EXCHANGE, key.str()); QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << " method=" << methodName << " key=" << key.str()); } else { - MethodResponseImpl::Ptr argError(new MethodResponseImpl(1, argErrorString)); + MethodResponsePtr argError(MethodResponseImpl::factory(1, argErrorString)); eventQueue.push_back(eventMethodResponse(userContext, argError)); } return; } } - MethodResponseImpl::Ptr error(new MethodResponseImpl(1, string("Unknown method: ") + methodName)); + MethodResponsePtr error(MethodResponseImpl::factory(1, string("Unknown method: ") + methodName)); Mutex::ScopedLock _lock(lock); eventQueue.push_back(eventMethodResponse(userContext, error)); } @@ -322,7 +321,7 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventStable() return event; } -BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response) +BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponsePtr response) { BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE)); event->context = context; @@ -330,7 +329,7 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryRes return event; } -BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponseImpl::Ptr response) +BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponsePtr response) { BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::METHOD_RESPONSE)); event->context = context; @@ -357,7 +356,7 @@ void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq) inBuffer.getShortString(package); QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package); - console->learnPackage(package); + console.impl->learnPackage(package); Mutex::ScopedLock _lock(lock); Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); @@ -380,25 +379,25 @@ void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq) void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq) { uint8_t kind = inBuffer.getOctet(); - SchemaClassKeyImpl classKey(inBuffer); + auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer)); - QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str()); + QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey->impl->str()); - if (!console->haveClass(classKey)) { + if (!console.impl->haveClass(classKey.get())) { Mutex::ScopedLock _lock(lock); incOutstandingLH(); Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t sequence(seqMgr.reserve()); Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence); - classKey.encode(outBuffer); + classKey->impl->encode(outBuffer); sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str()); + QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey->impl->str()); } } -MethodResponseImpl::Ptr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, SchemaMethodImpl* schema) +MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema) { - MethodResponseImpl::Ptr response(new MethodResponseImpl(inBuffer, schema)); + MethodResponsePtr response(MethodResponseImpl::factory(inBuffer, schema)); QPID_LOG(trace, "RCVD MethodResponse seq=" << seq << " status=" << response->getStatus() << " text=" << response->getException()->asString()); @@ -418,15 +417,15 @@ void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) { - SchemaObjectClassImpl::Ptr oClassPtr; - SchemaEventClassImpl::Ptr eClassPtr; + SchemaObjectClass* oClassPtr; + SchemaEventClass* eClassPtr; uint8_t kind = inBuffer.getOctet(); - const SchemaClassKeyImpl* key; + const SchemaClassKey* key; if (kind == CLASS_OBJECT) { - oClassPtr.reset(new SchemaObjectClassImpl(inBuffer)); - console->learnClass(oClassPtr); - key = oClassPtr->getClassKey()->impl; - QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str()); + oClassPtr = SchemaObjectClassImpl::factory(inBuffer); + console.impl->learnClass(oClassPtr); + key = oClassPtr->getClassKey(); + QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str()); // // If we have just learned about the org.apache.qpid.broker:agent class, send a get @@ -447,28 +446,28 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY); } } else if (kind == CLASS_EVENT) { - eClassPtr.reset(new SchemaEventClassImpl(inBuffer)); - console->learnClass(eClassPtr); - key = eClassPtr->getClassKey()->impl; - QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str()); + eClassPtr = SchemaEventClassImpl::factory(inBuffer); + console.impl->learnClass(eClassPtr); + key = eClassPtr->getClassKey(); + QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str()); } else { QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind); } } -ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat) +ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat) { - SchemaClassKeyImpl classKey(inBuffer); - QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str()); + auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer)); + QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str()); - SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey); - if (schema.get() == 0) { - QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str()); - return ObjectImpl::Ptr(); + SchemaObjectClass* schema = console.impl->getSchema(classKey.get()); + if (schema == 0) { + QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str()); + return ObjectPtr(); } - return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, this, inBuffer, prop, stat, true)); + return ObjectPtr(ObjectImpl::factory(schema, this, inBuffer, prop, stat, true)); } void BrokerProxyImpl::incOutstandingLH() @@ -482,8 +481,8 @@ void BrokerProxyImpl::decOutstanding() requestsOutstanding--; if (requestsOutstanding == 0 && !topicBound) { topicBound = true; - for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin(); - iter != console->bindingList.end(); iter++) { + for (vector<pair<string, string> >::const_iterator iter = console.impl->bindingList.begin(); + iter != console.impl->bindingList.end(); iter++) { string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first); string key(iter->second); eventQueue.push_back(eventBind(exchange, queueName, key)); @@ -493,15 +492,15 @@ void BrokerProxyImpl::decOutstanding() } MethodResponseImpl::MethodResponseImpl(const MethodResponseImpl& from) : - envelope(from.envelope), // !!!! TODO !!!! this is not right - status(from.status), schema(from.schema), - exception(from.exception.get() ? new Value(*from.exception) : 0), - arguments(from.arguments.get() ? new Value(*from.arguments) : 0) + status(from.status), schema(from.schema) { + if (from.exception.get()) + exception.reset(new Value(*(from.exception))); + if (from.arguments.get()) + arguments.reset(new Value(*(from.arguments))); } -MethodResponseImpl::MethodResponseImpl(Buffer& buf, SchemaMethodImpl* s) : - envelope(new MethodResponse(this)), schema(s) +MethodResponseImpl::MethodResponseImpl(Buffer& buf, const SchemaMethod* s) : schema(s) { string text; @@ -518,19 +517,31 @@ MethodResponseImpl::MethodResponseImpl(Buffer& buf, SchemaMethodImpl* s) : for (int idx = 0; idx < argCount; idx++) { const SchemaArgument* arg = schema->getArgument(idx); if (arg->getDirection() == DIR_OUT || arg->getDirection() == DIR_IN_OUT) { - ValueImpl* value(new ValueImpl(arg->getType(), buf)); - arguments->insert(arg->getName(), value->envelope); + Value* value(ValueImpl::factory(arg->getType(), buf)); + arguments->insert(arg->getName(), value); } } } -MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) : envelope(new MethodResponse(this)), schema(0) +MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) : schema(0) { status = s; exception.reset(new Value(TYPE_LSTR)); exception->setString(text.c_str()); } +MethodResponse* MethodResponseImpl::factory(Buffer& buf, const SchemaMethod* schema) +{ + MethodResponseImpl* impl(new MethodResponseImpl(buf, schema)); + return new MethodResponse(impl); +} + +MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string& text) +{ + MethodResponseImpl* impl(new MethodResponseImpl(status, text)); + return new MethodResponse(impl); +} + bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) { bool completeContext = false; @@ -589,7 +600,7 @@ void QueryContext::release() bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) { bool completeContext = false; - ObjectImpl::Ptr object; + ObjectPtr object; if (opcode == Protocol::OP_COMMAND_COMPLETE) { broker.handleCommandComplete(buffer, sequence); @@ -598,7 +609,7 @@ bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buff else if (opcode == Protocol::OP_OBJECT_INDICATION) { object = broker.handleObjectIndication(buffer, sequence, true, true); if (object.get() != 0) - queryResponse->results.push_back(object); + queryResponse->impl->results.push_back(object); } else { QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode); @@ -633,7 +644,7 @@ AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {} AgentProxy::~AgentProxy() { delete impl; } const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); } -BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {} +BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(*this, console)) {} BrokerProxy::~BrokerProxy() { delete impl; } void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); } void BrokerProxy::sessionClosed() { impl->sessionClosed(); } |
