summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/BrokerProxyImpl.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-09-18 20:15:15 +0000
committerTed Ross <tross@apache.org>2009-09-18 20:15:15 +0000
commitc8fa5fa308f6ad9be22612568ace703777fbb6d9 (patch)
tree148275652ac2572ef4bb0863736a337a5d1617e2 /qpid/cpp/src/qmf/BrokerProxyImpl.cpp
parentd3c07faea48e2dbd57cf27fac2d9940ca6456a69 (diff)
downloadqpid-python-c8fa5fa308f6ad9be22612568ace703777fbb6d9.tar.gz
Refactored the QMF engine to adhere to the following rules regarding
the pimpl (Pointer to Implementation) pattern: 1) Impl classes have constructors matching the public constructors 2) Additional Impl constructors are accessed through a static factory function 3) All linkages to objects are to the public object 4) If a back-link (from Impl to public) is needed, the Impl class must be derived from boost::noncopyable 5) All public classes have non-default copy constructors that make a copy of the Impl class git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@816770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qmf/BrokerProxyImpl.cpp')
-rw-r--r--qpid/cpp/src/qmf/BrokerProxyImpl.cpp135
1 files changed, 73 insertions, 62 deletions
diff --git a/qpid/cpp/src/qmf/BrokerProxyImpl.cpp b/qpid/cpp/src/qmf/BrokerProxyImpl.cpp
index 47dca309d7..29e51566b3 100644
--- a/qpid/cpp/src/qmf/BrokerProxyImpl.cpp
+++ b/qpid/cpp/src/qmf/BrokerProxyImpl.cpp
@@ -43,7 +43,7 @@ namespace {
const Object* QueryResponseImpl::getObject(uint32_t idx) const
{
- vector<ObjectImpl::Ptr>::const_iterator iter = results.begin();
+ vector<ObjectPtr>::const_iterator iter = results.begin();
while (idx > 0) {
if (iter == results.end())
@@ -52,7 +52,7 @@ const Object* QueryResponseImpl::getObject(uint32_t idx) const
idx--;
}
- return (*iter)->envelope;
+ return iter->get();
}
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
@@ -68,14 +68,13 @@ BrokerEvent BrokerEventImpl::copy()
STRING_REF(exchange);
STRING_REF(bindingKey);
item.context = context;
- item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0;
- item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0;
+ item.queryResponse = queryResponse.get();
+ item.methodResponse = methodResponse.get();
return item;
}
-BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) :
- envelope(e), console(_console.impl)
+BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, ConsoleEngine& _console) : publicObject(pub), console(_console)
{
stringstream qn;
qpid::TcpAddress addr;
@@ -114,7 +113,7 @@ void BrokerProxyImpl::startProtocol()
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
- agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker")));
+ agentList.push_back(AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker")));
requestsOutstanding = 1;
topicBound = false;
@@ -190,10 +189,10 @@ uint32_t BrokerProxyImpl::agentCount() const
const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
{
Mutex::ScopedLock _lock(lock);
- for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
+ for (vector<AgentProxyPtr>::const_iterator iter = agentList.begin();
iter != agentList.end(); iter++)
if (idx-- == 0)
- return (*iter)->envelope;
+ return iter->get();
return 0;
}
@@ -202,17 +201,17 @@ void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentPr
SequenceContext::Ptr queryContext(new QueryContext(*this, context));
Mutex::ScopedLock _lock(lock);
if (agent != 0) {
- sendGetRequestLH(queryContext, query, agent->impl);
+ sendGetRequestLH(queryContext, query, agent);
} else {
// TODO (optimization) only send queries to agents that have the requested class+package
- for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
+ for (vector<AgentProxyPtr>::const_iterator iter = agentList.begin();
iter != agentList.end(); iter++) {
sendGetRequestLH(queryContext, query, (*iter).get());
}
}
}
-void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent)
+void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent)
{
stringstream key;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
@@ -220,7 +219,7 @@ void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const
Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
query.impl->encode(outBuffer);
- key << "agent.1." << agent->agentBank;
+ key << "agent.1." << agent->impl->agentBank;
sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
}
@@ -250,7 +249,7 @@ string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const
return string();
}
-void BrokerProxyImpl::sendMethodRequest(ObjectIdImpl* oid, const SchemaObjectClass* cls,
+void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls,
const string& methodName, const Value* args, void* userContext)
{
int methodCount = cls->getMethodCount();
@@ -259,30 +258,30 @@ void BrokerProxyImpl::sendMethodRequest(ObjectIdImpl* oid, const SchemaObjectCla
const SchemaMethod* method = cls->getMethod(idx);
if (string(method->getName()) == methodName) {
Mutex::ScopedLock _lock(lock);
- SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method->impl));
+ SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method));
stringstream key;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t sequence(seqMgr.reserve(methodContext));
Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST, sequence);
- oid->encode(outBuffer);
+ oid->impl->encode(outBuffer);
cls->getClassKey()->impl->encode(outBuffer);
outBuffer.putShortString(methodName);
string argErrorString = encodeMethodArguments(method, args, outBuffer);
if (argErrorString.empty()) {
- key << "agent.1." << oid->getAgentBank();
+ key << "agent.1." << oid->impl->getAgentBank();
sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << " method=" << methodName << " key=" << key.str());
} else {
- MethodResponseImpl::Ptr argError(new MethodResponseImpl(1, argErrorString));
+ MethodResponsePtr argError(MethodResponseImpl::factory(1, argErrorString));
eventQueue.push_back(eventMethodResponse(userContext, argError));
}
return;
}
}
- MethodResponseImpl::Ptr error(new MethodResponseImpl(1, string("Unknown method: ") + methodName));
+ MethodResponsePtr error(MethodResponseImpl::factory(1, string("Unknown method: ") + methodName));
Mutex::ScopedLock _lock(lock);
eventQueue.push_back(eventMethodResponse(userContext, error));
}
@@ -322,7 +321,7 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
return event;
}
-BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response)
+BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponsePtr response)
{
BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
event->context = context;
@@ -330,7 +329,7 @@ BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryRes
return event;
}
-BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponseImpl::Ptr response)
+BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponsePtr response)
{
BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::METHOD_RESPONSE));
event->context = context;
@@ -357,7 +356,7 @@ void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
inBuffer.getShortString(package);
QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
- console->learnPackage(package);
+ console.impl->learnPackage(package);
Mutex::ScopedLock _lock(lock);
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
@@ -380,25 +379,25 @@ void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
{
uint8_t kind = inBuffer.getOctet();
- SchemaClassKeyImpl classKey(inBuffer);
+ auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
- QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str());
+ QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey->impl->str());
- if (!console->haveClass(classKey)) {
+ if (!console.impl->haveClass(classKey.get())) {
Mutex::ScopedLock _lock(lock);
incOutstandingLH();
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t sequence(seqMgr.reserve());
Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
- classKey.encode(outBuffer);
+ classKey->impl->encode(outBuffer);
sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str());
+ QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey->impl->str());
}
}
-MethodResponseImpl::Ptr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, SchemaMethodImpl* schema)
+MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema)
{
- MethodResponseImpl::Ptr response(new MethodResponseImpl(inBuffer, schema));
+ MethodResponsePtr response(MethodResponseImpl::factory(inBuffer, schema));
QPID_LOG(trace, "RCVD MethodResponse seq=" << seq << " status=" << response->getStatus() << " text=" <<
response->getException()->asString());
@@ -418,15 +417,15 @@ void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq
void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
{
- SchemaObjectClassImpl::Ptr oClassPtr;
- SchemaEventClassImpl::Ptr eClassPtr;
+ SchemaObjectClass* oClassPtr;
+ SchemaEventClass* eClassPtr;
uint8_t kind = inBuffer.getOctet();
- const SchemaClassKeyImpl* key;
+ const SchemaClassKey* key;
if (kind == CLASS_OBJECT) {
- oClassPtr.reset(new SchemaObjectClassImpl(inBuffer));
- console->learnClass(oClassPtr);
- key = oClassPtr->getClassKey()->impl;
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str());
+ oClassPtr = SchemaObjectClassImpl::factory(inBuffer);
+ console.impl->learnClass(oClassPtr);
+ key = oClassPtr->getClassKey();
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str());
//
// If we have just learned about the org.apache.qpid.broker:agent class, send a get
@@ -447,28 +446,28 @@ void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
}
} else if (kind == CLASS_EVENT) {
- eClassPtr.reset(new SchemaEventClassImpl(inBuffer));
- console->learnClass(eClassPtr);
- key = eClassPtr->getClassKey()->impl;
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str());
+ eClassPtr = SchemaEventClassImpl::factory(inBuffer);
+ console.impl->learnClass(eClassPtr);
+ key = eClassPtr->getClassKey();
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str());
}
else {
QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
}
}
-ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
+ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
{
- SchemaClassKeyImpl classKey(inBuffer);
- QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str());
+ auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
+ QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str());
- SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey);
- if (schema.get() == 0) {
- QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str());
- return ObjectImpl::Ptr();
+ SchemaObjectClass* schema = console.impl->getSchema(classKey.get());
+ if (schema == 0) {
+ QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str());
+ return ObjectPtr();
}
- return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, this, inBuffer, prop, stat, true));
+ return ObjectPtr(ObjectImpl::factory(schema, this, inBuffer, prop, stat, true));
}
void BrokerProxyImpl::incOutstandingLH()
@@ -482,8 +481,8 @@ void BrokerProxyImpl::decOutstanding()
requestsOutstanding--;
if (requestsOutstanding == 0 && !topicBound) {
topicBound = true;
- for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin();
- iter != console->bindingList.end(); iter++) {
+ for (vector<pair<string, string> >::const_iterator iter = console.impl->bindingList.begin();
+ iter != console.impl->bindingList.end(); iter++) {
string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
string key(iter->second);
eventQueue.push_back(eventBind(exchange, queueName, key));
@@ -493,15 +492,15 @@ void BrokerProxyImpl::decOutstanding()
}
MethodResponseImpl::MethodResponseImpl(const MethodResponseImpl& from) :
- envelope(from.envelope), // !!!! TODO !!!! this is not right
- status(from.status), schema(from.schema),
- exception(from.exception.get() ? new Value(*from.exception) : 0),
- arguments(from.arguments.get() ? new Value(*from.arguments) : 0)
+ status(from.status), schema(from.schema)
{
+ if (from.exception.get())
+ exception.reset(new Value(*(from.exception)));
+ if (from.arguments.get())
+ arguments.reset(new Value(*(from.arguments)));
}
-MethodResponseImpl::MethodResponseImpl(Buffer& buf, SchemaMethodImpl* s) :
- envelope(new MethodResponse(this)), schema(s)
+MethodResponseImpl::MethodResponseImpl(Buffer& buf, const SchemaMethod* s) : schema(s)
{
string text;
@@ -518,19 +517,31 @@ MethodResponseImpl::MethodResponseImpl(Buffer& buf, SchemaMethodImpl* s) :
for (int idx = 0; idx < argCount; idx++) {
const SchemaArgument* arg = schema->getArgument(idx);
if (arg->getDirection() == DIR_OUT || arg->getDirection() == DIR_IN_OUT) {
- ValueImpl* value(new ValueImpl(arg->getType(), buf));
- arguments->insert(arg->getName(), value->envelope);
+ Value* value(ValueImpl::factory(arg->getType(), buf));
+ arguments->insert(arg->getName(), value);
}
}
}
-MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) : envelope(new MethodResponse(this)), schema(0)
+MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) : schema(0)
{
status = s;
exception.reset(new Value(TYPE_LSTR));
exception->setString(text.c_str());
}
+MethodResponse* MethodResponseImpl::factory(Buffer& buf, const SchemaMethod* schema)
+{
+ MethodResponseImpl* impl(new MethodResponseImpl(buf, schema));
+ return new MethodResponse(impl);
+}
+
+MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string& text)
+{
+ MethodResponseImpl* impl(new MethodResponseImpl(status, text));
+ return new MethodResponse(impl);
+}
+
bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
{
bool completeContext = false;
@@ -589,7 +600,7 @@ void QueryContext::release()
bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
{
bool completeContext = false;
- ObjectImpl::Ptr object;
+ ObjectPtr object;
if (opcode == Protocol::OP_COMMAND_COMPLETE) {
broker.handleCommandComplete(buffer, sequence);
@@ -598,7 +609,7 @@ bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buff
else if (opcode == Protocol::OP_OBJECT_INDICATION) {
object = broker.handleObjectIndication(buffer, sequence, true, true);
if (object.get() != 0)
- queryResponse->results.push_back(object);
+ queryResponse->impl->results.push_back(object);
}
else {
QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
@@ -633,7 +644,7 @@ AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
AgentProxy::~AgentProxy() { delete impl; }
const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
-BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {}
+BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(*this, console)) {}
BrokerProxy::~BrokerProxy() { delete impl; }
void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
void BrokerProxy::sessionClosed() { impl->sessionClosed(); }