summaryrefslogtreecommitdiff
path: root/cpp/src/tests/InProcessBroker.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/InProcessBroker.h')
-rw-r--r--cpp/src/tests/InProcessBroker.h54
1 files changed, 43 insertions, 11 deletions
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h
index c893e6906a..9fa0135502 100644
--- a/cpp/src/tests/InProcessBroker.h
+++ b/cpp/src/tests/InProcessBroker.h
@@ -36,6 +36,7 @@
namespace qpid {
+using qpid::sys::ConnectionInputHandler;
/**
* A client::Connector that connects directly to an in-process broker.
@@ -54,13 +55,21 @@ class InProcessConnector :
enum Sender {CLIENT,BROKER};
+ struct Task {
+ AMQFrame frame;
+ bool doOutput;
+
+ Task() : doOutput(true) {}
+ Task(AMQFrame& f) : frame(f), doOutput(false) {}
+ };
+
/** Simulate the network thread of a peer with a queue and a thread.
* With setInputHandler(0) drops frames simulating network packet loss.
*/
class NetworkQueue : public sys::Runnable
{
public:
- NetworkQueue(const char* r) : inputHandler(0), receiver(r) {
+ NetworkQueue(const char* r) : inputHandler(0), connectionHandler(0), receiver(r) {
thread=sys::Thread(this);
}
@@ -70,17 +79,24 @@ class InProcessConnector :
}
void push(AMQFrame& f) { queue.push(f); }
+ void activateOutput() { queue.push(Task()); }
void run() {
try {
while(true) {
- AMQFrame f = queue.pop();
- if (inputHandler) {
- QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f));
- inputHandler->handle(f);
+ Task t = queue.pop();
+ if (t.doOutput) {
+ if (connectionHandler) {
+ while (connectionHandler->doOutput());
+ }
+ } else {
+ if (inputHandler) {
+ QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << t.frame));
+ inputHandler->handle(t.frame);
+ }
+ else
+ QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << t.frame));
}
- else
- QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
}
}
catch (const ClosedException&) {
@@ -88,16 +104,24 @@ class InProcessConnector :
}
}
+ void setConnectionInputHandler(ConnectionInputHandler* h) {
+ Lock l(lock);
+ inputHandler = h;
+ connectionHandler = h;
+ }
+
void setInputHandler(FrameHandler* h) {
Lock l(lock);
inputHandler = h;
+ connectionHandler = 0;
}
private:
sys::Mutex lock;
- sys::BlockingQueue<AMQFrame> queue;
+ sys::BlockingQueue<Task> queue;
sys::Thread thread;
FrameHandler* inputHandler;
+ ConnectionInputHandler* connectionHandler;
const char* const receiver;
};
@@ -105,11 +129,13 @@ class InProcessConnector :
Sender from;
NetworkQueue queue;
const char* const sender;
+ NetworkQueue* reverseQueue;
InProcessHandler(Sender s)
: from(s),
queue(from==CLIENT? "BROKER" : "CLIENT"),
- sender(from==BROKER? "BROKER" : "CLIENT")
+ sender(from==BROKER? "BROKER" : "CLIENT"),
+ reverseQueue(0)
{}
~InProcessHandler() { }
@@ -123,6 +149,10 @@ class InProcessConnector :
// Do not shut down the queue here, we may be in
// the queue's dispatch thread.
}
+
+ void activateOutput() {
+ if (reverseQueue) reverseQueue->activateOutput();
+ }
};
InProcessConnector(shared_ptr<broker::Broker> b,
@@ -135,7 +165,9 @@ class InProcessConnector :
clientOut(CLIENT),
isClosed(false)
{
- clientOut.queue.setInputHandler(&brokerConnection);
+ clientOut.queue.setConnectionInputHandler(&brokerConnection);
+ brokerOut.reverseQueue = &clientOut.queue;
+ clientOut.reverseQueue = &brokerOut.queue;
}
~InProcessConnector() {
@@ -169,7 +201,7 @@ class InProcessConnector :
/** Sliently discard frames sent by either party, lost network traffic. */
void discard() {
brokerOut.queue.setInputHandler(0);
- clientOut.queue.setInputHandler(0);
+ clientOut.queue.setConnectionInputHandler(0);
}
private: