summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp57
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h36
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp2
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h4
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp63
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.h22
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp20
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.h26
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp44
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h5
10 files changed, 162 insertions, 117 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 77030855ff..024516fb7b 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -137,17 +137,17 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const stri
broker.getExchanges().destroy(name);
}
-void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
+ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
{
try {
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
- client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
+ return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const ChannelException& e) {
- client.queryOk("", false, true, FieldTable());
+ return ExchangeQueryResult("", false, true, FieldTable());
}
}
-void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
+BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
const std::string& exchangeName,
const std::string& queueName,
const std::string& key,
@@ -164,24 +164,40 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
}
if (!exchange) {
- client.queryOk(true, false, false, false, false);
+ return BindingQueryResult(true, false, false, false, false);
} else if (!queueName.empty() && !queue) {
- client.queryOk(false, true, false, false, false);
+ return BindingQueryResult(false, true, false, false, false);
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- client.queryOk(false, false, false, false, false);
+ return BindingQueryResult(false, false, false, false, false);
} else {
//need to test each specified option individually
bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
- client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);
+ return BindingQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched);
}
}
+QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
+{
+ Queue::shared_ptr queue = getQueue(name);
+ Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
+
+ return QueueQueryResult(queue->getName(),
+ alternateExchange ? alternateExchange->getName() : "",
+ queue->isDurable(),
+ queue->hasExclusiveOwner(),
+ queue->isAutoDelete(),
+ queue->getSettings(),
+ queue->getMessageCount(),
+ queue->getConsumerCount());
+
+}
+
void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange,
bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+ bool autoDelete, const qpid::framing::FieldTable& arguments){
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
alternate = broker.getExchanges().get(alternateExchange);
@@ -223,11 +239,6 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
405,
format("Cannot grant exclusive access to queue '%s'")
% queue->getName());
- if (!nowait) {
- string queueName = queue->getName();
- client.declareOk(
- queueName, queue->getMessageCount(), queue->getConsumerCount());
- }
}
void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName,
@@ -269,17 +280,13 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
}
-void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){
-
- Queue::shared_ptr queue = getQueue(queueName);
- int count = queue->purge();
- if(!nowait) client.purgeOk( count);
+void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
+ getQueue(queue)->purge();
}
void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
+ bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- int count(0);
Queue::shared_ptr q = getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw ChannelException(406, "Queue not empty.");
@@ -291,14 +298,10 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string&
QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
}
- count = q->getMessageCount();
q->destroy();
broker.getQueues().destroy(queue);
q->unbind(broker.getExchanges(), q);
}
-
- if(!nowait)
- client.deleteOk(count);
}
@@ -333,10 +336,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
queue->requestDispatch();
}
-void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){
+void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
channel.cancel(consumerTag);
-
- if(!nowait) client.cancelOk(consumerTag);
}
void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 3fe2eb9eba..99b7f14525 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -121,7 +121,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
const qpid::framing::FieldTable& arguments);
void delete_(uint16_t ticket,
const std::string& exchange, bool ifUnused);
- void query(u_int16_t ticket, const string& name);
+ framing::ExchangeQueryResult query(u_int16_t ticket, const string& name);
private:
void checkType(Exchange::shared_ptr exchange, const std::string& type);
void checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate);
@@ -134,11 +134,11 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
public:
BindingHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
- void query(u_int16_t ticket,
- const std::string& exchange,
- const std::string& queue,
- const std::string& routingKey,
- const framing::FieldTable& arguments);
+ framing::BindingQueryResult query(u_int16_t ticket,
+ const std::string& exchange,
+ const std::string& queue,
+ const std::string& routingKey,
+ const framing::FieldTable& arguments);
};
class QueueHandlerImpl :
@@ -151,7 +151,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
void declare(uint16_t ticket, const std::string& queue,
const std::string& alternateExchange,
bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait,
+ bool autoDelete,
const qpid::framing::FieldTable& arguments);
void bind(uint16_t ticket, const std::string& queue,
const std::string& exchange, const std::string& routingKey,
@@ -161,11 +161,10 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
const std::string& exchange,
const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
- void purge(uint16_t ticket, const std::string& queue,
- bool nowait);
+ framing::QueueQueryResult query(const string& queue);
+ void purge(uint16_t ticket, const std::string& queue);
void delete_(uint16_t ticket, const std::string& queue,
- bool ifUnused, bool ifEmpty,
- bool nowait);
+ bool ifUnused, bool ifEmpty);
};
class BasicHandlerImpl :
@@ -179,18 +178,15 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
void qos(uint32_t prefetchSize,
uint16_t prefetchCount, bool global);
- void consume(
- uint16_t ticket, const std::string& queue,
- const std::string& consumerTag, bool noLocal, bool noAck,
- bool exclusive, bool nowait,
- const qpid::framing::FieldTable& fields);
- void cancel(const std::string& consumerTag,
- bool nowait);
+ void consume(uint16_t ticket, const std::string& queue,
+ const std::string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive, bool nowait,
+ const qpid::framing::FieldTable& fields);
+ void cancel(const std::string& consumerTag);
void publish(uint16_t ticket,
const std::string& exchange, const std::string& routingKey,
bool rejectUnroutable, bool immediate);
- void get(uint16_t ticket, const std::string& queue,
- bool noAck);
+ void get(uint16_t ticket, const std::string& queue, bool noAck);
void ack(uint64_t deliveryTag, bool multiple);
void reject(uint64_t deliveryTag, bool requeue);
void recover(bool requeue);
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index e135e960c4..0dc4bed661 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -374,7 +374,7 @@ void Channel::ack(DeliveryId first, DeliveryId last, bool cumulative)
//just acked single element (move end past it)
++end;
}
-
+
for_each(start, end, boost::bind(&Channel::acknowledged, this, _1));
if (txBuffer.get()) {
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index 857a7adfc2..35aa954c1e 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -145,8 +145,10 @@ namespace qpid {
inline const string& getName() const { return name; }
inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
inline bool hasExclusiveConsumer() const { return exclusive; }
+ inline bool hasExclusiveOwner() const { return owner != 0; }
inline bool isDurable() const { return store != 0; }
-
+ inline const framing::FieldTable& getSettings() const { return settings; }
+ inline bool isAutoDelete() const { return autodelete; }
bool canAutoDelete() const;
bool enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 8b3629dff9..5a69ff0d65 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -22,19 +22,9 @@
#include "BrokerChannel.h"
using namespace qpid::broker;
-using qpid::framing::AMQP_ClientProxy;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
+using namespace qpid::framing;
using std::string;
-DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) :
- CoreRefs(parent),
- dClient(AMQP_ClientProxy::DtxDemarcation::get(proxy)),
- cClient(AMQP_ClientProxy::DtxCoordination::get(proxy))
-
-{
-}
-
const int XA_RBROLLBACK(1);
const int XA_RBTIMEOUT(2);
const int XA_HEURHAZ(3);
@@ -44,6 +34,7 @@ const int XA_HEURMIX(6);
const int XA_RDONLY(7);
const int XA_OK(8);
+DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {}
// DtxDemarcationHandler:
@@ -53,10 +44,10 @@ void DtxHandlerImpl::select()
channel.selectDtx();
}
-void DtxHandlerImpl::end(u_int16_t /*ticket*/,
- const string& xid,
- bool fail,
- bool suspend)
+DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
+ const string& xid,
+ bool fail,
+ bool suspend)
{
try {
if (fail) {
@@ -64,7 +55,7 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/,
if (suspend) {
throw ConnectionException(503, "End and suspend cannot both be set.");
} else {
- dClient.endOk(XA_RBROLLBACK);
+ return DtxDemarcationEndResult(XA_RBROLLBACK);
}
} else {
if (suspend) {
@@ -72,14 +63,14 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/,
} else {
channel.endDtx(xid, false);
}
- dClient.endOk(XA_OK);
+ return DtxDemarcationEndResult(XA_OK);
}
} catch (const DtxTimeoutException& e) {
- dClient.endOk(XA_RBTIMEOUT);
+ return DtxDemarcationEndResult(XA_RBTIMEOUT);
}
}
-void DtxHandlerImpl::start(u_int16_t /*ticket*/,
+DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
const string& xid,
bool join,
bool resume)
@@ -93,52 +84,52 @@ void DtxHandlerImpl::start(u_int16_t /*ticket*/,
} else {
channel.startDtx(xid, broker.getDtxManager(), join);
}
- dClient.startOk(XA_OK);
+ return DtxDemarcationStartResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- dClient.startOk(XA_RBTIMEOUT);
+ return DtxDemarcationStartResult(XA_RBTIMEOUT);
}
}
// DtxCoordinationHandler:
-void DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
+DtxCoordinationPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
const string& xid)
{
try {
bool ok = broker.getDtxManager().prepare(xid);
- cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxCoordinationPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- cClient.prepareOk(XA_RBTIMEOUT);
+ return DtxCoordinationPrepareResult(XA_RBTIMEOUT);
}
}
-void DtxHandlerImpl::commit(u_int16_t /*ticket*/,
+DtxCoordinationCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/,
const string& xid,
bool onePhase)
{
try {
bool ok = broker.getDtxManager().commit(xid, onePhase);
- cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxCoordinationCommitResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- cClient.commitOk(XA_RBTIMEOUT);
+ return DtxCoordinationCommitResult(XA_RBTIMEOUT);
}
}
-void DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
+DtxCoordinationRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
const string& xid )
{
try {
broker.getDtxManager().rollback(xid);
- cClient.rollbackOk(XA_OK);
+ return DtxCoordinationRollbackResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- cClient.rollbackOk(XA_RBTIMEOUT);
+ return DtxCoordinationRollbackResult(XA_RBTIMEOUT);
}
}
-void DtxHandlerImpl::recover(u_int16_t /*ticket*/,
- bool /*startscan*/,
- bool /*endscan*/ )
+DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/,
+ bool /*startscan*/,
+ bool /*endscan*/ )
{
//TODO: what do startscan and endscan actually mean?
@@ -169,7 +160,7 @@ void DtxHandlerImpl::recover(u_int16_t /*ticket*/,
FieldTable response;
response.setString("xids", data);
- cClient.recoverOk(response);
+ return DtxCoordinationRecoverResult(response);
}
void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
@@ -179,10 +170,10 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid);
}
-void DtxHandlerImpl::getTimeout(const string& xid)
+DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
{
uint32_t timeout = broker.getDtxManager().getTimeout(xid);
- cClient.getTimeoutOk(timeout);
+ return DtxCoordinationGetTimeoutResult(timeout);
}
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h
index 067ba47fb5..da6379b26c 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.h
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.h
@@ -31,34 +31,34 @@ class DtxHandlerImpl
public framing::AMQP_ServerOperations::DtxCoordinationHandler,
public framing::AMQP_ServerOperations::DtxDemarcationHandler
{
- framing::AMQP_ClientProxy::DtxDemarcation dClient;
- framing::AMQP_ClientProxy::DtxCoordination cClient;
public:
DtxHandlerImpl(CoreRefs& parent);
// DtxCoordinationHandler:
- void commit(u_int16_t ticket, const std::string& xid, bool onePhase);
+ framing::DtxCoordinationCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase);
void forget(u_int16_t ticket, const std::string& xid);
- void getTimeout(const std::string& xid);
+ framing::DtxCoordinationGetTimeoutResult getTimeout(const std::string& xid);
- void prepare(u_int16_t ticket, const std::string& xid);
+ framing::DtxCoordinationPrepareResult prepare(u_int16_t ticket, const std::string& xid);
- void recover(u_int16_t ticket, bool startscan, bool endscan);
+ framing::DtxCoordinationRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan);
- void rollback(u_int16_t ticket, const std::string& xid);
+ framing::DtxCoordinationRollbackResult rollback(u_int16_t ticket, const std::string& xid);
void setTimeout(u_int16_t ticket, const std::string& xid, u_int32_t timeout);
// DtxDemarcationHandler:
-
- void end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend);
-
+
+ framing::DtxDemarcationEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend);
+
void select();
+
+ framing::DtxDemarcationStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
+
- void start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
};
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 3f407c11f7..ce1fa1e028 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -87,19 +87,21 @@ MessageHandlerImpl::offset(uint64_t /*value*/ )
}
void
-MessageHandlerImpl::consume(uint16_t /*ticket*/,
+MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
const string& queueName,
const string& destination,
bool noLocal,
- bool noAck,
+ u_int8_t confirmMode,
+ u_int8_t /*acquireMode*/,//TODO: implement acquire modes
bool exclusive,
const framing::FieldTable& filter )
{
Queue::shared_ptr queue = getQueue(queueName);
if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
+
string tag = destination;
- channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, !noAck, exclusive, &filter);
+ channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -153,8 +155,9 @@ MessageHandlerImpl::recover(bool requeue)
}
void
-MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ )
+MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*code*/, const string& /*text*/ )
{
+ //TODO: implement
}
void
@@ -210,5 +213,14 @@ void MessageHandlerImpl::stop(const std::string& destination)
channel.stop(destination);
}
+void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/)
+{
+ throw ConnectionException(540, "Not yet implemented");
+}
+
+void MessageHandlerImpl::release(const SequenceNumberSet& /*transfers*/)
+{
+ throw ConnectionException(540, "Not yet implemented");
+}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h
index 20cae46da4..f4d9fa0c76 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.h
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.h
@@ -49,14 +49,6 @@ class MessageHandlerImpl :
void close(const std::string& reference );
- void consume(uint16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noLocal,
- bool noAck,
- bool exclusive,
- const framing::FieldTable& filter );
-
void empty();
void get(uint16_t ticket,
@@ -76,8 +68,9 @@ class MessageHandlerImpl :
void recover(bool requeue );
- void reject(uint16_t code,
- const std::string& text );
+ void reject(const framing::SequenceNumberSet& transfers,
+ uint16_t code,
+ const std::string& text );
void resume(const std::string& reference,
const std::string& identifier );
@@ -92,6 +85,19 @@ class MessageHandlerImpl :
void stop(const std::string& destination);
+ void acquire(const framing::SequenceNumberSet& transfers, u_int8_t mode);
+
+ void release(const framing::SequenceNumberSet& transfers);
+
+ void subscribe(u_int16_t ticket,
+ const string& queue,
+ const string& destination,
+ bool noLocal,
+ u_int8_t confirmMode,
+ u_int8_t acquireMode,
+ bool exclusive,
+ const framing::FieldTable& filter);
+
private:
ReferenceRegistry references;
};
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index b7aa2aad25..f65e450e82 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -22,8 +22,10 @@
#include "SemanticHandler.h"
#include "BrokerAdapter.h"
#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/ExecutionCompleteBody.h"
#include "qpid/framing/ChannelCloseOkBody.h"
+#include "qpid/framing/ExecutionCompleteBody.h"
+#include "qpid/framing/ExecutionResultBody.h"
+#include "qpid/framing/InvocationVisitor.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -66,6 +68,11 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
{
try {
if (!method->invoke(this)) {
+ //temporary hack until channel management is moved to its own handler:
+ if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
+ ++(incoming.lwm);
+ }
+
//else do the usual:
handleL4(method);
//(if the frameset is complete) we can move the execution-mark
@@ -73,7 +80,9 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
//temporary hack until channel management is moved to its own handler:
if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- ++(incoming.hwm);
+ //TODO: need to account for async store opreations
+ //when this command is a message publication
+ ++(incoming.hwm);
}
//note: need to be more sophisticated than this if we execute
@@ -85,7 +94,7 @@ void SemanticHandler::handleMethod(framing::AMQMethodBody* method)
}
}
-void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range)
+void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
{
//record:
SequenceNumber mark(cumulative);
@@ -98,7 +107,7 @@ void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range)
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
- for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) {
+ for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
}
}
@@ -113,6 +122,25 @@ void SemanticHandler::flush()
ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
}
}
+void SemanticHandler::sync()
+{
+ //for now, just treat as flush; will need to get more clever when we deal with async publication
+ flush();
+}
+
+void SemanticHandler::noop()
+{
+ //Do nothing...
+ //
+ //is this an L3 control? or is it an L4 command?
+ //if the former, of what use is it?
+ //if the latter it may contain a synch request... but its odd to have it in this class
+}
+
+void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+{
+ //never actually sent by client at present
+}
void SemanticHandler::handleL4(framing::AMQMethodBody* method)
{
@@ -124,7 +152,13 @@ void SemanticHandler::handleL4(framing::AMQMethodBody* method)
throw ConnectionException(504, out.str());
}
} else {
- method->invoke(*adapter);
+ InvocationVisitor v(adapter.get());
+ method->accept(v);
+ if (!v.wasHandled()) {
+ throw ConnectionException(540, "Not implemented");
+ } else if (v.hasResult()) {
+ ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
+ }
}
}catch(const ChannelException& e){
adapter->getProxy().getChannel().close(
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 6748da8500..672c6ad929 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -79,8 +79,11 @@ public:
void handle(framing::AMQFrame& frame);
//execution class method handlers:
- void complete(uint32_t cumulativeExecutionMark, framing::SequenceNumberSet range);
+ void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
void flush();
+ void noop();
+ void result(uint32_t command, const std::string& data);
+ void sync();
};
}}