/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #ifndef _Subscription_ #define _Subscription_ #include "SubscriptionManager.h" #include #include #include #include #include #include namespace qpid { namespace client { SubscriptionManager::SubscriptionManager(const Session& s) : dispatcher(s), session(s), flowControl(UNLIMITED, UNLIMITED, false), acceptMode(0), acquireMode(0), autoStop(true) {} void SubscriptionManager::subscribeInternal( const std::string& q, const std::string& dest, const FlowControl& fc) { session.messageSubscribe( arg::queue=q, arg::destination=dest, arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); if (fc.messages || fc.bytes) // No need to set if all 0. setFlowControl(dest, fc); } void SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& d) { subscribe(listener, q, getFlowControl(), d); } void SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d) { std::string dest=d.empty() ? q:d; dispatcher.listen(dest, &listener, autoAck); return subscribeInternal(q, dest, fc); } void SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const std::string& d) { subscribe(lq, q, getFlowControl(), d); } void SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d) { std::string dest=d.empty() ? q:d; lq.session=session; lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest)); return subscribeInternal(q, dest, fc); } void SubscriptionManager::setFlowControl( const std::string& dest, uint32_t messages, uint32_t bytes, bool window) { session.messageSetFlowMode(dest, window); session.messageFlow(dest, 0, messages); session.messageFlow(dest, 1, bytes); session.sync(); } void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) { setFlowControl(dest, fc.messages, fc.bytes, fc.window); } void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; } void SubscriptionManager::setFlowControl( uint32_t messages_, uint32_t bytes_, bool window_) { setFlowControl(FlowControl(messages_, bytes_, window_)); } const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; } void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; } void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; } void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; } AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; } void SubscriptionManager::cancel(const std::string dest) { sync(session).messageCancel(dest); dispatcher.cancel(dest); } void SubscriptionManager::setAutoStop(bool set) { autoStop=set; } void SubscriptionManager::run() { dispatcher.setAutoStop(autoStop); dispatcher.run(); } void SubscriptionManager::start() { dispatcher.setAutoStop(autoStop); dispatcher.start(); } void SubscriptionManager::stop() { dispatcher.stop(); } bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) { LocalQueue lq; std::string unique = framing::Uuid(true).str(); subscribe(lq, queue, FlowControl::messageCredit(1), unique); AutoCancel ac(*this, unique); //first wait for message to be delivered if a timeout has been specified if (timeout && lq.get(result, timeout)) return true; //make sure message is not on queue before final check sync(session).messageFlush(unique); return lq.get(result, 0); } Session SubscriptionManager::getSession() const { return session; } void SubscriptionManager::registerFailoverHandler (boost::function fh) { dispatcher.registerFailoverHandler(fh); } }} // namespace qpid::client #endif