diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-29 16:13:24 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-29 16:13:24 +0000 |
| commit | 5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d (patch) | |
| tree | f9a982b65400154a86edd02faf75da143a96404c /cpp/lib/client/Connection.cpp | |
| parent | 5d28464c46c1e64ded078a4585f0f49e30b8b5d6 (diff) | |
| download | qpid-python-5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d.tar.gz | |
* Added ClientAdapter - client side ChannelAdapter. Updated client side.
* Moved ChannelAdapter initialization from ctor to init(), updated broker side.
* Improved various exception messages with boost::format messages.
* Removed unnecssary virtual inheritance.
* Widespread: fixed incorrect non-const ProtocolVersion& parameters.
* Client API: pass channels by reference, not pointer.
* codegen:
- MethodBodyClass.h.templ: Added CLASS_ID, METHOD_ID and isA() template.
- Various: fixed non-const ProtocolVersion& parameters.
* cpp/bootstrap: Allow config arguments with -build.
* cpp/gen/Makefile.am: Merged codegen fixes from trunk.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/Connection.cpp')
| -rw-r--r-- | cpp/lib/client/Connection.cpp | 254 |
1 files changed, 70 insertions, 184 deletions
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 1ae317db62..19d5cce7db 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -18,35 +18,46 @@ * under the License. * */ +#include <boost/format.hpp> + #include <Connection.h> #include <ClientChannel.h> #include <ClientMessage.h> #include <QpidError.h> #include <iostream> +#include <sstream> #include <MethodBodyInstances.h> -using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; using namespace qpid::sys; -u_int16_t Connection::channelIdCounter; + +namespace qpid { +namespace client { + +ChannelId Connection::channelIdCounter; + +const std::string Connection::OK("OK"); Connection::Connection( bool debug, u_int32_t _max_frame_size, - qpid::framing::ProtocolVersion* _version + const framing::ProtocolVersion& _version ) : max_frame_size(_max_frame_size), closed(true), - version(_version->getMajor(),_version->getMinor()) + version(_version) { - connector = new Connector( - version, requester, responder, debug, _max_frame_size); + connector = new Connector(version, debug, _max_frame_size); } Connection::~Connection(){ delete connector; } -void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ +void Connection::open( + const std::string& _host, int _port, const std::string& uid, + const std::string& pwd, const std::string& virtualhost) +{ + host = _host; port = _port; connector->setInputHandler(this); @@ -55,197 +66,69 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui out = connector->getOutputHandler(); connector->connect(host, port); - ProtocolInitiation* header = new ProtocolInitiation(version); - responses.expect(); - connector->init(header); - responses.receive(method_bodies.connection_start); - - FieldTable props; - string mechanism("PLAIN"); - string response = ((char)0) + uid + ((char)0) + pwd; - string locale("en_US"); - responses.expect(); - out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); - - /** - * Assume for now that further challenges will not be required - //receive connection.secure - responses.receive(connection_secure)); - //send connection.secure-ok - out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); - **/ - - responses.receive(method_bodies.connection_tune); - - ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); - out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); - - u_int16_t heartbeat = proposal->getHeartbeat(); - connector->setReadTimeout(heartbeat * 2); - connector->setWriteTimeout(heartbeat); - - //send connection.open - string capabilities; - string vhost = virtualhost; - responses.expect(); - out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true))); - //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). - responses.waitForResponse(); - if(responses.validate(method_bodies.connection_open_ok)){ - //ok - }else if(responses.validate(method_bodies.connection_redirect)){ - //ignore for now - ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); - std::cout << "Received redirection to " << redirect->getHost() << std::endl; - }else{ - THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); - } - + // Open the special channel 0. + channels[0] = &channel0; + channel0.open(0, *this); + channel0.protocolInit(uid, pwd, virtualhost); } -void Connection::close(){ - if(!closed){ - u_int16_t code(200); - string text("Ok"); - u_int16_t classId(0); - u_int16_t methodId(0); - - sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); +void Connection::close( + ReplyCode code, const string& msg, ClassId classId, MethodId methodId +) +{ + if(!closed) { + channel0.sendAndReceive<ConnectionCloseOkBody>( + new ConnectionCloseBody( + getVersion(), code, msg, classId, methodId)); connector->close(); } } -void Connection::openChannel(Channel* channel){ - channel->con = this; - channel->id = ++channelIdCounter; - channel->out = out; - channels[channel->id] = channel; - //now send frame to open channel and wait for response - string oob; - channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); - channel->setQos(); - channel->closed = false; -} - -void Connection::closeChannel(Channel* channel){ - //send frame to close channel - u_int16_t code(200); - string text("Ok"); - u_int16_t classId(0); - u_int16_t methodId(0); - closeChannel(channel, code, text, classId, methodId); +// FIXME aconway 2007-01-26: make channels owned and created by connection? +void Connection::openChannel(Channel& channel) { + ChannelId id = ++channelIdCounter; + assert (channels.find(id) == channels.end()); + assert(out); + channels[id] = &channel; + channel.open(id, *this); } -void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){ - //send frame to close channel - channel->cancelAll(); - channel->closed = true; - channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); - channel->con = 0; - channel->out = 0; - removeChannel(channel); -} - -void Connection::removeChannel(Channel* channel){ - //send frame to close channel - - channels.erase(channel->id); - channel->out = 0; - channel->id = 0; - channel->con = 0; +void Connection::erase(ChannelId id) { + channels.erase(id); } void Connection::received(AMQFrame* frame){ - AMQBody::shared_ptr body = frame->getBody(); - u_int8_t type = body->type(); - if (type == REQUEST_BODY) - responder.received(AMQRequestBody::getData(body)); - handleFrame(frame); - if (type == RESPONSE_BODY) - requester.processed(AMQResponseBody::getData(body)); -} - -void Connection::handleFrame(AMQFrame* frame){ - u_int16_t channelId = frame->getChannel(); - - if(channelId == 0){ - this->handleBody(frame->getBody()); - }else{ - Channel* channel = channels[channelId]; - if(channel == 0){ - error(504, "Unknown channel"); - }else{ - try{ - channel->handleBody(frame->getBody()); - }catch(qpid::QpidError e){ - channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); - } - } + // FIXME aconway 2007-01-25: Mutex + ChannelId id = frame->getChannel(); + Channel* channel = channels[id]; + // FIXME aconway 2007-01-26: Exception thrown here is hanging the + // client. Need to review use of exceptions. + if (channel == 0) + THROW_QPID_ERROR( + PROTOCOL_ERROR+504, + (boost::format("Invalid channel number %g") % id).str()); + try{ + channel->handleBody(frame->getBody()); + }catch(const qpid::QpidError& e){ + channelException( + *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); } } -void Connection::handleRequest(AMQRequestBody::shared_ptr body) { - // FIXME aconway 2007-01-19: request/response handling. - handleMethod(body); -} - -void Connection::handleResponse(AMQResponseBody::shared_ptr body) { - // FIXME aconway 2007-01-19: request/response handling. - handleMethod(body); -} - -void Connection::handleMethod(AMQMethodBody::shared_ptr body){ - //connection.close, basic.deliver, basic.return or a response to a synchronous request - if(responses.isWaiting()){ - responses.signalResponse(body); - }else if(method_bodies.connection_close.match(body.get())){ - //send back close ok - //close socket - ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); - std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl; - connector->close(); - }else{ - std::cout << "Unhandled method for connection: " << *body << std::endl; - error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); - } -} - -void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){ - error(504, "Channel error: received header body with channel 0."); -} - -void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){ - error(504, "Channel error: received content body with channel 0."); -} - -void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ -} - -void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ - responses.expect(); +void Connection::send(AMQFrame* frame) { out->send(frame); - responses.receive(body); } -void Connection::error(int code, const string& msg, int classid, int methodid){ - std::cout << "Connection exception generated: " << code << msg; - if(classid || methodid){ - std::cout << " [" << methodid << ":" << classid << "]"; - } - std::cout << std::endl; - sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); - connector->close(); -} - -void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ - std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl; - int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; +void Connection::channelException( + Channel& channel, AMQMethodBody* method, const QpidError& e) +{ + int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500; string msg = e.msg; - if(method == 0){ - closeChannel(channel, code, msg); - }else{ - closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId()); - } + if(method == 0) + channel.close(code, msg); + else + channel.close( + code, msg, method->amqpClassId(), method->amqpMethodId()); } void Connection::idleIn(){ @@ -259,9 +142,12 @@ void Connection::idleOut(){ void Connection::shutdown(){ closed = true; - //close all channels - for(iterator i = channels.begin(); i != channels.end(); i++){ - i->second->stop(); + //close all channels, also removes them from the map. + while(!channels.empty()){ + Channel* channel = channels.begin()->second; + if (channel != 0) + channel->close(); } - responses.signalResponse(AMQMethodBody::shared_ptr()); } + +}} // namespace qpid::client |
