diff options
| author | Gordon Sim <gsim@apache.org> | 2007-11-29 11:54:17 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-11-29 11:54:17 +0000 |
| commit | 6b179639ac573be8f5c7d84bfd480c71a6815265 (patch) | |
| tree | 29d56665e8258c923f256fbed3942148dede48e0 /cpp/src/qpid/sys | |
| parent | d1f32f54b73807b778eb6027bb048f9e7b0e808f (diff) | |
| download | qpid-python-6b179639ac573be8f5c7d84bfd480c71a6815265.tar.gz | |
Changes to threading: queues serialiser removed, io threads used to drive dispatch to consumers
Fix to PersistableMessage: use correct lock when accessing synclist, don't hold enqueue lock when notifying queues
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@599395 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/AggregateOutput.cpp | 61 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AggregateOutput.h | 54 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIO.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ConnectionInputHandler.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ConnectionOutputHandler.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/OutputControl.h | 38 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/OutputTask.h | 38 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 50 |
9 files changed, 245 insertions, 29 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp new file mode 100644 index 0000000000..74eea5ed08 --- /dev/null +++ b/cpp/src/qpid/sys/AggregateOutput.cpp @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/AggregateOutput.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace sys { + +void AggregateOutput::activateOutput() +{ + control.activateOutput(); +} + +bool AggregateOutput::doOutput() +{ + bool result = false; + if (!tasks.empty()) { + if (next >= tasks.size()) next = next % tasks.size(); + + size_t start = next; + //loop until a task generated some output + while (!result) { + result = tasks[next++]->doOutput(); + if (next >= tasks.size()) next = next % tasks.size(); + if (start == next) break; + } + } + return result; +} + +void AggregateOutput::addOutputTask(OutputTask* t) +{ + tasks.push_back(t); +} + +void AggregateOutput::removeOutputTask(OutputTask* t) +{ + TaskList::iterator i = find(tasks.begin(), tasks.end(), t); + if (i != tasks.end()) tasks.erase(i); +} + +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h new file mode 100644 index 0000000000..a870fcb95a --- /dev/null +++ b/cpp/src/qpid/sys/AggregateOutput.h @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _AggregateOutput_ +#define _AggregateOutput_ + +#include <vector> +#include "Mutex.h" +#include "OutputControl.h" +#include "OutputTask.h" + +namespace qpid { +namespace sys { + + class AggregateOutput : public OutputTask, public OutputControl + { + typedef std::vector<OutputTask*> TaskList; + + TaskList tasks; + size_t next; + OutputControl& control; + + public: + AggregateOutput(OutputControl& c) : next(0), control(c) {}; + //this may be called on any thread + void activateOutput(); + //all the following will be called on the same thread + bool doOutput(); + void addOutputTask(OutputTask* t); + void removeOutputTask(OutputTask* t); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index 7cb56b30aa..ca34d82741 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -97,6 +97,13 @@ private: std::deque<BufferBase*> bufferQueue; std::deque<BufferBase*> writeQueue; bool queuedClose; + /** + * This flag is used to detect and handle concurrency between + * calls to notifyPendingWrite() (which can be made from any thread) and + * the execution of the writeable() method (which is always on the + * thread processing this handle. + */ + volatile bool writePending; public: AsynchIO(const Socket& s, @@ -107,7 +114,8 @@ public: void start(Poller::shared_ptr poller); void queueReadBuffer(BufferBase* buff); void unread(BufferBase* buff); - void queueWrite(BufferBase* buff = 0); + void queueWrite(BufferBase* buff); + void notifyPendingWrite(); void queueWriteClose(); bool writeQueueEmpty() { return writeQueue.empty(); } BufferBase* getQueuedBuffer(); diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index bdf3e3b8d3..51ec7f718a 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -115,6 +115,7 @@ public: // Output side void send(framing::AMQFrame&); void close(); + void activateOutput(); // Input side void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); @@ -135,7 +136,7 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); async->init(aio, handler); @@ -195,7 +196,7 @@ void AsynchIOHandler::send(framing::AMQFrame& frame) { } // Activate aio for writing here - aio->queueWrite(); + aio->notifyPendingWrite(); } void AsynchIOHandler::close() { @@ -203,6 +204,10 @@ void AsynchIOHandler::close() { frameQueueClosed = true; } +void AsynchIOHandler::activateOutput() { + aio->notifyPendingWrite(); +} + // Input side void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if (readError) { @@ -272,9 +277,11 @@ void AsynchIOHandler::idle(AsynchIO&){ ScopedLock<Mutex> l(frameQueueLock); if (frameQueue.empty()) { - // At this point we know that we're write idling the connection - // so we could note that somewhere or do something special - return; + // At this point we know that we're write idling the connection + // so tell the input handler to queue any available output: + inputHandler->doOutput(); + //if still no frames, theres nothing to do: + if (frameQueue.empty()) return; } do { diff --git a/cpp/src/qpid/sys/ConnectionInputHandler.h b/cpp/src/qpid/sys/ConnectionInputHandler.h index 2bf3f66ec2..226096c5ef 100644 --- a/cpp/src/qpid/sys/ConnectionInputHandler.h +++ b/cpp/src/qpid/sys/ConnectionInputHandler.h @@ -24,6 +24,7 @@ #include "qpid/framing/InputHandler.h" #include "qpid/framing/InitiationHandler.h" #include "qpid/framing/ProtocolInitiation.h" +#include "OutputTask.h" #include "TimeoutHandler.h" namespace qpid { @@ -32,7 +33,7 @@ namespace sys { class ConnectionInputHandler : public qpid::framing::InitiationHandler, public qpid::framing::InputHandler, - public TimeoutHandler + public TimeoutHandler, public OutputTask { public: virtual void closed() = 0; diff --git a/cpp/src/qpid/sys/ConnectionOutputHandler.h b/cpp/src/qpid/sys/ConnectionOutputHandler.h index 8436bea599..5a60ae4998 100644 --- a/cpp/src/qpid/sys/ConnectionOutputHandler.h +++ b/cpp/src/qpid/sys/ConnectionOutputHandler.h @@ -22,6 +22,7 @@ #define _ConnectionOutputHandler_ #include "qpid/framing/OutputHandler.h" +#include "OutputControl.h" namespace qpid { namespace sys { @@ -29,7 +30,7 @@ namespace sys { /** * Provides the output handler associated with a connection. */ -class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler +class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl { public: virtual void close() = 0; diff --git a/cpp/src/qpid/sys/OutputControl.h b/cpp/src/qpid/sys/OutputControl.h new file mode 100644 index 0000000000..d922a0d85c --- /dev/null +++ b/cpp/src/qpid/sys/OutputControl.h @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _OutputControl_ +#define _OutputControl_ + +namespace qpid { +namespace sys { + + class OutputControl + { + public: + virtual ~OutputControl() {} + virtual void activateOutput() = 0; + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/OutputTask.h b/cpp/src/qpid/sys/OutputTask.h new file mode 100644 index 0000000000..109765b8c3 --- /dev/null +++ b/cpp/src/qpid/sys/OutputTask.h @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _OutputTask_ +#define _OutputTask_ + +namespace qpid { +namespace sys { + + class OutputTask + { + public: + virtual ~OutputTask() {} + virtual bool doOutput() = 0; + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 4600960c6d..e73bbc03ca 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -97,7 +97,8 @@ AsynchIO::AsynchIO(const Socket& s, closedCallback(cCb), emptyCallback(eCb), idleCallback(iCb), - queuedClose(false) { + queuedClose(false), + writePending(false) { s.setNonblocking(); } @@ -139,20 +140,21 @@ void AsynchIO::unread(BufferBase* buff) { DispatchHandle::rewatchRead(); } -// Either queue for writing or announce that there is something to write -// and we should ask for it void AsynchIO::queueWrite(BufferBase* buff) { - // If no buffer then don't queue anything - // (but still wake up for writing) - if (buff) { - // If we've already closed the socket then throw the write away - if (queuedClose) { - bufferQueue.push_front(buff); - return; - } else { - writeQueue.push_front(buff); - } - } + assert(buff); + // If we've already closed the socket then throw the write away + if (queuedClose) { + bufferQueue.push_front(buff); + return; + } else { + writeQueue.push_front(buff); + } + writePending = false; + DispatchHandle::rewatchWrite(); +} + +void AsynchIO::notifyPendingWrite() { + writePending = true; DispatchHandle::rewatchWrite(); } @@ -269,18 +271,24 @@ void AsynchIO::writeable(DispatchHandle& h) { } } } else { - // If we're waiting to close the socket then can do it now as there is nothing to write - if (queuedClose) { - close(h); - return; - } + // If we're waiting to close the socket then can do it now as there is nothing to write + if (queuedClose) { + close(h); + return; + } // Fd is writable, but nothing to write if (idleCallback) { + writePending = false; idleCallback(*this); } // If we still have no buffers to write we can't do anything more - if (writeQueue.empty() && !queuedClose) { + if (writeQueue.empty() && !writePending && !queuedClose) { h.unwatchWrite(); + //the following handles the case where writePending is + //set to true after the test above; in this case its + //possible that the unwatchWrite overwrites the + //desired rewatchWrite so we correct that here + if (writePending) h.rewatchWrite(); return; } } @@ -304,7 +312,7 @@ void AsynchIO::close(DispatchHandle& h) { h.stopWatch(); h.getSocket().close(); if (closedCallback) { - closedCallback(*this, getSocket()); + closedCallback(*this, getSocket()); } } |
