Skip to content

Commit

Permalink
Add files via upload
Browse files Browse the repository at this point in the history
  • Loading branch information
botbahlul authored Jun 19, 2023
1 parent 74aff5a commit 509ca18
Showing 1 changed file with 275 additions and 9 deletions.
284 changes: 275 additions & 9 deletions autosrt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,272 @@
import shutil


VERSION = "1.4.0"
VERSION = "1.4.1"


#======================================================== ffmpeg_progress_yield ========================================================#


import re
import subprocess
from typing import Any, Callable, Iterator, List, Optional, Union


def to_ms(**kwargs: Union[float, int, str]) -> int:
hour = int(kwargs.get("hour", 0))
minute = int(kwargs.get("min", 0))
sec = int(kwargs.get("sec", 0))
ms = int(kwargs.get("ms", 0))

return (hour * 60 * 60 * 1000) + (minute * 60 * 1000) + (sec * 1000) + ms


def _probe_duration(cmd: List[str]) -> Optional[int]:
'''
Get the duration via ffprobe from input media file
in case ffmpeg was run with loglevel=error.
Args:
cmd (List[str]): A list of command line elements, e.g. ["ffmpeg", "-i", ...]
Returns:
Optional[int]: The duration in milliseconds.
'''

def _get_file_name(cmd: List[str]) -> Optional[str]:
try:
idx = cmd.index("-i")
return cmd[idx + 1]
except ValueError:
return None

file_name = _get_file_name(cmd)
if file_name is None:
return None

try:
if sys.platform == "win32":
output = subprocess.check_output(
[
"ffprobe",
"-loglevel",
"-1",
"-hide_banner",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
file_name,
],
universal_newlines=True,
creationflags=subprocess.CREATE_NO_WINDOW,
)
else:
output = subprocess.check_output(
[
"ffprobe",
"-loglevel",
"-1",
"-hide_banner",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
file_name,
],
universal_newlines=True,
)

return int(float(output.strip()) * 1000)
except Exception:
# TODO: add logging
return None


def _uses_error_loglevel(cmd: List[str]) -> bool:
try:
idx = cmd.index("-loglevel")
if cmd[idx + 1] == "error":
return True
else:
return False
except ValueError:
return False


class FfmpegProgress:
DUR_REGEX = re.compile(
r"Duration: (?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})"
)
TIME_REGEX = re.compile(
r"out_time=(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})"
)

def __init__(self, cmd: List[str], dry_run: bool = False) -> None:
'''Initialize the FfmpegProgress class.
Args:
cmd (List[str]): A list of command line elements, e.g. ["ffmpeg", "-i", ...]
dry_run (bool, optional): Only show what would be done. Defaults to False.
'''
self.cmd = cmd
self.stderr: Union[str, None] = None
self.dry_run = dry_run
self.process: Any = None
self.stderr_callback: Union[Callable[[str], None], None] = None
if sys.platform == "win32":
self.base_popen_kwargs = {
"stdin": subprocess.PIPE, # Apply stdin isolation by creating separate pipe.
"stdout": subprocess.PIPE,
"stderr": subprocess.STDOUT,
"universal_newlines": False,
"shell": True,
}
else:
self.base_popen_kwargs = {
"stdin": subprocess.PIPE, # Apply stdin isolation by creating separate pipe.
"stdout": subprocess.PIPE,
"stderr": subprocess.STDOUT,
"universal_newlines": False,
}

def set_stderr_callback(self, callback: Callable[[str], None]) -> None:
'''
Set a callback function to be called on stderr output.
The callback function must accept a single string argument.
Note that this is called on every line of stderr output, so it can be called a lot.
Also note that stdout/stderr are joined into one stream, so you might get stdout output in the callback.
Args:
callback (Callable[[str], None]): A callback function that accepts a single string argument.
'''
if not callable(callback) or len(callback.__code__.co_varnames) != 1:
raise ValueError(
"Callback must be a function that accepts only one argument"
)

self.stderr_callback = callback

def run_command_with_progress(
self, popen_kwargs=None, duration_override: Union[float, None] = None
) -> Iterator[int]:
'''
Run an ffmpeg command, trying to capture the process output and calculate
the duration / progress.
Yields the progress in percent.
Args:
popen_kwargs (dict, optional): A dict to specify extra arguments to the popen call, e.g. { creationflags: CREATE_NO_WINDOW }
duration_override (float, optional): The duration in seconds. If not specified, it will be calculated from the ffmpeg output.
Raises:
RuntimeError: If the command fails, an exception is raised.
Yields:
Iterator[int]: A generator that yields the progress in percent.
'''
if self.dry_run:
return self.cmd

total_dur: Union[None, int] = None
if _uses_error_loglevel(self.cmd):
total_dur = _probe_duration(self.cmd)

cmd_with_progress = (
[self.cmd[0]] + ["-progress", "-", "-nostats"] + self.cmd[1:]
)

stderr = []
base_popen_kwargs = self.base_popen_kwargs.copy()
if popen_kwargs is not None:
base_popen_kwargs.update(popen_kwargs)

