summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cfgparse.c50
-rw-r--r--src/listener.c26
2 files changed, 74 insertions, 2 deletions
diff --git a/src/cfgparse.c b/src/cfgparse.c
index ada344d17..173dcdb32 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -2566,8 +2566,54 @@ int check_config_validity()
/* apply thread masks and groups to all receivers */
list_for_each_entry(li, &bind_conf->listeners, by_bind) {
- li->rx.bind_thread = bind_conf->bind_thread;
- li->rx.bind_tgroup = bind_conf->bind_tgroup;
+ if (bind_conf->settings.shards <= 1) {
+ li->rx.bind_thread = bind_conf->bind_thread;
+ li->rx.bind_tgroup = bind_conf->bind_tgroup;
+ } else {
+ struct listener *new_li;
+ int shard, shards, todo, done, bit;
+ ulong mask;
+
+ shards = bind_conf->settings.shards;
+ todo = my_popcountl(bind_conf->bind_thread);
+
+ /* no more shards than total threads */
+ if (shards > todo)
+ shards = todo;
+
+ shard = done = bit = 0;
+ new_li = li;
+
+ while (1) {
+ mask = 0;
+ while (done < todo) {
+ /* enlarge mask to cover next bit of bind_thread */
+ while (!(bind_conf->bind_thread & (1UL << bit)))
+ bit++;
+ mask |= (1UL << bit);
+ bit++;
+ done += shards;
+ }
+
+ new_li->rx.bind_thread = bind_conf->bind_thread & mask;
+ new_li->rx.bind_tgroup = bind_conf->bind_tgroup;
+ done -= todo;
+
+ shard++;
+ if (shard >= shards)
+ break;
+
+ /* create another listener for new shards */
+ new_li = clone_listener(li);
+ if (!new_li) {
+ ha_alert("Out of memory while trying to allocate extra listener for shard %d in %s %s\n",
+ shard, proxy_type_str(curproxy), curproxy->id);
+ cfgerr++;
+ err_code |= ERR_FATAL | ERR_ALERT;
+ goto out;
+ }
+ }
+ }
}
}
diff --git a/src/listener.c b/src/listener.c
index 044d22390..f16ba2d0a 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -1360,6 +1360,7 @@ struct bind_conf *bind_conf_alloc(struct proxy *fe, const char *file,
bind_conf->settings.ux.uid = -1;
bind_conf->settings.ux.gid = -1;
bind_conf->settings.ux.mode = 0;
+ bind_conf->settings.shards = 1;
bind_conf->xprt = xprt;
bind_conf->frontend = fe;
bind_conf->severity_output = CLI_SEVERITY_NONE;
@@ -1639,6 +1640,30 @@ static int bind_parse_proto(char **args, int cur_arg, struct proxy *px, struct b
return 0;
}
+/* parse the "shards" bind keyword. Takes an integer or "by-thread" */
+static int bind_parse_shards(char **args, int cur_arg, struct proxy *px, struct bind_conf *conf, char **err)
+{
+ int val;
+
+ if (!*args[cur_arg + 1]) {
+ memprintf(err, "'%s' : missing value", args[cur_arg]);
+ return ERR_ALERT | ERR_FATAL;
+ }
+
+ if (strcmp(args[cur_arg + 1], "by-thread") == 0) {
+ val = MAX_THREADS; /* will be trimmed later anyway */
+ } else {
+ val = atol(args[cur_arg + 1]);
+ if (val < 1 || val > MAX_THREADS) {
+ memprintf(err, "'%s' : invalid value %d, allowed range is %d..%d or 'by-thread'", args[cur_arg], val, 1, MAX_THREADS);
+ return ERR_ALERT | ERR_FATAL;
+ }
+ }
+
+ conf->settings.shards = val;
+ return 0;
+}
+
/* parse the "thread" bind keyword */
static int bind_parse_thread(char **args, int cur_arg, struct proxy *px, struct bind_conf *conf, char **err)
{
@@ -1734,6 +1759,7 @@ static struct bind_kw_list bind_kws = { "ALL", { }, {
{ "nice", bind_parse_nice, 1 }, /* set nice of listening socket */
{ "process", bind_parse_process, 1 }, /* set list of allowed process for this socket */
{ "proto", bind_parse_proto, 1 }, /* set the proto to use for all incoming connections */
+ { "shards", bind_parse_shards, 1 }, /* set number of shards */
{ "thread", bind_parse_thread, 1 }, /* set list of allowed threads for this socket */
{ /* END */ },
}};