diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-12-03 21:46:39 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-12-03 21:46:39 +0000 |
| commit | be7f29b88c183c6c24e15539d7a9b4a58a6d420e (patch) | |
| tree | 56586945c6c38c03df97219fd9fb35ee058c355c /qpid/cpp/src | |
| parent | d88a384d89cf3439683393514c076797d5ceff5e (diff) | |
| download | qpid-python-be7f29b88c183c6c24e15539d7a9b4a58a6d420e.tar.gz | |
QPID-5358: Checksum not implemented in record tail, not checked during read. Implemented the Alder-32 algorithm to check every record from the start of the header to the start of the record tail. Upon recovery, the recovered record checksum is compared to the record tail checksum. This should detect the condition where a multi-page record may have its header and tail written, but one or more of its inbetween pages may not be fully written.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1547601 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/linearstore.cmake | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/ISSUES | 12 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp | 43 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/Checksum.h | 54 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp | 27 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp | 25 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/deq_rec.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp | 51 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/enq_rec.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/jrec.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp | 25 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/txn_rec.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp | 13 |
13 files changed, 211 insertions, 50 deletions
diff --git a/qpid/cpp/src/linearstore.cmake b/qpid/cpp/src/linearstore.cmake index 8568cdbb77..d576f78bef 100644 --- a/qpid/cpp/src/linearstore.cmake +++ b/qpid/cpp/src/linearstore.cmake @@ -93,6 +93,7 @@ if (BUILD_LINEARSTORE) # Journal source files set (linear_jrnl_SOURCES + qpid/linearstore/journal/Checksum.cpp qpid/linearstore/journal/data_tok.cpp qpid/linearstore/journal/deq_rec.cpp qpid/linearstore/journal/EmptyFilePool.cpp diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index 38eeecd1d0..c3e7e4632b 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -6,10 +6,10 @@ Store: 1. (SOLVED) Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record start, no way of discriminating old from new at boundary (used to use OWI). -2. QPID-5357: Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve +2. (SOLVED) QPID-5357: Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve #1 first. -3. QPID-5358: Checksum not implemented in record tail, not checked during read. +3. (SOLVED) QPID-5358: Checksum not implemented in record tail, not checked during read. 4. QPID-5359: Rework qpid management parameters and controls (QMF). @@ -27,9 +27,15 @@ Store: * Store analysis and status * Recovery/reading of message content +Current bugs and performance issues: +------------------------------------ +1. RH Bugzilla 1035843 - Slow performance for producers +2. RH Bugzilla 1036071 - Crash when deleting queue +3. RH Bugzilla 1035802 - Broker won't recover with durable queue +4. RH Bugzilla 1036026 - Unable to create durable queue - framing error + Code tidy-up ------------ - * Remove old comments * Use c++ cast templates instead of (xxx)y * Member names: xxx_ diff --git a/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp b/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp new file mode 100644 index 0000000000..89d654ce76 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/journal/Checksum.h" + +namespace qpid { +namespace linearstore { +namespace journal { + +Checksum::Checksum() : a(1UL), b(0UL), MOD_ADLER(65521UL) {} + +Checksum::~Checksum() {} + +void Checksum::addData(const unsigned char* data, const std::size_t len) { + for (uint32_t i = 0; i < len; i++) { + a = (a + data[i]) % MOD_ADLER; + b = (a + b) % MOD_ADLER; + } +} + +uint32_t Checksum::getChecksum() { + return (b << 16) | a; +} + +}}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/Checksum.h b/qpid/cpp/src/qpid/linearstore/journal/Checksum.h new file mode 100644 index 0000000000..d96aac2991 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/journal/Checksum.h @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_JOURNAL_CHECKSUM_H_ +#define QPID_LINEARSTORE_JOURNAL_CHECKSUM_H_ + +#include <cstddef> +#include <stdint.h> + +namespace qpid { +namespace linearstore { +namespace journal { + +/* + * This checksum routine uses the Adler-32 algorithm as described in + * http://en.wikipedia.org/wiki/Adler-32. It is structured so that the + * data for which the checksum must be calculated can be added in several + * stages through the addData() function, and when complete, the checksum + * is obtained through a call to getChecksum(). + */ +class Checksum +{ +private: + uint32_t a; + uint32_t b; + const uint32_t MOD_ADLER; +public: + Checksum(); + virtual ~Checksum(); + void addData(const unsigned char* data, const std::size_t len); + uint32_t getChecksum(); +}; + +}}} + +#endif // QPID_LINEARSTORE_JOURNAL_CHECKSUM_H_ diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 22241aa164..a8d24366de 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -24,6 +24,7 @@ #include <algorithm> #include <cstdlib> #include <iomanip> +#include "qpid/linearstore/journal/Checksum.h" #include "qpid/linearstore/journal/data_tok.h" #include "qpid/linearstore/journal/deq_rec.h" #include "qpid/linearstore/journal/EmptyFilePool.h" @@ -201,6 +202,32 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, } readJournalData((char*)*dataPtrPtr, dataSize); + // Check enqueue record checksum + Checksum checksum; + checksum.addData((unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t)); + if (xidSize > 0) { + checksum.addData((unsigned char*)*xidPtrPtr, xidSize); + } + if (dataSize > 0) { + checksum.addData((unsigned char*)*dataPtrPtr, dataSize); + } + ::rec_tail_t enqueueTail; + inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t)); + uint32_t cs = checksum.getChecksum(); +//std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG + int res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs); + if (res != 0) { + std::stringstream oss; + switch (res) { + case 1: oss << std::hex << "Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic; break; + case 2: oss << std::hex << "Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial; break; + case 3: oss << std::hex << "Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid; break; + case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum; break; + default: oss << "Unknown error " << res; + } + throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info + } + // Set data token dtokp->set_wstate(data_tok::ENQ); dtokp->set_rid(enqueueHeader._rhdr._rid); diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp index 0da8d439af..8b9e9d7f64 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp @@ -23,6 +23,7 @@ #include <cassert> #include <cstring> +#include "qpid/linearstore/journal/Checksum.h" #include "qpid/linearstore/journal/jexception.h" namespace qpid { @@ -55,10 +56,11 @@ deq_rec::reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, _buff = 0; _deq_tail._serial = serial; _deq_tail._rid = rid; + _deq_tail._checksum = 0UL; } uint32_t -deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) +deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) { assert(wptr != 0); assert(max_size_dblks > 0); @@ -68,6 +70,7 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES; std::size_t wr_cnt = 0; + if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages) { if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required @@ -84,8 +87,10 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) rem -= wsize; } rec_offs -= _deq_hdr._xidsize - wsize2; + checksum.addData((unsigned char*)wptr, wr_cnt); if (rem) { + _deq_tail._checksum = checksum.getChecksum(); wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0; wsize2 = wsize; if (wsize) @@ -109,8 +114,10 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) { std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize); wr_cnt += wsize; + checksum.addData((unsigned char*)wptr, wr_cnt); } rec_offs -= _deq_hdr._xidsize - wsize; + _deq_tail._checksum = checksum.getChecksum(); wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0; if (wsize) { @@ -142,8 +149,10 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) wr_cnt += wsize; rem -= wsize; } + checksum.addData((unsigned char*)wptr, wr_cnt); if (rem) { + _deq_tail._checksum = checksum.getChecksum(); wsize = rem >= sizeof(_deq_tail) ? sizeof(_deq_tail) : rem; std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, wsize); wr_cnt += wsize; @@ -157,6 +166,8 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) { std::memcpy((char*)wptr + wr_cnt, _xidp, _deq_hdr._xidsize); wr_cnt += _deq_hdr._xidsize; + checksum.addData((unsigned char*)wptr, wr_cnt); + _deq_tail._checksum = checksum.getChecksum(); std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, sizeof(_deq_tail)); wr_cnt += sizeof(_deq_tail); } @@ -172,14 +183,12 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) bool deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) { - uint32_t checksum = 0UL; // TODO: Add checksum math if (rec_offs == 0) { - //_deq_hdr.hdr_copy(h); ::rec_hdr_copy(&_deq_hdr._rhdr, &h); ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(_deq_hdr._deq_rid)); ifsp->read((char*)&_deq_hdr._xidsize, sizeof(_deq_hdr._xidsize)); - rec_offs = sizeof(_deq_hdr); + rec_offs = sizeof(::deq_hdr_t); // Read header, allocate (if req'd) for xid if (_deq_hdr._xidsize) { @@ -223,14 +232,18 @@ deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); if (_deq_hdr._xidsize) { - int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, checksum); + Checksum checksum; + checksum.addData((unsigned char*)&_deq_hdr, sizeof(_deq_hdr)); + checksum.addData((unsigned char*)_buff, _deq_hdr._xidsize); + uint32_t cs = checksum.getChecksum(); + int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs); if (res != 0) { std::stringstream oss; switch (res) { case 1: oss << std::hex << "Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; break; case 2: oss << std::hex << "Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; break; case 3: oss << std::hex << "Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; break; - case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _deq_tail._checksum; break; + case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _deq_tail._checksum; break; default: oss << "Unknown error " << res; } throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "decode"); // TODO: Don't throw exception, log info diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h index 5b283c15d5..c7f78e1215 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h @@ -48,7 +48,7 @@ public: void reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp, const std::size_t xidlen, const bool txn_coml_commit); - uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); + uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum); bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); } diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp index 9dcb2d616e..a4f19b3a7b 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp @@ -23,6 +23,7 @@ #include <cassert> #include <cstring> +#include "qpid/linearstore/journal/Checksum.h" #include "qpid/linearstore/journal/jexception.h" namespace qpid { @@ -62,7 +63,7 @@ enq_rec::reset(const uint64_t serial, const uint64_t rid, const void* const dbuf } uint32_t -enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) +enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) { assert(wptr != 0); assert(max_size_dblks > 0); @@ -102,8 +103,10 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) } rec_offs -= _enq_hdr._dsize - wsize2; } + checksum.addData((unsigned char*)wptr, wr_cnt); if (rem) { + _enq_tail._checksum = checksum.getChecksum(); wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0; wsize2 = wsize; if (wsize) @@ -122,21 +125,25 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) else // No further split required { rec_offs -= sizeof(_enq_hdr); - std::size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0; - if (wsize) + std::size_t xid_wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0; + if (xid_wsize) { - std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize); - wr_cnt += wsize; + std::memcpy(wptr, (const char*)_xidp + rec_offs, xid_wsize); + wr_cnt += xid_wsize; } - rec_offs -= _enq_hdr._xidsize - wsize; - wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0; - if (wsize && !::is_enq_external(&_enq_hdr)) + rec_offs -= _enq_hdr._xidsize - xid_wsize; + std::size_t data_wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0; + if (data_wsize && !::is_enq_external(&_enq_hdr)) { - std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, wsize); - wr_cnt += wsize; + std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, data_wsize); + wr_cnt += data_wsize; + } + rec_offs -= _enq_hdr._dsize - data_wsize; + if (xid_wsize || data_wsize) { + checksum.addData((unsigned char*)wptr, wr_cnt); } - rec_offs -= _enq_hdr._dsize - wsize; - wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0; + _enq_tail._checksum = checksum.getChecksum(); + std::size_t wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0; if (wsize) { std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize); @@ -174,8 +181,10 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) wr_cnt += wsize; rem -= wsize; } + checksum.addData((unsigned char*)wptr, wr_cnt); if (rem) { + _enq_tail._checksum = checksum.getChecksum(); wsize = rem >= sizeof(_enq_tail) ? sizeof(_enq_tail) : rem; std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, wsize); wr_cnt += wsize; @@ -195,6 +204,8 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) std::memcpy((char*)wptr + wr_cnt, _data, _enq_hdr._dsize); wr_cnt += _enq_hdr._dsize; } + checksum.addData((unsigned char*)wptr, wr_cnt); + _enq_tail._checksum = checksum.getChecksum(); std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, sizeof(_enq_tail)); wr_cnt += sizeof(_enq_tail); #ifdef QLS_CLEAN @@ -209,15 +220,13 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) bool enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) { - uint32_t checksum = 0UL; // TODO: Add checksum math if (rec_offs == 0) { // Read header, allocate (if req'd) for xid - //_enq_hdr.hdr_copy(h); ::rec_hdr_copy(&_enq_hdr._rhdr, &h); ifsp->read((char*)&_enq_hdr._xidsize, sizeof(_enq_hdr._xidsize)); ifsp->read((char*)&_enq_hdr._dsize, sizeof(_enq_hdr._dsize)); - rec_offs = sizeof(_enq_hdr); + rec_offs = sizeof(::enq_hdr_t); if (_enq_hdr._xidsize > 0) { _buff = std::malloc(_enq_hdr._xidsize); @@ -280,18 +289,6 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); - int res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, checksum); - if (res != 0) { - std::stringstream oss; - switch (res) { - case 1: oss << std::hex << "Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic; break; - case 2: oss << std::hex << "Serial: expected 0x" << _enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial; break; - case 3: oss << std::hex << "Record Id: expected 0x" << _enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid; break; - case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _enq_tail._checksum; break; - default: oss << "Unknown error " << res; - } - throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info - } return true; } diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h index 0ce956425c..439203c052 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h @@ -49,7 +49,7 @@ public: void reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen, const void* const xidp, const std::size_t xidlen, const bool transient, const bool external); - uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); + uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum); bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); std::size_t get_xid(void** const xidpp); diff --git a/qpid/cpp/src/qpid/linearstore/journal/jrec.h b/qpid/cpp/src/qpid/linearstore/journal/jrec.h index 7cb6df13a4..7645e646f6 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jrec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jrec.h @@ -32,6 +32,8 @@ namespace qpid { namespace linearstore { namespace journal { +class Checksum; + /** * \class jrec * \brief Abstract class for all file jrecords, both data and log. This class establishes @@ -95,7 +97,7 @@ public: * \param max_size_dblks Maximum number of data-blocks to write to pointer wptr. * \returns Number of data-blocks encoded. */ - virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) = 0; + virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) = 0; virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) = 0; virtual std::string& str(std::string& str) const = 0; diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp index 37448f2a8d..b5d2e51ec0 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp @@ -23,6 +23,7 @@ #include <cassert> #include <cstring> +#include "qpid/linearstore/journal/Checksum.h" #include "qpid/linearstore/journal/jexception.h" namespace qpid { @@ -55,10 +56,11 @@ txn_rec::reset(const bool commitFlag, const uint64_t serial, const uint64_t rid _txn_tail._xmagic = ~_txn_hdr._rhdr._magic; _txn_tail._serial = serial; _txn_tail._rid = rid; + _txn_tail._checksum = 0UL; } uint32_t -txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) +txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) { assert(wptr != 0); assert(max_size_dblks > 0); @@ -83,8 +85,10 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) rem -= wsize; } rec_offs -= _txn_hdr._xidsize - wsize2; + checksum.addData((unsigned char*)wptr, wr_cnt); if (rem) { + _txn_tail._checksum = checksum.getChecksum(); wsize = sizeof(_txn_tail) > rec_offs ? sizeof(_txn_tail) - rec_offs : 0; wsize2 = wsize; if (wsize) @@ -108,8 +112,10 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) { std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize); wr_cnt += wsize; + checksum.addData((unsigned char*)wptr, wr_cnt); } rec_offs -= _txn_hdr._xidsize - wsize; + _txn_tail._checksum = checksum.getChecksum(); wsize = sizeof(_txn_tail) > rec_offs ? sizeof(_txn_tail) - rec_offs : 0; if (wsize) { @@ -141,8 +147,10 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) wr_cnt += wsize; rem -= wsize; } + checksum.addData((unsigned char*)wptr, wr_cnt); if (rem) { + _txn_tail._checksum = checksum.getChecksum(); wsize = rem >= sizeof(_txn_tail) ? sizeof(_txn_tail) : rem; std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, wsize); wr_cnt += wsize; @@ -154,6 +162,8 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) { std::memcpy((char*)wptr + wr_cnt, _xidp, _txn_hdr._xidsize); wr_cnt += _txn_hdr._xidsize; + checksum.addData((unsigned char*)wptr, wr_cnt); + _txn_tail._checksum = checksum.getChecksum(); std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, sizeof(_txn_tail)); wr_cnt += sizeof(_txn_tail); #ifdef QLS_CLEAN @@ -168,14 +178,12 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) bool txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) { - uint32_t checksum = 0UL; // TODO: Add checksum math if (rec_offs == 0) { // Read header, allocate for xid - //_txn_hdr.hdr_copy(h); ::rec_hdr_copy(&_txn_hdr._rhdr, &h); ifsp->read((char*)&_txn_hdr._xidsize, sizeof(_txn_hdr._xidsize)); - rec_offs = sizeof(txn_hdr_t); + rec_offs = sizeof(::txn_hdr_t); _buff = std::malloc(_txn_hdr._xidsize); MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode"); } @@ -216,14 +224,19 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) throw jexception(jerrno::JERR_JREC_BADRECTAIL); // TODO: complete exception detail } assert(!ifsp->fail() && !ifsp->bad()); - int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, checksum); + assert(_txn_hdr._xidsize > 0); + Checksum checksum; + checksum.addData((unsigned char*)&_txn_hdr, sizeof(_txn_hdr)); + checksum.addData((unsigned char*)_buff, _txn_hdr._xidsize); + uint32_t cs = checksum.getChecksum(); + int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs); if (res != 0) { std::stringstream oss; switch (res) { case 1: oss << std::hex << "Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; break; case 2: oss << std::hex << "Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial; break; case 3: oss << std::hex << "Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid; break; - case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _txn_tail._checksum; break; + case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _txn_tail._checksum; break; default: oss << "Unknown error " << res; } throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "decode"); // TODO: Don't throw exception, log info diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h index 29f52ec46a..a9224a4a01 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h @@ -48,7 +48,7 @@ public: void reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp, const std::size_t xidlen); - uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); + uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum); bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); std::size_t get_xid(void** const xidpp); diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp index e308f4ab06..6eaa8835be 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp @@ -23,6 +23,7 @@ #include <cassert> #include "qpid/linearstore/journal/aio_callback.h" +#include "qpid/linearstore/journal/Checksum.h" #include "qpid/linearstore/journal/data_tok.h" #include "qpid/linearstore/journal/jcntl.h" #include "qpid/linearstore/journal/JournalFile.h" @@ -158,6 +159,7 @@ wmgr::enqueue(const void* const data_buff, } //std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " po=0x" << _pg_offset_dblks << " cs=0x" << (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) << " " << std::dec << std::flush; // DEBUG bool done = false; + Checksum checksum; while (!done) { //std::cout << "*" << std::flush; // DEBUG @@ -165,7 +167,7 @@ wmgr::enqueue(const void* const data_buff, void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) { @@ -278,6 +280,7 @@ wmgr::dequeue(data_tok* dtokp, } //std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG bool done = false; + Checksum checksum; while (!done) { //std::cout << "*" << std::flush; // DEBUG @@ -285,7 +288,7 @@ wmgr::dequeue(data_tok* dtokp, void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) { @@ -396,13 +399,14 @@ wmgr::abort(data_tok* dtokp, _abort_busy = true; } bool done = false; + Checksum checksum; while (!done) { assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS); void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) @@ -494,13 +498,14 @@ wmgr::commit(data_tok* dtokp, _commit_busy = true; } bool done = false; + Checksum checksum; while (!done) { assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS); void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) |
