summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp107
1 files changed, 28 insertions, 79 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 8b85017ba0..f407b5a2f9 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -49,28 +49,19 @@ const std::string empty;
}}
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
- connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
+ prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
{
}
-Channel::~Channel(){
- closeInternal();
-}
+Channel::~Channel(){}
-void Channel::open(ChannelId id, Connection& con)
+void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s)
{
if (isOpen())
- THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
- connection = &con;
- channelId = id;
- //link up handlers:
- channelHandler.out = boost::bind(&ConnectionHandler::outgoing, &(connection->handler), _1);
- channelHandler.in = boost::bind(&ExecutionHandler::handle, &executionHandler, _1);
- executionHandler.out = boost::bind(&ChannelHandler::outgoing, &channelHandler, _1);
- //set up close notification:
- channelHandler.onClose = boost::bind(&Channel::peerClose, this, _1, _2);
-
- channelHandler.open(id);
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
+
+ connection = c;
+ session = s;
}
bool Channel::isOpen() const {
@@ -79,10 +70,10 @@ bool Channel::isOpen() const {
}
void Channel::setQos() {
- executionHandler.send(make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false)));
+ sendSync(false, make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false)));
if(isTransactional()) {
//I think this is wrong! should only send TxSelect once...
- executionHandler.send(make_shared_ptr(new TxSelectBody(version)));
+ sendSync(false, make_shared_ptr(new TxSelectBody(version)));
}
}
@@ -133,63 +124,52 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri
}
void Channel::commit(){
- executionHandler.send(make_shared_ptr(new TxCommitBody(version)));
+ sendSync(false, make_shared_ptr(new TxCommitBody(version)));
}
void Channel::rollback(){
- executionHandler.send(make_shared_ptr(new TxRollbackBody(version)));
+ sendSync(false, make_shared_ptr(new TxRollbackBody(version)));
}
void Channel::close()
{
- channelHandler.close();
+ session->close();
{
Mutex::ScopedLock l(lock);
if (connection);
{
- connection->erase(channelId);
- connection = 0;
+ connection->released(session);
+ connection.reset();
}
}
stop();
}
-
// Channel closed by peer.
void Channel::peerClose(uint16_t code, const std::string& message) {
assert(isOpen());
//record reason:
errorCode = code;
errorText = message;
- closeInternal();
stop();
- futures.close(code, message);
-}
-
-void Channel::closeInternal() {
- Mutex::ScopedLock l(lock);
- if (connection);
- {
- connection = 0;
- }
}
AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/)
{
-
- boost::shared_ptr<FutureResponse> fr(futures.createResponse());
- executionHandler.send(toSend, boost::bind(&FutureResponse::completed, fr), boost::bind(&FutureResponse::received, fr, _1));
- return fr->getResponse();
+ session->setSync(true);
+ Response r = session->send(toSend, true);
+ session->setSync(false);
+ return r.getPtr();
}
void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command)
{
if(sync) {
- boost::shared_ptr<FutureCompletion> fc(futures.createCompletion());
- executionHandler.send(command, boost::bind(&FutureCompletion::completed, fc));
- fc->waitForCompletion();
+ session->setSync(true);
+ session->send(command, false);
+ session->setSync(false);
} else {
- executionHandler.send(command);
+ session->send(command);
}
}
@@ -199,7 +179,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
if(sync)
return sendAndReceive(body, c, m);
else {
- executionHandler.send(body);
+ session->send(body);
return AMQMethodBody::shared_ptr();
}
}
@@ -246,8 +226,8 @@ void Channel::cancel(const std::string& tag, bool synch) {
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode));
- AMQMethodBody::shared_ptr response = sendAndReceive(request);
- if (response && response->isA<BasicGetEmptyBody>()) {
+ Response response = session->send(request, true);
+ if (response.isA<BasicGetEmptyBody>()) {
return false;
} else {
ReceivedContent::shared_ptr content = gets.pop();
@@ -263,38 +243,7 @@ void Channel::publish(const Message& msg, const Exchange& exchange,
const string e = exchange.getName();
string key = routingKey;
- executionHandler.sendContent(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)),
- msg, msg.getData(), connection->getMaxFrameSize());//sending framesize here is horrible, fix this!
- /*
- // Make a header for the message
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- BasicHeaderProperties::copy(
- *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
- header->setContentSize(msg.getData().size());
-
- executionHandler.send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
- executionHandler.sendContent(header);
- string data = msg.getData();
- u_int64_t data_length = data.length();
- if(data_length > 0){
- //frame itself uses 8 bytes
- u_int32_t frag_size = connection->getMaxFrameSize() - 8;
- if(data_length < frag_size){
- executionHandler.sendContent(make_shared_ptr(new AMQContentBody(data)));
- }else{
- u_int32_t offset = 0;
- u_int32_t remaining = data_length - offset;
- while (remaining > 0) {
- u_int32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(data.substr(offset, length));
- executionHandler.sendContent(make_shared_ptr(new AMQContentBody(frag)));
-
- offset += length;
- remaining = data_length - offset;
- }
- }
- }
- */
+ session->send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), msg, false);
}
void Channel::start(){
@@ -303,7 +252,7 @@ void Channel::start(){
}
void Channel::stop() {
- executionHandler.received.close();
+ session->stop();
gets.close();
Mutex::ScopedLock l(stopLock);
if(running) {
@@ -315,7 +264,7 @@ void Channel::stop() {
void Channel::run() {
try {
while (true) {
- ReceivedContent::shared_ptr content = executionHandler.received.pop();
+ ReceivedContent::shared_ptr content = session->get();
//need to dispatch this to the relevant listener:
if (content->isA<BasicDeliverBody>()) {
ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());