adb: implement LZ4 compression.

Add support for LZ4 compression, which compresses and decompresses far
more quickly than brotli, at the cost of worse compression ratio.

`adb sync -d system` speeds (in MB/s) on aosp_blueline-eng:

           none    brotli    lz4
USB 3.0     120       110    190
USB 2.0      38        75     63

Bug: https://issuetracker.google.com/150827486
Test: python3 -m unittest test_device.FileOperationsTest{Uncompressed,Brotli,LZ4}
Change-Id: Ibef6ac15a76b4e5dcd02d7fb9433cbb1c02b8382
diff --git a/adb/Android.bp b/adb/Android.bp
index 12d9a14..6386a78 100644
--- a/adb/Android.bp
+++ b/adb/Android.bp
@@ -470,6 +470,7 @@
         "libadbd_core",
         "libbrotli",
         "libdiagnose_usb",
+        "liblz4",
     ],
 
     shared_libs: [
@@ -571,6 +572,7 @@
         "libbrotli",
         "libcutils_sockets",
         "libdiagnose_usb",
+        "liblz4",
         "libmdnssd",
     ],
 
@@ -605,6 +607,7 @@
         "libadbd_services",
         "libasyncio",
         "libcap",
+        "liblz4",
         "libminijail",
         "libssl",
     ],
diff --git a/adb/client/commandline.cpp b/adb/client/commandline.cpp
index 9078ae9..02f6e9c 100644
--- a/adb/client/commandline.cpp
+++ b/adb/client/commandline.cpp
@@ -1331,6 +1331,8 @@
 
     if (str == "brotli") {
         return CompressionType::Brotli;
+    } else if (str == "lz4") {
+        return CompressionType::LZ4;
     }
 
     error_exit("unexpected compression type %s", str.c_str());
diff --git a/adb/client/file_sync_client.cpp b/adb/client/file_sync_client.cpp
index c71880c..75334d7 100644
--- a/adb/client/file_sync_client.cpp
+++ b/adb/client/file_sync_client.cpp
@@ -237,6 +237,7 @@
             have_ls_v2_ = CanUseFeature(features_, kFeatureLs2);
             have_sendrecv_v2_ = CanUseFeature(features_, kFeatureSendRecv2);
             have_sendrecv_v2_brotli_ = CanUseFeature(features_, kFeatureSendRecv2Brotli);
+            have_sendrecv_v2_lz4_ = CanUseFeature(features_, kFeatureSendRecv2LZ4);
             fd.reset(adb_connect("sync:", &error));
             if (fd < 0) {
                 Error("connect failed: %s", error.c_str());
@@ -262,12 +263,15 @@
 
     bool HaveSendRecv2() const { return have_sendrecv_v2_; }
     bool HaveSendRecv2Brotli() const { return have_sendrecv_v2_brotli_; }
+    bool HaveSendRecv2LZ4() const { return have_sendrecv_v2_lz4_; }
 
     // Resolve a compression type which might be CompressionType::Any to a specific compression
     // algorithm.
     CompressionType ResolveCompressionType(CompressionType compression) const {
         if (compression == CompressionType::Any) {
-            if (HaveSendRecv2Brotli()) {
+            if (HaveSendRecv2LZ4()) {
+                return CompressionType::LZ4;
+            } else if (HaveSendRecv2Brotli()) {
                 return CompressionType::Brotli;
             }
             return CompressionType::None;
@@ -361,6 +365,10 @@
                 msg.send_v2_setup.flags = kSyncFlagBrotli;
                 break;
 
+            case CompressionType::LZ4:
+                msg.send_v2_setup.flags = kSyncFlagLZ4;
+                break;
+
             case CompressionType::Any:
                 LOG(FATAL) << "unexpected CompressionType::Any";
         }
@@ -400,6 +408,10 @@
                 msg.recv_v2_setup.flags |= kSyncFlagBrotli;
                 break;
 
+            case CompressionType::LZ4:
+                msg.recv_v2_setup.flags |= kSyncFlagLZ4;
+                break;
+
             case CompressionType::Any:
                 LOG(FATAL) << "unexpected CompressionType::Any";
         }
@@ -599,7 +611,7 @@
         syncsendbuf sbuf;
         sbuf.id = ID_DATA;
 
-        std::variant<std::monostate, NullEncoder, BrotliEncoder> encoder_storage;
+        std::variant<std::monostate, NullEncoder, BrotliEncoder, LZ4Encoder> encoder_storage;
         Encoder* encoder = nullptr;
         switch (compression) {
             case CompressionType::None:
@@ -610,6 +622,10 @@
                 encoder = &encoder_storage.emplace<BrotliEncoder>(SYNC_DATA_MAX);
                 break;
 
+            case CompressionType::LZ4:
+                encoder = &encoder_storage.emplace<LZ4Encoder>(SYNC_DATA_MAX);
+                break;
+
             case CompressionType::Any:
                 LOG(FATAL) << "unexpected CompressionType::Any";
         }
@@ -891,6 +907,7 @@
     bool have_ls_v2_;
     bool have_sendrecv_v2_;
     bool have_sendrecv_v2_brotli_;
+    bool have_sendrecv_v2_lz4_;
 
     TransferLedger global_ledger_;
     TransferLedger current_ledger_;
@@ -1093,7 +1110,7 @@
     uint64_t bytes_copied = 0;
 
     Block buffer(SYNC_DATA_MAX);
-    std::variant<std::monostate, NullDecoder, BrotliDecoder> decoder_storage;
+    std::variant<std::monostate, NullDecoder, BrotliDecoder, LZ4Decoder> decoder_storage;
     Decoder* decoder = nullptr;
 
     std::span buffer_span(buffer.data(), buffer.size());
@@ -1106,6 +1123,10 @@
             decoder = &decoder_storage.emplace<BrotliDecoder>(buffer_span);
             break;
 
+        case CompressionType::LZ4:
+            decoder = &decoder_storage.emplace<LZ4Decoder>(buffer_span);
+            break;
+
         case CompressionType::Any:
             LOG(FATAL) << "unexpected CompressionType::Any";
     }
@@ -1160,8 +1181,7 @@
             }
 
             bytes_copied += output.size();
