summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-08-31 20:51:22 +0000
committerAlan Conway <aconway@apache.org>2007-08-31 20:51:22 +0000
commit761e10501fe5ea51f9d8c40d9a200ae27193ab23 (patch)
treee2d4bdfdc0b9383661947378a1f183387501637c /cpp/src/qpid/broker/BrokerAdapter.cpp
parent655b3b5806bafdd784f6a9c242e26341bd6aeccc (diff)
downloadqpid-python-761e10501fe5ea51f9d8c40d9a200ae27193ab23.tar.gz
* Summary:
- Moved BrokerChannel functionality into Session. - Moved ChannelHandler methods handling into SessionAdapter. - Updated all handlers to use session. (We're still using AMQP channel methods in SessionAdapter) Roles & responsibilities: Session: - represents an _open_ session, may be active or suspended. - ows all session state including handler chains. - attahced to SessionAdapter when active, not when suspended. SessionAdapter: - reprents the association of a channel with a session. - owned by Connection, kept in the session map. - channel open == SessionAdapter.getSessio() != 0 Anything that depends on attachment to a channel, connection or protocol should be in SessionAdpater. Anything that suvives a session suspend belongs in Session. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@571575 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp140
1 files changed, 56 insertions, 84 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 07b7b4f638..a6e9c007cf 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -15,10 +15,9 @@
* limitations under the License.
*
*/
-#include <boost/format.hpp>
-
#include "BrokerAdapter.h"
-#include "BrokerChannel.h"
+#include "Session.h"
+#include "SessionAdapter.h"
#include "Connection.h"
#include "DeliveryToken.h"
#include "MessageDelivery.h"
@@ -28,18 +27,23 @@
namespace qpid {
namespace broker {
-using boost::format;
using namespace qpid;
using namespace qpid::framing;
typedef std::vector<Queue::shared_ptr> QueueVector;
-
- BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b, ChannelAdapter& a) :
- CoreRefs(ch, c, b, a),
- connection(c),
+// FIXME aconway 2007-08-31: now that functionality is distributed
+// between different handlers, BrokerAdapter should be dropped.
+// Instead the individual class Handler interfaces can be implemented
+// by the handlers responsible for those classes.
+//
+
+BrokerAdapter::BrokerAdapter(Session& s, ChannelAdapter& a) :
+ CoreRefs(s,
+ s.getAdapter()->getConnection(),
+ s.getAdapter()->getConnection().broker,
+ a),
basicHandler(*this),
- channelHandler(*this),
exchangeHandler(*this),
bindingHandler(*this),
messageHandler(*this),
@@ -52,31 +56,6 @@ typedef std::vector<Queue::shared_ptr> QueueVector;
ProtocolVersion BrokerAdapter::getVersion() const {
return connection.getVersion();
}
-
-void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){
- channel.open();
- client.openOk();
-}
-
-void BrokerAdapter::ChannelHandlerImpl::flow(bool active){
- channel.flow(active);
- client.flowOk(active);
-}
-
-void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){}
-
-void BrokerAdapter::ChannelHandlerImpl::close(uint16_t /*replyCode*/,
- const string& /*replyText*/,
- uint16_t /*classId*/, uint16_t /*methodId*/)
-{
- client.closeOk();
- // FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
- connection.closeChannel(channel.getId());
-}
-
-void BrokerAdapter::ChannelHandlerImpl::closeOk(){}
-
-
void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type,
const string& alternateExchange,
@@ -148,10 +127,10 @@ ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket
}
BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
- const std::string& exchangeName,
- const std::string& queueName,
- const std::string& key,
- const framing::FieldTable& args)
+ const std::string& exchangeName,
+ const std::string& queueName,
+ const std::string& key,
+ const framing::FieldTable& args)
{
Exchange::shared_ptr exchange;
try {
@@ -181,7 +160,7 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/
QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
{
- Queue::shared_ptr queue = getQueue(name);
+ Queue::shared_ptr queue = session.getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
return QueueQueryResult(queue->getName(),
@@ -205,7 +184,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
}
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = getQueue(name);
+ queue = session.getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
@@ -216,7 +195,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
- channel.setDefaultQueue(queue);
+ session.setDefaultQueue(queue);
if (alternate) {
queue->setAlternateExchange(alternate);
alternate->incAlternateUsers();
@@ -236,17 +215,16 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
}
}
if (exclusive && !queue->isExclusiveOwner(&connection))
- throw ChannelException(
- 405,
- format("Cannot grant exclusive access to queue '%s'")
- % queue->getName());
+ throw ResourceLockedException(
+ QPID_MSG("Cannot grant exclusive access to queue "
+ << queue->getName()));
}
void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName,
const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
- Queue::shared_ptr queue = getQueue(queueName);
+ Queue::shared_ptr queue = session.getQueue(queueName);
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
@@ -257,23 +235,23 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu
}
}
}else{
- throw ChannelException(
- 404, "Bind failed. No such exchange: " + exchangeName);
+ throw NotFoundException(
+ "Bind failed. No such exchange: " + exchangeName);
}
}
void
BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
- const string& queueName,
- const string& exchangeName,
- const string& routingKey,
- const qpid::framing::FieldTable& arguments )
+ const string& queueName,
+ const string& exchangeName,
+ const string& routingKey,
+ const qpid::framing::FieldTable& arguments )
{
- Queue::shared_ptr queue = getQueue(queueName);
- if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
+ Queue::shared_ptr queue = session.getQueue(queueName);
+ if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
- if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
+ if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
@@ -282,17 +260,16 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
}
void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
- getQueue(queue)->purge();
+ session.getQueue(queue)->purge();
}
-void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty){
+void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- Queue::shared_ptr q = getQueue(queue);
+ Queue::shared_ptr q = session.getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
- throw ChannelException(406, "Queue not empty.");
+ throw PreconditionFailedException("Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
- throw ChannelException(406, "Queue in use.");
+ throw PreconditionFailedException("Queue in use.");
}else{
//remove the queue from the list of exclusive queues if necessary
if(q->isExclusiveOwner(&connection)){
@@ -310,18 +287,18 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string&
void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
//TODO: handle global
- channel.setPrefetchSize(prefetchSize);
- channel.setPrefetchCount(prefetchCount);
+ session.setPrefetchSize(prefetchSize);
+ session.setPrefetchCount(prefetchCount);
}
void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
- const string& queueName, const string& consumerTag,
- bool noLocal, bool noAck, bool exclusive,
- bool nowait, const FieldTable& fields)
+ const string& queueName, const string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive,
+ bool nowait, const FieldTable& fields)
{
- Queue::shared_ptr queue = getQueue(queueName);
- if(!consumerTag.empty() && channel.exists(consumerTag)){
+ Queue::shared_ptr queue = session.getQueue(queueName);
+ if(!consumerTag.empty() && session.exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
string newTag = consumerTag;
@@ -329,7 +306,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
- channel.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
+ session.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
if(!nowait) client.consumeOk(newTag);
@@ -338,13 +315,13 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
}
void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
- channel.cancel(consumerTag);
+ session.cancel(consumerTag);
}
void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = getQueue(queueName);
+ Queue::shared_ptr queue = session.getQueue(queueName);
DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
- if(!channel.get(token, queue, !noAck)){
+ if(!session.get(token, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
client.getEmpty(clusterId);
@@ -353,9 +330,9 @@ void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& que
void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
if (multiple) {
- channel.ackCumulative(deliveryTag);
+ session.ackCumulative(deliveryTag);
} else {
- channel.ackRange(deliveryTag, deliveryTag);
+ session.ackRange(deliveryTag, deliveryTag);
}
}
@@ -363,29 +340,24 @@ void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*re
void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
{
- channel.recover(requeue);
+ session.recover(requeue);
}
void BrokerAdapter::TxHandlerImpl::select()
{
- channel.startTx();
+ session.startTx();
}
void BrokerAdapter::TxHandlerImpl::commit()
{
- channel.commit(&broker.getStore());
+ session.commit(&broker.getStore());
}
void BrokerAdapter::TxHandlerImpl::rollback()
{
- channel.rollback();
- channel.recover(false);
+ session.rollback();
+ session.recover(false);
}
-void BrokerAdapter::ChannelHandlerImpl::ok()
-{
- //no specific action required, generic response handling should be sufficient
-}
-
}} // namespace qpid::broker