Skip to content

Commit

Permalink
Rename track util
Browse files Browse the repository at this point in the history
  • Loading branch information
third774 committed Jan 14, 2025
1 parent 81f632a commit 5e24087
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 85 deletions.
17 changes: 8 additions & 9 deletions fixtures/partytracks/src/client/index.tsx
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
import "./styles.css";

import { PartyTracks } from "partytracks/client";
import { of } from "rxjs";
import { PartyTracks, resilientTrack$ } from "partytracks/client";
import invariant from "tiny-invariant";

const localVideo = document.getElementById("local-video");
const remoteVideo = document.getElementById("remote-video");
invariant(localVideo instanceof HTMLVideoElement);
invariant(remoteVideo instanceof HTMLVideoElement);

const webcamTrack = await navigator.mediaDevices
.getUserMedia({ video: true })
.then((ms) => ms.getVideoTracks()[0]);
const track$ = resilientTrack$({ kind: "videoinput" });

const localMediaStream = new MediaStream();
localMediaStream.addTrack(webcamTrack);
localVideo.srcObject = localMediaStream;
track$.subscribe((track) => {
const localMediaStream = new MediaStream();
localMediaStream.addTrack(track);
localVideo.srcObject = localMediaStream;
});

const partyTracks = new PartyTracks();
const pushedTrack$ = partyTracks.push(of(webcamTrack));
const pushedTrack$ = partyTracks.push(track$);
const pulledTrack$ = partyTracks.pull(pushedTrack$);

