summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/apr/LFProcessor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/apr/LFProcessor.cpp')
-rw-r--r--cpp/src/qpid/sys/apr/LFProcessor.cpp179
1 files changed, 179 insertions, 0 deletions
diff --git a/cpp/src/qpid/sys/apr/LFProcessor.cpp b/cpp/src/qpid/sys/apr/LFProcessor.cpp
new file mode 100644
index 0000000000..0d8ac425fe
--- /dev/null
+++ b/cpp/src/qpid/sys/apr/LFProcessor.cpp
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <sstream>
+#include "qpid/QpidError.h"
+#include "LFProcessor.h"
+#include "APRBase.h"
+#include "LFSessionContext.h"
+
+using namespace qpid::sys;
+using qpid::QpidError;
+
+// TODO aconway 2006-10-12: stopped is read outside locks.
+//
+
+LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
+ size(_size),
+ timeout(_timeout),
+ signalledCount(0),
+ current(0),
+ count(0),
+ workerCount(_workers),
+ hasLeader(false),
+ workers(new Thread[_workers]),
+ stopped(false)
+{
+
+ CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
+}
+
+
+LFProcessor::~LFProcessor(){
+ if (!stopped) stop();
+ delete[] workers;
+ CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
+}
+
+void LFProcessor::start(){
+ for(int i = 0; i < workerCount; i++){
+ workers[i] = Thread(this);
+ }
+}
+
+void LFProcessor::add(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+ Monitor::ScopedLock l(countLock);
+ sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data));
+ count++;
+}
+
+void LFProcessor::remove(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ Monitor::ScopedLock l(countLock);
+ sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data)));
+ count--;
+}
+
+void LFProcessor::reactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+void LFProcessor::deactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+}
+
+void LFProcessor::update(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+bool LFProcessor::full(){
+ Mutex::ScopedLock locker(countLock);
+ return count == size;
+}
+
+bool LFProcessor::empty(){
+ Mutex::ScopedLock locker(countLock);
+ return count == 0;
+}
+
+void LFProcessor::poll() {
+ apr_status_t status = APR_EGENERAL;
+ do{
+ current = 0;
+ if(!stopped){
+ status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
+ }
+ }while(status != APR_SUCCESS && !stopped);
+}
+
+void LFProcessor::run(){
+ try{
+ while(!stopped){
+ const apr_pollfd_t* event = 0;
+ LFSessionContext* session = 0;
+ {
+ Monitor::ScopedLock l(leadLock);
+ waitToLead();
+ event = getNextEvent();
+ if(!event) return;
+ session = reinterpret_cast<LFSessionContext*>(
+ event->client_data);
+ session->startProcessing();
+ relinquishLead();
+ }
+
+ //process event:
+ if(event->rtnevents & APR_POLLIN) session->read();
+ if(event->rtnevents & APR_POLLOUT) session->write();
+
+ if(session->isClosed()){
+ session->handleClose();
+ Monitor::ScopedLock l(countLock);
+ sessions.erase(find(sessions.begin(),sessions.end(), session));
+ count--;
+ }else{
+ session->stopProcessing();
+ }
+ }
+ }catch(std::exception e){
+ std::cout << e.what() << std::endl;
+ }
+}
+
+void LFProcessor::waitToLead(){
+ while(hasLeader && !stopped) leadLock.wait();
+ hasLeader = !stopped;
+}
+
+void LFProcessor::relinquishLead(){
+ hasLeader = false;
+ leadLock.notify();
+}
+
+const apr_pollfd_t* LFProcessor::getNextEvent(){
+ while(true){
+ if(stopped){
+ return 0;
+ }else if(current < signalledCount){
+ //use result of previous poll if one is available
+ return signalledFDs + (current++);
+ }else{
+ //else poll to get new events
+ poll();
+ }
+ }
+}
+
+void LFProcessor::stop(){
+ stopped = true;
+ {
+ Monitor::ScopedLock l(leadLock);
+ leadLock.notifyAll();
+ }
+ for(int i = 0; i < workerCount; i++){
+ workers[i].join();
+ }
+ for(iterator i = sessions.begin(); i < sessions.end(); i++){
+ (*i)->shutdown();
+ }
+}
+