
Fuel server and Blocks job resumption #157

Open
dwf opened this issue Jun 9, 2015 · 6 comments

Comments

@dwf
Contributor

dwf commented Jun 9, 2015

Currently (correct) job resumption and the "server process" are an either/or. It would be nice to at least think about fixing this.

@bartvm Have you thought at all about how this could work?

Seems like we'd need to define a request/reply protocol that allows the client to request all the bits of state necessary for the streams in the pipeline (in theory, dataset objects themselves ought to be stateless...). This might not be as bad as it sounds: every transformer just needs a method like so:

def _get_stream_state(self):
    # Append this transformer's own state to the wrapped stream's state.
    return self.data_stream.stream_state + [()]  # or something

def _set_stream_state(self, state):
    state = list(state)
    current = state.pop()  # this transformer's own state (appended last)
    # ... do something with current
    self.data_stream.stream_state = state

stream_state = property(_get_stream_state, _set_stream_state)

@bartvm
Member

bartvm commented Jun 9, 2015

I've given it a little bit of thought, but don't really have a clear idea of how it could work. A big problem is the buffers. The current method is quite fast partly because there is no request/reply protocol, so the server can just push data until all the ZMQ and system TCP buffers are full. However, that means that when the client process requests the server's state, that state is not in sync with what the client has received up to that point.

If we limit the checkpointing/resumption to fixed intervals, e.g. only in between epochs, it's probably a lot more feasible. This might not be as bad a limitation as it sounds, because we could write a data stream that splits a long epoch up into smaller epochs, so that we can save at regular intervals (it might make things slightly more complicated if you want to e.g. average something over the full epoch, though).
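
A minimal sketch of what such an epoch-splitting stream could look like (all names here are hypothetical, not existing Fuel classes), using a toy stand-in stream:

```python
from itertools import islice

class FixedSizeEpochs:
    """Hypothetical wrapper: re-chunk one long epoch from a wrapped
    stream into smaller 'epochs' of ``epoch_size`` batches each, so a
    checkpoint can be taken at every sub-epoch boundary."""

    def __init__(self, data_stream, epoch_size):
        self.data_stream = data_stream
        self.epoch_size = epoch_size
        self._iterator = None

    def get_epoch_iterator(self):
        # Start the underlying (long) epoch once, then hand out
        # fixed-size slices of it as if each were a full epoch.
        if self._iterator is None:
            self._iterator = self.data_stream.get_epoch_iterator()
        return islice(self._iterator, self.epoch_size)


class ToyStream:
    """Stand-in for a data stream with one epoch of 10 batches."""
    def get_epoch_iterator(self):
        return iter(range(10))


stream = FixedSizeEpochs(ToyStream(), epoch_size=4)
epochs = [list(stream.get_epoch_iterator()) for _ in range(3)]
# epochs == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```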

In that case, I wonder whether we really need custom _get_stream_state and _set_stream_state methods. We could simply pickle the data stream and send that over TCP. When resuming, the client could send the pickled data stream to the server, which unpickles it and continues from the epoch it left off.
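
As a rough sketch of that idea (with a toy stand-in for a real Fuel stream, and `pickle` bytes standing in for the ZMQ/TCP payload):

```python
import pickle

class ToyDataStream:
    """Stand-in for a data stream whose position is plain Python state."""
    def __init__(self, data):
        self.data = data
        self.epoch = 0

    def get_epoch_iterator(self):
        self.epoch += 1
        return iter(self.data)

# Server side: snapshot the stream at an epoch boundary.
stream = ToyDataStream([1, 2, 3])
list(stream.get_epoch_iterator())  # run one epoch
payload = pickle.dumps(stream)     # bytes that would go over the wire,
                                   # e.g. zmq_socket.send(payload)

# Client side after a crash: send the payload back; the server
# unpickles it and continues from the epoch it left off.
resumed = pickle.loads(payload)
assert resumed.epoch == 1
```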

@dwf
Contributor Author

dwf commented Jun 15, 2015

I think we can work around the buffers issue, actually, if we assume that there is a control connection for initiating epochs, and before we start an epoch on the server side, we pickle our data stream so that the pre-epoch DataStream is available upon request (we might even send it to the client once per epoch if that's not too expensive). The client keeps track of how many batches it has received in a given epoch, and records it if the job dies. Upon resumption, the pickled pre-epoch stream is sent back to the server along with the number of batches already seen; the server queues it up to the right place before starting to send any batches over the wire.

We could potentially put this functionality in get_fastforwarded_epoch_iterator or something, have a default implementation in AbstractDataStream that just calls toolz.itertoolz.drop(n, self.get_epoch_iterator()) or a picklable equivalent, and let other streams override it with something cleverer if warranted (like if you're iterating with an IndexScheme, just iterate the indices without doing array/disk access).
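
A toy sketch of that default implementation (class and method names here follow the proposal above but don't exist in Fuel; `itertools.islice` with a `None` stop is a stdlib, picklable-friendly stand-in for `toolz.itertoolz.drop`):

```python
from itertools import islice

class AbstractStreamSketch:
    """Toy sketch: fast-forward an epoch iterator by skipping the
    first ``n`` batches, as a default for arbitrary streams."""

    def __init__(self, batches):
        self.batches = batches

    def get_epoch_iterator(self):
        return iter(self.batches)

    def get_fastforwarded_epoch_iterator(self, n):
        # Default: iterate and discard the first n batches. A stream
        # backed by an IndexScheme could override this to skip indices
        # without touching the underlying arrays or disk.
        return islice(self.get_epoch_iterator(), n, None)


stream = AbstractStreamSketch(['b0', 'b1', 'b2', 'b3'])
remaining = list(stream.get_fastforwarded_epoch_iterator(2))
# remaining == ['b2', 'b3']
```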

I think this gets us everything we need for arbitrary resumability, or am I missing something?

@bartvm
Member

bartvm commented Jun 15, 2015

Ah, clever, that sounds like it would work, yes. That said, I'm not sure which people find a higher priority: resumption, or parallelizing the data processing? It might be worth implementing the latter first, because my guess is that the code will overlap a bit.

@mjwillson

Just seen this and my 2 cents: I'd value being able to parallelize the data processing more highly than the resumability stuff. Resumability seems to constrain the implementation a lot and is easy to break -- in my case at least implementing it for custom datasets didn't seem worth it. Without having to worry about resumability I have more freedom, e.g. to use non-picklable things like generators to write more idiomatic iteration code.

I tried parallelising my data stream transforms using concurrent.futures.ProcessPoolExecutor.map, but the pickling overhead seems to kill it. If this more efficient serialization and zmq-based approach could generalise to a pool of worker processes, that would be awesome!

It would also be awesome if ServerDataStream could handle forking the server process(es) for you :)

@dwf
Contributor Author

dwf commented Aug 26, 2015 via email

@mjwillson

OK, yeah that makes sense. I need some level of reproducibility too, but unfortunately also need to do some relatively expensive transformations on-the-fly, which is why I'm particularly interested in parallelisation and backgrounding the transformation work. If I could precompute them then parallelisation would be less of an issue. For now I'm using ThreadPoolExecutor.map with some Cython code that gives up the GIL which gives me a reasonable speed boost, but it's not a general solution.

(Perhaps implementing a concurrent.futures.Executor based on zmq and then making the executor pluggable could be a nice approach to both parallelisation and/or backgrounding if you do go down that road?)
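
For what the pluggable-executor idea might look like at the call site (a sketch only; `parallel_transform` is a hypothetical helper, and a ZMQ-backed `Executor` subclass could be swapped in without changing callers):

```python
from concurrent.futures import ThreadPoolExecutor

def parallel_transform(batches, func, executor=None):
    """Hypothetical sketch: apply ``func`` to each batch through any
    concurrent.futures.Executor. Threads avoid pickling overhead and
    work well when ``func`` releases the GIL (e.g. Cython code)."""
    if executor is None:
        executor = ThreadPoolExecutor(max_workers=4)
    with executor:
        return list(executor.map(func, batches))

result = parallel_transform([1, 2, 3], lambda x: x * x)
# result == [1, 4, 9]
```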
