Skip to content

Commit

Permalink
Implement CopyPageAction and admin API for copying pages
Browse files Browse the repository at this point in the history
* Moved Page.copy implementation into an external function

* Convert copy_page function into a class

* Implement PageCopyAction.execute()

* Implement CopyPageAction.check()

To allow checking the action ahead-of-time

* Make page copy view use PageCopyAction

* Add page copy Admin API
  • Loading branch information
kaedroho authored Dec 2, 2021
1 parent 4803133 commit 195b1a1
Show file tree
Hide file tree
Showing 10 changed files with 559 additions and 190 deletions.
Empty file.
6 changes: 6 additions & 0 deletions wagtail/admin/api/actions/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class APIAction:
serializer = None

def __init__(self, view, request):
self.view = view
self.request = request
67 changes: 67 additions & 0 deletions wagtail/admin/api/actions/copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from django.core.exceptions import ValidationError as DjangoValidationError
from django.shortcuts import get_object_or_404
from rest_framework import fields, status
from rest_framework.exceptions import ValidationError
from rest_framework.response import Response
from rest_framework.serializers import Serializer

from wagtail.api.v2.utils import BadRequestError
from wagtail.core.actions.copy_page import CopyPageAction, CopyPageIntegrityError
from wagtail.core.models import Page
from wagtail.core.utils import find_available_slug

from .base import APIAction


class CopyPageAPIActionSerializer(Serializer):
# Note: CopyPageAction will validate the destination page
destination_page_id = fields.IntegerField(required=False)
recursive = fields.BooleanField(default=False, required=False)
keep_live = fields.BooleanField(default=True, required=False)
slug = fields.CharField(required=False)
title = fields.CharField(required=False)


class CopyPageAPIAction(APIAction):
serializer = CopyPageAPIActionSerializer

def _action_from_data(self, instance, data):
destination_page_id = data.get('destination_page_id')
if destination_page_id is None:
destination = instance.get_parent()
else:
destination = get_object_or_404(Page, id=destination_page_id)

update_attrs = {}
if 'slug' in data:
update_attrs['slug'] = data['slug']
else:
# If user didn't specify a particular slug, find an available one
available_slug = find_available_slug(destination, instance.slug)
if available_slug != instance.slug:
update_attrs['slug'] = available_slug

if 'title' in data:
update_attrs['title'] = data['title']

return CopyPageAction(
page=instance,
to=destination,
recursive=data['recursive'],
keep_live=data['keep_live'],
update_attrs=update_attrs,
user=self.request.user
)

def execute(self, instance, data):
action = self._action_from_data(instance, data)

try:
new_page = action.execute()
except DjangoValidationError as e:
raise ValidationError(e.message_dict)
except CopyPageIntegrityError as e:
raise BadRequestError(e.args[0])

serializer = self.view.get_serializer(new_page)
return Response(serializer.data, status=status.HTTP_201_CREATED)
30 changes: 30 additions & 0 deletions wagtail/admin/api/views.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from collections import OrderedDict

from django.conf import settings
from django.http import Http404
from django.urls import path
from rest_framework.authentication import SessionAuthentication

from wagtail.api.v2.views import PagesAPIViewSet
from wagtail.core.models import Page

from .actions.copy import CopyPageAPIAction
from .filters import ForExplorerFilter, HasChildrenFilter
from .serializers import AdminPageSerializer

Expand All @@ -14,6 +17,10 @@ class PagesAdminAPIViewSet(PagesAPIViewSet):
base_serializer_class = AdminPageSerializer
authentication_classes = [SessionAuthentication]

actions = {
'copy': CopyPageAPIAction,
}

