summaryrefslogtreecommitdiff
path: root/examples/pigz.c
blob: 42794d0f1823be8718a9b8943a63d3a657b49587 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
/* pigz.c -- parallel implementation of gzip
 * Copyright (C) 2007 Mark Adler
 * Version 1.1  28 January 2007  Mark Adler
 */

/* Version history:
   1.0  17 Jan 2007  First version
   1.1  28 Jan 2007  Avoid void * arithmetic (some compilers don't get that)
                     Add note about requiring zlib 1.2.3
                     Allow compression level 0 (no compression)
                     Completely rewrite parallelism -- add a write thread
                     Use deflateSetDictionary() to make use of history
                     Tune argument defaults to best performance on four cores
 */

/*
   pigz compresses from stdin to stdout using threads to make use of multiple
   processors and cores.  The input is broken up into 128 KB chunks, and each
   is compressed separately.  The CRC for each chunk is also calculated
   separately.  The compressed chunks are written in order to the output,
   and the overall CRC is calculated from the CRC's of the chunks.

   The compressed data format generated is the gzip format using the deflate
   compression method.  First a gzip header is written, followed by raw deflate
   partial streams.  They are partial, in that they do not have a terminating
   block.  At the end, the deflate stream is terminated with a final empty
   static block, and lastly a gzip trailer is written with the CRC and the
   number of input bytes.

   Each raw deflate partial stream is terminated by an empty stored block
   (using the Z_SYNC_FLUSH option of zlib), in order to end that partial
   bit stream at a byte boundary.  That allows the partial streams to be
   concatenated simply as sequences of bytes.  This adds a very small four
   or five byte overhead to the output for each input chunk.

   zlib's crc32_combine() routine allows the calculation of the CRC of the
   entire input using the independent CRC's of the chunks.  pigz requires zlib
   version 1.2.3 or later, since that is the first version that provides the
   crc32_combine() function.

   pigz uses the POSIX pthread library for thread control and communication.
 */

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include "zlib.h"

/* "local" marks functions and data that are private to this file -- it simply
   expands to the static storage-class specifier */
#define local static

/* exit with error: print msg to stderr, prefixed with "pigz abort: " and
   followed by a newline, then terminate the whole process with exit status 1
   -- this function does not return; msg is only read, so it is const
   qualified and string literals can be passed without dropping a qualifier */
local void bail(const char *msg)
{
    fprintf(stderr, "pigz abort: %s\n", msg);
    exit(1);
}

/* read up to len bytes from descriptor desc into buf, repeating read() calls
   as needed -- returns the number of bytes actually read, which is less than
   len only if end of file was reached first; a read() that was interrupted by
   a signal before transferring anything (EINTR) is simply retried, any other
   read error aborts the program through bail() */
local size_t readn(int desc, unsigned char *buf, size_t len)
{
    ssize_t ret;
    size_t got;

    got = 0;
    while (len) {
        ret = read(desc, buf, len);
        if (ret < 0) {
            /* an interrupted call is not a failure of the descriptor */
            if (errno == EINTR)
                continue;
            bail("read error");
        }
        if (ret == 0)
            break;                  /* end of file */
        buf += ret;
        len -= (size_t)ret;
        got += (size_t)ret;
    }
    return got;
}

/* write all len bytes at buf to descriptor desc, repeating write() calls as
   needed to deal with partial writes -- a write() that was interrupted by a
   signal before transferring anything (EINTR) is retried; any other error, or
   a write() that makes no progress, aborts the program through bail() */
local void writen(int desc, unsigned char *buf, size_t len)
{
    ssize_t ret;

    while (len) {
        ret = write(desc, buf, len);
        if (ret < 0 && errno == EINTR)
            continue;
        /* a zero return is treated as an error so this loop cannot spin */
        if (ret < 1)
            bail("write error");
        buf += ret;
        len -= (size_t)ret;
    }
}

