| Kelvin Zhang | 35cff4f | 2021-12-08 16:06:00 -0800 | [diff] [blame] | 1 | # import mmap | 
|  | 2 |  | 
|  | 3 | import struct | 
|  | 4 |  | 
# Four-byte magic that opens a (non-legacy) LZ4 frame. scan_lz4_frames()
# searches for it, and scan_legacy_lz4_frames() treats it as a boundary that
# ends the block list of a legacy frame.
LZ4_FRAME_MAGIC = b"\x04\x22\x4D\x18"
|  | 6 |  | 
|  | 7 |  | 
def scan_legacy_lz4_frames(data):
    """Print the offset of every legacy LZ4 frame in ``data`` and of its blocks.

    A legacy frame starts with the magic bytes ``02 21 4C 18`` and is followed
    by a run of blocks. Each block is a little-endian 32-bit compressed size
    followed by that many payload bytes. The run of blocks ends at the next
    frame magic (legacy or current), at a zero size, at a block that would
    extend past the end of ``data``, or at the end of ``data``.

    Args:
        data: The bytes to scan (anything offering ``index`` and slicing the
            way ``bytes`` does).

    Returns:
        None. One line per frame and per block is written to stdout.
    """
    legacy_magic = b"\x02\x21\x4C\x18"
    total = len(data)
    index = 0
    while index < total:
        # Only the search itself can legitimately raise ValueError ("magic
        # not found"), so keep the try body down to that single call.
        try:
            index = data.index(legacy_magic, index)
        except ValueError:
            break
        print("Legacy Lz4 frame at {}".format(index))
        index += 4
        while index < total:
            size_field = data[index:index + 4]
            if len(size_field) < 4:
                # Fewer than four bytes left: there is no complete size field
                # to decode (struct.unpack would raise struct.error).
                break
            if size_field == legacy_magic or size_field == LZ4_FRAME_MAGIC:
                # Start of the next frame; the outer loop picks it up.
                break
            (csize,) = struct.unpack("<L", size_field)
            # A block that ends exactly at the end of the data is complete, so
            # only reject blocks that would run past it.
            if csize == 0 or index + 4 + csize > total:
                break
            print("Legacy lz4 block at {}, compressed data size {}".format(index, csize))
            # Step over the 4-byte size field as well as the payload so the
            # next iteration lands on the following block's size field.
            index += 4 + csize
|  | 28 |  | 
|  | 29 |  | 
def scan_lz4_frames(data):
    """Print the offset of every LZ4 frame in ``data`` and list its blocks.

    After the 4-byte frame magic the header is parsed as: one flag byte, one
    block-descriptor byte, an optional little-endian 64-bit content size
    (flag bit 0x08), an optional 32-bit dictionary id (flag bit 0x01) and a
    one-byte header checksum. Blocks follow, each a little-endian 32-bit size
    word whose top bit marks the block as stored uncompressed and whose low
    31 bits give the payload length. A zero size word ends the frame.

    Args:
        data: The bytes to scan (anything offering ``index``, integer
            indexing and slicing the way ``bytes`` does).

    Returns:
        None. One line per frame and per block is written to stdout.
    """
    total = len(data)
    index = 0
    while index < total:
        # Only the search can legitimately raise ValueError ("not found").
        try:
            index = data.index(LZ4_FRAME_MAGIC, index)
        except ValueError:
            break
        frame_offset = index
        index += 4
        if index + 2 > total:
            # Magic sits at the very end: no flag / block-descriptor bytes.
            continue
        flag = data[index]
        block_checksum_present = flag & 0x10 != 0
        content_size_present = flag & 0x8 != 0
        content_checksum_present = flag & 0x4 != 0
        dictionary_id_present = flag & 0x1 != 0
        index += 2  # flag byte + block-descriptor byte
        content_size = None
        if content_size_present:
            if index + 8 > total:
                # Header is cut off inside the content-size field.
                continue
            # Unpack the single value so it prints as a number, not "(n,)".
            (content_size,) = struct.unpack("<Q", data[index:index + 8])
            index += 8
        if dictionary_id_present:
            index += 4  # dictionary id is not reported, just stepped over
        index += 1  # header checksum byte
        print("Lz4 frame at {}, content size: {}".format(
            frame_offset, content_size))
        while index < total:
            if index + 4 > total:
                # Not enough bytes left for a block size word.
                break
            (block_size,) = struct.unpack("<L", data[index:index + 4])
            uncompressed = block_size & 0x80000000 != 0
            block_size &= 0x7FFFFFFF
            index += 4
            if block_size == 0:
                # End mark. A content checksum, when announced by the flag
                # byte, trails it; step over that too.
                if content_checksum_present:
                    index += 4
                break
            index += block_size
            if block_checksum_present:
                # Each block is followed by its own 4-byte checksum; without
                # this skip every later size word would be misread.
                index += 4
            if index > total:
                # Block (or its checksum) runs past the end of the data.
                break
            print("Block uncompressed: {}, size: {}".format(uncompressed, block_size))
|  | 66 |  | 
|  | 67 |  | 
def main(argv):
    """Scan the file named on the command line for LZ4 frames.

    Args:
        argv: Full argument vector; ``argv[0]`` is the program name and
            ``argv[1]`` the path of the file to scan.

    Returns:
        1 after printing a usage line when the argument count is wrong,
        otherwise None once both scans have run.
    """
    if len(argv) == 2:
        # Read the whole file up front; both scanners work on the same bytes.
        with open(argv[1], "rb") as source:
            contents = source.read()
        scan_legacy_lz4_frames(contents)
        scan_lz4_frames(contents)
        return None

    print("Usage:", argv[0], "<path to a file>")
    return 1
|  | 78 |  | 
|  | 79 |  | 
if __name__ == "__main__":
    # Script entry point: pass the raw argument vector to main() and use its
    # return value as the process exit status.
    import sys

    exit_status = main(sys.argv)
    sys.exit(exit_status)