if sys.platform == "win32":
self.process = subprocess.Popen(
cmd_with_progress,
**base_popen_kwargs,
creationflags=subprocess.CREATE_NO_WINDOW,
) # type: ignore
else:
self.process = subprocess.Popen(
cmd_with_progress,
**base_popen_kwargs,
) # type: ignore

yield 0

while True:
if self.process.stdout is None:
continue

stderr_line = (
self.process.stdout.readline().decode("utf-8", errors="replace").strip()
)

if self.stderr_callback:
self.stderr_callback(stderr_line)

if stderr_line == '' and self.process.poll() is not None:
break

stderr.append(stderr_line.strip())

self.stderr = "\n".join(stderr)

if total_dur is None:
total_dur_match = self.DUR_REGEX.search(stderr_line)
if total_dur_match:
total_dur = to_ms(**total_dur_match.groupdict())
continue
elif duration_override is not None:
# use the override (should apply in the first loop)
total_dur = int(duration_override * 1000)
continue

if total_dur:
progress_time = FfmpegProgress.TIME_REGEX.search(stderr_line)
if progress_time:
elapsed_time = to_ms(**progress_time.groupdict())
yield int(elapsed_time * 100/ total_dur)

if self.process is None or self.process.returncode != 0:
#print(self.process)
#print(self.process.returncode)
_pretty_stderr = "\n".join(stderr)
raise RuntimeError(f"Error running command {self.cmd}: {_pretty_stderr}")

yield 100
self.process = None

def quit_gracefully(self) -> None:
'''
Quit the ffmpeg process by sending 'q'
Raises:
RuntimeError: If no process is found.
'''
if self.process is None:
raise RuntimeError("No process found. Did you run the command?")

self.process.communicate(input=b"q")
self.process.kill()
self.process = None

def quit(self) -> None:
'''
Quit the ffmpeg process by sending SIGKILL.
Raises:
RuntimeError: If no process is found.
'''
if self.process is None:
raise RuntimeError("No process found. Did you run the command?")

self.process.kill()
self.process = None


#=======================================================================================================================================#


def stop_ffmpeg_windows(error_messages_callback=None):
Expand Down Expand Up @@ -1704,8 +1969,8 @@ def __call__(self, media_filepath):
class MediaSubtitleEmbedder:
@staticmethod
def which(program):
def is_exe(media_filepath):
return os.path.isfile(media_filepath) and os.access(media_filepath, os.X_OK)
def is_exe(file_path):
return os.path.isfile(file_path) and os.access(file_path, os.X_OK)
fpath, _ = os.path.split(program)
if fpath:
if is_exe(program):
Expand All @@ -1726,7 +1991,7 @@ def ffmpeg_check():
return "ffmpeg.exe"
return None

def __init__(self, subtitle_path=None, language="eng", output_path=None, progress_callback=None, error_messages_callback=None):
def __init__(self, subtitle_path=None, language=None, output_path=None, progress_callback=None, error_messages_callback=None):
self.subtitle_path = subtitle_path
self.language = language
self.output_path = output_path
Expand Down Expand Up @@ -1758,10 +2023,10 @@ def get_existing_subtitle_language(self, media_filepath):
metadata = json.loads(output.stdout)
streams = metadata['streams']

# Find the subtitles stream with language metadata
# Find the subtitle stream with language metadata
subtitle_languages = []
for stream in streams:
if stream['codec_type'] == 'subtitles' and 'tags' in stream and 'language' in stream['tags']:
if stream['codec_type'] == 'subtitle' and 'tags' in stream and 'language' in stream['tags']:
language = stream['tags']['language']
subtitle_languages.append(language)

Expand Down Expand Up @@ -1793,16 +2058,16 @@ def __call__(self, media_filepath):
try:
existing_languages = self.get_existing_subtitle_language(media_filepath)
if self.language in existing_languages:
# THIS 'print' THINGS WILL MAKE 'progresbar' SCREWED UP!
#msg = (f"'{self.language}' subtitles stream already existed in {media_filepath}")
# THIS 'print' THINGS WILL MAKE progresbar screwed up!
#msg = (f"'{self.language}' subtitle stream already existed in {media_filepath}")
#if self.error_messages_callback:
# self.error_messages_callback(msg)
#else:
# print(msg)
return

else:
# Determine the next available subtitles index
# Determine the next available subtitle index
next_index = len(existing_languages)

ffmpeg_command = [
Expand Down Expand Up @@ -3018,6 +3283,7 @@ def main():
src_dst_tmp_output = embed_subtitle_to_media(src_tmp_embedded_media_filepath, media_type, dst_subtitle_filepath, ffmpeg_dst_language_code, src_dst_tmp_embedded_media_filepath)
'''


# USING CLASS
widgets = [f"Embedding '{ffmpeg_src_language_code}' subtitles into {media_type} : ", Percentage(), ' ', Bar(marker="#"), ' ', ETA()]
pbar = ProgressBar(widgets=widgets, maxval=100).start()
Expand Down

0 comments on commit 509ca18

Please sign in to comment.