diff options
Diffstat (limited to 'cpp/src/tests/InProcessBroker.h')
| -rw-r--r-- | cpp/src/tests/InProcessBroker.h | 54 |
1 files changed, 43 insertions, 11 deletions
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h index c893e6906a..9fa0135502 100644 --- a/cpp/src/tests/InProcessBroker.h +++ b/cpp/src/tests/InProcessBroker.h @@ -36,6 +36,7 @@ namespace qpid { +using qpid::sys::ConnectionInputHandler; /** * A client::Connector that connects directly to an in-process broker. @@ -54,13 +55,21 @@ class InProcessConnector : enum Sender {CLIENT,BROKER}; + struct Task { + AMQFrame frame; + bool doOutput; + + Task() : doOutput(true) {} + Task(AMQFrame& f) : frame(f), doOutput(false) {} + }; + /** Simulate the network thread of a peer with a queue and a thread. * With setInputHandler(0) drops frames simulating network packet loss. */ class NetworkQueue : public sys::Runnable { public: - NetworkQueue(const char* r) : inputHandler(0), receiver(r) { + NetworkQueue(const char* r) : inputHandler(0), connectionHandler(0), receiver(r) { thread=sys::Thread(this); } @@ -70,17 +79,24 @@ class InProcessConnector : } void push(AMQFrame& f) { queue.push(f); } + void activateOutput() { queue.push(Task()); } void run() { try { while(true) { - AMQFrame f = queue.pop(); - if (inputHandler) { - QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f)); - inputHandler->handle(f); + Task t = queue.pop(); + if (t.doOutput) { + if (connectionHandler) { + while (connectionHandler->doOutput()); + } + } else { + if (inputHandler) { + QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << t.frame)); + inputHandler->handle(t.frame); + } + else + QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << t.frame)); } - else - QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f)); } } catch (const ClosedException&) { @@ -88,16 +104,24 @@ class InProcessConnector : } } + void setConnectionInputHandler(ConnectionInputHandler* h) { + Lock l(lock); + inputHandler = h; + connectionHandler = h; + } + void setInputHandler(FrameHandler* h) { Lock l(lock); inputHandler = h; + connectionHandler = 0; } private: sys::Mutex lock; - sys::BlockingQueue<AMQFrame> queue; + sys::BlockingQueue<Task> queue; sys::Thread thread; FrameHandler* inputHandler; + ConnectionInputHandler* connectionHandler; const char* const receiver; }; @@ -105,11 +129,13 @@ class InProcessConnector : Sender from; NetworkQueue queue; const char* const sender; + NetworkQueue* reverseQueue; InProcessHandler(Sender s) : from(s), queue(from==CLIENT? "BROKER" : "CLIENT"), - sender(from==BROKER? "BROKER" : "CLIENT") + sender(from==BROKER? "BROKER" : "CLIENT"), + reverseQueue(0) {} ~InProcessHandler() { } @@ -123,6 +149,10 @@ class InProcessConnector : // Do not shut down the queue here, we may be in // the queue's dispatch thread. } + + void activateOutput() { + if (reverseQueue) reverseQueue->activateOutput(); + } }; InProcessConnector(shared_ptr<broker::Broker> b, @@ -135,7 +165,9 @@ class InProcessConnector : clientOut(CLIENT), isClosed(false) { - clientOut.queue.setInputHandler(&brokerConnection); + clientOut.queue.setConnectionInputHandler(&brokerConnection); + brokerOut.reverseQueue = &clientOut.queue; + clientOut.reverseQueue = &brokerOut.queue; } ~InProcessConnector() { @@ -169,7 +201,7 @@ class InProcessConnector : /** Sliently discard frames sent by either party, lost network traffic. */ void discard() { brokerOut.queue.setInputHandler(0); - clientOut.queue.setInputHandler(0); + clientOut.queue.setConnectionInputHandler(0); } private: |
