summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-11-27 21:23:22 +0000
committerAlan Conway <aconway@apache.org>2007-11-27 21:23:22 +0000
commit3d6a67e8ba5a0de453af9ab2e21422b3906b6897 (patch)
tree05e966f24272d7044f160a09f9659c12c916f0e9 /cpp/src/qpid/client
parent43e9a596bf6089e7a2c6949c522e353e6ff59544 (diff)
downloadqpid-python-3d6a67e8ba5a0de453af9ab2e21422b3906b6897.tar.gz
perftest improvements.
NOTE: options have changed, see perftest --help. - Supports multiple publishers. - Subscribers set credit to receive exactly the expected no. of messages. - All transfers unconfirmed by default. client/Connector.cpp: Added connector ID to RECV/SENT logging client/Completion.h: Added default ctor. broker/Broker.cpp: --ack defaults to 0 - session acks disabled. client/SessionCore.cpp: Ignore surplus frames in CLOSING state. log/Options.cpp: By default log to stdout instead of stderr. Easier to grep. framing/AMQContentBody.h: Log message content even in NDEBUG mode. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@598770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/Completion.h2
-rw-r--r--cpp/src/qpid/client/Connector.cpp6
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp23
-rw-r--r--cpp/src/qpid/client/SessionCore.h1
4 files changed, 25 insertions, 7 deletions
diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h
index a126bc9766..4d324aaf28 100644
--- a/cpp/src/qpid/client/Completion.h
+++ b/cpp/src/qpid/client/Completion.h
@@ -36,6 +36,8 @@ protected:
shared_ptr<SessionCore> session;
public:
+ Completion() {}
+
Completion(Future f, shared_ptr<SessionCore> s) : future(f), session(s) {}
void sync()
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 80d97b10aa..497288bc3f 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -108,7 +108,7 @@ void Connector::send(AMQFrame& frame){
writeFrameQueue.push(frame);
aio->queueWrite();
- QPID_LOG(trace, "SENT: " << frame);
+ QPID_LOG(trace, "SENT [" << this << "]: " << frame);
}
void Connector::handleClosed() {
@@ -180,8 +180,8 @@ void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
AMQFrame frame;
while(frame.decode(in)){
- QPID_LOG(trace, "RECV: " << frame);
- input->received(frame);
+ QPID_LOG(trace, "RECV [" << this << "]: " << frame);
+ input->received(frame);
}
// TODO: unreading needs to go away, and when we can cope
// with multiple sub-buffers in the general buffer scheme, it will
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 8eab54fa62..3a26734892 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -87,7 +87,6 @@ inline void SessionCore::waitFor(State s) {
// We can be CLOSED or SUSPENDED by error at any time.
state.waitFor(States(s, CLOSED, SUSPENDED));
check();
- assert(state==s);
invariant();
}
@@ -97,7 +96,8 @@ SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn,
sync(false),
channel(ch),
proxy(channel),
- state(OPENING)
+ state(OPENING),
+ detachedLifetime(0)
{
l3.out = &out;
attaching(conn);
@@ -166,10 +166,11 @@ FrameSet::shared_ptr SessionCore::get() { // user thread
static const std::string CANNOT_REOPEN_SESSION="Cannot re-open a session.";
-void SessionCore::open(uint32_t detachedLifetime) { // user thread
+void SessionCore::open(uint32_t timeout) { // user thread
Lock l(state);
check(state==OPENING && !session,
COMMAND_INVALID, CANNOT_REOPEN_SESSION);
+ detachedLifetime=timeout;
proxy.open(detachedLifetime);
waitFor(OPEN);
}
@@ -364,8 +365,22 @@ Future SessionCore::send(const AMQBody& command, const MethodContent& content)
return Future(l3.send(command, content));
}
+namespace {
+bool isCloseResponse(const AMQFrame& frame) {
+ return frame.getMethod() &&
+ frame.getMethod()->amqpClassId() == SESSION_CLASS_ID &&
+ frame.getMethod()->amqpMethodId() == SESSION_CLOSED_METHOD_ID;
+}
+}
+
// Network thread.
void SessionCore::handleIn(AMQFrame& frame) {
+ {
+ Lock l(state);
+ // Ignore frames received while closing other than closed response.
+ if (state==CLOSING && !isCloseResponse(frame))
+ return;
+ }
try {
// Cast to expose private SessionHandler functions.
if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
@@ -382,7 +397,7 @@ void SessionCore::handleOut(AMQFrame& frame)
{
Lock l(state);
if (state==OPEN) {
- if (session->sent(frame))
+ if (detachedLifetime > 0 && session->sent(frame))
proxy.solicitAck();
channel.handle(frame);
}
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index 38c72359a3..2bb0f41fbf 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -133,6 +133,7 @@ class SessionCore : public framing::FrameHandler::InOutHandler,
framing::ChannelHandler channel;
framing::AMQP_ServerProxy::Session proxy;
mutable StateMonitor state;
+ uint32_t detachedLifetime;
};
}} // namespace qpid::client