diff --git a/videogrep/videogrep.py b/videogrep/videogrep.py index b64262f..40235f8 100644 --- a/videogrep/videogrep.py +++ b/videogrep/videogrep.py @@ -4,11 +4,17 @@ import re import gc import time +import mimetypes from . import vtt, srt, sphinx, fcpxml from pathlib import Path from typing import Optional, List, Union, Iterator -from moviepy.editor import VideoFileClip, concatenate_videoclips +from moviepy.editor import ( + VideoFileClip, + AudioFileClip, + concatenate_videoclips, + concatenate_audioclips +) BATCH_SIZE = 20 SUB_EXTS = [".json", ".vtt", ".srt", ".transcript"] @@ -285,6 +291,32 @@ def search( return all_segments +def get_input_type(composition: List[dict]): + """ + Get input type of files ('audio' or 'video') for inputs based on the + IANA Media Type, aka MIME type. + + :param composition List[dict]: List of timestamps in the format [{start, end, file}] + """ + mimetypes.init() + filenames = set([c["file"] for c in composition]) + types = [] + + for f in filenames: + type = mimetypes.guess_type(f)[0] + if type != None: + types.append(type.split("/")[0]) + else: + types.append("unknown") + + if "audio" in types: + input_type = "audio" + else: + input_type = "video" + + return input_type + + def create_supercut(composition: List[dict], outputfile: str): """ Concatenate video clips together. @@ -292,29 +324,58 @@ def create_supercut(composition: List[dict], outputfile: str): :param composition List[dict]: List of timestamps in the format [{start, end, file}] :param outputfile str: Path to save the video to """ - print("[+] Creating clips.") + input_type = get_input_type(composition) all_filenames = set([c["file"] for c in composition]) - videofileclips = dict([(f, VideoFileClip(f)) for f in all_filenames]) - cut_clips = [] - for c in composition: - if c["start"] < 0: - c["start"] = 0 - if c["end"] > videofileclips[c["file"]].duration: - c["end"] = videofileclips[c["file"]].duration - cut_clips.append(videofileclips[c["file"]].subclip(c["start"], c["end"])) - print("[+] Concatenating clips.") - final_clip = concatenate_videoclips(cut_clips, method="compose") + if input_type == "video": + print("[+] Creating clips.") + videofileclips = dict([(f, VideoFileClip(f)) for f in all_filenames]) + cut_clips = [] + for c in composition: + if c["start"] < 0: + c["start"] = 0 + if c["end"] > videofileclips[c["file"]].duration: + c["end"] = videofileclips[c["file"]].duration + cut_clips.append(videofileclips[c["file"]].subclip(c["start"], c["end"])) + + print("[+] Concatenating clips.") + final_clip = concatenate_videoclips(cut_clips, method="compose") + + print("[+] Writing ouput file.") + final_clip.write_videofile( + outputfile, + codec="libx264", + temp_audiofile=f"{outputfile}_temp-audio{time.time()}.m4a", + remove_temp=True, + audio_codec="aac", + ) + elif input_type == "audio": + print("[+] Creating clips.") + audiofileclips = dict([(f, AudioFileClip(f)) for f in all_filenames]) + cut_clips = [] + + for c in composition: + if c["start"] < 0: + c["start"] = 0 + if c["end"] > audiofileclips[c["file"]].duration: + c["end"] = audiofileclips[c["file"]].duration + cut_clips.append(audiofileclips[c["file"]].subclip(c["start"], c["end"])) + + print("[+] Concatenating clips.") + final_clip = concatenate_audioclips(cut_clips) - print("[+] Writing ouput file.") - final_clip.write_videofile( - outputfile, - codec="libx264", - temp_audiofile=f"{outputfile}_temp-audio{time.time()}.m4a", - remove_temp=True, - audio_codec="aac", - ) + print("[+] Writing output file.") + if outputfile == "supercut.mp4": + outputfile = "supercut.mp3" + + # we don't currently use this, but may be useful for certain libraries + outputformat = outputfile.split('.')[-1] + + final_clip.write_audiofile(outputfile) + else: + # what should happen here? + pass def create_supercut_in_batches(composition: List[dict], outputfile: str): @@ -328,8 +389,17 @@ def create_supercut_in_batches(composition: List[dict], outputfile: str): start_index = 0 end_index = BATCH_SIZE batch_comp = [] + input_type = get_input_type(composition) + if input_type == "video": + file_ext = ".mp4" + elif input_type == "audio": + file_ext = ".mp3" + else: + # what should happen here? + print(f"Unknown output file input '{input_type}'") + while start_index < total_clips: - filename = outputfile + ".tmp" + str(start_index) + ".mp4" + filename = outputfile + ".tmp" + str(start_index) + file_ext try: create_supercut(composition[start_index:end_index], filename) batch_comp.append(filename) @@ -341,15 +411,27 @@ def create_supercut_in_batches(composition: List[dict], outputfile: str): end_index += BATCH_SIZE next - clips = [VideoFileClip(filename) for filename in batch_comp] - video = concatenate_videoclips(clips, method="compose") - video.write_videofile( - outputfile, - codec="libx264", - temp_audiofile=f"{outputfile}_temp-audio{time.time()}.m4a", - remove_temp=True, - audio_codec="aac", - ) + if input_type == "video": + clips = [VideoFileClip(filename) for filename in batch_comp] + video = concatenate_videoclips(clips, method="compose") + video.write_videofile( + outputfile, + codec="libx264", + temp_audiofile=f"{outputfile}_temp-audio{time.time()}.m4a", + remove_temp=True, + audio_codec="aac", + ) + elif input_type == "audio": + if outputfile == "supercut.mp4": + outputfile == "supercut.mp3" + else: + pass + clips = [AudioFileClip(filename) for filename in batch_comp] + audio = concatenate_audioclips(clips) + audio.write_audiofile(outputfile) + else: + # what should happen here? + pass # remove partial video files for filename in batch_comp: @@ -366,27 +448,49 @@ def export_individual_clips(composition: List[dict], outputfile: str): :param outputfile str: Path to save the videos to """ + input_type = get_input_type(composition) all_filenames = set([c["file"] for c in composition]) - videofileclips = dict([(f, VideoFileClip(f)) for f in all_filenames]) - cut_clips = [] - for c in composition: - if c["start"] < 0: - c["start"] = 0 - if c["end"] > videofileclips[c["file"]].duration: - c["end"] = videofileclips[c["file"]].duration - cut_clips.append(videofileclips[c["file"]].subclip(c["start"], c["end"])) - - basename, ext = os.path.splitext(outputfile) - print("[+] Writing ouput files.") - for i, clip in enumerate(cut_clips): - clipfilename = basename + "_" + str(i).zfill(5) + ext - clip.write_videofile( - clipfilename, - codec="libx264", - temp_audiofile="{clipfilename}_temp-audio.m4a", - remove_temp=True, - audio_codec="aac", - ) + + if input_type == "video": + videofileclips = dict([(f, VideoFileClip(f)) for f in all_filenames]) + cut_clips = [] + for c in composition: + if c["start"] < 0: + c["start"] = 0 + if c["end"] > videofileclips[c["file"]].duration: + c["end"] = videofileclips[c["file"]].duration + cut_clips.append(videofileclips[c["file"]].subclip(c["start"], c["end"])) + + basename, ext = os.path.splitext(outputfile) + print("[+] Writing output files.") + for i, clip in enumerate(cut_clips): + clipfilename = basename + "_" + str(i).zfill(5) + ext + clip.write_videofile( + clipfilename, + codec="libx264", + temp_audiofile="{clipfilename}_temp-audio.m4a", + remove_temp=True, + audio_codec="aac", + ) + elif input_type == "audio": + audiofileclips = dict([(f, AudioFileClip(f)) for f in all_filenames]) + cut_clips = [] + + for c in composition: + if c["start"] < 0: + c["start"] = 0 + if c["end"] > audiofileclips[c["file"]].duration: + c["end"] = audiofileclips[c["file"]].duration + cut_clips.append(audiofileclips[c["file"]].subclip(c["start"], c["end"])) + + if outputfile == "supercut.mp4": + outputfile == "supercut.mp3" + + basename, ext = os.path.splitext(outputfile) + print("[+] Writing output files.") + for i, clip in enumerate(cut_clips): + clipfilename = basename + "_" + str(i).zfill(5) + ext + clip.write_audiofile(clipfilename) def export_m3u(composition: List[dict], outputfile: str):