1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "ConnectionImpl.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
using namespace qpid::client;
using namespace qpid::framing;
ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c)
{
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
handler.out = boost::bind(&Connector::send, connector, _1);
handler.onClose = boost::bind(&ConnectionImpl::closed, this);
handler.onError = boost::bind(&ConnectionImpl::closedByPeer, this, _1, _2);
connector->setInputHandler(&handler);
connector->setTimeoutHandler(this);
connector->setShutdownHandler(this);
}
void ConnectionImpl::allocated(SessionCore::shared_ptr session)
{
if (sessions.find(session->getId()) != sessions.end()) {
throw Exception("Id already in use.");
}
sessions[session->getId()] = session;
}
void ConnectionImpl::released(SessionCore::shared_ptr session)
{
SessionMap::iterator i = sessions.find(session->getId());
if (i != sessions.end()) {
sessions.erase(i);
}
}
void ConnectionImpl::handle(framing::AMQFrame& frame)
{
handler.outgoing(frame);
}
void ConnectionImpl::incoming(framing::AMQFrame& frame)
{
uint16_t id = frame.getChannel();
SessionCore::shared_ptr session = sessions[id];
if (!session) {
throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
}
session->handle(frame);
}
void ConnectionImpl::open(const std::string& host, int port,
const std::string& uid, const std::string& pwd,
const std::string& vhost)
{
//TODO: better management of connection properties
handler.uid = uid;
handler.pwd = pwd;
handler.vhost = vhost;
connector->connect(host, port);
connector->init();
handler.waitForOpen();
}
void ConnectionImpl::close()
{
handler.close();
}
void ConnectionImpl::closed()
{
closedByPeer(200, "OK");
}
void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text)
{
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
i->second->closed(code, text);
}
sessions.clear();
connector->close();
}
void ConnectionImpl::idleIn()
{
connector->close();
}
void ConnectionImpl::idleOut()
{
AMQFrame frame(0, new AMQHeartbeatBody());
connector->send(frame);
}
void ConnectionImpl::shutdown() {
//this indicates that the socket to the server has closed
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
i->second->closed(0, "Unexpected socket closure.");
}
sessions.clear();
}
|