summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/Connection.h4
-rw-r--r--cpp/src/qpid/client/Connector.cpp169
-rw-r--r--cpp/src/qpid/client/Connector.h90
3 files changed, 261 insertions, 2 deletions
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index da747d0e1d..4ff4e83859 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -22,7 +22,7 @@
#define _Connection_
#include "qpid/QpidError.h"
-#include "qpid/sys/Connector.h"
+#include "qpid/client/Connector.h"
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -52,7 +52,7 @@ class Connection : public virtual qpid::framing::InputHandler,
int port;
const u_int32_t max_frame_size;
std::map<int, Channel*> channels;
- qpid::sys::Connector* connector;
+ Connector* connector;
qpid::framing::OutputHandler* out;
ResponseHandler responses;
volatile bool closed;
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
new file mode 100644
index 0000000000..5d3a20be6a
--- /dev/null
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -0,0 +1,169 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include <qpid/QpidError.h>
+#include <qpid/sys/Time.h>
+#include "Connector.h"
+
+using namespace qpid::sys;
+using namespace qpid::client;
+using namespace qpid::framing;
+using qpid::QpidError;
+
+Connector::Connector(bool _debug, u_int32_t buffer_size) :
+ debug(_debug),
+ receive_buffer_size(buffer_size),
+ send_buffer_size(buffer_size),
+ closed(true),
+ lastIn(0), lastOut(0),
+ timeout(0),
+ idleIn(0), idleOut(0),
+ timeoutHandler(0),
+ shutdownHandler(0),
+ inbuf(receive_buffer_size),
+ outbuf(send_buffer_size){ }
+
+Connector::~Connector(){ }
+
+void Connector::connect(const std::string& host, int port){
+ socket.connect(host, port);
+ closed = false;
+ receiver = Thread(this);
+}
+
+void Connector::init(ProtocolInitiation* header){
+ writeBlock(header);
+ delete header;
+}
+
+void Connector::close(){
+ closed = true;
+ socket.close();
+ receiver.join();
+}
+
+void Connector::setInputHandler(InputHandler* handler){
+ input = handler;
+}
+
+void Connector::setShutdownHandler(ShutdownHandler* handler){
+ shutdownHandler = handler;
+}
+
+OutputHandler* Connector::getOutputHandler(){
+ return this;
+}
+
+void Connector::send(AMQFrame* frame){
+ writeBlock(frame);
+ if(debug) std::cout << "SENT: " << *frame << std::endl;
+ delete frame;
+}
+
+void Connector::writeBlock(AMQDataBlock* data){
+ Mutex::ScopedLock l(writeLock);
+ data->encode(outbuf);
+ //transfer data to wire
+ outbuf.flip();
+ writeToSocket(outbuf.start(), outbuf.available());
+ outbuf.clear();
+}
+
+void Connector::writeToSocket(char* data, size_t available){
+ size_t written = 0;
+ while(written < available && !closed){
+ ssize_t sent = socket.send(data + written, available-written);
+ if(sent > 0) {
+ lastOut = getTimeMsecs();
+ written += sent;
+ }
+ }
+}
+
+void Connector::checkIdle(ssize_t status){
+ if(timeoutHandler){
+ int64_t now = getTimeMsecs();
+ if(status == Socket::SOCKET_TIMEOUT) {
+ if(idleIn && (now - lastIn > idleIn)){
+ timeoutHandler->idleIn();
+ }
+ }else if(status == Socket::SOCKET_EOF){
+ closed = true;
+ socket.close();
+ if(shutdownHandler) shutdownHandler->shutdown();
+ }else{
+ lastIn = now;
+ }
+ if(idleOut && (now - lastOut > idleOut)){
+ timeoutHandler->idleOut();
+ }
+ }
+}
+
+void Connector::setReadTimeout(u_int16_t t){
+ idleIn = t * 1000;//t is in secs
+ if(idleIn && (!timeout || idleIn < timeout)){
+ timeout = idleIn;
+ setSocketTimeout();
+ }
+
+}
+
+void Connector::setWriteTimeout(u_int16_t t){
+ idleOut = t * 1000;//t is in secs
+ if(idleOut && (!timeout || idleOut < timeout)){
+ timeout = idleOut;
+ setSocketTimeout();
+ }
+}
+
+void Connector::setSocketTimeout(){
+ socket.setTimeout(timeout);
+}
+
+void Connector::setTimeoutHandler(TimeoutHandler* handler){
+ timeoutHandler = handler;
+}
+
+void Connector::run(){
+ try{
+ while(!closed){
+ ssize_t available = inbuf.available();
+ if(available < 1){
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+ }
+ ssize_t received = socket.recv(inbuf.start(), available);
+ checkIdle(received);
+
+ if(received > 0){
+ inbuf.move(received);
+ inbuf.flip();//position = 0, limit = total data read
+
+ AMQFrame frame;
+ while(frame.decode(inbuf)){
+ if(debug) std::cout << "RECV: " << frame << std::endl;
+ input->received(&frame);
+ }
+ //need to compact buffer to preserve any 'extra' data
+ inbuf.compact();
+ }
+ }
+ }catch(QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+}
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
new file mode 100644
index 0000000000..91ec58c95c
--- /dev/null
+++ b/cpp/src/qpid/client/Connector.h
@@ -0,0 +1,90 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Connector_
+#define _Connector_
+
+
+#include "qpid/framing/InputHandler.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/InitiationHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/sys/ShutdownHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Monitor.h"
+#include <qpid/sys/Socket.h>
+
+namespace qpid {
+namespace client {
+
+ class Connector : public qpid::framing::OutputHandler,
+ private qpid::sys::Runnable
+ {
+ const bool debug;
+ const int receive_buffer_size;
+ const int send_buffer_size;
+
+ bool closed;
+
+ int64_t lastIn;
+ int64_t lastOut;
+ int64_t timeout;
+ u_int32_t idleIn;
+ u_int32_t idleOut;
+
+ qpid::sys::TimeoutHandler* timeoutHandler;
+ qpid::sys::ShutdownHandler* shutdownHandler;
+ qpid::framing::InputHandler* input;
+ qpid::framing::InitiationHandler* initialiser;
+ qpid::framing::OutputHandler* output;
+
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+
+ qpid::sys::Mutex writeLock;
+ qpid::sys::Thread receiver;
+
+ qpid::sys::Socket socket;
+
+ void checkIdle(ssize_t status);
+ void writeBlock(qpid::framing::AMQDataBlock* data);
+ void writeToSocket(char* data, size_t available);
+ void setSocketTimeout();
+
+ void run();
+
+ public:
+ Connector(bool debug = false, u_int32_t buffer_size = 1024);
+ virtual ~Connector();
+ virtual void connect(const std::string& host, int port);
+ virtual void init(qpid::framing::ProtocolInitiation* header);
+ virtual void close();
+ virtual void setInputHandler(qpid::framing::InputHandler* handler);
+ virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
+ virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
+ virtual qpid::framing::OutputHandler* getOutputHandler();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void setReadTimeout(u_int16_t timeout);
+ virtual void setWriteTimeout(u_int16_t timeout);
+ };
+
+}
+}
+
+
+#endif