diff options
Diffstat (limited to 'qpid/cpp/src/client')
28 files changed, 0 insertions, 3667 deletions
diff --git a/qpid/cpp/src/client/AckMode.h b/qpid/cpp/src/client/AckMode.h deleted file mode 100644 index 9ad5ef925c..0000000000 --- a/qpid/cpp/src/client/AckMode.h +++ /dev/null @@ -1,102 +0,0 @@ -#ifndef _client_AckMode_h -#define _client_AckMode_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -namespace qpid { -namespace client { - -/** - * The available acknowledgements modes. - * - * \ingroup clientapi - */ -enum AckMode { - /** No acknowledgement will be sent, broker can - discard messages as soon as they are delivered - to a consumer using this mode. **/ - NO_ACK = 0, - /** Each message will be automatically - acknowledged as soon as it is delivered to the - application **/ - AUTO_ACK = 1, - /** Acknowledgements will be sent automatically, - but not for each message. **/ - LAZY_ACK = 2, - /** The application is responsible for explicitly - acknowledging messages. **/ - CLIENT_ACK = 3 -}; - -}} // namespace qpid::client - - - -#endif /*!_client_AckMode_h*/ -#ifndef _client_AckMode_h -#define _client_AckMode_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -namespace qpid { -namespace client { - -/** - * The available acknowledgements modes. - * - * \ingroup clientapi - */ -enum AckMode { - /** No acknowledgement will be sent, broker can - discard messages as soon as they are delivered - to a consumer using this mode. **/ - NO_ACK = 0, - /** Each message will be automatically - acknowledged as soon as it is delivered to the - application **/ - AUTO_ACK = 1, - /** Acknowledgements will be sent automatically, - but not for each message. **/ - LAZY_ACK = 2, - /** The application is responsible for explicitly - acknowledging messages. **/ - CLIENT_ACK = 3 -}; - -}} // namespace qpid::client - - - -#endif /*!_client_AckMode_h*/ diff --git a/qpid/cpp/src/client/BasicMessageChannel.cpp b/qpid/cpp/src/client/BasicMessageChannel.cpp deleted file mode 100644 index c577c0a305..0000000000 --- a/qpid/cpp/src/client/BasicMessageChannel.cpp +++ /dev/null @@ -1,345 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "BasicMessageChannel.h" -#include "../framing/AMQMethodBody.h" -#include "ClientChannel.h" -#include "ReturnedMessageHandler.h" -#include "MessageListener.h" -#include "../framing/FieldTable.h" -#include "Connection.h" -#include <queue> -#include <iostream> -#include <boost/format.hpp> -#include <boost/variant.hpp> - -namespace qpid { -namespace client { - -using namespace std; -using namespace sys; -using namespace framing; -using boost::format; - -namespace { - -// Destination name constants -const std::string BASIC_GET("__basic_get__"); -const std::string BASIC_RETURN("__basic_return__"); - -// Reference name constant -const std::string BASIC_REF("__basic_reference__"); -} - -BasicMessageChannel::BasicMessageChannel(Channel& ch) - : channel(ch), returnsHandler(0) -{ - incoming.addDestination(BASIC_RETURN, destDispatch); -} - -void BasicMessageChannel::consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) -{ - { - // Note we create a consumer even if tag="". In that case - // It will be renamed when we handle BasicConsumeOkBody. - // - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - THROW_QPID_ERROR(CLIENT_ERROR, - "Consumer already exists with tag="+tag); - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.lastDeliveryTag = 0; - } - - // FIXME aconway 2007-03-23: get processed in both. - - // BasicConsumeOkBody is really processed in handle(), here - // we just pick up the tag to return to the user. - // - // We can't process it here because messages for the consumer may - // already be arriving. - // - BasicConsumeOkBody::shared_ptr ok = - channel.sendAndReceiveSync<BasicConsumeOkBody>( - synch, - make_shared_ptr(new BasicConsumeBody( - channel.version, 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable()))); - tag = ok->getConsumerTag(); -} - - -void BasicMessageChannel::cancel(const std::string& tag, bool synch) { - Consumer c; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i == consumers.end()) - return; - c = i->second; - consumers.erase(i); - } - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); - channel.sendAndReceiveSync<BasicCancelOkBody>( - synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch))); -} - -void BasicMessageChannel::close(){ - ConsumerMap consumersCopy; - { - Mutex::ScopedLock l(lock); - consumersCopy = consumers; - consumers.clear(); - } - destGet.shutdown(); - destDispatch.shutdown(); - for (ConsumerMap::iterator i=consumersCopy.begin(); - i != consumersCopy.end(); ++i) - { - Consumer& c = i->second; - if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) - && c.lastDeliveryTag > 0) - { - channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); - } - } -} - - -bool BasicMessageChannel::get( - Message& msg, const Queue& queue, AckMode ackMode) -{ - // Prepare for incoming response - incoming.addDestination(BASIC_GET, destGet); - channel.send( - new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); - bool got = destGet.wait(msg); - return got; -} - -void BasicMessageChannel::publish( - const Message& msg, const Exchange& exchange, - const std::string& routingKey, bool mandatory, bool immediate) -{ - const string e = exchange.getName(); - string key = routingKey; - - // Make a header for the message - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - BasicHeaderProperties::copy( - *static_cast<BasicHeaderProperties*>(header->getProperties()), msg); - header->setContentSize(msg.getData().size()); - - channel.send( - new BasicPublishBody( - channel.version, 0, e, key, mandatory, immediate)); - channel.send(header); - string data = msg.getData(); - u_int64_t data_length = data.length(); - if(data_length > 0){ - //frame itself uses 8 bytes - u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8; - if(data_length < frag_size){ - channel.send(new AMQContentBody(data)); - }else{ - u_int32_t offset = 0; - u_int32_t remaining = data_length - offset; - while (remaining > 0) { - u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); - channel.send(new AMQContentBody(frag)); - - offset += length; - remaining = data_length - offset; - } - } - } -} - -void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { - assert(method->amqpClassId() ==BasicGetBody::CLASS_ID); - switch(method->amqpMethodId()) { - case BasicGetOkBody::METHOD_ID: { - incoming.openReference(BASIC_REF); - incoming.createMessage(BASIC_GET, BASIC_REF); - return; - } - case BasicGetEmptyBody::METHOD_ID: { - incoming.getDestination(BASIC_GET).empty(); - incoming.removeDestination(BASIC_GET); - return; - } - case BasicDeliverBody::METHOD_ID: { - BasicDeliverBody::shared_ptr deliver= - boost::shared_polymorphic_downcast<BasicDeliverBody>(method); - incoming.openReference(BASIC_REF); - Message& msg = incoming.createMessage( - deliver->getConsumerTag(), BASIC_REF); - msg.setDestination(deliver->getConsumerTag()); - msg.setDeliveryTag(deliver->getDeliveryTag()); - msg.setRedelivered(deliver->getRedelivered()); - return; - } - case BasicReturnBody::METHOD_ID: { - incoming.openReference(BASIC_REF); - incoming.createMessage(BASIC_RETURN, BASIC_REF); - return; - } - case BasicConsumeOkBody::METHOD_ID: { - Mutex::ScopedLock l(lock); - BasicConsumeOkBody::shared_ptr consumeOk = - boost::shared_polymorphic_downcast<BasicConsumeOkBody>(method); - std::string tag = consumeOk->getConsumerTag(); - ConsumerMap::iterator i = consumers.find(std::string()); - if (i != consumers.end()) { - // Need to rename the un-named consumer. - if (consumers.find(tag) == consumers.end()) { - consumers[tag] = i->second; - consumers.erase(i); - } - else // Tag already exists. - throw ChannelException(404, "Tag already exists: "+tag); - } - // FIXME aconway 2007-03-23: Integrate consumer & destination - // maps. - incoming.addDestination(tag, destDispatch); - return; - } - } - throw Channel::UnknownMethod(); -} - -void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr header) { - BasicHeaderProperties* props = - boost::polymorphic_downcast<BasicHeaderProperties*>( - header->getProperties()); - IncomingMessage::Reference& ref = incoming.getReference(BASIC_REF); - assert (ref.messages.size() == 1); - ref.messages.front().BasicHeaderProperties::operator=(*props); - incoming_size = header->getContentSize(); - if (incoming_size==0) - incoming.closeReference(BASIC_REF); -} - -void BasicMessageChannel::handle(AMQContentBody::shared_ptr content){ - incoming.appendReference(BASIC_REF, content->getData()); - size_t size = incoming.getReference(BASIC_REF).data.size(); - if (size >= incoming_size) { - incoming.closeReference(BASIC_REF); - if (size > incoming_size) - throw ChannelException(502, "Content exceeded declared size"); - } -} - -void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){ - //record delivery tag: - consumer.lastDeliveryTag = msg.getDeliveryTag(); - - //allow registered listener to handle the message - consumer.listener->received(msg); - - if(channel.isOpen()){ - bool multiple(false); - switch(consumer.ackMode){ - case LAZY_ACK: - multiple = true; - if(++(consumer.count) < channel.getPrefetch()) - break; - //else drop-through - case AUTO_ACK: - consumer.lastDeliveryTag = 0; - channel.send( - new BasicAckBody( - channel.version, - msg.getDeliveryTag(), - multiple)); - case NO_ACK: // Nothing to do - case CLIENT_ACK: // User code must ack. - break; - // TODO aconway 2007-02-22: Provide a way for user - // to ack! - } - } - - //as it stands, transactionality is entirely orthogonal to ack - //mode, though the acks will not be processed by the broker under - //a transaction until it commits. -} - - -void BasicMessageChannel::run() { - while(channel.isOpen()) { - try { - Message msg; - bool gotMessge = destDispatch.wait(msg); - if (gotMessge) { - if(msg.getDestination() == BASIC_RETURN) { - ReturnedMessageHandler* handler=0; - { - Mutex::ScopedLock l(lock); - handler=returnsHandler; - } - if(handler != 0) - handler->returned(msg); - } - else { - Consumer consumer; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find( - msg.getDestination()); - if(i == consumers.end()) - THROW_QPID_ERROR(PROTOCOL_ERROR+504, - "Unknown consumer tag=" + - msg.getDestination()); - consumer = i->second; - } - deliver(consumer, msg); - } - } - } - catch (const ShutdownException&) { - /* Orderly shutdown */ - } - catch (const Exception& e) { - // FIXME aconway 2007-02-20: Report exception to user. - cout << "client::BasicMessageChannel::run() terminated by: " - << e.toString() << endl; - } - } -} - -void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ - Mutex::ScopedLock l(lock); - returnsHandler = handler; -} - -void BasicMessageChannel::setQos(){ - channel.sendAndReceive<BasicQosOkBody>( - make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false))); - if(channel.isTransactional()) - channel.sendAndReceive<TxSelectOkBody>(make_shared_ptr(new TxSelectBody(channel.version))); -} - -}} // namespace qpid::client diff --git a/qpid/cpp/src/client/BasicMessageChannel.h b/qpid/cpp/src/client/BasicMessageChannel.h deleted file mode 100644 index 13e1cf1e00..0000000000 --- a/qpid/cpp/src/client/BasicMessageChannel.h +++ /dev/null @@ -1,90 +0,0 @@ -#ifndef _client_BasicMessageChannel_h -#define _client_BasicMessageChannel_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "MessageChannel.h" -#include "IncomingMessage.h" -#include <boost/scoped_ptr.hpp> - -namespace qpid { -namespace client { -/** - * Messaging implementation using AMQP 0-8 BasicMessageChannel class - * to send and receiving messages. - */ -class BasicMessageChannel : public MessageChannel -{ - public: - BasicMessageChannel(Channel& parent); - - void consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); - - void cancel(const std::string& tag, bool synch = true); - - bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); - - void publish(const Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory = false, bool immediate = false); - - void setReturnedMessageHandler(ReturnedMessageHandler* handler); - - void run(); - - void handle(boost::shared_ptr<framing::AMQMethodBody>); - - void handle(shared_ptr<framing::AMQHeaderBody>); - - void handle(shared_ptr<framing::AMQContentBody>); - - void setQos(); - - void close(); - - private: - - struct Consumer{ - MessageListener* listener; - AckMode ackMode; - int count; - u_int64_t lastDeliveryTag; - }; - typedef std::map<std::string, Consumer> ConsumerMap; - - void deliver(Consumer& consumer, Message& msg); - - sys::Mutex lock; - Channel& channel; - IncomingMessage incoming; - uint64_t incoming_size; - ConsumerMap consumers ; - ReturnedMessageHandler* returnsHandler; - IncomingMessage::WaitableDestination destGet; - IncomingMessage::WaitableDestination destDispatch; -}; - -}} // namespace qpid::client - - - -#endif /*!_client_BasicMessageChannel_h*/ diff --git a/qpid/cpp/src/client/ClientAdapter.cpp b/qpid/cpp/src/client/ClientAdapter.cpp deleted file mode 100644 index f5eb2f4536..0000000000 --- a/qpid/cpp/src/client/ClientAdapter.cpp +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "../gen/AMQP_ClientOperations.h" -#include "ClientAdapter.h" -#include "Connection.h" -#include "../Exception.h" -#include "../framing/AMQMethodBody.h" - -namespace qpid { -namespace client { - -using namespace qpid; -using namespace qpid::framing; - -typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - -void ClientAdapter::handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const MethodContext& context -) -{ - try{ - method->invoke(*clientOps, context); - }catch(ChannelException& e){ - connection.client->getChannel().close( - context, e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(ConnectionException& e){ - connection.client->getConnection().close( - context, e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - connection.client->getConnection().close( - context, 541/*internal error*/, e.what(), - method->amqpClassId(), method->amqpMethodId()); - } -} - -void ClientAdapter::handleHeader(AMQHeaderBody::shared_ptr body) { - channel->handleHeader(body); -} - -void ClientAdapter::handleContent(AMQContentBody::shared_ptr body) { - channel->handleContent(body); -} - -void ClientAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) { - // TODO aconway 2007-01-17: Implement heartbeats. -} - - - -}} // namespace qpid::client - diff --git a/qpid/cpp/src/client/ClientAdapter.h b/qpid/cpp/src/client/ClientAdapter.h deleted file mode 100644 index ca029a793f..0000000000 --- a/qpid/cpp/src/client/ClientAdapter.h +++ /dev/null @@ -1,66 +0,0 @@ -#ifndef _client_ClientAdapter_h -#define _client_ClientAdapter_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "../framing/ChannelAdapter.h" -#include "ClientChannel.h" - -namespace qpid { -namespace client { - -class AMQMethodBody; -class Connection; - -/** - * Per-channel protocol adapter. - * - * Translates protocol bodies into calls on the core Channel, - * Connection and Client objects. - * - * Owns a channel, has references to Connection and Client. - */ -class ClientAdapter : public framing::ChannelAdapter -{ - public: - ClientAdapter(std::auto_ptr<Channel> ch, Connection&, Client&); - Channel& getChannel() { return *channel; } - - void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>); - void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>); - void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>); - - private: - void handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const framing::MethodContext& context); - - class ClientOps; - - std::auto_ptr<Channel> channel; - Connection& connection; - Client& client; - boost::shared_ptr<ClientOps> clientOps; -}; - -}} // namespace qpid::client - - - -#endif /*!_client_ClientAdapter_h*/ diff --git a/qpid/cpp/src/client/ClientChannel.cpp b/qpid/cpp/src/client/ClientChannel.cpp deleted file mode 100644 index 533b590010..0000000000 --- a/qpid/cpp/src/client/ClientChannel.cpp +++ /dev/null @@ -1,341 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include "ClientChannel.h" -#include "../sys/Monitor.h" -#include "ClientMessage.h" -#include "../QpidError.h" -#include "MethodBodyInstances.h" -#include "Connection.h" -#include "BasicMessageChannel.h" -#include "MessageMessageChannel.h" - -// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent -// handling of errors that should close the connection or the channel. -// Make sure the user thread receives a connection in each case. -// -using namespace std; -using namespace boost; -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::sys; - -Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) : - connection(0), prefetch(_prefetch), transactional(_transactional) -{ - switch (mode) { - case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break; - case AMQP_09: messaging.reset(new MessageMessageChannel(*this)); break; - default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode."); - } -} - -Channel::~Channel(){ - close(); -} - -void Channel::open(ChannelId id, Connection& con) -{ - if (isOpen()) - THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id); - connection = &con; - init(id, con, con.getVersion()); // ChannelAdapter initialization. - string oob; - if (id != 0) - sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob))); -} - -void Channel::protocolInit( - const std::string& uid, const std::string& pwd, const std::string& vhost) { - assert(connection); - responses.expect(); - connection->connector->init(); // Send ProtocolInit block. - ConnectionStartBody::shared_ptr connectionStart = - responses.receive<ConnectionStartBody>(); - - FieldTable props; - string mechanism("PLAIN"); - string response = ((char)0) + uid + ((char)0) + pwd; - string locale("en_US"); - ConnectionTuneBody::shared_ptr proposal = - sendAndReceive<ConnectionTuneBody>( - make_shared_ptr(new ConnectionStartOkBody( - version, connectionStart->getRequestId(), - props, mechanism, - response, locale))); - - /** - * Assume for now that further challenges will not be required - //receive connection.secure - responses.receive(connection_secure)); - //send connection.secure-ok - connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); - **/ - - send(new ConnectionTuneOkBody( - version, proposal->getRequestId(), - proposal->getChannelMax(), connection->getMaxFrameSize(), - proposal->getHeartbeat())); - - uint16_t heartbeat = proposal->getHeartbeat(); - connection->connector->setReadTimeout(heartbeat * 2); - connection->connector->setWriteTimeout(heartbeat); - - // Send connection open. - std::string capabilities; - responses.expect(); - send(new ConnectionOpenBody(version, vhost, capabilities, true)); - //receive connection.open-ok (or redirect, but ignore that for now - //esp. as using force=true). - AMQMethodBody::shared_ptr openResponse = responses.receive(); - if(openResponse->isA<ConnectionOpenOkBody>()) { - //ok - }else if(openResponse->isA<ConnectionRedirectBody>()){ - //ignore for now - ConnectionRedirectBody::shared_ptr redirect( - shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse)); - cout << "Received redirection to " << redirect->getHost() - << endl; - } else { - THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open"); - } -} - -bool Channel::isOpen() const { return connection; } - -void Channel::setQos() { - messaging->setQos(); -} - -void Channel::setPrefetch(uint16_t _prefetch){ - prefetch = _prefetch; - setQos(); -} - -void Channel::declareExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - string type = exchange.getType(); - FieldTable args; - sendAndReceiveSync<ExchangeDeclareOkBody>( - synch, - make_shared_ptr(new ExchangeDeclareBody( - version, 0, name, type, false, false, false, false, !synch, args))); -} - -void Channel::deleteExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - sendAndReceiveSync<ExchangeDeleteOkBody>( - synch, - make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false, !synch))); -} - -void Channel::declareQueue(Queue& queue, bool synch){ - string name = queue.getName(); - FieldTable args; - QueueDeclareOkBody::shared_ptr response = - sendAndReceiveSync<QueueDeclareOkBody>( - synch, - make_shared_ptr(new QueueDeclareBody( - version, 0, name, false/*passive*/, queue.isDurable(), - queue.isExclusive(), queue.isAutoDelete(), !synch, args))); - if(synch) { - if(queue.getName().length() == 0) - queue.setName(response->getQueue()); - } -} - -void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ - //ticket, queue, ifunused, ifempty, nowait - string name = queue.getName(); - sendAndReceiveSync<QueueDeleteOkBody>( - synch, - make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch))); -} - -void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ - string e = exchange.getName(); - string q = queue.getName(); - sendAndReceiveSync<QueueBindOkBody>( - synch, - make_shared_ptr(new QueueBindBody(version, 0, q, e, key,!synch, args))); -} - -void Channel::commit(){ - sendAndReceive<TxCommitOkBody>(make_shared_ptr(new TxCommitBody(version))); -} - -void Channel::rollback(){ - sendAndReceive<TxRollbackOkBody>(make_shared_ptr(new TxRollbackBody(version))); -} - -void Channel::handleMethodInContext( - AMQMethodBody::shared_ptr method, const MethodContext&) -{ - // TODO aconway 2007-03-23: Special case for consume OK as it - // is both an expected response and needs handling in this thread. - // Need to review & reationalize the client-side processing model. - if (method->isA<BasicConsumeOkBody>()) { - messaging->handle(method); - responses.signalResponse(method); - return; - } - if(responses.isWaiting()) { - responses.signalResponse(method); - return; - } - try { - switch (method->amqpClassId()) { - case MessageOkBody::CLASS_ID: - case BasicGetOkBody::CLASS_ID: messaging->handle(method); break; - case ChannelCloseBody::CLASS_ID: handleChannel(method); break; - case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; - default: throw UnknownMethod(); - } - } - catch (const UnknownMethod&) { - connection->close( - 504, "Unknown method", - method->amqpClassId(), method->amqpMethodId()); - } - } - -void Channel::handleChannel(AMQMethodBody::shared_ptr method) { - switch (method->amqpMethodId()) { - case ChannelCloseBody::METHOD_ID: - peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method)); - return; - case ChannelFlowBody::METHOD_ID: - // FIXME aconway 2007-02-22: Not yet implemented. - return; - } - throw UnknownMethod(); -} - -void Channel::handleConnection(AMQMethodBody::shared_ptr method) { - if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) { - connection->close(); - return; - } - throw UnknownMethod(); -} - -void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - messaging->handle(body); -} - -void Channel::handleContent(AMQContentBody::shared_ptr body){ - messaging->handle(body); -} - -void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); -} - -void Channel::start(){ - dispatcher = Thread(*messaging); -} - -// Close called by local application. -void Channel::close( - uint16_t code, const std::string& text, - ClassId classId, MethodId methodId) -{ - if (isOpen()) { - try { - if (getId() != 0) { - sendAndReceive<ChannelCloseOkBody>( - make_shared_ptr(new ChannelCloseBody( - version, code, text, classId, methodId))); - } - static_cast<ConnectionForChannel*>(connection)->erase(getId()); - closeInternal(); - } catch (...) { - static_cast<ConnectionForChannel*>(connection)->erase(getId()); - closeInternal(); - throw; - } - } -} - -// Channel closed by peer. -void Channel::peerClose(ChannelCloseBody::shared_ptr) { - assert(isOpen()); - closeInternal(); -} - -void Channel::closeInternal() { - if (isOpen()); - { - messaging->close(); - connection = 0; - // A 0 response means we are closed. - responses.signalResponse(AMQMethodBody::shared_ptr()); - } - dispatcher.join(); -} - -AMQMethodBody::shared_ptr Channel::sendAndReceive( - AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m) -{ - responses.expect(); - send(toSend); - return responses.receive(c, m); -} - -AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( - bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m) -{ - if(sync) - return sendAndReceive(body, c, m); - else { - send(body); - return AMQMethodBody::shared_ptr(); - } -} - -void Channel::consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { - messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields); -} - -void Channel::cancel(const std::string& tag, bool synch) { - messaging->cancel(tag, synch); -} - -bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { - return messaging->get(msg, queue, ackMode); -} - -void Channel::publish(const Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory, bool immediate) { - messaging->publish(msg, exchange, routingKey, mandatory, immediate); -} - -void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) { - messaging->setReturnedMessageHandler(handler); -} - -void Channel::run() { - messaging->run(); -} - diff --git a/qpid/cpp/src/client/ClientChannel.h b/qpid/cpp/src/client/ClientChannel.h deleted file mode 100644 index 328fc23f68..0000000000 --- a/qpid/cpp/src/client/ClientChannel.h +++ /dev/null @@ -1,356 +0,0 @@ -#ifndef _client_ClientChannel_h -#define _client_ClientChannel_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/scoped_ptr.hpp> -#include "../framing/amqp_framing.h" -#include "ClientExchange.h" -#include "ClientMessage.h" -#include "ClientQueue.h" -#include "ResponseHandler.h" -#include "../framing/ChannelAdapter.h" -#include "../sys/Thread.h" -#include "AckMode.h" - -namespace qpid { - -namespace framing { -class ChannelCloseBody; -class AMQMethodBody; -} - -namespace client { - -class Connection; -class MessageChannel; -class MessageListener; -class ReturnedMessageHandler; - -/** - * Represents an AMQP channel, i.e. loosely a session of work. It - * is through a channel that most of the AMQP 'methods' are - * exposed. - * - * \ingroup clientapi - */ -class Channel : public framing::ChannelAdapter -{ - private: - struct UnknownMethod {}; - typedef shared_ptr<framing::AMQMethodBody> MethodPtr; - - sys::Mutex lock; - boost::scoped_ptr<MessageChannel> messaging; - Connection* connection; - sys::Thread dispatcher; - ResponseHandler responses; - - uint16_t prefetch; - const bool transactional; - framing::ProtocolVersion version; - - void handleHeader(framing::AMQHeaderBody::shared_ptr body); - void handleContent(framing::AMQContentBody::shared_ptr body); - void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body); - void handleMethodInContext( - framing::AMQMethodBody::shared_ptr, const framing::MethodContext&); - void handleChannel(framing::AMQMethodBody::shared_ptr method); - void handleConnection(framing::AMQMethodBody::shared_ptr method); - - void setQos(); - - void protocolInit( - const std::string& uid, const std::string& pwd, - const std::string& vhost); - - framing::AMQMethodBody::shared_ptr sendAndReceive( - framing::AMQMethodBody::shared_ptr, - framing::ClassId, framing::MethodId); - - framing::AMQMethodBody::shared_ptr sendAndReceiveSync( - bool sync, - framing::AMQMethodBody::shared_ptr, - framing::ClassId, framing::MethodId); - - template <class BodyType> - boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) { - return boost::shared_polymorphic_downcast<BodyType>( - sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID)); - } - - template <class BodyType> - boost::shared_ptr<BodyType> sendAndReceiveSync( - bool sync, framing::AMQMethodBody::shared_ptr body) { - return boost::shared_polymorphic_downcast<BodyType>( - sendAndReceiveSync( - sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID)); - } - - void open(framing::ChannelId, Connection&); - void closeInternal(); - void peerClose(boost::shared_ptr<framing::ChannelCloseBody>); - - // FIXME aconway 2007-02-23: Get rid of friendships. - friend class Connection; - friend class BasicMessageChannel; // for sendAndReceive. - friend class MessageMessageChannel; // for sendAndReceive. - - public: - enum InteropMode { AMQP_08, AMQP_09 }; - - /** - * Creates a channel object. - * - * @param transactional if true, the publishing and acknowledgement - * of messages will be transactional and can be committed or - * aborted in atomic units (@see commit(), @see rollback()) - * - * @param prefetch specifies the number of unacknowledged - * messages the channel is willing to have sent to it - * asynchronously - * - * @param messageImpl Alternate messaging implementation class to - * allow alternate protocol implementations of messaging - * operations. Takes ownership. - */ - Channel( - bool transactional = false, u_int16_t prefetch = 500, - InteropMode=AMQP_08); - - ~Channel(); - - /** - * Declares an exchange. - * - * In AMQP Exchanges are the destinations to which messages - * are published. They have Queues bound to them and route - * messages they receive to those queues. The routing rules - * depend on the type of the exchange. - * - * @param exchange an Exchange object representing the - * exchange to declare - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void declareExchange(Exchange& exchange, bool synch = true); - /** - * Deletes an exchange - * - * @param exchange an Exchange object representing the exchange to delete - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void deleteExchange(Exchange& exchange, bool synch = true); - /** - * Declares a Queue - * - * @param queue a Queue object representing the queue to declare - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void declareQueue(Queue& queue, bool synch = true); - /** - * Deletes a Queue - * - * @param queue a Queue object representing the queue to delete - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); - /** - * Binds a queue to an exchange. The exact semantics of this - * (in particular how 'routing keys' and 'binding arguments' - * are used) depends on the type of the exchange. - * - * @param exchange an Exchange object representing the - * exchange to bind to - * - * @param queue a Queue object representing the queue to be - * bound - * - * @param key the 'routing key' for the binding - * - * @param args the 'binding arguments' for the binding - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void bind(const Exchange& exchange, const Queue& queue, - const std::string& key, const framing::FieldTable& args, - bool synch = true); - - /** - * For a transactional channel this will commit all - * publications and acknowledgements since the last commit (or - * the channel was opened if there has been no previous - * commit). This will cause published messages to become - * available to consumers and acknowledged messages to be - * consumed and removed from the queues they were dispatched - * from. - * - * Transactionailty of a channel is specified when the channel - * object is created (@see Channel()). - */ - void commit(); - - /** - * For a transactional channel, this will rollback any - * publications or acknowledgements. It will be as if the - * ppblished messages were never sent and the acknowledged - * messages were never consumed. - */ - void rollback(); - - /** - * Change the prefetch in use. - */ - void setPrefetch(uint16_t prefetch); - - uint16_t getPrefetch() { return prefetch; } - - /** - * Start message dispatching on a new thread - */ - void start(); - - /** - * Close the channel with optional error information. - * Closing a channel that is not open has no effect. - */ - void close( - framing::ReplyCode = 200, const std::string& ="OK", - framing::ClassId = 0, framing::MethodId = 0); - - /** True if the channel is transactional */ - bool isTransactional() { return transactional; } - - /** True if the channel is open */ - bool isOpen() const; - - /** Get the connection associated with this channel */ - Connection& getConnection() { return *connection; } - - /** Return the protocol version */ - framing::ProtocolVersion getVersion() const { return version ; } - - /** - * Creates a 'consumer' for a queue. Messages in (or arriving - * at) that queue will be delivered to consumers - * asynchronously. - * - * @param queue a Queue instance representing the queue to - * consume from - * - * @param tag an identifier to associate with the consumer - * that can be used to cancel its subscription (if empty, this - * will be assigned by the broker) - * - * @param listener a pointer to an instance of an - * implementation of the MessageListener interface. Messages - * received from this queue for this consumer will result in - * invocation of the received() method on the listener, with - * the message itself passed in. - * - * @param ackMode the mode of acknowledgement that the broker - * should assume for this consumer. @see AckMode - * - * @param noLocal if true, this consumer will not be sent any - * message published by this connection - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); - - /** - * Cancels a subscription previously set up through a call to consume(). - * - * @param tag the identifier used (or assigned) in the consume - * request that set up the subscription to be cancelled. - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void cancel(const std::string& tag, bool synch = true); - /** - * Synchronous pull of a message from a queue. - * - * @param msg a message object that will contain the message - * headers and content if the call completes. - * - * @param queue the queue to consume from - * - * @param ackMode the acknowledgement mode to use (@see - * AckMode) - * - * @return true if a message was succcessfully dequeued from - * the queue, false if the queue was empty. - */ - bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); - - /** - * Publishes (i.e. sends a message to the broker). - * - * @param msg the message to publish - * - * @param exchange the exchange to publish the message to - * - * @param routingKey the routing key to publish with - * - * @param mandatory if true and the exchange to which this - * publish is directed has no matching bindings, the message - * will be returned (see setReturnedMessageHandler()). - * - * @param immediate if true and there is no consumer to - * receive this message on publication, the message will be - * returned (see setReturnedMessageHandler()). - */ - void publish(const Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory = false, bool immediate = false); - - /** - * Set a handler for this channel that will process any - * returned messages - * - * @see publish() - */ - void setReturnedMessageHandler(ReturnedMessageHandler* handler); - - /** - * Deliver messages from the broker to the appropriate MessageListener. - */ - void run(); - - -}; - -}} - -#endif /*!_client_ClientChannel_h*/ diff --git a/qpid/cpp/src/client/ClientConnection.cpp b/qpid/cpp/src/client/ClientConnection.cpp deleted file mode 100644 index b053a45b0f..0000000000 --- a/qpid/cpp/src/client/ClientConnection.cpp +++ /dev/null @@ -1,156 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <algorithm> -#include <boost/format.hpp> -#include <boost/bind.hpp> - -#include "Connection.h" -#include "ClientChannel.h" -#include "ClientMessage.h" -#include "../QpidError.h" -#include <iostream> -#include <sstream> -#include "MethodBodyInstances.h" -#include <functional> - -using namespace qpid::framing; -using namespace qpid::sys; - - -namespace qpid { -namespace client { - -const std::string Connection::OK("OK"); - -Connection::Connection( - bool _debug, uint32_t _max_frame_size, - framing::ProtocolVersion _version -) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), - defaultConnector(version, _debug, _max_frame_size), - isOpen(false), debug(_debug) -{ - setConnector(defaultConnector); -} - -Connection::~Connection(){} - -void Connection::setConnector(Connector& con) -{ - connector = &con; - connector->setInputHandler(this); - connector->setTimeoutHandler(this); - connector->setShutdownHandler(this); - out = connector->getOutputHandler(); -} - -void Connection::open( - const std::string& host, int port, - const std::string& uid, const std::string& pwd, const std::string& vhost) -{ - if (isOpen) - THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); - connector->connect(host, port); - channels[0] = &channel0; - channel0.open(0, *this); - channel0.protocolInit(uid, pwd, vhost); - isOpen = true; -} - -void Connection::shutdown() { - close(); -} - -void Connection::close( - ReplyCode code, const string& msg, ClassId classId, MethodId methodId -) -{ - if(isOpen) { - // TODO aconway 2007-01-29: Exception handling - could end up - // partly closed with threads left unjoined. - isOpen = false; - channel0.sendAndReceive<ConnectionCloseOkBody>( - make_shared_ptr(new ConnectionCloseBody( - getVersion(), code, msg, classId, methodId))); - - using boost::bind; - for_each(channels.begin(), channels.end(), - bind(&Channel::closeInternal, - bind(&ChannelMap::value_type::second, _1))); - channels.clear(); - connector->close(); - } -} - -void Connection::openChannel(Channel& channel) { - ChannelId id = ++channelIdCounter; - assert (channels.find(id) == channels.end()); - assert(out); - channels[id] = &channel; - channel.open(id, *this); -} - -void Connection::erase(ChannelId id) { - channels.erase(id); -} - -void Connection::received(AMQFrame* frame){ - // FIXME aconway 2007-01-25: Mutex - ChannelId id = frame->getChannel(); - Channel* channel = channels[id]; - // FIXME aconway 2007-01-26: Exception thrown here is hanging the - // client. Need to review use of exceptions. - if (channel == 0) - THROW_QPID_ERROR( - PROTOCOL_ERROR+504, - (boost::format("Invalid channel number %g") % id).str()); - try{ - channel->handleBody(frame->getBody()); - }catch(const qpid::QpidError& e){ - channelException( - *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); - } -} - -void Connection::send(AMQFrame* frame) { - out->send(frame); -} - -void Connection::channelException( - Channel& channel, AMQMethodBody* method, const QpidError& e) -{ - int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500; - string msg = e.msg; - if(method == 0) - channel.close(code, msg); - else - channel.close( - code, msg, method->amqpClassId(), method->amqpMethodId()); -} - -void Connection::idleIn(){ - connector->close(); -} - -void Connection::idleOut(){ - out->send(new AMQFrame(version, 0, new AMQHeartbeatBody())); -} - -}} // namespace qpid::client diff --git a/qpid/cpp/src/client/ClientExchange.cpp b/qpid/cpp/src/client/ClientExchange.cpp deleted file mode 100644 index d5914beea2..0000000000 --- a/qpid/cpp/src/client/ClientExchange.cpp +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ClientExchange.h" - -qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} -const std::string& qpid::client::Exchange::getName() const { return name; } -const std::string& qpid::client::Exchange::getType() const { return type; } - -const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; -const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; -const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; - -const qpid::client::Exchange qpid::client::Exchange::DEFAULT_EXCHANGE("", DIRECT_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/qpid/cpp/src/client/ClientExchange.h b/qpid/cpp/src/client/ClientExchange.h deleted file mode 100644 index a8ac21fa9b..0000000000 --- a/qpid/cpp/src/client/ClientExchange.h +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _Exchange_ -#define _Exchange_ - -namespace qpid { -namespace client { - - /** - * A 'handle' used to represent an AMQP exchange in the Channel - * methods. Exchanges are the destinations to which messages are - * published. - * - * There are different types of exchange (the standard types are - * available as static constants, see DIRECT_EXCHANGE, - * TOPIC_EXCHANGE and HEADERS_EXCHANGE). A Queue can be bound to - * an exchange using Channel::bind() and messages published to - * that exchange are then routed to the queue based on the details - * of the binding and the type of exchange. - * - * There are some standard exchange instances that are predeclared - * on all AMQP brokers. These are defined as static members - * STANDARD_DIRECT_EXCHANGE, STANDARD_TOPIC_EXCHANGE and - * STANDARD_HEADERS_EXCHANGE. There is also the 'default' exchange - * (member DEFAULT_EXCHANGE) which is nameless and of type - * 'direct' and has every declared queue bound to it by queue - * name. - * - * \ingroup clientapi - */ - class Exchange{ - const std::string name; - const std::string type; - - public: - /** - * A direct exchange routes messages published with routing - * key X to any queue bound with key X (i.e. an exact match is - * used). - */ - static const std::string DIRECT_EXCHANGE; - /** - * A topic exchange treat the key with which a queue is bound - * as a pattern and routes all messages whose routing keys - * match that pattern to the bound queue. The routing key for - * a message must consist of zero or more alpha-numeric words - * delimited by dots. The pattern is of a similar form but * - * can be used to match excatly one word and # can be used to - * match zero or more words. - */ - static const std::string TOPIC_EXCHANGE; - /** - * The headers exchange routes messages based on whether their - * headers match the binding arguments specified when - * binding. (see the AMQP spec for more details). - */ - static const std::string HEADERS_EXCHANGE; - - /** - * The 'default' exchange, nameless and of type 'direct'. Has - * every declared queue bound to it by name. - */ - static const Exchange DEFAULT_EXCHANGE; - /** - * The standard direct exchange, named amq.direct. - */ - static const Exchange STANDARD_DIRECT_EXCHANGE; - /** - * The standard topic exchange, named amq.topic. - */ - static const Exchange STANDARD_TOPIC_EXCHANGE; - /** - * The standard headers exchange, named amq.header. - */ - static const Exchange STANDARD_HEADERS_EXCHANGE; - - Exchange(std::string name, std::string type = DIRECT_EXCHANGE); - const std::string& getName() const; - const std::string& getType() const; - }; - -} -} - - -#endif diff --git a/qpid/cpp/src/client/ClientMessage.h b/qpid/cpp/src/client/ClientMessage.h deleted file mode 100644 index 35aed6c734..0000000000 --- a/qpid/cpp/src/client/ClientMessage.h +++ /dev/null @@ -1,64 +0,0 @@ -#ifndef _client_ClientMessage_h -#define _client_ClientMessage_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include "../framing/BasicHeaderProperties.h" - -namespace qpid { -namespace client { - -/** - * A representation of messages for sent or recived through the - * client api. - * - * \ingroup clientapi - */ -// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not -// basic header properties. -class Message : public framing::BasicHeaderProperties { - public: - Message(const std::string& data_=std::string()) : data(data_) {} - - std::string getData() const { return data; } - void setData(const std::string& _data) { data = _data; } - - std::string getDestination() const { return destination; } - void setDestination(const std::string& dest) { destination = dest; } - - // TODO aconway 2007-03-22: only needed for Basic.deliver support. - uint64_t getDeliveryTag() const { return deliveryTag; } - void setDeliveryTag(uint64_t dt) { deliveryTag = dt; } - - bool isRedelivered() const { return redelivered; } - void setRedelivered(bool _redelivered){ redelivered = _redelivered; } - - private: - std::string data; - std::string destination; - bool redelivered; - uint64_t deliveryTag; -}; - -}} - -#endif /*!_client_ClientMessage_h*/ diff --git a/qpid/cpp/src/client/ClientQueue.cpp b/qpid/cpp/src/client/ClientQueue.cpp deleted file mode 100644 index 613cf8d288..0000000000 --- a/qpid/cpp/src/client/ClientQueue.cpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ClientQueue.h" - -qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true), durable(false){} - -qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false), durable(false){} - -qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp), durable(false){} - -qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive, bool _durable) - : name(_name), autodelete(_autodelete), exclusive(_exclusive), durable(_durable){} - -const std::string& qpid::client::Queue::getName() const{ - return name; -} - -void qpid::client::Queue::setName(const std::string& _name){ - name = _name; -} - -bool qpid::client::Queue::isAutoDelete() const{ - return autodelete; -} - -bool qpid::client::Queue::isExclusive() const{ - return exclusive; -} - -bool qpid::client::Queue::isDurable() const{ - return durable; -} - -void qpid::client::Queue::setDurable(bool _durable){ - durable = _durable; -} - - - - diff --git a/qpid/cpp/src/client/ClientQueue.h b/qpid/cpp/src/client/ClientQueue.h deleted file mode 100644 index b37a44b004..0000000000 --- a/qpid/cpp/src/client/ClientQueue.h +++ /dev/null @@ -1,103 +0,0 @@ -#ifndef _client_ClientQueue_h -#define _client_ClientQueue_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -namespace qpid { -namespace client { - - /** - * A 'handle' used to represent an AMQP queue in the Channel - * methods. Creating an instance of this class does not cause the - * queue to be created on the broker. Rather, an instance of this - * class should be passed to Channel::declareQueue() to ensure - * that the queue exists or is created. - * - * Queues hold messages and allow clients to consume - * (see Channel::consume()) or get (see Channel::get()) those messags. A - * queue receives messages by being bound to one or more Exchange; - * messages published to that exchange may then be routed to the - * queue based on the details of the binding and the type of the - * exchange (see Channel::bind()). - * - * Queues are identified by a name. They can be exclusive (in which - * case they can only be used in the context of the connection - * over which they were declared, and are deleted when then - * connection closes), or they can be shared. Shared queues can be - * auto deleted when they have no consumers. - * - * We use the term 'temporary queue' to refer to an exclusive - * queue. - * - * \ingroup clientapi - */ - class Queue{ - std::string name; - const bool autodelete; - const bool exclusive; - bool durable; - - public: - - /** - * Creates an unnamed, non-durable, temporary queue. A name - * will be assigned to this queue instance by a call to - * Channel::declareQueue(). - */ - Queue(); - /** - * Creates a shared, non-durable, queue with a given name, - * that will not be autodeleted. - * - * @param name the name of the queue - */ - Queue(std::string name); - /** - * Creates a non-durable queue with a given name. - * - * @param name the name of the queue - * - * @param temp if true the queue will be a temporary queue, if - * false it will be shared and not autodeleted. - */ - Queue(std::string name, bool temp); - /** - * This constructor allows the autodelete, exclusive and - * durable propeties to be explictly set. Note however that if - * exclusive is true, autodelete has no meaning as exclusive - * queues are always destroyed when the connection that - * created them is closed. - */ - Queue(std::string name, bool autodelete, bool exclusive, bool durable); - const std::string& getName() const; - void setName(const std::string&); - bool isAutoDelete() const; - bool isExclusive() const; - bool isDurable() const; - void setDurable(bool durable); - }; - -} -} - -#endif /*!_client_ClientQueue_h*/ diff --git a/qpid/cpp/src/client/Connection.h b/qpid/cpp/src/client/Connection.h deleted file mode 100644 index 5e0b413dac..0000000000 --- a/qpid/cpp/src/client/Connection.h +++ /dev/null @@ -1,179 +0,0 @@ -#ifndef _client_Connection_ -#define _client_Connection_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <map> -#include <string> -#include "../QpidError.h" -#include "ClientChannel.h" -#include "Connector.h" -#include "../sys/ShutdownHandler.h" -#include "../sys/TimeoutHandler.h" - - -namespace qpid { - -/** - * The client namespace contains all classes that make up a client - * implementation of the AMQP protocol. The key classes that form - * the basis of the client API to be used by applications are - * Connection and Channel. - */ -namespace client { - -/** - * \internal provide access to selected private channel functions - * for the Connection without making it a friend of the entire channel. - */ -class ConnectionForChannel : - public framing::InputHandler, - public framing::OutputHandler, - public sys::TimeoutHandler, - public sys::ShutdownHandler - -{ - private: - friend class Channel; - virtual void erase(framing::ChannelId) = 0; -}; - - -/** - * \defgroup clientapi Application API for an AMQP client - */ - -/** - * Represents a connection to an AMQP broker. All communication is - * initiated by establishing a connection, then opening one or - * more Channels over that connection. - * - * \ingroup clientapi - */ -class Connection : public ConnectionForChannel -{ - typedef std::map<framing::ChannelId, Channel*> ChannelMap; - - framing::ChannelId channelIdCounter; - static const std::string OK; - - framing::ProtocolVersion version; - const uint32_t max_frame_size; - ChannelMap channels; - Connector defaultConnector; - Connector* connector; - framing::OutputHandler* out; - volatile bool isOpen; - Channel channel0; - bool debug; - - void erase(framing::ChannelId); - void channelException( - Channel&, framing::AMQMethodBody*, const QpidError&); - - // TODO aconway 2007-01-26: too many friendships, untagle these classes. - friend class Channel; - - public: - /** - * Creates a connection object, but does not open the - * connection. - * - * @param _version the version of the protocol to connect with - * - * @param debug turns on tracing for the connection - * (i.e. prints details of the frames sent and received to std - * out). Optional and defaults to false. - * - * @param max_frame_size the maximum frame size that the - * client will accept. Optional and defaults to 65536. - */ - Connection(bool debug = false, uint32_t max_frame_size = 65536, - framing::ProtocolVersion=framing::highestProtocolVersion); - ~Connection(); - - /** - * Opens a connection to a broker. - * - * @param host the host on which the broker is running - * - * @param port the port on the which the broker is listening - * - * @param uid the userid to connect with - * - * @param pwd the password to connect with (currently SASL - * PLAIN is the only authentication method supported so this - * is sent in clear text) - * - * @param virtualhost the AMQP virtual host to use (virtual - * hosts, where implemented(!), provide namespace partitioning - * within a single broker). - */ - void open(const std::string& host, int port = 5672, - const std::string& uid = "guest", - const std::string& pwd = "guest", - const std::string& virtualhost = "/"); - - /** - * Close the connection with optional error information for the peer. - * - * Any further use of this connection (without reopening it) will - * not succeed. - */ - void close(framing::ReplyCode=200, const std::string& msg=OK, - framing::ClassId = 0, framing::MethodId = 0); - - /** - * Associate a Channel with this connection and open it for use. - * - * In AMQP channels are like multi-plexed 'sessions' of work over - * a connection. Almost all the interaction with AMQP is done over - * a channel. - * - * @param connection the connection object to be associated with - * the channel. Call Channel::close() to close the channel. - */ - void openChannel(Channel&); - - - // TODO aconway 2007-01-26: can these be private? - void send(framing::AMQFrame*); - void received(framing::AMQFrame*); - void idleOut(); - void idleIn(); - void shutdown(); - - /**\internal used for testing */ - void setConnector(Connector& connector); - - /** - * @return the maximum frame size in use on this connection - */ - inline uint32_t getMaxFrameSize(){ return max_frame_size; } - - /** @return protocol version in use on this connection. */ - framing::ProtocolVersion getVersion() const { return version; } -}; - -}} // namespace qpid::client - - -#endif diff --git a/qpid/cpp/src/client/Connector.cpp b/qpid/cpp/src/client/Connector.cpp deleted file mode 100644 index 566e58ec13..0000000000 --- a/qpid/cpp/src/client/Connector.cpp +++ /dev/null @@ -1,188 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include "../QpidError.h" -#include "../sys/Time.h" -#include "Connector.h" - -namespace qpid { -namespace client { - -using namespace qpid::sys; -using namespace qpid::framing; -using qpid::QpidError; - -Connector::Connector( - ProtocolVersion ver, bool _debug, uint32_t buffer_size -) : debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - version(ver), - closed(true), - lastIn(0), lastOut(0), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - inbuf(receive_buffer_size), - outbuf(send_buffer_size) -{ } - -Connector::~Connector(){ } - -void Connector::connect(const std::string& host, int port){ - socket = Socket::createTcp(); - socket.connect(host, port); - closed = false; - receiver = Thread(this); -} - -void Connector::init(){ - ProtocolInitiation init(version); - writeBlock(&init); -} - -void Connector::close(){ - closed = true; - socket.close(); - receiver.join(); -} - -void Connector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void Connector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* Connector::getOutputHandler(){ - return this; -} - -void Connector::send(AMQFrame* f){ - std::auto_ptr<AMQFrame> frame(f); - AMQBody::shared_ptr body = frame->getBody(); - writeBlock(frame.get()); - if(debug) std::cout << "SENT: " << *frame << std::endl; -} - -void Connector::writeBlock(AMQDataBlock* data){ - Mutex::ScopedLock l(writeLock); - data->encode(outbuf); - //transfer data to wire - outbuf.flip(); - writeToSocket(outbuf.start(), outbuf.available()); - outbuf.clear(); -} - -void Connector::writeToSocket(char* data, size_t available){ - size_t written = 0; - while(written < available && !closed){ - ssize_t sent = socket.send(data + written, available-written); - if(sent > 0) { - lastOut = now() * TIME_MSEC; - written += sent; - } - } -} - -void Connector::handleClosed(){ - closed = true; - socket.close(); - if(shutdownHandler) shutdownHandler->shutdown(); -} - -void Connector::checkIdle(ssize_t status){ - if(timeoutHandler){ - Time t = now() * TIME_MSEC; - if(status == Socket::SOCKET_TIMEOUT) { - if(idleIn && (t - lastIn > idleIn)){ - timeoutHandler->idleIn(); - } - } - else if(status == 0 || status == Socket::SOCKET_EOF) { - handleClosed(); - } - else { - lastIn = t; - } - if(idleOut && (t - lastOut > idleOut)){ - timeoutHandler->idleOut(); - } - } -} - -void Connector::setReadTimeout(uint16_t t){ - idleIn = t * 1000;//t is in secs - if(idleIn && (!timeout || idleIn < timeout)){ - timeout = idleIn; - setSocketTimeout(); - } - -} - -void Connector::setWriteTimeout(uint16_t t){ - idleOut = t * 1000;//t is in secs - if(idleOut && (!timeout || idleOut < timeout)){ - timeout = idleOut; - setSocketTimeout(); - } -} - -void Connector::setSocketTimeout(){ - socket.setTimeout(timeout*TIME_MSEC); -} - -void Connector::setTimeoutHandler(TimeoutHandler* handler){ - timeoutHandler = handler; -} - -void Connector::run(){ - try{ - while(!closed){ - ssize_t available = inbuf.available(); - if(available < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - ssize_t received = socket.recv(inbuf.start(), available); - checkIdle(received); - - if(!closed && received > 0){ - inbuf.move(received); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame(version); - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: " << frame << std::endl; - input->received(&frame); - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - } catch (const std::exception& e) { - std::cout << e.what() << std::endl; - handleClosed(); - } -} - -}} // namespace qpid::client diff --git a/qpid/cpp/src/client/Connector.h b/qpid/cpp/src/client/Connector.h deleted file mode 100644 index 928bfa2d14..0000000000 --- a/qpid/cpp/src/client/Connector.h +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Connector_ -#define _Connector_ - - -#include "../framing/InputHandler.h" -#include "../framing/OutputHandler.h" -#include "../framing/InitiationHandler.h" -#include "../framing/ProtocolInitiation.h" -#include "../framing/ProtocolVersion.h" -#include "../sys/ShutdownHandler.h" -#include "../sys/TimeoutHandler.h" -#include "../sys/Thread.h" -#include "../sys/Monitor.h" -#include "../sys/Socket.h" - -namespace qpid { - -namespace client { - -class Connector : public framing::OutputHandler, - private sys::Runnable -{ - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - framing::ProtocolVersion version; - - bool closed; - - int64_t lastIn; - int64_t lastOut; - int64_t timeout; - uint32_t idleIn; - uint32_t idleOut; - - sys::TimeoutHandler* timeoutHandler; - sys::ShutdownHandler* shutdownHandler; - framing::InputHandler* input; - framing::InitiationHandler* initialiser; - framing::OutputHandler* output; - - framing::Buffer inbuf; - framing::Buffer outbuf; - - sys::Mutex writeLock; - sys::Thread receiver; - - sys::Socket socket; - - void checkIdle(ssize_t status); - void writeBlock(framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - void handleClosed(); - - friend class Channel; - public: - Connector(framing::ProtocolVersion pVersion, - bool debug = false, uint32_t buffer_size = 1024); - virtual ~Connector(); - virtual void connect(const std::string& host, int port); - virtual void init(); - virtual void close(); - virtual void setInputHandler(framing::InputHandler* handler); - virtual void setTimeoutHandler(sys::TimeoutHandler* handler); - virtual void setShutdownHandler(sys::ShutdownHandler* handler); - virtual framing::OutputHandler* getOutputHandler(); - virtual void send(framing::AMQFrame* frame); - virtual void setReadTimeout(uint16_t timeout); - virtual void setWriteTimeout(uint16_t timeout); -}; - -}} - - -#endif diff --git a/qpid/cpp/src/client/IncomingMessage.cpp b/qpid/cpp/src/client/IncomingMessage.cpp deleted file mode 100644 index eb5f2b6fae..0000000000 --- a/qpid/cpp/src/client/IncomingMessage.cpp +++ /dev/null @@ -1,168 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "IncomingMessage.h" -#include "../Exception.h" -#include "ClientMessage.h" -#include <boost/format.hpp> - -namespace qpid { -namespace client { - -using boost::format; -using sys::Mutex; - -IncomingMessage::Destination::~Destination() {} - - -IncomingMessage::WaitableDestination::WaitableDestination() - : shutdownFlag(false) {} - -void IncomingMessage::WaitableDestination::message(const Message& msg) { - Mutex::ScopedLock l(monitor); - queue.push(msg); - monitor.notify(); -} - -void IncomingMessage::WaitableDestination::empty() { - Mutex::ScopedLock l(monitor); - queue.push(Empty()); - monitor.notify(); -} - -bool IncomingMessage::WaitableDestination::wait(Message& msgOut) { - Mutex::ScopedLock l(monitor); - while (queue.empty() && !shutdownFlag) - monitor.wait(); - if (shutdownFlag) - return false; - Message* msg = boost::get<Message>(&queue.front()); - bool success = msg; - if (success) - msgOut=*msg; - queue.pop(); - if (!queue.empty()) - monitor.notify(); // Wake another waiter. - return success; -} - -void IncomingMessage::WaitableDestination::shutdown() { - Mutex::ScopedLock l(monitor); - shutdownFlag = true; - monitor.notifyAll(); -} - -void IncomingMessage::openReference(const std::string& name) { - Mutex::ScopedLock l(lock); - if (references.find(name) != references.end()) - throw ConnectionException( - 503, format("Attempt to open existing reference %s.") % name); - references[name]; - return; -} - -void IncomingMessage::appendReference( - const std::string& name, const std::string& data) -{ - Mutex::ScopedLock l(lock); - getRefUnlocked(name).data += data; -} - -Message& IncomingMessage::createMessage( - const std::string& destination, const std::string& reference) -{ - Mutex::ScopedLock l(lock); - getDestUnlocked(destination); // Verify destination. - Reference& ref = getRefUnlocked(reference); - ref.messages.resize(ref.messages.size() +1); - ref.messages.back().setDestination(destination); - return ref.messages.back(); -} - -void IncomingMessage::closeReference(const std::string& name) { - Reference refCopy; - { - Mutex::ScopedLock l(lock); - refCopy = getRefUnlocked(name); - references.erase(name); - } - for (std::vector<Message>::iterator i = refCopy.messages.begin(); - i != refCopy.messages.end(); - ++i) - { - i->setData(refCopy.data); - // TODO aconway 2007-03-23: Thread safety, - // can a destination be removed while we're doing this? - getDestination(i->getDestination()).message(*i); - } -} - - -void IncomingMessage::addDestination(std::string name, Destination& dest) { - Mutex::ScopedLock l(lock); - DestinationMap::iterator i = destinations.find(name); - if (i == destinations.end()) - destinations[name]=&dest; - else if (i->second != &dest) - throw ConnectionException( - 503, format("Destination already exists: %s.") % name); -} - -void IncomingMessage::removeDestination(std::string name) { - Mutex::ScopedLock l(lock); - DestinationMap::iterator i = destinations.find(name); - if (i == destinations.end()) - throw ConnectionException( - 503, format("No such destination: %s.") % name); - destinations.erase(i); -} - -IncomingMessage::Destination& IncomingMessage::getDestination( - const std::string& name) { - return getDestUnlocked(name); -} - -IncomingMessage::Reference& IncomingMessage::getReference( - const std::string& name) { - return getRefUnlocked(name); -} - -IncomingMessage::Reference& IncomingMessage::getRefUnlocked( - const std::string& name) { - Mutex::ScopedLock l(lock); - ReferenceMap::iterator i = references.find(name); - if (i == references.end()) - throw ConnectionException( - 503, format("No such reference: %s.") % name); - return i->second; -} - -IncomingMessage::Destination& IncomingMessage::getDestUnlocked( - const std::string& name) { - Mutex::ScopedLock l(lock); - DestinationMap::iterator i = destinations.find(name); - if (i == destinations.end()) - throw ConnectionException( - 503, format("No such destination: %s.") % name); - return *i->second; -} - -}} // namespace qpid::client diff --git a/qpid/cpp/src/client/IncomingMessage.h b/qpid/cpp/src/client/IncomingMessage.h deleted file mode 100644 index bc650dfbe1..0000000000 --- a/qpid/cpp/src/client/IncomingMessage.h +++ /dev/null @@ -1,136 +0,0 @@ -#ifndef _IncomingMessage_ -#define _IncomingMessage_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "../sys/Monitor.h" -#include <map> -#include <queue> -#include <vector> -#include <boost/variant.hpp> - -namespace qpid { -namespace client { - -class Message; - -/** - * Manage incoming messages. - * - * Uses reference and destination concepts from 0-9 Messsage class. - * - * Basic messages use special destination and reference names to indicate - * get-ok, return etc. messages. - * - */ -class IncomingMessage { - public: - /** Accumulate data associated with a set of messages. */ - struct Reference { - std::string data; - std::vector<Message> messages; - }; - - /** Interface to a destination for messages. */ - class Destination { - public: - virtual ~Destination(); - - /** Pass a message to the destination */ - virtual void message(const Message&) = 0; - - /** Notify destination of queue-empty contition */ - virtual void empty() = 0; - }; - - - /** A destination that a thread can wait on till a message arrives. */ - class WaitableDestination : public Destination - { - public: - WaitableDestination(); - void message(const Message& msg); - void empty(); - /** Wait till message() or empty() is called. True for message() */ - bool wait(Message& msgOut); - void shutdown(); - - private: - struct Empty {}; - typedef boost::variant<Message,Empty> Item; - sys::Monitor monitor; - std::queue<Item> queue; - bool shutdownFlag; - }; - - - - /** Add a reference. Throws if already open. */ - void openReference(const std::string& name); - - /** Get a reference. Throws if not already open. */ - void appendReference(const std::string& name, - const std::string& data); - - /** Create a message to destination associated with reference - *@exception if destination or reference non-existent. - */ - Message& createMessage(const std::string& destination, - const std::string& reference); - - /** Get a reference. - *@exception if non-existent. - */ - Reference& getReference(const std::string& name); - - /** Close a reference and deliver all its messages. - * Throws if not open or a message has an invalid destination. - */ - void closeReference(const std::string& name); - - /** Add a destination. - *@exception if a different Destination is already registered - * under name. - */ - void addDestination(std::string name, Destination&); - - /** Remove a destination. Throws if does not exist */ - void removeDestination(std::string name); - - /** Get a destination. Throws if does not exist */ - Destination& getDestination(const std::string& name); - private: - - typedef std::map<std::string, Reference> ReferenceMap; - typedef std::map<std::string, Destination*> DestinationMap; - - Reference& getRefUnlocked(const std::string& name); - Destination& getDestUnlocked(const std::string& name); - - mutable sys::Mutex lock; - ReferenceMap references; - DestinationMap destinations; -}; - -}} - - -#endif diff --git a/qpid/cpp/src/client/MessageChannel.h b/qpid/cpp/src/client/MessageChannel.h deleted file mode 100644 index 2fa387b7f7..0000000000 --- a/qpid/cpp/src/client/MessageChannel.h +++ /dev/null @@ -1,94 +0,0 @@ -#ifndef _client_MessageChannel_h -#define _client_MessageChannel_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "../shared_ptr.h" -#include "../sys/Runnable.h" -#include "AckMode.h" - -namespace qpid { - -namespace framing { -class AMQMethodBody; -class AMQHeaderBody; -class AMQContentBody; -class FieldTable; -} - -namespace client { - -class Channel; -class Message; -class Queue; -class Exchange; -class MessageListener; -class ReturnedMessageHandler; - -/** - * Abstract interface for messaging implementation for a channel. - * - *@see Channel for documentation. - */ -class MessageChannel : public sys::Runnable -{ - public: - /**@see Channel::consume */ - virtual void consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0) = 0; - - /**@see Channel::cancel */ - virtual void cancel(const std::string& tag, bool synch = true) = 0; - - /**@see Channel::get */ - virtual bool get( - Message& msg, const Queue& queue, AckMode ackMode = NO_ACK) = 0; - - /**@see Channel::get */ - virtual void publish(const Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory = false, bool immediate = false) = 0; - - /**@see Channel::setReturnedMessageHandler */ - virtual void setReturnedMessageHandler( - ReturnedMessageHandler* handler) = 0; - - /** Handle an incoming method. */ - virtual void handle(shared_ptr<framing::AMQMethodBody>) = 0; - - /** Handle an incoming header */ - virtual void handle(shared_ptr<framing::AMQHeaderBody>) = 0; - - /** Handle an incoming content */ - virtual void handle(shared_ptr<framing::AMQContentBody>) = 0; - - /** Send channel's QOS settings */ - virtual void setQos() = 0; - - /** Channel is closing */ - virtual void close() = 0; -}; - -}} // namespace qpid::client - - - -#endif /*!_client_MessageChannel_h*/ diff --git a/qpid/cpp/src/client/MessageListener.cpp b/qpid/cpp/src/client/MessageListener.cpp deleted file mode 100644 index 68ebedeb0d..0000000000 --- a/qpid/cpp/src/client/MessageListener.cpp +++ /dev/null @@ -1,24 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "MessageListener.h" - -qpid::client::MessageListener::~MessageListener() {} diff --git a/qpid/cpp/src/client/MessageListener.h b/qpid/cpp/src/client/MessageListener.h deleted file mode 100644 index 501862a3ef..0000000000 --- a/qpid/cpp/src/client/MessageListener.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _MessageListener_ -#define _MessageListener_ - -#include "ClientMessage.h" - -namespace qpid { -namespace client { - - /** - * An interface through which asynchronously delivered messages - * can be received by an application. - * - * @see Channel::consume() - * - * \ingroup clientapi - */ - class MessageListener{ - public: - virtual ~MessageListener(); - virtual void received(Message& msg) = 0; - }; - -} -} - - -#endif diff --git a/qpid/cpp/src/client/MessageMessageChannel.cpp b/qpid/cpp/src/client/MessageMessageChannel.cpp deleted file mode 100644 index 8d0fdc3189..0000000000 --- a/qpid/cpp/src/client/MessageMessageChannel.cpp +++ /dev/null @@ -1,431 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include <boost/format.hpp> -#include "MessageMessageChannel.h" -#include "../framing/AMQMethodBody.h" -#include "ClientChannel.h" -#include "ReturnedMessageHandler.h" -#include "MessageListener.h" -#include "../framing/FieldTable.h" -#include "Connection.h" -#include "../shared_ptr.h" -#include <boost/bind.hpp> - -namespace qpid { -namespace client { - -using namespace std; -using namespace sys; -using namespace framing; - -MessageMessageChannel::MessageMessageChannel(Channel& ch) - : channel(ch), tagCount(0) {} - -string MessageMessageChannel::newTag() { - Mutex::ScopedLock l(lock); - return (boost::format("__tag%d")%++tagCount).str(); -} - -void MessageMessageChannel::consume( - Queue& queue, std::string& tag, MessageListener* /*listener*/, - AckMode ackMode, bool noLocal, bool /*synch*/, const FieldTable* fields) -{ - if (tag.empty()) - tag = newTag(); - channel.sendAndReceive<MessageOkBody>( - make_shared_ptr(new MessageConsumeBody( - channel.getVersion(), 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, fields ? *fields : FieldTable()))); - -// // FIXME aconway 2007-02-20: Race condition! -// // We could receive the first message for the consumer -// // before we create the consumer below. -// // Move consumer creation to handler for MessageConsumeOkBody -// { -// Mutex::ScopedLock l(lock); -// ConsumerMap::iterator i = consumers.find(tag); -// if (i != consumers.end()) -// THROW_QPID_ERROR(CLIENT_ERROR, -// "Consumer already exists with tag="+tag); -// Consumer& c = consumers[tag]; -// c.listener = listener; -// c.ackMode = ackMode; -// c.lastDeliveryTag = 0; -// } -} - - -void MessageMessageChannel::cancel(const std::string& /*tag*/, bool /*synch*/) { - // FIXME aconway 2007-02-23: -// Consumer c; -// { -// Mutex::ScopedLock l(lock); -// ConsumerMap::iterator i = consumers.find(tag); -// if (i == consumers.end()) -// return; -// c = i->second; -// consumers.erase(i); -// } -// if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) -// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true)); -// channel.sendAndReceiveSync<MessageCancelOkBody>( -// synch, new MessageCancelBody(channel.version, tag, !synch)); -} - -void MessageMessageChannel::close(){ - // FIXME aconway 2007-02-23: -// ConsumerMap consumersCopy; -// { -// Mutex::ScopedLock l(lock); -// consumersCopy = consumers; -// consumers.clear(); -// } -// for (ConsumerMap::iterator i=consumersCopy.begin(); -// i != consumersCopy.end(); ++i) -// { -// Consumer& c = i->second; -// if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) -// && c.lastDeliveryTag > 0) -// { -// channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true)); -// } -// } -// incoming.shutdown(); -} - - -/** Destination ID for the current get. - * Must not clash with a generated consumer ID. - * TODO aconway 2007-03-06: support multiple outstanding gets? - */ -const string getDestinationId("__get__"); - -/** - * A destination that provides a Correlator::Action to handle - * MessageEmpty responses. - */ -struct MessageGetDestination : public IncomingMessage::WaitableDestination -{ - void response(shared_ptr<AMQResponseBody> response) { - if (response->amqpClassId() == MessageOkBody::CLASS_ID) { - switch (response->amqpMethodId()) { - case MessageOkBody::METHOD_ID: - // Nothing to do, wait for transfer. - return; - case MessageEmptyBody::METHOD_ID: - empty(); // Wake up waiter with empty queue. - return; - } - } - throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response"); - } - - Correlator::Action action() { - return boost::bind(&MessageGetDestination::response, this, _1); - } -}; - -bool MessageMessageChannel::get( - Message& msg, const Queue& queue, AckMode ackMode) -{ - Mutex::ScopedLock l(lock); - std::string destName=newTag(); - MessageGetDestination dest; - incoming.addDestination(destName, dest); - channel.send( - make_shared_ptr( - new MessageGetBody( - channel.version, 0, queue.getName(), destName, ackMode)), - dest.action()); - return dest.wait(msg); -} - - -/** Convert a message to a transfer command. */ -MessageTransferBody::shared_ptr makeTransfer( - ProtocolVersion version, - const Message& msg, const string& destination, - const std::string& routingKey, bool mandatory, bool immediate) -{ - return MessageTransferBody::shared_ptr( - new MessageTransferBody( - version, - 0, // FIXME aconway 2007-04-03: ticket. - destination, - msg.isRedelivered(), - immediate, - 0, // FIXME aconway 2007-02-23: ttl - msg.getPriority(), - msg.getTimestamp(), - static_cast<uint8_t>(msg.getDeliveryMode()), - 0, // FIXME aconway 2007-04-03: Expiration - string(), // Exchange: for broker use only. - routingKey, - msg.getMessageId(), - msg.getCorrelationId(), - msg.getReplyTo(), - msg.getContentType(), - msg.getContentEncoding(), - msg.getUserId(), - msg.getAppId(), - string(), // FIXME aconway 2007-04-03: TransactionId - string(), //FIXME aconway 2007-04-03: SecurityToken - msg.getHeaders(), - Content(INLINE, msg.getData()), - mandatory - )); -} - -// FIXME aconway 2007-04-05: Generated code should provide this. -/** - * Calculate the size of a frame containing the given body type - * if all variable-lengths parts are empty. - */ -template <class T> size_t overhead() { - static AMQFrame frame( - ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion()))); - return frame.size(); -} - -void MessageMessageChannel::publish( - const Message& msg, const Exchange& exchange, - const std::string& routingKey, bool mandatory, bool immediate) -{ - MessageTransferBody::shared_ptr transfer = makeTransfer( - channel.getVersion(), - msg, exchange.getName(), routingKey, mandatory, immediate); - // Frame itself uses 8 bytes. - u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8; - if (transfer->size() <= frameMax) { - channel.sendAndReceive<MessageOkBody>(transfer); - } - else { - std::string ref = newTag(); - std::string data = transfer->getBody().getValue(); - size_t chunk = - channel.connection->getMaxFrameSize() - - (overhead<MessageAppendBody>() + ref.size()); - // TODO aconway 2007-04-05: cast around lack of generated setters - const_cast<Content&>(transfer->getBody()) = Content(REFERENCE,ref); - channel.send( - make_shared_ptr(new MessageOpenBody(channel.version, ref))); - channel.send(transfer); - const char* p = data.data(); - const char* end = data.data()+data.size(); - while (p+chunk <= end) { - channel.send( - make_shared_ptr( - new MessageAppendBody(channel.version, ref, std::string(p, chunk)))); - p += chunk; - } - if (p < end) { - channel.send( - make_shared_ptr( - new MessageAppendBody(channel.version, ref, std::string(p, end-p)))); - } - channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref))); - } -} - -void copy(Message& msg, MessageTransferBody& transfer) { - // FIXME aconway 2007-04-05: Verify all required fields - // are copied. - msg.setContentType(transfer.getContentType()); - msg.setContentEncoding(transfer.getContentEncoding()); - msg.setHeaders(transfer.getApplicationHeaders()); - msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode())); - msg.setPriority(transfer.getPriority()); - msg.setCorrelationId(transfer.getCorrelationId()); - msg.setReplyTo(transfer.getReplyTo()); - // FIXME aconway 2007-04-05: TTL/Expiration - msg.setMessageId(transfer.getMessageId()); - msg.setTimestamp(transfer.getTimestamp()); - msg.setUserId(transfer.getUserId()); - msg.setAppId(transfer.getAppId()); - msg.setDestination(transfer.getDestination()); - msg.setRedelivered(transfer.getRedelivered()); - msg.setDeliveryTag(0); // No meaning in 0-9 - if (transfer.getBody().isInline()) - msg.setData(transfer.getBody().getValue()); -} - -void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { - assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID); - switch(method->amqpMethodId()) { - case MessageAppendBody::METHOD_ID: { - MessageAppendBody::shared_ptr append = - shared_polymorphic_downcast<MessageAppendBody>(method); - incoming.appendReference(append->getReference(), append->getBytes()); - break; - } - case MessageOpenBody::METHOD_ID: { - MessageOpenBody::shared_ptr open = - shared_polymorphic_downcast<MessageOpenBody>(method); - incoming.openReference(open->getReference()); - break; - } - - case MessageCloseBody::METHOD_ID: { - MessageCloseBody::shared_ptr close = - shared_polymorphic_downcast<MessageCloseBody>(method); - incoming.closeReference(close->getReference()); - break; - } - - case MessageTransferBody::METHOD_ID: { - MessageTransferBody::shared_ptr transfer= - shared_polymorphic_downcast<MessageTransferBody>(method); - if (transfer->getBody().isInline()) { - Message msg; - copy(msg, *transfer); - // Deliver it. - incoming.getDestination(transfer->getDestination()).message(msg); - } - else { - Message& msg=incoming.createMessage( - transfer->getDestination(), transfer->getBody().getValue()); - copy(msg, *transfer); - // Will be delivered when reference closes. - } - break; - } - - case MessageEmptyBody::METHOD_ID: - case MessageOkBody::METHOD_ID: - // Nothing to do - break; - - // FIXME aconway 2007-04-03: TODO - case MessageCancelBody::METHOD_ID: - case MessageCheckpointBody::METHOD_ID: - case MessageOffsetBody::METHOD_ID: - case MessageQosBody::METHOD_ID: - case MessageRecoverBody::METHOD_ID: - case MessageRejectBody::METHOD_ID: - case MessageResumeBody::METHOD_ID: - break; - default: - throw Channel::UnknownMethod(); - } -} - -void MessageMessageChannel::handle(AMQHeaderBody::shared_ptr ){ - throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported"); -} - -void MessageMessageChannel::handle(AMQContentBody::shared_ptr ){ - throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported"); -} - -// FIXME aconway 2007-02-23: -// void MessageMessageChannel::deliver(IncomingMessage::Destination& consumer, Message& msg){ -// //record delivery tag: -// consumer.lastDeliveryTag = msg.getDeliveryTag(); - -// //allow registered listener to handle the message -// consumer.listener->received(msg); - -// if(channel.isOpen()){ -// bool multiple(false); -// switch(consumer.ackMode){ -// case LAZY_ACK: -// multiple = true; -// if(++(consumer.count) < channel.getPrefetch()) -// break; -// //else drop-through -// case AUTO_ACK: -// consumer.lastDeliveryTag = 0; -// channel.send( -// new MessageAckBody( -// channel.version, msg.getDeliveryTag(), multiple)); -// case NO_ACK: // Nothing to do -// case CLIENT_ACK: // User code must ack. -// break; -// // TODO aconway 2007-02-22: Provide a way for user -// // to ack! -// } -// } - -// //as it stands, transactionality is entirely orthogonal to ack -// //mode, though the acks will not be processed by the broker under -// //a transaction until it commits. -// } - - -void MessageMessageChannel::run() { - // FIXME aconway 2007-02-23: -// while(channel.isOpen()) { -// try { -// Message msg = incoming.waitDispatch(); -// if(msg.getMethod()->isA<MessageReturnBody>()) { -// ReturnedMessageHandler* handler=0; -// { -// Mutex::ScopedLock l(lock); -// handler=returnsHandler; -// } -// if(handler == 0) { -// // TODO aconway 2007-02-20: proper logging. -// cout << "Message returned: " << msg.getData() << endl; -// } -// else -// handler->returned(msg); -// } -// else { -// MessageDeliverBody::shared_ptr deliverBody = -// boost::shared_polymorphic_downcast<MessageDeliverBody>( -// msg.getMethod()); -// std::string tag = deliverBody->getConsumerTag(); -// Consumer consumer; -// { -// Mutex::ScopedLock l(lock); -// ConsumerMap::iterator i = consumers.find(tag); -// if(i == consumers.end()) -// THROW_QPID_ERROR(PROTOCOL_ERROR+504, -// "Unknown consumer tag=" + tag); -// consumer = i->second; -// } -// deliver(consumer, msg); -// } -// } -// catch (const ShutdownException&) { -// /* Orderly shutdown */ -// } -// catch (const Exception& e) { -// // FIXME aconway 2007-02-20: Report exception to user. -// cout << "client::Message::run() terminated by: " << e.toString() -// << "(" << typeid(e).name() << ")" << endl; -// } -// } -} - -void MessageMessageChannel::setReturnedMessageHandler( - ReturnedMessageHandler* ) -{ - throw QPID_ERROR(INTERNAL_ERROR, "Message class does not support returns"); -} - -void MessageMessageChannel::setQos(){ - channel.sendAndReceive<MessageOkBody>( - make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false))); - if(channel.isTransactional()) - channel.sendAndReceive<TxSelectOkBody>( - make_shared_ptr(new TxSelectBody(channel.version))); -} - -}} // namespace qpid::client diff --git a/qpid/cpp/src/client/MessageMessageChannel.h b/qpid/cpp/src/client/MessageMessageChannel.h deleted file mode 100644 index 4c4721ce90..0000000000 --- a/qpid/cpp/src/client/MessageMessageChannel.h +++ /dev/null @@ -1,82 +0,0 @@ -#ifndef _client_MessageMessageChannel_h -#define _client_MessageMessageChannel_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "MessageChannel.h" -#include "IncomingMessage.h" -#include "../sys/Monitor.h" -#include <boost/ptr_container/ptr_map.hpp> - -namespace qpid { -namespace client { -/** - * Messaging implementation using AMQP 0-9 MessageMessageChannel class - * to send and receiving messages. - */ -class MessageMessageChannel : public MessageChannel -{ - public: - MessageMessageChannel(Channel& parent); - - void consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); - - void cancel(const std::string& tag, bool synch = true); - - bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); - - void publish(const Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory = false, bool immediate = false); - - void setReturnedMessageHandler(ReturnedMessageHandler* handler); - - void run(); - - void handle(boost::shared_ptr<framing::AMQMethodBody>); - - void handle(shared_ptr<framing::AMQHeaderBody>); - - void handle(shared_ptr<framing::AMQContentBody>); - - void setQos(); - - void close(); - - private: - typedef boost::ptr_map<std::string, IncomingMessage::WaitableDestination> - Destinations; - - std::string newTag(); - - sys::Mutex lock; - Channel& channel; - IncomingMessage incoming; - long tagCount; -}; - -}} // namespace qpid::client - - - -#endif /*!_client_MessageMessageChannel_h*/ - diff --git a/qpid/cpp/src/client/MethodBodyInstances.h b/qpid/cpp/src/client/MethodBodyInstances.h deleted file mode 100644 index 57b9bf73ce..0000000000 --- a/qpid/cpp/src/client/MethodBodyInstances.h +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "../framing/amqp_framing.h" - -#ifndef _MethodBodyInstances_h_ -#define _MethodBodyInstances_h_ - -namespace qpid { -namespace client { - -/** - * A list of method body instances that can be used to compare against - * incoming bodies. - */ -class MethodBodyInstances -{ -private: - qpid::framing::ProtocolVersion version; -public: - const qpid::framing::BasicCancelOkBody basic_cancel_ok; - const qpid::framing::BasicConsumeOkBody basic_consume_ok; - const qpid::framing::BasicDeliverBody basic_deliver; - const qpid::framing::BasicGetEmptyBody basic_get_empty; - const qpid::framing::BasicGetOkBody basic_get_ok; - const qpid::framing::BasicQosOkBody basic_qos_ok; - const qpid::framing::BasicReturnBody basic_return; - const qpid::framing::ChannelCloseBody channel_close; - const qpid::framing::ChannelCloseOkBody channel_close_ok; - const qpid::framing::ChannelFlowBody channel_flow; - const qpid::framing::ChannelOpenOkBody channel_open_ok; - const qpid::framing::ConnectionCloseBody connection_close; - const qpid::framing::ConnectionCloseOkBody connection_close_ok; - const qpid::framing::ConnectionOpenOkBody connection_open_ok; - const qpid::framing::ConnectionRedirectBody connection_redirect; - const qpid::framing::ConnectionStartBody connection_start; - const qpid::framing::ConnectionTuneBody connection_tune; - const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok; - const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok; - const qpid::framing::QueueDeclareOkBody queue_declare_ok; - const qpid::framing::QueueDeleteOkBody queue_delete_ok; - const qpid::framing::QueueBindOkBody queue_bind_ok; - const qpid::framing::TxCommitOkBody tx_commit_ok; - const qpid::framing::TxRollbackOkBody tx_rollback_ok; - const qpid::framing::TxSelectOkBody tx_select_ok; - - MethodBodyInstances(uint8_t major, uint8_t minor) : - version(major, minor), - basic_cancel_ok(version), - basic_consume_ok(version), - basic_deliver(version), - basic_get_empty(version), - basic_get_ok(version), - basic_qos_ok(version), - basic_return(version), - channel_close(version), - channel_close_ok(version), - channel_flow(version), - channel_open_ok(version), - connection_close(version), - connection_close_ok(version), - connection_open_ok(version), - connection_redirect(version), - connection_start(version), - connection_tune(version), - exchange_declare_ok(version), - exchange_delete_ok(version), - queue_declare_ok(version), - queue_delete_ok(version), - queue_bind_ok(version), - tx_commit_ok(version), - tx_rollback_ok(version), - tx_select_ok(version) - {} - -}; - -static MethodBodyInstances method_bodies(8, 0); - -} // namespace client -} // namespace qpid - -#endif diff --git a/qpid/cpp/src/client/ResponseHandler.cpp b/qpid/cpp/src/client/ResponseHandler.cpp deleted file mode 100644 index ca0129d587..0000000000 --- a/qpid/cpp/src/client/ResponseHandler.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "../QpidError.h" -#include <boost/format.hpp> -#include "ResponseHandler.h" -#include "../framing/AMQMethodBody.h" - -using namespace qpid::sys; -using namespace qpid::framing; - -namespace qpid { -namespace client { - -ResponseHandler::ResponseHandler() : waiting(false), shutdownFlag(false) {} - -ResponseHandler::~ResponseHandler(){} - -bool ResponseHandler::isWaiting() { - Monitor::ScopedLock l(monitor); - return waiting; -} - -void ResponseHandler::expect(){ - Monitor::ScopedLock l(monitor); - waiting = true; -} - -void ResponseHandler::signalResponse(MethodPtr _response) -{ - Monitor::ScopedLock l(monitor); - response = _response; - if (!response) - shutdownFlag=true; - waiting = false; - monitor.notify(); -} - -ResponseHandler::MethodPtr ResponseHandler::receive() { - Monitor::ScopedLock l(monitor); - while (!response && !shutdownFlag) - monitor.wait(); - if (shutdownFlag) - THROW_QPID_ERROR( - PROTOCOL_ERROR, "Channel closed unexpectedly."); - MethodPtr result = response; - response.reset(); - return result; -} - -ResponseHandler::MethodPtr ResponseHandler::receive(ClassId c, MethodId m) { - MethodPtr response = receive(); - if(c != response->amqpClassId() || m != response->amqpMethodId()) { - THROW_QPID_ERROR( - PROTOCOL_ERROR, - boost::format("Expected class:method %d:%d, got %d:%d") - % c % m % response->amqpClassId() % response->amqpMethodId()); - } - return response; -} - -}} // namespace qpid::client diff --git a/qpid/cpp/src/client/ResponseHandler.h b/qpid/cpp/src/client/ResponseHandler.h deleted file mode 100644 index 289a5dd994..0000000000 --- a/qpid/cpp/src/client/ResponseHandler.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "../shared_ptr.h" -#include "../sys/Monitor.h" - -#ifndef _ResponseHandler_ -#define _ResponseHandler_ - -namespace qpid { - -namespace framing { -class AMQMethodBody; -} - -namespace client { - -/** - * Holds a response from the broker peer for the client. - */ -class ResponseHandler{ - typedef shared_ptr<framing::AMQMethodBody> MethodPtr; - bool waiting; - bool shutdownFlag; - MethodPtr response; - sys::Monitor monitor; - - public: - ResponseHandler(); - ~ResponseHandler(); - - /** Is a response expected? */ - bool isWaiting(); - - /** Provide a response to the waiting thread */ - void signalResponse(MethodPtr response); - - /** Indicate a message is expected. */ - void expect(); - - /** Wait for a response. */ - MethodPtr receive(); - - /** Wait for a specific response. */ - MethodPtr receive(framing::ClassId, framing::MethodId); - - /** Template version of receive returns typed pointer. */ - template <class BodyType> - shared_ptr<BodyType> receive() { - return shared_polymorphic_downcast<BodyType>( - receive(BodyType::CLASS_ID, BodyType::METHOD_ID)); - } -}; - -}} - - -#endif diff --git a/qpid/cpp/src/client/ReturnedMessageHandler.cpp b/qpid/cpp/src/client/ReturnedMessageHandler.cpp deleted file mode 100644 index 35d0b5c0a8..0000000000 --- a/qpid/cpp/src/client/ReturnedMessageHandler.cpp +++ /dev/null @@ -1,24 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ReturnedMessageHandler.h" - -qpid::client::ReturnedMessageHandler::~ReturnedMessageHandler() {} diff --git a/qpid/cpp/src/client/ReturnedMessageHandler.h b/qpid/cpp/src/client/ReturnedMessageHandler.h deleted file mode 100644 index 8b42fc0764..0000000000 --- a/qpid/cpp/src/client/ReturnedMessageHandler.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _ReturnedMessageHandler_ -#define _ReturnedMessageHandler_ - -#include "ClientMessage.h" - -namespace qpid { -namespace client { - - /** - * An interface through which returned messages can be received by - * an application. - * - * @see Channel::setReturnedMessageHandler() - * - * \ingroup clientapi - */ - class ReturnedMessageHandler{ - public: - virtual ~ReturnedMessageHandler(); - virtual void returned(Message& msg) = 0; - }; - -} -} - - -#endif |
