summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2006-11-22 16:57:35 +0000
committerKim van der Riet <kpvdr@apache.org>2006-11-22 16:57:35 +0000
commitd46ac2955c4871c9f22067f47490095e2c5f1806 (patch)
tree7e76ef7e4ca47e4cc57c83f7950bf97c3eceb210 /cpp/src/qpid/client
parent018723f3889e9a1f63585dddba8eecff1d168501 (diff)
downloadqpid-python-d46ac2955c4871c9f22067f47490095e2c5f1806.tar.gz
Merged AMQP version-sensitive generated files with C++ trunk. Phase 1 of merge complete - all locations where version info is required in the framing, broker and client code, the version has been hard-coded to mahor=8, minor=0. Next step: make broker and client version-aware.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@478237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/Channel.cpp66
-rw-r--r--cpp/src/qpid/client/Channel.h1
-rw-r--r--cpp/src/qpid/client/Connection.cpp31
-rw-r--r--cpp/src/qpid/client/Connection.h1
-rw-r--r--cpp/src/qpid/client/MethodBodyInstances.h101
5 files changed, 157 insertions, 43 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index a6c6bfea51..6901407072 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -22,6 +22,7 @@
#include <qpid/sys/Monitor.h>
#include <qpid/client/Message.h>
#include <qpid/QpidError.h>
+#include <qpid/client/MethodBodyInstances.h>
using namespace boost; //to use dynamic_pointer_cast
using namespace qpid::client;
@@ -35,7 +36,10 @@ Channel::Channel(bool _transactional, u_int16_t _prefetch) :
incoming(0),
closed(true),
prefetch(_prefetch),
- transactional(_transactional)
+ transactional(_transactional),
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ version(8, 0)
{ }
Channel::~Channel(){
@@ -50,9 +54,11 @@ void Channel::setPrefetch(u_int16_t _prefetch){
}
void Channel::setQos(){
- sendAndReceive(new AMQFrame(id, new BasicQosBody(0, prefetch, false)), basic_qos_ok);
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ sendAndReceive(new AMQFrame(id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
if(transactional){
- sendAndReceive(new AMQFrame(id, new TxSelectBody()), tx_select_ok);
+ sendAndReceive(new AMQFrame(id, new TxSelectBody(version)), method_bodies.tx_select_ok);
}
}
@@ -60,9 +66,9 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
string type = exchange.getType();
FieldTable args;
- AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(0, name, type, false, false, false, false, !synch, args));
+ AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
if(synch){
- sendAndReceive(frame, exchange_declare_ok);
+ sendAndReceive(frame, method_bodies.exchange_declare_ok);
}else{
out->send(frame);
}
@@ -70,9 +76,9 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
- AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(0, name, false, !synch));
+ AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(version, 0, name, false, !synch));
if(synch){
- sendAndReceive(frame, exchange_delete_ok);
+ sendAndReceive(frame, method_bodies.exchange_delete_ok);
}else{
out->send(frame);
}
@@ -81,11 +87,11 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){
void Channel::declareQueue(Queue& queue, bool synch){
string name = queue.getName();
FieldTable args;
- AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(0, name, false, false,
+ AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(version, 0, name, false, false,
queue.isExclusive(),
queue.isAutoDelete(), !synch, args));
if(synch){
- sendAndReceive(frame, queue_declare_ok);
+ sendAndReceive(frame, method_bodies.queue_declare_ok);
if(queue.getName().length() == 0){
QueueDeclareOkBody::shared_ptr response =
dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
@@ -99,9 +105,9 @@ void Channel::declareQueue(Queue& queue, bool synch){
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
//ticket, queue, ifunused, ifempty, nowait
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(0, name, ifunused, ifempty, !synch));
+ AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
if(synch){
- sendAndReceive(frame, queue_delete_ok);
+ sendAndReceive(frame, method_bodies.queue_delete_ok);
}else{
out->send(frame);
}
@@ -110,9 +116,9 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch)
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new QueueBindBody(0, q, e, key,!synch, args));
+ AMQFrame* frame = new AMQFrame(id, new QueueBindBody(version, 0, q, e, key,!synch, args));
if(synch){
- sendAndReceive(frame, queue_bind_ok);
+ sendAndReceive(frame, method_bodies.queue_bind_ok);
}else{
out->send(frame);
}
@@ -122,9 +128,9 @@ void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener,
int ackMode, bool noLocal, bool synch){
string q = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch));
+ AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(version, 0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch));
if(synch){
- sendAndReceive(frame, basic_consume_ok);
+ sendAndReceive(frame, method_bodies.basic_consume_ok);
BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
tag = response->getConsumerTag();
}else{
@@ -140,12 +146,12 @@ void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener,
void Channel::cancel(std::string& tag, bool synch){
Consumer* c = consumers[tag];
if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true)));
+ out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true)));
}
- AMQFrame* frame = new AMQFrame(id, new BasicCancelBody((string&) tag, !synch));
+ AMQFrame* frame = new AMQFrame(id, new BasicCancelBody(version, (string&) tag, !synch));
if(synch){
- sendAndReceive(frame, basic_cancel_ok);
+ sendAndReceive(frame, method_bodies.basic_cancel_ok);
}else{
out->send(frame);
}
@@ -181,12 +187,12 @@ void Channel::retrieve(Message& msg){
bool Channel::get(Message& msg, const Queue& queue, int ackMode){
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new BasicGetBody(0, name, ackMode));
+ AMQFrame* frame = new AMQFrame(id, new BasicGetBody(version, 0, name, ackMode));
responses.expect();
out->send(frame);
responses.waitForResponse();
AMQMethodBody::shared_ptr response = responses.getResponse();
- if(basic_get_ok.match(response.get())){
+ if(method_bodies.basic_get_ok.match(response.get())){
if(incoming != 0){
std::cout << "Existing message not complete" << std::endl;
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
@@ -195,7 +201,7 @@ bool Channel::get(Message& msg, const Queue& queue, int ackMode){
}
retrieve(msg);
return true;
- }if(basic_get_empty.match(response.get())){
+ }if(method_bodies.basic_get_empty.match(response.get())){
return false;
}else{
THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
@@ -207,7 +213,7 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
string e = exchange.getName();
string key = routingKey;
- out->send(new AMQFrame(id, new BasicPublishBody(0, e, key, mandatory, immediate)));
+ out->send(new AMQFrame(id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
//break msg up into header frame and content frame(s) and send these
string data = msg.getData();
msg.header->setContentSize(data.length());
@@ -233,38 +239,38 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
}
void Channel::commit(){
- AMQFrame* frame = new AMQFrame(id, new TxCommitBody());
- sendAndReceive(frame, tx_commit_ok);
+ AMQFrame* frame = new AMQFrame(id, new TxCommitBody(version));
+ sendAndReceive(frame, method_bodies.tx_commit_ok);
}
void Channel::rollback(){
- AMQFrame* frame = new AMQFrame(id, new TxRollbackBody());
- sendAndReceive(frame, tx_rollback_ok);
+ AMQFrame* frame = new AMQFrame(id, new TxRollbackBody(version));
+ sendAndReceive(frame, method_bodies.tx_rollback_ok);
}
void Channel::handleMethod(AMQMethodBody::shared_ptr body){
//channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
if(responses.isWaiting()){
responses.signalResponse(body);
- }else if(basic_deliver.match(body.get())){
+ }else if(method_bodies.basic_deliver.match(body.get())){
if(incoming != 0){
std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
}
- }else if(basic_return.match(body.get())){
+ }else if(method_bodies.basic_return.match(body.get())){
if(incoming != 0){
std::cout << "Existing message not complete" << std::endl;
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
}else{
incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
}
- }else if(channel_close.match(body.get())){
+ }else if(method_bodies.channel_close.match(body.get())){
con->removeChannel(this);
//need to signal application that channel has been closed through exception
- }else if(channel_flow.match(body.get())){
+ }else if(method_bodies.channel_flow.match(body.get())){
}else{
//signal error
diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h
index b2e08f5756..e850c1c626 100644
--- a/cpp/src/qpid/client/Channel.h
+++ b/cpp/src/qpid/client/Channel.h
@@ -65,6 +65,7 @@ namespace client {
u_int16_t prefetch;
const bool transactional;
+ qpid::framing::ProtocolVersion version;
void enqueue();
void retrieve(Message& msg);
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index 93f170742a..de324fdab4 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -23,6 +23,7 @@
#include <qpid/client/Message.h>
#include <qpid/QpidError.h>
#include <iostream>
+#include <qpid/client/MethodBodyInstances.h>
using namespace qpid::client;
using namespace qpid::framing;
@@ -31,7 +32,11 @@ using namespace qpid::sys;
u_int16_t Connection::channelIdCounter;
-Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){
+Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true),
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ version(8, 0)
+{
connector = new Connector(debug, _max_frame_size);
}
@@ -51,14 +56,14 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
ProtocolInitiation* header = new ProtocolInitiation(8, 0);
responses.expect();
connector->init(header);
- responses.receive(connection_start);
+ responses.receive(method_bodies.connection_start);
FieldTable props;
string mechanism("PLAIN");
string response = ((char)0) + uid + ((char)0) + pwd;
string locale("en_US");
responses.expect();
- out->send(new AMQFrame(0, new ConnectionStartOkBody(props, mechanism, response, locale)));
+ out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
/**
* Assume for now that further challenges will not be required
@@ -68,10 +73,10 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
**/
- responses.receive(connection_tune);
+ responses.receive(method_bodies.connection_tune);
ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
- out->send(new AMQFrame(0, new ConnectionTuneOkBody(proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
+ out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
u_int16_t heartbeat = proposal->getHeartbeat();
connector->setReadTimeout(heartbeat * 2);
@@ -81,12 +86,12 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
string capabilities;
string vhost = virtualhost;
responses.expect();
- out->send(new AMQFrame(0, new ConnectionOpenBody(vhost, capabilities, true)));
+ out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true)));
//receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
responses.waitForResponse();
- if(responses.validate(connection_open_ok)){
+ if(responses.validate(method_bodies.connection_open_ok)){
//ok
- }else if(responses.validate(connection_redirect)){
+ }else if(responses.validate(method_bodies.connection_redirect)){
//ignore for now
ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
std::cout << "Received redirection to " << redirect->getHost() << std::endl;
@@ -103,7 +108,7 @@ void Connection::close(){
u_int16_t classId(0);
u_int16_t methodId(0);
- sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, text, classId, methodId)), connection_close_ok);
+ sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
connector->close();
}
}
@@ -115,7 +120,7 @@ void Connection::openChannel(Channel* channel){
channels[channel->id] = channel;
//now send frame to open channel and wait for response
string oob;
- channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(oob)), channel_open_ok);
+ channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
channel->setQos();
channel->closed = false;
}
@@ -133,7 +138,7 @@ void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_
//send frame to close channel
channel->cancelAll();
channel->closed = true;
- channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(code, text, classId, methodId)), channel_close_ok);
+ channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
channel->con = 0;
channel->out = 0;
removeChannel(channel);
@@ -171,7 +176,7 @@ void Connection::handleMethod(AMQMethodBody::shared_ptr body){
//connection.close, basic.deliver, basic.return or a response to a synchronous request
if(responses.isWaiting()){
responses.signalResponse(body);
- }else if(connection_close.match(body.get())){
+ }else if(method_bodies.connection_close.match(body.get())){
//send back close ok
//close socket
ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
@@ -206,7 +211,7 @@ void Connection::error(int code, const string& msg, int classid, int methodid){
std::cout << " [" << methodid << ":" << classid << "]";
}
std::cout << std::endl;
- sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, msg, classid, methodid)), connection_close_ok);
+ sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
connector->close();
}
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 340ebe9a0f..c7b1fb8dd0 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -59,6 +59,7 @@ class Connection : public virtual qpid::framing::InputHandler,
qpid::framing::OutputHandler* out;
ResponseHandler responses;
volatile bool closed;
+ qpid::framing::ProtocolVersion version;
void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
diff --git a/cpp/src/qpid/client/MethodBodyInstances.h b/cpp/src/qpid/client/MethodBodyInstances.h
new file mode 100644
index 0000000000..a2bd9dadd9
--- /dev/null
+++ b/cpp/src/qpid/client/MethodBodyInstances.h
@@ -0,0 +1,101 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/framing/amqp_framing.h>
+
+/**
+ * This file replaces the auto-generated instances in the former
+ * amqp_methods.h file. Add additional instances as needed.
+ */
+
+#ifndef _MethodBodyInstances_h_
+#define _MethodBodyInstances_h_
+
+namespace qpid {
+namespace client {
+
+class MethodBodyInstances
+{
+private:
+ qpid::framing::ProtocolVersion version;
+public:
+ const qpid::framing::BasicCancelOkBody basic_cancel_ok;
+ const qpid::framing::BasicConsumeOkBody basic_consume_ok;
+ const qpid::framing::BasicDeliverBody basic_deliver;
+ const qpid::framing::BasicGetEmptyBody basic_get_empty;
+ const qpid::framing::BasicGetOkBody basic_get_ok;
+ const qpid::framing::BasicQosBody basic_qos_ok;
+ const qpid::framing::BasicReturnBody basic_return;
+ const qpid::framing::ChannelCloseBody channel_close;
+ const qpid::framing::ChannelCloseOkBody channel_close_ok;
+ const qpid::framing::ChannelFlowBody channel_flow;
+ const qpid::framing::ChannelOpenOkBody channel_open_ok;
+ const qpid::framing::ConnectionCloseBody connection_close;
+ const qpid::framing::ConnectionCloseOkBody connection_close_ok;
+ const qpid::framing::ConnectionOpenOkBody connection_open_ok;
+ const qpid::framing::ConnectionRedirectBody connection_redirect;
+ const qpid::framing::ConnectionStartBody connection_start;
+ const qpid::framing::ConnectionTuneBody connection_tune;
+ const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok;
+ const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok;
+ const qpid::framing::QueueDeclareOkBody queue_declare_ok;
+ const qpid::framing::QueueDeleteOkBody queue_delete_ok;
+ const qpid::framing::QueueBindOkBody queue_bind_ok;
+ const qpid::framing::TxCommitOkBody tx_commit_ok;
+ const qpid::framing::TxRollbackOkBody tx_rollback_ok;
+ const qpid::framing::TxSelectOkBody tx_select_ok;
+
+ MethodBodyInstances(u_int8_t major, u_int8_t minor) :
+ version(major, minor),
+ basic_cancel_ok(version),
+ basic_consume_ok(version),
+ basic_deliver(version),
+ basic_get_empty(version),
+ basic_get_ok(version),
+ basic_qos_ok(version),
+ basic_return(version),
+ channel_close(version),
+ channel_close_ok(version),
+ channel_flow(version),
+ channel_open_ok(version),
+ connection_close(version),
+ connection_close_ok(version),
+ connection_open_ok(version),
+ connection_redirect(version),
+ connection_start(version),
+ connection_tune(version),
+ exchange_declare_ok(version),
+ exchange_delete_ok(version),
+ queue_declare_ok(version),
+ queue_delete_ok(version),
+ queue_bind_ok(version),
+ tx_commit_ok(version),
+ tx_rollback_ok(version),
+ tx_select_ok(version)
+ {}
+
+};
+
+static MethodBodyInstances method_bodies(8, 0);
+
+} // namespace client
+} // namespace qpid
+
+#endif