summaryrefslogtreecommitdiff
path: root/cpp/common/io/src/LFProcessor.cpp
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
committerRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
commit913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch)
tree7ea442d6867d0076f1c9ea4f4265664059e7aff5 /cpp/common/io/src/LFProcessor.cpp
downloadqpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze Repository Root: https://etp.108.redhat.com/svn/etp Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48 Revision: 608 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/common/io/src/LFProcessor.cpp')
-rw-r--r--cpp/common/io/src/LFProcessor.cpp191
1 files changed, 191 insertions, 0 deletions
diff --git a/cpp/common/io/src/LFProcessor.cpp b/cpp/common/io/src/LFProcessor.cpp
new file mode 100644
index 0000000000..8ef3543b8f
--- /dev/null
+++ b/cpp/common/io/src/LFProcessor.cpp
@@ -0,0 +1,191 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "LFProcessor.h"
+#include "APRBase.h"
+#include "LFSessionContext.h"
+#include "QpidError.h"
+#include <sstream>
+
+using namespace qpid::io;
+using namespace qpid::concurrent;
+using qpid::QpidError;
+
+LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : size(_size),
+ timeout(_timeout),
+ signalledCount(0),
+ current(0),
+ count(0),
+ hasLeader(false),
+ workerCount(_workers),
+ workers(new Thread*[_workers]),
+ stopped(false){
+
+ CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
+ //create & start the required number of threads
+ for(int i = 0; i < workerCount; i++){
+ workers[i] = factory.create(this);
+ }
+}
+
+
+LFProcessor::~LFProcessor(){
+ for(int i = 0; i < workerCount; i++){
+ delete workers[i];
+ }
+ delete[] workers;
+ CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
+}
+
+void LFProcessor::start(){
+ for(int i = 0; i < workerCount; i++){
+ workers[i]->start();
+ }
+}
+
+void LFProcessor::add(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+ countLock.acquire();
+ sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data));
+ count++;
+ countLock.release();
+}
+
+void LFProcessor::remove(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ countLock.acquire();
+ sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data)));
+ count--;
+ countLock.release();
+}
+
+void LFProcessor::reactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+void LFProcessor::deactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+}
+
+void LFProcessor::update(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+bool LFProcessor::full(){
+ countLock.acquire();
+ bool full = count == size;
+ countLock.release();
+ return full;
+}
+
+bool LFProcessor::empty(){
+ countLock.acquire();
+ bool empty = count == 0;
+ countLock.release();
+ return empty;
+}
+
+void LFProcessor::poll(){
+ apr_status_t status;
+ do{
+ current = 0;
+ if(!stopped){
+ status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
+ }
+ }while(status != APR_SUCCESS && !stopped);
+}
+
+void LFProcessor::run(){
+ try{
+ while(!stopped){
+ leadLock.acquire();
+ waitToLead();
+ if(!stopped){
+ const apr_pollfd_t* evt = getNextEvent();
+ if(evt){
+ LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data);
+ session->startProcessing();
+
+ relinquishLead();
+ leadLock.release();
+
+ //process event:
+ if(evt->rtnevents & APR_POLLIN) session->read();
+ if(evt->rtnevents & APR_POLLOUT) session->write();
+
+ if(session->isClosed()){
+ session->handleClose();
+ countLock.acquire();
+ sessions.erase(find(sessions.begin(), sessions.end(), session));
+ count--;
+ countLock.release();
+ }else{
+ session->stopProcessing();
+ }
+
+ }else{
+ leadLock.release();
+ }
+ }else{
+ leadLock.release();
+ }
+ }
+ }catch(QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+}
+
+void LFProcessor::waitToLead(){
+ while(hasLeader && !stopped) leadLock.wait();
+ hasLeader = !stopped;
+}
+
+void LFProcessor::relinquishLead(){
+ hasLeader = false;
+ leadLock.notify();
+}
+
+const apr_pollfd_t* LFProcessor::getNextEvent(){
+ while(true){
+ if(stopped){
+ return 0;
+ }else if(current < signalledCount){
+ //use result of previous poll if one is available
+ return signalledFDs + (current++);
+ }else{
+ //else poll to get new events
+ poll();
+ }
+ }
+}
+
+void LFProcessor::stop(){
+ stopped = true;
+ leadLock.acquire();
+ leadLock.notifyAll();
+ leadLock.release();
+
+ for(int i = 0; i < workerCount; i++){
+ workers[i]->join();
+ }
+
+ for(iterator i = sessions.begin(); i < sessions.end(); i++){
+ (*i)->shutdown();
+ }
+}
+