diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-29 16:13:24 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-29 16:13:24 +0000 |
| commit | 5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d (patch) | |
| tree | f9a982b65400154a86edd02faf75da143a96404c /cpp/lib/client/ClientChannel.cpp | |
| parent | 5d28464c46c1e64ded078a4585f0f49e30b8b5d6 (diff) | |
| download | qpid-python-5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d.tar.gz | |
* Added ClientAdapter - client side ChannelAdapter. Updated client side.
* Moved ChannelAdapter initialization from ctor to init(), updated broker side.
* Improved various exception messages with boost::format messages.
* Removed unnecssary virtual inheritance.
* Widespread: fixed incorrect non-const ProtocolVersion& parameters.
* Client API: pass channels by reference, not pointer.
* codegen:
- MethodBodyClass.h.templ: Added CLASS_ID, METHOD_ID and isA() template.
- Various: fixed non-const ProtocolVersion& parameters.
* cpp/bootstrap: Allow config arguments with -build.
* cpp/gen/Makefile.am: Merged codegen fixes from trunk.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/ClientChannel.cpp')
| -rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 408 |
1 files changed, 246 insertions, 162 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index d9edb2f390..b93596ebfc 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -23,42 +23,115 @@ #include <ClientMessage.h> #include <QpidError.h> #include <MethodBodyInstances.h> +#include "Connection.h" + +// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent +// handling of errors that should close the connection or the channel. +// Make sure the user thread receives a connection in each case. +// using namespace boost; //to use dynamic_pointer_cast using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; +const std::string Channel::OK("OK"); + Channel::Channel(bool _transactional, u_int16_t _prefetch) : - id(0), - con(0), - out(0), + connection(0), incoming(0), - closed(true), prefetch(_prefetch), - transactional(_transactional), -// AMQP version management change - kpvdr 2006-11-20 -// TODO: Make this class version-aware and link these hard-wired numbers to that version - version(8, 0) + transactional(_transactional) { } Channel::~Channel(){ - stop(); + close(); +} + +void Channel::open(ChannelId id, Connection& con) +{ + if (isOpen()) + THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id); + connection = &con; + init(id, con, con.getVersion()); // ChannelAdapter initialization. + string oob; + if (id != 0) + sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob)); +} + +void Channel::protocolInit( + const std::string& uid, const std::string& pwd, const std::string& vhost) { + assert(connection); + responses.expect(); + connection->connector->init(); // Send ProtocolInit block. + responses.receive<ConnectionStartBody>(); + + FieldTable props; + string mechanism("PLAIN"); + string response = ((char)0) + uid + ((char)0) + pwd; + string locale("en_US"); + // TODO aconway 2007-01-26: Move client over to proxy model, + // symmetric with server. + ConnectionTuneBody::shared_ptr proposal = + sendAndReceive<ConnectionTuneBody>( + new ConnectionStartOkBody( + version, props, mechanism, response, locale)); + + /** + * Assume for now that further challenges will not be required + //receive connection.secure + responses.receive(connection_secure)); + //send connection.secure-ok + connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); + **/ + + connection->send( + new AMQFrame( + version, 0, + new ConnectionTuneOkBody( + version, proposal->getChannelMax(), + connection->getMaxFrameSize(), + proposal->getHeartbeat()))); + + u_int16_t heartbeat = proposal->getHeartbeat(); + connection->connector->setReadTimeout(heartbeat * 2); + connection->connector->setWriteTimeout(heartbeat); + + // Send connection open. + std::string capabilities; + responses.expect(); + send(new AMQFrame( + version, 0, + new ConnectionOpenBody(version, vhost, capabilities, true))); + //receive connection.open-ok (or redirect, but ignore that for now + //esp. as using force=true). + responses.waitForResponse(); + if(responses.validate<ConnectionOpenOkBody>()) { + //ok + }else if(responses.validate<ConnectionRedirectBody>()){ + //ignore for now + ConnectionRedirectBody::shared_ptr redirect( + shared_polymorphic_downcast<ConnectionRedirectBody>( + responses.getResponse())); + std::cout << "Received redirection to " << redirect->getHost() + << std::endl; + } else { + THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); + } } + +bool Channel::isOpen() const { return connection; } void Channel::setPrefetch(u_int16_t _prefetch){ prefetch = _prefetch; - if(con != 0 && out != 0){ - setQos(); - } + setQos(); } void Channel::setQos(){ -// AMQP version management change - kpvdr 2006-11-20 -// TODO: Make this class version-aware and link these hard-wired numbers to that version - sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok); + sendAndReceive<BasicQosOkBody>( + new BasicQosBody(version, 0, prefetch, false)); if(transactional){ - sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok); + sendAndReceive<TxSelectOkBody>(new TxSelectBody(version)); } } @@ -66,62 +139,51 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); string type = exchange.getType(); FieldTable args; - AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args)); - if(synch){ - sendAndReceive(frame, method_bodies.exchange_declare_ok); - }else{ - out->send(frame); - } + sendAndReceiveSync<ExchangeDeclareOkBody>( + synch, + new ExchangeDeclareBody( + version, 0, name, type, false, false, false, false, !synch, args)); } void Channel::deleteExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); - AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.exchange_delete_ok); - }else{ - out->send(frame); - } + sendAndReceiveSync<ExchangeDeleteOkBody>( + synch, + new ExchangeDeleteBody(version, 0, name, false, !synch)); } void Channel::declareQueue(Queue& queue, bool synch){ string name = queue.getName(); FieldTable args; - AMQFrame* frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false, false, - queue.isExclusive(), - queue.isAutoDelete(), !synch, args)); - if(synch){ - sendAndReceive(frame, method_bodies.queue_declare_ok); + sendAndReceiveSync<QueueDeclareOkBody>( + synch, + new QueueDeclareBody( + version, 0, name, false, false, + queue.isExclusive(), queue.isAutoDelete(), !synch, args)); + if (synch) { if(queue.getName().length() == 0){ QueueDeclareOkBody::shared_ptr response = - dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse()); + shared_polymorphic_downcast<QueueDeclareOkBody>( + responses.getResponse()); queue.setName(response->getQueue()); } - }else{ - out->send(frame); } } void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ //ticket, queue, ifunused, ifempty, nowait string name = queue.getName(); - AMQFrame* frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.queue_delete_ok); - }else{ - out->send(frame); - } + sendAndReceiveSync<QueueDeleteOkBody>( + synch, + new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); } void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - AMQFrame* frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args)); - if(synch){ - sendAndReceive(frame, method_bodies.queue_bind_ok); - }else{ - out->send(frame); - } + sendAndReceiveSync<QueueBindOkBody>( + synch, + new QueueBindBody(version, 0, q, e, key,!synch, args)); } void Channel::consume( @@ -129,52 +191,48 @@ void Channel::consume( int ackMode, bool noLocal, bool synch, const FieldTable* fields) { string q = queue.getName(); - AMQFrame* frame = - new AMQFrame(version, - id, - new BasicConsumeBody( - version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable())); - if(synch){ - sendAndReceive(frame, method_bodies.basic_consume_ok); - BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse()); + sendAndReceiveSync<BasicConsumeOkBody>( + synch, + new BasicConsumeBody( + version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch, + fields ? *fields : FieldTable())); + if (synch) { + BasicConsumeOkBody::shared_ptr response = + shared_polymorphic_downcast<BasicConsumeOkBody>( + responses.getResponse()); tag = response->getConsumerTag(); - }else{ - out->send(frame); - } - Consumer* c = new Consumer(); - c->listener = listener; - c->ackMode = ackMode; - c->lastDeliveryTag = 0; - consumers[tag] = c; -} - -void Channel::cancel(std::string& tag, bool synch){ - Consumer* c = consumers[tag]; - if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true))); } - - AMQFrame* frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch)); - if(synch){ - sendAndReceive(frame, method_bodies.basic_cancel_ok); - }else{ - out->send(frame); - } - consumers.erase(tag); - if(c != 0){ - delete c; + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.lastDeliveryTag = 0; +} + +void Channel::cancel(const std::string& tag, bool synch) { + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) { + Consumer& c = i->second; + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + send(new BasicAckBody(version, c.lastDeliveryTag, true)); + sendAndReceiveSync<BasicCancelOkBody>( + synch, new BasicCancelBody(version, tag, !synch)); + consumers.erase(tag); } } void Channel::cancelAll(){ - for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){ - Consumer* c = i->second; - if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true))); + while(!consumers.empty()) { + Consumer c = consumers.begin()->second; + consumers.erase(consumers.begin()); + if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) + && c.lastDeliveryTag > 0) + { + // Let exceptions propagate, if one fails no point + // trying the rest. NB no memory leaks if we do, + // ConsumerMap holds values, not pointers. + // + send(new BasicAckBody(version, c.lastDeliveryTag, true)); } - consumers.erase(i); - delete c; } } @@ -191,26 +249,28 @@ void Channel::retrieve(Message& msg){ retrieved = 0; } -bool Channel::get(Message& msg, const Queue& queue, int ackMode){ +bool Channel::get(Message& msg, const Queue& queue, int ackMode) { string name = queue.getName(); - AMQFrame* frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode)); + AMQBody::shared_ptr body(new BasicGetBody(version, 0, name, ackMode)); responses.expect(); - out->send(frame); + send(body); responses.waitForResponse(); AMQMethodBody::shared_ptr response = responses.getResponse(); - if(method_bodies.basic_get_ok.match(response.get())){ + if(response->isA<BasicGetOkBody>()) { if(incoming != 0){ std::cout << "Existing message not complete" << std::endl; + // FIXME aconway 2007-01-26: close the connection? the channel? THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); }else{ incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response)); } retrieve(msg); return true; - }if(method_bodies.basic_get_empty.match(response.get())){ + }if(response->isA<BasicGetEmptyBody>()){ return false; }else{ - THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get."); + // FIXME aconway 2007-01-26: must close the connection. + THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame"); } } @@ -219,25 +279,24 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& string e = exchange.getName(); string key = routingKey; - out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); + send(new BasicPublishBody(version, 0, e, key, mandatory, immediate)); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); - AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); - out->send(new AMQFrame(version, id, body)); + send(msg.header); u_int64_t data_length = data.length(); if(data_length > 0){ - u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes + u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes if(data_length < frag_size){ - out->send(new AMQFrame(version, id, new AMQContentBody(data))); + send(new AMQContentBody(data)); }else{ u_int32_t offset = 0; u_int32_t remaining = data_length - offset; while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(data.substr(offset, length)); - out->send(new AMQFrame(version, id, new AMQContentBody(frag))); + send(new AMQContentBody(frag)); offset += length; remaining = data_length - offset; @@ -247,56 +306,48 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& } void Channel::commit(){ - AMQFrame* frame = new AMQFrame(version, id, new TxCommitBody(version)); - sendAndReceive(frame, method_bodies.tx_commit_ok); + sendAndReceive<TxCommitOkBody>(new TxCommitBody(version)); } void Channel::rollback(){ - AMQFrame* frame = new AMQFrame(version, id, new TxRollbackBody(version)); - sendAndReceive(frame, method_bodies.tx_rollback_ok); -} - -void Channel::handleRequest(AMQRequestBody::shared_ptr body) { - // FIXME aconway 2007-01-19: request/response handling. - handleMethod(body); + sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version)); } -void Channel::handleResponse(AMQResponseBody::shared_ptr body) { - // FIXME aconway 2007-01-19: request/response handling. - handleMethod(body); -} - -void Channel::handleMethod(AMQMethodBody::shared_ptr body){ - //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request +void Channel::handleMethodInContext( + AMQMethodBody::shared_ptr body, const MethodContext&) +{ + //channel.flow, channel.close, basic.deliver, basic.return or a + //response to a synchronous request if(responses.isWaiting()){ responses.signalResponse(body); - }else if(method_bodies.basic_deliver.match(body.get())){ + }else if(body->isA<BasicDeliverBody>()) { if(incoming != 0){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); }else{ incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); } - }else if(method_bodies.basic_return.match(body.get())){ + }else if(body->isA<BasicReturnBody>()){ if(incoming != 0){ std::cout << "Existing message not complete" << std::endl; THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); }else{ incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); } - }else if(method_bodies.channel_close.match(body.get())){ - con->removeChannel(this); - //need to signal application that channel has been closed through exception - - }else if(method_bodies.channel_flow.match(body.get())){ - + }else if(body->isA<ChannelCloseBody>()){ + peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body)); + }else if(body->isA<ChannelFlowBody>()){ + // TODO aconway 2007-01-24: + }else if(body->isA<ConnectionCloseBody>()){ + connection->close(); }else{ - //signal error - std::cout << "Unhandled method: " << *body << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method"); + connection->close( + 504, "Unrecognised method", + body->amqpClassId(), body->amqpMethodId()); } } - + void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ if(incoming == 0){ //handle invalid frame sequence @@ -331,27 +382,16 @@ void Channel::start(){ dispatcher = Thread(this); } -void Channel::stop(){ - { - Monitor::ScopedLock l(dispatchMonitor); - closed = true; - responses.signalResponse(AMQMethodBody::shared_ptr()); - dispatchMonitor.notify(); - } - dispatcher.join(); -} - void Channel::run(){ dispatch(); } void Channel::enqueue(){ + Monitor::ScopedLock l(retrievalMonitor); if(incoming->isResponse()){ - Monitor::ScopedLock l(retrievalMonitor); retrieved = incoming; retrievalMonitor.notify(); }else{ - Monitor::ScopedLock l(dispatchMonitor); messages.push(incoming); dispatchMonitor.notify(); } @@ -360,7 +400,7 @@ void Channel::enqueue(){ IncomingMessage* Channel::dequeue(){ Monitor::ScopedLock l(dispatchMonitor); - while(messages.empty() && !closed){ + while(messages.empty() && isOpen()){ dispatchMonitor.wait(); } IncomingMessage* msg = 0; @@ -371,25 +411,25 @@ IncomingMessage* Channel::dequeue(){ return msg; } -void Channel::deliver(Consumer* consumer, Message& msg){ +void Channel::deliver(Consumer& consumer, Message& msg){ //record delivery tag: - consumer->lastDeliveryTag = msg.getDeliveryTag(); + consumer.lastDeliveryTag = msg.getDeliveryTag(); //allow registered listener to handle the message - consumer->listener->received(msg); + consumer.listener->received(msg); //if the handler calls close on the channel or connection while //handling this message, then consumer will now have been deleted. - if(!closed){ + if(isOpen()){ bool multiple(false); - switch(consumer->ackMode){ - case LAZY_ACK: + switch(consumer.ackMode){ + case LAZY_ACK: multiple = true; - if(++(consumer->count) < prefetch) break; + if(++(consumer.count) < prefetch) break; //else drop-through - case AUTO_ACK: - out->send(new AMQFrame(version, id, new BasicAckBody(version, msg.getDeliveryTag(), multiple))); - consumer->lastDeliveryTag = 0; + case AUTO_ACK: + send(new BasicAckBody(version, msg.getDeliveryTag(), multiple)); + consumer.lastDeliveryTag = 0; } } @@ -399,7 +439,7 @@ void Channel::deliver(Consumer* consumer, Message& msg){ } void Channel::dispatch(){ - while(!closed){ + while(isOpen()){ IncomingMessage* incomingMsg = dequeue(); if(incomingMsg){ //Note: msg is currently only valid for duration of this call @@ -416,12 +456,10 @@ void Channel::dispatch(){ msg.deliveryTag = incomingMsg->getDeliveryTag(); std::string tag = incomingMsg->getConsumerTag(); - if(consumers[tag] == 0){ - //signal error + if(consumers.find(tag) == consumers.end()) std::cout << "Unknown consumer: " << tag << std::endl; - }else{ + else deliver(consumers[tag], msg); - } } delete incomingMsg; } @@ -432,14 +470,60 @@ void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ returnsHandler = handler; } -void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ - responses.expect(); - out->send(frame); - responses.receive(body); +// Close called by local application. +void Channel::close( + u_int16_t code, const std::string& text, + ClassId classId, MethodId methodId) +{ + // FIXME aconway 2007-01-26: Locking? + if (getId() != 0 && isOpen()) { + try { + sendAndReceive<ChannelCloseOkBody>( + new ChannelCloseBody(version, code, text, classId, methodId)); + cancelAll(); + closeInternal(); + } catch (...) { + closeInternal(); + throw; + } + } +} + +// Channel closed by peer. +void Channel::peerClose(ChannelCloseBody::shared_ptr) { + assert(isOpen()); + closeInternal(); + // FIXME aconway 2007-01-26: How to throw the proper exception + // to the application thread? } -void Channel::close(){ - if(con != 0){ - con->closeChannel(this); +void Channel::closeInternal() { + assert(isOpen()); + { + Monitor::ScopedLock l(dispatchMonitor); + static_cast<ConnectionForChannel*>(connection)->erase(getId()); + connection = 0; + // A 0 response means we are closed. + responses.signalResponse(AMQMethodBody::shared_ptr()); + dispatchMonitor.notify(); } + dispatcher.join(); } + +void Channel::sendAndReceive(AMQBody* toSend, ClassId c, MethodId m) +{ + responses.expect(); + send(toSend); + responses.receive(c, m); +} + +void Channel::sendAndReceiveSync( + bool sync, AMQBody* body, ClassId c, MethodId m) +{ + if(sync) + sendAndReceive(body, c, m); + else + send(body); +} + + |
