summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/Connector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r--cpp/src/qpid/client/Connector.cpp188
1 files changed, 188 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
new file mode 100644
index 0000000000..9230050ff7
--- /dev/null
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -0,0 +1,188 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/QpidError.h"
+#include "qpid/sys/Time.h"
+#include "Connector.h"
+
+namespace qpid {
+namespace client {
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using qpid::QpidError;
+
+Connector::Connector(
+ ProtocolVersion ver, bool _debug, uint32_t buffer_size
+) : debug(_debug),
+ receive_buffer_size(buffer_size),
+ send_buffer_size(buffer_size),
+ version(ver),
+ closed(true),
+ lastIn(0), lastOut(0),
+ timeout(0),
+ idleIn(0), idleOut(0),
+ timeoutHandler(0),
+ shutdownHandler(0),
+ inbuf(receive_buffer_size),
+ outbuf(send_buffer_size)
+{ }
+
+Connector::~Connector(){ }
+
+void Connector::connect(const std::string& host, int port){
+ socket = Socket::createTcp();
+ socket.connect(host, port);
+ closed = false;
+ receiver = Thread(this);
+}
+
+void Connector::init(){
+ ProtocolInitiation init(version);
+ writeBlock(&init);
+}
+
+void Connector::close(){
+ closed = true;
+ socket.close();
+ receiver.join();
+}
+
+void Connector::setInputHandler(InputHandler* handler){
+ input = handler;
+}
+
+void Connector::setShutdownHandler(ShutdownHandler* handler){
+ shutdownHandler = handler;
+}
+
+OutputHandler* Connector::getOutputHandler(){
+ return this;
+}
+
+void Connector::send(AMQFrame* f){
+ std::auto_ptr<AMQFrame> frame(f);
+ AMQBody::shared_ptr body = frame->getBody();
+ writeBlock(frame.get());
+ if(debug) std::cout << "SENT: " << *frame << std::endl;
+}
+
+void Connector::writeBlock(AMQDataBlock* data){
+ Mutex::ScopedLock l(writeLock);
+ data->encode(outbuf);
+ //transfer data to wire
+ outbuf.flip();
+ writeToSocket(outbuf.start(), outbuf.available());
+ outbuf.clear();
+}
+
+void Connector::writeToSocket(char* data, size_t available){
+ size_t written = 0;
+ while(written < available && !closed){
+ ssize_t sent = socket.send(data + written, available-written);
+ if(sent > 0) {
+ lastOut = now() * TIME_MSEC;
+ written += sent;
+ }
+ }
+}
+
+void Connector::handleClosed(){
+ closed = true;
+ socket.close();
+ if(shutdownHandler) shutdownHandler->shutdown();
+}
+
+void Connector::checkIdle(ssize_t status){
+ if(timeoutHandler){
+ Time t = now() * TIME_MSEC;
+ if(status == Socket::SOCKET_TIMEOUT) {
+ if(idleIn && (t - lastIn > idleIn)){
+ timeoutHandler->idleIn();
+ }
+ }
+ else if(status == 0 || status == Socket::SOCKET_EOF) {
+ handleClosed();
+ }
+ else {
+ lastIn = t;
+ }
+ if(idleOut && (t - lastOut > idleOut)){
+ timeoutHandler->idleOut();
+ }
+ }
+}
+
+void Connector::setReadTimeout(uint16_t t){
+ idleIn = t * 1000;//t is in secs
+ if(idleIn && (!timeout || idleIn < timeout)){
+ timeout = idleIn;
+ setSocketTimeout();
+ }
+
+}
+
+void Connector::setWriteTimeout(uint16_t t){
+ idleOut = t * 1000;//t is in secs
+ if(idleOut && (!timeout || idleOut < timeout)){
+ timeout = idleOut;
+ setSocketTimeout();
+ }
+}
+
+void Connector::setSocketTimeout(){
+ socket.setTimeout(timeout*TIME_MSEC);
+}
+
+void Connector::setTimeoutHandler(TimeoutHandler* handler){
+ timeoutHandler = handler;
+}
+
+void Connector::run(){
+ try{
+ while(!closed){
+ ssize_t available = inbuf.available();
+ if(available < 1){
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+ }
+ ssize_t received = socket.recv(inbuf.start(), available);
+ checkIdle(received);
+
+ if(!closed && received > 0){
+ inbuf.move(received);
+ inbuf.flip();//position = 0, limit = total data read
+
+ AMQFrame frame(version);
+ while(frame.decode(inbuf)){
+ if(debug) std::cout << "RECV: " << frame << std::endl;
+ input->received(&frame);
+ }
+ //need to compact buffer to preserve any 'extra' data
+ inbuf.compact();
+ }
+ }
+ } catch (const std::exception& e) {
+ std::cout << e.what() << std::endl;
+ handleClosed();
+ }
+}
+
+}} // namespace qpid::client