From f6194ef39af5864f792412460c354cc339dde7d1 Mon Sep 17 00:00:00 2001 From: Mark Adler Date: Fri, 9 Sep 2011 23:26:40 -0700 Subject: zlib 1.2.3.4 --- examples/README.examples | 21 +- examples/enough.c | 569 ++++++++++++++++++++ examples/gzlog.c | 1303 ++++++++++++++++++++++++++++++++++------------ examples/gzlog.h | 93 ++-- examples/pigz.c | 452 ++++++++++++++++ 5 files changed, 2074 insertions(+), 364 deletions(-) create mode 100644 examples/enough.c create mode 100644 examples/pigz.c (limited to 'examples') diff --git a/examples/README.examples b/examples/README.examples index 5632d7a..146919c 100644 --- a/examples/README.examples +++ b/examples/README.examples @@ -1,4 +1,10 @@ -This directory contains examples of the use of zlib. +This directory contains examples of the use of zlib and other relevant +programs and documentation. + +enough.c + calculation and justification of ENOUGH parameter in inftrees.h + - calculates the maximum table space used in inflate tree + construction over all possible Huffman codes fitblk.c compress just enough input to nearly fill a requested output size @@ -23,9 +29,16 @@ gzjoin.c gzlog.c gzlog.h - efficiently maintain a message log file in gzip format - - illustrates use of raw deflate and Z_SYNC_FLUSH - - illustrates use of gzip header extra field + efficiently and robustly maintain a message log file in gzip format + - illustrates use of raw deflate, Z_PARTIAL_FLUSH, deflatePrime(), + and deflateSetDictionary() + - illustrates use of a gzip header extra field + +pigz.c + parallel implementation of gzip compression + - uses pthreads to speed up compression on multiple core machines + - illustrates the use of deflateSetDictionary() with raw deflate + - illustrates the use of crc32_combine() zlib_how.html painfully comprehensive description of zpipe.c (see below) diff --git a/examples/enough.c b/examples/enough.c new file mode 100644 index 0000000..b570707 --- /dev/null +++ b/examples/enough.c @@ -0,0 +1,569 @@ +/* enough.c -- determine the maximum size of inflate's Huffman code tables over + * all possible valid and complete Huffman codes, subject to a length limit. + * Copyright (C) 2007, 2008 Mark Adler + * Version 1.3 17 February 2008 Mark Adler + */ + +/* Version history: + 1.0 3 Jan 2007 First version (derived from codecount.c version 1.4) + 1.1 4 Jan 2007 Use faster incremental table usage computation + Prune examine() search on previously visited states + 1.2 5 Jan 2007 Comments clean up + As inflate does, decrease root for short codes + Refuse cases where inflate would increase root + 1.3 17 Feb 2008 Add argument for initial root table size + Fix bug for initial root table size == max - 1 + Use a macro to compute the history index + */ + +/* + Examine all possible Huffman codes for a given number of symbols and a + maximum code length in bits to determine the maximum table size for zilb's + inflate. Only complete Huffman codes are counted. + + Two codes are considered distinct if the vectors of the number of codes per + length are not identical. So permutations of the symbol assignments result + in the same code for the counting, as do permutations of the assignments of + the bit values to the codes (i.e. only canonical codes are counted). + + We build a code from shorter to longer lengths, determining how many symbols + are coded at each length. At each step, we have how many symbols remain to + be coded, what the last code length used was, and how many bit patterns of + that length remain unused. Then we add one to the code length and double the + number of unused patterns to graduate to the next code length. We then + assign all portions of the remaining symbols to that code length that + preserve the properties of a correct and eventually complete code. Those + properties are: we cannot use more bit patterns than are available; and when + all the symbols are used, there are exactly zero possible bit patterns + remaining. + + The inflate Huffman decoding algorithm uses two-level lookup tables for + speed. There is a single first-level table to decode codes up to root bits + in length (root == 9 in the current inflate implementation). The table + has 1 << root entries and is indexed by the next root bits of input. Codes + shorter than root bits have replicated table entries, so that the correct + entry is pointed to regardless of the bits that follow the short code. If + the code is longer than root bits, then the table entry points to a second- + level table. The size of that table is determined by the longest code with + that root-bit prefix. If that longest code has length len, then the table + has size 1 << (len - root), to index the remaining bits in that set of + codes. Each subsequent root-bit prefix then has its own sub-table. The + total number of table entries required by the code is calculated + incrementally as the number of codes at each bit length is populated. When + all of the codes are shorter than root bits, then root is reduced to the + longest code length, resulting in a single, smaller, one-level table. + + The inflate algorithm also provides for small values of root (relative to + the log2 of the number of symbols), where the shortest code has more bits + than root. In that case, root is increased to the length of the shortest + code. This program, by design, does not handle that case, so it is verified + that the number of symbols is less than 2^(root + 1). + + In order to speed up the examination (by about ten orders of magnitude for + the default arguments), the intermediate states in the build-up of a code + are remembered and previously visited branches are pruned. The memory + required for this will increase rapidly with the total number of symbols and + the maximum code length in bits. However this is a very small price to pay + for the vast speedup. + + First, all of the possible Huffman codes are counted, and reachable + intermediate states are noted by a non-zero count in a saved-results array. + Second, the intermediate states that lead to (root + 1) bit or longer codes + are used to look at all sub-codes from those junctures for their inflate + memory usage. (The amount of memory used is not affected by the number of + codes of root bits or less in length.) Third, the visited states in the + construction of those sub-codes and the associated calculation of the table + size is recalled in order to avoid recalculating from the same juncture. + Beginning the code examination at (root + 1) bit codes, which is enabled by + identifying the reachable nodes, accounts for about six of the orders of + magnitude of improvement for the default arguments. About another four + orders of magnitude come from not revisiting previous states. Out of + approximately 2x10^16 possible Huffman codes, only about 2x10^6 sub-codes + need to be examined to cover all of the possible table memory usage cases + for the default arguments of 286 symbols limited to 15-bit codes. + + Note that an unsigned long long type is used for counting. It is quite easy + to exceed the capacity of an eight-byte integer with a large number of + symbols and a large maximum code length, so multiple-precision arithmetic + would need to replace the unsigned long long arithmetic in that case. This + program will abort if an overflow occurs. The big_t type identifies where + the counting takes place. + + An unsigned long long type is also used for calculating the number of + possible codes remaining at the maximum length. This limits the maximum + code length to the number of bits in a long long minus the number of bits + needed to represent the symbols in a flat code. The code_t type identifies + where the bit pattern counting takes place. + */ + +#include +#include +#include +#include + +#define local static + +/* special data types */ +typedef unsigned long long big_t; /* type for code counting */ +typedef unsigned long long code_t; /* type for bit pattern counting */ +struct tab { /* type for been here check */ + size_t len; /* length of bit vector in char's */ + char *vec; /* allocated bit vector */ +}; + +/* The array for saving results, num[], is indexed with this triplet: + + syms: number of symbols remaining to code + left: number of available bit patterns at length len + len: number of bits in the codes currently being assigned + + Those indices are constrained thusly when saving results: + + syms: 3..totsym (totsym == total symbols to code) + left: 2..syms - 1, but only the evens (so syms == 8 -> 2, 4, 6) + len: 1..max - 1 (max == maximum code length in bits) + + syms == 2 is not saved since that immediately leads to a single code. left + must be even, since it represents the number of available bit patterns at + the current length, which is double the number at the previous length. + left ends at syms-1 since left == syms immediately results in a single code. + (left > sym is not allowed since that would result in an incomplete code.) + len is less than max, since the code completes immediately when len == max. + + The offset into the array is calculated for the three indices with the + first one (syms) being outermost, and the last one (len) being innermost. + We build the array with length max-1 lists for the len index, with syms-3 + of those for each symbol. There are totsym-2 of those, with each one + varying in length as a function of sym. See the calculation of index in + count() for the index, and the calculation of size in main() for the size + of the array. + + For the deflate example of 286 symbols limited to 15-bit codes, the array + has 284,284 entries, taking up 2.17 MB for an 8-byte big_t. More than + half of the space allocated for saved results is actually used -- not all + possible triplets are reached in the generation of valid Huffman codes. + */ + +/* The array for tracking visited states, done[], is itself indexed identically + to the num[] array as described above for the (syms, left, len) triplet. + Each element in the array is further indexed by the (mem, rem) doublet, + where mem is the amount of inflate table space used so far, and rem is the + remaining unused entries in the current inflate sub-table. Each indexed + element is simply one bit indicating whether the state has been visited or + not. Since the ranges for mem and rem are not known a priori, each bit + vector is of a variable size, and grows as needed to accommodate the visited + states. mem and rem are used to calculate a single index in a triangular + array. Since the range of mem is expected in the default case to be about + ten times larger than the range of rem, the array is skewed to reduce the + memory usage, with eight times the range for mem than for rem. See the + calculations for offset and bit in beenhere() for the details. + + For the deflate example of 286 symbols limited to 15-bit codes, the bit + vectors grow to total approximately 21 MB, in addition to the 4.3 MB done[] + array itself. + */ + +/* Globals to avoid propagating constants or constant pointers recursively */ +local int max; /* maximum allowed bit length for the codes */ +local int root; /* size of base code table in bits */ +local int large; /* largest code table so far */ +local size_t size; /* number of elements in num and done */ +local int *code; /* number of symbols assigned to each bit length */ +local big_t *num; /* saved results array for code counting */ +local struct tab *done; /* states already evaluated array */ + +/* Index function for num[] and done[] */ +#define INDEX(i,j,k) (((size_t)((i-1)>>1)*((i-2)>>1)+(j>>1)-1)*(max-1)+k-1) + +/* Free allocated space. Uses globals code, num, and done. */ +local void cleanup(void) +{ + size_t n; + + if (done != NULL) { + for (n = 0; n < size; n++) + if (done[n].len) + free(done[n].vec); + free(done); + } + if (num != NULL) + free(num); + if (code != NULL) + free(code); +} + +/* Return the number of possible Huffman codes using bit patterns of lengths + len through max inclusive, coding syms symbols, with left bit patterns of + length len unused -- return -1 if there is an overflow in the counting. + Keep a record of previous results in num to prevent repeating the same + calculation. Uses the globals max and num. */ +local big_t count(int syms, int len, int left) +{ + big_t sum; /* number of possible codes from this juncture */ + big_t got; /* value returned from count() */ + int least; /* least number of syms to use at this juncture */ + int most; /* most number of syms to use at this juncture */ + int use; /* number of bit patterns to use in next call */ + size_t index; /* index of this case in *num */ + + /* see if only one possible code */ + if (syms == left) + return 1; + + /* note and verify the expected state */ + assert(syms > left && left > 0 && len < max); + + /* see if we've done this one already */ + index = INDEX(syms, left, len); + got = num[index]; + if (got) + return got; /* we have -- return the saved result */ + + /* we need to use at least this many bit patterns so that the code won't be + incomplete at the next length (more bit patterns than symbols) */ + least = (left << 1) - syms; + if (least < 0) + least = 0; + + /* we can use at most this many bit patterns, lest there not be enough + available for the remaining symbols at the maximum length (if there were + no limit to the code length, this would become: most = left - 1) */ + most = (((code_t)left << (max - len)) - syms) / + (((code_t)1 << (max - len)) - 1); + + /* count all possible codes from this juncture and add them up */ + sum = 0; + for (use = least; use <= most; use++) { + got = count(syms - use, len + 1, (left - use) << 1); + sum += got; + if (got == -1 || sum < got) /* overflow */ + return -1; + } + + /* verify that all recursive calls are productive */ + assert(sum != 0); + + /* save the result and return it */ + num[index] = sum; + return sum; +} + +/* Return true if we've been here before, set to true if not. Set a bit in a + bit vector to indicate visiting this state. Each (syms,len,left) state + has a variable size bit vector indexed by (mem,rem). The bit vector is + lengthened if needed to allow setting the (mem,rem) bit. */ +local int beenhere(int syms, int len, int left, int mem, int rem) +{ + size_t index; /* index for this state's bit vector */ + size_t offset; /* offset in this state's bit vector */ + int bit; /* mask for this state's bit */ + size_t length; /* length of the bit vector in bytes */ + char *vector; /* new or enlarged bit vector */ + + /* point to vector for (syms,left,len), bit in vector for (mem,rem) */ + index = INDEX(syms, left, len); + mem -= 1 << root; + offset = (mem >> 3) + rem; + offset = ((offset * (offset + 1)) >> 1) + rem; + bit = 1 << (mem & 7); + + /* see if we've been here */ + length = done[index].len; + if (offset < length && (done[index].vec[offset] & bit) != 0) + return 1; /* done this! */ + + /* we haven't been here before -- set the bit to show we have now */ + + /* see if we need to lengthen the vector in order to set the bit */ + if (length <= offset) { + /* if we have one already, enlarge it, zero out the appended space */ + if (length) { + do { + length <<= 1; + } while (length <= offset); + vector = realloc(done[index].vec, length); + if (vector != NULL) + memset(vector + done[index].len, 0, length - done[index].len); + } + + /* otherwise we need to make a new vector and zero it out */ + else { + length = 1 << (len - root); + while (length <= offset) + length <<= 1; + vector = calloc(length, sizeof(char)); + } + + /* in either case, bail if we can't get the memory */ + if (vector == NULL) { + fputs("abort: unable to allocate enough memory\n", stderr); + cleanup(); + exit(1); + } + + /* install the new vector */ + done[index].len = length; + done[index].vec = vector; + } + + /* set the bit */ + done[index].vec[offset] |= bit; + return 0; +} + +/* Examine all possible codes from the given node (syms, len, left). Compute + the amount of memory required to build inflate's decoding tables, where the + number of code structures used so far is mem, and the number remaining in + the current sub-table is rem. Uses the globals max, code, root, large, and + done. */ +local void examine(int syms, int len, int left, int mem, int rem) +{ + int least; /* least number of syms to use at this juncture */ + int most; /* most number of syms to use at this juncture */ + int use; /* number of bit patterns to use in next call */ + + /* see if we have a complete code */ + if (syms == left) { + /* set the last code entry */ + code[len] = left; + + /* complete computation of memory used by this code */ + while (rem < left) { + left -= rem; + rem = 1 << (len - root); + mem += rem; + } + assert(rem == left); + + /* if this is a new maximum, show the entries used and the sub-code */ + if (mem > large) { + large = mem; + printf("max %d: ", mem); + for (use = root + 1; use <= max; use++) + if (code[use]) + printf("%d[%d] ", code[use], use); + putchar('\n'); + fflush(stdout); + } + + /* remove entries as we drop back down in the recursion */ + code[len] = 0; + return; + } + + /* prune the tree if we can */ + if (beenhere(syms, len, left, mem, rem)) + return; + + /* we need to use at least this many bit patterns so that the code won't be + incomplete at the next length (more bit patterns than symbols) */ + least = (left << 1) - syms; + if (least < 0) + least = 0; + + /* we can use at most this many bit patterns, lest there not be enough + available for the remaining symbols at the maximum length (if there were + no limit to the code length, this would become: most = left - 1) */ + most = (((code_t)left << (max - len)) - syms) / + (((code_t)1 << (max - len)) - 1); + + /* occupy least table spaces, creating new sub-tables as needed */ + use = least; + while (rem < use) { + use -= rem; + rem = 1 << (len - root); + mem += rem; + } + rem -= use; + + /* examine codes from here, updating table space as we go */ + for (use = least; use <= most; use++) { + code[len] = use; + examine(syms - use, len + 1, (left - use) << 1, + mem + (rem ? 1 << (len - root) : 0), rem << 1); + if (rem == 0) { + rem = 1 << (len - root); + mem += rem; + } + rem--; + } + + /* remove entries as we drop back down in the recursion */ + code[len] = 0; +} + +/* Look at all sub-codes starting with root + 1 bits. Look at only the valid + intermediate code states (syms, left, len). For each completed code, + calculate the amount of memory required by inflate to build the decoding + tables. Find the maximum amount of memory required and show the code that + requires that maximum. Uses the globals max, root, and num. */ +local void enough(int syms) +{ + int n; /* number of remaing symbols for this node */ + int left; /* number of unused bit patterns at this length */ + size_t index; /* index of this case in *num */ + + /* clear code */ + for (n = 0; n <= max; n++) + code[n] = 0; + + /* look at all (root + 1) bit and longer codes */ + large = 1 << root; /* base table */ + if (root < max) /* otherwise, there's only a base table */ + for (n = 3; n <= syms; n++) + for (left = 2; left < n; left += 2) + { + /* look at all reachable (root + 1) bit nodes, and the + resulting codes (complete at root + 2 or more) */ + index = INDEX(n, left, root + 1); + if (root + 1 < max && num[index]) /* reachable node */ + examine(n, root + 1, left, 1 << root, 0); + + /* also look at root bit codes with completions at root + 1 + bits (not saved in num, since complete), just in case */ + if (num[index - 1] && n <= left << 1) + examine((n - left) << 1, root + 1, (n - left) << 1, + 1 << root, 0); + } + + /* done */ + printf("done: maximum of %d table entries\n", large); +} + +/* + Examine and show the total number of possible Huffman codes for a given + maximum number of symbols, initial root table size, and maximum code length + in bits -- those are the command arguments in that order. The default + values are 286, 9, and 15 respectively, for the deflate literal/length code. + The possible codes are counted for each number of coded symbols from two to + the maximum. The counts for each of those and the total number of codes are + shown. The maximum number of inflate table entires is then calculated + across all possible codes. Each new maximum number of table entries and the + associated sub-code (starting at root + 1 == 10 bits) is shown. + + To count and examine Huffman codes that are not length-limited, provide a + maximum length equal to the number of symbols minus one. + + For the deflate literal/length code, use "enough". For the deflate distance + code, use "enough 30 6". + + This uses the %llu printf format to print big_t numbers, which assumes that + big_t is an unsigned long long. If the big_t type is changed (for example + to a multiple precision type), the method of printing will also need to be + updated. + */ +int main(int argc, char **argv) +{ + int syms; /* total number of symbols to code */ + int n; /* number of symbols to code for this run */ + big_t got; /* return value of count() */ + big_t sum; /* accumulated number of codes over n */ + + /* set up globals for cleanup() */ + code = NULL; + num = NULL; + done = NULL; + + /* get arguments -- default to the deflate literal/length code */ + syms = 286; + root = 9; + max = 15; + if (argc > 1) { + syms = atoi(argv[1]); + if (argc > 2) { + root = atoi(argv[2]); + if (argc > 3) + max = atoi(argv[3]); + } + } + if (argc > 4 || syms < 2 || root < 1 || max < 1) { + fputs("invalid arguments, need: [sym >= 2 [root >= 1 [max >= 1]]]\n", + stderr); + return 1; + } + + /* if not restricting the code length, the longest is syms - 1 */ + if (max > syms - 1) + max = syms - 1; + + /* determine the number of bits in a code_t */ + n = 0; + while (((code_t)1 << n) != 0) + n++; + + /* make sure that the calculation of most will not overflow */ + if (max > n || syms - 2 >= (((code_t)0 - 1) >> (max - 1))) { + fputs("abort: code length too long for internal types\n", stderr); + return 1; + } + + /* reject impossible code requests */ + if (syms - 1 > ((code_t)1 << max) - 1) { + fprintf(stderr, "%d symbols cannot be coded in %d bits\n", + syms, max); + return 1; + } + + /* allocate code vector */ + code = calloc(max + 1, sizeof(int)); + if (code == NULL) { + fputs("abort: unable to allocate enough memory\n", stderr); + return 1; + } + + /* determine size of saved results array, checking for overflows, + allocate and clear the array (set all to zero with calloc()) */ + if (syms == 2) /* iff max == 1 */ + num = NULL; /* won't be saving any results */ + else { + size = syms >> 1; + if (size > ((size_t)0 - 1) / (n = (syms - 1) >> 1) || + (size *= n, size > ((size_t)0 - 1) / (n = max - 1)) || + (size *= n, size > ((size_t)0 - 1) / sizeof(big_t)) || + (num = calloc(size, sizeof(big_t))) == NULL) { + fputs("abort: unable to allocate enough memory\n", stderr); + cleanup(); + return 1; + } + } + + /* count possible codes for all numbers of symbols, add up counts */ + sum = 0; + for (n = 2; n <= syms; n++) { + got = count(n, 1, 2); + sum += got; + if (got == -1 || sum < got) { /* overflow */ + fputs("abort: can't count that high!\n", stderr); + cleanup(); + return 1; + } + printf("%llu %d-codes\n", got, n); + } + printf("%llu total codes for 2 to %d symbols", sum, syms); + if (max < syms - 1) + printf(" (%d-bit length limit)\n", max); + else + puts(" (no length limit)"); + + /* allocate and clear done array for beenhere() */ + if (syms == 2) + done = NULL; + else if (size > ((size_t)0 - 1) / sizeof(struct tab) || + (done = calloc(size, sizeof(struct tab))) == NULL) { + fputs("abort: unable to allocate enough memory\n", stderr); + cleanup(); + return 1; + } + + /* find and show maximum inflate table usage */ + if (root > max) /* reduce root to max length */ + root = max; + if (syms < ((code_t)1 << (root + 1))) + enough(syms); + else + puts("cannot handle minimum code lengths > root"); + + /* done */ + cleanup(); + return 0; +} diff --git a/examples/gzlog.c b/examples/gzlog.c index b6acdef..4daf1c2 100644 --- a/examples/gzlog.c +++ b/examples/gzlog.c @@ -1,413 +1,1058 @@ /* * gzlog.c - * Copyright (C) 2004 Mark Adler + * Copyright (C) 2004, 2008 Mark Adler, all rights reserved * For conditions of distribution and use, see copyright notice in gzlog.h - * version 1.0, 26 Nov 2004 - * + * version 2.0, 25 Apr 2008 */ -#include /* memcmp() */ -#include /* malloc(), free(), NULL */ -#include /* size_t, off_t */ -#include /* read(), close(), sleep(), ftruncate(), */ - /* lseek() */ -#include /* open() */ -#include /* flock() */ -#include "zlib.h" /* deflateInit2(), deflate(), deflateEnd() */ +/* + gzlog provides a mechanism for frequently appending short strings to a gzip + file that is efficient both in execution time and compression ratio. The + strategy is to write the short strings in an uncompressed form to the end of + the gzip file, only compressing when the amount of uncompressed data has + reached a given threshold. + + gzlog also provides protection against interruptions in the process due to + system crashes. The status of the operation is recorded in an extra field + in the gzip file, and is only updated once the gzip file is brought to a + valid state. The last data to be appended or compressed is saved in an + auxiliary file, so that if the operation is interrupted, it can be completed + the next time an append operation is attempted. + + gzlog maintains another auxiliary file with the last 32K of data from the + compressed portion, which is preloaded for the compression of the subsequent + data. This minimizes the impact to the compression ratio of appending. + */ + +/* + Operations Concept: + + Files (log name "foo"): + foo.gz -- gzip file with the complete log + foo.add -- last message to append or last data to compress + foo.dict -- dictionary of the last 32K of data for next compression + foo.temp -- temporary dictionary file for compression after this one + foo.lock -- lock file for reading and writing the other files + foo.repairs -- log file for log file recovery operations (not compressed) + + gzip file structure: + - fixed-length (no file name) header with extra field (see below) + - compressed data ending initially with empty stored block + - uncompressed data filling out originally empty stored block and + subsequent stored blocks as needed (16K max each) + - gzip trailer + - no junk at end (no other gzip streams) + + When appending data, the information in the first three items above plus the + foo.add file are sufficient to recover an interrupted append operation. The + extra field has the necessary information to restore the start of the last + stored block and determine where to append the data in the foo.add file, as + well as the crc and length of the gzip data before the append operation. + + The foo.add file is created before the gzip file is marked for append, and + deleted after the gzip file is marked as complete. So if the append + operation is interrupted, the data to add will still be there. If due to + some external force, the foo.add file gets deleted between when the append + operation was interrupted and when recovery is attempted, the gzip file will + still be restored, but without the appended data. + + When compressing data, the information in the first two items above plus the + foo.add file are sufficient to recover an interrupted compress operation. + The extra field has the necessary information to find the end of the + compressed data, and contains both the crc and length of just the compressed + data and of the complete set of data including the contents of the foo.add + file. + + Again, the foo.add file is maintained during the compress operation in case + of an interruption. If in the unlikely event the foo.add file with the data + to be compressed is missing due to some external force, a gzip file with + just the previous compressed data will be reconstructed. In this case, all + of the data that was to be compressed is lost (approximately one megabyte). + This will not occur if all that happened was an interruption of the compress + operation. + + The third state that is marked is the replacement of the old dictionary with + the new dictionary after a compress operation. Once compression is + complete, the gzip file is marked as being in the replace state. This + completes the gzip file, so an interrupt after being so marked does not + result in recompression. Then the dictionary file is replaced, and the gzip + file is marked as completed. This state prevents the possibility of + restarting compression with the wrong dictionary file. + + All three operations are wrapped by a lock/unlock procedure. In order to + gain exclusive access to the log files, first a foo.lock file must be + exclusively created. When all operations are complete, the lock is + released by deleting the foo.lock file. If when attempting to create the + lock file, it already exists and the modify time of the lock file is more + than five minutes old (set by the PATIENCE define below), then the old + lock file is considered stale and deleted, and the exclusive creation of + the lock file is retried. To assure that there are no false assessments + of the staleness of the lock file, the operations periodically touch the + lock file to update the modified date. + + Following is the definition of the extra field with all of the information + required to enable the above append and compress operations and their + recovery if interrupted. Multi-byte values are stored little endian + (consistent with the gzip format). File pointers are eight bytes long. + The crc's and lengths for the gzip trailer are four bytes long. (Note that + the length at the end of a gzip file is used for error checking only, and + for large files is actually the length modulo 2^32.) The stored block + length is two bytes long. The gzip extra field two-byte identification is + "ap" for append. It is assumed that writing the extra field to the file is + an "atomic" operation. That is, either all of the extra field is written + to the file, or none of it is, if the operation is interrupted right at the + point of updating the extra field. This is a reasonable assumption, since + the extra field is within the first 52 bytes of the file, which is smaller + than any expected block size for a mass storage device (usually 512 bytes or + larger). + + Extra field (35 bytes): + - Pointer to first stored block length -- this points to the two-byte length + of the first stored block, which is followed by the two-byte, one's + complement of that length. The stored block length is preceded by the + three-bit header of the stored block, which is the actual start of the + stored block in the deflate format. See the bit offset field below. + - Pointer to the last stored block length. This is the same as above, but + for the last stored block of the uncompressed data in the gzip file. + Initially this is the same as the first stored block length pointer. + When the stored block gets to 16K (see the MAX_STORE define), then a new + stored block as added, at which point the last stored block length pointer + is different from the first stored block length pointer. When they are + different, the first bit of the last stored block header is eight bits, or + one byte back from the block length. + - Compressed data crc and length. This is the crc and length of the data + that is in the compressed portion of the deflate stream. These are used + only in the event that the foo.add file containing the data to compress is + lost after a compress operation is interrupted. + - Total data crc and length. This is the crc and length of all of the data + stored in the gzip file, compressed and uncompressed. It is used to + reconstruct the gzip trailer when compressing, as well as when recovering + interrupted operations. + - Final stored block length. This is used to quickly find where to append, + and allows the restoration of the original final stored block state when + an append operation is interrupted. + - First stored block start as the number of bits back from the final stored + block first length byte. This value is in the range of 3..10, and is + stored as the low three bits of the final byte of the extra field after + subtracting three (0..7). This allows the last-block bit of the stored + block header to be updated when a new stored block is added, for the case + when the first stored block and the last stored block are the same. (When + they are different, the numbers of bits back is known to be eight.) This + also allows for new compressed data to be appended to the old compressed + data in the compress operation, overwriting the previous first stored + block, or for the compressed data to be terminated and a valid gzip file + reconstructed on the off chance that a compression operation was + interrupted and the data to compress in the foo.add file was deleted. + - The operation in process. This is the next two bits in the last byte (the + bits under the mask 0x18). The are interpreted as 0: nothing in process, + 1: append in process, 2: compress in process, 3: replace in process. + - The top three bits of the last byte in the extra field are reserved and + are currently set to zero. + + Main procedure: + - Exclusively create the foo.lock file using the O_CREAT and O_EXCL modes of + the system open() call. If the modify time of an existing lock file is + more than PATIENCE seconds old, then the lock file is deleted and the + exclusive create is retried. + - Load the extra field from the foo.gz file, and see if an operation was in + progress but not completed. If so, apply the recovery procedure below. + - Perform the append procedure with the provided data. + - If the uncompressed data in the foo.gz file is 1MB or more, apply the + compress procedure. + - Delete the foo.lock file. + + Append procedure: + - Put what to append in the foo.add file so that the operation can be + restarted if this procedure is interrupted. + - Mark the foo.gz extra field with the append operation in progress. + + Restore the original last-block bit and stored block length of the last + stored block from the information in the extra field, in case a previous + append operation was interrupted. + - Append the provided data to the last stored block, creating new stored + blocks as needed and updating the stored blocks last-block bits and + lengths. + - Update the crc and length with the new data, and write the gzip trailer. + - Write over the extra field (with a single write operation) with the new + pointers, lengths, and crc's, and mark the gzip file as not in process. + Though there is still a foo.add file, it will be ignored since nothing + is in process. If a foo.add file is leftover from a previously + completed operation, it is truncated when writing new data to it. + - Delete the foo.add file. + + Compress and replace procedures: + - Read all of the uncompressed data in the stored blocks in foo.gz and write + it to foo.add. Also write foo.temp with the last 32K of that data to + provide a dictionary for the next invocation of this procedure. + - Rewrite the extra field marking foo.gz with a compression in process. + * If there is no data provided to compress (due to a missing foo.add file + when recovering), reconstruct and truncate the foo.gz file to contain + only the previous compressed data and proceed to the step after the next + one. Otherwise ... + - Compress the data with the dictionary in foo.dict, and write to the + foo.gz file starting at the bit immediately following the last previously + compressed block. If there is no foo.dict, proceed anyway with the + compression at slightly reduced efficiency. (For the foo.dict file to be + missing requires some external failure beyond simply the interruption of + a compress operation.) During this process, the foo.lock file is + periodically touched to assure that that file is not considered stale by + another process before we're done. The deflation is terminated with a + non-last empty static block (10 bits long), that is then located and + written over by a last-bit-set empty stored block. + - Append the crc and length of the data in the gzip file (previously + calculated during the append operations). + - Write over the extra field with the updated stored block offsets, bits + back, crc's, and lengths, and mark foo.gz as in process for a replacement + of the dictionary. + @ Delete the foo.add file. + - Replace foo.dict with foo.temp. + - Write over the extra field, marking foo.gz as complete. + + Recovery procedure: + - If not a replace recovery, read in the foo.add file, and provide that data + to the appropriate recovery below. If there is no foo.add file, provide + a zero data length to the recovery. In that case, the append recovery + restores the foo.gz to the previous compressed + uncompressed data state. + For the the compress recovery, a missing foo.add file results in foo.gz + being restored to the previous compressed-only data state. + - Append recovery: + - Pick up append at + step above + - Compress recovery: + - Pick up compress at * step above + - Replace recovery: + - Pick up compress at @ step above + - Log the repair with a date stamp in foo.repairs + */ + +#include +#include /* rename, fopen, fprintf, fclose */ +#include /* malloc, free */ +#include /* strlen, strrchr, strcpy, strncpy, strcmp */ +#include /* open */ +#include /* lseek, read, write, close, unlink, sleep, */ + /* ftruncate, fsync */ +#include /* errno */ +#include /* time, ctime */ +#include /* stat */ +#include /* utimes */ +#include "zlib.h" /* crc32 */ + +#include "gzlog.h" /* header for external access */ -#include "gzlog.h" /* interface */ #define local static +typedef unsigned int uint; +typedef unsigned long ulong; + +/* Macro for debugging to deterministically force recovery operations */ +#ifdef DEBUG + #include /* longjmp */ + jmp_buf gzlog_jump; /* where to go back to */ + int gzlog_bail = 0; /* which point to bail at (1..8) */ + int gzlog_count = -1; /* number of times through to wait */ +# define BAIL(n) do { if (n == gzlog_bail && gzlog_count-- == 0) \ + longjmp(gzlog_jump, gzlog_bail); } while (0) +#else +# define BAIL(n) +#endif + +/* how old the lock file can be in seconds before considering it stale */ +#define PATIENCE 300 + +/* maximum stored block size in Kbytes -- must be in 1..63 */ +#define MAX_STORE 16 -/* log object structure */ -typedef struct { - int id; /* object identifier */ - int fd; /* log file descriptor */ - off_t extra; /* offset of extra "ap" subfield */ - off_t mark_off; /* offset of marked data */ - off_t last_off; /* offset of last block */ - unsigned long crc; /* uncompressed crc */ - unsigned long len; /* uncompressed length (modulo 2^32) */ - unsigned stored; /* length of current stored block */ -} gz_log; - -#define GZLOGID 19334 /* gz_log object identifier */ - -#define LOCK_RETRY 1 /* retry lock once a second */ -#define LOCK_PATIENCE 1200 /* try about twenty minutes before forcing */ - -/* acquire a lock on a file */ -local int lock(int fd) +/* number of stored Kbytes to trigger compression (must be >= 32 to allow + dictionary construction, and <= 204 * MAX_STORE, in order for >> 10 to + discard the stored block headers contribution of five bytes each) */ +#define TRIGGER 1024 + +/* size of a deflate dictionary (this cannot be changed) */ +#define DICT 32768U + +/* values for the operation (2 bits) */ +#define NO_OP 0 +#define APPEND_OP 1 +#define COMPRESS_OP 2 +#define REPLACE_OP 3 + +/* macros to extract little-endian integers from an unsigned byte buffer */ +#define PULL2(p) ((p)[0]+((uint)((p)[1])<<8)) +#define PULL4(p) (PULL2(p)+((ulong)PULL2(p+2)<<16)) +#define PULL8(p) (PULL4(p)+((off_t)PULL4(p+4)<<32)) + +/* macros to store integers into a byte buffer in little-endian order */ +#define PUT2(p,a) do {(p)[0]=a;(p)[1]=(a)>>8;} while(0) +#define PUT4(p,a) do {PUT2(p,a);PUT2(p+2,a>>16);} while(0) +#define PUT8(p,a) do {PUT4(p,a);PUT4(p+4,a>>32);} while(0) + +/* internal structure for log information */ +#define LOGID "\106\035\172" /* should be three non-zero characters */ +struct log { + char id[4]; /* contains LOGID to detect inadvertent overwrites */ + int fd; /* file descriptor for .gz file, opened read/write */ + char *path; /* allocated path, e.g. "/var/log/foo" or "foo" */ + char *end; /* end of path, for appending suffices such as ".gz" */ + off_t first; /* offset of first stored block first length byte */ + int back; /* location of first block id in bits back from first */ + uint stored; /* bytes currently in last stored block */ + off_t last; /* offset of last stored block first length byte */ + ulong ccrc; /* crc of compressed data */ + ulong clen; /* length (modulo 2^32) of compressed data */ + ulong tcrc; /* crc of total data */ + ulong tlen; /* length (modulo 2^32) of total data */ + time_t lock; /* last modify time of our lock file */ +}; + +/* gzip header for gzlog */ +local unsigned char log_gzhead[] = { + 0x1f, 0x8b, /* magic gzip id */ + 8, /* compression method is deflate */ + 4, /* there is an extra field (no file name) */ + 0, 0, 0, 0, /* no modification time provided */ + 0, 0xff, /* no extra flags, no OS specified */ + 39, 0, 'a', 'p', 35, 0 /* extra field with "ap" subfield */ + /* 35 is EXTRA, 39 is EXTRA + 4 */ +}; + +#define HEAD sizeof(log_gzhead) /* should be 16 */ + +/* initial gzip extra field content (52 == HEAD + EXTRA + 1) */ +local unsigned char log_gzext[] = { + 52, 0, 0, 0, 0, 0, 0, 0, /* offset of first stored block length */ + 52, 0, 0, 0, 0, 0, 0, 0, /* offset of last stored block length */ + 0, 0, 0, 0, 0, 0, 0, 0, /* compressed data crc and length */ + 0, 0, 0, 0, 0, 0, 0, 0, /* total data crc and length */ + 0, 0, /* final stored block data length */ + 5 /* op is NO_OP, last bit 8 bits back */ +}; + +#define EXTRA sizeof(log_gzext) /* should be 35 */ + +/* initial gzip data and trailer */ +local unsigned char log_gzbody[] = { + 1, 0, 0, 0xff, 0xff, /* empty stored block (last) */ + 0, 0, 0, 0, /* crc */ + 0, 0, 0, 0 /* uncompressed length */ +}; + +#define BODY sizeof(log_gzbody) + +/* Exclusively create foo.lock in order to negotiate exclusive access to the + foo.* files. If the modify time of an existing lock file is greater than + PATIENCE seconds in the past, then consider the lock file to have been + abandoned, delete it, and try the exclusive create again. Save the lock + file modify time for verification of ownership. Return 0 on success, or -1 + on failure, usually due to an access restriction or invalid path. Note that + if stat() or unlink() fails, it may be due to another process noticing the + abandoned lock file a smidge sooner and deleting it, so those are not + flagged as an error. */ +local int log_lock(struct log *log) { - int patience; + int fd; + struct stat st; - /* try to lock every LOCK_RETRY seconds for LOCK_PATIENCE seconds */ - patience = LOCK_PATIENCE; - do { - if (flock(fd, LOCK_EX + LOCK_NB) == 0) - return 0; - (void)sleep(LOCK_RETRY); - patience -= LOCK_RETRY; - } while (patience > 0); + strcpy(log->end, ".lock"); + while ((fd = open(log->path, O_CREAT | O_EXCL, 0644)) < 0) { + if (errno != EEXIST) + return -1; + if (stat(log->path, &st) == 0 && time(NULL) - st.st_mtime > PATIENCE) { + unlink(log->path); + continue; + } + sleep(2); /* relinquish the CPU for two seconds while waiting */ + } + close(fd); + if (stat(log->path, &st) == 0) + log->lock = st.st_mtime; + return 0; +} - /* we've run out of patience -- give up */ - return -1; +/* Update the modify time of the lock file to now, in order to prevent another + task from thinking that the lock is stale. Save the lock file modify time + for verification of ownership. */ +local void log_touch(struct log *log) +{ + struct stat st; + + strcpy(log->end, ".lock"); + utimes(log->path, NULL); + if (stat(log->path, &st) == 0) + log->lock = st.st_mtime; } -/* release lock */ -local void unlock(int fd) +/* Check the log file modify time against what is expected. Return true if + this is not our lock. If it is our lock, touch it to keep it. */ +local int log_check(struct log *log) { - (void)flock(fd, LOCK_UN); + struct stat st; + + strcpy(log->end, ".lock"); + if (stat(log->path, &st) || st.st_mtime != log->lock) + return 1; + log_touch(log); + return 0; } -/* release a log object */ -local void log_clean(gz_log *log) +/* Unlock a previously acquired lock, but only if it's ours. */ +local void log_unlock(struct log *log) { - unlock(log->fd); - (void)close(log->fd); - free(log); + if (log_check(log)) + return; + strcpy(log->end, ".lock"); + unlink(log->path); + log->lock = 0; } -/* read an unsigned long from a byte buffer little-endian */ -local unsigned long make_ulg(unsigned char *buf) +/* Check the gzip header and read in the extra field, filling in the values in + the log structure. Return op on success or -1 if the gzip header was not as + expected. op is the current operation in progress last written to the extra + field. This assumes that the gzip file has already been opened, with the + file descriptor log->fd. */ +local int log_head(struct log *log) { - int n; - unsigned long val; + int op; + unsigned char buf[HEAD + EXTRA]; - val = (unsigned long)(*buf++); - for (n = 8; n < 32; n += 8) - val += (unsigned long)(*buf++) << n; - return val; + if (lseek(log->fd, 0, SEEK_SET) < 0 || + read(log->fd, buf, HEAD + EXTRA) != HEAD + EXTRA || + memcmp(buf, log_gzhead, HEAD)) { + return -1; + } + log->first = PULL8(buf + HEAD); + log->last = PULL8(buf + HEAD + 8); + log->ccrc = PULL4(buf + HEAD + 16); + log->clen = PULL4(buf + HEAD + 20); + log->tcrc = PULL4(buf + HEAD + 24); + log->tlen = PULL4(buf + HEAD + 28); + log->stored = PULL2(buf + HEAD + 32); + log->back = 3 + (buf[HEAD + 34] & 7); + op = (buf[HEAD + 34] >> 3) & 3; + return op; } -/* read an off_t from a byte buffer little-endian */ -local off_t make_off(unsigned char *buf) +/* Write over the extra field contents, marking the operation as op. Use fsync + to assure that the device is written to, and in the requested order. This + operation, and only this operation, is assumed to be atomic in order to + assure that the log is recoverable in the event of an interruption at any + point in the process. Return -1 if the write to foo.gz failed. */ +local int log_mark(struct log *log, int op) { - int n; - off_t val; + int ret; + unsigned char ext[EXTRA]; - val = (off_t)(*buf++); - for (n = 8; n < 64; n += 8) - val += (off_t)(*buf++) << n; - return val; + PUT8(ext, log->first); + PUT8(ext + 8, log->last); + PUT4(ext + 16, log->ccrc); + PUT4(ext + 20, log->clen); + PUT4(ext + 24, log->tcrc); + PUT4(ext + 28, log->tlen); + PUT2(ext + 32, log->stored); + ext[34] = log->back - 3 + (op << 3); + fsync(log->fd); + ret = lseek(log->fd, HEAD, SEEK_SET) < 0 || + write(log->fd, ext, EXTRA) != EXTRA ? -1 : 0; + fsync(log->fd); + return ret; } -/* write an unsigned long little-endian to byte buffer */ -local void dice_ulg(unsigned long val, unsigned char *buf) +/* Rewrite the last block header bits and subsequent zero bits to get to a byte + boundary, setting the last block bit if last is true, and then write the + remainder of the stored block header (length and one's complement). Leave + the file pointer after the end of the last stored block data. Return -1 if + there is a read or write failure on the foo.gz file */ +local int log_last(struct log *log, int last) { - int n; + int back, len, mask; + unsigned char buf[6]; + + /* determine the locations of the bytes and bits to modify */ + back = log->last == log->first ? log->back : 8; + len = back > 8 ? 2 : 1; /* bytes back from log->last */ + mask = 0x80 >> ((back - 1) & 7); /* mask for block last-bit */ + + /* get the byte to modify (one or two back) into buf[0] -- don't need to + read the byte if the last-bit is eight bits back, since in that case + the entire byte will be modified */ + buf[0] = 0; + if (back != 8 && (lseek(log->fd, log->last - len, SEEK_SET) < 0 || + read(log->fd, buf, 1) != 1)) + return -1; + + /* change the last-bit of the last stored block as requested -- note + that all bits above the last-bit are set to zero, per the type bits + of a stored block being 00 and per the convention that the bits to + bring the stream to a byte boundary are also zeros */ + buf[1] = 0; + buf[2 - len] = (*buf & (mask - 1)) + (last ? mask : 0); - for (n = 0; n < 4; n++) { - *buf++ = val & 0xff; - val >>= 8; + /* write the modified stored block header and lengths, move the file + pointer to after the last stored block data */ + PUT2(buf + 2, log->stored); + PUT2(buf + 4, log->stored ^ 0xffff); + return lseek(log->fd, log->last - len, SEEK_SET) < 0 || + write(log->fd, buf + 2 - len, len + 4) != len + 4 || + lseek(log->fd, log->stored, SEEK_CUR) < 0 ? -1 : 0; +} + +/* Append len bytes from data to the locked and open log file. len may be zero + if recovering and no .add file was found. In that case, the previous state + of the foo.gz file is restored. The data is appended uncompressed in + deflate stored blocks. Return -1 if there was an error reading or writing + the foo.gz file. */ +local int log_append(struct log *log, unsigned char *data, size_t len) +{ + uint put; + off_t end; + unsigned char buf[8]; + + /* set the last block last-bit and length, in case recovering an + interrupted append, then position the file pointer to append to the + block */ + if (log_last(log, 1)) + return -1; + + /* append, adding stored blocks and updating the offset of the last stored + block as needed, and update the total crc and length */ + while (len) { + /* append as much as we can to the last block */ + put = (MAX_STORE << 10) - log->stored; + if (put > len) + put = (uint)len; + if (put) { + if (write(log->fd, data, put) != put) + return -1; + BAIL(1); + log->tcrc = crc32(log->tcrc, data, put); + log->tlen += put; + log->stored += put; + data += put; + len -= put; + } + + /* if we need to, add a new empty stored block */ + if (len) { + /* mark current block as not last */ + if (log_last(log, 0)) + return -1; + + /* point to new, empty stored block */ + log->last += 4 + log->stored + 1; + log->stored = 0; + } + + /* mark last block as last, update its length */ + if (log_last(log, 1)) + return -1; + BAIL(2); } + + /* write the new crc and length trailer, and truncate just in case (could + be recovering from partial append with a missing foo.add file) */ + PUT4(buf, log->tcrc); + PUT4(buf + 4, log->tlen); + if (write(log->fd, buf, 8) != 8 || + (end = lseek(log->fd, 0, SEEK_CUR)) < 0 || ftruncate(log->fd, end)) + return -1; + + /* write the extra field, marking the log file as done, delete .add file */ + if (log_mark(log, NO_OP)) + return -1; + strcpy(log->end, ".add"); + unlink(log->path); /* ignore error, since may not exist */ + return 0; } -/* write an off_t little-endian to byte buffer */ -local void dice_off(off_t val, unsigned char *buf) +/* Replace the foo.dict file with the foo.temp file. Also delete the foo.add + file, since the compress operation may have been interrupted before that was + done. Returns 1 if memory could not be allocated, or -1 if reading or + writing foo.gz fails, or if the rename fails for some reason other than + foo.temp not existing. foo.temp not existing is a permitted error, since + the replace operation may have been interrupted after the rename is done, + but before foo.gz is marked as complete. */ +local int log_replace(struct log *log) { - int n; + int ret; + char *dest; + + /* delete foo.add file */ + strcpy(log->end, ".add"); + unlink(log->path); /* ignore error, since may not exist */ + BAIL(3); + + /* rename foo.name to foo.dict, replacing foo.dict if it exists */ + strcpy(log->end, ".dict"); + dest = malloc(strlen(log->path) + 1); + if (dest == NULL) + return -2; + strcpy(dest, log->path); + strcpy(log->end, ".temp"); + ret = rename(log->path, dest); + free(dest); + if (ret && errno != ENOENT) + return -1; + BAIL(4); - for (n = 0; n < 8; n++) { - *buf++ = val & 0xff; - val >>= 8; + /* mark the foo.gz file as done */ + return log_mark(log, NO_OP); +} + +/* Compress the len bytes at data and append the compressed data to the + foo.gz deflate data immediately after the previous compressed data. This + overwrites the previous uncompressed data, which was stored in foo.add + and is the data provided in data[0..len-1]. If this operation is + interrupted, it picks up at the start of this routine, with the foo.add + file read in again. If there is no data to compress (len == 0), then we + simply terminate the foo.gz file after the previously compressed data, + appending a final empty stored block and the gzip trailer. Return -1 if + reading or writing the log.gz file failed, or -2 if there was a memory + allocation failure. */ +local int log_compress(struct log *log, unsigned char *data, size_t len) +{ + int fd; + uint got, max; + ssize_t dict; + off_t end; + z_stream strm; + unsigned char buf[DICT]; + + /* compress and append compressed data */ + if (len) { + /* set up for deflate, allocating memory */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + if (deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, + Z_DEFAULT_STRATEGY) != Z_OK) + return -2; + + /* read in dictionary (last 32K of data that was compressed) */ + strcpy(log->end, ".dict"); + fd = open(log->path, O_RDONLY, 0); + if (fd >= 0) { + dict = read(fd, buf, DICT); + close(fd); + if (dict < 0) { + deflateEnd(&strm); + return -1; + } + if (dict) + deflateSetDictionary(&strm, buf, (uint)dict); + } + log_touch(log); + + /* prime deflate with last bits of previous block, position write + pointer to write those bits and overwrite what follows */ + if (lseek(log->fd, log->first - (log->back > 8 ? 2 : 1), + SEEK_SET) < 0 || + read(log->fd, buf, 1) != 1 || lseek(log->fd, -1, SEEK_CUR) < 0) { + deflateEnd(&strm); + return -1; + } + deflatePrime(&strm, (8 - log->back) & 7, *buf); + + /* compress, finishing with a partial non-last empty static block */ + strm.next_in = data; + max = (((uint)0 - 1) >> 1) + 1; /* in case int smaller than size_t */ + do { + strm.avail_in = len > max ? max : (uint)len; + len -= strm.avail_in; + do { + strm.avail_out = DICT; + strm.next_out = buf; + deflate(&strm, len ? Z_NO_FLUSH : Z_PARTIAL_FLUSH); + got = DICT - strm.avail_out; + if (got && write(log->fd, buf, got) != got) { + deflateEnd(&strm); + return -1; + } + log_touch(log); + } while (strm.avail_out == 0); + } while (len); + deflateEnd(&strm); + BAIL(5); + + /* find start of empty static block -- scanning backwards the first one + bit is the second bit of the block, if the last byte is zero, then + we know the byte before that has a one in the top bit, since an + empty static block is ten bits long */ + if ((log->first = lseek(log->fd, -1, SEEK_CUR)) < 0 || + read(log->fd, buf, 1) != 1) + return -1; + log->first++; + if (*buf) { + log->back = 1; + while ((*buf & ((uint)1 << (8 - log->back++))) == 0) + ; /* guaranteed to terminate, since *buf != 0 */ + } + else + log->back = 10; + + /* update compressed crc and length */ + log->ccrc = log->tcrc; + log->clen = log->tlen; + } + else { + /* no data to compress -- fix up existing gzip stream */ + log->tcrc = log->ccrc; + log->tlen = log->clen; } + + /* complete and truncate gzip stream */ + log->last = log->first; + log->stored = 0; + PUT4(buf, log->tcrc); + PUT4(buf + 4, log->tlen); + if (log_last(log, 1) || write(log->fd, buf, 8) != 8 || + (end = lseek(log->fd, 0, SEEK_CUR)) < 0 || ftruncate(log->fd, end)) + return -1; + BAIL(6); + + /* mark as being in the replace operation */ + if (log_mark(log, REPLACE_OP)) + return -1; + + /* execute the replace operation and mark the file as done */ + return log_replace(log); } -/* initial, empty gzip file for appending */ -local char empty_gz[] = { - 0x1f, 0x8b, /* magic gzip id */ - 8, /* compression method is deflate */ - 4, /* there is an extra field */ - 0, 0, 0, 0, /* no modification time provided */ - 0, 0xff, /* no extra flags, no OS */ - 20, 0, 'a', 'p', 16, 0, /* extra field with "ap" subfield */ - 32, 0, 0, 0, 0, 0, 0, 0, /* offset of uncompressed data */ - 32, 0, 0, 0, 0, 0, 0, 0, /* offset of last block */ - 1, 0, 0, 0xff, 0xff, /* empty stored block (last) */ - 0, 0, 0, 0, /* crc */ - 0, 0, 0, 0 /* uncompressed length */ -}; +/* log a repair record to the .repairs file */ +local void log_log(struct log *log, int op, char *record) +{ + time_t now; + FILE *rec; -/* initialize a log object with locking */ -void *gzlog_open(char *path) + now = time(NULL); + strcpy(log->end, ".repairs"); + rec = fopen(log->path, "a"); + if (rec == NULL) + return; + fprintf(rec, "%.24s %s recovery: %s\n", ctime(&now), op == APPEND_OP ? + "append" : (op == COMPRESS_OP ? "compress" : "replace"), record); + fclose(rec); + return; +} + +/* Recover the interrupted operation op. First read foo.add for recovering an + append or compress operation. Return -1 if there was an error reading or + writing foo.gz or reading an existing foo.add, or -2 if there was a memory + allocation failure. */ +local int log_recover(struct log *log, int op) { - unsigned xlen; - unsigned char temp[20]; - unsigned sub_len; - int good; - gz_log *log; - - /* allocate log structure */ - log = malloc(sizeof(gz_log)); - if (log == NULL) - return NULL; - log->id = GZLOGID; + int fd, ret = 0; + unsigned char *data = NULL; + size_t len = 0; + struct stat st; - /* open file, creating it if necessary, and locking it */ - log->fd = open(path, O_RDWR | O_CREAT, 0600); - if (log->fd < 0) { - free(log); - return NULL; + /* log recovery */ + log_log(log, op, "start"); + + /* load foo.add file if expected and present */ + if (op == APPEND_OP || op == COMPRESS_OP) { + strcpy(log->end, ".add"); + if (stat(log->path, &st) == 0 && st.st_size) { + len = (size_t)(st.st_size); + if (len != st.st_size || (data = malloc(st.st_size)) == NULL) { + log_log(log, op, "allocation failure"); + return -2; + } + if ((fd = open(log->path, O_RDONLY, 0)) < 0) { + log_log(log, op, ".add file read failure"); + return -1; + } + ret = read(fd, data, len) != len; + close(fd); + if (ret) { + log_log(log, op, ".add file read failure"); + return -1; + } + log_log(log, op, "loaded .add file"); + } + else + log_log(log, op, "missing .add file!"); + } + + /* recover the interrupted operation */ + switch (op) { + case APPEND_OP: + ret = log_append(log, data, len); + break; + case COMPRESS_OP: + ret = log_compress(log, data, len); + break; + case REPLACE_OP: + ret = log_replace(log); } - if (lock(log->fd)) { + + /* log status */ + log_log(log, op, ret ? "failure" : "complete"); + + /* clean up */ + if (data != NULL) + free(data); + return ret; +} + +/* Close the foo.gz file (if open) and release the lock. */ +local void log_close(struct log *log) +{ + if (log->fd >= 0) close(log->fd); - free(log); - return NULL; + log->fd = -1; + log_unlock(log); +} + +/* Open foo.gz, verify the header, and load the extra field contents, after + first creating the foo.lock file to gain exclusive access to the foo.* + files. If foo.gz does not exist or is empty, then write the initial header, + extra, and body content of an empty foo.gz log file. If there is an error + creating the lock file due to access restrictions, or an error reading or + writing the foo.gz file, or if the foo.gz file is not a proper log file for + this object (e.g. not a gzip file or does not contain the expected extra + field), then return true. If there is an error, the lock is released. + Otherwise, the lock is left in place. */ +local int log_open(struct log *log) +{ + int op; + + /* release open file resource if left over -- can occur if lock lost + between gzlog_open() and gzlog_write() */ + if (log->fd >= 0) + close(log->fd); + log->fd = -1; + + /* negotiate exclusive access */ + if (log_lock(log) < 0) + return -1; + + /* open the log file, foo.gz */ + strcpy(log->end, ".gz"); + log->fd = open(log->path, O_RDWR | O_CREAT, 0644); + if (log->fd < 0) { + log_close(log); + return -1; } - /* if file is empty, write new gzip stream */ + /* if new, initialize foo.gz with an empty log, delete old dictionary */ if (lseek(log->fd, 0, SEEK_END) == 0) { - if (write(log->fd, empty_gz, sizeof(empty_gz)) != sizeof(empty_gz)) { - log_clean(log); - return NULL; + if (write(log->fd, log_gzhead, HEAD) != HEAD || + write(log->fd, log_gzext, EXTRA) != EXTRA || + write(log->fd, log_gzbody, BODY) != BODY) { + log_close(log); + return -1; } + strcpy(log->end, ".dict"); + unlink(log->path); } - /* check gzip header */ - (void)lseek(log->fd, 0, SEEK_SET); - if (read(log->fd, temp, 12) != 12 || temp[0] != 0x1f || - temp[1] != 0x8b || temp[2] != 8 || (temp[3] & 4) == 0) { - log_clean(log); - return NULL; + /* verify log file and load extra field information */ + if ((op = log_head(log)) < 0) { + log_close(log); + return -1; } - /* process extra field to find "ap" sub-field */ - xlen = temp[10] + (temp[11] << 8); - good = 0; - while (xlen) { - if (xlen < 4 || read(log->fd, temp, 4) != 4) - break; - sub_len = temp[2]; - sub_len += temp[3] << 8; - xlen -= 4; - if (memcmp(temp, "ap", 2) == 0 && sub_len == 16) { - good = 1; - break; - } - if (xlen < sub_len) - break; - (void)lseek(log->fd, sub_len, SEEK_CUR); - xlen -= sub_len; + /* check for interrupted process and if so, recover */ + if (op != NO_OP && log_recover(log, op)) { + log_close(log); + return -1; } - if (!good) { - log_clean(log); + + /* touch the lock file to prevent another process from grabbing it */ + log_touch(log); + return 0; +} + +/* See gzlog.h for the description of the external methods below */ +gzlog *gzlog_open(char *path) +{ + size_t n; + struct log *log; + + /* check arguments */ + if (path == NULL || *path == 0) return NULL; - } - /* read in "ap" sub-field */ - log->extra = lseek(log->fd, 0, SEEK_CUR); - if (read(log->fd, temp, 16) != 16) { - log_clean(log); + /* allocate and initialize log structure */ + log = malloc(sizeof(struct log)); + if (log == NULL) + return NULL; + strcpy(log->id, LOGID); + log->fd = -1; + + /* save path and end of path for name construction */ + n = strlen(path); + log->path = malloc(n + 9); /* allow for ".repairs" */ + if (log->path == NULL) { + free(log); return NULL; } - log->mark_off = make_off(temp); - log->last_off = make_off(temp + 8); - - /* get crc, length of gzip file */ - (void)lseek(log->fd, log->last_off, SEEK_SET); - if (read(log->fd, temp, 13) != 13 || - memcmp(temp, "\001\000\000\377\377", 5) != 0) { - log_clean(log); + strcpy(log->path, path); + log->end = log->path + n; + + /* gain exclusive access and verify log file -- may perform a + recovery operation if needed */ + if (log_open(log)) { + free(log->path); + free(log); return NULL; } - log->crc = make_ulg(temp + 5); - log->len = make_ulg(temp + 9); - /* set up to write over empty last block */ - (void)lseek(log->fd, log->last_off + 5, SEEK_SET); - log->stored = 0; - return (void *)log; + /* return pointer to log structure */ + return log; } -/* maximum amount to put in a stored block before starting a new one */ -#define MAX_BLOCK 16384 - -/* write a block to a log object */ -int gzlog_write(void *obj, char *data, size_t len) +/* gzlog_compress() return values: + 0: all good + -1: file i/o error (usually access issue) + -2: memory allocation failure + -3: invalid log pointer argument */ +int gzlog_compress(gzlog *logd) { - size_t some; - unsigned char temp[5]; - gz_log *log; + int fd, ret; + uint block; + size_t len, next; + unsigned char *data, buf[5]; + struct log *log = logd; - /* check object */ - log = (gz_log *)obj; - if (log == NULL || log->id != GZLOGID) - return 1; + /* check arguments */ + if (log == NULL || strcmp(log->id, LOGID) || len < 0) + return -3; - /* write stored blocks until all of the input is written */ - do { - some = MAX_BLOCK - log->stored; - if (some > len) - some = len; - if (write(log->fd, data, some) != some) - return 1; - log->crc = crc32(log->crc, (unsigned char *)data, some); - log->len += some; - len -= some; - data += some; - log->stored += some; - - /* if the stored block is full, end it and start another */ - if (log->stored == MAX_BLOCK) { - (void)lseek(log->fd, log->last_off, SEEK_SET); - temp[0] = 0; - dice_ulg(log->stored + ((unsigned long)(~log->stored) << 16), - temp + 1); - if (write(log->fd, temp, 5) != 5) - return 1; - log->last_off = lseek(log->fd, log->stored, SEEK_CUR); - (void)lseek(log->fd, 5, SEEK_CUR); - log->stored = 0; - } - } while (len); - return 0; -} + /* see if we lost the lock -- if so get it again and reload the extra + field information (it probably changed), recover last operation if + necessary */ + if (log_check(log) && log_open(log)) + return -1; -/* recompress the remaining stored deflate data in place */ -local int recomp(gz_log *log) -{ - z_stream strm; - size_t len, max; - unsigned char *in; - unsigned char *out; - unsigned char temp[16]; - - /* allocate space and read it all in (it's around 1 MB) */ - len = log->last_off - log->mark_off; - max = len + (len >> 12) + (len >> 14) + 11; - out = malloc(max); - if (out == NULL) - return 1; - in = malloc(len); - if (in == NULL) { - free(out); - return 1; - } - (void)lseek(log->fd, log->mark_off, SEEK_SET); - if (read(log->fd, in, len) != len) { - free(in); - free(out); - return 1; - } + /* create space for uncompressed data */ + len = ((size_t)(log->last - log->first) & ~(((size_t)1 << 10) - 1)) + + log->stored; + if ((data = malloc(len)) == NULL) + return -2; - /* recompress in memory, decoding stored data as we go */ - /* note: this assumes that unsigned is four bytes or more */ - /* consider not making that assumption */ - strm.zalloc = Z_NULL; - strm.zfree = Z_NULL; - strm.opaque = Z_NULL; - if (deflateInit2(&strm, Z_BEST_COMPRESSION, Z_DEFLATED, -15, 8, - Z_DEFAULT_STRATEGY) != Z_OK) { - free(in); - free(out); - return 1; - } - strm.next_in = in; - strm.avail_out = max; - strm.next_out = out; - while (len >= 5) { - if (strm.next_in[0] != 0) + /* do statement here is just a cheap trick for error handling */ + do { + /* read in the uncompressed data */ + if (lseek(log->fd, log->first - 1, SEEK_SET) < 0) break; - strm.avail_in = strm.next_in[1] + (strm.next_in[2] << 8); - strm.next_in += 5; - len -= 5; - if (strm.avail_in != 0) { - if (len < strm.avail_in) + next = 0; + while (next < len) { + if (read(log->fd, buf, 5) != 5) break; - len -= strm.avail_in; - (void)deflate(&strm, Z_NO_FLUSH); - if (strm.avail_in != 0 || strm.avail_out == 0) + block = PULL2(buf + 1); + if (next + block > len || + read(log->fd, (char *)data + next, block) != block) break; + next += block; } - } - (void)deflate(&strm, Z_SYNC_FLUSH); - (void)deflateEnd(&strm); - free(in); - if (len != 0 || strm.avail_out == 0) { - free(out); - return 1; - } + if (lseek(log->fd, 0, SEEK_CUR) != log->last + 4 + log->stored) + break; + log_touch(log); - /* overwrite stored data with compressed data */ - (void)lseek(log->fd, log->mark_off, SEEK_SET); - len = max - strm.avail_out; - if (write(log->fd, out, len) != len) { - free(out); - return 1; - } - free(out); - - /* write last empty block, crc, and length */ - log->mark_off = log->last_off = lseek(log->fd, 0, SEEK_CUR); - temp[0] = 1; - dice_ulg(0xffffL << 16, temp + 1); - dice_ulg(log->crc, temp + 5); - dice_ulg(log->len, temp + 9); - if (write(log->fd, temp, 13) != 13) - return 1; + /* write the uncompressed data to the .add file */ + strcpy(log->end, ".add"); + fd = open(log->path, O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (fd < 0) + break; + ret = write(fd, data, len) != len; + if (ret | close(fd)) + break; + log_touch(log); - /* truncate file to discard remaining stored data and old trailer */ - ftruncate(log->fd, lseek(log->fd, 0, SEEK_CUR)); + /* write the dictionary for the next compress to the .temp file */ + strcpy(log->end, ".temp"); + fd = open(log->path, O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (fd < 0) + break; + next = DICT > len ? len : DICT; + ret = write(fd, (char *)data + len - next, next) != next; + if (ret | close(fd)) + break; + log_touch(log); - /* update extra field to point to new last empty block */ - (void)lseek(log->fd, log->extra, SEEK_SET); - dice_off(log->mark_off, temp); - dice_off(log->last_off, temp + 8); - if (write(log->fd, temp, 16) != 16) - return 1; - return 0; -} + /* roll back to compressed data, mark the compress in progress */ + log->last = log->first; + log->stored = 0; + if (log_mark(log, COMPRESS_OP)) + break; + BAIL(7); + + /* compress and append the data (clears mark) */ + ret = log_compress(log, data, len); + free(data); + return ret; + } while (0); -/* maximum accumulation of stored blocks before compressing */ -#define MAX_STORED 1048576 + /* broke out of do above on i/o error */ + free(data); + return -1; +} -/* close log object */ -int gzlog_close(void *obj) +/* gzlog_write() return values: + 0: all good + -1: file i/o error (usually access issue) + -2: memory allocation failure + -3: invalid log pointer argument */ +int gzlog_write(gzlog *logd, void *data, size_t len) { - unsigned char temp[8]; - gz_log *log; + int fd, ret; + struct log *log = logd; - /* check object */ - log = (gz_log *)obj; - if (log == NULL || log->id != GZLOGID) - return 1; + /* check arguments */ + if (log == NULL || strcmp(log->id, LOGID) || len < 0) + return -3; + if (data == NULL || len == 0) + return 0; - /* go to start of most recent block being written */ - (void)lseek(log->fd, log->last_off, SEEK_SET); - - /* if some stuff was put there, update block */ - if (log->stored) { - temp[0] = 0; - dice_ulg(log->stored + ((unsigned long)(~log->stored) << 16), - temp + 1); - if (write(log->fd, temp, 5) != 5) - return 1; - log->last_off = lseek(log->fd, log->stored, SEEK_CUR); - } + /* see if we lost the lock -- if so get it again and reload the extra + field information (it probably changed), recover last operation if + necessary */ + if (log_check(log) && log_open(log)) + return -1; - /* write last block (empty) */ - if (write(log->fd, "\001\000\000\377\377", 5) != 5) - return 1; + /* create and write .add file */ + strcpy(log->end, ".add"); + fd = open(log->path, O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (fd < 0) + return -1; + ret = write(fd, data, len) != len; + if (ret | close(fd)) + return -1; + log_touch(log); - /* write updated crc and uncompressed length */ - dice_ulg(log->crc, temp); - dice_ulg(log->len, temp + 4); - if (write(log->fd, temp, 8) != 8) - return 1; + /* mark log file with append in progress */ + if (log_mark(log, APPEND_OP)) + return -1; + BAIL(8); - /* put offset of that last block in gzip extra block */ - (void)lseek(log->fd, log->extra + 8, SEEK_SET); - dice_off(log->last_off, temp); - if (write(log->fd, temp, 8) != 8) - return 1; + /* append data (clears mark) */ + if (log_append(log, data, len)) + return -1; - /* if more than 1 MB stored, then time to compress it */ - if (log->last_off - log->mark_off > MAX_STORED) { - if (recomp(log)) - return 1; - } + /* check to see if it's time to compress -- if not, then done */ + if (((log->last - log->first) >> 10) + (log->stored >> 10) < TRIGGER) + return 0; + + /* time to compress */ + return gzlog_compress(log); +} + +/* gzlog_close() return values: + 0: ok + -3: invalid log pointer argument */ +int gzlog_close(gzlog *logd) +{ + struct log *log = logd; - /* unlock and close file */ - log_clean(log); + /* check arguments */ + if (log == NULL || strcmp(log->id, LOGID)) + return -3; + + /* close the log file and release the lock */ + log_close(log); + + /* free structure and return */ + if (log->path != NULL) + free(log->path); + strcpy(log->id, "bad"); + free(log); return 0; } diff --git a/examples/gzlog.h b/examples/gzlog.h index a800bd5..c461426 100644 --- a/examples/gzlog.h +++ b/examples/gzlog.h @@ -1,6 +1,6 @@ /* gzlog.h - Copyright (C) 2004 Mark Adler, all rights reserved - version 1.0, 26 Nov 2004 + Copyright (C) 2004, 2008 Mark Adler, all rights reserved + version 2.0, 25 Apr 2008 This software is provided 'as-is', without any express or implied warranty. In no event will the author be held liable for any damages @@ -21,38 +21,69 @@ Mark Adler madler@alumni.caltech.edu */ +/* Version History: + 1.0 26 Nov 2004 First version + 2.0 25 Apr 2008 Complete redesign for recovery of interrupted operations + Interface changed slightly in that now path is a prefix + Compression now occurs as needed during gzlog_write() + gzlog_write() now always leaves the log file as valid gzip + */ + /* The gzlog object allows writing short messages to a gzipped log file, opening the log file locked for small bursts, and then closing it. The log - object works by appending stored data to the gzip file until 1 MB has been - accumulated. At that time, the stored data is compressed, and replaces the - uncompressed data in the file. The log file is truncated to its new size at - that time. After closing, the log file is always valid gzip file that can - decompressed to recover what was written. - - A gzip header "extra" field contains two file offsets for appending. The - first points to just after the last compressed data. The second points to - the last stored block in the deflate stream, which is empty. All of the - data between those pointers is uncompressed. + object works by appending stored (uncompressed) data to the gzip file until + 1 MB has been accumulated. At that time, the stored data is compressed, and + replaces the uncompressed data in the file. The log file is truncated to + its new size at that time. After each write operation, the log file is a + valid gzip file that can decompressed to recover what was written. + + The gzlog operations can be interupted at any point due to an application or + system crash, and the log file will be recovered the next time the log is + opened with gzlog_open(). */ +#ifndef GZLOG_H +#define GZLOG_H + +/* gzlog object type */ +typedef void gzlog; + /* Open a gzlog object, creating the log file if it does not exist. Return - NULL on error. Note that gzlog_open() could take a long time to return if - there is difficulty in locking the file. */ -void *gzlog_open(char *path); - -/* Write to a gzlog object. Return non-zero on error. This function will - simply write data to the file uncompressed. Compression of the data - will not occur until gzlog_close() is called. It is expected that - gzlog_write() is used for a short message, and then gzlog_close() is - called. If a large amount of data is to be written, then the application - should write no more than 1 MB at a time with gzlog_write() before - calling gzlog_close() and then gzlog_open() again. */ -int gzlog_write(void *log, char *data, size_t len); - -/* Close a gzlog object. Return non-zero on error. The log file is locked - until this function is called. This function will compress stored data - at the end of the gzip file if at least 1 MB has been accumulated. Note - that the file will not be a valid gzip file until this function completes. - */ -int gzlog_close(void *log); + NULL on error. Note that gzlog_open() could take a while to complete if it + has to wait to verify that a lock is stale (possibly for five minutes), or + if there is significant contention with other instantiations of this object + when locking the resource. path is the prefix of the file names created by + this object. If path is "foo", then the log file will be "foo.gz", and + other auxiliary files will be created and destroyed during the process: + "foo.dict" for a compression dictionary, "foo.temp" for a temporary (next) + dictionary, "foo.add" for data being added or compressed, "foo.lock" for the + lock file, and "foo.repairs" to log recovery operations performed due to + interrupted gzlog operations. A gzlog_open() followed by a gzlog_close() + will recover a previously interrupted operation, if any. */ +gzlog *gzlog_open(char *path); + +/* Write to a gzlog object. Return zero on success, -1 if there is a file i/o + error on any of the gzlog files (this should not happen if gzlog_open() + succeeded, unless the device has run out of space or leftover auxiliary + files have permissions or ownership that prevent their use), -2 if there is + a memory allocation failure, or -3 if the log argument is invalid (e.g. if + it was not created by gzlog_open()). This function will write data to the + file uncompressed, until 1 MB has been accumulated, at which time that data + will be compressed. The log file will be a valid gzip file upon successful + return. */ +int gzlog_write(gzlog *log, void *data, size_t len); + +/* Force compression of any uncompressed data in the log. This should be used + sparingly, if at all. The main application would be when a log file will + not be appended to again. If this is used to compress frequently while + appending, it will both significantly increase the execution time and + reduce the compression ratio. The return codes are the same as for + gzlog_write(). */ +int gzlog_compress(gzlog *log); + +/* Close a gzlog object. Return zero on success, -3 if the log argument is + invalid. The log object is freed, and so cannot be referenced again. */ +int gzlog_close(gzlog *log); + +#endif diff --git a/examples/pigz.c b/examples/pigz.c new file mode 100644 index 0000000..42794d0 --- /dev/null +++ b/examples/pigz.c @@ -0,0 +1,452 @@ +/* pigz.c -- parallel implementation of gzip + * Copyright (C) 2007 Mark Adler + * Version 1.1 28 January 2007 Mark Adler + */ + +/* Version history: + 1.0 17 Jan 2007 First version + 1.1 28 Jan 2007 Avoid void * arithmetic (some compilers don't get that) + Add note about requiring zlib 1.2.3 + Allow compression level 0 (no compression) + Completely rewrite parallelism -- add a write thread + Use deflateSetDictionary() to make use of history + Tune argument defaults to best performance on four cores + */ + +/* + pigz compresses from stdin to stdout using threads to make use of multiple + processors and cores. The input is broken up into 128 KB chunks, and each + is compressed separately. The CRC for each chunk is also calculated + separately. The compressed chunks are written in order to the output, + and the overall CRC is calculated from the CRC's of the chunks. + + The compressed data format generated is the gzip format using the deflate + compression method. First a gzip header is written, followed by raw deflate + partial streams. They are partial, in that they do not have a terminating + block. At the end, the deflate stream is terminated with a final empty + static block, and lastly a gzip trailer is written with the CRC and the + number of input bytes. + + Each raw deflate partial stream is terminated by an empty stored block + (using the Z_SYNC_FLUSH option of zlib), in order to end that partial + bit stream at a byte boundary. That allows the partial streams to be + concantenated simply as sequences of bytes. This adds a very small four + or five byte overhead to the output for each input chunk. + + zlib's crc32_combine() routine allows the calcuation of the CRC of the + entire input using the independent CRC's of the chunks. pigz requires zlib + version 1.2.3 or later, since that is the first version that provides the + crc32_combine() function. + + pigz uses the POSIX pthread library for thread control and communication. + */ + +#include +#include +#include +#include +#include +#include +#include +#include "zlib.h" + +#define local static + +/* exit with error */ +local void bail(char *msg) +{ + fprintf(stderr, "pigz abort: %s\n", msg); + exit(1); +} + +/* read up to len bytes into buf, repeating read() calls as needed */ +local size_t readn(int desc, unsigned char *buf, size_t len) +{ + ssize_t ret; + size_t got; + + got = 0; + while (len) { + ret = read(desc, buf, len); + if (ret < 0) + bail("read error"); + if (ret == 0) + break; + buf += ret; + len -= ret; + got += ret; + } + return got; +} + +/* write len bytes, repeating write() calls as needed */ +local void writen(int desc, unsigned char *buf, size_t len) +{ + ssize_t ret; + + while (len) { + ret = write(desc, buf, len); + if (ret < 1) + bail("write error"); + buf += ret; + len -= ret; + } +} + +/* a flag variable for communication between two threads */ +struct flag { + int value; /* value of flag */ + pthread_mutex_t lock; /* lock for checking and changing flag */ + pthread_cond_t cond; /* condition for signaling on flag change */ +}; + +/* initialize a flag for use, starting with value val */ +local void flag_init(struct flag *me, int val) +{ + me->value = val; + pthread_mutex_init(&(me->lock), NULL); + pthread_cond_init(&(me->cond), NULL); +} + +/* set the flag to val, signal another process that may be waiting for it */ +local void flag_set(struct flag *me, int val) +{ + pthread_mutex_lock(&(me->lock)); + me->value = val; + pthread_cond_signal(&(me->cond)); + pthread_mutex_unlock(&(me->lock)); +} + +/* if it isn't already, wait for some other thread to set the flag to val */ +local void flag_wait(struct flag *me, int val) +{ + pthread_mutex_lock(&(me->lock)); + while (me->value != val) + pthread_cond_wait(&(me->cond), &(me->lock)); + pthread_mutex_unlock(&(me->lock)); +} + +/* if flag is equal to val, wait for some other thread to change it */ +local void flag_wait_not(struct flag *me, int val) +{ + pthread_mutex_lock(&(me->lock)); + while (me->value == val) + pthread_cond_wait(&(me->cond), &(me->lock)); + pthread_mutex_unlock(&(me->lock)); +} + +/* clean up the flag when done with it */ +local void flag_done(struct flag *me) +{ + pthread_cond_destroy(&(me->cond)); + pthread_mutex_destroy(&(me->lock)); +} + +/* a unit of work to feed to compress_thread() -- it is assumed that the out + buffer is large enough to hold the maximum size len bytes could deflate to, + plus five bytes for the final sync marker */ +struct work { + size_t len; /* length of input */ + unsigned long crc; /* crc of input */ + unsigned char *buf; /* input */ + unsigned char *out; /* space for output (guaranteed big enough) */ + z_stream strm; /* pre-initialized z_stream */ + struct flag busy; /* busy flag indicating work unit in use */ + pthread_t comp; /* this compression thread */ +}; + +/* busy flag values */ +#define IDLE 0 /* compress and writing done -- can start compress */ +#define COMP 1 /* compress -- input and output buffers in use */ +#define WRITE 2 /* compress done, writing output -- can read input */ + +/* read-only globals (set by main/read thread before others started) */ +local int ind; /* input file descriptor */ +local int outd; /* output file descriptor */ +local int level; /* compression level */ +local int procs; /* number of compression threads (>= 2) */ +local size_t size; /* uncompressed input size per thread (>= 32K) */ +local struct work *jobs; /* work units: jobs[0..procs-1] */ + +/* next and previous jobs[] indices */ +#define NEXT(n) ((n) == procs - 1 ? 0 : (n) + 1) +#define PREV(n) ((n) == 0 ? procs - 1 : (n) - 1) + +/* sliding dictionary size for deflate */ +#define DICT 32768U + +/* largest power of 2 that fits in an unsigned int -- used to limit requests + to zlib functions that use unsigned int lengths */ +#define MAX ((((unsigned)-1) >> 1) + 1) + +/* compress thread: compress the input in the provided work unit and compute + its crc -- assume that the amount of space at job->out is guaranteed to be + enough for the compressed output, as determined by the maximum expansion + of deflate compression -- use the input in the previous work unit (if there + is one) to set the deflate dictionary for better compression */ +local void *compress_thread(void *arg) +{ + size_t len; /* input length for this work unit */ + unsigned long crc; /* crc of input data */ + struct work *prev; /* previous work unit */ + struct work *job = arg; /* work unit for this thread */ + z_stream *strm = &(job->strm); /* zlib stream for this work unit */ + + /* reset state for a new compressed stream */ + (void)deflateReset(strm); + + /* initialize input, output, and crc */ + strm->next_in = job->buf; + strm->next_out = job->out; + len = job->len; + crc = crc32(0L, Z_NULL, 0); + + /* set dictionary if this isn't the first work unit, and if we will be + compressing something (the read thread assures that the dictionary + data in the previous work unit is still there) */ + prev = jobs + PREV(job - jobs); + if (prev->buf != NULL && len != 0) + deflateSetDictionary(strm, prev->buf + (size - DICT), DICT); + + /* run MAX-sized amounts of input through deflate and crc32 -- this loop + is needed for those cases where the integer type is smaller than the + size_t type, or when len is close to the limit of the size_t type */ + while (len > MAX) { + strm->avail_in = MAX; + strm->avail_out = (unsigned)-1; + crc = crc32(crc, strm->next_in, strm->avail_in); + (void)deflate(strm, Z_NO_FLUSH); + len -= MAX; + } + + /* run last piece through deflate and crc32, follow with a sync marker */ + if (len) { + strm->avail_in = len; + strm->avail_out = (unsigned)-1; + crc = crc32(crc, strm->next_in, strm->avail_in); + (void)deflate(strm, Z_SYNC_FLUSH); + } + + /* don't need to Z_FINISH, since we'd delete the last two bytes anyway */ + + /* return result */ + job->crc = crc; + return NULL; +} + +/* put a 4-byte integer into a byte array in LSB order */ +#define PUT4(a,b) (*(a)=(b),(a)[1]=(b)>>8,(a)[2]=(b)>>16,(a)[3]=(b)>>24) + +/* write thread: wait for compression threads to complete, write output in + order, also write gzip header and trailer around the compressed data */ +local void *write_thread(void *arg) +{ + int n; /* compress thread index */ + size_t len; /* length of input processed */ + unsigned long tot; /* total uncompressed size (overflow ok) */ + unsigned long crc; /* CRC-32 of uncompressed data */ + unsigned char wrap[10]; /* gzip header or trailer */ + + /* write simple gzip header */ + memcpy(wrap, "\037\213\10\0\0\0\0\0\0\3", 10); + wrap[8] = level == 9 ? 2 : (level == 1 ? 4 : 0); + writen(outd, wrap, 10); + + /* process output of compress threads until end of input */ + tot = 0; + crc = crc32(0L, Z_NULL, 0); + n = 0; + do { + /* wait for compress thread to start, then wait to complete */ + flag_wait(&(jobs[n].busy), COMP); + pthread_join(jobs[n].comp, NULL); + + /* now that compress is done, allow read thread to use input buffer */ + flag_set(&(jobs[n].busy), WRITE); + + /* write compressed data and update length and crc */ + writen(outd, jobs[n].out, jobs[n].strm.next_out - jobs[n].out); + len = jobs[n].len; + tot += len; + crc = crc32_combine(crc, jobs[n].crc, len); + + /* release this work unit and go to the next work unit */ + flag_set(&(jobs[n].busy), IDLE); + n = NEXT(n); + + /* an input buffer less than size in length indicates end of input */ + } while (len == size); + + /* write final static block and gzip trailer (crc and len mod 2^32) */ + wrap[0] = 3; wrap[1] = 0; + PUT4(wrap + 2, crc); + PUT4(wrap + 6, tot); + writen(outd, wrap, 10); + return NULL; +} + +/* one-time initialization of a work unit -- this is where we set the deflate + compression level and request raw deflate, and also where we set the size + of the output buffer to guarantee enough space for a worst-case deflate + ending with a Z_SYNC_FLUSH */ +local void job_init(struct work *job) +{ + int ret; /* deflateInit2() return value */ + + job->buf = malloc(size); + job->out = malloc(size + (size >> 11) + 10); + job->strm.zfree = Z_NULL; + job->strm.zalloc = Z_NULL; + job->strm.opaque = Z_NULL; + ret = deflateInit2(&(job->strm), level, Z_DEFLATED, -15, 8, + Z_DEFAULT_STRATEGY); + if (job->buf == NULL || job->out == NULL || ret != Z_OK) + bail("not enough memory"); +} + +/* compress ind to outd in the gzip format, using multiple threads for the + compression and crc calculation and another thread for writing the output -- + the read thread is the main thread */ +local void read_thread(void) +{ + int n; /* general index */ + size_t got; /* amount read */ + pthread_attr_t attr; /* thread attributes (left at defaults) */ + pthread_t write; /* write thread */ + + /* set defaults (not all pthread implementations default to joinable) */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + /* allocate and set up work list (individual work units will be initialized + as needed, in case the input is short), assure that allocation size + arithmetic does not overflow */ + if (size + (size >> 11) + 10 < (size >> 11) + 10 || + (ssize_t)(size + (size >> 11) + 10) < 0 || + ((size_t)0 - 1) / procs <= sizeof(struct work) || + (jobs = malloc(procs * sizeof(struct work))) == NULL) + bail("not enough memory"); + for (n = 0; n < procs; n++) { + jobs[n].buf = NULL; + flag_init(&(jobs[n].busy), IDLE); + } + + /* start write thread */ + pthread_create(&write, &attr, write_thread, NULL); + + /* read from input and start compress threads (write thread will pick up + the output of the compress threads) */ + n = 0; + do { + /* initialize this work unit if it's the first time it's used */ + if (jobs[n].buf == NULL) + job_init(jobs + n); + + /* read input data, but wait for last compress on this work unit to be + done, and wait for the dictionary to be used by the last compress on + the next work unit */ + flag_wait_not(&(jobs[n].busy), COMP); + flag_wait_not(&(jobs[NEXT(n)].busy), COMP); + got = readn(ind, jobs[n].buf, size); + + /* start compress thread, but wait for write to be done first */ + flag_wait(&(jobs[n].busy), IDLE); + jobs[n].len = got; + pthread_create(&(jobs[n].comp), &attr, compress_thread, jobs + n); + + /* mark work unit so write thread knows compress was started */ + flag_set(&(jobs[n].busy), COMP); + + /* go to the next work unit */ + n = NEXT(n); + + /* do until end of input, indicated by a read less than size */ + } while (got == size); + + /* wait for the write thread to complete -- the write thread will join with + all of the compress threads, so this waits for all of the threads to + complete */ + pthread_join(write, NULL); + + /* free up all requested resources and return */ + for (n = procs - 1; n >= 0; n--) { + flag_done(&(jobs[n].busy)); + (void)deflateEnd(&(jobs[n].strm)); + free(jobs[n].out); + free(jobs[n].buf); + } + free(jobs); + pthread_attr_destroy(&attr); +} + +/* Process arguments for level, size, and procs, compress from stdin to + stdout in the gzip format. Note that procs must be at least two in + order to provide a dictionary in one work unit for the other work + unit, and that size must be at least 32K to store a full dictionary. */ +int main(int argc, char **argv) +{ + int n; /* general index */ + int get; /* command line parameters to get */ + char *arg; /* command line argument */ + + /* set defaults -- 32 processes and 128K buffers was found to provide + good utilization of four cores (about 97%) and balanced the overall + execution time impact of more threads against more dictionary + processing for a fixed amount of memory -- the memory usage for these + settings and full use of all work units (at least 4 MB of input) is + 16.2 MB + */ + level = Z_DEFAULT_COMPRESSION; + procs = 32; + size = 131072UL; + + /* process command-line arguments */ + get = 0; + for (n = 1; n < argc; n++) { + arg = argv[n]; + if (*arg == '-') { + while (*++arg) + if (*arg >= '0' && *arg <= '9') /* compression level */ + level = *arg - '0'; + else if (*arg == 'b') /* chunk size in K */ + get |= 1; + else if (*arg == 'p') /* number of processes */ + get |= 2; + else if (*arg == 'h') { /* help */ + fputs("usage: pigz [-0..9] [-b blocksizeinK]", stderr); + fputs(" [-p processes] < foo > foo.gz\n", stderr); + return 0; + } + else + bail("invalid option"); + } + else if (get & 1) { + if (get & 2) + bail("you need to separate the -b and -p options"); + size = (size_t)(atol(arg)) << 10; /* chunk size */ + if (size < DICT) + bail("invalid option"); + get = 0; + } + else if (get & 2) { + procs = atoi(arg); /* processes */ + if (procs < 2) + bail("invalid option"); + get = 0; + } + else + bail("invalid option (you need to pipe input and output)"); + } + if (get) + bail("missing option argument"); + + /* do parallel compression from stdin to stdout (the read thread starts up + the write thread and the compression threads, and they all join before + the read thread returns) */ + ind = 0; + outd = 1; + read_thread(); + + /* done */ + return 0; +} -- cgit v1.2.1