Skip to content

Commit

Permalink
async rework to notify first
Browse files Browse the repository at this point in the history
  • Loading branch information
devagrawal09 committed Jan 31, 2025
1 parent 5bdbd8a commit 13a9edd
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 11 deletions.
28 changes: 27 additions & 1 deletion index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
createListener,
createMutationListener,
createPartition,
createSyncListener,
halt,
} from ".";
import { setTimeout } from "timers/promises";
Expand Down Expand Up @@ -78,7 +79,7 @@ describe(`createEvent`, () => {

const d = createRoot((d) => {
const [on, emit] = createEvent<string>();
const onValid = on((p) => (p.length < 3 ? halt() : p));
const onValid = on((p) => (p.length < 3 ? halt(`Huh`) : p));
onValid((p) => messages.push(p));
emit(`hello`);
emit(`hi`);
Expand Down Expand Up @@ -207,3 +208,28 @@ describe(`createMutationListener`, () => {
d();
});
});

describe(`createSyncListener`, () => {
test(`runs synchronously`, async () => {
const messages = [] as number[];

const d = createRoot((d) => {
const [on, emit] = createEvent<number>();
const onAsync = on(async (p) => {
await setTimeout(10);
return p + 1;
});
onAsync((p) => messages.push(p));
createSyncListener(onAsync, (p) => {
messages.push(0);
p.then((p) => p && messages.push(p + 1));
});
emit(0);
return d;
});

await setTimeout(10);
expect(messages).toEqual([0, 1, 2]);
d();
});
});
54 changes: 44 additions & 10 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,38 @@ import { createStore, produce, reconcile } from "solid-js/store";
export type Handler<E> = (<O>(
transform: (e: E) => Promise<O> | O
) => Handler<O>) & {
$: Observable<E>;
$: Observable<E | Promise<E | HaltError>>;
};
export type Emitter<E> = (e: E) => void;

function makeHandler<E>($: Observable<E>): Handler<E> {
function handler<O>(transform: (e: E) => Promise<O> | O): Handler<O> {
const next$ = new Subject<O>();
function makeHandler<E>($: Observable<Promise<E | HaltError> | E>): Handler<E> {
function handler<O>(
transform: (e: E) => Promise<O> | O,
syncTransform?: (e: Promise<E | undefined>) => void
): Handler<O> {
const next$ = new Subject<Promise<O | HaltError> | O>();
const sub = $.subscribe((e) => {
try {
const res = transform(e);
if (res instanceof Promise)
res.then((o) => (next$.next(o), flushQueues()));
else pureQueue.push(() => next$.next(res));
if (syncTransform) {
syncTransform(
Promise.resolve(e).then((_e) => {
if (!(_e instanceof HaltError)) return _e;
})
);
}
const o =
e instanceof Promise
? e
.then((_e) => {
if (_e instanceof HaltError) return _e;
return transform(_e);
})
.catch(handleError)
: transform(e);

pureQueue.push(() => next$.next(o));
} catch (e) {
if (!(e instanceof HaltError)) throw e;
console.info(e.message);
handleError(e);
}
});
onCleanup(() => sub.unsubscribe());
Expand Down Expand Up @@ -105,6 +121,11 @@ export class HaltError extends Error {
}
}

function handleError(e: any) {
if (!(e instanceof HaltError)) throw e;
console.info(e.message);
return e;
}
export function halt(reason?: string): never {
throw new HaltError(reason);
}
Expand All @@ -127,6 +148,19 @@ export function createListener<E>(
});
}

export function createSyncListener<E>(
handler: Handler<E>,
effect: (payload: Promise<E | undefined>) => any
) {
handler(
(e) => {},
// @ts-expect-error
(p) => {
listenerQueue.push(() => effect(p));
}
);
}

let listenerQueue = [] as Function[];

let runningListenerQueue = false;
Expand Down

0 comments on commit 13a9edd

Please sign in to comment.