From ab7955c6747f6377c2d36fe8686c3cec39d003b2 Mon Sep 17 00:00:00 2001 From: "Carl C. Trieloff" Date: Fri, 13 Nov 2009 19:30:07 +0000 Subject: Add management subscription object git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@835962 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/SemanticState.cpp | 53 +++++++++++++++++++++++++++++++++-- cpp/src/qpid/broker/SemanticState.h | 8 +++++- 2 files changed, 57 insertions(+), 4 deletions(-) (limited to 'cpp') diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 7e3090bf17..3e23af99c0 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -58,6 +58,11 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; using qpid::ptr_map_ptr; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +namespace _qmf = qmf::org::apache::qpid::broker; SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) : session(ss), @@ -261,8 +266,38 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, byteCredit(0), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), - deliveryCount(0) -{} + deliveryCount(0), + mgmtObject(0) +{ + if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0) + { + ManagementAgent* agent = parent->session.getBroker().getManagementAgent(); + qpid::management::Manageable* ms = dynamic_cast (&(parent->session)); + + if (agent != 0) + { + mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name ,arguments, + acquire, ackExpected, syncFrequency, resumeId, resumeTtl, exclusive); + agent->addObject (mgmtObject, agent->allocateId(this)); + mgmtObject->set_mode("WINDOW"); + } + } +} + +ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const +{ + return (ManagementObject*) mgmtObject; +} + +Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); + + return status; +} + OwnershipToken* SemanticState::ConsumerImpl::getSession() { @@ -283,6 +318,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) if (acquire && !ackExpected) { queue->dequeue(0, msg); } + if (mgmtObject) { mgmtObject->inc_delivered(); } return true; } @@ -299,6 +335,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr msg) // in future. // blocked = !(filter(msg) && checkCredit(msg)); + if (mgmtObject && !blocked && acquire) { mgmtObject->inc_accepted(); } return !blocked; } @@ -341,7 +378,11 @@ bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr& msg) return enoughCredit; } -SemanticState::ConsumerImpl::~ConsumerImpl() {} +SemanticState::ConsumerImpl::~ConsumerImpl() +{ + if (mgmtObject != 0) + mgmtObject->resourceDestroy (); +} void SemanticState::cancel(ConsumerImpl::shared_ptr c) { @@ -524,11 +565,17 @@ void SemanticState::stop(const std::string& destination) void SemanticState::ConsumerImpl::setWindowMode() { windowing = true; + if (mgmtObject){ + mgmtObject->set_mode("WINDOW"); + } } void SemanticState::ConsumerImpl::setCreditMode() { windowing = false; + if (mgmtObject){ + mgmtObject->set_mode("CREDIT"); + } } void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 89fe7b83dd..99f793c1fc 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -38,6 +38,7 @@ #include "qpid/sys/Mutex.h" #include "qpid/sys/AtomicValue.h" #include "qpid/broker/AclModule.h" +#include "qmf/org/apache/qpid/broker/Subscription.h" #include #include @@ -58,7 +59,8 @@ class SessionContext; class SemanticState : private boost::noncopyable { public: class ConsumerImpl : public Consumer, public sys::OutputTask, - public boost::enable_shared_from_this + public boost::enable_shared_from_this, + public management::Manageable { mutable qpid::sys::Mutex lock; SemanticState* const parent; @@ -77,6 +79,7 @@ class SemanticState : private boost::noncopyable { bool notifyEnabled; const int syncFrequency; int deliveryCount; + qmf::org::apache::qpid::broker::Subscription* mgmtObject; bool checkCredit(boost::intrusive_ptr& msg); void allocateCredit(boost::intrusive_ptr& msg); @@ -130,6 +133,9 @@ class SemanticState : private boost::noncopyable { SemanticState& getParent() { return *parent; } const SemanticState& getParent() const { return *parent; } + // Manageable entry points + management::ManagementObject* GetManagementObject (void) const; + management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); }; private: -- cgit v1.2.1