eclipse/cmd/eclipse-run/main.go

296 lines
6.9 KiB
Go
Raw Permalink Normal View History

2024-01-18 07:43:02 +00:00
// Copyright (C) 2023-2024 Umorpha Systems
// SPDX-License-Identifier: AGPL-3.0-or-later
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"io/fs"
2024-01-18 21:17:26 +00:00
"log"
2024-01-18 07:43:02 +00:00
"mime"
"mime/multipart"
"net"
2024-01-18 07:43:02 +00:00
"net/http"
"net/textproto"
"os"
"os/exec"
2024-01-20 16:44:20 +00:00
"path/filepath"
2024-01-19 21:02:27 +00:00
"strings"
"sync/atomic"
"time"
2024-01-18 07:43:02 +00:00
"github.com/datawire/ocibuild/pkg/cliutil"
"github.com/spf13/cobra"
source "git.mothstuff.lol/lukeshu/eclipse"
"git.mothstuff.lol/lukeshu/eclipse/lib/common"
2024-01-19 21:02:27 +00:00
"git.mothstuff.lol/lukeshu/eclipse/lib/follower"
2024-01-18 07:43:02 +00:00
)
func main() {
argparser := &cobra.Command{
Use: os.Args[0] + " [flags] JOB_URL",
2024-01-20 21:08:44 +00:00
Short: "Run a job from the leader server",
2024-01-18 07:43:02 +00:00
Args: cliutil.WrapPositionalArgs(cobra.ExactArgs(1)),
SilenceErrors: true, // we'll handle this ourselves after .ExecuteContext()
SilenceUsage: true, // FlagErrorFunc will handle this
CompletionOptions: cobra.CompletionOptions{
DisableDefaultCmd: true,
},
}
argparser.SetFlagErrorFunc(cliutil.FlagErrorFunc)
argparser.SetHelpTemplate(cliutil.HelpTemplate)
2024-01-19 21:02:27 +00:00
var cfgFile string
argparser.Flags().StringVar(&cfgFile, "config", source.DefaultFollowerConfigFile,
"Config file to use")
_ = argparser.MarkFlagFilename("config", "yml", "yaml")
2024-01-18 07:43:02 +00:00
argparser.RunE = func(cmd *cobra.Command, args []string) error {
2024-01-19 21:02:27 +00:00
return Run(cfgFile, args[0])
2024-01-18 07:43:02 +00:00
}
ctx := context.Background()
if err := argparser.ExecuteContext(ctx); err != nil {
fmt.Fprintf(os.Stderr, "%v: error: %v\n", argparser.CommandPath(), err)
os.Exit(1)
}
}
// ConnTracker wraps a dial function and keeps a count of the
// connections it has handed out that have not yet been closed.
type ConnTracker struct {
	// cnt is the number of currently-open connections. It is accessed
	// with the primitive sync/atomic functions, which require a 64-bit
	// aligned operand; on 32-bit platforms that is only guaranteed for
	// the first word of an allocated struct, so cnt must stay the
	// first field.
	cnt int64

	// Inner is the underlying dial function that actually establishes
	// each connection.
	Inner func(ctx context.Context, network, address string) (net.Conn, error)
}
// DialContext dials through ct.Inner. A failed dial is returned as-is
// (with a nil connection) and leaves the count untouched; a successful
// one increments the open-connection count and returns the connection
// wrapped in a ctConn that shares the tracker's counter.
func (ct *ConnTracker) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
	conn, dialErr := ct.Inner(ctx, network, address)
	if dialErr != nil {
		return nil, dialErr
	}

	counter := &ct.cnt
	atomic.AddInt64(counter, 1)

	tracked := &ctConn{Conn: conn, cnt: counter}
	return tracked, nil
}
// ctConn is a net.Conn that decrements a shared open-connection
// counter when it is closed.
type ctConn struct {
	net.Conn

	// cnt points at the counter to decrement on close.
	cnt *int64

	// closed is 0 until the first call to Close and 1 afterwards. It
	// is only accessed through sync/atomic.
	closed int32
}

