Skip to content

Commit

Permalink
Improve tsh ssh parallel output (gravitational#33429)
Browse files Browse the repository at this point in the history
This change improves the output of tsh ssh when running on multiple
nodes. Stdout and stderr are now labeled with the hostname of the
node they came from. The --log-dir flag on tsh ssh will create a
directory where the separated output of each command will be stored.
atburke authored Oct 30, 2023

Verified

This commit was signed with the committer’s verified signature. The key has expired.
pdehaan Peter deHaan
1 parent 5ce0c33 commit 34492de
Showing 8 changed files with 541 additions and 136 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## 15.0.0 (xx/xx/24)

### Breaking changes

#### `tsh ssh`

When running a command on multiple nodes with `tsh ssh`, each line of output
is now labeled with the hostname of the node that produced it. Users who
rely on parsing the output from multiple nodes should pass the `--log-dir` flag
to `tsh ssh`, which creates a directory where the output of each node is
written separately.

## 14.0.0 (09/20/23)

Teleport 14 brings the following new major features and improvements:
38 changes: 21 additions & 17 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
@@ -6107,28 +6107,28 @@ func testCmdLabels(t *testing.T, suite *integrationTestSuite) {
// test label patterns that match both nodes, and each
// node individually.
tts := []struct {
desc string
command []string
labels map[string]string
expect string
desc string
command []string
labels map[string]string
expectLines []string
}{
{
desc: "Both",
command: []string{"echo", "two"},
labels: map[string]string{"spam": "eggs"},
expect: "two\ntwo\n",
desc: "Both",
command: []string{"echo", "two"},
labels: map[string]string{"spam": "eggs"},
expectLines: []string{"[server-01] two", "[server-02] two"},
},
{
desc: "Worker only",
command: []string{"echo", "worker"},
labels: map[string]string{"role": "worker"},
expect: "worker\n",
desc: "Worker only",
command: []string{"echo", "worker"},
labels: map[string]string{"role": "worker"},
expectLines: []string{"worker"},
},
{
desc: "Database only",
command: []string{"echo", "database"},
labels: map[string]string{"role": "database"},
expect: "database\n",
desc: "Database only",
command: []string{"echo", "database"},
labels: map[string]string{"role": "database"},
expectLines: []string{"database"},
},
}

@@ -6142,7 +6142,11 @@ func testCmdLabels(t *testing.T, suite *integrationTestSuite) {

output, err := runCommand(t, teleport, tt.command, cfg, 1)
require.NoError(t, err)
require.Equal(t, tt.expect, output)
outputLines := strings.Split(strings.TrimSpace(output), "\n")
require.Len(t, outputLines, len(tt.expectLines))
for _, line := range tt.expectLines {
require.Contains(t, outputLines, line)
}
})
}
}
152 changes: 130 additions & 22 deletions lib/client/api.go
Original file line number Diff line number Diff line change
@@ -471,6 +471,10 @@ type Config struct {
// WebauthnLogin allows tests to override the Webauthn Login func.
// Defaults to [wancli.Login].
WebauthnLogin WebauthnLoginFunc

// SSHLogDir is the directory to log the output of multiple SSH commands to.
// If not set, no logs will be created.
SSHLogDir string
}

// CachePolicy defines cache policy for local clients
@@ -1262,9 +1266,14 @@ func (tc *TeleportClient) RootClusterName(ctx context.Context) (string, error) {
return name, nil
}

// targetNode identifies a single node that an SSH command should run on.
type targetNode struct {
	// hostname is the human-readable name of the node. It is taken from the
	// explicitly requested host or, for label matches, from the node
	// resource's hostname.
	hostname string
	// addr is the address used to reach the node. For nodes found via
	// labels this is "<node name>:0".
	addr string
}

