summaryrefslogtreecommitdiff
path: root/cpp/lib/client/ClientChannel.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-29 16:13:24 +0000
committerAlan Conway <aconway@apache.org>2007-01-29 16:13:24 +0000
commit5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d (patch)
treef9a982b65400154a86edd02faf75da143a96404c /cpp/lib/client/ClientChannel.cpp
parent5d28464c46c1e64ded078a4585f0f49e30b8b5d6 (diff)
downloadqpid-python-5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d.tar.gz
* Added ClientAdapter - client side ChannelAdapter. Updated client side.
* Moved ChannelAdapter initialization from ctor to init(), updated broker side. * Improved various exception messages with boost::format messages. * Removed unnecssary virtual inheritance. * Widespread: fixed incorrect non-const ProtocolVersion& parameters. * Client API: pass channels by reference, not pointer. * codegen: - MethodBodyClass.h.templ: Added CLASS_ID, METHOD_ID and isA() template. - Various: fixed non-const ProtocolVersion& parameters. * cpp/bootstrap: Allow config arguments with -build. * cpp/gen/Makefile.am: Merged codegen fixes from trunk. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/ClientChannel.cpp')
-rw-r--r--cpp/lib/client/ClientChannel.cpp408
1 files changed, 246 insertions, 162 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index d9edb2f390..b93596ebfc 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -23,42 +23,115 @@
#include <ClientMessage.h>
#include <QpidError.h>
#include <MethodBodyInstances.h>
+#include "Connection.h"
+
+// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
+// handling of errors that should close the connection or the channel.
+// Make sure the user thread receives a connection in each case.
+//
using namespace boost; //to use dynamic_pointer_cast
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
+const std::string Channel::OK("OK");
+
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
- id(0),
- con(0),
- out(0),
+ connection(0),
incoming(0),
- closed(true),
prefetch(_prefetch),
- transactional(_transactional),
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- version(8, 0)
+ transactional(_transactional)
{ }
Channel::~Channel(){
- stop();
+ close();
+}
+
+void Channel::open(ChannelId id, Connection& con)
+{
+ if (isOpen())
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
+ connection = &con;
+ init(id, con, con.getVersion()); // ChannelAdapter initialization.
+ string oob;
+ if (id != 0)
+ sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob));
+}
+
+void Channel::protocolInit(
+ const std::string& uid, const std::string& pwd, const std::string& vhost) {
+ assert(connection);
+ responses.expect();
+ connection->connector->init(); // Send ProtocolInit block.
+ responses.receive<ConnectionStartBody>();
+
+ FieldTable props;
+ string mechanism("PLAIN");
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ string locale("en_US");
+ // TODO aconway 2007-01-26: Move client over to proxy model,
+ // symmetric with server.
+ ConnectionTuneBody::shared_ptr proposal =
+ sendAndReceive<ConnectionTuneBody>(
+ new ConnectionStartOkBody(
+ version, props, mechanism, response, locale));
+
+ /**
+ * Assume for now that further challenges will not be required
+ //receive connection.secure
+ responses.receive(connection_secure));
+ //send connection.secure-ok
+ connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
+ **/
+
+ connection->send(
+ new AMQFrame(
+ version, 0,
+ new ConnectionTuneOkBody(
+ version, proposal->getChannelMax(),
+ connection->getMaxFrameSize(),
+ proposal->getHeartbeat())));
+
+ u_int16_t heartbeat = proposal->getHeartbeat();
+ connection->connector->setReadTimeout(heartbeat * 2);
+ connection->connector->setWriteTimeout(heartbeat);
+
+ // Send connection open.
+ std::string capabilities;
+ responses.expect();
+ send(new AMQFrame(
+ version, 0,
+ new ConnectionOpenBody(version, vhost, capabilities, true)));
+ //receive connection.open-ok (or redirect, but ignore that for now
+ //esp. as using force=true).
+ responses.waitForResponse();
+ if(responses.validate<ConnectionOpenOkBody>()) {
+ //ok
+ }else if(responses.validate<ConnectionRedirectBody>()){
+ //ignore for now
+ ConnectionRedirectBody::shared_ptr redirect(
+ shared_polymorphic_downcast<ConnectionRedirectBody>(
+ responses.getResponse()));
+ std::cout << "Received redirection to " << redirect->getHost()
+ << std::endl;
+ } else {
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+ }
}
+
+bool Channel::isOpen() const { return connection; }
void Channel::setPrefetch(u_int16_t _prefetch){
prefetch = _prefetch;
- if(con != 0 && out != 0){
- setQos();
- }
+ setQos();
}
void Channel::setQos(){
-// AMQP version management change - kpvdr 2006-11-20
-// TODO: Make this class version-aware and link these hard-wired numbers to that version
- sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
+ sendAndReceive<BasicQosOkBody>(
+ new BasicQosBody(version, 0, prefetch, false));
if(transactional){
- sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok);
+ sendAndReceive<TxSelectOkBody>(new TxSelectBody(version));
}
}
@@ -66,62 +139,51 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
string type = exchange.getType();
FieldTable args;
- AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.exchange_declare_ok);
- }else{
- out->send(frame);
- }
+ sendAndReceiveSync<ExchangeDeclareOkBody>(
+ synch,
+ new ExchangeDeclareBody(
+ version, 0, name, type, false, false, false, false, !synch, args));
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
- AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.exchange_delete_ok);
- }else{
- out->send(frame);
- }
+ sendAndReceiveSync<ExchangeDeleteOkBody>(
+ synch,
+ new ExchangeDeleteBody(version, 0, name, false, !synch));
}
void Channel::declareQueue(Queue& queue, bool synch){
string name = queue.getName();
FieldTable args;
- AMQFrame* frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false, false,
- queue.isExclusive(),
- queue.isAutoDelete(), !synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_declare_ok);
+ sendAndReceiveSync<QueueDeclareOkBody>(
+ synch,
+ new QueueDeclareBody(
+ version, 0, name, false, false,
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args));
+ if (synch) {
if(queue.getName().length() == 0){
QueueDeclareOkBody::shared_ptr response =
- dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
+ shared_polymorphic_downcast<QueueDeclareOkBody>(
+ responses.getResponse());
queue.setName(response->getQueue());
}
- }else{
- out->send(frame);
}
}
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
//ticket, queue, ifunused, ifempty, nowait
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_delete_ok);
- }else{
- out->send(frame);
- }
+ sendAndReceiveSync<QueueDeleteOkBody>(
+ synch,
+ new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- AMQFrame* frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args));
- if(synch){
- sendAndReceive(frame, method_bodies.queue_bind_ok);
- }else{
- out->send(frame);
- }
+ sendAndReceiveSync<QueueBindOkBody>(
+ synch,
+ new QueueBindBody(version, 0, q, e, key,!synch, args));
}
void Channel::consume(
@@ -129,52 +191,48 @@ void Channel::consume(
int ackMode, bool noLocal, bool synch, const FieldTable* fields)
{
string q = queue.getName();
- AMQFrame* frame =
- new AMQFrame(version,
- id,
- new BasicConsumeBody(
- version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
- if(synch){
- sendAndReceive(frame, method_bodies.basic_consume_ok);
- BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
+ sendAndReceiveSync<BasicConsumeOkBody>(
+ synch,
+ new BasicConsumeBody(
+ version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable()));
+ if (synch) {
+ BasicConsumeOkBody::shared_ptr response =
+ shared_polymorphic_downcast<BasicConsumeOkBody>(
+ responses.getResponse());
tag = response->getConsumerTag();
- }else{
- out->send(frame);
- }
- Consumer* c = new Consumer();
- c->listener = listener;
- c->ackMode = ackMode;
- c->lastDeliveryTag = 0;
- consumers[tag] = c;
-}
-
-void Channel::cancel(std::string& tag, bool synch){
- Consumer* c = consumers[tag];
- if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
}
-
- AMQFrame* frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch));
- if(synch){
- sendAndReceive(frame, method_bodies.basic_cancel_ok);
- }else{
- out->send(frame);
- }
- consumers.erase(tag);
- if(c != 0){
- delete c;
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
+}
+
+void Channel::cancel(const std::string& tag, bool synch) {
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end()) {
+ Consumer& c = i->second;
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
+ sendAndReceiveSync<BasicCancelOkBody>(
+ synch, new BasicCancelBody(version, tag, !synch));
+ consumers.erase(tag);
}
}
void Channel::cancelAll(){
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
- Consumer* c = i->second;
- if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
+ while(!consumers.empty()) {
+ Consumer c = consumers.begin()->second;
+ consumers.erase(consumers.begin());
+ if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
+ && c.lastDeliveryTag > 0)
+ {
+ // Let exceptions propagate, if one fails no point
+ // trying the rest. NB no memory leaks if we do,
+ // ConsumerMap holds values, not pointers.
+ //
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
}
- consumers.erase(i);
- delete c;
}
}
@@ -191,26 +249,28 @@ void Channel::retrieve(Message& msg){
retrieved = 0;
}
-bool Channel::get(Message& msg, const Queue& queue, int ackMode){
+bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode));
+ AMQBody::shared_ptr body(new BasicGetBody(version, 0, name, ackMode));
responses.expect();
- out->send(frame);
+ send(body);
responses.waitForResponse();
AMQMethodBody::shared_ptr response = responses.getResponse();
- if(method_bodies.basic_get_ok.match(response.get())){
+ if(response->isA<BasicGetOkBody>()) {
if(incoming != 0){
std::cout << "Existing message not complete" << std::endl;
+ // FIXME aconway 2007-01-26: close the connection? the channel?
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
}
retrieve(msg);
return true;
- }if(method_bodies.basic_get_empty.match(response.get())){
+ }if(response->isA<BasicGetEmptyBody>()){
return false;
}else{
- THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
+ // FIXME aconway 2007-01-26: must close the connection.
+ THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame");
}
}
@@ -219,25 +279,24 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
string e = exchange.getName();
string key = routingKey;
- out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+ send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
//break msg up into header frame and content frame(s) and send these
string data = msg.getData();
msg.header->setContentSize(data.length());
- AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
- out->send(new AMQFrame(version, id, body));
+ send(msg.header);
u_int64_t data_length = data.length();
if(data_length > 0){
- u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes
+ u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes
if(data_length < frag_size){
- out->send(new AMQFrame(version, id, new AMQContentBody(data)));
+ send(new AMQContentBody(data));
}else{
u_int32_t offset = 0;
u_int32_t remaining = data_length - offset;
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- out->send(new AMQFrame(version, id, new AMQContentBody(frag)));
+ send(new AMQContentBody(frag));
offset += length;
remaining = data_length - offset;
@@ -247,56 +306,48 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
}
void Channel::commit(){
- AMQFrame* frame = new AMQFrame(version, id, new TxCommitBody(version));
- sendAndReceive(frame, method_bodies.tx_commit_ok);
+ sendAndReceive<TxCommitOkBody>(new TxCommitBody(version));
}
void Channel::rollback(){
- AMQFrame* frame = new AMQFrame(version, id, new TxRollbackBody(version));
- sendAndReceive(frame, method_bodies.tx_rollback_ok);
-}
-
-void Channel::handleRequest(AMQRequestBody::shared_ptr body) {
- // FIXME aconway 2007-01-19: request/response handling.
- handleMethod(body);
+ sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version));
}
-void Channel::handleResponse(AMQResponseBody::shared_ptr body) {
- // FIXME aconway 2007-01-19: request/response handling.
- handleMethod(body);
-}
-
-void Channel::handleMethod(AMQMethodBody::shared_ptr body){
- //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
+void Channel::handleMethodInContext(
+ AMQMethodBody::shared_ptr body, const MethodContext&)
+{
+ //channel.flow, channel.close, basic.deliver, basic.return or a
+ //response to a synchronous request
if(responses.isWaiting()){
responses.signalResponse(body);
- }else if(method_bodies.basic_deliver.match(body.get())){
+ }else if(body->isA<BasicDeliverBody>()) {
if(incoming != 0){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
}
- }else if(method_bodies.basic_return.match(body.get())){
+ }else if(body->isA<BasicReturnBody>()){
if(incoming != 0){
std::cout << "Existing message not complete" << std::endl;
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
}
- }else if(method_bodies.channel_close.match(body.get())){
- con->removeChannel(this);
- //need to signal application that channel has been closed through exception
-
- }else if(method_bodies.channel_flow.match(body.get())){
-
+ }else if(body->isA<ChannelCloseBody>()){
+ peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body));
+ }else if(body->isA<ChannelFlowBody>()){
+ // TODO aconway 2007-01-24:
+ }else if(body->isA<ConnectionCloseBody>()){
+ connection->close();
}else{
- //signal error
- std::cout << "Unhandled method: " << *body << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method");
+ connection->close(
+ 504, "Unrecognised method",
+ body->amqpClassId(), body->amqpMethodId());
}
}
-
+
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
if(incoming == 0){
//handle invalid frame sequence
@@ -331,27 +382,16 @@ void Channel::start(){
dispatcher = Thread(this);
}
-void Channel::stop(){
- {
- Monitor::ScopedLock l(dispatchMonitor);
- closed = true;
- responses.signalResponse(AMQMethodBody::shared_ptr());
- dispatchMonitor.notify();
- }
- dispatcher.join();
-}
-
void Channel::run(){
dispatch();
}
void Channel::enqueue(){
+ Monitor::ScopedLock l(retrievalMonitor);
if(incoming->isResponse()){
- Monitor::ScopedLock l(retrievalMonitor);
retrieved = incoming;
retrievalMonitor.notify();
}else{
- Monitor::ScopedLock l(dispatchMonitor);
messages.push(incoming);
dispatchMonitor.notify();
}
@@ -360,7 +400,7 @@ void Channel::enqueue(){
IncomingMessage* Channel::dequeue(){
Monitor::ScopedLock l(dispatchMonitor);
- while(messages.empty() && !closed){
+ while(messages.empty() && isOpen()){
dispatchMonitor.wait();
}
IncomingMessage* msg = 0;
@@ -371,25 +411,25 @@ IncomingMessage* Channel::dequeue(){
return msg;
}
-void Channel::deliver(Consumer* consumer, Message& msg){
+void Channel::deliver(Consumer& consumer, Message& msg){
//record delivery tag:
- consumer->lastDeliveryTag = msg.getDeliveryTag();
+ consumer.lastDeliveryTag = msg.getDeliveryTag();
//allow registered listener to handle the message
- consumer->listener->received(msg);
+ consumer.listener->received(msg);
//if the handler calls close on the channel or connection while
//handling this message, then consumer will now have been deleted.
- if(!closed){
+ if(isOpen()){
bool multiple(false);
- switch(consumer->ackMode){
- case LAZY_ACK:
+ switch(consumer.ackMode){
+ case LAZY_ACK:
multiple = true;
- if(++(consumer->count) < prefetch) break;
+ if(++(consumer.count) < prefetch) break;
//else drop-through
- case AUTO_ACK:
- out->send(new AMQFrame(version, id, new BasicAckBody(version, msg.getDeliveryTag(), multiple)));
- consumer->lastDeliveryTag = 0;
+ case AUTO_ACK:
+ send(new BasicAckBody(version, msg.getDeliveryTag(), multiple));
+ consumer.lastDeliveryTag = 0;
}
}
@@ -399,7 +439,7 @@ void Channel::deliver(Consumer* consumer, Message& msg){
}
void Channel::dispatch(){
- while(!closed){
+ while(isOpen()){
IncomingMessage* incomingMsg = dequeue();
if(incomingMsg){
//Note: msg is currently only valid for duration of this call
@@ -416,12 +456,10 @@ void Channel::dispatch(){
msg.deliveryTag = incomingMsg->getDeliveryTag();
std::string tag = incomingMsg->getConsumerTag();
- if(consumers[tag] == 0){
- //signal error
+ if(consumers.find(tag) == consumers.end())
std::cout << "Unknown consumer: " << tag << std::endl;
- }else{
+ else
deliver(consumers[tag], msg);
- }
}
delete incomingMsg;
}
@@ -432,14 +470,60 @@ void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
returnsHandler = handler;
}
-void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
- responses.expect();
- out->send(frame);
- responses.receive(body);
+// Close called by local application.
+void Channel::close(
+ u_int16_t code, const std::string& text,
+ ClassId classId, MethodId methodId)
+{
+ // FIXME aconway 2007-01-26: Locking?
+ if (getId() != 0 && isOpen()) {
+ try {
+ sendAndReceive<ChannelCloseOkBody>(
+ new ChannelCloseBody(version, code, text, classId, methodId));
+ cancelAll();
+ closeInternal();
+ } catch (...) {
+ closeInternal();
+ throw;
+ }
+ }
+}
+
+// Channel closed by peer.
+void Channel::peerClose(ChannelCloseBody::shared_ptr) {
+ assert(isOpen());
+ closeInternal();
+ // FIXME aconway 2007-01-26: How to throw the proper exception
+ // to the application thread?
}
-void Channel::close(){
- if(con != 0){
- con->closeChannel(this);
+void Channel::closeInternal() {
+ assert(isOpen());
+ {
+ Monitor::ScopedLock l(dispatchMonitor);
+ static_cast<ConnectionForChannel*>(connection)->erase(getId());
+ connection = 0;
+ // A 0 response means we are closed.
+ responses.signalResponse(AMQMethodBody::shared_ptr());
+ dispatchMonitor.notify();
}
+ dispatcher.join();
}
+
+void Channel::sendAndReceive(AMQBody* toSend, ClassId c, MethodId m)
+{
+ responses.expect();
+ send(toSend);
+ responses.receive(c, m);
+}
+
+void Channel::sendAndReceiveSync(
+ bool sync, AMQBody* body, ClassId c, MethodId m)
+{
+ if(sync)
+ sendAndReceive(body, c, m);
+ else
+ send(body);
+}
+
+