summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h2
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp10
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.h5
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp37
-rw-r--r--cpp/src/qpid/broker/SemanticState.h15
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp179
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h51
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp49
-rw-r--r--cpp/src/qpid/broker/SessionState.h14
9 files changed, 180 insertions, 182 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index ef2c51bb8d..5237087dc8 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -85,6 +85,8 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
SessionHandler* getSessionHandler() { BADHANDLER(); }
+ Connection010Handler* getConnection010Handler() { BADHANDLER(); }
+ Session010Handler* getSession010Handler() { BADHANDLER(); }
#undef BADHANDLER
private:
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index e296d52214..126e1b2723 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -23,6 +23,7 @@
#include "ConnectionHandler.h"
#include "Connection.h"
#include "qpid/framing/ConnectionStartBody.h"
+#include "qpid/framing/Connection010StartBody.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
@@ -38,11 +39,14 @@ const std::string en_US = "en_US";
}
void ConnectionHandler::init(const framing::ProtocolInitiation& header) {
+ //need to send out a protocol header back to the client
+ handler->connection.getOutput().initiated(header);
+
FieldTable properties;
string mechanisms(PLAIN);
string locales(en_US);
- handler->serverMode = true;
- handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales);
+ handler->serverMode = true;
+ handler->client.start(properties, mechanisms, locales);
}
void ConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
@@ -55,7 +59,7 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
AMQMethodBody* method=frame.getBody()->getMethod();
try{
if (handler->serverMode) {
- if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method))
+ if (!invoke(static_cast<AMQP_ServerOperations::Connection010Handler&>(*handler.get()), *method))
throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
} else {
if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method))
diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h
index 2a581d5675..44e2ce05fa 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/cpp/src/qpid/broker/ConnectionHandler.h
@@ -41,10 +41,10 @@ class Connection;
// TODO aconway 2007-09-18: Rename to ConnectionHandler
class ConnectionHandler : public framing::FrameHandler
{
- struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler,
+ struct Handler : public framing::AMQP_ServerOperations::Connection010Handler,
public framing::AMQP_ClientOperations::ConnectionHandler
{
- framing::AMQP_ClientProxy::Connection client;
+ framing::AMQP_ClientProxy::Connection010 client;
framing::AMQP_ServerProxy::Connection server;
Connection& connection;
bool serverMode;
@@ -55,6 +55,7 @@ class ConnectionHandler : public framing::FrameHandler
const std::string& locale);
void secureOk(const std::string& response);
void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat);
+ void heartbeat() {}
void open(const std::string& virtualHost,
const std::string& capabilities, bool insist);
void close(uint16_t replyCode, const std::string& replyText,
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 9b44f31e14..e012d693fb 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -387,7 +387,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
++end;
}
- for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
+ for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1));
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -427,16 +427,16 @@ void SemanticState::requestDispatch(ConsumerImpl& c)
}
}
-void SemanticState::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::adjustFlow(const DeliveryRecord& delivery)
{
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- get_pointer(i)->acknowledged(delivery);
+ get_pointer(i)->adjustFlow(delivery);
}
}
-void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery)
{
if (windowing) {
if (msgCredit != 0xFFFFFFFF) msgCredit++;
@@ -639,4 +639,33 @@ void SemanticState::ConsumerImpl::notify()
parent->outputTasks.activateOutput();
}
+
+void SemanticState::accepted(DeliveryId first, DeliveryId last)
+{
+ AckRange range = findRange(first, last);
+ if (txBuffer.get()) {
+ //in transactional mode, don't dequeue or remove, just
+ //maintain set of acknowledged messages:
+ accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet
+
+ if (dtxBuffer.get()) {
+ //if enlisted in a dtx, remove the relevant slice from
+ //unacked and record it against that transaction
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ }
+ } else {
+ for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+ unacked.erase(range.start, range.end);
+ }
+}
+
+void SemanticState::completed(DeliveryId first, DeliveryId last)
+{
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1));
+ requestDispatch();
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index cc9c0e1e9b..88a2fcab5c 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -88,7 +88,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void addMessageCredit(uint32_t value);
void flush();
void stop();
- void acknowledged(const DeliveryRecord&);
+ void adjustFlow(const DeliveryRecord&);
Queue::shared_ptr getQueue() { return queue; }
bool isBlocked() const { return blocked; }
@@ -122,7 +122,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void checkDtxTimeout();
ConsumerImpl& find(const std::string& destination);
void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
- void acknowledged(const DeliveryRecord&);
+ void adjustFlow(const DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
void requestDispatch(ConsumerImpl&);
@@ -171,8 +171,6 @@ class SemanticState : public framing::FrameHandler::Chains,
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
- void ackCumulative(DeliveryId deliveryTag);
- void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
void recover(bool requeue);
void flow(bool active);
DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
@@ -180,8 +178,15 @@ class SemanticState : public framing::FrameHandler::Chains,
void release(DeliveryId first, DeliveryId last);
void reject(DeliveryId first, DeliveryId last);
void handle(intrusive_ptr<Message> msg);
-
bool doOutput() { return outputTasks.doOutput(); }
+
+ //preview only (completed == ack):
+ void ackCumulative(DeliveryId deliveryTag);
+ void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
+
+ //final 0-10 spec (completed and accepted are distinct):
+ void completed(DeliveryId deliveryTag, DeliveryId endTag);
+ void accepted(DeliveryId deliveryTag, DeliveryId endTag);
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 0e3c9928d1..de96ae3f12 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -60,17 +60,10 @@ void SessionHandler::handleIn(AMQFrame& f) {
AMQMethodBody* m = f.getBody()->getMethod();
try {
if (!ignoring) {
- if (m &&
- (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) ||
- invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) {
+ if (m && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) {
return;
} else if (session.get()) {
- boost::optional<SequenceNumber> ack=session->received(f);
session->handle(f);
- if (ack)
- peerSession.ack(*ack, SequenceNumberSet());
- } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
- return;
} else {
throw ChannelErrorException(
QPID_MSG("Channel " << channel.get() << " is not open"));
@@ -80,7 +73,8 @@ void SessionHandler::handleIn(AMQFrame& f) {
ignoring=true; // Ignore trailing frames sent by client.
session->detach();
session.reset();
- peerSession.closed(e.code, e.what());
+ //TODO: implement new exception handling mechanism
+ //peerSession.closed(e.code, e.what());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -92,7 +86,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f); // Send it.
if (session->sent(f))
- peerSession.solicitAck();
+ peerSession.flush(false, false, true);
}
void SessionHandler::assertAttached(const char* method) const {
@@ -111,136 +105,123 @@ void SessionHandler::assertClosed(const char* method) const {
<< " is already open."));
}
-void SessionHandler::open(uint32_t detachedLifetime) {
- assertClosed("open");
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
- session.reset(state.release());
- peerSession.attached(session->getId(), session->getTimeout());
+void SessionHandler::localSuspend() {
+ if (session.get() && session->isAttached()) {
+ session->detach();
+ connection.broker.getSessionManager().suspend(session);
+ session.reset();
+ }
}
-void SessionHandler::resume(const Uuid& id) {
- assertClosed("resume");
- session = connection.broker.getSessionManager().resume(id);
- session->attach(*this);
- SequenceNumber seq = session->resuming();
- peerSession.attached(session->getId(), session->getTimeout());
- proxy.getSession().ack(seq, SequenceNumberSet());
-}
-void SessionHandler::flow(bool /*active*/) {
- assertAttached("flow");
- // TODO aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException("session.flow");
+ConnectionState& SessionHandler::getConnection() { return connection; }
+const ConnectionState& SessionHandler::getConnection() const { return connection; }
+
+//new methods:
+void SessionHandler::attach(const std::string& name, bool /*force*/)
+{
+ //TODO: need to revise session manager to support resume as well
+ assertClosed("attach");
+ std::auto_ptr<SessionState> state(
+ connection.broker.getSessionManager().open(*this, 0));
+ session.reset(state.release());
+ peerSession.attached(name);
}
-void SessionHandler::flowOk(bool /*active*/) {
- assertAttached("flowOk");
- // TODO aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException("session.flowOk");
+void SessionHandler::attached(const std::string& /*name*/)
+{
+ std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0));
+ session.reset(state.release());
}
-void SessionHandler::close() {
- assertAttached("close");
- QPID_LOG(info, "Received session.close");
- ignoring=false;
- session->detach();
- session.reset();
- peerSession.closed(REPLY_SUCCESS, "ok");
+void SessionHandler::detach(const std::string& name)
+{
+ assertAttached("detach");
+ localSuspend();
+ peerSession.detached(name, 0);
assert(&connection.getChannel(channel.get()) == this);
connection.closeChannel(channel.get());
}
-void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
- QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
+void SessionHandler::detached(const std::string& name, uint8_t code)
+{
ignoring=false;
session->detach();
session.reset();
-}
-
-void SessionHandler::localSuspend() {
- if (session.get() && session->isAttached()) {
- session->detach();
- connection.broker.getSessionManager().suspend(session);
- session.reset();
+ if (code) {
+ //no error
+ } else {
+ //error occured
+ QPID_LOG(warning, "Received session.closed: "<< name << " " << code);
}
}
-void SessionHandler::suspend() {
- assertAttached("suspend");
- localSuspend();
- peerSession.detached();
- assert(&connection.getChannel(channel.get()) == this);
- connection.closeChannel(channel.get());
-}
-
-void SessionHandler::ack(uint32_t cumulativeSeenMark,
- const SequenceNumberSet& /*seenFrameSet*/)
+void SessionHandler::requestTimeout(uint32_t t)
{
- assertAttached("ack");
- if (session->getState() == SessionState::RESUMING) {
- session->receivedAck(cumulativeSeenMark);
- framing::SessionState::Replay replay=session->replay();
- std::for_each(replay.begin(), replay.end(),
- boost::bind(&SessionHandler::handleOut, this, _1));
- }
- else
- session->receivedAck(cumulativeSeenMark);
+ session->setTimeout(t);
+ //proxy.timeout(t);
}
-void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
- // TODO aconway 2007-10-02: may be removed from spec.
- assert(0); throw NotImplementedException("session.high-water-mark");
+void SessionHandler::timeout(uint32_t)
+{
+ //not sure what we need to do on the server for this...
}
-void SessionHandler::solicitAck() {
- assertAttached("solicit-ack");
- peerSession.ack(session->sendingAck(), SequenceNumberSet());
+void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
+{
+ if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point");
+
+ session->next = id;
}
-void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments)
{
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
- session.reset(state.release());
+ if (!commands.empty() || fragments.size()) {
+ throw NotImplementedException("Session resumption not yet supported");
+ }
}
-void SessionHandler::detached()
+void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/)
{
- connection.broker.getSessionManager().suspend(session);
- session.reset();
+ //don't really care too much about this yet
}
-
-ConnectionState& SessionHandler::getConnection() { return connection; }
-const ConnectionState& SessionHandler::getConnection() const { return connection; }
-
-void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
+void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply)
{
- assertAttached("complete");
- session->complete(cumulative, range);
+ session->complete(commands);
+ if (timelyReply) {
+ peerSession.knownCompleted(session->knownCompleted);
+ session->knownCompleted.clear();
+ }
}
-void SessionHandler::flush()
+void SessionHandler::knownCompleted(const framing::SequenceSet& commands)
{
- assertAttached("flush");
- session->flush();
+ session->completed.remove(commands);
}
-void SessionHandler::sync()
+
+void SessionHandler::flush(bool expected, bool confirmed, bool completed)
{
- assertAttached("sync");
- session->sync();
+ if (expected) {
+ peerSession.expected(SequenceSet(session->next), Array());
+ }
+ if (confirmed) {
+ peerSession.confirmed(session->completed, Array());
+ }
+ if (completed) {
+ peerSession.completed(session->completed, true);
+ }
}
-void SessionHandler::noop()
+
+void SessionHandler::sendCompletion()
{
- assertAttached("noop");
- session->noop();
+ peerSession.completed(session->completed, true);
}
-void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+void SessionHandler::gap(const framing::SequenceSet& /*commands*/)
{
- //never actually sent by client at present
+ throw NotImplementedException("gap not yet supported");
}
-
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index e6bc463a82..4b031f2951 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -27,8 +27,10 @@
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/Array.h"
#include "qpid/framing/ChannelHandler.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/SequenceSet.h"
#include <boost/noncopyable.hpp>
@@ -44,9 +46,7 @@ class SessionState;
* receives incoming frames, handles session controls and manages the
* association between the channel and a session.
*/
-class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
- public framing::AMQP_ClientOperations::SessionHandler,
- public framing::AMQP_ServerOperations::ExecutionHandler,
+class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
public framing::FrameHandler::InOutHandler,
private boost::noncopyable
{
@@ -69,35 +69,32 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
// Called by closing connection.
void localSuspend();
void detach() { localSuspend(); }
+ void sendCompletion();
protected:
void handleIn(framing::AMQFrame&);
void handleOut(framing::AMQFrame&);
private:
- /// Session methods
- void open(uint32_t detachedLifetime);
- void flow(bool active);
- void flowOk(bool active);
- void close();
- void closed(uint16_t replyCode, const std::string& replyText);
- void resume(const framing::Uuid& sessionId);
- void suspend();
- void ack(uint32_t cumulativeSeenMark,
- const framing::SequenceNumberSet& seenFrameSet);
- void highWaterMark(uint32_t lastSentMark);
- void solicitAck();
-
- //extra methods required for assuming client role
- void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
- void detached();
-
- //Execution methods:
- void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
- void flush();
- void noop();
- void result(uint32_t command, const std::string& data);
- void sync();
+ //new methods:
+ void attach(const std::string& name, bool force);
+ void attached(const std::string& name);
+ void detach(const std::string& name);
+ void detached(const std::string& name, uint8_t code);
+
+ void requestTimeout(uint32_t t);
+ void timeout(uint32_t t);
+
+ void commandPoint(const framing::SequenceNumber& id, uint64_t offset);
+ void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
+ void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments);
+ void completed(const framing::SequenceSet& commands, bool timelyReply);
+ void knownCompleted(const framing::SequenceSet& commands);
+ void flush(bool expected, bool confirmed, bool completed);
+ void gap(const framing::SequenceSet& commands);
+
+ //hacks for old generator:
+ void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); }
void assertAttached(const char* method) const;
void assertActive(const char* method) const;
@@ -106,7 +103,7 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
Connection& connection;
framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
- framing::AMQP_ClientProxy::Session peerSession;
+ framing::AMQP_ClientProxy::Session010 peerSession;
bool ignoring;
std::auto_ptr<SessionState> session;
};
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 573a567da6..5f04136444 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -49,7 +49,7 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2))
+ ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2))
{
getConnection().outputTasks.addOutputTask(&semanticState);
@@ -170,9 +170,9 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
void SessionState::handleCommand(framing::AMQMethodBody* method)
{
- SequenceNumber id = incoming.next();
+ SequenceNumber id = next++;
Invoker::Result invocation = invoke(adapter, *method);
- incoming.complete(id);
+ completed.add(id);
if (!invocation.wasHandled()) {
throw NotImplementedException("Not implemented");
@@ -180,7 +180,6 @@ void SessionState::handleCommand(framing::AMQMethodBody* method)
getProxy().getExecution().result(id.getValue(), invocation.getResult());
}
if (method->isSync()) {
- incoming.sync(id);
sendCompletion();
}
//TODO: if window gets too large send unsolicited completion
@@ -190,7 +189,8 @@ void SessionState::handleContent(AMQFrame& frame)
{
intrusive_ptr<Message> msg(msgBuilder.getMessage());
if (!msg) {//start of frameset will be indicated by frame flags
- msgBuilder.start(incoming.next());
+ SequenceNumber id = next++;
+ msgBuilder.start(id);
msg = msgBuilder.getMessage();
}
msgBuilder.handle(frame);
@@ -198,9 +198,9 @@ void SessionState::handleContent(AMQFrame& frame)
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
- incoming.track(msg);
+ //TODO: may want to hold up execution until async enqueue is complete
+ completed.add(msg->getCommandId());
if (msg->getFrames().getMethod()->isSync()) {
- incoming.sync(msg->getCommandId());
sendCompletion();
}
}
@@ -208,6 +208,8 @@ void SessionState::handleContent(AMQFrame& frame)
void SessionState::handle(AMQFrame& frame)
{
+ received(frame);
+
//TODO: make command handling more uniform, regardless of whether
//commands carry content. (For now, assume all single frame
//assmblies are non-content bearing and all content-bearing
@@ -229,38 +231,13 @@ DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr t
void SessionState::sendCompletion()
{
- SequenceNumber mark = incoming.getMark();
- SequenceNumberSet range = incoming.getRange();
- getProxy().getExecution().complete(mark.getValue(), range);
-}
-
-void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range)
-{
- //record:
- SequenceNumber mark(cumulative);
- if (outgoing.lwm < mark) {
- outgoing.lwm = mark;
- //ack messages:
- semanticState.ackCumulative(mark.getValue());
- }
- range.processRanges(ackOp);
-}
-
-void SessionState::flush()
-{
- incoming.flush();
- sendCompletion();
-}
-
-void SessionState::sync()
-{
- incoming.sync();
- sendCompletion();
+ handler->sendCompletion();
}
-void SessionState::noop()
+void SessionState::complete(const SequenceSet& commands)
{
- incoming.noop();
+ knownCompleted.add(commands);
+ commands.for_each(ackOp);
}
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 98c21a8ab5..fa6bd14ef3 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -25,6 +25,7 @@
#include "qpid/framing/Uuid.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SessionState.h"
+#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Time.h"
@@ -83,6 +84,8 @@ class SessionState : public framing::SessionState,
ConnectionState& getConnection();
uint32_t getTimeout() const { return timeout; }
+ void setTimeout(uint32_t t) { timeout = t; }
+
Broker& getBroker() { return broker; }
framing::ProtocolVersion getVersion() const { return version; }
@@ -93,10 +96,7 @@ class SessionState : public framing::SessionState,
void handleCommand(framing::AMQMethodBody* method);
void handleContent(framing::AMQFrame& frame);
- void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
- void flush();
- void noop();
- void sync();
+ void complete(const framing::SequenceSet& ranges);
void sendCompletion();
//delivery adapter methods:
@@ -114,6 +114,10 @@ class SessionState : public framing::SessionState,
uint32_t ackInterval);
+ framing::SequenceSet completed;
+ framing::SequenceSet knownCompleted;
+ framing::SequenceNumber next;
+
private:
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
@@ -130,8 +134,6 @@ class SessionState : public framing::SessionState,
BrokerAdapter adapter;
MessageBuilder msgBuilder;
- //execution state
- IncomingExecutionContext incoming;
framing::Window outgoing;
RangedOperation ackOp;