forked from casbin/pycasbin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
distributed_enforcer.py
132 lines (105 loc) · 4.75 KB
/
distributed_enforcer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from casbin import SyncedEnforcer
import logging
from casbin.persist import batch_adapter
from casbin.model.policy_op import PolicyOp
from casbin.persist.adapters import update_adapter
class DistributedEnforcer(SyncedEnforcer):
"""DistributedEnforcer wraps SyncedEnforcer for dispatcher."""
def __init__(self, model=None, adapter=None):
self.logger = logging.getLogger()
SyncedEnforcer.__init__(self, model, adapter)
def add_policy_self(self, should_persist, sec, ptype, rules):
"""
AddPolicySelf provides a method for dispatcher to add authorization rules to the current policy.
The function returns the rules affected and error.
"""
no_exists_policy = []
for rule in rules:
if not self.get_model().has_policy(sec, ptype, rule):
no_exists_policy.append(rule)
if should_persist:
try:
if isinstance(self.adapter, batch_adapter):
self.adapter.add_policies(sec, ptype, rules)
except Exception as e:
self.logger.log("An error occurred: " + e)
self.get_model().add_policies(sec, ptype, no_exists_policy)
if sec == "g":
try:
self.build_incremental_role_links(PolicyOp.Policy_add, ptype, no_exists_policy)
except Exception as e:
self.logger.log("An exception occurred: " + e)
return no_exists_policy
return no_exists_policy
def remove_policy_self(self, should_persist, sec, ptype, rules):
"""
remove_policy_self provides a method for dispatcher to remove policies from current policy.
The function returns the rules affected and error.
"""
if(should_persist):
try:
if(isinstance(self.adapter, batch_adapter)):
self.adapter.remove_policy(sec, ptype, rules)
except Exception as e:
self.logger.log("An exception occurred: " + e)
effected = self.get_model().remove_policies_with_effected(sec, ptype, rules)
if sec == "g":
try:
self.build_incremental_role_links(PolicyOp.Policy_remove, ptype, rules)
except Exception as e:
self.logger.log("An exception occurred: " + e)
return effected
return effected
def remove_filtered_policy_self(self, should_persist, sec, ptype, field_index, *field_values):
"""
remove_filtered_policy_self provides a method for dispatcher to remove an authorization
rule from the current policy,field filters can be specified.
The function returns the rules affected and error.
"""
if should_persist:
try:
self.adapter.remove_filtered_policy(sec, ptype, field_index, field_values)
except Exception as e:
self.logger.log("An exception occurred: " + e)
effects = self.get_model().remove_filtered_policy_returns_effects(sec, ptype, field_index, field_values)
if sec == "g":
try:
self.build_incremental_role_links(PolicyOp.Policy_remove, ptype, effects)
except Exception as e:
self.logger.log("An exception occurred: " + e)
return effects
return effects
def clear_policy_self(self, should_persist):
"""
clear_policy_self provides a method for dispatcher to clear all rules from the current policy.
"""
if should_persist:
try:
self.adapter.save_policy(None)
except Exception as e:
self.logger.log("An exception occurred: " + e)
self.get_model().clear_policy()
def update_policy_self(self, should_persist, sec, ptype, old_rule, new_rule):
"""
update_policy_self provides a method for dispatcher to update an authorization rule from the current policy.
"""
if should_persist:
try:
if isinstance(self.adapter, update_adapter):
self.adapter.update_policy(sec, ptype, old_rule, new_rule)
except Exception as e:
self.logger.log("An exception occurred: " + e)
return False
rule_updated = self.get_model().update_policy(sec, ptype, old_rule, new_rule)
if not rule_updated:
return False
if sec == "g":
try:
self.build_incremental_role_links(PolicyOp.Policy_remove, ptype, [old_rule])
except Exception as e:
return False
try:
self.build_incremental_role_links(PolicyOp.Policy_add, ptype, [new_rule])
except Exception as e:
return False
return True