diff options
| author | Alan Conway <aconway@apache.org> | 2006-12-01 05:11:45 +0000 | 
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2006-12-01 05:11:45 +0000 | 
| commit | fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66 (patch) | |
| tree | a2ebf932750bf13bf3db271f92df390335b0e844 /cpp/src/qpid/client/Connection.cpp | |
| parent | 33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (diff) | |
| download | qpid-python-fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66.tar.gz | |
2006-12-01  Jim Meyering  <meyering@redhat.com>
This delta imposes two major changes on the C++ hierarchy:
  - adds autoconf, automake, libtool support
  - makes the hierarchy flatter and renames a few files (e.g., Queue.h,
  Queue.cpp) that appeared twice, once under client/ and again under broker/.
In the process, I've changed many #include directives, mostly
to remove a qpid/ or qpid/framing/ prefix from the file name argument.
Although most changes were to .cpp and .h files under qpid/cpp/, there
were also several to template files under qpid/gentools, and even one
to CppGenerator.java.
Nearly all files are moved to a new position in the hierarchy.
The new hierarchy looks like this:
  src               # this is the new home of qpidd.cpp
  tests             # all tests are here.  See Makefile.am.
  gen               # As before, all generated files go here.
  lib               # This is just a container for the 3 lib dirs:
  lib/client
  lib/broker
  lib/common
  lib/common/framing
  lib/common/sys
  lib/common/sys/posix
  lib/common/sys/apr
  build-aux
  m4
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 244 | 
1 files changed, 0 insertions, 244 deletions
| diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp deleted file mode 100644 index 0b520d169d..0000000000 --- a/cpp/src/qpid/client/Connection.cpp +++ /dev/null @@ -1,244 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.  See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.  The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.  You may obtain a copy of the License at - *  - *   http://www.apache.org/licenses/LICENSE-2.0 - *  - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied.  See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <qpid/client/Connection.h> -#include <qpid/client/Channel.h> -#include <qpid/client/Message.h> -#include <qpid/QpidError.h> -#include <iostream> -#include <qpid/client/MethodBodyInstances.h> - -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::sys; -using namespace qpid::sys; - -u_int16_t Connection::channelIdCounter; - -Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true), -// AMQP version management change - kpvdr 2006-11-20 -// TODO: Make this class version-aware and link these hard-wired numbers to that version -    version(8, 0) -{ -    connector = new Connector(debug, _max_frame_size); -} - -Connection::~Connection(){ -    delete connector; -} - -void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ -    host = _host; -    port = _port; -    connector->setInputHandler(this); -    connector->setTimeoutHandler(this); -    connector->setShutdownHandler(this); -    out = connector->getOutputHandler(); -    connector->connect(host, port); -     -    ProtocolInitiation* header = new ProtocolInitiation(8, 0); -    responses.expect(); -    connector->init(header); -    responses.receive(method_bodies.connection_start); - -    FieldTable props; -    string mechanism("PLAIN"); -    string response = ((char)0) + uid + ((char)0) + pwd; -    string locale("en_US"); -    responses.expect(); -    out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); - -    /** -     * Assume for now that further challenges will not be required -    //receive connection.secure -    responses.receive(connection_secure)); -    //send connection.secure-ok -    out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); -    **/ - -    responses.receive(method_bodies.connection_tune); - -    ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); -    out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); - -    u_int16_t heartbeat = proposal->getHeartbeat(); -    connector->setReadTimeout(heartbeat * 2); -    connector->setWriteTimeout(heartbeat); - -    //send connection.open -    string capabilities; -    string vhost = virtualhost; -    responses.expect(); -    out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true))); -    //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). -    responses.waitForResponse(); -    if(responses.validate(method_bodies.connection_open_ok)){ -        //ok -    }else if(responses.validate(method_bodies.connection_redirect)){ -        //ignore for now -        ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); -        std::cout << "Received redirection to " << redirect->getHost() << std::endl; -    }else{ -        THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); -    } -     -} - -void Connection::close(){ -    if(!closed){ -        u_int16_t code(200); -        string text("Ok"); -        u_int16_t classId(0); -        u_int16_t methodId(0); -         -        sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); -        connector->close(); -    } -} - -void Connection::openChannel(Channel* channel){ -    channel->con = this; -    channel->id = ++channelIdCounter; -    channel->out = out; -    channels[channel->id] = channel; -    //now send frame to open channel and wait for response -    string oob; -    channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); -    channel->setQos(); -    channel->closed = false; -} - -void Connection::closeChannel(Channel* channel){ -    //send frame to close channel -    u_int16_t code(200); -    string text("Ok"); -    u_int16_t classId(0); -    u_int16_t methodId(0); -    closeChannel(channel, code, text, classId, methodId); -} - -void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){ -    //send frame to close channel -    channel->cancelAll(); -    channel->closed = true; -    channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); -    channel->con = 0; -    channel->out = 0; -    removeChannel(channel); -} - -void Connection::removeChannel(Channel* channel){ -    //send frame to close channel - -    channels.erase(channel->id); -    channel->out = 0;     -    channel->id = 0; -    channel->con = 0; -} - -void Connection::received(AMQFrame* frame){ -    u_int16_t channelId = frame->getChannel(); - -    if(channelId == 0){ -        this->handleBody(frame->getBody()); -    }else{ -        Channel* channel = channels[channelId]; -        if(channel == 0){ -            error(504, "Unknown channel"); -        }else{ -            try{ -                channel->handleBody(frame->getBody()); -            }catch(qpid::QpidError e){ -                channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); -            } -        } -    } -} - -void Connection::handleMethod(AMQMethodBody::shared_ptr body){ -    //connection.close, basic.deliver, basic.return or a response to a synchronous request -    if(responses.isWaiting()){ -        responses.signalResponse(body); -    }else if(method_bodies.connection_close.match(body.get())){ -        //send back close ok -        //close socket -        ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); -        std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl; -        connector->close(); -    }else{ -        std::cout << "Unhandled method for connection: " << *body << std::endl; -        error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); -    } -} -     -void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){ -    error(504, "Channel error: received header body with channel 0."); -} -     -void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){ -    error(504, "Channel error: received content body with channel 0."); -} -     -void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ -} - -void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ -    responses.expect(); -    out->send(frame); -    responses.receive(body); -} - -void Connection::error(int code, const string& msg, int classid, int methodid){ -    std::cout << "Connection exception generated: " << code << msg; -    if(classid || methodid){ -        std::cout << " [" << methodid << ":" << classid << "]"; -    } -    std::cout << std::endl; -    sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); -    connector->close(); -} - -void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ -    std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl; -    int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; -    string msg = e.msg; -    if(method == 0){ -        closeChannel(channel, code, msg); -    }else{ -        closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId()); -    } -} - -void Connection::idleIn(){ -    std::cout << "Connection timed out due to abscence of heartbeat." << std::endl; -    connector->close(); -} - -void Connection::idleOut(){ -    out->send(new AMQFrame(0, new AMQHeartbeatBody())); -} - -void Connection::shutdown(){ -    closed = true; -    //close all channels -    for(iterator i = channels.begin(); i != channels.end(); i++){ -        i->second->stop(); -    } -} | 
