diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-07 22:47:17 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-07 22:47:17 +0000 |
| commit | 0129e12deaabcf3cf3be23913967397be6a12e3a (patch) | |
| tree | 1ff521e7be49675201bf66f96e4956dc20bac0a8 /java | |
| parent | ad776f381e2690c58c37c33d23b2389da1b2028e (diff) | |
| download | qpid-python-0129e12deaabcf3cf3be23913967397be6a12e3a.tar.gz | |
QPID-946 , QPID-2379 : QMF and Federation fixes (now works again with qpid-config, qpid-route, qpid-tool) and store (durable) routes in the DB
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1228748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
30 files changed, 1502 insertions, 145 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 1d8187401d..9efa2937aa 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -27,13 +27,16 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Queue; +import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.bind.tuple.StringBinding; import com.sleepycat.je.*; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; @@ -41,6 +44,8 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; @@ -100,12 +105,17 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private String DELIVERYDB_NAME = "deliveryDb"; private String EXCHANGEDB_NAME = "exchangeDb"; private String QUEUEDB_NAME = "queueDb"; + private String BRIDGEDB_NAME = "bridges"; + private String LINKDB_NAME = "links"; + private Database _messageMetaDataDb; private Database _messageContentDb; private Database _queueBindingsDb; private Database _deliveryDb; private Database _exchangeDb; private Database _queueDb; + private Database _bridgeDb; + private Database _linkDb; /* ======= * Schema: @@ -190,6 +200,10 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore EXCHANGEDB_NAME += "_v" + version; QUEUEBINDINGSDB_NAME += "_v" + version; + + LINKDB_NAME += "_v" + version; + + BRIDGEDB_NAME += "_v" + version; } } @@ -461,6 +475,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _queueBindingsDb = _environment.openDatabase(null, QUEUEBINDINGSDB_NAME, dbConfig); _messageContentDb = _environment.openDatabase(null, MESSAGECONTENTDB_NAME, dbConfig); _deliveryDb = _environment.openDatabase(null, DELIVERYDB_NAME, dbConfig); + _linkDb = _environment.openDatabase(null, LINKDB_NAME, dbConfig); + _bridgeDb = _environment.openDatabase(null, BRIDGEDB_NAME, dbConfig); + } @@ -517,6 +534,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _deliveryDb.close(); } + if (_bridgeDb != null) + { + _log.info("Close bridge database"); + _bridgeDb.close(); + } + + if (_linkDb != null) + { + _log.info("Close link database"); + _linkDb.close(); + } + closeEnvironment(); _state = State.CLOSED; @@ -556,8 +585,9 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore BindingRecoveryHandler brh = erh.completeExchangeRecovery(); recoverBindings(brh); - - brh.completeBindingRecovery(); + + ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); + recoverBrokerLinks(lrh); } catch (DatabaseException e) { @@ -674,6 +704,74 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } + + private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh) + { + Cursor cursor = null; + + try + { + cursor = _linkDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + long createTime = LongBinding.entryToLong(value); + Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value); + + ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments); + + recoverBridges(brh, id); + } + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + + } + + private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId) + { + Cursor cursor = null; + + try + { + cursor = _bridgeDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + UUID id = UUIDTupleBinding.getInstance().entryToObject(key); + + UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value); + if(parentId.equals(linkId)) + { + + long createTime = LongBinding.entryToLong(value); + Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value); + brh.bridge(id,createTime,arguments); + } + } + brh.completeBridgeRecoveryForLink(); + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + + } + + private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException { StoredMessageRecoveryHandler mrh = msrh.begin(); @@ -1163,6 +1261,90 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + public void createBrokerLink(final BrokerLink link) throws AMQStoreException + { + if (_state != State.RECOVERING) + { + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key); + + DatabaseEntry value = new DatabaseEntry(); + LongBinding.longToEntry(link.getCreateTime(),value); + StringMapBinding.getInstance().objectToEntry(link.getArguments(), value); + + try + { + _linkDb.put(null, key, value); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing Link " + link + + " to database: " + e.getMessage(), e); + } + } + } + + public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException + { + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key); + try + { + OperationStatus status = _linkDb.delete(null, key); + if (status == OperationStatus.NOTFOUND) + { + throw new AMQStoreException("Link " + link + " not found"); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error deleting the Link " + link + " from database: " + e.getMessage(), e); + } + } + + public void createBridge(final Bridge bridge) throws AMQStoreException + { + if (_state != State.RECOVERING) + { + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key); + + DatabaseEntry value = new DatabaseEntry(); + UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getId(),value); + LongBinding.longToEntry(bridge.getCreateTime(),value); + StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value); + + try + { + _bridgeDb.put(null, key, value); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error writing Bridge " + bridge + + " to database: " + e.getMessage(), e); + } + + } + } + + public void deleteBridge(final Bridge bridge) throws AMQStoreException + { + DatabaseEntry key = new DatabaseEntry(); + UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key); + try + { + OperationStatus status = _bridgeDb.delete(null, key); + if (status == OperationStatus.NOTFOUND) + { + throw new AMQStoreException("Bridge " + bridge + " not found"); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Error deleting the Bridge " + bridge + " from database: " + e.getMessage(), e); + } + } + /** * Places a message onto a specified queue, in a given transaction. * diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java new file mode 100644 index 0000000000..f8fd39e127 --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +import java.util.HashMap; +import java.util.Map; + +public class StringMapBinding extends TupleBinding<Map<String,String>> +{ + + private static final StringMapBinding INSTANCE = new StringMapBinding(); + + public Map<String, String> entryToObject(final TupleInput tupleInput) + { + int entries = tupleInput.readInt(); + Map<String,String> map = new HashMap<String,String>(entries); + for(int i = 0; i < entries; i++) + { + map.put(tupleInput.readString(), tupleInput.readString()); + } + return map; + } + + + public void objectToEntry(final Map<String, String> stringStringMap, final TupleOutput tupleOutput) + { + tupleOutput.writeInt(stringStringMap.size()); + for(Map.Entry<String,String> entry : stringStringMap.entrySet()) + { + tupleOutput.writeString(entry.getKey()); + tupleOutput.writeString(entry.getValue()); + } + } + + public static StringMapBinding getInstance() + { + return INSTANCE; + } +} diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java new file mode 100644 index 0000000000..c1a5d473f0 --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +import java.util.UUID; + +public class UUIDTupleBinding extends TupleBinding<UUID> +{ + private static final UUIDTupleBinding INSTANCE = new UUIDTupleBinding(); + + public UUID entryToObject(final TupleInput tupleInput) + { + return new UUID(tupleInput.readLong(), tupleInput.readLong()); + } + + public void objectToEntry(final UUID uuid, final TupleOutput tupleOutput) + { + tupleOutput.writeLong(uuid.getMostSignificantBits()); + tupleOutput.writeLong(uuid.getLeastSignificantBits()); + } + + public static UUIDTupleBinding getInstance() + { + return INSTANCE; + } + + +} diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java index 9ee8b923fc..709b59588d 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java @@ -21,6 +21,7 @@ package org.apache.qpid.qmf; +import org.apache.log4j.Logger; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.transport.codec.BBEncoder; import org.apache.qpid.server.message.ServerMessage; @@ -37,6 +38,9 @@ import java.util.List; public class QMFBrokerRequestCommand extends QMFCommand { + private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf"); + + public QMFBrokerRequestCommand(QMFCommandHeader header, BBDecoder buf) { super(header); @@ -47,6 +51,8 @@ public class QMFBrokerRequestCommand extends QMFCommand String exchangeName = message.getMessageHeader().getReplyToExchange(); String queueName = message.getMessageHeader().getReplyToRoutingKey(); + _qmfLogger.debug("Execute: " + this); + QMFCommand[] commands = new QMFCommand[2]; commands[0] = new QMFBrokerResponseCommand(this, virtualHost); commands[1] = new QMFCommandCompletionCommand(this); diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java index 5d2717a9fb..64edc2f294 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java @@ -21,6 +21,7 @@ package org.apache.qpid.qmf; +import org.apache.log4j.Logger; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.ServerMessage; @@ -35,6 +36,9 @@ import java.util.List; public class QMFClassQueryCommand extends QMFCommand { + private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf"); + + private final String _package; public QMFClassQueryCommand(QMFCommandHeader header, BBDecoder decoder) @@ -48,6 +52,8 @@ public class QMFClassQueryCommand extends QMFCommand String exchangeName = message.getMessageHeader().getReplyToExchange(); String routingKey = message.getMessageHeader().getReplyToRoutingKey(); + _qmfLogger.debug("Execute: " + this); + IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry(); QMFService service = appRegistry.getQMFService(); @@ -88,4 +94,12 @@ public class QMFClassQueryCommand extends QMFCommand } } + + @Override + public String toString() + { + return "QMFClassQueryCommand{" + + "package='" + _package + '\'' + + '}'; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java index f163e434d1..9a25201d4c 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java @@ -53,4 +53,13 @@ public class QMFCommandCompletionCommand extends QMFCommand encoder.writeInt32(_status.ordinal()); encoder.writeStr8(_text); } + + @Override + public String toString() + { + return "QMFCommandCompletionCommand{" + + "status=" + _status + + ",text='" + _text + '\'' + + '}'; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java index ff927a1de9..c11e1a9b27 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java @@ -21,6 +21,7 @@ package org.apache.qpid.qmf; +import org.apache.log4j.Logger; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.ServerMessage; @@ -33,28 +34,22 @@ import java.util.*; public class QMFGetQueryCommand extends QMFCommand { - private Map<String, Object> _map; + private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf"); + + private String _className; + private String _packageName; + private UUID _objectId; public QMFGetQueryCommand(QMFCommandHeader header, BBDecoder decoder) { super(header); - _map = decoder.readMap(); - } - - public void process(VirtualHost virtualHost, ServerMessage message) - { - String exchangeName = message.getMessageHeader().getReplyToExchange(); - String routingKey = message.getMessageHeader().getReplyToRoutingKey(); - - IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry(); - QMFService service = appRegistry.getQMFService(); - - String className = (String) _map.get("_class"); - String packageName = (String) _map.get("_package"); + Map<String, Object> _map = decoder.readMap(); + _className = (String) _map.get("_class"); + _packageName = (String) _map.get("_package"); byte[] objectIdBytes = (byte[]) _map.get("_objectId"); - UUID objectId; + if(objectIdBytes != null) { long msb = 0; @@ -68,21 +63,34 @@ public class QMFGetQueryCommand extends QMFCommand { lsb = (lsb << 8) | (objectIdBytes[i] & 0xff); } - objectId = new UUID(msb, lsb); + _objectId = new UUID(msb, lsb); } else { - objectId = null; + _objectId = null; } + + } + + public void process(VirtualHost virtualHost, ServerMessage message) + { + String exchangeName = message.getMessageHeader().getReplyToExchange(); + String routingKey = message.getMessageHeader().getReplyToRoutingKey(); + + IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry(); + QMFService service = appRegistry.getQMFService(); + + _qmfLogger.debug("Execute: " + this); + List<QMFCommand> commands = new ArrayList<QMFCommand>(); final long sampleTime = System.currentTimeMillis() * 1000000l; Collection<QMFPackage> packages; - if(packageName != null && packageName.length() != 0) + if(_packageName != null && _packageName.length() != 0) { - QMFPackage qmfPackage = service.getPackage(packageName); + QMFPackage qmfPackage = service.getPackage(_packageName); if(qmfPackage == null) { packages = Collections.EMPTY_LIST; @@ -102,9 +110,9 @@ public class QMFGetQueryCommand extends QMFCommand Collection<QMFClass> qmfClasses; - if(className != null && className.length() != 0) + if(_className != null && _className.length() != 0) { - QMFClass qmfClass = qmfPackage.getQMFClass(className); + QMFClass qmfClass = qmfPackage.getQMFClass(_className); if(qmfClass == null) { qmfClasses = Collections.EMPTY_LIST; @@ -124,9 +132,9 @@ public class QMFGetQueryCommand extends QMFCommand { Collection<QMFObject> objects; - if(objectId != null) + if(_objectId != null) { - QMFObject obj = service.getObjectById(qmfClass, objectId); + QMFObject obj = service.getObjectById(qmfClass, _objectId); if(obj == null) { objects = Collections.EMPTY_LIST; @@ -158,7 +166,7 @@ public class QMFGetQueryCommand extends QMFCommand for(QMFCommand cmd : commands) { - + _qmfLogger.debug("Respond: " + cmd); QMFMessage responseMessage = new QMFMessage(routingKey, cmd); Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); @@ -179,4 +187,13 @@ public class QMFGetQueryCommand extends QMFCommand } } + @Override + public String toString() + { + return "QMFGetQueryCommand{" + + "packageName='" + _packageName + '\'' + + ", className='" + _className + '\'' + + ", objectId=" + _objectId + + '}'; + } }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java index 37c16efec5..4001a2a321 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.qmf; +import org.apache.log4j.Logger; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.ServerMessage; @@ -33,6 +34,8 @@ import java.util.ArrayList; public class QMFMethodRequestCommand extends QMFCommand { + private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf"); + private QMFMethodInvocation _methodInstance; private QMFObject _object; @@ -59,6 +62,9 @@ public class QMFMethodRequestCommand extends QMFCommand String queueName = message.getMessageHeader().getReplyToRoutingKey(); QMFCommand[] commands = new QMFCommand[2]; + + _qmfLogger.debug("Execute: " + _methodInstance + " on " + _object); + commands[0] = _methodInstance.execute(_object, this); commands[1] = new QMFCommandCompletionCommand(this); diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java index d126717fc8..631bd3c7cc 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java @@ -73,4 +73,9 @@ public abstract class QMFObject<C extends QMFClass, D extends QMFObject.Delegate abstract public QMFCommand asInstrumentInfoCmd(long sampleTime); abstract public QMFCommand asGetQueryResponseCmd(final QMFGetQueryCommand queryCommand, long sampleTime); + @Override + public String toString() + { + return _delegate.toString(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java index ed07457a23..9cacbafcc1 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java @@ -21,6 +21,7 @@ package org.apache.qpid.qmf; +import org.apache.log4j.Logger; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.ServerMessage; @@ -35,6 +36,9 @@ import java.util.List; public class QMFPackageQueryCommand extends QMFCommand { + + private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf"); + public QMFPackageQueryCommand(QMFCommandHeader header, BBDecoder decoder) { super(header); @@ -53,6 +57,8 @@ public class QMFPackageQueryCommand extends QMFCommand QMFCommand[] commands = new QMFCommand[ supportedSchemas.size() + 1 ]; + _qmfLogger.debug("Exectuting " + this); + int i = 0; for(QMFPackage p : supportedSchemas) { @@ -84,4 +90,9 @@ public class QMFPackageQueryCommand extends QMFCommand } } } + + public String toString() + { + return "QMFPackageQueryCommand"; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java index 850ffa8610..a1260ed9e6 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java @@ -21,6 +21,7 @@ package org.apache.qpid.qmf; +import org.apache.log4j.Logger; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.ServerMessage; @@ -35,6 +36,8 @@ import java.util.List; public class QMFSchemaRequestCommand extends QMFCommand { + private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf"); + private final String _packageName; private final String _className; private final byte[] _hash; @@ -49,6 +52,8 @@ public class QMFSchemaRequestCommand extends QMFCommand public void process(VirtualHost virtualHost, ServerMessage message) { + _qmfLogger.debug("Execute: " + this); + String exchangeName = message.getMessageHeader().getReplyToExchange(); String routingKey = message.getMessageHeader().getReplyToRoutingKey(); @@ -86,4 +91,13 @@ public class QMFSchemaRequestCommand extends QMFCommand } } } + + @Override + public String toString() + { + return "QMFSchemaRequestCommand{" + + " packageName='" + _packageName + '\'' + + ", className='" + _className + '\'' + + '}'; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 6abef6fd6b..27345f0a88 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -410,7 +410,10 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable ConcurrentHashMap<UUID, QMFObject> map = _managedObjectsById.get(qmfclass); if(map != null) { - return map.get(id); + + UUID key = new UUID(id.getMostSignificantBits() & (0xFFFl << 48), id.getLeastSignificantBits()); + return map.get(key); + } else { @@ -604,6 +607,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable { return _obj.getCreateTime(); } + + public String toString() + { + return _obj.toString(); + } } private class BrokerDelegate implements BrokerSchema.BrokerDelegate @@ -762,6 +770,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable { return _obj.getCreateTime(); } + + public String toString() + { + return _obj.toString(); + } } private class VhostDelegate implements BrokerSchema.VhostDelegate @@ -797,6 +810,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable { return _obj.getCreateTime(); } + + public String toString() + { + return _obj.toString(); + } } private class ExchangeDelegate implements BrokerSchema.ExchangeDelegate @@ -923,6 +941,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable { return _obj.getCreateTime(); } + + public String toString() + { + return _obj.toString(); + } } private class QueueDelegate implements BrokerSchema.QueueDelegate @@ -1163,6 +1186,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable { return _obj.getCreateTime(); } + + public String toString() + { + return _obj.toString(); + } } private class BindingDelegate implements BrokerSchema.BindingDelegate @@ -1214,6 +1242,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable { return _obj.getCreateTime(); } + + public String toString() + { + return _obj.toString(); + } } private class ConnectionDelegate implements BrokerSchema.ConnectionDelegate @@ -1352,6 +1385,12 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable // TODO return 0; } + + + public String toString() + { + return _obj.toString(); + } } private class SessionDelegate implements BrokerSchema.SessionDelegate @@ -1476,6 +1515,11 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable { return _obj.getCreateTime(); } + + public String toString() + { + return _obj.toString(); + } } private class SubscriptionDelegate implements BrokerSchema.SubscriptionDelegate @@ -1542,93 +1586,103 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable { return _obj.getCreateTime(); } - } - private class BridgeDelegate implements BrokerSchema.BridgeDelegate + public String toString() { - private final BridgeConfig _obj; + return _obj.toString(); + } + } - private BridgeDelegate(final BridgeConfig obj) - { - _obj = obj; - } + private class BridgeDelegate implements BrokerSchema.BridgeDelegate + { + private final BridgeConfig _obj; - public BrokerSchema.LinkObject getLinkRef() - { - return (BrokerSchema.LinkObject) adapt(_obj.getLink()); - } + private BridgeDelegate(final BridgeConfig obj) + { + _obj = obj; + } - public Integer getChannelId() - { - return _obj.getChannelId(); - } + public BrokerSchema.LinkObject getLinkRef() + { + return (BrokerSchema.LinkObject) adapt(_obj.getLink()); + } - public Boolean getDurable() - { - return _obj.isDurable(); - } + public Integer getChannelId() + { + return _obj.getChannelId(); + } - public String getSrc() - { - return _obj.getSource(); - } + public Boolean getDurable() + { + return _obj.isDurable(); + } - public String getDest() - { - return _obj.getDestination(); - } + public String getSrc() + { + return _obj.getSource(); + } - public String getKey() - { - return _obj.getKey(); - } + public String getDest() + { + return _obj.getDestination(); + } - public Boolean getSrcIsQueue() - { - return _obj.isQueueBridge(); - } + public String getKey() + { + return _obj.getKey(); + } - public Boolean getSrcIsLocal() - { - return _obj.isLocalSource(); - } + public Boolean getSrcIsQueue() + { + return _obj.isQueueBridge(); + } - public String getTag() - { - return _obj.getTag(); - } + public Boolean getSrcIsLocal() + { + return _obj.isLocalSource(); + } - public String getExcludes() - { - return _obj.getExcludes(); - } + public String getTag() + { + return _obj.getTag(); + } - public Boolean getDynamic() - { - return _obj.isDynamic(); - } + public String getExcludes() + { + return _obj.getExcludes(); + } - public Integer getSync() - { - return _obj.getAckBatching(); - } + public Boolean getDynamic() + { + return _obj.isDynamic(); + } - public BrokerSchema.BridgeClass.CloseMethodResponseCommand close(final BrokerSchema.BridgeClass.CloseMethodResponseCommandFactory factory) - { - return null; - } + public Integer getSync() + { + return _obj.getAckBatching(); + } - public UUID getId() - { - return _obj.getId(); - } + public BrokerSchema.BridgeClass.CloseMethodResponseCommand close(final BrokerSchema.BridgeClass.CloseMethodResponseCommandFactory factory) + { + return null; + } - public long getCreateTime() - { - return _obj.getCreateTime(); - } + public UUID getId() + { + return _obj.getId(); } + public long getCreateTime() + { + return _obj.getCreateTime(); + } + + public String toString() + { + return _obj.toString(); + } + } + private class LinkDelegate implements BrokerSchema.LinkDelegate { private final LinkConfig _obj; @@ -1665,14 +1719,12 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable public String getState() { - // TODO - return ""; + return _obj.getState(); } public String getLastError() { - // TODO - return ""; + return _obj.getLastError(); } public BrokerSchema.LinkClass.CloseMethodResponseCommand close(final BrokerSchema.LinkClass.CloseMethodResponseCommandFactory factory) @@ -1706,6 +1758,12 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable { return _obj.getCreateTime(); } + + @Override + public String toString() + { + return _obj.toString(); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java index 0e03e33be8..4e031f0a84 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java @@ -40,7 +40,10 @@ public class ConfigStore private AtomicReference<SystemConfig> _root = new AtomicReference<SystemConfig>(null); private final AtomicLong _objectIdSource = new AtomicLong(0l); + private final AtomicLong _persistentObjectIdSource = new AtomicLong(0l); + // TODO - should load/increment this on broker startup + private long _sequenceNumber = 1L; public enum Event { @@ -167,9 +170,23 @@ public class ConfigStore public UUID createId() { - return new UUID(0l, _objectIdSource.getAndIncrement()); + return new UUID(((_sequenceNumber & 0xFFFl)<<48), _objectIdSource.incrementAndGet()); } + public UUID createPersistentId() + { + return new UUID(0L, _persistentObjectIdSource.incrementAndGet()); + } + + public void persistentIdInUse(UUID id) + { + long lsb = id.getLeastSignificantBits(); + long currentId; + while((currentId = _persistentObjectIdSource.get()) < lsb) + { + _persistentObjectIdSource.compareAndSet(currentId, lsb); + } + } public SystemConfig getRoot() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java index 5a6159df34..0b3a9076dd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java @@ -54,4 +54,8 @@ public interface LinkConfig extends ConfiguredObject<LinkConfigType, LinkConfig> String src, String dest, String key, String tag, String excludes); + + String getState(); + + String getLastError(); }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java b/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java index 1c1527aa31..4db6ee3ad2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java +++ b/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.federation; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.BridgeConfig; import org.apache.qpid.server.configuration.BridgeConfigType; @@ -52,6 +53,15 @@ import java.util.concurrent.ConcurrentMap; public class Bridge implements BridgeConfig { + private static final String DURABLE = "durable"; + private static final String DYNAMIC = "dynamic"; + private static final String SRC_IS_QUEUE = "srcIsQueue"; + private static final String SRC_IS_LOCAL = "srcIsLocal"; + private static final String SOURCE = "source"; + private static final String DESTINATION = "destination"; + private static final String KEY = "key"; + private static final String TAG = "tag"; + private static final String EXCLUDES = "excludes"; private final boolean _durable; private final boolean _dynamic; private final boolean _queueBridge; @@ -95,19 +105,36 @@ public class Bridge implements BridgeConfig _key = key; _tag = tag; _excludes = excludes; - _id = brokerLink.getConfigStore().createId(); + _id = durable ? brokerLink.getConfigStore().createPersistentId() : brokerLink.getConfigStore().createId(); _transaction = new AutoCommitTransaction(getVirtualHost().getMessageStore()); - if(dynamic) + if(durable) { - if(srcIsLocal) + try + { + brokerLink.getVirtualHost().getDurableConfigurationStore().createBridge(this); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + } + + createDelegate(); + } + + private void createDelegate() + { + if(_dynamic) + { + if(_localSource) { // TODO } else { - if(srcIsQueue) + if(_queueBridge) { // TODO } @@ -119,9 +146,9 @@ public class Bridge implements BridgeConfig } else { - if(srcIsLocal) + if(_localSource) { - if(srcIsQueue) + if(_queueBridge) { _delegate = new StaticQueuePushBridge(); } @@ -132,7 +159,7 @@ public class Bridge implements BridgeConfig } else { - if(srcIsQueue) + if(_queueBridge) { _delegate = new StaticQueuePullBridge(); } @@ -144,6 +171,65 @@ public class Bridge implements BridgeConfig } } + public Bridge(final BrokerLink brokerLink, + final int bridgeNo, + final UUID id, + final long createTime, + final Map<String, String> arguments) + { + _link = brokerLink; + _bridgeNo = bridgeNo; + _id = id; + brokerLink.getConfigStore().persistentIdInUse(id); + _createTime = createTime; + + _durable = Boolean.valueOf(arguments.get(DURABLE)); + _dynamic = Boolean.valueOf(arguments.get(DYNAMIC)); + _queueBridge = Boolean.valueOf(arguments.get(SRC_IS_QUEUE)); + _localSource = Boolean.valueOf(arguments.get(SRC_IS_LOCAL)); + _source = arguments.get(SOURCE); + _destination = arguments.get(DESTINATION); + _key = arguments.get(KEY); + _tag = arguments.get(TAG); + _excludes = arguments.get(EXCLUDES); + + //TODO. + _transaction = new AutoCommitTransaction(getVirtualHost().getMessageStore()); + + + if(_durable) + { + try + { + brokerLink.getVirtualHost().getDurableConfigurationStore().createBridge(this); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + } + + createDelegate(); + } + + + public Map<String,String> getArguments() + { + Map<String,String> arguments = new HashMap<String, String>(); + + arguments.put(DURABLE, String.valueOf(_durable)); + arguments.put(DYNAMIC, String.valueOf(_dynamic)); + arguments.put(SRC_IS_QUEUE, String.valueOf(_queueBridge)); + arguments.put(SRC_IS_LOCAL, String.valueOf(_localSource)); + arguments.put(SOURCE, _source); + arguments.put(DESTINATION, _destination); + arguments.put(KEY, _key); + arguments.put(TAG, _tag); + arguments.put(EXCLUDES, _excludes); + + return Collections.unmodifiableMap(arguments); + } + public UUID getId() { return _id; @@ -318,6 +404,7 @@ public class Bridge implements BridgeConfig } + private interface BridgeImpl { void setSession(Session session); diff --git a/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java index f330e2f708..a8f75d2b9b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java +++ b/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.federation; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -29,16 +30,19 @@ import org.apache.qpid.server.configuration.LinkConfig; import org.apache.qpid.server.configuration.LinkConfigType; import org.apache.qpid.server.transport.ServerSession; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionException; -import org.apache.qpid.transport.ConnectionListener; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.TransportException; - -import java.util.Map; -import java.util.UUID; +import org.apache.qpid.transport.*; +import org.apache.qpid.util.Strings; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import java.io.IOException; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -55,6 +59,14 @@ public class BrokerLink implements LinkConfig, ConnectionListener private static final ScheduledThreadPoolExecutor _threadPool = new ScheduledThreadPoolExecutor(CORE_POOL_SIZE); + private static final String TRANSPORT = "transport"; + private static final String HOST = "host"; + private static final String PORT = "port"; + private static final String REMOTE_VHOST = "remoteVhost"; + private static final String DURABLE = "durable"; + private static final String AUTH_MECHANISM = "authMechanism"; + private static final String USERNAME = "username"; + private static final String PASSWORD = "password"; private final String _transport; @@ -68,7 +80,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener private final VirtualHost _virtualHost; private UUID _id; private AtomicBoolean _closing = new AtomicBoolean(); - private final long _createTime = System.currentTimeMillis(); + private final long _createTime; private Connection _qpidConnection; private AtomicReference<Thread> _executor = new AtomicReference<Thread>(); private AtomicInteger _bridgeId = new AtomicInteger(); @@ -88,8 +100,10 @@ public class BrokerLink implements LinkConfig, ConnectionListener { doMakeConnection(); } - };; - ; + }; + + + public static enum State { @@ -205,6 +219,44 @@ public class BrokerLink implements LinkConfig, ConnectionListener } }; + public BrokerLink(final VirtualHost virtualHost, UUID id, long createTime, Map<String, String> arguments) + { + _virtualHost = virtualHost; + _id = id; + virtualHost.getConfigStore().persistentIdInUse(id); + _createTime = createTime; + _transport = arguments.get(TRANSPORT); + + _host = arguments.get(HOST); + _port = Integer.parseInt(arguments.get(PORT)); + _remoteVhost = arguments.get(REMOTE_VHOST); + _durable = Boolean.parseBoolean(arguments.get(DURABLE)); + _authMechanism = arguments.get("authMechanism"); + _username = arguments.get("username"); + _password = arguments.get("password"); + + if(_durable) + { + try + { + _virtualHost.getDurableConfigurationStore().createBrokerLink(this); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + } + + + _qpidConnection = new Connection(); + _connectionConfig = new ConnectionConfigAdapter(); + _qpidConnection.addConnectionListener(this); + + + makeConnection(); + + } + public BrokerLink(final VirtualHost virtualHost, final String transport, @@ -212,10 +264,13 @@ public class BrokerLink implements LinkConfig, ConnectionListener final int port, final String remoteVhost, final boolean durable, - final String authMechanism, final String username, final String password) + final String authMechanism, + final String username, + final String password) { _virtualHost = virtualHost; _transport = transport; + _createTime = System.currentTimeMillis(); _host = host; _port = port; _remoteVhost = remoteVhost; @@ -223,15 +278,42 @@ public class BrokerLink implements LinkConfig, ConnectionListener _authMechanism = authMechanism; _username = username; _password = password; - _id = virtualHost.getConfigStore().createId(); + _id = durable ? virtualHost.getConfigStore().createPersistentId() : virtualHost.getConfigStore().createId(); + + if(durable) + { + try + { + _virtualHost.getDurableConfigurationStore().createBrokerLink(this); + } + catch (AMQStoreException e) + { + throw new RuntimeException(e); + } + } _qpidConnection = new Connection(); _connectionConfig = new ConnectionConfigAdapter(); _qpidConnection.addConnectionListener(this); - makeConnection(); } + public Map<String,String> getArguments() + { + Map<String,String> arguments = new HashMap<String, String>(); + + arguments.put(TRANSPORT, _transport); + arguments.put(HOST, _host); + arguments.put(PORT, String.valueOf(_port)); + arguments.put(REMOTE_VHOST, _remoteVhost); + arguments.put(DURABLE, String.valueOf(_durable)); + arguments.put(AUTH_MECHANISM, _authMechanism); + arguments.put(USERNAME, _username); + arguments.put(PASSWORD, _password); + + return Collections.unmodifiableMap(arguments); + } + private final boolean updateState(State expected, State newState) { return _stateUpdater.compareAndSet(this,expected,newState); @@ -250,9 +332,50 @@ public class BrokerLink implements LinkConfig, ConnectionListener { try { + _qpidConnection.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()) + { + protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException, + SaslException + { + Map<String,Object> saslProps = new HashMap<String,Object>(); + + + CallbackHandler cbh = new CallbackHandler() + { + public void handle(final Callback[] callbacks) + throws IOException, UnsupportedCallbackException + { + for (int i = 0; i < callbacks.length; i++) + { + Callback cb = callbacks[i]; + if (cb instanceof NameCallback) + { + ((NameCallback)cb).setName(_username); + } + else if (cb instanceof PasswordCallback) + { + ((PasswordCallback)cb).setPassword(_password.toCharArray()); + } + else + { + throw new UnsupportedCallbackException(cb); + } + } + + } + }; + final SaslClient sc = Sasl.createSaslClient(new String[] {"PLAIN"}, null, + _conSettings.getSaslProtocol(), + _conSettings.getSaslServerName(), + saslProps, cbh); + + return sc; + }}); + _qpidConnection.connect(_host, _port, _remoteVhost, _username, _password, "ssl".equals(_transport), _authMechanism); final Map<String,Object> serverProps = _qpidConnection.getServerProperties(); + _remoteFederationTag = (String) serverProps.get(ServerPropertyNames.FEDERATION_TAG); if(_remoteFederationTag == null) { @@ -445,6 +568,20 @@ public class BrokerLink implements LinkConfig, ConnectionListener } + public void createBridge(final UUID id, final long createTime, final Map<String, String> arguments) + { + if(!_closing.get()) + { + Bridge bridge = new Bridge(this, _bridgeId.incrementAndGet(), id, createTime, arguments); + if(_bridges.putIfAbsent(bridge, bridge) == null) + { + + addBridge(bridge); + } + } + } + + private void addBridge(final Bridge bridge) { getConfigStore().addConfiguredObject(bridge); @@ -509,4 +646,34 @@ public class BrokerLink implements LinkConfig, ConnectionListener { return _remoteFederationTag; } + + public String getState() + { + return _state.name(); + } + + public String getLastError() + { + return _lastErrorMessage; + } + + @Override + public String toString() + { + return "BrokerLink{" + + " _id=" + _id + + ", _transport='" + _transport + '\'' + + ", _host='" + _host + '\'' + + ", _port=" + _port + + ", _remoteVhost='" + _remoteVhost + '\'' + + ", _durable=" + _durable + + ", _authMechanism='" + _authMechanism + '\'' + + ", _username='" + _username + '\'' + + ", _password='" + _password + '\'' + + ", _virtualHost=" + _virtualHost + + ", _createTime=" + _createTime + + ", _remoteFederationTag='" + _remoteFederationTag + '\'' + + ", _state=" + _state + + '}'; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java index 8266c1e79f..885b039e18 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java @@ -72,13 +72,16 @@ public class ChannelLogSubject extends AbstractLogSubject * 3 - Virtualhost * 4 - Channel ID */ - ServerConnection connection = (ServerConnection) session.getConnection(); - setLogStringWithFormat(CHANNEL_FORMAT, - connection == null ? -1L : connection.getConnectionId(), - session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(), - (connection == null || connection.getConfig() == null) ? "?" : connection.getConfig().getAddress(), - session.getVirtualHost().getName(), - session.getChannel()); + if(session.getConnection() instanceof ServerConnection) + { + ServerConnection connection = (ServerConnection) session.getConnection(); + setLogStringWithFormat(CHANNEL_FORMAT, + connection == null ? -1L : connection.getConnectionId(), + session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(), + (connection == null || connection.getConfig() == null) ? "?" : connection.getConfig().getAddress(), + session.getVirtualHost().getName(), + session.getChannel()); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 7eb1b54693..6753cf4560 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -148,7 +148,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry BrokerConfig broker = new BrokerConfigAdapter(instance); - SystemConfig system = (SystemConfig) store.getRoot(); + SystemConfig system = store.getRoot(); system.addBroker(broker); instance.setBroker(broker); diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java index 6a36b22400..f77b8d2dfa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java @@ -165,7 +165,6 @@ public class BrokerConfigAdapter implements BrokerConfig /** * @see org.apache.qpid.server.configuration.BrokerConfig#getFeatures() */ - @Override public List<String> getFeatures() { final List<String> features = new ArrayList<String>(); @@ -176,4 +175,16 @@ public class BrokerConfigAdapter implements BrokerConfig return Collections.unmodifiableList(features); } + + @Override + public String toString() + { + return "BrokerConfigAdapter{" + + "_id=" + _id + + ", _system=" + _system + + ", _vhosts=" + _vhosts + + ", _createTime=" + _createTime + + ", _federationTag='" + _federationTag + '\'' + + '}'; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java index a883f656be..09e7fe0a11 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.store; import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; + import org.apache.qpid.framing.FieldTable; public interface ConfigurationRecoveryHandler @@ -42,7 +45,19 @@ public interface ConfigurationRecoveryHandler public static interface BindingRecoveryHandler { void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf); - void completeBindingRecovery(); + BrokerLinkRecoveryHandler completeBindingRecovery(); + } + + public static interface BrokerLinkRecoveryHandler + { + BridgeRecoveryHandler brokerLink(UUID id, long createTime, Map<String,String> arguments); + void completeBrokerLinkRecovery(); + } + + public static interface BridgeRecoveryHandler + { + void bridge(UUID id, long createTime, Map<String,String> arguments); + void completeBridgeRecoveryForLink(); } public static interface QueueEntryRecoveryHandler diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index d90b3d02ba..45083c1595 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -21,7 +21,9 @@ package org.apache.qpid.server.store; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.lang.ref.SoftReference; @@ -36,7 +38,10 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -47,13 +52,14 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; /** @@ -82,6 +88,10 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor private static final String META_DATA_TABLE_NAME = "QPID_META_DATA"; private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT"; + private static final String LINKS_TABLE_NAME = "QPID_LINKS"; + private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES"; + + private static final int DB_VERSION = 3; @@ -137,6 +147,49 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME; + private static final String CREATE_LINKS_TABLE = + "CREATE TABLE "+LINKS_TABLE_NAME+" ( id_lsb bigint not null," + + " id_msb bigint not null," + + " create_time bigint not null," + + " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))"; + private static final String SELECT_FROM_LINKS = + "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb"; + private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME + + " WHERE id_lsb = ? and id_msb = ?"; + private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, " + + "arguments FROM " + LINKS_TABLE_NAME; + private static final String FIND_LINK = "SELECT id_lsb, id_msb FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and" + + " id_msb = ?"; + private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, " + + "id_msb, create_time, arguments ) values (?, ?, ?, ?)"; + + + private static final String CREATE_BRIDGES_TABLE = + "CREATE TABLE "+BRIDGES_TABLE_NAME+" ( id_lsb bigint not null," + + " id_msb bigint not null," + + " create_time bigint not null," + + " link_id_lsb bigint not null," + + " link_id_msb bigint not null," + + " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))"; + private static final String SELECT_FROM_BRIDGES = + "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM " + + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?"; + private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME + + " WHERE id_lsb = ? and id_msb = ?"; + private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, " + + " create_time," + + " link_id_lsb, link_id_msb, " + + "arguments FROM " + BRIDGES_TABLE_NAME + + " WHERE link_id_lsb = ? and link_id_msb = ?"; + private static final String FIND_BRIDGE = "SELECT id_lsb, id_msb FROM " + BRIDGES_TABLE_NAME + + " WHERE id_lsb = ? and id_msb = ?"; + private static final String INSERT_INTO_BRIDGES = "INSERT INTO " + BRIDGES_TABLE_NAME + "( id_lsb, id_msb, " + + "create_time, " + + "link_id_lsb, link_id_msb, " + + "arguments )" + + " values (?, ?, ?, ?, ?, ?)"; + + private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; @@ -294,6 +347,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor createQueueEntryTable(conn); createMetaDataTable(conn); createMessageContentTable(conn); + createLinkTable(conn); + createBridgeTable(conn); conn.close(); } @@ -430,6 +485,40 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } + private void createLinkTable(final Connection conn) throws SQLException + { + if(!tableExists(LINKS_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_LINKS_TABLE); + } + finally + { + stmt.close(); + } + } + } + + + private void createBridgeTable(final Connection conn) throws SQLException + { + if(!tableExists(BRIDGES_TABLE_NAME, conn)) + { + Statement stmt = conn.createStatement(); + try + { + stmt.execute(CREATE_BRIDGES_TABLE); + } + finally + { + stmt.close(); + } + } + } + + private boolean tableExists(final String tableName, final Connection conn) throws SQLException @@ -470,7 +559,8 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor List<String> exchanges = loadExchanges(erh); ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery(); recoverBindings(brh, exchanges); - brh.completeBindingRecovery(); + ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); + recoverBrokerLinks(lrh); } catch (SQLException e) { @@ -481,6 +571,144 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } + private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh) + throws SQLException + { + _logger.info("Recovering broker links..."); + + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + + PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_LINKS); + + try + { + ResultSet rs = stmt.executeQuery(); + + try + { + + while(rs.next()) + { + UUID id = new UUID(rs.getLong(2), rs.getLong(1)); + long createTime = rs.getLong(3); + Blob argumentsAsBlob = rs.getBlob(4); + + byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length()); + + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes)); + int size = dis.readInt(); + + Map<String,String> arguments = new HashMap<String, String>(); + + for(int i = 0; i < size; i++) + { + arguments.put(dis.readUTF(), dis.readUTF()); + } + + ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments); + + recoverBridges(brh, id); + + } + } + catch (IOException e) + { + throw new SQLException(e.getMessage(), e); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + } + finally + { + if(conn != null) + { + conn.close(); + } + } + + } + + private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId) + throws SQLException + { + _logger.info("Recovering bridges for link " + linkId + "..."); + + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + + PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES); + stmt.setLong(1, linkId.getLeastSignificantBits()); + stmt.setLong(2, linkId.getMostSignificantBits()); + + + try + { + ResultSet rs = stmt.executeQuery(); + + try + { + + while(rs.next()) + { + UUID id = new UUID(rs.getLong(2), rs.getLong(1)); + long createTime = rs.getLong(3); + Blob argumentsAsBlob = rs.getBlob(6); + + byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length()); + + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes)); + int size = dis.readInt(); + + Map<String,String> arguments = new HashMap<String, String>(); + + for(int i = 0; i < size; i++) + { + arguments.put(dis.readUTF(), dis.readUTF()); + } + + brh.bridge(id, createTime, arguments); + + } + brh.completeBridgeRecoveryForLink(); + } + catch (IOException e) + { + throw new SQLException(e.getMessage(), e); + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + + } + finally + { + if(conn != null) + { + conn.close(); + } + } + + } + private void loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException { Connection conn = newAutoCommitConnection(); @@ -1191,6 +1419,233 @@ public class DerbyMessageStore implements MessageStore, DurableConfigurationStor } + public void createBrokerLink(final BrokerLink link) throws AMQStoreException + { + _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called"); + + if (_state != State.RECOVERING) + { + try + { + Connection conn = newAutoCommitConnection(); + + PreparedStatement stmt = conn.prepareStatement(FIND_LINK); + try + { + + stmt.setLong(1, link.getId().getLeastSignificantBits()); + stmt.setLong(2, link.getId().getMostSignificantBits()); + ResultSet rs = stmt.executeQuery(); + try + { + + // If we don't have any data in the result set then we can add this queue + if (!rs.next()) + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_LINKS); + + try + { + + insertStmt.setLong(1, link.getId().getLeastSignificantBits()); + insertStmt.setLong(2, link.getId().getMostSignificantBits()); + insertStmt.setLong(3, link.getCreateTime()); + + byte[] argumentBytes = convertStringMapToBytes(link.getArguments()); + ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes); + + insertStmt.setBinaryStream(4,bis,argumentBytes.length); + + insertStmt.execute(); + } + finally + { + insertStmt.close(); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + conn.close(); + + } + catch (SQLException e) + { + throw new AMQStoreException("Error writing " + link + " to database: " + e.getMessage(), e); + } + } + } + + private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException + { + byte[] argumentBytes; + if(arguments == null) + { + argumentBytes = new byte[0]; + } + else + { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + + try + { + dos.writeInt(arguments.size()); + for(Map.Entry<String,String> arg : arguments.entrySet()) + { + dos.writeUTF(arg.getKey()); + dos.writeUTF(arg.getValue()); + } + } + catch (IOException e) + { + // This should never happen + throw new AMQStoreException(e.getMessage(), e); + } + argumentBytes = bos.toByteArray(); + } + return argumentBytes; + } + + public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException + { + _logger.debug("public void deleteBrokerLink( " + link + "): called"); + Connection conn = null; + PreparedStatement stmt = null; + try + { + conn = newAutoCommitConnection(); + stmt = conn.prepareStatement(DELETE_FROM_LINKS); + stmt.setLong(1, link.getId().getLeastSignificantBits()); + stmt.setLong(2, link.getId().getMostSignificantBits()); + int results = stmt.executeUpdate(); + + if (results == 0) + { + throw new AMQStoreException("Link " + link + " not found"); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error deleting Link " + link + " from database: " + e.getMessage(), e); + } + finally + { + closePreparedStatement(stmt); + closeConnection(conn); + } + + + } + + public void createBridge(final Bridge bridge) throws AMQStoreException + { + _logger.debug("public void createBridge(BrokerLink = " + bridge + "): called"); + + if (_state != State.RECOVERING) + { + try + { + Connection conn = newAutoCommitConnection(); + + PreparedStatement stmt = conn.prepareStatement(FIND_BRIDGE); + try + { + + UUID id = bridge.getId(); + stmt.setLong(1, id.getLeastSignificantBits()); + stmt.setLong(2, id.getMostSignificantBits()); + ResultSet rs = stmt.executeQuery(); + try + { + + // If we don't have any data in the result set then we can add this queue + if (!rs.next()) + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BRIDGES); + + try + { + + insertStmt.setLong(1, id.getLeastSignificantBits()); + insertStmt.setLong(2, id.getMostSignificantBits()); + + insertStmt.setLong(3, bridge.getCreateTime()); + + UUID linkId = bridge.getLink().getId(); + insertStmt.setLong(4, linkId.getLeastSignificantBits()); + insertStmt.setLong(5, linkId.getMostSignificantBits()); + + byte[] argumentBytes = convertStringMapToBytes(bridge.getArguments()); + ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes); + + insertStmt.setBinaryStream(6,bis,argumentBytes.length); + + insertStmt.execute(); + } + finally + { + insertStmt.close(); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + conn.close(); + + } + catch (SQLException e) + { + throw new AMQStoreException("Error writing " + bridge + " to database: " + e.getMessage(), e); + } + } + } + + public void deleteBridge(final Bridge bridge) throws AMQStoreException + { + _logger.debug("public void deleteBridge( " + bridge + "): called"); + Connection conn = null; + PreparedStatement stmt = null; + try + { + conn = newAutoCommitConnection(); + stmt = conn.prepareStatement(DELETE_FROM_BRIDGES); + stmt.setLong(1, bridge.getId().getLeastSignificantBits()); + stmt.setLong(2, bridge.getId().getMostSignificantBits()); + int results = stmt.executeUpdate(); + + if (results == 0) + { + throw new AMQStoreException("Bridge " + bridge + " not found"); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error deleting bridge " + bridge + " from database: " + e.getMessage(), e); + } + finally + { + closePreparedStatement(stmt); + closeConnection(conn); + } + + } + public Transaction newTransaction() { return new DerbyTransaction(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 5fb23653cb..9cd2567b7d 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -25,6 +25,8 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.queue.AMQQueue; @@ -128,4 +130,12 @@ public interface DurableConfigurationStore * @throws AMQStoreException If the operation fails for any reason. */ void updateQueue(AMQQueue queue) throws AMQStoreException; + + void createBrokerLink(BrokerLink link) throws AMQStoreException; + + void deleteBrokerLink(BrokerLink link) throws AMQStoreException; + + void createBridge(Bridge bridge) throws AMQStoreException; + + void deleteBridge(Bridge bridge) throws AMQStoreException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 005055dbaa..c5393f73a2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -31,6 +31,8 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; @@ -157,6 +159,26 @@ public class MemoryMessageStore implements MessageStore, DurableConfigurationSto // Not required to do anything } + public void createBrokerLink(final BrokerLink link) throws AMQStoreException + { + + } + + public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException + { + + } + + public void createBridge(final Bridge bridge) throws AMQStoreException + { + + } + + public void deleteBridge(final Bridge bridge) throws AMQStoreException + { + + } + public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, Configuration storeConfiguration, diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 7526b19058..7a2c07b9c8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -724,11 +724,18 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public String toLogString() { - return "[" + + long connectionId = getConnection() instanceof ServerConnection + ? ((ServerConnection) getConnection()).getConnectionId() + : -1; + + String remoteAddress = _connectionConfig instanceof ProtocolEngine + ? ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString() + : ""; + return "[" + MessageFormat.format(CHANNEL_FORMAT, - ((ServerConnection) getConnection()).getConnectionId(), + connectionId, getClientID(), - ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(), + remoteAddress, getVirtualHost().getName(), getChannel()) + "] "; diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 24ab29444c..0baaf61f77 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.Map; import java.util.UUID; import org.apache.qpid.common.Closeable; @@ -91,6 +92,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo boolean durable, String authMechanism, String username, String password); + public BrokerLink createBrokerConnection(UUID id, long createTime, Map<String,String> arguments); + ConfigStore getConfigStore(); void removeBrokerConnection(BrokerLink brokerLink); diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index ce6bfe87e0..51892d965a 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.virtualhost; +import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.MessageStore; @@ -54,11 +55,13 @@ import java.util.ArrayList; import java.util.Map; import java.util.HashMap; import java.util.TreeMap; +import java.util.UUID; public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler, ConfigurationRecoveryHandler.QueueRecoveryHandler, ConfigurationRecoveryHandler.ExchangeRecoveryHandler, ConfigurationRecoveryHandler.BindingRecoveryHandler, + ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler, MessageStoreRecoveryHandler, MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, TransactionLogRecoveryHandler, @@ -180,7 +183,19 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void completeMessageRecovery() { //TODO - log end - //To change body of implemented methods use File | Settings | File Templates. + } + + public BridgeRecoveryHandler brokerLink(final UUID id, + final long createTime, + final Map<String, String> arguments) + { + BrokerLink blink = _virtualHost.createBrokerConnection(id, createTime, arguments); + return new BridgeRecoveryHandlerImpl(blink); + + } + + public void completeBrokerLinkRecovery() + { } private static final class ProcessAction @@ -261,9 +276,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } - public void completeBindingRecovery() + public BrokerLinkRecoveryHandler completeBindingRecovery() { - //return this; + return this; } public void complete() @@ -386,4 +401,23 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa return null; } } + + private class BridgeRecoveryHandlerImpl implements BridgeRecoveryHandler + { + private final BrokerLink _blink; + + public BridgeRecoveryHandlerImpl(final BrokerLink blink) + { + _blink = blink; + } + + public void bridge(final UUID id, final long createTime, final Map<String, String> arguments) + { + _blink.createBridge(id, createTime, arguments); + } + + public void completeBridgeRecoveryForLink() + { + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 9b1c5474a3..715c917aa3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -54,6 +54,7 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -730,6 +731,16 @@ public class VirtualHostImpl implements VirtualHost _statisticsEnabled = enabled; } + public BrokerLink createBrokerConnection(UUID id, long createTime, Map<String,String> arguments) + { + BrokerLink blink = new BrokerLink(this, id, createTime, arguments); + // TODO - cope with duplicate broker link creation requests + _links.putIfAbsent(blink,blink); + getConfigStore().addConfiguredObject(blink); + return blink; + } + + public void createBrokerConnection(final String transport, final String host, final int port, @@ -740,10 +751,11 @@ public class VirtualHostImpl implements VirtualHost final String password) { BrokerLink blink = new BrokerLink(this, transport, host, port, vhost, durable, authMechanism, username, password); - if(_links.putIfAbsent(blink,blink) != null) - { - getConfigStore().addConfiguredObject(blink); - } + + // TODO - cope with duplicate broker link creation requests + _links.putIfAbsent(blink,blink); + getConfigStore().addConfiguredObject(blink); + } public void removeBrokerConnection(final String transport, @@ -782,7 +794,9 @@ public class VirtualHostImpl implements VirtualHost public List<Exchange> exchange = new LinkedList<Exchange>(); public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>(); public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>(); - + public List<BrokerLink> links = new LinkedList<BrokerLink>(); + public List<Bridge> bridges = new LinkedList<Bridge>(); + public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { } @@ -877,6 +891,30 @@ public class VirtualHostImpl implements VirtualHost public void updateQueue(AMQQueue queue) throws AMQStoreException { } + + public void createBrokerLink(final BrokerLink link) throws AMQStoreException + { + if(link.isDurable()) + { + links.add(link); + } + } + + public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException + { + } + + public void createBridge(final Bridge bridge) throws AMQStoreException + { + if(bridge.isDurable()) + { + bridges.add(bridge); + } + } + + public void deleteBridge(final Bridge bridge) throws AMQStoreException + { + } } @Override diff --git a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 153371c8d9..314b116829 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.Map; import java.util.UUID; import org.apache.qpid.server.binding.BindingFactory; @@ -63,6 +64,11 @@ public class MockVirtualHost implements VirtualHost } + public BrokerLink createBrokerConnection(final UUID id, final long createTime, final Map<String, String> arguments) + { + return null; + } + public IApplicationRegistry getApplicationRegistry() { return null; diff --git a/java/broker/src/xsl/qmf.xsl b/java/broker/src/xsl/qmf.xsl index 3a7e10dac8..1e98c97466 100644 --- a/java/broker/src/xsl/qmf.xsl +++ b/java/broker/src/xsl/qmf.xsl @@ -288,6 +288,17 @@ public class <xsl:value-of select="$ClassName"/> extends QMFPackage <xsl:apply-templates select="node()[name()='property' or name()='statistic']" mode="optionalPropertyPresence"/> <xsl:apply-templates select="node()[name()='property' or name()='statistic']" mode="encodeProperty"/> } + + public String toString() + { + return "QMF<xsl:value-of select="@name"/>GetQueryResponseCommand{id=" + getObject().getId() +<xsl:for-each select="node()[name()='property' or name()='statistic']"> +<xsl:if test="@type!='hilo32' and @type!='mmaTime' "> + + ", <xsl:value-of select="@name"/>=" + getObject().get<xsl:call-template name="initCap"><xsl:with-param name="input"><xsl:value-of select="@name"/></xsl:with-param></xsl:call-template>() +</xsl:if> +</xsl:for-each> + + "}"; + } } @@ -530,6 +541,11 @@ public class <xsl:value-of select="$ClassName"/> extends QMFPackage { return obj.<xsl:value-of select="@name"/>( new <xsl:value-of select="$ClassName"/>ResponseCommandFactory(cmd)<xsl:if test="node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]">, </xsl:if><xsl:apply-templates select="node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]" mode="methodArgList"><xsl:with-param name="prefix">_</xsl:with-param></xsl:apply-templates> ); } + + public String toString() + { + return "<xsl:value-of select="$ClassName"/>["<xsl:for-each select="node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]"><xsl:if test="preceding-sibling::node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]">+ ", "</xsl:if>+ "<xsl:value-of select="@name"/> = " + _<xsl:value-of select="@name"/> </xsl:for-each>+"]"; + } } public final class <xsl:value-of select="$ClassName"/>ResponseCommandFactory diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index fb5cf8b3c6..2d450cf09c 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -26,6 +26,8 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.federation.Bridge; +import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.message.ServerMessage; @@ -320,4 +322,31 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } + public void createBrokerLink(final BrokerLink link) throws AMQStoreException + { + doPreDelay("createBrokerLink"); + _durableConfigurationStore.createBrokerLink(link); + doPostDelay("createBrokerLink"); + } + + public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException + { + doPreDelay("deleteBrokerLink"); + _durableConfigurationStore.deleteBrokerLink(link); + doPostDelay("deleteBrokerLink"); + } + + public void createBridge(final Bridge bridge) throws AMQStoreException + { + doPreDelay("createBridge"); + _durableConfigurationStore.createBridge(bridge); + doPostDelay("createBridge"); + } + + public void deleteBridge(final Bridge bridge) throws AMQStoreException + { + doPreDelay("deleteBridge"); + _durableConfigurationStore.deleteBridge(bridge); + doPostDelay("deleteBridge"); + } } |
