diff options
author | He Zhenxing <zhenxing.he@sun.com> | 2009-09-26 12:49:49 +0800 |
---|---|---|
committer | He Zhenxing <zhenxing.he@sun.com> | 2009-09-26 12:49:49 +0800 |
commit | 623ed58cfda0aef6b6bf545a4200357a58a8a4cc (patch) | |
tree | 28e6a4c77de3c3073b4dbe0b0e09e019adeaa556 /sql/rpl_handler.cc | |
parent | e465d113832aeac61a36902c7976d455e1525234 (diff) | |
download | mariadb-git-623ed58cfda0aef6b6bf545a4200357a58a8a4cc.tar.gz |
Backporting WL#4398 WL#1720
Backporting BUG#44058 BUG#42244 BUG#45672 BUG#45673
Backporting BUG#45819 BUG#45973 BUG#39012
Diffstat (limited to 'sql/rpl_handler.cc')
-rw-r--r-- | sql/rpl_handler.cc | 493 |
1 files changed, 493 insertions, 0 deletions
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc new file mode 100644 index 00000000000..aea838928b9 --- /dev/null +++ b/sql/rpl_handler.cc @@ -0,0 +1,493 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "mysql_priv.h" + +#include "rpl_mi.h" +#include "sql_repl.h" +#include "log_event.h" +#include "rpl_filter.h" +#include <my_dir.h> +#include "rpl_handler.h" + +Trans_delegate *transaction_delegate; +Binlog_storage_delegate *binlog_storage_delegate; +#ifdef HAVE_REPLICATION +Binlog_transmit_delegate *binlog_transmit_delegate; +Binlog_relay_IO_delegate *binlog_relay_io_delegate; +#endif /* HAVE_REPLICATION */ + +/* + structure to save transaction log filename and position +*/ +typedef struct Trans_binlog_info { + my_off_t log_pos; + char log_file[FN_REFLEN]; +} Trans_binlog_info; + +static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + +int get_user_var_int(const char *name, + long long int *value, int *null_value) +{ + my_bool null_val; + user_var_entry *entry= + (user_var_entry*) hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + *value= entry->val_int(&null_val); + if (null_value) + *null_value= null_val; + return 0; +} + +int get_user_var_real(const char *name, + double *value, int *null_value) +{ + my_bool null_val; + user_var_entry *entry= + (user_var_entry*) hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + *value= entry->val_real(&null_val); + if (null_value) + *null_value= null_val; + return 0; +} + +int get_user_var_str(const char *name, char *value, + size_t len, unsigned int precision, int *null_value) +{ + String str; + my_bool null_val; + user_var_entry *entry= + (user_var_entry*) hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + entry->val_str(&null_val, &str, precision); + strncpy(value, str.c_ptr(), len); + if (null_value) + *null_value= null_val; + return 0; +} + +int delegates_init() +{ + static unsigned char trans_mem[sizeof(Trans_delegate)]; + static unsigned char storage_mem[sizeof(Binlog_storage_delegate)]; +#ifdef HAVE_REPLICATION + static unsigned char transmit_mem[sizeof(Binlog_transmit_delegate)]; + static unsigned char relay_io_mem[sizeof(Binlog_relay_IO_delegate)]; +#endif + + if (!(transaction_delegate= new (trans_mem) Trans_delegate) + || (!transaction_delegate->is_inited()) + || !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate) + || (!binlog_storage_delegate->is_inited()) +#ifdef HAVE_REPLICATION + || !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate) + || (!binlog_transmit_delegate->is_inited()) + || !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate) + || (!binlog_relay_io_delegate->is_inited()) +#endif /* HAVE_REPLICATION */ + ) + return 1; + + if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL)) + return 1; + return 0; +} + +void delegates_destroy() +{ + if (transaction_delegate) + transaction_delegate->~Trans_delegate(); + if (binlog_storage_delegate) + binlog_storage_delegate->~Binlog_storage_delegate(); +#ifdef HAVE_REPLICATION + if (binlog_transmit_delegate) + binlog_transmit_delegate->~Binlog_transmit_delegate(); + if (binlog_relay_io_delegate) + binlog_relay_io_delegate->~Binlog_relay_IO_delegate(); +#endif /* HAVE_REPLICATION */ +} + +/* + This macro is used by almost all the Delegate methods to iterate + over all the observers running given callback function of the + delegate . + + Add observer plugins to the thd->lex list, after each statement, all + plugins add to thd->lex will be automatically unlocked. + */ +#define FOREACH_OBSERVER(r, f, thd, args) \ + param.server_id= thd->server_id; \ + read_lock(); \ + Observer_info_iterator iter= observer_info_iter(); \ + Observer_info *info= iter++; \ + for (; info; info= iter++) \ + { \ + plugin_ref plugin= \ + my_plugin_lock(thd, &info->plugin); \ + if (!plugin) \ + { \ + r= 1; \ + break; \ + } \ + if (((Observer *)info->observer)->f \ + && ((Observer *)info->observer)->f args) \ + { \ + r= 1; \ + plugin_unlock(thd, plugin); \ + break; \ + } \ + plugin_unlock(thd, plugin); \ + } \ + unlock() + + +int Trans_delegate::after_commit(THD *thd, bool all) +{ + Trans_param param; + bool is_real_trans= (all || thd->transaction.all.ha_list == 0); + if (is_real_trans) + param.flags |= TRANS_IS_REAL_TRANS; + + Trans_binlog_info *log_info= + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + + param.log_file= log_info ? log_info->log_file : 0; + param.log_pos= log_info ? log_info->log_pos : 0; + + int ret= 0; + FOREACH_OBSERVER(ret, after_commit, thd, (¶m)); + + /* + This is the end of a real transaction or autocommit statement, we + can free the memory allocated for binlog file and position. + */ + if (is_real_trans && log_info) + { + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL); + my_free(log_info, MYF(0)); + } + return ret; +} + +int Trans_delegate::after_rollback(THD *thd, bool all) +{ + Trans_param param; + bool is_real_trans= (all || thd->transaction.all.ha_list == 0); + if (is_real_trans) + param.flags |= TRANS_IS_REAL_TRANS; + + Trans_binlog_info *log_info= + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + + param.log_file= log_info ? log_info->log_file : 0; + param.log_pos= log_info ? log_info->log_pos : 0; + + int ret= 0; + FOREACH_OBSERVER(ret, after_commit, thd, (¶m)); + + /* + This is the end of a real transaction or autocommit statement, we + can free the memory allocated for binlog file and position. + */ + if (is_real_trans && log_info) + { + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL); + my_free(log_info, MYF(0)); + } + return ret; +} + +int Binlog_storage_delegate::after_flush(THD *thd, + const char *log_file, + my_off_t log_pos, + bool synced) +{ + Binlog_storage_param param; + uint32 flags=0; + if (synced) + flags |= BINLOG_STORAGE_IS_SYNCED; + + Trans_binlog_info *log_info= + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + + if (!log_info) + { + if(!(log_info= + (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0)))) + return 1; + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info); + } + + strcpy(log_info->log_file, log_file+dirname_length(log_file)); + log_info->log_pos = log_pos; + + int ret= 0; + FOREACH_OBSERVER(ret, after_flush, thd, + (¶m, log_info->log_file, log_info->log_pos, flags)); + return ret; +} + +#ifdef HAVE_REPLICATION +int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags, + const char *log_file, + my_off_t log_pos) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos)); + return ret; +} + +int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m)); + return ret; +} + +int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags, + String *packet) +{ + /* NOTE2ME: Maximum extra header size for each observer, I hope 32 + bytes should be enough for each Observer to reserve their extra + header. If later found this is not enough, we can increase this + /HEZX + */ +#define RESERVE_HEADER_SIZE 32 + unsigned char header[RESERVE_HEADER_SIZE]; + ulong hlen; + Binlog_transmit_param param; + param.flags= flags; + param.server_id= thd->server_id; + + int ret= 0; + read_lock(); + Observer_info_iterator iter= observer_info_iter(); + Observer_info *info= iter++; + for (; info; info= iter++) + { + plugin_ref plugin= + my_plugin_lock(thd, &info->plugin); + if (!plugin) + { + ret= 1; + break; + } + hlen= 0; + if (((Observer *)info->observer)->reserve_header + && ((Observer *)info->observer)->reserve_header(¶m, + header, + RESERVE_HEADER_SIZE, + &hlen)) + { + ret= 1; + plugin_unlock(thd, plugin); + break; + } + plugin_unlock(thd, plugin); + if (hlen == 0) + continue; + if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen)) + { + ret= 1; + break; + } + } + unlock(); + return ret; +} + +int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags, + String *packet, + const char *log_file, + my_off_t log_pos) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, before_send_event, thd, + (¶m, (uchar *)packet->c_ptr(), + packet->length(), + log_file+dirname_length(log_file), log_pos)); + return ret; +} + +int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags, + String *packet) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, after_send_event, thd, + (¶m, packet->c_ptr(), packet->length())); + return ret; +} + +int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags) + +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, after_reset_master, thd, (¶m)); + return ret; +} + +void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param, + Master_info *mi) +{ + param->mysql= mi->mysql; + param->user= mi->user; + param->host= mi->host; + param->port= mi->port; + param->master_log_name= mi->master_log_name; + param->master_log_pos= mi->master_log_pos; +} + +int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, thread_start, thd, (¶m)); + return ret; +} + + +int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi) +{ + + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, thread_stop, thd, (¶m)); + return ret; +} + +int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, + Master_info *mi, + ushort flags) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, before_request_transmit, thd, (¶m, (uint32)flags)); + return ret; +} + +int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi, + const char *packet, ulong len, + const char **event_buf, + ulong *event_len) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, after_read_event, thd, + (¶m, packet, len, event_buf, event_len)); + return ret; +} + +int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi, + const char *event_buf, + ulong event_len, + bool synced) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + uint32 flags=0; + if (synced) + flags |= BINLOG_STORAGE_IS_SYNCED; + + int ret= 0; + FOREACH_OBSERVER(ret, after_queue_event, thd, + (¶m, event_buf, event_len, flags)); + return ret; +} + +int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi) + +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, after_reset_slave, thd, (¶m)); + return ret; +} +#endif /* HAVE_REPLICATION */ + +int register_trans_observer(Trans_observer *observer, void *p) +{ + return transaction_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_trans_observer(Trans_observer *observer, void *p) +{ + return transaction_delegate->remove_observer(observer, (st_plugin_int *)p); +} + +int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p) +{ + return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p) +{ + return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p); +} + +#ifdef HAVE_REPLICATION +int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) +{ + return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) +{ + return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p); +} + +int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) +{ + return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) +{ + return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p); +} +#endif /* HAVE_REPLICATION */ |