# Add has_children and for_explorer filters
filter_backends = PagesAPIViewSet.filter_backends + [
HasChildrenFilter,
Expand Down Expand Up @@ -103,3 +110,26 @@ def detail_view(self, request, pk):
response = super().detail_view(request, pk)
response.data['__types'] = self.get_type_info()
return response

def action_view(self, request, pk, action_name):
instance = self.get_object()

if action_name not in self.actions:
raise Http404(f"unrecognised action '{action_name}'")

action = self.actions[action_name](self, request)
action_data = action.serializer(data=request.data)
action_data.is_valid()

return action.execute(instance, action_data.data)

@classmethod
def get_urlpatterns(cls):
"""
This returns a list of URL patterns for the endpoint
"""
urlpatterns = super().get_urlpatterns()
urlpatterns.extend([
path('<int:pk>/action/<str:action_name>/', cls.as_view({'post': 'action_view'}), name='action'),
])
return urlpatterns
179 changes: 178 additions & 1 deletion wagtail/admin/tests/api/test_pages.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import json

from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group, Permission
from django.urls import reverse
from django.utils import timezone

from wagtail.api.v2.tests.test_pages import TestPageDetail, TestPageListing
from wagtail.core import hooks
from wagtail.core.models import Locale, Page
from wagtail.core.models import GroupPagePermission, Locale, Page
from wagtail.tests.demosite import models
from wagtail.tests.testapp.models import SimplePage, StreamPage
from wagtail.users.models import UserProfile
Expand Down Expand Up @@ -807,6 +808,182 @@ def test_custom_admin_display_title_shown_on_listing(self):
self.assertEqual(matching_items[0]['admin_display_title'], "Saint Patrick (single event)")


class TestCopyPageAction(AdminAPITestCase):
fixtures = ['test.json']

def get_response(self, page_id, data):
return self.client.post(reverse('wagtailadmin_api:pages:action', args=[page_id, 'copy']), data)

def test_copy_page(self):
response = self.get_response(3, {})

self.assertEqual(response.status_code, 201)
content = json.loads(response.content.decode('utf-8'))

new_page = Page.objects.get(id=content['id'])
self.assertEqual(new_page.title, "Events")
self.assertEqual(new_page.slug, 'events-1')
self.assertTrue(new_page.live)
self.assertFalse(new_page.get_children().exists())

def test_copy_page_change_title(self):
response = self.get_response(3, {
'title': "New title"
})

self.assertEqual(response.status_code, 201)
content = json.loads(response.content.decode('utf-8'))

new_page = Page.objects.get(id=content['id'])
self.assertEqual(new_page.title, "New title")
self.assertEqual(new_page.slug, 'events-1')

def test_copy_page_change_slug(self):
response = self.get_response(3, {
'slug': "new-slug"
})

self.assertEqual(response.status_code, 201)
content = json.loads(response.content.decode('utf-8'))

new_page = Page.objects.get(id=content['id'])
self.assertEqual(new_page.slug, 'new-slug')

def test_copy_page_destination(self):
response = self.get_response(3, {
'destination_page_id': 3
})

self.assertEqual(response.status_code, 201)
content = json.loads(response.content.decode('utf-8'))

new_page = Page.objects.get(id=content['id'])
self.assertEqual(new_page.title, "Events")
self.assertTrue(new_page.live)
self.assertFalse(new_page.get_children().exists())

def test_copy_page_recursive(self):
response = self.get_response(3, {
'recursive': True,
})

self.assertEqual(response.status_code, 201)
content = json.loads(response.content.decode('utf-8'))

new_page = Page.objects.get(id=content['id'])
self.assertEqual(new_page.title, "Events")
self.assertTrue(new_page.get_children().exists())

def test_copy_page_in_draft(self):
response = self.get_response(3, {
'keep_live': False,
})

self.assertEqual(response.status_code, 201)
content = json.loads(response.content.decode('utf-8'))

new_page = Page.objects.get(id=content['id'])
self.assertEqual(new_page.title, "Events")
self.assertFalse(new_page.live)

# Check errors

def test_without_publish_permissions_at_destination_with_keep_live_false(self):
self.user.is_superuser = False
self.user.user_permissions.add(
Permission.objects.get(content_type__app_label='wagtailadmin', codename='access_admin')
)
self.user.groups.add(
Group.objects.get(name="Editors")
)
self.user.save()

response = self.get_response(3, {
'destination_page_id': 1,
'keep_live': False,
})

self.assertEqual(response.status_code, 403)
content = json.loads(response.content.decode('utf-8'))
self.assertEqual(content, {
'detail': "You do not have permission to perform this action."
})

def test_recursively_copy_into_self(self):
response = self.get_response(3, {
'destination_page_id': 3,
'recursive': True,
})

self.assertEqual(response.status_code, 400)
content = json.loads(response.content.decode('utf-8'))
self.assertEqual(content, {
'message': "You cannot copy a tree branch recursively into itself"
})

def test_without_create_permissions_at_destination(self):
self.user.is_superuser = False
self.user.user_permissions.add(
Permission.objects.get(content_type__app_label='wagtailadmin', codename='access_admin')
)
self.user.save()

response = self.get_response(3, {
'destination_page_id': 2,
})

self.assertEqual(response.status_code, 403)
content = json.loads(response.content.decode('utf-8'))
self.assertEqual(content, {
'detail': "You do not have permission to perform this action."
})

def test_without_publish_permissions_at_destination_with_keep_live(self):
self.user.is_superuser = False
self.user.user_permissions.add(
Permission.objects.get(content_type__app_label='wagtailadmin', codename='access_admin')
)
self.user.groups.add(
Group.objects.get(name="Editors")
)
self.user.save()

GroupPagePermission.objects.create(
group=Group.objects.get(name="Editors"),
page_id=2,
permission_type='add'
)

response = self.get_response(3, {
'destination_page_id': 2,
})

self.assertEqual(response.status_code, 403)
content = json.loads(response.content.decode('utf-8'))
self.assertEqual(content, {
'detail': "You do not have permission to perform this action."
})

def test_respects_page_creation_rules(self):
# Only one homepage may exist
response = self.get_response(2, {})

self.assertEqual(response.status_code, 403)
content = json.loads(response.content.decode('utf-8'))
self.assertEqual(content, {
'detail': "You do not have permission to perform this action."
})

def test_copy_page_slug_in_use(self):
response = self.get_response(3, {
'slug': 'events',
})

self.assertEqual(response.status_code, 400)
content = json.loads(response.content.decode('utf-8'))
self.assertEqual(content, {'slug': ['This slug is already in use']})


# Overwrite imported test cases do Django doesn't run them
TestPageDetail = None
TestPageListing = None
2 changes: 1 addition & 1 deletion wagtail/admin/tests/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@

class AdminAPITestCase(TestCase, WagtailTestUtils):
def setUp(self):
self.login()
self.user = self.login()
10 changes: 4 additions & 6 deletions wagtail/admin/views/pages/copy.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from django.core.exceptions import PermissionDenied
from django.shortcuts import redirect
from django.template.response import TemplateResponse
from django.utils.translation import gettext as _
Expand All @@ -8,6 +7,7 @@
from wagtail.admin.forms.pages import CopyForm
from wagtail.admin.views.pages.utils import get_valid_next_url_from_request
from wagtail.core import hooks
from wagtail.core.actions.copy_page import CopyPageAction
from wagtail.core.models import Page


Expand Down Expand Up @@ -42,10 +42,6 @@ def copy(request, page_id):
if form.cleaned_data['new_parent_page']:
parent_page = form.cleaned_data['new_parent_page']

if not page.permissions_for_user(request.user).can_copy_to(parent_page,
form.cleaned_data.get('copy_subpages')):
raise PermissionDenied

# Re-check if the user has permission to publish subpages on the new parent
can_publish = parent_page.permissions_for_user(request.user).can_publish_subpage()
keep_live = can_publish and form.cleaned_data.get('publish_copies')
Expand All @@ -61,7 +57,8 @@ def copy(request, page_id):
user=request.user,
)
else:
new_page = page.specific.copy(
action = CopyPageAction(
page=page,
recursive=form.cleaned_data.get('copy_subpages'),
to=parent_page,
update_attrs={
Expand All @@ -71,6 +68,7 @@ def copy(request, page_id):
keep_live=keep_live,
user=request.user,
)
new_page = action.execute()

# Give a success message back to the user
if form.cleaned_data.get('copy_subpages'):
Expand Down
Empty file.
Loading

0 comments on commit 195b1a1

Please sign in to comment.