summaryrefslogtreecommitdiff
path: root/storage/maria
diff options
context:
space:
mode:
Diffstat (limited to 'storage/maria')
-rw-r--r--storage/maria/ft_maria.c4
-rw-r--r--storage/maria/ha_maria.cc167
-rw-r--r--storage/maria/lockman.c13
-rw-r--r--storage/maria/ma_blockrec.c12
-rw-r--r--storage/maria/ma_check.c20
-rw-r--r--storage/maria/ma_check_standalone.h6
-rw-r--r--storage/maria/ma_close.c3
-rw-r--r--storage/maria/ma_create.c2
-rw-r--r--storage/maria/ma_delete.c2
-rw-r--r--storage/maria/ma_extra.c6
-rw-r--r--storage/maria/ma_ft_boolean_search.c56
-rw-r--r--storage/maria/ma_ft_nlq_search.c23
-rw-r--r--storage/maria/ma_ft_parser.c38
-rw-r--r--storage/maria/ma_ftdefs.h16
-rw-r--r--storage/maria/ma_init.c5
-rw-r--r--storage/maria/ma_key_recover.h1
-rw-r--r--storage/maria/ma_locking.c5
-rw-r--r--storage/maria/ma_loghandler.c748
-rw-r--r--storage/maria/ma_loghandler.h16
-rw-r--r--storage/maria/ma_page.c24
-rw-r--r--storage/maria/ma_recovery.c3
-rw-r--r--storage/maria/ma_rkey.c3
-rw-r--r--storage/maria/ma_search.c107
-rw-r--r--storage/maria/ma_sort.c11
-rw-r--r--storage/maria/ma_state.c2
-rw-r--r--storage/maria/ma_test3.c4
-rw-r--r--storage/maria/ma_write.c6
-rw-r--r--storage/maria/maria_def.h10
-rw-r--r--storage/maria/maria_ftdump.c16
-rw-r--r--storage/maria/trnman.c6
30 files changed, 1038 insertions, 297 deletions
diff --git a/storage/maria/ft_maria.c b/storage/maria/ft_maria.c
index 1b082f904d0..b1b24592593 100644
--- a/storage/maria/ft_maria.c
+++ b/storage/maria/ft_maria.c
@@ -22,8 +22,8 @@
#include "ma_ftdefs.h"
FT_INFO *maria_ft_init_search(uint flags, void *info, uint keynr,
- uchar *query, uint query_len, CHARSET_INFO *cs,
- uchar *record)
+ uchar *query, size_t query_len,
+ CHARSET_INFO *cs, uchar *record)
{
FT_INFO *res;
if (flags & FT_BOOL)
diff --git a/storage/maria/ha_maria.cc b/storage/maria/ha_maria.cc
index 187cc3465bf..7c34a5f7595 100644
--- a/storage/maria/ha_maria.cc
+++ b/storage/maria/ha_maria.cc
@@ -102,22 +102,40 @@ TYPELIB maria_translog_purge_type_typelib=
array_elements(maria_translog_purge_type_names) - 1, "",
maria_translog_purge_type_names, NULL
};
+
+/* transactional log directory sync */
const char *maria_sync_log_dir_names[]=
{
"NEVER", "NEWFILE", "ALWAYS", NullS
};
-
TYPELIB maria_sync_log_dir_typelib=
{
array_elements(maria_sync_log_dir_names) - 1, "",
maria_sync_log_dir_names, NULL
};
+/* transactional log group commit */
+const char *maria_group_commit_names[]=
+{
+ "none", "hard", "soft", NullS
+};
+TYPELIB maria_group_commit_typelib=
+{
+ array_elements(maria_group_commit_names) - 1, "",
+ maria_group_commit_names, NULL
+};
+
/** Interval between background checkpoints in seconds */
static ulong checkpoint_interval;
static void update_checkpoint_interval(MYSQL_THD thd,
struct st_mysql_sys_var *var,
void *var_ptr, const void *save);
+static void update_maria_group_commit(MYSQL_THD thd,
+ struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save);
+static void update_maria_group_commit_interval(MYSQL_THD thd,
+ struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save);
/** After that many consecutive recovery failures, remove logs */
static ulong force_start_after_recovery_failures;
static void update_log_file_size(MYSQL_THD thd,
@@ -164,6 +182,24 @@ static MYSQL_SYSVAR_ULONG(log_file_size, log_file_size,
NULL, update_log_file_size, TRANSLOG_FILE_SIZE,
TRANSLOG_MIN_FILE_SIZE, 0xffffffffL, TRANSLOG_PAGE_SIZE);
+static MYSQL_SYSVAR_ENUM(group_commit, maria_group_commit,
+ PLUGIN_VAR_RQCMDARG,
+ "Specifies maria group commit mode. "
+ "Possible values are \"none\" (no group commit), "
+ "\"hard\" (with waiting to actual commit), "
+ "\"soft\" (no wait for commit (DANGEROUS!!!))",
+ NULL, update_maria_group_commit,
+ TRANSLOG_GCOMMIT_NONE, &maria_group_commit_typelib);
+
+static MYSQL_SYSVAR_ULONG(group_commit_interval, maria_group_commit_interval,
+ PLUGIN_VAR_RQCMDARG,
+ "Interval between commite in microseconds (1/1000000c)."
+ " 0 stands for no waiting"
+ " for other threads to come and do a commit in \"hard\" mode and no"
+ " sync()/commit at all in \"soft\" mode. Option has only an effect"
+ " if maria_group_commit is used",
+ NULL, update_maria_group_commit_interval, 0, 0, UINT_MAX, 1);
+
static MYSQL_SYSVAR_ENUM(log_purge_type, log_purge_type,
PLUGIN_VAR_RQCMDARG,
"Specifies how maria transactional log will be purged. "
@@ -665,10 +701,9 @@ int maria_check_definition(MARIA_KEYDEF *t1_keyinfo,
extern "C" {
-volatile int *_ma_killed_ptr(HA_CHECK *param)
+int _ma_killed_ptr(HA_CHECK *param)
{
- /* In theory Unsafe conversion, but should be ok for now */
- return (int*) &(((THD *) (param->thd))->killed);
+ return thd_killed((THD*)param->thd);
}
@@ -1928,8 +1963,7 @@ end:
bool ha_maria::check_and_repair(THD *thd)
{
int error, crashed;
- char *old_query;
- uint old_query_length;
+ LEX_STRING old_query;
HA_CHECK_OPT check_opt;
DBUG_ENTER("ha_maria::check_and_repair");
@@ -1957,11 +1991,9 @@ bool ha_maria::check_and_repair(THD *thd)
if (!file->state->del && (maria_recover_options & HA_RECOVER_QUICK))
check_opt.flags |= T_QUICK;
- old_query= thd->query;
- old_query_length= thd->query_length;
+ old_query= thd->query_string;
pthread_mutex_lock(&LOCK_thread_count);
- thd->query= table->s->table_name.str;
- thd->query_length= table->s->table_name.length;
+ thd->query_string= table->s->table_name;
pthread_mutex_unlock(&LOCK_thread_count);
if (!(crashed= maria_is_crashed(file)))
@@ -1981,8 +2013,7 @@ bool ha_maria::check_and_repair(THD *thd)
error= 1;
}
pthread_mutex_lock(&LOCK_thread_count);
- thd->query= old_query;
- thd->query_length= old_query_length;
+ thd->query_string= old_query;
pthread_mutex_unlock(&LOCK_thread_count);
DBUG_RETURN(error);
}
@@ -2294,9 +2325,12 @@ int ha_maria::extra(enum ha_extra_function operation)
extern_lock(F_UNLOCK) (which resets file->trn) followed by maria_close()
without calling commit/rollback in between. If file->trn is not set
we can't remove file->share from the transaction list in the extra() call.
+
+ table->in_use is not set in the case this is a done as part of closefrm()
+ as part of drop table.
*/
- if (!file->trn &&
+ if (file->s->now_transactional && !file->trn && table->in_use &&
(operation == HA_EXTRA_PREPARE_FOR_DROP ||
operation == HA_EXTRA_PREPARE_FOR_RENAME))
{
@@ -2330,7 +2364,7 @@ int ha_maria::delete_all_rows()
{
THD *thd= current_thd;
(void) translog_log_debug_info(file->trn, LOGREC_DEBUG_INFO_QUERY,
- (uchar*) thd->query, thd->query_length);
+ (uchar*) thd->query(), thd->query_length());
if (file->s->now_transactional &&
((table->in_use->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) ||
table->in_use->locked_tables))
@@ -2349,7 +2383,7 @@ int ha_maria::delete_table(const char *name)
{
THD *thd= current_thd;
(void) translog_log_debug_info(0, LOGREC_DEBUG_INFO_QUERY,
- (uchar*) thd->query, thd->query_length);
+ (uchar*) thd->query(), thd->query_length());
return maria_delete_table(name);
}
@@ -2430,7 +2464,8 @@ int ha_maria::external_lock(THD *thd, int lock_type)
trnman_set_flags(trn, trnman_get_flags(trn) | TRN_STATE_INFO_LOGGED |
TRN_STATE_TABLES_CAN_CHANGE);
(void) translog_log_debug_info(trn, LOGREC_DEBUG_INFO_QUERY,
- (uchar*) thd->query, thd->query_length);
+ (uchar*) thd->query(),
+ thd->query_length());
}
#endif
}
@@ -2526,7 +2561,8 @@ int ha_maria::start_stmt(THD *thd, thr_lock_type lock_type)
{
trnman_set_flags(trn, trnman_get_flags(trn) | TRN_STATE_INFO_LOGGED);
(void) translog_log_debug_info(trn, LOGREC_DEBUG_INFO_QUERY,
- (uchar*) thd->query, thd->query_length);
+ (uchar*) thd->query(),
+ thd->query_length());
}
#endif
}
@@ -2807,7 +2843,7 @@ int ha_maria::create(const char *name, register TABLE *table_arg,
create_flags|= HA_CREATE_PAGE_CHECKSUM;
(void) translog_log_debug_info(0, LOGREC_DEBUG_INFO_QUERY,
- (uchar*) thd->query, thd->query_length);
+ (uchar*) thd->query(), thd->query_length());
/* TODO: Check that the following fn_format is really needed */
error=
@@ -2827,7 +2863,7 @@ int ha_maria::rename_table(const char *from, const char *to)
{
THD *thd= current_thd;
(void) translog_log_debug_info(0, LOGREC_DEBUG_INFO_QUERY,
- (uchar*) thd->query, thd->query_length);
+ (uchar*) thd->query(), thd->query_length());
return maria_rename(from, to);
}
@@ -3316,11 +3352,13 @@ static struct st_mysql_sys_var* system_variables[]= {
MYSQL_SYSVAR(block_size),
MYSQL_SYSVAR(checkpoint_interval),
MYSQL_SYSVAR(force_start_after_recovery_failures),
- MYSQL_SYSVAR(page_checksum),
+ MYSQL_SYSVAR(group_commit),
+ MYSQL_SYSVAR(group_commit_interval),
MYSQL_SYSVAR(log_dir_path),
MYSQL_SYSVAR(log_file_size),
MYSQL_SYSVAR(log_purge_type),
MYSQL_SYSVAR(max_sort_file_size),
+ MYSQL_SYSVAR(page_checksum),
MYSQL_SYSVAR(pagecache_age_threshold),
MYSQL_SYSVAR(pagecache_buffer_size),
MYSQL_SYSVAR(pagecache_division_limit),
@@ -3347,6 +3385,92 @@ static void update_checkpoint_interval(MYSQL_THD thd,
}
/**
+ @brief Updates group commit mode
+*/
+
+static void update_maria_group_commit(MYSQL_THD thd,
+ struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save)
+{
+ ulong value= (ulong)*((long *)var_ptr);
+ DBUG_ENTER("update_maria_group_commit");
+ DBUG_PRINT("enter", ("old value: %lu new value %lu rate %lu",
+ value, (ulong)(*(long *)save),
+ maria_group_commit_interval));
+ /* old value */
+ switch (value) {
+ case TRANSLOG_GCOMMIT_NONE:
+ break;
+ case TRANSLOG_GCOMMIT_HARD:
+ translog_hard_group_commit(FALSE);
+ break;
+ case TRANSLOG_GCOMMIT_SOFT:
+ translog_soft_sync(FALSE);
+ if (maria_group_commit_interval)
+ translog_soft_sync_end();
+ break;
+ default:
+ DBUG_ASSERT(0); /* impossible */
+ }
+ value= *(ulong *)var_ptr= (ulong)(*(long *)save);
+ translog_sync();
+ /* new value */
+ switch (value) {
+ case TRANSLOG_GCOMMIT_NONE:
+ break;
+ case TRANSLOG_GCOMMIT_HARD:
+ translog_hard_group_commit(TRUE);
+ break;
+ case TRANSLOG_GCOMMIT_SOFT:
+ translog_soft_sync(TRUE);
+ /* variable change made under global lock so we can just read it */
+ if (maria_group_commit_interval)
+ translog_soft_sync_start();
+ break;
+ default:
+ DBUG_ASSERT(0); /* impossible */
+ }
+ DBUG_VOID_RETURN;
+}
+
+/**
+ @brief Updates group commit interval
+*/
+
+static void update_maria_group_commit_interval(MYSQL_THD thd,
+ struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save)
+{
+ ulong new_value= (ulong)*((long *)save);
+ ulong *value_ptr= (ulong*) var_ptr;
+ DBUG_ENTER("update_maria_group_commit_interval");
+ DBUG_PRINT("enter", ("old value: %lu new value %lu group commit %lu",
+ *value_ptr, new_value, maria_group_commit));
+
+ /* variable change made under global lock so we can just read it */
+ switch (maria_group_commit) {
+ case TRANSLOG_GCOMMIT_NONE:
+ *value_ptr= new_value;
+ translog_set_group_commit_interval(new_value);
+ break;
+ case TRANSLOG_GCOMMIT_HARD:
+ *value_ptr= new_value;
+ translog_set_group_commit_interval(new_value);
+ break;
+ case TRANSLOG_GCOMMIT_SOFT:
+ if (*value_ptr)
+ translog_soft_sync_end();
+ translog_set_group_commit_interval(new_value);
+ if ((*value_ptr= new_value))
+ translog_soft_sync_start();
+ break;
+ default:
+ DBUG_ASSERT(0); /* impossible */
+ }
+ DBUG_VOID_RETURN;
+}
+
+/**
@brief Updates the transaction log file limit.
*/
@@ -3368,6 +3492,7 @@ static SHOW_VAR status_variables[]= {
{"Maria_pagecache_reads", (char*) &maria_pagecache_var.global_cache_read, SHOW_LONGLONG},
{"Maria_pagecache_write_requests", (char*) &maria_pagecache_var.global_cache_w_requests, SHOW_LONGLONG},
{"Maria_pagecache_writes", (char*) &maria_pagecache_var.global_cache_write, SHOW_LONGLONG},
+ {"Maria_transaction_log_syncs", (char*) &translog_syncs, SHOW_LONGLONG},
{NullS, NullS, SHOW_LONG}
};
@@ -3437,7 +3562,7 @@ mysql_declare_plugin(maria)
MYSQL_STORAGE_ENGINE_PLUGIN,
&maria_storage_engine,
"MARIA",
- "MySQL AB",
+ "Monty Program Ab",
"Crash-safe tables with MyISAM heritage",
PLUGIN_LICENSE_GPL,
ha_maria_init, /* Plugin Init */
diff --git a/storage/maria/lockman.c b/storage/maria/lockman.c
index e7f3c81b0fd..d6d4dcd44e6 100644
--- a/storage/maria/lockman.c
+++ b/storage/maria/lockman.c
@@ -360,7 +360,7 @@ retry:
else
{
if (my_atomic_casptr((void **)cursor->prev,
- (void **)&cursor->curr, cursor->next))
+ (void **)(char*) &cursor->curr, cursor->next))
_lf_alloc_free(pins, cursor->curr);
else
{
@@ -421,7 +421,8 @@ static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins,
node->link= (intptr)cursor.curr;
DBUG_ASSERT(node->link != (intptr)node);
DBUG_ASSERT(cursor.prev != &node->link);
- if (!my_atomic_casptr((void **)cursor.prev, (void **)&cursor.curr, node))
+ if (!my_atomic_casptr((void **)cursor.prev,
+ (void **)(char*) &cursor.curr, node))
{
res= REPEAT_ONCE_MORE;
node->flags&= ~ACTIVE;
@@ -498,11 +499,11 @@ static int lockdelete(LOCK * volatile *head, LOCK *node, LF_PINS *pins)
then we can delete. Good news is - this is only required when rolling
back a savepoint.
*/
- if (my_atomic_casptr((void **)&(cursor.curr->link),
- (void **)&cursor.next, 1+(char *)cursor.next))
+ if (my_atomic_casptr((void **)(char*)&(cursor.curr->link),
+ (void **)(char*)&cursor.next, 1+(char *)cursor.next))
{
if (my_atomic_casptr((void **)cursor.prev,
- (void **)&cursor.curr, cursor.next))
+ (void **)(char*)&cursor.curr, cursor.next))
_lf_alloc_free(pins, cursor.curr);
else
lockfind(head, node, &cursor, pins);
@@ -573,7 +574,7 @@ static void initialize_bucket(LOCKMAN *lm, LOCK * volatile *node,
my_free((void *)dummy, MYF(0));
dummy= cur;
}
- my_atomic_casptr((void **)node, (void **)&tmp, dummy);
+ my_atomic_casptr((void **)node, (void **)(char*) &tmp, dummy);
}
static inline uint calc_hash(uint64 resource)
diff --git a/storage/maria/ma_blockrec.c b/storage/maria/ma_blockrec.c
index 694c8b7b5ff..89701913c9a 100644
--- a/storage/maria/ma_blockrec.c
+++ b/storage/maria/ma_blockrec.c
@@ -430,8 +430,9 @@ my_bool _ma_once_end_block_record(MARIA_SHARE *share)
if (share->bitmap.file.file >= 0)
{
if (flush_pagecache_blocks(share->pagecache, &share->bitmap.file,
- share->temporary ? FLUSH_IGNORE_CHANGED :
- FLUSH_RELEASE))
+ ((share->temporary || share->deleting) ?
+ FLUSH_IGNORE_CHANGED :
+ FLUSH_RELEASE)))
res= 1;
/*
File must be synced as it is going out of the maria_open_list and so
@@ -1688,7 +1689,8 @@ static my_bool get_head_or_tail_page(MARIA_HA *info,
if (!page_link.changed)
goto crashed;
- DBUG_ASSERT((res->buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type);
+ DBUG_ASSERT((uint) (res->buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) ==
+ page_type);
if (!(dir= find_free_position(page_type == HEAD_PAGE ? info : 0,
res->buff, block_size, &res->rownr,
&res->length, &res->empty_space)))
@@ -6094,7 +6096,7 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
DBUG_RETURN(0);
}
- if (((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != page_type))
+ if (((uint) (buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) != page_type))
{
/*
This is a page that has been freed before and now should be
@@ -6241,7 +6243,7 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
Note that in case the page is not anymore a head or tail page
a future redo will fix the bitmap.
*/
- if ((buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type)
+ if ((uint) (buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK) == page_type)
{
empty_space= uint2korr(buff+EMPTY_SPACE_OFFSET);
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE,
diff --git a/storage/maria/ma_check.c b/storage/maria/ma_check.c
index e33849bef04..4f93bf812a3 100644
--- a/storage/maria/ma_check.c
+++ b/storage/maria/ma_check.c
@@ -215,7 +215,7 @@ int maria_chk_del(HA_CHECK *param, register MARIA_HA *info,
empty=0;
for (i= share->state.state.del ; i > 0L && next_link != HA_OFFSET_ERROR ; i--)
{
- if (*_ma_killed_ptr(param))
+ if (_ma_killed_ptr(param))
DBUG_RETURN(1);
if (test_flag & T_VERBOSE)
printf(" %9s",llstr(next_link,buff));
@@ -310,7 +310,7 @@ static int check_k_link(HA_CHECK *param, register MARIA_HA *info,
records= (ha_rows) (share->state.state.key_file_length / block_size);
while (next_link != HA_OFFSET_ERROR && records > 0)
{
- if (*_ma_killed_ptr(param))
+ if (_ma_killed_ptr(param))
DBUG_RETURN(1);
if (param->testflag & T_VERBOSE)
printf("%16s",llstr(next_link,llbuff));
@@ -876,10 +876,10 @@ static int chk_index(HA_CHECK *param, MARIA_HA *info, MARIA_KEYDEF *keyinfo,
tmp_key.data= tmp_key_buff;
for ( ;; )
{
- if (*_ma_killed_ptr(param))
- goto err;
if (nod_flag)
{
+ if (_ma_killed_ptr(param))
+ goto err;
next_page= _ma_kpos(nod_flag,keypos);
if (chk_index_down(param,info,keyinfo,next_page,
temp_buff,keys,key_checksum,level+1))
@@ -1180,7 +1180,7 @@ static int check_static_record(HA_CHECK *param, MARIA_HA *info, int extend,
pos= 0;
while (pos < share->state.state.data_file_length)
{
- if (*_ma_killed_ptr(param))
+ if (_ma_killed_ptr(param))
return -1;
if (my_b_read(&param->read_cache, record,
share->base.pack_reclength))
@@ -1230,7 +1230,7 @@ static int check_dynamic_record(HA_CHECK *param, MARIA_HA *info, int extend,
{
my_bool got_error= 0;
int flag;
- if (*_ma_killed_ptr(param))
+ if (_ma_killed_ptr(param))
DBUG_RETURN(-1);
flag= block_info.second_read=0;
@@ -1451,7 +1451,7 @@ static int check_compressed_record(HA_CHECK *param, MARIA_HA *info, int extend,
pos= share->pack.header_length; /* Skip header */
while (pos < share->state.state.data_file_length)
{
- if (*_ma_killed_ptr(param))
+ if (_ma_killed_ptr(param))
DBUG_RETURN(-1);
if (_ma_read_cache(&param->read_cache, block_info.header, pos,
@@ -1815,7 +1815,7 @@ static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend,
LINT_INIT(row_count);
LINT_INIT(empty_space);
- if (*_ma_killed_ptr(param))
+ if (_ma_killed_ptr(param))
{
_ma_scan_end_block_record(info);
return -1;
@@ -4631,7 +4631,7 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
char llbuff[22],llbuff2[22];
DBUG_ENTER("sort_get_next_record");
- if (*_ma_killed_ptr(param))
+ if (_ma_killed_ptr(param))
DBUG_RETURN(1);
switch (sort_info->org_data_file_type) {
@@ -6018,7 +6018,7 @@ int maria_update_state_info(HA_CHECK *param, MARIA_HA *info,uint update)
{
if (update & UPDATE_TIME)
{
- share->state.check_time= (long) time((time_t*) 0);
+ share->state.check_time= time((time_t*) 0);
if (!share->state.create_time)
share->state.create_time= share->state.check_time;
}
diff --git a/storage/maria/ma_check_standalone.h b/storage/maria/ma_check_standalone.h
index 3874d722d6c..9b30c96089f 100644
--- a/storage/maria/ma_check_standalone.h
+++ b/storage/maria/ma_check_standalone.h
@@ -30,11 +30,9 @@
Check if check/repair operation was killed by a signal
*/
-static int not_killed= 0;
-
-volatile int *_ma_killed_ptr(HA_CHECK *param __attribute__((unused)))
+int _ma_killed_ptr(HA_CHECK *param __attribute__((unused)))
{
- return &not_killed; /* always NULL */
+ return 0;
}
/* print warnings and errors */
diff --git a/storage/maria/ma_close.c b/storage/maria/ma_close.c
index 235b37f7030..df525d45d14 100644
--- a/storage/maria/ma_close.c
+++ b/storage/maria/ma_close.c
@@ -79,7 +79,7 @@ int maria_close(register MARIA_HA *info)
if ((*share->once_end)(share))
error= my_errno;
if (flush_pagecache_blocks(share->pagecache, &share->kfile,
- (share->temporary ?
+ ((share->temporary || share->deleting) ?
FLUSH_IGNORE_CHANGED :
FLUSH_RELEASE)))
error= my_errno;
@@ -177,6 +177,7 @@ int maria_close(register MARIA_HA *info)
{
(void) pthread_mutex_destroy(&share->intern_lock);
(void) pthread_mutex_destroy(&share->close_lock);
+ (void) pthread_cond_destroy(&share->key_del_cond);
my_free((uchar *)share, MYF(0));
/*
If share cannot be freed, it's because checkpoint has previously
diff --git a/storage/maria/ma_create.c b/storage/maria/ma_create.c
index 05b11929d0a..6886dc8f291 100644
--- a/storage/maria/ma_create.c
+++ b/storage/maria/ma_create.c
@@ -772,7 +772,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
share.base.min_block_length= share.base.pack_reclength;
if (! (flags & HA_DONT_TOUCH_DATA))
- share.state.create_time= (long) time((time_t*) 0);
+ share.state.create_time= time((time_t*) 0);
pthread_mutex_lock(&THR_LOCK_maria);
diff --git a/storage/maria/ma_delete.c b/storage/maria/ma_delete.c
index 5b0a2ac8884..0e9e5caafbf 100644
--- a/storage/maria/ma_delete.c
+++ b/storage/maria/ma_delete.c
@@ -169,6 +169,8 @@ my_bool _ma_ck_delete(MARIA_HA *info, MARIA_KEY *key)
MARIA_KEY org_key;
DBUG_ENTER("_ma_ck_delete");
+ LINT_INIT_STRUCT(org_key);
+
save_key_data= key->data;
if (share->now_transactional)
{
diff --git a/storage/maria/ma_extra.c b/storage/maria/ma_extra.c
index 4499cca2885..7a30b613ea5 100644
--- a/storage/maria/ma_extra.c
+++ b/storage/maria/ma_extra.c
@@ -305,6 +305,12 @@ int maria_extra(MARIA_HA *info, enum ha_extra_function function,
pthread_mutex_unlock(&THR_LOCK_maria);
break;
case HA_EXTRA_PREPARE_FOR_DROP:
+ /* Signals about intent to delete this table */
+ share->deleting= TRUE;
+ share->global_changed= FALSE; /* force writing changed flag */
+ /* To force repair if reopened */
+ _ma_mark_file_changed(info);
+ /* Fall trough */
case HA_EXTRA_PREPARE_FOR_RENAME:
{
my_bool do_flush= test(function != HA_EXTRA_PREPARE_FOR_DROP);
diff --git a/storage/maria/ma_ft_boolean_search.c b/storage/maria/ma_ft_boolean_search.c
index 763b8827a6c..d302892ce05 100644
--- a/storage/maria/ma_ft_boolean_search.c
+++ b/storage/maria/ma_ft_boolean_search.c
@@ -180,7 +180,7 @@ typedef struct st_my_ftb_param
static int ftb_query_add_word(MYSQL_FTPARSER_PARAM *param,
- char *word, int word_len,
+ const uchar *word, mysql_ft_size_t word_len,
MYSQL_FTPARSER_BOOLEAN_INFO *info)
{
MY_FTB_PARAM *ftb_param= param->mysql_ftparam;
@@ -282,24 +282,24 @@ static int ftb_query_add_word(MYSQL_FTPARSER_PARAM *param,
static int ftb_parse_query_internal(MYSQL_FTPARSER_PARAM *param,
- char *query, int len)
+ const uchar *query, mysql_ft_size_t len)
{
MY_FTB_PARAM *ftb_param= param->mysql_ftparam;
MYSQL_FTPARSER_BOOLEAN_INFO info;
CHARSET_INFO *cs= ftb_param->ftb->charset;
- uchar **start= (uchar**) &query;
- uchar *end= (uchar*) query + len;
+ const uchar **start= &query;
+ const uchar *end= query + len;
FT_WORD w;
info.prev= ' ';
info.quot= 0;
while (maria_ft_get_word(cs, start, end, &w, &info))
- param->mysql_add_word(param, (char *) w.pos, w.len, &info);
+ param->mysql_add_word(param, w.pos, w.len, &info);
return(0);
}
-static int _ftb_parse_query(FTB *ftb, uchar *query, uint len,
+static int _ftb_parse_query(FTB *ftb, uchar *query, size_t len,
struct st_mysql_ftparser *parser)
{
MYSQL_FTPARSER_PARAM *param;
@@ -321,7 +321,7 @@ static int _ftb_parse_query(FTB *ftb, uchar *query, uint len,
param->mysql_add_word= ftb_query_add_word;
param->mysql_ftparam= (void *)&ftb_param;
param->cs= ftb->charset;
- param->doc= (char*) query;
+ param->doc= query;
param->length= len;
param->flags= 0;
param->mode= MYSQL_FTPARSER_FULL_BOOLEAN_INFO;
@@ -539,8 +539,8 @@ static void _ftb_init_index_search(FT_INFO *ftb)
FT_INFO * maria_ft_init_boolean_search(MARIA_HA *info, uint keynr,
- uchar *query,
- uint query_len, CHARSET_INFO *cs)
+ uchar *query, size_t query_len,
+ CHARSET_INFO *cs)
{
FTB *ftb;
FTB_EXPR *ftbe;
@@ -592,7 +592,7 @@ FT_INFO * maria_ft_init_boolean_search(MARIA_HA *info, uint keynr,
sizeof(FTB_WORD *)*ftb->queue.elements);
memcpy(ftb->list, ftb->queue.root+1, sizeof(FTB_WORD *)*ftb->queue.elements);
my_qsort2(ftb->list, ftb->queue.elements, sizeof(FTB_WORD *),
- (qsort2_cmp)FTB_WORD_cmp_list, ftb->charset);
+ (qsort2_cmp)FTB_WORD_cmp_list, (void*) ftb->charset);
if (ftb->queue.elements<2) ftb->with_scan &= ~FTB_FLAG_TRUNC;
ftb->state=READY;
return ftb;
@@ -615,8 +615,9 @@ typedef struct st_my_ftb_phrase_param
static int ftb_phrase_add_word(MYSQL_FTPARSER_PARAM *param,
- char *word, int word_len,
- MYSQL_FTPARSER_BOOLEAN_INFO *boolean_info __attribute__((unused)))
+ const uchar *word, mysql_ft_size_t word_len,
+ MYSQL_FTPARSER_BOOLEAN_INFO
+ *boolean_info __attribute__((unused)))
{
MY_FTB_PHRASE_PARAM *phrase_param= param->mysql_ftparam;
FT_WORD *w= (FT_WORD *)phrase_param->document->data;
@@ -647,15 +648,16 @@ static int ftb_phrase_add_word(MYSQL_FTPARSER_PARAM *param,
static int ftb_check_phrase_internal(MYSQL_FTPARSER_PARAM *param,
- char *document, int len)
+ const uchar *document,
+ mysql_ft_size_t len)
{
FT_WORD word;
MY_FTB_PHRASE_PARAM *phrase_param= param->mysql_ftparam;
- const uchar *docend= (uchar*) document + len;
- while (maria_ft_simple_get_word(phrase_param->cs, (uchar**) &document,
+ const uchar *docend= document + len;
+ while (maria_ft_simple_get_word(phrase_param->cs, &document,
docend, &word, FALSE))
{
- param->mysql_add_word(param, (char*) word.pos, word.len, 0);
+ param->mysql_add_word(param, word.pos, word.len, 0);
if (phrase_param->match)
break;
}
@@ -678,8 +680,8 @@ static int ftb_check_phrase_internal(MYSQL_FTPARSER_PARAM *param,
-1 is returned if error occurs.
*/
-static int _ftb_check_phrase(FTB *ftb, const uchar *document, uint len,
- FTB_EXPR *ftbe, struct st_mysql_ftparser *parser)
+static int _ftb_check_phrase(FTB *ftb, const uchar *document, size_t len,
+ FTB_EXPR *ftbe, struct st_mysql_ftparser *parser)
{
MY_FTB_PHRASE_PARAM ftb_param;
MYSQL_FTPARSER_PARAM *param;
@@ -699,7 +701,7 @@ static int _ftb_check_phrase(FTB *ftb, const uchar *document, uint len,
param->mysql_add_word= ftb_phrase_add_word;
param->mysql_ftparam= (void *)&ftb_param;
param->cs= ftb->charset;
- param->doc= (char *) document;
+ param->doc= document;
param->length= len;
param->flags= 0;
param->mode= MYSQL_FTPARSER_WITH_STOPWORDS;
@@ -872,8 +874,9 @@ typedef struct st_my_ftb_find_param
static int ftb_find_relevance_add_word(MYSQL_FTPARSER_PARAM *param,
- char *word, int len,
- MYSQL_FTPARSER_BOOLEAN_INFO *boolean_info __attribute__((unused)))
+ const uchar *word, mysql_ft_size_t len,
+ MYSQL_FTPARSER_BOOLEAN_INFO
+ *boolean_info __attribute__((unused)))
{
MY_FTB_FIND_PARAM *ftb_param= param->mysql_ftparam;
FT_INFO *ftb= ftb_param->ftb;
@@ -933,15 +936,14 @@ static int ftb_find_relevance_add_word(MYSQL_FTPARSER_PARAM *param,
static int ftb_find_relevance_parse(MYSQL_FTPARSER_PARAM *param,
- char *doc, int len)
+ const uchar *doc, mysql_ft_size_t len)
{
MY_FTB_FIND_PARAM *ftb_param= param->mysql_ftparam;
FT_INFO *ftb= ftb_param->ftb;
- uchar *end= (uchar*) doc + len;
+ const uchar *end= doc + len;
FT_WORD w;
- while (maria_ft_simple_get_word(ftb->charset, (uchar**) &doc,
- end, &w, TRUE))
- param->mysql_add_word(param, (char *) w.pos, w.len, 0);
+ while (maria_ft_simple_get_word(ftb->charset, &doc, end, &w, TRUE))
+ param->mysql_add_word(param, w.pos, w.len, 0);
return(0);
}
@@ -998,7 +1000,7 @@ float maria_ft_boolean_find_relevance(FT_INFO *ftb, uchar *record, uint length)
{
if (!ftsi.pos)
continue;
- param->doc= (char *)ftsi.pos;
+ param->doc= ftsi.pos;
param->length= ftsi.len;
if (unlikely(parser->parse(param)))
return 0;
diff --git a/storage/maria/ma_ft_nlq_search.c b/storage/maria/ma_ft_nlq_search.c
index 1a85c50174e..927f34f8b72 100644
--- a/storage/maria/ma_ft_nlq_search.c
+++ b/storage/maria/ma_ft_nlq_search.c
@@ -51,6 +51,7 @@ typedef struct st_ft_superdoc
double tmp_weight;
} FT_SUPERDOC;
+
static int FT_SUPERDOC_cmp(void* cmp_arg __attribute__((unused)),
FT_SUPERDOC *p1, FT_SUPERDOC *p2)
{
@@ -63,7 +64,8 @@ static int FT_SUPERDOC_cmp(void* cmp_arg __attribute__((unused)),
static int walk_and_match(FT_WORD *word, uint32 count, ALL_IN_ONE *aio)
{
- int subkeys, r;
+ FT_WEIGTH subkeys;
+ int r;
uint doc_cnt;
FT_SUPERDOC sdoc, *sptr;
TREE_ELEMENT *selem;
@@ -90,9 +92,9 @@ static int walk_and_match(FT_WORD *word, uint32 count, ALL_IN_ONE *aio)
/* Skip rows inserted by current inserted */
for (r= _ma_search(info, &key, SEARCH_FIND, key_root) ;
!r &&
- (subkeys=ft_sintXkorr(info->last_key.data +
- info->last_key.data_length +
- info->last_key.ref_length - extra)) > 0 &&
+ (subkeys.i= ft_sintXkorr(info->last_key.data +
+ info->last_key.data_length +
+ info->last_key.ref_length - extra)) > 0 &&
info->cur_row.lastpos >= info->state->data_file_length ;
r= _ma_search_next(info, &info->last_key, SEARCH_BIGGER, key_root))
;
@@ -111,7 +113,7 @@ static int walk_and_match(FT_WORD *word, uint32 count, ALL_IN_ONE *aio)
key.data+1, key.data_length-1, 0, 0))
break;
- if (subkeys<0)
+ if (subkeys.i < 0)
{
if (doc_cnt)
DBUG_RETURN(1); /* index is corrupted */
@@ -127,7 +129,8 @@ static int walk_and_match(FT_WORD *word, uint32 count, ALL_IN_ONE *aio)
goto do_skip;
}
#if HA_FT_WTYPE == HA_KEYTYPE_FLOAT
- tmp_weight=*(float*)&subkeys;
+ /* The weight we read was actually a float */
+ tmp_weight= subkeys.f;
#else
#error
#endif
@@ -162,9 +165,9 @@ static int walk_and_match(FT_WORD *word, uint32 count, ALL_IN_ONE *aio)
else
r= _ma_search(info, &info->last_key, SEARCH_BIGGER, key_root);
do_skip:
- while ((subkeys=ft_sintXkorr(info->last_key.data +
- info->last_key.data_length +
- info->last_key.ref_length - extra)) > 0 &&
+ while ((subkeys.i= ft_sintXkorr(info->last_key.data +
+ info->last_key.data_length +
+ info->last_key.ref_length - extra)) > 0 &&
!r && info->cur_row.lastpos >= info->state->data_file_length)
r= _ma_search_next(info, &info->last_key, SEARCH_BIGGER, key_root);
@@ -205,7 +208,7 @@ static int FT_DOC_cmp(void *unused __attribute__((unused)),
FT_INFO *maria_ft_init_nlq_search(MARIA_HA *info, uint keynr, uchar *query,
- uint query_len, uint flags, uchar *record)
+ size_t query_len, uint flags, uchar *record)
{
TREE wtree;
ALL_IN_ONE aio;
diff --git a/storage/maria/ma_ft_parser.c b/storage/maria/ma_ft_parser.c
index bdfbbb936ce..b35c2227ca2 100644
--- a/storage/maria/ma_ft_parser.c
+++ b/storage/maria/ma_ft_parser.c
@@ -109,10 +109,11 @@ my_bool maria_ft_boolean_check_syntax_string(const uchar *str)
3 - right bracket
4 - stopword found
*/
-uchar maria_ft_get_word(CHARSET_INFO *cs, uchar **start, uchar *end,
+uchar maria_ft_get_word(CHARSET_INFO *cs, const uchar **start,
+ const uchar *end,
FT_WORD *word, MYSQL_FTPARSER_BOOLEAN_INFO *param)
{
- uchar *doc=*start;
+ const uchar *doc= *start;
int ctype;
uint mwc, length;
int mbl;
@@ -203,11 +204,11 @@ ret:
return param->type;
}
-uchar maria_ft_simple_get_word(CHARSET_INFO *cs, uchar **start,
+uchar maria_ft_simple_get_word(CHARSET_INFO *cs, const uchar **start,
const uchar *end, FT_WORD *word,
my_bool skip_stopwords)
{
- uchar *doc= *start;
+ const uchar *doc= *start;
uint mwc, length;
int ctype, mbl;
DBUG_ENTER("maria_ft_simple_get_word");
@@ -253,14 +254,16 @@ void maria_ft_parse_init(TREE *wtree, CHARSET_INFO *cs)
{
DBUG_ENTER("maria_ft_parse_init");
if (!is_tree_inited(wtree))
- init_tree(wtree,0,0,sizeof(FT_WORD),(qsort_cmp2)&FT_WORD_cmp,0,NULL, cs);
+ init_tree(wtree,0,0,sizeof(FT_WORD),(qsort_cmp2)&FT_WORD_cmp,0, NULL,
+ (void*) cs);
DBUG_VOID_RETURN;
}
static int maria_ft_add_word(MYSQL_FTPARSER_PARAM *param,
- char *word, int word_len,
- MYSQL_FTPARSER_BOOLEAN_INFO *boolean_info __attribute__((unused)))
+ const uchar *word, mysql_ft_size_t word_len,
+ MYSQL_FTPARSER_BOOLEAN_INFO *boolean_info
+ __attribute__((unused)))
{
TREE *wtree;
FT_WORD w;
@@ -276,7 +279,7 @@ static int maria_ft_add_word(MYSQL_FTPARSER_PARAM *param,
w.pos= ptr;
}
else
- w.pos= (uchar *) word;
+ w.pos= word;
w.len= word_len;
if (!tree_insert(wtree, &w, 0, wtree->custom_arg))
{
@@ -288,24 +291,25 @@ static int maria_ft_add_word(MYSQL_FTPARSER_PARAM *param,
static int maria_ft_parse_internal(MYSQL_FTPARSER_PARAM *param,
- char *doc_arg, int doc_len)
+ const uchar *doc_arg,
+ mysql_ft_size_t doc_len)
{
- uchar *doc= (uchar*) doc_arg;
- uchar *end= doc + doc_len;
+ const uchar *doc= doc_arg;
+ const uchar *end= doc + doc_len;
MY_FT_PARSER_PARAM *ft_param=param->mysql_ftparam;
TREE *wtree= ft_param->wtree;
FT_WORD w;
DBUG_ENTER("maria_ft_parse_internal");
while (maria_ft_simple_get_word(wtree->custom_arg, &doc, end, &w, TRUE))
- if (param->mysql_add_word(param, (char *) w.pos, w.len, 0))
+ if (param->mysql_add_word(param, w.pos, w.len, 0))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
-int maria_ft_parse(TREE *wtree, uchar *doc, int doclen,
- struct st_mysql_ftparser *parser,
+int maria_ft_parse(TREE *wtree, uchar *doc, size_t doclen,
+ struct st_mysql_ftparser *parser,
MYSQL_FTPARSER_PARAM *param, MEM_ROOT *mem_root)
{
MY_FT_PARSER_PARAM my_param;
@@ -318,7 +322,7 @@ int maria_ft_parse(TREE *wtree, uchar *doc, int doclen,
param->mysql_add_word= maria_ft_add_word;
param->mysql_ftparam= &my_param;
param->cs= wtree->custom_arg;
- param->doc= (char *) doc;
+ param->doc= doc;
param->length= doclen;
param->mode= MYSQL_FTPARSER_SIMPLE_MODE;
DBUG_RETURN(parser->parse(param));
@@ -378,8 +382,8 @@ MYSQL_FTPARSER_PARAM *maria_ftparser_call_initializer(MARIA_HA *info,
mysql_add_word != 0 - parser is initialized, or no
initialization needed. */
info->ftparser_param[ftparser_nr].mysql_add_word=
- (int (*)(struct st_mysql_ftparser_param *, char *, int,
- MYSQL_FTPARSER_BOOLEAN_INFO *)) 1;
+ (int (*)(struct st_mysql_ftparser_param *, const uchar *,
+ mysql_ft_size_t, MYSQL_FTPARSER_BOOLEAN_INFO *)) 1;
if (parser->init && parser->init(&info->ftparser_param[ftparser_nr]))
return 0;
}
diff --git a/storage/maria/ma_ftdefs.h b/storage/maria/ma_ftdefs.h
index 7e83d774aed..4ce4e9e22ba 100644
--- a/storage/maria/ma_ftdefs.h
+++ b/storage/maria/ma_ftdefs.h
@@ -96,7 +96,7 @@
#define FTB_RQUOT (ft_boolean_syntax[11])
typedef struct st_maria_ft_word {
- uchar * pos;
+ const uchar * pos;
uint len;
double weight;
} FT_WORD;
@@ -106,9 +106,9 @@ int is_stopword(char *word, uint len);
MARIA_KEY *_ma_ft_make_key(MARIA_HA *, MARIA_KEY *, uint , uchar *, FT_WORD *,
my_off_t);
-uchar maria_ft_get_word(CHARSET_INFO *, uchar **, uchar *, FT_WORD *,
- MYSQL_FTPARSER_BOOLEAN_INFO *);
-uchar maria_ft_simple_get_word(CHARSET_INFO *, uchar **, const uchar *,
+uchar maria_ft_get_word(CHARSET_INFO *, const uchar **, const uchar *,
+ FT_WORD *, MYSQL_FTPARSER_BOOLEAN_INFO *);
+uchar maria_ft_simple_get_word(CHARSET_INFO *, const uchar **, const uchar *,
FT_WORD *, my_bool);
typedef struct _st_maria_ft_seg_iterator {
@@ -122,15 +122,17 @@ void _ma_ft_segiterator_dummy_init(const uchar *, uint, FT_SEG_ITERATOR *);
uint _ma_ft_segiterator(FT_SEG_ITERATOR *);
void maria_ft_parse_init(TREE *, CHARSET_INFO *);
-int maria_ft_parse(TREE *, uchar *, int, struct st_mysql_ftparser *parser,
+int maria_ft_parse(TREE *, uchar *, size_t, struct st_mysql_ftparser *parser,
MYSQL_FTPARSER_PARAM *, MEM_ROOT *);
FT_WORD * maria_ft_linearize(TREE *, MEM_ROOT *);
FT_WORD * _ma_ft_parserecord(MARIA_HA *, uint, const uchar *, MEM_ROOT *);
uint _ma_ft_parse(TREE *, MARIA_HA *, uint, const uchar *,
MYSQL_FTPARSER_PARAM *, MEM_ROOT *);
-FT_INFO *maria_ft_init_nlq_search(MARIA_HA *, uint, uchar *, uint, uint, uchar *);
-FT_INFO *maria_ft_init_boolean_search(MARIA_HA *, uint, uchar *, uint, CHARSET_INFO *);
+FT_INFO *maria_ft_init_nlq_search(MARIA_HA *, uint, uchar *, size_t, uint,
+ uchar *);
+FT_INFO *maria_ft_init_boolean_search(MARIA_HA *, uint, uchar *, size_t,
+ CHARSET_INFO *);
extern const struct _ft_vft _ma_ft_vft_nlq;
int maria_ft_nlq_read_next(FT_INFO *, char *);
diff --git a/storage/maria/ma_init.c b/storage/maria/ma_init.c
index 1f2eddd7e30..552b0767bec 100644
--- a/storage/maria/ma_init.c
+++ b/storage/maria/ma_init.c
@@ -82,6 +82,11 @@ void maria_end(void)
maria_inited= maria_multi_threaded= FALSE;
ft_free_stopwords();
ma_checkpoint_end();
+ if (translog_status == TRANSLOG_OK)
+ {
+ translog_soft_sync_end();
+ translog_sync();
+ }
if ((trid= trnman_get_max_trid()) > max_trid_in_control_file)
{
/*
diff --git a/storage/maria/ma_key_recover.h b/storage/maria/ma_key_recover.h
index 4b4198f3008..b580433c99a 100644
--- a/storage/maria/ma_key_recover.h
+++ b/storage/maria/ma_key_recover.h
@@ -63,7 +63,6 @@ extern my_bool write_hook_for_undo_key_insert(enum translog_record_type type,
extern my_bool write_hook_for_undo_key_delete(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn, void *hook_arg);
-void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn);
my_bool _ma_log_prefix(MARIA_PAGE *page, uint changed_length, int move_length);
my_bool _ma_log_suffix(MARIA_PAGE *page, uint org_length,
diff --git a/storage/maria/ma_locking.c b/storage/maria/ma_locking.c
index e1986a7dfaa..b355d7bc792 100644
--- a/storage/maria/ma_locking.c
+++ b/storage/maria/ma_locking.c
@@ -387,6 +387,9 @@ int _ma_test_if_changed(register MARIA_HA *info)
open_count is not maintained on disk for temporary tables.
*/
+#define _MA_ALREADY_MARKED_FILE_CHANGED \
+ ((share->state.changed & STATE_CHANGED) && share->global_changed)
+
int _ma_mark_file_changed(MARIA_HA *info)
{
uchar buff[3];
@@ -394,8 +397,6 @@ int _ma_mark_file_changed(MARIA_HA *info)
int error= 1;
DBUG_ENTER("_ma_mark_file_changed");
-#define _MA_ALREADY_MARKED_FILE_CHANGED \
- ((share->state.changed & STATE_CHANGED) && share->global_changed)
if (_MA_ALREADY_MARKED_FILE_CHANGED)
DBUG_RETURN(0);
pthread_mutex_lock(&share->intern_lock); /* recheck under mutex */
diff --git a/storage/maria/ma_loghandler.c b/storage/maria/ma_loghandler.c
index c5efab506f1..476b091594e 100644
--- a/storage/maria/ma_loghandler.c
+++ b/storage/maria/ma_loghandler.c
@@ -18,6 +18,7 @@
#include "ma_blockrec.h" /* for some constants and in-write hooks */
#include "ma_key_recover.h" /* For some in-write hooks */
#include "ma_checkpoint.h"
+#include "ma_servicethread.h"
/*
On Windows, neither my_open() nor my_sync() work for directories.
@@ -47,6 +48,15 @@
#include <m_ctype.h>
#endif
+/** @brief protects checkpoint_in_progress */
+static pthread_mutex_t LOCK_soft_sync;
+/** @brief for killing the background checkpoint thread */
+static pthread_cond_t COND_soft_sync;
+/** @brief control structure for checkpoint background thread */
+static MA_SERVICE_THREAD_CONTROL soft_sync_control=
+ {THREAD_DEAD, FALSE, &LOCK_soft_sync, &COND_soft_sync};
+
+
/* transaction log file descriptor */
typedef struct st_translog_file
{
@@ -124,10 +134,24 @@ struct st_translog_buffer
/* Previous buffer offset to detect it flush finish */
TRANSLOG_ADDRESS prev_buffer_offset;
/*
+ If the buffer was forced to close it save value of its horizon
+ otherwise LSN_IMPOSSIBLE
+ */
+ TRANSLOG_ADDRESS pre_force_close_horizon;
+ /*
How much is written (or will be written when copy_to_buffer_in_progress
become 0) to this buffer
*/
translog_size_t size;
+ /*
+ When moving from one log buffer to another, we write the last of the
+ previous buffer to file and then move to start using the new log
+ buffer. In the case of a part filed last page, this page is not moved
+ to the start of the new buffer but instead we set the 'skip_data'
+ variable to tell us how much data at the beginning of the buffer is not
+ relevant.
+ */
+ uint skipped_data;
/* File handler for this buffer */
TRANSLOG_FILE *file;
/* Threads which are waiting for buffer filling/freeing */
@@ -304,6 +328,7 @@ struct st_translog_descriptor
*/
pthread_mutex_t log_flush_lock;
pthread_cond_t log_flush_cond;
+ pthread_cond_t new_goal_cond;
/* Protects changing of headers of finished files (max_lsn) */
pthread_mutex_t file_header_lock;
@@ -344,13 +369,39 @@ static struct st_translog_descriptor log_descriptor;
ulong log_purge_type= TRANSLOG_PURGE_IMMIDIATE;
ulong log_file_size= TRANSLOG_FILE_SIZE;
+/* sync() of log files directory mode */
ulong sync_log_dir= TRANSLOG_SYNC_DIR_NEWFILE;
+ulong maria_group_commit= TRANSLOG_GCOMMIT_NONE;
+ulong maria_group_commit_interval= 0;
/* Marker for end of log */
static uchar end_of_log= 0;
#define END_OF_LOG &end_of_log
+/**
+ Switch for "soft" sync (no real sync() but periodical sync by service
+ thread)
+*/
+static volatile my_bool soft_sync= FALSE;
+/**
+ Switch for "hard" group commit mode
+*/
+static volatile my_bool hard_group_commit= FALSE;
+/**
+ File numbers interval which have to be sync()
+*/
+static uint32 soft_sync_min= 0;
+static uint32 soft_sync_max= 0;
+static uint32 soft_need_sync= 1;
+/**
+ stores interval in microseconds
+*/
+static uint32 group_commit_wait= 0;
enum enum_translog_status translog_status= TRANSLOG_UNINITED;
+ulonglong translog_syncs= 0; /* Number of sync()s */
+
+/* time of last flush */
+static ulonglong flush_start= 0;
/* chunk types */
#define TRANSLOG_CHUNK_LSN 0x00 /* 0 chunk refer as LSN (head or tail */
@@ -980,12 +1031,17 @@ static TRANSLOG_FILE *get_logfile_by_number(uint32 file_no)
static TRANSLOG_FILE *get_current_logfile()
{
TRANSLOG_FILE *file;
+ DBUG_ENTER("get_current_logfile");
rw_rdlock(&log_descriptor.open_files_lock);
+ DBUG_PRINT("info", ("max_file: %lu min_file: %lu open_files: %lu",
+ (ulong) log_descriptor.max_file,
+ (ulong) log_descriptor.min_file,
+ (ulong) log_descriptor.open_files.elements));
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
file= *dynamic_element(&log_descriptor.open_files, 0, TRANSLOG_FILE **);
rw_unlock(&log_descriptor.open_files_lock);
- return (file);
+ DBUG_RETURN(file);
}
uchar NEAR maria_trans_file_magic[]=
@@ -1069,6 +1125,7 @@ static my_bool translog_write_file_header()
static my_bool translog_max_lsn_to_header(File file, LSN lsn)
{
uchar lsn_buff[LSN_STORE_SIZE];
+ my_bool rc;
DBUG_ENTER("translog_max_lsn_to_header");
DBUG_PRINT("enter", ("File descriptor: %ld "
"lsn: (%lu,0x%lx)",
@@ -1077,11 +1134,17 @@ static my_bool translog_max_lsn_to_header(File file, LSN lsn)
lsn_store(lsn_buff, lsn);
- DBUG_RETURN(my_pwrite(file, lsn_buff,
- LSN_STORE_SIZE,
- (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE),
- log_write_flags) != 0 ||
- my_sync(file, MYF(MY_WME)) != 0);
+ rc= (my_pwrite(file, lsn_buff,
+ LSN_STORE_SIZE,
+ (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE),
+ log_write_flags) != 0 ||
+ my_sync(file, MYF(MY_WME)) != 0);
+ /*
+ We should not increase counter in case of error above, but it is so
+ unlikely that we can ignore this case
+ */
+ translog_syncs++;
+ DBUG_RETURN(rc);
}
@@ -1394,6 +1457,7 @@ LSN translog_get_file_max_lsn_stored(uint32 file)
{
LOGHANDLER_FILE_INFO info;
+ LINT_INIT_STRUCT(info);
File fd= open_logfile_by_number_no_cache(file);
if ((fd < 0) ||
(translog_read_file_header(&info, fd) | my_close(fd, MYF(MY_WME))))
@@ -1423,7 +1487,9 @@ LSN translog_get_file_max_lsn_stored(uint32 file)
static my_bool translog_buffer_init(struct st_translog_buffer *buffer, int num)
{
DBUG_ENTER("translog_buffer_init");
- buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
+ buffer->pre_force_close_horizon=
+ buffer->prev_last_lsn= buffer->last_lsn=
+ LSN_IMPOSSIBLE;
DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx",
(ulong) buffer));
@@ -1435,6 +1501,7 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer, int num)
memset(buffer->buffer, TRANSLOG_FILLER, TRANSLOG_WRITE_BUFFER);
/* Buffer size */
buffer->size= 0;
+ buffer->skipped_data= 0;
/* cond of thread which is waiting for buffer filling */
if (pthread_cond_init(&buffer->waiting_filling_buffer, 0))
DBUG_RETURN(1);
@@ -1489,7 +1556,10 @@ static my_bool translog_close_log_file(TRANSLOG_FILE *file)
TODO: sync only we have changed the log
*/
if (!file->is_sync)
+ {
rc= my_sync(file->handler.file, MYF(MY_WME));
+ translog_syncs++;
+ }
rc|= my_close(file->handler.file, MYF(MY_WME));
my_free(file, MYF(0));
return test(rc);
@@ -2044,7 +2114,8 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
(ulong) LSN_OFFSET(log_descriptor.horizon),
(ulong) LSN_OFFSET(log_descriptor.horizon)));
DBUG_ASSERT(buffer_no == buffer->buffer_no);
- buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
+ buffer->pre_force_close_horizon=
+ buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx",
(ulong) buffer));
buffer->offset= log_descriptor.horizon;
@@ -2052,6 +2123,7 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
buffer->file= get_current_logfile();
buffer->overlay= 0;
buffer->size= 0;
+ buffer->skipped_data= 0;
translog_cursor_init(cursor, buffer, buffer_no);
DBUG_PRINT("info", ("file: #%ld (%d) init cursor #%u: 0x%lx "
"chaser: %d Size: %lu (%lu)",
@@ -2523,6 +2595,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
TRANSLOG_ADDRESS offset= buffer->offset;
TRANSLOG_FILE *file= buffer->file;
uint8 ver= buffer->ver;
+ uint skipped_data;
DBUG_ENTER("translog_buffer_flush");
DBUG_PRINT("enter",
("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu",
@@ -2557,6 +2630,8 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
disk
*/
file= buffer->file;
+ skipped_data= buffer->skipped_data;
+ DBUG_ASSERT(skipped_data < TRANSLOG_PAGE_SIZE);
for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE;
i < buffer->size;
i+= TRANSLOG_PAGE_SIZE, pg++)
@@ -2573,13 +2648,16 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size);
if (translog_status != TRANSLOG_OK && translog_status != TRANSLOG_SHUTDOWN)
DBUG_RETURN(1);
- if (pagecache_inject(log_descriptor.pagecache,
+ if (pagecache_write_part(log_descriptor.pagecache,
&file->handler, pg, 3,
buffer->buffer + i,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
- PAGECACHE_PIN_LEFT_UNPINNED, 0,
- LSN_IMPOSSIBLE))
+ PAGECACHE_PIN_LEFT_UNPINNED,
+ PAGECACHE_WRITE_DONE, 0,
+ LSN_IMPOSSIBLE,
+ skipped_data,
+ TRANSLOG_PAGE_SIZE - skipped_data))
{
DBUG_PRINT("error",
("Can't write page (%lu,0x%lx) to pagecache, error: %d",
@@ -2589,10 +2667,12 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
translog_stop_writing();
DBUG_RETURN(1);
}
+ skipped_data= 0;
}
file->is_sync= 0;
- if (my_pwrite(file->handler.file, buffer->buffer,
- buffer->size, LSN_OFFSET(buffer->offset),
+ if (my_pwrite(file->handler.file, buffer->buffer + buffer->skipped_data,
+ buffer->size - buffer->skipped_data,
+ LSN_OFFSET(buffer->offset) + buffer->skipped_data,
log_write_flags))
{
DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu "
@@ -2823,8 +2903,8 @@ static my_bool translog_page_validator(uchar *page,
data->was_recovered= 0;
- if (uint3korr(page) != page_no ||
- uint3korr(page + 3) != data->number)
+ if ((pgcache_page_no_t) uint3korr(page) != page_no ||
+ (uint32) uint3korr(page + 3) != data->number)
{
DBUG_PRINT("error", ("Page (%lu,0x%lx): "
"page address written in the page is incorrect: "
@@ -2985,6 +3065,7 @@ restart:
uchar *from, *table= NULL;
int is_last_unfinished_page;
uint last_protected_sector= 0;
+ uint skipped_data= curr_buffer->skipped_data;
TRANSLOG_FILE file_copy;
uint8 ver= curr_buffer->ver;
translog_wait_for_writers(curr_buffer);
@@ -2997,7 +3078,38 @@ restart:
}
DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset));
from= curr_buffer->buffer + (addr - curr_buffer->offset);
- memcpy(buffer, from, TRANSLOG_PAGE_SIZE);
+ if (skipped_data && addr == curr_buffer->offset)
+ {
+ /*
+ We read page part of which is not present in buffer,
+ so we should read absent part from file (page cache actually)
+ */
+ file= get_logfile_by_number(file_no);
+ DBUG_ASSERT(file != NULL);
+ /*
+ it's ok to not lock the page because:
+ - The log handler has it's own page cache.
+ - There is only one thread that can access the log
+ cache at a time
+ */
+ if (!(buffer= pagecache_read(log_descriptor.pagecache,
+ &file->handler,
+ LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
+ 3, buffer,
+ PAGECACHE_PLAIN_PAGE,
+ PAGECACHE_LOCK_LEFT_UNLOCKED,
+ NULL)))
+ DBUG_RETURN(NULL);
+ }
+ else
+ skipped_data= 0; /* Read after skipped in buffer data */
+ /*
+ Now we have correct data in buffer up to 'skipped_data'. The
+ following memcpy() will move the data from the internal buffer
+ that was not yet on disk.
+ */
+ memcpy(buffer + skipped_data, from + skipped_data,
+ TRANSLOG_PAGE_SIZE - skipped_data);
/*
We can use copy then in translog_page_validator() because it
do not put it permanently somewhere.
@@ -3291,6 +3403,7 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr)
uint32 next_page_offset, page_rest;
uint32 i;
File fd;
+ int rc;
TRANSLOG_VALIDATOR_DATA data;
char path[FN_REFLEN];
uchar page_buff[TRANSLOG_PAGE_SIZE];
@@ -3316,14 +3429,19 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr)
TRANSLOG_PAGE_SIZE);
page_rest= next_page_offset - LSN_OFFSET(addr);
memset(page_buff, TRANSLOG_FILLER, page_rest);
- if ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 ||
- ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) ||
- (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr),
- log_write_flags)) ||
- my_sync(fd, MYF(MY_WME))) |
- my_close(fd, MYF(MY_WME))) ||
- (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
- sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD))))
+ rc= ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 ||
+ ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) ||
+ (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr),
+ log_write_flags)) ||
+ my_sync(fd, MYF(MY_WME)))));
+ translog_syncs++;
+ rc|= (fd > 0 && my_close(fd, MYF(MY_WME)));
+ if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS)
+ {
+ rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD));
+ translog_syncs++;
+ }
+ if (rc)
DBUG_RETURN(1);
/* fix the horizon */
@@ -3483,7 +3601,10 @@ my_bool translog_init_with_table(const char *directory,
my_bool version_changed= 0;
DBUG_ENTER("translog_init_with_table");
+ translog_syncs= 0;
+ flush_start= 0;
id_to_share= NULL;
+
log_descriptor.directory_fd= -1;
log_descriptor.is_everything_flushed= 1;
log_descriptor.flush_in_progress= 0;
@@ -3511,6 +3632,7 @@ my_bool translog_init_with_table(const char *directory,
pthread_mutex_init(&log_descriptor.dirty_buffer_mask_lock,
MY_MUTEX_INIT_FAST) ||
pthread_cond_init(&log_descriptor.log_flush_cond, 0) ||
+ pthread_cond_init(&log_descriptor.new_goal_cond, 0) ||
my_rwlock_init(&log_descriptor.open_files_lock,
NULL) ||
my_init_dynamic_array(&log_descriptor.open_files,
@@ -3912,7 +4034,6 @@ my_bool translog_init_with_table(const char *directory,
log_descriptor.flushed= log_descriptor.horizon;
log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset;
log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */
- log_descriptor.previous_flush_horizon= log_descriptor.horizon;
/*
Now 'flushed' is set to 'horizon' value, but 'horizon' is (potentially)
address of the next LSN and we want indicate that all LSNs that are
@@ -3995,6 +4116,10 @@ my_bool translog_init_with_table(const char *directory,
It is beginning of the log => there is no LSNs in the log =>
There is no harm in leaving it "as-is".
*/
+ log_descriptor.previous_flush_horizon= log_descriptor.horizon;
+ DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)",
+ LSN_IN_PARTS(log_descriptor.
+ previous_flush_horizon)));
DBUG_RETURN(0);
}
file_no--;
@@ -4070,6 +4195,9 @@ my_bool translog_init_with_table(const char *directory,
translog_free_record_header(&rec);
}
}
+ log_descriptor.previous_flush_horizon= log_descriptor.horizon;
+ DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)",
+ LSN_IN_PARTS(log_descriptor.previous_flush_horizon)));
DBUG_RETURN(0);
err:
ma_message_no_user(0, "log initialization failed");
@@ -4157,6 +4285,7 @@ void translog_destroy()
pthread_mutex_destroy(&log_descriptor.log_flush_lock);
pthread_mutex_destroy(&log_descriptor.dirty_buffer_mask_lock);
pthread_cond_destroy(&log_descriptor.log_flush_cond);
+ pthread_cond_destroy(&log_descriptor.new_goal_cond);
rwlock_destroy(&log_descriptor.open_files_lock);
delete_dynamic(&log_descriptor.open_files);
delete_dynamic(&log_descriptor.unfinished_files);
@@ -6885,11 +7014,11 @@ int translog_read_record_header_from_buffer(uchar *page,
{
translog_size_t res;
DBUG_ENTER("translog_read_record_header_from_buffer");
+ DBUG_PRINT("info", ("page byte: 0x%x offset: %u",
+ (uint) page[page_offset], (uint) page_offset));
DBUG_ASSERT(translog_is_LSN_chunk(page[page_offset]));
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
- DBUG_PRINT("info", ("page byte: 0x%x offset: %u",
- (uint) page[page_offset], (uint) page_offset));
buff->type= (page[page_offset] & TRANSLOG_REC_TYPE);
buff->short_trid= uint2korr(page + page_offset + 1);
DBUG_PRINT("info", ("Type %u, Short TrID %u, LSN (%lu,0x%lx)",
@@ -7356,27 +7485,27 @@ static void translog_force_current_buffer_to_finish()
"Buffer addr: (%lu,0x%lx) "
"Page addr: (%lu,0x%lx) "
"size: %lu (%lu) Pg: %u left: %u in progress %u",
- (uint) log_descriptor.bc.buffer_no,
- (ulong) log_descriptor.bc.buffer,
- LSN_IN_PARTS(log_descriptor.bc.buffer->offset),
+ (uint) old_buffer_no,
+ (ulong) old_buffer,
+ LSN_IN_PARTS(old_buffer->offset),
(ulong) LSN_FILE_NO(log_descriptor.horizon),
(ulong) (LSN_OFFSET(log_descriptor.horizon) -
log_descriptor.bc.current_page_fill),
- (ulong) log_descriptor.bc.buffer->size,
+ (ulong) old_buffer->size,
(ulong) (log_descriptor.bc.ptr -log_descriptor.bc.
buffer->buffer),
(uint) log_descriptor.bc.current_page_fill,
(uint) left,
- (uint) log_descriptor.bc.buffer->
+ (uint) old_buffer->
copy_to_buffer_in_progress));
translog_lock_assert_owner();
LINT_INIT(current_page_fill);
- new_buff_beginning= log_descriptor.bc.buffer->offset;
- new_buff_beginning+= log_descriptor.bc.buffer->size; /* increase offset */
+ new_buff_beginning= old_buffer->offset;
+ new_buff_beginning+= old_buffer->size; /* increase offset */
DBUG_ASSERT(log_descriptor.bc.ptr !=NULL);
DBUG_ASSERT(LSN_FILE_NO(log_descriptor.horizon) ==
- LSN_FILE_NO(log_descriptor.bc.buffer->offset));
+ LSN_FILE_NO(old_buffer->offset));
translog_check_cursor(&log_descriptor.bc);
DBUG_ASSERT(left < TRANSLOG_PAGE_SIZE);
if (left)
@@ -7387,18 +7516,20 @@ static void translog_force_current_buffer_to_finish()
*/
DBUG_PRINT("info", ("left: %u", (uint) left));
+ old_buffer->pre_force_close_horizon=
+ old_buffer->offset + old_buffer->size;
/* decrease offset */
new_buff_beginning-= log_descriptor.bc.current_page_fill;
current_page_fill= log_descriptor.bc.current_page_fill;
memset(log_descriptor.bc.ptr, TRANSLOG_FILLER, left);
- log_descriptor.bc.buffer->size+= left;
+ old_buffer->size+= left;
DBUG_PRINT("info", ("Finish Page buffer #%u: 0x%lx "
"Size: %lu",
- (uint) log_descriptor.bc.buffer->buffer_no,
- (ulong) log_descriptor.bc.buffer,
- (ulong) log_descriptor.bc.buffer->size));
- DBUG_ASSERT(log_descriptor.bc.buffer->buffer_no ==
+ (uint) old_buffer->buffer_no,
+ (ulong) old_buffer,
+ (ulong) old_buffer->size));
+ DBUG_ASSERT(old_buffer->buffer_no ==
log_descriptor.bc.buffer_no);
}
else
@@ -7509,11 +7640,21 @@ static void translog_force_current_buffer_to_finish()
if (left)
{
- /*
- TODO: do not copy beginning of the page if we have no CRC or sector
- checks on
- */
- memcpy(new_buffer->buffer, data, current_page_fill);
+ if (log_descriptor.flags &
+ (TRANSLOG_PAGE_CRC | TRANSLOG_SECTOR_PROTECTION))
+ memcpy(new_buffer->buffer, data, current_page_fill);
+ else
+ {
+ /*
+ This page header does not change if we add more data to the page so
+ we can not copy it and will not overwrite later
+ */
+ new_buffer->skipped_data= current_page_fill;
+#ifndef DBUG_OFF
+ memset(new_buffer->buffer, 0xa5, current_page_fill);
+#endif
+ DBUG_ASSERT(new_buffer->skipped_data < TRANSLOG_PAGE_SIZE);
+ }
}
old_buffer->next_buffer_offset= new_buffer->offset;
translog_buffer_lock(new_buffer);
@@ -7561,6 +7702,7 @@ void translog_flush_set_new_goal_and_wait(TRANSLOG_ADDRESS lsn)
{
log_descriptor.next_pass_max_lsn= lsn;
log_descriptor.max_lsn_requester= pthread_self();
+ pthread_cond_broadcast(&log_descriptor.new_goal_cond);
}
while (flush_no == log_descriptor.flush_no)
{
@@ -7572,67 +7714,79 @@ void translog_flush_set_new_goal_and_wait(TRANSLOG_ADDRESS lsn)
/**
- @brief Flush the log up to given LSN (included)
+ @brief sync() range of files (inclusive) and directory (by request)
- @param lsn log record serial number up to which (inclusive)
- the log has to be flushed
+ @param min min internal file number to flush
+ @param max max internal file number to flush
+ @param sync_dir need sync directory
- @return Operation status
+ return Operation status
@retval 0 OK
@retval 1 Error
-
*/
-my_bool translog_flush(TRANSLOG_ADDRESS lsn)
+static my_bool translog_sync_files(uint32 min, uint32 max,
+ my_bool sync_dir)
{
- LSN sent_to_disk= LSN_IMPOSSIBLE;
- TRANSLOG_ADDRESS flush_horizon;
- uint fn, i;
- dirty_buffer_mask_t dirty_buffer_mask;
- uint8 last_buffer_no, start_buffer_no;
+ uint fn;
my_bool rc= 0;
- DBUG_ENTER("translog_flush");
- DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
- DBUG_ASSERT(translog_status == TRANSLOG_OK ||
- translog_status == TRANSLOG_READONLY);
- LINT_INIT(sent_to_disk);
+ ulonglong flush_interval;
+ DBUG_ENTER("translog_sync_files");
+ DBUG_PRINT("info", ("min: %lu max: %lu sync dir: %d",
+ (ulong) min, (ulong) max, (int) sync_dir));
+ DBUG_ASSERT(min <= max);
- pthread_mutex_lock(&log_descriptor.log_flush_lock);
- DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)",
- LSN_IN_PARTS(log_descriptor.flushed)));
- if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0)
+ flush_interval= group_commit_wait;
+ if (flush_interval)
+ flush_start= my_micro_time();
+ for (fn= min; fn <= max; fn++)
{
- pthread_mutex_unlock(&log_descriptor.log_flush_lock);
- DBUG_RETURN(0);
- }
- if (log_descriptor.flush_in_progress)
- {
- translog_flush_set_new_goal_and_wait(lsn);
- if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self()))
+ TRANSLOG_FILE *file= get_logfile_by_number(fn);
+ DBUG_ASSERT(file != NULL);
+ if (!file->is_sync)
{
- /* fix lsn if it was horizon */
- if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0)
- lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer);
- translog_flush_wait_for_end(lsn);
- pthread_mutex_unlock(&log_descriptor.log_flush_lock);
- DBUG_RETURN(0);
+ if (my_sync(file->handler.file, MYF(MY_WME)))
+ {
+ rc= 1;
+ translog_stop_writing();
+ DBUG_RETURN(rc);
+ }
+ translog_syncs++;
+ file->is_sync= 1;
}
- log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
}
- log_descriptor.flush_in_progress= 1;
- flush_horizon= log_descriptor.previous_flush_horizon;
- DBUG_PRINT("info", ("flush_in_progress is set"));
- pthread_mutex_unlock(&log_descriptor.log_flush_lock);
- translog_lock();
- if (log_descriptor.is_everything_flushed)
+ if (sync_dir)
{
- DBUG_PRINT("info", ("everything is flushed"));
- rc= (translog_status == TRANSLOG_READONLY);
- translog_unlock();
- goto out;
+ if (!(rc= sync_dir(log_descriptor.directory_fd,
+ MYF(MY_WME | MY_IGNORE_BADFD))))
+ translog_syncs++;
}
+ DBUG_RETURN(rc);
+}
+
+
+/*
+ @brief Flushes buffers with LSNs in them less or equal address <lsn>
+
+ @param lsn address up to which all LSNs should be flushed,
+ can be reset to real last LSN address
+ @parem sent_to_disk returns 'sent to disk' position
+ @param flush_horizon returns horizon of the flush
+
+ @note About terminology see comment to translog_flush().
+*/
+
+void translog_flush_buffers(TRANSLOG_ADDRESS *lsn,
+ TRANSLOG_ADDRESS *sent_to_disk,
+ TRANSLOG_ADDRESS *flush_horizon)
+{
+ dirty_buffer_mask_t dirty_buffer_mask;
+ uint i;
+ uint8 last_buffer_no, start_buffer_no;
+ DBUG_ENTER("translog_flush_buffers");
+
/*
We will recheck information when will lock buffers one by
one so we can use unprotected read here (this is just for
@@ -7656,15 +7810,15 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
/*
if LSN up to which we have to flush bigger then maximum LSN of previous
buffer and at least one LSN was saved in the current buffer (last_lsn !=
- LSN_IMPOSSIBLE) then we better finish the current buffer.
+ LSN_IMPOSSIBLE) then we have to close the current buffer.
*/
- if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 &&
+ if (cmp_translog_addr(*lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 &&
log_descriptor.bc.buffer->last_lsn != LSN_IMPOSSIBLE)
{
struct st_translog_buffer *buffer= log_descriptor.bc.buffer;
- lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */
+ *lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */
DBUG_PRINT("info", ("LSN to flush fixed to last lsn: (%lu,0x%lx)",
- LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn)));
+ LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn)));
last_buffer_no= log_descriptor.bc.buffer_no;
log_descriptor.is_everything_flushed= 1;
translog_force_current_buffer_to_finish();
@@ -7676,8 +7830,10 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
TRANSLOG_BUFFERS_NO);
translog_unlock();
}
- sent_to_disk= translog_get_sent_to_disk();
- if (cmp_translog_addr(lsn, sent_to_disk) > 0)
+
+ /* flush buffers */
+ *sent_to_disk= translog_get_sent_to_disk();
+ if (cmp_translog_addr(*lsn, *sent_to_disk) > 0)
{
DBUG_PRINT("info", ("Start buffer #: %u last buffer #: %u",
@@ -7697,53 +7853,238 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
LSN_IN_PARTS(buffer->last_lsn),
(buffer->file ?
"dirty" : "closed")));
- if (buffer->prev_last_lsn <= lsn &&
+ if (buffer->prev_last_lsn <= *lsn &&
buffer->file != NULL)
{
- DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size);
- flush_horizon= buffer->offset + buffer->size;
+ DBUG_ASSERT(*flush_horizon <= buffer->offset + buffer->size);
+ *flush_horizon= (buffer->pre_force_close_horizon != LSN_IMPOSSIBLE ?
+ buffer->pre_force_close_horizon :
+ buffer->offset + buffer->size);
+ /* pre_force_close_horizon is reset during new buffer start */
+ DBUG_PRINT("info", ("flush_horizon: (%lu,0x%lx)",
+ LSN_IN_PARTS(*flush_horizon)));
+ DBUG_ASSERT(*flush_horizon <= log_descriptor.horizon);
+
translog_buffer_flush(buffer);
}
translog_buffer_unlock(buffer);
i= (i + 1) % TRANSLOG_BUFFERS_NO;
} while (i != last_buffer_no);
- sent_to_disk= translog_get_sent_to_disk();
+ *sent_to_disk= translog_get_sent_to_disk();
+ }
+
+ DBUG_VOID_RETURN;
+}
+
+/**
+ @brief Flush the log up to given LSN (included)
+
+ @param lsn log record serial number up to which (inclusive)
+ the log has to be flushed
+
+ @return Operation status
+ @retval 0 OK
+ @retval 1 Error
+
+ @note
+
+ - Non group commit logic: Commits made in passes. Thread which started
+ flush first is performing actual flush, other threads sets new goal (LSN)
+ of the next pass (if it is maximum) and waits for the pass end or just
+ wait for the pass end.
+
+ - If hard group commit enabled and rate set to zero:
+ The first thread sends all changed buffers to disk. This is repeated
+ as long as there are new LSNs added. The process can not loop
+ forever because we have limited number of threads and they will wait
+ for the data to be synced.
+ Pseudo code:
+
+ do
+ send changed buffers to disk
+ while new_goal
+ sync
+
+ - If hard group commit switched ON and less than rate microseconds has
+ passed from last sync, then after buffers have been sent to disk
+ wait until rate microseconds has passed since last sync, do sync and return.
+ This ensures that if we call sync infrequently we don't do any waits.
+
+ - If soft group commit enabled everything works as with 'non group commit'
+ but the thread doesn't do any real sync(). If rate is not zero the
+ sync() will be performed by a service thread with the given rate
+ when needed (new LSN appears).
+
+ @note Terminology:
+ 'sent to disk' means written to disk but not sync()ed,
+ 'flushed' mean sent to disk and synced().
+*/
+
+my_bool translog_flush(TRANSLOG_ADDRESS lsn)
+{
+ struct timespec abstime;
+ ulonglong flush_interval;
+ ulonglong time_spent;
+ LSN sent_to_disk= LSN_IMPOSSIBLE;
+ TRANSLOG_ADDRESS flush_horizon;
+ my_bool rc= 0;
+ my_bool hgroup_commit_at_start;
+ DBUG_ENTER("translog_flush");
+ DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
+ DBUG_ASSERT(translog_status == TRANSLOG_OK ||
+ translog_status == TRANSLOG_READONLY);
+ LINT_INIT(sent_to_disk);
+
+ pthread_mutex_lock(&log_descriptor.log_flush_lock);
+ DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)",
+ LSN_IN_PARTS(log_descriptor.flushed)));
+ if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0)
+ {
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+ DBUG_RETURN(0);
}
+ if (log_descriptor.flush_in_progress)
+ {
+ translog_lock();
+ /* fix lsn if it was horizon */
+ if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0)
+ lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer);
+ translog_unlock();
+ translog_flush_set_new_goal_and_wait(lsn);
+ if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self()))
+ {
+ /*
+ translog_flush_wait_for_end() release log_flush_lock while is
+ waiting then acquire it again
+ */
+ translog_flush_wait_for_end(lsn);
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+ DBUG_RETURN(0);
+ }
+ log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
+ }
+ log_descriptor.flush_in_progress= 1;
+ flush_horizon= log_descriptor.previous_flush_horizon;
+ DBUG_PRINT("info", ("flush_in_progress is set, flush_horizon: (%lu,0x%lx)",
+ LSN_IN_PARTS(flush_horizon)));
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+
+ hgroup_commit_at_start= hard_group_commit;
+ if (hgroup_commit_at_start)
+ flush_interval= group_commit_wait;
- /* sync files from previous flush till current one */
- for (fn= LSN_FILE_NO(log_descriptor.flushed); fn <= LSN_FILE_NO(lsn); fn++)
+ translog_lock();
+ if (log_descriptor.is_everything_flushed)
{
- TRANSLOG_FILE *file= get_logfile_by_number(fn);
- DBUG_ASSERT(file != NULL);
- if (!file->is_sync)
+ DBUG_PRINT("info", ("everything is flushed"));
+ translog_unlock();
+ pthread_mutex_lock(&log_descriptor.log_flush_lock);
+ goto out;
+ }
+
+ for (;;)
+ {
+ /* Following function flushes buffers and makes translog_unlock() */
+ translog_flush_buffers(&lsn, &sent_to_disk, &flush_horizon);
+
+ if (!hgroup_commit_at_start)
+ break; /* flush pass is ended */
+
+retest:
+ /*
+ We do not check time here because pthread_mutex_lock rarely takes
+ a lot of time so we can sacrifice a bit precision to performance
+ (taking into account that my_micro_time() might be expensive call).
+ */
+ if (flush_interval == 0)
+ break; /* flush pass is ended */
+
+ pthread_mutex_lock(&log_descriptor.log_flush_lock);
+ if (log_descriptor.next_pass_max_lsn == LSN_IMPOSSIBLE)
{
- if (my_sync(file->handler.file, MYF(MY_WME)))
+ if (flush_interval == 0 ||
+ (time_spent= (my_micro_time() - flush_start)) >= flush_interval)
{
- rc= 1;
- translog_stop_writing();
- sent_to_disk= LSN_IMPOSSIBLE;
- goto out;
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+ break;
}
- file->is_sync= 1;
+ DBUG_PRINT("info", ("flush waits: %llu interval: %llu spent: %llu",
+ flush_interval - time_spent,
+ flush_interval, time_spent));
+ /* wait time or next goal */
+ set_timespec_nsec(abstime, flush_interval - time_spent);
+ pthread_cond_timedwait(&log_descriptor.new_goal_cond,
+ &log_descriptor.log_flush_lock,
+ &abstime);
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+ DBUG_PRINT("info", ("retest conditions"));
+ goto retest;
}
+
+ /* take next goal */
+ lsn= log_descriptor.next_pass_max_lsn;
+ log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
+ /* prevent other thread from continue */
+ log_descriptor.max_lsn_requester= pthread_self();
+ DBUG_PRINT("info", ("flush took next goal: (%lu,0x%lx)",
+ LSN_IN_PARTS(lsn)));
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+
+ /* next flush pass */
+ DBUG_PRINT("info", ("next flush pass"));
+ translog_lock();
}
- if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
- (LSN_FILE_NO(log_descriptor.previous_flush_horizon) !=
- LSN_FILE_NO(flush_horizon) ||
- ((LSN_OFFSET(log_descriptor.previous_flush_horizon) - 1) /
- TRANSLOG_PAGE_SIZE) !=
- ((LSN_OFFSET(flush_horizon) - 1) / TRANSLOG_PAGE_SIZE)))
- rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD));
+ /*
+ sync() files from previous flush till current one
+ */
+ if (!soft_sync || hgroup_commit_at_start)
+ {
+ if ((rc=
+ translog_sync_files(LSN_FILE_NO(log_descriptor.flushed),
+ LSN_FILE_NO(lsn),
+ sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
+ (LSN_FILE_NO(log_descriptor.
+ previous_flush_horizon) !=
+ LSN_FILE_NO(flush_horizon) ||
+ (LSN_OFFSET(log_descriptor.
+ previous_flush_horizon) /
+ TRANSLOG_PAGE_SIZE) !=
+ (LSN_OFFSET(flush_horizon) /
+ TRANSLOG_PAGE_SIZE)))))
+ {
+ sent_to_disk= LSN_IMPOSSIBLE;
+ pthread_mutex_lock(&log_descriptor.log_flush_lock);
+ goto out;
+ }
+ /* keep values for soft sync() and forced sync() actual */
+ {
+ uint32 fileno= LSN_FILE_NO(lsn);
+ my_atomic_rwlock_wrlock(&soft_sync_rwl);
+ my_atomic_store32(&soft_sync_min, fileno);
+ my_atomic_store32(&soft_sync_max, fileno);
+ my_atomic_rwlock_wrunlock(&soft_sync_rwl);
+ }
+ }
+ else
+ {
+ my_atomic_rwlock_wrlock(&soft_sync_rwl);
+ my_atomic_store32(&soft_sync_max, LSN_FILE_NO(lsn));
+ my_atomic_store32(&soft_need_sync, 1);
+ my_atomic_rwlock_wrunlock(&soft_sync_rwl);
+ }
+
+ DBUG_ASSERT(flush_horizon <= log_descriptor.horizon);
+
+ pthread_mutex_lock(&log_descriptor.log_flush_lock);
log_descriptor.previous_flush_horizon= flush_horizon;
out:
- pthread_mutex_lock(&log_descriptor.log_flush_lock);
if (sent_to_disk != LSN_IMPOSSIBLE)
log_descriptor.flushed= sent_to_disk;
log_descriptor.flush_in_progress= 0;
log_descriptor.flush_no++;
DBUG_PRINT("info", ("flush_in_progress is dropped"));
- pthread_mutex_unlock(&log_descriptor.log_flush_lock);\
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
pthread_cond_broadcast(&log_descriptor.log_flush_cond);
DBUG_RETURN(rc);
}
@@ -8113,6 +8454,8 @@ LSN translog_first_theoretical_lsn()
my_bool translog_purge(TRANSLOG_ADDRESS low)
{
uint32 last_need_file= LSN_FILE_NO(low);
+ uint32 min_unsync;
+ int soft;
TRANSLOG_ADDRESS horizon= translog_get_horizon();
int rc= 0;
DBUG_ENTER("translog_purge");
@@ -8120,12 +8463,26 @@ my_bool translog_purge(TRANSLOG_ADDRESS low)
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
+ soft= soft_sync;
+ my_atomic_rwlock_wrlock(&soft_sync_rwl);
+ min_unsync= my_atomic_load32(&soft_sync_min);
+ my_atomic_rwlock_wrunlock(&soft_sync_rwl);
+ DBUG_PRINT("info", ("min_unsync: %lu", (ulong) min_unsync));
+ if (soft && min_unsync < last_need_file)
+ {
+ last_need_file= min_unsync;
+ DBUG_PRINT("info", ("last_need_file set to %lu", (ulong)last_need_file));
+ }
+
pthread_mutex_lock(&log_descriptor.purger_lock);
+ DBUG_PRINT("info", ("last_lsn_checked file: %lu:",
+ (ulong) log_descriptor.last_lsn_checked));
if (LSN_FILE_NO(log_descriptor.last_lsn_checked) < last_need_file)
{
uint32 i;
uint32 min_file= translog_first_file(horizon, 1);
DBUG_ASSERT(min_file != 0); /* log is already started */
+ DBUG_PRINT("info", ("min_file: %lu:",(ulong) min_file));
for(i= min_file; i < last_need_file && rc == 0; i++)
{
LSN lsn= translog_get_file_max_lsn_stored(i);
@@ -8356,6 +8713,159 @@ my_bool translog_log_debug_info(TRN *trn __attribute__((unused)),
}
+
+/**
+ Sets soft sync mode
+
+ @param mode TRUE if we need switch soft sync on else off
+*/
+
+void translog_soft_sync(my_bool mode)
+{
+ soft_sync= mode;
+}
+
+
+/**
+ Sets hard group commit
+
+ @param mode TRUE if we need switch hard group commit on else off
+*/
+
+void translog_hard_group_commit(my_bool mode)
+{
+ hard_group_commit= mode;
+}
+
+
+/**
+ @brief forced log sync (used when we are switching modes)
+*/
+
+void translog_sync()
+{
+ uint32 max= get_current_logfile()->number;
+ uint32 min;
+ DBUG_ENTER("ma_translog_sync");
+
+ my_atomic_rwlock_rdlock(&soft_sync_rwl);
+ min= my_atomic_load32(&soft_sync_min);
+ my_atomic_rwlock_rdunlock(&soft_sync_rwl);
+ if (!min)
+ min= max;
+
+ translog_sync_files(min, max, sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS);
+
+ DBUG_VOID_RETURN;
+}
+
+
+/**
+ @brief set rate for group commit
+
+ @param interval interval to set.
+
+ @note We use this function with additional variable because have to
+ restart service thread with new value which we can't make inside changing
+ variable routine (update_maria_group_commit_interval)
+*/
+
+void translog_set_group_commit_interval(uint32 interval)
+{
+ DBUG_ENTER("translog_set_group_commit_interval");
+ group_commit_wait= interval;
+ DBUG_PRINT("info", ("wait: %llu",
+ (ulonglong)group_commit_wait));
+ DBUG_VOID_RETURN;
+}
+
+
+/**
+ @brief syncing service thread
+*/
+
+static pthread_handler_t
+ma_soft_sync_background( void *arg __attribute__((unused)))
+{
+
+ my_thread_init();
+ {
+ DBUG_ENTER("ma_soft_sync_background");
+ for(;;)
+ {
+ ulonglong prev_loop= my_micro_time();
+ ulonglong time, sleep;
+ uint32 min, max, sync_request;
+ my_atomic_rwlock_rdlock(&soft_sync_rwl);
+ min= my_atomic_load32(&soft_sync_min);
+ max= my_atomic_load32(&soft_sync_max);
+ sync_request= my_atomic_load32(&soft_need_sync);
+ my_atomic_store32(&soft_sync_min, max);
+ my_atomic_store32(&soft_need_sync, 0);
+ my_atomic_rwlock_rdunlock(&soft_sync_rwl);
+
+ sleep= group_commit_wait;
+ if (sync_request)
+ translog_sync_files(min, max, FALSE);
+ time= my_micro_time() - prev_loop;
+ if (time > sleep)
+ sleep= 0;
+ else
+ sleep-= time;
+ if (my_service_thread_sleep(&soft_sync_control, sleep))
+ break;
+ }
+ my_service_thread_signal_end(&soft_sync_control);
+ my_thread_end();
+ DBUG_RETURN(0);
+ }
+}
+
+
+/**
+ @brief Starts syncing thread
+*/
+
+int translog_soft_sync_start(void)
+{
+ pthread_t th;
+ int res= 0;
+ uint32 min, max;
+ DBUG_ENTER("translog_soft_sync_start");
+
+ /* check and init variables */
+ my_atomic_rwlock_rdlock(&soft_sync_rwl);
+ min= my_atomic_load32(&soft_sync_min);
+ max= my_atomic_load32(&soft_sync_max);
+ if (!max)
+ my_atomic_store32(&soft_sync_max, (max= get_current_logfile()->number));
+ if (!min)
+ my_atomic_store32(&soft_sync_min, max);
+ my_atomic_store32(&soft_need_sync, 1);
+ my_atomic_rwlock_rdunlock(&soft_sync_rwl);
+
+ if (!(res= ma_service_thread_control_init(&soft_sync_control)))
+ if (!(res= pthread_create(&th, NULL, ma_soft_sync_background, NULL)))
+ soft_sync_control.status= THREAD_RUNNING;
+ DBUG_RETURN(res);
+}
+
+
+/**
+ @brief Stops syncing thread
+*/
+
+void translog_soft_sync_end(void)
+{
+ DBUG_ENTER("translog_soft_sync_end");
+ if (soft_sync_control.inited)
+ {
+ ma_service_thread_control_end(&soft_sync_control);
+ }
+ DBUG_VOID_RETURN;
+}
+
+
#ifdef MARIA_DUMP_LOG
#include <my_getopt.h>
extern void translog_example_table_init();
diff --git a/storage/maria/ma_loghandler.h b/storage/maria/ma_loghandler.h
index dba6283e303..224d93fb24b 100644
--- a/storage/maria/ma_loghandler.h
+++ b/storage/maria/ma_loghandler.h
@@ -342,6 +342,14 @@ enum enum_translog_status
TRANSLOG_SHUTDOWN /* going to shutdown the loghandler */
};
extern enum enum_translog_status translog_status;
+extern ulonglong translog_syncs; /* Number of sync()s */
+
+void translog_soft_sync(my_bool mode);
+void translog_hard_group_commit(my_bool mode);
+int translog_soft_sync_start(void);
+void translog_soft_sync_end(void);
+void translog_sync();
+void translog_set_group_commit_interval(uint32 interval);
/*
all the rest added because of recovery; should we make
@@ -441,6 +449,14 @@ extern LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];
typedef enum
{
+ TRANSLOG_GCOMMIT_NONE,
+ TRANSLOG_GCOMMIT_HARD,
+ TRANSLOG_GCOMMIT_SOFT
+} enum_maria_group_commit;
+extern ulong maria_group_commit;
+extern ulong maria_group_commit_interval;
+typedef enum
+{
TRANSLOG_PURGE_IMMIDIATE,
TRANSLOG_PURGE_EXTERNAL,
TRANSLOG_PURGE_ONDEMAND
diff --git a/storage/maria/ma_page.c b/storage/maria/ma_page.c
index 54361b8ee3b..acbee2a6f07 100644
--- a/storage/maria/ma_page.c
+++ b/storage/maria/ma_page.c
@@ -64,6 +64,15 @@ void _ma_page_setup(MARIA_PAGE *page, MARIA_HA *info,
share->base.key_reflength : 0);
}
+#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
+void page_cleanup(MARIA_SHARE *share, MARIA_PAGE *page)
+{
+ uint length= page->size;
+ DBUG_ASSERT(length <= block_size - KEYPAGE_CHECKSUM_SIZE);
+ bzero(page->buff + length, share->block_size - length);
+}
+#endif
+
/**
Fetch a key-page in memory
@@ -102,8 +111,10 @@ my_bool _ma_fetch_keypage(MARIA_PAGE *page, MARIA_HA *info,
if (lock != PAGECACHE_LOCK_LEFT_UNLOCKED)
{
- DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE);
- page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
+ DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE || PAGECACHE_LOCK_READ);
+ page_link.unlock= (lock == PAGECACHE_LOCK_WRITE ?
+ PAGECACHE_LOCK_WRITE_UNLOCK :
+ PAGECACHE_LOCK_READ_UNLOCK);
page_link.changed= 0;
push_dynamic(&info->pinned_pages, (void*) &page_link);
page->link_offset= info->pinned_pages.elements-1;
@@ -209,14 +220,7 @@ my_bool _ma_write_keypage(MARIA_PAGE *page, enum pagecache_page_lock lock,
}
#endif
-#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
- {
- uint length= page->size;
- DBUG_ASSERT(length <= block_size - KEYPAGE_CHECKSUM_SIZE);
- bzero(buff + length, block_size - length);
- }
-#endif
-
+ page_cleanup(share, page);
res= pagecache_write(share->pagecache,
&share->kfile,
(pgcache_page_no_t) (page->pos / block_size),
diff --git a/storage/maria/ma_recovery.c b/storage/maria/ma_recovery.c
index 65ad767d8ef..7b3065b0208 100644
--- a/storage/maria/ma_recovery.c
+++ b/storage/maria/ma_recovery.c
@@ -312,11 +312,14 @@ int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
now= my_getsystime();
in_redo_phase= TRUE;
+ trnman_init(max_trid_in_control_file);
if (run_redo_phase(from_lsn, apply))
{
ma_message_no_user(0, "Redo phase failed");
+ trnman_destroy();
goto err;
}
+ trnman_destroy();
if ((uncommitted_trans=
end_of_redo_phase(should_run_undo_phase)) == (uint)-1)
diff --git a/storage/maria/ma_rkey.c b/storage/maria/ma_rkey.c
index 32a0e141b1d..24b275d0ba6 100644
--- a/storage/maria/ma_rkey.c
+++ b/storage/maria/ma_rkey.c
@@ -83,6 +83,9 @@ int maria_rkey(MARIA_HA *info, uchar *buf, int inx, const uchar *key_data,
rw_rdlock(&keyinfo->root_lock);
nextflag= maria_read_vec[search_flag] | key.flag;
+ if (search_flag != HA_READ_KEY_EXACT ||
+ ((keyinfo->flag & (HA_NOSAME | HA_NULL_PART)) != HA_NOSAME))
+ nextflag|= SEARCH_SAVE_BUFF;
switch (keyinfo->key_alg) {
#ifdef HAVE_RTREE_KEYS
diff --git a/storage/maria/ma_search.c b/storage/maria/ma_search.c
index a50f3c49a26..d48749b1629 100644
--- a/storage/maria/ma_search.c
+++ b/storage/maria/ma_search.c
@@ -18,6 +18,10 @@
#include "ma_fulltext.h"
#include "m_ctype.h"
+static int _ma_search_no_save(register MARIA_HA *info, MARIA_KEY *key,
+ uint32 nextflag, register my_off_t pos,
+ MARIA_PINNED_PAGE **res_page_link,
+ uchar **res_page_buff);
static my_bool _ma_get_prev_key(MARIA_KEY *key, MARIA_PAGE *ma_page,
uchar *keypos);
@@ -57,7 +61,51 @@ int _ma_check_index(MARIA_HA *info, int inx)
*/
int _ma_search(register MARIA_HA *info, MARIA_KEY *key, uint32 nextflag,
- register my_off_t pos)
+ my_off_t pos)
+{
+ int error;
+ MARIA_PINNED_PAGE *page_link;
+ uchar *page_buff;
+
+ info->page_changed= 1; /* If page not saved */
+ if (!(error= _ma_search_no_save(info, key, nextflag, pos, &page_link,
+ &page_buff)))
+ {
+ if (nextflag & SEARCH_SAVE_BUFF)
+ {
+ bmove512(info->keyread_buff, page_buff, info->s->block_size);
+
+ /* Save position for a possible read next / previous */
+ info->int_keypos= info->keyread_buff + (ulonglong) info->int_keypos;
+ info->int_maxpos= info->keyread_buff + (ulonglong) info->int_maxpos;
+ info->int_keytree_version= key->keyinfo->version;
+ info->last_search_keypage= info->last_keypage;
+ info->page_changed= 0;
+ info->keyread_buff_used= 0;
+ }
+ }
+ _ma_unpin_all_pages(info, LSN_IMPOSSIBLE);
+ return (error);
+}
+
+/**
+ @breif Search after row by a key
+
+ ret_page_link Will contain pointer to page where we found key
+
+ @note
+ Position to row is stored in info->lastpos
+
+ @return
+ @retval 0 ok (key found)
+ @retval -1 Not found
+ @retval 1 If one should continue search on higher level
+*/
+
+static int _ma_search_no_save(register MARIA_HA *info, MARIA_KEY *key,
+ uint32 nextflag, register my_off_t pos,
+ MARIA_PINNED_PAGE **res_page_link,
+ uchar **res_page_buff)
{
my_bool last_key_not_used;
int error,flag;
@@ -66,6 +114,7 @@ int _ma_search(register MARIA_HA *info, MARIA_KEY *key, uint32 nextflag,
uchar lastkey[MARIA_MAX_KEY_BUFF];
MARIA_KEYDEF *keyinfo= key->keyinfo;
MARIA_PAGE page;
+ MARIA_PINNED_PAGE *page_link;
DBUG_ENTER("_ma_search");
DBUG_PRINT("enter",("pos: %lu nextflag: %u lastpos: %lu",
(ulong) pos, nextflag, (ulong) info->cur_row.lastpos));
@@ -81,10 +130,11 @@ int _ma_search(register MARIA_HA *info, MARIA_KEY *key, uint32 nextflag,
}
if (_ma_fetch_keypage(&page, info, keyinfo, pos,
- PAGECACHE_LOCK_LEFT_UNLOCKED,
- DFLT_INIT_HITS, info->keyread_buff,
- test(!(nextflag & SEARCH_SAVE_BUFF))))
+ PAGECACHE_LOCK_READ, DFLT_INIT_HITS, 0, 0))
goto err;
+ page_link= dynamic_element(&info->pinned_pages,
+ info->pinned_pages.elements-1,
+ MARIA_PINNED_PAGE*);
DBUG_DUMP("page", page.buff, page.size);
flag= (*keyinfo->bin_search)(key, &page, nextflag, &keypos, lastkey,
@@ -98,8 +148,9 @@ int _ma_search(register MARIA_HA *info, MARIA_KEY *key, uint32 nextflag,
if (flag)
{
- if ((error= _ma_search(info, key, nextflag,
- _ma_kpos(nod_flag,keypos))) <= 0)
+ if ((error= _ma_search_no_save(info, key, nextflag,
+ _ma_kpos(nod_flag,keypos),
+ res_page_link, res_page_buff)) <= 0)
DBUG_RETURN(error);
if (flag >0)
@@ -118,26 +169,15 @@ int _ma_search(register MARIA_HA *info, MARIA_KEY *key, uint32 nextflag,
((keyinfo->flag & (HA_NOSAME | HA_NULL_PART)) != HA_NOSAME ||
(key->flag & SEARCH_PART_KEY) || info->s->base.born_transactional))
{
- if ((error= _ma_search(info, key, (nextflag | SEARCH_FIND) &
- ~(SEARCH_BIGGER | SEARCH_SMALLER | SEARCH_LAST),
- _ma_kpos(nod_flag,keypos))) >= 0 ||
+ if ((error= _ma_search_no_save(info, key, (nextflag | SEARCH_FIND) &
+ ~(SEARCH_BIGGER | SEARCH_SMALLER |
+ SEARCH_LAST),
+ _ma_kpos(nod_flag,keypos),
+ res_page_link, res_page_buff)) >= 0 ||
my_errno != HA_ERR_KEY_NOT_FOUND)
DBUG_RETURN(error);
- info->last_keypage= HA_OFFSET_ERROR; /* Buffer not in mem */
}
}
- if (pos != info->last_keypage)
- {
- uchar *old_buff= page.buff;
- if (_ma_fetch_keypage(&page, info, keyinfo, pos,
- PAGECACHE_LOCK_LEFT_UNLOCKED,DFLT_INIT_HITS,
- info->keyread_buff,
- test(!(nextflag & SEARCH_SAVE_BUFF))))
- goto err;
- /* Restore position if page buffer moved */
- keypos= page.buff + (keypos - old_buff);
- maxpos= page.buff + (maxpos - old_buff);
- }
info->last_key.keyinfo= keyinfo;
if ((nextflag & (SEARCH_SMALLER | SEARCH_LAST)) && flag != 0)
@@ -172,16 +212,15 @@ int _ma_search(register MARIA_HA *info, MARIA_KEY *key, uint32 nextflag,
}
info->cur_row.lastpos= _ma_row_pos_from_key(&info->last_key);
info->cur_row.trid= _ma_trid_from_key(&info->last_key);
- /* Save position for a possible read next / previous */
- info->int_keypos= info->keyread_buff + (keypos - page.buff);
- info->int_maxpos= info->keyread_buff + (maxpos - page.buff);
- info->int_nod_flag=nod_flag;
- info->int_keytree_version=keyinfo->version;
- info->last_search_keypage=info->last_keypage;
- info->page_changed=0;
- /* Set marker that buffer was used (Marker for mi_search_next()) */
- info->keyread_buff_used= (info->keyread_buff != page.buff);
+ /* Store offset to key */
+ info->int_keypos= (uchar*) (keypos - page.buff);
+ info->int_maxpos= (uchar*) (maxpos - page.buff);
+ info->int_nod_flag= nod_flag;
+ info->last_keypage= pos;
+ *res_page_link= page_link;
+ *res_page_buff= page.buff;
+
DBUG_PRINT("exit",("found key at %lu",(ulong) info->cur_row.lastpos));
DBUG_RETURN(0);
@@ -190,7 +229,7 @@ err:
info->cur_row.lastpos= HA_OFFSET_ERROR;
info->page_changed=1;
DBUG_RETURN (-1);
-} /* _ma_search */
+}
/*
@@ -389,7 +428,7 @@ int _ma_prefix_search(const MARIA_KEY *key, const MARIA_PAGE *ma_page,
uint length_pack;
MARIA_KEYDEF *keyinfo= key->keyinfo;
MARIA_SHARE *share= keyinfo->share;
- uchar *sort_order= keyinfo->seg->charset->sort_order;
+ const uchar *sort_order= keyinfo->seg->charset->sort_order;
DBUG_ENTER("_ma_prefix_search");
LINT_INIT(length);
@@ -1883,7 +1922,7 @@ _ma_calc_var_pack_key_length(const MARIA_KEY *int_key, uint nod_flag,
uint key_length,ref_length,org_key_length=0,
length_pack,new_key_length,diff_flag,pack_marker;
const uchar *key, *start, *end, *key_end;
- uchar *sort_order;
+ const uchar *sort_order;
my_bool same_length;
MARIA_KEYDEF *keyinfo= int_key->keyinfo;
diff --git a/storage/maria/ma_sort.c b/storage/maria/ma_sort.c
index fa2cbab995a..387563ebaac 100644
--- a/storage/maria/ma_sort.c
+++ b/storage/maria/ma_sort.c
@@ -920,7 +920,6 @@ merge_buffers(MARIA_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
uchar *strpos;
BUFFPEK *buffpek,**refpek;
QUEUE queue;
- volatile int *killed= _ma_killed_ptr(info->sort_info->param);
DBUG_ENTER("merge_buffers");
count=error=0;
@@ -953,10 +952,6 @@ merge_buffers(MARIA_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
{
for (;;)
{
- if (*killed)
- {
- error=1; goto err;
- }
buffpek=(BUFFPEK*) queue_top(&queue);
if (to_file)
{
@@ -976,6 +971,12 @@ merge_buffers(MARIA_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
buffpek->key+=sort_length;
if (! --buffpek->mem_count)
{
+ /* It's enough to check for killedptr before a slow operation */
+ if (_ma_killed_ptr(info->sort_info->param))
+ {
+ error=1;
+ goto err;
+ }
if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length)))
{
uchar *base= buffpek->base;
diff --git a/storage/maria/ma_state.c b/storage/maria/ma_state.c
index 0b7fba9f55a..d7ddc73e2b4 100644
--- a/storage/maria/ma_state.c
+++ b/storage/maria/ma_state.c
@@ -528,7 +528,7 @@ void _ma_remove_table_from_trnman(MARIA_SHARE *share, TRN *trn)
safe_mutex_assert_owner(&share->intern_lock);
- for (prev= (MARIA_USED_TABLES**) &trn->used_tables, tables= *prev;
+ for (prev= (MARIA_USED_TABLES**) (char*) &trn->used_tables, tables= *prev;
tables;
tables= *prev)
{
diff --git a/storage/maria/ma_test3.c b/storage/maria/ma_test3.c
index 8dd631380a0..040d6fa78c2 100644
--- a/storage/maria/ma_test3.c
+++ b/storage/maria/ma_test3.c
@@ -180,7 +180,7 @@ void start_test(int id)
if (pagecacheing && rnd(2) == 0)
init_pagecache(maria_pagecache, 65536L, 0, 0, MARIA_KEY_BLOCK_LENGTH,
MY_WME);
- printf("Process %d, pid: %d\n",id,getpid()); fflush(stdout);
+ printf("Process %d, pid: %ld\n",id,(long) getpid()); fflush(stdout);
for (error=i=0 ; i < tests && !error; i++)
{
@@ -362,7 +362,7 @@ int test_write(MARIA_HA *file,int id,int lock_type)
maria_extra(file,HA_EXTRA_WRITE_CACHE,0);
}
- sprintf((char*) record.id,"%7d",getpid());
+ sprintf((char*) record.id,"%7ld", (long) getpid());
strnmov((char*) record.text,"Testing...", sizeof(record.text));
tries=(uint) rnd(100)+10;
diff --git a/storage/maria/ma_write.c b/storage/maria/ma_write.c
index 20304618229..3b9ca46899f 100644
--- a/storage/maria/ma_write.c
+++ b/storage/maria/ma_write.c
@@ -587,6 +587,12 @@ my_bool _ma_enlarge_root(MARIA_HA *info, MARIA_KEY *key, my_off_t *root)
/*
Search after a position for a key and store it there
+ TODO:
+ Change this to use pagecache directly instead of creating a copy
+ of the page. To do this, we must however change write-key-on-page
+ algorithm to not overwrite the buffer but instead store any overflow
+ key in a separate buffer.
+
@return
@retval -1 error
@retval 0 ok
diff --git a/storage/maria/maria_def.h b/storage/maria/maria_def.h
index 80a0beeef0b..8b379408521 100644
--- a/storage/maria/maria_def.h
+++ b/storage/maria/maria_def.h
@@ -390,6 +390,7 @@ typedef struct st_maria_share
my_bool now_transactional;
my_bool have_versioning;
my_bool key_del_used; /* != 0 if key_del is locked */
+ my_bool deleting; /* we are going to delete this table */
#ifdef THREAD
THR_LOCK lock;
void (*lock_restore_status)(void *);
@@ -982,6 +983,11 @@ extern ulonglong transid_get_packed(MARIA_SHARE *share, const uchar *from);
#define page_store_info(share, page) \
_ma_store_keypage_flag((share), (page)->buff, (page)->flag); \
_ma_store_page_used((share), (page)->buff, (page)->size);
+#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
+void page_cleanup(MARIA_SHARE *share, MARIA_PAGE *page)
+#else
+#define page_cleanup(A,B) while (0)
+#endif
extern MARIA_KEY *_ma_make_key(MARIA_HA *info, MARIA_KEY *int_key, uint keynr,
uchar *key, const uchar *record,
@@ -1164,7 +1170,7 @@ int _ma_flush_table_files(MARIA_HA *info, uint flush_data_or_index,
Functions needed by _ma_check (are overridden in MySQL/ha_maria.cc).
See ma_check_standalone.h .
*/
-volatile int *_ma_killed_ptr(HA_CHECK *param);
+int _ma_killed_ptr(HA_CHECK *param);
void _ma_check_print_error _VARARGS((HA_CHECK *param, const char *fmt, ...))
ATTRIBUTE_FORMAT(printf, 2, 3);
void _ma_check_print_warning _VARARGS((HA_CHECK *param, const char *fmt, ...))
@@ -1200,7 +1206,7 @@ void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
my_bool log_incomplete);
my_bool _ma_reenable_logging_for_table(MARIA_HA *info, my_bool flush_pages);
my_bool write_log_record_for_bulk_insert(MARIA_HA *info);
-
+void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn);
#define MARIA_NO_CRC_NORMAL_PAGE 0xffffffff
#define MARIA_NO_CRC_BITMAP_PAGE 0xfffffffe
diff --git a/storage/maria/maria_ftdump.c b/storage/maria/maria_ftdump.c
index 5e3b47b956e..8b545e6e9af 100644
--- a/storage/maria/maria_ftdump.c
+++ b/storage/maria/maria_ftdump.c
@@ -53,7 +53,7 @@ static struct my_option my_long_options[] =
int main(int argc,char *argv[])
{
- int error=0, subkeys;
+ int error=0;
uint keylen, keylen2=0, inx, doc_cnt=0;
float weight= 1.0;
double gws, min_gws=0, avg_gws=0;
@@ -112,11 +112,12 @@ int main(int argc,char *argv[])
while (!(error=maria_rnext(info,NULL,inx)))
{
+ FT_WEIGTH subkeys;
keylen=*(info->lastkey_buff);
- subkeys=ft_sintXkorr(info->lastkey_buff + keylen + 1);
- if (subkeys >= 0)
- weight=*(float*)&subkeys;
+ subkeys.i= ft_sintXkorr(info->lastkey_buff + keylen + 1);
+ if (subkeys.i >= 0)
+ weight= subkeys.f;
#ifdef HAVE_SNPRINTF
snprintf(buf,MAX_LEN,"%.*s",(int) keylen,info->lastkey_buff+1);
@@ -153,14 +154,15 @@ int main(int argc,char *argv[])
keylen2=keylen;
doc_cnt=0;
}
- doc_cnt+= (subkeys >= 0 ? 1 : -subkeys);
+ doc_cnt+= (subkeys.i >= 0 ? 1 : -subkeys.i);
}
if (dump)
{
- if (subkeys>=0)
+ if (subkeys.i >= 0)
printf("%9lx %20.7f %s\n", (long) info->cur_row.lastpos,weight,buf);
else
- printf("%9lx => %17d %s\n",(long) info->cur_row.lastpos,-subkeys,buf);
+ printf("%9lx => %17d %s\n",(long) info->cur_row.lastpos,-subkeys.i,
+ buf);
}
if (verbose && (total%HOW_OFTEN_TO_WRITE)==0)
printf("%10ld\r",total);
diff --git a/storage/maria/trnman.c b/storage/maria/trnman.c
index 43fac68806f..ceb8ad2ae2d 100644
--- a/storage/maria/trnman.c
+++ b/storage/maria/trnman.c
@@ -300,8 +300,8 @@ TRN *trnman_new_trn(WT_THD *wt)
(ABA isn't possible, we're behind a mutex
*/
my_atomic_rwlock_wrlock(&LOCK_pool);
- while (tmp.trn && !my_atomic_casptr((void **)&pool, &tmp.v,
- (void *)tmp.trn->next))
+ while (tmp.trn && !my_atomic_casptr((void **)(char*) &pool, &tmp.v,
+ (void *)tmp.trn->next))
/* no-op */;
my_atomic_rwlock_wrunlock(&LOCK_pool);
@@ -545,7 +545,7 @@ static void trnman_free_trn(TRN *trn)
down after the loop at -O2
*/
*(TRN * volatile *)&(trn->next)= tmp.trn;
- } while (!my_atomic_casptr((void **)&pool, &tmp.v, trn));
+ } while (!my_atomic_casptr((void **)(char*)&pool, &tmp.v, trn));
my_atomic_rwlock_wrunlock(&LOCK_pool);
}