diff --git a/apps/authentication/api/connection_token.py b/apps/authentication/api/connection_token.py index 37fb3e968e1c..05135bc97ece 100644 --- a/apps/authentication/api/connection_token.py +++ b/apps/authentication/api/connection_token.py @@ -472,6 +472,7 @@ class SuperConnectionTokenViewSet(ConnectionTokenViewSet): rbac_perms = { 'create': 'authentication.add_superconnectiontoken', 'renewal': 'authentication.add_superconnectiontoken', + 'check': 'authentication.view_superconnectiontoken', 'get_secret_detail': 'authentication.view_superconnectiontokensecret', 'get_applet_info': 'authentication.view_superconnectiontoken', 'release_applet_account': 'authentication.view_superconnectiontoken', @@ -484,6 +485,28 @@ def get_queryset(self): def get_user(self, serializer): return serializer.validated_data.get('user') + @action(methods=['GET'], detail=True, url_path='check') + def check(self, request, *args, **kwargs): + instance = self.get_object() + data = { + "detail": "OK", + "code": "perm_ok", + "expired": instance.is_expired + } + try: + self._validate_perm( + instance.user, + instance.asset, + instance.account, + instance.protocol + ) + except JMSException as e: + data['code'] = e.detail.code + data['detail'] = str(e.detail) + return Response(data=data, status=status.HTTP_400_BAD_REQUEST) + + return Response(data=data, status=status.HTTP_200_OK) + @action(methods=['PATCH'], detail=False) def renewal(self, request, *args, **kwargs): from common.utils.timezone import as_current_tz diff --git a/apps/terminal/serializers/session.py b/apps/terminal/serializers/session.py index d54111c892bb..924c11049e2e 100644 --- a/apps/terminal/serializers/session.py +++ b/apps/terminal/serializers/session.py @@ -1,10 +1,12 @@ from django.utils.translation import gettext_lazy as _ from rest_framework import serializers +from assets.models import Asset from common.serializers.fields import LabeledChoiceField from common.utils import pretty_string from orgs.mixins.serializers import BulkOrgResourceModelSerializer from terminal.session_lifecycle import lifecycle_events_map +from users.models import User from .terminal import TerminalSmallSerializer from ..const import SessionType, SessionErrorReason from ..models import Session @@ -73,6 +75,38 @@ def validate_asset(self, value): value = pretty_string(value, max_length=max_length) return value + @staticmethod + def get_valid_instance(model_cls, instance_id, field_name, error_message, validation_attr='is_active'): + if instance_id is None: + raise serializers.ValidationError({field_name: _('This field is required.')}) + instance = model_cls.objects.filter(id=instance_id).first() + if not instance or not getattr(instance, validation_attr, False): + raise serializers.ValidationError({field_name: error_message}) + return instance + + def create(self, validated_data): + user_id = validated_data.get('user_id') + asset_id = validated_data.get('asset_id') + + user = self.get_valid_instance( + User, + user_id, + 'user_id', + _('No user or invalid user'), + validation_attr='is_valid' + ) + + asset = self.get_valid_instance( + Asset, + asset_id, + 'asset_id', + _('No asset or invalid asset') + ) + + validated_data['user'] = str(user) + validated_data['asset'] = str(asset) + return super().create(validated_data) + class SessionDisplaySerializer(SessionSerializer): command_amount = serializers.IntegerField(read_only=True, label=_('Command amount'))