summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-07 22:47:17 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-07 22:47:17 +0000
commit0129e12deaabcf3cf3be23913967397be6a12e3a (patch)
tree1ff521e7be49675201bf66f96e4956dc20bac0a8 /java
parentad776f381e2690c58c37c33d23b2389da1b2028e (diff)
downloadqpid-python-0129e12deaabcf3cf3be23913967397be6a12e3a.tar.gz
QPID-946 , QPID-2379 : QMF and Federation fixes (now works again with qpid-config, qpid-route, qpid-tool) and store (durable) routes in the DB
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1228748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java186
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java61
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java50
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java65
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java202
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java101
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java199
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java13
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java459
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java13
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java3
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java40
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java48
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java6
-rw-r--r--java/broker/src/xsl/qmf.xsl16
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java29
30 files changed, 1502 insertions, 145 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 1d8187401d..9efa2937aa 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -27,13 +27,16 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.bind.tuple.StringBinding;
import com.sleepycat.je.*;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
@@ -41,6 +44,8 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
@@ -100,12 +105,17 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private String DELIVERYDB_NAME = "deliveryDb";
private String EXCHANGEDB_NAME = "exchangeDb";
private String QUEUEDB_NAME = "queueDb";
+ private String BRIDGEDB_NAME = "bridges";
+ private String LINKDB_NAME = "links";
+
private Database _messageMetaDataDb;
private Database _messageContentDb;
private Database _queueBindingsDb;
private Database _deliveryDb;
private Database _exchangeDb;
private Database _queueDb;
+ private Database _bridgeDb;
+ private Database _linkDb;
/* =======
* Schema:
@@ -190,6 +200,10 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
EXCHANGEDB_NAME += "_v" + version;
QUEUEBINDINGSDB_NAME += "_v" + version;
+
+ LINKDB_NAME += "_v" + version;
+
+ BRIDGEDB_NAME += "_v" + version;
}
}
@@ -461,6 +475,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_queueBindingsDb = _environment.openDatabase(null, QUEUEBINDINGSDB_NAME, dbConfig);
_messageContentDb = _environment.openDatabase(null, MESSAGECONTENTDB_NAME, dbConfig);
_deliveryDb = _environment.openDatabase(null, DELIVERYDB_NAME, dbConfig);
+ _linkDb = _environment.openDatabase(null, LINKDB_NAME, dbConfig);
+ _bridgeDb = _environment.openDatabase(null, BRIDGEDB_NAME, dbConfig);
+
}
@@ -517,6 +534,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
_deliveryDb.close();
}
+ if (_bridgeDb != null)
+ {
+ _log.info("Close bridge database");
+ _bridgeDb.close();
+ }
+
+ if (_linkDb != null)
+ {
+ _log.info("Close link database");
+ _linkDb.close();
+ }
+
closeEnvironment();
_state = State.CLOSED;
@@ -556,8 +585,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
BindingRecoveryHandler brh = erh.completeExchangeRecovery();
recoverBindings(brh);
-
- brh.completeBindingRecovery();
+
+ ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+ recoverBrokerLinks(lrh);
}
catch (DatabaseException e)
{
@@ -674,6 +704,74 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
+
+ private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
+ {
+ Cursor cursor = null;
+
+ try
+ {
+ cursor = _linkDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+ long createTime = LongBinding.entryToLong(value);
+ Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value);
+
+ ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
+
+ recoverBridges(brh, id);
+ }
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+ }
+
+ private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
+ {
+ Cursor cursor = null;
+
+ try
+ {
+ cursor = _bridgeDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+
+ UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
+ if(parentId.equals(linkId))
+ {
+
+ long createTime = LongBinding.entryToLong(value);
+ Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value);
+ brh.bridge(id,createTime,arguments);
+ }
+ }
+ brh.completeBridgeRecoveryForLink();
+ }
+ finally
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ }
+ }
+
+ }
+
+
private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
{
StoredMessageRecoveryHandler mrh = msrh.begin();
@@ -1163,6 +1261,90 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
}
+ public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+ if (_state != State.RECOVERING)
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ LongBinding.longToEntry(link.getCreateTime(),value);
+ StringMapBinding.getInstance().objectToEntry(link.getArguments(), value);
+
+ try
+ {
+ _linkDb.put(null, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error writing Link " + link
+ + " to database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key);
+ try
+ {
+ OperationStatus status = _linkDb.delete(null, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new AMQStoreException("Link " + link + " not found");
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error deleting the Link " + link + " from database: " + e.getMessage(), e);
+ }
+ }
+
+ public void createBridge(final Bridge bridge) throws AMQStoreException
+ {
+ if (_state != State.RECOVERING)
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key);
+
+ DatabaseEntry value = new DatabaseEntry();
+ UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getId(),value);
+ LongBinding.longToEntry(bridge.getCreateTime(),value);
+ StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value);
+
+ try
+ {
+ _bridgeDb.put(null, key, value);
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error writing Bridge " + bridge
+ + " to database: " + e.getMessage(), e);
+ }
+
+ }
+ }
+
+ public void deleteBridge(final Bridge bridge) throws AMQStoreException
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key);
+ try
+ {
+ OperationStatus status = _bridgeDb.delete(null, key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ throw new AMQStoreException("Bridge " + bridge + " not found");
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw new AMQStoreException("Error deleting the Bridge " + bridge + " from database: " + e.getMessage(), e);
+ }
+ }
+
/**
* Places a message onto a specified queue, in a given transaction.
*
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java
new file mode 100644
index 0000000000..f8fd39e127
--- /dev/null
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class StringMapBinding extends TupleBinding<Map<String,String>>
+{
+
+ private static final StringMapBinding INSTANCE = new StringMapBinding();
+
+ public Map<String, String> entryToObject(final TupleInput tupleInput)
+ {
+ int entries = tupleInput.readInt();
+ Map<String,String> map = new HashMap<String,String>(entries);
+ for(int i = 0; i < entries; i++)
+ {
+ map.put(tupleInput.readString(), tupleInput.readString());
+ }
+ return map;
+ }
+
+
+ public void objectToEntry(final Map<String, String> stringStringMap, final TupleOutput tupleOutput)
+ {
+ tupleOutput.writeInt(stringStringMap.size());
+ for(Map.Entry<String,String> entry : stringStringMap.entrySet())
+ {
+ tupleOutput.writeString(entry.getKey());
+ tupleOutput.writeString(entry.getValue());
+ }
+ }
+
+ public static StringMapBinding getInstance()
+ {
+ return INSTANCE;
+ }
+}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java
new file mode 100644
index 0000000000..c1a5d473f0
--- /dev/null
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import java.util.UUID;
+
+public class UUIDTupleBinding extends TupleBinding<UUID>
+{
+ private static final UUIDTupleBinding INSTANCE = new UUIDTupleBinding();
+
+ public UUID entryToObject(final TupleInput tupleInput)
+ {
+ return new UUID(tupleInput.readLong(), tupleInput.readLong());
+ }
+
+ public void objectToEntry(final UUID uuid, final TupleOutput tupleOutput)
+ {
+ tupleOutput.writeLong(uuid.getMostSignificantBits());
+ tupleOutput.writeLong(uuid.getLeastSignificantBits());
+ }
+
+ public static UUIDTupleBinding getInstance()
+ {
+ return INSTANCE;
+ }
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java
index 9ee8b923fc..709b59588d 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java
@@ -21,6 +21,7 @@
package org.apache.qpid.qmf;
+import org.apache.log4j.Logger;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.transport.codec.BBEncoder;
import org.apache.qpid.server.message.ServerMessage;
@@ -37,6 +38,9 @@ import java.util.List;
public class QMFBrokerRequestCommand extends QMFCommand
{
+ private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf");
+
+
public QMFBrokerRequestCommand(QMFCommandHeader header, BBDecoder buf)
{
super(header);
@@ -47,6 +51,8 @@ public class QMFBrokerRequestCommand extends QMFCommand
String exchangeName = message.getMessageHeader().getReplyToExchange();
String queueName = message.getMessageHeader().getReplyToRoutingKey();
+ _qmfLogger.debug("Execute: " + this);
+
QMFCommand[] commands = new QMFCommand[2];
commands[0] = new QMFBrokerResponseCommand(this, virtualHost);
commands[1] = new QMFCommandCompletionCommand(this);
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java
index 5d2717a9fb..64edc2f294 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java
@@ -21,6 +21,7 @@
package org.apache.qpid.qmf;
+import org.apache.log4j.Logger;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.ServerMessage;
@@ -35,6 +36,9 @@ import java.util.List;
public class QMFClassQueryCommand extends QMFCommand
{
+ private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf");
+
+
private final String _package;
public QMFClassQueryCommand(QMFCommandHeader header, BBDecoder decoder)
@@ -48,6 +52,8 @@ public class QMFClassQueryCommand extends QMFCommand
String exchangeName = message.getMessageHeader().getReplyToExchange();
String routingKey = message.getMessageHeader().getReplyToRoutingKey();
+ _qmfLogger.debug("Execute: " + this);
+
IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry();
QMFService service = appRegistry.getQMFService();
@@ -88,4 +94,12 @@ public class QMFClassQueryCommand extends QMFCommand
}
}
+
+ @Override
+ public String toString()
+ {
+ return "QMFClassQueryCommand{" +
+ "package='" + _package + '\'' +
+ '}';
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java
index f163e434d1..9a25201d4c 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java
@@ -53,4 +53,13 @@ public class QMFCommandCompletionCommand extends QMFCommand
encoder.writeInt32(_status.ordinal());
encoder.writeStr8(_text);
}
+
+ @Override
+ public String toString()
+ {
+ return "QMFCommandCompletionCommand{" +
+ "status=" + _status +
+ ",text='" + _text + '\'' +
+ '}';
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java
index ff927a1de9..c11e1a9b27 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java
@@ -21,6 +21,7 @@
package org.apache.qpid.qmf;
+import org.apache.log4j.Logger;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.ServerMessage;
@@ -33,28 +34,22 @@ import java.util.*;
public class QMFGetQueryCommand extends QMFCommand
{
- private Map<String, Object> _map;
+ private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf");
+
+ private String _className;
+ private String _packageName;
+ private UUID _objectId;
public QMFGetQueryCommand(QMFCommandHeader header, BBDecoder decoder)
{
super(header);
- _map = decoder.readMap();
- }
-
- public void process(VirtualHost virtualHost, ServerMessage message)
- {
- String exchangeName = message.getMessageHeader().getReplyToExchange();
- String routingKey = message.getMessageHeader().getReplyToRoutingKey();
-
- IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry();
- QMFService service = appRegistry.getQMFService();
-
- String className = (String) _map.get("_class");
- String packageName = (String) _map.get("_package");
+ Map<String, Object> _map = decoder.readMap();
+ _className = (String) _map.get("_class");
+ _packageName = (String) _map.get("_package");
byte[] objectIdBytes = (byte[]) _map.get("_objectId");
- UUID objectId;
+
if(objectIdBytes != null)
{
long msb = 0;
@@ -68,21 +63,34 @@ public class QMFGetQueryCommand extends QMFCommand
{
lsb = (lsb << 8) | (objectIdBytes[i] & 0xff);
}
- objectId = new UUID(msb, lsb);
+ _objectId = new UUID(msb, lsb);
}
else
{
- objectId = null;
+ _objectId = null;
}
+
+ }
+
+ public void process(VirtualHost virtualHost, ServerMessage message)
+ {
+ String exchangeName = message.getMessageHeader().getReplyToExchange();
+ String routingKey = message.getMessageHeader().getReplyToRoutingKey();
+
+ IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry();
+ QMFService service = appRegistry.getQMFService();
+
+ _qmfLogger.debug("Execute: " + this);
+
List<QMFCommand> commands = new ArrayList<QMFCommand>();
final long sampleTime = System.currentTimeMillis() * 1000000l;
Collection<QMFPackage> packages;
- if(packageName != null && packageName.length() != 0)
+ if(_packageName != null && _packageName.length() != 0)
{
- QMFPackage qmfPackage = service.getPackage(packageName);
+ QMFPackage qmfPackage = service.getPackage(_packageName);
if(qmfPackage == null)
{
packages = Collections.EMPTY_LIST;
@@ -102,9 +110,9 @@ public class QMFGetQueryCommand extends QMFCommand
Collection<QMFClass> qmfClasses;
- if(className != null && className.length() != 0)
+ if(_className != null && _className.length() != 0)
{
- QMFClass qmfClass = qmfPackage.getQMFClass(className);
+ QMFClass qmfClass = qmfPackage.getQMFClass(_className);
if(qmfClass == null)
{
qmfClasses = Collections.EMPTY_LIST;
@@ -124,9 +132,9 @@ public class QMFGetQueryCommand extends QMFCommand
{
Collection<QMFObject> objects;
- if(objectId != null)
+ if(_objectId != null)
{
- QMFObject obj = service.getObjectById(qmfClass, objectId);
+ QMFObject obj = service.getObjectById(qmfClass, _objectId);
if(obj == null)
{
objects = Collections.EMPTY_LIST;
@@ -158,7 +166,7 @@ public class QMFGetQueryCommand extends QMFCommand
for(QMFCommand cmd : commands)
{
-
+ _qmfLogger.debug("Respond: " + cmd);
QMFMessage responseMessage = new QMFMessage(routingKey, cmd);
Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
@@ -179,4 +187,13 @@ public class QMFGetQueryCommand extends QMFCommand
}
}
+ @Override
+ public String toString()
+ {
+ return "QMFGetQueryCommand{" +
+ "packageName='" + _packageName + '\'' +
+ ", className='" + _className + '\'' +
+ ", objectId=" + _objectId +
+ '}';
+ }
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java
index 37c16efec5..4001a2a321 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.qmf;
+import org.apache.log4j.Logger;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.ServerMessage;
@@ -33,6 +34,8 @@ import java.util.ArrayList;
public class QMFMethodRequestCommand extends QMFCommand
{
+ private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf");
+
private QMFMethodInvocation _methodInstance;
private QMFObject _object;
@@ -59,6 +62,9 @@ public class QMFMethodRequestCommand extends QMFCommand
String queueName = message.getMessageHeader().getReplyToRoutingKey();
QMFCommand[] commands = new QMFCommand[2];
+
+ _qmfLogger.debug("Execute: " + _methodInstance + " on " + _object);
+
commands[0] = _methodInstance.execute(_object, this);
commands[1] = new QMFCommandCompletionCommand(this);
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java
index d126717fc8..631bd3c7cc 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java
@@ -73,4 +73,9 @@ public abstract class QMFObject<C extends QMFClass, D extends QMFObject.Delegate
abstract public QMFCommand asInstrumentInfoCmd(long sampleTime);
abstract public QMFCommand asGetQueryResponseCmd(final QMFGetQueryCommand queryCommand, long sampleTime);
+ @Override
+ public String toString()
+ {
+ return _delegate.toString();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java
index ed07457a23..9cacbafcc1 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java
@@ -21,6 +21,7 @@
package org.apache.qpid.qmf;
+import org.apache.log4j.Logger;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.ServerMessage;
@@ -35,6 +36,9 @@ import java.util.List;
public class QMFPackageQueryCommand extends QMFCommand
{
+
+ private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf");
+
public QMFPackageQueryCommand(QMFCommandHeader header, BBDecoder decoder)
{
super(header);
@@ -53,6 +57,8 @@ public class QMFPackageQueryCommand extends QMFCommand
QMFCommand[] commands = new QMFCommand[ supportedSchemas.size() + 1 ];
+ _qmfLogger.debug("Exectuting " + this);
+
int i = 0;
for(QMFPackage p : supportedSchemas)
{
@@ -84,4 +90,9 @@ public class QMFPackageQueryCommand extends QMFCommand
}
}
}
+
+ public String toString()
+ {
+ return "QMFPackageQueryCommand";
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java
index 850ffa8610..a1260ed9e6 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java
@@ -21,6 +21,7 @@
package org.apache.qpid.qmf;
+import org.apache.log4j.Logger;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.ServerMessage;
@@ -35,6 +36,8 @@ import java.util.List;
public class QMFSchemaRequestCommand extends QMFCommand
{
+ private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf");
+
private final String _packageName;
private final String _className;
private final byte[] _hash;
@@ -49,6 +52,8 @@ public class QMFSchemaRequestCommand extends QMFCommand
public void process(VirtualHost virtualHost, ServerMessage message)
{
+ _qmfLogger.debug("Execute: " + this);
+
String exchangeName = message.getMessageHeader().getReplyToExchange();
String routingKey = message.getMessageHeader().getReplyToRoutingKey();
@@ -86,4 +91,13 @@ public class QMFSchemaRequestCommand extends QMFCommand
}
}
}
+
+ @Override
+ public String toString()
+ {
+ return "QMFSchemaRequestCommand{" +
+ " packageName='" + _packageName + '\'' +
+ ", className='" + _className + '\'' +
+ '}';
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 6abef6fd6b..27345f0a88 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -410,7 +410,10 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
ConcurrentHashMap<UUID, QMFObject> map = _managedObjectsById.get(qmfclass);
if(map != null)
{
- return map.get(id);
+
+ UUID key = new UUID(id.getMostSignificantBits() & (0xFFFl << 48), id.getLeastSignificantBits());
+ return map.get(key);
+
}
else
{
@@ -604,6 +607,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
{
return _obj.getCreateTime();
}
+
+ public String toString()
+ {
+ return _obj.toString();
+ }
}
private class BrokerDelegate implements BrokerSchema.BrokerDelegate
@@ -762,6 +770,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
{
return _obj.getCreateTime();
}
+
+ public String toString()
+ {
+ return _obj.toString();
+ }
}
private class VhostDelegate implements BrokerSchema.VhostDelegate
@@ -797,6 +810,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
{
return _obj.getCreateTime();
}
+
+ public String toString()
+ {
+ return _obj.toString();
+ }
}
private class ExchangeDelegate implements BrokerSchema.ExchangeDelegate
@@ -923,6 +941,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
{
return _obj.getCreateTime();
}
+
+ public String toString()
+ {
+ return _obj.toString();
+ }
}
private class QueueDelegate implements BrokerSchema.QueueDelegate
@@ -1163,6 +1186,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
{
return _obj.getCreateTime();
}
+
+ public String toString()
+ {
+ return _obj.toString();
+ }
}
private class BindingDelegate implements BrokerSchema.BindingDelegate
@@ -1214,6 +1242,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
{
return _obj.getCreateTime();
}
+
+ public String toString()
+ {
+ return _obj.toString();
+ }
}
private class ConnectionDelegate implements BrokerSchema.ConnectionDelegate
@@ -1352,6 +1385,12 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
// TODO
return 0;
}
+
+
+ public String toString()
+ {
+ return _obj.toString();
+ }
}
private class SessionDelegate implements BrokerSchema.SessionDelegate
@@ -1476,6 +1515,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
{
return _obj.getCreateTime();
}
+
+ public String toString()
+ {
+ return _obj.toString();
+ }
}
private class SubscriptionDelegate implements BrokerSchema.SubscriptionDelegate
@@ -1542,93 +1586,103 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
{
return _obj.getCreateTime();
}
- }
- private class BridgeDelegate implements BrokerSchema.BridgeDelegate
+ public String toString()
{
- private final BridgeConfig _obj;
+ return _obj.toString();
+ }
+ }
- private BridgeDelegate(final BridgeConfig obj)
- {
- _obj = obj;
- }
+ private class BridgeDelegate implements BrokerSchema.BridgeDelegate
+ {
+ private final BridgeConfig _obj;
- public BrokerSchema.LinkObject getLinkRef()
- {
- return (BrokerSchema.LinkObject) adapt(_obj.getLink());
- }
+ private BridgeDelegate(final BridgeConfig obj)
+ {
+ _obj = obj;
+ }
- public Integer getChannelId()
- {
- return _obj.getChannelId();
- }
+ public BrokerSchema.LinkObject getLinkRef()
+ {
+ return (BrokerSchema.LinkObject) adapt(_obj.getLink());
+ }
- public Boolean getDurable()
- {
- return _obj.isDurable();
- }
+ public Integer getChannelId()
+ {
+ return _obj.getChannelId();
+ }
- public String getSrc()
- {
- return _obj.getSource();
- }
+ public Boolean getDurable()
+ {
+ return _obj.isDurable();
+ }
- public String getDest()
- {
- return _obj.getDestination();
- }
+ public String getSrc()
+ {
+ return _obj.getSource();
+ }
- public String getKey()
- {
- return _obj.getKey();
- }
+ public String getDest()
+ {
+ return _obj.getDestination();
+ }
- public Boolean getSrcIsQueue()
- {
- return _obj.isQueueBridge();
- }
+ public String getKey()
+ {
+ return _obj.getKey();
+ }
- public Boolean getSrcIsLocal()
- {
- return _obj.isLocalSource();
- }
+ public Boolean getSrcIsQueue()
+ {
+ return _obj.isQueueBridge();
+ }
- public String getTag()
- {
- return _obj.getTag();
- }
+ public Boolean getSrcIsLocal()
+ {
+ return _obj.isLocalSource();
+ }
- public String getExcludes()
- {
- return _obj.getExcludes();
- }
+ public String getTag()
+ {
+ return _obj.getTag();
+ }
- public Boolean getDynamic()
- {
- return _obj.isDynamic();
- }
+ public String getExcludes()
+ {
+ return _obj.getExcludes();
+ }
- public Integer getSync()
- {
- return _obj.getAckBatching();
- }
+ public Boolean getDynamic()
+ {
+ return _obj.isDynamic();
+ }
- public BrokerSchema.BridgeClass.CloseMethodResponseCommand close(final BrokerSchema.BridgeClass.CloseMethodResponseCommandFactory factory)
- {
- return null;
- }
+ public Integer getSync()
+ {
+ return _obj.getAckBatching();
+ }
- public UUID getId()
- {
- return _obj.getId();
- }
+ public BrokerSchema.BridgeClass.CloseMethodResponseCommand close(final BrokerSchema.BridgeClass.CloseMethodResponseCommandFactory factory)
+ {
+ return null;
+ }
- public long getCreateTime()
- {
- return _obj.getCreateTime();
- }
+ public UUID getId()
+ {
+ return _obj.getId();
}
+ public long getCreateTime()
+ {
+ return _obj.getCreateTime();
+ }
+
+ public String toString()
+ {
+ return _obj.toString();
+ }
+ }
+
private class LinkDelegate implements BrokerSchema.LinkDelegate
{
private final LinkConfig _obj;
@@ -1665,14 +1719,12 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
public String getState()
{
- // TODO
- return "";
+ return _obj.getState();
}
public String getLastError()
{
- // TODO
- return "";
+ return _obj.getLastError();
}
public BrokerSchema.LinkClass.CloseMethodResponseCommand close(final BrokerSchema.LinkClass.CloseMethodResponseCommandFactory factory)
@@ -1706,6 +1758,12 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
{
return _obj.getCreateTime();
}
+
+ @Override
+ public String toString()
+ {
+ return _obj.toString();
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
index 0e03e33be8..4e031f0a84 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
@@ -40,7 +40,10 @@ public class ConfigStore
private AtomicReference<SystemConfig> _root = new AtomicReference<SystemConfig>(null);
private final AtomicLong _objectIdSource = new AtomicLong(0l);
+ private final AtomicLong _persistentObjectIdSource = new AtomicLong(0l);
+ // TODO - should load/increment this on broker startup
+ private long _sequenceNumber = 1L;
public enum Event
{
@@ -167,9 +170,23 @@ public class ConfigStore
public UUID createId()
{
- return new UUID(0l, _objectIdSource.getAndIncrement());
+ return new UUID(((_sequenceNumber & 0xFFFl)<<48), _objectIdSource.incrementAndGet());
}
+ public UUID createPersistentId()
+ {
+ return new UUID(0L, _persistentObjectIdSource.incrementAndGet());
+ }
+
+ public void persistentIdInUse(UUID id)
+ {
+ long lsb = id.getLeastSignificantBits();
+ long currentId;
+ while((currentId = _persistentObjectIdSource.get()) < lsb)
+ {
+ _persistentObjectIdSource.compareAndSet(currentId, lsb);
+ }
+ }
public SystemConfig getRoot()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java
index 5a6159df34..0b3a9076dd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java
@@ -54,4 +54,8 @@ public interface LinkConfig extends ConfiguredObject<LinkConfigType, LinkConfig>
String src,
String dest,
String key, String tag, String excludes);
+
+ String getState();
+
+ String getLastError();
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java b/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
index 1c1527aa31..4db6ee3ad2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.federation;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.BridgeConfig;
import org.apache.qpid.server.configuration.BridgeConfigType;
@@ -52,6 +53,15 @@ import java.util.concurrent.ConcurrentMap;
public class Bridge implements BridgeConfig
{
+ private static final String DURABLE = "durable";
+ private static final String DYNAMIC = "dynamic";
+ private static final String SRC_IS_QUEUE = "srcIsQueue";
+ private static final String SRC_IS_LOCAL = "srcIsLocal";
+ private static final String SOURCE = "source";
+ private static final String DESTINATION = "destination";
+ private static final String KEY = "key";
+ private static final String TAG = "tag";
+ private static final String EXCLUDES = "excludes";
private final boolean _durable;
private final boolean _dynamic;
private final boolean _queueBridge;
@@ -95,19 +105,36 @@ public class Bridge implements BridgeConfig
_key = key;
_tag = tag;
_excludes = excludes;
- _id = brokerLink.getConfigStore().createId();
+ _id = durable ? brokerLink.getConfigStore().createPersistentId() : brokerLink.getConfigStore().createId();
_transaction = new AutoCommitTransaction(getVirtualHost().getMessageStore());
- if(dynamic)
+ if(durable)
{
- if(srcIsLocal)
+ try
+ {
+ brokerLink.getVirtualHost().getDurableConfigurationStore().createBridge(this);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ createDelegate();
+ }
+
+ private void createDelegate()
+ {
+ if(_dynamic)
+ {
+ if(_localSource)
{
// TODO
}
else
{
- if(srcIsQueue)
+ if(_queueBridge)
{
// TODO
}
@@ -119,9 +146,9 @@ public class Bridge implements BridgeConfig
}
else
{
- if(srcIsLocal)
+ if(_localSource)
{
- if(srcIsQueue)
+ if(_queueBridge)
{
_delegate = new StaticQueuePushBridge();
}
@@ -132,7 +159,7 @@ public class Bridge implements BridgeConfig
}
else
{
- if(srcIsQueue)
+ if(_queueBridge)
{
_delegate = new StaticQueuePullBridge();
}
@@ -144,6 +171,65 @@ public class Bridge implements BridgeConfig
}
}
+ public Bridge(final BrokerLink brokerLink,
+ final int bridgeNo,
+ final UUID id,
+ final long createTime,
+ final Map<String, String> arguments)
+ {
+ _link = brokerLink;
+ _bridgeNo = bridgeNo;
+ _id = id;
+ brokerLink.getConfigStore().persistentIdInUse(id);
+ _createTime = createTime;
+
+ _durable = Boolean.valueOf(arguments.get(DURABLE));
+ _dynamic = Boolean.valueOf(arguments.get(DYNAMIC));
+ _queueBridge = Boolean.valueOf(arguments.get(SRC_IS_QUEUE));
+ _localSource = Boolean.valueOf(arguments.get(SRC_IS_LOCAL));
+ _source = arguments.get(SOURCE);
+ _destination = arguments.get(DESTINATION);
+ _key = arguments.get(KEY);
+ _tag = arguments.get(TAG);
+ _excludes = arguments.get(EXCLUDES);
+
+ //TODO.
+ _transaction = new AutoCommitTransaction(getVirtualHost().getMessageStore());
+
+
+ if(_durable)
+ {
+ try
+ {
+ brokerLink.getVirtualHost().getDurableConfigurationStore().createBridge(this);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ createDelegate();
+ }
+
+
+ public Map<String,String> getArguments()
+ {
+ Map<String,String> arguments = new HashMap<String, String>();
+
+ arguments.put(DURABLE, String.valueOf(_durable));
+ arguments.put(DYNAMIC, String.valueOf(_dynamic));
+ arguments.put(SRC_IS_QUEUE, String.valueOf(_queueBridge));
+ arguments.put(SRC_IS_LOCAL, String.valueOf(_localSource));
+ arguments.put(SOURCE, _source);
+ arguments.put(DESTINATION, _destination);
+ arguments.put(KEY, _key);
+ arguments.put(TAG, _tag);
+ arguments.put(EXCLUDES, _excludes);
+
+ return Collections.unmodifiableMap(arguments);
+ }
+
public UUID getId()
{
return _id;
@@ -318,6 +404,7 @@ public class Bridge implements BridgeConfig
}
+
private interface BridgeImpl
{
void setSession(Session session);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
index f330e2f708..a8f75d2b9b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.federation;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -29,16 +30,19 @@ import org.apache.qpid.server.configuration.LinkConfig;
import org.apache.qpid.server.configuration.LinkConfigType;
import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionException;
-import org.apache.qpid.transport.ConnectionListener;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.TransportException;
-
-import java.util.Map;
-import java.util.UUID;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.util.Strings;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -55,6 +59,14 @@ public class BrokerLink implements LinkConfig, ConnectionListener
private static final ScheduledThreadPoolExecutor _threadPool =
new ScheduledThreadPoolExecutor(CORE_POOL_SIZE);
+ private static final String TRANSPORT = "transport";
+ private static final String HOST = "host";
+ private static final String PORT = "port";
+ private static final String REMOTE_VHOST = "remoteVhost";
+ private static final String DURABLE = "durable";
+ private static final String AUTH_MECHANISM = "authMechanism";
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
private final String _transport;
@@ -68,7 +80,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener
private final VirtualHost _virtualHost;
private UUID _id;
private AtomicBoolean _closing = new AtomicBoolean();
- private final long _createTime = System.currentTimeMillis();
+ private final long _createTime;
private Connection _qpidConnection;
private AtomicReference<Thread> _executor = new AtomicReference<Thread>();
private AtomicInteger _bridgeId = new AtomicInteger();
@@ -88,8 +100,10 @@ public class BrokerLink implements LinkConfig, ConnectionListener
{
doMakeConnection();
}
- };;
- ;
+ };
+
+
+
public static enum State
{
@@ -205,6 +219,44 @@ public class BrokerLink implements LinkConfig, ConnectionListener
}
};
+ public BrokerLink(final VirtualHost virtualHost, UUID id, long createTime, Map<String, String> arguments)
+ {
+ _virtualHost = virtualHost;
+ _id = id;
+ virtualHost.getConfigStore().persistentIdInUse(id);
+ _createTime = createTime;
+ _transport = arguments.get(TRANSPORT);
+
+ _host = arguments.get(HOST);
+ _port = Integer.parseInt(arguments.get(PORT));
+ _remoteVhost = arguments.get(REMOTE_VHOST);
+ _durable = Boolean.parseBoolean(arguments.get(DURABLE));
+ _authMechanism = arguments.get("authMechanism");
+ _username = arguments.get("username");
+ _password = arguments.get("password");
+
+ if(_durable)
+ {
+ try
+ {
+ _virtualHost.getDurableConfigurationStore().createBrokerLink(this);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ _qpidConnection = new Connection();
+ _connectionConfig = new ConnectionConfigAdapter();
+ _qpidConnection.addConnectionListener(this);
+
+
+ makeConnection();
+
+ }
+
public BrokerLink(final VirtualHost virtualHost,
final String transport,
@@ -212,10 +264,13 @@ public class BrokerLink implements LinkConfig, ConnectionListener
final int port,
final String remoteVhost,
final boolean durable,
- final String authMechanism, final String username, final String password)
+ final String authMechanism,
+ final String username,
+ final String password)
{
_virtualHost = virtualHost;
_transport = transport;
+ _createTime = System.currentTimeMillis();
_host = host;
_port = port;
_remoteVhost = remoteVhost;
@@ -223,15 +278,42 @@ public class BrokerLink implements LinkConfig, ConnectionListener
_authMechanism = authMechanism;
_username = username;
_password = password;
- _id = virtualHost.getConfigStore().createId();
+ _id = durable ? virtualHost.getConfigStore().createPersistentId() : virtualHost.getConfigStore().createId();
+
+ if(durable)
+ {
+ try
+ {
+ _virtualHost.getDurableConfigurationStore().createBrokerLink(this);
+ }
+ catch (AMQStoreException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
_qpidConnection = new Connection();
_connectionConfig = new ConnectionConfigAdapter();
_qpidConnection.addConnectionListener(this);
-
makeConnection();
}
+ public Map<String,String> getArguments()
+ {
+ Map<String,String> arguments = new HashMap<String, String>();
+
+ arguments.put(TRANSPORT, _transport);
+ arguments.put(HOST, _host);
+ arguments.put(PORT, String.valueOf(_port));
+ arguments.put(REMOTE_VHOST, _remoteVhost);
+ arguments.put(DURABLE, String.valueOf(_durable));
+ arguments.put(AUTH_MECHANISM, _authMechanism);
+ arguments.put(USERNAME, _username);
+ arguments.put(PASSWORD, _password);
+
+ return Collections.unmodifiableMap(arguments);
+ }
+
private final boolean updateState(State expected, State newState)
{
return _stateUpdater.compareAndSet(this,expected,newState);
@@ -250,9 +332,50 @@ public class BrokerLink implements LinkConfig, ConnectionListener
{
try
{
+ _qpidConnection.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())
+ {
+ protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException,
+ SaslException
+ {
+ Map<String,Object> saslProps = new HashMap<String,Object>();
+
+
+ CallbackHandler cbh = new CallbackHandler()
+ {
+ public void handle(final Callback[] callbacks)
+ throws IOException, UnsupportedCallbackException
+ {
+ for (int i = 0; i < callbacks.length; i++)
+ {
+ Callback cb = callbacks[i];
+ if (cb instanceof NameCallback)
+ {
+ ((NameCallback)cb).setName(_username);
+ }
+ else if (cb instanceof PasswordCallback)
+ {
+ ((PasswordCallback)cb).setPassword(_password.toCharArray());
+ }
+ else
+ {
+ throw new UnsupportedCallbackException(cb);
+ }
+ }
+
+ }
+ };
+ final SaslClient sc = Sasl.createSaslClient(new String[] {"PLAIN"}, null,
+ _conSettings.getSaslProtocol(),
+ _conSettings.getSaslServerName(),
+ saslProps, cbh);
+
+ return sc;
+ }});
+
_qpidConnection.connect(_host, _port, _remoteVhost, _username, _password, "ssl".equals(_transport), _authMechanism);
final Map<String,Object> serverProps = _qpidConnection.getServerProperties();
+
_remoteFederationTag = (String) serverProps.get(ServerPropertyNames.FEDERATION_TAG);
if(_remoteFederationTag == null)
{
@@ -445,6 +568,20 @@ public class BrokerLink implements LinkConfig, ConnectionListener
}
+ public void createBridge(final UUID id, final long createTime, final Map<String, String> arguments)
+ {
+ if(!_closing.get())
+ {
+ Bridge bridge = new Bridge(this, _bridgeId.incrementAndGet(), id, createTime, arguments);
+ if(_bridges.putIfAbsent(bridge, bridge) == null)
+ {
+
+ addBridge(bridge);
+ }
+ }
+ }
+
+
private void addBridge(final Bridge bridge)
{
getConfigStore().addConfiguredObject(bridge);
@@ -509,4 +646,34 @@ public class BrokerLink implements LinkConfig, ConnectionListener
{
return _remoteFederationTag;
}
+
+ public String getState()
+ {
+ return _state.name();
+ }
+
+ public String getLastError()
+ {
+ return _lastErrorMessage;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "BrokerLink{" +
+ " _id=" + _id +
+ ", _transport='" + _transport + '\'' +
+ ", _host='" + _host + '\'' +
+ ", _port=" + _port +
+ ", _remoteVhost='" + _remoteVhost + '\'' +
+ ", _durable=" + _durable +
+ ", _authMechanism='" + _authMechanism + '\'' +
+ ", _username='" + _username + '\'' +
+ ", _password='" + _password + '\'' +
+ ", _virtualHost=" + _virtualHost +
+ ", _createTime=" + _createTime +
+ ", _remoteFederationTag='" + _remoteFederationTag + '\'' +
+ ", _state=" + _state +
+ '}';
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
index 8266c1e79f..885b039e18 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
@@ -72,13 +72,16 @@ public class ChannelLogSubject extends AbstractLogSubject
* 3 - Virtualhost
* 4 - Channel ID
*/
- ServerConnection connection = (ServerConnection) session.getConnection();
- setLogStringWithFormat(CHANNEL_FORMAT,
- connection == null ? -1L : connection.getConnectionId(),
- session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(),
- (connection == null || connection.getConfig() == null) ? "?" : connection.getConfig().getAddress(),
- session.getVirtualHost().getName(),
- session.getChannel());
+ if(session.getConnection() instanceof ServerConnection)
+ {
+ ServerConnection connection = (ServerConnection) session.getConnection();
+ setLogStringWithFormat(CHANNEL_FORMAT,
+ connection == null ? -1L : connection.getConnectionId(),
+ session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(),
+ (connection == null || connection.getConfig() == null) ? "?" : connection.getConfig().getAddress(),
+ session.getVirtualHost().getName(),
+ session.getChannel());
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 7eb1b54693..6753cf4560 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -148,7 +148,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
BrokerConfig broker = new BrokerConfigAdapter(instance);
- SystemConfig system = (SystemConfig) store.getRoot();
+ SystemConfig system = store.getRoot();
system.addBroker(broker);
instance.setBroker(broker);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
index 6a36b22400..f77b8d2dfa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
@@ -165,7 +165,6 @@ public class BrokerConfigAdapter implements BrokerConfig
/**
* @see org.apache.qpid.server.configuration.BrokerConfig#getFeatures()
*/
- @Override
public List<String> getFeatures()
{
final List<String> features = new ArrayList<String>();
@@ -176,4 +175,16 @@ public class BrokerConfigAdapter implements BrokerConfig
return Collections.unmodifiableList(features);
}
+
+ @Override
+ public String toString()
+ {
+ return "BrokerConfigAdapter{" +
+ "_id=" + _id +
+ ", _system=" + _system +
+ ", _vhosts=" + _vhosts +
+ ", _createTime=" + _createTime +
+ ", _federationTag='" + _federationTag + '\'' +
+ '}';
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
index a883f656be..09e7fe0a11 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.qpid.framing.FieldTable;
public interface ConfigurationRecoveryHandler
@@ -42,7 +45,19 @@ public interface ConfigurationRecoveryHandler
public static interface BindingRecoveryHandler
{
void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf);
- void completeBindingRecovery();
+ BrokerLinkRecoveryHandler completeBindingRecovery();
+ }
+
+ public static interface BrokerLinkRecoveryHandler
+ {
+ BridgeRecoveryHandler brokerLink(UUID id, long createTime, Map<String,String> arguments);
+ void completeBrokerLinkRecovery();
+ }
+
+ public static interface BridgeRecoveryHandler
+ {
+ void bridge(UUID id, long createTime, Map<String,String> arguments);
+ void completeBridgeRecoveryForLink();
}
public static interface QueueEntryRecoveryHandler
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index d90b3d02ba..45083c1595 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -21,7 +21,9 @@
package org.apache.qpid.server.store;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
@@ -36,7 +38,10 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -47,13 +52,14 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
/**
@@ -82,6 +88,10 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
private static final String META_DATA_TABLE_NAME = "QPID_META_DATA";
private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
+ private static final String LINKS_TABLE_NAME = "QPID_LINKS";
+ private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES";
+
+
private static final int DB_VERSION = 3;
@@ -137,6 +147,49 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
+ private static final String CREATE_LINKS_TABLE =
+ "CREATE TABLE "+LINKS_TABLE_NAME+" ( id_lsb bigint not null,"
+ + " id_msb bigint not null,"
+ + " create_time bigint not null,"
+ + " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))";
+ private static final String SELECT_FROM_LINKS =
+ "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb";
+ private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME
+ + " WHERE id_lsb = ? and id_msb = ?";
+ private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, "
+ + "arguments FROM " + LINKS_TABLE_NAME;
+ private static final String FIND_LINK = "SELECT id_lsb, id_msb FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and"
+ + " id_msb = ?";
+ private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, "
+ + "id_msb, create_time, arguments ) values (?, ?, ?, ?)";
+
+
+ private static final String CREATE_BRIDGES_TABLE =
+ "CREATE TABLE "+BRIDGES_TABLE_NAME+" ( id_lsb bigint not null,"
+ + " id_msb bigint not null,"
+ + " create_time bigint not null,"
+ + " link_id_lsb bigint not null,"
+ + " link_id_msb bigint not null,"
+ + " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))";
+ private static final String SELECT_FROM_BRIDGES =
+ "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM "
+ + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
+ private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME
+ + " WHERE id_lsb = ? and id_msb = ?";
+ private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, "
+ + " create_time,"
+ + " link_id_lsb, link_id_msb, "
+ + "arguments FROM " + BRIDGES_TABLE_NAME
+ + " WHERE link_id_lsb = ? and link_id_msb = ?";
+ private static final String FIND_BRIDGE = "SELECT id_lsb, id_msb FROM " + BRIDGES_TABLE_NAME +
+ " WHERE id_lsb = ? and id_msb = ?";
+ private static final String INSERT_INTO_BRIDGES = "INSERT INTO " + BRIDGES_TABLE_NAME + "( id_lsb, id_msb, "
+ + "create_time, "
+ + "link_id_lsb, link_id_msb, "
+ + "arguments )"
+ + " values (?, ?, ?, ?, ?, ?)";
+
+
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
@@ -294,6 +347,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
createQueueEntryTable(conn);
createMetaDataTable(conn);
createMessageContentTable(conn);
+ createLinkTable(conn);
+ createBridgeTable(conn);
conn.close();
}
@@ -430,6 +485,40 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
+ private void createLinkTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(LINKS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_LINKS_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+
+ private void createBridgeTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(BRIDGES_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_BRIDGES_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+
private boolean tableExists(final String tableName, final Connection conn) throws SQLException
@@ -470,7 +559,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
List<String> exchanges = loadExchanges(erh);
ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
recoverBindings(brh, exchanges);
- brh.completeBindingRecovery();
+ ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+ recoverBrokerLinks(lrh);
}
catch (SQLException e)
{
@@ -481,6 +571,144 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
+ private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
+ throws SQLException
+ {
+ _logger.info("Recovering broker links...");
+
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+
+ PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_LINKS);
+
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+
+ try
+ {
+
+ while(rs.next())
+ {
+ UUID id = new UUID(rs.getLong(2), rs.getLong(1));
+ long createTime = rs.getLong(3);
+ Blob argumentsAsBlob = rs.getBlob(4);
+
+ byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
+
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
+ int size = dis.readInt();
+
+ Map<String,String> arguments = new HashMap<String, String>();
+
+ for(int i = 0; i < size; i++)
+ {
+ arguments.put(dis.readUTF(), dis.readUTF());
+ }
+
+ ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
+
+ recoverBridges(brh, id);
+
+ }
+ }
+ catch (IOException e)
+ {
+ throw new SQLException(e.getMessage(), e);
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
+ private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
+ throws SQLException
+ {
+ _logger.info("Recovering bridges for link " + linkId + "...");
+
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+
+ PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES);
+ stmt.setLong(1, linkId.getLeastSignificantBits());
+ stmt.setLong(2, linkId.getMostSignificantBits());
+
+
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+
+ try
+ {
+
+ while(rs.next())
+ {
+ UUID id = new UUID(rs.getLong(2), rs.getLong(1));
+ long createTime = rs.getLong(3);
+ Blob argumentsAsBlob = rs.getBlob(6);
+
+ byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
+
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
+ int size = dis.readInt();
+
+ Map<String,String> arguments = new HashMap<String, String>();
+
+ for(int i = 0; i < size; i++)
+ {
+ arguments.put(dis.readUTF(), dis.readUTF());
+ }
+
+ brh.bridge(id, createTime, arguments);
+
+ }
+ brh.completeBridgeRecoveryForLink();
+ }
+ catch (IOException e)
+ {
+ throw new SQLException(e.getMessage(), e);
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ finally
+ {
+ if(conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
private void loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException
{
Connection conn = newAutoCommitConnection();
@@ -1191,6 +1419,233 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor
}
+ public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+ _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called");
+
+ if (_state != State.RECOVERING)
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+
+ PreparedStatement stmt = conn.prepareStatement(FIND_LINK);
+ try
+ {
+
+ stmt.setLong(1, link.getId().getLeastSignificantBits());
+ stmt.setLong(2, link.getId().getMostSignificantBits());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+
+ // If we don't have any data in the result set then we can add this queue
+ if (!rs.next())
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_LINKS);
+
+ try
+ {
+
+ insertStmt.setLong(1, link.getId().getLeastSignificantBits());
+ insertStmt.setLong(2, link.getId().getMostSignificantBits());
+ insertStmt.setLong(3, link.getCreateTime());
+
+ byte[] argumentBytes = convertStringMapToBytes(link.getArguments());
+ ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
+
+ insertStmt.setBinaryStream(4,bis,argumentBytes.length);
+
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ conn.close();
+
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error writing " + link + " to database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
+ {
+ byte[] argumentBytes;
+ if(arguments == null)
+ {
+ argumentBytes = new byte[0];
+ }
+ else
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
+
+ try
+ {
+ dos.writeInt(arguments.size());
+ for(Map.Entry<String,String> arg : arguments.entrySet())
+ {
+ dos.writeUTF(arg.getKey());
+ dos.writeUTF(arg.getValue());
+ }
+ }
+ catch (IOException e)
+ {
+ // This should never happen
+ throw new AMQStoreException(e.getMessage(), e);
+ }
+ argumentBytes = bos.toByteArray();
+ }
+ return argumentBytes;
+ }
+
+ public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+ _logger.debug("public void deleteBrokerLink( " + link + "): called");
+ Connection conn = null;
+ PreparedStatement stmt = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+ stmt = conn.prepareStatement(DELETE_FROM_LINKS);
+ stmt.setLong(1, link.getId().getLeastSignificantBits());
+ stmt.setLong(2, link.getId().getMostSignificantBits());
+ int results = stmt.executeUpdate();
+
+ if (results == 0)
+ {
+ throw new AMQStoreException("Link " + link + " not found");
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error deleting Link " + link + " from database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ closeConnection(conn);
+ }
+
+
+ }
+
+ public void createBridge(final Bridge bridge) throws AMQStoreException
+ {
+ _logger.debug("public void createBridge(BrokerLink = " + bridge + "): called");
+
+ if (_state != State.RECOVERING)
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+
+ PreparedStatement stmt = conn.prepareStatement(FIND_BRIDGE);
+ try
+ {
+
+ UUID id = bridge.getId();
+ stmt.setLong(1, id.getLeastSignificantBits());
+ stmt.setLong(2, id.getMostSignificantBits());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+
+ // If we don't have any data in the result set then we can add this queue
+ if (!rs.next())
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BRIDGES);
+
+ try
+ {
+
+ insertStmt.setLong(1, id.getLeastSignificantBits());
+ insertStmt.setLong(2, id.getMostSignificantBits());
+
+ insertStmt.setLong(3, bridge.getCreateTime());
+
+ UUID linkId = bridge.getLink().getId();
+ insertStmt.setLong(4, linkId.getLeastSignificantBits());
+ insertStmt.setLong(5, linkId.getMostSignificantBits());
+
+ byte[] argumentBytes = convertStringMapToBytes(bridge.getArguments());
+ ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
+
+ insertStmt.setBinaryStream(6,bis,argumentBytes.length);
+
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ conn.close();
+
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error writing " + bridge + " to database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ public void deleteBridge(final Bridge bridge) throws AMQStoreException
+ {
+ _logger.debug("public void deleteBridge( " + bridge + "): called");
+ Connection conn = null;
+ PreparedStatement stmt = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+ stmt = conn.prepareStatement(DELETE_FROM_BRIDGES);
+ stmt.setLong(1, bridge.getId().getLeastSignificantBits());
+ stmt.setLong(2, bridge.getId().getMostSignificantBits());
+ int results = stmt.executeUpdate();
+
+ if (results == 0)
+ {
+ throw new AMQStoreException("Bridge " + bridge + " not found");
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error deleting bridge " + bridge + " from database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ closeConnection(conn);
+ }
+
+ }
+
public Transaction newTransaction()
{
return new DerbyTransaction();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 5fb23653cb..9cd2567b7d 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -25,6 +25,8 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.queue.AMQQueue;
@@ -128,4 +130,12 @@ public interface DurableConfigurationStore
* @throws AMQStoreException If the operation fails for any reason.
*/
void updateQueue(AMQQueue queue) throws AMQStoreException;
+
+ void createBrokerLink(BrokerLink link) throws AMQStoreException;
+
+ void deleteBrokerLink(BrokerLink link) throws AMQStoreException;
+
+ void createBridge(Bridge bridge) throws AMQStoreException;
+
+ void deleteBridge(Bridge bridge) throws AMQStoreException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 005055dbaa..c5393f73a2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -31,6 +31,8 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
@@ -157,6 +159,26 @@ public class MemoryMessageStore implements MessageStore, DurableConfigurationSto
// Not required to do anything
}
+ public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+
+ }
+
+ public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+
+ }
+
+ public void createBridge(final Bridge bridge) throws AMQStoreException
+ {
+
+ }
+
+ public void deleteBridge(final Bridge bridge) throws AMQStoreException
+ {
+
+ }
+
public void configureTransactionLog(String name,
TransactionLogRecoveryHandler recoveryHandler,
Configuration storeConfiguration,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 7526b19058..7a2c07b9c8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -724,11 +724,18 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public String toLogString()
{
- return "[" +
+ long connectionId = getConnection() instanceof ServerConnection
+ ? ((ServerConnection) getConnection()).getConnectionId()
+ : -1;
+
+ String remoteAddress = _connectionConfig instanceof ProtocolEngine
+ ? ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString()
+ : "";
+ return "[" +
MessageFormat.format(CHANNEL_FORMAT,
- ((ServerConnection) getConnection()).getConnectionId(),
+ connectionId,
getClientID(),
- ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
+ remoteAddress,
getVirtualHost().getName(),
getChannel())
+ "] ";
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 24ab29444c..0baaf61f77 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Map;
import java.util.UUID;
import org.apache.qpid.common.Closeable;
@@ -91,6 +92,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
boolean durable,
String authMechanism, String username, String password);
+ public BrokerLink createBrokerConnection(UUID id, long createTime, Map<String,String> arguments);
+
ConfigStore getConfigStore();
void removeBrokerConnection(BrokerLink brokerLink);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index ce6bfe87e0..51892d965a 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.MessageStore;
@@ -54,11 +55,13 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.TreeMap;
+import java.util.UUID;
public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
ConfigurationRecoveryHandler.QueueRecoveryHandler,
ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
ConfigurationRecoveryHandler.BindingRecoveryHandler,
+ ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler,
MessageStoreRecoveryHandler,
MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
TransactionLogRecoveryHandler,
@@ -180,7 +183,19 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
public void completeMessageRecovery()
{
//TODO - log end
- //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public BridgeRecoveryHandler brokerLink(final UUID id,
+ final long createTime,
+ final Map<String, String> arguments)
+ {
+ BrokerLink blink = _virtualHost.createBrokerConnection(id, createTime, arguments);
+ return new BridgeRecoveryHandlerImpl(blink);
+
+ }
+
+ public void completeBrokerLinkRecovery()
+ {
}
private static final class ProcessAction
@@ -261,9 +276,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
- public void completeBindingRecovery()
+ public BrokerLinkRecoveryHandler completeBindingRecovery()
{
- //return this;
+ return this;
}
public void complete()
@@ -386,4 +401,23 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
return null;
}
}
+
+ private class BridgeRecoveryHandlerImpl implements BridgeRecoveryHandler
+ {
+ private final BrokerLink _blink;
+
+ public BridgeRecoveryHandlerImpl(final BrokerLink blink)
+ {
+ _blink = blink;
+ }
+
+ public void bridge(final UUID id, final long createTime, final Map<String, String> arguments)
+ {
+ _blink.createBridge(id, createTime, arguments);
+ }
+
+ public void completeBridgeRecoveryForLink()
+ {
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 9b1c5474a3..715c917aa3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -54,6 +54,7 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -730,6 +731,16 @@ public class VirtualHostImpl implements VirtualHost
_statisticsEnabled = enabled;
}
+ public BrokerLink createBrokerConnection(UUID id, long createTime, Map<String,String> arguments)
+ {
+ BrokerLink blink = new BrokerLink(this, id, createTime, arguments);
+ // TODO - cope with duplicate broker link creation requests
+ _links.putIfAbsent(blink,blink);
+ getConfigStore().addConfiguredObject(blink);
+ return blink;
+ }
+
+
public void createBrokerConnection(final String transport,
final String host,
final int port,
@@ -740,10 +751,11 @@ public class VirtualHostImpl implements VirtualHost
final String password)
{
BrokerLink blink = new BrokerLink(this, transport, host, port, vhost, durable, authMechanism, username, password);
- if(_links.putIfAbsent(blink,blink) != null)
- {
- getConfigStore().addConfiguredObject(blink);
- }
+
+ // TODO - cope with duplicate broker link creation requests
+ _links.putIfAbsent(blink,blink);
+ getConfigStore().addConfiguredObject(blink);
+
}
public void removeBrokerConnection(final String transport,
@@ -782,7 +794,9 @@ public class VirtualHostImpl implements VirtualHost
public List<Exchange> exchange = new LinkedList<Exchange>();
public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>();
-
+ public List<BrokerLink> links = new LinkedList<BrokerLink>();
+ public List<Bridge> bridges = new LinkedList<Bridge>();
+
public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
}
@@ -877,6 +891,30 @@ public class VirtualHostImpl implements VirtualHost
public void updateQueue(AMQQueue queue) throws AMQStoreException
{
}
+
+ public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+ if(link.isDurable())
+ {
+ links.add(link);
+ }
+ }
+
+ public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+ }
+
+ public void createBridge(final Bridge bridge) throws AMQStoreException
+ {
+ if(bridge.isDurable())
+ {
+ bridges.add(bridge);
+ }
+ }
+
+ public void deleteBridge(final Bridge bridge) throws AMQStoreException
+ {
+ }
}
@Override
diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 153371c8d9..314b116829 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.binding.BindingFactory;
@@ -63,6 +64,11 @@ public class MockVirtualHost implements VirtualHost
}
+ public BrokerLink createBrokerConnection(final UUID id, final long createTime, final Map<String, String> arguments)
+ {
+ return null;
+ }
+
public IApplicationRegistry getApplicationRegistry()
{
return null;
diff --git a/java/broker/src/xsl/qmf.xsl b/java/broker/src/xsl/qmf.xsl
index 3a7e10dac8..1e98c97466 100644
--- a/java/broker/src/xsl/qmf.xsl
+++ b/java/broker/src/xsl/qmf.xsl
@@ -288,6 +288,17 @@ public class <xsl:value-of select="$ClassName"/> extends QMFPackage
<xsl:apply-templates select="node()[name()='property' or name()='statistic']" mode="optionalPropertyPresence"/>
<xsl:apply-templates select="node()[name()='property' or name()='statistic']" mode="encodeProperty"/>
}
+
+ public String toString()
+ {
+ return "QMF<xsl:value-of select="@name"/>GetQueryResponseCommand{id=" + getObject().getId()
+<xsl:for-each select="node()[name()='property' or name()='statistic']">
+<xsl:if test="@type!='hilo32' and @type!='mmaTime' ">
+ + ", <xsl:value-of select="@name"/>=" + getObject().get<xsl:call-template name="initCap"><xsl:with-param name="input"><xsl:value-of select="@name"/></xsl:with-param></xsl:call-template>()
+</xsl:if>
+</xsl:for-each>
+ + "}";
+ }
}
@@ -530,6 +541,11 @@ public class <xsl:value-of select="$ClassName"/> extends QMFPackage
{
return obj.<xsl:value-of select="@name"/>( new <xsl:value-of select="$ClassName"/>ResponseCommandFactory(cmd)<xsl:if test="node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]">, </xsl:if><xsl:apply-templates select="node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]" mode="methodArgList"><xsl:with-param name="prefix">_</xsl:with-param></xsl:apply-templates> );
}
+
+ public String toString()
+ {
+ return "<xsl:value-of select="$ClassName"/>["<xsl:for-each select="node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]"><xsl:if test="preceding-sibling::node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]">+ ", "</xsl:if>+ "<xsl:value-of select="@name"/> = " + _<xsl:value-of select="@name"/> </xsl:for-each>+"]";
+ }
}
public final class <xsl:value-of select="$ClassName"/>ResponseCommandFactory
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index fb5cf8b3c6..2d450cf09c 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -26,6 +26,8 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.message.ServerMessage;
@@ -320,4 +322,31 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
+ public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+ doPreDelay("createBrokerLink");
+ _durableConfigurationStore.createBrokerLink(link);
+ doPostDelay("createBrokerLink");
+ }
+
+ public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+ doPreDelay("deleteBrokerLink");
+ _durableConfigurationStore.deleteBrokerLink(link);
+ doPostDelay("deleteBrokerLink");
+ }
+
+ public void createBridge(final Bridge bridge) throws AMQStoreException
+ {
+ doPreDelay("createBridge");
+ _durableConfigurationStore.createBridge(bridge);
+ doPostDelay("createBridge");
+ }
+
+ public void deleteBridge(final Bridge bridge) throws AMQStoreException
+ {
+ doPreDelay("deleteBridge");
+ _durableConfigurationStore.deleteBridge(bridge);
+ doPostDelay("deleteBridge");
+ }
}