summaryrefslogtreecommitdiff
path: root/storage/cassandra
diff options
context:
space:
mode:
Diffstat (limited to 'storage/cassandra')
-rw-r--r--storage/cassandra/CMakeLists.txt23
-rw-r--r--storage/cassandra/cassandra_se.cc109
-rw-r--r--storage/cassandra/cassandra_se.h2
-rw-r--r--storage/cassandra/gen-cpp/Cassandra.h2
-rw-r--r--storage/cassandra/gen-cpp/cassandra_types.h8
-rw-r--r--storage/cassandra/ha_cassandra.cc36
-rw-r--r--storage/cassandra/ha_cassandra.h5
7 files changed, 124 insertions, 61 deletions
diff --git a/storage/cassandra/CMakeLists.txt b/storage/cassandra/CMakeLists.txt
index dbccc2eb127..78cb1f8b7e8 100644
--- a/storage/cassandra/CMakeLists.txt
+++ b/storage/cassandra/CMakeLists.txt
@@ -10,6 +10,7 @@ ${Thrift_INCLUDE_DIR} # this may be set
)
# Verify that thrift linking library is found
+SET(Thrift_LIB_PATHS ${Thrift_LIB_PATHS} /usr/local/lib /opt/local/lib /opt/lib)
FIND_LIBRARY(Thrift_LIBS NAMES thrift PATHS ${Thrift_LIB_PATHS} ${Thrift_LIB})
IF(EXISTS ${Thrift_LIBS})
GET_FILENAME_COMPONENT(LINK_DIR ${Thrift_LIBS} PATH ABSOLUTE)
@@ -17,15 +18,16 @@ ELSE()
RETURN()
ENDIF()
-INCLUDE_DIRECTORIES(AFTER ${Thrift_INCLUDE_DIRS})
+INCLUDE_DIRECTORIES(AFTER ${Thrift_INCLUDE_DIRS}/..)
SET(CMAKE_REQUIRED_INCLUDES ${Thrift_INCLUDE_DIRS})
STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
+SET(CMAKE_REQUIRED_INCLUDES "${Thrift_INCLUDE_DIRS}/..")
CHECK_CXX_SOURCE_COMPILES(
"
-#include <Thrift.h>
+#include <thrift/Thrift.h>
#include <boost/shared_ptr.hpp>
int main() {
boost::shared_ptr<char> p(new char(10));
@@ -48,21 +50,22 @@ IF(CASSANDRASE_OK)
LINK_DIRECTORIES(${LINK_DIR})
- MYSQL_ADD_PLUGIN(cassandra ${cassandra_sources} STORAGE_ENGINE MODULE_ONLY LINK_LIBRARIES thrift COMPONENT CassandraSE)
+ SET(CASSANDRA_DEB_FILES "usr/lib/mysql/plugin/ha_cassandra.so" PARENT_SCOPE)
+
+ MYSQL_ADD_PLUGIN(cassandra ${cassandra_sources} STORAGE_ENGINE MODULE_ONLY LINK_LIBRARIES thrift COMPONENT cassandra-engine)
IF (INSTALL_SYSCONFDIR)
INSTALL(FILES cassandra.cnf DESTINATION ${INSTALL_SYSCONFDIR}/my.cnf.d
- COMPONENT CassandraSE)
+ COMPONENT cassandra-engine)
ENDIF(INSTALL_SYSCONFDIR)
IF(RPM)
- SET(CPACK_COMPONENT_CASSANDRASELIBRARIES_GROUP "CassandraSE" PARENT_SCOPE)
- SET(CPACK_COMPONENTS_ALL ${CPACK_COMPONENTS_ALL} CassandraSE PARENT_SCOPE)
- SET(CPACK_RPM_CassandraSE_PACKAGE_REQUIRES "MariaDB-server" PARENT_SCOPE)
+ SET(CPACK_COMPONENTS_ALL ${CPACK_COMPONENTS_ALL} cassandra-engine PARENT_SCOPE)
+ SET(CPACK_RPM_cassandra-engine_PACKAGE_REQUIRES "MariaDB-server" PARENT_SCOPE)
# workarounds for cmake issues #13248 and #12864:
- SET(CPACK_RPM_CassandraSE_USER_FILELIST ${ignored} "%config(noreplace) /etc/my.cnf.d/*" PARENT_SCOPE)
- SET(CPACK_RPM_CassandraSE_PACKAGE_PROVIDES "cmake_bug_13248" PARENT_SCOPE)
- SET(CPACK_RPM_CassandraSE_PACKAGE_OBSOLETES "cmake_bug_13248" PARENT_SCOPE)
+ SET(CPACK_RPM_cassandra-engine_USER_FILELIST ${ignored} "%config(noreplace) /etc/my.cnf.d/*" PARENT_SCOPE)
+ SET(CPACK_RPM_cassandra-engine_PACKAGE_PROVIDES "cmake_bug_13248" PARENT_SCOPE)
+ SET(CPACK_RPM_cassandra-engine_PACKAGE_OBSOLETES "cmake_bug_13248" PARENT_SCOPE)
ENDIF(RPM)
ENDIF(CASSANDRASE_OK)
diff --git a/storage/cassandra/cassandra_se.cc b/storage/cassandra/cassandra_se.cc
index 0d62c5af7a6..111f30f715f 100644
--- a/storage/cassandra/cassandra_se.cc
+++ b/storage/cassandra/cassandra_se.cc
@@ -6,12 +6,12 @@
#include <stdio.h>
#include <stdarg.h>
-#include "Thrift.h"
-#include "transport/TSocket.h"
-#include "transport/TTransport.h"
-#include "transport/TBufferTransports.h"
-#include "protocol/TProtocol.h"
-#include "protocol/TBinaryProtocol.h"
+#include "thrift/Thrift.h"
+#include "thrift/transport/TSocket.h"
+#include "thrift/transport/TTransport.h"
+#include "thrift/transport/TBufferTransports.h"
+#include "thrift/protocol/TProtocol.h"
+#include "thrift/protocol/TBinaryProtocol.h"
#include "gen-cpp/Cassandra.h"
// cassandra includes end
@@ -42,10 +42,14 @@ class Cassandra_se_impl: public Cassandra_se_interface
ConsistencyLevel::type write_consistency;
ConsistencyLevel::type read_consistency;
-
+
+ /* Connection data */
+ std::string host;
+ int port;
/* How many times to retry an operation before giving up */
int thrift_call_retries_to_do;
+ bool inside_try_operation;
/* DDL data */
KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
@@ -74,15 +78,19 @@ class Cassandra_se_impl: public Cassandra_se_interface
SliceRange slice_pred_sr;
bool get_slices_returned_less;
bool get_slice_found_rows;
+
+ bool reconnect();
public:
Cassandra_se_impl() : cass(NULL),
write_consistency(ConsistencyLevel::ONE),
read_consistency(ConsistencyLevel::ONE),
- thrift_call_retries_to_do(0) {}
+ thrift_call_retries_to_do(1),
+ inside_try_operation(false)
+ {}
virtual ~Cassandra_se_impl(){ delete cass; }
/* Connection and DDL checks */
- bool connect(const char *host, int port, const char *keyspace);
+ bool connect(const char *host_arg, int port_arg, const char *keyspace);
void set_column_family(const char *cfname) { column_family.assign(cfname); }
bool setup_ddl_checks();
@@ -94,6 +102,9 @@ public:
/* Settings */
void set_consistency_levels(ulong read_cons_level, ulong write_cons_level);
+ virtual void set_n_retries(uint retries_arg) {
+ thrift_call_retries_to_do= retries_arg;
+ }
/* Writes */
void clear_insert_buffer();
@@ -170,15 +181,25 @@ Cassandra_se_interface *create_cassandra_se()
}
-bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace_arg)
+bool Cassandra_se_impl::connect(const char *host_arg, int port_arg, const char *keyspace_arg)
{
- bool res= true;
-
keyspace.assign(keyspace_arg);
+ host.assign(host_arg);
+ port= port_arg;
+ return reconnect();
+}
+
+bool Cassandra_se_impl::reconnect()
+{
+
+ delete cass;
+ cass= NULL;
+
+ bool res= true;
try {
boost::shared_ptr<TTransport> socket =
- boost::shared_ptr<TSocket>(new TSocket(host, port));
+ boost::shared_ptr<TSocket>(new TSocket(host.c_str(), port));
boost::shared_ptr<TTransport> tr =
boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket));
boost::shared_ptr<TProtocol> p =
@@ -186,7 +207,7 @@ bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace
cass= new CassandraClient(p);
tr->open();
- cass->set_keyspace(keyspace_arg);
+ cass->set_keyspace(keyspace.c_str());
res= false; // success
}catch(TTransportException te){
@@ -694,7 +715,10 @@ bool Cassandra_se_impl::retryable_remove_row()
bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call)
{
bool res;
- int n_retries= thrift_call_retries_to_do;
+ int n_attempts= thrift_call_retries_to_do;
+
+ bool was_inside_try_operation= inside_try_operation;
+ inside_try_operation= true;
do
{
@@ -710,31 +734,70 @@ bool Cassandra_se_impl::try_operation(retryable_func_t func_to_call)
This is supposedly a failure (or "not found" or other negative
result). We need to return this to the caller.
*/
- n_retries= 0;
+ n_attempts= 0;
}
} catch (InvalidRequestException ire) {
- n_retries= 0; /* there is no point in retrying this operation */
+ n_attempts= 0; /* there is no point in retrying this operation */
print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) {
cassandra_counters.unavailable_exceptions++;
- if (!--n_retries)
+ if (!--n_attempts)
print_error("UnavailableException: %s", ue.what());
} catch (TimedOutException te) {
+ /*
+ Note: this is a timeout generated *inside Cassandra cluster*.
+ Connection between us and the cluster is ok, but something went wrong
+ within the cluster.
+ */
cassandra_counters.timeout_exceptions++;
- if (!--n_retries)
+ if (!--n_attempts)
print_error("TimedOutException: %s", te.what());
+ } catch (TTransportException tte) {
+ /* Something went wrong in communication between us and Cassandra */
+ cassandra_counters.network_exceptions++;
+
+ switch (tte.getType())
+ {
+ case TTransportException::NOT_OPEN:
+ case TTransportException::TIMED_OUT:
+ case TTransportException::END_OF_FILE:
+ case TTransportException::INTERRUPTED:
+ {
+ if (!was_inside_try_operation && reconnect())
+ {
+ /* Failed to reconnect, no point to retry the operation */
+ n_attempts= 0;
+ print_error("%s", tte.what());
+ }
+ else
+ {
+ n_attempts--;
+ }
+ break;
+ }
+ default:
+ {
+ /*
+ We assume it doesn't make sense to retry for
+ unknown kinds of TTransportException-s
+ */
+ n_attempts= 0;
+ print_error("%s", tte.what());
+ }
+ }
}catch(TException e){
/* todo: we may use retry for certain kinds of Thrift errors */
- n_retries= 0;
+ n_attempts= 0;
print_error("Thrift exception: %s", e.what());
} catch (...) {
- n_retries= 0; /* Don't retry */
+ n_attempts= 0; /* Don't retry */
print_error("Unknown exception");
}
- } while (res && n_retries > 0);
-
+ } while (res && n_attempts > 0);
+
+ inside_try_operation= was_inside_try_operation;
return res;
}
diff --git a/storage/cassandra/cassandra_se.h b/storage/cassandra/cassandra_se.h
index 050c65e6dde..2d3d5f27a56 100644
--- a/storage/cassandra/cassandra_se.h
+++ b/storage/cassandra/cassandra_se.h
@@ -45,6 +45,7 @@ public:
/* Settings */
virtual void set_consistency_levels(ulong read_cons_level, ulong write_cons_level)=0;
+ virtual void set_n_retries(uint retries_arg)=0;
/* Check underlying DDL */
virtual bool setup_ddl_checks()=0;
@@ -113,6 +114,7 @@ public:
ulong timeout_exceptions;
ulong unavailable_exceptions;
+ ulong network_exceptions;
};
diff --git a/storage/cassandra/gen-cpp/Cassandra.h b/storage/cassandra/gen-cpp/Cassandra.h
index 2040cc63aa2..b98570043a9 100644
--- a/storage/cassandra/gen-cpp/Cassandra.h
+++ b/storage/cassandra/gen-cpp/Cassandra.h
@@ -7,7 +7,7 @@
#ifndef Cassandra_H
#define Cassandra_H
-#include <TProcessor.h>
+#include <thrift/TProcessor.h>
#include "cassandra_types.h"
namespace org { namespace apache { namespace cassandra {
diff --git a/storage/cassandra/gen-cpp/cassandra_types.h b/storage/cassandra/gen-cpp/cassandra_types.h
index d675198dcc8..226c5fa570a 100644
--- a/storage/cassandra/gen-cpp/cassandra_types.h
+++ b/storage/cassandra/gen-cpp/cassandra_types.h
@@ -10,10 +10,10 @@
#include <inttypes.h>
#include <netinet/in.h>
-#include <Thrift.h>
-#include <TApplicationException.h>
-#include <protocol/TProtocol.h>
-#include <transport/TTransport.h>
+#include <thrift/Thrift.h>
+#include <thrift/TApplicationException.h>
+#include <thrift/protocol/TProtocol.h>
+#include <thrift/transport/TTransport.h>
diff --git a/storage/cassandra/ha_cassandra.cc b/storage/cassandra/ha_cassandra.cc
index 1f3b9e14cb3..3fbe1834f89 100644
--- a/storage/cassandra/ha_cassandra.cc
+++ b/storage/cassandra/ha_cassandra.cc
@@ -110,7 +110,7 @@ static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
"Number of times to retry Cassandra calls that failed due to timeouts or "
"network communication problems. The default, 0, means not to retry.",
- NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0);
+ NULL, NULL, /*default*/ 3, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
/* These match values in enum_cassandra_consistency_level */
const char *cassandra_consistency_level[] =
@@ -369,16 +369,6 @@ ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
{}
-static const char *ha_cassandra_exts[] = {
- NullS
-};
-
-const char **ha_cassandra::bas_ext() const
-{
- return ha_cassandra_exts;
-}
-
-
int ha_cassandra::connect_and_check_options(TABLE *table_arg)
{
ha_table_option_struct *options= table_arg->s->option_struct;
@@ -1383,6 +1373,7 @@ ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_
/* fall through: */
case MYSQL_TYPE_VAR_STRING:
case MYSQL_TYPE_VARCHAR:
+ case MYSQL_TYPE_BLOB:
{
/*
Cassandra's "varint" type is a binary-encoded arbitary-length
@@ -2050,6 +2041,12 @@ void ha_cassandra::start_bulk_insert(ha_rows rows, uint flags)
int ha_cassandra::end_bulk_insert()
{
DBUG_ENTER("ha_cassandra::end_bulk_insert");
+
+ if (!doing_insert_batch)
+ {
+ /* SQL layer can make end_bulk_insert call without start_bulk_insert call */
+ DBUG_RETURN(0);
+ }
/* Flush out the insert buffer */
doing_insert_batch= false;
@@ -2219,6 +2216,7 @@ int ha_cassandra::reset()
{
se->set_consistency_levels(THDVAR(table->in_use, read_consistency),
THDVAR(table->in_use, write_consistency));
+ se->set_n_retries(THDVAR(table->in_use, failure_retries));
}
return 0;
}
@@ -2578,21 +2576,23 @@ struct st_mysql_storage_engine cassandra_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
static SHOW_VAR cassandra_status_variables[]= {
- {"Cassandra_row_inserts",
+ {"row_inserts",
(char*) &cassandra_counters.row_inserts, SHOW_LONG},
- {"Cassandra_row_insert_batches",
+ {"row_insert_batches",
(char*) &cassandra_counters.row_insert_batches, SHOW_LONG},
- {"Cassandra_multiget_keys_scanned",
+ {"multiget_keys_scanned",
(char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
- {"Cassandra_multiget_reads",
+ {"multiget_reads",
(char*) &cassandra_counters.multiget_reads, SHOW_LONG},
- {"Cassandra_multiget_rows_read",
+ {"multiget_rows_read",
(char*) &cassandra_counters.multiget_rows_read, SHOW_LONG},
- {"Cassandra_timeout_exceptions",
+ {"network_exceptions",
+ (char*) &cassandra_counters.network_exceptions, SHOW_LONG},
+ {"timeout_exceptions",
(char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
- {"Cassandra_unavailable_exceptions",
+ {"unavailable_exceptions",
(char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG},
{NullS, NullS, SHOW_LONG}
};
diff --git a/storage/cassandra/ha_cassandra.h b/storage/cassandra/ha_cassandra.h
index cb2f9fb237b..0c225c58780 100644
--- a/storage/cassandra/ha_cassandra.h
+++ b/storage/cassandra/ha_cassandra.h
@@ -132,11 +132,6 @@ public:
const char *index_type(uint inx) { return "HASH"; }
/** @brief
- The file extensions.
- */
- const char **bas_ext() const;
-
- /** @brief
This is a list of flags that indicate what functionality the storage engine
implements. The current table flags are documented in handler.h
*/