summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp13
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h1
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.h1
-rw-r--r--cpp/src/qpid/broker/BrokerMessageBase.h1
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp9
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.h6
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.cpp10
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.h3
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp6
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.h2
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp9
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.h4
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp14
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h8
14 files changed, 24 insertions, 63 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 26c2b30ac6..77030855ff 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -400,18 +400,5 @@ void BrokerAdapter::ChannelHandlerImpl::ok()
//no specific action required, generic response handling should be sufficient
}
-void BrokerAdapter::setResponseTo(RequestId r)
-{
- basicHandler.client.setResponseTo(r);
- channelHandler.client.setResponseTo(r);
- exchangeHandler.client.setResponseTo(r);
- bindingHandler.client.setResponseTo(r);
- messageHandler.client.setResponseTo(r);
- queueHandler.client.setResponseTo(r);
- txHandler.client.setResponseTo(r);
- dtxHandler.setResponseTo(r);
-}
-
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 4ae8346580..3fe2eb9eba 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -85,7 +85,6 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
}
framing::AMQP_ClientProxy& getProxy() { return proxy; }
- void setResponseTo(framing::RequestId r);
private:
diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h
index 3f22f07aec..af8e4e62e9 100644
--- a/cpp/src/qpid/broker/BrokerMessage.h
+++ b/cpp/src/qpid/broker/BrokerMessage.h
@@ -35,7 +35,6 @@
namespace qpid {
namespace framing {
-class MethodContext;
class ChannelAdapter;
class AMQHeaderBody;
}
diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h
index aeb34880eb..94035905ce 100644
--- a/cpp/src/qpid/broker/BrokerMessageBase.h
+++ b/cpp/src/qpid/broker/BrokerMessageBase.h
@@ -33,7 +33,6 @@
namespace qpid {
namespace framing {
-class MethodContext;
class ChannelAdapter;
class BasicHeaderProperties;
class FieldTable;
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index 9ad27093bb..0da5f3d8f5 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -48,27 +48,24 @@ struct MessageDeliveryToken : public DeliveryToken
};
MessageMessage::MessageMessage(
- ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_
+ ConnectionToken* publisher, TransferPtr transfer_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getRejectUnroutable(),
transfer_->getImmediate(),
transfer_),
- requestId(requestId_),
transfer(transfer_)
{
assert(transfer->getBody().isInline());
}
MessageMessage::MessageMessage(
- ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_,
- ReferencePtr reference_
+ ConnectionToken* publisher, TransferPtr transfer_, ReferencePtr reference_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getRejectUnroutable(),
transfer_->getImmediate(),
transfer_),
- requestId(requestId_),
transfer(transfer_),
reference(reference_)
{
@@ -178,7 +175,7 @@ void MessageMessage::transferMessage(
ReferencePtr newRef(new Reference(refname));
Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
newRef->append(newAppend);
- MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
+ MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), newTransfer, newRef);
newMsg.transferMessage(channel, consumerTag, framesize);
return;
}
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h
index 4efd22c9fe..6b1bd9ab5d 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.h
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.h
@@ -43,13 +43,12 @@ class MessageMessage: public Message{
typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
typedef boost::shared_ptr<Reference> ReferencePtr;
- MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer);
- MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer, ReferencePtr reference);
+ MessageMessage(ConnectionToken* publisher, TransferPtr transfer);
+ MessageMessage(ConnectionToken* publisher, TransferPtr transfer, ReferencePtr reference);
MessageMessage();
// Default destructor okay
- framing::RequestId getRequestId() const {return requestId; }
TransferPtr getTransfer() const { return transfer; }
ReferencePtr getReference() const ;
@@ -86,7 +85,6 @@ class MessageMessage: public Message{
const std::string& destination,
const framing::Content& body) const;
- framing::RequestId requestId;
const TransferPtr transfer;
const boost::shared_ptr<Reference> reference;
};
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp
index 4fa4b2c238..a67a5557c6 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.cpp
+++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp
@@ -21,14 +21,12 @@
#include "ConnectionAdapter.h"
#include "Connection.h"
-#include "qpid/framing/MethodContext.h"
using namespace qpid;
using namespace qpid::broker;
using qpid::framing::ReplyCode;
using qpid::framing::ClassId;
using qpid::framing::MethodId;
-using qpid::framing::MethodContext;
using qpid::framing::FieldTable;
void ConnectionAdapter::init(const framing::ProtocolInitiation& header) {
@@ -44,13 +42,11 @@ void ConnectionAdapter::close(ReplyCode code, const string& text, ClassId classI
handler->client.close(code, text, classId, methodId);
}
-void ConnectionAdapter::handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const MethodContext& context
-)
+void ConnectionAdapter::handleMethod(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method)
{
try{
- method->invoke(*this, context);
+ method->invoke(*this);
}catch(ConnectionException& e){
handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h
index 6890b014a4..1ce850a659 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.h
+++ b/cpp/src/qpid/broker/ConnectionAdapter.h
@@ -47,8 +47,7 @@ public:
void handle(framing::AMQFrame& frame);
//ChannelAdapter virtual methods:
- void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const qpid::framing::MethodContext& context);
+ void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method);
bool isOpen() const { return true; } //channel 0 is always open
//never needed:
void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>) {}
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 72d3888e37..8b3629dff9 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -25,7 +25,6 @@ using namespace qpid::broker;
using qpid::framing::AMQP_ClientProxy;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
-using qpid::framing::MethodContext;
using std::string;
DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) :
@@ -194,9 +193,4 @@ void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/,
broker.getDtxManager().setTimeout(xid, timeout);
}
-void DtxHandlerImpl::setResponseTo(framing::RequestId r)
-{
- dClient.setResponseTo(r);
- cClient.setResponseTo(r);
-}
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h
index 6139b95bd6..067ba47fb5 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.h
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.h
@@ -36,8 +36,6 @@ class DtxHandlerImpl
public:
DtxHandlerImpl(CoreRefs& parent);
- void setResponseTo(framing::RequestId r);
-
// DtxCoordinationHandler:
void commit(u_int16_t ticket, const std::string& xid, bool onePhase);
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 70f7c3b8ec..b5bea05eac 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -54,7 +54,7 @@ MessageHandlerImpl::open(const string& /*reference*/)
}
void
-MessageHandlerImpl::append(const framing::MethodContext& /*context*/)
+MessageHandlerImpl::append(const framing::AMQMethodBody& )
{
throw ConnectionException(540, "References no longer supported");
}
@@ -157,14 +157,13 @@ MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ )
}
void
-MessageHandlerImpl::transfer(const framing::MethodContext& context)
+MessageHandlerImpl::transfer(const framing::AMQMethodBody& context)
{
MessageTransferBody::shared_ptr transfer(
- boost::shared_polymorphic_downcast<MessageTransferBody>(
- context.methodBody));
+ make_shared_ptr(new MessageTransferBody(static_cast<const MessageTransferBody&>(context))));
if (transfer->getBody().isInline()) {
- MessageMessage::shared_ptr message(new MessageMessage(&connection, 0, transfer));
+ MessageMessage::shared_ptr message(new MessageMessage(&connection, transfer));
channel.handleInlineTransfer(message);
} else {
throw ConnectionException(540, "References no longer supported");
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h
index b7e91795ec..20cae46da4 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.h
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.h
@@ -40,7 +40,7 @@ class MessageHandlerImpl :
public:
MessageHandlerImpl(CoreRefs& parent);
- void append(const framing::MethodContext& context);
+ void append(const framing::AMQMethodBody& context);
void cancel(const std::string& destination );
@@ -82,7 +82,7 @@ class MessageHandlerImpl :
void resume(const std::string& reference,
const std::string& identifier );
- void transfer(const framing::MethodContext& context);
+ void transfer(const framing::AMQMethodBody& context);
void flow(const std::string& destination, u_int8_t unit, u_int32_t value);
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index fd0a5cfbe1..6ef2162a4a 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -60,13 +60,12 @@ void SemanticHandler::handle(framing::AMQFrame& frame)
}
//ChannelAdapter virtual methods:
-void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const qpid::framing::MethodContext& context)
+void SemanticHandler::handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method)
{
try {
if (!method->invoke(this)) {
//else do the usual:
- handleL4(method, context);
+ handleL4(method);
//(if the frameset is complete) we can move the execution-mark
//forward
@@ -113,8 +112,7 @@ void SemanticHandler::flush()
}
}
-void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const qpid::framing::MethodContext& context)
+void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method)
{
try{
if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
@@ -124,7 +122,7 @@ void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> m
throw ConnectionException(504, out.str());
}
} else {
- method->invoke(*adapter, context);
+ method->invoke(*adapter);
}
}catch(const ChannelException& e){
adapter->getProxy().getChannel().close(
@@ -171,7 +169,7 @@ void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_
msg->deliver(*this, tag, token, connection.getFrameMax());
}
-RequestId SemanticHandler::send(shared_ptr<AMQBody> body)
+void SemanticHandler::send(shared_ptr<AMQBody> body)
{
Mutex::ScopedLock l(outLock);
uint8_t type(body->type());
@@ -182,5 +180,5 @@ RequestId SemanticHandler::send(shared_ptr<AMQBody> body)
//std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl;
}
}
- return ChannelAdapter::send(body);
+ ChannelAdapter::send(body);
}
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 7d5d95243e..016c94738d 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -48,18 +48,16 @@ class SemanticHandler : private framing::ChannelAdapter,
framing::Window outgoing;
sys::Mutex outLock;
- void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const qpid::framing::MethodContext& context);
+ void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method);
//ChannelAdapter virtual methods:
- void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const qpid::framing::MethodContext& context);
+ void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody> method);
bool isOpen() const;
void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
- framing::RequestId send(shared_ptr<framing::AMQBody> body);
+ void send(shared_ptr<framing::AMQBody> body);
//delivery adapter methods: