summaryrefslogtreecommitdiff
path: root/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp')
-rw-r--r--wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp109
1 files changed, 103 insertions, 6 deletions
diff --git a/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp b/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
index c3afdf2280..1bc9a15d92 100644
--- a/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
+++ b/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
@@ -47,10 +47,9 @@ using namespace qpid::client;
using namespace std;
-// Note on locks: Use "this" for fast counting and idle/busy
+// Note on locks: Use thisLock for fast counting and idle/busy
// notifications. Use the "sessions" list to serialize session
// creation/reaping and overall tear down.
-// TODO: switch "this" lock to separate non-visible Object.
AmqpConnection::AmqpConnection(String^ server, int port) :
@@ -58,19 +57,65 @@ AmqpConnection::AmqpConnection(String^ server, int port) :
busyCount(0),
disposed(false)
{
+ initialize (server, port, false, false, nullptr, nullptr);
+}
+
+AmqpConnection::AmqpConnection(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password) :
+ connectionp(NULL),
+ busyCount(0),
+ disposed(false)
+{
+ initialize (server, port, ssl, saslPlain, username, password);
+}
+
+void AmqpConnection::initialize(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password)
+{
+ if (server == nullptr)
+ throw gcnew ArgumentNullException("AMQP server");
+ if (saslPlain) {
+ if (username == nullptr)
+ throw gcnew ArgumentNullException("username");
+ if (username == nullptr)
+ throw gcnew ArgumentNullException("password");
+ }
+
bool success = false;
System::Exception^ openException = nullptr;
sessions = gcnew Collections::Generic::List<AmqpSession^>();
+ thisLock = gcnew Object();
try {
connectionp = new Connection;
- connectionp->open (QpidMarshal::ToNative(server), port);
+
+ if (ssl || saslPlain) {
+ ConnectionSettings proposedSettings;
+ proposedSettings.host = QpidMarshal::ToNative(server);
+ proposedSettings.port = port;
+ if (ssl)
+ proposedSettings.protocol = "ssl";
+
+ if (saslPlain) {
+ proposedSettings.username = QpidMarshal::ToNative(username);
+ proposedSettings.password = QpidMarshal::ToNative(password);
+ proposedSettings.mechanism = "PLAIN";
+ }
+
+ connectionp->open (proposedSettings);
+ }
+ else {
+ connectionp->open (QpidMarshal::ToNative(server), port);
+ }
+
// TODO: registerFailureCallback for failover
success = true;
const ConnectionSettings& settings = connectionp->getNegotiatedSettings();
this->maxFrameSize = settings.maxFrameSize;
this->host = server;
this->port = port;
+ this->ssl = ssl;
+ this->saslPlain = saslPlain;
+ this->username = username;
+ this->password = password;
this->isOpen = true;
} catch (const qpid::Exception& error) {
String^ errmsg = gcnew String(error.what());
@@ -89,7 +134,7 @@ AmqpConnection::AmqpConnection(String^ server, int port) :
AmqpConnection^ AmqpConnection::Clone() {
if (disposed)
throw gcnew ObjectDisposedException("AmqpConnection.Clone");
- return gcnew AmqpConnection (this->host, this->port);
+ return gcnew AmqpConnection (this->host, this->port, this->ssl, this->saslPlain, this->username, this->password);
}
void AmqpConnection::Cleanup()
@@ -153,7 +198,7 @@ void AmqpConnection::NotifyBusy()
{
bool changed = false;
{
- lock l(this);
+ lock l(thisLock);
if (busyCount++ == 0)
changed = true;
}
@@ -166,7 +211,7 @@ void AmqpConnection::NotifyIdle()
{
bool connectionIdle = false;
{
- lock l(this);
+ lock l(thisLock);
if (--busyCount == 0)
connectionIdle = true;
}
@@ -175,5 +220,57 @@ void AmqpConnection::NotifyIdle()
}
}
+void HexAppend(StringBuilder^ sb, String^ s) {
+ if (s->Length > 0) {
+ array<unsigned char>^ bytes = Encoding::UTF8->GetBytes(s);
+ for each (unsigned char b in bytes) {
+ sb->Append(String::Format("{0:x2}", b));
+ }
+ }
+ sb->Append(".");
+}
+
+
+// Note: any change to this format has to be reflected in the DTC plugin's xa_open()
+// for now: "QPIDdsnV2.port.host.instance_id.SSL_tf.SASL_mech.username.password"
+// This extended info is needed so that the DTC can make a separate connection to the broker
+// for recovery.
+
+String^ AmqpConnection::DataSourceName::get() {
+ if (dataSourceName == nullptr) {
+ StringBuilder^ sb = gcnew StringBuilder();
+ sb->Append("QPIDdsnV2.");
+
+ sb->Append(this->port);
+ sb->Append(".");
+
+ HexAppend(sb, this->host);
+
+ sb->Append(System::Diagnostics::Process::GetCurrentProcess()->Id);
+ sb->Append("-");
+ sb->Append(AppDomain::CurrentDomain->Id);
+ sb->Append(".");
+
+ if (this->ssl)
+ sb->Append("T");
+ else
+ sb->Append("F");
+ sb->Append(".");
+
+ if (this->saslPlain) {
+ sb->Append("P.");
+ HexAppend(sb, this->username);
+ HexAppend(sb, this->password);
+ }
+ else {
+ // SASL anonymous
+ sb->Append("A.");
+ }
+
+ dataSourceName = sb->ToString();
+ }
+ return dataSourceName;
+}
+
}}} // namespace Apache::Qpid::Interop