Skip to content

Commit

Permalink
Make -w and enter-to-rerun work with subprocess spawning tasks (com-l…
Browse files Browse the repository at this point in the history
…ihaoyi#1645)

The basic issue here is that we were not properly closing the input streams generated by spawnSubprocess when `-i` is not passed, in particular the one that pumps input from the proxied stdin to the subprocess. Thus it would survive long past the lifetime of the subprocess, try to pump the proxied stdin into the subprocess's closed stdin, and fail while consuming bytes from the proxied stdin so that other consumers couldn't get at it.

The fix is twofold:

1. Pass in `checkAvailable = true` for the proxied-stdin-to-subprocess-inputstream pumper so it doesn't block on trying to read from stdin, and instead polls regularly at 2ms intervals
2. Pass a `java.util.function.BooleanSupplier runningCheck` into the pumper that can be used to abort its polling at any point, and configure it so it aborts when its destination subprocess is no longer alive.

There's a small race condition where the subprocess may be alive when the check is made, but dead when the stream data is pumped a moment later, and so I add a try-catch around the  `InputPumper.run`'s `dest.write` call to just quietly discard the input and terminate the pumper

An unrelated change, I also added nice names to all the various threads we're spawning, so it's easier to keep track of them in the `jstack`.

Tested manually via `./mill -i dev.run scratch foo.run`. On `main` without this PR, `-i -w` works:

```bash
lihaoyi mill$ ./mill -i dev.run scratch -i -w foo.run
[90/647] de.tobiasroeser.mill.vcs.version.VcsVersion.vcsState
[647/647] dev.run
[44/44] foo.run
false
Watching for changes to 4 paths... (Enter to re-run, Ctrl-C to exit)
<Enter>

[44/44] foo.run
false
Watching for changes to 4 paths... (Enter to re-run, Ctrl-C to exit)
<Enter>

[44/44] foo.run
false
Watching for changes to 4 paths... (Enter to re-run, Ctrl-C to exit)
```

But `-w` alone fails:

```bash
lihaoyi mill$ ./mill -i dev.run scratch  -w foo.run
[90/647] de.tobiasroeser.mill.vcs.version.VcsVersion.vcsState
[647/647] dev.run
[44/44] foo.run
false
Watching for changes to 4 paths... (Enter to re-run, Ctrl-C to exit)
<Enter>

.run(InputPumper.java:29)
	... 1 more
<Enter>

Exception in thread "Thread-0" java.lang.RuntimeException: java.io.IOException: Read end dead
	at mill.main.client.InputPumper.run(InputPumper.java:34)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Read end dead
	at java.base/java.io.PipedInputStream.checkStateForReceive(PipedInputStream.java:262)
	at java.base/java.io.PipedInputStream.receive(PipedInputStream.java:226)
	at java.base/java.io.PipedOutputStream.write(PipedOutputStream.java:149)
	at mill.main.client.InputPumper.run(InputPumper.java:28)
	... 1 more
```


With this PR, both versions succeed:

```bash
lihaoyi mill$ ./mill -i dev.run scratch -i -w foo.run
[90/647] de.tobiasroeser.mill.vcs.version.VcsVersion.vcsState
[647/647] dev.run
[44/44] foo.run
false
Watching for changes to 4 paths... (Enter to re-run, Ctrl-C to exit)
<Enter>

[44/44] foo.run
false
Watching for changes to 4 paths... (Enter to re-run, Ctrl-C to exit)
<Enter>

[44/44] foo.run
false
Watching for changes to 4 paths... (Enter to re-run, Ctrl-C to exit)
```

```bash
lihaoyi mill$ ./mill -i dev.run scratch -w foo.run
[90/647] de.tobiasroeser.mill.vcs.version.VcsVersion.vcsState
[647/647] dev.run
[44/44] foo.run
false
Watching for changes to 4 paths... (Enter to re-run, Ctrl-C to exit)
<Enter>

[44/44] foo.run
false
Watching for changes to 4 paths... (Enter to re-run, Ctrl-C to exit)
<Enter>

[44/44] foo.run
false
Watching for changes to 4 paths... (Enter to re-run, Ctrl-C to exit)
<Enter>

[44/44] foo.run
false
Watching for changes to 4 paths... (Enter to re-run, Ctrl-C to exit)
```
  • Loading branch information
lihaoyi authored Dec 29, 2021
1 parent 855d440 commit 5ce1aa7
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 15 deletions.
25 changes: 21 additions & 4 deletions main/client/src/mill/main/client/InputPumper.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,43 @@ public class InputPumper implements Runnable{
private InputStream src;
private OutputStream dest;
private Boolean checkAvailable;
private java.util.function.BooleanSupplier runningCheck;
public InputPumper(InputStream src,
OutputStream dest,
Boolean checkAvailable){
this(src, dest, checkAvailable, () -> true);
}
public InputPumper(InputStream src,
OutputStream dest,
Boolean checkAvailable,
java.util.function.BooleanSupplier runningCheck){
this.src = src;
this.dest = dest;
this.checkAvailable = checkAvailable;
this.runningCheck = runningCheck;
}

boolean running = true;
public void run() {
byte[] buffer = new byte[1024];
try{
while(running){
if (checkAvailable && src.available() == 0) Thread.sleep(2);
if (!runningCheck.getAsBoolean()) {
running = false;
}
else if (checkAvailable && src.available() == 0) Thread.sleep(2);
else {
int n = src.read(buffer);
if (n == -1) running = false;
if (n == -1) {
running = false;
}
else {
dest.write(buffer, 0, n);
dest.flush();
try {
dest.write(buffer, 0, n);
dest.flush();
}catch(java.io.IOException e){
running = false;
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions main/client/src/mill/main/client/MillClientMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ public static int run(String lockBase,
OutputStream in = ioSocket.getOutputStream();
ProxyStreamPumper outPump = new ProxyStreamPumper(outErr, stdout, stderr);
InputPumper inPump = new InputPumper(stdin, in, true);
Thread outThread = new Thread(outPump);
Thread outThread = new Thread(outPump, "outPump");
outThread.setDaemon(true);
Thread inThread = new Thread(inPump);
Thread inThread = new Thread(inPump, "inPump");
inThread.setDaemon(true);
outThread.start();
inThread.start();
Expand Down
9 changes: 5 additions & 4 deletions main/src/mill/main/MillServerMain.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package mill.main

import sun.misc.{Signal, SignalHandler}

import java.io._
import java.net.Socket

import scala.jdk.CollectionConverters._

import org.scalasbt.ipcsocket._
import mill.{BuildInfo, MillMain}
import mill.main.client._
import mill.api.DummyInputStream
import mill.main.client.lock.{Lock, Locks}

import java.util.function.Consumer

trait MillServerMain[T] {
var stateCache = Option.empty[T]
def main0(
Expand Down Expand Up @@ -140,7 +141,7 @@ class Server[T](
val pipedOutput = new PipedOutputStream()
pipedOutput.connect(pipedInput)
val pumper = new InputPumper(in, pipedOutput, false)
val pumperThread = new Thread(pumper)
val pumperThread = new Thread(pumper, "proxyInputStreamThroughPumper")
pumperThread.setDaemon(true)
pumperThread.start()
pipedInput
Expand Down Expand Up @@ -230,7 +231,7 @@ class Server[T](
// It seems OK to exit the client early and subsequently
// start up mill client again (perhaps closing the server
// socket helps speed up the process).
val t = new Thread(() => clientSocket.close())
val t = new Thread(() => clientSocket.close(), "clientSocketCloser")
t.setDaemon(true)
t.start()
} else clientSocket.close()
Expand Down
15 changes: 10 additions & 5 deletions main/src/mill/modules/Jvm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import scala.util.Using
import mill.BuildInfo
import upickle.default.{ReadWriter => RW}

import java.util.function.Consumer

object Jvm {

/**
Expand Down Expand Up @@ -200,13 +202,16 @@ object Jvm {
)

val sources = Seq(
process.stdout -> System.out,
process.stderr -> System.err,
System.in -> process.stdin
(process.stdout, System.out, "spawnSubprocess.stdout", false, () => true),
(process.stderr, System.err, "spawnSubprocess.stderr", false, () => true),
(System.in, process.stdin, "spawnSubprocess.stdin", true, () => process.isAlive())
)

for ((std, dest) <- sources) {
val t = new Thread(new InputPumper(std, dest, false))
for ((std, dest, name, checkAvailable, runningCheck) <- sources) {
val t = new Thread(
new InputPumper(std, dest, checkAvailable, () => runningCheck()),
name
)
t.setDaemon(true)
t.start()
}
Expand Down

0 comments on commit 5ce1aa7

Please sign in to comment.