/* a flag variable for communication between two threads: an integer value
   that is only examined or changed while holding lock, together with a
   condition variable that flag_set() signals on every change so that a
   thread blocked in flag_wait() or flag_wait_not() can re-check the value */
struct flag {
    int value;              /* value of flag */
    pthread_mutex_t lock;   /* lock for checking and changing flag */
    pthread_cond_t cond;    /* condition for signaling on flag change */
};

/* make the flag at me ready for use: create its mutex and condition variable
   with default attributes and give it the starting value val */
local void flag_init(struct flag *me, int val)
{
    pthread_mutex_init(&me->lock, NULL);
    pthread_cond_init(&me->cond, NULL);
    me->value = val;
}

/* store val in the flag while holding its lock, and signal the flag's
   condition variable so that a thread waiting on this flag wakes up and
   re-examines the value */
local void flag_set(struct flag *me, int val)
{
    pthread_mutex_t *guard = &me->lock;

    pthread_mutex_lock(guard);
    me->value = val;
    pthread_cond_signal(&me->cond);
    pthread_mutex_unlock(guard);
}

/* block until the flag holds val -- returns immediately if it already does,
   otherwise sleeps on the flag's condition variable and re-checks the value
   each time it is woken */
local void flag_wait(struct flag *me, int val)
{
    pthread_mutex_t *guard = &me->lock;

    pthread_mutex_lock(guard);
    for (;;) {
        if (me->value == val)
            break;
        pthread_cond_wait(&me->cond, guard);
    }
    pthread_mutex_unlock(guard);
}

/* block for as long as the flag holds val -- returns immediately if the flag
   already has some other value, otherwise sleeps on the flag's condition
   variable and re-checks the value each time it is woken */
local void flag_wait_not(struct flag *me, int val)
{
    pthread_mutex_t *guard = &me->lock;

    pthread_mutex_lock(guard);
    for (;;) {
        if (me->value != val)
            break;
        pthread_cond_wait(&me->cond, guard);
    }
    pthread_mutex_unlock(guard);
}

/* release the resources of a flag set up by flag_init(): destroy its
   condition variable, then its mutex -- the flag must not be used afterwards
   without another flag_init() */
local void flag_done(struct flag *me)
{
    pthread_cond_t *signal = &me->cond;
    pthread_mutex_t *guard = &me->lock;

    pthread_cond_destroy(signal);
    pthread_mutex_destroy(guard);
}

/* a unit of work to feed to compress_thread() -- it is assumed that the out
   buffer is large enough to hold the maximum size len bytes could deflate to,
   plus five bytes for the final sync marker; buf, out and strm are set up
   once per unit by job_init(), len is filled in by read_thread() before each
   compress thread is started, and crc is filled in by compress_thread() */
struct work {
    size_t len;                 /* length of input in buf */
    unsigned long crc;          /* crc of input (result of compress_thread) */
    unsigned char *buf;         /* input; NULL until job_init() has been run */
    unsigned char *out;         /* space for output (guaranteed big enough) */
    z_stream strm;              /* pre-initialized z_stream, reset per use */
    struct flag busy;           /* busy flag: IDLE, COMP, or WRITE */
    pthread_t comp;             /* this compression thread (joined by writer) */
};

/* busy flag values -- a work unit cycles IDLE -> COMP -> WRITE -> IDLE: the
   read thread sets COMP right after starting the compress thread, the write
   thread sets WRITE once it has joined that thread, and sets IDLE again after
   the compressed output has been written */
#define IDLE 0          /* compress and writing done -- can start compress */
#define COMP 1          /* compress -- input and output buffers in use */
#define WRITE 2         /* compress done, writing output -- can read input */

/* read-only globals (set by main/read thread before others started) -- main()
   fills in ind, outd, level, procs and size from the defaults and the command
   line; read_thread() allocates jobs before it creates any other thread */
