Kelvin Zhang | 35cff4f | 2021-12-08 16:06:00 -0800 | [diff] [blame^] | 1 | # import mmap |
| 2 | |
| 3 | import struct |
| 4 | |
# Magic number 0x184D2204 that the scanners below search for to locate the
# start of a (non-legacy) LZ4 frame; it is stored little-endian in the stream.
LZ4_FRAME_MAGIC = (0x184D2204).to_bytes(4, "little")
| 6 | |
| 7 | |
def scan_legacy_lz4_frames(data):
    """Print the offset of every legacy LZ4 frame in *data* and of its blocks.

    A legacy frame is the 4-byte magic ``02 21 4C 18`` followed by a run of
    blocks, each made of a 4-byte little-endian compressed size and that many
    bytes of compressed data. The run of blocks is considered finished when
    another frame magic (legacy or regular) is met, when a size field is zero,
    or when the announced block would not fit in the remaining data.

    Args:
        data: bytes-like object holding the whole file content.

    Returns:
        None. Findings are written to standard output.
    """
    LZ4_LEGACY_FRAME_MAGIC = b"\x02\x21\x4C\x18"
    end = len(data)
    index = 0
    while index < end:
        # Only the search itself can raise ValueError, so keep the try tight.
        try:
            index = data.index(LZ4_LEGACY_FRAME_MAGIC, index)
        except ValueError:
            break
        print("Legacy Lz4 frame at {}".format(index))
        index += 4
        # A block needs a complete 4-byte size field; stopping here avoids
        # struct.error on a few stray bytes at the end of the file.
        while index + 4 <= end:
            magic = data[index:index+4]
            if magic == LZ4_LEGACY_FRAME_MAGIC or magic == LZ4_FRAME_MAGIC:
                break
            (csize,) = struct.unpack("<L", magic)
            # A block that ends exactly at the end of the data is still valid,
            # hence ">" rather than ">=".
            if csize == 0 or index + 4 + csize > end:
                break
            print("Legacy lz4 block at {}, compressed data size {}".format(index, csize))
            # Skip the size field as well as the payload so that the next
            # iteration reads the following block's size, not payload bytes.
            index += 4 + csize
| 28 | |
| 29 | |
def scan_lz4_frames(data):
    """Print the offset of every LZ4 frame in *data* and the size of its blocks.

    For each occurrence of ``LZ4_FRAME_MAGIC`` the frame descriptor is decoded
    (FLG byte, BD byte, optional 8-byte content size, optional 4-byte
    dictionary id, 1-byte header checksum) and the data blocks that follow are
    walked until the zero-sized end mark. If a block announces more bytes than
    remain in *data*, the walk is abandoned and the search resumes right after
    that size field, so a spurious magic match does not hide later frames.

    Args:
        data: bytes-like object holding the whole file content; indexing it
            must yield integers (as ``bytes`` does on Python 3).

    Returns:
        None. Findings are written to standard output.
    """
    end = len(data)
    index = 0
    while index < end:
        # Only the search itself can raise ValueError, so keep the try tight.
        try:
            index = data.index(LZ4_FRAME_MAGIC, index)
        except ValueError:
            break
        frame_offset = index
        index += 4
        if index >= end:
            break  # magic sits at the very end: no FLG byte to read
        flag = data[index]
        block_checksum_present = flag & 0x10 != 0
        content_size_present = flag & 0x8 != 0
        dictionary_id_present = flag & 0x1 != 0
        # FLG + BD, the optional fields, then the header checksum byte.
        descriptor_size = 2 + 1
        if content_size_present:
            descriptor_size += 8
        if dictionary_id_present:
            descriptor_size += 4
        if index + descriptor_size > end:
            break  # truncated descriptor: no complete frame can follow
        index += 2
        content_size = None
        if content_size_present:
            # Unpack the 1-tuple so the number itself is printed, not "(n,)".
            (content_size,) = struct.unpack("<Q", data[index:index+8])
            index += 8
        if dictionary_id_present:
            index += 4
        index += 1  # header checksum
        print("Lz4 frame at {}, content size: {}".format(
            frame_offset, content_size))
        # A block needs a complete 4-byte size field.
        while index + 4 <= end:
            (block_size,) = struct.unpack("<L", data[index:index+4])
            # The top bit marks a block stored uncompressed; the remaining
            # 31 bits are the block's size in bytes.
            uncompressed = block_size & 0x80000000 != 0
            block_size &= 0x7FFFFFFF
            index += 4
            if block_size == 0:
                break  # end mark
            if index + block_size > end:
                break  # block does not fit: truncated data or false magic
            index += block_size
            print("Block uncompressed: {}, size: {}".format(uncompressed, block_size))
            if block_checksum_present:
                # Each block is followed by its own 4-byte checksum, which
                # must not be mistaken for the next block's size.
                index += 4
| 66 | |
| 67 | |
def main(argv):
    """Scan the file named on the command line for LZ4 frames.

    Args:
        argv: command-line arguments, program name first; exactly one
            further argument, the path of the file to scan, is expected.

    Returns:
        0 after the file has been scanned, 1 if the arguments are wrong
        (a usage message is then written to standard error).
    """
    import sys

    if len(argv) != 2:
        # Fall back to the interpreter's own argv[0] when called with an
        # empty list, instead of failing with IndexError.
        program = argv[0] if argv else sys.argv[0]
        print("Usage:", program, "<path to a file>", file=sys.stderr)
        return 1
    path = argv[1]

    with open(path, "rb") as fp:
        data = fp.read()
        scan_legacy_lz4_frames(data)
        scan_lz4_frames(data)
    # Explicit success status so every exit path returns an int.
    return 0
| 78 | |
| 79 | |
# Script entry point: hand the command line to main() and use whatever it
# returns as the argument to sys.exit().
if __name__ == "__main__":
    import sys

    exit_status = main(sys.argv)
    sys.exit(exit_status)