summaryrefslogtreecommitdiff
path: root/cpp/lib/client/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-29 16:13:24 +0000
committerAlan Conway <aconway@apache.org>2007-01-29 16:13:24 +0000
commit5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d (patch)
treef9a982b65400154a86edd02faf75da143a96404c /cpp/lib/client/Connection.cpp
parent5d28464c46c1e64ded078a4585f0f49e30b8b5d6 (diff)
downloadqpid-python-5a1b8a846bdfa5cb517da0c507f3dc3a8ceec25d.tar.gz
* Added ClientAdapter - client side ChannelAdapter. Updated client side.
* Moved ChannelAdapter initialization from ctor to init(), updated broker side. * Improved various exception messages with boost::format messages. * Removed unnecssary virtual inheritance. * Widespread: fixed incorrect non-const ProtocolVersion& parameters. * Client API: pass channels by reference, not pointer. * codegen: - MethodBodyClass.h.templ: Added CLASS_ID, METHOD_ID and isA() template. - Various: fixed non-const ProtocolVersion& parameters. * cpp/bootstrap: Allow config arguments with -build. * cpp/gen/Makefile.am: Merged codegen fixes from trunk. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501087 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/Connection.cpp')
-rw-r--r--cpp/lib/client/Connection.cpp254
1 files changed, 70 insertions, 184 deletions
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index 1ae317db62..19d5cce7db 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -18,35 +18,46 @@
* under the License.
*
*/
+#include <boost/format.hpp>
+
#include <Connection.h>
#include <ClientChannel.h>
#include <ClientMessage.h>
#include <QpidError.h>
#include <iostream>
+#include <sstream>
#include <MethodBodyInstances.h>
-using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
using namespace qpid::sys;
-u_int16_t Connection::channelIdCounter;
+
+namespace qpid {
+namespace client {
+
+ChannelId Connection::channelIdCounter;
+
+const std::string Connection::OK("OK");
Connection::Connection(
bool debug, u_int32_t _max_frame_size,
- qpid::framing::ProtocolVersion* _version
+ const framing::ProtocolVersion& _version
) : max_frame_size(_max_frame_size), closed(true),
- version(_version->getMajor(),_version->getMinor())
+ version(_version)
{
- connector = new Connector(
- version, requester, responder, debug, _max_frame_size);
+ connector = new Connector(version, debug, _max_frame_size);
}
Connection::~Connection(){
delete connector;
}
-void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
+void Connection::open(
+ const std::string& _host, int _port, const std::string& uid,
+ const std::string& pwd, const std::string& virtualhost)
+{
+
host = _host;
port = _port;
connector->setInputHandler(this);
@@ -55,197 +66,69 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
out = connector->getOutputHandler();
connector->connect(host, port);
- ProtocolInitiation* header = new ProtocolInitiation(version);
- responses.expect();
- connector->init(header);
- responses.receive(method_bodies.connection_start);
-
- FieldTable props;
- string mechanism("PLAIN");
- string response = ((char)0) + uid + ((char)0) + pwd;
- string locale("en_US");
- responses.expect();
- out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
-
- /**
- * Assume for now that further challenges will not be required
- //receive connection.secure
- responses.receive(connection_secure));
- //send connection.secure-ok
- out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
- **/
-
- responses.receive(method_bodies.connection_tune);
-
- ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
- out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
-
- u_int16_t heartbeat = proposal->getHeartbeat();
- connector->setReadTimeout(heartbeat * 2);
- connector->setWriteTimeout(heartbeat);
-
- //send connection.open
- string capabilities;
- string vhost = virtualhost;
- responses.expect();
- out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true)));
- //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
- responses.waitForResponse();
- if(responses.validate(method_bodies.connection_open_ok)){
- //ok
- }else if(responses.validate(method_bodies.connection_redirect)){
- //ignore for now
- ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
- std::cout << "Received redirection to " << redirect->getHost() << std::endl;
- }else{
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
- }
-
+ // Open the special channel 0.
+ channels[0] = &channel0;
+ channel0.open(0, *this);
+ channel0.protocolInit(uid, pwd, virtualhost);
}
-void Connection::close(){
- if(!closed){
- u_int16_t code(200);
- string text("Ok");
- u_int16_t classId(0);
- u_int16_t methodId(0);
-
- sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
+void Connection::close(
+ ReplyCode code, const string& msg, ClassId classId, MethodId methodId
+)
+{
+ if(!closed) {
+ channel0.sendAndReceive<ConnectionCloseOkBody>(
+ new ConnectionCloseBody(
+ getVersion(), code, msg, classId, methodId));
connector->close();
}
}
-void Connection::openChannel(Channel* channel){
- channel->con = this;
- channel->id = ++channelIdCounter;
- channel->out = out;
- channels[channel->id] = channel;
- //now send frame to open channel and wait for response
- string oob;
- channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
- channel->setQos();
- channel->closed = false;
-}
-
-void Connection::closeChannel(Channel* channel){
- //send frame to close channel
- u_int16_t code(200);
- string text("Ok");
- u_int16_t classId(0);
- u_int16_t methodId(0);
- closeChannel(channel, code, text, classId, methodId);
+// FIXME aconway 2007-01-26: make channels owned and created by connection?
+void Connection::openChannel(Channel& channel) {
+ ChannelId id = ++channelIdCounter;
+ assert (channels.find(id) == channels.end());
+ assert(out);
+ channels[id] = &channel;
+ channel.open(id, *this);
}
-void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){
- //send frame to close channel
- channel->cancelAll();
- channel->closed = true;
- channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
- channel->con = 0;
- channel->out = 0;
- removeChannel(channel);
-}
-
-void Connection::removeChannel(Channel* channel){
- //send frame to close channel
-
- channels.erase(channel->id);
- channel->out = 0;
- channel->id = 0;
- channel->con = 0;
+void Connection::erase(ChannelId id) {
+ channels.erase(id);
}
void Connection::received(AMQFrame* frame){
- AMQBody::shared_ptr body = frame->getBody();
- u_int8_t type = body->type();
- if (type == REQUEST_BODY)
- responder.received(AMQRequestBody::getData(body));
- handleFrame(frame);
- if (type == RESPONSE_BODY)
- requester.processed(AMQResponseBody::getData(body));
-}
-
-void Connection::handleFrame(AMQFrame* frame){
- u_int16_t channelId = frame->getChannel();
-
- if(channelId == 0){
- this->handleBody(frame->getBody());
- }else{
- Channel* channel = channels[channelId];
- if(channel == 0){
- error(504, "Unknown channel");
- }else{
- try{
- channel->handleBody(frame->getBody());
- }catch(qpid::QpidError e){
- channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
- }
- }
+ // FIXME aconway 2007-01-25: Mutex
+ ChannelId id = frame->getChannel();
+ Channel* channel = channels[id];
+ // FIXME aconway 2007-01-26: Exception thrown here is hanging the
+ // client. Need to review use of exceptions.
+ if (channel == 0)
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR+504,
+ (boost::format("Invalid channel number %g") % id).str());
+ try{
+ channel->handleBody(frame->getBody());
+ }catch(const qpid::QpidError& e){
+ channelException(
+ *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
}
}
-void Connection::handleRequest(AMQRequestBody::shared_ptr body) {
- // FIXME aconway 2007-01-19: request/response handling.
- handleMethod(body);
-}
-
-void Connection::handleResponse(AMQResponseBody::shared_ptr body) {
- // FIXME aconway 2007-01-19: request/response handling.
- handleMethod(body);
-}
-
-void Connection::handleMethod(AMQMethodBody::shared_ptr body){
- //connection.close, basic.deliver, basic.return or a response to a synchronous request
- if(responses.isWaiting()){
- responses.signalResponse(body);
- }else if(method_bodies.connection_close.match(body.get())){
- //send back close ok
- //close socket
- ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
- std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl;
- connector->close();
- }else{
- std::cout << "Unhandled method for connection: " << *body << std::endl;
- error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
- }
-}
-
-void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){
- error(504, "Channel error: received header body with channel 0.");
-}
-
-void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){
- error(504, "Channel error: received content body with channel 0.");
-}
-
-void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
-}
-
-void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
- responses.expect();
+void Connection::send(AMQFrame* frame) {
out->send(frame);
- responses.receive(body);
}
-void Connection::error(int code, const string& msg, int classid, int methodid){
- std::cout << "Connection exception generated: " << code << msg;
- if(classid || methodid){
- std::cout << " [" << methodid << ":" << classid << "]";
- }
- std::cout << std::endl;
- sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
- connector->close();
-}
-
-void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
- std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl;
- int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
+void Connection::channelException(
+ Channel& channel, AMQMethodBody* method, const QpidError& e)
+{
+ int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500;
string msg = e.msg;
- if(method == 0){
- closeChannel(channel, code, msg);
- }else{
- closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
- }
+ if(method == 0)
+ channel.close(code, msg);
+ else
+ channel.close(
+ code, msg, method->amqpClassId(), method->amqpMethodId());
}
void Connection::idleIn(){
@@ -259,9 +142,12 @@ void Connection::idleOut(){
void Connection::shutdown(){
closed = true;
- //close all channels
- for(iterator i = channels.begin(); i != channels.end(); i++){
- i->second->stop();
+ //close all channels, also removes them from the map.
+ while(!channels.empty()){
+ Channel* channel = channels.begin()->second;
+ if (channel != 0)
+ channel->close();
}
- responses.signalResponse(AMQMethodBody::shared_ptr());
}
+
+}} // namespace qpid::client