diff options
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/Connection.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 169 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.h | 90 |
3 files changed, 261 insertions, 2 deletions
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index da747d0e1d..4ff4e83859 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -22,7 +22,7 @@ #define _Connection_ #include "qpid/QpidError.h" -#include "qpid/sys/Connector.h" +#include "qpid/client/Connector.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" @@ -52,7 +52,7 @@ class Connection : public virtual qpid::framing::InputHandler, int port; const u_int32_t max_frame_size; std::map<int, Channel*> channels; - qpid::sys::Connector* connector; + Connector* connector; qpid::framing::OutputHandler* out; ResponseHandler responses; volatile bool closed; diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp new file mode 100644 index 0000000000..5d3a20be6a --- /dev/null +++ b/cpp/src/qpid/client/Connector.cpp @@ -0,0 +1,169 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include <qpid/QpidError.h> +#include <qpid/sys/Time.h> +#include "Connector.h" + +using namespace qpid::sys; +using namespace qpid::client; +using namespace qpid::framing; +using qpid::QpidError; + +Connector::Connector(bool _debug, u_int32_t buffer_size) : + debug(_debug), + receive_buffer_size(buffer_size), + send_buffer_size(buffer_size), + closed(true), + lastIn(0), lastOut(0), + timeout(0), + idleIn(0), idleOut(0), + timeoutHandler(0), + shutdownHandler(0), + inbuf(receive_buffer_size), + outbuf(send_buffer_size){ } + +Connector::~Connector(){ } + +void Connector::connect(const std::string& host, int port){ + socket.connect(host, port); + closed = false; + receiver = Thread(this); +} + +void Connector::init(ProtocolInitiation* header){ + writeBlock(header); + delete header; +} + +void Connector::close(){ + closed = true; + socket.close(); + receiver.join(); +} + +void Connector::setInputHandler(InputHandler* handler){ + input = handler; +} + +void Connector::setShutdownHandler(ShutdownHandler* handler){ + shutdownHandler = handler; +} + +OutputHandler* Connector::getOutputHandler(){ + return this; +} + +void Connector::send(AMQFrame* frame){ + writeBlock(frame); + if(debug) std::cout << "SENT: " << *frame << std::endl; + delete frame; +} + +void Connector::writeBlock(AMQDataBlock* data){ + Mutex::ScopedLock l(writeLock); + data->encode(outbuf); + //transfer data to wire + outbuf.flip(); + writeToSocket(outbuf.start(), outbuf.available()); + outbuf.clear(); +} + +void Connector::writeToSocket(char* data, size_t available){ + size_t written = 0; + while(written < available && !closed){ + ssize_t sent = socket.send(data + written, available-written); + if(sent > 0) { + lastOut = getTimeMsecs(); + written += sent; + } + } +} + +void Connector::checkIdle(ssize_t status){ + if(timeoutHandler){ + int64_t now = getTimeMsecs(); + if(status == Socket::SOCKET_TIMEOUT) { + if(idleIn && (now - lastIn > idleIn)){ + timeoutHandler->idleIn(); + } + }else if(status == Socket::SOCKET_EOF){ + closed = true; + socket.close(); + if(shutdownHandler) shutdownHandler->shutdown(); + }else{ + lastIn = now; + } + if(idleOut && (now - lastOut > idleOut)){ + timeoutHandler->idleOut(); + } + } +} + +void Connector::setReadTimeout(u_int16_t t){ + idleIn = t * 1000;//t is in secs + if(idleIn && (!timeout || idleIn < timeout)){ + timeout = idleIn; + setSocketTimeout(); + } + +} + +void Connector::setWriteTimeout(u_int16_t t){ + idleOut = t * 1000;//t is in secs + if(idleOut && (!timeout || idleOut < timeout)){ + timeout = idleOut; + setSocketTimeout(); + } +} + +void Connector::setSocketTimeout(){ + socket.setTimeout(timeout); +} + +void Connector::setTimeoutHandler(TimeoutHandler* handler){ + timeoutHandler = handler; +} + +void Connector::run(){ + try{ + while(!closed){ + ssize_t available = inbuf.available(); + if(available < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + ssize_t received = socket.recv(inbuf.start(), available); + checkIdle(received); + + if(received > 0){ + inbuf.move(received); + inbuf.flip();//position = 0, limit = total data read + + AMQFrame frame; + while(frame.decode(inbuf)){ + if(debug) std::cout << "RECV: " << frame << std::endl; + input->received(&frame); + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h new file mode 100644 index 0000000000..91ec58c95c --- /dev/null +++ b/cpp/src/qpid/client/Connector.h @@ -0,0 +1,90 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Connector_ +#define _Connector_ + + +#include "qpid/framing/InputHandler.h" +#include "qpid/framing/OutputHandler.h" +#include "qpid/framing/InitiationHandler.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/sys/ShutdownHandler.h" +#include "qpid/sys/TimeoutHandler.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Monitor.h" +#include <qpid/sys/Socket.h> + +namespace qpid { +namespace client { + + class Connector : public qpid::framing::OutputHandler, + private qpid::sys::Runnable + { + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + + bool closed; + + int64_t lastIn; + int64_t lastOut; + int64_t timeout; + u_int32_t idleIn; + u_int32_t idleOut; + + qpid::sys::TimeoutHandler* timeoutHandler; + qpid::sys::ShutdownHandler* shutdownHandler; + qpid::framing::InputHandler* input; + qpid::framing::InitiationHandler* initialiser; + qpid::framing::OutputHandler* output; + + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + + qpid::sys::Mutex writeLock; + qpid::sys::Thread receiver; + + qpid::sys::Socket socket; + + void checkIdle(ssize_t status); + void writeBlock(qpid::framing::AMQDataBlock* data); + void writeToSocket(char* data, size_t available); + void setSocketTimeout(); + + void run(); + + public: + Connector(bool debug = false, u_int32_t buffer_size = 1024); + virtual ~Connector(); + virtual void connect(const std::string& host, int port); + virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void close(); + virtual void setInputHandler(qpid::framing::InputHandler* handler); + virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); + virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); + virtual qpid::framing::OutputHandler* getOutputHandler(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void setReadTimeout(u_int16_t timeout); + virtual void setWriteTimeout(u_int16_t timeout); + }; + +} +} + + +#endif |