// Close closes the underlying connection and returns its result. The
// counter is decremented on the first call only, so a connection that
// is closed more than once is still counted as a single closure.
func (c *ctConn) Close() error {
	ret := c.Conn.Close()
	if atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
		atomic.AddInt64(c.cnt, -1)
	}
	return ret
}
// NumOpenConnections atomically reads and returns the tracker's
// open-connection counter.
func (ct *ConnTracker) NumOpenConnections() int64 {
	open := atomic.LoadInt64(&ct.cnt)
	return open
}
2024-01-19 21:02:27 +00:00
func Run(cfgFile, jobURL string) (err error) {
2024-01-18 07:43:02 +00:00
maybeSetErr := func(_err error) {
if err == nil && _err != nil {
err = _err
}
}
2024-01-19 21:02:27 +00:00
cfg, err := follower.LoadConfig(cfgFile)
if err != nil {
return err
}
var apikey string
if cfg.APIKeyFile != "" {
_apikey, err := os.ReadFile(cfg.APIKeyFile)
if err != nil {
return err
}
apikey = strings.TrimSpace(string(_apikey))
}
2024-01-18 07:43:02 +00:00
2024-01-20 16:44:20 +00:00
tmpDir, err := os.MkdirTemp("", "eclipse-run.*")
2024-01-18 07:43:02 +00:00
if err != nil {
return err
}
2024-01-20 16:44:20 +00:00
defer func() { maybeSetErr(os.RemoveAll(tmpDir)) }()
runDir := filepath.Join(tmpDir, "run")
if err := os.Mkdir(runDir, 0777); err != nil {
return err
}
artifactsDir := filepath.Join(tmpDir, "artifacts")
if err := os.Mkdir(artifactsDir, 0777); err != nil {
return err
}
2024-01-18 07:43:02 +00:00
pipeR, pipeW := io.Pipe()
defer func() {
if err != nil {
pipeW.CloseWithError(err)
}
}()
writer := multipart.NewWriter(pipeW)
req, err := http.NewRequest(http.MethodPost, jobURL, pipeR)
if err != nil {
return err
}
2024-01-19 04:54:39 +00:00
2024-01-19 21:02:27 +00:00
if apikey != "" {
req.Header.Set("Authorization", "Bearer "+apikey)
}
2024-01-19 04:54:39 +00:00
// Tell the server that we're including a request body, even
// though we don't yet know how big it will be. Forcing
// "chunked" as apposed to other encodings allows better
// interrupted-connection detection.
2024-01-19 01:23:49 +00:00
req.Header.Set("Transfer-Encoding", "chunked")
2024-01-19 04:54:39 +00:00
// This isn't actually for the server--without this the HTTP
// client library would block waiting to upload the whole
// body, even if it already got an error response back. (as
// of Go 1.21.6)
req.Header.Set("Expect", "100-continue")
2024-01-18 07:43:02 +00:00
req.Header.Set("Content-Type", mime.FormatMediaType("multipart/mixed", map[string]string{
"boundary": writer.Boundary(),
}))
connTracker := &ConnTracker{
// This is the same as the dialer in http.DefaultTransport.
Inner: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
}
httpClient := &http.Client{
// This is the same as http.DefaultTransport, except that the
// DialContext has been wrapped in ConnTracker.
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: connTracker.DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
2024-01-18 21:17:26 +00:00
log.Printf("dialing %q...", jobURL)
resp, err := httpClient.Do(req)
2024-01-18 07:43:02 +00:00
if err != nil {
return err
}
2024-01-18 21:17:26 +00:00
log.Printf("... dialed")
2024-01-18 07:43:02 +00:00
////////////////////////////////////////////////////////////////////////
if resp.StatusCode != http.StatusOK {
2024-01-18 21:17:26 +00:00
err := fmt.Errorf("HTTP %s", resp.Status)
body, _ := io.ReadAll(resp.Body)
if len(body) > 0 {
err = fmt.Errorf("%w\n%s", err, body)
}
return err
2024-01-18 07:43:02 +00:00
}
dec := json.NewDecoder(resp.Body)
2024-01-18 21:17:26 +00:00
log.Printf("reading job description...")
var job common.Job
2024-01-18 07:43:02 +00:00
if err := dec.Decode(&job); err != nil {
return err
}
go func() {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
}()
2024-01-18 21:17:26 +00:00
log.Printf("... read")
2024-01-18 07:43:02 +00:00
hdr := make(textproto.MIMEHeader)
2024-01-19 01:23:49 +00:00
log.Printf("running command...")
2024-01-18 21:17:26 +00:00
hdr.Set("Content-Disposition", `attachment; filename="log.txt"`)
part, err := writer.CreatePart(hdr)
if err != nil {
2024-01-19 01:23:49 +00:00
return fmt.Errorf("create part: %w", err)
2024-01-18 21:17:26 +00:00
}
cmd := exec.Command("sh", "-c", job.Command)
cmd.Stdout = part
cmd.Stderr = part
2024-01-20 16:44:20 +00:00
cmd.Dir = runDir
2024-01-18 21:17:26 +00:00
cmd.Env = append(os.Environ(),
2024-01-19 01:23:49 +00:00
"ECLIPSE_ARTIFACTSDIR="+artifactsDir)
2024-01-18 21:17:26 +00:00
status := "0"
if err := cmd.Run(); err != nil {
status = err.Error()
}
2024-01-19 01:23:49 +00:00
log.Printf("... run")
2024-01-18 21:17:26 +00:00
2024-01-19 01:23:49 +00:00
log.Printf("uploading artifacts...")
artifactsFS := os.DirFS(artifactsDir)
if err := fs.WalkDir(artifactsFS, ".", func(path string, d fs.DirEntry, err error) error {
2024-01-18 21:17:26 +00:00
if err != nil {
return err
}
if d.Type() != 0 {
return nil
}
2024-01-19 01:23:49 +00:00
log.Printf(" -> %q...", path)
2024-01-18 21:17:26 +00:00
hdr.Set("Content-Disposition", mime.FormatMediaType("attachment", map[string]string{
2024-01-19 01:23:49 +00:00
"filename": "artifacts/" + path,
2024-01-18 21:17:26 +00:00
}))
2024-01-18 07:43:02 +00:00
part, err := writer.CreatePart(hdr)
if err != nil {
return err
}
2024-01-19 01:23:49 +00:00
fh, err := artifactsFS.Open(path)
2024-01-18 07:43:02 +00:00
if err != nil {
return err
}
2024-01-18 21:17:26 +00:00
if _, err := io.Copy(part, fh); err != nil {
_ = fh.Close()
return err
2024-01-18 07:43:02 +00:00
}
2024-01-18 21:17:26 +00:00
if err := fh.Close(); err != nil {
2024-01-18 07:43:02 +00:00
return err
}
2024-01-18 21:17:26 +00:00
return nil
}); err != nil {
return err
2024-01-18 07:43:02 +00:00
}
2024-01-19 01:23:49 +00:00
log.Printf("... uploaded")
2024-01-18 07:43:02 +00:00
2024-01-23 03:56:09 +00:00
log.Printf("closing...")
2024-01-18 21:17:26 +00:00
hdr.Set("Content-Disposition", mime.FormatMediaType("form-data", map[string]string{
"name": "status",
}))
part, err = writer.CreatePart(hdr)
if err != nil {
return err
}
if _, err := io.WriteString(part, status); err != nil {
2024-01-18 07:43:02 +00:00
return err
}
2024-01-18 21:17:26 +00:00
if err := writer.Close(); err != nil {
return err
}
2024-01-23 04:02:53 +00:00
if err := pipeW.Close(); err != nil {
return err
}
// Wait for the HTTP client to finish pumping the data between
// buffers and the network.
for connTracker.NumOpenConnections() > 0 {
httpClient.CloseIdleConnections()
}
2024-01-23 03:56:09 +00:00
log.Printf("... closed")
2024-01-18 07:43:02 +00:00
return nil
}