summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-07 14:21:48 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-07 14:21:48 +0000
commit4fbbc6ecf68bd8f118f4a6165c8f5bfca2c3c8b6 (patch)
tree4a54f245efa1c2df1601d648c1fdd41fba08b802 /cpp/src/qpid/cluster
parent92d889931fe1cea19d1e33658d5f30348bd7070e (diff)
downloadqpid-python-4fbbc6ecf68bd8f118f4a6165c8f5bfca2c3c8b6.tar.gz
QPID-3346: move message group feature into trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1180050 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp14
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp18
2 files changed, 16 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 015301573e..394749aad2 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -406,11 +406,11 @@ void Connection::shadowSetUser(const std::string& userId) {
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
{
- broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
- c.position = position;
- c.setBlocked(blocked);
- if (notifyEnabled) c.enableNotify(); else c.disableNotify();
- updateIn.consumerNumbering.add(c.shared_from_this());
+ broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
+ c->position = position;
+ c->setBlocked(blocked);
+ if (notifyEnabled) c->enableNotify(); else c->disableNotify();
+ updateIn.consumerNumbering.add(c);
}
@@ -444,7 +444,7 @@ void Connection::outputTask(uint16_t channel, const std::string& name) {
if (!session)
throw Exception(QPID_MSG(cluster << " channel not attached " << *this
<< "[" << channel << "] "));
- OutputTask* task = &session->getSemanticState().find(name);
+ OutputTask* task = session->getSemanticState().find(name).get();
connection->getOutputTasks().addOutputTask(task);
}
@@ -547,7 +547,7 @@ void Connection::deliveryRecord(const string& qname,
m.position = position;
if (enqueued) queue->updateEnqueued(m); //inform queue of the message
} else { // Message at original position in original queue
- m = queue->find(position);
+ queue->find(position, m);
}
// FIXME aconway 2011-08-19: removed:
// if (!m.payload)
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 2be1bf1f77..2446c12f2b 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -421,8 +421,8 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
uint16_t channel = ci->getParent().getSession().getChannel();
- ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
- QPID_LOG(debug, *this << " updating output task " << ci->getName()
+ ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getTag());
+ QPID_LOG(debug, *this << " updating output task " << ci->getTag()
<< " channel=" << channel);
}
@@ -521,13 +521,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
void UpdateClient::updateConsumer(
const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
{
- QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
+ QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on "
<< shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
- arg::destination = ci->getName(),
+ arg::destination = ci->getTag(),
arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
arg::exclusive = ci->isExclusive(),
@@ -535,18 +535,18 @@ void UpdateClient::updateConsumer(
arg::resumeTtl = ci->getResumeTtl(),
arg::arguments = ci->getArguments()
);
- shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
- shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
- shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
+ shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+ shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
+ shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit());
ClusterConnectionProxy(shadowSession).consumerState(
- ci->getName(),
+ ci->getTag(),
ci->isBlocked(),
ci->isNotifyEnabled(),
ci->position
);
consumerNumbering.add(ci.get());
- QPID_LOG(debug, *this << " updated consumer " << ci->getName()
+ QPID_LOG(debug, *this << " updated consumer " << ci->getTag()
<< " on " << shadowSession.getId());
}