summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/io/BlockingAPRSessionContext.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/io/BlockingAPRSessionContext.cpp')
-rw-r--r--cpp/src/qpid/io/BlockingAPRSessionContext.cpp178
1 files changed, 178 insertions, 0 deletions
diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.cpp b/cpp/src/qpid/io/BlockingAPRSessionContext.cpp
new file mode 100644
index 0000000000..aee223ca3b
--- /dev/null
+++ b/cpp/src/qpid/io/BlockingAPRSessionContext.cpp
@@ -0,0 +1,178 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <assert.h>
+#include <iostream>
+#include "qpid/io/BlockingAPRSessionContext.h"
+#include "qpid/io/BlockingAPRAcceptor.h"
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/QpidError.h"
+
+using namespace qpid::concurrent;
+using namespace qpid::framing;
+using namespace qpid::io;
+
+
+BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket,
+ ThreadFactory* factory,
+ BlockingAPRAcceptor* _acceptor,
+ bool _debug)
+ : socket(_socket),
+ debug(_debug),
+ handler(0),
+ acceptor(_acceptor),
+ inbuf(65536),
+ outbuf(65536),
+ closed(false){
+
+ reader = new Reader(this);
+ writer = new Writer(this);
+
+ rThread = factory->create(reader);
+ wThread = factory->create(writer);
+}
+
+BlockingAPRSessionContext::~BlockingAPRSessionContext(){
+ delete reader;
+ delete writer;
+
+ delete rThread;
+ delete wThread;
+
+ delete handler;
+}
+
+void BlockingAPRSessionContext::read(){
+ try{
+ bool initiated(false);
+ while(!closed){
+ apr_size_t bytes(inbuf.available());
+ if(bytes < 1){
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+ }
+ apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes);
+ if(APR_STATUS_IS_TIMEUP(s)){
+ //timed out, check closed on loop
+ }else if(APR_STATUS_IS_EOF(s) || bytes == 0){
+ closed = true;
+ }else{
+ inbuf.move(bytes);
+ inbuf.flip();
+
+ if(!initiated){
+ ProtocolInitiation* protocolInit = new ProtocolInitiation();
+ if(protocolInit->decode(inbuf)){
+ handler->initiated(protocolInit);
+ if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl;
+ initiated = true;
+ }
+ }else{
+ AMQFrame frame;
+ while(frame.decode(inbuf)){
+ if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl;
+ handler->received(&frame);
+ }
+ }
+ //need to compact buffer to preserve any 'extra' data
+ inbuf.compact();
+ }
+ }
+
+ //close socket
+ }catch(qpid::QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+}
+
+void BlockingAPRSessionContext::write(){
+ while(!closed){
+ //get next frame
+ outlock.acquire();
+ while(outframes.empty() && !closed){
+ outlock.wait();
+ }
+ if(!closed){
+ AMQFrame* frame = outframes.front();
+ outframes.pop();
+ outlock.release();
+
+ //encode
+ frame->encode(outbuf);
+ if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl;
+ delete frame;
+ outbuf.flip();
+
+ //write from outbuf to socket
+ char* data = outbuf.start();
+ const int available = outbuf.available();
+ int written = 0;
+ apr_size_t bytes = available;
+ while(available > written){
+ apr_status_t s = apr_socket_send(socket, data + written, &bytes);
+ assert(s == 0); // TODO aconway 2006-10-05: Error Handling.
+ written += bytes;
+ bytes = available - written;
+ }
+ outbuf.clear();
+ }else{
+ outlock.release();
+ }
+ }
+}
+
+void BlockingAPRSessionContext::send(AMQFrame* frame){
+ if(!closed){
+ outlock.acquire();
+ bool was_empty(outframes.empty());
+ outframes.push(frame);
+ if(was_empty){
+ outlock.notify();
+ }
+ outlock.release();
+ }else{
+ std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl;
+ }
+}
+
+void BlockingAPRSessionContext::init(SessionHandler* _handler){
+ handler = _handler;
+ rThread->start();
+ wThread->start();
+}
+
+void BlockingAPRSessionContext::close(){
+ closed = true;
+ wThread->join();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl;
+ handler->closed();
+ acceptor->closed(this);
+ delete this;
+}
+
+void BlockingAPRSessionContext::shutdown(){
+ closed = true;
+ outlock.acquire();
+ outlock.notify();
+ outlock.release();
+
+ wThread->join();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ rThread->join();
+ handler->closed();
+ delete this;
+}