-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1 from princekrroshan01/master
first commit
- Loading branch information
Showing
5 changed files
with
159 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .adapter import CasbinRule, Adapter |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
from casbin import persist | ||
from sqlobject import SQLObject, StringCol, sqlhub, connectionForURI | ||
|
||
|
||
class CasbinRule(SQLObject): | ||
class sqlmeta: | ||
|
||
table = "casbin_rule" | ||
|
||
ptype = StringCol(length=255) | ||
v0 = StringCol(length=255, default=None) | ||
v1 = StringCol(length=255, default=None) | ||
v2 = StringCol(length=255, default=None) | ||
v3 = StringCol(length=255, default=None) | ||
v4 = StringCol(length=255, default=None) | ||
v5 = StringCol(length=255, default=None) | ||
|
||
def __str__(self): | ||
arr = [self.ptype] | ||
for v in (self.v0, self.v1, self.v2, self.v3, self.v4, self.v5): | ||
if v is None: | ||
break | ||
arr.append(v) | ||
return ", ".join(arr) | ||
|
||
def __repr__(self): | ||
return '<CasbinRule {}: "{}">'.format(self.id, str(self)) | ||
|
||
|
||
class Adapter(persist.Adapter): | ||
"""the interface for Casbin adapters.""" | ||
|
||
def __init__(self, connection_string): | ||
self._conhandler = connectionForURI(connection_string) | ||
sqlhub.processConnection = self._conhandler | ||
|
||
def load_policy(self, model): | ||
"""loads all policy rules from the storage.""" | ||
count = CasbinRule.select().count() | ||
for i in range(1, 1 + count): | ||
line = CasbinRule.get(i) | ||
persist.load_policy_line(str(line), model) | ||
|
||
def _save_policy_line(self, ptype, rule): | ||
line = CasbinRule.selectBy(ptype=ptype) | ||
for i, v in enumerate(rule): | ||
setattr(line, "v{}".format(i), v) | ||
|
||
def save_policy(self, model): | ||
"""saves all policy rules to the storage.""" | ||
for sec in ["p", "g"]: | ||
if sec not in model.model.keys(): | ||
continue | ||
for ptype, ast in model.model[sec].items(): | ||
for rule in ast.policy: | ||
self._save_policy_line(ptype, rule) | ||
|
||
return True | ||
|
||
def add_policy(self, sec, ptype, rule): | ||
"""adds a policy rule to the storage.""" | ||
self._save_policy_line(ptype, rule) | ||
|
||
def remove_policy(self, sec, ptype, rule): | ||
"""removes a policy rule from the storage.""" | ||
pass | ||
|
||
def remove_filtered_policy(self, sec, ptype, field_index, *field_values): | ||
"""removes policy rules that match the filter from the storage. | ||
This is part of the Auto-Save feature. | ||
""" | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
[request_definition] | ||
r = sub, obj, act | ||
|
||
[policy_definition] | ||
p = sub, obj, act | ||
|
||
[role_definition] | ||
g = _, _ | ||
|
||
[policy_effect] | ||
e = some(where (p.eft == allow)) | ||
|
||
[matchers] | ||
m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
p, alice, data1, read | ||
p, bob, data2, write | ||
p, data2_admin, data2, read | ||
p, data2_admin, data2, write | ||
|
||
g, alice, data2_admin |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import os | ||
import unittest | ||
|
||
import casbin | ||
|
||
from casbin_sqlobject_adapter import Adapter | ||
from casbin_sqlobject_adapter import CasbinRule | ||
|
||
|
||
def get_fixture(path): | ||
dir_path = os.path.split(os.path.realpath(__file__))[0] + "/" | ||
return os.path.abspath(dir_path + path) | ||
|
||
|
||
def get_enforcer(): | ||
con_string = "sqlite:/:memory:" | ||
adapter = Adapter(con_string) | ||
|
||
CasbinRule.createTable(ifNotExists=True) | ||
CasbinRule(ptype="p", v0="alice", v1="data1", v2="read") | ||
CasbinRule(ptype="p", v0="bob", v1="data2", v2="write") | ||
CasbinRule(ptype="p", v0="data2_admin", v1="data2", v2="read") | ||
CasbinRule(ptype="p", v0="data2_admin", v1="data2", v2="write") | ||
CasbinRule(ptype="g", v0="alice", v1="data2_admin") | ||
|
||
return casbin.Enforcer(get_fixture("rbac_model.conf"), adapter) | ||
|
||
|
||
class TestConfig(unittest.TestCase): | ||
def test_enforcer_basic(self): | ||
e = get_enforcer() | ||
|
||
self.assertTrue(e.enforce("alice", "data1", "read")) | ||
self.assertFalse(e.enforce("bob", "data1", "read")) | ||
self.assertTrue(e.enforce("bob", "data2", "write")) | ||
self.assertTrue(e.enforce("alice", "data2", "read")) | ||
self.assertTrue(e.enforce("alice", "data2", "write")) | ||
|
||
def test_add_policy(self): | ||
e = get_enforcer() | ||
|
||
self.assertFalse(e.enforce("eve", "data3", "read")) | ||
res = e.add_permission_for_user("eve", "data3", "read") | ||
self.assertTrue(res) | ||
self.assertTrue(e.enforce("eve", "data3", "read")) | ||
|
||
def test_save_policy(self): | ||
e = get_enforcer() | ||
self.assertFalse(e.enforce("alice", "data4", "read")) | ||
|
||
model = e.get_model() | ||
model.clear_policy() | ||
|
||
model.add_policy("p", "p", ["alice", "data4", "read"]) | ||
|
||
adapter = e.get_adapter() | ||
adapter.save_policy(model) | ||
self.assertTrue(e.enforce("alice", "data4", "read")) | ||
|
||
def test_str(self): | ||
rule = CasbinRule(ptype="p", v0="alice", v1="data1", v2="read") | ||
self.assertEqual(str(rule), "p, alice, data1, read") | ||
|
||
def test_repr(self): | ||
rule = CasbinRule(ptype="p", v0="alice", v1="data1", v2="read") | ||
self.assertRegex(repr(rule), r'<CasbinRule \d+: "p, alice, data1, read">') |