tutorial: overhaul semaphores section
Reworked the semaphores section to reflect my better understanding (I hope...)
of exception safety. Also fixed some errors in the example (like using
boost::irange() which we don't support because the patch to support it
was never committed).

Signed-off-by: Nadav Har'El <[email protected]>
Message-Id: <[email protected]>
nyh authored and avikivity committed Aug 7, 2017
1 parent 4f0c382 commit 6af6599
Showing 1 changed file with 49 additions and 37 deletions.
86 changes: 49 additions & 37 deletions doc/tutorial.md
@@ -535,9 +535,9 @@ TODO: See seastar commit "input_stream: Fix possible infinite recursion in consu
Seastar's semaphores are the standard computer-science semaphores, adapted for futures. A semaphore is a counter into which you can deposit units or take them away. Taking units from the counter may wait if not enough units are available.

## Limiting parallelism with semaphores
A common use for a semaphore in Seastar is for limiting parallelism, i.e., limiting the number of instances of some code which can run in parallel. This can be important when each of the parallel invocations uses a limited resource (e.g., memory) so letting an unlimited number of them run in parallel can exhaust this resource.
The most common use for a semaphore in Seastar is for limiting parallelism, i.e., limiting the number of instances of some code which can run in parallel. This can be important when each of the parallel invocations uses a limited resource (e.g., memory) so letting an unlimited number of them run in parallel can exhaust this resource.

Consider a case where an external source of events (e.g., incoming network requests) causes an asynchronous function ```g()``` to be called. Imagine that we want to limit the number of concurrent ```g()``` operations to 100. That is, if ```g()``` is started when 100 other invocations are still ongoing, we want it to delay its real work until one of the other invocations has completed. We can do this with a semaphore:
Consider a case where an external source of events (e.g., an incoming network request) causes an asynchronous function ```g()``` to be called. Imagine that we want to limit the number of concurrent ```g()``` operations to 100. That is, if ```g()``` is started when 100 other invocations are still ongoing, we want it to delay its real work until one of the other invocations has completed. We can do this with a semaphore:

```c++
seastar::future<> g() {
@@ -550,13 +550,13 @@ seastar::future<> g() {
}
```

In this example, the semaphore starts with the counter at 100. The asynchronous operation (```slow()```) is only started when we can reduce the counter by one (```wait(1)```), and when ```slow()``` is done, either successfully or with an exception, the counter is increased back by one (```signal(1)```). This way, when 100 operations have already started their work and have not yet finished, the 101st operation will wait until one of the ongoing operations finishes and returns a unit to the semaphore. This ensures that at any time we have at most 100 concurrent ```slow()``` operations running in the above code.
In this example, the semaphore starts with the counter at 100. The asynchronous operation `slow()` is only started when we can reduce the counter by one (`wait(1)`), and when `slow()` is done, either successfully or with an exception, the counter is increased back by one (```signal(1)```). This way, when 100 operations have already started their work and have not yet finished, the 101st operation will wait until one of the ongoing operations finishes and returns a unit to the semaphore. This ensures that at any time we have at most 100 concurrent `slow()` operations running in the above code.

Note how we used a ```static thread_local``` semaphore, so that all calls to ```g()``` from the same shard count towards the same limit. As usual, a Seastar application is sharded, so this limit is separate per shard (CPU thread). This is usually fine, because sharded applications consider resources to be separate per shard.

Unfortunately, the above example code is actually _incorrect_. Counter-intuitively, `limit.wait(1)` can fail with an exception: when it runs out of memory to keep the list of waiters. In that case, the counter will not be decreased, but the `finally()` clause will still be run, and increase the counter!
Luckily, the above code happens to be exception safe: `limit.wait(1)` can throw an exception when it runs out of memory (keeping a list of waiters), and in that case the semaphore counter is not decreased but the continuations below are not run so it is not increased either. `limit.wait(1)` can also return an exceptional future when the semaphore is *broken* (we'll discuss this later) but in that case the extra `signal()` call is ignored. Finally, `slow()` may also throw, or return an exceptional future, but the `finally()` ensures the semaphore is still increased.

To solve this problem, we need the `finally()` to chain to the `slow()` call only, not to `limit.wait(1)`:
However, as the application code becomes more complex, it becomes harder to ensure that we never forget to call `signal()` after the operation is done, regardless of which code path is taken or which exceptions occur. As an example of what might go wrong, consider the following *buggy* code snippet, which differs subtly from the one above and also appears, at first sight, to be correct:

```c++
seastar::future<> g() {
@@ -567,25 +567,11 @@ seastar::future<> g() {
}
```

This version also has its own subtle problem... What if `slow()` throws an exception before returning a future? Note that this is different from `slow()` returning an exceptional future (we discussed this difference in the section about exception handling). In this case, we decreased the counter, but the `finally()` will never be reached, and the counter will never be increased back...
But this version is **not** exception safe: Consider what happens if `slow()` throws an exception before returning a future (this is different from `slow()` returning an exceptional future - we discussed this difference in the section about exception handling). In this case, we decreased the counter, but the `finally()` will never be reached, and the counter will never be increased back. There is a way to fix this code, by replacing the call to `slow()` with `seastar::futurize_apply(slow)`. But the point we're trying to make here is not how to fix buggy code, but rather that by using the separate `semaphore::wait()` and `semaphore::signal()` functions, you can very easily get things wrong.

To correctly support the case that `slow()` throws an exception, and also the case where `slow()` is not actually an asynchronous function, but rather a function which returns a non-future value, we can use the `futurize_apply()` function, which converts values and exceptions to the corresponding ready futures:
For exception safety, in C++ it is generally not recommended to have separate resource acquisition and release functions. Instead, C++ offers safer mechanisms for acquiring a resource (in this case semaphore units) and later releasing it: lambda functions, and RAII ("resource acquisition is initialization"):

```c++
future<> g() {
static thread_local seastar::semaphore limit(100);
return limit.wait(1).then([] {
return seastar::futurize_apply(slow).finally([] { limit.signal(1); });
});
}
```

This is finally a bug-free, safe, version.

As we just saw, it is not easy to safely use the separate `semaphore::wait()` and `semaphore::signal()` functions while guaranteeing we never forget to call either one. C++ offers safer mechanisms for acquiring a resource (in this case semaphore units) and later releasing it: lambda functions, and RAII (_resource acquisition is initialization_):


The lambda-based solution is an exception-safe shortcut ```with_semaphore()``` that replaces exactly the last example above:
The lambda-based solution is a function ```seastar::with_semaphore()``` which is a shortcut for the code in the examples above:

```c++
seastar::future<> g() {
@@ -596,9 +582,9 @@ seastar::future<> g() {
}
```

`with_semaphore()`, like the code above, waits for the given number of units from the semaphore, then runs the given lambda, and when the future returned by the lambda is resolved, `with_semaphore()` returns the units to the semaphore. `with_semaphore()` returns a future which only resolves after all these steps are done.
`with_semaphore()`, like the earlier code snippets, waits for the given number of units from the semaphore, then runs the given lambda, and when the future returned by the lambda is resolved, `with_semaphore()` returns the units to the semaphore. `with_semaphore()` returns a future which only resolves after all these steps are done.

The function `get_units()` provides a safer alternative to `semaphore`'s separate `wait()` and `signal()` functions, based on C++'s RAII philosophy: It returns an opaque units object, which while held, keeps the semaphore's counter decreased - and as soon as this object is destructed, the counter is increased back. With this interface you cannot forget to increase the counter, or increase it twice, or increase without decreasing: The counter will always be decreased once when the units object is created, and if that succeeded, increased when the object is destructed. When the units object is moved into a continuation, no matter how this continuation ends, when the continuation is destructed, the units object is destructed and the units are returned to the semaphore's counter. The above examples, written with `get_units()`, look like this:
The function `seastar::get_units()` is more general. It provides an exception-safe alternative to `seastar::semaphore`'s separate `wait()` and `signal()` methods, based on C++'s RAII philosophy: The function returns an opaque units object, which while held, keeps the semaphore's counter decreased - and as soon as this object is destructed, the counter is increased back. With this interface you cannot forget to increase the counter, or increase it twice, or increase without decreasing: The counter will always be decreased once when the units object is created, and if that succeeded, increased when the object is destructed. When the units object is moved into a continuation, no matter how this continuation ends, when the continuation is destructed, the units object is destructed and the units are returned to the semaphore's counter. The above examples, written with `get_units()`, look like this:

```c++
seastar::future<> g() {
@@ -609,6 +595,11 @@ seastar::future<> g() {
}
```

Note the somewhat convoluted way that `get_units()` needs to be used: the continuations must be nested because we need the `units` object to be passed to the last continuation; we need to use `futurize_apply`, as explained above, to allow for the possibility that the function `slow()` throws; and the `finally()` continuation captures the `units` object until everything is done, but does not run any code.

Seastar programmers should generally avoid using the `semaphore::wait()` and `semaphore::signal()` functions directly, and always prefer either `with_semaphore()` (when applicable) or `get_units()`.
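
To make the RAII idea behind `get_units()` concrete, here is a minimal sketch that does not use Seastar at all. It is a synchronous toy model, not Seastar's implementation: `toy_semaphore`, `toy_units` and `run_with_one_unit()` are hypothetical names introduced only for this illustration, and unlike a real semaphore the toy throws instead of waiting when units are unavailable.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <utility>

// Toy, synchronous stand-in for a semaphore counter (NOT seastar::semaphore).
class toy_semaphore {
    std::size_t _count;
public:
    explicit toy_semaphore(std::size_t count) : _count(count) {}
    std::size_t available() const { return _count; }
    // Assumption for this sketch: throw instead of waiting when units are missing.
    void take(std::size_t n) {
        if (n > _count) {
            throw std::runtime_error("not enough units");
        }
        _count -= n;
    }
    void give(std::size_t n) { _count += n; }
};

// Hypothetical RAII guard playing the role of the "units" object.
class toy_units {
    toy_semaphore* _sem;
    std::size_t _n;
public:
    toy_units(toy_semaphore& sem, std::size_t n) : _sem(&sem), _n(n) {
        _sem->take(n);  // if this throws, no object exists and nothing is returned later
    }
    // Moving transfers ownership; the moved-from object holds zero units.
    toy_units(toy_units&& other) noexcept
        : _sem(other._sem), _n(std::exchange(other._n, 0)) {}
    toy_units(const toy_units&) = delete;
    toy_units& operator=(const toy_units&) = delete;
    toy_units& operator=(toy_units&&) = delete;
    // Units are returned exactly once, however the owning scope is left.
    ~toy_units() {
        if (_n != 0) {
            _sem->give(_n);
        }
    }
};

// Runs work() while holding one unit and reports the units available afterwards.
template <typename Work>
std::size_t run_with_one_unit(toy_semaphore& sem, Work work) {
    try {
        toy_units units(sem, 1);
        work();
    } catch (...) {
        // The exception is swallowed here only to keep the demonstration short.
    }
    return sem.available();
}

// Usage sketch: the unit comes back whether or not the work throws.
inline void demo_units_returned() {
    toy_semaphore sem(100);
    assert(run_with_one_unit(sem, [] {}) == 100);
    assert(run_with_one_unit(sem, [] { throw std::runtime_error("boom"); }) == 100);
}
```

The point of the sketch is that the release lives in a destructor, so there is no code path on which it can be forgotten or run twice; this is the property the units object provides when it is moved into a continuation.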


## Limiting resource use
Because semaphores support waiting for any number of units, not just 1, we can use them for more than simply limiting the *number* of parallel invocations. For example, suppose we have an asynchronous function ```using_lots_of_memory(size_t bytes)```, which uses ```bytes``` bytes of memory, and we want to ensure that no more than 1 MB of memory is used by all parallel invocations of this function, and that additional calls are delayed until previous calls have finished. We can do this with a semaphore:

@@ -621,10 +612,12 @@ seastar::future<> using_lots_of_memory(size_t bytes) {
}
```
Watch out that in the above example, a call to `using_lots_of_memory(2000000)` will return a future that never resolves, because the semaphore will never contain enough units to satisfy the semaphore wait. `using_lots_of_memory()` should probably check whether `bytes` is above the limit, and throw an exception in that case.
Watch out that in the above example, a call to `using_lots_of_memory(2000000)` will return a future that never resolves, because the semaphore will never contain enough units to satisfy the semaphore wait. `using_lots_of_memory()` should probably check whether `bytes` is above the limit, and throw an exception in that case. Seastar doesn't do this for you.
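
The check suggested above could look roughly like the following sketch. It is plain C++ with no Seastar dependency; `memory_limit_bytes` and `check_request_fits()` are hypothetical names, and the value 1000000 is an assumption standing in for the 1 MB limit discussed in this section.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

// Assumed capacity of the semaphore, standing in for the 1 MB limit in the text.
constexpr std::size_t memory_limit_bytes = 1000000;

// Hypothetical helper: reject a request that the semaphore could never satisfy,
// instead of letting the caller wait on a future that never resolves.
inline void check_request_fits(std::size_t bytes,
                               std::size_t capacity = memory_limit_bytes) {
    if (bytes > capacity) {
        throw std::invalid_argument(
            "request of " + std::to_string(bytes) +
            " bytes exceeds the limit of " + std::to_string(capacity) + " bytes");
    }
}

// Small wrapper that reports whether a request would be rejected.
inline bool request_is_rejected(std::size_t bytes) {
    try {
        check_request_fits(bytes);
        return false;
    } catch (const std::invalid_argument&) {
        return true;
    }
}

// Usage sketch: requests up to the limit pass, larger ones are rejected.
inline void demo_request_check() {
    assert(!request_is_rejected(1000));
    assert(!request_is_rejected(memory_limit_bytes));
    assert(request_is_rejected(2000000));
}
```

In `using_lots_of_memory()` such a check would run before waiting on the semaphore. Whether to throw directly, as here, or to return an exceptional future (for example with `seastar::make_exception_future<>`) is a design choice that the text above leaves open.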
## Limiting parallelism of loops
Above, we looked at a function `g()` which gets called by some external event, and wanted to control its parallelism. In this section, we look at parallelism of loops, which also can be controlled with semaphores.
Consider the following simple loop:
```cpp
@@ -673,14 +666,34 @@ seastar::future<> f() {

Note how in this code we do not wait for `slow()` to complete before continuing the loop (i.e., we do not `return` the future chain starting at `slow()`). The loop continues to the next iteration when a semaphore unit becomes available, while (in our example) 99 other operations might be ongoing in the background and we do not wait for them.

The above example is incomplete, because it has a never-ending loop and the future returned by `f()` will never resolve. In more realistic cases, the loop has an end, and at the end of the loop we need to wait for all the background operations which the loop started. We can do this by ```wait()```ing on the original count of the semaphore: When the full count is finally available, it means that *all* the operations have completed. For example, the following loop ends after 456 iterations:

Note that in the examples in this section, we cannot use the `with_semaphore()` shortcut. `with_semaphore()` returns a future which only resolves after the lambda's returned future resolves. But in the above examples, the loop needs to know when just the semaphore units are available, to start the next iteration, and not wait for the previous iteration to complete. We could not achieve that with `with_semaphore()`. But the more general exception-safe idiom, `seastar::get_units()`, can be used in this case, and is recommended:


```cpp
seastar::future<> f() {
return seastar::do_with(seastar::semaphore(100), [] (auto& limit) {
return seastar::do_for_each(boost::irange(0, 456), [&limit] (int i) {
return limit.wait(1).then([&limit] {
slow().finally([&limit] { limit.signal(1); });
return seastar::repeat([&limit] {
return seastar::get_units(limit, 1).then([] (auto units) {
slow().finally([units = std::move(units)] {});
return seastar::stop_iteration::no;
});
});
});
}
```

Note that, as we already discussed above, in the general case where `slow()` may throw, we should replace the call to `slow()` in all the examples above with `seastar::futurize_apply(slow)`.

The above examples are incomplete, because they have a never-ending loop and the future returned by `f()` will never resolve. In more realistic cases, the loop has an end, and at the end of the loop we need to wait for all the background operations which the loop started. We can do this by ```wait()```ing on the original count of the semaphore: When the full count is finally available, it means that *all* the operations have completed. For example, the following loop ends after 456 iterations:

```cpp
seastar::future<> f() {
return seastar::do_with(seastar::semaphore(100), [] (auto& limit) {
return seastar::do_for_each(boost::counting_iterator<int>(0),
boost::counting_iterator<int>(456), [&limit] (int i) {
return seastar::get_units(limit, 1).then([] (auto units) {
seastar::futurize_apply(slow).finally([units = std::move(units)] {});
});
}).finally([&limit] {
return limit.wait(100);
@@ -689,19 +702,19 @@ seastar::future<> f() {
}
```

The last `finally` is what ensures we wait for the last operations to complete: After the `do_for_each` loop ends (whether successfully or prematurely because of an exception in one of the iterations), we do a `wait(100)` to wait for the semaphore to reach its original value 100, meaning that all operations that we started have completed. Without this `finally`, the future returned by `f()` will resolve *before* all the iterations of the loop have actually completed (the last 100 may still be running).
The last `finally` is what ensures that we wait for the last operations to complete: After the `do_for_each` loop ends (whether successfully or prematurely because of an exception in one of the iterations), we do a `wait(100)` to wait for the semaphore to reach its original value 100, meaning that all operations that we started have completed. Without this `finally`, the future returned by `f()` will resolve *before* all the iterations of the loop have actually completed (the last 100 may still be running).

In the idiom we saw in the above example, the same semaphore is used both for limiting the number of background operations, and later to wait for all of them to complete. Sometimes, we want several different loops to use the same semaphore to limit their total parallelism. In that case we must use a separate mechanism for waiting for the completion of the background operations started by the loop. The most convenient way to wait for ongoing operations is using a gate, which we will describe in detail later. A typical example of a loop whose parallelism is limited by an external semaphore:

```cpp
thread_local seastar::semaphore limit(100);
seastar::future<> f() {
return seastar::do_with(seastar::gate(), [] (auto& gate) {
return seastar::do_for_each(boost::irange(0, 456), [&gate] (int i) {
return limit.wait(1).then([&gate] {
return seastar::do_for_each(boost::counting_iterator<int>(0),
boost::counting_iterator<int>(456), [&gate] (int i) {
return seastar::get_units(limit, 1).then([&gate] (auto units) {
gate.enter();
slow().finally([&gate] {
limit.signal(1);
seastar::futurize_apply(slow).finally([&gate, units = std::move(units)] {
gate.leave();
});
});
@@ -711,10 +724,9 @@ seastar::future<> f() {
});
}
```
In this code, we use the external semaphore `limit` to limit the number of concurrent operations, but additionally have a gate specific to this loop to help us wait for all ongoing operations to complete.
Note that in the above examples, we could not use the `with_semaphore()` shortcut. `with_semaphore()` returns a future which only resolves after the lambda's returned future resolves. But in the above examples, the loop needs to know when just the semaphore units are available, to start the next iteration, and not wait for the previous iteration to complete. We could not achieve that with `with_semaphore()`.
TODO: also allow `get_units()` or something similar on a gate, and use that instead of the explicit gate.enter/gate.leave.
TODO: say something about semaphore fairness - if someone is waiting for a lot of units and later someone asks for 1 unit, will both wait or will the request for 1 unit be satisfied?
@@ -969,7 +981,7 @@ seastar::future<> handle_connection(seastar::connected_socket s,
return out.write(std::move(buf)).then([&out] {
return out.flush();
}).then([] {
return stop_iteration::no;
return seastar::stop_iteration::no;
});
} else {
return seastar::make_ready_future<seastar::stop_iteration>(