DRF Best Practices01: DynamicFieldsModelSerializer

DRF 默认权限设定无法对每个接口进行明细的权限设定, 对其增加DrfActionPermission, 利用ViewSet action,为所有接口设定权限。

# settings.py
REST_FRAMEWORK = {
    "DEFAULT_PERMISSION_CLASSES": (
        "rest_framework.permissions.IsAuthenticated",
        "XXX.contrib.drf.permissions.DrfActionPermission"
    )
}

# 非ModelViewSet 类,是否放行
REST_DRF_ACTION_PERMISSION_DEFAULT = False
class DrfActionPermission(permissions.BasePermission):
    authenticated_users_only = True
    action_perms_map = {
        "GET": ["%(app_label)s.action_get_%(model_name)s__%(action_name)s"],
        "OPTIONS": [],
        "HEAD": [],
        "POST": ["%(app_label)s.action_post_%(model_name)s__%(action_name)s"],
        "PUT": ["%(app_label)s.action_put_%(model_name)s__%(action_name)s"],
        "PATCH": ["%(app_label)s.action_patch_%(model_name)s__%(action_name)s"],
        "DELETE": ["%(app_label)s.action_delete_%(model_name)s__%(action_name)s"],
    }

    def _queryset(self, view):
        assert (
            hasattr(view, "get_queryset") or getattr(view, "queryset", None) is not None
        ), (
            "Cannot apply {} on a view that does not set "
            "`.queryset` or have a `.get_queryset()` method."
        ).format(
            self.__class__.__name__
        )

        if hasattr(view, "get_queryset"):
            queryset = view.get_queryset()
            assert queryset is not None, "{}.get_queryset() returned None".format(
                view.__class__.__name__
            )
            return queryset
        return view.queryset

    def _get_model_cls(self, view):
        if hasattr(view, "permission_object"):
            return view.permission_object
        queryset = self._queryset(view)
        return queryset.model

    def get_action_required_permissions(self, method, model_cls, action_name):
        """
        Given a model and an HTTP method, return the list of permission
        codes that the user is required to have.
        """
        kwargs = {
            "app_label": model_cls._meta.app_label,
            "model_name": model_cls._meta.model_name,
            "action_name": action_name,
        }

        if method not in self.action_perms_map:
            raise exceptions.MethodNotAllowed(method)

        return [perm % kwargs for perm in self.action_perms_map[method]]

    def has_action_permission(self, request, view):
        if getattr(view, "_ignore_model_permissions", False):
            return True

        if hasattr(view, "action"):
            if (
                request.accepted_renderer.format == "xlsx"
                and request.method.lower() == "get"
            ):
                action_name = "export"
            else:
                action_name = view.action

            model_cls = self._get_model_cls(view)
            perms = self.get_action_required_permissions(
                request.method, model_cls, action_name
            )
            return request.user.has_perms(perms)
        else:
            Warning("Use ViewSet as BaseClass")
            return settings.REST_DRF_ACTION_PERMISSION_DEFAULT

    def has_permission(self, request, view):
        if not request.user or (
            not request.user.is_authenticated and self.authenticated_users_only
        ):
            return False
        return self.has_action_permission(request, view)
# add permission code

from importlib import import_module
from itertools import chain

from django.apps import apps
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
from django.core.management.base import BaseCommand
from django.utils.module_loading import module_has_submodule
from rest_framework.viewsets import ViewSetMixin

detail_action_mapping = [
    {"get": "retrieve", "put": "update", "patch": "partial_update", "delete": "destroy"}
]

list_action_mapping = [{"get": "list", "post": "create"}]

export_action_mapping = [{"get": "export"}]


class Command(BaseCommand):
    help = "add_action_permission"

    def _get_moduleviews(self):
        moduleviews = set()
        for app in apps.get_app_configs():
            if module_has_submodule(app.module, "views"):
                views_module_name = "%s.%s" % (app.name, "views")
                views_module = import_module(views_module_name)
                for obj_str in dir(views_module):
                    if obj_str.startswith("_"):
                        continue
                    if obj_str in ("mro",):
                        continue
                    obj = getattr(views_module, obj_str)
                    if isinstance(obj, type):
                        try:
                            if ViewSetMixin in obj.mro():
                                moduleviews.add(obj)
                        except TypeError:
                            pass
        return moduleviews

    def _get_model_cls(self, moduleview):
        if hasattr(moduleview, "permission_object"):
            model_cls = moduleview.permission_object

        else:
            if hasattr(moduleview, "queryset") and moduleview.queryset is not None:
                model_cls = moduleview.queryset.model
            else:
                self.stdout.write(
                    "%s no queryset, no permission_object, can not create permission objects auto"
                    % moduleview.__name__
                )
                model_cls = None

        return model_cls

    def handle(self, *args, **kwargs):
        moduleviews = self._get_moduleviews()
        permissions = []
        for moduleview in moduleviews:
            model_cls = self._get_model_cls(moduleview)
            if not model_cls:
                continue
            extra_actions = moduleview.get_extra_actions()

            for action in chain(
                [_action.mapping for _action in extra_actions],
                detail_action_mapping,
                list_action_mapping,
                export_action_mapping,
            ):
                for method, action_name in action.items():
                    kwargs = {
                        "app_label": model_cls._meta.app_label,
                        "model_name": model_cls._meta.model_name,
                        "action_name": action_name,
                        "method": method,
                    }
                    permission_name = (
                        "action_%(method)s_%(model_name)s__%(action_name)s" % kwargs
                    )
                    permissions.append(
                        {
                            "app_label": kwargs["app_label"],
                            "model_name": kwargs["model_name"],
                            "permission_name": permission_name,
                        }
                    )

            if hasattr(moduleview, "extra_permissions"):
                for permission in moduleview.extra_permissions:
                    permissions.append(
                        {
                            "app_label": model_cls._meta.app_label,
                            "model_name": model_cls._meta.model_name,
                            "permission_name": permission,
                        }
                    )

        def get_permission_name(permission_code):
            name_mapping = {
                "retrieve": "查询明细",
                "update": "修改",
                "partial_update": "部分修改",
                "destroy": "删除",
                "list": "查询列表",
                "create": "创建",
                "export": "批量导出",
            }
            try:
                action = permission_code.split("__")[1]
                module = permission_code.split("__")[0].split("_")[2]
            except:
                self.stdout.write(
                    "permission_code %s, name invalid, use directly" % permission_code
                )
                return permission_code

            return "%s:%s" % (module, name_mapping.get(action, action))

        # Permission.objects.filter(name__contains="导出").delete()
        for permission in permissions:
            ct = ContentType.objects.get_by_natural_key(
                permission["app_label"], permission["model_name"]
            )
            if not Permission.objects.filter(
                content_type=ct, codename=permission["permission_name"]
            ).exists():
                self.stdout.write(f"Add {permission}")
                Permission.objects.create(
                    name=get_permission_name(permission["permission_name"]),
                    content_type=ct,
                    codename=permission["permission_name"],
                )

reference

  • todo, 增加git

links

social