summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-09-13 17:29:16 +0000
committerGordon Sim <gsim@apache.org>2007-09-13 17:29:16 +0000
commit0a1b3430450f274aee273a9f792a2d43f771b85f (patch)
tree71be3bc1a920a568c0680f8e8a5e802c1c3bee8d /cpp/src/qpid/client
parente00a1cfa3881e3bb8aadfecdf502f17903e319b1 (diff)
downloadqpid-python-0a1b3430450f274aee273a9f792a2d43f771b85f.tar.gz
Use frameset begin/end flags for determining frameset boundaries.
Set frameset & segment begin/end flags for content bearing methods (i.e. messages). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@575377 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/AckMode.h83
-rw-r--r--cpp/src/qpid/client/ChannelHandler.cpp6
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp28
-rw-r--r--cpp/src/qpid/client/ClientChannel.h5
-rw-r--r--cpp/src/qpid/client/Correlator.cpp5
-rw-r--r--cpp/src/qpid/client/Correlator.h4
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp49
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h1
-rw-r--r--cpp/src/qpid/client/FutureResponse.cpp2
-rw-r--r--cpp/src/qpid/client/FutureResponse.h2
10 files changed, 76 insertions, 109 deletions
diff --git a/cpp/src/qpid/client/AckMode.h b/cpp/src/qpid/client/AckMode.h
index 9ad5ef925c..f565c1d36b 100644
--- a/cpp/src/qpid/client/AckMode.h
+++ b/cpp/src/qpid/client/AckMode.h
@@ -1,72 +1,25 @@
#ifndef _client_AckMode_h
#define _client_AckMode_h
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-namespace qpid {
-namespace client {
-
-/**
- * The available acknowledgements modes.
- *
- * \ingroup clientapi
- */
-enum AckMode {
- /** No acknowledgement will be sent, broker can
- discard messages as soon as they are delivered
- to a consumer using this mode. **/
- NO_ACK = 0,
- /** Each message will be automatically
- acknowledged as soon as it is delivered to the
- application **/
- AUTO_ACK = 1,
- /** Acknowledgements will be sent automatically,
- but not for each message. **/
- LAZY_ACK = 2,
- /** The application is responsible for explicitly
- acknowledging messages. **/
- CLIENT_ACK = 3
-};
-
-}} // namespace qpid::client
-
-
-
-#endif /*!_client_AckMode_h*/
-#ifndef _client_AckMode_h
-#define _client_AckMode_h
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
@@ -97,6 +50,4 @@ enum AckMode {
}} // namespace qpid::client
-
-
-#endif /*!_client_AckMode_h*/
+#endif
diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp
index c9b7a68f38..49e7285a47 100644
--- a/cpp/src/qpid/client/ChannelHandler.cpp
+++ b/cpp/src/qpid/client/ChannelHandler.cpp
@@ -58,7 +58,7 @@ void ChannelHandler::incoming(AMQFrame& frame)
if (body->getMethod())
handleMethod(body->getMethod());
else
- throw new ConnectionException(504, "Channel not open.");
+ throw ConnectionException(504, "Channel not open for content.");
}
}
@@ -68,7 +68,7 @@ void ChannelHandler::outgoing(AMQFrame& frame)
frame.setChannel(id);
out(frame);
} else if (getState() == CLOSED) {
- throw Exception("Channel not open");
+ throw Exception(QPID_MSG("Channel not open, can't send " << frame));
} else if (getState() == CLOSED_BY_PEER) {
throw ChannelException(code, text);
}
@@ -120,7 +120,7 @@ void ChannelHandler::handleMethod(AMQMethodBody* method)
} //else just ignore it
break;
case CLOSED:
- throw ConnectionException(504, "Channel not opened.");
+ throw ConnectionException(504, "Channel is closed.");
default:
throw Exception("Unexpected state encountered in ChannelHandler!");
}
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 87062e1470..0a85b8e4f0 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -69,7 +69,7 @@ Channel::~Channel()
void Channel::open(const Session& s)
{
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(stopLock);
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
active = true;
@@ -80,7 +80,7 @@ void Channel::open(const Session& s)
}
bool Channel::isOpen() const {
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(stopLock);
return active;
}
@@ -146,7 +146,7 @@ void Channel::consume(
Consumer& c = consumers[tag];
c.listener = listener;
c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
+ c.count = 0;
}
uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1;
ScopedSync s(session, synch);
@@ -205,7 +205,7 @@ void Channel::close()
{
session.close();
{
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(stopLock);
active = false;
}
stop();
@@ -231,20 +231,18 @@ void Channel::join() {
void Channel::dispatch(FrameSet& content, const std::string& destination)
{
- MessageListener* listener(0);
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(destination);
- if (i != consumers.end()) {
- Message msg;
- msg.populate(content);
- listener = i->second.listener;
- }
- }
- if (listener) {
+ ConsumerMap::iterator i = consumers.find(destination);
+ if (i != consumers.end()) {
Message msg;
msg.populate(content);
+ MessageListener* listener = i->second.listener;
listener->received(msg);
+ if (isOpen() && i->second.ackMode != CLIENT_ACK) {
+ bool send = i->second.ackMode == AUTO_ACK
+ || (prefetch && ++(i->second.count) > (prefetch / 2));
+ if (send) i->second.count = 0;
+ session.execution().completed(content.getId(), true, send);
+ }
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
}
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index b33af65d21..527f5d418f 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -63,8 +63,7 @@ class Channel : private sys::Runnable
struct Consumer{
MessageListener* listener;
AckMode ackMode;
- int count;
- u_int64_t lastDeliveryTag;
+ uint32_t count;
};
typedef std::map<std::string, Consumer> ConsumerMap;
@@ -75,7 +74,7 @@ class Channel : private sys::Runnable
const bool transactional;
framing::ProtocolVersion version;
- sys::Mutex stopLock;
+ mutable sys::Mutex stopLock;
bool running;
ConsumerMap consumers;
diff --git a/cpp/src/qpid/client/Correlator.cpp b/cpp/src/qpid/client/Correlator.cpp
index 9ef6857957..f30c92b992 100644
--- a/cpp/src/qpid/client/Correlator.cpp
+++ b/cpp/src/qpid/client/Correlator.cpp
@@ -25,14 +25,15 @@ using qpid::client::Correlator;
using namespace qpid::framing;
using namespace boost;
-void Correlator::receive(AMQMethodBody* response)
+bool Correlator::receive(const AMQMethodBody* response)
{
if (listeners.empty()) {
- throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name
+ return false;
} else {
Listener l = listeners.front();
if (l) l(response);
listeners.pop();
+ return true;
}
}
diff --git a/cpp/src/qpid/client/Correlator.h b/cpp/src/qpid/client/Correlator.h
index d93e7b66cd..45b22fb2fe 100644
--- a/cpp/src/qpid/client/Correlator.h
+++ b/cpp/src/qpid/client/Correlator.h
@@ -36,9 +36,9 @@ namespace client {
class Correlator
{
public:
- typedef boost::function<void(framing::AMQMethodBody*)> Listener;
+ typedef boost::function<void(const framing::AMQMethodBody*)> Listener;
- void receive(framing::AMQMethodBody*);
+ bool receive(const framing::AMQMethodBody*);
void listen(Listener l);
private:
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 8ea2cc64e6..95cdc7032a 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -62,19 +62,16 @@ void ExecutionHandler::handle(AMQFrame& frame)
{
AMQBody* body = frame.getBody();
if (!invoke(body, this)) {
- if (isContentFrame(frame)) {
- if (!arriving) {
- arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
- }
- arriving->append(frame);
- if (arriving->isComplete()) {
+ if (!arriving) {
+ arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
+ }
+ arriving->append(frame);
+ if (arriving->isComplete()) {
+ if (arriving->isContentBearing() || !correlation.receive(arriving->getMethod())) {
demux.handle(arriving);
- arriving.reset();
- }
- } else {
- ++incoming.hwm;
- correlation.receive(body->getMethod());
- }
+ }
+ arriving.reset();
+ }
}
}
@@ -168,11 +165,19 @@ void ExecutionHandler::sendCompletion()
SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l)
{
+ return send(command, l, false);
+}
+
+SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent)
+{
SequenceNumber id = ++outgoing.hwm;
if(l) {
completion.listenForResult(id, l);
}
AMQFrame frame(0/*channel will be filled in be channel handler*/, command);
+ if (hasContent) {
+ frame.setEof(false);
+ }
out(frame);
return id;
}
@@ -180,7 +185,7 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker:
SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content,
CompletionTracker::ResultListener l)
{
- SequenceNumber id = send(command, l);
+ SequenceNumber id = send(command, l, true);
sendContent(content);
return id;
}
@@ -188,14 +193,16 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodConten
void ExecutionHandler::sendContent(const MethodContent& content)
{
AMQFrame header(0, content.getHeader());
- out(header);
-
+ header.setBof(false);
u_int64_t data_length = content.getData().length();
if(data_length > 0){
+ header.setEof(false);
+ out(header);
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
AMQFrame frame(0, AMQContentBody(content.getData()));
+ frame.setBof(false);
out(frame);
}else{
u_int32_t offset = 0;
@@ -204,10 +211,20 @@ void ExecutionHandler::sendContent(const MethodContent& content)
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(content.getData().substr(offset, length));
AMQFrame frame(0, AMQContentBody(frag));
- out(frame);
+ frame.setBof(false);
+ if (offset > 0) {
+ frame.setBos(false);
+ }
offset += length;
remaining = data_length - offset;
+ if (remaining) {
+ frame.setEos(false);
+ frame.setEof(false);
+ }
+ out(frame);
}
}
+ } else {
+ out(header);
}
}
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index 88424b555a..5f9cdff9d2 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -59,6 +59,7 @@ class ExecutionHandler :
void sendCompletion();
+ framing::SequenceNumber send(const framing::AMQBody&, CompletionTracker::ResultListener, bool hasContent);
void sendContent(const framing::MethodContent&);
public:
diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp
index afdd35c5eb..73b7c3a7a6 100644
--- a/cpp/src/qpid/client/FutureResponse.cpp
+++ b/cpp/src/qpid/client/FutureResponse.cpp
@@ -35,7 +35,7 @@ AMQMethodBody* FutureResponse::getResponse(SessionCore& session)
return response.get();
}
-void FutureResponse::received(AMQMethodBody* r)
+void FutureResponse::received(const AMQMethodBody* r)
{
Monitor::ScopedLock l(lock);
response = *r;
diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h
index 1e8a7eb456..df3b7c6f30 100644
--- a/cpp/src/qpid/client/FutureResponse.h
+++ b/cpp/src/qpid/client/FutureResponse.h
@@ -36,7 +36,7 @@ class FutureResponse : public FutureCompletion
framing::MethodHolder response;
public:
framing::AMQMethodBody* getResponse(SessionCore& session);
- void received(framing::AMQMethodBody* response);
+ void received(const framing::AMQMethodBody* response);
};
}}