diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-02-23 22:15:29 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-02-23 22:15:29 +0000 |
| commit | afeb5065ec226e39bb0b6855db63952d9a1ba89c (patch) | |
| tree | 2341895495043707530f839a0be87774f804553e /qpid/java/bdbstore/src | |
| parent | 1b4a67ccc4b501b2b7872881609ee9f0d8c2e3eb (diff) | |
| download | qpid-python-afeb5065ec226e39bb0b6855db63952d9a1ba89c.tar.gz | |
AMQP-24 : [Java Broker] Implement distributed transactions for AMQP 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1292984 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
5 files changed, 417 insertions, 1 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 29f2a2f2fb..a91d8f359e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -36,6 +36,7 @@ import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.Transaction; import com.sleepycat.je.TransactionConfig; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; @@ -68,14 +69,18 @@ import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5; +import org.apache.qpid.server.store.berkeleydb.keys.Xid; import org.apache.qpid.server.store.berkeleydb.records.BindingRecord; import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord; +import org.apache.qpid.server.store.berkeleydb.records.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory; import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5; import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory; +import org.apache.qpid.server.store.berkeleydb.tuples.PreparedTransactionTB; import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB; import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory; +import org.apache.qpid.server.store.berkeleydb.tuples.XidTB; import java.io.File; import java.lang.ref.SoftReference; @@ -120,6 +125,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private String QUEUEDB_NAME = "queueDb"; private String BRIDGEDB_NAME = "bridges"; private String LINKDB_NAME = "links"; + private String XIDDB_NAME = "xids"; private Database _messageMetaDataDb; private Database _messageContentDb; @@ -129,6 +135,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private Database _queueDb; private Database _bridgeDb; private Database _linkDb; + private Database _xidDb; /* ======= * Schema: @@ -217,6 +224,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore LINKDB_NAME += "_v" + version; BRIDGEDB_NAME += "_v" + version; + + XIDDB_NAME += "_v" + version; } } @@ -272,6 +281,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } recoverQueueEntries(recoveryHandler); + } @@ -487,6 +497,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig); _linkDb = openDatabase(LINKDB_NAME, dbConfig); _bridgeDb = openDatabase(BRIDGEDB_NAME, dbConfig); + _xidDb = openDatabase(XIDDB_NAME, dbConfig); } @@ -564,6 +575,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore _linkDb.close(); } + + if (_xidDb != null) + { + _log.info("Close xid database"); + _xidDb.close(); + } + closeEnvironment(); _state = State.CLOSED; @@ -884,7 +902,52 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } - qerh.completeQueueEntryRecovery(); + + + TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery(); + + cursor = null; + try + { + cursor = _xidDb.openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + XidTB keyBinding = new XidTB(); + PreparedTransactionTB valueBinding = new PreparedTransactionTB(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + Xid xid = keyBinding.entryToObject(key); + PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); + dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(), + preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()); + } + + try + { + cursor.close(); + } + finally + { + cursor = null; + } + + } + catch (DatabaseException e) + { + _log.error("Database Error: " + e.getMessage(), e); + throw e; + } + finally + { + if (cursor != null) + { + cursor.close(); + } + } + + + dtxrh.completeDtxRecordRecovery(); } /** @@ -1481,6 +1544,69 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } } + + private void recordXid(com.sleepycat.je.Transaction txn, + long format, + byte[] globalId, + byte[] branchId, + Transaction.Record[] enqueues, + Transaction.Record[] dequeues) throws AMQStoreException + { + DatabaseEntry key = new DatabaseEntry(); + Xid xid = new Xid(format, globalId, branchId); + XidTB keyBinding = new XidTB(); + keyBinding.objectToEntry(xid,key); + + DatabaseEntry value = new DatabaseEntry(); + PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues); + PreparedTransactionTB valueBinding = new PreparedTransactionTB(); + valueBinding.objectToEntry(preparedTransaction, value); + + try + { + _xidDb.put(txn, key, value); + } + catch (DatabaseException e) + { + _log.error("Failed to write xid: " + e.getMessage(), e); + throw new AMQStoreException("Error writing xid to database", e); + } + } + + private void removeXid(com.sleepycat.je.Transaction txn, long format, byte[] globalId, byte[] branchId) + throws AMQStoreException + { + DatabaseEntry key = new DatabaseEntry(); + Xid xid = new Xid(format, globalId, branchId); + XidTB keyBinding = new XidTB(); + + keyBinding.objectToEntry(xid, key); + + + try + { + + OperationStatus status = _xidDb.delete(txn, key); + if (status == OperationStatus.NOTFOUND) + { + throw new AMQStoreException("Unable to find xid"); + } + else if (status != OperationStatus.SUCCESS) + { + throw new AMQStoreException("Unable to remove xid"); + } + + } + catch (DatabaseException e) + { + + _log.error("Failed to remove xid ", e); + _log.error(txn); + + throw new AMQStoreException("Error accessing database while removing xid: " + e.getMessage(), e); + } + } + /** * Commits all operations performed within a given transaction. * @@ -2385,6 +2511,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { BDBMessageStore.this.abortTran(_txn); } + + public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException + { + BDBMessageStore.this.removeXid(_txn, format, globalId, branchId); + } + + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, + Record[] dequeues) throws AMQStoreException + { + BDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); + } } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java new file mode 100644 index 0000000000..f74d67b355 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.store.berkeleydb.keys; + +public class Xid +{ + + private final long _format; + private final byte[] _globalId; + private final byte[] _branchId; + + public Xid(long format, byte[] globalId, byte[] branchId) + { + _format = format; + _globalId = globalId; + _branchId = branchId; + } + + public long getFormat() + { + return _format; + } + + public byte[] getGlobalId() + { + return _globalId; + } + + public byte[] getBranchId() + { + return _branchId; + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java new file mode 100644 index 0000000000..bfd72b9a1f --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.store.berkeleydb.records; + +import org.apache.qpid.server.store.MessageStore; + +public class PreparedTransaction +{ + private final MessageStore.Transaction.Record[] _enqueues; + private final MessageStore.Transaction.Record[] _dequeues; + + public PreparedTransaction(MessageStore.Transaction.Record[] enqueues, MessageStore.Transaction.Record[] dequeues) + { + _enqueues = enqueues; + _dequeues = dequeues; + } + + public MessageStore.Transaction.Record[] getEnqueues() + { + return _enqueues; + } + + public MessageStore.Transaction.Record[] getDequeues() + { + return _dequeues; + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java new file mode 100644 index 0000000000..3eb4cb69b5 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.berkeleydb.records.PreparedTransaction; + +public class PreparedTransactionTB extends TupleBinding<PreparedTransaction> +{ + @Override + public PreparedTransaction entryToObject(TupleInput input) + { + MessageStore.Transaction.Record[] enqueues = readRecords(input); + + MessageStore.Transaction.Record[] dequeues = readRecords(input); + + return new PreparedTransaction(enqueues, dequeues); + } + + private MessageStore.Transaction.Record[] readRecords(TupleInput input) + { + MessageStore.Transaction.Record[] records = new MessageStore.Transaction.Record[input.readInt()]; + for(int i = 0; i < records.length; i++) + { + records[i] = new RecordImpl(input.readString(), input.readLong()); + } + return records; + } + + @Override + public void objectToEntry(PreparedTransaction preparedTransaction, TupleOutput output) + { + writeRecords(preparedTransaction.getEnqueues(), output); + writeRecords(preparedTransaction.getDequeues(), output); + + } + + private void writeRecords(MessageStore.Transaction.Record[] records, TupleOutput output) + { + if(records == null) + { + output.writeInt(0); + } + else + { + output.writeInt(records.length); + for(MessageStore.Transaction.Record record : records) + { + output.writeString(record.getQueue().getResourceName()); + output.writeLong(record.getMessage().getMessageNumber()); + } + } + } + + private static class RecordImpl implements MessageStore.Transaction.Record, TransactionLogResource, EnqueableMessage + { + + private final String _queueName; + private long _messageNumber; + + public RecordImpl(String queueName, long messageNumber) + { + _queueName = queueName; + _messageNumber = messageNumber; + } + + public TransactionLogResource getQueue() + { + return this; + } + + public EnqueableMessage getMessage() + { + return this; + } + + public long getMessageNumber() + { + return _messageNumber; + } + + public boolean isPersistent() + { + return true; + } + + public StoredMessage getStoredMessage() + { + throw new UnsupportedOperationException(); + } + + public String getResourceName() + { + return _queueName; + } + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/XidTB.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/XidTB.java new file mode 100644 index 0000000000..3a5d61b2b6 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/XidTB.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import org.apache.qpid.server.store.berkeleydb.keys.Xid; + +public class XidTB extends TupleBinding<Xid> +{ + @Override + public Xid entryToObject(TupleInput input) + { + long format = input.readLong(); + byte[] globalId = new byte[input.readInt()]; + input.readFast(globalId); + byte[] branchId = new byte[input.readInt()]; + input.readFast(branchId); + return new Xid(format,globalId,branchId); + } + + @Override + public void objectToEntry(Xid xid, TupleOutput output) + { + output.writeLong(xid.getFormat()); + output.writeInt(xid.getGlobalId() == null ? 0 : xid.getGlobalId().length); + if(xid.getGlobalId() != null) + { + output.write(xid.getGlobalId()); + } + output.writeInt(xid.getBranchId() == null ? 0 : xid.getBranchId().length); + if(xid.getBranchId() != null) + { + output.write(xid.getBranchId()); + } + + } +} |
