summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpekka@mysql.com <>2004-09-17 14:18:32 +0200
committerpekka@mysql.com <>2004-09-17 14:18:32 +0200
commite274047c24a972b3774a69e80164fb847afbaaaf (patch)
treecdf1acfbaba13a23d58a84c53ed3803c6d801099
parentf21e6d81ff354f4f29bca1cf2b8d9e670ef47663 (diff)
parentd02998e20f8f77c911b004644c32569adbb86c9e (diff)
downloadmariadb-git-e274047c24a972b3774a69e80164fb847afbaaaf.tar.gz
Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb
into mysql.com:/space/pekka/ndb/version/my41-cc
-rw-r--r--mysql-test/r/ndb_charset.result191
-rw-r--r--mysql-test/r/ndb_index.result2
-rw-r--r--mysql-test/t/ndb_charset.test159
-rw-r--r--mysql-test/t/ndb_index.test2
-rw-r--r--ndb/include/kernel/signaldata/CreateTable.hpp3
-rw-r--r--ndb/include/kernel/signaldata/LqhFrag.hpp2
-rw-r--r--ndb/include/kernel/signaldata/TupFrag.hpp10
-rw-r--r--ndb/include/util/NdbSqlUtil.hpp11
-rw-r--r--ndb/src/common/util/NdbSqlUtil.cpp107
-rw-r--r--ndb/src/kernel/blocks/dbdict/Dbdict.cpp33
-rw-r--r--ndb/src/kernel/blocks/dblqh/Dblqh.hpp2
-rw-r--r--ndb/src/kernel/blocks/dblqh/DblqhMain.cpp2
-rw-r--r--ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp61
-rw-r--r--ndb/src/kernel/blocks/dbtup/Dbtup.hpp44
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp21
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupGen.cpp1
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp103
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp80
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp85
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp29
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp9
-rw-r--r--ndb/src/kernel/blocks/dbtux/Dbtux.hpp49
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp158
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp8
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp84
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp16
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp5
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp4
-rw-r--r--ndb/src/kernel/blocks/dbtux/Times.txt18
-rw-r--r--ndb/src/kernel/vm/MetaData.hpp3
-rw-r--r--ndb/src/ndbapi/NdbIndexOperation.cpp32
-rw-r--r--ndb/src/ndbapi/NdbOperationDefine.cpp11
-rw-r--r--ndb/src/ndbapi/NdbOperationSearch.cpp31
-rw-r--r--ndb/src/ndbapi/NdbScanOperation.cpp35
-rw-r--r--ndb/src/ndbapi/ndberror.c2
-rw-r--r--ndb/test/ndbapi/testOIBasic.cpp20
36 files changed, 1066 insertions, 367 deletions
diff --git a/mysql-test/r/ndb_charset.result b/mysql-test/r/ndb_charset.result
new file mode 100644
index 00000000000..93429a1fcb0
--- /dev/null
+++ b/mysql-test/r/ndb_charset.result
@@ -0,0 +1,191 @@
+drop table if exists t1;
+create table t1 (
+a char(3) character set latin1 collate latin1_bin primary key
+) engine=ndb;
+insert into t1 values('aAa');
+insert into t1 values('aaa');
+insert into t1 values('AAA');
+select * from t1 order by a;
+a
+AAA
+aAa
+aaa
+select * from t1 where a = 'aAa';
+a
+aAa
+select * from t1 where a = 'aaa';
+a
+aaa
+select * from t1 where a = 'AaA';
+a
+select * from t1 where a = 'AAA';
+a
+AAA
+drop table t1;
+create table t1 (
+a char(3) character set latin1 collate latin1_swedish_ci primary key
+) engine=ndb;
+insert into t1 values('aAa');
+insert into t1 values('aaa');
+ERROR 23000: Duplicate entry 'aaa' for key 1
+insert into t1 values('AAA');
+ERROR 23000: Duplicate entry 'AAA' for key 1
+select * from t1 order by a;
+a
+aAa
+select * from t1 where a = 'aAa';
+a
+aAa
+select * from t1 where a = 'aaa';
+a
+aAa
+select * from t1 where a = 'AaA';
+a
+aAa
+select * from t1 where a = 'AAA';
+a
+aAa
+drop table t1;
+create table t1 (
+p int primary key,
+a char(3) character set latin1 collate latin1_bin not null,
+unique key(a)
+) engine=ndb;
+insert into t1 values(1, 'aAa');
+insert into t1 values(2, 'aaa');
+insert into t1 values(3, 'AAA');
+select * from t1 order by p;
+p a
+1 aAa
+2 aaa
+3 AAA
+select * from t1 where a = 'aAa';
+p a
+1 aAa
+select * from t1 where a = 'aaa';
+p a
+2 aaa
+select * from t1 where a = 'AaA';
+p a
+select * from t1 where a = 'AAA';
+p a
+3 AAA
+drop table t1;
+create table t1 (
+p int primary key,
+a char(3) character set latin1 collate latin1_swedish_ci not null,
+unique key(a)
+) engine=ndb;
+insert into t1 values(1, 'aAa');
+insert into t1 values(2, 'aaa');
+ERROR 23000: Can't write, because of unique constraint, to table 't1'
+insert into t1 values(3, 'AAA');
+ERROR 23000: Can't write, because of unique constraint, to table 't1'
+select * from t1 order by p;
+p a
+1 aAa
+select * from t1 where a = 'aAa';
+p a
+1 aAa
+select * from t1 where a = 'aaa';
+p a
+1 aAa
+select * from t1 where a = 'AaA';
+p a
+1 aAa
+select * from t1 where a = 'AAA';
+p a
+1 aAa
+drop table t1;
+create table t1 (
+p int primary key,
+a char(3) character set latin1 collate latin1_bin not null,
+index(a)
+) engine=ndb;
+insert into t1 values(1, 'aAa');
+insert into t1 values(2, 'aaa');
+insert into t1 values(3, 'AAA');
+insert into t1 values(4, 'aAa');
+insert into t1 values(5, 'aaa');
+insert into t1 values(6, 'AAA');
+select * from t1 order by p;
+p a
+1 aAa
+2 aaa
+3 AAA
+4 aAa
+5 aaa
+6 AAA
+explain select * from t1 where a = 'zZz' order by p;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE t1 ref a a 3 const 10 Using where; Using filesort
+select * from t1 where a = 'aAa' order by p;
+p a
+1 aAa
+4 aAa
+select * from t1 where a = 'aaa' order by p;
+p a
+2 aaa
+5 aaa
+select * from t1 where a = 'AaA' order by p;
+p a
+select * from t1 where a = 'AAA' order by p;
+p a
+3 AAA
+6 AAA
+drop table t1;
+create table t1 (
+p int primary key,
+a char(3) character set latin1 collate latin1_swedish_ci not null,
+index(a)
+) engine=ndb;
+insert into t1 values(1, 'aAa');
+insert into t1 values(2, 'aaa');
+insert into t1 values(3, 'AAA');
+insert into t1 values(4, 'aAa');
+insert into t1 values(5, 'aaa');
+insert into t1 values(6, 'AAA');
+select * from t1 order by p;
+p a
+1 aAa
+2 aaa
+3 AAA
+4 aAa
+5 aaa
+6 AAA
+explain select * from t1 where a = 'zZz' order by p;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE t1 ref a a 3 const 10 Using where; Using filesort
+select * from t1 where a = 'aAa' order by p;
+p a
+1 aAa
+2 aaa
+3 AAA
+4 aAa
+5 aaa
+6 AAA
+select * from t1 where a = 'aaa' order by p;
+p a
+1 aAa
+2 aaa
+3 AAA
+4 aAa
+5 aaa
+6 AAA
+select * from t1 where a = 'AaA' order by p;
+p a
+1 aAa
+2 aaa
+3 AAA
+4 aAa
+5 aaa
+6 AAA
+select * from t1 where a = 'AAA' order by p;
+p a
+1 aAa
+2 aaa
+3 AAA
+4 aAa
+5 aaa
+6 AAA
+drop table t1;
diff --git a/mysql-test/r/ndb_index.result b/mysql-test/r/ndb_index.result
index dd92c237ace..5702552b0b5 100644
--- a/mysql-test/r/ndb_index.result
+++ b/mysql-test/r/ndb_index.result
@@ -4,7 +4,7 @@ PORT varchar(16) NOT NULL,
ACCESSNODE varchar(16) NOT NULL,
POP varchar(48) NOT NULL,
ACCESSTYPE int unsigned NOT NULL,
-CUSTOMER_ID varchar(20) NOT NULL,
+CUSTOMER_ID varchar(20) collate latin1_bin NOT NULL,
PROVIDER varchar(16),
TEXPIRE int unsigned,
NUM_IP int unsigned,
diff --git a/mysql-test/t/ndb_charset.test b/mysql-test/t/ndb_charset.test
new file mode 100644
index 00000000000..b9f28ed0faf
--- /dev/null
+++ b/mysql-test/t/ndb_charset.test
@@ -0,0 +1,159 @@
+--source include/have_ndb.inc
+
+--disable_warnings
+drop table if exists t1;
+--enable_warnings
+
+#
+# Minimal NDB charset test.
+#
+
+# pk - binary
+
+create table t1 (
+ a char(3) character set latin1 collate latin1_bin primary key
+) engine=ndb;
+# ok
+insert into t1 values('aAa');
+insert into t1 values('aaa');
+insert into t1 values('AAA');
+# 3
+select * from t1 order by a;
+# 1
+select * from t1 where a = 'aAa';
+# 1
+select * from t1 where a = 'aaa';
+# 0
+select * from t1 where a = 'AaA';
+# 1
+select * from t1 where a = 'AAA';
+drop table t1;
+
+# pk - case insensitive
+
+create table t1 (
+ a char(3) character set latin1 collate latin1_swedish_ci primary key
+) engine=ndb;
+# ok
+insert into t1 values('aAa');
+# fail
+--error 1062
+insert into t1 values('aaa');
+--error 1062
+insert into t1 values('AAA');
+# 1
+select * from t1 order by a;
+# 1
+select * from t1 where a = 'aAa';
+# 1
+select * from t1 where a = 'aaa';
+# 1
+select * from t1 where a = 'AaA';
+# 1
+select * from t1 where a = 'AAA';
+drop table t1;
+
+# unique hash index - binary
+
+create table t1 (
+ p int primary key,
+ a char(3) character set latin1 collate latin1_bin not null,
+ unique key(a)
+) engine=ndb;
+# ok
+insert into t1 values(1, 'aAa');
+insert into t1 values(2, 'aaa');
+insert into t1 values(3, 'AAA');
+# 3
+select * from t1 order by p;
+# 1
+select * from t1 where a = 'aAa';
+# 1
+select * from t1 where a = 'aaa';
+# 0
+select * from t1 where a = 'AaA';
+# 1
+select * from t1 where a = 'AAA';
+drop table t1;
+
+# unique hash index - case insensitive
+
+create table t1 (
+ p int primary key,
+ a char(3) character set latin1 collate latin1_swedish_ci not null,
+ unique key(a)
+) engine=ndb;
+# ok
+insert into t1 values(1, 'aAa');
+# fail
+--error 1169
+insert into t1 values(2, 'aaa');
+--error 1169
+insert into t1 values(3, 'AAA');
+# 1
+select * from t1 order by p;
+# 1
+select * from t1 where a = 'aAa';
+# 1
+select * from t1 where a = 'aaa';
+# 1
+select * from t1 where a = 'AaA';
+# 1
+select * from t1 where a = 'AAA';
+drop table t1;
+
+# ordered index - binary
+
+create table t1 (
+ p int primary key,
+ a char(3) character set latin1 collate latin1_bin not null,
+ index(a)
+) engine=ndb;
+# ok
+insert into t1 values(1, 'aAa');
+insert into t1 values(2, 'aaa');
+insert into t1 values(3, 'AAA');
+insert into t1 values(4, 'aAa');
+insert into t1 values(5, 'aaa');
+insert into t1 values(6, 'AAA');
+# 6
+select * from t1 order by p;
+# plan
+explain select * from t1 where a = 'zZz' order by p;
+# 2
+select * from t1 where a = 'aAa' order by p;
+# 2
+select * from t1 where a = 'aaa' order by p;
+# 0
+select * from t1 where a = 'AaA' order by p;
+# 2
+select * from t1 where a = 'AAA' order by p;
+drop table t1;
+
+# ordered index - case insensitive
+
+create table t1 (
+ p int primary key,
+ a char(3) character set latin1 collate latin1_swedish_ci not null,
+ index(a)
+) engine=ndb;
+# ok
+insert into t1 values(1, 'aAa');
+insert into t1 values(2, 'aaa');
+insert into t1 values(3, 'AAA');
+insert into t1 values(4, 'aAa');
+insert into t1 values(5, 'aaa');
+insert into t1 values(6, 'AAA');
+# 6
+select * from t1 order by p;
+# plan
+explain select * from t1 where a = 'zZz' order by p;
+# 6
+select * from t1 where a = 'aAa' order by p;
+# 6
+select * from t1 where a = 'aaa' order by p;
+# 6
+select * from t1 where a = 'AaA' order by p;
+# 6
+select * from t1 where a = 'AAA' order by p;
+drop table t1;
diff --git a/mysql-test/t/ndb_index.test b/mysql-test/t/ndb_index.test
index d3977dc3ea4..e65b24a9b20 100644
--- a/mysql-test/t/ndb_index.test
+++ b/mysql-test/t/ndb_index.test
@@ -9,7 +9,7 @@ CREATE TABLE t1 (
ACCESSNODE varchar(16) NOT NULL,
POP varchar(48) NOT NULL,
ACCESSTYPE int unsigned NOT NULL,
- CUSTOMER_ID varchar(20) NOT NULL,
+ CUSTOMER_ID varchar(20) collate latin1_bin NOT NULL,
PROVIDER varchar(16),
TEXPIRE int unsigned,
NUM_IP int unsigned,
diff --git a/ndb/include/kernel/signaldata/CreateTable.hpp b/ndb/include/kernel/signaldata/CreateTable.hpp
index 424367f28d5..67e510d2ed0 100644
--- a/ndb/include/kernel/signaldata/CreateTable.hpp
+++ b/ndb/include/kernel/signaldata/CreateTable.hpp
@@ -89,7 +89,8 @@ public:
ArraySizeTooBig = 737,
RecordTooBig = 738,
InvalidPrimaryKeySize = 739,
- NullablePrimaryKey = 740
+ NullablePrimaryKey = 740,
+ InvalidCharset = 743
};
private:
diff --git a/ndb/include/kernel/signaldata/LqhFrag.hpp b/ndb/include/kernel/signaldata/LqhFrag.hpp
index 116e9c01ca0..13dfafcc653 100644
--- a/ndb/include/kernel/signaldata/LqhFrag.hpp
+++ b/ndb/include/kernel/signaldata/LqhFrag.hpp
@@ -130,7 +130,7 @@ private:
Uint32 keyLength;
Uint32 nextLCP;
Uint32 noOfKeyAttr;
- Uint32 noOfNewAttr;
+ Uint32 noOfNewAttr; // noOfCharsets in upper half
Uint32 checksumIndicator;
Uint32 noOfAttributeGroups;
Uint32 GCPIndicator;
diff --git a/ndb/include/kernel/signaldata/TupFrag.hpp b/ndb/include/kernel/signaldata/TupFrag.hpp
index c0ce22651aa..c1e861c5dff 100644
--- a/ndb/include/kernel/signaldata/TupFrag.hpp
+++ b/ndb/include/kernel/signaldata/TupFrag.hpp
@@ -119,12 +119,13 @@ class TupAddAttrReq {
friend class Dblqh;
friend class Dbtux;
public:
- STATIC_CONST( SignalLength = 4 );
+ STATIC_CONST( SignalLength = 5 );
private:
Uint32 tupConnectPtr;
Uint32 notused1;
Uint32 attrId;
Uint32 attrDescriptor;
+ Uint32 extTypeInfo;
};
class TupAddAttrConf {
@@ -141,6 +142,10 @@ class TupAddAttrRef {
friend class Dbtup;
public:
STATIC_CONST( SignalLength = 2 );
+ enum ErrorCode {
+ NoError = 0,
+ InvalidCharset = 743
+ };
private:
Uint32 userPtr;
Uint32 errorCode;
@@ -178,7 +183,8 @@ public:
STATIC_CONST( SignalLength = 2 );
enum ErrorCode {
NoError = 0,
- InvalidAttributeType = 831,
+ InvalidAttributeType = 742,
+ InvalidCharset = 743,
InvalidNodeSize = 832
};
private:
diff --git a/ndb/include/util/NdbSqlUtil.hpp b/ndb/include/util/NdbSqlUtil.hpp
index df1cb716f93..3062d1e4e1b 100644
--- a/ndb/include/util/NdbSqlUtil.hpp
+++ b/ndb/include/util/NdbSqlUtil.hpp
@@ -40,11 +40,14 @@ public:
* Compare kernel attribute values. Returns -1, 0, +1 for less,
* equal, greater, respectively. Parameters are pointers to values,
* full attribute size in words, and size of available data in words.
+ * There is also pointer to type specific extra info. Char types
+ * receive CHARSET_INFO in it.
+ *
* If available size is less than full size, CmpUnknown may be
* returned. If a value cannot be parsed, it compares like NULL i.e.
* less than any valid value.
*/
- typedef int Cmp(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size);
+ typedef int Cmp(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size);
enum CmpResult {
CmpLess = -1,
@@ -55,6 +58,7 @@ public:
/**
* Kernel data types. Must match m_typeList in NdbSqlUtil.cpp.
+ * Now also must match types in NdbDictionary.
*/
struct Type {
enum Enum {
@@ -91,6 +95,11 @@ public:
static const Type& getType(Uint32 typeId);
/**
+ * Get type by id but replace char type by corresponding binary type.
+ */
+ static const Type& getTypeBinary(Uint32 typeId);
+
+ /**
* Check character set.
*/
static bool usable_in_pk(Uint32 typeId, const void* cs);
diff --git a/ndb/src/common/util/NdbSqlUtil.cpp b/ndb/src/common/util/NdbSqlUtil.cpp
index afb9bcfff62..6e4e5919e43 100644
--- a/ndb/src/common/util/NdbSqlUtil.cpp
+++ b/ndb/src/common/util/NdbSqlUtil.cpp
@@ -176,10 +176,29 @@ NdbSqlUtil::getType(Uint32 typeId)
return m_typeList[Type::Undefined];
}
+const NdbSqlUtil::Type&
+NdbSqlUtil::getTypeBinary(Uint32 typeId)
+{
+ switch (typeId) {
+ case Type::Char:
+ typeId = Type::Binary;
+ break;
+ case Type::Varchar:
+ typeId = Type::Varbinary;
+ break;
+ case Type::Text:
+ typeId = Type::Blob;
+ break;
+ default:
+ break;
+ }
+ return getType(typeId);
+}
+
// compare
int
-NdbSqlUtil::cmpTinyint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpTinyint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; Int8 v; } u1, u2;
@@ -193,7 +212,7 @@ NdbSqlUtil::cmpTinyint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 s
}
int
-NdbSqlUtil::cmpTinyunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpTinyunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; Uint8 v; } u1, u2;
@@ -207,7 +226,7 @@ NdbSqlUtil::cmpTinyunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uin
}
int
-NdbSqlUtil::cmpSmallint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpSmallint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; Int16 v; } u1, u2;
@@ -221,7 +240,7 @@ NdbSqlUtil::cmpSmallint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32
}
int
-NdbSqlUtil::cmpSmallunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpSmallunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; Uint16 v; } u1, u2;
@@ -235,7 +254,7 @@ NdbSqlUtil::cmpSmallunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Ui
}
int
-NdbSqlUtil::cmpMediumint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpMediumint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { const Uint32* p; const unsigned char* v; } u1, u2;
@@ -251,7 +270,7 @@ NdbSqlUtil::cmpMediumint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32
}
int
-NdbSqlUtil::cmpMediumunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpMediumunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { const Uint32* p; const unsigned char* v; } u1, u2;
@@ -267,7 +286,7 @@ NdbSqlUtil::cmpMediumunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, U
}
int
-NdbSqlUtil::cmpInt(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpInt(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; Int32 v; } u1, u2;
@@ -281,7 +300,7 @@ NdbSqlUtil::cmpInt(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
}
int
-NdbSqlUtil::cmpUnsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpUnsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; Uint32 v; } u1, u2;
@@ -295,7 +314,7 @@ NdbSqlUtil::cmpUnsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32
}
int
-NdbSqlUtil::cmpBigint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpBigint(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
if (size >= 2) {
@@ -314,7 +333,7 @@ NdbSqlUtil::cmpBigint(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 si
}
int
-NdbSqlUtil::cmpBigunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpBigunsigned(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
if (size >= 2) {
@@ -333,7 +352,7 @@ NdbSqlUtil::cmpBigunsigned(const Uint32* p1, const Uint32* p2, Uint32 full, Uint
}
int
-NdbSqlUtil::cmpFloat(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpFloat(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
union { Uint32 p[1]; float v; } u1, u2;
@@ -348,7 +367,7 @@ NdbSqlUtil::cmpFloat(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 siz
}
int
-NdbSqlUtil::cmpDouble(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpDouble(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
if (size >= 2) {
@@ -368,7 +387,7 @@ NdbSqlUtil::cmpDouble(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 si
}
int
-NdbSqlUtil::cmpDecimal(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpDecimal(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
// not used by MySQL or NDB
@@ -377,27 +396,34 @@ NdbSqlUtil::cmpDecimal(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 s
}
int
-NdbSqlUtil::cmpChar(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpChar(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
- assert(full >= size && size > 0);
+ // collation does not work on prefix for some charsets
+ assert(full == size && size > 0);
/*
- * Char is blank-padded to length and null-padded to word size. There
- * is no terminator so we compare the full values.
+ * Char is blank-padded to length and null-padded to word size.
*/
- union { const Uint32* p; const char* v; } u1, u2;
+ union { const Uint32* p; const uchar* v; } u1, u2;
u1.p = p1;
u2.p = p2;
- int k = memcmp(u1.v, u2.v, size << 2);
- return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
+ // not const in MySQL
+ CHARSET_INFO* cs = (CHARSET_INFO*)(info);
+ // length in bytes including null padding to Uint32
+ uint l1 = (full << 2);
+ int k = (*cs->coll->strnncollsp)(cs, u1.v, l1, u2.v, l1);
+ return k < 0 ? -1 : k > 0 ? +1 : 0;
}
int
-NdbSqlUtil::cmpVarchar(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpVarchar(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
/*
* Varchar is not allowed to contain a null byte and the value is
* null-padded. Therefore comparison does not need to use the length.
+ *
+ * Not used before MySQL 5.0. Format is likely to change. Handle
+ * only binary collation for now.
*/
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1;
@@ -408,7 +434,7 @@ NdbSqlUtil::cmpVarchar(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 s
}
int
-NdbSqlUtil::cmpBinary(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpBinary(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
/*
@@ -422,12 +448,14 @@ NdbSqlUtil::cmpBinary(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 si
}
int
-NdbSqlUtil::cmpVarbinary(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpVarbinary(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
/*
* Binary data of variable length padded with nulls. The comparison
* does not need to use the length.
+ *
+ * Not used before MySQL 5.0. Format is likely to change.
*/
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
@@ -438,11 +466,13 @@ NdbSqlUtil::cmpVarbinary(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32
}
int
-NdbSqlUtil::cmpDatetime(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpDatetime(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
/*
* Datetime is CC YY MM DD hh mm ss \0
+ *
+ * Not used via MySQL.
*/
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
@@ -459,11 +489,13 @@ NdbSqlUtil::cmpDatetime(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32
}
int
-NdbSqlUtil::cmpTimespec(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpTimespec(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
/*
* Timespec is CC YY MM DD hh mm ss \0 NN NN NN NN
+ *
+ * Not used via MySQL.
*/
union { const Uint32* p; const unsigned char* v; } u1, u2;
u1.p = p1;
@@ -490,12 +522,11 @@ NdbSqlUtil::cmpTimespec(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32
}
int
-NdbSqlUtil::cmpBlob(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpBlob(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
assert(full >= size && size > 0);
/*
- * Blob comparison is on the inline bytes. Except for larger header
- * the format is like Varbinary.
+ * Blob comparison is on the inline bytes (null padded).
*/
const unsigned head = NDB_BLOB_HEAD_SIZE;
// skip blob head
@@ -510,21 +541,26 @@ NdbSqlUtil::cmpBlob(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size
}
int
-NdbSqlUtil::cmpText(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
+NdbSqlUtil::cmpText(const void* info, const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
- assert(full >= size && size > 0);
+ // collation does not work on prefix for some charsets
+ assert(full == size && size > 0);
/*
- * Text comparison is on the inline bytes. Except for larger header
- * the format is like Varchar.
+ * Text comparison is on the inline bytes (blank padded). Currently
+ * not supported for multi-byte charsets.
*/
const unsigned head = NDB_BLOB_HEAD_SIZE;
// skip blob head
if (size >= head + 1) {
- union { const Uint32* p; const char* v; } u1, u2;
+ union { const Uint32* p; const uchar* v; } u1, u2;
u1.p = p1 + head;
u2.p = p2 + head;
- int k = memcmp(u1.v, u2.v, (size - head) << 2);
- return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
+ // not const in MySQL
+ CHARSET_INFO* cs = (CHARSET_INFO*)(info);
+ // length in bytes including null padding to Uint32
+ uint l1 = (full << 2);
+ int k = (*cs->coll->strnncollsp)(cs, u1.v, l1, u2.v, l1);
+ return k < 0 ? -1 : k > 0 ? +1 : 0;
}
return CmpUnknown;
}
@@ -633,6 +669,7 @@ const Testcase testcase[] = {
int
main(int argc, char** argv)
{
+ ndb_init(); // for charsets
unsigned count = argc > 1 ? atoi(argv[1]) : 1000000;
ndbout_c("count = %u", count);
assert(count != 0);
diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
index d82083684b7..4757f1d2bf3 100644
--- a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
+++ b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
@@ -15,6 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
+#include <my_sys.h>
#define DBDICT_C
#include "Dbdict.hpp"
@@ -4100,6 +4101,8 @@ Dbdict::execADD_FRAGREQ(Signal* signal) {
req->noOfKeyAttr = tabPtr.p->noOfPrimkey;
req->noOfNewAttr = 0;
+ // noOfCharsets passed to TUP in upper half
+ req->noOfNewAttr |= (tabPtr.p->noOfCharsets << 16);
req->checksumIndicator = 1;
req->noOfAttributeGroups = 1;
req->GCPIndicator = 0;
@@ -4161,6 +4164,8 @@ Dbdict::sendLQHADDATTRREQ(Signal* signal,
entry.attrId = attrPtr.p->attributeId;
entry.attrDescriptor = attrPtr.p->attributeDescriptor;
entry.extTypeInfo = attrPtr.p->extType;
+ // charset number passed to TUP, TUX in upper half
+ entry.extTypeInfo |= (attrPtr.p->extPrecision & ~0xFFFF);
if (tabPtr.p->isIndex()) {
Uint32 primaryAttrId;
if (attrPtr.p->nextAttrInTable != RNIL) {
@@ -4697,6 +4702,8 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
Uint32 keyLength = 0;
Uint32 attrCount = tablePtr.p->noOfAttributes;
Uint32 nullCount = 0;
+ Uint32 noOfCharsets = 0;
+ Uint16 charsets[128];
Uint32 recordLength = 0;
AttributeRecordPtr attrPtr;
c_attributeRecordHash.removeAll();
@@ -4751,6 +4758,31 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
attrPtr.p->extPrecision = attrDesc.AttributeExtPrecision;
attrPtr.p->extScale = attrDesc.AttributeExtScale;
attrPtr.p->extLength = attrDesc.AttributeExtLength;
+ // charset in upper half of precision
+ unsigned csNumber = (attrPtr.p->extPrecision >> 16);
+ if (csNumber != 0) {
+ CHARSET_INFO* cs = get_charset(csNumber, MYF(0));
+ if (cs == NULL) {
+ parseP->errorCode = CreateTableRef::InvalidCharset;
+ parseP->errorLine = __LINE__;
+ return;
+ }
+ unsigned i = 0;
+ while (i < noOfCharsets) {
+ if (charsets[i] == csNumber)
+ break;
+ i++;
+ }
+ if (i == noOfCharsets) {
+ noOfCharsets++;
+ if (noOfCharsets > sizeof(charsets)/sizeof(charsets[0])) {
+ parseP->errorCode = CreateTableRef::InvalidFormat;
+ parseP->errorLine = __LINE__;
+ return;
+ }
+ charsets[i] = csNumber;
+ }
+ }
/**
* Ignore incoming old-style type and recompute it.
@@ -4814,6 +4846,7 @@ void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
tablePtr.p->noOfPrimkey = keyCount;
tablePtr.p->noOfNullAttr = nullCount;
+ tablePtr.p->noOfCharsets = noOfCharsets;
tablePtr.p->tupKeyLength = keyLength;
tabRequire(recordLength<= MAX_TUPLE_SIZE_IN_WORDS,
diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
index 5ddaa67a7d6..a94af7b59c8 100644
--- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
+++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
@@ -455,7 +455,7 @@ public:
Uint16 totalAttrReceived;
Uint16 fragCopyCreation;
Uint16 noOfKeyAttr;
- Uint16 noOfNewAttr;
+ Uint32 noOfNewAttr; // noOfCharsets in upper half
Uint16 noOfAttributeGroups;
Uint16 lh3DistrBits;
Uint16 tableType;
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index 3b05a133bbb..8342870d69c 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -1444,6 +1444,7 @@ Dblqh::sendAddAttrReq(Signal* signal)
tupreq->notused1 = 0;
tupreq->attrId = attrId;
tupreq->attrDescriptor = entry.attrDescriptor;
+ tupreq->extTypeInfo = entry.extTypeInfo;
sendSignal(fragptr.p->tupBlockref, GSN_TUP_ADD_ATTRREQ,
signal, TupAddAttrReq::SignalLength, JBB);
return;
@@ -7699,6 +7700,7 @@ void Dblqh::accScanConfScanLab(Signal* signal)
ndbrequire(sz == boundAiLength);
EXECUTE_DIRECT(DBTUX, GSN_TUX_BOUND_INFO,
signal, TuxBoundInfo::SignalLength + boundAiLength);
+ jamEntry();
if (req->errorCode != 0) {
jam();
/*
diff --git a/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp b/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp
index 0f3881e9024..2c62adab3e5 100644
--- a/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp
+++ b/ndb/src/kernel/blocks/dbtup/AttributeOffset.hpp
@@ -22,26 +22,59 @@ class AttributeOffset {
private:
static void setOffset(Uint32 & desc, Uint32 offset);
+ static void setCharsetPos(Uint32 & desc, Uint32 offset);
static void setNullFlagPos(Uint32 & desc, Uint32 offset);
static Uint32 getOffset(const Uint32 &);
+ static bool getCharsetFlag(const Uint32 &);
+ static Uint32 getCharsetPos(const Uint32 &);
static Uint32 getNullFlagPos(const Uint32 &);
static Uint32 getNullFlagOffset(const Uint32 &);
static Uint32 getNullFlagBitOffset(const Uint32 &);
static bool isNULL(const Uint32 &, const Uint32 &);
};
-#define AO_ATTRIBUTE_OFFSET_MASK (0xffff)
-#define AO_NULL_FLAG_POS_MASK (0x7ff)
-#define AO_NULL_FLAG_POS_SHIFT (21)
-#define AO_NULL_FLAG_WORD_MASK (31)
-#define AO_NULL_FLAG_OFFSET_SHIFT (5)
+/**
+ * Allow for 4096 attributes, all nullable, and for 128 different
+ * character sets.
+ *
+ * a = Attribute offset - 11 bits 0-10 ( addr word in 8 kb )
+ * c = Has charset flag 1 bits 11-11
+ * s = Charset pointer position - 7 bits 12-18 ( in table descriptor )
+ * f = Null flag offset in word - 5 bits 20-24 ( address 32 bits )
+ * w = Null word offset - 7 bits 25-32 ( f+w addr 4096 attrs )
+ *
+ * 1111111111222222222233
+ * 01234567890123456789012345678901
+ * aaaaaaaaaaacsssssss fffffwwwwwww
+ */
+
+#define AO_ATTRIBUTE_OFFSET_SHIFT 0
+#define AO_ATTRIBUTE_OFFSET_MASK 0x7ff
+
+#define AO_CHARSET_FLAG_SHIFT 11
+#define AO_CHARSET_POS_SHIFT 12
+#define AO_CHARSET_POS_MASK 127
+
+#define AO_NULL_FLAG_POS_MASK 0xfff // f+w
+#define AO_NULL_FLAG_POS_SHIFT 20
+
+#define AO_NULL_FLAG_WORD_MASK 31 // f
+#define AO_NULL_FLAG_OFFSET_SHIFT 5
inline
void
AttributeOffset::setOffset(Uint32 & desc, Uint32 offset){
ASSERT_MAX(offset, AO_ATTRIBUTE_OFFSET_MASK, "AttributeOffset::setOffset");
- desc |= offset;
+ desc |= (offset << AO_ATTRIBUTE_OFFSET_SHIFT);
+}
+
+inline
+void
+AttributeOffset::setCharsetPos(Uint32 & desc, Uint32 offset) {
+ ASSERT_MAX(offset, AO_CHARSET_POS_MASK, "AttributeOffset::setCharsetPos");
+ desc |= (1 << AO_CHARSET_FLAG_SHIFT);
+ desc |= (offset << AO_CHARSET_POS_SHIFT);
}
inline
@@ -55,7 +88,21 @@ inline
Uint32
AttributeOffset::getOffset(const Uint32 & desc)
{
- return desc & AO_ATTRIBUTE_OFFSET_MASK;
+ return (desc >> AO_ATTRIBUTE_OFFSET_SHIFT) & AO_ATTRIBUTE_OFFSET_MASK;
+}
+
+inline
+bool
+AttributeOffset::getCharsetFlag(const Uint32 & desc)
+{
+ return (desc >> AO_CHARSET_FLAG_SHIFT) & 1;
+}
+
+inline
+Uint32
+AttributeOffset::getCharsetPos(const Uint32 & desc)
+{
+ return (desc >> AO_CHARSET_POS_SHIFT) & AO_CHARSET_POS_MASK;
}
inline
diff --git a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
index cb7e35ea73e..ce81c1c9bc8 100644
--- a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
+++ b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
@@ -502,6 +502,7 @@ struct Fragoperrec {
Uint32 attributeCount;
Uint32 freeNullBit;
Uint32 noOfNewAttrCount;
+ Uint32 charsetIndex;
BlockReference lqhBlockrefFrag;
};
typedef Ptr<Fragoperrec> FragoperrecPtr;
@@ -785,6 +786,7 @@ struct Tablerec {
ReadFunction* readFunctionArray;
UpdateFunction* updateFunctionArray;
+ CHARSET_INFO** charsetArray;
Uint32 readKeyArray;
Uint32 tabDescriptor;
@@ -796,6 +798,7 @@ struct Tablerec {
Uint16 tupheadsize;
Uint16 noOfAttr;
Uint16 noOfKeyAttr;
+ Uint16 noOfCharsets;
Uint16 noOfNewAttr;
Uint16 noOfNullAttr;
Uint16 noOfAttributeGroups;
@@ -1001,17 +1004,20 @@ public:
void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node);
/*
- * TUX reads primary table attributes for index keys. Input is
- * attribute ids in AttributeHeader format. Output is pointers to
- * attribute data within tuple or 0 for NULL value.
+ * TUX reads primary table attributes for index keys. Tuple is
+ * specified by location of original tuple and version number. Input
+ * is attribute ids in AttributeHeader format. Output is attribute
+ * data with headers. Uses readAttributes with xfrm option set.
+ * Returns number of words or negative (-terrorCode) on error.
*/
- void tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 numAttrs, const Uint32* attrIds, const Uint32** attrData);
+ int tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, const Uint32* attrIds, Uint32 numAttrs, Uint32* dataOut);
/*
* TUX reads primary key without headers into an array of words. Used
- * for md5 summing and when returning keyinfo.
+ * for md5 summing and when returning keyinfo. Returns number of
+ * words or negative (-terrorCode) on error.
*/
- void tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pkSize, Uint32* pkData);
+ int tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut);
/*
* TUX checks if tuple is visible to scan.
@@ -1365,10 +1371,11 @@ private:
//------------------------------------------------------------------
int readAttributes(Page* const pagePtr,
Uint32 TupHeadOffset,
- Uint32* inBuffer,
+ const Uint32* inBuffer,
Uint32 inBufLen,
Uint32* outBuffer,
- Uint32 TmaxRead);
+ Uint32 TmaxRead,
+ bool xfrmFlag);
//------------------------------------------------------------------
//------------------------------------------------------------------
@@ -1614,6 +1621,20 @@ private:
Uint32 attrDescriptor,
Uint32 attrDes2);
+// *****************************************************************
+// Read char routines optionally (tXfrmFlag) apply strxfrm
+// *****************************************************************
+
+ bool readCharNotNULL(Uint32* outBuffer,
+ AttributeHeader* ahOut,
+ Uint32 attrDescriptor,
+ Uint32 attrDes2);
+
+ bool readCharNULLable(Uint32* outBuffer,
+ AttributeHeader* ahOut,
+ Uint32 attrDescriptor,
+ Uint32 attrDes2);
+
//------------------------------------------------------------------
//------------------------------------------------------------------
bool nullFlagCheck(Uint32 attrDes2);
@@ -1909,7 +1930,8 @@ private:
void updatePackedList(Signal* signal, Uint16 ahostIndex);
void setUpDescriptorReferences(Uint32 descriptorReference,
- Tablerec* const regTabPtr);
+ Tablerec* const regTabPtr,
+ const Uint32* offset);
void setUpKeyArray(Tablerec* const regTabPtr);
bool addfragtotab(Tablerec* const regTabPtr, Uint32 fragId, Uint32 fragIndex);
void deleteFragTab(Tablerec* const regTabPtr, Uint32 fragId);
@@ -2098,7 +2120,8 @@ private:
//-----------------------------------------------------------------------------
// Public methods
- Uint32 allocTabDescr(Uint32 noOfAttributes, Uint32 noOfKeyAttr, Uint32 noOfAttributeGroups);
+ Uint32 getTabDescrOffsets(const Tablerec* regTabPtr, Uint32* offset);
+ Uint32 allocTabDescr(const Tablerec* regTabPtr, Uint32* offset);
void freeTabDescr(Uint32 retRef, Uint32 retNo);
Uint32 getTabDescrWord(Uint32 index);
void setTabDescrWord(Uint32 index, Uint32 word);
@@ -2217,6 +2240,7 @@ private:
Uint32 tMaxRead;
Uint32 tOutBufIndex;
Uint32* tTupleHeader;
+ bool tXfrmFlag;
// updateAttributes module
Uint32 tInBufIndex;
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
index 0a47778f7c1..dfd1e37d4f5 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
@@ -903,7 +903,8 @@ int Dbtup::handleReadReq(Signal* signal,
&cinBuffer[0],
regOperPtr->attrinbufLen,
dst,
- dstLen);
+ dstLen,
+ false);
if (TnoOfDataRead != (Uint32)-1) {
/* ------------------------------------------------------------------------- */
// We have read all data into coutBuffer. Now send it to the API.
@@ -1274,7 +1275,8 @@ int Dbtup::interpreterStartLab(Signal* signal,
&cinBuffer[5],
RinitReadLen,
&dst[0],
- dstLen);
+ dstLen,
+ false);
if (TnoDataRW != (Uint32)-1) {
RattroutCounter = TnoDataRW;
RinstructionCounter += RinitReadLen;
@@ -1347,7 +1349,8 @@ int Dbtup::interpreterStartLab(Signal* signal,
&cinBuffer[RinstructionCounter],
RfinalRLen,
&dst[RattroutCounter],
- (dstLen - RattroutCounter));
+ (dstLen - RattroutCounter),
+ false);
if (TnoDataRW != (Uint32)-1) {
RattroutCounter += TnoDataRW;
} else {
@@ -1487,7 +1490,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
&theAttrinfo,
(Uint32)1,
&TregMemBuffer[theRegister],
- (Uint32)3);
+ (Uint32)3,
+ false);
if (TnoDataRW == 2) {
/* ------------------------------------------------------------- */
// Two words read means that we get the instruction plus one 32
@@ -1833,7 +1837,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
Int32 TnoDataR = readAttributes(pagePtr,
TupHeadOffset,
&attrId, 1,
- tmpArea, tmpAreaSz);
+ tmpArea, tmpAreaSz,
+ false);
if (TnoDataR == -1) {
jam();
@@ -1929,7 +1934,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
Int32 TnoDataR = readAttributes(pagePtr,
TupHeadOffset,
&attrId, 1,
- tmpArea, tmpAreaSz);
+ tmpArea, tmpAreaSz,
+ false);
if (TnoDataR == -1) {
jam();
@@ -1957,7 +1963,8 @@ int Dbtup::interpreterNextLab(Signal* signal,
Int32 TnoDataR = readAttributes(pagePtr,
TupHeadOffset,
&attrId, 1,
- tmpArea, tmpAreaSz);
+ tmpArea, tmpAreaSz,
+ false);
if (TnoDataR == -1) {
jam();
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp b/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
index 1e57f127fbc..f3391ff7b59 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
@@ -1067,6 +1067,7 @@ Dbtup::initTab(Tablerec* const regTabPtr)
}//for
regTabPtr->readFunctionArray = NULL;
regTabPtr->updateFunctionArray = NULL;
+ regTabPtr->charsetArray = NULL;
regTabPtr->tabDescriptor = RNIL;
regTabPtr->attributeGroupDescriptor = RNIL;
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp b/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
index ec2c63c736e..2dd707ebafc 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
@@ -112,10 +112,11 @@ Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& no
node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
}
-void
-Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 numAttrs, const Uint32* attrIds, const Uint32** attrData)
+int
+Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, const Uint32* attrIds, Uint32 numAttrs, Uint32* dataOut)
{
ljamEntry();
+ // use own variables instead of globals
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
@@ -134,6 +135,7 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu
while (true) {
ptrCheckGuard(opPtr, cnoOfOprec, operationrec);
if (opPtr.p->realPageIdC != RNIL) {
+ // update page and offset
pagePtr.i = opPtr.p->realPageIdC;
pageOffset = opPtr.p->pageOffsetC;
ptrCheckGuard(pagePtr, cnoOfPage, page);
@@ -147,33 +149,34 @@ Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tu
ndbrequire(++loopGuard < (1 << ZTUP_VERSION_BITS));
}
}
- const Uint32 tabDescriptor = tablePtr.p->tabDescriptor;
- const Uint32* tupleHeader = &pagePtr.p->pageWord[pageOffset];
- for (Uint32 i = 0; i < numAttrs; i++) {
- AttributeHeader ah(attrIds[i]);
- const Uint32 attrId = ah.getAttributeId();
- const Uint32 index = tabDescriptor + (attrId << ZAD_LOG_SIZE);
- const Uint32 desc1 = tableDescriptor[index].tabDescr;
- const Uint32 desc2 = tableDescriptor[index + 1].tabDescr;
- if (AttributeDescriptor::getNullable(desc1)) {
- Uint32 offset = AttributeOffset::getNullFlagOffset(desc2);
- ndbrequire(offset < tablePtr.p->tupNullWords);
- offset += tablePtr.p->tupNullIndex;
- ndbrequire(offset < tablePtr.p->tupheadsize);
- if (AttributeOffset::isNULL(tupleHeader[offset], desc2)) {
- ljam();
- attrData[i] = 0;
- continue;
- }
- }
- attrData[i] = tupleHeader + AttributeOffset::getOffset(desc2);
+ // read key attributes from found tuple version
+ // save globals
+ TablerecPtr tabptr_old = tabptr;
+ FragrecordPtr fragptr_old = fragptr;
+ OperationrecPtr operPtr_old = operPtr;
+ // new globals
+ tabptr = tablePtr;
+ fragptr = fragPtr;
+ operPtr.i = RNIL;
+ operPtr.p = NULL;
+ // do it
+ int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, true);
+ // restore globals
+ tabptr = tabptr_old;
+ fragptr = fragptr_old;
+ operPtr = operPtr_old;
+ // done
+ if (ret == (Uint32)-1) {
+ ret = terrorCode ? (-(int)terrorCode) : -1;
}
+ return ret;
}
-void
-Dbtup::tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pkSize, Uint32* pkData)
+int
+Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* dataOut)
{
ljamEntry();
+ // use own variables instead of globals
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
@@ -184,25 +187,45 @@ Dbtup::tuxReadKeys(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* pk
pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
const Uint32 tabDescriptor = tablePtr.p->tabDescriptor;
- const Uint32 numAttrs = tablePtr.p->noOfKeyAttr;
const Uint32* attrIds = &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
- const Uint32* tupleHeader = &pagePtr.p->pageWord[pageOffset];
- Uint32 size = 0;
- for (Uint32 i = 0; i < numAttrs; i++) {
- AttributeHeader ah(attrIds[i]);
- const Uint32 attrId = ah.getAttributeId();
- const Uint32 index = tabDescriptor + (attrId << ZAD_LOG_SIZE);
- const Uint32 desc1 = tableDescriptor[index].tabDescr;
- const Uint32 desc2 = tableDescriptor[index + 1].tabDescr;
- ndbrequire(! AttributeDescriptor::getNullable(desc1));
- const Uint32 attrSize = AttributeDescriptor::getSizeInWords(desc1);
- const Uint32* attrData = tupleHeader + AttributeOffset::getOffset(desc2);
- for (Uint32 j = 0; j < attrSize; j++) {
- pkData[size + j] = attrData[j];
+ const Uint32 numAttrs = tablePtr.p->noOfKeyAttr;
+ // read pk attributes from original tuple
+ // save globals
+ TablerecPtr tabptr_old = tabptr;
+ FragrecordPtr fragptr_old = fragptr;
+ OperationrecPtr operPtr_old = operPtr;
+ // new globals
+ tabptr = tablePtr;
+ fragptr = fragPtr;
+ operPtr.i = RNIL;
+ operPtr.p = NULL;
+ // do it
+ int ret = readAttributes(pagePtr.p, pageOffset, attrIds, numAttrs, dataOut, ZNIL, true);
+ // restore globals
+ tabptr = tabptr_old;
+ fragptr = fragptr_old;
+ operPtr = operPtr_old;
+ // done
+ if (ret != (Uint32)-1) {
+ // remove headers
+ Uint32 n = 0;
+ Uint32 i = 0;
+ while (n < numAttrs) {
+ const AttributeHeader ah(dataOut[i]);
+ Uint32 size = ah.getDataSize();
+ ndbrequire(size != 0);
+ for (Uint32 j = 0; j < size; j++) {
+ dataOut[i + j - n] = dataOut[i + j + 1];
+ }
+ n += 1;
+ i += 1 + size;
}
- size += attrSize;
+ ndbrequire(i == ret);
+ ret -= numAttrs;
+ } else {
+ ret = terrorCode ? (-(int)terrorCode) : -1;
}
- *pkSize = size;
+ return ret;
}
bool
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp b/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
index 09889a51fa3..efea312b865 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
@@ -20,12 +20,14 @@
#include <RefConvert.hpp>
#include <ndb_limits.h>
#include <pc.hpp>
+#include <signaldata/TupFrag.hpp>
#include <signaldata/FsConf.hpp>
#include <signaldata/FsRemoveReq.hpp>
#include <signaldata/DropTab.hpp>
#include <signaldata/AlterTab.hpp>
#include <AttributeDescriptor.hpp>
#include "AttributeOffset.hpp"
+#include <my_sys.h>
#define ljam() { jamLine(20000 + __LINE__); }
#define ljamEntry() { jamEntryLine(20000 + __LINE__); }
@@ -52,7 +54,10 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
/* Uint32 schemaVersion = signal->theData[8];*/
Uint32 noOfKeyAttr = signal->theData[9];
- Uint32 noOfNewAttr = signal->theData[10];
+ Uint32 noOfNewAttr = (signal->theData[10] & 0xFFFF);
+ /* DICT sends number of character sets in upper half */
+ Uint32 noOfCharsets = (signal->theData[10] >> 16);
+
Uint32 checksumIndicator = signal->theData[11];
Uint32 noOfAttributeGroups = signal->theData[12];
Uint32 globalCheckpointIdIndicator = signal->theData[13];
@@ -75,6 +80,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
fragOperPtr.p->attributeCount = noOfAttributes;
fragOperPtr.p->freeNullBit = noOfNullAttr;
fragOperPtr.p->noOfNewAttrCount = noOfNewAttr;
+ fragOperPtr.p->charsetIndex = 0;
ndbrequire(reqinfo == ZADDFRAG);
@@ -156,6 +162,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
regTabPtr.p->tupheadsize = regTabPtr.p->tupGCPIndex;
regTabPtr.p->noOfKeyAttr = noOfKeyAttr;
+ regTabPtr.p->noOfCharsets = noOfCharsets;
regTabPtr.p->noOfAttr = noOfAttributes;
regTabPtr.p->noOfNewAttr = noOfNewAttr;
regTabPtr.p->noOfNullAttr = noOfNullAttr;
@@ -163,13 +170,14 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
regTabPtr.p->notNullAttributeMask.clear();
- Uint32 tableDescriptorRef = allocTabDescr(noOfAttributes, noOfKeyAttr, noOfAttributeGroups);
+ Uint32 offset[10];
+ Uint32 tableDescriptorRef = allocTabDescr(regTabPtr.p, offset);
if (tableDescriptorRef == RNIL) {
ljam();
fragrefuse4Lab(signal, fragOperPtr, regFragPtr, regTabPtr.p, fragId);
return;
}//if
- setUpDescriptorReferences(tableDescriptorRef, regTabPtr.p);
+ setUpDescriptorReferences(tableDescriptorRef, regTabPtr.p, offset);
} else {
ljam();
fragOperPtr.p->definingFragment = false;
@@ -251,6 +259,9 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
ptrCheckGuard(fragOperPtr, cnoOfFragoprec, fragoperrec);
Uint32 attrId = signal->theData[2];
Uint32 attrDescriptor = signal->theData[3];
+ // DICT sends extended type (ignored) and charset number
+ Uint32 extType = (signal->theData[4] & 0xFF);
+ Uint32 csNumber = (signal->theData[4] >> 16);
regTabPtr.i = fragOperPtr.p->tableidFrag;
ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
@@ -304,6 +315,29 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
} else {
ndbrequire(false);
}//if
+ if (csNumber != 0) {
+ CHARSET_INFO* cs = get_charset(csNumber, MYF(0));
+ if (cs == NULL) {
+ ljam();
+ terrorCode = TupAddAttrRef::InvalidCharset;
+ addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId);
+ return;
+ }
+ Uint32 i = 0;
+ while (i < fragOperPtr.p->charsetIndex) {
+ ljam();
+ if (regTabPtr.p->charsetArray[i] == cs)
+ break;
+ i++;
+ }
+ if (i == fragOperPtr.p->charsetIndex) {
+ ljam();
+ fragOperPtr.p->charsetIndex++;
+ }
+ ndbrequire(i < regTabPtr.p->noOfCharsets);
+ regTabPtr.p->charsetArray[i] = cs;
+ AttributeOffset::setCharsetPos(attrDes2, i);
+ }
setTabDescrWord(firstTabDesIndex + 1, attrDes2);
if (regTabPtr.p->tupheadsize > MAX_TUPLE_SIZE_IN_WORDS) {
@@ -340,20 +374,28 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
return;
}//Dbtup::execTUP_ADD_ATTRREQ()
+/*
+ * Descriptor has these parts:
+ *
+ * 0 readFunctionArray ( one for each attribute )
+ * 1 updateFunctionArray ( ditto )
+ * 2 charsetArray ( pointers to distinct CHARSET_INFO )
+ * 3 readKeyArray ( attribute ids of keys )
+ * 4 attributeGroupDescriptor ( currently size 1 but unused )
+ * 5 tabDescriptor ( attribute descriptors, each ZAD_SIZE )
+ */
+
void Dbtup::setUpDescriptorReferences(Uint32 descriptorReference,
- Tablerec* const regTabPtr)
+ Tablerec* const regTabPtr,
+ const Uint32* offset)
{
- Uint32 noOfAttributes = regTabPtr->noOfAttr;
- descriptorReference += ZTD_SIZE;
- ReadFunction * tmp = (ReadFunction*)&tableDescriptor[descriptorReference].tabDescr;
- regTabPtr->readFunctionArray = tmp;
- regTabPtr->updateFunctionArray = (UpdateFunction*)(tmp + noOfAttributes);
-
- TableDescriptor * start = &tableDescriptor[descriptorReference];
- TableDescriptor * end = (TableDescriptor*)(tmp + 2 * noOfAttributes);
- regTabPtr->readKeyArray = descriptorReference + (end - start);
- regTabPtr->attributeGroupDescriptor = regTabPtr->readKeyArray + regTabPtr->noOfKeyAttr;
- regTabPtr->tabDescriptor = regTabPtr->attributeGroupDescriptor + regTabPtr->noOfAttributeGroups;
+ Uint32* desc = &tableDescriptor[descriptorReference].tabDescr;
+ regTabPtr->readFunctionArray = (ReadFunction*)(desc + offset[0]);
+ regTabPtr->updateFunctionArray = (UpdateFunction*)(desc + offset[1]);
+ regTabPtr->charsetArray = (CHARSET_INFO**)(desc + offset[2]);
+ regTabPtr->readKeyArray = descriptorReference + offset[3];
+ regTabPtr->attributeGroupDescriptor = descriptorReference + offset[4];
+ regTabPtr->tabDescriptor = descriptorReference + offset[5];
}//Dbtup::setUpDescriptorReferences()
Uint32
@@ -491,14 +533,18 @@ void Dbtup::releaseTabDescr(Tablerec* const regTabPtr)
Uint32 descriptor = regTabPtr->readKeyArray;
if (descriptor != RNIL) {
ljam();
+ Uint32 offset[10];
+ getTabDescrOffsets(regTabPtr, offset);
+
regTabPtr->tabDescriptor = RNIL;
regTabPtr->readKeyArray = RNIL;
regTabPtr->readFunctionArray = NULL;
regTabPtr->updateFunctionArray = NULL;
+ regTabPtr->charsetArray = NULL;
regTabPtr->attributeGroupDescriptor= RNIL;
- Uint32 sizeFunctionArrays = 2 * (regTabPtr->noOfAttr * sizeOfReadFunction());
- descriptor -= (sizeFunctionArrays + ZTD_SIZE);
+ // move to start of descriptor
+ descriptor -= offset[3];
Uint32 retNo = getTabDescrWord(descriptor + ZTD_DATASIZE);
ndbrequire(getTabDescrWord(descriptor + ZTD_HEADER) == ZTD_TYPE_NORMAL);
ndbrequire(retNo == getTabDescrWord((descriptor + retNo) - ZTD_TR_SIZE));
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
index cc47ef7e78f..a4e7cb47249 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
@@ -35,6 +35,7 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
for (Uint32 i = 0; i < regTabPtr->noOfAttr; i++) {
Uint32 attrDescriptorStart = startDescriptor + (i << ZAD_LOG_SIZE);
Uint32 attrDescriptor = tableDescriptor[attrDescriptorStart].tabDescr;
+ Uint32 attrOffset = tableDescriptor[attrDescriptorStart + 1].tabDescr;
if (!AttributeDescriptor::getDynamic(attrDescriptor)) {
if ((AttributeDescriptor::getArrayType(attrDescriptor) == ZNON_ARRAY) ||
(AttributeDescriptor::getArrayType(attrDescriptor) == ZFIXED_ARRAY)) {
@@ -54,6 +55,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
} else {
ndbrequire(false);
}//if
+ // replace read function of char attribute
+ if (AttributeOffset::getCharsetFlag(attrOffset)) {
+ ljam();
+ regTabPtr->readFunctionArray[i] = &Dbtup::readCharNotNULL;
+ }
} else {
if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1) {
ljam();
@@ -72,6 +78,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHZeroWordNULLable;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
}//if
+ // replace read function of char attribute
+ if (AttributeOffset::getCharsetFlag(attrOffset)) {
+ ljam();
+ regTabPtr->readFunctionArray[i] = &Dbtup::readCharNULLable;
+ }
}//if
} else if (AttributeDescriptor::getArrayType(attrDescriptor) == ZVAR_ARRAY) {
if (!AttributeDescriptor::getNullable(attrDescriptor)) {
@@ -146,10 +157,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
/* ---------------------------------------------------------------- */
int Dbtup::readAttributes(Page* const pagePtr,
Uint32 tupHeadOffset,
- Uint32* inBuffer,
+ const Uint32* inBuffer,
Uint32 inBufLen,
Uint32* outBuffer,
- Uint32 maxRead)
+ Uint32 maxRead,
+ bool xfrmFlag)
{
Tablerec* const regTabPtr = tabptr.p;
Uint32 numAttributes = regTabPtr->noOfAttr;
@@ -162,6 +174,7 @@ int Dbtup::readAttributes(Page* const pagePtr,
tCheckOffset = regTabPtr->tupheadsize;
tMaxRead = maxRead;
tTupleHeader = &pagePtr->pageWord[tupHeadOffset];
+ tXfrmFlag = xfrmFlag;
ndbrequire(tupHeadOffset + tCheckOffset <= ZWORDS_ON_PAGE);
while (inBufIndex < inBufLen) {
@@ -542,6 +555,74 @@ Dbtup::readDynSmallVarSize(Uint32* outBuffer,
return false;
}//Dbtup::readDynSmallVarSize()
+
+bool
+Dbtup::readCharNotNULL(Uint32* outBuffer,
+ AttributeHeader* ahOut,
+ Uint32 attrDescriptor,
+ Uint32 attrDes2)
+{
+ Uint32 indexBuf = tOutBufIndex;
+ Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
+ Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
+ Uint32 newIndexBuf = indexBuf + attrNoOfWords;
+ Uint32 maxRead = tMaxRead;
+
+ ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset);
+ if (newIndexBuf <= maxRead) {
+ ljam();
+ ahOut->setDataSize(attrNoOfWords);
+ if (! tXfrmFlag) {
+ MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
+ &tTupleHeader[readOffset],
+ attrNoOfWords);
+ } else {
+ ljam();
+ Tablerec* regTabPtr = tabptr.p;
+ Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
+ ndbrequire(i < tabptr.p->noOfCharsets);
+ // not const in MySQL
+ CHARSET_INFO* cs = tabptr.p->charsetArray[i];
+ // XXX should strip Uint32 null padding
+ const unsigned nBytes = attrNoOfWords << 2;
+ unsigned n =
+ (*cs->coll->strnxfrm)(cs,
+ (uchar*)&outBuffer[indexBuf],
+ nBytes,
+ (const uchar*)&tTupleHeader[readOffset],
+ nBytes);
+ // pad with ascii spaces
+ while (n < nBytes)
+ ((uchar*)&outBuffer[indexBuf])[n++] = 0x20;
+ }
+ tOutBufIndex = newIndexBuf;
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ return false;
+ }
+}
+
+bool
+Dbtup::readCharNULLable(Uint32* outBuffer,
+ AttributeHeader* ahOut,
+ Uint32 attrDescriptor,
+ Uint32 attrDes2)
+{
+ if (!nullFlagCheck(attrDes2)) {
+ ljam();
+ return readCharNotNULL(outBuffer,
+ ahOut,
+ attrDescriptor,
+ attrDes2);
+ } else {
+ ljam();
+ ahOut->setNULL();
+ return true;
+ }
+}
+
/* ---------------------------------------------------------------------- */
/* THIS ROUTINE IS USED TO UPDATE A NUMBER OF ATTRIBUTES. IT IS */
/* USED BY THE INSERT ROUTINE, THE UPDATE ROUTINE AND IT CAN BE */
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp b/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp
index d31ab43f108..642ba270760 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupTabDesMan.cpp
@@ -31,12 +31,33 @@
/* memory attached to fragments (could be allocated per table */
/* instead. Performs its task by a buddy algorithm. */
/* **************************************************************** */
-Uint32 Dbtup::allocTabDescr(Uint32 noOfAttributes, Uint32 noOfKeyAttr, Uint32 noOfAttributeGroups)
+
+Uint32
+Dbtup::getTabDescrOffsets(const Tablerec* regTabPtr, Uint32* offset)
+{
+ // belongs to configure.in
+ unsigned sizeOfPointer = sizeof(CHARSET_INFO*);
+ ndbrequire((sizeOfPointer & 0x3) == 0);
+ sizeOfPointer = (sizeOfPointer >> 2);
+ // do in layout order and return offsets (see DbtupMeta.cpp)
+ Uint32 allocSize = 0;
+ // magically aligned to 8 bytes
+ offset[0] = allocSize += ZTD_SIZE;
+ offset[1] = allocSize += regTabPtr->noOfAttr * sizeOfReadFunction();
+ offset[2] = allocSize += regTabPtr->noOfAttr * sizeOfReadFunction();
+ offset[3] = allocSize += regTabPtr->noOfCharsets * sizeOfPointer;
+ offset[4] = allocSize += regTabPtr->noOfKeyAttr;
+ offset[5] = allocSize += regTabPtr->noOfAttributeGroups;
+ allocSize += regTabPtr->noOfAttr * ZAD_SIZE;
+ allocSize += ZTD_TRAILER_SIZE;
+ // return number of words
+ return allocSize;
+}
+
+Uint32 Dbtup::allocTabDescr(const Tablerec* regTabPtr, Uint32* offset)
{
Uint32 reference = RNIL;
- Uint32 allocSize = (ZTD_SIZE + ZTD_TRAILER_SIZE) + (noOfAttributes * ZAD_SIZE);
- allocSize += noOfAttributeGroups;
- allocSize += ((2 * noOfAttributes * sizeOfReadFunction()) + noOfKeyAttr);
+ Uint32 allocSize = getTabDescrOffsets(regTabPtr, offset);
/* ---------------------------------------------------------------- */
/* ALWAYS ALLOCATE A MULTIPLE OF 16 BYTES */
/* ---------------------------------------------------------------- */
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp b/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
index a93ff4566e7..c0b49364ee6 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
@@ -751,7 +751,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
&tableDescriptor[regTabPtr->readKeyArray].tabDescr,
regTabPtr->noOfKeyAttr,
keyBuffer,
- ZATTR_BUFFER_SIZE);
+ ZATTR_BUFFER_SIZE,
+ true);
ndbrequire(noPrimKey != (Uint32)-1);
Uint32 numAttrsToRead;
@@ -792,7 +793,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
&readBuffer[0],
numAttrsToRead,
mainBuffer,
- ZATTR_BUFFER_SIZE);
+ ZATTR_BUFFER_SIZE,
+ true);
ndbrequire(noMainWords != (Uint32)-1);
} else {
ljam();
@@ -816,7 +818,8 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
&readBuffer[0],
numAttrsToRead,
copyBuffer,
- ZATTR_BUFFER_SIZE);
+ ZATTR_BUFFER_SIZE,
+ true);
ndbrequire(noCopyWords != (Uint32)-1);
if ((noMainWords == noCopyWords) &&
diff --git a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
index 36ac20611bb..8dca52cec04 100644
--- a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
+++ b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
@@ -163,11 +163,6 @@ private:
static const unsigned AttributeHeaderSize = 1;
/*
- * Array of pointers to TUP table attributes. Always read-on|y.
- */
- typedef const Uint32** TableData;
-
- /*
* Logical tuple address, "local key". Identifies table tuples.
*/
typedef Uint32 TupAddr;
@@ -330,11 +325,15 @@ private:
/*
* Attribute metadata. Size must be multiple of word size.
+ *
+ * Prefix comparison of char data must use strxfrm and binary
+ * comparison. The charset is currently unused.
*/
struct DescAttr {
Uint32 m_attrDesc; // standard AttributeDescriptor
Uint16 m_primaryAttrId;
- Uint16 m_typeId;
+ unsigned m_typeId : 6;
+ unsigned m_charset : 10;
};
static const unsigned DescAttrSize = sizeof(DescAttr) >> 2;
@@ -553,9 +552,9 @@ private:
void execREAD_CONFIG_REQ(Signal* signal);
// utils
void setKeyAttrs(const Frag& frag);
- void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, TableData keyData);
- void readTablePk(const Frag& frag, TreeEnt ent, unsigned& pkSize, Data pkData);
- void copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
+ void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData);
+ void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
+ void copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
/*
* DbtuxMeta.cpp
@@ -622,17 +621,15 @@ private:
/*
* DbtuxSearch.cpp
*/
- void searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
- void searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
+ void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
+ void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/*
* DbtuxCmp.cpp
*/
- int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
- int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey);
+ int cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
- int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey);
/*
* DbtuxDebug.cpp
@@ -679,17 +676,27 @@ private:
Uint32 c_typeOfStart;
/*
- * Array of index key attribute ids in AttributeHeader format.
- * Includes fixed attribute sizes. This is global data set at
- * operation start and is not passed as a parameter.
+ * Global data set at operation start. Unpacked from index metadata.
+ * Not passed as parameter to methods. Invalid across timeslices.
+ *
+ * TODO inline all into index metadata
*/
+
+ // index key attr ids with sizes in AttributeHeader format
Data c_keyAttrs;
- // buffer for search key data as pointers to TUP storage
- TableData c_searchKey;
+ // pointers to index key comparison functions
+ NdbSqlUtil::Cmp** c_sqlCmp;
+
+ /*
+ * Other buffers used during the operation.
+ */
+
+ // buffer for search key data with headers
+ Data c_searchKey;
- // buffer for current entry key data as pointers to TUP storage
- TableData c_entryKey;
+ // buffer for current entry key data with headers
+ Data c_entryKey;
// buffer for scan bounds and keyinfo (primary key)
Data c_dataBuffer;
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
index debb5252386..549720cc17c 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
@@ -18,21 +18,24 @@
#include "Dbtux.hpp"
/*
- * Search key vs node prefix.
+ * Search key vs node prefix or entry
*
- * The comparison starts at given attribute position (in fact 0). The
- * position is updated by number of equal initial attributes found. The
- * prefix may be partial in which case CmpUnknown may be returned.
+ * The comparison starts at given attribute position. The position is
+ * updated by number of equal initial attributes found. The entry data
+ * may be partial in which case CmpUnknown may be returned.
*/
int
-Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen)
+Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen)
{
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// number of words of attribute data left
unsigned len2 = maxlen;
- // skip to right position in search key
- searchKey += start;
+ // skip to right position in search key only
+ for (unsigned i = 0; i < start; i++) {
+ jam();
+ searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
+ }
int ret = 0;
while (start < numAttrs) {
if (len2 <= AttributeHeaderSize) {
@@ -41,22 +44,21 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
break;
}
len2 -= AttributeHeaderSize;
- if (*searchKey != 0) {
+ if (! searchKey.ah().isNULL()) {
if (! entryData.ah().isNULL()) {
jam();
// current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start];
- const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
- ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
const unsigned size2 = min(size1, len2);
len2 -= size2;
// compare
- const Uint32* const p1 = *searchKey;
+ NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start];
+ const Uint32* const p1 = &searchKey[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize];
- ret = (*type.m_cmp)(p1, p2, size1, size2);
+ ret = (*cmp)(0, p1, p2, size1, size2);
if (ret != 0) {
jam();
break;
@@ -75,7 +77,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
break;
}
}
- searchKey += 1;
+ searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
entryData += AttributeHeaderSize + entryData.ah().getDataSize();
start++;
}
@@ -83,60 +85,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, Cons
}
/*
- * Search key vs tree entry.
- *
- * Start position is updated as in previous routine.
- */
-int
-Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey)
-{
- const unsigned numAttrs = frag.m_numAttrs;
- const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
- // skip to right position
- searchKey += start;
- entryKey += start;
- int ret = 0;
- while (start < numAttrs) {
- if (*searchKey != 0) {
- if (*entryKey != 0) {
- jam();
- // current attribute
- const DescAttr& descAttr = descEnt.m_descAttr[start];
- const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
- ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
- // full data size
- const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
- // compare
- const Uint32* const p1 = *searchKey;
- const Uint32* const p2 = *entryKey;
- ret = (*type.m_cmp)(p1, p2, size1, size1);
- if (ret != 0) {
- jam();
- break;
- }
- } else {
- jam();
- // not NULL > NULL
- ret = +1;
- break;
- }
- } else {
- if (*entryKey != 0) {
- jam();
- // NULL < not NULL
- ret = -1;
- break;
- }
- }
- searchKey += 1;
- entryKey += 1;
- start++;
- }
- return ret;
-}
-
-/*
- * Scan bound vs node prefix.
+ * Scan bound vs node prefix or entry.
*
* Compare lower or upper bound and index attribute data. The attribute
* data may be partial in which case CmpUnknown may be returned.
@@ -183,9 +132,8 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
jam();
// current attribute
const unsigned index = boundInfo.ah().getAttributeId();
+ ndbrequire(index < frag.m_numAttrs);
const DescAttr& descAttr = descEnt.m_descAttr[index];
- const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
- ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
// full data size
const unsigned size1 = boundInfo.ah().getDataSize();
@@ -193,9 +141,10 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
const unsigned size2 = min(size1, len2);
len2 -= size2;
// compare
+ NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index];
const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize];
- int ret = (*type.m_cmp)(p1, p2, size1, size2);
+ int ret = (*cmp)(0, p1, p2, size1, size2);
if (ret != 0) {
jam();
return ret;
@@ -244,72 +193,3 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
return +1;
}
}
-
-/*
- * Scan bound vs tree entry.
- */
-int
-Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey)
-{
- const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
- // direction 0-lower 1-upper
- ndbrequire(dir <= 1);
- // initialize type to equality
- unsigned type = 4;
- while (boundCount != 0) {
- // get and skip bound type
- type = boundInfo[0];
- boundInfo += 1;
- if (! boundInfo.ah().isNULL()) {
- if (*entryKey != 0) {
- jam();
- // current attribute
- const unsigned index = boundInfo.ah().getAttributeId();
- const DescAttr& descAttr = descEnt.m_descAttr[index];
- const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
- ndbassert(type.m_typeId != NdbSqlUtil::Type::Undefined);
- // full data size
- const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
- // compare
- const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
- const Uint32* const p2 = *entryKey;
- int ret = (*type.m_cmp)(p1, p2, size1, size1);
- if (ret != 0) {
- jam();
- return ret;
- }
- } else {
- jam();
- // not NULL > NULL
- return +1;
- }
- } else {
- jam();
- if (*entryKey != 0) {
- jam();
- // NULL < not NULL
- return -1;
- }
- }
- boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
- entryKey += 1;
- boundCount -= 1;
- }
- if (dir == 0) {
- // lower bound
- jam();
- if (type == 1) {
- jam();
- return +1;
- }
- return -1;
- } else {
- // upper bound
- jam();
- if (type == 3) {
- jam();
- return -1;
- }
- return +1;
- }
-}
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
index 11f4f12b7f6..8d31d2c6a55 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
@@ -207,14 +207,10 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
}
// check ordering within node
for (unsigned j = 1; j < node.getOccup(); j++) {
- unsigned start = 0;
const TreeEnt ent1 = node.getEnt(j - 1);
const TreeEnt ent2 = node.getEnt(j);
- if (j == 1) {
- readKeyAttrs(frag, ent1, start, c_searchKey);
- } else {
- memcpy(c_searchKey, c_entryKey, frag.m_numAttrs << 2);
- }
+ unsigned start = 0;
+ readKeyAttrs(frag, ent1, start, c_searchKey);
readKeyAttrs(frag, ent2, start, c_entryKey);
int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
if (ret == 0)
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
index f6f1610c8c1..39cd8e25184 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
@@ -16,8 +16,6 @@
#define DBTUX_GEN_CPP
#include "Dbtux.hpp"
-#include <signaldata/TuxContinueB.hpp>
-#include <signaldata/TuxContinueB.hpp>
Dbtux::Dbtux(const Configuration& conf) :
SimulatedBlock(DBTUX, conf),
@@ -202,8 +200,9 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
}
// allocate buffers
c_keyAttrs = (Uint32*)allocRecord("c_keyAttrs", sizeof(Uint32), MaxIndexAttributes);
- c_searchKey = (TableData)allocRecord("c_searchKey", sizeof(Uint32*), MaxIndexAttributes);
- c_entryKey = (TableData)allocRecord("c_entryKey", sizeof(Uint32*), MaxIndexAttributes);
+ c_sqlCmp = (NdbSqlUtil::Cmp**)allocRecord("c_sqlCmp", sizeof(NdbSqlUtil::Cmp*), MaxIndexAttributes);
+ c_searchKey = (Uint32*)allocRecord("c_searchKey", sizeof(Uint32), MaxAttrDataSize);
+ c_entryKey = (Uint32*)allocRecord("c_entryKey", sizeof(Uint32), MaxAttrDataSize);
c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1);
// ack
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
@@ -218,7 +217,8 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
void
Dbtux::setKeyAttrs(const Frag& frag)
{
- Data keyAttrs = c_keyAttrs; // global
+ Data keyAttrs = c_keyAttrs; // global
+ NdbSqlUtil::Cmp** sqlCmp = c_sqlCmp; // global
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
for (unsigned i = 0; i < numAttrs; i++) {
@@ -227,75 +227,71 @@ Dbtux::setKeyAttrs(const Frag& frag)
// set attr id and fixed size
keyAttrs.ah() = AttributeHeader(descAttr.m_primaryAttrId, size);
keyAttrs += 1;
+ // set comparison method pointer
+ const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
+ ndbrequire(sqlType.m_cmp != 0);
+ *(sqlCmp++) = sqlType.m_cmp;
}
}
void
-Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, TableData keyData)
+Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData)
{
ConstData keyAttrs = c_keyAttrs; // global
const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
const TupLoc tupLoc = ent.m_tupLoc;
const Uint32 tupVersion = ent.m_tupVersion;
ndbrequire(start < frag.m_numAttrs);
- const unsigned numAttrs = frag.m_numAttrs - start;
- // start applies to both keys and output data
+ const Uint32 numAttrs = frag.m_numAttrs - start;
+ // skip to start position in keyAttrs only
keyAttrs += start;
- keyData += start;
- c_tup->tuxReadAttrs(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupVersion, numAttrs, keyAttrs, keyData);
+ int ret = c_tup->tuxReadAttrs(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupVersion, keyAttrs, numAttrs, keyData);
jamEntry();
+ // TODO handle error
+ ndbrequire(ret > 0);
}
void
-Dbtux::readTablePk(const Frag& frag, TreeEnt ent, unsigned& pkSize, Data pkData)
+Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize)
{
const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
const TupLoc tupLoc = ent.m_tupLoc;
- Uint32 size = 0;
- c_tup->tuxReadKeys(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, &size, pkData);
- ndbrequire(size != 0);
- pkSize = size;
+ int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, pkData);
+ jamEntry();
+ // TODO handle error
+ ndbrequire(ret > 0);
+ pkSize = ret;
}
/*
- * Input is pointers to table attributes. Output is array of attribute
- * data with headers. Copies whatever fits.
+ * Copy attribute data with headers. Input is all index key data.
+ * Copies whatever fits.
*/
void
-Dbtux::copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2)
+Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2)
{
- ConstData keyAttrs = c_keyAttrs; // global
- const unsigned numAttrs = frag.m_numAttrs;
+ unsigned n = frag.m_numAttrs;
unsigned len2 = maxlen2;
- for (unsigned n = 0; n < numAttrs; n++) {
+ while (n != 0) {
jam();
- const unsigned attrId = keyAttrs.ah().getAttributeId();
- const unsigned dataSize = keyAttrs.ah().getDataSize();
- const Uint32* const p1 = *data1;
- if (p1 != 0) {
- if (len2 == 0)
- return;
- data2.ah() = AttributeHeader(attrId, dataSize);
- data2 += 1;
- len2 -= 1;
- unsigned n = dataSize;
- for (unsigned i = 0; i < dataSize; i++) {
- if (len2 == 0)
- return;
- *data2 = p1[i];
- data2 += 1;
- len2 -= 1;
- }
- } else {
+ const unsigned dataSize = data1.ah().getDataSize();
+ // copy header
+ if (len2 == 0)
+ return;
+ data2[0] = data1[0];
+ data1 += 1;
+ data2 += 1;
+ len2 -= 1;
+ // copy data
+ for (unsigned i = 0; i < dataSize; i++) {
if (len2 == 0)
return;
- data2.ah() = AttributeHeader(attrId, 0);
- data2.ah().setNULL();
- data2 += 1;
+ data2[i] = data1[i];
len2 -= 1;
}
- keyAttrs += 1;
- data1 += 1;
+ data1 += dataSize;
+ data2 += dataSize;
+ n -= 1;
}
#ifdef VM_TRACE
memset(data2, DataFillByte, len2 << 2);
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
index 4bb3b940d91..3c0af3ca79d 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
@@ -178,19 +178,31 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
descAttr.m_attrDesc = req->attrDescriptor;
descAttr.m_primaryAttrId = req->primaryAttrId;
descAttr.m_typeId = req->extTypeInfo & 0xFF;
+ descAttr.m_charset = (req->extTypeInfo >> 16);
#ifdef VM_TRACE
if (debugFlags & DebugMeta) {
debugOut << "Add frag " << fragPtr.i << " attr " << attrId << " " << descAttr << endl;
}
#endif
- // check if type is valid and has a comparison method
- const NdbSqlUtil::Type& type = NdbSqlUtil::getType(descAttr.m_typeId);
+ // check that type is valid and has a binary comparison method
+ const NdbSqlUtil::Type& type = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
if (type.m_typeId == NdbSqlUtil::Type::Undefined ||
type.m_cmp == 0) {
jam();
errorCode = TuxAddAttrRef::InvalidAttributeType;
break;
}
+#ifdef dbtux_uses_charset
+ if (descAttr.m_charset != 0) {
+ CHARSET_INFO *cs = get_charset(descAttr.m_charset, MYF(0));
+ // here use the non-binary type
+ if (! NdbSqlUtil::usable_in_ordered_index(descAttr.m_typeId, cs)) {
+ jam();
+ errorCode = TuxAddAttrRef::InvalidCharset;
+ break;
+ }
+ }
+#endif
if (indexPtr.p->m_numAttrs == fragOpPtr.p->m_numAttrsRecvd) {
jam();
// initialize tree header
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
index c4c33ff931f..5b161d3c4ce 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
@@ -112,6 +112,7 @@ Dbtux::execACC_SCANREQ(Signal* signal)
void
Dbtux::execTUX_BOUND_INFO(Signal* signal)
{
+ jamEntry();
struct BoundInfo {
unsigned offset;
unsigned size;
@@ -389,7 +390,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam();
const TreeEnt ent = scan.m_scanPos.m_ent;
// read tuple key
- readTablePk(frag, ent, pkSize, pkData);
+ readTablePk(frag, ent, pkData, pkSize);
// get read lock or exclusive lock
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
@@ -480,7 +481,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam();
if (pkSize == 0) {
jam();
- readTablePk(frag, ent, pkSize, pkData);
+ readTablePk(frag, ent, pkData, pkSize);
}
}
// conf signal
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
index 84048b308bc..bffbb8f5594 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
@@ -25,7 +25,7 @@
* TODO optimize for initial equal attrs in node min/max
*/
void
-Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
+Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
@@ -144,7 +144,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt sear
* to it.
*/
void
-Dbtux::searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
+Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
diff --git a/ndb/src/kernel/blocks/dbtux/Times.txt b/ndb/src/kernel/blocks/dbtux/Times.txt
index 84819ddcf97..03473353a52 100644
--- a/ndb/src/kernel/blocks/dbtux/Times.txt
+++ b/ndb/src/kernel/blocks/dbtux/Times.txt
@@ -83,7 +83,7 @@ optim 13 mc02/a 39 ms 59 ms 50 pct
mc02/c 9 ms 12 ms 44 pct
mc02/d 246 ms 289 ms 17 pct
-[ case d: what happened to PK read performance? ]
+[ case d: bug in testOIBasic killed PK read performance ]
optim 14 mc02/a 41 ms 60 ms 44 pct
mc02/b 46 ms 81 ms 73 pct
@@ -91,5 +91,21 @@ optim 14 mc02/a 41 ms 60 ms 44 pct
mc02/d 242 ms 285 ms 17 pct
[ case b: do long keys suffer from many subroutine calls? ]
+[ case d: bug in testOIBasic killed PK read performance ]
+
+none mc02/a 35 ms 60 ms 71 pct
+ mc02/b 42 ms 75 ms 76 pct
+ mc02/c 5 ms 12 ms 106 pct
+ mc02/d 165 ms 238 ms 44 pct
+
+[ johan re-installed mc02 as fedora gcc-3.3.2 ]
+[ case c: table scan has improved... ]
+
+charsets mc02/a 35 ms 60 ms 71 pct
+ mc02/b 42 ms 84 ms 97 pct
+ mc02/c 5 ms 12 ms 109 pct
+ mc02/d 190 ms 236 ms 23 pct
+
+[ case b: TUX can no longer use pointers to TUP data ]
vim: set et:
diff --git a/ndb/src/kernel/vm/MetaData.hpp b/ndb/src/kernel/vm/MetaData.hpp
index f6a941e8f9f..11e262664c1 100644
--- a/ndb/src/kernel/vm/MetaData.hpp
+++ b/ndb/src/kernel/vm/MetaData.hpp
@@ -107,6 +107,9 @@ public:
/* Number of primary key attributes (should be computed) */
Uint16 noOfPrimkey;
+ /* Number of distinct character sets (computed) */
+ Uint16 noOfCharsets;
+
/* Length of primary key in words (should be computed) */
/* For ordered index this is tree node size in words */
Uint16 tupKeyLength;
diff --git a/ndb/src/ndbapi/NdbIndexOperation.cpp b/ndb/src/ndbapi/NdbIndexOperation.cpp
index 0742f8d911c..bf4b07842f6 100644
--- a/ndb/src/ndbapi/NdbIndexOperation.cpp
+++ b/ndb/src/ndbapi/NdbIndexOperation.cpp
@@ -164,6 +164,7 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
Uint32 tData;
Uint32 tKeyInfoPosition;
const char* aValue = aValuePassed;
+ Uint32 xfrmData[1024];
Uint32 tempData[1024];
if ((theStatus == OperationDefined) &&
@@ -224,6 +225,21 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
m_theIndexDefined[i][2] = true;
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
+ const char* aValueToWrite = aValue;
+
+ CHARSET_INFO* cs = tAttrInfo->m_cs;
+ if (cs != 0) {
+ // current limitation: strxfrm does not increase length
+ assert(cs->strxfrm_multiply == 1);
+ unsigned n =
+ (*cs->coll->strnxfrm)(cs,
+ (uchar*)xfrmData, sizeof(xfrmData),
+ (const uchar*)aValue, sizeInBytes);
+ while (n < sizeInBytes)
+ ((uchar*)xfrmData)[n++] = 0x20;
+ aValue = (char*)xfrmData;
+ }
+
Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ;
Uint32 totalSizeInWords = (sizeInBytes + 3)/4;// Inc. bits in last word
Uint32 sizeInWords = sizeInBytes / 4; // Exc. bits in last word
@@ -314,13 +330,20 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
if ((tOpType == InsertRequest) ||
(tOpType == WriteRequest)) {
if (!tAttrInfo->m_indexOnly){
+ // invalid data can crash kernel
+ if (cs != NULL &&
+ (*cs->cset->well_formed_len)(cs,
+ aValueToWrite,
+ aValueToWrite + sizeInBytes,
+ sizeInBytes) != sizeInBytes)
+ goto equal_error4;
Uint32 ahValue;
Uint32 sz = totalSizeInWords;
AttributeHeader::init(&ahValue, tAttrId, sz);
insertATTRINFO( ahValue );
- insertATTRINFOloop((Uint32*)aValue, sizeInWords);
+ insertATTRINFOloop((Uint32*)aValueToWrite, sizeInWords);
if (bitsInLastWord != 0) {
- tData = *(Uint32*)(aValue + (sizeInWords << 2));
+ tData = *(Uint32*)(aValueToWrite + (sizeInWords << 2));
tData = convertEndian(tData);
tData = tData & ((1 << bitsInLastWord) - 1);
tData = convertEndian(tData);
@@ -411,7 +434,10 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
equal_error3:
setErrorCodeAbort(4209);
-
+ return -1;
+
+ equal_error4:
+ setErrorCodeAbort(744);
return -1;
}
diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp
index 6d995e06582..ad838ddd601 100644
--- a/ndb/src/ndbapi/NdbOperationDefine.cpp
+++ b/ndb/src/ndbapi/NdbOperationDefine.cpp
@@ -492,6 +492,17 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
// Insert Attribute Id into ATTRINFO part.
const Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
+
+ CHARSET_INFO* cs = tAttrInfo->m_cs;
+ // invalid data can crash kernel
+ if (cs != NULL &&
+ (*cs->cset->well_formed_len)(cs,
+ aValue,
+ aValue + sizeInBytes,
+ sizeInBytes) != sizeInBytes) {
+ setErrorCodeAbort(744);
+ return -1;
+ }
#if 0
tAttrSize = tAttrInfo->theAttrSize;
tArraySize = tAttrInfo->theArraySize;
diff --git a/ndb/src/ndbapi/NdbOperationSearch.cpp b/ndb/src/ndbapi/NdbOperationSearch.cpp
index 19cb133dbf7..e5166fc4a82 100644
--- a/ndb/src/ndbapi/NdbOperationSearch.cpp
+++ b/ndb/src/ndbapi/NdbOperationSearch.cpp
@@ -60,6 +60,7 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
Uint32 tData;
Uint32 tKeyInfoPosition;
const char* aValue = aValuePassed;
+ Uint32 xfrmData[1024];
Uint32 tempData[1024];
if ((theStatus == OperationDefined) &&
@@ -117,6 +118,21 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
theTupleKeyDefined[i][2] = true;
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
+ const char* aValueToWrite = aValue;
+
+ CHARSET_INFO* cs = tAttrInfo->m_cs;
+ if (cs != 0) {
+ // current limitation: strxfrm does not increase length
+ assert(cs->strxfrm_multiply == 1);
+ unsigned n =
+ (*cs->coll->strnxfrm)(cs,
+ (uchar*)xfrmData, sizeof(xfrmData),
+ (const uchar*)aValue, sizeInBytes);
+ while (n < sizeInBytes)
+ ((uchar*)xfrmData)[n++] = 0x20;
+ aValue = (char*)xfrmData;
+ }
+
Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ;
Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Inc. bits in last word
Uint32 sizeInWords = sizeInBytes / 4; // Exc. bits in last word
@@ -206,13 +222,20 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
if ((tOpType == InsertRequest) ||
(tOpType == WriteRequest)) {
if (!tAttrInfo->m_indexOnly){
+ // invalid data can crash kernel
+ if (cs != NULL &&
+ (*cs->cset->well_formed_len)(cs,
+ aValueToWrite,
+ aValueToWrite + sizeInBytes,
+ sizeInBytes) != sizeInBytes)
+ goto equal_error4;
Uint32 ahValue;
const Uint32 sz = totalSizeInWords;
AttributeHeader::init(&ahValue, tAttrId, sz);
insertATTRINFO( ahValue );
- insertATTRINFOloop((Uint32*)aValue, sizeInWords);
+ insertATTRINFOloop((Uint32*)aValueToWrite, sizeInWords);
if (bitsInLastWord != 0) {
- tData = *(Uint32*)(aValue + (sizeInWords << 2));
+ tData = *(Uint32*)(aValueToWrite + (sizeInWords << 2));
tData = convertEndian(tData);
tData = tData & ((1 << bitsInLastWord) - 1);
tData = convertEndian(tData);
@@ -311,6 +334,10 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
equal_error3:
setErrorCodeAbort(4209);
return -1;
+
+ equal_error4:
+ setErrorCodeAbort(744);
+ return -1;
}
/******************************************************************************
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index 86c174c4545..ac5f4268386 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -1096,30 +1096,43 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
theStatus == SetBound &&
(0 <= type && type <= 4) &&
len <= 8000) {
- // bound type
-
+ // insert bound type
insertATTRINFO(type);
- // attribute header
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
+ // normalize char bound
+ CHARSET_INFO* cs = tAttrInfo->m_cs;
+ Uint32 xfrmData[2000];
+ if (cs != NULL && aValue != NULL) {
+ // current limitation: strxfrm does not increase length
+ assert(cs->strxfrm_multiply == 1);
+ unsigned n =
+ (*cs->coll->strnxfrm)(cs,
+ (uchar*)xfrmData, sizeof(xfrmData),
+ (const uchar*)aValue, sizeInBytes);
+ while (n < sizeInBytes)
+ ((uchar*)xfrmData)[n++] = 0x20;
+ aValue = (char*)xfrmData;
+ }
if (len != sizeInBytes && (len != 0)) {
setErrorCodeAbort(4209);
return -1;
}
+ // insert attribute header
len = aValue != NULL ? sizeInBytes : 0;
Uint32 tIndexAttrId = tAttrInfo->m_attrId;
Uint32 sizeInWords = (len + 3) / 4;
AttributeHeader ah(tIndexAttrId, sizeInWords);
insertATTRINFO(ah.m_value);
if (len != 0) {
- // attribute data
+ // insert attribute data
if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0)
insertATTRINFOloop((const Uint32*)aValue, sizeInWords);
else {
- Uint32 temp[2000];
- memcpy(temp, aValue, len);
+ Uint32 tempData[2000];
+ memcpy(tempData, aValue, len);
while ((len & 0x3) != 0)
- ((char*)temp)[len++] = 0;
- insertATTRINFOloop(temp, sizeInWords);
+ ((char*)tempData)[len++] = 0;
+ insertATTRINFOloop(tempData, sizeInWords);
}
}
@@ -1206,11 +1219,11 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
if((r1_null ^ (unsigned)r2->isNULL())){
return (r1_null ? -1 : 1);
}
- Uint32 type = NdbColumnImpl::getImpl(* r1->m_column).m_extType;
+ const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4;
if(!r1_null){
- const NdbSqlUtil::Type& t = NdbSqlUtil::getType(type);
- int r = (*t.m_cmp)(d1, d2, size, size);
+ const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_extType);
+ int r = (*sqlType.m_cmp)(col.m_cs, d1, d2, size, size);
if(r){
assert(r != NdbSqlUtil::CmpUnknown);
return r;
diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c
index 2ebcf4be444..037c441cc38 100644
--- a/ndb/src/ndbapi/ndberror.c
+++ b/ndb/src/ndbapi/ndberror.c
@@ -282,7 +282,7 @@ ErrorBundle ErrorCodes[] = {
{ 741, SE, "Unsupported alter table" },
{ 742, SE, "Unsupported attribute type in index" },
{ 743, SE, "Unsupported character set in table or index" },
- { 744, SE, "Character conversion error" },
+ { 744, SE, "Character string is invalid for given character set" },
{ 241, SE, "Invalid schema object version" },
{ 283, SE, "Table is being dropped" },
{ 284, SE, "Table not defined in transaction coordinator" },
diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp
index ac28b96af80..f9eb3514926 100644
--- a/ndb/test/ndbapi/testOIBasic.cpp
+++ b/ndb/test/ndbapi/testOIBasic.cpp
@@ -28,6 +28,7 @@
#include <NdbCondition.h>
#include <NdbThread.h>
#include <NdbTick.h>
+#include <my_sys.h>
// options
@@ -37,6 +38,8 @@ struct Opt {
const char* m_bound;
const char* m_case;
bool m_core;
+ const char* m_csname;
+ CHARSET_INFO* m_cs;
bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_idxloop;
@@ -59,6 +62,8 @@ struct Opt {
m_bound("01234"),
m_case(0),
m_core(false),
+ m_csname("latin1_bin"),
+ m_cs(0),
m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined),
m_idxloop(4),
@@ -94,6 +99,7 @@ printhelp()
<< " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl
<< " -case abc only given test cases (letters a-z)" << endl
<< " -core core dump on error [" << d.m_core << "]" << endl
+ << " -csname S charset (collation) of non-pk char column [" << d.m_csname << "]" << endl
<< " -dups allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
<< " -fragtype T fragment type single/small/medium/large" << endl
<< " -index xyz only given index numbers (digits 1-9)" << endl
@@ -983,6 +989,10 @@ createtable(Par par)
c.setLength(col.m_length);
c.setPrimaryKey(col.m_pk);
c.setNullable(col.m_nullable);
+ if (c.getCharset()) { // test if char type
+ if (! col.m_pk)
+ c.setCharset(par.m_cs);
+ }
t.addColumn(c);
}
con.m_dic = con.m_ndb->getDictionary();
@@ -3149,6 +3159,10 @@ runtest(Par par)
LL1("start");
if (par.m_seed != 0)
srandom(par.m_seed);
+ assert(par.m_csname != 0);
+ CHARSET_INFO* cs;
+ CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0);
+ par.m_cs = cs;
Con con;
CHK(con.connect() == 0);
par.m_con = &con;
@@ -3232,6 +3246,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
g_opt.m_core = true;
continue;
}
+ if (strcmp(arg, "-csname") == 0) {
+ if (++argv, --argc > 0) {
+ g_opt.m_csname = strdup(argv[0]);
+ continue;
+ }
+ }
if (strcmp(arg, "-dups") == 0) {
g_opt.m_dups = true;
continue;