diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 45 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/log/Logger.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/BlockingQueue.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Waitable.h | 3 |
5 files changed, 58 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 33364e48df..0cc150419f 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -34,10 +34,15 @@ #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h" #include "qpid/management/ManagementDirectExchange.h" #include "qpid/management/ManagementTopicExchange.h" +#include "qpid/log/Logger.h" +#include "qpid/log/Options.h" #include "qpid/log/Statement.h" +#include "qpid/log/posix/SinkOptions.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/Uuid.h" @@ -51,6 +56,7 @@ #include "qpid/sys/TimeoutHandler.h" #include "qpid/sys/SystemInfo.h" #include "qpid/Address.h" +#include "qpid/StringUtils.h" #include "qpid/Url.h" #include "qpid/Version.h" @@ -388,17 +394,21 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; - QPID_LOG (debug, "Broker::ManagementMethod [id=" << methodId << "]"); - switch (methodId) { case _qmf::Broker::METHOD_ECHO : + QPID_LOG (debug, "Broker::echo(" + << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence + << ", " + << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body + << ")"); status = Manageable::STATUS_OK; break; case _qmf::Broker::METHOD_CONNECT : { _qmf::ArgsBrokerConnect& hp= dynamic_cast<_qmf::ArgsBrokerConnect&>(args); + QPID_LOG (debug, "Broker::connect()"); string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport; if (!getProtocolFactory(transport)) { QPID_LOG(error, "Transport '" << transport << "' not supported"); @@ -415,13 +425,25 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, case _qmf::Broker::METHOD_QUEUEMOVEMESSAGES : { _qmf::ArgsBrokerQueueMoveMessages& moveArgs= dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args); + QPID_LOG (debug, "Broker::queueMoveMessages()"); if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) status = Manageable::STATUS_OK; else return Manageable::STATUS_PARAMETER_INVALID; break; } + case _qmf::Broker::METHOD_SETLOGLEVEL : + setLogLevel(dynamic_cast<_qmf::ArgsBrokerSetLogLevel&>(args).i_level); + QPID_LOG (debug, "Broker::setLogLevel()"); + status = Manageable::STATUS_OK; + break; + case _qmf::Broker::METHOD_GETLOGLEVEL : + dynamic_cast<_qmf::ArgsBrokerGetLogLevel&>(args).o_level = getLogLevel(); + QPID_LOG (debug, "Broker::getLogLevel()"); + status = Manageable::STATUS_OK; + break; default: + QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); status = Manageable::STATUS_NOT_IMPLEMENTED; break; } @@ -429,6 +451,25 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, return status; } +void Broker::setLogLevel(const std::string& level) +{ + QPID_LOG(notice, "Changing log level to " << level); + std::vector<std::string> selectors; + split(selectors, level, ", "); + qpid::log::Logger::instance().reconfigure(selectors); +} + +std::string Broker::getLogLevel() +{ + std::string level; + const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors; + for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) { + if (i != selectors.begin()) level += std::string(","); + level += *i; + } + return level; +} + boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const { ProtocolFactoryMap::const_iterator i = name.empty() ? protocolFactories.begin() : protocolFactories.find(name); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 4f089a1fca..32e2c8ab6b 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -145,6 +145,8 @@ public: void declareStandardExchange(const std::string& name, const std::string& type); void setStore (); + void setLogLevel(const std::string& level); + std::string getLogLevel(); boost::shared_ptr<sys::Poller> poller; sys::Timer timer; diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp index 939e2502cc..2217cdddbd 100644 --- a/cpp/src/qpid/log/Logger.cpp +++ b/cpp/src/qpid/log/Logger.cpp @@ -146,6 +146,11 @@ void Logger::configure(const Options& opts) { options.sinkOptions->setup(this); } +void Logger::reconfigure(const std::vector<std::string>& selectors) { + options.selectors = selectors; + select(Selector(options)); +} + void Logger::setPrefix(const std::string& p) { prefix = p; } }} // namespace qpid::log diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index 210cb4ad82..ca6b529930 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -53,9 +53,13 @@ public: Waitable::ScopedWait w(waitable); if (timeout == TIME_INFINITE) { while (queue.empty()) waitable.wait(); - } else { + } else if (timeout) { AbsTime deadline(now(),timeout); while (queue.empty() && deadline > now()) waitable.wait(deadline); + } else { + //ensure zero timeout pop does not miss the fact that + //queue is closed + waitable.checkException(); } } if (queue.empty()) return false; diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h index 7701b6f97d..8f6bd17049 100644 --- a/cpp/src/qpid/sys/Waitable.h +++ b/cpp/src/qpid/sys/Waitable.h @@ -79,6 +79,9 @@ class Waitable : public Monitor { /** True if the waitable has an exception */ bool hasException() const { return exception; } + /** Throws if the waitable has an exception */ + void checkException() const { exception.raise(); } + /** Clear the exception if any */ void resetException() { exception.reset(); } |
