-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
✨ (backend): add management command for video transcription
As we want to transcribe the whole catalog of videos, we need to add a management command.
- Loading branch information
1 parent
c9d8dd6
commit 52157df
Showing
4 changed files
with
399 additions
and
7 deletions.
There are no files selected for viewing
54 changes: 54 additions & 0 deletions
54
src/backend/marsha/core/management/commands/transcript_video.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
"""Management command to transcript a video.""" | ||
|
||
from django.core.management import BaseCommand | ||
|
||
from marsha.core import defaults | ||
from marsha.core.models import TimedTextTrack, Video | ||
from marsha.core.utils.transcript_utils import transcript | ||
|
||
|
||
class Command(BaseCommand):
    """Start a transcription job for a single video.

    The video is either the one designated by ``--video-id`` or, when no id
    is given, the most recently created video that is ready and has no
    transcript timed text track. Every outcome is reported on stdout;
    errors raised while starting the job are reported on stderr.
    """

    help = "Transcript a video"

    def add_arguments(self, parser):
        """Register the optional ``--video-id`` argument on the parser."""
        parser.add_argument(
            "--video-id",
            type=str,
            help=(
                "Id of the video to transcript. When omitted, the most recently "
                "created ready video without a transcript is selected."
            ),
        )

    def handle(self, *args, **options):
        """Select a video to transcript and start the transcription job."""
        video_id = options["video_id"]
        if video_id:
            video = self._get_video_by_id(video_id)
        else:
            video = self._get_next_video()

        # The helpers have already explained on stdout why nothing was selected.
        if video is None:
            return

        try:
            self.stdout.write(f"Try to transcript video {video.id}")
            transcript(video)
            self.stdout.write(f"Transcription job started for video {video.id}")
        except Exception as error:  # pylint: disable=broad-except
            # Report the failure on stderr instead of letting the command crash.
            self.stderr.write(f"Error: {error}")

    def _get_video_by_id(self, video_id):
        """Return the video matching ``video_id`` if it can be transcripted.

        Return ``None``, after writing the reason on stdout, when the video
        does not exist, is not in the ready upload state, or already has a
        transcript timed text track.
        """
        try:
            video = Video.objects.get(id=video_id)
        except Video.DoesNotExist:
            self.stdout.write(f"No video matches the provided id: {video_id}")
            return None

        if video.upload_state != defaults.READY:
            self.stdout.write(f"Video {video_id} is not ready")
            return None

        if video.timedtexttracks.filter(mode=TimedTextTrack.TRANSCRIPT).exists():
            self.stdout.write(f"Transcript already exists for video {video_id}")
            return None

        return video

    def _get_next_video(self):
        """Return the newest ready video that has no transcript track.

        Return ``None``, after writing a message on stdout, when no video
        qualifies.
        """
        transcript_tracks = TimedTextTrack.objects.filter(
            mode=TimedTextTrack.TRANSCRIPT
        )
        video = (
            Video.objects.exclude(timedtexttracks__in=transcript_tracks)
            .filter(upload_state=defaults.READY)
            .order_by("-created_on")
            .first()
        )
        if video is None:
            self.stdout.write("No video to transcript")
        return video
