diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2006-11-22 16:57:35 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2006-11-22 16:57:35 +0000 |
| commit | d46ac2955c4871c9f22067f47490095e2c5f1806 (patch) | |
| tree | 7e76ef7e4ca47e4cc57c83f7950bf97c3eceb210 /cpp/src/qpid/client | |
| parent | 018723f3889e9a1f63585dddba8eecff1d168501 (diff) | |
| download | qpid-python-d46ac2955c4871c9f22067f47490095e2c5f1806.tar.gz | |
Merged AMQP version-sensitive generated files with C++ trunk. Phase 1 of merge complete - all locations where version info is required in the framing, broker and client code, the version has been hard-coded to mahor=8, minor=0. Next step: make broker and client version-aware.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@478237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/Channel.cpp | 66 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Channel.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 31 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connection.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/client/MethodBodyInstances.h | 101 |
5 files changed, 157 insertions, 43 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index a6c6bfea51..6901407072 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -22,6 +22,7 @@ #include <qpid/sys/Monitor.h> #include <qpid/client/Message.h> #include <qpid/QpidError.h> +#include <qpid/client/MethodBodyInstances.h> using namespace boost; //to use dynamic_pointer_cast using namespace qpid::client; @@ -35,7 +36,10 @@ Channel::Channel(bool _transactional, u_int16_t _prefetch) : incoming(0), closed(true), prefetch(_prefetch), - transactional(_transactional) + transactional(_transactional), +// AMQP version management change - kpvdr 2006-11-20 +// TODO: Make this class version-aware and link these hard-wired numbers to that version + version(8, 0) { } Channel::~Channel(){ @@ -50,9 +54,11 @@ void Channel::setPrefetch(u_int16_t _prefetch){ } void Channel::setQos(){ - sendAndReceive(new AMQFrame(id, new BasicQosBody(0, prefetch, false)), basic_qos_ok); +// AMQP version management change - kpvdr 2006-11-20 +// TODO: Make this class version-aware and link these hard-wired numbers to that version + sendAndReceive(new AMQFrame(id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok); if(transactional){ - sendAndReceive(new AMQFrame(id, new TxSelectBody()), tx_select_ok); + sendAndReceive(new AMQFrame(id, new TxSelectBody(version)), method_bodies.tx_select_ok); } } @@ -60,9 +66,9 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); string type = exchange.getType(); FieldTable args; - AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(0, name, type, false, false, false, false, !synch, args)); + AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args)); if(synch){ - sendAndReceive(frame, exchange_declare_ok); + sendAndReceive(frame, method_bodies.exchange_declare_ok); }else{ out->send(frame); } @@ -70,9 +76,9 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ void Channel::deleteExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); - AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(0, name, false, !synch)); + AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(version, 0, name, false, !synch)); if(synch){ - sendAndReceive(frame, exchange_delete_ok); + sendAndReceive(frame, method_bodies.exchange_delete_ok); }else{ out->send(frame); } @@ -81,11 +87,11 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){ void Channel::declareQueue(Queue& queue, bool synch){ string name = queue.getName(); FieldTable args; - AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(0, name, false, false, + AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(version, 0, name, false, false, queue.isExclusive(), queue.isAutoDelete(), !synch, args)); if(synch){ - sendAndReceive(frame, queue_declare_ok); + sendAndReceive(frame, method_bodies.queue_declare_ok); if(queue.getName().length() == 0){ QueueDeclareOkBody::shared_ptr response = dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse()); @@ -99,9 +105,9 @@ void Channel::declareQueue(Queue& queue, bool synch){ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ //ticket, queue, ifunused, ifempty, nowait string name = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(0, name, ifunused, ifempty, !synch)); + AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); if(synch){ - sendAndReceive(frame, queue_delete_ok); + sendAndReceive(frame, method_bodies.queue_delete_ok); }else{ out->send(frame); } @@ -110,9 +116,9 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch) void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new QueueBindBody(0, q, e, key,!synch, args)); + AMQFrame* frame = new AMQFrame(id, new QueueBindBody(version, 0, q, e, key,!synch, args)); if(synch){ - sendAndReceive(frame, queue_bind_ok); + sendAndReceive(frame, method_bodies.queue_bind_ok); }else{ out->send(frame); } @@ -122,9 +128,9 @@ void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener, int ackMode, bool noLocal, bool synch){ string q = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch)); + AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(version, 0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch)); if(synch){ - sendAndReceive(frame, basic_consume_ok); + sendAndReceive(frame, method_bodies.basic_consume_ok); BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse()); tag = response->getConsumerTag(); }else{ @@ -140,12 +146,12 @@ void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener, void Channel::cancel(std::string& tag, bool synch){ Consumer* c = consumers[tag]; if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){ - out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true))); + out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true))); } - AMQFrame* frame = new AMQFrame(id, new BasicCancelBody((string&) tag, !synch)); + AMQFrame* frame = new AMQFrame(id, new BasicCancelBody(version, (string&) tag, !synch)); if(synch){ - sendAndReceive(frame, basic_cancel_ok); + sendAndReceive(frame, method_bodies.basic_cancel_ok); }else{ out->send(frame); } @@ -181,12 +187,12 @@ void Channel::retrieve(Message& msg){ bool Channel::get(Message& msg, const Queue& queue, int ackMode){ string name = queue.getName(); - AMQFrame* frame = new AMQFrame(id, new BasicGetBody(0, name, ackMode)); + AMQFrame* frame = new AMQFrame(id, new BasicGetBody(version, 0, name, ackMode)); responses.expect(); out->send(frame); responses.waitForResponse(); AMQMethodBody::shared_ptr response = responses.getResponse(); - if(basic_get_ok.match(response.get())){ + if(method_bodies.basic_get_ok.match(response.get())){ if(incoming != 0){ std::cout << "Existing message not complete" << std::endl; THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); @@ -195,7 +201,7 @@ bool Channel::get(Message& msg, const Queue& queue, int ackMode){ } retrieve(msg); return true; - }if(basic_get_empty.match(response.get())){ + }if(method_bodies.basic_get_empty.match(response.get())){ return false; }else{ THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get."); @@ -207,7 +213,7 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& string e = exchange.getName(); string key = routingKey; - out->send(new AMQFrame(id, new BasicPublishBody(0, e, key, mandatory, immediate))); + out->send(new AMQFrame(id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); @@ -233,38 +239,38 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& } void Channel::commit(){ - AMQFrame* frame = new AMQFrame(id, new TxCommitBody()); - sendAndReceive(frame, tx_commit_ok); + AMQFrame* frame = new AMQFrame(id, new TxCommitBody(version)); + sendAndReceive(frame, method_bodies.tx_commit_ok); } void Channel::rollback(){ - AMQFrame* frame = new AMQFrame(id, new TxRollbackBody()); - sendAndReceive(frame, tx_rollback_ok); + AMQFrame* frame = new AMQFrame(id, new TxRollbackBody(version)); + sendAndReceive(frame, method_bodies.tx_rollback_ok); } void Channel::handleMethod(AMQMethodBody::shared_ptr body){ //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request if(responses.isWaiting()){ responses.signalResponse(body); - }else if(basic_deliver.match(body.get())){ + }else if(method_bodies.basic_deliver.match(body.get())){ if(incoming != 0){ std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); }else{ incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); } - }else if(basic_return.match(body.get())){ + }else if(method_bodies.basic_return.match(body.get())){ if(incoming != 0){ std::cout << "Existing message not complete" << std::endl; THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); }else{ incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); } - }else if(channel_close.match(body.get())){ + }else if(method_bodies.channel_close.match(body.get())){ con->removeChannel(this); //need to signal application that channel has been closed through exception - }else if(channel_flow.match(body.get())){ + }else if(method_bodies.channel_flow.match(body.get())){ }else{ //signal error diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h index b2e08f5756..e850c1c626 100644 --- a/cpp/src/qpid/client/Channel.h +++ b/cpp/src/qpid/client/Channel.h @@ -65,6 +65,7 @@ namespace client { u_int16_t prefetch; const bool transactional; + qpid::framing::ProtocolVersion version; void enqueue(); void retrieve(Message& msg); diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 93f170742a..de324fdab4 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -23,6 +23,7 @@ #include <qpid/client/Message.h> #include <qpid/QpidError.h> #include <iostream> +#include <qpid/client/MethodBodyInstances.h> using namespace qpid::client; using namespace qpid::framing; @@ -31,7 +32,11 @@ using namespace qpid::sys; u_int16_t Connection::channelIdCounter; -Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){ +Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true), +// AMQP version management change - kpvdr 2006-11-20 +// TODO: Make this class version-aware and link these hard-wired numbers to that version + version(8, 0) +{ connector = new Connector(debug, _max_frame_size); } @@ -51,14 +56,14 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui ProtocolInitiation* header = new ProtocolInitiation(8, 0); responses.expect(); connector->init(header); - responses.receive(connection_start); + responses.receive(method_bodies.connection_start); FieldTable props; string mechanism("PLAIN"); string response = ((char)0) + uid + ((char)0) + pwd; string locale("en_US"); responses.expect(); - out->send(new AMQFrame(0, new ConnectionStartOkBody(props, mechanism, response, locale))); + out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); /** * Assume for now that further challenges will not be required @@ -68,10 +73,10 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); **/ - responses.receive(connection_tune); + responses.receive(method_bodies.connection_tune); ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); - out->send(new AMQFrame(0, new ConnectionTuneOkBody(proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); + out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); u_int16_t heartbeat = proposal->getHeartbeat(); connector->setReadTimeout(heartbeat * 2); @@ -81,12 +86,12 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui string capabilities; string vhost = virtualhost; responses.expect(); - out->send(new AMQFrame(0, new ConnectionOpenBody(vhost, capabilities, true))); + out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true))); //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). responses.waitForResponse(); - if(responses.validate(connection_open_ok)){ + if(responses.validate(method_bodies.connection_open_ok)){ //ok - }else if(responses.validate(connection_redirect)){ + }else if(responses.validate(method_bodies.connection_redirect)){ //ignore for now ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); std::cout << "Received redirection to " << redirect->getHost() << std::endl; @@ -103,7 +108,7 @@ void Connection::close(){ u_int16_t classId(0); u_int16_t methodId(0); - sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, text, classId, methodId)), connection_close_ok); + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); connector->close(); } } @@ -115,7 +120,7 @@ void Connection::openChannel(Channel* channel){ channels[channel->id] = channel; //now send frame to open channel and wait for response string oob; - channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(oob)), channel_open_ok); + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); channel->setQos(); channel->closed = false; } @@ -133,7 +138,7 @@ void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_ //send frame to close channel channel->cancelAll(); channel->closed = true; - channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(code, text, classId, methodId)), channel_close_ok); + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); channel->con = 0; channel->out = 0; removeChannel(channel); @@ -171,7 +176,7 @@ void Connection::handleMethod(AMQMethodBody::shared_ptr body){ //connection.close, basic.deliver, basic.return or a response to a synchronous request if(responses.isWaiting()){ responses.signalResponse(body); - }else if(connection_close.match(body.get())){ + }else if(method_bodies.connection_close.match(body.get())){ //send back close ok //close socket ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); @@ -206,7 +211,7 @@ void Connection::error(int code, const string& msg, int classid, int methodid){ std::cout << " [" << methodid << ":" << classid << "]"; } std::cout << std::endl; - sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, msg, classid, methodid)), connection_close_ok); + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); connector->close(); } diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 340ebe9a0f..c7b1fb8dd0 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -59,6 +59,7 @@ class Connection : public virtual qpid::framing::InputHandler, qpid::framing::OutputHandler* out; ResponseHandler responses; volatile bool closed; + qpid::framing::ProtocolVersion version; void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e); void error(int code, const std::string& msg, int classid = 0, int methodid = 0); diff --git a/cpp/src/qpid/client/MethodBodyInstances.h b/cpp/src/qpid/client/MethodBodyInstances.h new file mode 100644 index 0000000000..a2bd9dadd9 --- /dev/null +++ b/cpp/src/qpid/client/MethodBodyInstances.h @@ -0,0 +1,101 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <qpid/framing/amqp_framing.h> + +/** + * This file replaces the auto-generated instances in the former + * amqp_methods.h file. Add additional instances as needed. + */ + +#ifndef _MethodBodyInstances_h_ +#define _MethodBodyInstances_h_ + +namespace qpid { +namespace client { + +class MethodBodyInstances +{ +private: + qpid::framing::ProtocolVersion version; +public: + const qpid::framing::BasicCancelOkBody basic_cancel_ok; + const qpid::framing::BasicConsumeOkBody basic_consume_ok; + const qpid::framing::BasicDeliverBody basic_deliver; + const qpid::framing::BasicGetEmptyBody basic_get_empty; + const qpid::framing::BasicGetOkBody basic_get_ok; + const qpid::framing::BasicQosBody basic_qos_ok; + const qpid::framing::BasicReturnBody basic_return; + const qpid::framing::ChannelCloseBody channel_close; + const qpid::framing::ChannelCloseOkBody channel_close_ok; + const qpid::framing::ChannelFlowBody channel_flow; + const qpid::framing::ChannelOpenOkBody channel_open_ok; + const qpid::framing::ConnectionCloseBody connection_close; + const qpid::framing::ConnectionCloseOkBody connection_close_ok; + const qpid::framing::ConnectionOpenOkBody connection_open_ok; + const qpid::framing::ConnectionRedirectBody connection_redirect; + const qpid::framing::ConnectionStartBody connection_start; + const qpid::framing::ConnectionTuneBody connection_tune; + const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok; + const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok; + const qpid::framing::QueueDeclareOkBody queue_declare_ok; + const qpid::framing::QueueDeleteOkBody queue_delete_ok; + const qpid::framing::QueueBindOkBody queue_bind_ok; + const qpid::framing::TxCommitOkBody tx_commit_ok; + const qpid::framing::TxRollbackOkBody tx_rollback_ok; + const qpid::framing::TxSelectOkBody tx_select_ok; + + MethodBodyInstances(u_int8_t major, u_int8_t minor) : + version(major, minor), + basic_cancel_ok(version), + basic_consume_ok(version), + basic_deliver(version), + basic_get_empty(version), + basic_get_ok(version), + basic_qos_ok(version), + basic_return(version), + channel_close(version), + channel_close_ok(version), + channel_flow(version), + channel_open_ok(version), + connection_close(version), + connection_close_ok(version), + connection_open_ok(version), + connection_redirect(version), + connection_start(version), + connection_tune(version), + exchange_declare_ok(version), + exchange_delete_ok(version), + queue_declare_ok(version), + queue_delete_ok(version), + queue_bind_ok(version), + tx_commit_ok(version), + tx_rollback_ok(version), + tx_select_ok(version) + {} + +}; + +static MethodBodyInstances method_bodies(8, 0); + +} // namespace client +} // namespace qpid + +#endif |
