Inseob Kim | 9cda397 | 2021-10-12 22:59:12 +0900 | [diff] [blame^] | 1 | #!/usr/bin/env python |
| 2 | # |
| 3 | # Copyright 2021 Google Inc. All rights reserved. |
| 4 | # |
| 5 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | # you may not use this file except in compliance with the License. |
| 7 | # You may obtain a copy of the License at |
| 8 | # |
| 9 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | # |
| 11 | # Unless required by applicable law or agreed to in writing, software |
| 12 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | # See the License for the specific language governing permissions and |
| 15 | # limitations under the License. |
| 16 | |
| 17 | """ |
| 18 | `fsverity_metadata_generator` generates fsverity metadata and signature to a |
| 19 | container file |
| 20 | |
| 21 | This actually is a simple wrapper around the `fsverity` program. A file is |
| 22 | signed by the program which produces the PKCS#7 signature file, merkle tree file |
| 23 | , and the fsverity_descriptor file. Then the files are packed into a single |
| 24 | output file so that the information about the signing stays together. |
| 25 | |
| 26 | Currently, the output of this script is used by `fd_server` which is the host- |
| 27 | side backend of an authfs filesystem. `fd_server` uses this file in case when |
| 28 | the underlying filesystem (ext4, etc.) on the device doesn't support the |
| 29 | fsverity feature natively in which case the information is read directly from |
| 30 | the filesystem using ioctl. |
| 31 | """ |
| 32 | |
| 33 | import argparse |
| 34 | import os |
| 35 | import re |
| 36 | import shutil |
| 37 | import subprocess |
| 38 | import sys |
| 39 | import tempfile |
| 40 | from struct import * |
| 41 | |
| 42 | class TempDirectory(object): |
| 43 | def __enter__(self): |
| 44 | self.name = tempfile.mkdtemp() |
| 45 | return self.name |
| 46 | |
| 47 | def __exit__(self, *unused): |
| 48 | shutil.rmtree(self.name) |
| 49 | |
| 50 | class FSVerityMetadataGenerator: |
| 51 | def __init__(self, fsverity_path): |
| 52 | self._fsverity_path = fsverity_path |
| 53 | |
| 54 | # Default values for some properties |
| 55 | self.set_hash_alg("sha256") |
| 56 | self.set_signature('none') |
| 57 | |
| 58 | def set_key(self, key): |
| 59 | self._key = key |
| 60 | |
| 61 | def set_cert(self, cert): |
| 62 | self._cert = cert |
| 63 | |
| 64 | def set_hash_alg(self, hash_alg): |
| 65 | self._hash_alg = hash_alg |
| 66 | |
| 67 | def set_signature(self, signature): |
| 68 | self._signature = signature |
| 69 | |
| 70 | def _raw_signature(pkcs7_sig_file): |
| 71 | """ Extracts raw signature from DER formatted PKCS#7 detached signature file |
| 72 | |
| 73 | Do that by parsing the ASN.1 tree to get the location of the signature |
| 74 | in the file and then read the portion. |
| 75 | """ |
| 76 | |
| 77 | # Note: there seems to be no public python API (even in 3p modules) that |
| 78 | # provides direct access to the raw signature at this moment. So, `openssl |
| 79 | # asn1parse` commandline tool is used instead. |
| 80 | cmd = ['openssl', 'asn1parse'] |
| 81 | cmd.extend(['-inform', 'DER']) |
| 82 | cmd.extend(['-in', pkcs7_sig_file]) |
| 83 | out = subprocess.check_output(cmd, universal_newlines=True) |
| 84 | |
| 85 | # The signature is the last element in the tree |
| 86 | last_line = out.splitlines()[-1] |
| 87 | m = re.search('(\d+):.*hl=\s*(\d+)\s*l=\s*(\d+)\s*.*OCTET STRING', last_line) |
| 88 | if not m: |
| 89 | raise RuntimeError("Failed to parse asn1parse output: " + out) |
| 90 | offset = int(m.group(1)) |
| 91 | header_len = int(m.group(2)) |
| 92 | size = int(m.group(3)) |
| 93 | with open(pkcs7_sig_file, 'rb') as f: |
| 94 | f.seek(offset + header_len) |
| 95 | return f.read(size) |
| 96 | |
| 97 | def generate(self, input_file, output_file=None): |
| 98 | if self._signature != 'none': |
| 99 | if not self._key: |
| 100 | raise RuntimeError("key must be specified.") |
| 101 | if not self._cert: |
| 102 | raise RuntimeError("cert must be specified.") |
| 103 | |
| 104 | if not output_file: |
| 105 | output_file = input_file + '.fsv_meta' |
| 106 | |
| 107 | with TempDirectory() as temp_dir: |
| 108 | self._do_generate(input_file, output_file, temp_dir) |
| 109 | |
| 110 | def _do_generate(self, input_file, output_file, work_dir): |
| 111 | # temporary files |
| 112 | desc_file = os.path.join(work_dir, 'desc') |
| 113 | merkletree_file = os.path.join(work_dir, 'merkletree') |
| 114 | sig_file = os.path.join(work_dir, 'signature') |
| 115 | |
| 116 | # run the fsverity util to create the temporary files |
| 117 | cmd = [self._fsverity_path] |
| 118 | if self._signature == 'none': |
| 119 | cmd.append('digest') |
| 120 | cmd.append(input_file) |
| 121 | else: |
| 122 | cmd.append('sign') |
| 123 | cmd.append(input_file) |
| 124 | cmd.append(sig_file) |
| 125 | |
| 126 | # convert DER private key to PEM |
| 127 | pem_key = os.path.join(work_dir, 'key.pem') |
| 128 | key_cmd = ['openssl', 'pkcs8'] |
| 129 | key_cmd.extend(['-inform', 'DER']) |
| 130 | key_cmd.extend(['-in', self._key]) |
| 131 | key_cmd.extend(['-nocrypt']) |
| 132 | key_cmd.extend(['-out', pem_key]) |
| 133 | subprocess.check_call(key_cmd) |
| 134 | |
| 135 | cmd.extend(['--key', pem_key]) |
| 136 | cmd.extend(['--cert', self._cert]) |
| 137 | cmd.extend(['--hash-alg', self._hash_alg]) |
| 138 | cmd.extend(['--block-size', '4096']) |
| 139 | cmd.extend(['--out-merkle-tree', merkletree_file]) |
| 140 | cmd.extend(['--out-descriptor', desc_file]) |
| 141 | subprocess.check_call(cmd, stdout=open(os.devnull, 'w')) |
| 142 | |
| 143 | with open(output_file, 'wb') as out: |
| 144 | # 1. version |
| 145 | out.write(pack('<I', 1)) |
| 146 | |
| 147 | # 2. fsverity_descriptor |
| 148 | with open(desc_file, 'rb') as f: |
| 149 | out.write(f.read()) |
| 150 | |
| 151 | # 3. signature |
| 152 | SIG_TYPE_NONE = 0 |
| 153 | SIG_TYPE_PKCS7 = 1 |
| 154 | SIG_TYPE_RAW = 2 |
| 155 | if self._signature == 'raw': |
| 156 | out.write(pack('<I', SIG_TYPE_RAW)) |
| 157 | sig = self._raw_signature(sig_file) |
| 158 | out.write(pack('<I', len(sig))) |
| 159 | out.write(sig) |
| 160 | elif self._signature == 'pkcs7': |
| 161 | with open(sig_file, 'rb') as f: |
| 162 | out.write(pack('<I', SIG_TYPE_PKCS7)) |
| 163 | sig = f.read() |
| 164 | out.write(pack('<I', len(sig))) |
| 165 | out.write(sig) |
| 166 | else: |
| 167 | out.write(pack('<I', SIG_TYPE_NONE)) |
| 168 | |
| 169 | # 4. merkle tree |
| 170 | with open(merkletree_file, 'rb') as f: |
| 171 | # merkle tree is placed at the next nearest page boundary to make |
| 172 | # mmapping possible |
| 173 | out.seek(next_page(out.tell())) |
| 174 | out.write(f.read()) |
| 175 | |
| 176 | def next_page(n): |
| 177 | """ Returns the next nearest page boundary from `n` """ |
| 178 | PAGE_SIZE = 4096 |
| 179 | return (n + PAGE_SIZE - 1) // PAGE_SIZE * PAGE_SIZE |
| 180 | |
| 181 | if __name__ == '__main__': |
| 182 | p = argparse.ArgumentParser() |
| 183 | p.add_argument( |
| 184 | '--output', |
| 185 | help='output file. If omitted, print to <INPUT>.fsv_meta', |
| 186 | metavar='output', |
| 187 | default=None) |
| 188 | p.add_argument( |
| 189 | 'input', |
| 190 | help='input file to be signed') |
| 191 | p.add_argument( |
| 192 | '--key', |
| 193 | help='PKCS#8 private key file in DER format') |
| 194 | p.add_argument( |
| 195 | '--cert', |
| 196 | help='x509 certificate file in PEM format') |
| 197 | p.add_argument( |
| 198 | '--hash-alg', |
| 199 | help='hash algorithm to use to build the merkle tree', |
| 200 | choices=['sha256', 'sha512'], |
| 201 | default='sha256') |
| 202 | p.add_argument( |
| 203 | '--signature', |
| 204 | help='format for signature', |
| 205 | choices=['none', 'raw', 'pkcs7'], |
| 206 | default='none') |
| 207 | p.add_argument( |
| 208 | '--fsverity-path', |
| 209 | help='path to the fsverity program', |
| 210 | required=True) |
| 211 | args = p.parse_args(sys.argv[1:]) |
| 212 | |
| 213 | generator = FSVerityMetadataGenerator(args.fsverity_path) |
| 214 | generator.set_signature(args.signature) |
| 215 | if args.signature == 'none': |
| 216 | if args.key or args.cert: |
| 217 | raise ValueError("When signature is none, key and cert can't be set") |
| 218 | else: |
| 219 | if not args.key or not args.cert: |
| 220 | raise ValueError("To generate signature, key and cert must be set") |
| 221 | generator.set_key(args.key) |
| 222 | generator.set_cert(args.cert) |
| 223 | generator.set_hash_alg(args.hash_alg) |
| 224 | generator.generate(args.input, args.output) |