summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.h
blob: 0b9619e5e83e42f3f4ad52c728064c5f43f69074 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#ifndef RPL_PARALLEL_H
#define RPL_PARALLEL_H

#include "log_event.h"


struct rpl_parallel;
struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;

class Relay_log_info;
struct rpl_parallel_thread {
  bool delay_start;
  bool running;
  bool stop;
  mysql_mutex_t LOCK_rpl_thread;
  mysql_cond_t COND_rpl_thread;
  struct rpl_parallel_thread *next;             /* For free list. */
  struct rpl_parallel_thread_pool *pool;
  THD *thd;
  struct rpl_parallel_entry *current_entry;
  struct queued_event {
    queued_event *next;
    Log_event *ev;
    rpl_group_info *rgi;
    ulonglong future_event_relay_log_pos;
    char event_relay_log_name[FN_REFLEN];
    char future_event_master_log_name[FN_REFLEN];
    ulonglong event_relay_log_pos;
    my_off_t future_event_master_log_pos;
    size_t event_size;
  } *event_queue, *last_in_queue;
  uint64 queued_size;

  void enqueue(queued_event *qev)
  {
    if (last_in_queue)
      last_in_queue->next= qev;
    else
      event_queue= qev;
    last_in_queue= qev;
    queued_size+= qev->event_size;
  }

  void dequeue(queued_event *list)
  {
    queued_event *tmp;

    DBUG_ASSERT(list == event_queue);
    event_queue= last_in_queue= NULL;
    for (tmp= list; tmp; tmp= tmp->next)
      queued_size-= tmp->event_size;
  }
};


struct rpl_parallel_thread_pool {
  uint32 count;
  struct rpl_parallel_thread **threads;
  struct rpl_parallel_thread *free_list;
  mysql_mutex_t LOCK_rpl_thread_pool;
  mysql_cond_t COND_rpl_thread_pool;
  bool changing;
  bool inited;

  rpl_parallel_thread_pool();
  int init(uint32 size);
  void destroy();
  struct rpl_parallel_thread *get_thread(rpl_parallel_entry *entry);
};


struct rpl_parallel_entry {
  uint32 domain_id;
  uint32 last_server_id;
  uint64 last_seq_no;
  uint64 last_commit_id;
  bool active;
  rpl_parallel_thread *rpl_thread;
  /*
    The sub_id of the last transaction to commit within this domain_id.
    Must be accessed under LOCK_parallel_entry protection.
  */
  uint64 last_committed_sub_id;
  mysql_mutex_t LOCK_parallel_entry;
  mysql_cond_t COND_parallel_entry;
  /*
    The sub_id of the last event group in this replication domain that was
    queued for execution by a worker thread.
  */
  uint64 current_sub_id;
  rpl_group_info *current_group_info;
  /*
    The sub_id of the last event group in the previous batch of group-committed
    transactions.

    When we spawn parallel worker threads for the next group-committed batch,
    they first need to wait for this sub_id to be committed before it is safe
    to start executing them.
  */
  uint64 prev_groupcommit_sub_id;
};
struct rpl_parallel {
  HASH domain_hash;
  rpl_parallel_entry *current;
  bool sql_thread_stopping;

  rpl_parallel();
  ~rpl_parallel();
  void reset();
  rpl_parallel_entry *find(uint32 domain_id);
  void wait_for_done();
  bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
                ulonglong event_size);
};


extern struct rpl_parallel_thread_pool global_rpl_thread_pool;


extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
                                            uint32 new_count,
                                            bool skip_check= false);

#endif  /* RPL_PARALLEL_H */