196 changes: 196 additions & 0 deletions
196
src/backend/marsha/core/tests/management_commands/test_transcript_videos.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
"""Test transcript_video command.""" | ||
|
||
from io import StringIO | ||
from unittest.mock import patch | ||
|
||
from django.core.management import call_command | ||
from django.test import TestCase | ||
|
||
from marsha.core import defaults | ||
from marsha.core.factories import TimedTextTrackFactory, VideoFactory | ||
from marsha.core.management.commands import transcript_video | ||
from marsha.core.models import TimedTextTrack | ||
|
||
|
||
@patch.object(transcript_video, "transcript")
class TranscriptVideoTestCase(TestCase):
    """Exercise the ``transcript_video`` management command.

    The ``transcript`` name of the command module is patched for every test
    method, which receives the resulting mock as ``mock_transcript``.
    """

    def setUp(self):
        """Create the buffer used to capture the command's standard output."""
        self.stdout = StringIO()

    def _run_command(self, **options):
        """Call the command with ``options``, writing stdout to ``self.stdout``."""
        call_command("transcript_video", stdout=self.stdout, **options)

    def _assert_not_started(self, mock_transcript, message):
        """Check that only ``message`` was printed and no job was started."""
        self.assertListEqual(
            self.stdout.getvalue().splitlines(),
            [message],
        )
        mock_transcript.assert_not_called()

    def _assert_started(self, mock_transcript, video):
        """Check that a job was started for ``video`` and reported on stdout."""
        self.assertListEqual(
            self.stdout.getvalue().splitlines(),
            [
                f"Try to transcript video {video.id}",
                f"Transcription job started for video {video.id}",
            ],
        )
        mock_transcript.assert_called_once_with(video)

    def test_transcript_video_no_videos(self, mock_transcript):
        """Without any video in the database, nothing should be transcripted."""
        self._run_command()

        self._assert_not_started(mock_transcript, "No video to transcript")

    def test_transcript_video_first_video(self, mock_transcript):
        """Without an id, the last created ready video should be transcripted."""
        VideoFactory(upload_state=defaults.READY)
        latest_video = VideoFactory(upload_state=defaults.READY)

        self._run_command()

        self._assert_started(mock_transcript, latest_video)

    def test_transcript_video_not_ready(self, mock_transcript):
        """Without an id, a video that is not ready should not be selected."""
        VideoFactory(upload_state=defaults.PENDING)

        self._run_command()

        self._assert_not_started(mock_transcript, "No video to transcript")

    def test_transcript_video_already_transcript(self, mock_transcript):
        """Without an id, a video having a transcript should not be selected."""
        ready_video = VideoFactory(upload_state=defaults.READY)
        TimedTextTrackFactory(video=ready_video, mode=TimedTextTrack.TRANSCRIPT)

        self._run_command()

        self._assert_not_started(mock_transcript, "No video to transcript")

    def test_transcript_video_deleted_transcript(self, mock_transcript):
        """Without an id, a video whose transcript was deleted should be selected."""
        ready_video = VideoFactory(upload_state=defaults.READY)
        deleted_track = TimedTextTrackFactory(
            video=ready_video,
            mode=TimedTextTrack.TRANSCRIPT,
        )
        deleted_track.delete()
        # The deleted track is still counted when visibility is forced.
        self.assertEqual(TimedTextTrack.objects.all(force_visibility=True).count(), 1)

        self._run_command()

        self._assert_started(mock_transcript, deleted_track.video)

    def test_transcript_video_unknown_argument(self, mock_transcript):
        """With an id matching no video, nothing should be transcripted."""
        self._run_command(video_id=1)

        self._assert_not_started(
            mock_transcript, "No video matches the provided id: 1"
        )

    def test_transcript_video_argument(self, mock_transcript):
        """With an id, the designated video should be transcripted."""
        VideoFactory(upload_state=defaults.READY)
        target_video = VideoFactory(upload_state=defaults.READY)
        VideoFactory(upload_state=defaults.READY)

        self._run_command(video_id=target_video.id)

        self._assert_started(mock_transcript, target_video)

    def test_transcript_video_argument_not_ready(self, mock_transcript):
        """With an id, a video that is not ready should not be transcripted."""
        pending_video = VideoFactory(upload_state=defaults.PENDING)

        self._run_command(video_id=pending_video.id)

        self._assert_not_started(
            mock_transcript, f"Video {pending_video.id} is not ready"
        )

    def test_transcript_video_argument_already_transcript(self, mock_transcript):
        """With an id, a video having a transcript should not be transcripted."""
        ready_video = VideoFactory(upload_state=defaults.READY)
        existing_track = TimedTextTrackFactory(
            video=ready_video,
            mode=TimedTextTrack.TRANSCRIPT,
        )

        self._run_command(video_id=existing_track.video.id)

        self._assert_not_started(
            mock_transcript,
            f"Transcript already exists for video {existing_track.video.id}",
        )

    def test_transcript_video_argument_deleted_transcript(self, mock_transcript):
        """With an id, a video whose transcript was deleted should be transcripted."""
        ready_video = VideoFactory(upload_state=defaults.READY)
        deleted_track = TimedTextTrackFactory(
            video=ready_video,
            mode=TimedTextTrack.TRANSCRIPT,
        )
        deleted_track.delete()
        # The deleted track is still counted when visibility is forced.
        self.assertEqual(TimedTextTrack.objects.all(force_visibility=True).count(), 1)

        self._run_command(video_id=deleted_track.video.id)

        self._assert_started(mock_transcript, deleted_track.video)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.