| Kelvin Zhang | 35cff4f | 2021-12-08 16:06:00 -0800 | [diff] [blame] | 1 | # import mmap | 
 | 2 |  | 
 | 3 | import struct | 
 | 4 |  | 
 | 5 | LZ4_FRAME_MAGIC = b"\x04\x22\x4D\x18" | 
 | 6 |  | 
 | 7 |  | 
 | 8 | def scan_legacy_lz4_frames(data): | 
 | 9 |   LZ4_LEGACY_FRAME_MAGIC = b"\x02\x21\x4C\x18" | 
 | 10 |   index = 0 | 
 | 11 |   while index < len(data): | 
 | 12 |     try: | 
 | 13 |       index = data.index(LZ4_LEGACY_FRAME_MAGIC, index) | 
 | 14 |       print("Legacy Lz4 frame at {}".format(index)) | 
 | 15 |       index += 4 | 
 | 16 |       while index < len(data): | 
 | 17 |         magic = data[index:index+4] | 
 | 18 |         if magic == LZ4_LEGACY_FRAME_MAGIC or magic == LZ4_FRAME_MAGIC: | 
 | 19 |           break | 
 | 20 |         (csize,) = struct.unpack("<L", magic) | 
 | 21 |         if index + 4 + csize >= len(data) or csize == 0: | 
 | 22 |           break | 
 | 23 |         print("Legacy lz4 block at {}, compressed data size {}".format(index, csize)) | 
 | 24 |         index += csize | 
 | 25 |  | 
 | 26 |     except ValueError: | 
 | 27 |       break | 
 | 28 |  | 
 | 29 |  | 
 | 30 | def scan_lz4_frames(data): | 
 | 31 |   index = 0 | 
 | 32 |   while index < len(data): | 
 | 33 |     try: | 
 | 34 |       index = data.index(LZ4_FRAME_MAGIC, index) | 
 | 35 |       frame_offset = index | 
 | 36 |       index += 4 | 
 | 37 |       flag = data[index] | 
 | 38 |       block_descriptor = data[index+1] | 
 | 39 |       block_checksum_present = flag & 0x10 != 0 | 
 | 40 |       content_size_present = flag & 0x8 != 0 | 
 | 41 |       content_checksum_present = flag & 0x4 != 0 | 
 | 42 |       dictionary_id = flag & 0x1 != 0 | 
 | 43 |       index += 2 | 
 | 44 |       content_size = None | 
 | 45 |       if content_size_present: | 
 | 46 |         content_size = struct.unpack("<Q", data[index:index+8]) | 
 | 47 |         index += 8 | 
 | 48 |       if dictionary_id: | 
 | 49 |         dictionary_id = struct.unpack("<L", data[index:index+4]) | 
 | 50 |         index += 4 | 
 | 51 |       header_checksum = data[index:index+1] | 
 | 52 |       index += 1 | 
 | 53 |       print("Lz4 frame at {}, content size: {}".format( | 
 | 54 |           frame_offset, content_size)) | 
 | 55 |       while index < len(data): | 
 | 56 |         (block_size,) = struct.unpack("<L", data[index:index+4]) | 
 | 57 |         uncompressed = block_size & 0x80000000 != 0 | 
 | 58 |         block_size &= 0x7FFFFFFF | 
 | 59 |         index += 4 | 
 | 60 |         index += block_size | 
 | 61 |         if index >= len(data) or block_size == 0: | 
 | 62 |           break | 
 | 63 |         print("Block uncompressed: {}, size: {}".format(uncompressed, block_size)) | 
 | 64 |     except ValueError: | 
 | 65 |       break | 
 | 66 |  | 
 | 67 |  | 
 | 68 | def main(argv): | 
 | 69 |   if len(argv) != 2: | 
 | 70 |     print("Usage:", argv[0], "<path to a file>") | 
 | 71 |     return 1 | 
 | 72 |   path = argv[1] | 
 | 73 |  | 
 | 74 |   with open(path, "rb") as fp: | 
 | 75 |     data = fp.read() | 
 | 76 |     scan_legacy_lz4_frames(data) | 
 | 77 |     scan_lz4_frames(data) | 
 | 78 |  | 
 | 79 |  | 
 | 80 | if __name__ == '__main__': | 
 | 81 |   import sys | 
 | 82 |   sys.exit(main(sys.argv)) |