summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp103
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h21
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp23
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h3
-rw-r--r--cpp/src/qpid/broker/BrokerExchange.h3
-rw-r--r--cpp/src/qpid/broker/BrokerMessageBase.h2
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp22
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp17
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h2
-rw-r--r--cpp/src/qpid/broker/Connection.cpp1
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.cpp5
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.h2
-rw-r--r--cpp/src/qpid/broker/Deliverable.h2
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.cpp2
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp4
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.h2
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp5
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp25
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp7
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h2
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp1
21 files changed, 150 insertions, 104 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 9bf148bcf0..376108193a 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -55,8 +55,7 @@ ProtocolVersion BrokerAdapter::getVersion() const {
void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){
channel.open();
- // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- client.openOk(std::string()/* ID */);
+ client.openOk();
}
void BrokerAdapter::ChannelHandlerImpl::flow(bool active){
@@ -80,41 +79,63 @@ void BrokerAdapter::ChannelHandlerImpl::closeOk(){}
void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& args){
+ const string& alternateExchange,
+ bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = broker.getExchanges().get(alternateExchange);
+ }
if(passive){
- if(!broker.getExchanges().get(exchange)) {
- throw ChannelException(404, "Exchange not found: " + exchange);
- }
+ Exchange::shared_ptr actual(broker.getExchanges().get(exchange));
+ checkType(actual, type);
+ checkAlternate(actual, alternate);
}else{
try{
std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args);
if (response.second) {
- if (durable) broker.getStore().create(*response.first);
- } else if (response.first->getType() != type) {
- throw ConnectionException(
- 530,
- "Exchange already declared to be of type "
- + response.first->getType() + ", requested " + type);
+ if (durable) {
+ broker.getStore().create(*response.first);
+ }
+ if (alternate) {
+ response.first->setAlternate(alternate);
+ alternate->incAlternateUsers();
+ }
+ } else {
+ checkType(response.first, type);
+ checkAlternate(response.first, alternate);
}
}catch(UnknownExchangeTypeException& e){
throw ConnectionException(
503, "Exchange type not implemented: " + type);
}
}
- if(!nowait){
- client.declareOk();
+}
+
+void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type)
+{
+ if (!type.empty() && exchange->getType() != type) {
+ throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type);
+ }
+}
+
+void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
+{
+ if (alternate && alternate != exchange->getAlternate()) {
+ throw ConnectionException(530, "Exchange declared with alternate-exchange "
+ + exchange->getAlternate()->getName() + ", requested "
+ + alternate->getName());
}
+
}
-void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/,
- const string& name, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){
//TODO: implement unused
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+ if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange.");
if (exchange->isDurable()) broker.getStore().destroy(*exchange);
+ if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
broker.getExchanges().destroy(name);
- if(!nowait) client.deleteOk();
}
void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
@@ -159,12 +180,17 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
}
}
-void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name,
+void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = broker.getExchanges().get(alternateExchange);
+ }
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = getQueue(name);
+ //TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
broker.getQueues().declare(
@@ -175,6 +201,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
assert(queue);
if (queue_created.second) { // This is a new queue
channel.setDefaultQueue(queue);
+ if (alternate) {
+ queue->setAlternateExchange(alternate);
+ alternate->incAlternateUsers();
+ }
+
//apply settings & create persistent record if required
queue_created.first->create(arguments);
@@ -201,7 +232,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
}
void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
+ const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
Queue::shared_ptr queue = getQueue(queueName);
@@ -214,7 +245,6 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu
broker.getStore().bind(*exchange, *queue, routingKey, arguments);
}
}
- if(!nowait) client.bindOk();
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -238,7 +268,6 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
}
- client.unbindOk();
}
void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){
@@ -280,7 +309,6 @@ void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefet
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.qosOk();
}
void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
@@ -314,12 +342,12 @@ void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool now
void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
- bool mandatory, bool immediate)
+ bool rejectUnroutable, bool immediate)
{
Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate);
+ BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, rejectUnroutable, immediate);
channel.handlePublish(msg);
}else{
throw ChannelException(
@@ -351,19 +379,16 @@ void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
void BrokerAdapter::TxHandlerImpl::select()
{
channel.startTx();
- client.selectOk();
}
void BrokerAdapter::TxHandlerImpl::commit()
{
channel.commit();
- client.commitOk();
}
void BrokerAdapter::TxHandlerImpl::rollback()
{
channel.rollback();
- client.rollbackOk();
channel.recover(false);
}
@@ -372,28 +397,6 @@ void BrokerAdapter::ChannelHandlerImpl::ok()
//no specific action required, generic response handling should be sufficient
}
-
-//
-// Message class method handlers
-//
-void BrokerAdapter::ChannelHandlerImpl::ping()
-{
- client.ok();
- client.pong();
-}
-
-
-void
-BrokerAdapter::ChannelHandlerImpl::pong()
-{
- client.ok();
-}
-
-void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/)
-{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
-}
-
void BrokerAdapter::setResponseTo(RequestId r)
{
basicHandler.client.setResponseTo(r);
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index a7e27a0ee6..4ae8346580 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -72,10 +72,9 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
throw ConnectionException(540, "File class not implemented"); }
StreamHandler* getStreamHandler() {
throw ConnectionException(540, "Stream class not implemented"); }
- DtxHandler* getDtxHandler() {
- throw ConnectionException(540, "Dtx class not implemented"); }
TunnelHandler* getTunnelHandler() {
throw ConnectionException(540, "Tunnel class not implemented"); }
+ SessionHandler* getSessionHandler() { throw ConnectionException(503, "Session class not implemented yet"); }
DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
@@ -117,13 +116,16 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
void declare(uint16_t ticket,
- const std::string& exchange, const std::string& type,
- bool passive, bool durable, bool autoDelete,
- bool internal, bool nowait,
+ const std::string& exchange, const std::string& type,
+ const std::string& alternateExchange,
+ bool passive, bool durable, bool autoDelete,
const qpid::framing::FieldTable& arguments);
void delete_(uint16_t ticket,
- const std::string& exchange, bool ifUnused, bool nowait);
+ const std::string& exchange, bool ifUnused);
void query(u_int16_t ticket, const string& name);
+ private:
+ void checkType(Exchange::shared_ptr exchange, const std::string& type);
+ void checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate);
};
class BindingHandlerImpl :
@@ -147,13 +149,14 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
public:
QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
- void declare(uint16_t ticket, const std::string& queue,
+ void declare(uint16_t ticket, const std::string& queue,
+ const std::string& alternateExchange,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait,
const qpid::framing::FieldTable& arguments);
void bind(uint16_t ticket, const std::string& queue,
const std::string& exchange, const std::string& routingKey,
- bool nowait, const qpid::framing::FieldTable& arguments);
+ const qpid::framing::FieldTable& arguments);
void unbind(uint16_t ticket,
const std::string& queue,
const std::string& exchange,
@@ -186,7 +189,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
bool nowait);
void publish(uint16_t ticket,
const std::string& exchange, const std::string& routingKey,
- bool mandatory, bool immediate);
+ bool rejectUnroutable, bool immediate);
void get(uint16_t ticket, const std::string& queue,
bool noAck);
void ack(uint64_t deliveryTag, bool multiple);
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 9b6bdf5a2b..a598717c5d 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -280,22 +280,31 @@ void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
}
void Channel::complete(Message::shared_ptr msg) {
- Exchange::shared_ptr exchange =
- connection.broker.getExchanges().get(msg->getExchange());
- assert(exchange.get());
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
- exchange->route(*deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
+ route(msg, *deliverable);
txBuffer->enlist(op);
} else {
DeliverableMessage deliverable(msg);
- exchange->route(deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
+ route(msg, deliverable);
}
}
+void Channel::route(Message::shared_ptr msg, Deliverable& strategy) {
+ Exchange::shared_ptr exchange = connection.broker.getExchanges().get(msg->getExchange());
+ assert(exchange.get());
+ exchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ if (!strategy.delivered) {
+ //TODO:if reject-unroutable, then reject
+ //else route to alternate exchange
+ if (exchange->getAlternate()) {
+ exchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ }
+ }
+
+}
+
// Used by Basic
void Channel::ack(uint64_t deliveryTag, bool multiple){
if (multiple)
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index a2b6bd3ef9..a70dce0ce8 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -33,6 +33,7 @@
#include "Consumer.h"
#include "DeliveryAdapter.h"
#include "DeliveryRecord.h"
+#include "Deliverable.h"
#include "DtxBuffer.h"
#include "DtxManager.h"
#include "MessageBuilder.h"
@@ -102,7 +103,7 @@ class Channel : public CompletionHandler
MessageBuilder messageBuilder;//builder for in-progress message
bool opened;
bool flowActive;
-
+ void route(Message::shared_ptr msg, Deliverable& strategy);
void complete(Message::shared_ptr msg);// completion handler for MessageBuilder
void record(const DeliveryRecord& delivery);
bool checkPrefetch(Message::shared_ptr& msg);
diff --git a/cpp/src/qpid/broker/BrokerExchange.h b/cpp/src/qpid/broker/BrokerExchange.h
index 968775cfe5..91c295e1b7 100644
--- a/cpp/src/qpid/broker/BrokerExchange.h
+++ b/cpp/src/qpid/broker/BrokerExchange.h
@@ -48,7 +48,7 @@ namespace qpid {
explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){}
Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args)
- : name(_name), durable(_durable), args(_args), persistenceId(0){}
+ : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){}
virtual ~Exchange(){}
string getName() const { return name; }
@@ -59,6 +59,7 @@ namespace qpid {
void setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; }
void incAlternateUsers() { alternateUsers++; }
void decAlternateUsers() { alternateUsers--; }
+ bool inUseAsAlternate() { return alternateUsers > 0; }
virtual string getType() const = 0;
virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h
index a6b039cd4d..73af3935a8 100644
--- a/cpp/src/qpid/broker/BrokerMessageBase.h
+++ b/cpp/src/qpid/broker/BrokerMessageBase.h
@@ -165,6 +165,8 @@ class Message : public PersistableMessage{
*/
virtual void releaseContent(MessageStore* /*store*/) {};
+ bool isImmediate() const { return immediate; }
+
private:
const ConnectionToken* publisher;
std::string exchange;
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index 01f8250b84..efa295e44f 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -43,7 +43,7 @@ MessageMessage::MessageMessage(
ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
- transfer_->getMandatory(),
+ transfer_->getRejectUnroutable(),
transfer_->getImmediate(),
transfer_),
requestId(requestId_),
@@ -57,7 +57,7 @@ MessageMessage::MessageMessage(
ReferencePtr reference_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
- transfer_->getMandatory(),
+ transfer_->getRejectUnroutable(),
transfer_->getImmediate(),
transfer_),
requestId(requestId_),
@@ -113,6 +113,7 @@ void MessageMessage::transferMessage(
transfer->getTicket(),
consumerTag,
getRedelivered(),
+ transfer->getRejectUnroutable(),
transfer->getImmediate(),
transfer->getTtl(),
transfer->getPriority(),
@@ -126,13 +127,14 @@ void MessageMessage::transferMessage(
transfer->getReplyTo(),
transfer->getContentType(),
transfer->getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
transfer->getUserId(),
transfer->getAppId(),
transfer->getTransactionId(),
transfer->getSecurityToken(),
transfer->getApplicationHeaders(),
- body,
- transfer->getMandatory())));
+ body)));
} else {
// Thing to do here is to construct a simple reference message then deliver that instead
// fragmentation will be taken care of in the delivery if necessary;
@@ -143,6 +145,7 @@ void MessageMessage::transferMessage(
transfer->getTicket(),
consumerTag,
getRedelivered(),
+ transfer->getRejectUnroutable(),
transfer->getImmediate(),
transfer->getTtl(),
transfer->getPriority(),
@@ -156,13 +159,14 @@ void MessageMessage::transferMessage(
transfer->getReplyTo(),
transfer->getContentType(),
transfer->getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
transfer->getUserId(),
transfer->getAppId(),
transfer->getTransactionId(),
transfer->getSecurityToken(),
transfer->getApplicationHeaders(),
- framing::Content(REFERENCE, refname),
- transfer->getMandatory()));
+ framing::Content(REFERENCE, refname)));
ReferencePtr newRef(new Reference(refname));
Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
newRef->append(newAppend);
@@ -288,6 +292,7 @@ MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version
transfer->getTicket(),
destination,
getRedelivered(),
+ transfer->getRejectUnroutable(),
transfer->getImmediate(),
transfer->getTtl(),
transfer->getPriority(),
@@ -301,13 +306,14 @@ MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version
transfer->getReplyTo(),
transfer->getContentType(),
transfer->getContentEncoding(),
+ 0, /*content-length*/
+ string(), /*type*/
transfer->getUserId(),
transfer->getAppId(),
transfer->getTransactionId(),
transfer->getSecurityToken(),
transfer->getApplicationHeaders(),
- body,
- transfer->getMandatory());
+ body);
}
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index cf6beff375..f8bffa01a3 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -56,8 +56,15 @@ Queue::Queue(const string& _name, bool _autodelete,
Queue::~Queue(){}
void Queue::deliver(Message::shared_ptr& msg){
- enqueue(0, msg);
- process(msg);
+ if (msg->isImmediate() && getConsumerCount() == 0) {
+ if (alternateExchange) {
+ DeliverableMessage deliverable(msg);
+ alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ }
+ } else {
+ enqueue(0, msg);
+ process(msg);
+ }
}
void Queue::recover(Message::shared_ptr& msg){
@@ -255,6 +262,7 @@ void Queue::destroy()
&(msg.getMessage().getApplicationHeaders()));
pop();
}
+ alternateExchange->decAlternateUsers();
}
if (store) {
@@ -318,3 +326,8 @@ void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
{
alternateExchange = exchange;
}
+
+boost::shared_ptr<Exchange> Queue::getAlternateExchange()
+{
+ return alternateExchange;
+}
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index 0ed368e404..f82a7dac55 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -155,7 +155,7 @@ namespace qpid {
const QueuePolicy* const getPolicy();
void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
-
+ boost::shared_ptr<Exchange> getAlternateExchange();
//PersistableQueue support:
uint64_t getPersistenceId() const;
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 7a987f28d2..5b22167323 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -87,6 +87,7 @@ void Connection::closed(){
broker.getQueues().destroy(q->getName());
exclusiveQueues.erase(exclusiveQueues.begin());
q->unbind(broker.getExchanges(), q);
+ q->destroy();
}
} catch(std::exception& e) {
QPID_LOG(error, " Unhandled exception while closing session: " <<
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp
index bb2a66bfdb..65933660f1 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.cpp
+++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp
@@ -106,15 +106,14 @@ void Handler::open(const string& /*virtualHost*/,
const string& /*capabilities*/, bool /*insist*/)
{
string knownhosts;
- client.openOk(
- knownhosts);//GRS, context.getRequestId());
+ client.openOk(knownhosts);
}
void Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/,
uint16_t /*classId*/, uint16_t /*methodId*/)
{
- client.closeOk();//GRS context.getRequestId());
+ client.closeOk();
connection.getOutput().close();
}
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h
index b624102cd2..6890b014a4 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.h
+++ b/cpp/src/qpid/broker/ConnectionAdapter.h
@@ -67,11 +67,11 @@ public:
AccessHandler* getAccessHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
FileHandler* getFileHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
StreamHandler* getStreamHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
- DtxHandler* getDtxHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
DtxCoordinationHandler* getDtxCoordinationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
DtxDemarcationHandler* getDtxDemarcationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
ExecutionHandler* getExecutionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ SessionHandler* getSessionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
framing::ProtocolVersion getVersion() const;
};
diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h
index 1570917849..cd1dbaa85d 100644
--- a/cpp/src/qpid/broker/Deliverable.h
+++ b/cpp/src/qpid/broker/Deliverable.h
@@ -27,6 +27,8 @@ namespace qpid {
namespace broker {
class Deliverable{
public:
+ bool delivered;
+ Deliverable() : delivered(false) {}
virtual void deliverTo(Queue::shared_ptr& queue) = 0;
virtual ~Deliverable(){}
};
diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp
index a713f306a8..9a3752d71c 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.cpp
+++ b/cpp/src/qpid/broker/DeliverableMessage.cpp
@@ -29,9 +29,11 @@ DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg)
void DeliverableMessage::deliverTo(Queue::shared_ptr& queue)
{
queue->deliver(msg);
+ delivered = true;
}
Message& DeliverableMessage::getMessage()
{
return *msg;
}
+
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index d1f925e40c..72d3888e37 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -52,7 +52,6 @@ const int XA_OK(8);
void DtxHandlerImpl::select()
{
channel.selectDtx();
- dClient.selectOk();
}
void DtxHandlerImpl::end(u_int16_t /*ticket*/,
@@ -140,7 +139,7 @@ void DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
void DtxHandlerImpl::recover(u_int16_t /*ticket*/,
bool /*startscan*/,
- u_int32_t /*endscan*/ )
+ bool /*endscan*/ )
{
//TODO: what do startscan and endscan actually mean?
@@ -193,7 +192,6 @@ void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/,
u_int32_t timeout)
{
broker.getDtxManager().setTimeout(xid, timeout);
- cClient.setTimeoutOk();
}
void DtxHandlerImpl::setResponseTo(framing::RequestId r)
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h
index e18d3c153d..6139b95bd6 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.h
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.h
@@ -48,7 +48,7 @@ public:
void prepare(u_int16_t ticket, const std::string& xid);
- void recover(u_int16_t ticket, bool startscan, u_int32_t endscan);
+ void recover(u_int16_t ticket, bool startscan, bool endscan);
void rollback(u_int16_t ticket, const std::string& xid);
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 732d45dc44..edc9a5b63b 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -72,8 +72,9 @@ void ExchangeRegistry::destroy(const string& name){
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
RWlock::ScopedRlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
- if (i == exchanges.end())
- throw ChannelException(404, "Exchange not found:" + name);
+ if (i == exchanges.end()) {
+ throw ChannelException(404, "Exchange not found: " + name);
+ }
return i->second;
}
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index de32368158..41dd8cc145 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -45,14 +45,14 @@ void
MessageHandlerImpl::cancel(const string& destination )
{
channel.cancel(destination);
- client.ok();
+ //client.ok();
}
void
MessageHandlerImpl::open(const string& reference)
{
references.open(reference);
- client.ok();
+ //client.ok();
}
void
@@ -60,14 +60,14 @@ MessageHandlerImpl::append(const framing::MethodContext& context)
{
MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody));
references.get(body->getReference())->append(body);
- client.ok();
+ //client.ok();
}
void
MessageHandlerImpl::close(const string& reference)
{
- Reference::shared_ptr ref = references.get(reference);
- client.ok();
+ Reference::shared_ptr ref = references.get(reference);
+ //client.ok();
// Send any transfer messages to their correct exchanges and okay them
const Reference::Messages& msgs = ref->getMessages();
@@ -85,7 +85,7 @@ MessageHandlerImpl::checkpoint(const string& /*reference*/,
{
// Initial implementation (which is conforming) is to do nothing here
// and return offset zero for the resume
- client.ok();
+ //client.ok();
}
void
@@ -123,7 +123,7 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/,
channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
tag, queue, !noAck, exclusive,
noLocal ? &connection : 0, &filter);
- client.ok();
+ //client.ok();
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -137,10 +137,11 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
Queue::shared_ptr queue = getQueue(queueName);
GetAdapter out(adapter, queue, destination, connection.getFrameMax());
- if(channel.get(out, queue, !noAck))
+ if(channel.get(out, queue, !noAck)) {
client.ok();
- else
+ } else {
client.empty();
+ }
}
void
@@ -166,14 +167,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.ok();
+ //client.ok();
}
void
MessageHandlerImpl::recover(bool requeue)
{
channel.recover(requeue);
- client.ok();
+ //client.ok();
}
void
@@ -192,7 +193,7 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context)
if (transfer->getBody().isInline()) {
MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer));
channel.handleInlineTransfer(message);
- client.ok();
+ client.ok();
} else {
Reference::shared_ptr ref(references.get(transfer->getBody().getValue()));
MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref));
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index f616ec2db8..2b1de1bbc0 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -75,7 +75,7 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ
}
}
-void SemanticHandler::complete(u_int32_t mark)
+void SemanticHandler::complete(uint32_t mark, uint16_t /*range- not decoded correctly yet*/)
{
//just record it for now (will eventually need to use it to ack messages):
outgoing.lwm = SequenceNumber(mark);
@@ -85,7 +85,10 @@ void SemanticHandler::flush()
{
//flush doubles as a sync to begin with - send an execution.complete
incoming.lwm = incoming.hwm;
- send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue())));
+ if (isOpen()) {
+ /*use dummy value for range which is not yet encoded correctly*/
+ send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), 0)));
+ }
}
void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 6003bbec0c..a57559d043 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -60,7 +60,7 @@ public:
void handle(framing::AMQFrame& frame);
//execution class method handlers:
- void complete(u_int32_t cumulativeExecutionMark);
+ void complete(uint32_t cumulativeExecutionMark, uint16_t);
void flush();
};
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
index 6e03f37dcd..db02673b1f 100644
--- a/cpp/src/qpid/broker/TxPublish.cpp
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -44,6 +44,7 @@ void TxPublish::rollback() throw(){
void TxPublish::deliverTo(Queue::shared_ptr& queue){
queues.push_back(queue);
+ delivered = true;
}
TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg)