Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Idiomatic rxjs #1

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

sourishkrout
Copy link

Refactorage

Copy link
Owner

@grampelberg grampelberg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bunch of questions! I love that the stream got folded from 3 into one. I think my biggest questions are around how the caching works and how to get loading to toggle (but I suspect I know how to do that with a nested subscribe ...)

tap(data),
const refresher = timer(0, 1000).pipe(
switchMap(() => getURL(url)),
publish(),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so this sets the timer which fires immediately and every second. It doesn't actually do anything though until something subscribes because of the publish? Why is the publish required if observables don't go hot until the subscription normally?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The publish is required to turn the timer into something multiple subscribers can subscribe to. Think about it like a multiplexer. The refresher stream which is subscribed to the timer stream won't start firing until it has at least 1 subscriber which has also called connect on the subscription to turn it hot. To avoid requiring subscribers having to call connect/disconnect is why the next line leverages the refCounter.

const refresher = timer(0, 1000).pipe(
switchMap(() => getURL(url)),
publish(),
refCount(),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do publish/refCount instead of just share (or just using a subject)? From the docs:

Note that using the share operator is exactly the same as using the publish operator (making the observable hot) and the refCount operator in a sequence.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you could use share. However, depending on the use case you might want to use publishBehavior or publishReplay to get different characteristics some of which have their own shareReplay etc. Just got used to using them in conjunction since it's easier to play around with them individually. - Once you've got the right behavior you could condense a sequence of operator with combinator operatoers.

data.subscribe((ev) => {
setData(ev.value.body)
setError(ev.error)
const fetcher = fetchData(url).pipe(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this toggle loading on refresh? I like the pattern here more, but loading will always just be false after the first time right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It won't toggle it, it will work like an argument to instantiate a consumable stream to either be one or the other. Did you want streams to allow toggling refresh? Can probably make that happen.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh shit... i think I misunderstood and it just became clear to me. You're right I forgot to toggle loading when a refresh is attempted. Let me fix that.

sub.add(startRefresh.subscribe())
let refresher = fetcher
if (!refresh) {
refresher = fetcher.pipe(take(1))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, instead of taking the whole stream, this'll take a single one and then stop listening again? How does it fit in with the cache? Is the original stream kept in memory and the first item is taken? Or is it the latest item? Or am I not understanding at all? =)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

take(1) will emit a complete to the subscriber signaling that it's complete. If you don't just want avoid duplication of fetcher using a cache you should be able to use publishBehavior instead of publish to relay the last value emitted to subsequent subscribers. Let me add that.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like this take(1) makes it so that if someone else is refreshing, no one else gets updates. That seems weird? I assumed that it would update all subscribers and not just the one taking refreshes.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@grampelberg do you mean when you have multiple useFetchMany consumers? E.g. below. - That works fine for me. The take(1) happens prior to publishBehavior(null), refCount(), which should make it so that you multiple subscribers can consume to emissions from the same source stream.

const Home = () => (
  <>
    <Many />
    <Many />
    <Many />
    <One />
  </>
)

}

const sub = refresher.subscribe((ev) => {
if (ev.error) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason you're guarding the error here? My thinking was that if ev.error is undefined, on the next refresh it'll get unset (imagine retries).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice the return in the subsequent line return setError(ev.error) which will avoid a null pointer exception for ev.value.body when an error occurs because ev.value will be null. Maybe not the most self-explanatory way to avoid this execption.

filter((i) => !isError(i)),
dematerialize(),
updateBy('url'),
combineAll(),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this end up waiting for all to complete? The main reason I did this implementation was to allow results to stream back (especially when there's partial errors). I'd like to get results from some of the endpoints even when there's issues across them all (imagine a timeout holding everything up for 10 seconds).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will lump the individual streams together and emit them as one value. You're right, this will result in the joint completion taking as long as the slowest one to complete. Let me improve on that.

I generally wouldn't recommend lumping unrelated requests together unless the consumer(s) requires it. I mean, I suppose useFetchMany is more of a convenience API.

)
setError(errs)
setLoading(false)
}),
)
.subscribe(setData)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't setData end up adding Notification including errors and completes?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷‍♂️ , i have to admit that I refactored the rxjs plumping without fully getting into what the playloads look like or reasoning about it. My 1-2h best quick effort. - Should be easy to modify this to your expectations here.

Copy link
Owner

@grampelberg grampelberg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My biggest question is around the refresh updates. I think I understand why it is an issue but I'd love some confirmation =)

sub.add(startRefresh.subscribe())
let refresher = fetcher
if (!refresh) {
refresher = fetcher.pipe(take(1))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like this take(1) makes it so that if someone else is refreshing, no one else gets updates. That seems weird? I assumed that it would update all subscribers and not just the one taking refreshes.

setError([])
})

of(setLoading(true))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely setting loading: false a bunch, but it doesn't appear to ever set it to true. I tried adding a couple types events and a concat, but that broke everything and I've not figured out why yet.

const refresher = timer(0, 1000).pipe(
    switchMap(() =>
      concat(
        of({ kind: 'loading', state: true }),
        getURL(url).pipe(
          catchError((err) => of(err)),
          publishBehavior(null),
          refCount(),
          filter((ev) => ev !== null),
          map((resp) => ({ kind: 'response', ...resp })),
        ),
        of({ kind: 'loading', state: false }),
      ),
    ),
  )

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dang. I didn't properly test this with refresh=true. However, since of(...) is the source observable it will only ever run once, then complete. Without having tested it thoroughly I believe your revised refresher impl suffers from the same problem. Lemme see if I can fix this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@grampelberg pushed some more commits; essentially split the refresher into two separate streams 1.) pulse (interval fires) & 2. fetcher (emits values from remote API). This will make it work.

const valsub = vals.subscribe(setData)
const errsub = errs.subscribe(setError)

return () => {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why didn't you add these as child subscriptions?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally possible. Really just a style/code maintenance thing.

It's probably because I primarily used rxjs for a somewhat different use case I have cognitive bias to keep my streams/observables very separate.

}

const status = all.pipe(
map((url) => {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool!

@sourishkrout
Copy link
Author

Getting there! I'll take a look later today or over the weekend. :-D

@sourishkrout
Copy link
Author

@grampelberg the one last thing I noticed is that refreshers will halt (complete) on error - is that what you want? if it should continue to refresh on error the error handling needs to be moved higher in the order before switchMap

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants