Skip to content

[UR][NFC][CUDA-HIP] Add documentation in stream queue #19471

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 73 additions & 31 deletions unified-runtime/source/common/cuda-hip/stream_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,47 +30,90 @@ struct stream_queue_t {
static constexpr int DefaultNumComputeStreams = CS;
static constexpr int DefaultNumTransferStreams = TS;

// Mutex to guard modifications to the ComputeStreams vector, and
// NumComputeStreams.
std::mutex ComputeStreamMutex;
std::vector<native_type> ComputeStreams;
// Number of compute streams that have been created
unsigned int NumComputeStreams{0};

// Mutex to guard modifications to the TransferStreams vector, and
// NumTransferStreams.
std::mutex TransferStreamMutex;
std::vector<native_type> TransferStreams;
// Number of transfer streams that have been created
unsigned int NumTransferStreams{0};

// The stream indices are incremented every time we return a stream. This
// means that they encode both the index of the next stream in the round
// robin, as well as which iteration of the round robin we're on. Dividing
// the stream index by the size of the associated stream vector will give the
// number of round robins we've done as quotient, and the index of the next
// stream to use as remainder.
std::atomic_uint32_t ComputeStreamIndex{0};
std::atomic_uint32_t TransferStreamIndex{0};

// The LastSync indices keep track of the index based on ComputeStreamIndex
// or TransferStreamIndex of the last stream that was synchronized during a
// syncStreams operation.
unsigned int LastSyncComputeStreams{0};
unsigned int LastSyncTransferStreams{0};

// Stream used for recording EvQueue, which holds information about when the
// command in question is enqueued on host, as opposed to started. It is
// created only if profiling is enabled - either for queue or per event.
native_type HostSubmitTimeStream{0};
// Flag to keep track of the creation og HostSubmitTimeStream, it is created
// either in the queue constructor when profiling is enabled or whenever it
// is requested for the first time through timestamp entry points.
std::once_flag HostSubmitTimeStreamFlag;
// delay_compute_ keeps track of which streams have been recently reused and

// DelayCompute keeps track of which streams have been recently reused and
// their next use should be delayed. If a stream has been recently reused it
// will be skipped the next time it would be selected round-robin style. When
// skipped, its delay flag is cleared.
std::vector<bool> DelayCompute;
// keep track of which streams have applied barrier

// ComputeStreamSyncMutex is used to guard compute streams when they are
// being re-used.
//
// When ComputeStreamSyncMutex and ComputeStreamMutex both need to be
// locked at the same time, ComputeStreamSyncMutex should be locked first
// to avoid deadlocks.
std::mutex ComputeStreamSyncMutex;

// Guards barrier insertion in urEnqueueEventsWaitWithBarrier.
std::mutex BarrierMutex;
BarrierEventT BarrierEvent = nullptr;
BarrierEventT BarrierTmpEvent = nullptr;

// Keep track of which streams have applied barrier.
std::vector<bool> ComputeAppliedBarrier;
std::vector<bool> TransferAppliedBarrier;
ur_context_handle_t_ *Context;
ur_device_handle_t_ *Device;

ur_context_handle_t Context;
ur_device_handle_t Device;

// Reference count for the queue object.
ur::RefCount RefCount;

// Event count used to give events an ordering used in the event class
// forLatestEvents.
std::atomic_uint32_t EventCount{0};
std::atomic_uint32_t ComputeStreamIndex{0};
std::atomic_uint32_t TransferStreamIndex{0};
unsigned int NumComputeStreams{0};
unsigned int NumTransferStreams{0};
unsigned int LastSyncComputeStreams{0};
unsigned int LastSyncTransferStreams{0};

// Queue flags in the native API format as well as UR format.
unsigned int Flags;
ur_queue_flags_t URFlags;

// Priority of this queue, matches underlying API priority.
int Priority;
// When ComputeStreamSyncMutex and ComputeStreamMutex both need to be
// locked at the same time, ComputeStreamSyncMutex should be locked first
// to avoid deadlocks
std::mutex ComputeStreamSyncMutex;
std::mutex ComputeStreamMutex;
std::mutex TransferStreamMutex;
std::mutex BarrierMutex;

// Tracks if the queue owns the underlying native streams, this may happen
// for queues created from interop.
bool HasOwnership;
BarrierEventT BarrierEvent = nullptr;
BarrierEventT BarrierTmpEvent = nullptr;

stream_queue_t(bool IsOutOfOrder, ur_context_handle_t_ *Context,
ur_device_handle_t_ *Device, unsigned int Flags,
stream_queue_t(bool IsOutOfOrder, ur_context_handle_t Context,
ur_device_handle_t Device, unsigned int Flags,
ur_queue_flags_t URFlags, int Priority)
: ComputeStreams(IsOutOfOrder ? DefaultNumComputeStreams : 1),
TransferStreams(IsOutOfOrder ? DefaultNumTransferStreams : 0),
Expand All @@ -87,16 +130,16 @@ struct stream_queue_t {
}
}

// Create a queue from a native handle
stream_queue_t(native_type stream, ur_context_handle_t_ *Context,
ur_device_handle_t_ *Device, unsigned int Flags,
// Create a queue from a native handle.
stream_queue_t(native_type stream, ur_context_handle_t Context,
ur_device_handle_t Device, unsigned int Flags,
ur_queue_flags_t URFlags, bool BackendOwns)
: ComputeStreams(1, stream), TransferStreams(0),
: ComputeStreams(1, stream), NumComputeStreams{1}, TransferStreams(0),
DelayCompute(this->ComputeStreams.size(), false),
ComputeAppliedBarrier(this->ComputeStreams.size()),
TransferAppliedBarrier(this->TransferStreams.size()), Context{Context},
Device{Device}, NumComputeStreams{1}, Flags(Flags), URFlags(URFlags),
Priority(0), HasOwnership{BackendOwns} {
Device{Device}, Flags(Flags), URFlags(URFlags), Priority(0),
HasOwnership{BackendOwns} {
urContextRetain(Context);

// Create timing stream if profiling is enabled.
Expand All @@ -107,6 +150,7 @@ struct stream_queue_t {

~stream_queue_t() { urContextRelease(Context); }

// Methods defined by the specific adapters.
void computeStreamWaitForBarrierIfNeeded(native_type Strean,
uint32_t StreamI);
void transferStreamWaitForBarrierIfNeeded(native_type Stream,
Expand Down Expand Up @@ -206,9 +250,6 @@ struct stream_queue_t {
return Result;
}

native_type get() { return getNextComputeStream(); };
ur_device_handle_t getDevice() const noexcept { return Device; };

native_type getHostSubmitTimeStream() { return HostSubmitTimeStream; }

bool hasBeenSynchronized(uint32_t StreamToken) {
Expand Down Expand Up @@ -345,7 +386,8 @@ struct stream_queue_t {
}
}

ur_context_handle_t_ *getContext() const { return Context; };
ur_device_handle_t getDevice() const noexcept { return Device; };
ur_context_handle_t getContext() const noexcept { return Context; };

uint32_t getNextEventId() noexcept { return ++EventCount; }

Expand Down