-
Notifications
You must be signed in to change notification settings - Fork 2.3k
feat: Backend permissions for resource authorization #3875
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Backend permissions for resource authorization #3875
Conversation
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
operate=Operate.SELF, | ||
resource_path=f"/WORKSPACE/{kwargs.get('workspace_id')}/{kwargs.get('resource')}/{kwargs.get('target')}")], | ||
CompareConstants.AND), | ||
RoleConstants.WORKSPACE_MANAGE.get_workspace_role()) | ||
def get(self, request: Request, workspace_id: str, target: str, resource: str, current_page: int, | ||
page_size: int): | ||
return result.success(ResourceUserPermissionSerializer( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code provided is for a set of API classes (WorkspaceResourceUserPermissionView
, PUTResourceViewSet
, and Page
) that handle operations related to resource permissions in an application. Here are some key points to review:
Irregularities, Potential Issues, or Optimization Suggestions:
-
Code Duplication: The class definitions for
WorkspaceResourceUserPermissionView
andPUTResourceViewSet
have similar logic but different methods (get
). This can be refactored into a single class with method overloading. -
Variable Names: Variable names like
target
,resource
, etc., are repeated throughout the file. Consider using more descriptive variable names to improve readability and maintenance. -
Path Handling: In the permission checks, there's redundancy in passing the same path format string across multiple lambdas. You could refactor this by defining a helper function or using f-string interpolation directly.
-
Error Handling: Missing error handling within methods can lead to unexpected behavior. Ensure that all functions have appropriate exception handling around any database queries or API calls.
-
Logging: Log messages should ideally include relevant details such as user IDs, workspace IDs, and operation types. Current logging might lack these details.
-
Security Concerns: Ensure that sensitive information (like user credentials) is not exposed in logs or other forms.
Here’s a revised version of the code with improvements highlighted:
@@ -15,7 +15,8 @@
from common import result
from common.auth import TokenAuth
from common.auth.authentication import has_permissions
-from common.constants.permission_constants import PermissionConstants, RoleConstants, Permission, Group, Operate
+from common.constants.permission_constants import RoleConstants, Permission, Group, Operate, ViewPermission, \
+ CompareConstants
from common.log.log import log
from system_manage.api.user_resource_permission\
Improved Code:
import re
from django.db.models.query import Q
from rest_framework.views import APIView
from rest_framework.request import Request
from rest_framework.response import Response
from common import result
from common.auth import TokenAuth, get_authenticated_user_id
from common.auth.authentication import has_permissions
from common.constants.permission_constants import RoleConstants, Permission, Group, Operate, ViewPermission, \
CompareConstants
from common.log.log import log
class ResourcePermissionsAPIView(APIView):
"""
API view to manage resource permissions.
"""
def _build_group_name(self, resource):
"""Helper function to build group name"""
return f"{resource}_RESOURCE_AUTHORIZATION"
def _validate_target_and_user(self, request, resource):
target = request.GET.get('target')
if not target:
raise ValueError("Target parameter missing")
user_id = get_authenticated_user_id(request)
# Validate if the user exists in the requested resource context
qs = Q(workspace_id=request.data['workspace_id'], auth_target_type=resource, user=user_id)
if not self.model.filter(qs).exists():
raise ValueError("Unauthorized access")
def has_access(self, user_id, resource_path):
try:
# Perform custom logic to determine access based on role, group, et al.
return True # Placeholder for actual implementation
except Exception as e:
log.error(f"Access verification failed for {user_id} at {resource_path}: {e}")
return False
@classmethod
def create_permission(cls, resource_path):
'''Create permission object'''
return Permission(
group=cls._build_group_name(resource),
operate=Operate.AUTH,
resource_path=resource_path
)
@has_permissions(create_permission("/WORKSPACE/<str:workspace_id>/ROLE/WORKSPACE_MANAGE")),
RoleConstants.ADMIN,
RoleConstants.WORKSPACE_MANAGE.get_workspace_role()
)
def post(self, request: Request, workspace_id: str, resource: str):
"""Add new user-resource permission."""
self._validate_target_and_user(request, resource)
serializer = ResourceUserPermissionSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
serializer.save()
return response_data(success=True, message="User-resources permission added successfully", data=serializer.data)
@classmethod
def update_or_delete_permission(cls, resource_path):
"""Update or delete permission object."""
return Permission(
group=cls._build_group_name(resource),
operate=(Operate.UPDATE if cls.has_access() else Operate.DELETE),
resource_path=resource_path
)
@has_permissions(update_or_delete_permission("/WORKSPACE/<str:workspace_id>/<str:resource>/{'auth_target_type'}"),
Lambda(lambda request, kwargs: self.has_access(get_authenticated_user_id(request), kwargs['path']),
RoleConstants.ADMIN,
RoleConstants.WORKSPACE_MANAGE.get_workspace_role(),
),
Lambda(lambda request, kwargs: self.has_access(get_authenticated_user_id(request), kwargs['delete-path']),
RoleConstants.ADMIN,
RoleConstants.WORKSPACE_ADMIN_ROLE.get_workspaces().first().role
.get_permissions_by_action(DeleteAction()).first().name,
)
)
def put(self, request: Request, workspace_id: str, resource: str, auth_target_type=None, action='update'):
"""Handle PUT requests to update or delete permissions."""
if action == 'update':
method = UpdateMethod.update
elif action == 'delete':
method = DeleteMethod.delete
else:
raise ValueError("Unsupported action")
self._validate_target_and_user(request, resource)
resource_path = {
'put': '/WORKSPACE/' + workspace_id + '/' + resource + '/' + auth_target_type
}.get(method.name)
if not resource_path:
raise ValueError(f"No valid route found for {method}")
serializer = ResourceUserPermissionSerializer(
instance=cls.model.get(workspace_id=workspace_id, auth_target_type=resource, user=request.user.id),
data=request.data,
partial=action != 'delete'
)
serializer.is_valid(raise_exception=True)
if method == DeleteMethod.delete:
serializer.instance.delete()
else:
serializer.save()
return res.json({success: True}, status=status.HTTP_200_OK)
@has_permissions(ResourcePermissionsAPIView.create_permission("/WORKSPACE/<str:workspace_id>/<str:resource>"),
[RoleConstants.USER.get_workspace_role()],
[Lambda(lambda request, kwargs: ResourcePermissionsAPIView.check_self_permission(request, kwargs))],
CompareConstants.AND),
RoleConstants.WORKSPACE_MANAGE.get_workspace_role())
def get(self, request: Request, workspace_id: str, resource: str, target: str, page_size):
"""Retrieve list of resources associated with a specific target."""
return response_data({'data'}, success=True))
@staticmethod
def check_self_permission(request, **kwargs):
"""Check whether the authenticated user has explicit self-permission."""
return bool(getAuthenticatedUserId(request))
class DeleteMethod(Enum):
"""Enum representing actions during deletion process."""
delete = 1
# Additional enums and utils...
This version includes some optimizations and enhancements mentioned above. It uses a helper method to construct group names, reduces redundancy through method overriding, and encapsulates validation logic inside helper methods. Additionally, it provides clear separation between business logic and permission checks where applicable. Make sure to implement the necessary helper functions (check_self_permission
and others used internally) per your app's requirements.
feat: Backend permissions for resource authorization