summaryrefslogtreecommitdiff
path: root/sql/threadpool.h
blob: 57750b73e42149e76dfce739c5c490c111df0c25 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#ifndef THREADPOOL_H_INCLUDED
#define THREADPOOL_H_INCLUDED

#ifdef HAVE_POOL_OF_THREADS
/* Copyright (C) 2012 Monty Program Ab

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */

/*
  NOTE(review): presumably the upper bound on the number of thread groups;
  confirm against the pool implementation.
*/
#define MAX_THREAD_GROUPS 100000

/*
  Threadpool parameters.

  These are only declared here; the definitions live in another
  translation unit.
*/
extern uint threadpool_min_threads;  /* Minimum threads in pool */
extern uint threadpool_idle_timeout; /* Shut down idle worker threads after this timeout */
extern uint threadpool_size; /* Number of parallel executing threads */
/* NOTE(review): undocumented; presumably the upper bound for threadpool_size - confirm */
extern uint threadpool_max_size;
extern uint threadpool_stall_limit;  /* Time interval in 10 ms units for stall checks */
extern uint threadpool_max_threads;  /* Maximum threads in pool */
extern uint threadpool_oversubscribe;  /* Maximum active threads in group */
extern uint threadpool_prio_kickup_timer;  /* Time before low prio item gets prio boost */
#ifdef _WIN32
extern uint threadpool_mode; /* Thread pool implementation: Windows or generic */
/*
  NOTE(review): presumably the values threadpool_mode can take - confirm
  where threadpool_mode is read.
*/
#define TP_MODE_WINDOWS 0
#define TP_MODE_GENERIC 1
#endif


/* Forward declaration; the full definition appears later in this header. */
struct TP_connection;
/*
  Entry points taking a connection, defined outside this header.
  NOTE(review): presumably tp_callback() runs when a connection has work to
  process and tp_timeout_handler() when its IO timeout expires - confirm
  against the definitions.
*/
extern void tp_callback(TP_connection *c);
extern void tp_timeout_handler(TP_connection *c);



/*
  Threadpool statistics.

  The shared instance tp_stats is declared right after the struct; the
  default TP_pool::get_thread_count() returns its num_worker_threads.
*/
struct TP_STATISTICS
{
  /* Current number of worker threads. */
  volatile int32 num_worker_threads;
};

/* Shared statistics instance, defined outside this header. */
extern TP_STATISTICS tp_stats;


/* Functions to set threadpool parameters */
extern void tp_set_min_threads(uint val);
extern void tp_set_max_threads(uint val);
extern void tp_set_threadpool_size(uint val);
extern void tp_set_threadpool_stall_limit(uint val);
/* Functions to query thread counts */
extern int tp_get_idle_thread_count();
extern int tp_get_thread_count();

/* Activate threadpool scheduler */
extern void tp_scheduler(void);

/*
  NOTE(review): the signature (THD, SHOW_VAR, output buffer, variable scope)
  looks like a status variable callback reporting the idle thread count -
  confirm where it is registered.
*/
extern int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff,
                                        enum enum_var_type scope);

/*
  Priority attached to a connection (type of TP_connection::priority).
  A freshly constructed TP_connection uses TP_PRIORITY_HIGH.
*/
enum TP_PRIORITY
{
  TP_PRIORITY_HIGH= 0,
  TP_PRIORITY_LOW= 1,
  TP_PRIORITY_AUTO= 2
};


/*
  State of a connection (type of TP_connection::state).
  A freshly constructed TP_connection is in TP_STATE_IDLE.
*/
enum TP_STATE
{
  TP_STATE_IDLE= 0,
  TP_STATE_RUNNING= 1
};

/*
  One pooled connection: ties together the THD, the CONNECT it was created
  from, and the state/priority kept for it.

  This is an abstract base. The platform specific parts (asynchronous IO
  and pool structures) are supplied by subclasses called connection_t,
  inside threadpool_win.cc and threadpool_unix.cc
*/

struct TP_connection
{
  THD*        thd;        /* Set to 0 by the constructor */
  CONNECT*    connect;    /* The CONNECT handed to the constructor */
  TP_STATE    state;      /* Starts as TP_STATE_IDLE */
  TP_PRIORITY priority;   /* Starts as TP_PRIORITY_HIGH */

  TP_connection(CONNECT *c)
    : thd(0), connect(c), state(TP_STATE_IDLE), priority(TP_PRIORITY_HIGH)
  {
  }

  /* Virtual, so that subclasses can be destroyed through a base pointer. */
  virtual ~TP_connection() {}

  /* Initialize io structures windows threadpool, epoll etc */
  virtual int init()= 0;

  /* Set the IO timeout; the argument is named sec, i.e. given in seconds. */
  virtual void set_io_timeout(int sec)= 0;

  /* Read for the next client command (async) with specified timeout */
  virtual int start_io()= 0;

  /*
    NOTE(review): presumably called around a wait of the given type so the
    pool can account for the waiting thread - confirm in the subclasses.
  */
  virtual void wait_begin(int type)= 0;
  virtual void wait_end()= 0;
};


/*
  Abstract thread pool interface.

  init(), new_connection() and add() have to be provided by every
  implementation. The set_*() hooks default to doing nothing and returning
  0, get_thread_count() defaults to tp_stats.num_worker_threads, and
  get_idle_thread_count() defaults to 0.
*/
struct TP_pool
{
  virtual ~TP_pool() {}

  /* Mandatory part of the interface */
  virtual int init()= 0;
  virtual TP_connection *new_connection(CONNECT *)= 0;
  virtual void add(TP_connection *c)= 0;

  /* Optional parameter hooks; the defaults ignore the value */
  virtual int set_max_threads(uint) { return 0; }
  virtual int set_min_threads(uint) { return 0; }
  virtual int set_pool_size(uint) { return 0; }
  virtual int set_idle_timeout(uint) { return 0; }
  virtual int set_oversubscribe(uint) { return 0; }
  virtual int set_stall_limit(uint) { return 0; }

  /* Thread counters */
  virtual int get_thread_count() { return tp_stats.num_worker_threads; }
  virtual int get_idle_thread_count() { return 0; }
};

#ifdef _WIN32
/*
  TP_pool implementation that is only declared on Windows builds.
  Besides the mandatory interface it overrides the min/max thread setters;
  all other hooks keep the TP_pool defaults.
*/
struct TP_pool_win : TP_pool
{
  TP_pool_win();
  virtual ~TP_pool_win();

  virtual int init();
  virtual TP_connection *new_connection(CONNECT *c);
  virtual void add(TP_connection *);

  virtual int set_max_threads(uint);
  virtual int set_min_threads(uint);
};
#endif /* _WIN32 */

/*
  TP_pool implementation declared on every platform.
  Besides the mandatory interface it overrides the pool size and stall
  limit setters and the idle thread counter; all other hooks keep the
  TP_pool defaults.
*/
struct TP_pool_generic : TP_pool
{
  TP_pool_generic();
  /* Already virtual through TP_pool; spelled out like in TP_pool_win. */
  virtual ~TP_pool_generic();

  virtual int init();
  virtual TP_connection *new_connection(CONNECT *c);
  virtual void add(TP_connection *);

  virtual int set_pool_size(uint);
  virtual int set_stall_limit(uint);

  virtual int get_idle_thread_count();
};

#endif /* HAVE_POOL_OF_THREADS */
#endif /* THREADPOOL_H_INCLUDED */