diff options
| author | Alan Conway <aconway@apache.org> | 2007-11-27 21:23:22 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-11-27 21:23:22 +0000 |
| commit | 3d6a67e8ba5a0de453af9ab2e21422b3906b6897 (patch) | |
| tree | 05e966f24272d7044f160a09f9659c12c916f0e9 /cpp/src/qpid/client | |
| parent | 43e9a596bf6089e7a2c6949c522e353e6ff59544 (diff) | |
| download | qpid-python-3d6a67e8ba5a0de453af9ab2e21422b3906b6897.tar.gz | |
perftest improvements.
NOTE: options have changed, see perftest --help.
- Supports multiple publishers.
- Subscribers set credit to receive exactly the expected no. of messages.
- All transfers unconfirmed by default.
client/Connector.cpp: Added connector ID to RECV/SENT logging
client/Completion.h: Added default ctor.
broker/Broker.cpp: --ack defaults to 0 - session acks disabled.
client/SessionCore.cpp: Ignore surplus frames in CLOSING state.
log/Options.cpp: By default log to stdout instead of stderr. Easier to grep.
framing/AMQContentBody.h: Log message content even in NDEBUG mode.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@598770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/Completion.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 23 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionCore.h | 1 |
4 files changed, 25 insertions, 7 deletions
diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h index a126bc9766..4d324aaf28 100644 --- a/cpp/src/qpid/client/Completion.h +++ b/cpp/src/qpid/client/Completion.h @@ -36,6 +36,8 @@ protected: shared_ptr<SessionCore> session; public: + Completion() {} + Completion(Future f, shared_ptr<SessionCore> s) : future(f), session(s) {} void sync() diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 80d97b10aa..497288bc3f 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -108,7 +108,7 @@ void Connector::send(AMQFrame& frame){ writeFrameQueue.push(frame); aio->queueWrite(); - QPID_LOG(trace, "SENT: " << frame); + QPID_LOG(trace, "SENT [" << this << "]: " << frame); } void Connector::handleClosed() { @@ -180,8 +180,8 @@ void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { AMQFrame frame; while(frame.decode(in)){ - QPID_LOG(trace, "RECV: " << frame); - input->received(frame); + QPID_LOG(trace, "RECV [" << this << "]: " << frame); + input->received(frame); } // TODO: unreading needs to go away, and when we can cope // with multiple sub-buffers in the general buffer scheme, it will diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 8eab54fa62..3a26734892 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -87,7 +87,6 @@ inline void SessionCore::waitFor(State s) { // We can be CLOSED or SUSPENDED by error at any time. state.waitFor(States(s, CLOSED, SUSPENDED)); check(); - assert(state==s); invariant(); } @@ -97,7 +96,8 @@ SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, sync(false), channel(ch), proxy(channel), - state(OPENING) + state(OPENING), + detachedLifetime(0) { l3.out = &out; attaching(conn); @@ -166,10 +166,11 @@ FrameSet::shared_ptr SessionCore::get() { // user thread static const std::string CANNOT_REOPEN_SESSION="Cannot re-open a session."; -void SessionCore::open(uint32_t detachedLifetime) { // user thread +void SessionCore::open(uint32_t timeout) { // user thread Lock l(state); check(state==OPENING && !session, COMMAND_INVALID, CANNOT_REOPEN_SESSION); + detachedLifetime=timeout; proxy.open(detachedLifetime); waitFor(OPEN); } @@ -364,8 +365,22 @@ Future SessionCore::send(const AMQBody& command, const MethodContent& content) return Future(l3.send(command, content)); } +namespace { +bool isCloseResponse(const AMQFrame& frame) { + return frame.getMethod() && + frame.getMethod()->amqpClassId() == SESSION_CLASS_ID && + frame.getMethod()->amqpMethodId() == SESSION_CLOSED_METHOD_ID; +} +} + // Network thread. void SessionCore::handleIn(AMQFrame& frame) { + { + Lock l(state); + // Ignore frames received while closing other than closed response. + if (state==CLOSING && !isCloseResponse(frame)) + return; + } try { // Cast to expose private SessionHandler functions. if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { @@ -382,7 +397,7 @@ void SessionCore::handleOut(AMQFrame& frame) { Lock l(state); if (state==OPEN) { - if (session->sent(frame)) + if (detachedLifetime > 0 && session->sent(frame)) proxy.solicitAck(); channel.handle(frame); } diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index 38c72359a3..2bb0f41fbf 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -133,6 +133,7 @@ class SessionCore : public framing::FrameHandler::InOutHandler, framing::ChannelHandler channel; framing::AMQP_ServerProxy::Session proxy; mutable StateMonitor state; + uint32_t detachedLifetime; }; }} // namespace qpid::client |
