From b2efcb6ed3e1e2104836928cda81ed69f2f24559 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Sun, 5 Aug 2007 13:25:36 +0000 Subject: Added first cut of generated client interface. Old channel interface still supported; shares SessionCore with the new interface. Todo: allow applications to signal completion of received commands; keywrod args for interface. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562866 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/ClientChannel.cpp | 107 +++++++++------------------------- 1 file changed, 28 insertions(+), 79 deletions(-) (limited to 'cpp/src/qpid/client/ClientChannel.cpp') diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index 8b85017ba0..f407b5a2f9 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -49,28 +49,19 @@ const std::string empty; }} Channel::Channel(bool _transactional, u_int16_t _prefetch) : - connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false) + prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false) { } -Channel::~Channel(){ - closeInternal(); -} +Channel::~Channel(){} -void Channel::open(ChannelId id, Connection& con) +void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s) { if (isOpen()) - THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id); - connection = &con; - channelId = id; - //link up handlers: - channelHandler.out = boost::bind(&ConnectionHandler::outgoing, &(connection->handler), _1); - channelHandler.in = boost::bind(&ExecutionHandler::handle, &executionHandler, _1); - executionHandler.out = boost::bind(&ChannelHandler::outgoing, &channelHandler, _1); - //set up close notification: - channelHandler.onClose = boost::bind(&Channel::peerClose, this, _1, _2); - - channelHandler.open(id); + THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); + + connection = c; + session = s; } bool Channel::isOpen() const { @@ -79,10 +70,10 @@ bool Channel::isOpen() const { } void Channel::setQos() { - executionHandler.send(make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false))); + sendSync(false, make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false))); if(isTransactional()) { //I think this is wrong! should only send TxSelect once... - executionHandler.send(make_shared_ptr(new TxSelectBody(version))); + sendSync(false, make_shared_ptr(new TxSelectBody(version))); } } @@ -133,63 +124,52 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri } void Channel::commit(){ - executionHandler.send(make_shared_ptr(new TxCommitBody(version))); + sendSync(false, make_shared_ptr(new TxCommitBody(version))); } void Channel::rollback(){ - executionHandler.send(make_shared_ptr(new TxRollbackBody(version))); + sendSync(false, make_shared_ptr(new TxRollbackBody(version))); } void Channel::close() { - channelHandler.close(); + session->close(); { Mutex::ScopedLock l(lock); if (connection); { - connection->erase(channelId); - connection = 0; + connection->released(session); + connection.reset(); } } stop(); } - // Channel closed by peer. void Channel::peerClose(uint16_t code, const std::string& message) { assert(isOpen()); //record reason: errorCode = code; errorText = message; - closeInternal(); stop(); - futures.close(code, message); -} - -void Channel::closeInternal() { - Mutex::ScopedLock l(lock); - if (connection); - { - connection = 0; - } } AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/) { - - boost::shared_ptr fr(futures.createResponse()); - executionHandler.send(toSend, boost::bind(&FutureResponse::completed, fr), boost::bind(&FutureResponse::received, fr, _1)); - return fr->getResponse(); + session->setSync(true); + Response r = session->send(toSend, true); + session->setSync(false); + return r.getPtr(); } void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command) { if(sync) { - boost::shared_ptr fc(futures.createCompletion()); - executionHandler.send(command, boost::bind(&FutureCompletion::completed, fc)); - fc->waitForCompletion(); + session->setSync(true); + session->send(command, false); + session->setSync(false); } else { - executionHandler.send(command); + session->send(command); } } @@ -199,7 +179,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( if(sync) return sendAndReceive(body, c, m); else { - executionHandler.send(body); + session->send(body); return AMQMethodBody::shared_ptr(); } } @@ -246,8 +226,8 @@ void Channel::cancel(const std::string& tag, bool synch) { bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode)); - AMQMethodBody::shared_ptr response = sendAndReceive(request); - if (response && response->isA()) { + Response response = session->send(request, true); + if (response.isA()) { return false; } else { ReceivedContent::shared_ptr content = gets.pop(); @@ -263,38 +243,7 @@ void Channel::publish(const Message& msg, const Exchange& exchange, const string e = exchange.getName(); string key = routingKey; - executionHandler.sendContent(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), - msg, msg.getData(), connection->getMaxFrameSize());//sending framesize here is horrible, fix this! - /* - // Make a header for the message - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - BasicHeaderProperties::copy( - *static_cast(header->getProperties()), msg); - header->setContentSize(msg.getData().size()); - - executionHandler.send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate))); - executionHandler.sendContent(header); - string data = msg.getData(); - u_int64_t data_length = data.length(); - if(data_length > 0){ - //frame itself uses 8 bytes - u_int32_t frag_size = connection->getMaxFrameSize() - 8; - if(data_length < frag_size){ - executionHandler.sendContent(make_shared_ptr(new AMQContentBody(data))); - }else{ - u_int32_t offset = 0; - u_int32_t remaining = data_length - offset; - while (remaining > 0) { - u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); - executionHandler.sendContent(make_shared_ptr(new AMQContentBody(frag))); - - offset += length; - remaining = data_length - offset; - } - } - } - */ + session->send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), msg, false); } void Channel::start(){ @@ -303,7 +252,7 @@ void Channel::start(){ } void Channel::stop() { - executionHandler.received.close(); + session->stop(); gets.close(); Mutex::ScopedLock l(stopLock); if(running) { @@ -315,7 +264,7 @@ void Channel::stop() { void Channel::run() { try { while (true) { - ReceivedContent::shared_ptr content = executionHandler.received.pop(); + ReceivedContent::shared_ptr content = session->get(); //need to dispatch this to the relevant listener: if (content->isA()) { ConsumerMap::iterator i = consumers.find(content->as()->getConsumerTag()); -- cgit v1.2.1