Skip to content
This repository has been archived by the owner on Aug 5, 2024. It is now read-only.

Commit

Permalink
fix: Start adding some API tests
Browse files Browse the repository at this point in the history
  • Loading branch information
robsavoye committed Mar 24, 2024
1 parent 0bb748a commit 51fb223
Showing 1 changed file with 44 additions and 22 deletions.
66 changes: 44 additions & 22 deletions tm_admin/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
from codetiming import Timer
from tm_admin.yamlfile import YamlFile
import asyncio
from rbac import RBAC
from enum import IntEnum
from rbac import RBAC, RBACConfigurationError, RBACAuthorizationError
from enum import IntEnum, StrEnum
# This is only used for debugging output during development
import json

Expand All @@ -43,9 +43,17 @@
class Operation(IntEnum):
READ = 1
CREATE = 2
MODIFY = 3
UPDATE = 3
DELETE = 4

class Roles(IntEnum):
READ_ONLY = 1
ORGANIZATION_ADMIN = 4
PROJECT_MANAGER = 5
ASSOCIATE_MANAGER = 6
VALIDATOR = 7
MAPPER = 8

class Access(object):
def __init__(self,
yamlfile: str = None,
Expand All @@ -57,6 +65,7 @@ def __init__(self,
self.permissions = dict()
self.domains = dict()
self.roles = dict()
self.subjects = dict()
# self.subjects = dict()

# Convert to a dict to make it easy to query
Expand All @@ -74,12 +83,14 @@ def __init__(self,
if category == "domains":
for entry in values:
self.domains[entry] = self.rbac.create_domain(entry)
log.debug(f"Creating domain for {entry}")
continue
if category == "permissions":
for roles in values:
[[k, v]] = roles.items()
self.permissions[k] = dict()
self.roles[k] = self.rbac.create_role(k)
log.debug(f"Creating role for {k}")
# print(f"\t{k}, {v}")
if type(v) == list:
for i in v:
Expand All @@ -92,6 +103,7 @@ def __init__(self,
elif k1 == 'tables':
# print(f"TABLES: {v1}")
self.permissions[k]['tables'] = v1
log.debug(f"Setting permissions for {k} = {v1}")
else:
# This is an access permission
# print(f"STR: {k} = {i}")
Expand All @@ -101,39 +113,46 @@ def __init__(self,
# print(json.dumps(self.permissions, indent=4))
# print("------------------")
for k, v in self.permissions.items():
print(k)
if 'children' in v:
# print(f"CHILD: {k} = {v['children']}")
self.rbac.create_role(k, children=v['children'])
elif 'tables' in v:
# print(f"TABLE: {k1} = {v['tables']}")
print(f"TABLE: {k} = {v['tables']}")
for table in v['tables']:
log.debug(f"Adding permission '{v['access']}' for '{k.upper()}' on '{table}'")
self.roles[k].add_permission(self.perms[v['access']], self.domains[table])

for role, data in self.roles.items():
# [[ k, v]] =
subject = self.rbac.create_subject(role)
subject.authorize(self.roles[role])
# for role, data in self.roles.items():
for role in self.roles:
self.subjects[role] = self.rbac.create_subject(role)
self.subjects[role].authorize(self.roles[role])

self.rbac.lock()

async def check(self,
domain: str,
access: str,
userrole: Userrole = None,
teamrole: Teamrole = None,
role: Roles = Roles.MAPPER,
op: Operation = Operation.READ,
):
"""
Check a role in a domain for access permissions.
Args:
domain (str): The domain to check for the role
access (str): The access permissions
userrole (Userrole): A user role
teamrole (Teamrole): A team role
] role (Roles): The role
op (Operation): The access permissions
"""
return self.rbac.go(self.roles[role.lower()],
xxx = None
try:
xxx = self.rbac.go(self.subjects[role.name.lower()],
self.domains[domain.lower()],
self.perms[access.lower()])
self.perms[op.name.lower()])
except RBACAuthorizationError as e :
log.error(f"{e}")
print(f"\tARGS: {domain}, {role.name}, {op.name}")

return xxx

async def main():
"""This main function lets this class be run standalone by a bash script."""
Expand All @@ -157,12 +176,15 @@ async def main():
)

acl = Access(args.config)
role = Userrole(Userrole.PROJECT_MANAGER)
op = Operation(Operation.READ)
# await acl.check('projects', userrole=op)

# rights = await roles.getRights(Userrole.PROJECT_MANAGER)
# print(rights)
await acl.check('projects', Roles.MAPPER, Operation.READ)
await acl.check('users', Roles.MAPPER, Operation.READ)
await acl.check('tasks', Roles.MAPPER, Operation.READ)
await acl.check('messages', Roles.MAPPER, Operation.READ)
await acl.check('campaigns', Roles.MAPPER, Operation.READ)
await acl.check('campaigns', Roles.MAPPER, Operation.CREATE)

await acl.check('projects', Roles.VALIDATOR, Operation.UPDATE)
# await acl.check('tasks', Roles.VALIDATOR, Operation.UPDATE)

if __name__ == "__main__":
"""This is just a hook so this file can be run standalone during development."""
Expand Down

0 comments on commit 51fb223

Please sign in to comment.