// getTargetNodes returns the list of nodes (hostname and address) this SSH
// command needs to operate on.
func (tc *TeleportClient) getTargetNodes(ctx context.Context, clt client.GetResourcesClient, options SSHOptions) ([]string, error) {
func (tc *TeleportClient) getTargetNodes(ctx context.Context, clt client.GetResourcesClient, options SSHOptions) ([]targetNode, error) {
ctx, span := tc.Tracer.Start(
ctx,
"teleportClient/getTargetNodes",
@@ -1273,7 +1282,12 @@ func (tc *TeleportClient) getTargetNodes(ctx context.Context, clt client.GetReso
defer span.End()

if options.HostAddress != "" {
return []string{options.HostAddress}, nil
return []targetNode{
{
hostname: options.HostAddress,
addr: options.HostAddress,
},
}, nil
}

// use the target node that was explicitly provided if valid
@@ -1286,7 +1300,12 @@ func (tc *TeleportClient) getTargetNodes(ctx context.Context, clt client.GetReso
}

addr := net.JoinHostPort(tc.Host, strconv.Itoa(tc.HostPort))
return []string{addr}, nil
return []targetNode{
{
hostname: tc.Host,
addr: addr,
},
}, nil
}

// find the nodes matching the labels that were provided
@@ -1295,10 +1314,13 @@ func (tc *TeleportClient) getTargetNodes(ctx context.Context, clt client.GetReso
return nil, trace.Wrap(err)
}

retval := make([]string, 0, len(nodes))
retval := make([]targetNode, 0, len(nodes))
for _, resource := range nodes {
// always dial nodes by UUID
retval = append(retval, fmt.Sprintf("%s:0", resource.GetName()))
retval = append(retval, targetNode{
hostname: resource.GetHostname(),
addr: fmt.Sprintf("%s:0", resource.GetName()),
})
}

return retval, nil
@@ -1556,7 +1578,7 @@ func (tc *TeleportClient) SSH(ctx context.Context, command []string, runLocally
if len(nodeAddrs) > 1 {
return tc.runShellOrCommandOnMultipleNodes(ctx, clt, nodeAddrs, command)
}
return tc.runShellOrCommandOnSingleNode(ctx, clt, nodeAddrs[0], command, runLocally)
return tc.runShellOrCommandOnSingleNode(ctx, clt, nodeAddrs[0].addr, command, runLocally)
}

// ConnectToNode attempts to establish a connection to the node resolved to by the provided
@@ -1565,7 +1587,7 @@ func (tc *TeleportClient) SSH(ctx context.Context, command []string, runLocally
// fail the error from the connection attempt with the already provisioned certificates will
// be returned. The client from whichever attempt succeeds first will be returned.
func (tc *TeleportClient) ConnectToNode(ctx context.Context, clt *ClusterClient, nodeDetails NodeDetails, user string) (*NodeClient, error) {
node := nodeName(nodeDetails.Addr)
node := nodeName(targetNode{addr: nodeDetails.Addr})
ctx, span := tc.Tracer.Start(
ctx,
"teleportClient/ConnectToNode",
@@ -1615,7 +1637,8 @@ func (tc *TeleportClient) ConnectToNode(ctx context.Context, clt *ClusterClient,
}

sshConfig := clt.ProxyClient.SSHConfig(user)
clt, err := NewNodeClient(ctx, sshConfig, conn, nodeDetails.ProxyFormat(), nodeDetails.Addr, tc, details.FIPS)
clt, err := NewNodeClient(ctx, sshConfig, conn, nodeDetails.ProxyFormat(), nodeDetails.Addr, tc, details.FIPS,
WithNodeHostname(nodeDetails.hostname), WithSSHLogDir(tc.SSHLogDir))
directResultC <- clientRes{clt: clt, err: err}
}()

@@ -1715,7 +1738,7 @@ func (m MFARequiredUnknownErr) Is(err error) bool {
// if it is required, then the mfa ceremony is attempted. The target host is dialed once the ceremony
// completes and new certificates are retrieved.
func (tc *TeleportClient) connectToNodeWithMFA(ctx context.Context, clt *ClusterClient, nodeDetails NodeDetails, user string) (*NodeClient, error) {
node := nodeName(nodeDetails.Addr)
node := nodeName(targetNode{addr: nodeDetails.Addr})
ctx, span := tc.Tracer.Start(
ctx,
"teleportClient/connectToNodeWithMFA",
@@ -1750,7 +1773,8 @@ func (tc *TeleportClient) connectToNodeWithMFA(ctx context.Context, clt *Cluster
return nil, trace.Wrap(err)
}

nodeClient, err := NewNodeClient(ctx, cfg, conn, nodeDetails.ProxyFormat(), nodeDetails.Addr, tc, details.FIPS)
nodeClient, err := NewNodeClient(ctx, cfg, conn, nodeDetails.ProxyFormat(), nodeDetails.Addr, tc, details.FIPS,
WithNodeHostname(nodeDetails.hostname), WithSSHLogDir(tc.SSHLogDir))
return nodeClient, trace.Wrap(err)
}

@@ -1823,8 +1847,12 @@ func (tc *TeleportClient) runShellOrCommandOnSingleNode(ctx context.Context, clt
return trace.Wrap(nodeClient.RunInteractiveShell(ctx, types.SessionPeerMode, nil, nil))
}

func (tc *TeleportClient) runShellOrCommandOnMultipleNodes(ctx context.Context, clt *ClusterClient, nodeAddrs []string, command []string) error {
func (tc *TeleportClient) runShellOrCommandOnMultipleNodes(ctx context.Context, clt *ClusterClient, nodes []targetNode, command []string) error {
cluster := clt.ClusterName()
nodeAddrs := make([]string, 0, len(nodes))
for _, node := range nodes {
nodeAddrs = append(nodeAddrs, node.addr)
}
ctx, span := tc.Tracer.Start(
ctx,
"teleportClient/runShellOrCommandOnMultipleNodes",
@@ -1839,7 +1867,7 @@ func (tc *TeleportClient) runShellOrCommandOnMultipleNodes(ctx context.Context,
// There was a command provided, run a non-interactive session against each match
if len(command) > 0 {
fmt.Printf("\x1b[1mWARNING\x1b[0m: Multiple nodes matched label selector, running command on all.\n")
return tc.runCommandOnNodes(ctx, clt, nodeAddrs, command)
return tc.runCommandOnNodes(ctx, clt, nodes, command)
}

// Issue "shell" request to the first matching node.
@@ -2666,8 +2694,13 @@ func commandLimit(ctx context.Context, getter roleGetter, mfaRequired bool) int
}
}

// execResult records the outcome of running a command on a single node.
type execResult struct {
	// hostname is the display name of the node the command ran on.
	hostname string
	// exitStatus is the exit status recorded for the command; zero means
	// the command succeeded.
	exitStatus int
}

// runCommandOnNodes executes the given command on each of the given remote nodes.
func (tc *TeleportClient) runCommandOnNodes(ctx context.Context, clt *ClusterClient, nodeAddresses []string, command []string) error {
func (tc *TeleportClient) runCommandOnNodes(ctx context.Context, clt *ClusterClient, nodes []targetNode, command []string) error {
cluster := clt.ClusterName()
ctx, span := tc.Tracer.Start(
ctx,
@@ -2685,7 +2718,7 @@ func (tc *TeleportClient) runCommandOnNodes(ctx context.Context, clt *ClusterCli
mfaRequiredCheck, err := clt.AuthClient.IsMFARequired(ctx, &proto.IsMFARequiredRequest{
Target: &proto.IsMFARequiredRequest_Node{
Node: &proto.NodeLogin{
Node: nodeName(nodeAddresses[0]),
Node: nodeName(targetNode{addr: nodes[0].addr}),
Login: tc.Config.HostLogin,
},
},
@@ -2694,43 +2727,118 @@ func (tc *TeleportClient) runCommandOnNodes(ctx context.Context, clt *ClusterCli
return trace.Wrap(err)
}

if tc.SSHLogDir != "" {
if err := os.MkdirAll(tc.SSHLogDir, 0700); err != nil {
return trace.ConvertSystemError(err)
}
}

resultsCh := make(chan execResult, len(nodes))

g, gctx := errgroup.WithContext(ctx)
g.SetLimit(commandLimit(ctx, clt.AuthClient, mfaRequiredCheck.Required))
for _, address := range nodeAddresses {
address := address
for _, node := range nodes {
node := node
g.Go(func() error {
ctx, span := tc.Tracer.Start(
gctx,
"teleportClient/executingCommand",
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
oteltrace.WithAttributes(attribute.String("node", address)),
oteltrace.WithAttributes(attribute.String("node", node.addr)),
)
defer span.End()

nodeClient, err := tc.ConnectToNode(
ctx,
clt,
NodeDetails{
Addr: address,
Addr: node.addr,
Namespace: tc.Namespace,
Cluster: cluster,
MFACheck: mfaRequiredCheck,
hostname: node.hostname,
},
tc.Config.HostLogin,
)
if err != nil {
// Returning the error here would cancel all the other goroutines, so
// print the error instead to let them all finish.
fmt.Fprintln(tc.Stderr, err)
return trace.Wrap(err)
return nil
}
defer nodeClient.Close()

fmt.Printf("Running command on %v:\n", nodeName(address))
displayName := nodeName(node)
fmt.Printf("Running command on %v:\n", displayName)

return trace.Wrap(nodeClient.RunCommand(ctx, command))
if err := nodeClient.RunCommand(ctx, command, WithLabeledOutput()); err != nil && tc.ExitStatus == 0 {
fmt.Fprintln(tc.Stderr, err)
return nil
}
resultsCh <- execResult{
hostname: displayName,
exitStatus: tc.ExitStatus,
}
return nil
})
}

return trace.Wrap(g.Wait())
// Non-command-related errors will have already been reported by the goroutines,
// and command-related errors will be reported in writeCommandResults.
g.Wait()

close(resultsCh)
results := make([]execResult, 0, len(resultsCh))
for result := range resultsCh {
results = append(results, result)
}

return trace.Wrap(tc.writeCommandResults(results))
}

// writeCommandResults prints a per-node summary of a multi-node command run
// to standard output and, when tc.SSHLogDir is set, records the names of the
// nodes that succeeded and failed in the "hosts.succeeded" and "hosts.failed"
// files inside that directory. A file is only created when its list is
// non-empty.
//
// It returns an error if a host list cannot be written, or if the command
// finished with a non-zero exit status on at least one node.
func (tc *TeleportClient) writeCommandResults(nodes []execResult) error {
	fmt.Println()
	var succeededNodes []string
	var failedNodes []string
	for _, node := range nodes {
		if node.exitStatus != 0 {
			failedNodes = append(failedNodes, node.hostname)
			fmt.Printf("[%v] failed with exit code %d\n", node.hostname, node.exitStatus)
		} else {
			succeededNodes = append(succeededNodes, node.hostname)
			fmt.Printf("[%v] success\n", node.hostname)
		}
	}
	fmt.Printf("\n%d host(s) succeeded; %d host(s) failed\n", len(succeededNodes), len(failedNodes))

	if tc.SSHLogDir != "" {
		if err := writeSSHHostList(filepath.Join(tc.SSHLogDir, "hosts.succeeded"), succeededNodes); err != nil {
			return trace.Wrap(err)
		}
		if err := writeSSHHostList(filepath.Join(tc.SSHLogDir, "hosts.failed"), failedNodes); err != nil {
			return trace.Wrap(err)
		}
	}

	if len(failedNodes) > 0 {
		return trace.Errorf("%d command(s) failed", len(failedNodes))
	}
	return nil
}

// writeSSHHostList writes hosts to the file at path, one per line, replacing
// any existing file. When hosts is empty no file is created.
func writeSSHHostList(path string, hosts []string) error {
	if len(hosts) == 0 {
		return nil
	}

	f, err := os.Create(path)
	if err != nil {
		return trace.ConvertSystemError(err)
	}

	for _, host := range hosts {
		if _, writeErr := fmt.Fprintln(f, host); writeErr != nil {
			// The write error is the one worth reporting; the close error
			// on this already-failed file is intentionally dropped.
			_ = f.Close()
			return trace.ConvertSystemError(writeErr)
		}
	}

	// A failed Close can mean the data never reached disk, so surface it
	// instead of deferring and ignoring it.
	if closeErr := f.Close(); closeErr != nil {
		return trace.ConvertSystemError(closeErr)
	}
	return nil
}

func (tc *TeleportClient) newSessionEnv() map[string]string {
Loading

0 comments on commit 34492de

Please sign in to comment.