diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 30 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionContext.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateExchange.h | 2 |
8 files changed, 51 insertions, 19 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index ed9b6653c3..7e3090bf17 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -302,6 +302,18 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) return !blocked; } +namespace { +struct ConsumerName { + const SemanticState::ConsumerImpl& consumer; + ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {} +}; + +ostream& operator<<(ostream& o, const ConsumerName& pc) { + return o << pc.consumer.getName() << " on " + << pc.consumer.getParent().getSession().getSessionId(); +} +} + void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { uint32_t originalMsgCredit = msgCredit; @@ -312,7 +324,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) if (byteCredit != 0xFFFFFFFF) { byteCredit -= msg->getRequiredCredit(); } - QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent + QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit << " now bytes: " << byteCredit << " msgs: " << msgCredit); @@ -320,15 +332,13 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) { - if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { - QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent - << ", bytes: " << byteCredit << " msgs: " << msgCredit); - return false; - } else { - QPID_LOG(debug, "Credit available for '" << name << "' on " << parent - << " bytes: " << byteCredit << " msgs: " << msgCredit); - return true; - } + bool enoughCredit = msgCredit > 0 && + (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit()); + QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ") + << ConsumerName(*this) + << ", have bytes: " << byteCredit << " msgs: " << msgCredit + << ", need " << msg->getRequiredCredit() << " bytes"); + return enoughCredit; } SemanticState::ConsumerImpl::~ConsumerImpl() {} diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index da8383fc12..89fe7b83dd 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -129,6 +129,7 @@ class SemanticState : private boost::noncopyable { const framing::FieldTable& getArguments() const { return arguments; } SemanticState& getParent() { return *parent; } + const SemanticState& getParent() const { return *parent; } }; private: @@ -163,6 +164,7 @@ class SemanticState : private boost::noncopyable { ~SemanticState(); SessionContext& getSession() { return session; } + const SessionContext& getSession() const { return session; } ConsumerImpl& find(const std::string& destination); diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h index cfdbd100c3..afbbb2cc22 100644 --- a/cpp/src/qpid/broker/SessionContext.h +++ b/cpp/src/qpid/broker/SessionContext.h @@ -28,7 +28,7 @@ #include "qpid/sys/OutputControl.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/OwnershipToken.h" - +#include "qpid/SessionId.h" #include <boost/noncopyable.hpp> @@ -45,6 +45,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl virtual framing::AMQP_ClientProxy& getProxy() = 0; virtual Broker& getBroker() = 0; virtual uint16_t getChannel() const = 0; + virtual const SessionId& getSessionId() const = 0; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 67fd4f4f38..eade93ddaa 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -118,6 +118,8 @@ class SessionState : public qpid::SessionState, bool processSendCredit(uint32_t msgs); + const SessionId& getSessionId() const { return getId(); } + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 8ead44a172..32541dceac 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -64,7 +64,8 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm proxy(ioHandler), nextIn(0), nextOut(0), - sendMsgCredit(0) + sendMsgCredit(0), + doClearDeliveryPropertiesExchange(true) { channel.next = connectionShared.get(); } @@ -396,11 +397,16 @@ void SessionImpl::sendContent(const MethodContent& content) { AMQFrame header(content.getHeader()); - // Client is not allowed to set the delivery-properties.exchange. - AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody()); - if (headerp && headerp->get<DeliveryProperties>()) - headerp->get<DeliveryProperties>(true)->clearExchangeFlag(); - + // doClearDeliveryPropertiesExchange is set by cluster update client so + // it can send messages with delivery-properties.exchange set. + // + if (doClearDeliveryPropertiesExchange) { + // Normal client is not allowed to set the delivery-properties.exchange + // so clear it here. + AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody()); + if (headerp && headerp->get<DeliveryProperties>()) + headerp->get<DeliveryProperties>(true)->clearExchangeFlag(); + } header.setFirstSegment(false); uint64_t data_length = content.getData().length(); if(data_length > 0){ diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 49d268c44d..0624bb8b3c 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -130,6 +130,8 @@ public: */ boost::shared_ptr<ConnectionImpl> getConnection(); + void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; } + private: enum State { INACTIVE, @@ -243,6 +245,8 @@ private: // Only keep track of message credit sys::Semaphore* sendMsgCredit; + bool doClearDeliveryPropertiesExchange; + friend class client::SessionHandler; }; diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 2e557f2ab6..d6df8bd5ac 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -209,9 +209,16 @@ class MessageUpdater { ClusterConnectionProxy(session).expiryId(*expiryId); } + // We can't send a broker::Message via the normal client API, + // and it would be expensive to copy it into a client::Message + // so we go a bit under the client API covers here. + // SessionBase_0_10Access sb(session); + // Disable client code that clears the delivery-properties.exchange + sb.get()->setDoClearDeliveryPropertiesExchange(false); framing::MessageTransferBody transfer( - framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); + framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, + message::ACQUIRE_MODE_PRE_ACQUIRED); sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased()); if (message.payload->isContentReleased()){ diff --git a/cpp/src/qpid/cluster/UpdateExchange.h b/cpp/src/qpid/cluster/UpdateExchange.h index 194a3d386d..00a92c7f1e 100644 --- a/cpp/src/qpid/cluster/UpdateExchange.h +++ b/cpp/src/qpid/cluster/UpdateExchange.h @@ -30,7 +30,7 @@ namespace qpid { namespace cluster { /** - * A keyless exchange (like fanout exchange) that does not modify deliver-properties.exchange + * A keyless exchange (like fanout exchange) that does not modify delivery-properties.exchange * on messages. */ class UpdateExchange : public broker::FanOutExchange |