-
-            sc.RecordBytesTransferred(msg.data.size);
+            sc.RecordBytesTransferred(output.size());
             sc.ReportProgress(name != nullptr ? name : rpath, bytes_copied, expected_size);
 
             if (result == DecodeResult::NeedInput) {
diff --git a/adb/compression_utils.h b/adb/compression_utils.h
index f349697..a0c48a2 100644
--- a/adb/compression_utils.h
+++ b/adb/compression_utils.h
@@ -24,6 +24,7 @@
 
 #include <brotli/decode.h>
 #include <brotli/encode.h>
+#include <lz4frame.h>
 
 #include "types.h"
 
@@ -229,3 +230,154 @@
     size_t output_bytes_left_;
     std::unique_ptr<BrotliEncoderState, void (*)(BrotliEncoderState*)> encoder_;
 };
+
+struct LZ4Decoder final : public Decoder {
+    explicit LZ4Decoder(std::span<char> output_buffer)
+        : Decoder(output_buffer), decoder_(nullptr, nullptr) {
+        LZ4F_dctx* dctx;
+        if (LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION) != 0) {
+            LOG(FATAL) << "failed to initialize LZ4 decompression context";
+        }
+        decoder_ = std::unique_ptr<LZ4F_dctx, decltype(&LZ4F_freeDecompressionContext)>(
+                dctx, LZ4F_freeDecompressionContext);
+    }
+
+    DecodeResult Decode(std::span<char>* output) final {
+        size_t available_in = input_buffer_.front_size();
+        const char* next_in = input_buffer_.front_data();
+
+        size_t available_out = output_buffer_.size();
+        char* next_out = output_buffer_.data();
+
+        size_t rc = LZ4F_decompress(decoder_.get(), next_out, &available_out, next_in,
+                                    &available_in, nullptr);
+        if (LZ4F_isError(rc)) {
+            LOG(ERROR) << "LZ4F_decompress failed: " << LZ4F_getErrorName(rc);
+            return DecodeResult::Error;
+        }
+
+        input_buffer_.drop_front(available_in);
+
+        if (rc == 0) {
+            if (!input_buffer_.empty()) {
+                LOG(ERROR) << "LZ4 stream hit end before reading all data";
+                return DecodeResult::Error;
+            }
+            lz4_done_ = true;
+        }
+
+        *output = std::span<char>(output_buffer_.data(), available_out);
+
+        if (finished_) {
+            return input_buffer_.empty() && lz4_done_ ? DecodeResult::Done
+                                                      : DecodeResult::MoreOutput;
+        }
+
+        return DecodeResult::NeedInput;
+    }
+
+  private:
+    bool lz4_done_ = false;
+    std::unique_ptr<LZ4F_dctx, LZ4F_errorCode_t (*)(LZ4F_dctx*)> decoder_;
+};
+
+struct LZ4Encoder final : public Encoder {
+    explicit LZ4Encoder(size_t output_block_size)
+        : Encoder(output_block_size), encoder_(nullptr, nullptr) {
+        LZ4F_cctx* cctx;
+        if (LZ4F_createCompressionContext(&cctx, LZ4F_VERSION) != 0) {
+            LOG(FATAL) << "failed to initialize LZ4 compression context";
+        }
+        encoder_ = std::unique_ptr<LZ4F_cctx, decltype(&LZ4F_freeCompressionContext)>(
+                cctx, LZ4F_freeCompressionContext);
+        Block header(LZ4F_HEADER_SIZE_MAX);
+        size_t rc = LZ4F_compressBegin(encoder_.get(), header.data(), header.size(), nullptr);
+        if (LZ4F_isError(rc)) {
+            LOG(FATAL) << "LZ4F_compressBegin failed: %s", LZ4F_getErrorName(rc);
+        }
+        header.resize(rc);
+        output_buffer_.append(std::move(header));
+    }
+
+    // As an optimization, only emit a block if we have an entire output block ready, or we're done.
+    bool OutputReady() const {
+        return output_buffer_.size() >= output_block_size_ || lz4_finalized_;
+    }
+
+    // TODO: Switch the output type to IOVector to remove a copy?
+    EncodeResult Encode(Block* output) final {
+        size_t available_in = input_buffer_.front_size();
+        const char* next_in = input_buffer_.front_data();
+
+        // LZ4 makes no guarantees about being able to recover from trying to compress with an
+        // insufficiently large output buffer. LZ4F_compressBound tells us how much buffer we
+        // need to compress a given number of bytes, but the smallest value seems to be bigger
+        // than SYNC_DATA_MAX, so we need to buffer ourselves.
+
+        // Input size chosen to be a local maximum for LZ4F_compressBound (i.e. the block size).
+        constexpr size_t max_input_size = 65536;
+        const size_t encode_block_size = LZ4F_compressBound(max_input_size, nullptr);
+
+        if (available_in != 0) {
+            if (lz4_finalized_) {
+                LOG(ERROR) << "LZ4Encoder received data after Finish?";
+                return EncodeResult::Error;
+            }
+
+            available_in = std::min(available_in, max_input_size);
+
+            Block encode_block(encode_block_size);
+            size_t available_out = encode_block.capacity();
+            char* next_out = encode_block.data();
+
+            size_t rc = LZ4F_compressUpdate(encoder_.get(), next_out, available_out, next_in,
+                                            available_in, nullptr);
+            if (LZ4F_isError(rc)) {
+                LOG(ERROR) << "LZ4F_compressUpdate failed: " << LZ4F_getErrorName(rc);
+                return EncodeResult::Error;
+            }
+
+            input_buffer_.drop_front(available_in);
+
+            available_out -= rc;
+            next_out += rc;
+
+            encode_block.resize(encode_block_size - available_out);
+            output_buffer_.append(std::move(encode_block));
+        }
+
+        if (finished_ && !lz4_finalized_) {
+            lz4_finalized_ = true;
+
+            Block final_block(encode_block_size + 4);
+            size_t rc = LZ4F_compressEnd(encoder_.get(), final_block.data(), final_block.size(),
+                                         nullptr);
+            if (LZ4F_isError(rc)) {
+                LOG(ERROR) << "LZ4F_compressEnd failed: " << LZ4F_getErrorName(rc);
+                return EncodeResult::Error;
+            }
+
+            final_block.resize(rc);
+            output_buffer_.append(std::move(final_block));
+        }
+
+        if (OutputReady()) {
+            size_t len = std::min(output_block_size_, output_buffer_.size());
+            *output = output_buffer_.take_front(len).coalesce();
+        } else {
+            output->clear();
+        }
+
+        if (lz4_finalized_ && output_buffer_.empty()) {
+            return EncodeResult::Done;
+        } else if (OutputReady()) {
+            return EncodeResult::MoreOutput;
+        }
+        return EncodeResult::NeedInput;
+    }
+
+  private:
+    bool lz4_finalized_ = false;
+    std::unique_ptr<LZ4F_cctx, LZ4F_errorCode_t (*)(LZ4F_cctx*)> encoder_;
+    IOVector output_buffer_;
+};
diff --git a/adb/daemon/file_sync_service.cpp b/adb/daemon/file_sync_service.cpp
index 3138ab4..3436e32 100644
--- a/adb/daemon/file_sync_service.cpp
+++ b/adb/daemon/file_sync_service.cpp
@@ -272,7 +272,7 @@
     syncmsg msg;
     Block buffer(SYNC_DATA_MAX);
     std::span<char> buffer_span(buffer.data(), buffer.size());
