Skip to content

Commit

Permalink
Fixed threading, migrated to CVs, added looping
Browse files Browse the repository at this point in the history
  • Loading branch information
roamic committed Aug 15, 2024
1 parent 5c4ac98 commit b3ef959
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@
url = https://github.com/HowardHinnant/date.git
[submodule "externals/ffmpeg-core"]
path = externals/ffmpeg-core
url = https://github.com/RPCS3/ffmpeg-core.git
url = https://github.com/shadps4-emu/ext-ffmpeg-core
7 changes: 6 additions & 1 deletion src/core/libraries/avplayer/avplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ s32 PS4_SYSV_ABI sceAvPlayerSetLooping(SceAvPlayerHandle handle, bool loop_flag)
if (handle == nullptr) {
return ORBIS_AVPLAYER_ERROR_INVALID_PARAMS;
}
if (!handle->SetLooping(loop_flag)) {
return ORBIS_AVPLAYER_ERROR_OPERATION_FAILED;
}
return ORBIS_OK;
}

Expand All @@ -256,7 +259,9 @@ s32 PS4_SYSV_ABI sceAvPlayerStop(SceAvPlayerHandle handle) {
if (handle == nullptr) {
return ORBIS_AVPLAYER_ERROR_INVALID_PARAMS;
}
return handle->Stop();
const auto res = handle->Stop();
LOG_TRACE(Lib_AvPlayer, "returning {}", res);
return res;
}

