summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-11-29 11:54:17 +0000
committerGordon Sim <gsim@apache.org>2007-11-29 11:54:17 +0000
commit6b179639ac573be8f5c7d84bfd480c71a6815265 (patch)
tree29d56665e8258c923f256fbed3942148dede48e0 /cpp/src/qpid/sys
parentd1f32f54b73807b778eb6027bb048f9e7b0e808f (diff)
downloadqpid-python-6b179639ac573be8f5c7d84bfd480c71a6815265.tar.gz
Changes to threading: queues serialiser removed, io threads used to drive dispatch to consumers
Fix to PersistableMessage: use correct lock when accessing synclist, don't hold enqueue lock when notifying queues git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@599395 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/AggregateOutput.cpp61
-rw-r--r--cpp/src/qpid/sys/AggregateOutput.h54
-rw-r--r--cpp/src/qpid/sys/AsynchIO.h10
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp17
-rw-r--r--cpp/src/qpid/sys/ConnectionInputHandler.h3
-rw-r--r--cpp/src/qpid/sys/ConnectionOutputHandler.h3
-rw-r--r--cpp/src/qpid/sys/OutputControl.h38
-rw-r--r--cpp/src/qpid/sys/OutputTask.h38
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp50
9 files changed, 245 insertions, 29 deletions
diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp
new file mode 100644
index 0000000000..74eea5ed08
--- /dev/null
+++ b/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace sys {
+
+void AggregateOutput::activateOutput()
+{
+ control.activateOutput();
+}
+
+bool AggregateOutput::doOutput()
+{
+ bool result = false;
+ if (!tasks.empty()) {
+ if (next >= tasks.size()) next = next % tasks.size();
+
+ size_t start = next;
+ //loop until a task generated some output
+ while (!result) {
+ result = tasks[next++]->doOutput();
+ if (next >= tasks.size()) next = next % tasks.size();
+ if (start == next) break;
+ }
+ }
+ return result;
+}
+
+void AggregateOutput::addOutputTask(OutputTask* t)
+{
+ tasks.push_back(t);
+}
+
+void AggregateOutput::removeOutputTask(OutputTask* t)
+{
+ TaskList::iterator i = find(tasks.begin(), tasks.end(), t);
+ if (i != tasks.end()) tasks.erase(i);
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h
new file mode 100644
index 0000000000..a870fcb95a
--- /dev/null
+++ b/cpp/src/qpid/sys/AggregateOutput.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _AggregateOutput_
+#define _AggregateOutput_
+
+#include <vector>
+#include "Mutex.h"
+#include "OutputControl.h"
+#include "OutputTask.h"
+
+namespace qpid {
+namespace sys {
+
+ class AggregateOutput : public OutputTask, public OutputControl
+ {
+ typedef std::vector<OutputTask*> TaskList;
+
+ TaskList tasks;
+ size_t next;
+ OutputControl& control;
+
+ public:
+ AggregateOutput(OutputControl& c) : next(0), control(c) {};
+ //this may be called on any thread
+ void activateOutput();
+ //all the following will be called on the same thread
+ bool doOutput();
+ void addOutputTask(OutputTask* t);
+ void removeOutputTask(OutputTask* t);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h
index 7cb56b30aa..ca34d82741 100644
--- a/cpp/src/qpid/sys/AsynchIO.h
+++ b/cpp/src/qpid/sys/AsynchIO.h
@@ -97,6 +97,13 @@ private:
std::deque<BufferBase*> bufferQueue;
std::deque<BufferBase*> writeQueue;
bool queuedClose;
+ /**
+ * This flag is used to detect and handle concurrency between
+ * calls to notifyPendingWrite() (which can be made from any thread) and
+ * the execution of the writeable() method (which is always on the
+ * thread processing this handle.
+ */
+ volatile bool writePending;
public:
AsynchIO(const Socket& s,
@@ -107,7 +114,8 @@ public:
void start(Poller::shared_ptr poller);
void queueReadBuffer(BufferBase* buff);
void unread(BufferBase* buff);
- void queueWrite(BufferBase* buff = 0);
+ void queueWrite(BufferBase* buff);
+ void notifyPendingWrite();
void queueWriteClose();
bool writeQueueEmpty() { return writeQueue.empty(); }
BufferBase* getQueuedBuffer();
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index bdf3e3b8d3..51ec7f718a 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -115,6 +115,7 @@ public:
// Output side
void send(framing::AMQFrame&);
void close();
+ void activateOutput();
// Input side
void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
@@ -135,7 +136,7 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
async->init(aio, handler);
@@ -195,7 +196,7 @@ void AsynchIOHandler::send(framing::AMQFrame& frame) {
}
// Activate aio for writing here
- aio->queueWrite();
+ aio->notifyPendingWrite();
}
void AsynchIOHandler::close() {
@@ -203,6 +204,10 @@ void AsynchIOHandler::close() {
frameQueueClosed = true;
}
+void AsynchIOHandler::activateOutput() {
+ aio->notifyPendingWrite();
+}
+
// Input side
void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
if (readError) {
@@ -272,9 +277,11 @@ void AsynchIOHandler::idle(AsynchIO&){
ScopedLock<Mutex> l(frameQueueLock);
if (frameQueue.empty()) {
- // At this point we know that we're write idling the connection
- // so we could note that somewhere or do something special
- return;
+ // At this point we know that we're write idling the connection
+ // so tell the input handler to queue any available output:
+ inputHandler->doOutput();
+ //if still no frames, theres nothing to do:
+ if (frameQueue.empty()) return;
}
do {
diff --git a/cpp/src/qpid/sys/ConnectionInputHandler.h b/cpp/src/qpid/sys/ConnectionInputHandler.h
index 2bf3f66ec2..226096c5ef 100644
--- a/cpp/src/qpid/sys/ConnectionInputHandler.h
+++ b/cpp/src/qpid/sys/ConnectionInputHandler.h
@@ -24,6 +24,7 @@
#include "qpid/framing/InputHandler.h"
#include "qpid/framing/InitiationHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
+#include "OutputTask.h"
#include "TimeoutHandler.h"
namespace qpid {
@@ -32,7 +33,7 @@ namespace sys {
class ConnectionInputHandler :
public qpid::framing::InitiationHandler,
public qpid::framing::InputHandler,
- public TimeoutHandler
+ public TimeoutHandler, public OutputTask
{
public:
virtual void closed() = 0;
diff --git a/cpp/src/qpid/sys/ConnectionOutputHandler.h b/cpp/src/qpid/sys/ConnectionOutputHandler.h
index 8436bea599..5a60ae4998 100644
--- a/cpp/src/qpid/sys/ConnectionOutputHandler.h
+++ b/cpp/src/qpid/sys/ConnectionOutputHandler.h
@@ -22,6 +22,7 @@
#define _ConnectionOutputHandler_
#include "qpid/framing/OutputHandler.h"
+#include "OutputControl.h"
namespace qpid {
namespace sys {
@@ -29,7 +30,7 @@ namespace sys {
/**
* Provides the output handler associated with a connection.
*/
-class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler
+class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl
{
public:
virtual void close() = 0;
diff --git a/cpp/src/qpid/sys/OutputControl.h b/cpp/src/qpid/sys/OutputControl.h
new file mode 100644
index 0000000000..d922a0d85c
--- /dev/null
+++ b/cpp/src/qpid/sys/OutputControl.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _OutputControl_
+#define _OutputControl_
+
+namespace qpid {
+namespace sys {
+
+ class OutputControl
+ {
+ public:
+ virtual ~OutputControl() {}
+ virtual void activateOutput() = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/sys/OutputTask.h b/cpp/src/qpid/sys/OutputTask.h
new file mode 100644
index 0000000000..109765b8c3
--- /dev/null
+++ b/cpp/src/qpid/sys/OutputTask.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _OutputTask_
+#define _OutputTask_
+
+namespace qpid {
+namespace sys {
+
+ class OutputTask
+ {
+ public:
+ virtual ~OutputTask() {}
+ virtual bool doOutput() = 0;
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 4600960c6d..e73bbc03ca 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -97,7 +97,8 @@ AsynchIO::AsynchIO(const Socket& s,
closedCallback(cCb),
emptyCallback(eCb),
idleCallback(iCb),
- queuedClose(false) {
+ queuedClose(false),
+ writePending(false) {
s.setNonblocking();
}
@@ -139,20 +140,21 @@ void AsynchIO::unread(BufferBase* buff) {
DispatchHandle::rewatchRead();
}
-// Either queue for writing or announce that there is something to write
-// and we should ask for it
void AsynchIO::queueWrite(BufferBase* buff) {
- // If no buffer then don't queue anything
- // (but still wake up for writing)
- if (buff) {
- // If we've already closed the socket then throw the write away
- if (queuedClose) {
- bufferQueue.push_front(buff);
- return;
- } else {
- writeQueue.push_front(buff);
- }
- }
+ assert(buff);
+ // If we've already closed the socket then throw the write away
+ if (queuedClose) {
+ bufferQueue.push_front(buff);
+ return;
+ } else {
+ writeQueue.push_front(buff);
+ }
+ writePending = false;
+ DispatchHandle::rewatchWrite();
+}
+
+void AsynchIO::notifyPendingWrite() {
+ writePending = true;
DispatchHandle::rewatchWrite();
}
@@ -269,18 +271,24 @@ void AsynchIO::writeable(DispatchHandle& h) {
}
}
} else {
- // If we're waiting to close the socket then can do it now as there is nothing to write
- if (queuedClose) {
- close(h);
- return;
- }
+ // If we're waiting to close the socket then can do it now as there is nothing to write
+ if (queuedClose) {
+ close(h);
+ return;
+ }
// Fd is writable, but nothing to write
if (idleCallback) {
+ writePending = false;
idleCallback(*this);
}
// If we still have no buffers to write we can't do anything more
- if (writeQueue.empty() && !queuedClose) {
+ if (writeQueue.empty() && !writePending && !queuedClose) {
h.unwatchWrite();
+ //the following handles the case where writePending is
+ //set to true after the test above; in this case its
+ //possible that the unwatchWrite overwrites the
+ //desired rewatchWrite so we correct that here
+ if (writePending) h.rewatchWrite();
return;
}
}
@@ -304,7 +312,7 @@ void AsynchIO::close(DispatchHandle& h) {
h.stopWatch();
h.getSocket().close();
if (closedCallback) {
- closedCallback(*this, getSocket());
+ closedCallback(*this, getSocket());
}
}