-    std::variant<std::monostate, NullDecoder, BrotliDecoder> decoder_storage;
+    std::variant<std::monostate, NullDecoder, BrotliDecoder, LZ4Decoder> decoder_storage;
     Decoder* decoder = nullptr;
 
     switch (compression) {
@@ -284,6 +284,10 @@
             decoder = &decoder_storage.emplace<BrotliDecoder>(buffer_span);
             break;
 
+        case CompressionType::LZ4:
+            decoder = &decoder_storage.emplace<LZ4Decoder>(buffer_span);
+            break;
+
         case CompressionType::Any:
             LOG(FATAL) << "unexpected CompressionType::Any";
     }
@@ -569,6 +573,15 @@
         }
         compression = CompressionType::Brotli;
     }
+    if (msg.send_v2_setup.flags & kSyncFlagLZ4) {
+        msg.send_v2_setup.flags &= ~kSyncFlagLZ4;
+        if (compression) {
+            SendSyncFail(s, android::base::StringPrintf("multiple compression flags received: %d",
+                                                        orig_flags));
+            return false;
+        }
+        compression = CompressionType::LZ4;
+    }
 
     if (msg.send_v2_setup.flags) {
         SendSyncFail(s, android::base::StringPrintf("unknown flags: %d", msg.send_v2_setup.flags));
@@ -598,7 +611,7 @@
     syncmsg msg;
     msg.data.id = ID_DATA;
 
-    std::variant<std::monostate, NullEncoder, BrotliEncoder> encoder_storage;
+    std::variant<std::monostate, NullEncoder, BrotliEncoder, LZ4Encoder> encoder_storage;
     Encoder* encoder;
 
     switch (compression) {
@@ -610,6 +623,10 @@
             encoder = &encoder_storage.emplace<BrotliEncoder>(SYNC_DATA_MAX);
             break;
 
+        case CompressionType::LZ4:
+            encoder = &encoder_storage.emplace<LZ4Encoder>(SYNC_DATA_MAX);
+            break;
+
         case CompressionType::Any:
             LOG(FATAL) << "unexpected CompressionType::Any";
     }
@@ -688,6 +705,15 @@
         }
         compression = CompressionType::Brotli;
     }
