diff options
| author | Alan Conway <aconway@apache.org> | 2006-10-16 13:50:26 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2006-10-16 13:50:26 +0000 |
| commit | 8a6ab3aa61d441b9210c05c84dc9998acfc38737 (patch) | |
| tree | 1eb9d7f39b5c2d04a85a1f66caef3d398567b740 /cpp/src/qpid/client | |
| parent | 9a808fb13aba243d41bbdab75158dae5939a80a4 (diff) | |
| download | qpid-python-8a6ab3aa61d441b9210c05c84dc9998acfc38737.tar.gz | |
Build system reorg, see README and Makefile comments for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464494 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/Channel.cpp | 438 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Channel.h | 127 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 237 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connection.h | 105 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Exchange.cpp | 30 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Exchange.h | 49 | ||||
| -rw-r--r-- | cpp/src/qpid/client/IncomingMessage.cpp | 85 | ||||
| -rw-r--r-- | cpp/src/qpid/client/IncomingMessage.h | 60 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Message.cpp | 147 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Message.h | 86 | ||||
| -rw-r--r-- | cpp/src/qpid/client/MessageListener.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/client/MessageListener.h | 38 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Queue.cpp | 47 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Queue.h | 47 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ResponseHandler.cpp | 63 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ResponseHandler.h | 49 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ReturnedMessageHandler.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ReturnedMessageHandler.h | 38 |
18 files changed, 1688 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp new file mode 100644 index 0000000000..0563dbaaba --- /dev/null +++ b/cpp/src/qpid/client/Channel.cpp @@ -0,0 +1,438 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/client/Channel.h" +#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/ThreadFactoryImpl.h" +#include "qpid/client/Message.h" +#include "qpid/QpidError.h" + +using namespace std::tr1;//to use dynamic_pointer_cast +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::concurrent; + +Channel::Channel(bool _transactional, u_int16_t _prefetch) : + id(0), + con(0), + dispatcher(0), + out(0), + incoming(0), + closed(true), + prefetch(_prefetch), + transactional(_transactional) +{ + threadFactory = new ThreadFactoryImpl(); + dispatchMonitor = new MonitorImpl(); + retrievalMonitor = new MonitorImpl(); +} + +Channel::~Channel(){ + if(dispatcher){ + stop(); + delete dispatcher; + } + delete retrievalMonitor; + delete dispatchMonitor; + delete threadFactory; +} + +void Channel::setPrefetch(u_int16_t _prefetch){ + prefetch = _prefetch; + if(con != 0 && out != 0){ + setQos(); + } +} + +void Channel::setQos(){ + sendAndReceive(new AMQFrame(id, new BasicQosBody(0, prefetch, false)), basic_qos_ok); + if(transactional){ + sendAndReceive(new AMQFrame(id, new TxSelectBody()), tx_select_ok); + } +} + +void Channel::declareExchange(Exchange& exchange, bool synch){ + string name = exchange.getName(); + string type = exchange.getType(); + FieldTable args; + AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(0, name, type, false, false, false, false, !synch, args)); + if(synch){ + sendAndReceive(frame, exchange_declare_ok); + }else{ + out->send(frame); + } +} + +void Channel::deleteExchange(Exchange& exchange, bool synch){ + string name = exchange.getName(); + AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(0, name, false, !synch)); + if(synch){ + sendAndReceive(frame, exchange_delete_ok); + }else{ + out->send(frame); + } +} + +void Channel::declareQueue(Queue& queue, bool synch){ + string name = queue.getName(); + FieldTable args; + AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(0, name, false, false, + queue.isExclusive(), + queue.isAutoDelete(), !synch, args)); + if(synch){ + sendAndReceive(frame, queue_declare_ok); + if(queue.getName().length() == 0){ + QueueDeclareOkBody::shared_ptr response = + dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse()); + queue.setName(response->getQueue()); + } + }else{ + out->send(frame); + } +} + +void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ + //ticket, queue, ifunused, ifempty, nowait + string name = queue.getName(); + AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(0, name, ifunused, ifempty, !synch)); + if(synch){ + sendAndReceive(frame, queue_delete_ok); + }else{ + out->send(frame); + } +} + +void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ + string e = exchange.getName(); + string q = queue.getName(); + // TODO aconway 2006-10-10: not const correct, get rid of const_cast. + // + AMQFrame* frame = new AMQFrame(id, new QueueBindBody(0, q, e, key,!synch, const_cast<FieldTable&>(args))); + if(synch){ + sendAndReceive(frame, queue_bind_ok); + }else{ + out->send(frame); + } +} + +void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener, + int ackMode, bool noLocal, bool synch){ + + string q = queue.getName(); + AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch)); + if(synch){ + sendAndReceive(frame, basic_consume_ok); + BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse()); + tag = response->getConsumerTag(); + }else{ + out->send(frame); + } + Consumer* c = new Consumer(); + c->listener = listener; + c->ackMode = ackMode; + c->lastDeliveryTag = 0; + consumers[tag] = c; +} + +void Channel::cancel(std::string& tag, bool synch){ + Consumer* c = consumers[tag]; + if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){ + out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); + } + + AMQFrame* frame = new AMQFrame(id, new BasicCancelBody((string&) tag, !synch)); + if(synch){ + sendAndReceive(frame, basic_cancel_ok); + }else{ + out->send(frame); + } + consumers.erase(tag); + if(c != 0){ + delete c; + } +} + +void Channel::cancelAll(){ + for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){ + Consumer* c = i->second; + if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){ + out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); + } + consumers.erase(i); + delete c; + } +} + +void Channel::retrieve(Message& msg){ + retrievalMonitor->acquire(); + while(retrieved == 0){ + retrievalMonitor->wait(); + } + + msg.header = retrieved->getHeader(); + msg.deliveryTag = retrieved->getDeliveryTag(); + retrieved->getData(msg.data); + delete retrieved; + retrieved = 0; + + retrievalMonitor->release(); +} + +bool Channel::get(Message& msg, const Queue& queue, int ackMode){ + string name = queue.getName(); + AMQFrame* frame = new AMQFrame(id, new BasicGetBody(0, name, ackMode)); + responses.expect(); + out->send(frame); + responses.waitForResponse(); + AMQMethodBody::shared_ptr response = responses.getResponse(); + if(basic_get_ok.match(response.get())){ + if(incoming != 0){ + std::cout << "Existing message not complete" << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); + }else{ + incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response)); + } + retrieve(msg); + return true; + }if(basic_get_empty.match(response.get())){ + return false; + }else{ + THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get."); + } +} + + +void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ + string e = exchange.getName(); + string key = routingKey; + + out->send(new AMQFrame(id, new BasicPublishBody(0, e, key, mandatory, immediate))); + //break msg up into header frame and content frame(s) and send these + string data = msg.getData(); + msg.header->setContentSize(data.length()); + AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); + out->send(new AMQFrame(id, body)); + + int data_length = data.length(); + if(data_length > 0){ + //TODO fragmentation of messages, need to know max frame size for connection + int frag_size = con->getMaxFrameSize() - 4; + if(data_length < frag_size){ + out->send(new AMQFrame(id, new AMQContentBody(data))); + }else{ + int frag_count = data_length / frag_size; + for(int i = 0; i < frag_count; i++){ + int pos = i*frag_size; + int len = i < frag_count - 1 ? frag_size : data_length - pos; + string frag(data.substr(pos, len)); + out->send(new AMQFrame(id, new AMQContentBody(frag))); + } + } + } +} + +void Channel::commit(){ + AMQFrame* frame = new AMQFrame(id, new TxCommitBody()); + sendAndReceive(frame, tx_commit_ok); +} + +void Channel::rollback(){ + AMQFrame* frame = new AMQFrame(id, new TxRollbackBody()); + sendAndReceive(frame, tx_rollback_ok); +} + +void Channel::handleMethod(AMQMethodBody::shared_ptr body){ + //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request + if(responses.isWaiting()){ + responses.signalResponse(body); + }else if(basic_deliver.match(body.get())){ + if(incoming != 0){ + std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); + }else{ + incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); + } + }else if(basic_return.match(body.get())){ + if(incoming != 0){ + std::cout << "Existing message not complete" << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); + }else{ + incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); + } + }else if(channel_close.match(body.get())){ + con->removeChannel(this); + //need to signal application that channel has been closed through exception + + }else if(channel_flow.match(body.get())){ + + }else{ + //signal error + std::cout << "Unhandled method: " << *body << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method"); + } +} + +void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ + if(incoming == 0){ + //handle invalid frame sequence + std::cout << "Invalid message sequence: got header before return or deliver." << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver."); + }else{ + incoming->setHeader(body); + if(incoming->isComplete()){ + enqueue(); + } + } +} + +void Channel::handleContent(AMQContentBody::shared_ptr body){ + if(incoming == 0){ + //handle invalid frame sequence + std::cout << "Invalid message sequence: got content before return or deliver." << std::endl; + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver."); + }else{ + incoming->addContent(body); + if(incoming->isComplete()){ + enqueue(); + } + } +} + +void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ + THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); +} + +void Channel::start(){ + dispatcher = threadFactory->create(this); + dispatcher->start(); +} + +void Channel::stop(){ + closed = true; + dispatchMonitor->acquire(); + dispatchMonitor->notify(); + dispatchMonitor->release(); + if(dispatcher){ + dispatcher->join(); + } +} + +void Channel::run(){ + dispatch(); +} + +void Channel::enqueue(){ + if(incoming->isResponse()){ + retrievalMonitor->acquire(); + retrieved = incoming; + retrievalMonitor->notify(); + retrievalMonitor->release(); + }else{ + dispatchMonitor->acquire(); + messages.push(incoming); + dispatchMonitor->notify(); + dispatchMonitor->release(); + } + incoming = 0; +} + +IncomingMessage* Channel::dequeue(){ + dispatchMonitor->acquire(); + while(messages.empty() && !closed){ + dispatchMonitor->wait(); + } + IncomingMessage* msg = 0; + if(!messages.empty()){ + msg = messages.front(); + messages.pop(); + } + dispatchMonitor->release(); + return msg; +} + +void Channel::deliver(Consumer* consumer, Message& msg){ + //record delivery tag: + consumer->lastDeliveryTag = msg.getDeliveryTag(); + + //allow registered listener to handle the message + consumer->listener->received(msg); + + //if the handler calls close on the channel or connection while + //handling this message, then consumer will now have been deleted. + if(!closed){ + bool multiple(false); + switch(consumer->ackMode){ + case LAZY_ACK: + multiple = true; + if(++(consumer->count) < prefetch) break; + //else drop-through + case AUTO_ACK: + out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple))); + consumer->lastDeliveryTag = 0; + } + } + + //as it stands, transactionality is entirely orthogonal to ack + //mode, though the acks will not be processed by the broker under + //a transaction until it commits. +} + +void Channel::dispatch(){ + while(!closed){ + IncomingMessage* incomingMsg = dequeue(); + if(incomingMsg){ + //Note: msg is currently only valid for duration of this call + Message msg(incomingMsg->getHeader()); + incomingMsg->getData(msg.data); + if(incomingMsg->isReturn()){ + if(returnsHandler == 0){ + //print warning to log/console + std::cout << "Message returned: " << msg.getData() << std::endl; + }else{ + returnsHandler->returned(msg); + } + }else{ + msg.deliveryTag = incomingMsg->getDeliveryTag(); + std::string tag = incomingMsg->getConsumerTag(); + + if(consumers[tag] == 0){ + //signal error + std::cout << "Unknown consumer: " << tag << std::endl; + }else{ + deliver(consumers[tag], msg); + } + } + delete incomingMsg; + } + } +} + +void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ + returnsHandler = handler; +} + +void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ + responses.expect(); + out->send(frame); + responses.receive(body); +} + +void Channel::close(){ + if(con != 0){ + con->closeChannel(this); + } +} diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h new file mode 100644 index 0000000000..0b60a827b7 --- /dev/null +++ b/cpp/src/qpid/client/Channel.h @@ -0,0 +1,127 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <map> +#include <string> +#include <queue> +#include "sys/types.h" + +#ifndef _Channel_ +#define _Channel_ + +#include "qpid/framing/amqp_framing.h" + +#include "qpid/concurrent/ThreadFactory.h" + +#include "qpid/client/Connection.h" +#include "qpid/client/Exchange.h" +#include "qpid/client/IncomingMessage.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/Queue.h" +#include "qpid/client/ResponseHandler.h" +#include "qpid/client/ReturnedMessageHandler.h" + +namespace qpid { +namespace client { + enum ack_modes {NO_ACK=0, AUTO_ACK=1, LAZY_ACK=2, CLIENT_ACK=3}; + + class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::concurrent::Runnable{ + struct Consumer{ + MessageListener* listener; + int ackMode; + int count; + u_int64_t lastDeliveryTag; + }; + typedef std::map<string,Consumer*>::iterator consumer_iterator; + + u_int16_t id; + Connection* con; + qpid::concurrent::ThreadFactory* threadFactory; + qpid::concurrent::Thread* dispatcher; + qpid::framing::OutputHandler* out; + IncomingMessage* incoming; + ResponseHandler responses; + std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume + IncomingMessage* retrieved;//holds response to basic.get + qpid::concurrent::Monitor* dispatchMonitor; + qpid::concurrent::Monitor* retrievalMonitor; + std::map<std::string, Consumer*> consumers; + ReturnedMessageHandler* returnsHandler; + bool closed; + + u_int16_t prefetch; + const bool transactional; + + void enqueue(); + void retrieve(Message& msg); + IncomingMessage* dequeue(); + void dispatch(); + void stop(); + void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body); + void deliver(Consumer* consumer, Message& msg); + void setQos(); + void cancelAll(); + + virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body); + virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); + virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); + virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + + public: + Channel(bool transactional = false, u_int16_t prefetch = 500); + ~Channel(); + + void declareExchange(Exchange& exchange, bool synch = true); + void deleteExchange(Exchange& exchange, bool synch = true); + void declareQueue(Queue& queue, bool synch = true); + void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); + void bind(const Exchange& exchange, const Queue& queue, const std::string& key, + const qpid::framing::FieldTable& args, bool synch = true); + void consume(Queue& queue, std::string& tag, MessageListener* listener, + int ackMode = NO_ACK, bool noLocal = false, bool synch = true); + void cancel(std::string& tag, bool synch = true); + bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK); + void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, + bool mandatory = false, bool immediate = false); + + void commit(); + void rollback(); + + void setPrefetch(u_int16_t prefetch); + + /** + * Start message dispatching on a new thread + */ + void start(); + /** + * Do message dispatching on this thread + */ + void run(); + + void close(); + + void setReturnedMessageHandler(ReturnedMessageHandler* handler); + + friend class Connection; + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp new file mode 100644 index 0000000000..779480eae9 --- /dev/null +++ b/cpp/src/qpid/client/Connection.cpp @@ -0,0 +1,237 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/client/Connection.h" +#include "qpid/client/Channel.h" +#include "qpid/io/ConnectorImpl.h" +#include "qpid/client/Message.h" +#include "qpid/QpidError.h" +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::io; +using namespace qpid::concurrent; + +u_int16_t Connection::channelIdCounter; + +Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){ + connector = new ConnectorImpl(debug, _max_frame_size); +} + +Connection::~Connection(){ + delete connector; +} + +void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ + host = _host; + port = _port; + connector->setInputHandler(this); + connector->setTimeoutHandler(this); + connector->setShutdownHandler(this); + out = connector->getOutputHandler(); + connector->connect(host, port); + + ProtocolInitiation* header = new ProtocolInitiation(8, 0); + responses.expect(); + connector->init(header); + responses.receive(connection_start); + + FieldTable props; + string mechanism("PLAIN"); + string response = ((char)0) + uid + ((char)0) + pwd; + string locale("en_US"); + responses.expect(); + out->send(new AMQFrame(0, new ConnectionStartOkBody(props, mechanism, response, locale))); + + /** + * Assume for now that further challenges will not be required + //receive connection.secure + responses.receive(connection_secure)); + //send connection.secure-ok + out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); + **/ + + responses.receive(connection_tune); + + ConnectionTuneBody::shared_ptr proposal = std::tr1::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); + out->send(new AMQFrame(0, new ConnectionTuneOkBody(proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); + + u_int16_t heartbeat = proposal->getHeartbeat(); + connector->setReadTimeout(heartbeat * 2); + connector->setWriteTimeout(heartbeat); + + //send connection.open + string capabilities; + string vhost = virtualhost; + responses.expect(); + out->send(new AMQFrame(0, new ConnectionOpenBody(vhost, capabilities, true))); + //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). + responses.waitForResponse(); + if(responses.validate(connection_open_ok)){ + //ok + }else if(responses.validate(connection_redirect)){ + //ignore for now + ConnectionRedirectBody::shared_ptr redirect(std::tr1::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); + std::cout << "Received redirection to " << redirect->getHost() << std::endl; + }else{ + THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); + } + +} + +void Connection::close(){ + if(!closed){ + u_int16_t code(200); + string text("Ok"); + u_int16_t classId(0); + u_int16_t methodId(0); + + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, text, classId, methodId)), connection_close_ok); + connector->close(); + } +} + +void Connection::openChannel(Channel* channel){ + channel->con = this; + channel->id = ++channelIdCounter; + channel->out = out; + channels[channel->id] = channel; + //now send frame to open channel and wait for response + string oob; + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(oob)), channel_open_ok); + channel->setQos(); + channel->closed = false; +} + +void Connection::closeChannel(Channel* channel){ + //send frame to close channel + u_int16_t code(200); + string text("Ok"); + u_int16_t classId(0); + u_int16_t methodId(0); + closeChannel(channel, code, text, classId, methodId); +} + +void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){ + //send frame to close channel + channel->cancelAll(); + channel->closed = true; + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(code, text, classId, methodId)), channel_close_ok); + channel->con = 0; + channel->out = 0; + removeChannel(channel); +} + +void Connection::removeChannel(Channel* channel){ + //send frame to close channel + + channels.erase(channel->id); + channel->out = 0; + channel->id = 0; + channel->con = 0; +} + +void Connection::received(AMQFrame* frame){ + u_int16_t channelId = frame->getChannel(); + + if(channelId == 0){ + this->handleBody(frame->getBody()); + }else{ + Channel* channel = channels[channelId]; + if(channel == 0){ + error(504, "Unknown channel"); + }else{ + try{ + channel->handleBody(frame->getBody()); + }catch(qpid::QpidError e){ + channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); + } + } + } +} + +void Connection::handleMethod(AMQMethodBody::shared_ptr body){ + //connection.close, basic.deliver, basic.return or a response to a synchronous request + if(responses.isWaiting()){ + responses.signalResponse(body); + }else if(connection_close.match(body.get())){ + //send back close ok + //close socket + ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); + std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl; + connector->close(); + }else{ + std::cout << "Unhandled method for connection: " << *body << std::endl; + error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); + } +} + +void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){ + error(504, "Channel error: received header body with channel 0."); +} + +void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){ + error(504, "Channel error: received content body with channel 0."); +} + +void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ +} + +void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ + responses.expect(); + out->send(frame); + responses.receive(body); +} + +void Connection::error(int code, const string& msg, int classid, int methodid){ + std::cout << "Connection exception generated: " << code << msg; + if(classid || methodid){ + std::cout << " [" << methodid << ":" << classid << "]"; + } + std::cout << std::endl; + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, msg, classid, methodid)), connection_close_ok); + connector->close(); +} + +void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ + std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.file << ":" << e.line << ")" << std::endl; + int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; + string msg = e.msg; + if(method == 0){ + closeChannel(channel, code, msg); + }else{ + closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId()); + } +} + +void Connection::idleIn(){ + std::cout << "Connection timed out due to abscence of heartbeat." << std::endl; + connector->close(); +} + +void Connection::idleOut(){ + out->send(new AMQFrame(0, new AMQHeartbeatBody())); +} + +void Connection::shutdown(){ + closed = true; + //close all channels + for(iterator i = channels.begin(); i != channels.end(); i++){ + i->second->stop(); + } +} diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h new file mode 100644 index 0000000000..0e8f52e88a --- /dev/null +++ b/cpp/src/qpid/client/Connection.h @@ -0,0 +1,105 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <map> +#include <string> + +#ifndef _Connection_ +#define _Connection_ + +#include "qpid/QpidError.h" +#include "qpid/io/Connector.h" +#include "qpid/io/ShutdownHandler.h" +#include "qpid/io/TimeoutHandler.h" + +#include "qpid/framing/amqp_framing.h" +#include "qpid/client/Exchange.h" +#include "qpid/client/IncomingMessage.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/Queue.h" +#include "qpid/client/ResponseHandler.h" + +namespace qpid { +namespace client { + + class Channel; + + class Connection : public virtual qpid::framing::InputHandler, + public virtual qpid::io::TimeoutHandler, + public virtual qpid::io::ShutdownHandler, + private virtual qpid::framing::BodyHandler{ + + typedef std::map<int, Channel*>::iterator iterator; + + static u_int16_t channelIdCounter; + + std::string host; + int port; + const u_int32_t max_frame_size; + std::map<int, Channel*> channels; + qpid::io::Connector* connector; + qpid::framing::OutputHandler* out; + ResponseHandler responses; + volatile bool closed; + + void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e); + void error(int code, const string& msg, int classid = 0, int methodid = 0); + void closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId = 0, u_int16_t methodId = 0); + void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body); + + virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body); + virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); + virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); + virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + + public: + + Connection(bool debug = false, u_int32_t max_frame_size = 65536); + ~Connection(); + void open(const std::string& host, int port = 5672, + const std::string& uid = "guest", const std::string& pwd = "guest", + const std::string& virtualhost = "/"); + void close(); + void openChannel(Channel* channel); + /* + * Requests that the server close this channel, then removes + * the association to the channel from this connection + */ + void closeChannel(Channel* channel); + /* + * Removes the channel from association with this connection, + * without sending a close request to the server. + */ + void removeChannel(Channel* channel); + + virtual void received(qpid::framing::AMQFrame* frame); + + virtual void idleOut(); + virtual void idleIn(); + + virtual void shutdown(); + + inline u_int32_t getMaxFrameSize(){ return max_frame_size; } + }; + + +} +} + + +#endif diff --git a/cpp/src/qpid/client/Exchange.cpp b/cpp/src/qpid/client/Exchange.cpp new file mode 100644 index 0000000000..078f15c3a7 --- /dev/null +++ b/cpp/src/qpid/client/Exchange.cpp @@ -0,0 +1,30 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/client/Exchange.h" + +qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} +const std::string& qpid::client::Exchange::getName() const { return name; } +const std::string& qpid::client::Exchange::getType() const { return type; } + +const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; +const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; +const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; + +const qpid::client::Exchange qpid::client::Exchange::DEFAULT_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::DEFAULT_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::DEFAULT_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/cpp/src/qpid/client/Exchange.h b/cpp/src/qpid/client/Exchange.h new file mode 100644 index 0000000000..66593a41cc --- /dev/null +++ b/cpp/src/qpid/client/Exchange.h @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef _Exchange_ +#define _Exchange_ + +namespace qpid { +namespace client { + + class Exchange{ + const std::string name; + const std::string type; + + public: + + static const std::string DIRECT_EXCHANGE; + static const std::string TOPIC_EXCHANGE; + static const std::string HEADERS_EXCHANGE; + + static const Exchange DEFAULT_DIRECT_EXCHANGE; + static const Exchange DEFAULT_TOPIC_EXCHANGE; + static const Exchange DEFAULT_HEADERS_EXCHANGE; + + Exchange(std::string name, std::string type = DIRECT_EXCHANGE); + const std::string& getName() const; + const std::string& getType() const; + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/IncomingMessage.cpp b/cpp/src/qpid/client/IncomingMessage.cpp new file mode 100644 index 0000000000..c385430a27 --- /dev/null +++ b/cpp/src/qpid/client/IncomingMessage.cpp @@ -0,0 +1,85 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/client/IncomingMessage.h" +#include "qpid/QpidError.h" +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + +IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){} +IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){} +IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){} + +IncomingMessage::~IncomingMessage(){ +} + +void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){ + this->header = _header; +} + +void IncomingMessage::addContent(AMQContentBody::shared_ptr _content){ + this->content.push_back(_content); +} + +bool IncomingMessage::isComplete(){ + return header != 0 && header->getContentSize() == contentSize(); +} + +bool IncomingMessage::isReturn(){ + return returned; +} + +bool IncomingMessage::isDelivery(){ + return delivered; +} + +bool IncomingMessage::isResponse(){ + return response; +} + +const string& IncomingMessage::getConsumerTag(){ + if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery"); + return delivered->getConsumerTag(); +} + +u_int64_t IncomingMessage::getDeliveryTag(){ + if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery"); + return delivered->getDeliveryTag(); +} + +AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){ + return header; +} + +void IncomingMessage::getData(string& s){ + int count(content.size()); + for(int i = 0; i < count; i++){ + if(i == 0) s = content[i]->getData(); + else s += content[i]->getData(); + } +} + +u_int64_t IncomingMessage::contentSize(){ + u_int64_t size(0); + u_int64_t count(content.size()); + for(u_int64_t i = 0; i < count; i++){ + size += content[i]->size(); + } + return size; +} diff --git a/cpp/src/qpid/client/IncomingMessage.h b/cpp/src/qpid/client/IncomingMessage.h new file mode 100644 index 0000000000..fec620066b --- /dev/null +++ b/cpp/src/qpid/client/IncomingMessage.h @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> +#include <vector> +#include "qpid/framing/amqp_framing.h" + +#ifndef _IncomingMessage_ +#define _IncomingMessage_ + +#include "qpid/client/Message.h" + +namespace qpid { +namespace client { + + class IncomingMessage{ + //content will be preceded by one of these method frames + qpid::framing::BasicDeliverBody::shared_ptr delivered; + qpid::framing::BasicReturnBody::shared_ptr returned; + qpid::framing::BasicGetOkBody::shared_ptr response; + qpid::framing::AMQHeaderBody::shared_ptr header; + std::vector<qpid::framing::AMQContentBody::shared_ptr> content; + + u_int64_t contentSize(); + public: + IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro); + IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro); + IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro); + ~IncomingMessage(); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void addContent(qpid::framing::AMQContentBody::shared_ptr content); + bool isComplete(); + bool isReturn(); + bool isDelivery(); + bool isResponse(); + const string& getConsumerTag();//only relevant if isDelivery() + qpid::framing::AMQHeaderBody::shared_ptr& getHeader(); + u_int64_t getDeliveryTag(); + void getData(string& data); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/Message.cpp b/cpp/src/qpid/client/Message.cpp new file mode 100644 index 0000000000..34220b595d --- /dev/null +++ b/cpp/src/qpid/client/Message.cpp @@ -0,0 +1,147 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/client/Message.h" + +using namespace qpid::client; +using namespace qpid::framing; + +Message::Message(){ + header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)); +} + +Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ +} + +Message::~Message(){ +} + +BasicHeaderProperties* Message::getHeaderProperties(){ + return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); +} + +std::string& Message::getContentType(){ + return getHeaderProperties()->getContentType(); +} + +std::string& Message::getContentEncoding(){ + return getHeaderProperties()->getContentEncoding(); +} + +FieldTable& Message::getHeaders(){ + return getHeaderProperties()->getHeaders(); +} + +u_int8_t Message::getDeliveryMode(){ + return getHeaderProperties()->getDeliveryMode(); +} + +u_int8_t Message::getPriority(){ + return getHeaderProperties()->getPriority(); +} + +std::string& Message::getCorrelationId(){ + return getHeaderProperties()->getCorrelationId(); +} + +std::string& Message::getReplyTo(){ + return getHeaderProperties()->getReplyTo(); +} + +std::string& Message::getExpiration(){ + return getHeaderProperties()->getExpiration(); +} + +std::string& Message::getMessageId(){ + return getHeaderProperties()->getMessageId(); +} + +u_int64_t Message::getTimestamp(){ + return getHeaderProperties()->getTimestamp(); +} + +std::string& Message::getType(){ + return getHeaderProperties()->getType(); +} + +std::string& Message::getUserId(){ + return getHeaderProperties()->getUserId(); +} + +std::string& Message::getAppId(){ + return getHeaderProperties()->getAppId(); +} + +std::string& Message::getClusterId(){ + return getHeaderProperties()->getClusterId(); +} + +void Message::setContentType(std::string& type){ + getHeaderProperties()->setContentType(type); +} + +void Message::setContentEncoding(std::string& encoding){ + getHeaderProperties()->setContentEncoding(encoding); +} + +void Message::setHeaders(FieldTable& headers){ + getHeaderProperties()->setHeaders(headers); +} + +void Message::setDeliveryMode(u_int8_t mode){ + getHeaderProperties()->setDeliveryMode(mode); +} + +void Message::setPriority(u_int8_t priority){ + getHeaderProperties()->setPriority(priority); +} + +void Message::setCorrelationId(std::string& correlationId){ + getHeaderProperties()->setCorrelationId(correlationId); +} + +void Message::setReplyTo(std::string& replyTo){ + getHeaderProperties()->setReplyTo(replyTo); +} + +void Message::setExpiration(std::string& expiration){ + getHeaderProperties()->setExpiration(expiration); +} + +void Message::setMessageId(std::string& messageId){ + getHeaderProperties()->setMessageId(messageId); +} + +void Message::setTimestamp(u_int64_t timestamp){ + getHeaderProperties()->setTimestamp(timestamp); +} + +void Message::setType(std::string& type){ + getHeaderProperties()->setType(type); +} + +void Message::setUserId(std::string& userId){ + getHeaderProperties()->setUserId(userId); +} + +void Message::setAppId(std::string& appId){ + getHeaderProperties()->setAppId(appId); +} + +void Message::setClusterId(std::string& clusterId){ + getHeaderProperties()->setClusterId(clusterId); +} diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h new file mode 100644 index 0000000000..5b1c1d5de7 --- /dev/null +++ b/cpp/src/qpid/client/Message.h @@ -0,0 +1,86 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> +#include "qpid/framing/amqp_framing.h" + +#ifndef _Message_ +#define _Message_ + + +namespace qpid { +namespace client { + + class Message{ + qpid::framing::AMQHeaderBody::shared_ptr header; + string data; + bool redelivered; + u_int64_t deliveryTag; + + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + Message(qpid::framing::AMQHeaderBody::shared_ptr& header); + public: + Message(); + ~Message(); + + inline std::string getData(){ return data; } + inline void setData(const std::string& _data){ data = _data; } + + inline bool isRedelivered(){ return redelivered; } + inline void setRedelivered(bool _redelivered){ redelivered = _redelivered; } + + inline u_int64_t getDeliveryTag(){ return deliveryTag; } + + std::string& getContentType(); + std::string& getContentEncoding(); + qpid::framing::FieldTable& getHeaders(); + u_int8_t getDeliveryMode(); + u_int8_t getPriority(); + std::string& getCorrelationId(); + std::string& getReplyTo(); + std::string& getExpiration(); + std::string& getMessageId(); + u_int64_t getTimestamp(); + std::string& getType(); + std::string& getUserId(); + std::string& getAppId(); + std::string& getClusterId(); + + void setContentType(std::string& type); + void setContentEncoding(std::string& encoding); + void setHeaders(qpid::framing::FieldTable& headers); + void setDeliveryMode(u_int8_t mode); + void setPriority(u_int8_t priority); + void setCorrelationId(std::string& correlationId); + void setReplyTo(std::string& replyTo); + void setExpiration(std::string& expiration); + void setMessageId(std::string& messageId); + void setTimestamp(u_int64_t timestamp); + void setType(std::string& type); + void setUserId(std::string& userId); + void setAppId(std::string& appId); + void setClusterId(std::string& clusterId); + + + friend class Channel; + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/MessageListener.cpp b/cpp/src/qpid/client/MessageListener.cpp new file mode 100644 index 0000000000..f3bcb24c1a --- /dev/null +++ b/cpp/src/qpid/client/MessageListener.cpp @@ -0,0 +1,21 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/client/MessageListener.h" + +qpid::client::MessageListener::~MessageListener() {} diff --git a/cpp/src/qpid/client/MessageListener.h b/cpp/src/qpid/client/MessageListener.h new file mode 100644 index 0000000000..a28e26a8d2 --- /dev/null +++ b/cpp/src/qpid/client/MessageListener.h @@ -0,0 +1,38 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef _MessageListener_ +#define _MessageListener_ + +#include "qpid/client/Message.h" + +namespace qpid { +namespace client { + + class MessageListener{ + public: + virtual ~MessageListener(); + virtual void received(Message& msg) = 0; + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/Queue.cpp b/cpp/src/qpid/client/Queue.cpp new file mode 100644 index 0000000000..dc752c9fb2 --- /dev/null +++ b/cpp/src/qpid/client/Queue.cpp @@ -0,0 +1,47 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/client/Queue.h" + +qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true){} + +qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false){} + +qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp){} + +qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive) + : name(_name), autodelete(_autodelete), exclusive(_exclusive){} + +const std::string& qpid::client::Queue::getName() const{ + return name; +} + +void qpid::client::Queue::setName(const std::string& _name){ + name = _name; +} + +bool qpid::client::Queue::isAutoDelete() const{ + return autodelete; +} + +bool qpid::client::Queue::isExclusive() const{ + return exclusive; +} + + + + diff --git a/cpp/src/qpid/client/Queue.h b/cpp/src/qpid/client/Queue.h new file mode 100644 index 0000000000..e0964af774 --- /dev/null +++ b/cpp/src/qpid/client/Queue.h @@ -0,0 +1,47 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef _Queue_ +#define _Queue_ + +namespace qpid { +namespace client { + + class Queue{ + std::string name; + const bool autodelete; + const bool exclusive; + + public: + + Queue(); + Queue(std::string name); + Queue(std::string name, bool temp); + Queue(std::string name, bool autodelete, bool exclusive); + const std::string& getName() const; + void setName(const std::string&); + bool isAutoDelete() const; + bool isExclusive() const; + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/ResponseHandler.cpp b/cpp/src/qpid/client/ResponseHandler.cpp new file mode 100644 index 0000000000..ec20dd1a10 --- /dev/null +++ b/cpp/src/qpid/client/ResponseHandler.cpp @@ -0,0 +1,63 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/client/ResponseHandler.h" +#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/QpidError.h" + +qpid::client::ResponseHandler::ResponseHandler() : waiting(false){ + monitor = new qpid::concurrent::MonitorImpl(); +} + +qpid::client::ResponseHandler::~ResponseHandler(){ + delete monitor; +} + +bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ + return expected.match(response.get()); +} + +void qpid::client::ResponseHandler::waitForResponse(){ + monitor->acquire(); + if(waiting){ + monitor->wait(); + } + monitor->release(); +} + +void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){ + response = _response; + monitor->acquire(); + waiting = false; + monitor->notify(); + monitor->release(); +} + +void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ + monitor->acquire(); + if(waiting){ + monitor->wait(); + } + monitor->release(); + if(!validate(expected)){ + THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); + } +} + +void qpid::client::ResponseHandler::expect(){ + waiting = true; +} diff --git a/cpp/src/qpid/client/ResponseHandler.h b/cpp/src/qpid/client/ResponseHandler.h new file mode 100644 index 0000000000..179daa1f1f --- /dev/null +++ b/cpp/src/qpid/client/ResponseHandler.h @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> +#include "qpid/framing/amqp_framing.h" +#include "qpid/concurrent/Monitor.h" + +#ifndef _ResponseHandler_ +#define _ResponseHandler_ + +namespace qpid { + namespace client { + + class ResponseHandler{ + bool waiting; + qpid::framing::AMQMethodBody::shared_ptr response; + qpid::concurrent::Monitor* monitor; + + public: + ResponseHandler(); + ~ResponseHandler(); + inline bool isWaiting(){ return waiting; } + inline qpid::framing::AMQMethodBody::shared_ptr getResponse(){ return response; } + bool validate(const qpid::framing::AMQMethodBody& expected); + void waitForResponse(); + void signalResponse(qpid::framing::AMQMethodBody::shared_ptr response); + void receive(const qpid::framing::AMQMethodBody& expected); + void expect();//must be called before calling receive + }; + + } +} + + +#endif diff --git a/cpp/src/qpid/client/ReturnedMessageHandler.cpp b/cpp/src/qpid/client/ReturnedMessageHandler.cpp new file mode 100644 index 0000000000..ca6fba2408 --- /dev/null +++ b/cpp/src/qpid/client/ReturnedMessageHandler.cpp @@ -0,0 +1,21 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/client/ReturnedMessageHandler.h" + +qpid::client::ReturnedMessageHandler::~ReturnedMessageHandler() {} diff --git a/cpp/src/qpid/client/ReturnedMessageHandler.h b/cpp/src/qpid/client/ReturnedMessageHandler.h new file mode 100644 index 0000000000..5a4c4d5d1b --- /dev/null +++ b/cpp/src/qpid/client/ReturnedMessageHandler.h @@ -0,0 +1,38 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef _ReturnedMessageHandler_ +#define _ReturnedMessageHandler_ + +#include "qpid/client/Message.h" + +namespace qpid { +namespace client { + + class ReturnedMessageHandler{ + public: + virtual ~ReturnedMessageHandler(); + virtual void returned(Message& msg) = 0; + }; + +} +} + + +#endif |
