diff options
| author | Alan Conway <aconway@apache.org> | 2006-12-01 05:11:45 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2006-12-01 05:11:45 +0000 |
| commit | fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66 (patch) | |
| tree | a2ebf932750bf13bf3db271f92df390335b0e844 /cpp/lib/common/sys/apr/LFSessionContext.cpp | |
| parent | 33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (diff) | |
| download | qpid-python-fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66.tar.gz | |
2006-12-01 Jim Meyering <meyering@redhat.com>
This delta imposes two major changes on the C++ hierarchy:
- adds autoconf, automake, libtool support
- makes the hierarchy flatter and renames a few files (e.g., Queue.h,
Queue.cpp) that appeared twice, once under client/ and again under broker/.
In the process, I've changed many #include directives, mostly
to remove a qpid/ or qpid/framing/ prefix from the file name argument.
Although most changes were to .cpp and .h files under qpid/cpp/, there
were also several to template files under qpid/gentools, and even one
to CppGenerator.java.
Nearly all files are moved to a new position in the hierarchy.
The new hierarchy looks like this:
src # this is the new home of qpidd.cpp
tests # all tests are here. See Makefile.am.
gen # As before, all generated files go here.
lib # This is just a container for the 3 lib dirs:
lib/client
lib/broker
lib/common
lib/common/framing
lib/common/sys
lib/common/sys/posix
lib/common/sys/apr
build-aux
m4
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/common/sys/apr/LFSessionContext.cpp')
| -rw-r--r-- | cpp/lib/common/sys/apr/LFSessionContext.cpp | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp new file mode 100644 index 0000000000..7fb8d5a91b --- /dev/null +++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp @@ -0,0 +1,173 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "LFSessionContext.h" +#include "APRBase.h" +#include <QpidError.h> +#include <assert.h> + +using namespace qpid::sys; +using namespace qpid::sys; +using namespace qpid::framing; + +LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, + LFProcessor* const _processor, + bool _debug) : + debug(_debug), + socket(_socket), + initiated(false), + in(65536), + out(65536), + processor(_processor), + processing(false), + closing(false) +{ + + fd.p = _pool; + fd.desc_type = APR_POLL_SOCKET; + fd.reqevents = APR_POLLIN; + fd.client_data = this; + fd.desc.s = _socket; + + out.flip(); +} + +LFSessionContext::~LFSessionContext(){ + +} + +void LFSessionContext::read(){ + socket.read(in); + in.flip(); + if(initiated){ + AMQFrame frame; + while(frame.decode(in)){ + if(debug) log("RECV", &frame); + handler->received(&frame); + } + }else{ + ProtocolInitiation protocolInit; + if(protocolInit.decode(in)){ + handler->initiated(&protocolInit); + initiated = true; + if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; + } + } + in.compact(); +} + +void LFSessionContext::write(){ + bool done = isClosed(); + while(!done){ + if(out.available() > 0){ + socket.write(out); + if(out.available() > 0){ + + //incomplete write, leave flags to receive notification of readiness to write + done = true;//finished processing for now, but write is still in progress + } + }else{ + //do we have any frames to write? + Mutex::ScopedLock l(writeLock); + if(!framesToWrite.empty()){ + out.clear(); + bool encoded(false); + AMQFrame* frame = framesToWrite.front(); + while(frame && out.available() >= frame->size()){ + encoded = true; + frame->encode(out); + if(debug) log("SENT", frame); + delete frame; + framesToWrite.pop(); + frame = framesToWrite.empty() ? 0 : framesToWrite.front(); + } + if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + out.flip(); + }else{ + //reset flags, don't care about writability anymore + fd.reqevents = APR_POLLIN; + done = true; + + if(closing){ + socket.close(); + } + } + } + } +} + +void LFSessionContext::send(AMQFrame* frame){ + Mutex::ScopedLock l(writeLock); + if(!closing){ + framesToWrite.push(frame); + if(!(fd.reqevents & APR_POLLOUT)){ + fd.reqevents |= APR_POLLOUT; + if(!processing){ + processor->update(&fd); + } + } + } +} + +void LFSessionContext::startProcessing(){ + Mutex::ScopedLock l(writeLock); + processing = true; + processor->deactivate(&fd); +} + +void LFSessionContext::stopProcessing(){ + Mutex::ScopedLock l(writeLock); + processor->reactivate(&fd); + processing = false; +} + +void LFSessionContext::close(){ + closing = true; + Mutex::ScopedLock l(writeLock); + if(!processing){ + //allow pending frames to be written to socket + fd.reqevents = APR_POLLOUT; + processor->update(&fd); + } +} + +void LFSessionContext::handleClose(){ + handler->closed(); + std::cout << "Session closed [" << &socket << "]" << std::endl; + delete handler; + delete this; +} + +void LFSessionContext::shutdown(){ + socket.close(); + handleClose(); +} + +void LFSessionContext::init(SessionHandler* _handler){ + handler = _handler; + processor->add(&fd); +} + +void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ + Mutex::ScopedLock l(logLock); + std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; +} + +Mutex LFSessionContext::logLock; |
