Jeff Vander Stoep | bdfc030 | 2017-05-25 09:53:47 -0700 | [diff] [blame^] | 1 | from ctypes import * |
| 2 | import re |
| 3 | import os |
| 4 | |
| 5 | class TERule: |
| 6 | def __init__(self, rule): |
| 7 | data = rule.split(',') |
| 8 | self.flavor = data[0] |
| 9 | self.sctx = data[1] |
| 10 | self.tctx = data[2] |
| 11 | self.tclass = data[3] |
| 12 | self.perms = set((data[4].strip()).split(' ')) |
| 13 | self.rule = rule |
| 14 | |
| 15 | class Policy: |
| 16 | __Rules = None |
| 17 | __FcDict = None |
| 18 | __libsepolwrap = None |
| 19 | __policydbP = None |
| 20 | |
| 21 | # Return all file_contexts entries that map to the input Type. |
| 22 | def QueryFc(self, Type): |
| 23 | if Type in self.__FcDict: |
| 24 | return self.__FcDict[Type] |
| 25 | else: |
| 26 | return None |
| 27 | |
| 28 | # Return all attributes associated with a type if IsAttr=False or |
| 29 | # all types associated with an attribute if IsAttr=True |
| 30 | def QueryTypeAttribute(self, Type, IsAttr): |
| 31 | TypeIterP = self.__libsepolwrap.init_type_iter(self.__policydbP, |
| 32 | create_string_buffer(Type), c_bool(IsAttr)) |
| 33 | if (TypeIterP == None): |
| 34 | sys.exit("Failed to initialize type iterator") |
| 35 | buf = create_string_buffer(2048) |
| 36 | |
| 37 | while True: |
| 38 | ret = self.__libsepolwrap.get_type(buf, c_int(2048), |
| 39 | self.__policydbP, TypeIterP) |
| 40 | if ret == 0: |
| 41 | yield buf.value |
| 42 | continue |
| 43 | if ret == 1: |
| 44 | break; |
| 45 | # We should never get here. |
| 46 | sys.exit("Failed to import policy") |
| 47 | self.__libsepolwrap.destroy_type_iter(TypeIterP) |
| 48 | |
| 49 | # Return all TERules that match: |
| 50 | # (any scontext) or (any tcontext) or (any tclass) or (any perms), |
| 51 | # perms. |
| 52 | # Any unspecified paramenter will match all. |
| 53 | # |
| 54 | # Example: QueryTERule(tcontext=["foo", "bar"], perms=["entrypoint"]) |
| 55 | # Will return any rule with: |
| 56 | # (tcontext="foo" or tcontext="bar") and ("entrypoint" in perms) |
| 57 | def QueryTERule(self, **kwargs): |
| 58 | if self.__Rules is None: |
| 59 | self.__InitTERules() |
| 60 | for Rule in self.__Rules: |
| 61 | # Match source type |
| 62 | if "scontext" in kwargs and Rule.sctx not in kwargs['scontext']: |
| 63 | continue |
| 64 | # Match target type |
| 65 | if "tcontext" in kwargs and Rule.tctx not in kwargs['tcontext']: |
| 66 | continue |
| 67 | # Match target class |
| 68 | if "tclass" in kwargs and Rule.tclass not in kwargs['tclass']: |
| 69 | continue |
| 70 | # Match any perms |
| 71 | if "perms" in kwargs and not bool(Rule.perms & set(kwargs['perms'])): |
| 72 | continue |
| 73 | yield Rule |
| 74 | |
| 75 | |
| 76 | def __GetTERules(self, policydbP, avtabIterP): |
| 77 | if self.__Rules is None: |
| 78 | self.__Rules = set() |
| 79 | buf = create_string_buffer(2048) |
| 80 | ret = 0 |
| 81 | while True: |
| 82 | ret = self.__libsepolwrap.get_allow_rule(buf, c_int(2048), policydbP, avtabIterP) |
| 83 | if ret == 0: |
| 84 | Rule = TERule(buf.value) |
| 85 | self.__Rules.add(Rule) |
| 86 | continue |
| 87 | if ret == 1: |
| 88 | break; |
| 89 | # We should never get here. |
| 90 | sys.exit("Failed to import policy") |
| 91 | |
| 92 | def __InitTERules(self): |
| 93 | avtabIterP = self.__libsepolwrap.init_avtab(self.__policydbP) |
| 94 | if (avtabIterP == None): |
| 95 | sys.exit("Failed to initialize avtab") |
| 96 | self.__GetTERules(self.__policydbP, avtabIterP) |
| 97 | self.__libsepolwrap.destroy_avtab(avtabIterP) |
| 98 | avtabIterP = self.__libsepolwrap.init_cond_avtab(self.__policydbP) |
| 99 | if (avtabIterP == None): |
| 100 | sys.exit("Failed to initialize conditional avtab") |
| 101 | self.__GetTERules(self.__policydbP, avtabIterP) |
| 102 | self.__libsepolwrap.destroy_avtab(avtabIterP) |
| 103 | |
| 104 | # load ctypes-ified libsepol wrapper |
| 105 | def __InitLibsepolwrap(self): |
| 106 | self.__libsepolwrap = CDLL("libsepolwrap.so") |
| 107 | |
| 108 | # load file_contexts |
| 109 | def __InitFC(self, FcPaths): |
| 110 | fc = [] |
| 111 | for path in FcPaths: |
| 112 | if not os.path.exists(path): |
| 113 | sys.exit("file_contexts file " + path + " does not exist.") |
| 114 | fd = open(path, "r") |
| 115 | fc += fd.readlines() |
| 116 | fd.close() |
| 117 | self.__FcDict = {} |
| 118 | for i in fc: |
| 119 | rec = i.split() |
| 120 | try: |
| 121 | t = rec[-1].split(":")[2] |
| 122 | if t in self.__FcDict: |
| 123 | self.__FcDict[t].append(rec[0]) |
| 124 | else: |
| 125 | self.__FcDict[t] = [rec[0]] |
| 126 | except: |
| 127 | pass |
| 128 | |
| 129 | # load policy |
| 130 | def __InitPolicy(self, PolicyPath): |
| 131 | self.__policydbP = self.__libsepolwrap.load_policy(create_string_buffer(PolicyPath)) |
| 132 | if (self.__policydbP is None): |
| 133 | sys.exit("Failed to load policy") |
| 134 | |
| 135 | def __init__(self, PolicyPath, FcPaths): |
| 136 | self.__InitLibsepolwrap() |
| 137 | self.__InitFC(FcPaths) |
| 138 | self.__InitPolicy(PolicyPath) |
| 139 | |
| 140 | def __del__(self): |
| 141 | if self.__policydbP is not None: |
| 142 | self.__libsepolwrap.destroy_policy(self.__policydbP) |