diff options
| author | Alan Conway <aconway@apache.org> | 2006-10-16 13:50:26 +0000 | 
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2006-10-16 13:50:26 +0000 | 
| commit | 8a6ab3aa61d441b9210c05c84dc9998acfc38737 (patch) | |
| tree | 1eb9d7f39b5c2d04a85a1f66caef3d398567b740 /cpp/src/qpid/concurrent/TaskQueue.h | |
| parent | 9a808fb13aba243d41bbdab75158dae5939a80a4 (diff) | |
| download | qpid-python-8a6ab3aa61d441b9210c05c84dc9998acfc38737.tar.gz | |
Build system reorg, see README and Makefile comments for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464494 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/concurrent/TaskQueue.h')
| -rw-r--r-- | cpp/src/qpid/concurrent/TaskQueue.h | 200 | 
1 files changed, 200 insertions, 0 deletions
| diff --git a/cpp/src/qpid/concurrent/TaskQueue.h b/cpp/src/qpid/concurrent/TaskQueue.h new file mode 100644 index 0000000000..4abadd7dc5 --- /dev/null +++ b/cpp/src/qpid/concurrent/TaskQueue.h @@ -0,0 +1,200 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _TaskQueue_ +#define _TaskQueue_ + +#include <iostream> +#include <memory> +#include <queue> +#include "qpid/concurrent/LockedQueue.h" +#include "qpid/concurrent/Runnable.h" +#include "qpid/concurrent/ThreadPool.h" + +namespace qpid { +namespace concurrent { +    template<class T, class L> class TaskQueue : public virtual Runnable +    { +        const int max_iterations_per_run; +        L lock; +        //LockedQueue<T, L> queue; +        std::queue<T*> queue; +        ThreadPool* const pool; +        T* work; +        bool running;         +        volatile bool stopped;         +        TaskQueue<T, L>* next; + +        volatile bool inrun; + +        bool hasWork(); +        void completed(); + +        T* take(); + +    protected: +        /** +         * Callback though which the task is executed  +         */ +        virtual void execute(T* t) = 0; +        /** +         * Allows a task to be completed asynchronously to the +         * execute() call if required. +         */ +        virtual bool isComplete(T* t); +        /** +         * Should be called to signal completion of a task that was +         * signalled as not complete through the isComplete() methods +         * return value. This will allow normal processing to resume. +         */ +        virtual void complete(); + +    public: +        TaskQueue(ThreadPool* const pool, int max_iterations_per_run = 100); +        virtual void run(); +        void trigger(); +        bool append(T* t); +        void stop(bool drain); +        inline void setNext(TaskQueue<T, L>* next){ this->next = next; } +    }; + +    template<class T, class L> TaskQueue<T, L>::TaskQueue(ThreadPool* const _pool, int _max_iterations_per_run) :  +        pool(_pool),  +        max_iterations_per_run(_max_iterations_per_run), +        work(0),  +        running(false), +        stopped(false), +        next(0), inrun(false){ +    } + +    template<class T, class L> void TaskQueue<T, L>::run(){ +        if(inrun) std::cout << "Already running" << std::endl; +        inrun = true; + +        bool blocked = false; +        int count = max_iterations_per_run; +        while(!blocked && hasWork() && count){ +            execute(work); +            if(isComplete(work)){ +                completed(); +            }else{ +                blocked = true; +            } +            count--; +        } +        inrun = false; + +        if(!blocked && count == 0){//performed max_iterations_per_run, requeue task to ensure fairness  +            //running will still be true at this point +            lock.acquire(); +            running = false; +            if(stopped) lock.notify(); +            lock.release(); + +            trigger(); +        }else if(hasWork()){//task was added to queue after we exited the loop above; should not need this? +            trigger(); +        } +    } + +    template<class T, class L> void TaskQueue<T, L>::trigger(){ +        lock.acquire(); +        if(!running){ +            running = true; +            pool->addTask(this); +        } +        lock.release(); +    } + +    template<class T, class L> bool TaskQueue<T, L>::hasWork(){ +        lock.acquire(); +        if(!work) work = take();//queue.take(); +        if(!work){ +            running = false; +            if(stopped) lock.notify(); +        } +        lock.release(); +        return work; +    } + +    template<class T, class L> bool TaskQueue<T, L>::append(T* item){ +        if(!stopped){ +            lock.acquire(); + +            //queue.put(item); +            queue.push(item); + +            if(!running){ +                running = true; +                pool->addTask(this); +            } +            lock.release(); +                //} +            return true; +        }else{ +            return false; +        } +    } + +    template<class T, class L> bool TaskQueue<T, L>::isComplete(T* item){ +        return true;//by default assume all tasks are synchronous w.r.t. execute() +    } + + +    template<class T, class L> void TaskQueue<T, L>::completed(){ +        if(next){ +            if(!next->append(work)){ +                std::cout << "Warning: dropping task as next queue appears to have stopped." << std::endl; +            } +        }else{ +            delete work; +        } +        work = 0;         +    } +         +    template<class T, class L> void TaskQueue<T, L>::complete(){ +        completed(); +        lock.acquire(); +        running = false; +        if(stopped) lock.notify(); +        lock.release(); +    } + +    template<class T, class L> void TaskQueue<T, L>::stop(bool drain){ +        //prevent new tasks from being added +        stopped = true; +        //wait until no longer running +        lock.acquire(); +        while(running && (drain && hasWork())){ +            lock.wait(); +        } +        lock.release(); +    } + +    template<class T, class L> T* TaskQueue<T, L>::take(){ +        T* item = 0; +        if(!queue.empty()){ +            item = queue.front(); +            queue.pop(); +        } +        return item; +    } +} +} + + +#endif | 
