diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/linearstore.cmake | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/ISSUES | 12 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp | 43 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/Checksum.h | 54 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp | 27 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp | 25 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/deq_rec.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp | 51 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/enq_rec.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/jrec.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp | 25 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/txn_rec.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp | 13 |
13 files changed, 211 insertions, 50 deletions
diff --git a/qpid/cpp/src/linearstore.cmake b/qpid/cpp/src/linearstore.cmake index 8568cdbb77..d576f78bef 100644 --- a/qpid/cpp/src/linearstore.cmake +++ b/qpid/cpp/src/linearstore.cmake @@ -93,6 +93,7 @@ if (BUILD_LINEARSTORE) # Journal source files set (linear_jrnl_SOURCES + qpid/linearstore/journal/Checksum.cpp qpid/linearstore/journal/data_tok.cpp qpid/linearstore/journal/deq_rec.cpp qpid/linearstore/journal/EmptyFilePool.cpp diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index 38eeecd1d0..c3e7e4632b 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -6,10 +6,10 @@ Store: 1. (SOLVED) Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record start, no way of discriminating old from new at boundary (used to use OWI). -2. QPID-5357: Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve +2. (SOLVED) QPID-5357: Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve #1 first. -3. QPID-5358: Checksum not implemented in record tail, not checked during read. +3. (SOLVED) QPID-5358: Checksum not implemented in record tail, not checked during read. 4. QPID-5359: Rework qpid management parameters and controls (QMF). @@ -27,9 +27,15 @@ Store: * Store analysis and status * Recovery/reading of message content +Current bugs and performance issues: +------------------------------------ +1. RH Bugzilla 1035843 - Slow performance for producers +2. RH Bugzilla 1036071 - Crash when deleting queue +3. RH Bugzilla 1035802 - Broker won't recover with durable queue +4. RH Bugzilla 1036026 - Unable to create durable queue - framing error + Code tidy-up ------------ - * Remove old comments * Use c++ cast templates instead of (xxx)y * Member names: xxx_ diff --git a/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp b/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp new file mode 100644 index 0000000000..89d654ce76 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/linearstore/journal/Checksum.h" + +namespace qpid { +namespace linearstore { +namespace journal { + +Checksum::Checksum() : a(1UL), b(0UL), MOD_ADLER(65521UL) {} + +Checksum::~Checksum() {} + +void Checksum::addData(const unsigned char* data, const std::size_t len) { + for (uint32_t i = 0; i < len; i++) { + a = (a + data[i]) % MOD_ADLER; + b = (a + b) % MOD_ADLER; + } +} + +uint32_t Checksum::getChecksum() { + return (b << 16) | a; +} + +}}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/Checksum.h b/qpid/cpp/src/qpid/linearstore/journal/Checksum.h new file mode 100644 index 0000000000..d96aac2991 --- /dev/null +++ b/qpid/cpp/src/qpid/linearstore/journal/Checksum.h @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LINEARSTORE_JOURNAL_CHECKSUM_H_ +#define QPID_LINEARSTORE_JOURNAL_CHECKSUM_H_ + +#include <cstddef> +#include <stdint.h> + +namespace qpid { +namespace linearstore { +namespace journal { + +/* + * This checksum routine uses the Adler-32 algorithm as described in + * http://en.wikipedia.org/wiki/Adler-32. It is structured so that the + * data for which the checksum must be calculated can be added in several + * stages through the addData() function, and when complete, the checksum + * is obtained through a call to getChecksum(). + */ +class Checksum +{ +private: + uint32_t a; + uint32_t b; + const uint32_t MOD_ADLER; +public: + Checksum(); + virtual ~Checksum(); + void addData(const unsigned char* data, const std::size_t len); + uint32_t getChecksum(); +}; + +}}} + +#endif // QPID_LINEARSTORE_JOURNAL_CHECKSUM_H_ diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 22241aa164..a8d24366de 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -24,6 +24,7 @@ #include <algorithm> #include <cstdlib> #include <iomanip> +#include "qpid/linearstore/journal/Checksum.h" #include "qpid/linearstore/journal/data_tok.h" #include "qpid/linearstore/journal/deq_rec.h" #include "qpid/linearstore/journal/EmptyFilePool.h" @@ -201,6 +202,32 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, } readJournalData((char*)*dataPtrPtr, dataSize); + // Check enqueue record checksum + Checksum checksum; + checksum.addData((unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t)); + if (xidSize > 0) { + checksum.addData((unsigned char*)*xidPtrPtr, xidSize); + } + if (dataSize > 0) { + checksum.addData((unsigned char*)*dataPtrPtr, dataSize); + } + ::rec_tail_t enqueueTail; + inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t)); + uint32_t cs = checksum.getChecksum(); +//std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG + int res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs); + if (res != 0) { + std::stringstream oss; + switch (res) { + case 1: oss << std::hex << "Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic; break; + case 2: oss << std::hex << "Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial; break; + case 3: oss << std::hex << "Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid; break; + case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum; break; + default: oss << "Unknown error " << res; + } + throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info + } + // Set data token dtokp->set_wstate(data_tok::ENQ); dtokp->set_rid(enqueueHeader._rhdr._rid); diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp index 0da8d439af..8b9e9d7f64 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp @@ -23,6 +23,7 @@ #include <cassert> #include <cstring> +#include "qpid/linearstore/journal/Checksum.h" #include "qpid/linearstore/journal/jexception.h" namespace qpid { @@ -55,10 +56,11 @@ deq_rec::reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, _buff = 0; _deq_tail._serial = serial; _deq_tail._rid = rid; + _deq_tail._checksum = 0UL; } uint32_t -deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) +deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) { assert(wptr != 0); assert(max_size_dblks > 0); @@ -68,6 +70,7 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES; std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES; std::size_t wr_cnt = 0; + if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages) { if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required @@ -84,8 +87,10 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) rem -= wsize; } rec_offs -= _deq_hdr._xidsize - wsize2; + checksum.addData((unsigned char*)wptr, wr_cnt); if (rem) { + _deq_tail._checksum = checksum.getChecksum(); wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0; wsize2 = wsize; if (wsize) @@ -109,8 +114,10 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) { std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize); wr_cnt += wsize; + checksum.addData((unsigned char*)wptr, wr_cnt); } rec_offs -= _deq_hdr._xidsize - wsize; + _deq_tail._checksum = checksum.getChecksum(); wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0; if (wsize) { @@ -142,8 +149,10 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) wr_cnt += wsize; rem -= wsize; } + checksum.addData((unsigned char*)wptr, wr_cnt); if (rem) { + _deq_tail._checksum = checksum.getChecksum(); wsize = rem >= sizeof(_deq_tail) ? sizeof(_deq_tail) : rem; std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, wsize); wr_cnt += wsize; @@ -157,6 +166,8 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) { std::memcpy((char*)wptr + wr_cnt, _xidp, _deq_hdr._xidsize); wr_cnt += _deq_hdr._xidsize; + checksum.addData((unsigned char*)wptr, wr_cnt); + _deq_tail._checksum = checksum.getChecksum(); std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, sizeof(_deq_tail)); wr_cnt += sizeof(_deq_tail); } @@ -172,14 +183,12 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) bool deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) { - uint32_t checksum = 0UL; // TODO: Add checksum math if (rec_offs == 0) { - //_deq_hdr.hdr_copy(h); ::rec_hdr_copy(&_deq_hdr._rhdr, &h); ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(_deq_hdr._deq_rid)); ifsp->read((char*)&_deq_hdr._xidsize, sizeof(_deq_hdr._xidsize)); - rec_offs = sizeof(_deq_hdr); + rec_offs = sizeof(::deq_hdr_t); // Read header, allocate (if req'd) for xid if (_deq_hdr._xidsize) { @@ -223,14 +232,18 @@ deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); if (_deq_hdr._xidsize) { - int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, checksum); + Checksum checksum; + checksum.addData((unsigned char*)&_deq_hdr, sizeof(_deq_hdr)); + checksum.addData((unsigned char*)_buff, _deq_hdr._xidsize); + uint32_t cs = checksum.getChecksum(); + int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs); if (res != 0) { std::stringstream oss; switch (res) { case 1: oss << std::hex << "Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; break; case 2: oss << std::hex << "Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; break; case 3: oss << std::hex << "Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; break; - case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _deq_tail._checksum; break; + case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _deq_tail._checksum; break; default: oss << "Unknown error " << res; } throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "decode"); // TODO: Don't throw exception, log info diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h index 5b283c15d5..c7f78e1215 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h @@ -48,7 +48,7 @@ public: void reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp, const std::size_t xidlen, const bool txn_coml_commit); - uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); + uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum); bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); } diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp index 9dcb2d616e..a4f19b3a7b 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp @@ -23,6 +23,7 @@ #include <cassert> #include <cstring> +#include "qpid/linearstore/journal/Checksum.h" #include "qpid/linearstore/journal/jexception.h" namespace qpid { @@ -62,7 +63,7 @@ enq_rec::reset(const uint64_t serial, const uint64_t rid, const void* const dbuf } uint32_t -enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) +enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) { assert(wptr != 0); assert(max_size_dblks > 0); @@ -102,8 +103,10 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) } rec_offs -= _enq_hdr._dsize - wsize2; } + checksum.addData((unsigned char*)wptr, wr_cnt); if (rem) { + _enq_tail._checksum = checksum.getChecksum(); wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0; wsize2 = wsize; if (wsize) @@ -122,21 +125,25 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) else // No further split required { rec_offs -= sizeof(_enq_hdr); - std::size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0; - if (wsize) + std::size_t xid_wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0; + if (xid_wsize) { - std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize); - wr_cnt += wsize; + std::memcpy(wptr, (const char*)_xidp + rec_offs, xid_wsize); + wr_cnt += xid_wsize; } - rec_offs -= _enq_hdr._xidsize - wsize; - wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0; - if (wsize && !::is_enq_external(&_enq_hdr)) + rec_offs -= _enq_hdr._xidsize - xid_wsize; + std::size_t data_wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0; + if (data_wsize && !::is_enq_external(&_enq_hdr)) { - std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, wsize); - wr_cnt += wsize; + std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, data_wsize); + wr_cnt += data_wsize; + } + rec_offs -= _enq_hdr._dsize - data_wsize; + if (xid_wsize || data_wsize) { + checksum.addData((unsigned char*)wptr, wr_cnt); } - rec_offs -= _enq_hdr._dsize - wsize; - wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0; + _enq_tail._checksum = checksum.getChecksum(); + std::size_t wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0; if (wsize) { std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize); @@ -174,8 +181,10 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) wr_cnt += wsize; rem -= wsize; } + checksum.addData((unsigned char*)wptr, wr_cnt); if (rem) { + _enq_tail._checksum = checksum.getChecksum(); wsize = rem >= sizeof(_enq_tail) ? sizeof(_enq_tail) : rem; std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, wsize); wr_cnt += wsize; @@ -195,6 +204,8 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) std::memcpy((char*)wptr + wr_cnt, _data, _enq_hdr._dsize); wr_cnt += _enq_hdr._dsize; } + checksum.addData((unsigned char*)wptr, wr_cnt); + _enq_tail._checksum = checksum.getChecksum(); std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, sizeof(_enq_tail)); wr_cnt += sizeof(_enq_tail); #ifdef QLS_CLEAN @@ -209,15 +220,13 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) bool enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) { - uint32_t checksum = 0UL; // TODO: Add checksum math if (rec_offs == 0) { // Read header, allocate (if req'd) for xid - //_enq_hdr.hdr_copy(h); ::rec_hdr_copy(&_enq_hdr._rhdr, &h); ifsp->read((char*)&_enq_hdr._xidsize, sizeof(_enq_hdr._xidsize)); ifsp->read((char*)&_enq_hdr._dsize, sizeof(_enq_hdr._dsize)); - rec_offs = sizeof(_enq_hdr); + rec_offs = sizeof(::enq_hdr_t); if (_enq_hdr._xidsize > 0) { _buff = std::malloc(_enq_hdr._xidsize); @@ -280,18 +289,6 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); - int res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, checksum); - if (res != 0) { - std::stringstream oss; - switch (res) { - case 1: oss << std::hex << "Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic; break; - case 2: oss << std::hex << "Serial: expected 0x" << _enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial; break; - case 3: oss << std::hex << "Record Id: expected 0x" << _enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid; break; - case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _enq_tail._checksum; break; - default: oss << "Unknown error " << res; - } - throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info - } return true; } diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h index 0ce956425c..439203c052 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h @@ -49,7 +49,7 @@ public: void reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen, const void* const xidp, const std::size_t xidlen, const bool transient, const bool external); - uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); + uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum); bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); std::size_t get_xid(void** const xidpp); diff --git a/qpid/cpp/src/qpid/linearstore/journal/jrec.h b/qpid/cpp/src/qpid/linearstore/journal/jrec.h index 7cb6df13a4..7645e646f6 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jrec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jrec.h @@ -32,6 +32,8 @@ namespace qpid { namespace linearstore { namespace journal { +class Checksum; + /** * \class jrec * \brief Abstract class for all file jrecords, both data and log. This class establishes @@ -95,7 +97,7 @@ public: * \param max_size_dblks Maximum number of data-blocks to write to pointer wptr. * \returns Number of data-blocks encoded. */ - virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) = 0; + virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) = 0; virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) = 0; virtual std::string& str(std::string& str) const = 0; diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp index 37448f2a8d..b5d2e51ec0 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp @@ -23,6 +23,7 @@ #include <cassert> #include <cstring> +#include "qpid/linearstore/journal/Checksum.h" #include "qpid/linearstore/journal/jexception.h" namespace qpid { @@ -55,10 +56,11 @@ txn_rec::reset(const bool commitFlag, const uint64_t serial, const uint64_t rid _txn_tail._xmagic = ~_txn_hdr._rhdr._magic; _txn_tail._serial = serial; _txn_tail._rid = rid; + _txn_tail._checksum = 0UL; } uint32_t -txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) +txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) { assert(wptr != 0); assert(max_size_dblks > 0); @@ -83,8 +85,10 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) rem -= wsize; } rec_offs -= _txn_hdr._xidsize - wsize2; + checksum.addData((unsigned char*)wptr, wr_cnt); if (rem) { + _txn_tail._checksum = checksum.getChecksum(); wsize = sizeof(_txn_tail) > rec_offs ? sizeof(_txn_tail) - rec_offs : 0; wsize2 = wsize; if (wsize) @@ -108,8 +112,10 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) { std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize); wr_cnt += wsize; + checksum.addData((unsigned char*)wptr, wr_cnt); } rec_offs -= _txn_hdr._xidsize - wsize; + _txn_tail._checksum = checksum.getChecksum(); wsize = sizeof(_txn_tail) > rec_offs ? sizeof(_txn_tail) - rec_offs : 0; if (wsize) { @@ -141,8 +147,10 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) wr_cnt += wsize; rem -= wsize; } + checksum.addData((unsigned char*)wptr, wr_cnt); if (rem) { + _txn_tail._checksum = checksum.getChecksum(); wsize = rem >= sizeof(_txn_tail) ? sizeof(_txn_tail) : rem; std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, wsize); wr_cnt += wsize; @@ -154,6 +162,8 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) { std::memcpy((char*)wptr + wr_cnt, _xidp, _txn_hdr._xidsize); wr_cnt += _txn_hdr._xidsize; + checksum.addData((unsigned char*)wptr, wr_cnt); + _txn_tail._checksum = checksum.getChecksum(); std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, sizeof(_txn_tail)); wr_cnt += sizeof(_txn_tail); #ifdef QLS_CLEAN @@ -168,14 +178,12 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) bool txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) { - uint32_t checksum = 0UL; // TODO: Add checksum math if (rec_offs == 0) { // Read header, allocate for xid - //_txn_hdr.hdr_copy(h); ::rec_hdr_copy(&_txn_hdr._rhdr, &h); ifsp->read((char*)&_txn_hdr._xidsize, sizeof(_txn_hdr._xidsize)); - rec_offs = sizeof(txn_hdr_t); + rec_offs = sizeof(::txn_hdr_t); _buff = std::malloc(_txn_hdr._xidsize); MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode"); } @@ -216,14 +224,19 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) throw jexception(jerrno::JERR_JREC_BADRECTAIL); // TODO: complete exception detail } assert(!ifsp->fail() && !ifsp->bad()); - int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, checksum); + assert(_txn_hdr._xidsize > 0); + Checksum checksum; + checksum.addData((unsigned char*)&_txn_hdr, sizeof(_txn_hdr)); + checksum.addData((unsigned char*)_buff, _txn_hdr._xidsize); + uint32_t cs = checksum.getChecksum(); + int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs); if (res != 0) { std::stringstream oss; switch (res) { case 1: oss << std::hex << "Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; break; case 2: oss << std::hex << "Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial; break; case 3: oss << std::hex << "Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid; break; - case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _txn_tail._checksum; break; + case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _txn_tail._checksum; break; default: oss << "Unknown error " << res; } throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "decode"); // TODO: Don't throw exception, log info diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h index 29f52ec46a..a9224a4a01 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h +++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h @@ -48,7 +48,7 @@ public: void reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp, const std::size_t xidlen); - uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks); + uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum); bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); std::size_t get_xid(void** const xidpp); diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp index e308f4ab06..6eaa8835be 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp @@ -23,6 +23,7 @@ #include <cassert> #include "qpid/linearstore/journal/aio_callback.h" +#include "qpid/linearstore/journal/Checksum.h" #include "qpid/linearstore/journal/data_tok.h" #include "qpid/linearstore/journal/jcntl.h" #include "qpid/linearstore/journal/JournalFile.h" @@ -158,6 +159,7 @@ wmgr::enqueue(const void* const data_buff, } //std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " po=0x" << _pg_offset_dblks << " cs=0x" << (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) << " " << std::dec << std::flush; // DEBUG bool done = false; + Checksum checksum; while (!done) { //std::cout << "*" << std::flush; // DEBUG @@ -165,7 +167,7 @@ wmgr::enqueue(const void* const data_buff, void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) { @@ -278,6 +280,7 @@ wmgr::dequeue(data_tok* dtokp, } //std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG bool done = false; + Checksum checksum; while (!done) { //std::cout << "*" << std::flush; // DEBUG @@ -285,7 +288,7 @@ wmgr::dequeue(data_tok* dtokp, void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) { @@ -396,13 +399,14 @@ wmgr::abort(data_tok* dtokp, _abort_busy = true; } bool done = false; + Checksum checksum; while (!done) { assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS); void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) @@ -494,13 +498,14 @@ wmgr::commit(data_tok* dtokp, _commit_busy = true; } bool done = false; + Checksum checksum; while (!done) { assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS); void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks); + (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum); // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) |