local int ind;              /* input file descriptor */
local int outd;             /* output file descriptor */
local int level;            /* compression level */
local int procs;            /* number of compression threads (>= 2) */
local size_t size;          /* uncompressed input size per thread (>= 32K) */
local struct work *jobs;    /* work units: jobs[0..procs-1] */

/* next and previous jobs[] indices, wrapping around at the ends so that
   jobs[] is used as a ring of procs work units -- both macros read the global
   procs and evaluate n more than once, so n must not have side effects */
#define NEXT(n) ((n) == procs - 1 ? 0 : (n) + 1)
#define PREV(n) ((n) == 0 ? procs - 1 : (n) - 1)

/* sliding dictionary size for deflate -- this many bytes from the end of the
   previous work unit's input are handed to deflateSetDictionary() */
#define DICT 32768U

/* largest power of 2 that fits in an unsigned int -- used to limit requests
   to zlib functions that use unsigned int lengths (the value has type
   unsigned: all bits of unsigned shifted right by one, plus one) */
#define MAX ((((unsigned)-1) >> 1) + 1)

/* compress thread: compress the input in the provided work unit and compute
   its crc -- assume that the amount of space at job->out is guaranteed to be
   enough for the compressed output, as determined by the maximum expansion
   of deflate compression -- use the input in the previous work unit (if there
   is one) to set the deflate dictionary for better compression */
local void *compress_thread(void *arg)
{
    size_t len;                     /* input length for this work unit */
    unsigned long crc;              /* crc of input data */
    struct work *prev;              /* previous work unit */
    struct work *job = arg;         /* work unit for this thread */
    z_stream *strm = &(job->strm);  /* zlib stream for this work unit */

    /* reset state for a new compressed stream */
    (void)deflateReset(strm);

    /* initialize input, output, and crc */
    strm->next_in = job->buf;
    strm->next_out = job->out;
    len = job->len;
    crc = crc32(0L, Z_NULL, 0);

    /* set dictionary if this isn't the first work unit, and if we will be
       compressing something (the read thread assures that the dictionary
       data in the previous work unit is still there) */
    prev = jobs + PREV(job - jobs);
    if (prev->buf != NULL && len != 0)
        deflateSetDictionary(strm, prev->buf + (size - DICT), DICT);

    /* run MAX-sized amounts of input through deflate and crc32 -- this loop
       is needed for those cases where the integer type is smaller than the
       size_t type, or when len is close to the limit of the size_t type */
    while (len > MAX) {
        strm->avail_in = MAX;
        strm->avail_out = (unsigned)-1;
        crc = crc32(crc, strm->next_in, strm->avail_in);
        (void)deflate(strm, Z_NO_FLUSH);
        len -= MAX;
    }

    /* run last piece through deflate and crc32, follow with a sync marker */
    if (len) {
        strm->avail_in = len;
        strm->avail_out = (unsigned)-1;
        crc = crc32(crc, strm->next_in, strm->avail_in);
        (void)deflate(strm, Z_SYNC_FLUSH);
    }

    /* don't need to Z_FINISH, since we'd delete the last two bytes anyway */

    /* return result */
    job->crc = crc;
    return NULL;
}

/* put a 4-byte integer into a byte array in LSB order -- a points to the
   first of the four destination bytes and b is the value to store; each
   argument is evaluated more than once, so neither may have side effects */
#define PUT4(a,b) (*(a)=(b),(a)[1]=(b)>>8,(a)[2]=(b)>>16,(a)[3]=(b)>>24)

/* write thread: wait for compression threads to complete, write output in
   order, also write gzip header and trailer around the compressed data */
