diff options
author | Alan Conway <aconway@apache.org> | 2007-09-25 18:16:02 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-09-25 18:16:02 +0000 |
commit | 00b761b3b6d80ee2bb3e538face881748efb2b09 (patch) | |
tree | 59c1b38093bb0cd993863f8c72cd8d22a3aa7bb9 /cpp/src/qpid/client/Channel.cpp | |
parent | bbdaa6ec54ad9d04baa5ae1cb4d99c0387aa7d9d (diff) | |
download | qpid-python-00b761b3b6d80ee2bb3e538face881748efb2b09.tar.gz |
Renamed the following files for consistency:
broker/BrokerExchange.cpp -> Exchange.cpp
broker/BrokerExchange.h -> Exchange.h
broker/BrokerQueue.cpp -> Queue.cpp
broker/BrokerQueue.h -> Queue.h
client/ClientChannel.cpp -> Channel.cpp
client/ClientChannel.h -> Channel.h
client/ClientConnection.cpp -> Connection.cpp
client/ClientExchange.cpp -> Exchange.cpp
client/ClientExchange.h -> Exchange.h
client/ClientMessage.h -> Message.h
client/ClientQueue.cpp -> Queue.cpp
client/ClientQueue.h -> Queue.h
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@579340 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/Channel.cpp')
-rw-r--r-- | cpp/src/qpid/client/Channel.cpp | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp new file mode 100644 index 0000000000..cef34630db --- /dev/null +++ b/cpp/src/qpid/client/Channel.cpp @@ -0,0 +1,271 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/log/Statement.h" +#include <iostream> +#include <sstream> +#include "Channel.h" +#include "qpid/sys/Monitor.h" +#include "Message.h" +#include "qpid/QpidError.h" +#include "Connection.h" +#include "Demux.h" +#include "FutureResponse.h" +#include "MessageListener.h" +#include "MessageQueue.h" +#include <boost/format.hpp> +#include <boost/bind.hpp> +#include "qpid/framing/all_method_bodies.h" + +// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent +// handling of errors that should close the connection or the channel. +// Make sure the user thread receives a connection in each case. +// +using namespace std; +using namespace boost; +using namespace qpid::framing; +using namespace qpid::sys; + +namespace qpid{ +namespace client{ + +const std::string empty; + +class ScopedSync +{ + Session& session; + public: + ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } + ~ScopedSync() { session.setSynchronous(false); } +}; + +Channel::Channel(bool _transactional, u_int16_t _prefetch) : + prefetch(_prefetch), transactional(_transactional), running(false), + uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false) +{ +} + +Channel::~Channel() +{ + join(); +} + +void Channel::open(const Session& s) +{ + Mutex::ScopedLock l(stopLock); + if (isOpen()) + THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); + active = true; + session = s; + if(isTransactional()) { + session.txSelect(); + } +} + +bool Channel::isOpen() const { + Mutex::ScopedLock l(stopLock); + return active; +} + +void Channel::setPrefetch(uint32_t _prefetch){ + prefetch = _prefetch; +} + +void Channel::declareExchange(Exchange& _exchange, bool synch){ + ScopedSync s(session, synch); + session.exchangeDeclare_(exchange=_exchange.getName(), type=_exchange.getType()); +} + +void Channel::deleteExchange(Exchange& _exchange, bool synch){ + ScopedSync s(session, synch); + session.exchangeDelete_(exchange=_exchange.getName(), ifUnused=false); +} + +void Channel::declareQueue(Queue& _queue, bool synch){ + if (_queue.getName().empty()) { + stringstream uniqueName; + uniqueName << uniqueId << "-queue-" << ++nameCounter; + _queue.setName(uniqueName.str()); + } + + ScopedSync s(session, synch); + session.queueDeclare_(queue=_queue.getName(), passive=false/*passive*/, durable=_queue.isDurable(), + exclusive=_queue.isExclusive(), autoDelete=_queue.isAutoDelete()); + +} + +void Channel::deleteQueue(Queue& _queue, bool ifunused, bool ifempty, bool synch){ + ScopedSync s(session, synch); + session.queueDelete_(queue=_queue.getName(), ifUnused=ifunused, ifEmpty=ifempty); +} + +void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ + string e = exchange.getName(); + string q = queue.getName(); + ScopedSync s(session, synch); + session.queueBind(0, q, e, key, args); +} + +void Channel::commit(){ + session.txCommit(); +} + +void Channel::rollback(){ + session.txRollback(); +} + +void Channel::consume( + Queue& _queue, const std::string& tag, MessageListener* listener, + AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { + + if (tag.empty()) { + throw Exception("A tag must be specified for a consumer."); + } + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag); + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.count = 0; + } + uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; + ScopedSync s(session, synch); + session.messageSubscribe(0, _queue.getName(), tag, noLocal, + confirmMode, 0/*pre-acquire*/, + false, fields ? *fields : FieldTable()); + if (!prefetch) { + session.messageFlowMode(tag, 0/*credit based*/); + } + + //allocate some credit: + session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); + session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF); +} + +void Channel::cancel(const std::string& tag, bool synch) { + Consumer c; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i == consumers.end()) + return; + c = i->second; + consumers.erase(i); + } + ScopedSync s(session, synch); + session.messageCancel(tag); +} + +bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { + string tag = "get-handler"; + ScopedDivert handler(tag, session.execution().getDemux()); + Demux::Queue& incoming = handler.getQueue(); + + session.messageSubscribe_(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); + session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); + session.messageFlow(tag, 0/*MESSAGES*/, 1); + Completion status = session.messageFlush(tag); + status.sync(); + session.messageCancel(tag); + + if (incoming.empty()) { + return false; + } else { + msg.populate(*(incoming.pop())); + if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true); + return true; + } +} + +void Channel::publish(Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory, bool /*?TODO-restore immediate?*/) { + + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getDeliveryProperties().setDiscardUnroutable(!mandatory); + session.messageTransfer_(destination=exchange.getName(), content=msg); +} + +void Channel::close() +{ + session.close(); + { + Mutex::ScopedLock l(stopLock); + active = false; + } + stop(); +} + +void Channel::start(){ + running = true; + dispatcher = Thread(*this); +} + +void Channel::stop() { + gets.close(); + join(); +} + +void Channel::join() { + Mutex::ScopedLock l(stopLock); + if(running && dispatcher.id()) { + dispatcher.join(); + running = false; + } +} + +void Channel::dispatch(FrameSet& content, const std::string& destination) +{ + ConsumerMap::iterator i = consumers.find(destination); + if (i != consumers.end()) { + Message msg; + msg.populate(content); + MessageListener* listener = i->second.listener; + listener->received(msg); + if (isOpen() && i->second.ackMode != CLIENT_ACK) { + bool send = i->second.ackMode == AUTO_ACK + || (prefetch && ++(i->second.count) > (prefetch / 2)); + if (send) i->second.count = 0; + session.execution().completed(content.getId(), true, send); + } + } else { + QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); + } +} + +void Channel::run() { + try { + while (true) { + FrameSet::shared_ptr content = session.get(); + //need to dispatch this to the relevant listener: + if (content->isA<MessageTransferBody>()) { + dispatch(*content, content->as<MessageTransferBody>()->getDestination()); + } else { + QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod()); + } + } + } catch (const QueueClosed&) {} +} + +}} + |