diff options
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 37 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 179 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandler.h | 51 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 49 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 14 |
9 files changed, 180 insertions, 182 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index ef2c51bb8d..5237087dc8 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -85,6 +85,8 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations ExecutionHandler* getExecutionHandler() { BADHANDLER(); } ConnectionHandler* getConnectionHandler() { BADHANDLER(); } SessionHandler* getSessionHandler() { BADHANDLER(); } + Connection010Handler* getConnection010Handler() { BADHANDLER(); } + Session010Handler* getSession010Handler() { BADHANDLER(); } #undef BADHANDLER private: diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index e296d52214..126e1b2723 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -23,6 +23,7 @@ #include "ConnectionHandler.h" #include "Connection.h" #include "qpid/framing/ConnectionStartBody.h" +#include "qpid/framing/Connection010StartBody.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/ServerInvoker.h" @@ -38,11 +39,14 @@ const std::string en_US = "en_US"; } void ConnectionHandler::init(const framing::ProtocolInitiation& header) { + //need to send out a protocol header back to the client + handler->connection.getOutput().initiated(header); + FieldTable properties; string mechanisms(PLAIN); string locales(en_US); - handler->serverMode = true; - handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales); + handler->serverMode = true; + handler->client.start(properties, mechanisms, locales); } void ConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId) @@ -55,7 +59,7 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) AMQMethodBody* method=frame.getBody()->getMethod(); try{ if (handler->serverMode) { - if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method)) + if (!invoke(static_cast<AMQP_ServerOperations::Connection010Handler&>(*handler.get()), *method)) throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); } else { if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method)) diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h index 2a581d5675..44e2ce05fa 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.h +++ b/cpp/src/qpid/broker/ConnectionHandler.h @@ -41,10 +41,10 @@ class Connection; // TODO aconway 2007-09-18: Rename to ConnectionHandler class ConnectionHandler : public framing::FrameHandler { - struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler, + struct Handler : public framing::AMQP_ServerOperations::Connection010Handler, public framing::AMQP_ClientOperations::ConnectionHandler { - framing::AMQP_ClientProxy::Connection client; + framing::AMQP_ClientProxy::Connection010 client; framing::AMQP_ServerProxy::Connection server; Connection& connection; bool serverMode; @@ -55,6 +55,7 @@ class ConnectionHandler : public framing::FrameHandler const std::string& locale); void secureOk(const std::string& response); void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat); + void heartbeat() {} void open(const std::string& virtualHost, const std::string& capabilities, bool insist); void close(uint16_t replyCode, const std::string& replyText, diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 9b44f31e14..e012d693fb 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -387,7 +387,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) ++end; } - for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1)); + for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1)); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -427,16 +427,16 @@ void SemanticState::requestDispatch(ConsumerImpl& c) } } -void SemanticState::acknowledged(const DeliveryRecord& delivery) +void SemanticState::adjustFlow(const DeliveryRecord& delivery) { delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - get_pointer(i)->acknowledged(delivery); + get_pointer(i)->adjustFlow(delivery); } } -void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) +void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery) { if (windowing) { if (msgCredit != 0xFFFFFFFF) msgCredit++; @@ -639,4 +639,33 @@ void SemanticState::ConsumerImpl::notify() parent->outputTasks.activateOutput(); } + +void SemanticState::accepted(DeliveryId first, DeliveryId last) +{ + AckRange range = findRange(first, last); + if (txBuffer.get()) { + //in transactional mode, don't dequeue or remove, just + //maintain set of acknowledged messages: + accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet + + if (dtxBuffer.get()) { + //if enlisted in a dtx, remove the relevant slice from + //unacked and record it against that transaction + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + } + } else { + for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); + unacked.erase(range.start, range.end); + } +} + +void SemanticState::completed(DeliveryId first, DeliveryId last) +{ + AckRange range = findRange(first, last); + for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1)); + requestDispatch(); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index cc9c0e1e9b..88a2fcab5c 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -88,7 +88,7 @@ class SemanticState : public framing::FrameHandler::Chains, void addMessageCredit(uint32_t value); void flush(); void stop(); - void acknowledged(const DeliveryRecord&); + void adjustFlow(const DeliveryRecord&); Queue::shared_ptr getQueue() { return queue; } bool isBlocked() const { return blocked; } @@ -122,7 +122,7 @@ class SemanticState : public framing::FrameHandler::Chains, void checkDtxTimeout(); ConsumerImpl& find(const std::string& destination); void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); - void acknowledged(const DeliveryRecord&); + void adjustFlow(const DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); void requestDispatch(ConsumerImpl&); @@ -171,8 +171,6 @@ class SemanticState : public framing::FrameHandler::Chains, void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); - void ackCumulative(DeliveryId deliveryTag); - void ackRange(DeliveryId deliveryTag, DeliveryId endTag); void recover(bool requeue); void flow(bool active); DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); @@ -180,8 +178,15 @@ class SemanticState : public framing::FrameHandler::Chains, void release(DeliveryId first, DeliveryId last); void reject(DeliveryId first, DeliveryId last); void handle(intrusive_ptr<Message> msg); - bool doOutput() { return outputTasks.doOutput(); } + + //preview only (completed == ack): + void ackCumulative(DeliveryId deliveryTag); + void ackRange(DeliveryId deliveryTag, DeliveryId endTag); + + //final 0-10 spec (completed and accepted are distinct): + void completed(DeliveryId deliveryTag, DeliveryId endTag); + void accepted(DeliveryId deliveryTag, DeliveryId endTag); }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 0e3c9928d1..de96ae3f12 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -60,17 +60,10 @@ void SessionHandler::handleIn(AMQFrame& f) { AMQMethodBody* m = f.getBody()->getMethod(); try { if (!ignoring) { - if (m && - (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) || - invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) { + if (m && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) { return; } else if (session.get()) { - boost::optional<SequenceNumber> ack=session->received(f); session->handle(f); - if (ack) - peerSession.ack(*ack, SequenceNumberSet()); - } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { - return; } else { throw ChannelErrorException( QPID_MSG("Channel " << channel.get() << " is not open")); @@ -80,7 +73,8 @@ void SessionHandler::handleIn(AMQFrame& f) { ignoring=true; // Ignore trailing frames sent by client. session->detach(); session.reset(); - peerSession.closed(e.code, e.what()); + //TODO: implement new exception handling mechanism + //peerSession.closed(e.code, e.what()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -92,7 +86,7 @@ void SessionHandler::handleIn(AMQFrame& f) { void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); // Send it. if (session->sent(f)) - peerSession.solicitAck(); + peerSession.flush(false, false, true); } void SessionHandler::assertAttached(const char* method) const { @@ -111,136 +105,123 @@ void SessionHandler::assertClosed(const char* method) const { << " is already open.")); } -void SessionHandler::open(uint32_t detachedLifetime) { - assertClosed("open"); - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); - session.reset(state.release()); - peerSession.attached(session->getId(), session->getTimeout()); +void SessionHandler::localSuspend() { + if (session.get() && session->isAttached()) { + session->detach(); + connection.broker.getSessionManager().suspend(session); + session.reset(); + } } -void SessionHandler::resume(const Uuid& id) { - assertClosed("resume"); - session = connection.broker.getSessionManager().resume(id); - session->attach(*this); - SequenceNumber seq = session->resuming(); - peerSession.attached(session->getId(), session->getTimeout()); - proxy.getSession().ack(seq, SequenceNumberSet()); -} -void SessionHandler::flow(bool /*active*/) { - assertAttached("flow"); - // TODO aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException("session.flow"); +ConnectionState& SessionHandler::getConnection() { return connection; } +const ConnectionState& SessionHandler::getConnection() const { return connection; } + +//new methods: +void SessionHandler::attach(const std::string& name, bool /*force*/) +{ + //TODO: need to revise session manager to support resume as well + assertClosed("attach"); + std::auto_ptr<SessionState> state( + connection.broker.getSessionManager().open(*this, 0)); + session.reset(state.release()); + peerSession.attached(name); } -void SessionHandler::flowOk(bool /*active*/) { - assertAttached("flowOk"); - // TODO aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException("session.flowOk"); +void SessionHandler::attached(const std::string& /*name*/) +{ + std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0)); + session.reset(state.release()); } -void SessionHandler::close() { - assertAttached("close"); - QPID_LOG(info, "Received session.close"); - ignoring=false; - session->detach(); - session.reset(); - peerSession.closed(REPLY_SUCCESS, "ok"); +void SessionHandler::detach(const std::string& name) +{ + assertAttached("detach"); + localSuspend(); + peerSession.detached(name, 0); assert(&connection.getChannel(channel.get()) == this); connection.closeChannel(channel.get()); } -void SessionHandler::closed(uint16_t replyCode, const string& replyText) { - QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText); +void SessionHandler::detached(const std::string& name, uint8_t code) +{ ignoring=false; session->detach(); session.reset(); -} - -void SessionHandler::localSuspend() { - if (session.get() && session->isAttached()) { - session->detach(); - connection.broker.getSessionManager().suspend(session); - session.reset(); + if (code) { + //no error + } else { + //error occured + QPID_LOG(warning, "Received session.closed: "<< name << " " << code); } } -void SessionHandler::suspend() { - assertAttached("suspend"); - localSuspend(); - peerSession.detached(); - assert(&connection.getChannel(channel.get()) == this); - connection.closeChannel(channel.get()); -} - -void SessionHandler::ack(uint32_t cumulativeSeenMark, - const SequenceNumberSet& /*seenFrameSet*/) +void SessionHandler::requestTimeout(uint32_t t) { - assertAttached("ack"); - if (session->getState() == SessionState::RESUMING) { - session->receivedAck(cumulativeSeenMark); - framing::SessionState::Replay replay=session->replay(); - std::for_each(replay.begin(), replay.end(), - boost::bind(&SessionHandler::handleOut, this, _1)); - } - else - session->receivedAck(cumulativeSeenMark); + session->setTimeout(t); + //proxy.timeout(t); } -void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) { - // TODO aconway 2007-10-02: may be removed from spec. - assert(0); throw NotImplementedException("session.high-water-mark"); +void SessionHandler::timeout(uint32_t) +{ + //not sure what we need to do on the server for this... } -void SessionHandler::solicitAck() { - assertAttached("solicit-ack"); - peerSession.ack(session->sendingAck(), SequenceNumberSet()); +void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset) +{ + if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); + + session->next = id; } -void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime) +void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments) { - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); - session.reset(state.release()); + if (!commands.empty() || fragments.size()) { + throw NotImplementedException("Session resumption not yet supported"); + } } -void SessionHandler::detached() +void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/) { - connection.broker.getSessionManager().suspend(session); - session.reset(); + //don't really care too much about this yet } - -ConnectionState& SessionHandler::getConnection() { return connection; } -const ConnectionState& SessionHandler::getConnection() const { return connection; } - -void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) +void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply) { - assertAttached("complete"); - session->complete(cumulative, range); + session->complete(commands); + if (timelyReply) { + peerSession.knownCompleted(session->knownCompleted); + session->knownCompleted.clear(); + } } -void SessionHandler::flush() +void SessionHandler::knownCompleted(const framing::SequenceSet& commands) { - assertAttached("flush"); - session->flush(); + session->completed.remove(commands); } -void SessionHandler::sync() + +void SessionHandler::flush(bool expected, bool confirmed, bool completed) { - assertAttached("sync"); - session->sync(); + if (expected) { + peerSession.expected(SequenceSet(session->next), Array()); + } + if (confirmed) { + peerSession.confirmed(session->completed, Array()); + } + if (completed) { + peerSession.completed(session->completed, true); + } } -void SessionHandler::noop() + +void SessionHandler::sendCompletion() { - assertAttached("noop"); - session->noop(); + peerSession.completed(session->completed, true); } -void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +void SessionHandler::gap(const framing::SequenceSet& /*commands*/) { - //never actually sent by client at present + throw NotImplementedException("gap not yet supported"); } - + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index e6bc463a82..4b031f2951 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -27,8 +27,10 @@ #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" +#include "qpid/framing/Array.h" #include "qpid/framing/ChannelHandler.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/SequenceSet.h" #include <boost/noncopyable.hpp> @@ -44,9 +46,7 @@ class SessionState; * receives incoming frames, handles session controls and manages the * association between the channel and a session. */ -class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, - public framing::AMQP_ClientOperations::SessionHandler, - public framing::AMQP_ServerOperations::ExecutionHandler, +class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler, public framing::FrameHandler::InOutHandler, private boost::noncopyable { @@ -69,35 +69,32 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, // Called by closing connection. void localSuspend(); void detach() { localSuspend(); } + void sendCompletion(); protected: void handleIn(framing::AMQFrame&); void handleOut(framing::AMQFrame&); private: - /// Session methods - void open(uint32_t detachedLifetime); - void flow(bool active); - void flowOk(bool active); - void close(); - void closed(uint16_t replyCode, const std::string& replyText); - void resume(const framing::Uuid& sessionId); - void suspend(); - void ack(uint32_t cumulativeSeenMark, - const framing::SequenceNumberSet& seenFrameSet); - void highWaterMark(uint32_t lastSentMark); - void solicitAck(); - - //extra methods required for assuming client role - void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); - void detached(); - - //Execution methods: - void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); - void flush(); - void noop(); - void result(uint32_t command, const std::string& data); - void sync(); + //new methods: + void attach(const std::string& name, bool force); + void attached(const std::string& name); + void detach(const std::string& name); + void detached(const std::string& name, uint8_t code); + + void requestTimeout(uint32_t t); + void timeout(uint32_t t); + + void commandPoint(const framing::SequenceNumber& id, uint64_t offset); + void expected(const framing::SequenceSet& commands, const framing::Array& fragments); + void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments); + void completed(const framing::SequenceSet& commands, bool timelyReply); + void knownCompleted(const framing::SequenceSet& commands); + void flush(bool expected, bool confirmed, bool completed); + void gap(const framing::SequenceSet& commands); + + //hacks for old generator: + void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); } void assertAttached(const char* method) const; void assertActive(const char* method) const; @@ -106,7 +103,7 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, Connection& connection; framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; - framing::AMQP_ClientProxy::Session peerSession; + framing::AMQP_ClientProxy::Session010 peerSession; bool ignoring; std::auto_ptr<SessionState> session; }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 573a567da6..5f04136444 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -49,7 +49,7 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2)) + ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)) { getConnection().outputTasks.addOutputTask(&semanticState); @@ -170,9 +170,9 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, void SessionState::handleCommand(framing::AMQMethodBody* method) { - SequenceNumber id = incoming.next(); + SequenceNumber id = next++; Invoker::Result invocation = invoke(adapter, *method); - incoming.complete(id); + completed.add(id); if (!invocation.wasHandled()) { throw NotImplementedException("Not implemented"); @@ -180,7 +180,6 @@ void SessionState::handleCommand(framing::AMQMethodBody* method) getProxy().getExecution().result(id.getValue(), invocation.getResult()); } if (method->isSync()) { - incoming.sync(id); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -190,7 +189,8 @@ void SessionState::handleContent(AMQFrame& frame) { intrusive_ptr<Message> msg(msgBuilder.getMessage()); if (!msg) {//start of frameset will be indicated by frame flags - msgBuilder.start(incoming.next()); + SequenceNumber id = next++; + msgBuilder.start(id); msg = msgBuilder.getMessage(); } msgBuilder.handle(frame); @@ -198,9 +198,9 @@ void SessionState::handleContent(AMQFrame& frame) msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); - incoming.track(msg); + //TODO: may want to hold up execution until async enqueue is complete + completed.add(msg->getCommandId()); if (msg->getFrames().getMethod()->isSync()) { - incoming.sync(msg->getCommandId()); sendCompletion(); } } @@ -208,6 +208,8 @@ void SessionState::handleContent(AMQFrame& frame) void SessionState::handle(AMQFrame& frame) { + received(frame); + //TODO: make command handling more uniform, regardless of whether //commands carry content. (For now, assume all single frame //assmblies are non-content bearing and all content-bearing @@ -229,38 +231,13 @@ DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr t void SessionState::sendCompletion() { - SequenceNumber mark = incoming.getMark(); - SequenceNumberSet range = incoming.getRange(); - getProxy().getExecution().complete(mark.getValue(), range); -} - -void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range) -{ - //record: - SequenceNumber mark(cumulative); - if (outgoing.lwm < mark) { - outgoing.lwm = mark; - //ack messages: - semanticState.ackCumulative(mark.getValue()); - } - range.processRanges(ackOp); -} - -void SessionState::flush() -{ - incoming.flush(); - sendCompletion(); -} - -void SessionState::sync() -{ - incoming.sync(); - sendCompletion(); + handler->sendCompletion(); } -void SessionState::noop() +void SessionState::complete(const SequenceSet& commands) { - incoming.noop(); + knownCompleted.add(commands); + commands.for_each(ackOp); } diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 98c21a8ab5..fa6bd14ef3 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -25,6 +25,7 @@ #include "qpid/framing/Uuid.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SessionState.h" +#include "qpid/framing/SequenceSet.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Time.h" @@ -83,6 +84,8 @@ class SessionState : public framing::SessionState, ConnectionState& getConnection(); uint32_t getTimeout() const { return timeout; } + void setTimeout(uint32_t t) { timeout = t; } + Broker& getBroker() { return broker; } framing::ProtocolVersion getVersion() const { return version; } @@ -93,10 +96,7 @@ class SessionState : public framing::SessionState, void handleCommand(framing::AMQMethodBody* method); void handleContent(framing::AMQFrame& frame); - void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); - void flush(); - void noop(); - void sync(); + void complete(const framing::SequenceSet& ranges); void sendCompletion(); //delivery adapter methods: @@ -114,6 +114,10 @@ class SessionState : public framing::SessionState, uint32_t ackInterval); + framing::SequenceSet completed; + framing::SequenceSet knownCompleted; + framing::SequenceNumber next; + private: typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; @@ -130,8 +134,6 @@ class SessionState : public framing::SessionState, BrokerAdapter adapter; MessageBuilder msgBuilder; - //execution state - IncomingExecutionContext incoming; framing::Window outgoing; RangedOperation ackOp; |
