adb: implement LZ4 compression.
Add support for LZ4 compression, which compresses and decompresses far
more quickly than brotli, at the cost of a worse compression ratio.
`adb sync -d system` speeds (in MB/s) on aosp_blueline-eng:
              none    brotli    lz4
    USB 3.0   120     110       190
    USB 2.0    38      75        63
Bug: https://issuetracker.google.com/150827486
Test: python3 -m unittest test_device.FileOperationsTest{Uncompressed,Brotli,LZ4}
Change-Id: Ibef6ac15a76b4e5dcd02d7fb9433cbb1c02b8382
diff --git a/adb/Android.bp b/adb/Android.bp
index 12d9a14..6386a78 100644
--- a/adb/Android.bp
+++ b/adb/Android.bp
@@ -470,6 +470,7 @@
"libadbd_core",
"libbrotli",
"libdiagnose_usb",
+ "liblz4",
],
shared_libs: [
@@ -571,6 +572,7 @@
"libbrotli",
"libcutils_sockets",
"libdiagnose_usb",
+ "liblz4",
"libmdnssd",
],
@@ -605,6 +607,7 @@
"libadbd_services",
"libasyncio",
"libcap",
+ "liblz4",
"libminijail",
"libssl",
],
diff --git a/adb/client/commandline.cpp b/adb/client/commandline.cpp
index 9078ae9..02f6e9c 100644
--- a/adb/client/commandline.cpp
+++ b/adb/client/commandline.cpp
@@ -1331,6 +1331,8 @@
if (str == "brotli") {
return CompressionType::Brotli;
+ } else if (str == "lz4") {
+ return CompressionType::LZ4;
}
error_exit("unexpected compression type %s", str.c_str());
diff --git a/adb/client/file_sync_client.cpp b/adb/client/file_sync_client.cpp
index c71880c..75334d7 100644
--- a/adb/client/file_sync_client.cpp
+++ b/adb/client/file_sync_client.cpp
@@ -237,6 +237,7 @@
have_ls_v2_ = CanUseFeature(features_, kFeatureLs2);
have_sendrecv_v2_ = CanUseFeature(features_, kFeatureSendRecv2);
have_sendrecv_v2_brotli_ = CanUseFeature(features_, kFeatureSendRecv2Brotli);
+ have_sendrecv_v2_lz4_ = CanUseFeature(features_, kFeatureSendRecv2LZ4);
fd.reset(adb_connect("sync:", &error));
if (fd < 0) {
Error("connect failed: %s", error.c_str());
@@ -262,12 +263,15 @@
bool HaveSendRecv2() const { return have_sendrecv_v2_; }
bool HaveSendRecv2Brotli() const { return have_sendrecv_v2_brotli_; }
+ bool HaveSendRecv2LZ4() const { return have_sendrecv_v2_lz4_; }
// Resolve a compression type which might be CompressionType::Any to a specific compression
// algorithm.
CompressionType ResolveCompressionType(CompressionType compression) const {
if (compression == CompressionType::Any) {
- if (HaveSendRecv2Brotli()) {
+ if (HaveSendRecv2LZ4()) {
+ return CompressionType::LZ4;
+ } else if (HaveSendRecv2Brotli()) {
return CompressionType::Brotli;
}
return CompressionType::None;
@@ -361,6 +365,10 @@
msg.send_v2_setup.flags = kSyncFlagBrotli;
break;
+ case CompressionType::LZ4:
+ msg.send_v2_setup.flags = kSyncFlagLZ4;
+ break;
+
case CompressionType::Any:
LOG(FATAL) << "unexpected CompressionType::Any";
}
@@ -400,6 +408,10 @@
msg.recv_v2_setup.flags |= kSyncFlagBrotli;
break;
+ case CompressionType::LZ4:
+ msg.recv_v2_setup.flags |= kSyncFlagLZ4;
+ break;
+
case CompressionType::Any:
LOG(FATAL) << "unexpected CompressionType::Any";
}
@@ -599,7 +611,7 @@
syncsendbuf sbuf;
sbuf.id = ID_DATA;
- std::variant<std::monostate, NullEncoder, BrotliEncoder> encoder_storage;
+ std::variant<std::monostate, NullEncoder, BrotliEncoder, LZ4Encoder> encoder_storage;
Encoder* encoder = nullptr;
switch (compression) {
case CompressionType::None:
@@ -610,6 +622,10 @@
encoder = &encoder_storage.emplace<BrotliEncoder>(SYNC_DATA_MAX);
break;
+ case CompressionType::LZ4:
+ encoder = &encoder_storage.emplace<LZ4Encoder>(SYNC_DATA_MAX);
+ break;
+
case CompressionType::Any:
LOG(FATAL) << "unexpected CompressionType::Any";
}
@@ -891,6 +907,7 @@
bool have_ls_v2_;
bool have_sendrecv_v2_;
bool have_sendrecv_v2_brotli_;
+ bool have_sendrecv_v2_lz4_;
TransferLedger global_ledger_;
TransferLedger current_ledger_;
@@ -1093,7 +1110,7 @@
uint64_t bytes_copied = 0;
Block buffer(SYNC_DATA_MAX);
- std::variant<std::monostate, NullDecoder, BrotliDecoder> decoder_storage;
+ std::variant<std::monostate, NullDecoder, BrotliDecoder, LZ4Decoder> decoder_storage;
Decoder* decoder = nullptr;
std::span buffer_span(buffer.data(), buffer.size());
@@ -1106,6 +1123,10 @@
decoder = &decoder_storage.emplace<BrotliDecoder>(buffer_span);
break;
+ case CompressionType::LZ4:
+ decoder = &decoder_storage.emplace<LZ4Decoder>(buffer_span);
+ break;
+
case CompressionType::Any:
LOG(FATAL) << "unexpected CompressionType::Any";
}
@@ -1160,8 +1181,7 @@
}
bytes_copied += output.size();
-
- sc.RecordBytesTransferred(msg.data.size);
+ sc.RecordBytesTransferred(output.size());
sc.ReportProgress(name != nullptr ? name : rpath, bytes_copied, expected_size);
if (result == DecodeResult::NeedInput) {
diff --git a/adb/compression_utils.h b/adb/compression_utils.h
index f349697..a0c48a2 100644
--- a/adb/compression_utils.h
+++ b/adb/compression_utils.h
@@ -24,6 +24,7 @@
#include <brotli/decode.h>
#include <brotli/encode.h>
+#include <lz4frame.h>
#include "types.h"
@@ -229,3 +230,154 @@
size_t output_bytes_left_;
std::unique_ptr<BrotliEncoderState, void (*)(BrotliEncoderState*)> encoder_;
};
+
+// Decoder that decompresses an LZ4 frame stream (lz4frame API) into the caller-provided
+// output buffer.
+struct LZ4Decoder final : public Decoder {
+    // Allocates the LZ4 frame decompression context; failure to do so is fatal.
+    explicit LZ4Decoder(std::span<char> output_buffer)
+        : Decoder(output_buffer), decoder_(nullptr, nullptr) {
+        LZ4F_dctx* raw_ctx;
+        const LZ4F_errorCode_t create_rc =
+                LZ4F_createDecompressionContext(&raw_ctx, LZ4F_VERSION);
+        if (create_rc != 0) {
+            LOG(FATAL) << "failed to initialize LZ4 decompression context";
+        }
+        // Hand the context to the unique_ptr so it is released with the decoder.
+        decoder_ = decltype(decoder_)(raw_ctx, LZ4F_freeDecompressionContext);
+    }
+
+    // Decompresses from the front chunk of the pending input into output_buffer_ and points
+    // *output at the bytes produced by this call.
+    //
+    // Returns Error if LZ4 reports a failure or the frame ends while input is still queued,
+    // NeedInput while the stream has not been marked finished, Done once the stream is
+    // finished, all input is consumed and the LZ4 frame is complete, and MoreOutput otherwise.
+    DecodeResult Decode(std::span<char>* output) final {
+        // In/out parameters: LZ4F_decompress overwrites these with the number of bytes it
+        // actually consumed and produced.
+        size_t src_len = input_buffer_.front_size();
+        const char* src = input_buffer_.front_data();
+
+        size_t dst_len = output_buffer_.size();
+        char* dst = output_buffer_.data();
+
+        const size_t status =
+                LZ4F_decompress(decoder_.get(), dst, &dst_len, src, &src_len, nullptr);
+        if (LZ4F_isError(status)) {
+            LOG(ERROR) << "LZ4F_decompress failed: " << LZ4F_getErrorName(status);
+            return DecodeResult::Error;
+        }
+
+        input_buffer_.drop_front(src_len);
+
+        // A zero return is treated as end-of-frame; leftover input at that point is an error.
+        if (status == 0) {
+            if (!input_buffer_.empty()) {
+                LOG(ERROR) << "LZ4 stream hit end before reading all data";
+                return DecodeResult::Error;
+            }
+            lz4_done_ = true;
+        }
+
+        *output = std::span<char>(output_buffer_.data(), dst_len);
+
+        if (!finished_) {
+            return DecodeResult::NeedInput;
+        }
+        if (input_buffer_.empty() && lz4_done_) {
+            return DecodeResult::Done;
+        }
+        return DecodeResult::MoreOutput;
+    }
+
+  private:
+    // Set once LZ4F_decompress has returned 0 with no input left over.
+    bool lz4_done_ = false;
+    std::unique_ptr<LZ4F_dctx, LZ4F_errorCode_t (*)(LZ4F_dctx*)> decoder_;
+};
+
+// Encoder that compresses its input into an LZ4 frame stream (lz4frame API) and hands the
+// result out in blocks of at most output_block_size_ bytes.
+struct LZ4Encoder final : public Encoder {
+    // Creates the LZ4 frame compression context and queues the frame header as the first
+    // bytes of output. Any failure here is fatal.
+    explicit LZ4Encoder(size_t output_block_size)
+        : Encoder(output_block_size), encoder_(nullptr, nullptr) {
+        LZ4F_cctx* cctx;
+        if (LZ4F_createCompressionContext(&cctx, LZ4F_VERSION) != 0) {
+            LOG(FATAL) << "failed to initialize LZ4 compression context";
+        }
+        encoder_ = std::unique_ptr<LZ4F_cctx, decltype(&LZ4F_freeCompressionContext)>(
+                cctx, LZ4F_freeCompressionContext);
+
+        Block header(LZ4F_HEADER_SIZE_MAX);
+        size_t rc = LZ4F_compressBegin(encoder_.get(), header.data(), header.size(), nullptr);
+        if (LZ4F_isError(rc)) {
+            // LOG() is a stream, so the error name must be streamed in with <<; a
+            // printf-style "%s" would be logged literally and the name dropped.
+            LOG(FATAL) << "LZ4F_compressBegin failed: " << LZ4F_getErrorName(rc);
+        }
+        // rc is the number of header bytes actually written.
+        header.resize(rc);
+        output_buffer_.append(std::move(header));
+    }
+
+    // As an optimization, only emit a block if we have an entire output block ready, or we're done.
+    bool OutputReady() const {
+        return output_buffer_.size() >= output_block_size_ || lz4_finalized_;
+    }
+
+    // Compresses up to one chunk of pending input, finalizes the frame once the stream has
+    // been marked finished, and moves at most output_block_size_ bytes into *output
+    // (cleared when no block is ready yet).
+    //
+    // Returns Error on an LZ4 failure or if input arrives after finalization, Done once the
+    // frame is finalized and all buffered output has been handed out, MoreOutput while
+    // another block is ready, and NeedInput otherwise.
+    //
+    // TODO: Switch the output type to IOVector to remove a copy?
+    EncodeResult Encode(Block* output) final {
+        size_t available_in = input_buffer_.front_size();
+        const char* next_in = input_buffer_.front_data();
+
+        // LZ4 makes no guarantees about being able to recover from trying to compress with an
+        // insufficiently large output buffer. LZ4F_compressBound tells us how much buffer we
+        // need to compress a given number of bytes, but the smallest value seems to be bigger
+        // than SYNC_DATA_MAX, so we need to buffer ourselves.
+
+        // Input size chosen to be a local maximum for LZ4F_compressBound (i.e. the block size).
+        constexpr size_t max_input_size = 65536;
+        const size_t encode_block_size = LZ4F_compressBound(max_input_size, nullptr);
+
+        if (available_in != 0) {
+            if (lz4_finalized_) {
+                LOG(ERROR) << "LZ4Encoder received data after Finish?";
+                return EncodeResult::Error;
+            }
+
+            // Never feed more than the size encode_block was dimensioned for.
+            available_in = std::min(available_in, max_input_size);
+
+            Block encode_block(encode_block_size);
+            size_t available_out = encode_block.capacity();
+            char* next_out = encode_block.data();
+
+            size_t rc = LZ4F_compressUpdate(encoder_.get(), next_out, available_out, next_in,
+                                            available_in, nullptr);
+            if (LZ4F_isError(rc)) {
+                LOG(ERROR) << "LZ4F_compressUpdate failed: " << LZ4F_getErrorName(rc);
+                return EncodeResult::Error;
+            }
+
+            input_buffer_.drop_front(available_in);
+
+            // rc is the number of compressed bytes written; shrink the block to that length.
+            available_out -= rc;
+
+            encode_block.resize(encode_block_size - available_out);
+            output_buffer_.append(std::move(encode_block));
+        }
+
+        if (finished_ && !lz4_finalized_) {
+            lz4_finalized_ = true;
+
+            Block final_block(encode_block_size + 4);
+            size_t rc = LZ4F_compressEnd(encoder_.get(), final_block.data(), final_block.size(),
+                                         nullptr);
+            if (LZ4F_isError(rc)) {
+                LOG(ERROR) << "LZ4F_compressEnd failed: " << LZ4F_getErrorName(rc);
+                return EncodeResult::Error;
+            }
+
+            final_block.resize(rc);
+            output_buffer_.append(std::move(final_block));
+        }
+
+        if (OutputReady()) {
+            size_t len = std::min(output_block_size_, output_buffer_.size());
+            *output = output_buffer_.take_front(len).coalesce();
+        } else {
+            output->clear();
+        }
+
+        if (lz4_finalized_ && output_buffer_.empty()) {
+            return EncodeResult::Done;
+        } else if (OutputReady()) {
+            return EncodeResult::MoreOutput;
+        }
+        return EncodeResult::NeedInput;
+    }
+
+  private:
+    // Set once LZ4F_compressEnd has been attempted; no further input is accepted afterwards.
+    bool lz4_finalized_ = false;
+    std::unique_ptr<LZ4F_cctx, LZ4F_errorCode_t (*)(LZ4F_cctx*)> encoder_;
+    // Compressed bytes produced but not yet handed out through Encode().
+    IOVector output_buffer_;
+};
diff --git a/adb/daemon/file_sync_service.cpp b/adb/daemon/file_sync_service.cpp
index 3138ab4..3436e32 100644
--- a/adb/daemon/file_sync_service.cpp
+++ b/adb/daemon/file_sync_service.cpp
@@ -272,7 +272,7 @@
syncmsg msg;
Block buffer(SYNC_DATA_MAX);
std::span<char> buffer_span(buffer.data(), buffer.size());
- std::variant<std::monostate, NullDecoder, BrotliDecoder> decoder_storage;
+ std::variant<std::monostate, NullDecoder, BrotliDecoder, LZ4Decoder> decoder_storage;
Decoder* decoder = nullptr;
switch (compression) {
@@ -284,6 +284,10 @@
decoder = &decoder_storage.emplace<BrotliDecoder>(buffer_span);
break;
+ case CompressionType::LZ4:
+ decoder = &decoder_storage.emplace<LZ4Decoder>(buffer_span);
+ break;
+
case CompressionType::Any:
LOG(FATAL) << "unexpected CompressionType::Any";
}
@@ -569,6 +573,15 @@
}
compression = CompressionType::Brotli;
}
+ if (msg.send_v2_setup.flags & kSyncFlagLZ4) {
+ msg.send_v2_setup.flags &= ~kSyncFlagLZ4;
+ if (compression) {
+ SendSyncFail(s, android::base::StringPrintf("multiple compression flags received: %d",
+ orig_flags));
+ return false;
+ }
+ compression = CompressionType::LZ4;
+ }
if (msg.send_v2_setup.flags) {
SendSyncFail(s, android::base::StringPrintf("unknown flags: %d", msg.send_v2_setup.flags));
@@ -598,7 +611,7 @@
syncmsg msg;
msg.data.id = ID_DATA;
- std::variant<std::monostate, NullEncoder, BrotliEncoder> encoder_storage;
+ std::variant<std::monostate, NullEncoder, BrotliEncoder, LZ4Encoder> encoder_storage;
Encoder* encoder;
switch (compression) {
@@ -610,6 +623,10 @@
encoder = &encoder_storage.emplace<BrotliEncoder>(SYNC_DATA_MAX);
break;
+ case CompressionType::LZ4:
+ encoder = &encoder_storage.emplace<LZ4Encoder>(SYNC_DATA_MAX);
+ break;
+
case CompressionType::Any:
LOG(FATAL) << "unexpected CompressionType::Any";
}
@@ -688,6 +705,15 @@
}
compression = CompressionType::Brotli;
}
+ if (msg.recv_v2_setup.flags & kSyncFlagLZ4) {
+ msg.recv_v2_setup.flags &= ~kSyncFlagLZ4;
+ if (compression) {
+ SendSyncFail(s, android::base::StringPrintf("multiple compression flags received: %d",
+ orig_flags));
+ return false;
+ }
+ compression = CompressionType::LZ4;
+ }
if (msg.recv_v2_setup.flags) {
SendSyncFail(s, android::base::StringPrintf("unknown flags: %d", msg.recv_v2_setup.flags));
diff --git a/adb/file_sync_protocol.h b/adb/file_sync_protocol.h
index 70425f7..90bd763 100644
--- a/adb/file_sync_protocol.h
+++ b/adb/file_sync_protocol.h
@@ -92,12 +92,14 @@
enum SyncFlag : uint32_t {
kSyncFlagNone = 0,
kSyncFlagBrotli = 1,
+ kSyncFlagLZ4 = 2,  // Requests LZ4 for a send/recv v2 transfer; adbd rejects it when combined with another compression flag.
};
enum class CompressionType {
None,
Any,
Brotli,
+ LZ4,  // Chosen by the "lz4" compression option, or preferred when Any is resolved and the device advertises sendrecv_v2_lz4.
};
// send_v1 sent the path in a buffer, followed by a comma and the mode as a string.
diff --git a/adb/test_device.py b/adb/test_device.py
index 496a0ff..3be7c9a 100755
--- a/adb/test_device.py
+++ b/adb/test_device.py
@@ -1302,6 +1302,10 @@
compression = "brotli"
+class FileOperationsTestLZ4(FileOperationsTest.Base):
+ compression = "lz4"
+
+
class DeviceOfflineTest(DeviceTest):
def _get_device_state(self, serialno):
output = subprocess.check_output(self.device.adb_cmd + ['devices'])
diff --git a/adb/transport.cpp b/adb/transport.cpp
index e06dbe3..cef3850 100644
--- a/adb/transport.cpp
+++ b/adb/transport.cpp
@@ -84,6 +84,7 @@
const char* const kFeatureTrackApp = "track_app";
const char* const kFeatureSendRecv2 = "sendrecv_v2";
const char* const kFeatureSendRecv2Brotli = "sendrecv_v2_brotli";
+const char* const kFeatureSendRecv2LZ4 = "sendrecv_v2_lz4";
namespace {
@@ -1183,6 +1184,7 @@
kFeatureTrackApp,
kFeatureSendRecv2,
kFeatureSendRecv2Brotli,
+ kFeatureSendRecv2LZ4,
// Increment ADB_SERVER_VERSION when adding a feature that adbd needs
// to know about. Otherwise, the client can be stuck running an old
// version of the server even after upgrading their copy of adb.
diff --git a/adb/transport.h b/adb/transport.h
index b1984db..12803b5 100644
--- a/adb/transport.h
+++ b/adb/transport.h
@@ -88,6 +88,8 @@
extern const char* const kFeatureSendRecv2;
// adbd supports brotli for send/recv v2.
extern const char* const kFeatureSendRecv2Brotli;
+// adbd supports LZ4 for send/recv v2.
+extern const char* const kFeatureSendRecv2LZ4;
TransportId NextTransportId();