summaryrefslogtreecommitdiff
path: root/cpp/lib/client/ClientChannel.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-21 19:25:45 +0000
committerAlan Conway <aconway@apache.org>2007-02-21 19:25:45 +0000
commit876d0b94c37f252b08c81656386100fad18a8a46 (patch)
tree4840b0d697d4629fd5c518507b58fceb7de1578a /cpp/lib/client/ClientChannel.cpp
parentc36fb4454be5ce4311aa5f5d0e5683db713c5545 (diff)
downloadqpid-python-876d0b94c37f252b08c81656386100fad18a8a46.tar.gz
Thread safety fixes for race conditions on incoming messages.
* cpp/lib/client/MessageListener.h: const correctness. * cpp/tests/*: MessageListener const change. * cpp/lib/broker/Content.h: Removed out-of-date FIXME comments. * cpp/lib/client/ClientChannel.h/ .cpp(): - added locking for consumers map and other member access. - refactored implementations of Basic get, deliver, return: most logic now encapsulted in IncomingMessage class. - fix channel close problems. * cpp/lib/client/ClientMessage.h/.cpp: - const correctness & API convenience fixes. - getMethod/setMethod/getHeader: for new IncomingMessage * cpp/lib/client/Connection.h/.cpp: - Fixes to channel closure. * cpp/lib/client/IncomingMessage.h/.cpp: - Encapsulate *all* incoming message handling for client. - Moved handling of BasicGetOk to IncomingMessage to fix race. - Thread safety fixes. * cpp/lib/client/ResponseHandler.h/.cpp: - added getResponse for ClientChannel. * cpp/lib/common/Exception.h: - added missing throwSelf implementations. - added ShutdownException as general purpose shut-down indicator. - added EmptyException as general purpose "empty" indicator. * cpp/lib/common/sys/Condition|Monitor|Mutex.h|.cpp: - Condition variable abstraction extracted from Monitor for situations where a single lock is associated with multiple conditions. * cpp/tests/ClientChannelTest.cpp: - Test incoming message transfer, get, consume etc. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510161 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/ClientChannel.cpp')
-rw-r--r--cpp/lib/client/ClientChannel.cpp302
1 files changed, 130 insertions, 172 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index 42e5cf3054..a8fa219c16 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include <iostream>
#include <ClientChannel.h>
#include <sys/Monitor.h>
#include <ClientMessage.h>
@@ -29,17 +30,14 @@
// handling of errors that should close the connection or the channel.
// Make sure the user thread receives a connection in each case.
//
-
-using namespace boost; //to use dynamic_pointer_cast
+using namespace std;
+using namespace boost;
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
-const std::string Channel::OK("OK");
-
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
connection(0),
- incoming(0),
prefetch(_prefetch),
transactional(_transactional)
{ }
@@ -106,8 +104,8 @@ void Channel::protocolInit(
ConnectionRedirectBody::shared_ptr redirect(
shared_polymorphic_downcast<ConnectionRedirectBody>(
responses.getResponse()));
- std::cout << "Received redirection to " << redirect->getHost()
- << std::endl;
+ cout << "Received redirection to " << redirect->getHost()
+ << endl;
} else {
THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
}
@@ -183,11 +181,11 @@ void Channel::consume(
Queue& queue, std::string& tag, MessageListener* listener,
int ackMode, bool noLocal, bool synch, const FieldTable* fields)
{
- string q = queue.getName();
sendAndReceiveSync<BasicConsumeOkBody>(
synch,
new BasicConsumeBody(
- version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
+ version, 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
fields ? *fields : FieldTable()));
if (synch) {
BasicConsumeOkBody::shared_ptr response =
@@ -195,90 +193,78 @@ void Channel::consume(
responses.getResponse());
tag = response->getConsumerTag();
}
- Consumer& c = consumers[tag];
- c.listener = listener;
- c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
+ // FIXME aconway 2007-02-20: Race condition!
+ // We could receive the first message for the consumer
+ // before we create the consumer below.
+ // Move consumer creation to handler for BasicConsumeOkBody
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ THROW_QPID_ERROR(CLIENT_ERROR,
+ "Consumer already exists with tag="+tag);
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
+ }
}
void Channel::cancel(const std::string& tag, bool synch) {
- ConsumerMap::iterator i = consumers.find(tag);
- if (i != consumers.end()) {
- Consumer& c = i->second;
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
- send(new BasicAckBody(version, c.lastDeliveryTag, true));
- sendAndReceiveSync<BasicCancelOkBody>(
- synch, new BasicCancelBody(version, tag, !synch));
- consumers.erase(tag);
+ Consumer c;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i == consumers.end())
+ return;
+ c = i->second;
+ consumers.erase(i);
}
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
+ sendAndReceiveSync<BasicCancelOkBody>(
+ synch, new BasicCancelBody(version, tag, !synch));
}
void Channel::cancelAll(){
- while(!consumers.empty()) {
- Consumer c = consumers.begin()->second;
- consumers.erase(consumers.begin());
+ ConsumerMap consumersCopy;
+ {
+ Mutex::ScopedLock l(lock);
+ consumersCopy = consumers;
+ consumers.clear();
+ }
+ for (ConsumerMap::iterator i=consumersCopy.begin();
+ i != consumersCopy.end(); ++i)
+ {
+ Consumer& c = i->second;
if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
&& c.lastDeliveryTag > 0)
{
- // Let exceptions propagate, if one fails no point
- // trying the rest. NB no memory leaks if we do,
- // ConsumerMap holds values, not pointers.
- //
send(new BasicAckBody(version, c.lastDeliveryTag, true));
}
}
}
-void Channel::retrieve(Message& msg){
- Monitor::ScopedLock l(retrievalMonitor);
- while(retrieved == 0){
- retrievalMonitor.wait();
- }
-
- msg.header = retrieved->getHeader();
- msg.deliveryTag = retrieved->getDeliveryTag();
- msg.data = retrieved->getData();
- delete retrieved;
- retrieved = 0;
-}
-
bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
- string name = queue.getName();
- responses.expect();
- send(new BasicGetBody(version, 0, name, ackMode));
- responses.waitForResponse();
- AMQMethodBody::shared_ptr response = responses.getResponse();
- if(response->isA<BasicGetOkBody>()) {
- if(incoming != 0){
- std::cout << "Existing message not complete" << std::endl;
- // FIXME aconway 2007-01-26: close the connection? the channel?
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
- }
- retrieve(msg);
- return true;
- }if(response->isA<BasicGetEmptyBody>()){
- return false;
- }else{
- // FIXME aconway 2007-01-26: must close the connection.
- THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame");
- }
+ // Expect a message starting with a BasicGetOk
+ incoming.startGet();
+ send(new BasicGetBody(version, 0, queue.getName(), ackMode));
+ return incoming.waitGet(msg);
}
-void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
- // FIXME aconway 2007-01-30: Rework for message class.
-
- string e = exchange.getName();
+void Channel::publish(
+ const Message& msg, const Exchange& exchange,
+ const std::string& routingKey, bool mandatory, bool immediate)
+{
+ // FIXME aconway 2007-01-30: Rework for 0-9 message class.
+ const string e = exchange.getName();
string key = routingKey;
send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
//break msg up into header frame and content frame(s) and send these
- string data = msg.getData();
- msg.header->setContentSize(data.length());
send(msg.header);
-
+ string data = msg.getData();
u_int64_t data_length = data.length();
if(data_length > 0){
u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes
@@ -312,30 +298,30 @@ void Channel::handleMethodInContext(
{
//channel.flow, channel.close, basic.deliver, basic.return or a
//response to a synchronous request
- if(responses.isWaiting()){
+ if(responses.isWaiting()) {
responses.signalResponse(body);
- }else if(body->isA<BasicDeliverBody>()) {
- if(incoming != 0){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
- }
- }else if(body->isA<BasicReturnBody>()){
- if(incoming != 0){
- std::cout << "Existing message not complete" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
- }
- }else if(body->isA<ChannelCloseBody>()){
+ return;
+ }
+
+ if(body->isA<BasicDeliverBody>()
+ || body->isA<BasicReturnBody>()
+ || body->isA<BasicGetOkBody>()
+ || body->isA<BasicGetEmptyBody>())
+
+ {
+ incoming.add(body);
+ return;
+ }
+ else if(body->isA<ChannelCloseBody>()) {
peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body));
- }else if(body->isA<ChannelFlowBody>()){
- // TODO aconway 2007-01-24:
- }else if(body->isA<ConnectionCloseBody>()){
+ }
+ else if(body->isA<ChannelFlowBody>()){
+ // TODO aconway 2007-01-24: not implemented yet.
+ }
+ else if(body->isA<ConnectionCloseBody>()){
connection->close();
- }else{
+ }
+ else {
connection->close(
504, "Unrecognised method",
body->amqpClassId(), body->amqpMethodId());
@@ -343,31 +329,13 @@ void Channel::handleMethodInContext(
}
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- if(incoming == 0){
- //handle invalid frame sequence
- std::cout << "Invalid message sequence: got header before return or deliver." << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver.");
- }else{
- incoming->setHeader(body);
- if(incoming->isComplete()){
- enqueue();
- }
- }
+ incoming.add(body);
}
void Channel::handleContent(AMQContentBody::shared_ptr body){
- if(incoming == 0){
- //handle invalid frame sequence
- std::cout << "Invalid message sequence: got content before return or deliver." << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver.");
- }else{
- incoming->addContent(body);
- if(incoming->isComplete()){
- enqueue();
- }
- }
+ incoming.add(body);
}
-
+
void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
}
@@ -376,35 +344,6 @@ void Channel::start(){
dispatcher = Thread(this);
}
-void Channel::run(){
- dispatch();
-}
-
-void Channel::enqueue(){
- Monitor::ScopedLock l(retrievalMonitor);
- if(incoming->isResponse()){
- retrieved = incoming;
- retrievalMonitor.notify();
- }else{
- messages.push(incoming);
- dispatchMonitor.notify();
- }
- incoming = 0;
-}
-
-IncomingMessage* Channel::dequeue(){
- Monitor::ScopedLock l(dispatchMonitor);
- while(messages.empty() && isOpen()){
- dispatchMonitor.wait();
- }
- IncomingMessage* msg = 0;
- if(!messages.empty()){
- msg = messages.front();
- messages.pop();
- }
- return msg;
-}
-
void Channel::deliver(Consumer& consumer, Message& msg){
//record delivery tag:
consumer.lastDeliveryTag = msg.getDeliveryTag();
@@ -412,8 +351,6 @@ void Channel::deliver(Consumer& consumer, Message& msg){
//allow registered listener to handle the message
consumer.listener->received(msg);
- //if the handler calls close on the channel or connection while
- //handling this message, then consumer will now have been deleted.
if(isOpen()){
bool multiple(false);
switch(consumer.ackMode){
@@ -432,35 +369,53 @@ void Channel::deliver(Consumer& consumer, Message& msg){
//a transaction until it commits.
}
-void Channel::dispatch(){
- while(isOpen()){
- IncomingMessage* incomingMsg = dequeue();
- if(incomingMsg){
- //Note: msg is currently only valid for duration of this call
- Message msg(incomingMsg->getHeader());
- msg.data = incomingMsg->getData();
- if(incomingMsg->isReturn()){
- if(returnsHandler == 0){
- //print warning to log/console
- std::cout << "Message returned: " << msg.getData() << std::endl;
- }else{
- returnsHandler->returned(msg);
+void Channel::run() {
+ while(isOpen()) {
+ try {
+ Message msg = incoming.waitDispatch();
+ if(msg.getMethod()->isA<BasicReturnBody>()) {
+ ReturnedMessageHandler* handler=0;
+ {
+ Mutex::ScopedLock l(lock);
+ handler=returnsHandler;
}
- }else{
- msg.deliveryTag = incomingMsg->getDeliveryTag();
- std::string tag = incomingMsg->getConsumerTag();
-
- if(consumers.find(tag) == consumers.end())
- std::cout << "Unknown consumer: " << tag << std::endl;
- else
- deliver(consumers[tag], msg);
+ if(handler == 0) {
+ // TODO aconway 2007-02-20: proper logging.
+ cout << "Message returned: " << msg.getData() << endl;
+ }
+ else
+ handler->returned(msg);
+ }
+ else {
+ BasicDeliverBody::shared_ptr deliverBody =
+ boost::shared_polymorphic_downcast<BasicDeliverBody>(
+ msg.getMethod());
+ std::string tag = deliverBody->getConsumerTag();
+ Consumer consumer;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if(i == consumers.end())
+ THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+ "Unknown consumer tag=" + tag);
+ consumer = i->second;
+ }
+ deliver(consumer, msg);
}
- delete incomingMsg;
+ }
+ catch (const ShutdownException&) {
+ /* Orderly shutdown */
+ }
+ catch (const Exception& e) {
+ // FIXME aconway 2007-02-20: Report exception to user.
+ cout << "client::Channel::run() terminated by: " << e.toString()
+ << "(" << typeid(e).name() << ")" << endl;
}
}
}
void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+ Mutex::ScopedLock l(lock);
returnsHandler = handler;
}
@@ -469,13 +424,17 @@ void Channel::close(
u_int16_t code, const std::string& text,
ClassId classId, MethodId methodId)
{
- if (getId() != 0 && isOpen()) {
+ if (isOpen()) {
try {
- sendAndReceive<ChannelCloseOkBody>(
- new ChannelCloseBody(version, code, text, classId, methodId));
- cancelAll();
+ if (getId() != 0) {
+ sendAndReceive<ChannelCloseOkBody>(
+ new ChannelCloseBody(
+ version, code, text, classId, methodId));
+ }
+ static_cast<ConnectionForChannel*>(connection)->erase(getId());
closeInternal();
} catch (...) {
+ static_cast<ConnectionForChannel*>(connection)->erase(getId());
closeInternal();
throw;
}
@@ -491,14 +450,13 @@ void Channel::peerClose(ChannelCloseBody::shared_ptr) {
}
void Channel::closeInternal() {
- assert(isOpen());
+ if (isOpen());
{
- Monitor::ScopedLock l(dispatchMonitor);
- static_cast<ConnectionForChannel*>(connection)->erase(getId());
+ cancelAll();
+ incoming.shutdown();
connection = 0;
// A 0 response means we are closed.
responses.signalResponse(AMQMethodBody::shared_ptr());
- dispatchMonitor.notify();
}
dispatcher.join();
}