diff options
Diffstat (limited to 'contrib/pgbench/pgbench.c')
-rw-r--r-- | contrib/pgbench/pgbench.c | 178 |
1 files changed, 159 insertions, 19 deletions
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c index 2ad8f0bb5b..ad8e272c91 100644 --- a/contrib/pgbench/pgbench.c +++ b/contrib/pgbench/pgbench.c @@ -137,6 +137,12 @@ int unlogged_tables = 0; double sample_rate = 0.0; /* + * When threads are throttled to a given rate limit, this is the target delay + * to reach that rate in usec. 0 is the default and means no throttling. + */ +int64 throttle_delay = 0; + +/* * tablespace selection */ char *tablespace = NULL; @@ -202,11 +208,13 @@ typedef struct int listen; /* 0 indicates that an async query has been * sent */ int sleeping; /* 1 indicates that the client is napping */ + bool throttling; /* whether nap is for throttling */ int64 until; /* napping until (usec) */ Variable *variables; /* array of variable definitions */ int nvariables; instr_time txn_begin; /* used for measuring transaction latencies */ instr_time stmt_begin; /* used for measuring statement latencies */ + bool is_throttled; /* whether transaction throttling is done */ int use_file; /* index in sql_files for this client */ bool prepared[MAX_FILES]; } CState; @@ -224,6 +232,9 @@ typedef struct instr_time *exec_elapsed; /* time spent executing cmds (per Command) */ int *exec_count; /* number of cmd executions (per Command) */ unsigned short random_state[3]; /* separate randomness for each thread */ + int64 throttle_trigger; /* previous/next throttling (us) */ + int64 throttle_lag; /* total transaction lag behind throttling */ + int64 throttle_lag_max; /* max transaction lag */ } TState; #define INVALID_THREAD ((pthread_t) 0) @@ -232,6 +243,8 @@ typedef struct { instr_time conn_time; int xacts; + int64 throttle_lag; + int64 throttle_lag_max; } TResult; /* @@ -356,6 +369,7 @@ usage(void) " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n" " -P, --progress=NUM show thread progress report every NUM seconds\n" " -r, --report-latencies report average latency per command\n" + " -R, --rate=SPEC target rate in transactions per second\n" " -s, --scale=NUM report this scale factor in output\n" " -S, --select-only perform SELECT-only transactions\n" " -t, --transactions number of transactions each client runs " @@ -898,17 +912,62 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa { PGresult *res; Command **commands; + bool trans_needs_throttle = false; top: commands = sql_files[st->use_file]; + /* + * Handle throttling once per transaction by sleeping. It is simpler + * to do this here rather than at the end, because so much complicated + * logic happens below when statements finish. + */ + if (throttle_delay && ! st->is_throttled) + { + /* + * Use inverse transform sampling to randomly generate a delay, such + * that the series of delays will approximate a Poisson distribution + * centered on the throttle_delay time. + * + * 1000 implies a 6.9 (-log(1/1000)) to 0.0 (log 1.0) delay multiplier. + * + * If transactions are too slow or a given wait is shorter than + * a transaction, the next transaction will start right away. + */ + int64 wait = (int64) + throttle_delay * -log(getrand(thread, 1, 1000)/1000.0); + + thread->throttle_trigger += wait; + + st->until = thread->throttle_trigger; + st->sleeping = 1; + st->throttling = true; + st->is_throttled = true; + if (debug) + fprintf(stderr, "client %d throttling "INT64_FORMAT" us\n", + st->id, wait); + } + if (st->sleeping) { /* are we sleeping? */ instr_time now; + int64 now_us; INSTR_TIME_SET_CURRENT(now); - if (st->until <= INSTR_TIME_GET_MICROSEC(now)) + now_us = INSTR_TIME_GET_MICROSEC(now); + if (st->until <= now_us) + { st->sleeping = 0; /* Done sleeping, go ahead with next command */ + if (st->throttling) + { + /* Measure lag of throttled transaction relative to target */ + int64 lag = now_us - st->until; + thread->throttle_lag += lag; + if (lag > thread->throttle_lag_max) + thread->throttle_lag_max = lag; + st->throttling = false; + } + } else return true; /* Still sleeping, nothing to do here */ } @@ -1095,6 +1154,15 @@ top: st->state = 0; st->use_file = (int) getrand(thread, 0, num_files - 1); commands = sql_files[st->use_file]; + st->is_throttled = false; + /* + * No transaction is underway anymore, which means there is nothing + * to listen to right now. When throttling rate limits are active, + * a sleep will happen next, as the next transaction starts. And + * then in any case the next SQL command will set listen back to 1. + */ + st->listen = 0; + trans_needs_throttle = (throttle_delay>0); } } @@ -1113,6 +1181,16 @@ top: INSTR_TIME_ACCUM_DIFF(*conn_time, end, start); } + /* + * This ensures that a throttling delay is inserted before proceeding + * with sql commands, after the first transaction. The first transaction + * throttling is performed when first entering doCustom. + */ + if (trans_needs_throttle) { + trans_needs_throttle = false; + goto top; + } + /* Record transaction start time if logging is enabled */ if (logfile && st->state == 0) INSTR_TIME_SET_CURRENT(st->txn_begin); @@ -2017,7 +2095,8 @@ process_builtin(char *tb) static void printResults(int ttype, int normal_xacts, int nclients, TState *threads, int nthreads, - instr_time total_time, instr_time conn_total_time) + instr_time total_time, instr_time conn_total_time, + int64 throttle_lag, int64 throttle_lag_max) { double time_include, tps_include, @@ -2055,6 +2134,19 @@ printResults(int ttype, int normal_xacts, int nclients, printf("number of transactions actually processed: %d\n", normal_xacts); } + + if (throttle_delay) + { + /* + * Report average transaction lag under rate limit throttling. This + * is the delay between scheduled and actual start times for the + * transaction. The measured lag may be caused by thread/client load, + * the database load, or the Poisson throttling process. + */ + printf("average rate limit schedule lag: %.3f ms (max %.3f ms)\n", + 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max); + } + printf("tps = %f (including connections establishing)\n", tps_include); printf("tps = %f (excluding connections establishing)\n", tps_exclude); @@ -2140,6 +2232,7 @@ main(int argc, char **argv) {"unlogged-tables", no_argument, &unlogged_tables, 1}, {"sampling-rate", required_argument, NULL, 4}, {"aggregate-interval", required_argument, NULL, 5}, + {"rate", required_argument, NULL, 'R'}, {NULL, 0, NULL, 0} }; @@ -2162,6 +2255,8 @@ main(int argc, char **argv) instr_time total_time; instr_time conn_total_time; int total_xacts; + int64 throttle_lag = 0; + int64 throttle_lag_max = 0; int i; @@ -2206,7 +2301,7 @@ main(int argc, char **argv) state = (CState *) pg_malloc(sizeof(CState)); memset(state, 0, sizeof(CState)); - while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:", long_options, &optindex)) != -1) + while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1) { switch (c) { @@ -2371,6 +2466,19 @@ main(int argc, char **argv) exit(1); } break; + case 'R': + { + /* get a double from the beginning of option value */ + double throttle_value = atof(optarg); + if (throttle_value <= 0.0) + { + fprintf(stderr, "invalid rate limit: %s\n", optarg); + exit(1); + } + /* Invert rate limit into a time offset */ + throttle_delay = (int64) (1000000.0 / throttle_value); + } + break; case 0: /* This covers long options which take no argument. */ break; @@ -2408,6 +2516,9 @@ main(int argc, char **argv) } } + /* compute a per thread delay */ + throttle_delay *= nthreads; + if (argc > optind) dbName = argv[optind]; else @@ -2721,6 +2832,9 @@ main(int argc, char **argv) TResult *r = (TResult *) ret; total_xacts += r->xacts; + throttle_lag += r->throttle_lag; + if (r->throttle_lag_max > throttle_lag_max) + throttle_lag_max = r->throttle_lag_max; INSTR_TIME_ADD(conn_total_time, r->conn_time); free(ret); } @@ -2731,7 +2845,7 @@ main(int argc, char **argv) INSTR_TIME_SET_CURRENT(total_time); INSTR_TIME_SUBTRACT(total_time, start_time); printResults(ttype, total_xacts, nclients, threads, nthreads, - total_time, conn_total_time); + total_time, conn_total_time, throttle_lag, throttle_lag_max); return 0; } @@ -2756,6 +2870,17 @@ threadRun(void *arg) AggVals aggs; + /* + * Initialize throttling rate target for all of the thread's clients. It + * might be a little more accurate to reset thread->start_time here too. + * The possible drift seems too small relative to typical throttle delay + * times to worry about it. + */ + INSTR_TIME_SET_CURRENT(start); + thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start); + thread->throttle_lag = 0; + thread->throttle_lag_max = 0; + result = pg_malloc(sizeof(TResult)); INSTR_TIME_SET_ZERO(result->conn_time); @@ -2831,25 +2956,38 @@ threadRun(void *arg) Command **commands = sql_files[st->use_file]; int sock; - if (st->sleeping) + if (st->con == NULL) { - int this_usec; - - if (min_usec == INT64_MAX) + continue; + } + else if (st->sleeping) + { + if (st->throttling && timer_exceeded) { - instr_time now; - - INSTR_TIME_SET_CURRENT(now); - now_usec = INSTR_TIME_GET_MICROSEC(now); + /* interrupt client which has not started a transaction */ + remains--; + st->sleeping = 0; + st->throttling = false; + PQfinish(st->con); + st->con = NULL; + continue; } + else /* just a nap from the script */ + { + int this_usec; - this_usec = st->until - now_usec; - if (min_usec > this_usec) - min_usec = this_usec; - } - else if (st->con == NULL) - { - continue; + if (min_usec == INT64_MAX) + { + instr_time now; + + INSTR_TIME_SET_CURRENT(now); + now_usec = INSTR_TIME_GET_MICROSEC(now); + } + + this_usec = st->until - now_usec; + if (min_usec > this_usec) + min_usec = this_usec; + } } else if (commands[st->state]->type == META_COMMAND) { @@ -2986,6 +3124,8 @@ done: result->xacts = 0; for (i = 0; i < nstate; i++) result->xacts += state[i].cnt; + result->throttle_lag = thread->throttle_lag; + result->throttle_lag_max = thread->throttle_lag_max; INSTR_TIME_SET_CURRENT(end); INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start); if (logfile) |