summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/linearstore.cmake1
-rw-r--r--qpid/cpp/src/qpid/linearstore/ISSUES12
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp43
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/Checksum.h54
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp27
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp25
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/deq_rec.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp51
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/enq_rec.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jrec.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp25
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/txn_rec.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp13
13 files changed, 211 insertions, 50 deletions
diff --git a/qpid/cpp/src/linearstore.cmake b/qpid/cpp/src/linearstore.cmake
index 8568cdbb77..d576f78bef 100644
--- a/qpid/cpp/src/linearstore.cmake
+++ b/qpid/cpp/src/linearstore.cmake
@@ -93,6 +93,7 @@ if (BUILD_LINEARSTORE)
# Journal source files
set (linear_jrnl_SOURCES
+ qpid/linearstore/journal/Checksum.cpp
qpid/linearstore/journal/data_tok.cpp
qpid/linearstore/journal/deq_rec.cpp
qpid/linearstore/journal/EmptyFilePool.cpp
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES
index 38eeecd1d0..c3e7e4632b 100644
--- a/qpid/cpp/src/qpid/linearstore/ISSUES
+++ b/qpid/cpp/src/qpid/linearstore/ISSUES
@@ -6,10 +6,10 @@ Store:
1. (SOLVED) Overwrite identity: When recovering a previously used file, if the write boundary coincides with old record
start, no way of discriminating old from new at boundary (used to use OWI).
-2. QPID-5357: Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve
+2. (SOLVED) QPID-5357: Recycling files while in use not working, however, files are recovered to EFP during recovery. Must solve
#1 first.
-3. QPID-5358: Checksum not implemented in record tail, not checked during read.
+3. (SOLVED) QPID-5358: Checksum not implemented in record tail, not checked during read.
4. QPID-5359: Rework qpid management parameters and controls (QMF).
@@ -27,9 +27,15 @@ Store:
* Store analysis and status
* Recovery/reading of message content
+Current bugs and performance issues:
+------------------------------------
+1. RH Bugzilla 1035843 - Slow performance for producers
+2. RH Bugzilla 1036071 - Crash when deleting queue
+3. RH Bugzilla 1035802 - Broker won't recover with durable queue
+4. RH Bugzilla 1036026 - Unable to create durable queue - framing error
+
Code tidy-up
------------
-
* Remove old comments
* Use c++ cast templates instead of (xxx)y
* Member names: xxx_
diff --git a/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp b/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp
new file mode 100644
index 0000000000..89d654ce76
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/linearstore/journal/Checksum.h"
+
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+Checksum::Checksum() : a(1UL), b(0UL), MOD_ADLER(65521UL) {}
+
+Checksum::~Checksum() {}
+
+void Checksum::addData(const unsigned char* data, const std::size_t len) {
+ for (uint32_t i = 0; i < len; i++) {
+ a = (a + data[i]) % MOD_ADLER;
+ b = (a + b) % MOD_ADLER;
+ }
+}
+
+uint32_t Checksum::getChecksum() {
+ return (b << 16) | a;
+}
+
+}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/Checksum.h b/qpid/cpp/src/qpid/linearstore/journal/Checksum.h
new file mode 100644
index 0000000000..d96aac2991
--- /dev/null
+++ b/qpid/cpp/src/qpid/linearstore/journal/Checksum.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef QPID_LINEARSTORE_JOURNAL_CHECKSUM_H_
+#define QPID_LINEARSTORE_JOURNAL_CHECKSUM_H_
+
+#include <cstddef>
+#include <stdint.h>
+
+namespace qpid {
+namespace linearstore {
+namespace journal {
+
+/*
+ * This checksum routine uses the Adler-32 algorithm as described in
+ * http://en.wikipedia.org/wiki/Adler-32. It is structured so that the
+ * data for which the checksum must be calculated can be added in several
+ * stages through the addData() function, and when complete, the checksum
+ * is obtained through a call to getChecksum().
+ */
+class Checksum
+{
+private:
+ uint32_t a;
+ uint32_t b;
+ const uint32_t MOD_ADLER;
+public:
+ Checksum();
+ virtual ~Checksum();
+ void addData(const unsigned char* data, const std::size_t len);
+ uint32_t getChecksum();
+};
+
+}}}
+
+#endif // QPID_LINEARSTORE_JOURNAL_CHECKSUM_H_
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 22241aa164..a8d24366de 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -24,6 +24,7 @@
#include <algorithm>
#include <cstdlib>
#include <iomanip>
+#include "qpid/linearstore/journal/Checksum.h"
#include "qpid/linearstore/journal/data_tok.h"
#include "qpid/linearstore/journal/deq_rec.h"
#include "qpid/linearstore/journal/EmptyFilePool.h"
@@ -201,6 +202,32 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr,
}
readJournalData((char*)*dataPtrPtr, dataSize);
+ // Check enqueue record checksum
+ Checksum checksum;
+ checksum.addData((unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t));
+ if (xidSize > 0) {
+ checksum.addData((unsigned char*)*xidPtrPtr, xidSize);
+ }
+ if (dataSize > 0) {
+ checksum.addData((unsigned char*)*dataPtrPtr, dataSize);
+ }
+ ::rec_tail_t enqueueTail;
+ inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t));
+ uint32_t cs = checksum.getChecksum();
+//std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG
+ int res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
+ if (res != 0) {
+ std::stringstream oss;
+ switch (res) {
+ case 1: oss << std::hex << "Magic: expected 0x" << ~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic; break;
+ case 2: oss << std::hex << "Serial: expected 0x" << enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial; break;
+ case 3: oss << std::hex << "Record Id: expected 0x" << enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid; break;
+ case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << enqueueTail._checksum; break;
+ default: oss << "Unknown error " << res;
+ }
+ throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info
+ }
+
// Set data token
dtokp->set_wstate(data_tok::ENQ);
dtokp->set_rid(enqueueHeader._rhdr._rid);
diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
index 0da8d439af..8b9e9d7f64 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
@@ -23,6 +23,7 @@
#include <cassert>
#include <cstring>
+#include "qpid/linearstore/journal/Checksum.h"
#include "qpid/linearstore/journal/jexception.h"
namespace qpid {
@@ -55,10 +56,11 @@ deq_rec::reset(const uint64_t serial, const uint64_t rid, const uint64_t drid,
_buff = 0;
_deq_tail._serial = serial;
_deq_tail._rid = rid;
+ _deq_tail._checksum = 0UL;
}
uint32_t
-deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum)
{
assert(wptr != 0);
assert(max_size_dblks > 0);
@@ -68,6 +70,7 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
std::size_t rec_offs = rec_offs_dblks * QLS_DBLK_SIZE_BYTES;
std::size_t rem = max_size_dblks * QLS_DBLK_SIZE_BYTES;
std::size_t wr_cnt = 0;
+
if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
{
if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required
@@ -84,8 +87,10 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
rem -= wsize;
}
rec_offs -= _deq_hdr._xidsize - wsize2;
+ checksum.addData((unsigned char*)wptr, wr_cnt);
if (rem)
{
+ _deq_tail._checksum = checksum.getChecksum();
wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0;
wsize2 = wsize;
if (wsize)
@@ -109,8 +114,10 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
{
std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
wr_cnt += wsize;
+ checksum.addData((unsigned char*)wptr, wr_cnt);
}
rec_offs -= _deq_hdr._xidsize - wsize;
+ _deq_tail._checksum = checksum.getChecksum();
wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0;
if (wsize)
{
@@ -142,8 +149,10 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
wr_cnt += wsize;
rem -= wsize;
}
+ checksum.addData((unsigned char*)wptr, wr_cnt);
if (rem)
{
+ _deq_tail._checksum = checksum.getChecksum();
wsize = rem >= sizeof(_deq_tail) ? sizeof(_deq_tail) : rem;
std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, wsize);
wr_cnt += wsize;
@@ -157,6 +166,8 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
{
std::memcpy((char*)wptr + wr_cnt, _xidp, _deq_hdr._xidsize);
wr_cnt += _deq_hdr._xidsize;
+ checksum.addData((unsigned char*)wptr, wr_cnt);
+ _deq_tail._checksum = checksum.getChecksum();
std::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, sizeof(_deq_tail));
wr_cnt += sizeof(_deq_tail);
}
@@ -172,14 +183,12 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
bool
deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
{
- uint32_t checksum = 0UL; // TODO: Add checksum math
if (rec_offs == 0)
{
- //_deq_hdr.hdr_copy(h);
::rec_hdr_copy(&_deq_hdr._rhdr, &h);
ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(_deq_hdr._deq_rid));
ifsp->read((char*)&_deq_hdr._xidsize, sizeof(_deq_hdr._xidsize));
- rec_offs = sizeof(_deq_hdr);
+ rec_offs = sizeof(::deq_hdr_t);
// Read header, allocate (if req'd) for xid
if (_deq_hdr._xidsize)
{
@@ -223,14 +232,18 @@ deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
if (_deq_hdr._xidsize) {
- int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, checksum);
+ Checksum checksum;
+ checksum.addData((unsigned char*)&_deq_hdr, sizeof(_deq_hdr));
+ checksum.addData((unsigned char*)_buff, _deq_hdr._xidsize);
+ uint32_t cs = checksum.getChecksum();
+ int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs);
if (res != 0) {
std::stringstream oss;
switch (res) {
case 1: oss << std::hex << "Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; break;
case 2: oss << std::hex << "Serial: expected 0x" << _deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; break;
case 3: oss << std::hex << "Record Id: expected 0x" << _deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; break;
- case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _deq_tail._checksum; break;
+ case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _deq_tail._checksum; break;
default: oss << "Unknown error " << res;
}
throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", "decode"); // TODO: Don't throw exception, log info
diff --git a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
index 5b283c15d5..c7f78e1215 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
@@ -48,7 +48,7 @@ public:
void reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp,
const std::size_t xidlen, const bool txn_coml_commit);
- uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+ uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum);
bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); }
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
index 9dcb2d616e..a4f19b3a7b 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
@@ -23,6 +23,7 @@
#include <cassert>
#include <cstring>
+#include "qpid/linearstore/journal/Checksum.h"
#include "qpid/linearstore/journal/jexception.h"
namespace qpid {
@@ -62,7 +63,7 @@ enq_rec::reset(const uint64_t serial, const uint64_t rid, const void* const dbuf
}
uint32_t
-enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum)
{
assert(wptr != 0);
assert(max_size_dblks > 0);
@@ -102,8 +103,10 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
}
rec_offs -= _enq_hdr._dsize - wsize2;
}
+ checksum.addData((unsigned char*)wptr, wr_cnt);
if (rem)
{
+ _enq_tail._checksum = checksum.getChecksum();
wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0;
wsize2 = wsize;
if (wsize)
@@ -122,21 +125,25 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
else // No further split required
{
rec_offs -= sizeof(_enq_hdr);
- std::size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0;
- if (wsize)
+ std::size_t xid_wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs : 0;
+ if (xid_wsize)
{
- std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
- wr_cnt += wsize;
+ std::memcpy(wptr, (const char*)_xidp + rec_offs, xid_wsize);
+ wr_cnt += xid_wsize;
}
- rec_offs -= _enq_hdr._xidsize - wsize;
- wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0;
- if (wsize && !::is_enq_external(&_enq_hdr))
+ rec_offs -= _enq_hdr._xidsize - xid_wsize;
+ std::size_t data_wsize = _enq_hdr._dsize > rec_offs ? _enq_hdr._dsize - rec_offs : 0;
+ if (data_wsize && !::is_enq_external(&_enq_hdr))
{
- std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, wsize);
- wr_cnt += wsize;
+ std::memcpy((char*)wptr + wr_cnt, (const char*)_data + rec_offs, data_wsize);
+ wr_cnt += data_wsize;
+ }
+ rec_offs -= _enq_hdr._dsize - data_wsize;
+ if (xid_wsize || data_wsize) {
+ checksum.addData((unsigned char*)wptr, wr_cnt);
}
- rec_offs -= _enq_hdr._dsize - wsize;
- wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0;
+ _enq_tail._checksum = checksum.getChecksum();
+ std::size_t wsize = sizeof(_enq_tail) > rec_offs ? sizeof(_enq_tail) - rec_offs : 0;
if (wsize)
{
std::memcpy((char*)wptr + wr_cnt, (char*)&_enq_tail + rec_offs, wsize);
@@ -174,8 +181,10 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
wr_cnt += wsize;
rem -= wsize;
}
+ checksum.addData((unsigned char*)wptr, wr_cnt);
if (rem)
{
+ _enq_tail._checksum = checksum.getChecksum();
wsize = rem >= sizeof(_enq_tail) ? sizeof(_enq_tail) : rem;
std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, wsize);
wr_cnt += wsize;
@@ -195,6 +204,8 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
std::memcpy((char*)wptr + wr_cnt, _data, _enq_hdr._dsize);
wr_cnt += _enq_hdr._dsize;
}
+ checksum.addData((unsigned char*)wptr, wr_cnt);
+ _enq_tail._checksum = checksum.getChecksum();
std::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, sizeof(_enq_tail));
wr_cnt += sizeof(_enq_tail);
#ifdef QLS_CLEAN
@@ -209,15 +220,13 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
bool
enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
{
- uint32_t checksum = 0UL; // TODO: Add checksum math
if (rec_offs == 0)
{
// Read header, allocate (if req'd) for xid
- //_enq_hdr.hdr_copy(h);
::rec_hdr_copy(&_enq_hdr._rhdr, &h);
ifsp->read((char*)&_enq_hdr._xidsize, sizeof(_enq_hdr._xidsize));
ifsp->read((char*)&_enq_hdr._dsize, sizeof(_enq_hdr._dsize));
- rec_offs = sizeof(_enq_hdr);
+ rec_offs = sizeof(::enq_hdr_t);
if (_enq_hdr._xidsize > 0)
{
_buff = std::malloc(_enq_hdr._xidsize);
@@ -280,18 +289,6 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
assert(!ifsp->fail() && !ifsp->bad());
- int res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, checksum);
- if (res != 0) {
- std::stringstream oss;
- switch (res) {
- case 1: oss << std::hex << "Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic; break;
- case 2: oss << std::hex << "Serial: expected 0x" << _enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial; break;
- case 3: oss << std::hex << "Record Id: expected 0x" << _enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid; break;
- case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _enq_tail._checksum; break;
- default: oss << "Unknown error " << res;
- }
- throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", "decode"); // TODO: Don't throw exception, log info
- }
return true;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
index 0ce956425c..439203c052 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
@@ -49,7 +49,7 @@ public:
void reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen,
const void* const xidp, const std::size_t xidlen, const bool transient, const bool external);
- uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+ uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum);
bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
std::size_t get_xid(void** const xidpp);
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jrec.h b/qpid/cpp/src/qpid/linearstore/journal/jrec.h
index 7cb6df13a4..7645e646f6 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jrec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jrec.h
@@ -32,6 +32,8 @@ namespace qpid {
namespace linearstore {
namespace journal {
+class Checksum;
+
/**
* \class jrec
* \brief Abstract class for all file jrecords, both data and log. This class establishes
@@ -95,7 +97,7 @@ public:
* \param max_size_dblks Maximum number of data-blocks to write to pointer wptr.
* \returns Number of data-blocks encoded.
*/
- virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks) = 0;
+ virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) = 0;
virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) = 0;
virtual std::string& str(std::string& str) const = 0;
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
index 37448f2a8d..b5d2e51ec0 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
@@ -23,6 +23,7 @@
#include <cassert>
#include <cstring>
+#include "qpid/linearstore/journal/Checksum.h"
#include "qpid/linearstore/journal/jexception.h"
namespace qpid {
@@ -55,10 +56,11 @@ txn_rec::reset(const bool commitFlag, const uint64_t serial, const uint64_t rid
_txn_tail._xmagic = ~_txn_hdr._rhdr._magic;
_txn_tail._serial = serial;
_txn_tail._rid = rid;
+ _txn_tail._checksum = 0UL;
}
uint32_t
-txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
+txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum)
{
assert(wptr != 0);
assert(max_size_dblks > 0);
@@ -83,8 +85,10 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
rem -= wsize;
}
rec_offs -= _txn_hdr._xidsize - wsize2;
+ checksum.addData((unsigned char*)wptr, wr_cnt);
if (rem)
{
+ _txn_tail._checksum = checksum.getChecksum();
wsize = sizeof(_txn_tail) > rec_offs ? sizeof(_txn_tail) - rec_offs : 0;
wsize2 = wsize;
if (wsize)
@@ -108,8 +112,10 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
{
std::memcpy(wptr, (const char*)_xidp + rec_offs, wsize);
wr_cnt += wsize;
+ checksum.addData((unsigned char*)wptr, wr_cnt);
}
rec_offs -= _txn_hdr._xidsize - wsize;
+ _txn_tail._checksum = checksum.getChecksum();
wsize = sizeof(_txn_tail) > rec_offs ? sizeof(_txn_tail) - rec_offs : 0;
if (wsize)
{
@@ -141,8 +147,10 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
wr_cnt += wsize;
rem -= wsize;
}
+ checksum.addData((unsigned char*)wptr, wr_cnt);
if (rem)
{
+ _txn_tail._checksum = checksum.getChecksum();
wsize = rem >= sizeof(_txn_tail) ? sizeof(_txn_tail) : rem;
std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, wsize);
wr_cnt += wsize;
@@ -154,6 +162,8 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
{
std::memcpy((char*)wptr + wr_cnt, _xidp, _txn_hdr._xidsize);
wr_cnt += _txn_hdr._xidsize;
+ checksum.addData((unsigned char*)wptr, wr_cnt);
+ _txn_tail._checksum = checksum.getChecksum();
std::memcpy((char*)wptr + wr_cnt, (void*)&_txn_tail, sizeof(_txn_tail));
wr_cnt += sizeof(_txn_tail);
#ifdef QLS_CLEAN
@@ -168,14 +178,12 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks)
bool
txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
{
- uint32_t checksum = 0UL; // TODO: Add checksum math
if (rec_offs == 0)
{
// Read header, allocate for xid
- //_txn_hdr.hdr_copy(h);
::rec_hdr_copy(&_txn_hdr._rhdr, &h);
ifsp->read((char*)&_txn_hdr._xidsize, sizeof(_txn_hdr._xidsize));
- rec_offs = sizeof(txn_hdr_t);
+ rec_offs = sizeof(::txn_hdr_t);
_buff = std::malloc(_txn_hdr._xidsize);
MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
}
@@ -216,14 +224,19 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
throw jexception(jerrno::JERR_JREC_BADRECTAIL); // TODO: complete exception detail
}
assert(!ifsp->fail() && !ifsp->bad());
- int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, checksum);
+ assert(_txn_hdr._xidsize > 0);
+ Checksum checksum;
+ checksum.addData((unsigned char*)&_txn_hdr, sizeof(_txn_hdr));
+ checksum.addData((unsigned char*)_buff, _txn_hdr._xidsize);
+ uint32_t cs = checksum.getChecksum();
+ int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs);
if (res != 0) {
std::stringstream oss;
switch (res) {
case 1: oss << std::hex << "Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; break;
case 2: oss << std::hex << "Serial: expected 0x" << _txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial; break;
case 3: oss << std::hex << "Record Id: expected 0x" << _txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid; break;
- case 4: oss << std::hex << "Checksum: expected 0x" << checksum << "; found 0x" << _txn_tail._checksum; break;
+ case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 0x" << _txn_tail._checksum; break;
default: oss << "Unknown error " << res;
}
throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", "decode"); // TODO: Don't throw exception, log info
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
index 29f52ec46a..a9224a4a01 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
@@ -48,7 +48,7 @@ public:
void reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp,
const std::size_t xidlen);
- uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks);
+ uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum);
bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
std::size_t get_xid(void** const xidpp);
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
index e308f4ab06..6eaa8835be 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
@@ -23,6 +23,7 @@
#include <cassert>
#include "qpid/linearstore/journal/aio_callback.h"
+#include "qpid/linearstore/journal/Checksum.h"
#include "qpid/linearstore/journal/data_tok.h"
#include "qpid/linearstore/journal/jcntl.h"
#include "qpid/linearstore/journal/JournalFile.h"
@@ -158,6 +159,7 @@ wmgr::enqueue(const void* const data_buff,
}
//std::cout << "---+++ wmgr::enqueue() ENQ rid=0x" << std::hex << rid << " po=0x" << _pg_offset_dblks << " cs=0x" << (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) << " " << std::dec << std::flush; // DEBUG
bool done = false;
+ Checksum checksum;
while (!done)
{
//std::cout << "*" << std::flush; // DEBUG
@@ -165,7 +167,7 @@ wmgr::enqueue(const void* const data_buff,
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
- (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0) {
@@ -278,6 +280,7 @@ wmgr::dequeue(data_tok* dtokp,
}
//std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG
bool done = false;
+ Checksum checksum;
while (!done)
{
//std::cout << "*" << std::flush; // DEBUG
@@ -285,7 +288,7 @@ wmgr::dequeue(data_tok* dtokp,
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
- (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0) {
@@ -396,13 +399,14 @@ wmgr::abort(data_tok* dtokp,
_abort_busy = true;
}
bool done = false;
+ Checksum checksum;
while (!done)
{
assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
- (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0)
@@ -494,13 +498,14 @@ wmgr::commit(data_tok* dtokp,
_commit_busy = true;
}
bool done = false;
+ Checksum checksum;
while (!done)
{
assert(_pg_offset_dblks < _cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * QLS_DBLK_SIZE_BYTES);
uint32_t data_offs_dblks = dtokp->dblocks_written();
uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
- (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks);
+ (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0)