summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-03 19:19:00 +0000
committerGordon Sim <gsim@apache.org>2008-03-03 19:19:00 +0000
commit1f0b710ec149075c369624fa140a2af550ec0a5f (patch)
treeac1d73e9e82d11ccaf8e6061431ecc311564ab9a /cpp/src/qpid
parenta711889b0b3c16d7bffe008ece53cd41d5069909 (diff)
downloadqpid-python-1f0b710ec149075c369624fa140a2af550ec0a5f.tar.gz
Updated tracking of outgoing command id and send command-point control on session attachment.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@633241 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp5
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp8
-rw-r--r--cpp/src/qpid/broker/SessionState.h4
3 files changed, 9 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index de96ae3f12..919a3e6ee8 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -126,6 +126,7 @@ void SessionHandler::attach(const std::string& name, bool /*force*/)
connection.broker.getSessionManager().open(*this, 0));
session.reset(state.release());
peerSession.attached(name);
+ peerSession.commandPoint(session->nextOut, 0);
}
void SessionHandler::attached(const std::string& /*name*/)
@@ -171,7 +172,7 @@ void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t of
{
if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point");
- session->next = id;
+ session->nextIn = id;
}
void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments)
@@ -203,7 +204,7 @@ void SessionHandler::knownCompleted(const framing::SequenceSet& commands)
void SessionHandler::flush(bool expected, bool confirmed, bool completed)
{
if (expected) {
- peerSession.expected(SequenceSet(session->next), Array());
+ peerSession.expected(SequenceSet(session->nextIn), Array());
}
if (confirmed) {
peerSession.confirmed(session->completed, Array());
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 5f04136444..571b8848ae 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -170,7 +170,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
void SessionState::handleCommand(framing::AMQMethodBody* method)
{
- SequenceNumber id = next++;
+ SequenceNumber id = nextIn++;
Invoker::Result invocation = invoke(adapter, *method);
completed.add(id);
@@ -189,7 +189,7 @@ void SessionState::handleContent(AMQFrame& frame)
{
intrusive_ptr<Message> msg(msgBuilder.getMessage());
if (!msg) {//start of frameset will be indicated by frame flags
- SequenceNumber id = next++;
+ SequenceNumber id = nextIn++;
msgBuilder.start(id);
msg = msgBuilder.getMessage();
}
@@ -225,8 +225,8 @@ void SessionState::handle(AMQFrame& frame)
DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
uint32_t maxFrameSize = getConnection().getFrameMax();
- MessageDelivery::deliver(msg, getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize);
- return outgoing.hwm;
+ MessageDelivery::deliver(msg, getProxy().getHandler(), nextOut, token, maxFrameSize);
+ return nextOut++;
}
void SessionState::sendCompletion()
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index fa6bd14ef3..2db7d688b7 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -116,7 +116,8 @@ class SessionState : public framing::SessionState,
framing::SequenceSet completed;
framing::SequenceSet knownCompleted;
- framing::SequenceNumber next;
+ framing::SequenceNumber nextIn;
+ framing::SequenceNumber nextOut;
private:
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
@@ -134,7 +135,6 @@ class SessionState : public framing::SessionState,
BrokerAdapter adapter;
MessageBuilder msgBuilder;
- framing::Window outgoing;
RangedOperation ackOp;
management::Session::shared_ptr mgmtObject;