+    if (msg.recv_v2_setup.flags & kSyncFlagLZ4) {
+        msg.recv_v2_setup.flags &= ~kSyncFlagLZ4;
+        if (compression) {
+            SendSyncFail(s, android::base::StringPrintf("multiple compression flags received: %d",
+                                                        orig_flags));
+            return false;
+        }
+        compression = CompressionType::LZ4;
+    }
 
     if (msg.recv_v2_setup.flags) {
         SendSyncFail(s, android::base::StringPrintf("unknown flags: %d", msg.recv_v2_setup.flags));
diff --git a/adb/file_sync_protocol.h b/adb/file_sync_protocol.h
index 70425f7..90bd763 100644
--- a/adb/file_sync_protocol.h
+++ b/adb/file_sync_protocol.h
@@ -92,12 +92,14 @@
 enum SyncFlag : uint32_t {
     kSyncFlagNone = 0,
     kSyncFlagBrotli = 1,
+    kSyncFlagLZ4 = 2,
 };
 
 enum class CompressionType {
     None,
     Any,
     Brotli,
+    LZ4,
 };
 
 // send_v1 sent the path in a buffer, followed by a comma and the mode as a string.
diff --git a/adb/test_device.py b/adb/test_device.py
index 496a0ff..3be7c9a 100755
--- a/adb/test_device.py
+++ b/adb/test_device.py
@@ -1302,6 +1302,10 @@
     compression = "brotli"
 
 
+class FileOperationsTestLZ4(FileOperationsTest.Base):
+    compression = "lz4"
+
+
 class DeviceOfflineTest(DeviceTest):
     def _get_device_state(self, serialno):
         output = subprocess.check_output(self.device.adb_cmd + ['devices'])
diff --git a/adb/transport.cpp b/adb/transport.cpp
index e06dbe3..cef3850 100644
--- a/adb/transport.cpp
+++ b/adb/transport.cpp
@@ -84,6 +84,7 @@
 const char* const kFeatureTrackApp = "track_app";
 const char* const kFeatureSendRecv2 = "sendrecv_v2";
 const char* const kFeatureSendRecv2Brotli = "sendrecv_v2_brotli";
+const char* const kFeatureSendRecv2LZ4 = "sendrecv_v2_lz4";
 
 namespace {
 
@@ -1183,6 +1184,7 @@
             kFeatureTrackApp,
             kFeatureSendRecv2,
             kFeatureSendRecv2Brotli,
+            kFeatureSendRecv2LZ4,
             // Increment ADB_SERVER_VERSION when adding a feature that adbd needs
             // to know about. Otherwise, the client can be stuck running an old
             // version of the server even after upgrading their copy of adb.
diff --git a/adb/transport.h b/adb/transport.h
index b1984db..12803b5 100644
--- a/adb/transport.h
+++ b/adb/transport.h
@@ -88,6 +88,8 @@
 extern const char* const kFeatureSendRecv2;
 // adbd supports brotli for send/recv v2.
 extern const char* const kFeatureSendRecv2Brotli;
+// adbd supports LZ4 for send/recv v2.
+extern const char* const kFeatureSendRecv2LZ4;
 
 TransportId NextTransportId();