local void *write_thread(void *arg)
{
    int n;                          /* compress thread index */
    size_t len;                     /* length of input processed */
    unsigned long tot;              /* total uncompressed size (overflow ok) */
    unsigned long crc;              /* CRC-32 of uncompressed data */
    unsigned char wrap[10];         /* gzip header or trailer */

    /* write simple gzip header */
    memcpy(wrap, "\037\213\10\0\0\0\0\0\0\3", 10);
    wrap[8] = level == 9 ? 2 : (level == 1 ? 4 : 0);
    writen(outd, wrap, 10);

    /* process output of compress threads until end of input */    
    tot = 0;
    crc = crc32(0L, Z_NULL, 0);
    n = 0;
    do {
        /* wait for compress thread to start, then wait to complete */
        flag_wait(&(jobs[n].busy), COMP);
        pthread_join(jobs[n].comp, NULL);

        /* now that compress is done, allow read thread to use input buffer */
        flag_set(&(jobs[n].busy), WRITE);

        /* write compressed data and update length and crc */
        writen(outd, jobs[n].out, jobs[n].strm.next_out - jobs[n].out);
        len = jobs[n].len;
        tot += len;
        crc = crc32_combine(crc, jobs[n].crc, len);

        /* release this work unit and go to the next work unit */
        flag_set(&(jobs[n].busy), IDLE);
        n = NEXT(n);

        /* an input buffer less than size in length indicates end of input */
    } while (len == size);

    /* write final static block and gzip trailer (crc and len mod 2^32) */
    wrap[0] = 3;  wrap[1] = 0;
    PUT4(wrap + 2, crc);
    PUT4(wrap + 6, tot);
    writen(outd, wrap, 10);
    return NULL;
}

/* one-time initialization of a work unit -- this is where we set the deflate
   compression level and request raw deflate, and also where we set the size
   of the output buffer to guarantee enough space for a worst-case deflate
   ending with a Z_SYNC_FLUSH */
local void job_init(struct work *job)
{
    int ret;                        /* deflateInit2() return value */

    job->buf = malloc(size);
    job->out = malloc(size + (size >> 11) + 10);
    job->strm.zfree = Z_NULL;
    job->strm.zalloc = Z_NULL;
    job->strm.opaque = Z_NULL;
    ret = deflateInit2(&(job->strm), level, Z_DEFLATED, -15, 8,
                       Z_DEFAULT_STRATEGY);
    if (job->buf == NULL || job->out == NULL || ret != Z_OK)
        bail("not enough memory");
}

/* compress ind to outd in the gzip format, using multiple threads for the
   compression and crc calculation and another thread for writing the output --
   the read thread is the main thread: it allocates the ring of work units,
   starts the write thread, then repeatedly reads size bytes of input into the
   next work unit and starts a compress thread on it, until a short read
   signals end of input -- returns after the write thread (and through it all
   compress threads) has finished and all resources have been released; a
   failure to allocate memory or to create a thread aborts the program through
   bail() */
