diff options
Diffstat (limited to 'storage/cassandra')
-rw-r--r-- | storage/cassandra/CMakeLists.txt | 23 | ||||
-rw-r--r-- | storage/cassandra/cassandra_se.cc | 109 | ||||
-rw-r--r-- | storage/cassandra/cassandra_se.h | 2 | ||||
-rw-r--r-- | storage/cassandra/gen-cpp/Cassandra.h | 2 | ||||
-rw-r--r-- | storage/cassandra/gen-cpp/cassandra_types.h | 8 | ||||
-rw-r--r-- | storage/cassandra/ha_cassandra.cc | 36 | ||||
-rw-r--r-- | storage/cassandra/ha_cassandra.h | 5 |
7 files changed, 124 insertions, 61 deletions
diff --git a/storage/cassandra/CMakeLists.txt b/storage/cassandra/CMakeLists.txt index dbccc2eb127..78cb1f8b7e8 100644 --- a/storage/cassandra/CMakeLists.txt +++ b/storage/cassandra/CMakeLists.txt @@ -10,6 +10,7 @@ ${Thrift_INCLUDE_DIR} # this may be set ) # Verify that thrift linking library is found +SET(Thrift_LIB_PATHS ${Thrift_LIB_PATHS} /usr/local/lib /opt/local/lib /opt/lib) FIND_LIBRARY(Thrift_LIBS NAMES thrift PATHS ${Thrift_LIB_PATHS} ${Thrift_LIB}) IF(EXISTS ${Thrift_LIBS}) GET_FILENAME_COMPONENT(LINK_DIR ${Thrift_LIBS} PATH ABSOLUTE) @@ -17,15 +18,16 @@ ELSE() RETURN() ENDIF() -INCLUDE_DIRECTORIES(AFTER ${Thrift_INCLUDE_DIRS}) +INCLUDE_DIRECTORIES(AFTER ${Thrift_INCLUDE_DIRS}/..) SET(CMAKE_REQUIRED_INCLUDES ${Thrift_INCLUDE_DIRS}) STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) +SET(CMAKE_REQUIRED_INCLUDES "${Thrift_INCLUDE_DIRS}/..") CHECK_CXX_SOURCE_COMPILES( " -#include <Thrift.h> +#include <thrift/Thrift.h> #include <boost/shared_ptr.hpp> int main() { boost::shared_ptr<char> p(new char(10)); @@ -48,21 +50,22 @@ IF(CASSANDRASE_OK) LINK_DIRECTORIES(${LINK_DIR}) - MYSQL_ADD_PLUGIN(cassandra ${cassandra_sources} STORAGE_ENGINE MODULE_ONLY LINK_LIBRARIES thrift COMPONENT CassandraSE) + SET(CASSANDRA_DEB_FILES "usr/lib/mysql/plugin/ha_cassandra.so" PARENT_SCOPE) + + MYSQL_ADD_PLUGIN(cassandra ${cassandra_sources} STORAGE_ENGINE MODULE_ONLY LINK_LIBRARIES thrift COMPONENT cassandra-engine) IF (INSTALL_SYSCONFDIR) INSTALL(FILES cassandra.cnf DESTINATION ${INSTALL_SYSCONFDIR}/my.cnf.d - COMPONENT CassandraSE) + COMPONENT cassandra-engine) ENDIF(INSTALL_SYSCONFDIR) IF(RPM) - SET(CPACK_COMPONENT_CASSANDRASELIBRARIES_GROUP "CassandraSE" PARENT_SCOPE) - SET(CPACK_COMPONENTS_ALL ${CPACK_COMPONENTS_ALL} CassandraSE PARENT_SCOPE) - SET(CPACK_RPM_CassandraSE_PACKAGE_REQUIRES "MariaDB-server" PARENT_SCOPE) + SET(CPACK_COMPONENTS_ALL ${CPACK_COMPONENTS_ALL} cassandra-engine PARENT_SCOPE) + SET(CPACK_RPM_cassandra-engine_PACKAGE_REQUIRES "MariaDB-server" PARENT_SCOPE) # workarounds for cmake issues #13248 and #12864: - SET(CPACK_RPM_CassandraSE_USER_FILELIST ${ignored} "%config(noreplace) /etc/my.cnf.d/*" PARENT_SCOPE) - SET(CPACK_RPM_CassandraSE_PACKAGE_PROVIDES "cmake_bug_13248" PARENT_SCOPE) - SET(CPACK_RPM_CassandraSE_PACKAGE_OBSOLETES "cmake_bug_13248" PARENT_SCOPE) + SET(CPACK_RPM_cassandra-engine_USER_FILELIST ${ignored} "%config(noreplace) /etc/my.cnf.d/*" PARENT_SCOPE) + SET(CPACK_RPM_cassandra-engine_PACKAGE_PROVIDES "cmake_bug_13248" PARENT_SCOPE) + SET(CPACK_RPM_cassandra-engine_PACKAGE_OBSOLETES "cmake_bug_13248" PARENT_SCOPE) ENDIF(RPM) ENDIF(CASSANDRASE_OK) diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc index 0d62c5af7a6..111f30f715f 100644 --- a/storage/cassandra/cassandra_se.cc +++ b/storage/cassandra/cassandra_se.cc @@ -6,12 +6,12 @@ #include <stdio.h> #include <stdarg.h> -#include "Thrift.h" -#include "transport/TSocket.h" -#include "transport/TTransport.h" -#include "transport/TBufferTransports.h" -#include "protocol/TProtocol.h" -#include "protocol/TBinaryProtocol.h" +#include "thrift/Thrift.h" +#include "thrift/transport/TSocket.h" +#include "thrift/transport/TTransport.h" +#include "thrift/transport/TBufferTransports.h" +#include "thrift/protocol/TProtocol.h" +#include "thrift/protocol/TBinaryProtocol.h" #include "gen-cpp/Cassandra.h" // cassandra includes end @@ -42,10 +42,14 @@ class Cassandra_se_impl: public Cassandra_se_interface ConsistencyLevel::type write_consistency; ConsistencyLevel::type read_consistency; - + + /* Connection data */ + std::string host; + int port; /* How many times to retry an operation before giving up */ int thrift_call_retries_to_do; + bool inside_try_operation; /* DDL data */ KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */ @@ -74,15 +78,19 @@ class Cassandra_se_impl: public Cassandra_se_interface SliceRange slice_pred_sr; bool get_slices_returned_less; bool get_slice_found_rows; + + bool reconnect(); public: Cassandra_se_impl() : cass(NULL), write_consistency(ConsistencyLevel::ONE), read_consistency(ConsistencyLevel::ONE), - thrift_call_retries_to_do(0) {} + thrift_call_retries_to_do(1), + inside_try_operation(false) + {} virtual ~Cassandra_se_impl(){ delete cass; } /* Connection and DDL checks */ - bool connect(const char *host, int port, const char *keyspace); + bool connect(const char *host_arg, int port_arg, const char *keyspace); void set_column_family(const char *cfname) { column_family.assign(cfname); } bool setup_ddl_checks(); @@ -94,6 +102,9 @@ public: /* Settings */ void set_consistency_levels(ulong read_cons_level, ulong write_cons_level); + virtual void set_n_retries(uint retries_arg) { + thrift_call_retries_to_do= retries_arg; + } /* Writes */ void clear_insert_buffer(); @@ -170,15 +181,25 @@ Cassandra_se_interface *create_cassandra_se() } -bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace_arg) +bool Cassandra_se_impl::connect(const char *host_arg, int port_arg, const char *keyspace_arg) { - bool res= true; - keyspace.assign(keyspace_arg); + host.assign(host_arg); + port= port_arg; + return reconnect(); +} + +bool Cassandra_se_impl::reconnect() +{ + + delete cass; + cass= NULL; + + bool res= true; try { boost::shared_ptr<TTransport> socket = - boost::shared_ptr<TSocket>(new TSocket(host, port)); + boost::shared_ptr<TSocket>(new TSocket(host.c_str(), port)); boost::shared_ptr<TTransport> tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket)); boost::shared_ptr<TProtocol> p = @@ -186,7 +207,7 @@ bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace cass= new CassandraClient(p); tr->open(); - cass->set_keyspace(keyspace_arg); + cass->set_keyspace(keyspace.c_str()); res= false; // success }catch(TTransportException te){ @@ -694,7 +715,10 @@ bool Cassandra_se_impl::retryable_remove_row() bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call) { bool res; - int n_retries= thrift_call_retries_to_do; + int n_attempts= thrift_call_retries_to_do; + + bool was_inside_try_operation= inside_try_operation; + inside_try_operation= true; do { @@ -710,31 +734,70 @@ bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call) This is supposedly a failure (or "not found" or other negative result). We need to return this to the caller. */ - n_retries= 0; + n_attempts= 0; } } catch (InvalidRequestException ire) { - n_retries= 0; /* there is no point in retrying this operation */ + n_attempts= 0; /* there is no point in retrying this operation */ print_error("%s [%s]", ire.what(), ire.why.c_str()); } catch (UnavailableException ue) { cassandra_counters.unavailable_exceptions++; - if (!--n_retries) + if (!--n_attempts) print_error("UnavailableException: %s", ue.what()); } catch (TimedOutException te) { + /* + Note: this is a timeout generated *inside Cassandra cluster*. + Connection between us and the cluster is ok, but something went wrong + within the cluster. + */ cassandra_counters.timeout_exceptions++; - if (!--n_retries) + if (!--n_attempts) print_error("TimedOutException: %s", te.what()); + } catch (TTransportException tte) { + /* Something went wrong in communication between us and Cassandra */ + cassandra_counters.network_exceptions++; + + switch (tte.getType()) + { + case TTransportException::NOT_OPEN: + case TTransportException::TIMED_OUT: + case TTransportException::END_OF_FILE: + case TTransportException::INTERRUPTED: + { + if (!was_inside_try_operation && reconnect()) + { + /* Failed to reconnect, no point to retry the operation */ + n_attempts= 0; + print_error("%s", tte.what()); + } + else + { + n_attempts--; + } + break; + } + default: + { + /* + We assume it doesn't make sense to retry for + unknown kinds of TTransportException-s + */ + n_attempts= 0; + print_error("%s", tte.what()); + } + } }catch(TException e){ /* todo: we may use retry for certain kinds of Thrift errors */ - n_retries= 0; + n_attempts= 0; print_error("Thrift exception: %s", e.what()); } catch (...) { - n_retries= 0; /* Don't retry */ + n_attempts= 0; /* Don't retry */ print_error("Unknown exception"); } - } while (res && n_retries > 0); - + } while (res && n_attempts > 0); + + inside_try_operation= was_inside_try_operation; return res; } diff --git a/storage/cassandra/cassandra_se.h b/storage/cassandra/cassandra_se.h index 050c65e6dde..2d3d5f27a56 100644 --- a/storage/cassandra/cassandra_se.h +++ b/storage/cassandra/cassandra_se.h @@ -45,6 +45,7 @@ public: /* Settings */ virtual void set_consistency_levels(ulong read_cons_level, ulong write_cons_level)=0; + virtual void set_n_retries(uint retries_arg)=0; /* Check underlying DDL */ virtual bool setup_ddl_checks()=0; @@ -113,6 +114,7 @@ public: ulong timeout_exceptions; ulong unavailable_exceptions; + ulong network_exceptions; }; diff --git a/storage/cassandra/gen-cpp/Cassandra.h b/storage/cassandra/gen-cpp/Cassandra.h index 2040cc63aa2..b98570043a9 100644 --- a/storage/cassandra/gen-cpp/Cassandra.h +++ b/storage/cassandra/gen-cpp/Cassandra.h @@ -7,7 +7,7 @@ #ifndef Cassandra_H #define Cassandra_H -#include <TProcessor.h> +#include <thrift/TProcessor.h> #include "cassandra_types.h" namespace org { namespace apache { namespace cassandra { diff --git a/storage/cassandra/gen-cpp/cassandra_types.h b/storage/cassandra/gen-cpp/cassandra_types.h index d675198dcc8..226c5fa570a 100644 --- a/storage/cassandra/gen-cpp/cassandra_types.h +++ b/storage/cassandra/gen-cpp/cassandra_types.h @@ -10,10 +10,10 @@ #include <inttypes.h> #include <netinet/in.h> -#include <Thrift.h> -#include <TApplicationException.h> -#include <protocol/TProtocol.h> -#include <transport/TTransport.h> +#include <thrift/Thrift.h> +#include <thrift/TApplicationException.h> +#include <thrift/protocol/TProtocol.h> +#include <thrift/transport/TTransport.h> diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc index 1f3b9e14cb3..3fbe1834f89 100644 --- a/storage/cassandra/ha_cassandra.cc +++ b/storage/cassandra/ha_cassandra.cc @@ -110,7 +110,7 @@ static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG, static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG, "Number of times to retry Cassandra calls that failed due to timeouts or " "network communication problems. The default, 0, means not to retry.", - NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0); + NULL, NULL, /*default*/ 3, /*min*/ 1, /*max*/ 1024*1024*1024, 0); /* These match values in enum_cassandra_consistency_level */ const char *cassandra_consistency_level[] = @@ -369,16 +369,6 @@ ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg) {} -static const char *ha_cassandra_exts[] = { - NullS -}; - -const char **ha_cassandra::bas_ext() const -{ - return ha_cassandra_exts; -} - - int ha_cassandra::connect_and_check_options(TABLE *table_arg) { ha_table_option_struct *options= table_arg->s->option_struct; @@ -1383,6 +1373,7 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_ /* fall through: */ case MYSQL_TYPE_VAR_STRING: case MYSQL_TYPE_VARCHAR: + case MYSQL_TYPE_BLOB: { /* Cassandra's "varint" type is a binary-encoded arbitary-length @@ -2050,6 +2041,12 @@ void ha_cassandra::start_bulk_insert(ha_rows rows, uint flags) int ha_cassandra::end_bulk_insert() { DBUG_ENTER("ha_cassandra::end_bulk_insert"); + + if (!doing_insert_batch) + { + /* SQL layer can make end_bulk_insert call without start_bulk_insert call */ + DBUG_RETURN(0); + } /* Flush out the insert buffer */ doing_insert_batch= false; @@ -2219,6 +2216,7 @@ int ha_cassandra::reset() { se->set_consistency_levels(THDVAR(table->in_use, read_consistency), THDVAR(table->in_use, write_consistency)); + se->set_n_retries(THDVAR(table->in_use, failure_retries)); } return 0; } @@ -2578,21 +2576,23 @@ struct st_mysql_storage_engine cassandra_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; static SHOW_VAR cassandra_status_variables[]= { - {"Cassandra_row_inserts", + {"row_inserts", (char*) &cassandra_counters.row_inserts, SHOW_LONG}, - {"Cassandra_row_insert_batches", + {"row_insert_batches", (char*) &cassandra_counters.row_insert_batches, SHOW_LONG}, - {"Cassandra_multiget_keys_scanned", + {"multiget_keys_scanned", (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG}, - {"Cassandra_multiget_reads", + {"multiget_reads", (char*) &cassandra_counters.multiget_reads, SHOW_LONG}, - {"Cassandra_multiget_rows_read", + {"multiget_rows_read", (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG}, - {"Cassandra_timeout_exceptions", + {"network_exceptions", + (char*) &cassandra_counters.network_exceptions, SHOW_LONG}, + {"timeout_exceptions", (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG}, - {"Cassandra_unavailable_exceptions", + {"unavailable_exceptions", (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG}, {NullS, NullS, SHOW_LONG} }; diff --git a/storage/cassandra/ha_cassandra.h b/storage/cassandra/ha_cassandra.h index cb2f9fb237b..0c225c58780 100644 --- a/storage/cassandra/ha_cassandra.h +++ b/storage/cassandra/ha_cassandra.h @@ -132,11 +132,6 @@ public: const char *index_type(uint inx) { return "HASH"; } /** @brief - The file extensions. - */ - const char **bas_ext() const; - - /** @brief This is a list of flags that indicate what functionality the storage engine implements. The current table flags are documented in handler.h */ |