diff options
| author | Gordon Sim <gsim@apache.org> | 2008-03-03 19:19:00 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-03-03 19:19:00 +0000 |
| commit | 1f0b710ec149075c369624fa140a2af550ec0a5f (patch) | |
| tree | ac1d73e9e82d11ccaf8e6061431ecc311564ab9a /cpp/src/qpid | |
| parent | a711889b0b3c16d7bffe008ece53cd41d5069909 (diff) | |
| download | qpid-python-1f0b710ec149075c369624fa140a2af550ec0a5f.tar.gz | |
Updated tracking of outgoing command id and send command-point control on session attachment.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@633241 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 4 |
3 files changed, 9 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index de96ae3f12..919a3e6ee8 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -126,6 +126,7 @@ void SessionHandler::attach(const std::string& name, bool /*force*/) connection.broker.getSessionManager().open(*this, 0)); session.reset(state.release()); peerSession.attached(name); + peerSession.commandPoint(session->nextOut, 0); } void SessionHandler::attached(const std::string& /*name*/) @@ -171,7 +172,7 @@ void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t of { if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); - session->next = id; + session->nextIn = id; } void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments) @@ -203,7 +204,7 @@ void SessionHandler::knownCompleted(const framing::SequenceSet& commands) void SessionHandler::flush(bool expected, bool confirmed, bool completed) { if (expected) { - peerSession.expected(SequenceSet(session->next), Array()); + peerSession.expected(SequenceSet(session->nextIn), Array()); } if (confirmed) { peerSession.confirmed(session->completed, Array()); diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 5f04136444..571b8848ae 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -170,7 +170,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, void SessionState::handleCommand(framing::AMQMethodBody* method) { - SequenceNumber id = next++; + SequenceNumber id = nextIn++; Invoker::Result invocation = invoke(adapter, *method); completed.add(id); @@ -189,7 +189,7 @@ void SessionState::handleContent(AMQFrame& frame) { intrusive_ptr<Message> msg(msgBuilder.getMessage()); if (!msg) {//start of frameset will be indicated by frame flags - SequenceNumber id = next++; + SequenceNumber id = nextIn++; msgBuilder.start(id); msg = msgBuilder.getMessage(); } @@ -225,8 +225,8 @@ void SessionState::handle(AMQFrame& frame) DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { uint32_t maxFrameSize = getConnection().getFrameMax(); - MessageDelivery::deliver(msg, getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize); - return outgoing.hwm; + MessageDelivery::deliver(msg, getProxy().getHandler(), nextOut, token, maxFrameSize); + return nextOut++; } void SessionState::sendCompletion() diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index fa6bd14ef3..2db7d688b7 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -116,7 +116,8 @@ class SessionState : public framing::SessionState, framing::SequenceSet completed; framing::SequenceSet knownCompleted; - framing::SequenceNumber next; + framing::SequenceNumber nextIn; + framing::SequenceNumber nextOut; private: typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; @@ -134,7 +135,6 @@ class SessionState : public framing::SessionState, BrokerAdapter adapter; MessageBuilder msgBuilder; - framing::Window outgoing; RangedOperation ackOp; management::Session::shared_ptr mgmtObject; |