local void read_thread(void)
{
    int n;                          /* general index */
    size_t got;                     /* amount read */
    pthread_attr_t attr;            /* thread attributes (left at defaults) */
    pthread_t writer;               /* write thread */

    /* set defaults (not all pthread implementations default to joinable) */
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    /* allocate and set up work list (individual work units will be initialized
       as needed, in case the input is short), assure that allocation size
       arithmetic does not overflow */
    if (size + (size >> 11) + 10 < (size >> 11) + 10 ||
        (ssize_t)(size + (size >> 11) + 10) < 0 ||
        ((size_t)0 - 1) / procs <= sizeof(struct work) ||
        (jobs = malloc(procs * sizeof(struct work))) == NULL)
        bail("not enough memory");
    for (n = 0; n < procs; n++) {
        /* a NULL buf marks a work unit that job_init() has not set up yet */
        jobs[n].buf = NULL;
        flag_init(&(jobs[n].busy), IDLE);
    }

    /* start write thread -- without it nothing would ever be written or
       joined, so a failure here is fatal */
    if (pthread_create(&writer, &attr, write_thread, NULL) != 0)
        bail("cannot create write thread");

    /* read from input and start compress threads (write thread will pick up
       the output of the compress threads) */
    n = 0;
    do {
        /* wait for last compress on this work unit to be done, and wait for
           the dictionary to be used by the last compress on the next work
           unit */
        flag_wait_not(&(jobs[n].busy), COMP);
        flag_wait_not(&(jobs[NEXT(n)].busy), COMP);

        /* initialize this work unit if it's the first time it's used -- this
           is deliberately done after the waits above: the compress thread of
           the very first chunk decides that it has no dictionary by finding
           a NULL buf in the last work unit of the ring, so that buf must not
           be set until that compress thread has completed */
        if (jobs[n].buf == NULL)
            job_init(jobs + n);

        /* read input data */
        got = readn(ind, jobs[n].buf, size);

        /* start compress thread, but wait for write to be done first */
        flag_wait(&(jobs[n].busy), IDLE);
        jobs[n].len = got;
        if (pthread_create(&(jobs[n].comp), &attr, compress_thread,
                           jobs + n) != 0)
            bail("cannot create compress thread");

        /* mark work unit so write thread knows compress was started */
        flag_set(&(jobs[n].busy), COMP);

        /* go to the next work unit */
        n = NEXT(n);

        /* do until end of input, indicated by a read less than size */
    } while (got == size);

    /* wait for the write thread to complete -- the write thread will join with
       all of the compress threads, so this waits for all of the threads to
       complete */
    pthread_join(writer, NULL);

    /* free up all requested resources and return -- only work units that were
       actually set up by job_init() own buffers and a deflate state; the
       others still contain whatever malloc() returned and must be left
       alone */
    for (n = procs - 1; n >= 0; n--) {
        flag_done(&(jobs[n].busy));
        if (jobs[n].buf != NULL) {
            (void)deflateEnd(&(jobs[n].strm));
            free(jobs[n].out);
            free(jobs[n].buf);
        }
    }
    free(jobs);
    pthread_attr_destroy(&attr);
}

/* Process arguments for level, size, and procs, compress from stdin to
   stdout in the gzip format.  Note that procs must be at least two in
   order to provide a dictionary in one work unit for the other work
   unit, and that size must be at least 32K to store a full dictionary. */
int main(int argc, char **argv)
{
    int n;                          /* general index */
    int get;                        /* command line parameters to get */
    char *arg;                      /* command line argument */

    /* set defaults -- 32 processes and 128K buffers was found to provide
       good utilization of four cores (about 97%) and balanced the overall
       execution time impact of more threads against more dictionary
       processing for a fixed amount of memory -- the memory usage for these
       settings and full use of all work units (at least 4 MB of input) is
       16.2 MB
       */
    level = Z_DEFAULT_COMPRESSION;
    procs = 32;
    size = 131072UL;

    /* process command-line arguments */
    get = 0;
    for (n = 1; n < argc; n++) {
        arg = argv[n];
        if (*arg == '-') {
            while (*++arg)
                if (*arg >= '0' && *arg <= '9')     /* compression level */
                    level = *arg - '0';
                else if (*arg == 'b')               /* chunk size in K */
                    get |= 1;
                else if (*arg == 'p')               /* number of processes */
                    get |= 2;
                else if (*arg == 'h') {             /* help */
                    fputs("usage: pigz [-0..9] [-b blocksizeinK]", stderr);
                    fputs(" [-p processes] < foo > foo.gz\n", stderr);
                    return 0;
                }
                else
                    bail("invalid option");
        }
        else if (get & 1) {
            if (get & 2)
                bail("you need to separate the -b and -p options");
            size = (size_t)(atol(arg)) << 10;       /* chunk size */
            if (size < DICT)
                bail("invalid option");
            get = 0;
        }
        else if (get & 2) {
            procs = atoi(arg);                      /* processes */
            if (procs < 2)
                bail("invalid option");
            get = 0;
        }
        else
            bail("invalid option (you need to pipe input and output)");
    }
    if (get)
        bail("missing option argument");

    /* do parallel compression from stdin to stdout (the read thread starts up
       the write thread and the compression threads, and they all join before
       the read thread returns) */
    ind = 0;
    outd = 1;
    read_thread();

    /* done */
    return 0;
}