diff options
author | pekka@mysql.com <> | 2004-09-17 14:18:32 +0200 |
---|---|---|
committer | pekka@mysql.com <> | 2004-09-17 14:18:32 +0200 |
commit | e274047c24a972b3774a69e80164fb847afbaaaf (patch) | |
tree | cdf1acfbaba13a23d58a84c53ed3803c6d801099 | |
parent | f21e6d81ff354f4f29bca1cf2b8d9e670ef47663 (diff) | |
parent | d02998e20f8f77c911b004644c32569adbb86c9e (diff) | |
download | mariadb-git-e274047c24a972b3774a69e80164fb847afbaaaf.tar.gz |
Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb
into mysql.com:/space/pekka/ndb/version/my41-cc
36 files changed, 1066 insertions, 367 deletions
diff --git a/mysql-test/r/ndb_charset.result b/mysql-test/r/ndb_charset.result new file mode 100644 index 00000000000..93429a1fcb0 --- /dev/null +++ b/mysql-test/r/ndb_charset.result @@ -0,0 +1,191 @@ +drop table if exists t1; +create table t1 ( +a char(3) character set latin1 collate latin1_bin primary key +) engine=ndb; +insert into t1 values('aAa'); +insert into t1 values('aaa'); +insert into t1 values('AAA'); +select * from t1 order by a; +a +AAA +aAa +aaa +select * from t1 where a = 'aAa'; +a +aAa +select * from t1 where a = 'aaa'; +a +aaa +select * from t1 where a = 'AaA'; +a +select * from t1 where a = 'AAA'; +a +AAA +drop table t1; +create table t1 ( +a char(3) character set latin1 collate latin1_swedish_ci primary key +) engine=ndb; +insert into t1 values('aAa'); +insert into t1 values('aaa'); +ERROR 23000: Duplicate entry 'aaa' for key 1 +insert into t1 values('AAA'); +ERROR 23000: Duplicate entry 'AAA' for key 1 +select * from t1 order by a; +a +aAa +select * from t1 where a = 'aAa'; +a +aAa +select * from t1 where a = 'aaa'; +a +aAa +select * from t1 where a = 'AaA'; +a +aAa +select * from t1 where a = 'AAA'; +a +aAa +drop table t1; +create table t1 ( +p int primary key, +a char(3) character set latin1 collate latin1_bin not null, +unique key(a) +) engine=ndb; +insert into t1 values(1, 'aAa'); +insert into t1 values(2, 'aaa'); +insert into t1 values(3, 'AAA'); +select * from t1 order by p; +p a +1 aAa +2 aaa +3 AAA +select * from t1 where a = 'aAa'; +p a +1 aAa +select * from t1 where a = 'aaa'; +p a +2 aaa +select * from t1 where a = 'AaA'; +p a +select * from t1 where a = 'AAA'; +p a +3 AAA +drop table t1; +create table t1 ( +p int primary key, +a char(3) character set latin1 collate latin1_swedish_ci not null, +unique key(a) +) engine=ndb; +insert into t1 values(1, 'aAa'); +insert into t1 values(2, 'aaa'); +ERROR 23000: Can't write, because of unique constraint, to table 't1' +insert into t1 values(3, 'AAA'); +ERROR 23000: Can't write, because of unique constraint, to table 't1' +select * from t1 order by p; +p a +1 aAa +select * from t1 where a = 'aAa'; +p a +1 aAa +select * from t1 where a = 'aaa'; +p a +1 aAa +select * from t1 where a = 'AaA'; +p a +1 aAa +select * from t1 where a = 'AAA'; +p a +1 aAa +drop table t1; +create table t1 ( +p int primary key, +a char(3) character set latin1 collate latin1_bin not null, +index(a) +) engine=ndb; +insert into t1 values(1, 'aAa'); +insert into t1 values(2, 'aaa'); +insert into t1 values(3, 'AAA'); +insert into t1 values(4, 'aAa'); +insert into t1 values(5, 'aaa'); +insert into t1 values(6, 'AAA'); +select * from t1 order by p; +p a +1 aAa +2 aaa +3 AAA +4 aAa +5 aaa +6 AAA +explain select * from t1 where a = 'zZz' order by p; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 ref a a 3 const 10 Using where; Using filesort +select * from t1 where a = 'aAa' order by p; +p a +1 aAa +4 aAa +select * from t1 where a = 'aaa' order by p; +p a +2 aaa +5 aaa +select * from t1 where a = 'AaA' order by p; +p a +select * from t1 where a = 'AAA' order by p; +p a +3 AAA +6 AAA +drop table t1; +create table t1 ( +p int primary key, +a char(3) character set latin1 collate latin1_swedish_ci not null, +index(a) +) engine=ndb; +insert into t1 values(1, 'aAa'); +insert into t1 values(2, 'aaa'); +insert into t1 values(3, 'AAA'); +insert into t1 values(4, 'aAa'); +insert into t1 values(5, 'aaa'); +insert into t1 values(6, 'AAA'); +select * from t1 order by p; +p a +1 aAa +2 aaa +3 AAA +4 aAa +5 aaa +6 AAA +explain select * from t1 where a = 'zZz' order by p; +id select_type table type possible_keys key key_len ref rows Extra +1 SIMPLE t1 ref a a 3 const 10 Using where; Using filesort +select * from t1 where a = 'aAa' order by p; +p a +1 aAa +2 aaa +3 AAA +4 aAa +5 aaa +6 AAA +select * from t1 where a = 'aaa' order by p; +p a +1 aAa +2 aaa +3 AAA +4 aAa +5 aaa +6 AAA +select * from t1 where a = 'AaA' order by p; +p a +1 aAa +2 aaa +3 AAA +4 aAa +5 aaa +6 AAA +select * from t1 where a = 'AAA' order by p; +p a +1 aAa +2 aaa +3 AAA +4 aAa +5 aaa +6 AAA +drop table t1; diff --git a/mysql-test/r/ndb_index.result b/mysql-test/r/ndb_index.result index dd92c237ace..5702552b0b5 100644 --- a/mysql-test/r/ndb_index.result +++ b/mysql-test/r/ndb_index.result @@ -4,7 +4,7 @@ PORT varchar(16) NOT NULL, ACCESSNODE varchar(16) NOT NULL, POP varchar(48) NOT NULL, ACCESSTYPE int unsigned NOT NULL, -CUSTOMER_ID varchar(20) NOT NULL, +CUSTOMER_ID varchar(20) collate latin1_bin NOT NULL, PROVIDER varchar(16), TEXPIRE int unsigned, NUM_IP int unsigned, diff --git a/mysql-test/t/ndb_charset.test b/mysql-test/t/ndb_charset.test new file mode 100644 index 00000000000..b9f28ed0faf --- /dev/null +++ b/mysql-test/t/ndb_charset.test @@ -0,0 +1,159 @@ +--source include/have_ndb.inc + +--disable_warnings +drop table if exists t1; +--enable_warnings + +# +# Minimal NDB charset test. +# + +# pk - binary + +create table t1 ( + a char(3) character set latin1 collate latin1_bin primary key +) engine=ndb; +# ok +insert into t1 values('aAa'); +insert into t1 values('aaa'); +insert into t1 values('AAA'); +# 3 +select * from t1 order by a; +# 1 +select * from t1 where a = 'aAa'; +# 1 +select * from t1 where a = 'aaa'; +# 0 +select * from t1 where a = 'AaA'; +# 1 +select * from t1 where a = 'AAA'; +drop table t1; + +# pk - case insensitive + +create table t1 ( + a char(3) character set latin1 collate latin1_swedish_ci primary key +) engine=ndb; +# ok +insert into t1 values('aAa'); +# fail +--error 1062 +insert into t1 values('aaa'); +--error 1062 +insert into t1 values('AAA'); +# 1 +select * from t1 order by a; +# 1 +select * from t1 where a = 'aAa'; +# 1 +select * from t1 where a = 'aaa'; +# 1 +select * from t1 where a = 'AaA'; +# 1 +select * from t1 where a = 'AAA'; +drop table t1; + +# unique hash index - binary + +create table t1 ( + p int primary key, + a char(3) character set latin1 collate latin1_bin not null, + unique key(a) +) engine=ndb; +# ok +insert into t1 values(1, 'aAa'); +insert into t1 values(2, 'aaa'); +insert into t1 values(3, 'AAA'); +# 3 +select * from t1 order by p; +# 1 +select * from t1 where a = 'aAa'; +# 1 +select * from t1 where a = 'aaa'; +# 0 +select * from t1 where a = 'AaA'; +# 1 +select * from t1 where a = 'AAA'; +drop table t1; + +# unique hash index - case insensitive + +create table t1 ( + p int primary key, + a char(3) character set latin1 collate latin1_swedish_ci not null, + unique key(a) +) engine=ndb; +# ok +insert into t1 values(1, 'aAa'); +# fail +--error 1169 +insert into t1 values(2, 'aaa'); +--error 1169 +insert into t1 values(3, 'AAA'); +# 1 +select * from t1 order by p; +# 1 +select * from t1 where a = 'aAa'; +# 1 +select * from t1 where a = 'aaa'; +# 1 +select * from t1 where a = 'AaA'; +# 1 +select * from t1 where a = 'AAA'; +drop table t1; + +# ordered index - binary + +create table t1 ( + p int primary key, + a char(3) character set latin1 collate latin1_bin not null, + index(a) +) engine=ndb; +# ok +insert into t1 values(1, 'aAa'); +insert into t1 values(2, 'aaa'); +insert into t1 values(3, 'AAA'); +insert into t1 values(4, 'aAa'); +insert into t1 values(5, 'aaa'); +insert into t1 values(6, 'AAA'); +# 6 +select * from t1 order by p; +# plan +explain select * from t1 where a = 'zZz' order by p; +# 2 +select * from t1 where a = 'aAa' order by p; +# 2 +select * from t1 where a = 'aaa' order by p; +# 0 +select * from t1 where a = 'AaA' order by p; +# 2 +select * from t1 where a = 'AAA' order by p; +drop table t1; + +# ordered index - case insensitive + +create table t1 ( + p int primary key, + a char(3) character set latin1 collate latin1_swedish_ci not null, + index(a) +) engine=ndb; +# ok +insert into t1 values(1, 'aAa'); +insert into t1 values(2, 'aaa'); +insert into t1 values(3, 'AAA'); +insert into t1 values(4, 'aAa'); +insert into t1 values(5, 'aaa'); +insert into t1 values(6, 'AAA'); +# 6 +select * from t1 order by p; +# plan +explain select * from t1 where a = 'zZz' order by p; +# 6 +select * from t1 where a = 'aAa' order by p; +# 6 +select * from t1 where a = 'aaa' order by p; +# 6 +select * from t1 where a = 'AaA' order by p; +# 6 +select * from t1 where a = 'AAA' order by p; +drop table t1; diff --git a/mysql-test/t/ndb_index.test b/mysql-test/t/ndb_index.test index d3977dc3ea4..e65b24a9b20 100644 --- a/mysql-test/t/ndb_index.test +++ b/mysql-test/t/ndb_index.test @@ -9,7 +9,7 @@ CREATE TABLE t1 ( ACCESSNODE varchar(16) NOT NULL, POP varchar(48) NOT NULL, ACCESSTYPE int unsigned NOT NULL, - CUSTOMER_ID varchar(20) NOT NULL, + CUSTOMER_ID varchar(20) collate latin1_bin NOT NULL, PROVIDER varchar(16), TEXPIRE int unsigned, NUM_IP int unsigned, diff --git a/ndb/include/kernel/signaldata/CreateTable.hpp b/ndb/include/kernel/signaldata/CreateTable.hpp index 424367f28d5..67e510d2ed0 100644 --- a/ndb/include/kernel/signaldata/CreateTable.hpp +++ b/ndb/include/kernel/signaldata/CreateTable.hpp @@ -89,7 +89,8 @@ public: ArraySizeTooBig = 737, RecordTooBig = 738, InvalidPrimaryKeySize = 739, - NullablePrimaryKey = 740 + NullablePrimaryKey = 740, + InvalidCharset = 743 }; private: diff --git a/ndb/include/kernel/signaldata/LqhFrag.hpp b/ndb/include/kernel/signaldata/LqhFrag.hpp index 116e9c01ca0..13dfafcc653 100644 --- a/ndb/include/kernel/signaldata/LqhFrag.hpp +++ b/ndb/include/kernel/signaldata/LqhFrag.hpp @@ -130,7 +130,7 @@ private: Uint32 keyLength; Uint32 nextLCP; Uint32 noOfKeyAttr; - Uint32 noOfNewAttr; + Uint32 noOfNewAttr; // noOfCharsets in upper half Uint32 checksumIndicator; Uint32 noOfAttributeGroups; Uint32 GCPIndicator; diff --git a/ndb/include/kernel/signaldata/TupFrag.hpp b/ndb/include/kernel/signaldata/TupFrag.hpp index c0ce22651aa..c1e861c5dff 100644 --- a/ndb/include/kernel/signaldata/TupFrag.hpp +++ b/ndb/include/kernel/signaldata/TupFrag.hpp @@ -119,12 +119,13 @@ class TupAddAttrReq { friend class Dblqh; friend class Dbtux; public: - STATIC_CONST( SignalLength = 4 ); + STATIC_CONST( SignalLength = 5 ); private: Uint32 tupConnectPtr; Uint32 notused1; Uint32 attrId; Uint32 attrDescriptor; + Uint32 extTypeInfo; }; class TupAddAttrConf { @@ -141,6 +142,10 @@ class TupAddAttrRef { friend class Dbtup; public: STATIC_CONST( SignalLength = 2 ); + enum ErrorCode { + NoError = 0, + InvalidCharset = 743 + }; private: Uint32 userPtr; Uint32 errorCode; @@ -178,7 +183,8 @@ public: STATIC_CONST( SignalLength = 2 ); enum ErrorCode { NoError = 0, - InvalidAttributeType = 831, + InvalidAttributeType = 742, + InvalidCharset = 743, InvalidNodeSize = 832 }; private: diff --git a/ndb/include/util/NdbSqlUtil.hpp b/ndb/include/util/NdbSqlUtil.hpp index df1cb716f93..3062d1e4e1b 100644 --- a/ndb/include/util/NdbSqlUtil.hpp +++ b/ndb/include/util/NdbSqlUtil.hpp @@ -40,11 +40,14 @@ public: * Compare kernel attribute values. Returns -1, 0, +1 for less, * equal, greater, respectively. Parameters are pointers to values, * full attribute size in words, and size of available data in words. + * There is also pointer to type specific extra info. Char types + * receive CHARSET_INFO in it. + * * If available size is less than full size, CmpUnknown may be * returned. If a value cannot be parsed, it compares like NULL i.e. * less than any valid value. */ - typedef int Cmp(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size); + typedef int Cmp(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size); enum CmpResult { CmpLess = -1, @@ -55,6 +58,7 @@ public: /** * Kernel data types. Must match m_typeList in NdbSqlUtil.cpp. + * Now also must match types in NdbDictionary. */ struct Type { enum Enum { @@ -91,6 +95,11 @@ public: static const Type& getType(Uint32 typeId); /** + * Get type by id but replace char type by corresponding binary type. + */ + static const Type& getTypeBinary(Uint32 typeId); + + /** * Check character set. */ static bool usable_in_pk(Uint32 typeId, const void* cs); diff --git a/ndb/src/common/util/NdbSqlUtil.cpp b/ndb/src/common/util/NdbSqlUtil.cpp index afb9bcfff62..6e4e5919e43 100644 --- a/ndb/src/common/util/NdbSqlUtil.cpp +++ b/ndb/src/common/util/NdbSqlUtil.cpp @@ -176,10 +176,29 @@ NdbSqlUtil::getType(Uint32 typeId) return m_typeList[Type::Undefined]; } +const NdbSqlUtil::Type& +NdbSqlUtil::getTypeBinary(Uint32 typeId) +{ + switch (typeId) { + case Type::Char: + typeId = Type::Binary; + break; + case Type::Varchar: + typeId = Type::Varbinary; + break; + case Type::Text: + typeId = Type::Blob; + break; + default: + break; + } + return getType(typeId); +} + // compare int -NdbSqlUtil::cmpTinyint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpTinyint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); union { Uint32 p[1]; Int8 v; } u1, u2; @@ -193,7 +212,7 @@ NdbSqlUtil::cmpTinyint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 s } int -NdbSqlUtil::cmpTinyunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpTinyunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); union { Uint32 p[1]; Uint8 v; } u1, u2; @@ -207,7 +226,7 @@ NdbSqlUtil::cmpTinyunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uin } int -NdbSqlUtil::cmpSmallint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpSmallint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); union { Uint32 p[1]; Int16 v; } u1, u2; @@ -221,7 +240,7 @@ NdbSqlUtil::cmpSmallint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 } int -NdbSqlUtil::cmpSmallunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpSmallunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); union { Uint32 p[1]; Uint16 v; } u1, u2; @@ -235,7 +254,7 @@ NdbSqlUtil::cmpSmallunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Ui } int -NdbSqlUtil::cmpMediumint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpMediumint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); union { const Uint32* p; const unsigned char* v; } u1, u2; @@ -251,7 +270,7 @@ NdbSqlUtil::cmpMediumint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 } int -NdbSqlUtil::cmpMediumunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpMediumunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); union { const Uint32* p; const unsigned char* v; } u1, u2; @@ -267,7 +286,7 @@ NdbSqlUtil::cmpMediumunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, U } int -NdbSqlUtil::cmpInt(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpInt(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); union { Uint32 p[1]; Int32 v; } u1, u2; @@ -281,7 +300,7 @@ NdbSqlUtil::cmpInt(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) } int -NdbSqlUtil::cmpUnsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpUnsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); union { Uint32 p[1]; Uint32 v; } u1, u2; @@ -295,7 +314,7 @@ NdbSqlUtil::cmpUnsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 } int -NdbSqlUtil::cmpBigint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpBigint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); if (size >= 2) { @@ -314,7 +333,7 @@ NdbSqlUtil::cmpBigint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 si } int -NdbSqlUtil::cmpBigunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpBigunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); if (size >= 2) { @@ -333,7 +352,7 @@ NdbSqlUtil::cmpBigunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint } int -NdbSqlUtil::cmpFloat(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpFloat(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); union { Uint32 p[1]; float v; } u1, u2; @@ -348,7 +367,7 @@ NdbSqlUtil::cmpFloat(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 siz } int -NdbSqlUtil::cmpDouble(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpDouble(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); if (size >= 2) { @@ -368,7 +387,7 @@ NdbSqlUtil::cmpDouble(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 si } int -NdbSqlUtil::cmpDecimal(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpDecimal(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); // not used by MySQL or NDB @@ -377,27 +396,34 @@ NdbSqlUtil::cmpDecimal(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 s } int -NdbSqlUtil::cmpChar(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpChar(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - assert(full >= size && size > 0); + // collation does not work on prefix for some charsets + assert(full == size && size > 0); /* - * Char is blank-padded to length and null-padded to word size. There - * is no terminator so we compare the full values. + * Char is blank-padded to length and null-padded to word size. */ - union { const Uint32* p; const char* v; } u1, u2; + union { const Uint32* p; const uchar* v; } u1, u2; u1.p = p1; u2.p = p2; - int k = memcmp(u1.v, u2.v, size << 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; + // not const in MySQL + CHARSET_INFO* cs = (CHARSET_INFO*)(info); + // length in bytes including null padding to Uint32 + uint l1 = (full << 2); + int k = (*cs->coll->strnncollsp)(cs, u1.v, l1, u2.v, l1); + return k < 0 ? -1 : k > 0 ? +1 : 0; } int -NdbSqlUtil::cmpVarchar(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpVarchar(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); /* * Varchar is not allowed to contain a null byte and the value is * null-padded. Therefore comparison does not need to use the length. + * + * Not used before MySQL 5.0. Format is likely to change. Handle + * only binary collation for now. */ union { const Uint32* p; const char* v; } u1, u2; u1.p = p1; @@ -408,7 +434,7 @@ NdbSqlUtil::cmpVarchar(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 s } int -NdbSqlUtil::cmpBinary(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpBinary(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); /* @@ -422,12 +448,14 @@ NdbSqlUtil::cmpBinary(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 si } int -NdbSqlUtil::cmpVarbinary(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpVarbinary(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); /* * Binary data of variable length padded with nulls. The comparison * does not need to use the length. + * + * Not used before MySQL 5.0. Format is likely to change. */ union { const Uint32* p; const unsigned char* v; } u1, u2; u1.p = p1; @@ -438,11 +466,13 @@ NdbSqlUtil::cmpVarbinary(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 } int -NdbSqlUtil::cmpDatetime(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpDatetime(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); /* * Datetime is CC YY MM DD hh mm ss \0 + * + * Not used via MySQL. */ union { const Uint32* p; const unsigned char* v; } u1, u2; u1.p = p1; @@ -459,11 +489,13 @@ NdbSqlUtil::cmpDatetime(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 } int -NdbSqlUtil::cmpTimespec(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpTimespec(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); /* * Timespec is CC YY MM DD hh mm ss \0 NN NN NN NN + * + * Not used via MySQL. */ union { const Uint32* p; const unsigned char* v; } u1, u2; u1.p = p1; @@ -490,12 +522,11 @@ NdbSqlUtil::cmpTimespec(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 } int -NdbSqlUtil::cmpBlob(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpBlob(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { assert(full >= size && size > 0); /* - * Blob comparison is on the inline bytes. Except for larger header - * the format is like Varbinary. + * Blob comparison is on the inline bytes (null padded). */ const unsigned head = NDB_BLOB_HEAD_SIZE; // skip blob head @@ -510,21 +541,26 @@ NdbSqlUtil::cmpBlob(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size } int -NdbSqlUtil::cmpText(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) +NdbSqlUtil::cmpText(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size) { - assert(full >= size && size > 0); + // collation does not work on prefix for some charsets + assert(full == size && size > 0); /* - * Text comparison is on the inline bytes. Except for larger header - * the format is like Varchar. + * Text comparison is on the inline bytes (blank padded). Currently + * not supported for multi-byte charsets. */ const unsigned head = NDB_BLOB_HEAD_SIZE; // skip blob head if (size >= head + 1) { - union { const Uint32* p; const char* v; } u1, u2; + union { const Uint32* p; const uchar* v; } u1, u2; u1.p = p1 + head; u2.p = p2 + head; - int k = memcmp(u1.v, u2.v, (size - head) << 2); - return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown; + // not const in MySQL + CHARSET_INFO* cs = (CHARSET_INFO*)(info); + // length in bytes including null padding to Uint32 + uint l1 = (full << 2); + int k = (*cs->coll->strnncollsp)(cs, u1.v, l1, u2.v, l1); + return k < 0 ? -1 : k > 0 ? +1 : 0; } return CmpUnknown; } @@ -633,6 +669,7 @@ const Testcase testcase[] = { int main(int argc, char** argv) { + ndb_init(); // for charsets unsigned count = argc > 1 ? atoi(argv[1]) : 1000000; ndbout_c("count = %u", count); assert(count != 0); diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp index d82083684b7..4757f1d2bf3 100644 --- a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp +++ b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp @@ -15,6 +15,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include <ndb_global.h> +#include <my_sys.h> #define DBDICT_C #include "Dbdict.hpp" @@ -4100,6 +4101,8 @@ Dbdict::execADD_FRAGREQ(Signal* signal) { req->noOfKeyAttr = tabPtr.p->noOfPrimkey; req->noOfNewAttr = 0; + // noOfCharsets passed to TUP in upper half + req->noOfNewAttr |= (tabPtr.p->noOfCharsets << 16); req->checksumIndicator = 1; req->noOfAttributeGroups = 1; req->GCPIndicator = 0; @@ -4161,6 +4164,8 @@ Dbdict::sendLQHADDATTRREQ(Signal* signal, entry.attrId = attrPtr.p->attributeId; entry.attrDescriptor = attrPtr.p->attributeDescriptor; entry.extTypeInfo = attrPtr.p->extType; + // charset number passed to TUP, TUX in upper half + entry.extTypeInfo |= (attrPtr.p->extPrecision & ~0xFFFF); if (tabPtr.p->isIndex()) { Uint32 primaryAttrId; if (attrPtr.p->nextAttrInTable != RNIL) { @@ -4697,6 +4702,8 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it, Uint32 keyLength = 0; Uint32 attrCount = tablePtr.p->noOfAttributes; Uint32 nullCount = 0; + Uint32 noOfCharsets = 0; + Uint16 charsets[128]; Uint32 recordLength = 0; AttributeRecordPtr attrPtr; c_attributeRecordHash.removeAll(); @@ -4751,6 +4758,31 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it, attrPtr.p->extPrecision = attrDesc.AttributeExtPrecision; attrPtr.p->extScale = attrDesc.AttributeExtScale; attrPtr.p->extLength = attrDesc.AttributeExtLength; + // charset in upper half of precision + unsigned csNumber = (attrPtr.p->extPrecision >> 16); + if (csNumber != 0) { + CHARSET_INFO* cs = get_charset(csNumber, MYF(0)); + if (cs == NULL) { + parseP->errorCode = CreateTableRef::InvalidCharset; + parseP->errorLine = __LINE__; + return; + } + unsigned i = 0; + while (i < noOfCharsets) { + if (charsets[i] == csNumber) + break; + i++; + } + if (i == noOfCharsets) { + noOfCharsets++; + if (noOfCharsets > sizeof(charsets)/sizeof(charsets[0])) { + parseP->errorCode = CreateTableRef::InvalidFormat; + parseP->errorLine = __LINE__; + return; + } + charsets[i] = csNumber; + } + } /** * Ignore incoming old-style type and recompute it. @@ -4814,6 +4846,7 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it, tablePtr.p->noOfPrimkey = keyCount; tablePtr.p->noOfNullAttr = nullCount; + tablePtr.p->noOfCharsets = noOfCharsets; tablePtr.p->tupKeyLength = keyLength; tabRequire(recordLength<= MAX_TUPLE_SIZE_IN_WORDS, diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index 5ddaa67a7d6..a94af7b59c8 100644 --- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -455,7 +455,7 @@ public: Uint16 totalAttrReceived; Uint16 fragCopyCreation; Uint16 noOfKeyAttr; - Uint16 noOfNewAttr; + Uint32 noOfNewAttr; // noOfCharsets in upper half Uint16 noOfAttributeGroups; Uint16 lh3DistrBits; Uint16 tableType; diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 3b05a133bbb..8342870d69c 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -1444,6 +1444,7 @@ Dblqh::sendAddAttrReq(Signal* signal) tupreq->notused1 = 0; tupreq->attrId = attrId; tupreq->attrDescriptor = entry.attrDescriptor; + tupreq->extTypeInfo = entry.extTypeInfo; sendSignal(fragptr.p->tupBlockref, GSN_TUP_ADD_ATTRREQ, signal, TupAddAttrReq::SignalLength, JBB); return; @@ -7699,6 +7700,7 @@ void Dblqh::accScanConfScanLab(Signal* signal) ndbrequire(sz == boundAiLength); EXECUTE_DIRECT(DBTUX, GSN_TUX_BOUND_INFO, signal, TuxBoundInfo::SignalLength + boundAiLength); + jamEntry(); if (req->errorCode != 0) { jam(); /* diff --git a/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp b/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp index 0f3881e9024..2c62adab3e5 100644 --- a/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp +++ b/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp @@ -22,26 +22,59 @@ class AttributeOffset { private: static void setOffset(Uint32 & desc, Uint32 offset); + static void setCharsetPos(Uint32 & desc, Uint32 offset); static void setNullFlagPos(Uint32 & desc, Uint32 offset); static Uint32 getOffset(const Uint32 &); + static bool getCharsetFlag(const Uint32 &); + static Uint32 getCharsetPos(const Uint32 &); static Uint32 getNullFlagPos(const Uint32 &); static Uint32 getNullFlagOffset(const Uint32 &); static Uint32 getNullFlagBitOffset(const Uint32 &); static bool isNULL(const Uint32 &, const Uint32 &); }; -#define AO_ATTRIBUTE_OFFSET_MASK (0xffff) -#define AO_NULL_FLAG_POS_MASK (0x7ff) -#define AO_NULL_FLAG_POS_SHIFT (21) -#define AO_NULL_FLAG_WORD_MASK (31) -#define AO_NULL_FLAG_OFFSET_SHIFT (5) +/** + * Allow for 4096 attributes, all nullable, and for 128 different + * character sets. + * + * a = Attribute offset - 11 bits 0-10 ( addr word in 8 kb ) + * c = Has charset flag 1 bits 11-11 + * s = Charset pointer position - 7 bits 12-18 ( in table descriptor ) + * f = Null flag offset in word - 5 bits 20-24 ( address 32 bits ) + * w = Null word offset - 7 bits 25-32 ( f+w addr 4096 attrs ) + * + * 1111111111222222222233 + * 01234567890123456789012345678901 + * aaaaaaaaaaacsssssss fffffwwwwwww + */ + +#define AO_ATTRIBUTE_OFFSET_SHIFT 0 +#define AO_ATTRIBUTE_OFFSET_MASK 0x7ff + +#define AO_CHARSET_FLAG_SHIFT 11 +#define AO_CHARSET_POS_SHIFT 12 +#define AO_CHARSET_POS_MASK 127 + +#define AO_NULL_FLAG_POS_MASK 0xfff // f+w +#define AO_NULL_FLAG_POS_SHIFT 20 + +#define AO_NULL_FLAG_WORD_MASK 31 // f +#define AO_NULL_FLAG_OFFSET_SHIFT 5 inline void AttributeOffset::setOffset(Uint32 & desc, Uint32 offset){ ASSERT_MAX(offset, AO_ATTRIBUTE_OFFSET_MASK, "AttributeOffset::setOffset"); - desc |= offset; + desc |= (offset << AO_ATTRIBUTE_OFFSET_SHIFT); +} + +inline +void +AttributeOffset::setCharsetPos(Uint32 & desc, Uint32 offset) { + ASSERT_MAX(offset, AO_CHARSET_POS_MASK, "AttributeOffset::setCharsetPos"); + desc |= (1 << AO_CHARSET_FLAG_SHIFT); + desc |= (offset << AO_CHARSET_POS_SHIFT); } inline @@ -55,7 +88,21 @@ inline Uint32 AttributeOffset::getOffset(const Uint32 & desc) { - return desc & AO_ATTRIBUTE_OFFSET_MASK; + return (desc >> AO_ATTRIBUTE_OFFSET_SHIFT) & AO_ATTRIBUTE_OFFSET_MASK; +} + +inline +bool +AttributeOffset::getCharsetFlag(const Uint32 & desc) +{ + return (desc >> AO_CHARSET_FLAG_SHIFT) & 1; +} + +inline +Uint32 +AttributeOffset::getCharsetPos(const Uint32 & desc) +{ + return (desc >> AO_CHARSET_POS_SHIFT) & AO_CHARSET_POS_MASK; } inline diff --git a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp index cb7e35ea73e..ce81c1c9bc8 100644 --- a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp +++ b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp @@ -502,6 +502,7 @@ struct Fragoperrec { Uint32 attributeCount; Uint32 freeNullBit; Uint32 noOfNewAttrCount; + Uint32 charsetIndex; BlockReference lqhBlockrefFrag; }; typedef Ptr<Fragoperrec> FragoperrecPtr; @@ -785,6 +786,7 @@ struct Tablerec { ReadFunction* readFunctionArray; UpdateFunction* updateFunctionArray; + CHARSET_INFO** charsetArray; Uint32 readKeyArray; Uint32 tabDescriptor; @@ -796,6 +798,7 @@ struct Tablerec { Uint16 tupheadsize; Uint16 noOfAttr; Uint16 noOfKeyAttr; + Uint16 noOfCharsets; Uint16 noOfNewAttr; Uint16 noOfNullAttr; Uint16 noOfAttributeGroups; @@ -1001,17 +1004,20 @@ public: void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node); /* - * TUX reads primary table attributes for index keys. Input is - * attribute ids in AttributeHeader format. Output is pointers to - * attribute data within tuple or 0 for NULL value. + * TUX reads primary table attributes for index keys. Tuple is + * specified by location of original tuple and version number. Input + * is attribute ids in AttributeHeader format. Output is attribute + * data with headers. Uses readAttributes with xfrm option set. + * Returns number of words or negative (-terrorCode) on error. */ - void tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 numAttrs, const Uint32* attrIds, const Uint32** attrData); + int tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, const Uint32* attrIds, Uint32 numAttrs, Uint32* dataOut); /* * TUX reads primary key without headers into an array of words. Used - * for md5 summing and when returning keyinfo. + * for md5 summing and when returning keyinfo. Returns number of + * words or negative (-terrorCode) on error. */ - void tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pkSize, Uint32* pkData); + int tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut); /* * TUX checks if tuple is visible to scan. @@ -1365,10 +1371,11 @@ private: //------------------------------------------------------------------ int readAttributes(Page* const pagePtr, Uint32 TupHeadOffset, - Uint32* inBuffer, + const Uint32* inBuffer, Uint32 inBufLen, Uint32* outBuffer, - Uint32 TmaxRead); + Uint32 TmaxRead, + bool xfrmFlag); //------------------------------------------------------------------ //------------------------------------------------------------------ @@ -1614,6 +1621,20 @@ private: Uint32 attrDescriptor, Uint32 attrDes2); +// ***************************************************************** +// Read char routines optionally (tXfrmFlag) apply strxfrm +// ***************************************************************** + + bool readCharNotNULL(Uint32* outBuffer, + AttributeHeader* ahOut, + Uint32 attrDescriptor, + Uint32 attrDes2); + + bool readCharNULLable(Uint32* outBuffer, + AttributeHeader* ahOut, + Uint32 attrDescriptor, + Uint32 attrDes2); + //------------------------------------------------------------------ //------------------------------------------------------------------ bool nullFlagCheck(Uint32 attrDes2); @@ -1909,7 +1930,8 @@ private: void updatePackedList(Signal* signal, Uint16 ahostIndex); void setUpDescriptorReferences(Uint32 descriptorReference, - Tablerec* const regTabPtr); + Tablerec* const regTabPtr, + const Uint32* offset); void setUpKeyArray(Tablerec* const regTabPtr); bool addfragtotab(Tablerec* const regTabPtr, Uint32 fragId, Uint32 fragIndex); void deleteFragTab(Tablerec* const regTabPtr, Uint32 fragId); @@ -2098,7 +2120,8 @@ private: //----------------------------------------------------------------------------- // Public methods - Uint32 allocTabDescr(Uint32 noOfAttributes, Uint32 noOfKeyAttr, Uint32 noOfAttributeGroups); + Uint32 getTabDescrOffsets(const Tablerec* regTabPtr, Uint32* offset); + Uint32 allocTabDescr(const Tablerec* regTabPtr, Uint32* offset); void freeTabDescr(Uint32 retRef, Uint32 retNo); Uint32 getTabDescrWord(Uint32 index); void setTabDescrWord(Uint32 index, Uint32 word); @@ -2217,6 +2240,7 @@ private: Uint32 tMaxRead; Uint32 tOutBufIndex; Uint32* tTupleHeader; + bool tXfrmFlag; // updateAttributes module Uint32 tInBufIndex; diff --git a/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp index 0a47778f7c1..dfd1e37d4f5 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp @@ -903,7 +903,8 @@ int Dbtup::handleReadReq(Signal* signal, &cinBuffer[0], regOperPtr->attrinbufLen, dst, - dstLen); + dstLen, + false); if (TnoOfDataRead != (Uint32)-1) { /* ------------------------------------------------------------------------- */ // We have read all data into coutBuffer. Now send it to the API. @@ -1274,7 +1275,8 @@ int Dbtup::interpreterStartLab(Signal* signal, &cinBuffer[5], RinitReadLen, &dst[0], - dstLen); + dstLen, + false); if (TnoDataRW != (Uint32)-1) { RattroutCounter = TnoDataRW; RinstructionCounter += RinitReadLen; @@ -1347,7 +1349,8 @@ int Dbtup::interpreterStartLab(Signal* signal, &cinBuffer[RinstructionCounter], RfinalRLen, &dst[RattroutCounter], - (dstLen - RattroutCounter)); + (dstLen - RattroutCounter), + false); if (TnoDataRW != (Uint32)-1) { RattroutCounter += TnoDataRW; } else { @@ -1487,7 +1490,8 @@ int Dbtup::interpreterNextLab(Signal* signal, &theAttrinfo, (Uint32)1, &TregMemBuffer[theRegister], - (Uint32)3); + (Uint32)3, + false); if (TnoDataRW == 2) { /* ------------------------------------------------------------- */ // Two words read means that we get the instruction plus one 32 @@ -1833,7 +1837,8 @@ int Dbtup::interpreterNextLab(Signal* signal, Int32 TnoDataR = readAttributes(pagePtr, TupHeadOffset, &attrId, 1, - tmpArea, tmpAreaSz); + tmpArea, tmpAreaSz, + false); if (TnoDataR == -1) { jam(); @@ -1929,7 +1934,8 @@ int Dbtup::interpreterNextLab(Signal* signal, Int32 TnoDataR = readAttributes(pagePtr, TupHeadOffset, &attrId, 1, - tmpArea, tmpAreaSz); + tmpArea, tmpAreaSz, + false); if (TnoDataR == -1) { jam(); @@ -1957,7 +1963,8 @@ int Dbtup::interpreterNextLab(Signal* signal, Int32 TnoDataR = readAttributes(pagePtr, TupHeadOffset, &attrId, 1, - tmpArea, tmpAreaSz); + tmpArea, tmpAreaSz, + false); if (TnoDataR == -1) { jam(); diff --git a/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp b/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp index 1e57f127fbc..f3391ff7b59 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp @@ -1067,6 +1067,7 @@ Dbtup::initTab(Tablerec* const regTabPtr) }//for regTabPtr->readFunctionArray = NULL; regTabPtr->updateFunctionArray = NULL; + regTabPtr->charsetArray = NULL; regTabPtr->tabDescriptor = RNIL; regTabPtr->attributeGroupDescriptor = RNIL; diff --git a/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp b/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp index ec2c63c736e..2dd707ebafc 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp @@ -112,10 +112,11 @@ Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& no node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset; } -void -Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 numAttrs, const Uint32* attrIds, const Uint32** attrData) +int +Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, const Uint32* attrIds, Uint32 numAttrs, Uint32* dataOut) { ljamEntry(); + // use own variables instead of globals FragrecordPtr fragPtr; fragPtr.i = fragPtrI; ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); @@ -134,6 +135,7 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu while (true) { ptrCheckGuard(opPtr, cnoOfOprec, operationrec); if (opPtr.p->realPageIdC != RNIL) { + // update page and offset pagePtr.i = opPtr.p->realPageIdC; pageOffset = opPtr.p->pageOffsetC; ptrCheckGuard(pagePtr, cnoOfPage, page); @@ -147,33 +149,34 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu ndbrequire(++loopGuard < (1 << ZTUP_VERSION_BITS)); } } - const Uint32 tabDescriptor = tablePtr.p->tabDescriptor; - const Uint32* tupleHeader = &pagePtr.p->pageWord[pageOffset]; - for (Uint32 i = 0; i < numAttrs; i++) { - AttributeHeader ah(attrIds[i]); - const Uint32 attrId = ah.getAttributeId(); - const Uint32 index = tabDescriptor + (attrId << ZAD_LOG_SIZE); - const Uint32 desc1 = tableDescriptor[index].tabDescr; - const Uint32 desc2 = tableDescriptor[index + 1].tabDescr; - if (AttributeDescriptor::getNullable(desc1)) { - Uint32 offset = AttributeOffset::getNullFlagOffset(desc2); - ndbrequire(offset < tablePtr.p->tupNullWords); - offset += tablePtr.p->tupNullIndex; - ndbrequire(offset < tablePtr.p->tupheadsize); - if (AttributeOffset::isNULL(tupleHeader[offset], desc2)) { - ljam(); - attrData[i] = 0; - continue; - } - } - attrData[i] = tupleHeader + AttributeOffset::getOffset(desc2); + // read key attributes from found tuple version + // save globals + TablerecPtr tabptr_old = tabptr; + FragrecordPtr fragptr_old = fragptr; + OperationrecPtr operPtr_old = operPtr; + // new globals + tabptr = tablePtr; + fragptr = fragPtr; + operPtr.i = RNIL; + operPtr.p = NULL; + // do it + int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, true); + // restore globals + tabptr = tabptr_old; + fragptr = fragptr_old; + operPtr = operPtr_old; + // done + if (ret == (Uint32)-1) { + ret = terrorCode ? (-(int)terrorCode) : -1; } + return ret; } -void -Dbtup::tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pkSize, Uint32* pkData) +int +Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut) { ljamEntry(); + // use own variables instead of globals FragrecordPtr fragPtr; fragPtr.i = fragPtrI; ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); @@ -184,25 +187,45 @@ Dbtup::tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pk pagePtr.i = pageId; ptrCheckGuard(pagePtr, cnoOfPage, page); const Uint32 tabDescriptor = tablePtr.p->tabDescriptor; - const Uint32 numAttrs = tablePtr.p->noOfKeyAttr; const Uint32* attrIds = &tableDescriptor[tablePtr.p->readKeyArray].tabDescr; - const Uint32* tupleHeader = &pagePtr.p->pageWord[pageOffset]; - Uint32 size = 0; - for (Uint32 i = 0; i < numAttrs; i++) { - AttributeHeader ah(attrIds[i]); - const Uint32 attrId = ah.getAttributeId(); - const Uint32 index = tabDescriptor + (attrId << ZAD_LOG_SIZE); - const Uint32 desc1 = tableDescriptor[index].tabDescr; - const Uint32 desc2 = tableDescriptor[index + 1].tabDescr; - ndbrequire(! AttributeDescriptor::getNullable(desc1)); - const Uint32 attrSize = AttributeDescriptor::getSizeInWords(desc1); - const Uint32* attrData = tupleHeader + AttributeOffset::getOffset(desc2); - for (Uint32 j = 0; j < attrSize; j++) { - pkData[size + j] = attrData[j]; + const Uint32 numAttrs = tablePtr.p->noOfKeyAttr; + // read pk attributes from original tuple + // save globals + TablerecPtr tabptr_old = tabptr; + FragrecordPtr fragptr_old = fragptr; + OperationrecPtr operPtr_old = operPtr; + // new globals + tabptr = tablePtr; + fragptr = fragPtr; + operPtr.i = RNIL; + operPtr.p = NULL; + // do it + int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, true); + // restore globals + tabptr = tabptr_old; + fragptr = fragptr_old; + operPtr = operPtr_old; + // done + if (ret != (Uint32)-1) { + // remove headers + Uint32 n = 0; + Uint32 i = 0; + while (n < numAttrs) { + const AttributeHeader ah(dataOut[i]); + Uint32 size = ah.getDataSize(); + ndbrequire(size != 0); + for (Uint32 j = 0; j < size; j++) { + dataOut[i + j - n] = dataOut[i + j + 1]; + } + n += 1; + i += 1 + size; } - size += attrSize; + ndbrequire(i == ret); + ret -= numAttrs; + } else { + ret = terrorCode ? (-(int)terrorCode) : -1; } - *pkSize = size; + return ret; } bool diff --git a/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp b/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp index 09889a51fa3..efea312b865 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp @@ -20,12 +20,14 @@ #include <RefConvert.hpp> #include <ndb_limits.h> #include <pc.hpp> +#include <signaldata/TupFrag.hpp> #include <signaldata/FsConf.hpp> #include <signaldata/FsRemoveReq.hpp> #include <signaldata/DropTab.hpp> #include <signaldata/AlterTab.hpp> #include <AttributeDescriptor.hpp> #include "AttributeOffset.hpp" +#include <my_sys.h> #define ljam() { jamLine(20000 + __LINE__); } #define ljamEntry() { jamEntryLine(20000 + __LINE__); } @@ -52,7 +54,10 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) /* Uint32 schemaVersion = signal->theData[8];*/ Uint32 noOfKeyAttr = signal->theData[9]; - Uint32 noOfNewAttr = signal->theData[10]; + Uint32 noOfNewAttr = (signal->theData[10] & 0xFFFF); + /* DICT sends number of character sets in upper half */ + Uint32 noOfCharsets = (signal->theData[10] >> 16); + Uint32 checksumIndicator = signal->theData[11]; Uint32 noOfAttributeGroups = signal->theData[12]; Uint32 globalCheckpointIdIndicator = signal->theData[13]; @@ -75,6 +80,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) fragOperPtr.p->attributeCount = noOfAttributes; fragOperPtr.p->freeNullBit = noOfNullAttr; fragOperPtr.p->noOfNewAttrCount = noOfNewAttr; + fragOperPtr.p->charsetIndex = 0; ndbrequire(reqinfo == ZADDFRAG); @@ -156,6 +162,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) regTabPtr.p->tupheadsize = regTabPtr.p->tupGCPIndex; regTabPtr.p->noOfKeyAttr = noOfKeyAttr; + regTabPtr.p->noOfCharsets = noOfCharsets; regTabPtr.p->noOfAttr = noOfAttributes; regTabPtr.p->noOfNewAttr = noOfNewAttr; regTabPtr.p->noOfNullAttr = noOfNullAttr; @@ -163,13 +170,14 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) regTabPtr.p->notNullAttributeMask.clear(); - Uint32 tableDescriptorRef = allocTabDescr(noOfAttributes, noOfKeyAttr, noOfAttributeGroups); + Uint32 offset[10]; + Uint32 tableDescriptorRef = allocTabDescr(regTabPtr.p, offset); if (tableDescriptorRef == RNIL) { ljam(); fragrefuse4Lab(signal, fragOperPtr, regFragPtr, regTabPtr.p, fragId); return; }//if - setUpDescriptorReferences(tableDescriptorRef, regTabPtr.p); + setUpDescriptorReferences(tableDescriptorRef, regTabPtr.p, offset); } else { ljam(); fragOperPtr.p->definingFragment = false; @@ -251,6 +259,9 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ptrCheckGuard(fragOperPtr, cnoOfFragoprec, fragoperrec); Uint32 attrId = signal->theData[2]; Uint32 attrDescriptor = signal->theData[3]; + // DICT sends extended type (ignored) and charset number + Uint32 extType = (signal->theData[4] & 0xFF); + Uint32 csNumber = (signal->theData[4] >> 16); regTabPtr.i = fragOperPtr.p->tableidFrag; ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec); @@ -304,6 +315,29 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) } else { ndbrequire(false); }//if + if (csNumber != 0) { + CHARSET_INFO* cs = get_charset(csNumber, MYF(0)); + if (cs == NULL) { + ljam(); + terrorCode = TupAddAttrRef::InvalidCharset; + addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId); + return; + } + Uint32 i = 0; + while (i < fragOperPtr.p->charsetIndex) { + ljam(); + if (regTabPtr.p->charsetArray[i] == cs) + break; + i++; + } + if (i == fragOperPtr.p->charsetIndex) { + ljam(); + fragOperPtr.p->charsetIndex++; + } + ndbrequire(i < regTabPtr.p->noOfCharsets); + regTabPtr.p->charsetArray[i] = cs; + AttributeOffset::setCharsetPos(attrDes2, i); + } setTabDescrWord(firstTabDesIndex + 1, attrDes2); if (regTabPtr.p->tupheadsize > MAX_TUPLE_SIZE_IN_WORDS) { @@ -340,20 +374,28 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) return; }//Dbtup::execTUP_ADD_ATTRREQ() +/* + * Descriptor has these parts: + * + * 0 readFunctionArray ( one for each attribute ) + * 1 updateFunctionArray ( ditto ) + * 2 charsetArray ( pointers to distinct CHARSET_INFO ) + * 3 readKeyArray ( attribute ids of keys ) + * 4 attributeGroupDescriptor ( currently size 1 but unused ) + * 5 tabDescriptor ( attribute descriptors, each ZAD_SIZE ) + */ + void Dbtup::setUpDescriptorReferences(Uint32 descriptorReference, - Tablerec* const regTabPtr) + Tablerec* const regTabPtr, + const Uint32* offset) { - Uint32 noOfAttributes = regTabPtr->noOfAttr; - descriptorReference += ZTD_SIZE; - ReadFunction * tmp = (ReadFunction*)&tableDescriptor[descriptorReference].tabDescr; - regTabPtr->readFunctionArray = tmp; - regTabPtr->updateFunctionArray = (UpdateFunction*)(tmp + noOfAttributes); - - TableDescriptor * start = &tableDescriptor[descriptorReference]; - TableDescriptor * end = (TableDescriptor*)(tmp + 2 * noOfAttributes); - regTabPtr->readKeyArray = descriptorReference + (end - start); - regTabPtr->attributeGroupDescriptor = regTabPtr->readKeyArray + regTabPtr->noOfKeyAttr; - regTabPtr->tabDescriptor = regTabPtr->attributeGroupDescriptor + regTabPtr->noOfAttributeGroups; + Uint32* desc = &tableDescriptor[descriptorReference].tabDescr; + regTabPtr->readFunctionArray = (ReadFunction*)(desc + offset[0]); + regTabPtr->updateFunctionArray = (UpdateFunction*)(desc + offset[1]); + regTabPtr->charsetArray = (CHARSET_INFO**)(desc + offset[2]); + regTabPtr->readKeyArray = descriptorReference + offset[3]; + regTabPtr->attributeGroupDescriptor = descriptorReference + offset[4]; + regTabPtr->tabDescriptor = descriptorReference + offset[5]; }//Dbtup::setUpDescriptorReferences() Uint32 @@ -491,14 +533,18 @@ void Dbtup::releaseTabDescr(Tablerec* const regTabPtr) Uint32 descriptor = regTabPtr->readKeyArray; if (descriptor != RNIL) { ljam(); + Uint32 offset[10]; + getTabDescrOffsets(regTabPtr, offset); + regTabPtr->tabDescriptor = RNIL; regTabPtr->readKeyArray = RNIL; regTabPtr->readFunctionArray = NULL; regTabPtr->updateFunctionArray = NULL; + regTabPtr->charsetArray = NULL; regTabPtr->attributeGroupDescriptor= RNIL; - Uint32 sizeFunctionArrays = 2 * (regTabPtr->noOfAttr * sizeOfReadFunction()); - descriptor -= (sizeFunctionArrays + ZTD_SIZE); + // move to start of descriptor + descriptor -= offset[3]; Uint32 retNo = getTabDescrWord(descriptor + ZTD_DATASIZE); ndbrequire(getTabDescrWord(descriptor + ZTD_HEADER) == ZTD_TYPE_NORMAL); ndbrequire(retNo == getTabDescrWord((descriptor + retNo) - ZTD_TR_SIZE)); diff --git a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp index cc47ef7e78f..a4e7cb47249 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp @@ -35,6 +35,7 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) for (Uint32 i = 0; i < regTabPtr->noOfAttr; i++) { Uint32 attrDescriptorStart = startDescriptor + (i << ZAD_LOG_SIZE); Uint32 attrDescriptor = tableDescriptor[attrDescriptorStart].tabDescr; + Uint32 attrOffset = tableDescriptor[attrDescriptorStart + 1].tabDescr; if (!AttributeDescriptor::getDynamic(attrDescriptor)) { if ((AttributeDescriptor::getArrayType(attrDescriptor) == ZNON_ARRAY) || (AttributeDescriptor::getArrayType(attrDescriptor) == ZFIXED_ARRAY)) { @@ -54,6 +55,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) } else { ndbrequire(false); }//if + // replace read function of char attribute + if (AttributeOffset::getCharsetFlag(attrOffset)) { + ljam(); + regTabPtr->readFunctionArray[i] = &Dbtup::readCharNotNULL; + } } else { if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1) { ljam(); @@ -72,6 +78,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHZeroWordNULLable; regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable; }//if + // replace read function of char attribute + if (AttributeOffset::getCharsetFlag(attrOffset)) { + ljam(); + regTabPtr->readFunctionArray[i] = &Dbtup::readCharNULLable; + } }//if } else if (AttributeDescriptor::getArrayType(attrDescriptor) == ZVAR_ARRAY) { if (!AttributeDescriptor::getNullable(attrDescriptor)) { @@ -146,10 +157,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) /* ---------------------------------------------------------------- */ int Dbtup::readAttributes(Page* const pagePtr, Uint32 tupHeadOffset, - Uint32* inBuffer, + const Uint32* inBuffer, Uint32 inBufLen, Uint32* outBuffer, - Uint32 maxRead) + Uint32 maxRead, + bool xfrmFlag) { Tablerec* const regTabPtr = tabptr.p; Uint32 numAttributes = regTabPtr->noOfAttr; @@ -162,6 +174,7 @@ int Dbtup::readAttributes(Page* const pagePtr, tCheckOffset = regTabPtr->tupheadsize; tMaxRead = maxRead; tTupleHeader = &pagePtr->pageWord[tupHeadOffset]; + tXfrmFlag = xfrmFlag; ndbrequire(tupHeadOffset + tCheckOffset <= ZWORDS_ON_PAGE); while (inBufIndex < inBufLen) { @@ -542,6 +555,74 @@ Dbtup::readDynSmallVarSize(Uint32* outBuffer, return false; }//Dbtup::readDynSmallVarSize() + +bool +Dbtup::readCharNotNULL(Uint32* outBuffer, + AttributeHeader* ahOut, + Uint32 attrDescriptor, + Uint32 attrDes2) +{ + Uint32 indexBuf = tOutBufIndex; + Uint32 readOffset = AttributeOffset::getOffset(attrDes2); + Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor); + Uint32 newIndexBuf = indexBuf + attrNoOfWords; + Uint32 maxRead = tMaxRead; + + ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset); + if (newIndexBuf <= maxRead) { + ljam(); + ahOut->setDataSize(attrNoOfWords); + if (! tXfrmFlag) { + MEMCOPY_NO_WORDS(&outBuffer[indexBuf], + &tTupleHeader[readOffset], + attrNoOfWords); + } else { + ljam(); + Tablerec* regTabPtr = tabptr.p; + Uint32 i = AttributeOffset::getCharsetPos(attrDes2); + ndbrequire(i < tabptr.p->noOfCharsets); + // not const in MySQL + CHARSET_INFO* cs = tabptr.p->charsetArray[i]; + // XXX should strip Uint32 null padding + const unsigned nBytes = attrNoOfWords << 2; + unsigned n = + (*cs->coll->strnxfrm)(cs, + (uchar*)&outBuffer[indexBuf], + nBytes, + (const uchar*)&tTupleHeader[readOffset], + nBytes); + // pad with ascii spaces + while (n < nBytes) + ((uchar*)&outBuffer[indexBuf])[n++] = 0x20; + } + tOutBufIndex = newIndexBuf; + return true; + } else { + ljam(); + terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; + return false; + } +} + +bool +Dbtup::readCharNULLable(Uint32* outBuffer, + AttributeHeader* ahOut, + Uint32 attrDescriptor, + Uint32 attrDes2) +{ + if (!nullFlagCheck(attrDes2)) { + ljam(); + return readCharNotNULL(outBuffer, + ahOut, + attrDescriptor, + attrDes2); + } else { + ljam(); + ahOut->setNULL(); + return true; + } +} + /* ---------------------------------------------------------------------- */ /* THIS ROUTINE IS USED TO UPDATE A NUMBER OF ATTRIBUTES. IT IS */ /* USED BY THE INSERT ROUTINE, THE UPDATE ROUTINE AND IT CAN BE */ diff --git a/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp b/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp index d31ab43f108..642ba270760 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp @@ -31,12 +31,33 @@ /* memory attached to fragments (could be allocated per table */ /* instead. Performs its task by a buddy algorithm. */ /* **************************************************************** */ -Uint32 Dbtup::allocTabDescr(Uint32 noOfAttributes, Uint32 noOfKeyAttr, Uint32 noOfAttributeGroups) + +Uint32 +Dbtup::getTabDescrOffsets(const Tablerec* regTabPtr, Uint32* offset) +{ + // belongs to configure.in + unsigned sizeOfPointer = sizeof(CHARSET_INFO*); + ndbrequire((sizeOfPointer & 0x3) == 0); + sizeOfPointer = (sizeOfPointer >> 2); + // do in layout order and return offsets (see DbtupMeta.cpp) + Uint32 allocSize = 0; + // magically aligned to 8 bytes + offset[0] = allocSize += ZTD_SIZE; + offset[1] = allocSize += regTabPtr->noOfAttr * sizeOfReadFunction(); + offset[2] = allocSize += regTabPtr->noOfAttr * sizeOfReadFunction(); + offset[3] = allocSize += regTabPtr->noOfCharsets * sizeOfPointer; + offset[4] = allocSize += regTabPtr->noOfKeyAttr; + offset[5] = allocSize += regTabPtr->noOfAttributeGroups; + allocSize += regTabPtr->noOfAttr * ZAD_SIZE; + allocSize += ZTD_TRAILER_SIZE; + // return number of words + return allocSize; +} + +Uint32 Dbtup::allocTabDescr(const Tablerec* regTabPtr, Uint32* offset) { Uint32 reference = RNIL; - Uint32 allocSize = (ZTD_SIZE + ZTD_TRAILER_SIZE) + (noOfAttributes * ZAD_SIZE); - allocSize += noOfAttributeGroups; - allocSize += ((2 * noOfAttributes * sizeOfReadFunction()) + noOfKeyAttr); + Uint32 allocSize = getTabDescrOffsets(regTabPtr, offset); /* ---------------------------------------------------------------- */ /* ALWAYS ALLOCATE A MULTIPLE OF 16 BYTES */ /* ---------------------------------------------------------------- */ diff --git a/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp b/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp index a93ff4566e7..c0b49364ee6 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp @@ -751,7 +751,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, &tableDescriptor[regTabPtr->readKeyArray].tabDescr, regTabPtr->noOfKeyAttr, keyBuffer, - ZATTR_BUFFER_SIZE); + ZATTR_BUFFER_SIZE, + true); ndbrequire(noPrimKey != (Uint32)-1); Uint32 numAttrsToRead; @@ -792,7 +793,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, &readBuffer[0], numAttrsToRead, mainBuffer, - ZATTR_BUFFER_SIZE); + ZATTR_BUFFER_SIZE, + true); ndbrequire(noMainWords != (Uint32)-1); } else { ljam(); @@ -816,7 +818,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, &readBuffer[0], numAttrsToRead, copyBuffer, - ZATTR_BUFFER_SIZE); + ZATTR_BUFFER_SIZE, + true); ndbrequire(noCopyWords != (Uint32)-1); if ((noMainWords == noCopyWords) && diff --git a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp index 36ac20611bb..8dca52cec04 100644 --- a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp +++ b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp @@ -163,11 +163,6 @@ private: static const unsigned AttributeHeaderSize = 1; /* - * Array of pointers to TUP table attributes. Always read-on|y. - */ - typedef const Uint32** TableData; - - /* * Logical tuple address, "local key". Identifies table tuples. */ typedef Uint32 TupAddr; @@ -330,11 +325,15 @@ private: /* * Attribute metadata. Size must be multiple of word size. + * + * Prefix comparison of char data must use strxfrm and binary + * comparison. The charset is currently unused. */ struct DescAttr { Uint32 m_attrDesc; // standard AttributeDescriptor Uint16 m_primaryAttrId; - Uint16 m_typeId; + unsigned m_typeId : 6; + unsigned m_charset : 10; }; static const unsigned DescAttrSize = sizeof(DescAttr) >> 2; @@ -553,9 +552,9 @@ private: void execREAD_CONFIG_REQ(Signal* signal); // utils void setKeyAttrs(const Frag& frag); - void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, TableData keyData); - void readTablePk(const Frag& frag, TreeEnt ent, unsigned& pkSize, Data pkData); - void copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize); + void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData); + void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize); + void copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize); /* * DbtuxMeta.cpp @@ -622,17 +621,15 @@ private: /* * DbtuxSearch.cpp */ - void searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos); - void searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos); + void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); + void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); /* * DbtuxCmp.cpp */ - int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize); - int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey); + int cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize); int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize); - int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey); /* * DbtuxDebug.cpp @@ -679,17 +676,27 @@ private: Uint32 c_typeOfStart; /* - * Array of index key attribute ids in AttributeHeader format. - * Includes fixed attribute sizes. This is global data set at - * operation start and is not passed as a parameter. + * Global data set at operation start. Unpacked from index metadata. + * Not passed as parameter to methods. Invalid across timeslices. + * + * TODO inline all into index metadata */ + + // index key attr ids with sizes in AttributeHeader format Data c_keyAttrs; - // buffer for search key data as pointers to TUP storage - TableData c_searchKey; + // pointers to index key comparison functions + NdbSqlUtil::Cmp** c_sqlCmp; + + /* + * Other buffers used during the operation. + */ + + // buffer for search key data with headers + Data c_searchKey; - // buffer for current entry key data as pointers to TUP storage - TableData c_entryKey; + // buffer for current entry key data with headers + Data c_entryKey; // buffer for scan bounds and keyinfo (primary key) Data c_dataBuffer; diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp index debb5252386..549720cc17c 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp @@ -18,21 +18,24 @@ #include "Dbtux.hpp" /* - * Search key vs node prefix. + * Search key vs node prefix or entry * - * The comparison starts at given attribute position (in fact 0). The - * position is updated by number of equal initial attributes found. The - * prefix may be partial in which case CmpUnknown may be returned. + * The comparison starts at given attribute position. The position is + * updated by number of equal initial attributes found. The entry data + * may be partial in which case CmpUnknown may be returned. */ int -Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen) +Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen) { const unsigned numAttrs = frag.m_numAttrs; const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); // number of words of attribute data left unsigned len2 = maxlen; - // skip to right position in search key - searchKey += start; + // skip to right position in search key only + for (unsigned i = 0; i < start; i++) { + jam(); + searchKey += AttributeHeaderSize + searchKey.ah().getDataSize(); + } int ret = 0; while (start < numAttrs) { if (len2 <= AttributeHeaderSize) { @@ -41,22 +44,21 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons break; } len2 -= AttributeHeaderSize; - if (*searchKey != 0) { + if (! searchKey.ah().isNULL()) { if (! entryData.ah().isNULL()) { jam(); // current attribute const DescAttr& descAttr = descEnt.m_descAttr[start]; - const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId); - ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined); // full data size const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize()); const unsigned size2 = min(size1, len2); len2 -= size2; // compare - const Uint32* const p1 = *searchKey; + NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start]; + const Uint32* const p1 = &searchKey[AttributeHeaderSize]; const Uint32* const p2 = &entryData[AttributeHeaderSize]; - ret = (*type.m_cmp)(p1, p2, size1, size2); + ret = (*cmp)(0, p1, p2, size1, size2); if (ret != 0) { jam(); break; @@ -75,7 +77,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons break; } } - searchKey += 1; + searchKey += AttributeHeaderSize + searchKey.ah().getDataSize(); entryData += AttributeHeaderSize + entryData.ah().getDataSize(); start++; } @@ -83,60 +85,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons } /* - * Search key vs tree entry. - * - * Start position is updated as in previous routine. - */ -int -Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey) -{ - const unsigned numAttrs = frag.m_numAttrs; - const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); - // skip to right position - searchKey += start; - entryKey += start; - int ret = 0; - while (start < numAttrs) { - if (*searchKey != 0) { - if (*entryKey != 0) { - jam(); - // current attribute - const DescAttr& descAttr = descEnt.m_descAttr[start]; - const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId); - ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined); - // full data size - const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); - // compare - const Uint32* const p1 = *searchKey; - const Uint32* const p2 = *entryKey; - ret = (*type.m_cmp)(p1, p2, size1, size1); - if (ret != 0) { - jam(); - break; - } - } else { - jam(); - // not NULL > NULL - ret = +1; - break; - } - } else { - if (*entryKey != 0) { - jam(); - // NULL < not NULL - ret = -1; - break; - } - } - searchKey += 1; - entryKey += 1; - start++; - } - return ret; -} - -/* - * Scan bound vs node prefix. + * Scan bound vs node prefix or entry. * * Compare lower or upper bound and index attribute data. The attribute * data may be partial in which case CmpUnknown may be returned. @@ -183,9 +132,8 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne jam(); // current attribute const unsigned index = boundInfo.ah().getAttributeId(); + ndbrequire(index < frag.m_numAttrs); const DescAttr& descAttr = descEnt.m_descAttr[index]; - const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId); - ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined); ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId); // full data size const unsigned size1 = boundInfo.ah().getDataSize(); @@ -193,9 +141,10 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne const unsigned size2 = min(size1, len2); len2 -= size2; // compare + NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index]; const Uint32* const p1 = &boundInfo[AttributeHeaderSize]; const Uint32* const p2 = &entryData[AttributeHeaderSize]; - int ret = (*type.m_cmp)(p1, p2, size1, size2); + int ret = (*cmp)(0, p1, p2, size1, size2); if (ret != 0) { jam(); return ret; @@ -244,72 +193,3 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne return +1; } } - -/* - * Scan bound vs tree entry. - */ -int -Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey) -{ - const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); - // direction 0-lower 1-upper - ndbrequire(dir <= 1); - // initialize type to equality - unsigned type = 4; - while (boundCount != 0) { - // get and skip bound type - type = boundInfo[0]; - boundInfo += 1; - if (! boundInfo.ah().isNULL()) { - if (*entryKey != 0) { - jam(); - // current attribute - const unsigned index = boundInfo.ah().getAttributeId(); - const DescAttr& descAttr = descEnt.m_descAttr[index]; - const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId); - ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined); - // full data size - const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc); - // compare - const Uint32* const p1 = &boundInfo[AttributeHeaderSize]; - const Uint32* const p2 = *entryKey; - int ret = (*type.m_cmp)(p1, p2, size1, size1); - if (ret != 0) { - jam(); - return ret; - } - } else { - jam(); - // not NULL > NULL - return +1; - } - } else { - jam(); - if (*entryKey != 0) { - jam(); - // NULL < not NULL - return -1; - } - } - boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize(); - entryKey += 1; - boundCount -= 1; - } - if (dir == 0) { - // lower bound - jam(); - if (type == 1) { - jam(); - return +1; - } - return -1; - } else { - // upper bound - jam(); - if (type == 3) { - jam(); - return -1; - } - return +1; - } -} diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp index 11f4f12b7f6..8d31d2c6a55 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp @@ -207,14 +207,10 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& } // check ordering within node for (unsigned j = 1; j < node.getOccup(); j++) { - unsigned start = 0; const TreeEnt ent1 = node.getEnt(j - 1); const TreeEnt ent2 = node.getEnt(j); - if (j == 1) { - readKeyAttrs(frag, ent1, start, c_searchKey); - } else { - memcpy(c_searchKey, c_entryKey, frag.m_numAttrs << 2); - } + unsigned start = 0; + readKeyAttrs(frag, ent1, start, c_searchKey); readKeyAttrs(frag, ent2, start, c_entryKey); int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey); if (ret == 0) diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp index f6f1610c8c1..39cd8e25184 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp @@ -16,8 +16,6 @@ #define DBTUX_GEN_CPP #include "Dbtux.hpp" -#include <signaldata/TuxContinueB.hpp> -#include <signaldata/TuxContinueB.hpp> Dbtux::Dbtux(const Configuration& conf) : SimulatedBlock(DBTUX, conf), @@ -202,8 +200,9 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) } // allocate buffers c_keyAttrs = (Uint32*)allocRecord("c_keyAttrs", sizeof(Uint32), MaxIndexAttributes); - c_searchKey = (TableData)allocRecord("c_searchKey", sizeof(Uint32*), MaxIndexAttributes); - c_entryKey = (TableData)allocRecord("c_entryKey", sizeof(Uint32*), MaxIndexAttributes); + c_sqlCmp = (NdbSqlUtil::Cmp**)allocRecord("c_sqlCmp", sizeof(NdbSqlUtil::Cmp*), MaxIndexAttributes); + c_searchKey = (Uint32*)allocRecord("c_searchKey", sizeof(Uint32), MaxAttrDataSize); + c_entryKey = (Uint32*)allocRecord("c_entryKey", sizeof(Uint32), MaxAttrDataSize); c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1); // ack ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); @@ -218,7 +217,8 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) void Dbtux::setKeyAttrs(const Frag& frag) { - Data keyAttrs = c_keyAttrs; // global + Data keyAttrs = c_keyAttrs; // global + NdbSqlUtil::Cmp** sqlCmp = c_sqlCmp; // global const unsigned numAttrs = frag.m_numAttrs; const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff); for (unsigned i = 0; i < numAttrs; i++) { @@ -227,75 +227,71 @@ Dbtux::setKeyAttrs(const Frag& frag) // set attr id and fixed size keyAttrs.ah() = AttributeHeader(descAttr.m_primaryAttrId, size); keyAttrs += 1; + // set comparison method pointer + const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getTypeBinary(descAttr.m_typeId); + ndbrequire(sqlType.m_cmp != 0); + *(sqlCmp++) = sqlType.m_cmp; } } void -Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, TableData keyData) +Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData) { ConstData keyAttrs = c_keyAttrs; // global const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit]; const TupLoc tupLoc = ent.m_tupLoc; const Uint32 tupVersion = ent.m_tupVersion; ndbrequire(start < frag.m_numAttrs); - const unsigned numAttrs = frag.m_numAttrs - start; - // start applies to both keys and output data + const Uint32 numAttrs = frag.m_numAttrs - start; + // skip to start position in keyAttrs only keyAttrs += start; - keyData += start; - c_tup->tuxReadAttrs(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupVersion, numAttrs, keyAttrs, keyData); + int ret = c_tup->tuxReadAttrs(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupVersion, keyAttrs, numAttrs, keyData); jamEntry(); + // TODO handle error + ndbrequire(ret > 0); } void -Dbtux::readTablePk(const Frag& frag, TreeEnt ent, unsigned& pkSize, Data pkData) +Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize) { const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit]; const TupLoc tupLoc = ent.m_tupLoc; - Uint32 size = 0; - c_tup->tuxReadKeys(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, &size, pkData); - ndbrequire(size != 0); - pkSize = size; + int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, pkData); + jamEntry(); + // TODO handle error + ndbrequire(ret > 0); + pkSize = ret; } /* - * Input is pointers to table attributes. Output is array of attribute - * data with headers. Copies whatever fits. + * Copy attribute data with headers. Input is all index key data. + * Copies whatever fits. */ void -Dbtux::copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2) +Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2) { - ConstData keyAttrs = c_keyAttrs; // global - const unsigned numAttrs = frag.m_numAttrs; + unsigned n = frag.m_numAttrs; unsigned len2 = maxlen2; - for (unsigned n = 0; n < numAttrs; n++) { + while (n != 0) { jam(); - const unsigned attrId = keyAttrs.ah().getAttributeId(); - const unsigned dataSize = keyAttrs.ah().getDataSize(); - const Uint32* const p1 = *data1; - if (p1 != 0) { - if (len2 == 0) - return; - data2.ah() = AttributeHeader(attrId, dataSize); - data2 += 1; - len2 -= 1; - unsigned n = dataSize; - for (unsigned i = 0; i < dataSize; i++) { - if (len2 == 0) - return; - *data2 = p1[i]; - data2 += 1; - len2 -= 1; - } - } else { + const unsigned dataSize = data1.ah().getDataSize(); + // copy header + if (len2 == 0) + return; + data2[0] = data1[0]; + data1 += 1; + data2 += 1; + len2 -= 1; + // copy data + for (unsigned i = 0; i < dataSize; i++) { if (len2 == 0) return; - data2.ah() = AttributeHeader(attrId, 0); - data2.ah().setNULL(); - data2 += 1; + data2[i] = data1[i]; len2 -= 1; } - keyAttrs += 1; - data1 += 1; + data1 += dataSize; + data2 += dataSize; + n -= 1; } #ifdef VM_TRACE memset(data2, DataFillByte, len2 << 2); diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp index 4bb3b940d91..3c0af3ca79d 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp @@ -178,19 +178,31 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) descAttr.m_attrDesc = req->attrDescriptor; descAttr.m_primaryAttrId = req->primaryAttrId; descAttr.m_typeId = req->extTypeInfo & 0xFF; + descAttr.m_charset = (req->extTypeInfo >> 16); #ifdef VM_TRACE if (debugFlags & DebugMeta) { debugOut << "Add frag " << fragPtr.i << " attr " << attrId << " " << descAttr << endl; } #endif - // check if type is valid and has a comparison method - const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId); + // check that type is valid and has a binary comparison method + const NdbSqlUtil::Type& type = NdbSqlUtil::getTypeBinary(descAttr.m_typeId); if (type.m_typeId == NdbSqlUtil::Type::Undefined || type.m_cmp == 0) { jam(); errorCode = TuxAddAttrRef::InvalidAttributeType; break; } +#ifdef dbtux_uses_charset + if (descAttr.m_charset != 0) { + CHARSET_INFO *cs = get_charset(descAttr.m_charset, MYF(0)); + // here use the non-binary type + if (! NdbSqlUtil::usable_in_ordered_index(descAttr.m_typeId, cs)) { + jam(); + errorCode = TuxAddAttrRef::InvalidCharset; + break; + } + } +#endif if (indexPtr.p->m_numAttrs == fragOpPtr.p->m_numAttrsRecvd) { jam(); // initialize tree header diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp index c4c33ff931f..5b161d3c4ce 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp @@ -112,6 +112,7 @@ Dbtux::execACC_SCANREQ(Signal* signal) void Dbtux::execTUX_BOUND_INFO(Signal* signal) { + jamEntry(); struct BoundInfo { unsigned offset; unsigned size; @@ -389,7 +390,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) jam(); const TreeEnt ent = scan.m_scanPos.m_ent; // read tuple key - readTablePk(frag, ent, pkSize, pkData); + readTablePk(frag, ent, pkData, pkSize); // get read lock or exclusive lock AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; @@ -480,7 +481,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) jam(); if (pkSize == 0) { jam(); - readTablePk(frag, ent, pkSize, pkData); + readTablePk(frag, ent, pkData, pkSize); } } // conf signal diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp index 84048b308bc..bffbb8f5594 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp @@ -25,7 +25,7 @@ * TODO optimize for initial equal attrs in node min/max */ void -Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos) +Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) { const TreeHead& tree = frag.m_tree; const unsigned numAttrs = frag.m_numAttrs; @@ -144,7 +144,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt sear * to it. */ void -Dbtux::searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos) +Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) { const TreeHead& tree = frag.m_tree; const unsigned numAttrs = frag.m_numAttrs; diff --git a/ndb/src/kernel/blocks/dbtux/Times.txt b/ndb/src/kernel/blocks/dbtux/Times.txt index 84819ddcf97..03473353a52 100644 --- a/ndb/src/kernel/blocks/dbtux/Times.txt +++ b/ndb/src/kernel/blocks/dbtux/Times.txt @@ -83,7 +83,7 @@ optim 13 mc02/a 39 ms 59 ms 50 pct mc02/c 9 ms 12 ms 44 pct mc02/d 246 ms 289 ms 17 pct -[ case d: what happened to PK read performance? ] +[ case d: bug in testOIBasic killed PK read performance ] optim 14 mc02/a 41 ms 60 ms 44 pct mc02/b 46 ms 81 ms 73 pct @@ -91,5 +91,21 @@ optim 14 mc02/a 41 ms 60 ms 44 pct mc02/d 242 ms 285 ms 17 pct [ case b: do long keys suffer from many subroutine calls? ] +[ case d: bug in testOIBasic killed PK read performance ] + +none mc02/a 35 ms 60 ms 71 pct + mc02/b 42 ms 75 ms 76 pct + mc02/c 5 ms 12 ms 106 pct + mc02/d 165 ms 238 ms 44 pct + +[ johan re-installed mc02 as fedora gcc-3.3.2 ] +[ case c: table scan has improved... ] + +charsets mc02/a 35 ms 60 ms 71 pct + mc02/b 42 ms 84 ms 97 pct + mc02/c 5 ms 12 ms 109 pct + mc02/d 190 ms 236 ms 23 pct + +[ case b: TUX can no longer use pointers to TUP data ] vim: set et: diff --git a/ndb/src/kernel/vm/MetaData.hpp b/ndb/src/kernel/vm/MetaData.hpp index f6a941e8f9f..11e262664c1 100644 --- a/ndb/src/kernel/vm/MetaData.hpp +++ b/ndb/src/kernel/vm/MetaData.hpp @@ -107,6 +107,9 @@ public: /* Number of primary key attributes (should be computed) */ Uint16 noOfPrimkey; + /* Number of distinct character sets (computed) */ + Uint16 noOfCharsets; + /* Length of primary key in words (should be computed) */ /* For ordered index this is tree node size in words */ Uint16 tupKeyLength; diff --git a/ndb/src/ndbapi/NdbIndexOperation.cpp b/ndb/src/ndbapi/NdbIndexOperation.cpp index 0742f8d911c..bf4b07842f6 100644 --- a/ndb/src/ndbapi/NdbIndexOperation.cpp +++ b/ndb/src/ndbapi/NdbIndexOperation.cpp @@ -164,6 +164,7 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, Uint32 tData; Uint32 tKeyInfoPosition; const char* aValue = aValuePassed; + Uint32 xfrmData[1024]; Uint32 tempData[1024]; if ((theStatus == OperationDefined) && @@ -224,6 +225,21 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, m_theIndexDefined[i][2] = true; Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; + const char* aValueToWrite = aValue; + + CHARSET_INFO* cs = tAttrInfo->m_cs; + if (cs != 0) { + // current limitation: strxfrm does not increase length + assert(cs->strxfrm_multiply == 1); + unsigned n = + (*cs->coll->strnxfrm)(cs, + (uchar*)xfrmData, sizeof(xfrmData), + (const uchar*)aValue, sizeInBytes); + while (n < sizeInBytes) + ((uchar*)xfrmData)[n++] = 0x20; + aValue = (char*)xfrmData; + } + Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ; Uint32 totalSizeInWords = (sizeInBytes + 3)/4;// Inc. bits in last word Uint32 sizeInWords = sizeInBytes / 4; // Exc. bits in last word @@ -314,13 +330,20 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, if ((tOpType == InsertRequest) || (tOpType == WriteRequest)) { if (!tAttrInfo->m_indexOnly){ + // invalid data can crash kernel + if (cs != NULL && + (*cs->cset->well_formed_len)(cs, + aValueToWrite, + aValueToWrite + sizeInBytes, + sizeInBytes) != sizeInBytes) + goto equal_error4; Uint32 ahValue; Uint32 sz = totalSizeInWords; AttributeHeader::init(&ahValue, tAttrId, sz); insertATTRINFO( ahValue ); - insertATTRINFOloop((Uint32*)aValue, sizeInWords); + insertATTRINFOloop((Uint32*)aValueToWrite, sizeInWords); if (bitsInLastWord != 0) { - tData = *(Uint32*)(aValue + (sizeInWords << 2)); + tData = *(Uint32*)(aValueToWrite + (sizeInWords << 2)); tData = convertEndian(tData); tData = tData & ((1 << bitsInLastWord) - 1); tData = convertEndian(tData); @@ -411,7 +434,10 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, equal_error3: setErrorCodeAbort(4209); - + return -1; + + equal_error4: + setErrorCodeAbort(744); return -1; } diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp index 6d995e06582..ad838ddd601 100644 --- a/ndb/src/ndbapi/NdbOperationDefine.cpp +++ b/ndb/src/ndbapi/NdbOperationDefine.cpp @@ -492,6 +492,17 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, // Insert Attribute Id into ATTRINFO part. const Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; + + CHARSET_INFO* cs = tAttrInfo->m_cs; + // invalid data can crash kernel + if (cs != NULL && + (*cs->cset->well_formed_len)(cs, + aValue, + aValue + sizeInBytes, + sizeInBytes) != sizeInBytes) { + setErrorCodeAbort(744); + return -1; + } #if 0 tAttrSize = tAttrInfo->theAttrSize; tArraySize = tAttrInfo->theArraySize; diff --git a/ndb/src/ndbapi/NdbOperationSearch.cpp b/ndb/src/ndbapi/NdbOperationSearch.cpp index 19cb133dbf7..e5166fc4a82 100644 --- a/ndb/src/ndbapi/NdbOperationSearch.cpp +++ b/ndb/src/ndbapi/NdbOperationSearch.cpp @@ -60,6 +60,7 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, Uint32 tData; Uint32 tKeyInfoPosition; const char* aValue = aValuePassed; + Uint32 xfrmData[1024]; Uint32 tempData[1024]; if ((theStatus == OperationDefined) && @@ -117,6 +118,21 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, theTupleKeyDefined[i][2] = true; Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; + const char* aValueToWrite = aValue; + + CHARSET_INFO* cs = tAttrInfo->m_cs; + if (cs != 0) { + // current limitation: strxfrm does not increase length + assert(cs->strxfrm_multiply == 1); + unsigned n = + (*cs->coll->strnxfrm)(cs, + (uchar*)xfrmData, sizeof(xfrmData), + (const uchar*)aValue, sizeInBytes); + while (n < sizeInBytes) + ((uchar*)xfrmData)[n++] = 0x20; + aValue = (char*)xfrmData; + } + Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ; Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Inc. bits in last word Uint32 sizeInWords = sizeInBytes / 4; // Exc. bits in last word @@ -206,13 +222,20 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, if ((tOpType == InsertRequest) || (tOpType == WriteRequest)) { if (!tAttrInfo->m_indexOnly){ + // invalid data can crash kernel + if (cs != NULL && + (*cs->cset->well_formed_len)(cs, + aValueToWrite, + aValueToWrite + sizeInBytes, + sizeInBytes) != sizeInBytes) + goto equal_error4; Uint32 ahValue; const Uint32 sz = totalSizeInWords; AttributeHeader::init(&ahValue, tAttrId, sz); insertATTRINFO( ahValue ); - insertATTRINFOloop((Uint32*)aValue, sizeInWords); + insertATTRINFOloop((Uint32*)aValueToWrite, sizeInWords); if (bitsInLastWord != 0) { - tData = *(Uint32*)(aValue + (sizeInWords << 2)); + tData = *(Uint32*)(aValueToWrite + (sizeInWords << 2)); tData = convertEndian(tData); tData = tData & ((1 << bitsInLastWord) - 1); tData = convertEndian(tData); @@ -311,6 +334,10 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, equal_error3: setErrorCodeAbort(4209); return -1; + + equal_error4: + setErrorCodeAbort(744); + return -1; } /****************************************************************************** diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 86c174c4545..ac5f4268386 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -1096,30 +1096,43 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, theStatus == SetBound && (0 <= type && type <= 4) && len <= 8000) { - // bound type - + // insert bound type insertATTRINFO(type); - // attribute header Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; + // normalize char bound + CHARSET_INFO* cs = tAttrInfo->m_cs; + Uint32 xfrmData[2000]; + if (cs != NULL && aValue != NULL) { + // current limitation: strxfrm does not increase length + assert(cs->strxfrm_multiply == 1); + unsigned n = + (*cs->coll->strnxfrm)(cs, + (uchar*)xfrmData, sizeof(xfrmData), + (const uchar*)aValue, sizeInBytes); + while (n < sizeInBytes) + ((uchar*)xfrmData)[n++] = 0x20; + aValue = (char*)xfrmData; + } if (len != sizeInBytes && (len != 0)) { setErrorCodeAbort(4209); return -1; } + // insert attribute header len = aValue != NULL ? sizeInBytes : 0; Uint32 tIndexAttrId = tAttrInfo->m_attrId; Uint32 sizeInWords = (len + 3) / 4; AttributeHeader ah(tIndexAttrId, sizeInWords); insertATTRINFO(ah.m_value); if (len != 0) { - // attribute data + // insert attribute data if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0) insertATTRINFOloop((const Uint32*)aValue, sizeInWords); else { - Uint32 temp[2000]; - memcpy(temp, aValue, len); + Uint32 tempData[2000]; + memcpy(tempData, aValue, len); while ((len & 0x3) != 0) - ((char*)temp)[len++] = 0; - insertATTRINFOloop(temp, sizeInWords); + ((char*)tempData)[len++] = 0; + insertATTRINFOloop(tempData, sizeInWords); } } @@ -1206,11 +1219,11 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, if((r1_null ^ (unsigned)r2->isNULL())){ return (r1_null ? -1 : 1); } - Uint32 type = NdbColumnImpl::getImpl(* r1->m_column).m_extType; + const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column); Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4; if(!r1_null){ - const NdbSqlUtil::Type& t = NdbSqlUtil::getType(type); - int r = (*t.m_cmp)(d1, d2, size, size); + const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_extType); + int r = (*sqlType.m_cmp)(col.m_cs, d1, d2, size, size); if(r){ assert(r != NdbSqlUtil::CmpUnknown); return r; diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c index 2ebcf4be444..037c441cc38 100644 --- a/ndb/src/ndbapi/ndberror.c +++ b/ndb/src/ndbapi/ndberror.c @@ -282,7 +282,7 @@ ErrorBundle ErrorCodes[] = { { 741, SE, "Unsupported alter table" }, { 742, SE, "Unsupported attribute type in index" }, { 743, SE, "Unsupported character set in table or index" }, - { 744, SE, "Character conversion error" }, + { 744, SE, "Character string is invalid for given character set" }, { 241, SE, "Invalid schema object version" }, { 283, SE, "Table is being dropped" }, { 284, SE, "Table not defined in transaction coordinator" }, diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp index ac28b96af80..f9eb3514926 100644 --- a/ndb/test/ndbapi/testOIBasic.cpp +++ b/ndb/test/ndbapi/testOIBasic.cpp @@ -28,6 +28,7 @@ #include <NdbCondition.h> #include <NdbThread.h> #include <NdbTick.h> +#include <my_sys.h> // options @@ -37,6 +38,8 @@ struct Opt { const char* m_bound; const char* m_case; bool m_core; + const char* m_csname; + CHARSET_INFO* m_cs; bool m_dups; NdbDictionary::Object::FragmentType m_fragtype; unsigned m_idxloop; @@ -59,6 +62,8 @@ struct Opt { m_bound("01234"), m_case(0), m_core(false), + m_csname("latin1_bin"), + m_cs(0), m_dups(false), m_fragtype(NdbDictionary::Object::FragUndefined), m_idxloop(4), @@ -94,6 +99,7 @@ printhelp() << " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl << " -case abc only given test cases (letters a-z)" << endl << " -core core dump on error [" << d.m_core << "]" << endl + << " -csname S charset (collation) of non-pk char column [" << d.m_csname << "]" << endl << " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl << " -fragtype T fragment type single/small/medium/large" << endl << " -index xyz only given index numbers (digits 1-9)" << endl @@ -983,6 +989,10 @@ createtable(Par par) c.setLength(col.m_length); c.setPrimaryKey(col.m_pk); c.setNullable(col.m_nullable); + if (c.getCharset()) { // test if char type + if (! col.m_pk) + c.setCharset(par.m_cs); + } t.addColumn(c); } con.m_dic = con.m_ndb->getDictionary(); @@ -3149,6 +3159,10 @@ runtest(Par par) LL1("start"); if (par.m_seed != 0) srandom(par.m_seed); + assert(par.m_csname != 0); + CHARSET_INFO* cs; + CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0); + par.m_cs = cs; Con con; CHK(con.connect() == 0); par.m_con = &con; @@ -3232,6 +3246,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535) g_opt.m_core = true; continue; } + if (strcmp(arg, "-csname") == 0) { + if (++argv, --argc > 0) { + g_opt.m_csname = strdup(argv[0]); + continue; + } + } if (strcmp(arg, "-dups") == 0) { g_opt.m_dups = true; continue; |