diff options
Diffstat (limited to 'storage/spider/hs_client/hstcpcli.cpp')
-rw-r--r-- | storage/spider/hs_client/hstcpcli.cpp | 667 |
1 files changed, 0 insertions, 667 deletions
diff --git a/storage/spider/hs_client/hstcpcli.cpp b/storage/spider/hs_client/hstcpcli.cpp deleted file mode 100644 index b3dea208f8f..00000000000 --- a/storage/spider/hs_client/hstcpcli.cpp +++ /dev/null @@ -1,667 +0,0 @@ - -// vim:sw=2:ai - -/* - * Copyright (C) 2010-2011 DeNA Co.,Ltd.. All rights reserved. - * Copyright (C) 2011-2017 Kentoku SHIBA - * See COPYRIGHT.txt for details. - */ - -#include <my_global.h> -#include "mysql_version.h" -#include "hs_compat.h" -#if MYSQL_VERSION_ID < 50500 -#include "mysql_priv.h" -#include <mysql/plugin.h> -#else -#include "sql_priv.h" -#include "probes_mysql.h" -#include "sql_class.h" -#endif - -#include "hstcpcli.hpp" -#include "auto_file.hpp" -#include "string_util.hpp" -#include "auto_addrinfo.hpp" -#include "escape.hpp" -#include "util.hpp" - -/* TODO */ -#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL) -#define MSG_NOSIGNAL 0 -#endif - -#define DBG(x) - -namespace dena { - -hstresult::hstresult() -{ - SPD_INIT_DYNAMIC_ARRAY2(&flds, sizeof(string_ref), NULL, 16, 16, - MYF(MY_WME)); -} - -hstresult::~hstresult() -{ - delete_dynamic(&flds); -} - -struct hstcpcli : public hstcpcli_i, private noncopyable { - hstcpcli(const socket_args& args); - virtual ~hstcpcli(); - virtual void close(); - virtual int reconnect(); - virtual bool stable_point(); - virtual void request_buf_open_index(size_t pst_id, const char *dbn, - const char *tbl, const char *idx, const char *retflds, const char *filflds); - virtual void request_buf_auth(const char *secret, const char *typ); - virtual void request_buf_exec_generic(size_t pst_id, const string_ref& op, - const string_ref *kvs, size_t kvslen, uint32 limit, uint32 skip, - const string_ref& mod_op, const string_ref *mvs, size_t mvslen, - const hstcpcli_filter *fils, size_t filslen, int invalues_keypart, - const string_ref *invalues, size_t invalueslen); - virtual size_t request_buf_append(const char *start, const char *finish); - virtual void request_reset(); - virtual int request_send(); - virtual int response_recv(size_t& num_flds_r); - virtual int get_result(hstresult& result); - virtual const string_ref *get_next_row(); - virtual const string_ref *get_next_row_from_result(hstresult& result); - virtual size_t get_row_size(); - virtual size_t get_row_size_from_result(hstresult& result); - virtual void response_buf_remove(); - virtual int get_error_code(); - virtual String& get_error(); - virtual void clear_error(); - virtual int set_timeout(int send_timeout, int recv_timeout); - virtual size_t get_num_req_bufd() { return num_req_bufd; } - virtual size_t get_num_req_sent() { return num_req_sent; } - virtual size_t get_num_req_rcvd() { return num_req_rcvd; } - virtual size_t get_response_end_offset() { return response_end_offset; } - virtual const char *get_readbuf_begin() { return readbuf.begin(); } - virtual const char *get_readbuf_end() { return readbuf.end(); } - virtual const char *get_writebuf_begin() { return writebuf.begin(); } - virtual size_t get_writebuf_size() { return writebuf.size(); } - virtual void write_error_to_log(const char *func_name, const char *file_name, - ulong line_no); - private: - int read_more(); - int set_error(int code, const String& str); - int set_error(int code, const char *str); - private: - auto_file fd; - socket_args sargs; - string_buffer readbuf; - string_buffer writebuf; - size_t response_end_offset; /* incl newline */ - size_t cur_row_offset; - size_t cur_row_size; - size_t num_flds; - size_t num_req_bufd; /* buffered but not yet sent */ - size_t num_req_sent; /* sent but not yet received */ - size_t num_req_rcvd; /* received but not yet removed */ - int error_code; - String error_str; - DYNAMIC_ARRAY flds; - int errno_buf; -}; - -hstcpcli::hstcpcli(const socket_args& args) - : sargs(args), response_end_offset(0), cur_row_offset(0), cur_row_size(0), - num_flds(0), num_req_bufd(0), num_req_sent(0), num_req_rcvd(0), - error_code(0), errno_buf(0) -{ - String err; - SPD_INIT_DYNAMIC_ARRAY2(&flds, sizeof(string_ref), NULL, 16, 16, MYF(MY_WME)); - if (socket_connect(fd, sargs, err) != 0) { - set_error(-1, err); - } -} - -hstcpcli::~hstcpcli() -{ - delete_dynamic(&flds); -} - -void -hstcpcli::close() -{ - fd.close(); - readbuf.clear(); - writebuf.clear(); - response_end_offset = 0; - cur_row_offset = 0; - num_flds = 0; - num_req_bufd = 0; - num_req_sent = 0; - num_req_rcvd = 0; -} - -int -hstcpcli::reconnect() -{ - clear_error(); - close(); - String err; - if (socket_connect(fd, sargs, err) != 0) { - set_error(-1, err); - } - return error_code; -} - -int -hstcpcli::set_timeout(int send_timeout, int recv_timeout) -{ - String err; - sargs.send_timeout = send_timeout; - sargs.recv_timeout = recv_timeout; - if (socket_set_timeout(fd, sargs, err) != 0) { - set_error(-1, err); - } - return error_code; -} - -bool -hstcpcli::stable_point() -{ - /* returns true if cli can send a new request */ - return fd.get() >= 0 && num_req_bufd == 0 && num_req_sent == 0 && - num_req_rcvd == 0 && response_end_offset == 0; -} - -int -hstcpcli::get_error_code() -{ - return error_code; -} - -String& -hstcpcli::get_error() -{ - return error_str; -} - -int -hstcpcli::read_more() -{ - const size_t block_size = 4096; // FIXME - char *const wp = readbuf.make_space(block_size); - int rlen; - errno = 0; - while ((rlen = read(fd.get(), wp, block_size)) <= 0) { - errno_buf = errno; - if (rlen < 0) { - if (errno == EINTR || errno == EAGAIN) - { - errno = 0; - continue; - } - error_str = String(STRING_WITH_LEN("read: failed"), &my_charset_bin); - } else { - error_str = String(STRING_WITH_LEN("read: eof"), &my_charset_bin); - } - return rlen; - } - readbuf.space_wrote(rlen); - return rlen; -} - -void -hstcpcli::clear_error() -{ - DBG(fprintf(stderr, "CLEAR_ERROR: %d\n", error_code)); - error_code = 0; - error_str.length(0); -} - -int -hstcpcli::set_error(int code, const String& str) -{ - DBG(fprintf(stderr, "SET_ERROR: %d\n", code)); - error_code = code; - error_str = str; - return error_code; -} - -int -hstcpcli::set_error(int code, const char *str) -{ - uint32 str_len = strlen(str); - DBG(fprintf(stderr, "SET_ERROR: %d\n", code)); - error_code = code; - error_str.length(0); - if (error_str.reserve(str_len + 1)) - return 0; - error_str.q_append(str, str_len); - error_str.c_ptr_safe(); - return error_code; -} - -void -hstcpcli::request_buf_open_index(size_t pst_id, const char *dbn, - const char *tbl, const char *idx, const char *retflds, const char *filflds) -{ -/* - if (num_req_sent > 0 || num_req_rcvd > 0) { -*/ - if (num_req_rcvd > 0) { - close(); - set_error(-1, "request_buf_open_index: protocol out of sync"); - return; - } - const string_ref dbn_ref(dbn, strlen(dbn)); - const string_ref tbl_ref(tbl, strlen(tbl)); - const string_ref idx_ref(idx, strlen(idx)); - const string_ref rfs_ref(retflds, strlen(retflds)); - writebuf.append_literal("P\t"); - append_uint32(writebuf, pst_id); // FIXME size_t ? - writebuf.append_literal("\t"); - writebuf.append(dbn_ref.begin(), dbn_ref.end()); - writebuf.append_literal("\t"); - writebuf.append(tbl_ref.begin(), tbl_ref.end()); - writebuf.append_literal("\t"); - writebuf.append(idx_ref.begin(), idx_ref.end()); - writebuf.append_literal("\t"); - writebuf.append(rfs_ref.begin(), rfs_ref.end()); - if (filflds != 0) { - const string_ref fls_ref(filflds, strlen(filflds)); - writebuf.append_literal("\t"); - writebuf.append(fls_ref.begin(), fls_ref.end()); - } - writebuf.append_literal("\n"); - ++num_req_bufd; -} - -void -hstcpcli::request_buf_auth(const char *secret, const char *typ) -{ -/* - if (num_req_sent > 0 || num_req_rcvd > 0) { -*/ - if (num_req_rcvd > 0) { - close(); - set_error(-1, "request_buf_auth: protocol out of sync"); - return; - } - if (typ == 0) { - typ = "1"; - } - const string_ref typ_ref(typ, strlen(typ)); - const string_ref secret_ref(secret, strlen(secret)); - writebuf.append_literal("A\t"); - writebuf.append(typ_ref.begin(), typ_ref.end()); - writebuf.append_literal("\t"); - writebuf.append(secret_ref.begin(), secret_ref.end()); - writebuf.append_literal("\n"); - ++num_req_bufd; -} - -namespace { - -void -append_delim_value(string_buffer& buf, const char *start, const char *finish) -{ - if (start == 0) { - /* null */ - const char t[] = "\t\0"; - buf.append(t, t + 2); - } else { - /* non-null */ - buf.append_literal("\t"); - escape_string(buf, start, finish); - } -} - -}; - -void -hstcpcli::request_buf_exec_generic(size_t pst_id, const string_ref& op, - const string_ref *kvs, size_t kvslen, uint32 limit, uint32 skip, - const string_ref& mod_op, const string_ref *mvs, size_t mvslen, - const hstcpcli_filter *fils, size_t filslen, int invalues_keypart, - const string_ref *invalues, size_t invalueslen) -{ -/* - if (num_req_sent > 0 || num_req_rcvd > 0) { -*/ - if (num_req_rcvd > 0) { - close(); - set_error(-1, "request_buf_exec_generic: protocol out of sync"); - return; - } - append_uint32(writebuf, pst_id); // FIXME size_t ? - writebuf.append_literal("\t"); - writebuf.append(op.begin(), op.end()); - writebuf.append_literal("\t"); - append_uint32(writebuf, kvslen); // FIXME size_t ? - for (size_t i = 0; i < kvslen; ++i) { - const string_ref& kv = kvs[i]; - append_delim_value(writebuf, kv.begin(), kv.end()); - } - if (limit != 0 || skip != 0 || invalues_keypart >= 0 || - mod_op.size() != 0 || filslen != 0) { - /* has more option */ - writebuf.append_literal("\t"); - append_uint32(writebuf, limit); // FIXME size_t ? - if (skip != 0 || invalues_keypart >= 0 || - mod_op.size() != 0 || filslen != 0) { - writebuf.append_literal("\t"); - append_uint32(writebuf, skip); // FIXME size_t ? - } - if (invalues_keypart >= 0) { - writebuf.append_literal("\t@\t"); - append_uint32(writebuf, invalues_keypart); - writebuf.append_literal("\t"); - append_uint32(writebuf, invalueslen); - for (size_t i = 0; i < invalueslen; ++i) { - const string_ref& s = invalues[i]; - append_delim_value(writebuf, s.begin(), s.end()); - } - } - for (size_t i = 0; i < filslen; ++i) { - const hstcpcli_filter& f = fils[i]; - writebuf.append_literal("\t"); - writebuf.append(f.filter_type.begin(), f.filter_type.end()); - writebuf.append_literal("\t"); - writebuf.append(f.op.begin(), f.op.end()); - writebuf.append_literal("\t"); - append_uint32(writebuf, f.ff_offset); - append_delim_value(writebuf, f.val.begin(), f.val.end()); - } - if (mod_op.size() != 0) { - writebuf.append_literal("\t"); - writebuf.append(mod_op.begin(), mod_op.end()); - for (size_t i = 0; i < mvslen; ++i) { - const string_ref& mv = mvs[i]; - append_delim_value(writebuf, mv.begin(), mv.end()); - } - } - } - writebuf.append_literal("\n"); - ++num_req_bufd; -} - -size_t -hstcpcli::request_buf_append(const char *start, const char *finish) -{ -/* - if (num_req_sent > 0 || num_req_rcvd > 0) { -*/ - if (num_req_rcvd > 0) { - close(); - set_error(-1, "request_buf_append: protocol out of sync"); - return 0; - } - const char *nl = start; - size_t num_req = 0; - while ((nl = memchr_char(nl, '\n', finish - nl))) { - if (nl == finish) - break; - num_req++; - nl++; - } - num_req++; - writebuf.append(start, finish); - if (*(finish - 1) != '\n') - writebuf.append_literal("\n"); - num_req_bufd += num_req; - return num_req; -} - -void -hstcpcli::request_reset() -{ - if (num_req_bufd) { - writebuf.erase_front(writebuf.size()); - num_req_bufd = 0; - } -} - -int -hstcpcli::request_send() -{ - if (error_code < 0) { - return error_code; - } - clear_error(); - if (fd.get() < 0) { - close(); - return set_error(-1, "write: closed"); - } -/* - if (num_req_bufd == 0 || num_req_sent > 0 || num_req_rcvd > 0) { -*/ - if (num_req_bufd == 0 || num_req_rcvd > 0) { - close(); - return set_error(-1, "request_send: protocol out of sync"); - } - const size_t wrlen = writebuf.size(); - const ssize_t r = send(fd.get(), writebuf.begin(), wrlen, MSG_NOSIGNAL); - if (r <= 0) { - close(); - return set_error(-1, r < 0 ? "write: failed" : "write: eof"); - } - writebuf.erase_front(r); - if (static_cast<size_t>(r) != wrlen) { - close(); - return set_error(-1, "write: incomplete"); - } - num_req_sent += num_req_bufd; - num_req_bufd = 0; - DBG(fprintf(stderr, "REQSEND 0\n")); - return 0; -} - -int -hstcpcli::response_recv(size_t& num_flds_r) -{ - if (error_code < 0) { - return error_code; - } - clear_error(); - if (num_req_bufd > 0 || num_req_sent == 0 || num_req_rcvd > 0 || - response_end_offset != 0) { - close(); - return set_error(-1, "response_recv: protocol out of sync"); - } - cur_row_offset = 0; - num_flds_r = num_flds = 0; - if (fd.get() < 0) { - return set_error(-1, "read: closed"); - } - size_t offset = 0; - while (true) { - const char *const lbegin = readbuf.begin() + offset; - const char *const lend = readbuf.end(); - if (lbegin < lend) - { - const char *const nl = memchr_char(lbegin, '\n', lend - lbegin); - if (nl != 0) { - offset += (nl + 1) - lbegin; - break; - } - offset += lend - lbegin; - } - if (read_more() <= 0) { - close(); - error_code = -1; - return error_code; - } - } - response_end_offset = offset; - --num_req_sent; - ++num_req_rcvd; - char *start = readbuf.begin(); - char *const finish = start + response_end_offset - 1; - const size_t resp_code = read_ui32(start, finish); - skip_one(start, finish); - num_flds_r = num_flds = read_ui32(start, finish); - if (resp_code != 0) { - skip_one(start, finish); - char *const err_begin = start; - read_token(start, finish); - char *const err_end = start; - String e = String(err_begin, (uint32)(err_end - err_begin), &my_charset_bin); - if (!e.length()) { - e = String(STRING_WITH_LEN("unknown_error"), &my_charset_bin); - } - return set_error(resp_code, e); - } - cur_row_size = 0; - cur_row_offset = start - readbuf.begin(); - DBG(fprintf(stderr, "[%s] ro=%zu eol=%zu\n", - String(readbuf.begin(), readbuf.begin() + response_end_offset) - .c_str(), - cur_row_offset, response_end_offset)); - DBG(fprintf(stderr, "RES 0\n")); - if (flds.max_element < num_flds) - { - if (allocate_dynamic(&flds, num_flds)) - return set_error(-1, "out of memory"); - } - flds.elements = num_flds; - return 0; -} - -int -hstcpcli::get_result(hstresult& result) -{ -/* - readbuf.swap(result.readbuf); -*/ - char *const wp = result.readbuf.make_space(response_end_offset); - memcpy(wp, readbuf.begin(), response_end_offset); - result.readbuf.space_wrote(response_end_offset); - result.response_end_offset = response_end_offset; - result.num_flds = num_flds; - result.cur_row_size = cur_row_size; - result.cur_row_offset = cur_row_offset; - if (result.flds.max_element < num_flds) - { - if (allocate_dynamic(&result.flds, num_flds)) - return set_error(-1, "out of memory"); - } - result.flds.elements = num_flds; - return 0; -} - -const string_ref * -hstcpcli::get_next_row() -{ - if (num_flds == 0 || flds.elements < num_flds) { - DBG(fprintf(stderr, "GNR NF 0\n")); - return 0; - } - char *start = readbuf.begin() + cur_row_offset; - char *const finish = readbuf.begin() + response_end_offset - 1; - if (start >= finish) { /* start[0] == nl */ - DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish)); - return 0; - } - for (size_t i = 0; i < num_flds; ++i) { - skip_one(start, finish); - char *const fld_begin = start; - read_token(start, finish); - char *const fld_end = start; - char *wp = fld_begin; - if (is_null_expression(fld_begin, fld_end)) { - /* null */ - ((string_ref *) flds.buffer)[i] = string_ref(); - } else { - unescape_string(wp, fld_begin, fld_end); /* in-place */ - ((string_ref *) flds.buffer)[i] = string_ref(fld_begin, wp); - } - } - cur_row_size = start - (readbuf.begin() + cur_row_offset); - cur_row_offset = start - readbuf.begin(); - return (string_ref *) flds.buffer; -} - -const string_ref * -hstcpcli::get_next_row_from_result(hstresult& result) -{ - if (result.num_flds == 0 || result.flds.elements < result.num_flds) { - DBG(fprintf(stderr, "GNR NF 0\n")); - return 0; - } - char *start = result.readbuf.begin() + result.cur_row_offset; - char *const finish = result.readbuf.begin() + result.response_end_offset - 1; - if (start >= finish) { /* start[0] == nl */ - DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish)); - return 0; - } - for (size_t i = 0; i < result.num_flds; ++i) { - skip_one(start, finish); - char *const fld_begin = start; - read_token(start, finish); - char *const fld_end = start; - char *wp = fld_begin; - if (is_null_expression(fld_begin, fld_end)) { - /* null */ - ((string_ref *) result.flds.buffer)[i] = string_ref(); - } else { - unescape_string(wp, fld_begin, fld_end); /* in-place */ - ((string_ref *) result.flds.buffer)[i] = string_ref(fld_begin, wp); - } - } - result.cur_row_size = - start - (result.readbuf.begin() + result.cur_row_offset); - result.cur_row_offset = start - result.readbuf.begin(); - return (string_ref *) result.flds.buffer; -} - -size_t -hstcpcli::get_row_size() -{ - return cur_row_size; -} - -size_t -hstcpcli::get_row_size_from_result(hstresult& result) -{ - return result.cur_row_size; -} - -void -hstcpcli::response_buf_remove() -{ - if (response_end_offset == 0) { - close(); - set_error(-1, "response_buf_remove: protocol out of sync"); - return; - } - readbuf.erase_front(response_end_offset); - response_end_offset = 0; - --num_req_rcvd; - cur_row_offset = 0; - num_flds = 0; -} - -void -hstcpcli::write_error_to_log( - const char *func_name, - const char *file_name, - ulong line_no -) { - if (errno_buf) { - time_t cur_time = (time_t) time((time_t*) 0); - struct tm lt; - struct tm *l_time = localtime_r(&cur_time, <); - fprintf(stderr, - "%04d%02d%02d %02d:%02d:%02d [ERROR] hstcpcli: [%d][%s]" - " [%s][%s][%lu] errno=%d\n", - l_time->tm_year + 1900, l_time->tm_mon + 1, l_time->tm_mday, - l_time->tm_hour, l_time->tm_min, l_time->tm_sec, - error_code, error_str.c_ptr_safe(), - func_name, file_name, line_no, errno_buf); - } -} - -hstcpcli_ptr -hstcpcli_i::create(const socket_args& args) -{ - return hstcpcli_ptr(new hstcpcli(args)); -} - -}; - |