Skip to content

Commit

Permalink
redirected subprocess outputs into Rich console
Browse files Browse the repository at this point in the history
  • Loading branch information
k4yt3x committed Feb 28, 2022
1 parent c0fe81b commit 4459f4d
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 14 deletions.
36 changes: 29 additions & 7 deletions video2x/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
Name: Video Decoder
Author: K4YT3X
Date Created: June 17, 2021
Last Modified: February 16, 2022
Last Modified: February 27, 2022
"""

# local imports
from .pipe_printer import PipePrinter

# built-in imports
import contextlib
import os
Expand Down Expand Up @@ -87,10 +90,15 @@ def __init__(
),
overwrite_output=True,
),
env={"AV_LOG_FORCE_COLOR": "TRUE"},
stdout=subprocess.PIPE,
# stderr=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)

# start the PIPE printer to start printing FFmpeg logs
self.pipe_printer = PipePrinter(self.decoder.stderr)
self.pipe_printer.start()

def run(self) -> None:
self.running = True

Expand All @@ -113,8 +121,8 @@ def run(self) -> None:
# after the last frame has been decoded
# read will return nothing
if len(buffer) == 0:
logger.debug("Decoding queue depleted")
break
self.stop()
continue

# convert raw bytes into image object
image = Image.frombytes(
Expand All @@ -140,6 +148,7 @@ def run(self) -> None:

# most likely "not enough image data"
except ValueError as e:
self.exception = e

# ignore queue closed
if not "is closed" in str(e):
Expand All @@ -151,13 +160,26 @@ def run(self) -> None:
self.exception = e
logger.exception(e)
break
else:
logger.debug("Decoding queue depleted")

# flush the remaining data in STDOUT and close PIPE
self.decoder.stdout.flush()
self.decoder.stdout.close()

# flush the remaining data in STDERR and wait for it to be read
self.decoder.stderr.flush()

# send SIGINT (2) to FFmpeg
# this instructs it to finalize and exit
if self.decoder.poll() is None:
self.decoder.send_signal(signal.SIGTERM)
self.decoder.send_signal(signal.SIGINT)

# wait for process to terminate
self.pipe_printer.stop()
self.decoder.stderr.close()

# ensure the decoder has exited
# wait for processes and threads to stop
self.pipe_printer.join()
self.decoder.wait()
logger.info("Decoder thread exiting")

Expand Down
29 changes: 25 additions & 4 deletions video2x/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
Name: Video Encoder
Author: K4YT3X
Date Created: June 17, 2021
Last Modified: February 16, 2022
Last Modified: February 27, 2022
"""

# local imports
from .pipe_printer import PipePrinter

# built-in imports
import multiprocessing
import multiprocessing.managers
Expand Down Expand Up @@ -74,6 +77,10 @@ def __init__(
self.processed_frames = processed_frames
self.processed = processed

# stores exceptions if the thread exits with errors
self.exception = None

# create FFmpeg input for the original input video
self.original = ffmpeg.input(input_path)

# define frames as input
Expand Down Expand Up @@ -122,11 +129,15 @@ def __init__(
),
overwrite_output=True,
),
env={"AV_LOG_FORCE_COLOR": "TRUE"},
stdin=subprocess.PIPE,
# stdout=subprocess.DEVNULL,
# stderr=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)

# start the PIPE printer to start printing FFmpeg logs
self.pipe_printer = PipePrinter(self.encoder.stderr)
self.pipe_printer.start()

def run(self) -> None:
self.running = True
frame_index = 0
Expand All @@ -150,19 +161,29 @@ def run(self) -> None:

# send exceptions into the client connection pipe
except Exception as e:
self.exception = e
logger.exception(e)
break
else:
logger.debug("Encoding queue depleted")

# flush the remaining data in STDIN and close PIPE
logger.debug("Encoding queue depleted")
self.encoder.stdin.flush()
self.encoder.stdin.close()

# flush the remaining data in STDERR and wait for it to be read
self.encoder.stderr.flush()

# send SIGINT (2) to FFmpeg
# this instructs it to finalize and exit
self.encoder.send_signal(signal.SIGINT)

# wait for process to terminate
self.pipe_printer.stop()
self.encoder.stderr.close()

# wait for processes and threads to stop
self.pipe_printer.join()
self.encoder.wait()
logger.info("Encoder thread exiting")

Expand Down
56 changes: 56 additions & 0 deletions video2x/pipe_printer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright (C) 2018-2022 K4YT3X and contributors.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Name: PIPE Printer
Author: K4YT3X
Date Created: February 27, 2022
Last Modified: February 27, 2022
"""

# built-in imports
from typing import IO
import os
import sys
import threading
import time


class PipePrinter(threading.Thread):
def __init__(self, stderr: IO[bytes]) -> None:
threading.Thread.__init__(self)
self.stderr = stderr
self.running = False

# set read mode to non-blocking
os.set_blocking(self.stderr.fileno(), False)

def run(self) -> None:
self.running = True

# keep printing contents in the PIPE
while self.running:
time.sleep(0.5)

output = self.stderr.read()
if output is not None:
print(output.decode(), file=sys.stderr)

return super().run()

def stop(self) -> None:
self.running = False
24 changes: 21 additions & 3 deletions video2x/video2x.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,26 @@ def _run(
# wait for jobs in queue to deplete
while self.processed.value < total_frames - 1:
time.sleep(0.5)

# check processor health
for process in self.processor_processes:
if not process.is_alive():
raise Exception("process died unexpectedly")

# check decoder health
if (
not self.decoder.is_alive()
and self.decoder.exception is not None
):
raise Exception("decoder died unexpectedly")

# check encoder health
if (
not self.encoder.is_alive()
and self.encoder.exception is not None
):
raise Exception("encoder died unexpectedly")

# show progress bar when upscale starts
if progress.disable is True and self.processed.value > 0:
progress.disable = False
Expand All @@ -266,7 +282,8 @@ def _run(
# if SIGTERM is received or ^C is pressed
# TODO: pause and continue here
except (SystemExit, KeyboardInterrupt) as e:
logger.warning("Exit signal received, terminating")
logger.warning("Exit signal received, exiting gracefully")
logger.warning("Press ^C again to force terminate")
exception.append(e)

except Exception as e:
Expand All @@ -281,8 +298,8 @@ def _run(
# mark processing queue as closed
self.processing_queue.close()

# stop upscaler processes
logger.info("Stopping upscaler processes")
# stop processor processes
logger.info("Stopping processor processes")
for process in self.processor_processes:
process.terminate()

Expand All @@ -291,6 +308,7 @@ def _run(
process.join()

# ensure both the decoder and the encoder have exited
logger.info("Stopping decoder and encoder threads")
self.decoder.stop()
self.encoder.stop()
self.decoder.join()
Expand Down

0 comments on commit 4459f4d

Please sign in to comment.