summaryrefslogtreecommitdiff
path: root/cpp/client
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/client')
-rw-r--r--cpp/client/Makefile41
-rw-r--r--cpp/client/inc/Channel.h127
-rw-r--r--cpp/client/inc/Connection.h105
-rw-r--r--cpp/client/inc/Exchange.h49
-rw-r--r--cpp/client/inc/IncomingMessage.h60
-rw-r--r--cpp/client/inc/Message.h86
-rw-r--r--cpp/client/inc/MessageListener.h38
-rw-r--r--cpp/client/inc/Queue.h47
-rw-r--r--cpp/client/inc/ResponseHandler.h49
-rw-r--r--cpp/client/inc/ReturnedMessageHandler.h38
-rw-r--r--cpp/client/src/Channel.cpp438
-rw-r--r--cpp/client/src/Connection.cpp237
-rw-r--r--cpp/client/src/Exchange.cpp30
-rw-r--r--cpp/client/src/IncomingMessage.cpp85
-rw-r--r--cpp/client/src/Message.cpp147
-rw-r--r--cpp/client/src/MessageListener.cpp21
-rw-r--r--cpp/client/src/Queue.cpp47
-rw-r--r--cpp/client/src/ResponseHandler.cpp63
-rw-r--r--cpp/client/src/ReturnedMessageHandler.cpp21
-rw-r--r--cpp/client/test/Makefile45
-rw-r--r--cpp/client/test/client_test.cpp97
-rw-r--r--cpp/client/test/topic_listener.cpp180
-rw-r--r--cpp/client/test/topic_publisher.cpp253
23 files changed, 0 insertions, 2304 deletions
diff --git a/cpp/client/Makefile b/cpp/client/Makefile
deleted file mode 100644
index f3c8b11a7a..0000000000
--- a/cpp/client/Makefile
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Copyright (c) 2006 The Apache Software Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#
-# Build client library.
-#
-
-QPID_HOME = ../..
-include ${QPID_HOME}/cpp/options.mk
-
-SOURCES := $(wildcard src/*.cpp)
-OBJECTS := $(subst .cpp,.o,$(SOURCES))
-CLIENT_LIB=$(LIB_DIR)/libqpid_client.so.1.0
-
-.PHONY: all clean
-
-all: $(CLIENT_LIB)
- @$(MAKE) -C test all
-
-clean:
- -@rm -f $(CLIENT_LIB) $(OBJECTS) src/*.d
- $(MAKE) -C test clean
-
-$(CLIENT_LIB): $(OBJECTS)
- $(CXX) -shared -o $@ $^ $(LDFLAGS) $(COMMON_LIB)
-
-# Dependencies
--include $(SOURCES:.cpp=.d)
diff --git a/cpp/client/inc/Channel.h b/cpp/client/inc/Channel.h
deleted file mode 100644
index debecf922e..0000000000
--- a/cpp/client/inc/Channel.h
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <map>
-#include <string>
-#include <queue>
-#include "sys/types.h"
-
-#ifndef _Channel_
-#define _Channel_
-
-#include "amqp_framing.h"
-
-#include "ThreadFactory.h"
-
-#include "Connection.h"
-#include "Exchange.h"
-#include "IncomingMessage.h"
-#include "Message.h"
-#include "MessageListener.h"
-#include "Queue.h"
-#include "ResponseHandler.h"
-#include "ReturnedMessageHandler.h"
-
-namespace qpid {
-namespace client {
- enum ack_modes {NO_ACK=0, AUTO_ACK=1, LAZY_ACK=2, CLIENT_ACK=3};
-
- class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::concurrent::Runnable{
- struct Consumer{
- MessageListener* listener;
- int ackMode;
- int count;
- u_int64_t lastDeliveryTag;
- };
- typedef std::map<string,Consumer*>::iterator consumer_iterator;
-
- u_int16_t id;
- Connection* con;
- qpid::concurrent::ThreadFactory* threadFactory;
- qpid::concurrent::Thread* dispatcher;
- qpid::framing::OutputHandler* out;
- IncomingMessage* incoming;
- ResponseHandler responses;
- std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
- IncomingMessage* retrieved;//holds response to basic.get
- qpid::concurrent::Monitor* dispatchMonitor;
- qpid::concurrent::Monitor* retrievalMonitor;
- std::map<std::string, Consumer*> consumers;
- ReturnedMessageHandler* returnsHandler;
- bool closed;
-
- u_int16_t prefetch;
- const bool transactional;
-
- void enqueue();
- void retrieve(Message& msg);
- IncomingMessage* dequeue();
- void dispatch();
- void stop();
- void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
- void deliver(Consumer* consumer, Message& msg);
- void setQos();
- void cancelAll();
-
- virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
- virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
- virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
- virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
-
- public:
- Channel(bool transactional = false, u_int16_t prefetch = 500);
- ~Channel();
-
- void declareExchange(Exchange& exchange, bool synch = true);
- void deleteExchange(Exchange& exchange, bool synch = true);
- void declareQueue(Queue& queue, bool synch = true);
- void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
- void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
- const qpid::framing::FieldTable& args, bool synch = true);
- void consume(Queue& queue, std::string& tag, MessageListener* listener,
- int ackMode = NO_ACK, bool noLocal = false, bool synch = true);
- void cancel(std::string& tag, bool synch = true);
- bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
- void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
- bool mandatory = false, bool immediate = false);
-
- void commit();
- void rollback();
-
- void setPrefetch(u_int16_t prefetch);
-
- /**
- * Start message dispatching on a new thread
- */
- void start();
- /**
- * Do message dispatching on this thread
- */
- void run();
-
- void close();
-
- void setReturnedMessageHandler(ReturnedMessageHandler* handler);
-
- friend class Connection;
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/client/inc/Connection.h b/cpp/client/inc/Connection.h
deleted file mode 100644
index 89169e92b1..0000000000
--- a/cpp/client/inc/Connection.h
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <map>
-#include <string>
-
-#ifndef _Connection_
-#define _Connection_
-
-#include "QpidError.h"
-#include "Connector.h"
-#include "ShutdownHandler.h"
-#include "TimeoutHandler.h"
-
-#include "amqp_framing.h"
-#include "Exchange.h"
-#include "IncomingMessage.h"
-#include "Message.h"
-#include "MessageListener.h"
-#include "Queue.h"
-#include "ResponseHandler.h"
-
-namespace qpid {
-namespace client {
-
- class Channel;
-
- class Connection : public virtual qpid::framing::InputHandler,
- public virtual qpid::io::TimeoutHandler,
- public virtual qpid::io::ShutdownHandler,
- private virtual qpid::framing::BodyHandler{
-
- typedef std::map<int, Channel*>::iterator iterator;
-
- static u_int16_t channelIdCounter;
-
- std::string host;
- int port;
- const u_int32_t max_frame_size;
- std::map<int, Channel*> channels;
- qpid::io::Connector* connector;
- qpid::framing::OutputHandler* out;
- ResponseHandler responses;
- volatile bool closed;
-
- void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
- void error(int code, const string& msg, int classid = 0, int methodid = 0);
- void closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId = 0, u_int16_t methodId = 0);
- void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
-
- virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
- virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
- virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
- virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
-
- public:
-
- Connection(bool debug = false, u_int32_t max_frame_size = 65536);
- ~Connection();
- void open(const std::string& host, int port = 5672,
- const std::string& uid = "guest", const std::string& pwd = "guest",
- const std::string& virtualhost = "/");
- void close();
- void openChannel(Channel* channel);
- /*
- * Requests that the server close this channel, then removes
- * the association to the channel from this connection
- */
- void closeChannel(Channel* channel);
- /*
- * Removes the channel from association with this connection,
- * without sending a close request to the server.
- */
- void removeChannel(Channel* channel);
-
- virtual void received(qpid::framing::AMQFrame* frame);
-
- virtual void idleOut();
- virtual void idleIn();
-
- virtual void shutdown();
-
- inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
- };
-
-
-}
-}
-
-
-#endif
diff --git a/cpp/client/inc/Exchange.h b/cpp/client/inc/Exchange.h
deleted file mode 100644
index 66593a41cc..0000000000
--- a/cpp/client/inc/Exchange.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <string>
-
-#ifndef _Exchange_
-#define _Exchange_
-
-namespace qpid {
-namespace client {
-
- class Exchange{
- const std::string name;
- const std::string type;
-
- public:
-
- static const std::string DIRECT_EXCHANGE;
- static const std::string TOPIC_EXCHANGE;
- static const std::string HEADERS_EXCHANGE;
-
- static const Exchange DEFAULT_DIRECT_EXCHANGE;
- static const Exchange DEFAULT_TOPIC_EXCHANGE;
- static const Exchange DEFAULT_HEADERS_EXCHANGE;
-
- Exchange(std::string name, std::string type = DIRECT_EXCHANGE);
- const std::string& getName() const;
- const std::string& getType() const;
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/client/inc/IncomingMessage.h b/cpp/client/inc/IncomingMessage.h
deleted file mode 100644
index 90fed17ee0..0000000000
--- a/cpp/client/inc/IncomingMessage.h
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <string>
-#include <vector>
-#include "amqp_framing.h"
-
-#ifndef _IncomingMessage_
-#define _IncomingMessage_
-
-#include "Message.h"
-
-namespace qpid {
-namespace client {
-
- class IncomingMessage{
- //content will be preceded by one of these method frames
- qpid::framing::BasicDeliverBody::shared_ptr delivered;
- qpid::framing::BasicReturnBody::shared_ptr returned;
- qpid::framing::BasicGetOkBody::shared_ptr response;
- qpid::framing::AMQHeaderBody::shared_ptr header;
- std::vector<qpid::framing::AMQContentBody::shared_ptr> content;
-
- u_int64_t contentSize();
- public:
- IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro);
- IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro);
- IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro);
- ~IncomingMessage();
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr content);
- bool isComplete();
- bool isReturn();
- bool isDelivery();
- bool isResponse();
- const string& getConsumerTag();//only relevant if isDelivery()
- qpid::framing::AMQHeaderBody::shared_ptr& getHeader();
- u_int64_t getDeliveryTag();
- void getData(string& data);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/client/inc/Message.h b/cpp/client/inc/Message.h
deleted file mode 100644
index fc3ec34bcf..0000000000
--- a/cpp/client/inc/Message.h
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <string>
-#include "amqp_framing.h"
-
-#ifndef _Message_
-#define _Message_
-
-
-namespace qpid {
-namespace client {
-
- class Message{
- qpid::framing::AMQHeaderBody::shared_ptr header;
- string data;
- bool redelivered;
- u_int64_t deliveryTag;
-
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
- Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
- public:
- Message();
- ~Message();
-
- inline std::string getData(){ return data; }
- inline void setData(const std::string& _data){ data = _data; }
-
- inline bool isRedelivered(){ return redelivered; }
- inline void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
-
- inline u_int64_t getDeliveryTag(){ return deliveryTag; }
-
- std::string& getContentType();
- std::string& getContentEncoding();
- qpid::framing::FieldTable& getHeaders();
- u_int8_t getDeliveryMode();
- u_int8_t getPriority();
- std::string& getCorrelationId();
- std::string& getReplyTo();
- std::string& getExpiration();
- std::string& getMessageId();
- u_int64_t getTimestamp();
- std::string& getType();
- std::string& getUserId();
- std::string& getAppId();
- std::string& getClusterId();
-
- void setContentType(std::string& type);
- void setContentEncoding(std::string& encoding);
- void setHeaders(qpid::framing::FieldTable& headers);
- void setDeliveryMode(u_int8_t mode);
- void setPriority(u_int8_t priority);
- void setCorrelationId(std::string& correlationId);
- void setReplyTo(std::string& replyTo);
- void setExpiration(std::string& expiration);
- void setMessageId(std::string& messageId);
- void setTimestamp(u_int64_t timestamp);
- void setType(std::string& type);
- void setUserId(std::string& userId);
- void setAppId(std::string& appId);
- void setClusterId(std::string& clusterId);
-
-
- friend class Channel;
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/client/inc/MessageListener.h b/cpp/client/inc/MessageListener.h
deleted file mode 100644
index 81b80ff5cb..0000000000
--- a/cpp/client/inc/MessageListener.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <string>
-
-#ifndef _MessageListener_
-#define _MessageListener_
-
-#include "Message.h"
-
-namespace qpid {
-namespace client {
-
- class MessageListener{
- public:
- virtual ~MessageListener();
- virtual void received(Message& msg) = 0;
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/client/inc/Queue.h b/cpp/client/inc/Queue.h
deleted file mode 100644
index e0964af774..0000000000
--- a/cpp/client/inc/Queue.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <string>
-
-#ifndef _Queue_
-#define _Queue_
-
-namespace qpid {
-namespace client {
-
- class Queue{
- std::string name;
- const bool autodelete;
- const bool exclusive;
-
- public:
-
- Queue();
- Queue(std::string name);
- Queue(std::string name, bool temp);
- Queue(std::string name, bool autodelete, bool exclusive);
- const std::string& getName() const;
- void setName(const std::string&);
- bool isAutoDelete() const;
- bool isExclusive() const;
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/client/inc/ResponseHandler.h b/cpp/client/inc/ResponseHandler.h
deleted file mode 100644
index f5392c954d..0000000000
--- a/cpp/client/inc/ResponseHandler.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <string>
-#include "amqp_framing.h"
-#include "Monitor.h"
-
-#ifndef _ResponseHandler_
-#define _ResponseHandler_
-
-namespace qpid {
- namespace client {
-
- class ResponseHandler{
- bool waiting;
- qpid::framing::AMQMethodBody::shared_ptr response;
- qpid::concurrent::Monitor* monitor;
-
- public:
- ResponseHandler();
- ~ResponseHandler();
- inline bool isWaiting(){ return waiting; }
- inline qpid::framing::AMQMethodBody::shared_ptr getResponse(){ return response; }
- bool validate(const qpid::framing::AMQMethodBody& expected);
- void waitForResponse();
- void signalResponse(qpid::framing::AMQMethodBody::shared_ptr response);
- void receive(const qpid::framing::AMQMethodBody& expected);
- void expect();//must be called before calling receive
- };
-
- }
-}
-
-
-#endif
diff --git a/cpp/client/inc/ReturnedMessageHandler.h b/cpp/client/inc/ReturnedMessageHandler.h
deleted file mode 100644
index 0d30d0ab32..0000000000
--- a/cpp/client/inc/ReturnedMessageHandler.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <string>
-
-#ifndef _ReturnedMessageHandler_
-#define _ReturnedMessageHandler_
-
-#include "Message.h"
-
-namespace qpid {
-namespace client {
-
- class ReturnedMessageHandler{
- public:
- virtual ~ReturnedMessageHandler();
- virtual void returned(Message& msg) = 0;
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/client/src/Channel.cpp b/cpp/client/src/Channel.cpp
deleted file mode 100644
index cf2f5bc081..0000000000
--- a/cpp/client/src/Channel.cpp
+++ /dev/null
@@ -1,438 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Channel.h"
-#include "MonitorImpl.h"
-#include "ThreadFactoryImpl.h"
-#include "Message.h"
-#include "QpidError.h"
-
-using namespace std::tr1;//to use dynamic_pointer_cast
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-Channel::Channel(bool _transactional, u_int16_t _prefetch) :
- id(0),
- con(0),
- dispatcher(0),
- out(0),
- incoming(0),
- closed(true),
- prefetch(_prefetch),
- transactional(_transactional)
-{
- threadFactory = new ThreadFactoryImpl();
- dispatchMonitor = new MonitorImpl();
- retrievalMonitor = new MonitorImpl();
-}
-
-Channel::~Channel(){
- if(dispatcher){
- stop();
- delete dispatcher;
- }
- delete retrievalMonitor;
- delete dispatchMonitor;
- delete threadFactory;
-}
-
-void Channel::setPrefetch(u_int16_t _prefetch){
- prefetch = _prefetch;
- if(con != 0 && out != 0){
- setQos();
- }
-}
-
-void Channel::setQos(){
- sendAndReceive(new AMQFrame(id, new BasicQosBody(0, prefetch, false)), basic_qos_ok);
- if(transactional){
- sendAndReceive(new AMQFrame(id, new TxSelectBody()), tx_select_ok);
- }
-}
-
-void Channel::declareExchange(Exchange& exchange, bool synch){
- string name = exchange.getName();
- string type = exchange.getType();
- FieldTable args;
- AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(0, name, type, false, false, false, false, !synch, args));
- if(synch){
- sendAndReceive(frame, exchange_declare_ok);
- }else{
- out->send(frame);
- }
-}
-
-void Channel::deleteExchange(Exchange& exchange, bool synch){
- string name = exchange.getName();
- AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(0, name, false, !synch));
- if(synch){
- sendAndReceive(frame, exchange_delete_ok);
- }else{
- out->send(frame);
- }
-}
-
-void Channel::declareQueue(Queue& queue, bool synch){
- string name = queue.getName();
- FieldTable args;
- AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(0, name, false, false,
- queue.isExclusive(),
- queue.isAutoDelete(), !synch, args));
- if(synch){
- sendAndReceive(frame, queue_declare_ok);
- if(queue.getName().length() == 0){
- QueueDeclareOkBody::shared_ptr response =
- dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
- queue.setName(response->getQueue());
- }
- }else{
- out->send(frame);
- }
-}
-
-void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
- //ticket, queue, ifunused, ifempty, nowait
- string name = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(0, name, ifunused, ifempty, !synch));
- if(synch){
- sendAndReceive(frame, queue_delete_ok);
- }else{
- out->send(frame);
- }
-}
-
-void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
- string e = exchange.getName();
- string q = queue.getName();
- // TODO aconway 2006-10-10: not const correct, get rid of const_cast.
- //
- AMQFrame* frame = new AMQFrame(id, new QueueBindBody(0, q, e, key,!synch, const_cast<FieldTable&>(args)));
- if(synch){
- sendAndReceive(frame, queue_bind_ok);
- }else{
- out->send(frame);
- }
-}
-
-void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener,
- int ackMode, bool noLocal, bool synch){
-
- string q = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch));
- if(synch){
- sendAndReceive(frame, basic_consume_ok);
- BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
- tag = response->getConsumerTag();
- }else{
- out->send(frame);
- }
- Consumer* c = new Consumer();
- c->listener = listener;
- c->ackMode = ackMode;
- c->lastDeliveryTag = 0;
- consumers[tag] = c;
-}
-
-void Channel::cancel(std::string& tag, bool synch){
- Consumer* c = consumers[tag];
- if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true)));
- }
-
- AMQFrame* frame = new AMQFrame(id, new BasicCancelBody((string&) tag, !synch));
- if(synch){
- sendAndReceive(frame, basic_cancel_ok);
- }else{
- out->send(frame);
- }
- consumers.erase(tag);
- if(c != 0){
- delete c;
- }
-}
-
-void Channel::cancelAll(){
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
- Consumer* c = i->second;
- if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true)));
- }
- consumers.erase(i);
- delete c;
- }
-}
-
-void Channel::retrieve(Message& msg){
- retrievalMonitor->acquire();
- while(retrieved == 0){
- retrievalMonitor->wait();
- }
-
- msg.header = retrieved->getHeader();
- msg.deliveryTag = retrieved->getDeliveryTag();
- retrieved->getData(msg.data);
- delete retrieved;
- retrieved = 0;
-
- retrievalMonitor->release();
-}
-
-bool Channel::get(Message& msg, const Queue& queue, int ackMode){
- string name = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new BasicGetBody(0, name, ackMode));
- responses.expect();
- out->send(frame);
- responses.waitForResponse();
- AMQMethodBody::shared_ptr response = responses.getResponse();
- if(basic_get_ok.match(response.get())){
- if(incoming != 0){
- std::cout << "Existing message not complete" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
- }
- retrieve(msg);
- return true;
- }if(basic_get_empty.match(response.get())){
- return false;
- }else{
- THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
- }
-}
-
-
-void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
- string e = exchange.getName();
- string key = routingKey;
-
- out->send(new AMQFrame(id, new BasicPublishBody(0, e, key, mandatory, immediate)));
- //break msg up into header frame and content frame(s) and send these
- string data = msg.getData();
- msg.header->setContentSize(data.length());
- AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
- out->send(new AMQFrame(id, body));
-
- int data_length = data.length();
- if(data_length > 0){
- //TODO fragmentation of messages, need to know max frame size for connection
- int frag_size = con->getMaxFrameSize() - 4;
- if(data_length < frag_size){
- out->send(new AMQFrame(id, new AMQContentBody(data)));
- }else{
- int frag_count = data_length / frag_size;
- for(int i = 0; i < frag_count; i++){
- int pos = i*frag_size;
- int len = i < frag_count - 1 ? frag_size : data_length - pos;
- string frag(data.substr(pos, len));
- out->send(new AMQFrame(id, new AMQContentBody(frag)));
- }
- }
- }
-}
-
-void Channel::commit(){
- AMQFrame* frame = new AMQFrame(id, new TxCommitBody());
- sendAndReceive(frame, tx_commit_ok);
-}
-
-void Channel::rollback(){
- AMQFrame* frame = new AMQFrame(id, new TxRollbackBody());
- sendAndReceive(frame, tx_rollback_ok);
-}
-
-void Channel::handleMethod(AMQMethodBody::shared_ptr body){
- //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
- if(responses.isWaiting()){
- responses.signalResponse(body);
- }else if(basic_deliver.match(body.get())){
- if(incoming != 0){
- std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
- }
- }else if(basic_return.match(body.get())){
- if(incoming != 0){
- std::cout << "Existing message not complete" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
- }
- }else if(channel_close.match(body.get())){
- con->removeChannel(this);
- //need to signal application that channel has been closed through exception
-
- }else if(channel_flow.match(body.get())){
-
- }else{
- //signal error
- std::cout << "Unhandled method: " << *body << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method");
- }
-}
-
-void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- if(incoming == 0){
- //handle invalid frame sequence
- std::cout << "Invalid message sequence: got header before return or deliver." << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver.");
- }else{
- incoming->setHeader(body);
- if(incoming->isComplete()){
- enqueue();
- }
- }
-}
-
-void Channel::handleContent(AMQContentBody::shared_ptr body){
- if(incoming == 0){
- //handle invalid frame sequence
- std::cout << "Invalid message sequence: got content before return or deliver." << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver.");
- }else{
- incoming->addContent(body);
- if(incoming->isComplete()){
- enqueue();
- }
- }
-}
-
-void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
-}
-
-void Channel::start(){
- dispatcher = threadFactory->create(this);
- dispatcher->start();
-}
-
-void Channel::stop(){
- closed = true;
- dispatchMonitor->acquire();
- dispatchMonitor->notify();
- dispatchMonitor->release();
- if(dispatcher){
- dispatcher->join();
- }
-}
-
-void Channel::run(){
- dispatch();
-}
-
-void Channel::enqueue(){
- if(incoming->isResponse()){
- retrievalMonitor->acquire();
- retrieved = incoming;
- retrievalMonitor->notify();
- retrievalMonitor->release();
- }else{
- dispatchMonitor->acquire();
- messages.push(incoming);
- dispatchMonitor->notify();
- dispatchMonitor->release();
- }
- incoming = 0;
-}
-
-IncomingMessage* Channel::dequeue(){
- dispatchMonitor->acquire();
- while(messages.empty() && !closed){
- dispatchMonitor->wait();
- }
- IncomingMessage* msg = 0;
- if(!messages.empty()){
- msg = messages.front();
- messages.pop();
- }
- dispatchMonitor->release();
- return msg;
-}
-
-void Channel::deliver(Consumer* consumer, Message& msg){
- //record delivery tag:
- consumer->lastDeliveryTag = msg.getDeliveryTag();
-
- //allow registered listener to handle the message
- consumer->listener->received(msg);
-
- //if the handler calls close on the channel or connection while
- //handling this message, then consumer will now have been deleted.
- if(!closed){
- bool multiple(false);
- switch(consumer->ackMode){
- case LAZY_ACK:
- multiple = true;
- if(++(consumer->count) < prefetch) break;
- //else drop-through
- case AUTO_ACK:
- out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple)));
- consumer->lastDeliveryTag = 0;
- }
- }
-
- //as it stands, transactionality is entirely orthogonal to ack
- //mode, though the acks will not be processed by the broker under
- //a transaction until it commits.
-}
-
-void Channel::dispatch(){
- while(!closed){
- IncomingMessage* incomingMsg = dequeue();
- if(incomingMsg){
- //Note: msg is currently only valid for duration of this call
- Message msg(incomingMsg->getHeader());
- incomingMsg->getData(msg.data);
- if(incomingMsg->isReturn()){
- if(returnsHandler == 0){
- //print warning to log/console
- std::cout << "Message returned: " << msg.getData() << std::endl;
- }else{
- returnsHandler->returned(msg);
- }
- }else{
- msg.deliveryTag = incomingMsg->getDeliveryTag();
- std::string tag = incomingMsg->getConsumerTag();
-
- if(consumers[tag] == 0){
- //signal error
- std::cout << "Unknown consumer: " << tag << std::endl;
- }else{
- deliver(consumers[tag], msg);
- }
- }
- delete incomingMsg;
- }
- }
-}
-
-void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
- returnsHandler = handler;
-}
-
-void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
- responses.expect();
- out->send(frame);
- responses.receive(body);
-}
-
-void Channel::close(){
- if(con != 0){
- con->closeChannel(this);
- }
-}
diff --git a/cpp/client/src/Connection.cpp b/cpp/client/src/Connection.cpp
deleted file mode 100644
index f332d2242c..0000000000
--- a/cpp/client/src/Connection.cpp
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Connection.h"
-#include "Channel.h"
-#include "ConnectorImpl.h"
-#include "Message.h"
-#include "QpidError.h"
-#include <iostream>
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::io;
-using namespace qpid::concurrent;
-
-u_int16_t Connection::channelIdCounter;
-
-Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){
- connector = new ConnectorImpl(debug, _max_frame_size);
-}
-
-Connection::~Connection(){
- delete connector;
-}
-
-void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
- host = _host;
- port = _port;
- connector->setInputHandler(this);
- connector->setTimeoutHandler(this);
- connector->setShutdownHandler(this);
- out = connector->getOutputHandler();
- connector->connect(host, port);
-
- ProtocolInitiation* header = new ProtocolInitiation(8, 0);
- responses.expect();
- connector->init(header);
- responses.receive(connection_start);
-
- FieldTable props;
- string mechanism("PLAIN");
- string response = ((char)0) + uid + ((char)0) + pwd;
- string locale("en_US");
- responses.expect();
- out->send(new AMQFrame(0, new ConnectionStartOkBody(props, mechanism, response, locale)));
-
- /**
- * Assume for now that further challenges will not be required
- //receive connection.secure
- responses.receive(connection_secure));
- //send connection.secure-ok
- out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
- **/
-
- responses.receive(connection_tune);
-
- ConnectionTuneBody::shared_ptr proposal = std::tr1::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
- out->send(new AMQFrame(0, new ConnectionTuneOkBody(proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
-
- u_int16_t heartbeat = proposal->getHeartbeat();
- connector->setReadTimeout(heartbeat * 2);
- connector->setWriteTimeout(heartbeat);
-
- //send connection.open
- string capabilities;
- string vhost = virtualhost;
- responses.expect();
- out->send(new AMQFrame(0, new ConnectionOpenBody(vhost, capabilities, true)));
- //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
- responses.waitForResponse();
- if(responses.validate(connection_open_ok)){
- //ok
- }else if(responses.validate(connection_redirect)){
- //ignore for now
- ConnectionRedirectBody::shared_ptr redirect(std::tr1::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
- std::cout << "Received redirection to " << redirect->getHost() << std::endl;
- }else{
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
- }
-
-}
-
-void Connection::close(){
- if(!closed){
- u_int16_t code(200);
- string text("Ok");
- u_int16_t classId(0);
- u_int16_t methodId(0);
-
- sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, text, classId, methodId)), connection_close_ok);
- connector->close();
- }
-}
-
-void Connection::openChannel(Channel* channel){
- channel->con = this;
- channel->id = ++channelIdCounter;
- channel->out = out;
- channels[channel->id] = channel;
- //now send frame to open channel and wait for response
- string oob;
- channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(oob)), channel_open_ok);
- channel->setQos();
- channel->closed = false;
-}
-
-void Connection::closeChannel(Channel* channel){
- //send frame to close channel
- u_int16_t code(200);
- string text("Ok");
- u_int16_t classId(0);
- u_int16_t methodId(0);
- closeChannel(channel, code, text, classId, methodId);
-}
-
-void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){
- //send frame to close channel
- channel->cancelAll();
- channel->closed = true;
- channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(code, text, classId, methodId)), channel_close_ok);
- channel->con = 0;
- channel->out = 0;
- removeChannel(channel);
-}
-
-void Connection::removeChannel(Channel* channel){
- //send frame to close channel
-
- channels.erase(channel->id);
- channel->out = 0;
- channel->id = 0;
- channel->con = 0;
-}
-
-void Connection::received(AMQFrame* frame){
- u_int16_t channelId = frame->getChannel();
-
- if(channelId == 0){
- this->handleBody(frame->getBody());
- }else{
- Channel* channel = channels[channelId];
- if(channel == 0){
- error(504, "Unknown channel");
- }else{
- try{
- channel->handleBody(frame->getBody());
- }catch(qpid::QpidError e){
- channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
- }
- }
- }
-}
-
-void Connection::handleMethod(AMQMethodBody::shared_ptr body){
- //connection.close, basic.deliver, basic.return or a response to a synchronous request
- if(responses.isWaiting()){
- responses.signalResponse(body);
- }else if(connection_close.match(body.get())){
- //send back close ok
- //close socket
- ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
- std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl;
- connector->close();
- }else{
- std::cout << "Unhandled method for connection: " << *body << std::endl;
- error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
- }
-}
-
-void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){
- error(504, "Channel error: received header body with channel 0.");
-}
-
-void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){
- error(504, "Channel error: received content body with channel 0.");
-}
-
-void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
-}
-
-void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
- responses.expect();
- out->send(frame);
- responses.receive(body);
-}
-
-void Connection::error(int code, const string& msg, int classid, int methodid){
- std::cout << "Connection exception generated: " << code << msg;
- if(classid || methodid){
- std::cout << " [" << methodid << ":" << classid << "]";
- }
- std::cout << std::endl;
- sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, msg, classid, methodid)), connection_close_ok);
- connector->close();
-}
-
-void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
- std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.file << ":" << e.line << ")" << std::endl;
- int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
- string msg = e.msg;
- if(method == 0){
- closeChannel(channel, code, msg);
- }else{
- closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
- }
-}
-
-void Connection::idleIn(){
- std::cout << "Connection timed out due to abscence of heartbeat." << std::endl;
- connector->close();
-}
-
-void Connection::idleOut(){
- out->send(new AMQFrame(0, new AMQHeartbeatBody()));
-}
-
-void Connection::shutdown(){
- closed = true;
- //close all channels
- for(iterator i = channels.begin(); i != channels.end(); i++){
- i->second->stop();
- }
-}
diff --git a/cpp/client/src/Exchange.cpp b/cpp/client/src/Exchange.cpp
deleted file mode 100644
index 681068dc4c..0000000000
--- a/cpp/client/src/Exchange.cpp
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Exchange.h"
-
-qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){}
-const std::string& qpid::client::Exchange::getName() const { return name; }
-const std::string& qpid::client::Exchange::getType() const { return type; }
-
-const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct";
-const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic";
-const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers";
-
-const qpid::client::Exchange qpid::client::Exchange::DEFAULT_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE);
-const qpid::client::Exchange qpid::client::Exchange::DEFAULT_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE);
-const qpid::client::Exchange qpid::client::Exchange::DEFAULT_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE);
diff --git a/cpp/client/src/IncomingMessage.cpp b/cpp/client/src/IncomingMessage.cpp
deleted file mode 100644
index 9576051302..0000000000
--- a/cpp/client/src/IncomingMessage.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "IncomingMessage.h"
-#include "QpidError.h"
-#include <iostream>
-
-using namespace qpid::client;
-using namespace qpid::framing;
-
-IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){}
-IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){}
-IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){}
-
-IncomingMessage::~IncomingMessage(){
-}
-
-void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
-}
-
-void IncomingMessage::addContent(AMQContentBody::shared_ptr _content){
- this->content.push_back(_content);
-}
-
-bool IncomingMessage::isComplete(){
- return header != 0 && header->getContentSize() == contentSize();
-}
-
-bool IncomingMessage::isReturn(){
- return returned;
-}
-
-bool IncomingMessage::isDelivery(){
- return delivered;
-}
-
-bool IncomingMessage::isResponse(){
- return response;
-}
-
-const string& IncomingMessage::getConsumerTag(){
- if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery");
- return delivered->getConsumerTag();
-}
-
-u_int64_t IncomingMessage::getDeliveryTag(){
- if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery");
- return delivered->getDeliveryTag();
-}
-
-AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){
- return header;
-}
-
-void IncomingMessage::getData(string& s){
- int count(content.size());
- for(int i = 0; i < count; i++){
- if(i == 0) s = content[i]->getData();
- else s += content[i]->getData();
- }
-}
-
-u_int64_t IncomingMessage::contentSize(){
- u_int64_t size(0);
- u_int64_t count(content.size());
- for(u_int64_t i = 0; i < count; i++){
- size += content[i]->size();
- }
- return size;
-}
diff --git a/cpp/client/src/Message.cpp b/cpp/client/src/Message.cpp
deleted file mode 100644
index 71befe57b1..0000000000
--- a/cpp/client/src/Message.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Message.h"
-
-using namespace qpid::client;
-using namespace qpid::framing;
-
-Message::Message(){
- header = AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC));
-}
-
-Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){
-}
-
-Message::~Message(){
-}
-
-BasicHeaderProperties* Message::getHeaderProperties(){
- return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
-}
-
-std::string& Message::getContentType(){
- return getHeaderProperties()->getContentType();
-}
-
-std::string& Message::getContentEncoding(){
- return getHeaderProperties()->getContentEncoding();
-}
-
-FieldTable& Message::getHeaders(){
- return getHeaderProperties()->getHeaders();
-}
-
-u_int8_t Message::getDeliveryMode(){
- return getHeaderProperties()->getDeliveryMode();
-}
-
-u_int8_t Message::getPriority(){
- return getHeaderProperties()->getPriority();
-}
-
-std::string& Message::getCorrelationId(){
- return getHeaderProperties()->getCorrelationId();
-}
-
-std::string& Message::getReplyTo(){
- return getHeaderProperties()->getReplyTo();
-}
-
-std::string& Message::getExpiration(){
- return getHeaderProperties()->getExpiration();
-}
-
-std::string& Message::getMessageId(){
- return getHeaderProperties()->getMessageId();
-}
-
-u_int64_t Message::getTimestamp(){
- return getHeaderProperties()->getTimestamp();
-}
-
-std::string& Message::getType(){
- return getHeaderProperties()->getType();
-}
-
-std::string& Message::getUserId(){
- return getHeaderProperties()->getUserId();
-}
-
-std::string& Message::getAppId(){
- return getHeaderProperties()->getAppId();
-}
-
-std::string& Message::getClusterId(){
- return getHeaderProperties()->getClusterId();
-}
-
-void Message::setContentType(std::string& type){
- getHeaderProperties()->setContentType(type);
-}
-
-void Message::setContentEncoding(std::string& encoding){
- getHeaderProperties()->setContentEncoding(encoding);
-}
-
-void Message::setHeaders(FieldTable& headers){
- getHeaderProperties()->setHeaders(headers);
-}
-
-void Message::setDeliveryMode(u_int8_t mode){
- getHeaderProperties()->setDeliveryMode(mode);
-}
-
-void Message::setPriority(u_int8_t priority){
- getHeaderProperties()->setPriority(priority);
-}
-
-void Message::setCorrelationId(std::string& correlationId){
- getHeaderProperties()->setCorrelationId(correlationId);
-}
-
-void Message::setReplyTo(std::string& replyTo){
- getHeaderProperties()->setReplyTo(replyTo);
-}
-
-void Message::setExpiration(std::string& expiration){
- getHeaderProperties()->setExpiration(expiration);
-}
-
-void Message::setMessageId(std::string& messageId){
- getHeaderProperties()->setMessageId(messageId);
-}
-
-void Message::setTimestamp(u_int64_t timestamp){
- getHeaderProperties()->setTimestamp(timestamp);
-}
-
-void Message::setType(std::string& type){
- getHeaderProperties()->setType(type);
-}
-
-void Message::setUserId(std::string& userId){
- getHeaderProperties()->setUserId(userId);
-}
-
-void Message::setAppId(std::string& appId){
- getHeaderProperties()->setAppId(appId);
-}
-
-void Message::setClusterId(std::string& clusterId){
- getHeaderProperties()->setClusterId(clusterId);
-}
diff --git a/cpp/client/src/MessageListener.cpp b/cpp/client/src/MessageListener.cpp
deleted file mode 100644
index cf55db8edf..0000000000
--- a/cpp/client/src/MessageListener.cpp
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "MessageListener.h"
-
-qpid::client::MessageListener::~MessageListener() {}
diff --git a/cpp/client/src/Queue.cpp b/cpp/client/src/Queue.cpp
deleted file mode 100644
index 5b2881f7ff..0000000000
--- a/cpp/client/src/Queue.cpp
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Queue.h"
-
-qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true){}
-
-qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false){}
-
-qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp){}
-
-qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive)
- : name(_name), autodelete(_autodelete), exclusive(_exclusive){}
-
-const std::string& qpid::client::Queue::getName() const{
- return name;
-}
-
-void qpid::client::Queue::setName(const std::string& _name){
- name = _name;
-}
-
-bool qpid::client::Queue::isAutoDelete() const{
- return autodelete;
-}
-
-bool qpid::client::Queue::isExclusive() const{
- return exclusive;
-}
-
-
-
-
diff --git a/cpp/client/src/ResponseHandler.cpp b/cpp/client/src/ResponseHandler.cpp
deleted file mode 100644
index 6938539469..0000000000
--- a/cpp/client/src/ResponseHandler.cpp
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "ResponseHandler.h"
-#include "MonitorImpl.h"
-#include "QpidError.h"
-
-qpid::client::ResponseHandler::ResponseHandler() : waiting(false){
- monitor = new qpid::concurrent::MonitorImpl();
-}
-
-qpid::client::ResponseHandler::~ResponseHandler(){
- delete monitor;
-}
-
-bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){
- return expected.match(response.get());
-}
-
-void qpid::client::ResponseHandler::waitForResponse(){
- monitor->acquire();
- if(waiting){
- monitor->wait();
- }
- monitor->release();
-}
-
-void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){
- response = _response;
- monitor->acquire();
- waiting = false;
- monitor->notify();
- monitor->release();
-}
-
-void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){
- monitor->acquire();
- if(waiting){
- monitor->wait();
- }
- monitor->release();
- if(!validate(expected)){
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error");
- }
-}
-
-void qpid::client::ResponseHandler::expect(){
- waiting = true;
-}
diff --git a/cpp/client/src/ReturnedMessageHandler.cpp b/cpp/client/src/ReturnedMessageHandler.cpp
deleted file mode 100644
index cfa91fee97..0000000000
--- a/cpp/client/src/ReturnedMessageHandler.cpp
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "ReturnedMessageHandler.h"
-
-qpid::client::ReturnedMessageHandler::~ReturnedMessageHandler() {}
diff --git a/cpp/client/test/Makefile b/cpp/client/test/Makefile
deleted file mode 100644
index f35aab3e17..0000000000
--- a/cpp/client/test/Makefile
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Copyright (c) 2006 The Apache Software Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-QPID_HOME = ../../..
-include ${QPID_HOME}/cpp/options.mk
-
-# TODO aconway 2006-09-12: These are system tests, not unit tests.
-# We need client side unit tests.
-# We should separate them from the system tets.
-# We need an approach to automate the C++ client/server system tests.
-#
-
-SOURCES=$(wildcard *.cpp)
-TESTS=$(SOURCES:.cpp=)
-DEPS= $(SOURCES:.cpp=.d)
-
-INCLUDES = $(TEST_INCLUDES)
-LDLIBS= -lapr-1 $(COMMON_LIB) $(CLIENT_LIB)
-
-.PHONY: all clean
-
-all: $(TESTS)
-
-clean:
- -@rm -f $(TESTS) $(DEPS)
-
-# Rule to build test programs.
-%: %.cpp
- $(CXX) -o $@ $< $(CXXFLAGS) $(LDFLAGS) $(LDLIBS)
-
-# Dependencies
--include $(DEPS)
diff --git a/cpp/client/test/client_test.cpp b/cpp/client/test/client_test.cpp
deleted file mode 100644
index b899d494d7..0000000000
--- a/cpp/client/test/client_test.cpp
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-
-#include "QpidError.h"
-#include "Channel.h"
-#include "Connection.h"
-#include "FieldTable.h"
-#include "Message.h"
-#include "MessageListener.h"
-
-#include "MonitorImpl.h"
-
-
-using namespace qpid::client;
-using namespace qpid::concurrent;
-
-class SimpleListener : public virtual MessageListener{
- Monitor* monitor;
-
-public:
- inline SimpleListener(Monitor* _monitor) : monitor(_monitor){}
-
- inline virtual void received(Message& /*msg*/){
- std::cout << "Received message " /**<< msg **/<< std::endl;
- monitor->acquire();
- monitor->notify();
- monitor->release();
- }
-};
-
-int main(int argc, char**)
-{
- try{
- Connection con(argc > 1);
- Channel channel;
- Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE);
- Queue queue("MyQueue", true);
-
- string host("localhost");
-
- con.open(host);
- std::cout << "Opened connection." << std::endl;
- con.openChannel(&channel);
- std::cout << "Opened channel." << std::endl;
- channel.declareExchange(exchange);
- std::cout << "Declared exchange." << std::endl;
- channel.declareQueue(queue);
- std::cout << "Declared queue." << std::endl;
- qpid::framing::FieldTable args;
- channel.bind(exchange, queue, "MyTopic", args);
- std::cout << "Bound queue to exchange." << std::endl;
-
- //set up a message listener
- MonitorImpl monitor;
- SimpleListener listener(&monitor);
- string tag("MyTag");
- channel.consume(queue, tag, &listener);
- channel.start();
- std::cout << "Registered consumer." << std::endl;
-
- Message msg;
- string data("MyMessage");
- msg.setData(data);
- channel.publish(msg, exchange, "MyTopic");
- std::cout << "Published message." << std::endl;
-
- monitor.acquire();
- monitor.wait();
- monitor.release();
-
-
- con.closeChannel(&channel);
- std::cout << "Closed channel." << std::endl;
- con.close();
- std::cout << "Closed connection." << std::endl;
- }catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- return 1;
- }
- return 0;
-}
diff --git a/cpp/client/test/topic_listener.cpp b/cpp/client/test/topic_listener.cpp
deleted file mode 100644
index a1b8e383a0..0000000000
--- a/cpp/client/test/topic_listener.cpp
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include <sstream>
-#include "apr_time.h"
-#include "QpidError.h"
-#include "Channel.h"
-#include "Connection.h"
-#include "Exchange.h"
-#include "MessageListener.h"
-#include "Queue.h"
-
-using namespace qpid::client;
-
-class Listener : public MessageListener{
- Channel* const channel;
- const std::string responseQueue;
- const bool transactional;
- bool init;
- int count;
- apr_time_t start;
-
- void shutdown();
- void report();
-public:
- Listener(Channel* channel, const std::string& reponseQueue, bool tx);
- virtual void received(Message& msg);
-};
-
-class Args{
- string host;
- int port;
- int ackMode;
- bool transactional;
- int prefetch;
- bool trace;
- bool help;
-public:
- inline Args() : host("localhost"), port(5672), ackMode(NO_ACK), transactional(false), prefetch(1000), trace(false), help(false){}
- void parse(int argc, char** argv);
- void usage();
-
- inline const string& getHost() const { return host;}
- inline int getPort() const { return port; }
- inline int getAckMode(){ return ackMode; }
- inline bool getTransactional() const { return transactional; }
- inline int getPrefetch(){ return prefetch; }
- inline bool getTrace() const { return trace; }
- inline bool getHelp() const { return help; }
-};
-
-int main(int argc, char** argv){
- Args args;
- args.parse(argc, argv);
- if(args.getHelp()){
- args.usage();
- }else{
- try{
- Connection connection(args.getTrace());
- connection.open(args.getHost(), args.getPort());
- Channel channel(args.getTransactional(), args.getPrefetch());
- connection.openChannel(&channel);
-
- //declare exchange, queue and bind them:
- Queue response("response");
- channel.declareQueue(response);
-
- Queue control;
- channel.declareQueue(control);
- qpid::framing::FieldTable bindArgs;
- channel.bind(Exchange::DEFAULT_TOPIC_EXCHANGE, control, "topic_control", bindArgs);
- //set up listener
- Listener listener(&channel, response.getName(), args.getTransactional());
- std::string tag;
- channel.consume(control, tag, &listener, args.getAckMode());
- channel.run();
- connection.close();
- }catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
- }
-}
-
-Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx) :
- channel(_channel), responseQueue(_responseq), transactional(tx), init(false), count(0){}
-
-void Listener::received(Message& message){
- if(!init){
- start = apr_time_as_msec(apr_time_now());
- count = 0;
- init = true;
- }
- std::string type(message.getHeaders().getString("TYPE"));
-
- if(type == "TERMINATION_REQUEST"){
- shutdown();
- }else if(type == "REPORT_REQUEST"){
- //send a report:
- report();
- init = false;
- }else if (++count % 100 == 0){
- std::cout <<"Received " << count << " messages." << std::endl;
- }
-}
-
-void Listener::shutdown(){
- channel->close();
-}
-
-void Listener::report(){
- apr_time_t finish = apr_time_as_msec(apr_time_now());
- apr_time_t time = finish - start;
- std::stringstream reportstr;
- reportstr << "Received " << count << " messages in " << time << " ms.";
- Message msg;
- msg.setData(reportstr.str());
- channel->publish(msg, string(), responseQueue);
- if(transactional){
- channel->commit();
- }
-}
-
-
-void Args::parse(int argc, char** argv){
- for(int i = 1; i < argc; i++){
- string name(argv[i]);
- if("-help" == name){
- help = true;
- break;
- }else if("-host" == name){
- host = argv[++i];
- }else if("-port" == name){
- port = atoi(argv[++i]);
- }else if("-ack_mode" == name){
- ackMode = atoi(argv[++i]);
- }else if("-transactional" == name){
- transactional = true;
- }else if("-prefetch" == name){
- prefetch = atoi(argv[++i]);
- }else if("-trace" == name){
- trace = true;
- }else{
- std::cout << "Warning: unrecognised option " << name << std::endl;
- }
- }
-}
-
-void Args::usage(){
- std::cout << "Options:" << std::endl;
- std::cout << " -help" << std::endl;
- std::cout << " Prints this usage message" << std::endl;
- std::cout << " -host <host>" << std::endl;
- std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
- std::cout << " -port <port>" << std::endl;
- std::cout << " Specifies port to conect to (default is 5762)" << std::endl;
- std::cout << " -ack_mode <mode>" << std::endl;
- std::cout << " Sets the acknowledgement mode" << std::endl;
- std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl;
- std::cout << " -transactional" << std::endl;
- std::cout << " Indicates the client should use transactions" << std::endl;
- std::cout << " -prefetch <count>" << std::endl;
- std::cout << " Specifies the prefetch count (default is 1000)" << std::endl;
- std::cout << " -trace" << std::endl;
- std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
-}
diff --git a/cpp/client/test/topic_publisher.cpp b/cpp/client/test/topic_publisher.cpp
deleted file mode 100644
index fc6b7f3b30..0000000000
--- a/cpp/client/test/topic_publisher.cpp
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include <cstdlib>
-#include "unistd.h"
-#include "apr_time.h"
-#include "MonitorImpl.h"
-#include "QpidError.h"
-#include "Channel.h"
-#include "Connection.h"
-#include "Exchange.h"
-#include "MessageListener.h"
-#include "Queue.h"
-
-using namespace qpid::client;
-using namespace qpid::concurrent;
-
-class Publisher : public MessageListener{
- Channel* const channel;
- const std::string controlTopic;
- const bool transactional;
- MonitorImpl monitor;
- int count;
-
- void waitForCompletion(int msgs);
- string generateData(int size);
-
-public:
- Publisher(Channel* channel, const std::string& controlTopic, bool tx);
- virtual void received(Message& msg);
- apr_time_t publish(int msgs, int listeners, int size);
- void terminate();
-};
-
-class Args{
- string host;
- int port;
- int messages;
- int subscribers;
- int ackMode;
- bool transactional;
- int prefetch;
- int batches;
- int delay;
- int size;
- bool trace;
- bool help;
-public:
- inline Args() : host("localhost"), port(5672), messages(1000), subscribers(1),
- ackMode(NO_ACK), transactional(false), prefetch(1000), batches(1),
- delay(0), size(256), trace(false), help(false){}
-
- void parse(int argc, char** argv);
- void usage();
-
- inline const string& getHost() const { return host;}
- inline int getPort() const { return port; }
- inline int getMessages() const { return messages; }
- inline int getSubscribers() const { return subscribers; }
- inline int getAckMode(){ return ackMode; }
- inline bool getTransactional() const { return transactional; }
- inline int getPrefetch(){ return prefetch; }
- inline int getBatches(){ return batches; }
- inline int getDelay(){ return delay; }
- inline int getSize(){ return size; }
- inline bool getTrace() const { return trace; }
- inline bool getHelp() const { return help; }
-};
-
-int main(int argc, char** argv){
- Args args;
- args.parse(argc, argv);
- if(args.getHelp()){
- args.usage();
- }else{
- try{
- Connection connection(args.getTrace());
- connection.open(args.getHost(), args.getPort());
- Channel channel(args.getTransactional(), args.getPrefetch());
- connection.openChannel(&channel);
-
- //declare queue (relying on default binding):
- Queue response("response");
- channel.declareQueue(response);
-
- //set up listener
- Publisher publisher(&channel, "topic_control", args.getTransactional());
- std::string tag("mytag");
- channel.consume(response, tag, &publisher, args.getAckMode());
- channel.start();
-
- int batchSize(args.getBatches());
- apr_time_t max(0);
- apr_time_t min(0);
- apr_time_t sum(0);
- for(int i = 0; i < batchSize; i++){
- if(i > 0 && args.getDelay()) sleep(args.getDelay());
- apr_time_t time = publisher.publish(args.getMessages(), args.getSubscribers(), args.getSize());
- if(!max || time > max) max = time;
- if(!min || time < min) min = time;
- sum += time;
- std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << time << "ms" << std::endl;
- }
- publisher.terminate();
- apr_time_t avg = sum / batchSize;
- if(batchSize > 1){
- std::cout << batchSize << " batches completed. avg=" << avg <<
- ", max=" << max << ", min=" << min << std::endl;
- }
- channel.close();
- connection.close();
- }catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
- }
-}
-
-Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) :
- channel(_channel), controlTopic(_controlTopic), transactional(tx){}
-
-void Publisher::received(Message& msg){
- //count responses and when all are received end the current batch
- monitor.acquire();
- if(--count == 0){
- monitor.notify();
- }
- std::cout << "Received report: " << msg.getData() << " (" << count << " remaining)." << std::endl;
- monitor.release();
-}
-
-void Publisher::waitForCompletion(int msgs){
- count = msgs;
- monitor.wait();
-}
-
-apr_time_t Publisher::publish(int msgs, int listeners, int size){
- monitor.acquire();
- Message msg;
- msg.setData(generateData(size));
- apr_time_t start(apr_time_as_msec(apr_time_now()));
- for(int i = 0; i < msgs; i++){
- channel->publish(msg, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
- }
- //send report request
- Message reportRequest;
- reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- channel->publish(reportRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
- if(transactional){
- channel->commit();
- }
-
- waitForCompletion(listeners);
- monitor.release();
- apr_time_t finish(apr_time_as_msec(apr_time_now()));
-
- return finish - start;
-}
-
-string Publisher::generateData(int size){
- string data;
- for(int i = 0; i < size; i++){
- data += ('A' + (i / 26));
- }
- return data;
-}
-
-void Publisher::terminate(){
- //send termination request
- Message terminationRequest;
- terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
- channel->publish(terminationRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
- if(transactional){
- channel->commit();
- }
-}
-
-void Args::parse(int argc, char** argv){
- for(int i = 1; i < argc; i++){
- string name(argv[i]);
- if("-help" == name){
- help = true;
- break;
- }else if("-host" == name){
- host = argv[++i];
- }else if("-port" == name){
- port = atoi(argv[++i]);
- }else if("-messages" == name){
- messages = atoi(argv[++i]);
- }else if("-subscribers" == name){
- subscribers = atoi(argv[++i]);
- }else if("-ack_mode" == name){
- ackMode = atoi(argv[++i]);
- }else if("-transactional" == name){
- transactional = true;
- }else if("-prefetch" == name){
- prefetch = atoi(argv[++i]);
- }else if("-batches" == name){
- batches = atoi(argv[++i]);
- }else if("-delay" == name){
- delay = atoi(argv[++i]);
- }else if("-size" == name){
- size = atoi(argv[++i]);
- }else if("-trace" == name){
- trace = true;
- }else{
- std::cout << "Warning: unrecognised option " << name << std::endl;
- }
- }
-}
-
-void Args::usage(){
- std::cout << "Options:" << std::endl;
- std::cout << " -help" << std::endl;
- std::cout << " Prints this usage message" << std::endl;
- std::cout << " -host <host>" << std::endl;
- std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
- std::cout << " -port <port>" << std::endl;
- std::cout << " Specifies port to conect to (default is 5762)" << std::endl;
- std::cout << " -messages <count>" << std::endl;
- std::cout << " Specifies how many messages to send" << std::endl;
- std::cout << " -subscribers <count>" << std::endl;
- std::cout << " Specifies how many subscribers to expect reports from" << std::endl;
- std::cout << " -ack_mode <mode>" << std::endl;
- std::cout << " Sets the acknowledgement mode" << std::endl;
- std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl;
- std::cout << " -transactional" << std::endl;
- std::cout << " Indicates the client should use transactions" << std::endl;
- std::cout << " -prefetch <count>" << std::endl;
- std::cout << " Specifies the prefetch count (default is 1000)" << std::endl;
- std::cout << " -batches <count>" << std::endl;
- std::cout << " Specifies how many batches to run" << std::endl;
- std::cout << " -delay <seconds>" << std::endl;
- std::cout << " Causes a delay between each batch" << std::endl;
- std::cout << " -size <bytes>" << std::endl;
- std::cout << " Sets the size of the published messages (default is 256 bytes)" << std::endl;
- std::cout << " -trace" << std::endl;
- std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
-}