diff options
author | Wayne Davison <wayne@opencoder.net> | 2020-05-25 13:31:30 -0700 |
---|---|---|
committer | Wayne Davison <wayne@opencoder.net> | 2020-05-25 13:44:48 -0700 |
commit | 4aaadc2f2970a22676df60be593c63fa37b49700 (patch) | |
tree | b86728e1f65fa908cb332f7e1b89b356716fbcc1 /token.c | |
parent | abef92c03767d0c5dc9070eba15805f0f7075e6c (diff) | |
download | rsync-4aaadc2f2970a22676df60be593c63fa37b49700.tar.gz |
Include zstd compression support.
Based on a patch by Sebastian A. Siewior. Fixes bug #14338.
Diffstat (limited to 'token.c')
-rw-r--r-- | token.c | 241 |
1 files changed, 241 insertions, 0 deletions
@@ -22,6 +22,9 @@ #include "rsync.h" #include "itypes.h" #include <zlib.h> +#ifdef SUPPORT_ZSTD +#include <zstd.h> +#endif extern int do_compression; extern int protocol_version; @@ -58,6 +61,14 @@ void init_compression_level(void) if (do_compression_level == Z_DEFAULT_COMPRESSION) do_compression_level = def_level; break; +#ifdef SUPPORT_ZSTD + case CPRES_ZSTD: + min_level = ZSTD_minCLevel(); + max_level = ZSTD_maxCLevel(); + def_level = 3; + off_level = CLVL_NOT_SPECIFIED; + break; +#endif default: /* paranoia to prevent missing case values */ exit_cleanup(RERR_UNSUPPORTED); } @@ -648,6 +659,228 @@ static void see_deflate_token(char *buf, int32 len) } while (len || rx_strm.avail_out == 0); } +#ifdef SUPPORT_ZSTD + +static ZSTD_inBuffer zstd_in_buff; +static ZSTD_outBuffer zstd_out_buff; +static ZSTD_CCtx *zstd_cctx; + +static void send_zstd_token(int f, int32 token, struct map_struct *buf, + OFF_T offset, int32 nb) +{ + static int comp_init_done, flush_pending; + ZSTD_EndDirective flush = ZSTD_e_continue; + int32 n, r; + + /* initialization */ + if (!comp_init_done) { + + zstd_cctx = ZSTD_createCCtx(); + if (!zstd_cctx) { + rprintf(FERROR, "compression init failed\n"); + exit_cleanup(RERR_PROTOCOL); + } + + obuf = new_array(char, MAX_DATA_COUNT + 2); + if (!obuf) + out_of_memory("send_deflated_token"); + + ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel, + do_compression_level); + zstd_out_buff.dst = obuf + 2; + + comp_init_done = 1; + } + + if (last_token == -1) { + last_run_end = 0; + run_start = token; + flush_pending = 0; + } else if (last_token == -2) { + run_start = token; + + } else if (nb != 0 || token != last_token + 1 + || token >= run_start + 65536) { + + /* output previous run */ + r = run_start - last_run_end; + n = last_token - run_start; + + if (r >= 0 && r <= 63) { + write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r); + } else { + write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG)); + write_int(f, run_start); + } + if (n != 0) { + write_byte(f, n); + write_byte(f, n >> 8); + } + last_run_end = last_token; + run_start = token; + } + + last_token = token; + + if (nb || flush_pending) { + + zstd_in_buff.src = map_ptr(buf, offset, nb); + zstd_in_buff.size = nb; + zstd_in_buff.pos = 0; + + do { + if (zstd_out_buff.size == 0) { + zstd_out_buff.size = MAX_DATA_COUNT; + zstd_out_buff.pos = 0; + } + + /* File ended, flush */ + if (token != -2) + flush = ZSTD_e_flush; + + r = ZSTD_compressStream2(zstd_cctx, &zstd_out_buff, &zstd_in_buff, flush); + if (ZSTD_isError(r)) { + rprintf(FERROR, "ZSTD_compressStream returned %d\n", r); + exit_cleanup(RERR_STREAMIO); + } + + /* + * Nothing is sent if the buffer isn't full so avoid smaller + * transfers. If a file is finished then we flush the internal + * state and send a smaller buffer so that the remote side can + * finish the file. + */ + if (zstd_out_buff.pos == zstd_out_buff.size || flush == ZSTD_e_flush) { + n = zstd_out_buff.pos; + + obuf[0] = DEFLATED_DATA + (n >> 8); + obuf[1] = n; + write_buf(f, obuf, n+2); + + zstd_out_buff.size = 0; + } + /* + * Loop while the input buffer isn't full consumed or the + * internal state isn't fully flushed. + */ + } while (zstd_in_buff.pos < zstd_in_buff.size || r > 0); + flush_pending = token == -2; + } + + if (token == -1) { + /* end of file - clean up */ + write_byte(f, END_FLAG); + } +} + +static ZSTD_DCtx *zstd_dctx; + +static int32 recv_zstd_token(int f, char **data) +{ + static int decomp_init_done; + static int out_buffer_size; + int32 n, flag; + int r; + + if (!decomp_init_done) { + + zstd_dctx = ZSTD_createDCtx(); + if (!zstd_dctx) { + rprintf(FERROR, "ZSTD_createDStream failed\n"); + exit_cleanup(RERR_PROTOCOL); + } + + /* Output buffer fits two decompressed blocks */ + out_buffer_size = ZSTD_DStreamOutSize() * 2; + cbuf = new_array(char, MAX_DATA_COUNT); + dbuf = new_array(char, out_buffer_size); + if (!cbuf || !dbuf) + out_of_memory("recv_zstd_token"); + + zstd_in_buff.src = cbuf; + zstd_out_buff.dst = dbuf; + + decomp_init_done = 1; + } + + do { + switch (recv_state) { + case r_init: + recv_state = r_idle; + rx_token = 0; + break; + + case r_idle: + flag = read_byte(f); + if ((flag & 0xC0) == DEFLATED_DATA) { + n = ((flag & 0x3f) << 8) + read_byte(f); + read_buf(f, cbuf, n); + + zstd_in_buff.size = n; + zstd_in_buff.pos = 0; + + recv_state = r_inflating; + + } else if (flag == END_FLAG) { + /* that's all folks */ + recv_state = r_init; + return 0; + + } else { + /* here we have a token of some kind */ + if (flag & TOKEN_REL) { + rx_token += flag & 0x3f; + flag >>= 6; + } else + rx_token = read_int(f); + if (flag & 1) { + rx_run = read_byte(f); + rx_run += read_byte(f) << 8; + recv_state = r_running; + } + return -1 - rx_token; + } + break; + + case r_inflating: + zstd_out_buff.size = out_buffer_size; + zstd_out_buff.pos = 0; + + r = ZSTD_decompressStream(zstd_dctx, &zstd_out_buff, &zstd_in_buff); + n = zstd_out_buff.pos; + if (ZSTD_isError(r)) { + rprintf(FERROR, "ZSTD decomp returned %d (%d bytes)\n", r, n); + exit_cleanup(RERR_STREAMIO); + } + + /* + * If the input buffer is fully consumed and the output + * buffer is not full then next step is to read more + * data. + */ + if (zstd_in_buff.size == zstd_in_buff.pos && n < out_buffer_size) + recv_state = r_idle; + + if (n != 0) { + *data = dbuf; + return n; + } + break; + + case r_running: + ++rx_token; + if (--rx_run == 0) + recv_state = r_idle; + return -1 - rx_token; + break; + + case r_inflated: + break; + } + } while (1); +} +#endif + /** * Transmit a verbatim buffer of length @p n followed by a token. * If token == -1 then we have reached EOF @@ -658,6 +891,10 @@ void send_token(int f, int32 token, struct map_struct *buf, OFF_T offset, { if (!do_compression) simple_send_token(f, token, buf, offset, n); +#ifdef SUPPORT_ZSTD + else if (do_compression == CPRES_ZSTD) + send_zstd_token(f, token, buf, offset, n); +#endif else send_deflated_token(f, token, buf, offset, n, toklen); } @@ -674,6 +911,10 @@ int32 recv_token(int f, char **data) if (!do_compression) tok = simple_recv_token(f,data); +#ifdef SUPPORT_ZSTD + else if (do_compression == CPRES_ZSTD) + tok = recv_zstd_token(f, data); +#endif else /* CPRES_ZLIB & CPRES_ZLIBX */ tok = recv_deflated_token(f, data); return tok; |