diff options
author | unknown <pekka@mysql.com> | 2004-08-30 15:31:30 +0200 |
---|---|---|
committer | unknown <pekka@mysql.com> | 2004-08-30 15:31:30 +0200 |
commit | 9ed92fea50cd8e6688a4370905050dc3cadfb771 (patch) | |
tree | 3e2eba3bf1b2470e77a1106cd195616a5b61cc88 | |
parent | 42a7e96adb650e1db4da0049c06bbf140c6728da (diff) | |
parent | 47eb0b464c03f18c3937a2bf5c67fb29a3d96d6e (diff) | |
download | mariadb-git-9ed92fea50cd8e6688a4370905050dc3cadfb771.tar.gz |
Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb
into mysql.com:/space/pekka/ndb/version/my41-tux
ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp:
Auto merged
ndb/src/ndbapi/NdbScanOperation.cpp:
Auto merged
-rw-r--r-- | ndb/include/util/NdbSqlUtil.hpp | 337 | ||||
-rw-r--r-- | ndb/src/common/util/NdbSqlUtil.cpp | 269 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp | 32 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp | 2 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/Times.txt | 12 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbScanOperation.cpp | 4 | ||||
-rw-r--r-- | ndb/test/ndbapi/testOIBasic.cpp | 31 |
7 files changed, 307 insertions, 380 deletions
diff --git a/ndb/include/util/NdbSqlUtil.hpp b/ndb/include/util/NdbSqlUtil.hpp index a79245868e0..1d3e96d5c7e 100644 --- a/ndb/include/util/NdbSqlUtil.hpp +++ b/ndb/include/util/NdbSqlUtil.hpp @@ -40,8 +40,9 @@ public: * Compare kernel attribute values. Returns -1, 0, +1 for less, * equal, greater, respectively. Parameters are pointers to values, * full attribute size in words, and size of available data in words. - * There are 2 special return values to check first. All values fit - * into a signed char. + * If available size is less than full size, CmpUnknown may be + * returned. If a value cannot be parsed, it compares like NULL i.e. + * less than any valid value. */ typedef int Cmp(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size); @@ -49,8 +50,7 @@ public: CmpLess = -1, CmpEqual = 0, CmpGreater = 1, - CmpUnknown = 126, // insufficient partial data - CmpError = 127 // bad data format or unimplemented comparison + CmpUnknown = 2 // insufficient partial data }; /** @@ -82,19 +82,13 @@ public: Text // Text blob }; Enum m_typeId; - Cmp* m_cmp; // set to NULL if cmp not implemented + Cmp* m_cmp; // comparison method }; /** * Get type by id. Can return the Undefined type. */ - static const Type& type(Uint32 typeId); - - /** - * Inline comparison method. Most or all real methods Type::m_cmp are - * implemented via this (trusting dead code elimination). - */ - static int cmp(Uint32 typeId, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size); + static const Type& getType(Uint32 typeId); private: /** @@ -127,323 +121,4 @@ private: static Cmp cmpText; }; -inline int -NdbSqlUtil::cmp(Uint32 typeId, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) -{ - // XXX require size >= 1 - if (size > full) - return CmpError; - switch ((Type::Enum)typeId) { - case Type::Undefined: - break; - case Type::Tinyint: - { - if (size >= 1) { - union { Uint32 p[1]; Int8 v; } u1, u2; - u1.p[0] = p1[0]; - u2.p[0] = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; - } - return CmpUnknown; - } - case Type::Tinyunsigned: - { - if (size >= 1) { - union { Uint32 p[1]; Uint8 v; } u1, u2; - u1.p[0] = p1[0]; - u2.p[0] = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; - } - return CmpUnknown; - } - case Type::Smallint: - { - if (size >= 1) { - union { Uint32 p[1]; Int16 v; } u1, u2; - u1.p[0] = p1[0]; - u2.p[0] = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; - } - return CmpUnknown; - } - case Type::Smallunsigned: - { - if (size >= 1) { - union { Uint32 p[1]; Uint16 v; } u1, u2; - u1.p[0] = p1[0]; - u2.p[0] = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; - } - return CmpUnknown; - } - case Type::Mediumint: - { - if (size >= 1) { - union { const Uint32* p; const unsigned char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - Int32 v1 = sint3korr(u1.v); - Int32 v2 = sint3korr(u2.v); - if (v1 < v2) - return -1; - if (v1 > v2) - return +1; - return 0; - } - return CmpUnknown; - } - case Type::Mediumunsigned: - { - if (size >= 1) { - union { const Uint32* p; const unsigned char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - Uint32 v1 = uint3korr(u1.v); - Uint32 v2 = uint3korr(u2.v); - if (v1 < v2) - return -1; - if (v1 > v2) - return +1; - return 0; - } - return CmpUnknown; - } - case Type::Int: - { - if (size >= 1) { - union { Uint32 p[1]; Int32 v; } u1, u2; - u1.p[0] = p1[0]; - u2.p[0] = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; - } - return CmpUnknown; - } - case Type::Unsigned: - { - if (size >= 1) { - union { Uint32 p[1]; Uint32 v; } u1, u2; - u1.v = p1[0]; - u2.v = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; - } - return CmpUnknown; - } - case Type::Bigint: - { - if (size >= 2) { - union { Uint32 p[2]; Int64 v; } u1, u2; - u1.p[0] = p1[0]; - u1.p[1] = p1[1]; - u2.p[0] = p2[0]; - u2.p[1] = p2[1]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; - } - return CmpUnknown; - } - case Type::Bigunsigned: - { - if (size >= 2) { - union { Uint32 p[2]; Uint64 v; } u1, u2; - u1.p[0] = p1[0]; - u1.p[1] = p1[1]; - u2.p[0] = p2[0]; - u2.p[1] = p2[1]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; - } - return CmpUnknown; - } - case Type::Float: - { - if (size >= 1) { - union { Uint32 p[1]; float v; } u1, u2; - u1.p[0] = p1[0]; - u2.p[0] = p2[0]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; - } - return CmpUnknown; - } - case Type::Double: - { - if (size >= 2) { - union { Uint32 p[2]; double v; } u1, u2; - u1.p[0] = p1[0]; - u1.p[1] = p1[1]; - u2.p[0] = p2[0]; - u2.p[1] = p2[1]; - if (u1.v < u2.v) - return -1; - if (u1.v > u2.v) - return +1; - return 0; - } - return CmpUnknown; - } - case Type::Decimal: - // XXX not used by MySQL or NDB - break; - case Type::Char: - { - /* - * Char is blank-padded to length and null-padded to word size. - * There is no terminator so we must compare the full values. - */ - union { const Uint32* p; const char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - int k = memcmp(u1.v, u2.v, size << 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; - } - case Type::Varchar: - { - /* - * Varchar is not allowed to contain a null byte and the stored - * value is null-padded. Therefore comparison does not need to - * use the length. - */ - if (size >= 1) { - union { const Uint32* p; const char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - // length in first 2 bytes - int k = strncmp(u1.v + 2, u2.v + 2, (size << 2) - 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; - } - return CmpUnknown; - } - case Type::Binary: - { - // compare byte wise - union { const Uint32* p; const char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - int k = memcmp(u1.v, u2.v, size << 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; - } - case Type::Varbinary: - { - // assume correctly padded and compare byte wise - if (size >= 1) { - union { const Uint32* p; const char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - // length in first 2 bytes - int k = memcmp(u1.v + 2, u2.v + 2, (size << 2) - 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; - } - return CmpUnknown; - } - case Type::Datetime: - { - /* - * Datetime is CC YY MM DD hh mm ss \0 - */ - if (size >= 1) { - union { const Uint32* p; const char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - // skip format check - int k = memcmp(u1.v, u2.v, 4); - if (k != 0) - return k < 0 ? -1 : +1; - if (size >= 2) { - k = memcmp(u1.v + 4, u2.v + 4, 4); - return k < 0 ? -1 : k > 0 ? +1 : 0; - } - } - return CmpUnknown; - } - case Type::Timespec: - { - /* - * Timespec is CC YY MM DD hh mm ss \0 NN NN NN NN - */ - if (size >= 1) { - union { const Uint32* p; const char* v; } u1, u2; - u1.p = p1; - u2.p = p2; - // skip format check - int k = memcmp(u1.v, u2.v, 4); - if (k != 0) - return k < 0 ? -1 : +1; - if (size >= 2) { - k = memcmp(u1.v + 4, u2.v + 4, 4); - if (k != 0) - return k < 0 ? -1 : +1; - Uint32 n1 = *(const Uint32*)(u1.v + 8); - Uint32 n2 = *(const Uint32*)(u2.v + 8); - if (n1 < n2) - return -1; - if (n2 > n1) - return +1; - return 0; - } - } - return CmpUnknown; - } - case Type::Blob: - { - // skip blob head, the rest is binary - const unsigned skip = NDB_BLOB_HEAD_SIZE; - if (size >= skip + 1) { - union { const Uint32* p; const char* v; } u1, u2; - u1.p = p1 + skip; - u2.p = p2 + skip; - int k = memcmp(u1.v, u2.v, (size - 1) << 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; - } - return CmpUnknown; - } - case Type::Text: - { - // skip blob head, the rest is char - const unsigned skip = NDB_BLOB_HEAD_SIZE; - if (size >= skip + 1) { - union { const Uint32* p; const char* v; } u1, u2; - u1.p = p1 + skip; - u2.p = p2 + skip; - int k = memcmp(u1.v, u2.v, (size - 1) << 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; - } - return CmpUnknown; - } - } - return CmpError; -} - #endif diff --git a/ndb/src/common/util/NdbSqlUtil.cpp b/ndb/src/common/util/NdbSqlUtil.cpp index 9d05fc7fb02..84a6f6e6c21 100644 --- a/ndb/src/common/util/NdbSqlUtil.cpp +++ b/ndb/src/common/util/NdbSqlUtil.cpp @@ -167,7 +167,7 @@ NdbSqlUtil::m_typeList[] = { }; const NdbSqlUtil::Type& -NdbSqlUtil::type(Uint32 typeId) +NdbSqlUtil::getType(Uint32 typeId) { if (typeId < sizeof(m_typeList) / sizeof(m_typeList[0]) && m_typeList[typeId].m_typeId != Type::Undefined) { @@ -181,127 +181,352 @@ NdbSqlUtil::type(Uint32 typeId) int NdbSqlUtil::cmpTinyint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Tinyint, p1, p2, full, size); + assert(full >= size && size > 0); + union { Uint32 p[1]; Int8 v; } u1, u2; + u1.p[0] = p1[0]; + u2.p[0] = p2[0]; + if (u1.v < u2.v) + return -1; + if (u1.v > u2.v) + return +1; + return 0; } int NdbSqlUtil::cmpTinyunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Tinyunsigned, p1, p2, full, size); + assert(full >= size && size > 0); + union { Uint32 p[1]; Uint8 v; } u1, u2; + u1.p[0] = p1[0]; + u2.p[0] = p2[0]; + if (u1.v < u2.v) + return -1; + if (u1.v > u2.v) + return +1; + return 0; } int NdbSqlUtil::cmpSmallint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Smallint, p1, p2, full, size); + assert(full >= size && size > 0); + union { Uint32 p[1]; Int16 v; } u1, u2; + u1.p[0] = p1[0]; + u2.p[0] = p2[0]; + if (u1.v < u2.v) + return -1; + if (u1.v > u2.v) + return +1; + return 0; } int NdbSqlUtil::cmpSmallunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Smallunsigned, p1, p2, full, size); + assert(full >= size && size > 0); + union { Uint32 p[1]; Uint16 v; } u1, u2; + u1.p[0] = p1[0]; + u2.p[0] = p2[0]; + if (u1.v < u2.v) + return -1; + if (u1.v > u2.v) + return +1; + return 0; } int NdbSqlUtil::cmpMediumint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Mediumint, p1, p2, full, size); + assert(full >= size && size > 0); + union { const Uint32* p; const unsigned char* v; } u1, u2; + u1.p = p1; + u2.p = p2; + Int32 v1 = sint3korr(u1.v); + Int32 v2 = sint3korr(u2.v); + if (v1 < v2) + return -1; + if (v1 > v2) + return +1; + return 0; } int NdbSqlUtil::cmpMediumunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Mediumunsigned, p1, p2, full, size); + assert(full >= size && size > 0); + union { const Uint32* p; const unsigned char* v; } u1, u2; + u1.p = p1; + u2.p = p2; + Uint32 v1 = uint3korr(u1.v); + Uint32 v2 = uint3korr(u2.v); + if (v1 < v2) + return -1; + if (v1 > v2) + return +1; + return 0; } int NdbSqlUtil::cmpInt(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Int, p1, p2, full, size); + assert(full >= size && size > 0); + union { Uint32 p[1]; Int32 v; } u1, u2; + u1.p[0] = p1[0]; + u2.p[0] = p2[0]; + if (u1.v < u2.v) + return -1; + if (u1.v > u2.v) + return +1; + return 0; } int NdbSqlUtil::cmpUnsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Unsigned, p1, p2, full, size); + assert(full >= size && size > 0); + union { Uint32 p[1]; Uint32 v; } u1, u2; + u1.v = p1[0]; + u2.v = p2[0]; + if (u1.v < u2.v) + return -1; + if (u1.v > u2.v) + return +1; + return 0; } int NdbSqlUtil::cmpBigint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Bigint, p1, p2, full, size); + assert(full >= size && size > 0); + if (size >= 2) { + union { Uint32 p[2]; Int64 v; } u1, u2; + u1.p[0] = p1[0]; + u1.p[1] = p1[1]; + u2.p[0] = p2[0]; + u2.p[1] = p2[1]; + if (u1.v < u2.v) + return -1; + if (u1.v > u2.v) + return +1; + return 0; + } + return CmpUnknown; } int NdbSqlUtil::cmpBigunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Bigunsigned, p1, p2, full, size); + assert(full >= size && size > 0); + if (size >= 2) { + union { Uint32 p[2]; Uint64 v; } u1, u2; + u1.p[0] = p1[0]; + u1.p[1] = p1[1]; + u2.p[0] = p2[0]; + u2.p[1] = p2[1]; + if (u1.v < u2.v) + return -1; + if (u1.v > u2.v) + return +1; + return 0; + } + return CmpUnknown; } int NdbSqlUtil::cmpFloat(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Float, p1, p2, full, size); + assert(full >= size && size > 0); + union { Uint32 p[1]; float v; } u1, u2; + u1.p[0] = p1[0]; + u2.p[0] = p2[0]; + // no format check + if (u1.v < u2.v) + return -1; + if (u1.v > u2.v) + return +1; + return 0; } int NdbSqlUtil::cmpDouble(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Double, p1, p2, full, size); + assert(full >= size && size > 0); + if (size >= 2) { + union { Uint32 p[2]; double v; } u1, u2; + u1.p[0] = p1[0]; + u1.p[1] = p1[1]; + u2.p[0] = p2[0]; + u2.p[1] = p2[1]; + // no format check + if (u1.v < u2.v) + return -1; + if (u1.v > u2.v) + return +1; + return 0; + } + return CmpUnknown; } int NdbSqlUtil::cmpDecimal(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Decimal, p1, p2, full, size); + assert(full >= size && size > 0); + // not used by MySQL or NDB + assert(false); + return 0; } int NdbSqlUtil::cmpChar(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Char, p1, p2, full, size); + assert(full >= size && size > 0); + /* + * Char is blank-padded to length and null-padded to word size. There + * is no terminator so we compare the full values. + */ + union { const Uint32* p; const char* v; } u1, u2; + u1.p = p1; + u2.p = p2; + int k = memcmp(u1.v, u2.v, size << 2); + return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; } int NdbSqlUtil::cmpVarchar(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Varchar, p1, p2, full, size); + assert(full >= size && size > 0); + /* + * Varchar is not allowed to contain a null byte and the value is + * null-padded. Therefore comparison does not need to use the length. + */ + union { const Uint32* p; const char* v; } u1, u2; + u1.p = p1; + u2.p = p2; + // skip length in first 2 bytes + int k = strncmp(u1.v + 2, u2.v + 2, (size << 2) - 2); + return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; } int NdbSqlUtil::cmpBinary(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Binary, p1, p2, full, size); + assert(full >= size && size > 0); + /* + * Binary data of full length. Compare bytewise. + */ + union { const Uint32* p; const unsigned char* v; } u1, u2; + u1.p = p1; + u2.p = p2; + int k = memcmp(u1.v, u2.v, size << 2); + return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; } int NdbSqlUtil::cmpVarbinary(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Varbinary, p1, p2, full, size); + assert(full >= size && size > 0); + /* + * Binary data of variable length padded with nulls. The comparison + * does not need to use the length. + */ + union { const Uint32* p; const unsigned char* v; } u1, u2; + u1.p = p1; + u2.p = p2; + // skip length in first 2 bytes + int k = memcmp(u1.v + 2, u2.v + 2, (size << 2) - 2); + return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; } int NdbSqlUtil::cmpDatetime(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Datetime, p1, p2, full, size); + assert(full >= size && size > 0); + /* + * Datetime is CC YY MM DD hh mm ss \0 + */ + union { const Uint32* p; const unsigned char* v; } u1, u2; + u1.p = p1; + u2.p = p2; + // no format check + int k = memcmp(u1.v, u2.v, 4); + if (k != 0) + return k < 0 ? -1 : +1; + if (size >= 2) { + k = memcmp(u1.v + 4, u2.v + 4, 4); + return k < 0 ? -1 : k > 0 ? +1 : 0; + } + return CmpUnknown; } int NdbSqlUtil::cmpTimespec(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Timespec, p1, p2, full, size); + assert(full >= size && size > 0); + /* + * Timespec is CC YY MM DD hh mm ss \0 NN NN NN NN + */ + union { const Uint32* p; const unsigned char* v; } u1, u2; + u1.p = p1; + u2.p = p2; + // no format check + int k = memcmp(u1.v, u2.v, 4); + if (k != 0) + return k < 0 ? -1 : +1; + if (size >= 2) { + k = memcmp(u1.v + 4, u2.v + 4, 4); + if (k != 0) + return k < 0 ? -1 : +1; + if (size >= 3) { + Uint32 n1 = *(const Uint32*)(u1.v + 8); + Uint32 n2 = *(const Uint32*)(u2.v + 8); + if (n1 < n2) + return -1; + if (n2 > n1) + return +1; + return 0; + } + } + return CmpUnknown; } int NdbSqlUtil::cmpBlob(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Blob, p1, p2, full, size); + assert(full >= size && size > 0); + /* + * Blob comparison is on the inline bytes. Except for larger header + * the format is like Varbinary. + */ + const unsigned head = NDB_BLOB_HEAD_SIZE; + // skip blob head + if (size >= head + 1) { + union { const Uint32* p; const unsigned char* v; } u1, u2; + u1.p = p1 + head; + u2.p = p2 + head; + int k = memcmp(u1.v, u2.v, (size - head) << 2); + return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; + } + return CmpUnknown; } int NdbSqlUtil::cmpText(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - return cmp(Type::Text, p1, p2, full, size); + assert(full >= size && size > 0); + /* + * Text comparison is on the inline bytes. Except for larger header + * the format is like Varchar. + */ + const unsigned head = NDB_BLOB_HEAD_SIZE; + // skip blob head + if (size >= head + 1) { + union { const Uint32* p; const char* v; } u1, u2; + u1.p = p1 + head; + u2.p = p2 + head; + int k = memcmp(u1.v, u2.v, (size - head) << 2); + return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; + } + return CmpUnknown; } #ifdef NDB_SQL_UTIL_TEST diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp index 1b8755a1dc4..debb5252386 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp @@ -35,7 +35,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons searchKey += start; int ret = 0; while (start < numAttrs) { - if (len2 < AttributeHeaderSize) { + if (len2 <= AttributeHeaderSize) { jam(); ret = NdbSqlUtil::CmpUnknown; break; @@ -46,7 +46,8 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons jam(); // current attribute const DescAttr& descAttr = descEnt.m_descAttr[start]; - const unsigned typeId = descAttr.m_typeId; + const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId); + ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined); // full data size const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize()); @@ -55,7 +56,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons // compare const Uint32* const p1 = *searchKey; const Uint32* const p2 = &entryData[AttributeHeaderSize]; - ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2); + ret = (*type.m_cmp)(p1, p2, size1, size2); if (ret != 0) { jam(); break; @@ -78,8 +79,6 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons entryData += AttributeHeaderSize + entryData.ah().getDataSize(); start++; } - // XXX until data format errors are handled - ndbrequire(ret != NdbSqlUtil::CmpError); return ret; } @@ -103,13 +102,14 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Tabl jam(); // current attribute const DescAttr& descAttr = descEnt.m_descAttr[start]; - const unsigned typeId = descAttr.m_typeId; + const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId); + ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined); // full data size const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); // compare const Uint32* const p1 = *searchKey; const Uint32* const p2 = *entryKey; - ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1); + ret = (*type.m_cmp)(p1, p2, size1, size1); if (ret != 0) { jam(); break; @@ -132,8 +132,6 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Tabl entryKey += 1; start++; } - // XXX until data format errors are handled - ndbrequire(ret != NdbSqlUtil::CmpError); return ret; } @@ -172,7 +170,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne */ unsigned type = 4; while (boundCount != 0) { - if (len2 < AttributeHeaderSize) { + if (len2 <= AttributeHeaderSize) { jam(); return NdbSqlUtil::CmpUnknown; } @@ -186,7 +184,8 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne // current attribute const unsigned index = boundInfo.ah().getAttributeId(); const DescAttr& descAttr = descEnt.m_descAttr[index]; - const unsigned typeId = descAttr.m_typeId; + const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId); + ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined); ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId); // full data size const unsigned size1 = boundInfo.ah().getDataSize(); @@ -196,9 +195,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne // compare const Uint32* const p1 = &boundInfo[AttributeHeaderSize]; const Uint32* const p2 = &entryData[AttributeHeaderSize]; - int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2); - // XXX until data format errors are handled - ndbrequire(ret != NdbSqlUtil::CmpError); + int ret = (*type.m_cmp)(p1, p2, size1, size2); if (ret != 0) { jam(); return ret; @@ -269,15 +266,14 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne // current attribute const unsigned index = boundInfo.ah().getAttributeId(); const DescAttr& descAttr = descEnt.m_descAttr[index]; - const unsigned typeId = descAttr.m_typeId; + const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId); + ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined); // full data size const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); // compare const Uint32* const p1 = &boundInfo[AttributeHeaderSize]; const Uint32* const p2 = *entryKey; - int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1); - // XXX until data format errors are handled - ndbrequire(ret != NdbSqlUtil::CmpError); + int ret = (*type.m_cmp)(p1, p2, size1, size1); if (ret != 0) { jam(); return ret; diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp index 803314ae52d..4bb3b940d91 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp @@ -184,7 +184,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) } #endif // check if type is valid and has a comparison method - const NdbSqlUtil::Type& type = NdbSqlUtil::type(descAttr.m_typeId); + const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId); if (type.m_typeId == NdbSqlUtil::Type::Undefined || type.m_cmp == 0) { jam(); diff --git a/ndb/src/kernel/blocks/dbtux/Times.txt b/ndb/src/kernel/blocks/dbtux/Times.txt index c4744a23c07..84819ddcf97 100644 --- a/ndb/src/kernel/blocks/dbtux/Times.txt +++ b/ndb/src/kernel/blocks/dbtux/Times.txt @@ -28,6 +28,8 @@ d shows ms / 1000 rows for each and index time overhead samples 10% of all PKs (100,000 pk reads, 100,000 scans) +the "pct" values are from more accurate total times (not shown) + 040616 mc02/a 40 ms 87 ms 114 pct mc02/b 51 ms 128 ms 148 pct @@ -75,11 +77,19 @@ optim 13 mc02/a 40 ms 57 ms 42 pct mc02/d 170 ms 256 ms 50 pct after wl-1884 store all-NULL keys (the tests have pctnull=10 per column) -[ what happened to PK read performance? ] optim 13 mc02/a 39 ms 59 ms 50 pct mc02/b 47 ms 77 ms 61 pct mc02/c 9 ms 12 ms 44 pct mc02/d 246 ms 289 ms 17 pct +[ case d: what happened to PK read performance? ] + +optim 14 mc02/a 41 ms 60 ms 44 pct + mc02/b 46 ms 81 ms 73 pct + mc02/c 9 ms 13 ms 37 pct + mc02/d 242 ms 285 ms 17 pct + +[ case b: do long keys suffer from many subroutine calls? ] + vim: set et: diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 0aa40f968bb..d6c744f8959 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -1224,10 +1224,10 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, Uint32 type = NdbColumnImpl::getImpl(* r1->m_column).m_extType; Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4; if(!r1_null){ - char r = NdbSqlUtil::cmp(type, d1, d2, size, size); + const NdbSqlUtil::Type& t = NdbSqlUtil::getType(type); + int r = (*t.m_cmp)(d1, d2, size, size); if(r){ assert(r != NdbSqlUtil::CmpUnknown); - assert(r != NdbSqlUtil::CmpError); return r; } } diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp index c58dd8538e9..29d03f0c33e 100644 --- a/ndb/test/ndbapi/testOIBasic.cpp +++ b/ndb/test/ndbapi/testOIBasic.cpp @@ -34,6 +34,7 @@ struct Opt { // common options unsigned m_batch; + const char* m_bound; const char* m_case; bool m_core; bool m_dups; @@ -43,6 +44,7 @@ struct Opt { unsigned m_loop; bool m_nologging; bool m_msglock; + unsigned m_pctnull; unsigned m_rows; unsigned m_samples; unsigned m_scanrd; @@ -54,6 +56,7 @@ struct Opt { unsigned m_v; Opt() : m_batch(32), + m_bound("01234"), m_case(0), m_core(false), m_dups(false), @@ -63,6 +66,7 @@ struct Opt { m_loop(1), m_nologging(false), m_msglock(true), + m_pctnull(10), m_rows(1000), m_samples(0), m_scanrd(240), @@ -87,6 +91,7 @@ printhelp() ndbout << "usage: testOIbasic [options]" << endl << " -batch N pk operations in batch [" << d.m_batch << "]" << endl + << " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl << " -case abc only given test cases (letters a-z)" << endl << " -core core dump on error [" << d.m_core << "]" << endl << " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl @@ -94,6 +99,7 @@ printhelp() << " -index xyz only given index numbers (digits 1-9)" << endl << " -loop N loop count full suite 0=forever [" << d.m_loop << "]" << endl << " -nologging create tables in no-logging mode" << endl + << " -pctnull N pct NULL values in nullable column [" << d.m_pctnull << "]" << endl << " -rows N rows per thread [" << d.m_rows << "]" << endl << " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl << " -scanrd N scan read parallelism [" << d.m_scanrd << "]" << endl @@ -198,7 +204,6 @@ struct Par : public Opt { Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; } unsigned m_totrows; // value calculation - unsigned m_pctnull; unsigned m_range; unsigned m_pctrange; // do verify after read @@ -214,7 +219,6 @@ struct Par : public Opt { m_set(0), m_tmr(0), m_totrows(m_threads * m_rows), - m_pctnull(10), m_range(m_rows), m_pctrange(0), m_verify(false), @@ -1622,7 +1626,6 @@ Set::calc(Par par, unsigned i) m_row[i] = new Row(tab); Row& row = *m_row[i]; // value generation parameters - par.m_pctnull = 10; par.m_pctrange = 40; row.calc(par, i); } @@ -1898,8 +1901,11 @@ BSet::calc(Par par) BVal& bval = *new BVal(icol); m_bval[m_bvals++] = &bval; bval.m_null = false; - // equality bound only on i==0 - unsigned sel = urandom(5 - i); + unsigned sel; + do { + // equality bound only on i==0 + sel = urandom(5 - i); + } while (strchr(par.m_bound, '0' + sel) == 0); if (sel < 2) bval.m_type = 0 | (1 << i); else if (sel < 4) @@ -3207,6 +3213,15 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) continue; } } + if (strcmp(arg, "-bound") == 0) { + if (++argv, --argc > 0) { + const char* p = argv[0]; + if (strlen(p) != 0 && strlen(p) == strspn(p, "01234")) { + g_opt.m_bound = strdup(p); + continue; + } + } + } if (strcmp(arg, "-case") == 0) { if (++argv, --argc > 0) { g_opt.m_case = strdup(argv[0]); @@ -3257,6 +3272,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) g_opt.m_nologging = true; continue; } + if (strcmp(arg, "-pctnull") == 0) { + if (++argv, --argc > 0) { + g_opt.m_pctnull = atoi(argv[0]); + continue; + } + } if (strcmp(arg, "-rows") == 0) { if (++argv, --argc > 0) { g_opt.m_rows = atoi(argv[0]); |