s32 PS4_SYSV_ABI sceAvPlayerStreamCount(SceAvPlayerHandle handle) {
Expand Down
7 changes: 7 additions & 0 deletions src/core/libraries/avplayer/avplayer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,11 @@ s32 AvPlayer::Stop() {
return ORBIS_OK;
}

bool AvPlayer::SetLooping(bool is_looping) {
if (m_state == nullptr) {
return false;
}
return m_state->SetLooping(is_looping);
}

} // namespace Libraries::AvPlayer
1 change: 1 addition & 0 deletions src/core/libraries/avplayer/avplayer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class AvPlayer {
bool IsActive();
u64 CurrentTime();
s32 Stop();
bool SetLooping(bool is_looping);

private:
using ScePthreadMutex = Kernel::ScePthreadMutex;
Expand Down
103 changes: 66 additions & 37 deletions src/core/libraries/avplayer/avplayer_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ AvPlayerSource::AvPlayerSource(AvPlayerStateCallback& state, std::string_view pa
const SceAvPlayerInitData& init_data,
SceAvPlayerSourceType source_type)
: m_state(state), m_memory_replacement(init_data.memory_replacement),
m_num_output_video_framebuffers(init_data.num_output_video_framebuffers) {
m_num_output_video_framebuffers(
std::min(std::max(2, init_data.num_output_video_framebuffers), 16)) {
AVFormatContext* context = avformat_alloc_context();
if (init_data.file_replacement.open != nullptr) {
m_up_data_streamer =
Expand Down Expand Up @@ -208,7 +209,7 @@ void AvPlayerSource::SetLooping(bool is_looping) {
}

std::optional<bool> AvPlayerSource::HasFrames(u32 num_frames) {
return m_video_frames.Size() > num_frames || m_is_eof;
return m_video_packets.Size() > num_frames || m_is_eof;
}

s32 AvPlayerSource::Start() {
Expand Down Expand Up @@ -255,6 +256,7 @@ bool AvPlayerSource::Stop() {
m_video_buffers.Push(std::move(m_current_video_frame.value()));
m_current_video_frame.reset();
}
m_stop_cv.Notify();

m_audio_packets.Clear();
m_video_packets.Clear();
Expand Down Expand Up @@ -291,30 +293,30 @@ bool AvPlayerSource::GetVideoData(SceAvPlayerFrameInfoEx& video_info) {
return false;
}

using namespace std::chrono;
while (m_video_frames.Size() == 0 && !m_is_eof) {
std::this_thread::sleep_for(milliseconds(5));
}
m_video_frames_cv.Wait([this]() { return m_video_frames.Size() != 0 || m_is_eof; });

auto frame = m_video_frames.Pop();
if (!frame.has_value()) {
LOG_WARNING(Lib_AvPlayer, "Could get video frame: no frames.");
LOG_WARNING(Lib_AvPlayer, "Could get video frame. EOF reached.");
return false;
}

{
using namespace std::chrono;
auto elapsed_time =
duration_cast<milliseconds>(high_resolution_clock::now() - m_start_time).count();
while (elapsed_time < frame->info.timestamp) {
std::this_thread::sleep_for(milliseconds(frame->info.timestamp - elapsed_time));
elapsed_time =
duration_cast<milliseconds>(high_resolution_clock::now() - m_start_time).count();
if (elapsed_time < frame->info.timestamp) {
if (m_stop_cv.WaitFor(milliseconds(frame->info.timestamp - elapsed_time),
[&]() { return elapsed_time >= frame->info.timestamp; })) {
return false;
}
}
}

// return the buffer to the queue
if (m_current_video_frame.has_value()) {
m_video_buffers.Push(std::move(m_current_video_frame.value()));
m_video_buffers_cv.Notify();
}
m_current_video_frame = std::move(frame->buffer);
video_info = frame->info;
Expand All @@ -326,30 +328,30 @@ bool AvPlayerSource::GetAudioData(SceAvPlayerFrameInfo& audio_info) {
return false;
}

using namespace std::chrono;
while (m_audio_frames.Size() == 0 && !m_is_eof) {
std::this_thread::sleep_for(milliseconds(5));
}
m_audio_frames_cv.Wait([this]() { return m_audio_frames.Size() != 0 || m_is_eof; });

auto frame = m_audio_frames.Pop();
if (!frame.has_value()) {
LOG_WARNING(Lib_AvPlayer, "Could get audio frame: no frames.");
LOG_WARNING(Lib_AvPlayer, "Could get audio frame. EOF reached.");
return false;
}

{
using namespace std::chrono;
auto elapsed_time =
duration_cast<milliseconds>(high_resolution_clock::now() - m_start_time).count();
while (elapsed_time < frame->info.timestamp) {
std::this_thread::sleep_for(milliseconds(frame->info.timestamp - elapsed_time));
elapsed_time =
duration_cast<milliseconds>(high_resolution_clock::now() - m_start_time).count();
if (elapsed_time < frame->info.timestamp) {
if (m_stop_cv.WaitFor(milliseconds(frame->info.timestamp - elapsed_time),
[&]() { return elapsed_time >= frame->info.timestamp; })) {
return false;
}
}
}

// return the buffer to the queue
if (m_current_audio_frame.has_value()) {
m_audio_buffers.Push(std::move(m_current_audio_frame.value()));
m_audio_buffers_cv.Notify();
}
m_current_audio_frame = std::move(frame->buffer);

Expand Down Expand Up @@ -424,8 +426,26 @@ void AvPlayerSource::DemuxerThread(std::stop_token stop) {
const auto res = av_read_frame(m_avformat_context.get(), up_packet.get());
if (res < 0) {
if (res == AVERROR_EOF) {
LOG_INFO(Lib_AvPlayer, "EOF reached in demuxer");
break;
if (m_is_looping) {
LOG_INFO(Lib_AvPlayer, "EOF reached in demuxer. Looping the source...");
avio_seek(m_avformat_context->pb, 0, SEEK_SET);
if (m_video_stream_index.has_value()) {
const auto index = m_video_stream_index.value();
const auto stream = m_avformat_context->streams[index];
avformat_seek_file(m_avformat_context.get(), index, 0, 0, stream->duration,
0);
}
if (m_audio_stream_index.has_value()) {
const auto index = m_audio_stream_index.value();
const auto stream = m_avformat_context->streams[index];
avformat_seek_file(m_avformat_context.get(), index, 0, 0, stream->duration,
0);
}
continue;
} else {
LOG_INFO(Lib_AvPlayer, "EOF reached in demuxer. Exiting.");
break;
}
} else {
LOG_ERROR(Lib_AvPlayer, "Could not read AV frame: error = {}", res);
m_state.OnError();
Expand All @@ -435,14 +455,20 @@ void AvPlayerSource::DemuxerThread(std::stop_token stop) {
}
if (up_packet->stream_index == m_video_stream_index) {
m_video_packets.Push(std::move(up_packet));
m_video_packets_cv.Notify();
} else if (up_packet->stream_index == m_audio_stream_index) {
m_audio_packets.Push(std::move(up_packet));
m_audio_packets_cv.Notify();
}
}

m_is_eof = true;

void* res;
m_video_packets_cv.Notify();
m_audio_packets_cv.Notify();
m_video_frames_cv.Notify();
m_audio_frames_cv.Notify();

if (m_video_decoder_thread.joinable()) {
m_video_decoder_thread.join();
}
Expand All @@ -457,7 +483,7 @@ void AvPlayerSource::DemuxerThread(std::stop_token stop) {
AvPlayerSource::AVFramePtr AvPlayerSource::ConvertVideoFrame(const AVFrame& frame) {
auto nv12_frame = AVFramePtr{av_frame_alloc(), &ReleaseAVFrame};
nv12_frame->pts = frame.pts;
nv12_frame->pkt_dts = frame.pkt_dts;
nv12_frame->pkt_dts = frame.pkt_dts < 0 ? 0 : frame.pkt_dts;
nv12_frame->format = AV_PIX_FMT_NV12;
nv12_frame->width = frame.width;
nv12_frame->height = frame.height;
Expand Down Expand Up @@ -520,8 +546,8 @@ void AvPlayerSource::VideoDecoderThread(std::stop_token stop) {
using namespace std::chrono;
LOG_INFO(Lib_AvPlayer, "Video Decoder Thread started");
while ((!m_is_eof || m_video_packets.Size() != 0) && !stop.stop_requested()) {
if (m_video_packets.Size() == 0) {
std::this_thread::sleep_for(milliseconds(5));
if (!m_video_packets_cv.Wait(
stop, [this]() { return m_video_packets.Size() != 0 || m_is_eof; })) {
continue;
}
const auto packet = m_video_packets.Pop();
Expand All @@ -537,8 +563,10 @@ void AvPlayerSource::VideoDecoderThread(std::stop_token stop) {
return;
}
while (res >= 0) {
if (m_video_buffers.Size() == 0 && !stop.stop_requested()) {
std::this_thread::sleep_for(milliseconds(5));
if (!m_video_buffers_cv.Wait(stop, [this]() { return m_video_buffers.Size() != 0; })) {
break;
}
if (m_video_buffers.Size() == 0) {
continue;
}
auto up_frame = AVFramePtr(av_frame_alloc(), &ReleaseAVFrame);
Expand Down Expand Up @@ -566,8 +594,7 @@ void AvPlayerSource::VideoDecoderThread(std::stop_token stop) {
} else {
m_video_frames.Push(PrepareVideoFrame(std::move(buffer.value()), *up_frame));
}
LOG_TRACE(Lib_AvPlayer, "Produced Video Frame. Num Frames: {}",
m_video_frames.Size());
m_video_frames_cv.Notify();
}
}
}
Expand All @@ -578,7 +605,7 @@ void AvPlayerSource::VideoDecoderThread(std::stop_token stop) {
AvPlayerSource::AVFramePtr AvPlayerSource::ConvertAudioFrame(const AVFrame& frame) {
auto pcm16_frame = AVFramePtr{av_frame_alloc(), &ReleaseAVFrame};
pcm16_frame->pts = frame.pts;
pcm16_frame->pkt_dts = frame.pkt_dts;
pcm16_frame->pkt_dts = frame.pkt_dts < 0 ? 0 : frame.pkt_dts;
pcm16_frame->format = AV_SAMPLE_FMT_S16;
pcm16_frame->ch_layout = frame.ch_layout;
pcm16_frame->sample_rate = frame.sample_rate;
Expand Down Expand Up @@ -638,8 +665,8 @@ void AvPlayerSource::AudioDecoderThread(std::stop_token stop) {
using namespace std::chrono;
LOG_INFO(Lib_AvPlayer, "Audio Decoder Thread started");
while ((!m_is_eof || m_audio_packets.Size() != 0) && !stop.stop_requested()) {
if (m_audio_packets.Size() == 0) {
std::this_thread::sleep_for(milliseconds(5));
if (!m_audio_packets_cv.Wait(
stop, [this]() { return m_audio_packets.Size() != 0 || m_is_eof; })) {
continue;
}
const auto packet = m_audio_packets.Pop();
Expand All @@ -654,10 +681,13 @@ void AvPlayerSource::AudioDecoderThread(std::stop_token stop) {
return;
}
while (res >= 0) {
if (m_audio_buffers.Size() == 0 && !stop.stop_requested()) {
std::this_thread::sleep_for(milliseconds(5));
if (!m_audio_buffers_cv.Wait(stop, [this]() { return m_audio_buffers.Size() != 0; })) {
break;
}
if (m_audio_buffers.Size() == 0) {
continue;
}

auto up_frame = AVFramePtr(av_frame_alloc(), &ReleaseAVFrame);
res = avcodec_receive_frame(m_audio_codec_context.get(), up_frame.get());
if (res < 0) {
Expand All @@ -683,8 +713,7 @@ void AvPlayerSource::AudioDecoderThread(std::stop_token stop) {
} else {
m_audio_frames.Push(PrepareAudioFrame(std::move(buffer.value()), *up_frame));
}
LOG_TRACE(Lib_AvPlayer, "Produced Audio Frame. Num Frames: {}",
m_audio_frames.Size());
m_audio_frames_cv.Notify();
}
}
}
Expand Down
47 changes: 44 additions & 3 deletions src/core/libraries/avplayer/avplayer_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
#include "avplayer_common.h"
#include "avplayer_data_streamer.h"

#include "common/types.h"
#include "common/polyfill_thread.h"
#include "common/types.h"
#include "core/libraries/kernel/thread_management.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <string>
Expand Down Expand Up @@ -87,6 +88,36 @@ struct Frame {
SceAvPlayerFrameInfoEx info;
};

class EventCV {
public:
template <class Pred>
void Wait(Pred pred) {
std::unique_lock lock(m_mutex);
m_cv.wait(lock, std::move(pred));
}

template <class Pred>
bool Wait(std::stop_token stop, Pred pred) {
std::unique_lock lock(m_mutex);
return m_cv.wait(lock, std::move(stop), std::move(pred));
}

template <class Pred, class Rep, class Period>
bool WaitFor(std::chrono::duration<Rep, Period> timeout, Pred pred) {
std::unique_lock lock(m_mutex);
return m_cv.wait_for(lock, timeout, std::move(pred));
}

void Notify() {
std::unique_lock lock(m_mutex);
m_cv.notify_all();
}

private:
std::mutex m_mutex{};
std::condition_variable_any m_cv{};
};

class AvPlayerSource {
public:
AvPlayerSource(AvPlayerStateCallback& state, std::string_view path,
Expand Down Expand Up @@ -139,7 +170,7 @@ class AvPlayerSource {
AvPlayerStateCallback& m_state;

SceAvPlayerMemAllocator m_memory_replacement{};
u64 m_num_output_video_framebuffers{};
u32 m_num_output_video_framebuffers{};

std::atomic_bool m_is_looping = false;
std::atomic_bool m_is_eof = false;
Expand All @@ -161,7 +192,17 @@ class AvPlayerSource {
std::optional<s32> m_video_stream_index{};
std::optional<s32> m_audio_stream_index{};

std::mutex m_state_mutex;
EventCV m_audio_packets_cv{};
EventCV m_audio_frames_cv{};
EventCV m_audio_buffers_cv{};

EventCV m_video_packets_cv{};
EventCV m_video_frames_cv{};
EventCV m_video_buffers_cv{};

EventCV m_stop_cv{};

std::mutex m_state_mutex{};
std::jthread m_demuxer_thread{};
std::jthread m_video_decoder_thread{};
std::jthread m_audio_decoder_thread{};
Expand Down
Loading

0 comments on commit b3ef959

Please sign in to comment.