adb: implement zstd compression for file sync.
Bug: http://b/150827486
Test: test_device.py
Change-Id: I9fac4c760d9dbdce0b3b883db975cfa9b27a9e80
diff --git a/adb/compression_utils.h b/adb/compression_utils.h
index a0c48a2..a747108 100644
--- a/adb/compression_utils.h
+++ b/adb/compression_utils.h
@@ -25,6 +25,7 @@
#include <brotli/decode.h>
#include <brotli/encode.h>
#include <lz4frame.h>
+#include <zstd.h>
#include "types.h"
@@ -381,3 +382,105 @@
std::unique_ptr<LZ4F_cctx, LZ4F_errorCode_t (*)(LZ4F_cctx*)> encoder_;
IOVector output_buffer_;
};
+
+struct ZstdDecoder final : public Decoder {
+ explicit ZstdDecoder(std::span<char> output_buffer)
+ : Decoder(output_buffer), decoder_(ZSTD_createDStream(), ZSTD_freeDStream) {
+ if (!decoder_) {
+ LOG(FATAL) << "failed to initialize Zstd decompression context";
+ }
+ }
+
+ DecodeResult Decode(std::span<char>* output) final {
+ ZSTD_inBuffer in;
+ in.src = input_buffer_.front_data();
+ in.size = input_buffer_.front_size();
+ in.pos = 0;
+
+ ZSTD_outBuffer out;
+ out.dst = output_buffer_.data();
+ // The standard specifies size() as returning size_t, but our current version of
+ // libc++ returns a signed value instead.
+ out.size = static_cast<size_t>(output_buffer_.size());
+ out.pos = 0;
+
+ size_t rc = ZSTD_decompressStream(decoder_.get(), &out, &in);
+ if (ZSTD_isError(rc)) {
+ LOG(ERROR) << "ZSTD_decompressStream failed: " << ZSTD_getErrorName(rc);
+ return DecodeResult::Error;
+ }
+
+ input_buffer_.drop_front(in.pos);
+ if (rc == 0) {
+ if (!input_buffer_.empty()) {
+ LOG(ERROR) << "Zstd stream hit end before reading all data";
+ return DecodeResult::Error;
+ }
+ zstd_done_ = true;
+ }
+
+ *output = std::span<char>(output_buffer_.data(), out.pos);
+
+ if (finished_) {
+ return input_buffer_.empty() && zstd_done_ ? DecodeResult::Done
+ : DecodeResult::MoreOutput;
+ }
+ return DecodeResult::NeedInput;
+ }
+
+ private:
+ bool zstd_done_ = false;
+ std::unique_ptr<ZSTD_DStream, size_t (*)(ZSTD_DStream*)> decoder_;
+};
+
+struct ZstdEncoder final : public Encoder {
+ explicit ZstdEncoder(size_t output_block_size)
+ : Encoder(output_block_size), encoder_(ZSTD_createCStream(), ZSTD_freeCStream) {
+ if (!encoder_) {
+ LOG(FATAL) << "failed to initialize Zstd compression context";
+ }
+ ZSTD_CCtx_setParameter(encoder_.get(), ZSTD_c_compressionLevel, 1);
+ }
+
+ EncodeResult Encode(Block* output) final {
+ ZSTD_inBuffer in;
+ in.src = input_buffer_.front_data();
+ in.size = input_buffer_.front_size();
+ in.pos = 0;
+
+ output->resize(output_block_size_);
+
+ ZSTD_outBuffer out;
+ out.dst = output->data();
+ out.size = static_cast<size_t>(output->size());
+ out.pos = 0;
+
+ ZSTD_EndDirective end_directive = finished_ ? ZSTD_e_end : ZSTD_e_continue;
+ size_t rc = ZSTD_compressStream2(encoder_.get(), &out, &in, end_directive);
+ if (ZSTD_isError(rc)) {
+ LOG(ERROR) << "ZSTD_compressStream2 failed: " << ZSTD_getErrorName(rc);
+ return EncodeResult::Error;
+ }
+
+ input_buffer_.drop_front(in.pos);
+ output->resize(out.pos);
+
+ if (rc == 0) {
+ // Zstd finished flushing its data.
+ if (finished_) {
+ if (!input_buffer_.empty()) {
+ LOG(ERROR) << "ZSTD_compressStream2 finished early";
+ return EncodeResult::Error;
+ }
+ return EncodeResult::Done;
+ } else {
+ return input_buffer_.empty() ? EncodeResult::NeedInput : EncodeResult::MoreOutput;
+ }
+ } else {
+ return EncodeResult::MoreOutput;
+ }
+ }
+
+ private:
+ std::unique_ptr<ZSTD_CStream, size_t (*)(ZSTD_CStream*)> encoder_;
+};