summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp30
-rw-r--r--cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--cpp/src/qpid/broker/SessionContext.h3
-rw-r--r--cpp/src/qpid/broker/SessionState.h2
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp18
-rw-r--r--cpp/src/qpid/client/SessionImpl.h4
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp9
-rw-r--r--cpp/src/qpid/cluster/UpdateExchange.h2
8 files changed, 51 insertions, 19 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index ed9b6653c3..7e3090bf17 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -302,6 +302,18 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
return !blocked;
}
+namespace {
+struct ConsumerName {
+ const SemanticState::ConsumerImpl& consumer;
+ ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {}
+};
+
+ostream& operator<<(ostream& o, const ConsumerName& pc) {
+ return o << pc.consumer.getName() << " on "
+ << pc.consumer.getParent().getSession().getSessionId();
+}
+}
+
void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
uint32_t originalMsgCredit = msgCredit;
@@ -312,7 +324,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
if (byteCredit != 0xFFFFFFFF) {
byteCredit -= msg->getRequiredCredit();
}
- QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
+ QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
<< " now bytes: " << byteCredit << " msgs: " << msgCredit);
@@ -320,15 +332,13 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
- if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
- QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent
- << ", bytes: " << byteCredit << " msgs: " << msgCredit);
- return false;
- } else {
- QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
- << " bytes: " << byteCredit << " msgs: " << msgCredit);
- return true;
- }
+ bool enoughCredit = msgCredit > 0 &&
+ (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
+ QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ")
+ << ConsumerName(*this)
+ << ", have bytes: " << byteCredit << " msgs: " << msgCredit
+ << ", need " << msg->getRequiredCredit() << " bytes");
+ return enoughCredit;
}
SemanticState::ConsumerImpl::~ConsumerImpl() {}
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index da8383fc12..89fe7b83dd 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -129,6 +129,7 @@ class SemanticState : private boost::noncopyable {
const framing::FieldTable& getArguments() const { return arguments; }
SemanticState& getParent() { return *parent; }
+ const SemanticState& getParent() const { return *parent; }
};
private:
@@ -163,6 +164,7 @@ class SemanticState : private boost::noncopyable {
~SemanticState();
SessionContext& getSession() { return session; }
+ const SessionContext& getSession() const { return session; }
ConsumerImpl& find(const std::string& destination);
diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h
index cfdbd100c3..afbbb2cc22 100644
--- a/cpp/src/qpid/broker/SessionContext.h
+++ b/cpp/src/qpid/broker/SessionContext.h
@@ -28,7 +28,7 @@
#include "qpid/sys/OutputControl.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/OwnershipToken.h"
-
+#include "qpid/SessionId.h"
#include <boost/noncopyable.hpp>
@@ -45,6 +45,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl
virtual framing::AMQP_ClientProxy& getProxy() = 0;
virtual Broker& getBroker() = 0;
virtual uint16_t getChannel() const = 0;
+ virtual const SessionId& getSessionId() const = 0;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 67fd4f4f38..eade93ddaa 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -118,6 +118,8 @@ class SessionState : public qpid::SessionState,
bool processSendCredit(uint32_t msgs);
+ const SessionId& getSessionId() const { return getId(); }
+
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 8ead44a172..32541dceac 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -64,7 +64,8 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm
proxy(ioHandler),
nextIn(0),
nextOut(0),
- sendMsgCredit(0)
+ sendMsgCredit(0),
+ doClearDeliveryPropertiesExchange(true)
{
channel.next = connectionShared.get();
}
@@ -396,11 +397,16 @@ void SessionImpl::sendContent(const MethodContent& content)
{
AMQFrame header(content.getHeader());
- // Client is not allowed to set the delivery-properties.exchange.
- AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
- if (headerp && headerp->get<DeliveryProperties>())
- headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
-
+ // doClearDeliveryPropertiesExchange is set by cluster update client so
+ // it can send messages with delivery-properties.exchange set.
+ //
+ if (doClearDeliveryPropertiesExchange) {
+ // Normal client is not allowed to set the delivery-properties.exchange
+ // so clear it here.
+ AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
+ if (headerp && headerp->get<DeliveryProperties>())
+ headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
+ }
header.setFirstSegment(false);
uint64_t data_length = content.getData().length();
if(data_length > 0){
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 49d268c44d..0624bb8b3c 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -130,6 +130,8 @@ public:
*/
boost::shared_ptr<ConnectionImpl> getConnection();
+ void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; }
+
private:
enum State {
INACTIVE,
@@ -243,6 +245,8 @@ private:
// Only keep track of message credit
sys::Semaphore* sendMsgCredit;
+ bool doClearDeliveryPropertiesExchange;
+
friend class client::SessionHandler;
};
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 2e557f2ab6..d6df8bd5ac 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -209,9 +209,16 @@ class MessageUpdater {
ClusterConnectionProxy(session).expiryId(*expiryId);
}
+ // We can't send a broker::Message via the normal client API,
+ // and it would be expensive to copy it into a client::Message
+ // so we go a bit under the client API covers here.
+ //
SessionBase_0_10Access sb(session);
+ // Disable client code that clears the delivery-properties.exchange
+ sb.get()->setDoClearDeliveryPropertiesExchange(false);
framing::MessageTransferBody transfer(
- framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+ framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE,
+ message::ACQUIRE_MODE_PRE_ACQUIRED);
sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased());
if (message.payload->isContentReleased()){
diff --git a/cpp/src/qpid/cluster/UpdateExchange.h b/cpp/src/qpid/cluster/UpdateExchange.h
index 194a3d386d..00a92c7f1e 100644
--- a/cpp/src/qpid/cluster/UpdateExchange.h
+++ b/cpp/src/qpid/cluster/UpdateExchange.h
@@ -30,7 +30,7 @@ namespace qpid {
namespace cluster {
/**
- * A keyless exchange (like fanout exchange) that does not modify deliver-properties.exchange
+ * A keyless exchange (like fanout exchange) that does not modify delivery-properties.exchange
* on messages.
*/
class UpdateExchange : public broker::FanOutExchange