diff options
Diffstat (limited to 'examples/pigz.c')
-rw-r--r-- | examples/pigz.c | 452 |
1 files changed, 452 insertions, 0 deletions
diff --git a/examples/pigz.c b/examples/pigz.c new file mode 100644 index 0000000..42794d0 --- /dev/null +++ b/examples/pigz.c @@ -0,0 +1,452 @@ +/* pigz.c -- parallel implementation of gzip + * Copyright (C) 2007 Mark Adler + * Version 1.1 28 January 2007 Mark Adler + */ + +/* Version history: + 1.0 17 Jan 2007 First version + 1.1 28 Jan 2007 Avoid void * arithmetic (some compilers don't get that) + Add note about requiring zlib 1.2.3 + Allow compression level 0 (no compression) + Completely rewrite parallelism -- add a write thread + Use deflateSetDictionary() to make use of history + Tune argument defaults to best performance on four cores + */ + +/* + pigz compresses from stdin to stdout using threads to make use of multiple + processors and cores. The input is broken up into 128 KB chunks, and each + is compressed separately. The CRC for each chunk is also calculated + separately. The compressed chunks are written in order to the output, + and the overall CRC is calculated from the CRC's of the chunks. + + The compressed data format generated is the gzip format using the deflate + compression method. First a gzip header is written, followed by raw deflate + partial streams. They are partial, in that they do not have a terminating + block. At the end, the deflate stream is terminated with a final empty + static block, and lastly a gzip trailer is written with the CRC and the + number of input bytes. + + Each raw deflate partial stream is terminated by an empty stored block + (using the Z_SYNC_FLUSH option of zlib), in order to end that partial + bit stream at a byte boundary. That allows the partial streams to be + concantenated simply as sequences of bytes. This adds a very small four + or five byte overhead to the output for each input chunk. + + zlib's crc32_combine() routine allows the calcuation of the CRC of the + entire input using the independent CRC's of the chunks. pigz requires zlib + version 1.2.3 or later, since that is the first version that provides the + crc32_combine() function. + + pigz uses the POSIX pthread library for thread control and communication. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <pthread.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> +#include "zlib.h" + +#define local static + +/* exit with error */ +local void bail(char *msg) +{ + fprintf(stderr, "pigz abort: %s\n", msg); + exit(1); +} + +/* read up to len bytes into buf, repeating read() calls as needed */ +local size_t readn(int desc, unsigned char *buf, size_t len) +{ + ssize_t ret; + size_t got; + + got = 0; + while (len) { + ret = read(desc, buf, len); + if (ret < 0) + bail("read error"); + if (ret == 0) + break; + buf += ret; + len -= ret; + got += ret; + } + return got; +} + +/* write len bytes, repeating write() calls as needed */ +local void writen(int desc, unsigned char *buf, size_t len) +{ + ssize_t ret; + + while (len) { + ret = write(desc, buf, len); + if (ret < 1) + bail("write error"); + buf += ret; + len -= ret; + } +} + +/* a flag variable for communication between two threads */ +struct flag { + int value; /* value of flag */ + pthread_mutex_t lock; /* lock for checking and changing flag */ + pthread_cond_t cond; /* condition for signaling on flag change */ +}; + +/* initialize a flag for use, starting with value val */ +local void flag_init(struct flag *me, int val) +{ + me->value = val; + pthread_mutex_init(&(me->lock), NULL); + pthread_cond_init(&(me->cond), NULL); +} + +/* set the flag to val, signal another process that may be waiting for it */ +local void flag_set(struct flag *me, int val) +{ + pthread_mutex_lock(&(me->lock)); + me->value = val; + pthread_cond_signal(&(me->cond)); + pthread_mutex_unlock(&(me->lock)); +} + +/* if it isn't already, wait for some other thread to set the flag to val */ +local void flag_wait(struct flag *me, int val) +{ + pthread_mutex_lock(&(me->lock)); + while (me->value != val) + pthread_cond_wait(&(me->cond), &(me->lock)); + pthread_mutex_unlock(&(me->lock)); +} + +/* if flag is equal to val, wait for some other thread to change it */ +local void flag_wait_not(struct flag *me, int val) +{ + pthread_mutex_lock(&(me->lock)); + while (me->value == val) + pthread_cond_wait(&(me->cond), &(me->lock)); + pthread_mutex_unlock(&(me->lock)); +} + +/* clean up the flag when done with it */ +local void flag_done(struct flag *me) +{ + pthread_cond_destroy(&(me->cond)); + pthread_mutex_destroy(&(me->lock)); +} + +/* a unit of work to feed to compress_thread() -- it is assumed that the out + buffer is large enough to hold the maximum size len bytes could deflate to, + plus five bytes for the final sync marker */ +struct work { + size_t len; /* length of input */ + unsigned long crc; /* crc of input */ + unsigned char *buf; /* input */ + unsigned char *out; /* space for output (guaranteed big enough) */ + z_stream strm; /* pre-initialized z_stream */ + struct flag busy; /* busy flag indicating work unit in use */ + pthread_t comp; /* this compression thread */ +}; + +/* busy flag values */ +#define IDLE 0 /* compress and writing done -- can start compress */ +#define COMP 1 /* compress -- input and output buffers in use */ +#define WRITE 2 /* compress done, writing output -- can read input */ + +/* read-only globals (set by main/read thread before others started) */ +local int ind; /* input file descriptor */ +local int outd; /* output file descriptor */ +local int level; /* compression level */ +local int procs; /* number of compression threads (>= 2) */ +local size_t size; /* uncompressed input size per thread (>= 32K) */ +local struct work *jobs; /* work units: jobs[0..procs-1] */ + +/* next and previous jobs[] indices */ +#define NEXT(n) ((n) == procs - 1 ? 0 : (n) + 1) +#define PREV(n) ((n) == 0 ? procs - 1 : (n) - 1) + +/* sliding dictionary size for deflate */ +#define DICT 32768U + +/* largest power of 2 that fits in an unsigned int -- used to limit requests + to zlib functions that use unsigned int lengths */ +#define MAX ((((unsigned)-1) >> 1) + 1) + +/* compress thread: compress the input in the provided work unit and compute + its crc -- assume that the amount of space at job->out is guaranteed to be + enough for the compressed output, as determined by the maximum expansion + of deflate compression -- use the input in the previous work unit (if there + is one) to set the deflate dictionary for better compression */ +local void *compress_thread(void *arg) +{ + size_t len; /* input length for this work unit */ + unsigned long crc; /* crc of input data */ + struct work *prev; /* previous work unit */ + struct work *job = arg; /* work unit for this thread */ + z_stream *strm = &(job->strm); /* zlib stream for this work unit */ + + /* reset state for a new compressed stream */ + (void)deflateReset(strm); + + /* initialize input, output, and crc */ + strm->next_in = job->buf; + strm->next_out = job->out; + len = job->len; + crc = crc32(0L, Z_NULL, 0); + + /* set dictionary if this isn't the first work unit, and if we will be + compressing something (the read thread assures that the dictionary + data in the previous work unit is still there) */ + prev = jobs + PREV(job - jobs); + if (prev->buf != NULL && len != 0) + deflateSetDictionary(strm, prev->buf + (size - DICT), DICT); + + /* run MAX-sized amounts of input through deflate and crc32 -- this loop + is needed for those cases where the integer type is smaller than the + size_t type, or when len is close to the limit of the size_t type */ + while (len > MAX) { + strm->avail_in = MAX; + strm->avail_out = (unsigned)-1; + crc = crc32(crc, strm->next_in, strm->avail_in); + (void)deflate(strm, Z_NO_FLUSH); + len -= MAX; + } + + /* run last piece through deflate and crc32, follow with a sync marker */ + if (len) { + strm->avail_in = len; + strm->avail_out = (unsigned)-1; + crc = crc32(crc, strm->next_in, strm->avail_in); + (void)deflate(strm, Z_SYNC_FLUSH); + } + + /* don't need to Z_FINISH, since we'd delete the last two bytes anyway */ + + /* return result */ + job->crc = crc; + return NULL; +} + +/* put a 4-byte integer into a byte array in LSB order */ +#define PUT4(a,b) (*(a)=(b),(a)[1]=(b)>>8,(a)[2]=(b)>>16,(a)[3]=(b)>>24) + +/* write thread: wait for compression threads to complete, write output in + order, also write gzip header and trailer around the compressed data */ +local void *write_thread(void *arg) +{ + int n; /* compress thread index */ + size_t len; /* length of input processed */ + unsigned long tot; /* total uncompressed size (overflow ok) */ + unsigned long crc; /* CRC-32 of uncompressed data */ + unsigned char wrap[10]; /* gzip header or trailer */ + + /* write simple gzip header */ + memcpy(wrap, "\037\213\10\0\0\0\0\0\0\3", 10); + wrap[8] = level == 9 ? 2 : (level == 1 ? 4 : 0); + writen(outd, wrap, 10); + + /* process output of compress threads until end of input */ + tot = 0; + crc = crc32(0L, Z_NULL, 0); + n = 0; + do { + /* wait for compress thread to start, then wait to complete */ + flag_wait(&(jobs[n].busy), COMP); + pthread_join(jobs[n].comp, NULL); + + /* now that compress is done, allow read thread to use input buffer */ + flag_set(&(jobs[n].busy), WRITE); + + /* write compressed data and update length and crc */ + writen(outd, jobs[n].out, jobs[n].strm.next_out - jobs[n].out); + len = jobs[n].len; + tot += len; + crc = crc32_combine(crc, jobs[n].crc, len); + + /* release this work unit and go to the next work unit */ + flag_set(&(jobs[n].busy), IDLE); + n = NEXT(n); + + /* an input buffer less than size in length indicates end of input */ + } while (len == size); + + /* write final static block and gzip trailer (crc and len mod 2^32) */ + wrap[0] = 3; wrap[1] = 0; + PUT4(wrap + 2, crc); + PUT4(wrap + 6, tot); + writen(outd, wrap, 10); + return NULL; +} + +/* one-time initialization of a work unit -- this is where we set the deflate + compression level and request raw deflate, and also where we set the size + of the output buffer to guarantee enough space for a worst-case deflate + ending with a Z_SYNC_FLUSH */ +local void job_init(struct work *job) +{ + int ret; /* deflateInit2() return value */ + + job->buf = malloc(size); + job->out = malloc(size + (size >> 11) + 10); + job->strm.zfree = Z_NULL; + job->strm.zalloc = Z_NULL; + job->strm.opaque = Z_NULL; + ret = deflateInit2(&(job->strm), level, Z_DEFLATED, -15, 8, + Z_DEFAULT_STRATEGY); + if (job->buf == NULL || job->out == NULL || ret != Z_OK) + bail("not enough memory"); +} + +/* compress ind to outd in the gzip format, using multiple threads for the + compression and crc calculation and another thread for writing the output -- + the read thread is the main thread */ +local void read_thread(void) +{ + int n; /* general index */ + size_t got; /* amount read */ + pthread_attr_t attr; /* thread attributes (left at defaults) */ + pthread_t write; /* write thread */ + + /* set defaults (not all pthread implementations default to joinable) */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + /* allocate and set up work list (individual work units will be initialized + as needed, in case the input is short), assure that allocation size + arithmetic does not overflow */ + if (size + (size >> 11) + 10 < (size >> 11) + 10 || + (ssize_t)(size + (size >> 11) + 10) < 0 || + ((size_t)0 - 1) / procs <= sizeof(struct work) || + (jobs = malloc(procs * sizeof(struct work))) == NULL) + bail("not enough memory"); + for (n = 0; n < procs; n++) { + jobs[n].buf = NULL; + flag_init(&(jobs[n].busy), IDLE); + } + + /* start write thread */ + pthread_create(&write, &attr, write_thread, NULL); + + /* read from input and start compress threads (write thread will pick up + the output of the compress threads) */ + n = 0; + do { + /* initialize this work unit if it's the first time it's used */ + if (jobs[n].buf == NULL) + job_init(jobs + n); + + /* read input data, but wait for last compress on this work unit to be + done, and wait for the dictionary to be used by the last compress on + the next work unit */ + flag_wait_not(&(jobs[n].busy), COMP); + flag_wait_not(&(jobs[NEXT(n)].busy), COMP); + got = readn(ind, jobs[n].buf, size); + + /* start compress thread, but wait for write to be done first */ + flag_wait(&(jobs[n].busy), IDLE); + jobs[n].len = got; + pthread_create(&(jobs[n].comp), &attr, compress_thread, jobs + n); + + /* mark work unit so write thread knows compress was started */ + flag_set(&(jobs[n].busy), COMP); + + /* go to the next work unit */ + n = NEXT(n); + + /* do until end of input, indicated by a read less than size */ + } while (got == size); + + /* wait for the write thread to complete -- the write thread will join with + all of the compress threads, so this waits for all of the threads to + complete */ + pthread_join(write, NULL); + + /* free up all requested resources and return */ + for (n = procs - 1; n >= 0; n--) { + flag_done(&(jobs[n].busy)); + (void)deflateEnd(&(jobs[n].strm)); + free(jobs[n].out); + free(jobs[n].buf); + } + free(jobs); + pthread_attr_destroy(&attr); +} + +/* Process arguments for level, size, and procs, compress from stdin to + stdout in the gzip format. Note that procs must be at least two in + order to provide a dictionary in one work unit for the other work + unit, and that size must be at least 32K to store a full dictionary. */ +int main(int argc, char **argv) +{ + int n; /* general index */ + int get; /* command line parameters to get */ + char *arg; /* command line argument */ + + /* set defaults -- 32 processes and 128K buffers was found to provide + good utilization of four cores (about 97%) and balanced the overall + execution time impact of more threads against more dictionary + processing for a fixed amount of memory -- the memory usage for these + settings and full use of all work units (at least 4 MB of input) is + 16.2 MB + */ + level = Z_DEFAULT_COMPRESSION; + procs = 32; + size = 131072UL; + + /* process command-line arguments */ + get = 0; + for (n = 1; n < argc; n++) { + arg = argv[n]; + if (*arg == '-') { + while (*++arg) + if (*arg >= '0' && *arg <= '9') /* compression level */ + level = *arg - '0'; + else if (*arg == 'b') /* chunk size in K */ + get |= 1; + else if (*arg == 'p') /* number of processes */ + get |= 2; + else if (*arg == 'h') { /* help */ + fputs("usage: pigz [-0..9] [-b blocksizeinK]", stderr); + fputs(" [-p processes] < foo > foo.gz\n", stderr); + return 0; + } + else + bail("invalid option"); + } + else if (get & 1) { + if (get & 2) + bail("you need to separate the -b and -p options"); + size = (size_t)(atol(arg)) << 10; /* chunk size */ + if (size < DICT) + bail("invalid option"); + get = 0; + } + else if (get & 2) { + procs = atoi(arg); /* processes */ + if (procs < 2) + bail("invalid option"); + get = 0; + } + else + bail("invalid option (you need to pipe input and output)"); + } + if (get) + bail("missing option argument"); + + /* do parallel compression from stdin to stdout (the read thread starts up + the write thread and the compression threads, and they all join before + the read thread returns) */ + ind = 0; + outd = 1; + read_thread(); + + /* done */ + return 0; +} |