From 195b1a178a4f4b57eaf316a6e582e8cf3e832782 Mon Sep 17 00:00:00 2001 From: Karl Hobley Date: Thu, 2 Dec 2021 13:50:28 +0000 Subject: [PATCH] Implement CopyPageAction and admin API for copying pages * Moved Page.copy implementation into an external function * Convert copy_page function into a class * Implement PageCopyAction.execute() * Implement CopyPageAction.check() To allow checking the action ahead-of-time * Make page copy view use PageCopyAction * Add page copy Admin API --- wagtail/admin/api/actions/__init__.py | 0 wagtail/admin/api/actions/base.py | 6 + wagtail/admin/api/actions/copy.py | 67 +++++++ wagtail/admin/api/views.py | 30 +++ wagtail/admin/tests/api/test_pages.py | 179 +++++++++++++++++- wagtail/admin/tests/api/utils.py | 2 +- wagtail/admin/views/pages/copy.py | 10 +- wagtail/core/actions/__init__.py | 0 wagtail/core/actions/copy_page.py | 257 ++++++++++++++++++++++++++ wagtail/core/models/__init__.py | 198 ++------------------ 10 files changed, 559 insertions(+), 190 deletions(-) create mode 100644 wagtail/admin/api/actions/__init__.py create mode 100644 wagtail/admin/api/actions/base.py create mode 100644 wagtail/admin/api/actions/copy.py create mode 100644 wagtail/core/actions/__init__.py create mode 100644 wagtail/core/actions/copy_page.py diff --git a/wagtail/admin/api/actions/__init__.py b/wagtail/admin/api/actions/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/wagtail/admin/api/actions/base.py b/wagtail/admin/api/actions/base.py new file mode 100644 index 000000000000..5bdfc268e830 --- /dev/null +++ b/wagtail/admin/api/actions/base.py @@ -0,0 +1,6 @@ +class APIAction: + serializer = None + + def __init__(self, view, request): + self.view = view + self.request = request diff --git a/wagtail/admin/api/actions/copy.py b/wagtail/admin/api/actions/copy.py new file mode 100644 index 000000000000..f40ef2da06b0 --- /dev/null +++ b/wagtail/admin/api/actions/copy.py @@ -0,0 +1,67 @@ +from django.core.exceptions import ValidationError as DjangoValidationError +from django.shortcuts import get_object_or_404 +from rest_framework import fields, status +from rest_framework.exceptions import ValidationError +from rest_framework.response import Response +from rest_framework.serializers import Serializer + +from wagtail.api.v2.utils import BadRequestError +from wagtail.core.actions.copy_page import CopyPageAction, CopyPageIntegrityError +from wagtail.core.models import Page +from wagtail.core.utils import find_available_slug + +from .base import APIAction + + +class CopyPageAPIActionSerializer(Serializer): + # Note: CopyPageAction will validate the destination page + destination_page_id = fields.IntegerField(required=False) + recursive = fields.BooleanField(default=False, required=False) + keep_live = fields.BooleanField(default=True, required=False) + slug = fields.CharField(required=False) + title = fields.CharField(required=False) + + +class CopyPageAPIAction(APIAction): + serializer = CopyPageAPIActionSerializer + + def _action_from_data(self, instance, data): + destination_page_id = data.get('destination_page_id') + if destination_page_id is None: + destination = instance.get_parent() + else: + destination = get_object_or_404(Page, id=destination_page_id) + + update_attrs = {} + if 'slug' in data: + update_attrs['slug'] = data['slug'] + else: + # If user didn't specify a particular slug, find an available one + available_slug = find_available_slug(destination, instance.slug) + if available_slug != instance.slug: + update_attrs['slug'] = available_slug + + if 'title' in data: + update_attrs['title'] = data['title'] + + return CopyPageAction( + page=instance, + to=destination, + recursive=data['recursive'], + keep_live=data['keep_live'], + update_attrs=update_attrs, + user=self.request.user + ) + + def execute(self, instance, data): + action = self._action_from_data(instance, data) + + try: + new_page = action.execute() + except DjangoValidationError as e: + raise ValidationError(e.message_dict) + except CopyPageIntegrityError as e: + raise BadRequestError(e.args[0]) + + serializer = self.view.get_serializer(new_page) + return Response(serializer.data, status=status.HTTP_201_CREATED) diff --git a/wagtail/admin/api/views.py b/wagtail/admin/api/views.py index 9ffe6d26662c..576252e4fb93 100644 --- a/wagtail/admin/api/views.py +++ b/wagtail/admin/api/views.py @@ -1,11 +1,14 @@ from collections import OrderedDict from django.conf import settings +from django.http import Http404 +from django.urls import path from rest_framework.authentication import SessionAuthentication from wagtail.api.v2.views import PagesAPIViewSet from wagtail.core.models import Page +from .actions.copy import CopyPageAPIAction from .filters import ForExplorerFilter, HasChildrenFilter from .serializers import AdminPageSerializer @@ -14,6 +17,10 @@ class PagesAdminAPIViewSet(PagesAPIViewSet): base_serializer_class = AdminPageSerializer authentication_classes = [SessionAuthentication] + actions = { + 'copy': CopyPageAPIAction, + } + # Add has_children and for_explorer filters filter_backends = PagesAPIViewSet.filter_backends + [ HasChildrenFilter, @@ -103,3 +110,26 @@ def detail_view(self, request, pk): response = super().detail_view(request, pk) response.data['__types'] = self.get_type_info() return response + + def action_view(self, request, pk, action_name): + instance = self.get_object() + + if action_name not in self.actions: + raise Http404(f"unrecognised action '{action_name}'") + + action = self.actions[action_name](self, request) + action_data = action.serializer(data=request.data) + action_data.is_valid() + + return action.execute(instance, action_data.data) + + @classmethod + def get_urlpatterns(cls): + """ + This returns a list of URL patterns for the endpoint + """ + urlpatterns = super().get_urlpatterns() + urlpatterns.extend([ + path('/action//', cls.as_view({'post': 'action_view'}), name='action'), + ]) + return urlpatterns diff --git a/wagtail/admin/tests/api/test_pages.py b/wagtail/admin/tests/api/test_pages.py index 565ba2d3664d..468580fae9b1 100644 --- a/wagtail/admin/tests/api/test_pages.py +++ b/wagtail/admin/tests/api/test_pages.py @@ -3,12 +3,13 @@ import json from django.contrib.auth import get_user_model +from django.contrib.auth.models import Group, Permission from django.urls import reverse from django.utils import timezone from wagtail.api.v2.tests.test_pages import TestPageDetail, TestPageListing from wagtail.core import hooks -from wagtail.core.models import Locale, Page +from wagtail.core.models import GroupPagePermission, Locale, Page from wagtail.tests.demosite import models from wagtail.tests.testapp.models import SimplePage, StreamPage from wagtail.users.models import UserProfile @@ -807,6 +808,182 @@ def test_custom_admin_display_title_shown_on_listing(self): self.assertEqual(matching_items[0]['admin_display_title'], "Saint Patrick (single event)") +class TestCopyPageAction(AdminAPITestCase): + fixtures = ['test.json'] + + def get_response(self, page_id, data): + return self.client.post(reverse('wagtailadmin_api:pages:action', args=[page_id, 'copy']), data) + + def test_copy_page(self): + response = self.get_response(3, {}) + + self.assertEqual(response.status_code, 201) + content = json.loads(response.content.decode('utf-8')) + + new_page = Page.objects.get(id=content['id']) + self.assertEqual(new_page.title, "Events") + self.assertEqual(new_page.slug, 'events-1') + self.assertTrue(new_page.live) + self.assertFalse(new_page.get_children().exists()) + + def test_copy_page_change_title(self): + response = self.get_response(3, { + 'title': "New title" + }) + + self.assertEqual(response.status_code, 201) + content = json.loads(response.content.decode('utf-8')) + + new_page = Page.objects.get(id=content['id']) + self.assertEqual(new_page.title, "New title") + self.assertEqual(new_page.slug, 'events-1') + + def test_copy_page_change_slug(self): + response = self.get_response(3, { + 'slug': "new-slug" + }) + + self.assertEqual(response.status_code, 201) + content = json.loads(response.content.decode('utf-8')) + + new_page = Page.objects.get(id=content['id']) + self.assertEqual(new_page.slug, 'new-slug') + + def test_copy_page_destination(self): + response = self.get_response(3, { + 'destination_page_id': 3 + }) + + self.assertEqual(response.status_code, 201) + content = json.loads(response.content.decode('utf-8')) + + new_page = Page.objects.get(id=content['id']) + self.assertEqual(new_page.title, "Events") + self.assertTrue(new_page.live) + self.assertFalse(new_page.get_children().exists()) + + def test_copy_page_recursive(self): + response = self.get_response(3, { + 'recursive': True, + }) + + self.assertEqual(response.status_code, 201) + content = json.loads(response.content.decode('utf-8')) + + new_page = Page.objects.get(id=content['id']) + self.assertEqual(new_page.title, "Events") + self.assertTrue(new_page.get_children().exists()) + + def test_copy_page_in_draft(self): + response = self.get_response(3, { + 'keep_live': False, + }) + + self.assertEqual(response.status_code, 201) + content = json.loads(response.content.decode('utf-8')) + + new_page = Page.objects.get(id=content['id']) + self.assertEqual(new_page.title, "Events") + self.assertFalse(new_page.live) + + # Check errors + + def test_without_publish_permissions_at_destination_with_keep_live_false(self): + self.user.is_superuser = False + self.user.user_permissions.add( + Permission.objects.get(content_type__app_label='wagtailadmin', codename='access_admin') + ) + self.user.groups.add( + Group.objects.get(name="Editors") + ) + self.user.save() + + response = self.get_response(3, { + 'destination_page_id': 1, + 'keep_live': False, + }) + + self.assertEqual(response.status_code, 403) + content = json.loads(response.content.decode('utf-8')) + self.assertEqual(content, { + 'detail': "You do not have permission to perform this action." + }) + + def test_recursively_copy_into_self(self): + response = self.get_response(3, { + 'destination_page_id': 3, + 'recursive': True, + }) + + self.assertEqual(response.status_code, 400) + content = json.loads(response.content.decode('utf-8')) + self.assertEqual(content, { + 'message': "You cannot copy a tree branch recursively into itself" + }) + + def test_without_create_permissions_at_destination(self): + self.user.is_superuser = False + self.user.user_permissions.add( + Permission.objects.get(content_type__app_label='wagtailadmin', codename='access_admin') + ) + self.user.save() + + response = self.get_response(3, { + 'destination_page_id': 2, + }) + + self.assertEqual(response.status_code, 403) + content = json.loads(response.content.decode('utf-8')) + self.assertEqual(content, { + 'detail': "You do not have permission to perform this action." + }) + + def test_without_publish_permissions_at_destination_with_keep_live(self): + self.user.is_superuser = False + self.user.user_permissions.add( + Permission.objects.get(content_type__app_label='wagtailadmin', codename='access_admin') + ) + self.user.groups.add( + Group.objects.get(name="Editors") + ) + self.user.save() + + GroupPagePermission.objects.create( + group=Group.objects.get(name="Editors"), + page_id=2, + permission_type='add' + ) + + response = self.get_response(3, { + 'destination_page_id': 2, + }) + + self.assertEqual(response.status_code, 403) + content = json.loads(response.content.decode('utf-8')) + self.assertEqual(content, { + 'detail': "You do not have permission to perform this action." + }) + + def test_respects_page_creation_rules(self): + # Only one homepage may exist + response = self.get_response(2, {}) + + self.assertEqual(response.status_code, 403) + content = json.loads(response.content.decode('utf-8')) + self.assertEqual(content, { + 'detail': "You do not have permission to perform this action." + }) + + def test_copy_page_slug_in_use(self): + response = self.get_response(3, { + 'slug': 'events', + }) + + self.assertEqual(response.status_code, 400) + content = json.loads(response.content.decode('utf-8')) + self.assertEqual(content, {'slug': ['This slug is already in use']}) + + # Overwrite imported test cases do Django doesn't run them TestPageDetail = None TestPageListing = None diff --git a/wagtail/admin/tests/api/utils.py b/wagtail/admin/tests/api/utils.py index 4709e261c71c..730d7c5e33be 100644 --- a/wagtail/admin/tests/api/utils.py +++ b/wagtail/admin/tests/api/utils.py @@ -5,4 +5,4 @@ class AdminAPITestCase(TestCase, WagtailTestUtils): def setUp(self): - self.login() + self.user = self.login() diff --git a/wagtail/admin/views/pages/copy.py b/wagtail/admin/views/pages/copy.py index 7fadc25e8cf1..bd0023497f1e 100644 --- a/wagtail/admin/views/pages/copy.py +++ b/wagtail/admin/views/pages/copy.py @@ -1,4 +1,3 @@ -from django.core.exceptions import PermissionDenied from django.shortcuts import redirect from django.template.response import TemplateResponse from django.utils.translation import gettext as _ @@ -8,6 +7,7 @@ from wagtail.admin.forms.pages import CopyForm from wagtail.admin.views.pages.utils import get_valid_next_url_from_request from wagtail.core import hooks +from wagtail.core.actions.copy_page import CopyPageAction from wagtail.core.models import Page @@ -42,10 +42,6 @@ def copy(request, page_id): if form.cleaned_data['new_parent_page']: parent_page = form.cleaned_data['new_parent_page'] - if not page.permissions_for_user(request.user).can_copy_to(parent_page, - form.cleaned_data.get('copy_subpages')): - raise PermissionDenied - # Re-check if the user has permission to publish subpages on the new parent can_publish = parent_page.permissions_for_user(request.user).can_publish_subpage() keep_live = can_publish and form.cleaned_data.get('publish_copies') @@ -61,7 +57,8 @@ def copy(request, page_id): user=request.user, ) else: - new_page = page.specific.copy( + action = CopyPageAction( + page=page, recursive=form.cleaned_data.get('copy_subpages'), to=parent_page, update_attrs={ @@ -71,6 +68,7 @@ def copy(request, page_id): keep_live=keep_live, user=request.user, ) + new_page = action.execute() # Give a success message back to the user if form.cleaned_data.get('copy_subpages'): diff --git a/wagtail/core/actions/__init__.py b/wagtail/core/actions/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/wagtail/core/actions/copy_page.py b/wagtail/core/actions/copy_page.py new file mode 100644 index 000000000000..23751483b541 --- /dev/null +++ b/wagtail/core/actions/copy_page.py @@ -0,0 +1,257 @@ +import json +import logging +import uuid + +from django.core.exceptions import PermissionDenied +from modelcluster.models import get_all_child_relations + +from wagtail.core.log_actions import log +from wagtail.core.models.copying import _copy, _copy_m2m_relations +from wagtail.core.models.i18n import TranslatableMixin +from wagtail.core.signals import page_published + + +logger = logging.getLogger('wagtail.core') + + +class CopyPageIntegrityError(RuntimeError): + """ + Raised when the page copy cannot be performed for data integrity reasons. + """ + pass + + +class CopyPagePermissionError(PermissionDenied): + """ + Raised when the page copy cannot be performed due to insufficient permissions. + """ + pass + + +class CopyPageAction: + """ + Copies pages and page trees. + """ + def __init__(self, page, to=None, update_attrs=None, exclude_fields=None, recursive=False, copy_revisions=True, keep_live=True, user=None, process_child_object=None, log_action='wagtail.copy', reset_translation_key=True): + # Note: These four parameters don't apply to any copied children + self.page = page + self.to = to + self.update_attrs = update_attrs + self.exclude_fields = exclude_fields + + self.recursive = recursive + self.copy_revisions = copy_revisions + self.keep_live = keep_live + self.user = user + self.process_child_object = process_child_object + self.log_action = log_action + self.reset_translation_key = reset_translation_key + + def check(self, skip_permission_checks=False): + from wagtail.core.models import UserPagePermissionsProxy + + # Essential data model checks + if self.page._state.adding: + raise CopyPageIntegrityError('Page.copy() called on an unsaved page') + + if self.to and self.recursive and (self.to.id == self.page.id or self.to.is_descendant_of(self.page)): + raise CopyPageIntegrityError("You cannot copy a tree branch recursively into itself") + + # Permission checks + if self.user and not skip_permission_checks: + to = self.to + if to is None: + to = self.page.get_parent() + + if not self.page.permissions_for_user(self.user).can_copy_to(to, self.recursive): + raise CopyPagePermissionError("You do not have permission to copy this page") + + if self.keep_live: + destination_perms = UserPagePermissionsProxy(self.user).for_page(self.to) + + if not destination_perms.can_publish_subpage(): + raise CopyPagePermissionError("You do not have permission publish a page at the destination") + + def _copy_page(self, page, to=None, update_attrs=None, exclude_fields=None, _mpnode_attrs=None): + exclude_fields = page.default_exclude_fields_in_copy + page.exclude_fields_in_copy + (exclude_fields or []) + specific_page = page.specific + if self.keep_live: + base_update_attrs = { + 'alias_of': None, + } + else: + base_update_attrs = { + 'live': False, + 'has_unpublished_changes': True, + 'live_revision': None, + 'first_published_at': None, + 'last_published_at': None, + 'alias_of': None, + } + + if self.user: + base_update_attrs['owner'] = self.user + + # When we're not copying for translation, we should give the translation_key a new value + if self.reset_translation_key: + base_update_attrs['translation_key'] = uuid.uuid4() + + if update_attrs: + base_update_attrs.update(update_attrs) + + page_copy, child_object_map = _copy(specific_page, exclude_fields=exclude_fields, update_attrs=base_update_attrs) + + # Save copied child objects and run process_child_object on them if we need to + for (child_relation, old_pk), child_object in child_object_map.items(): + if self.process_child_object: + self.process_child_object(specific_page, page_copy, child_relation, child_object) + + # When we're not copying for translation, we should give the translation_key a new value for each child object as well + if self.reset_translation_key and isinstance(child_object, TranslatableMixin): + child_object.translation_key = uuid.uuid4() + + # Save the new page + if _mpnode_attrs: + # We've got a tree position already reserved. Perform a quick save + page_copy.path = _mpnode_attrs[0] + page_copy.depth = _mpnode_attrs[1] + page_copy.save(clean=False) + + else: + if to: + page_copy = to.add_child(instance=page_copy) + else: + page_copy = page.add_sibling(instance=page_copy) + + _mpnode_attrs = (page_copy.path, page_copy.depth) + + _copy_m2m_relations(specific_page, page_copy, exclude_fields=exclude_fields, update_attrs=base_update_attrs) + + # Copy revisions + if self.copy_revisions: + for revision in page.revisions.all(): + revision.pk = None + revision.submitted_for_moderation = False + revision.approved_go_live_at = None + revision.page = page_copy + + # Update ID fields in content + revision_content = json.loads(revision.content_json) + revision_content['pk'] = page_copy.pk + + for child_relation in get_all_child_relations(specific_page): + accessor_name = child_relation.get_accessor_name() + try: + child_objects = revision_content[accessor_name] + except KeyError: + # KeyErrors are possible if the revision was created + # before this child relation was added to the database + continue + + for child_object in child_objects: + child_object[child_relation.field.name] = page_copy.pk + + # Remap primary key to copied versions + # If the primary key is not recognised (eg, the child object has been deleted from the database) + # set the primary key to None + copied_child_object = child_object_map.get((child_relation, child_object['pk'])) + child_object['pk'] = copied_child_object.pk if copied_child_object else None + + revision.content_json = json.dumps(revision_content) + + # Save + revision.save() + + # Create a new revision + # This code serves a few purposes: + # * It makes sure update_attrs gets applied to the latest revision + # * It bumps the last_revision_created_at value so the new page gets ordered as if it was just created + # * It sets the user of the new revision so it's possible to see who copied the page by looking at its history + latest_revision = page_copy.get_latest_revision_as_page() + + if update_attrs: + for field, value in update_attrs.items(): + setattr(latest_revision, field, value) + + latest_revision_as_page_revision = latest_revision.save_revision(user=self.user, changed=False, clean=False) + if self.keep_live: + page_copy.live_revision = latest_revision_as_page_revision + page_copy.last_published_at = latest_revision_as_page_revision.created_at + page_copy.first_published_at = latest_revision_as_page_revision.created_at + page_copy.save(clean=False) + + if page_copy.live: + page_published.send( + sender=page_copy.specific_class, instance=page_copy, + revision=latest_revision_as_page_revision + ) + + # Log + if self.log_action: + parent = specific_page.get_parent() + log( + instance=page_copy, + action=self.log_action, + user=self.user, + data={ + 'page': { + 'id': page_copy.id, + 'title': page_copy.get_admin_display_title(), + 'locale': { + 'id': page_copy.locale_id, + 'language_code': page_copy.locale.language_code + } + }, + 'source': {'id': parent.id, 'title': parent.specific_deferred.get_admin_display_title()} if parent else None, + 'destination': {'id': to.id, 'title': to.specific_deferred.get_admin_display_title()} if to else None, + 'keep_live': page_copy.live and self.keep_live, + 'source_locale': { + 'id': page.locale_id, + 'language_code': page.locale.language_code + } + }, + ) + if page_copy.live and self.keep_live: + # Log the publish if the use chose to keep the copied page live + log( + instance=page_copy, + action='wagtail.publish', + user=self.user, + revision=latest_revision_as_page_revision, + ) + logger.info("Page copied: \"%s\" id=%d from=%d", page_copy.title, page_copy.id, page.id) + + # Copy child pages + from wagtail.core.models import Page + + if self.recursive: + numchild = 0 + + for child_page in page.get_children().specific(): + newdepth = _mpnode_attrs[1] + 1 + child_mpnode_attrs = ( + Page._get_path(_mpnode_attrs[0], newdepth, numchild), + newdepth + ) + numchild += 1 + self._copy_page( + child_page, + to=page_copy, + _mpnode_attrs=child_mpnode_attrs + ) + + if numchild > 0: + page_copy.numchild = numchild + page_copy.save(clean=False, update_fields=['numchild']) + + return page_copy + + def execute(self, skip_permission_checks=False): + self.check(skip_permission_checks=skip_permission_checks) + + return self._copy_page( + self.page, + to=self.to, + update_attrs=self.update_attrs, + exclude_fields=self.exclude_fields + ) diff --git a/wagtail/core/models/__init__.py b/wagtail/core/models/__init__.py index 382dc62ec311..f7dbe94dd6b9 100644 --- a/wagtail/core/models/__init__.py +++ b/wagtail/core/models/__init__.py @@ -43,9 +43,10 @@ from django.utils.text import capfirst, slugify from django.utils.translation import gettext_lazy as _ from modelcluster.fields import ParentalKey -from modelcluster.models import ClusterableModel, get_all_child_relations +from modelcluster.models import ClusterableModel from treebeard.mp_tree import MP_Node +from wagtail.core.actions.copy_page import CopyPageAction from wagtail.core.fields import StreamField from wagtail.core.forms import TaskStateCommentForm from wagtail.core.log_actions import log @@ -1547,192 +1548,25 @@ def move(self, target, pos=None, user=None): logger.info("Page moved: \"%s\" id=%d path=%s", self.title, self.id, new_url_path) def copy(self, recursive=False, to=None, update_attrs=None, copy_revisions=True, keep_live=True, user=None, - process_child_object=None, exclude_fields=None, log_action='wagtail.copy', reset_translation_key=True, _mpnode_attrs=None): + process_child_object=None, exclude_fields=None, log_action='wagtail.copy', reset_translation_key=True): """ Copies a given page :param log_action flag for logging the action. Pass None to skip logging. Can be passed an action string. Defaults to 'wagtail.copy' """ - - if self._state.adding: - raise RuntimeError('Page.copy() called on an unsaved page') - - exclude_fields = self.default_exclude_fields_in_copy + self.exclude_fields_in_copy + (exclude_fields or []) - specific_self = self.specific - if keep_live: - base_update_attrs = { - 'alias_of': None, - } - else: - base_update_attrs = { - 'live': False, - 'has_unpublished_changes': True, - 'live_revision': None, - 'first_published_at': None, - 'last_published_at': None, - 'alias_of': None, - } - - if user: - base_update_attrs['owner'] = user - - # When we're not copying for translation, we should give the translation_key a new value - if reset_translation_key: - base_update_attrs['translation_key'] = uuid.uuid4() - - if update_attrs: - base_update_attrs.update(update_attrs) - - page_copy, child_object_map = _copy(specific_self, exclude_fields=exclude_fields, update_attrs=base_update_attrs) - - # Save copied child objects and run process_child_object on them if we need to - for (child_relation, old_pk), child_object in child_object_map.items(): - if process_child_object: - process_child_object(specific_self, page_copy, child_relation, child_object) - - # When we're not copying for translation, we should give the translation_key a new value for each child object as well - if reset_translation_key and isinstance(child_object, TranslatableMixin): - child_object.translation_key = uuid.uuid4() - - # Save the new page - if _mpnode_attrs: - # We've got a tree position already reserved. Perform a quick save - page_copy.path = _mpnode_attrs[0] - page_copy.depth = _mpnode_attrs[1] - page_copy.save(clean=False) - - else: - if to: - if recursive and (to == self or to.is_descendant_of(self)): - raise Exception("You cannot copy a tree branch recursively into itself") - page_copy = to.add_child(instance=page_copy) - else: - page_copy = self.add_sibling(instance=page_copy) - - _mpnode_attrs = (page_copy.path, page_copy.depth) - - _copy_m2m_relations(specific_self, page_copy, exclude_fields=exclude_fields, update_attrs=base_update_attrs) - - # Copy revisions - if copy_revisions: - for revision in self.revisions.all(): - revision.pk = None - revision.submitted_for_moderation = False - revision.approved_go_live_at = None - revision.page = page_copy - - # Update ID fields in content - revision_content = json.loads(revision.content_json) - revision_content['pk'] = page_copy.pk - - for child_relation in get_all_child_relations(specific_self): - accessor_name = child_relation.get_accessor_name() - try: - child_objects = revision_content[accessor_name] - except KeyError: - # KeyErrors are possible if the revision was created - # before this child relation was added to the database - continue - - for child_object in child_objects: - child_object[child_relation.field.name] = page_copy.pk - - # Remap primary key to copied versions - # If the primary key is not recognised (eg, the child object has been deleted from the database) - # set the primary key to None - copied_child_object = child_object_map.get((child_relation, child_object['pk'])) - child_object['pk'] = copied_child_object.pk if copied_child_object else None - - revision.content_json = json.dumps(revision_content) - - # Save - revision.save() - - # Create a new revision - # This code serves a few purposes: - # * It makes sure update_attrs gets applied to the latest revision - # * It bumps the last_revision_created_at value so the new page gets ordered as if it was just created - # * It sets the user of the new revision so it's possible to see who copied the page by looking at its history - latest_revision = page_copy.get_latest_revision_as_page() - - if update_attrs: - for field, value in update_attrs.items(): - setattr(latest_revision, field, value) - - latest_revision_as_page_revision = latest_revision.save_revision(user=user, changed=False, clean=False) - if keep_live: - page_copy.live_revision = latest_revision_as_page_revision - page_copy.last_published_at = latest_revision_as_page_revision.created_at - page_copy.first_published_at = latest_revision_as_page_revision.created_at - page_copy.save(clean=False) - - if page_copy.live: - page_published.send( - sender=page_copy.specific_class, instance=page_copy, - revision=latest_revision_as_page_revision - ) - - # Log - if log_action: - parent = specific_self.get_parent() - log( - instance=page_copy, - action=log_action, - user=user, - data={ - 'page': { - 'id': page_copy.id, - 'title': page_copy.get_admin_display_title(), - 'locale': { - 'id': page_copy.locale_id, - 'language_code': page_copy.locale.language_code - } - }, - 'source': {'id': parent.id, 'title': parent.specific_deferred.get_admin_display_title()} if parent else None, - 'destination': {'id': to.id, 'title': to.specific_deferred.get_admin_display_title()} if to else None, - 'keep_live': page_copy.live and keep_live, - 'source_locale': { - 'id': self.locale_id, - 'language_code': self.locale.language_code - } - }, - ) - if page_copy.live and keep_live: - # Log the publish if the use chose to keep the copied page live - log( - instance=page_copy, - action='wagtail.publish', - user=user, - revision=latest_revision_as_page_revision, - ) - logger.info("Page copied: \"%s\" id=%d from=%d", page_copy.title, page_copy.id, self.id) - - # Copy child pages - if recursive: - numchild = 0 - - for child_page in self.get_children().specific(): - newdepth = _mpnode_attrs[1] + 1 - child_mpnode_attrs = ( - Page._get_path(_mpnode_attrs[0], newdepth, numchild), - newdepth - ) - numchild += 1 - child_page.copy( - recursive=True, - to=page_copy, - copy_revisions=copy_revisions, - keep_live=keep_live, - user=user, - process_child_object=process_child_object, - _mpnode_attrs=child_mpnode_attrs - ) - - if numchild > 0: - page_copy.numchild = numchild - page_copy.save(clean=False, update_fields=['numchild']) - - return page_copy + return CopyPageAction( + self, + to=to, + update_attrs=update_attrs, + exclude_fields=exclude_fields, + recursive=recursive, + copy_revisions=copy_revisions, + keep_live=keep_live, + user=user, + process_child_object=process_child_object, + log_action=log_action, + reset_translation_key=reset_translation_key, + ).execute(skip_permission_checks=True) copy.alters_data = True