pulledTrack$.subscribe((track) => {
Expand Down
26 changes: 5 additions & 21 deletions fixtures/video-echo/app/components/Demo.client.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { useMemo, useRef, useState } from "react";
import { devices$, ideallyGetTrack$, PartyTracks } from "partytracks/client";
import { devices$, PartyTracks, resilientTrack$ } from "partytracks/client";
import { useObservableAsValue, useOnEmit } from "partytracks/react";
import { map, of, shareReplay } from "rxjs";
import { map } from "rxjs";

import type { ComponentProps, ComponentRef } from "react";
import type { Observable } from "rxjs";
Expand All @@ -11,13 +11,7 @@ export function Demo() {
const [remoteFeedOn, setRemoteFeedOn] = useState(false);
const [preferredWebcamDeviceId, setPreferredWebcamDeviceId] = useState("");
const devices = useObservableAsValue(devices$);
const client = useMemo(
() =>
new PartyTracks({
apiBase: "/api/calls"
}),
[]
);
const client = useMemo(() => new PartyTracks(), []);

const peerConnectionState = useObservableAsValue(
client.peerConnectionState$,
Expand Down Expand Up @@ -133,23 +127,13 @@ function Audio(props: { audioTrack$: Observable<MediaStreamTrack | null> }) {
function useWebcamTrack$(enabled: boolean) {
return useMemo(() => {
if (!enabled) return null;
return ideallyGetTrack$({ kind: "videoinput" }).pipe(
shareReplay({
refCount: true,
bufferSize: 1
})
);
return resilientTrack$({ kind: "videoinput" });
}, [enabled]);
}

function useMicTrack$(enabled: boolean) {
return useMemo(() => {
if (!enabled) return null;
return ideallyGetTrack$({ kind: "audioinput" }).pipe(
shareReplay({
refCount: true,
bufferSize: 1
})
);
return resilientTrack$({ kind: "audioinput" });
}, [enabled]);
}
5 changes: 2 additions & 3 deletions packages/partytracks/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
export { PartyTracks } from "./PartyTracks";
export {
ideallyGetTrack$,
resilientTrack$,
DevicesExhaustedError,
devices$
} from "./getUserMediaTrack$";
export type { InputMediaDeviceKind } from "./getUserMediaTrack$";
} from "./resilientTrack$";
export { getScreenshare$ } from "./getScreenshare$";
export { setLogLevel } from "./logging";
export type { PartyTracksConfig, ApiHistoryEntry } from "./PartyTracks";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {
combineLatest,
concat,
debounceTime,
defer,
Expand All @@ -9,7 +8,7 @@ import {
map,
merge,
Observable,
of,
shareReplay,
switchMap
} from "rxjs";

Expand All @@ -25,7 +24,7 @@ export class DevicesExhaustedError extends Error {
}
}

// Using defer so that this doesn't blow up if it ends
// Using defer here so that this doesn't blow up if it ends
// up in a server js bundle since navigator is browser api
export const devices$ = defer(() =>
merge(
Expand All @@ -37,63 +36,63 @@ export const devices$ = defer(() =>
)
);

export type InputMediaDeviceKind = Exclude<MediaDeviceKind, "audiooutput">;
export interface ResilientTrackOptions {
kind: "audioinput" | "videoinput";
constraints?: MediaTrackConstraints;
devicePriority$?: Observable<MediaDeviceInfo[]>;
onDeviceFailure?: (device: MediaDeviceInfo) => void;
}

export const ideallyGetTrack$ = ({
export const resilientTrack$ = ({
kind,
constraints$ = of({}),
prioritizedDeviceList$ = devices$,
onDeviceUnhealthy = () => {}
}: {
kind?: InputMediaDeviceKind;
constraints$?: Observable<MediaTrackConstraints>;
prioritizedDeviceList$?: Observable<MediaDeviceInfo[]>;
onDeviceUnhealthy?: (device: MediaDeviceInfo) => void;
}): Observable<MediaStreamTrack> =>
combineLatest([
prioritizedDeviceList$.pipe(
map((list) =>
// only apply filter when kind is defined
list.filter((d) => (kind === undefined ? true : d.kind === kind))
),
constraints = {},
devicePriority$ = devices$,
onDeviceFailure = () => {}
}: ResilientTrackOptions): Observable<MediaStreamTrack> =>
devicePriority$
.pipe(
map((list) => list.filter((d) => d.kind === kind)),
distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b))
),
constraints$
]).pipe(
// switchMap on the outside here will cause the previous queue to stop
// when the inputs change
switchMap(([deviceList, constraints]) =>
// concat here is going to make these be subscribed to sequentially
concat(
...deviceList.map(
(device) =>
new Observable<MediaStreamTrack>((subscriber) => {
const cleanupRef = { current: () => {} };
acquireTrack(
subscriber,
device,
constraints,
cleanupRef,
onDeviceUnhealthy
);
return () => {
cleanupRef.current();
};
})
),
new Observable<MediaStreamTrack>((sub) =>
sub.error(new DevicesExhaustedError())
)
)
)
);
.pipe(
// switchMap on the outside here will cause the previous queue to stop
// when the inputs change
switchMap((deviceList) =>
// concat here is going to make these be subscribed to sequentially
concat(
...deviceList.map(
(device) =>
new Observable<MediaStreamTrack>((subscriber) => {
const cleanupRef = { current: () => {} };
acquireTrack(
subscriber,
device,
constraints,
cleanupRef,
onDeviceFailure
);
return () => {
cleanupRef.current();
};
})
),
new Observable<MediaStreamTrack>((sub) =>
sub.error(new DevicesExhaustedError())
)
)
),
shareReplay({
refCount: true,
bufferSize: 1
})
);

function acquireTrack(
subscriber: Subscriber<MediaStreamTrack>,
device: MediaDeviceInfo,
constraints: MediaTrackConstraints,
cleanupRef: { current: () => void },
onDeviceUnhealthy: (device: MediaDeviceInfo) => void
onDeviceFailure: (device: MediaDeviceInfo) => void
) {
const { deviceId, groupId, label } = device;
logger.log(`🙏🏻 Requesting ${label}`);
Expand Down Expand Up @@ -125,15 +124,15 @@ function acquireTrack(
device,
constraints,
cleanupRef,
onDeviceUnhealthy
onDeviceFailure
);
};
document.addEventListener("visibilitychange", onVisibleHandler);
cleanupRef.current = cleanup;
subscriber.next(track);
} else {
logger.log("☠️ track is not healthy, stopping");
onDeviceUnhealthy(device);
onDeviceFailure(device);
track.stop();
subscriber.complete();
}
Expand Down

0 comments on commit 5e24087

Please sign in to comment.