DRF 默认权限设定无法对每个接口进行明细的权限设定, 对其增加DrfActionPermission, 利用ViewSet action,为所有接口设定权限。
# settings.py
REST_FRAMEWORK = {
"DEFAULT_PERMISSION_CLASSES": (
"rest_framework.permissions.IsAuthenticated",
"XXX.contrib.drf.permissions.DrfActionPermission"
)
}
# 非ModelViewSet 类,是否放行
REST_DRF_ACTION_PERMISSION_DEFAULT = False
class DrfActionPermission(permissions.BasePermission):
authenticated_users_only = True
action_perms_map = {
"GET": ["%(app_label)s.action_get_%(model_name)s__%(action_name)s"],
"OPTIONS": [],
"HEAD": [],
"POST": ["%(app_label)s.action_post_%(model_name)s__%(action_name)s"],
"PUT": ["%(app_label)s.action_put_%(model_name)s__%(action_name)s"],
"PATCH": ["%(app_label)s.action_patch_%(model_name)s__%(action_name)s"],
"DELETE": ["%(app_label)s.action_delete_%(model_name)s__%(action_name)s"],
}
def _queryset(self, view):
assert (
hasattr(view, "get_queryset") or getattr(view, "queryset", None) is not None
), (
"Cannot apply {} on a view that does not set "
"`.queryset` or have a `.get_queryset()` method."
).format(
self.__class__.__name__
)
if hasattr(view, "get_queryset"):
queryset = view.get_queryset()
assert queryset is not None, "{}.get_queryset() returned None".format(
view.__class__.__name__
)
return queryset
return view.queryset
def _get_model_cls(self, view):
if hasattr(view, "permission_object"):
return view.permission_object
queryset = self._queryset(view)
return queryset.model
def get_action_required_permissions(self, method, model_cls, action_name):
"""
Given a model and an HTTP method, return the list of permission
codes that the user is required to have.
"""
kwargs = {
"app_label": model_cls._meta.app_label,
"model_name": model_cls._meta.model_name,
"action_name": action_name,
}
if method not in self.action_perms_map:
raise exceptions.MethodNotAllowed(method)
return [perm % kwargs for perm in self.action_perms_map[method]]
def has_action_permission(self, request, view):
if getattr(view, "_ignore_model_permissions", False):
return True
if hasattr(view, "action"):
if (
request.accepted_renderer.format == "xlsx"
and request.method.lower() == "get"
):
action_name = "export"
else:
action_name = view.action
model_cls = self._get_model_cls(view)
perms = self.get_action_required_permissions(
request.method, model_cls, action_name
)
return request.user.has_perms(perms)
else:
Warning("Use ViewSet as BaseClass")
return settings.REST_DRF_ACTION_PERMISSION_DEFAULT
def has_permission(self, request, view):
if not request.user or (
not request.user.is_authenticated and self.authenticated_users_only
):
return False
return self.has_action_permission(request, view)
# add permission code
from importlib import import_module
from itertools import chain
from django.apps import apps
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
from django.core.management.base import BaseCommand
from django.utils.module_loading import module_has_submodule
from rest_framework.viewsets import ViewSetMixin
detail_action_mapping = [
{"get": "retrieve", "put": "update", "patch": "partial_update", "delete": "destroy"}
]
list_action_mapping = [{"get": "list", "post": "create"}]
export_action_mapping = [{"get": "export"}]
class Command(BaseCommand):
help = "add_action_permission"
def _get_moduleviews(self):
moduleviews = set()
for app in apps.get_app_configs():
if module_has_submodule(app.module, "views"):
views_module_name = "%s.%s" % (app.name, "views")
views_module = import_module(views_module_name)
for obj_str in dir(views_module):
if obj_str.startswith("_"):
continue
if obj_str in ("mro",):
continue
obj = getattr(views_module, obj_str)
if isinstance(obj, type):
try:
if ViewSetMixin in obj.mro():
moduleviews.add(obj)
except TypeError:
pass
return moduleviews
def _get_model_cls(self, moduleview):
if hasattr(moduleview, "permission_object"):
model_cls = moduleview.permission_object
else:
if hasattr(moduleview, "queryset") and moduleview.queryset is not None:
model_cls = moduleview.queryset.model
else:
self.stdout.write(
"%s no queryset, no permission_object, can not create permission objects auto"
% moduleview.__name__
)
model_cls = None
return model_cls
def handle(self, *args, **kwargs):
moduleviews = self._get_moduleviews()
permissions = []
for moduleview in moduleviews:
model_cls = self._get_model_cls(moduleview)
if not model_cls:
continue
extra_actions = moduleview.get_extra_actions()
for action in chain(
[_action.mapping for _action in extra_actions],
detail_action_mapping,
list_action_mapping,
export_action_mapping,
):
for method, action_name in action.items():
kwargs = {
"app_label": model_cls._meta.app_label,
"model_name": model_cls._meta.model_name,
"action_name": action_name,
"method": method,
}
permission_name = (
"action_%(method)s_%(model_name)s__%(action_name)s" % kwargs
)
permissions.append(
{
"app_label": kwargs["app_label"],
"model_name": kwargs["model_name"],
"permission_name": permission_name,
}
)
if hasattr(moduleview, "extra_permissions"):
for permission in moduleview.extra_permissions:
permissions.append(
{
"app_label": model_cls._meta.app_label,
"model_name": model_cls._meta.model_name,
"permission_name": permission,
}
)
def get_permission_name(permission_code):
name_mapping = {
"retrieve": "查询明细",
"update": "修改",
"partial_update": "部分修改",
"destroy": "删除",
"list": "查询列表",
"create": "创建",
"export": "批量导出",
}
try:
action = permission_code.split("__")[1]
module = permission_code.split("__")[0].split("_")[2]
except:
self.stdout.write(
"permission_code %s, name invalid, use directly" % permission_code
)
return permission_code
return "%s:%s" % (module, name_mapping.get(action, action))
# Permission.objects.filter(name__contains="导出").delete()
for permission in permissions:
ct = ContentType.objects.get_by_natural_key(
permission["app_label"], permission["model_name"]
)
if not Permission.objects.filter(
content_type=ct, codename=permission["permission_name"]
).exists():
self.stdout.write(f"Add {permission}")
Permission.objects.create(
name=get_permission_name(permission["permission_name"]),
content_type=ct,
codename=permission["permission_name"],
)
reference
- todo, 增加git