massh

v1.14.1 Latest
Published: Jun 28, 2022 License: GPL-3.0 Imports: 15 Imported by: 1

README


Description

Go package for streaming Linux distributed shell commands via SSH.

What makes Massh special is its ability to stream and process output concurrently. See _examples/example_streaming for sample code.

Contribute

Have a question, idea, or something you think can be improved? Open an issue or PR and let's discuss it!

Example:

package main

import (
	"fmt"
	"github.com/discoriver/massh"
	"golang.org/x/crypto/ssh"
)

func main() {
	// Create pointers to config & job
	config := massh.NewConfig()

	job := &massh.Job{
		Command: "echo hello world",
	}

	config.SetHosts([]string{"192.168.1.118"})

	// Password auth
	config.SetPasswordAuth("u01", "password")

	// Key auth in same config. Auth will try all methods provided before failing.
	err := config.SetPrivateKeyAuth("~/.ssh/id_rsa", "")
	if err != nil {
		panic(err)
	}

	config.SetJob(job)
	config.SetWorkerPool(2)
	config.SetSSHHostKeyCallback(ssh.InsecureIgnoreHostKey())

	// Make sure config will run
	if err := config.CheckSanity(); err != nil {
		panic(err)
	}

	res, err := config.Run()
	if err != nil {
		panic(err)
	}

	for i := range res {
		fmt.Printf("%s:\n \t OUT: %s \t ERR: %v\n", res[i].Host, res[i].Output, res[i].Error)
	}
}

More examples, including this one, are available in the examples directory.

Usage:

Get the massh package:

go get github.com/DiscoRiver/massh

Documentation

Other

Bastion Host

Specify a bastion host and its SSH config with BastionHost and BastionHostSSHConfig in your massh.Config. You may leave BastionHostSSHConfig as nil, in which case SSHConfig is used instead. The process is automatic: if BastionHost is set, it will be used.

Streaming output

There is an example of streaming output in the directory _examples/example_streaming, which shows one way of reading from the results channel and processing the output.

Running config.Stream() populates the provided channel with one Result per host. Each Result has two channels, StdOutStream and StdErrStream, which carry the output of the stdout and stderr pipes respectively. Reading from these channels gives you the host's output and errors.

When a host has completed its work and exited, Result.DoneChannel will receive an empty struct. In my example, I use the following function to monitor this and report that a host has finished (see _examples/example_streaming for the full program):

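func readStream(res Result, wg *sync.WaitGroup) error {
	for {
		select {
		case d := <-res.StdOutStream:
			fmt.Printf("%s: %s", res.Host, d)
		case <-res.DoneChannel:
			fmt.Printf("%s: Finished\n", res.Host)
			wg.Done()
			// Stop reading once the host reports completion; without this
			// return the loop would block forever on a finished host.
			return nil
		}
	}
}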

Unlike Config.Run(), which returns a slice of Results once all hosts have exited, Config.Stream() requires some additional values to monitor host completion. Each host has its own Result.DoneChannel, as explained above. To detect when all hosts have finished, use the variable NumberOfStreamingHostsCompleted, which equals the length of Config.Hosts once everything has completed. Here is an example of what I'm using in _examples/example_streaming:

if NumberOfStreamingHostsCompleted == len(cfg.Hosts) {
	// We want to wait for all goroutines to complete before we declare that the work is finished, as
	// it's possible for us to execute this code before we've finished reading/processing all host output
	wg.Wait()

	fmt.Println("Everything returned.")
	return
}

Right now, the concurrency model used to read from the results channel is the responsibility of those using this package. An example of how this might be achieved can be found in the https://github.com/DiscoRiver/omnivore/tree/main/internal/ossh package, which is currently in development.
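One possible shape for that concurrency model is sketched below: take one Result per host from the results channel, start a reader goroutine for each, and wait on a sync.WaitGroup. This is only an illustration under stated assumptions. It does not import massh; Result is a local stand-in, and fakeStream (which plays the role of Config.Stream) and collect are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Result is a local stand-in mirroring the streaming fields documented
// for massh.Result; it is not the library type.
type Result struct {
	Host         string
	StdOutStream chan []byte
	DoneChannel  chan struct{}
}

// fakeStream is hypothetical. It plays the role of Config.Stream by
// putting one Result per host on rs and emitting one line per host.
func fakeStream(hosts []string, rs chan *Result) {
	for _, h := range hosts {
		res := &Result{
			Host:         h,
			StdOutStream: make(chan []byte),
			DoneChannel:  make(chan struct{}),
		}
		rs <- res
		go func(r *Result) {
			r.StdOutStream <- []byte("hello from " + r.Host)
			r.DoneChannel <- struct{}{}
		}(res)
	}
}

// collect starts one reader goroutine per host and returns every line
// once all hosts have signalled completion.
func collect(hosts []string) []string {
	rs := make(chan *Result, len(hosts))
	fakeStream(hosts, rs)

	var (
		mu    sync.Mutex
		lines []string
		wg    sync.WaitGroup
	)
	for range hosts {
		res := <-rs
		wg.Add(1)
		go func(r *Result) {
			defer wg.Done()
			for {
				select {
				case d := <-r.StdOutStream:
					mu.Lock()
					lines = append(lines, r.Host+": "+string(d))
					mu.Unlock()
				case <-r.DoneChannel:
					return
				}
			}
		}(res)
	}
	wg.Wait()
	sort.Strings(lines) // goroutine scheduling makes arrival order arbitrary
	return lines
}

func main() {
	for _, line := range collect([]string{"host-b", "host-a"}) {
		fmt.Println(line)
	}
}
```

Waiting on the WaitGroup, rather than polling a shared counter, is a design choice of this sketch and not something the package requires.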

Documentation

Overview

Package massh provides tools for running distributed shell commands via SSH.

Index

Constants

This section is empty.

Variables

var (
	// ErrJobConflict indicates that both Job and JobStack are assigned in Config.
	ErrJobConflict = errors.New("only one of job or jobstack must be present in config")
	// ErrNoJobsSet indicates that no job is set for the config.
	ErrNoJobsSet = errors.New("no jobs are set in config")
)
var (
	// NumberOfStreamingHostsCompleted is incremented when a Result's DoneChannel is written to, indicating a host has completed its work.
	NumberOfStreamingHostsCompleted int
)

Functions

This section is empty.

Types

type Config added in v1.0.0

type Config struct {
	Hosts     map[string]struct{}
	SSHConfig *ssh.ClientConfig

	// Jobs to execute, config will error if both are set
	Job      *Job
	JobStack *[]Job

	// Number of concurrent workers
	WorkerPool int

	BastionHost string
	// BastionHost's SSH config. If nil, Bastion will use SSHConfig instead.
	BastionHostSSHConfig *ssh.ClientConfig

	// Stream-only
	SlowTimeout     int  // Timeout for declaring that a host is slow.
	CancelSlowHosts bool // Not implemented. Automatically cancel hosts that are flagged as slow.
	Stop            chan struct{}
}

Config is a config implementation for distributed SSH commands

func NewConfig added in v1.9.0

func NewConfig() *Config

NewConfig initialises a new massh.Config.

func (*Config) AutoCancelSlowHosts added in v1.14.1

func (c *Config) AutoCancelSlowHosts()

AutoCancelSlowHosts will cancel/terminate slow host sessions.

func (*Config) CheckSanity added in v1.0.0

func (c *Config) CheckSanity() error

CheckSanity ensures config is valid.

func (*Config) Run added in v1.0.0

func (c *Config) Run() ([]Result, error)

Run executes the config, and returns a slice of Results once the command has exited on all hosts.

This is a rudimentary function, and is not affected by Config.SlowTimeout or Config.CancelSlowHosts. By extension, the Results returned using Run always have an IsSlow value of false.

func (*Config) SetBastionHost added in v1.1.1

func (c *Config) SetBastionHost(host string)

SetBastionHost sets the bastion host to use for a massh config

func (*Config) SetBastionHostConfig added in v1.1.1

func (c *Config) SetBastionHostConfig(s *ssh.ClientConfig)

SetBastionHostConfig sets the bastion host's SSH client config. If value is left nil, SSHConfig will be used instead.

func (*Config) SetHosts added in v1.0.0

func (c *Config) SetHosts(hosts []string)

SetHosts adds a slice of strings as hosts to config. Removes duplicates.
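Config.Hosts is declared as map[string]struct{}, which is the usual Go idiom for a set. A minimal sketch of how a slice can be folded into such a set, dropping duplicates along the way, is shown below. dedupeHosts is a hypothetical function written for this illustration; it is not taken from the massh source.

```go
package main

import "fmt"

// dedupeHosts is hypothetical. It folds a slice of host names into a
// set, so repeated entries collapse into a single key.
func dedupeHosts(hosts []string) map[string]struct{} {
	set := make(map[string]struct{}, len(hosts))
	for _, h := range hosts {
		set[h] = struct{}{}
	}
	return set
}

func main() {
	set := dedupeHosts([]string{"10.0.0.1", "10.0.0.2", "10.0.0.1"})
	fmt.Println(len(set)) // prints 2
}
```

Note that a map has no defined iteration order, so anything that ranges over a set like this should not rely on hosts being visited in the order they were supplied.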

func (*Config) SetJob added in v1.0.0

func (c *Config) SetJob(job *Job)

SetJob sets Job in Config.

func (*Config) SetPasswordAuth added in v1.0.0

func (c *Config) SetPasswordAuth(username string, password string)

SetPasswordAuth sets the SSH username and password used for password authentication.

func (*Config) SetPrivateKeyAuth added in v1.10.1

func (c *Config) SetPrivateKeyAuth(PrivateKeyFile string, PrivateKeyPassphrase string) error

SetPrivateKeyAuth takes the private key file provided, reads it, and adds the key signature to the config.

func (*Config) SetSSHAuthSock added in v1.12.0

func (c *Config) SetSSHAuthSock() error

SetSSHAuthSock uses SSH_AUTH_SOCK environment variable to populate auth method in the SSH config. Useful when using keys, and `AgentForwarding` is enabled in the local SSH config.

func (*Config) SetSSHConfig added in v1.0.0

func (c *Config) SetSSHConfig(s *ssh.ClientConfig)

SetSSHConfig sets the SSH client config for all hosts.

func (*Config) SetSSHHostKeyCallback added in v1.10.1

func (c *Config) SetSSHHostKeyCallback(callback ssh.HostKeyCallback)

SetSSHHostKeyCallback sets the HostKeyCallback for the Config's SSHConfig value.

func (*Config) SetSlowTimeout added in v1.14.1

func (c *Config) SetSlowTimeout(timeout int)

SetSlowTimeout sets the SlowTimeout value for config.

func (*Config) SetWorkerPool added in v1.0.0

func (c *Config) SetWorkerPool(numWorkers int)

SetWorkerPool populates specified number of concurrent workers in Config. It is safe for this number to be larger than the number of hosts being processed, but it must not be zero.
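To illustrate why a pool larger than the host count is harmless while a pool of zero is not, here is a generic fixed-size worker pool. It is a sketch of the general technique, not massh's implementation, and runPool and its parameters are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool is hypothetical. It runs work once per host using numWorkers
// goroutines and returns the output keyed by host.
func runPool(hosts []string, numWorkers int, work func(host string) string) map[string]string {
	jobs := make(chan string)
	results := make(map[string]string, len(hosts))

	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker pulls hosts until the jobs channel is closed.
			for h := range jobs {
				out := work(h)
				mu.Lock()
				results[h] = out
				mu.Unlock()
			}
		}()
	}

	// With zero workers nothing receives from jobs, so this send would
	// block forever.
	for _, h := range hosts {
		jobs <- h
	}
	close(jobs)
	wg.Wait()
	return results
}

func main() {
	out := runPool([]string{"host-a", "host-b"}, 5, func(h string) string {
		return "ran on " + h
	})
	fmt.Println(len(out)) // prints 2
}
```

In this sketch the three surplus workers simply exit when the jobs channel is closed, which is why oversizing the pool costs nothing beyond a few idle goroutines.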

func (*Config) StopAllSessions added in v1.14.1

func (c *Config) StopAllSessions()

func (*Config) Stream added in v1.1.0

func (c *Config) Stream(rs chan *Result) error

Stream executes the config, and writes to rs as commands are initiated.

One result is added to the channel for each host. Streaming is performed by reading the StdOutStream and StdErrStream parameters in Result.

Example reading each result in the channel:

```
cfg.Stream(resultChan)

for {
	result := <-resultChan
	go func(r *Result) {
		// do something with r
	}(result)
}
```

type Job added in v1.0.0

type Job struct {
	Command string
	Script  script
}

Job is a single remote task config. For script files, use Job.SetScript().

func (*Job) SetCommand added in v1.0.0

func (j *Job) SetCommand(command string)

SetCommand sets the Command value in Job. This is the Command executed over SSH to all hosts.

func (*Job) SetScript added in v1.14.0

func (j *Job) SetScript(filePath string, args ...string) error

type Result added in v1.0.0

type Result struct {
	Host   string // Hostname
	Job    string // The command that was run
	Output []byte

	// Package errors, not output from SSH. Makes the concurrency easier to manage without returning an error.
	Error error

	// Stream-specific
	IsSlow bool // Activity timeout for StdOut

	StdOutStream chan []byte
	StdErrStream chan []byte
	DoneChannel  chan struct{} // Written to when a host completes work. This does not indicate that all output from StdOutStream or StdErrStream has been read and/or processed.
}

Result contains usable output from SSH commands.
