// Copyright (C) 2023-2024 Umorpha Systems // SPDX-License-Identifier: AGPL-3.0-or-later package main import ( "context" "encoding/json" "fmt" "io" "io/fs" "log" "mime" "mime/multipart" "net/http" "net/textproto" "os" "os/exec" "path/filepath" "strings" "github.com/datawire/ocibuild/pkg/cliutil" "github.com/spf13/cobra" source "git.mothstuff.lol/lukeshu/eclipse" "git.mothstuff.lol/lukeshu/eclipse/lib/common" "git.mothstuff.lol/lukeshu/eclipse/lib/follower" ) func main() { argparser := &cobra.Command{ Use: os.Args[0] + " [flags] JOB_URL", Short: "Run a job from the leader server", Args: cliutil.WrapPositionalArgs(cobra.ExactArgs(1)), SilenceErrors: true, // we'll handle this ourselves after .ExecuteContext() SilenceUsage: true, // FlagErrorFunc will handle this CompletionOptions: cobra.CompletionOptions{ DisableDefaultCmd: true, }, } argparser.SetFlagErrorFunc(cliutil.FlagErrorFunc) argparser.SetHelpTemplate(cliutil.HelpTemplate) var cfgFile string argparser.Flags().StringVar(&cfgFile, "config", source.DefaultFollowerConfigFile, "Config file to use") _ = argparser.MarkFlagFilename("config", "yml", "yaml") argparser.RunE = func(cmd *cobra.Command, args []string) error { return Run(cfgFile, args[0]) } ctx := context.Background() if err := argparser.ExecuteContext(ctx); err != nil { fmt.Fprintf(os.Stderr, "%v: error: %v\n", argparser.CommandPath(), err) os.Exit(1) } } func Run(cfgFile, jobURL string) (err error) { maybeSetErr := func(_err error) { if err == nil && _err != nil { err = _err } } cfg, err := follower.LoadConfig(cfgFile) if err != nil { return err } var apikey string if cfg.APIKeyFile != "" { _apikey, err := os.ReadFile(cfg.APIKeyFile) if err != nil { return err } apikey = strings.TrimSpace(string(_apikey)) } tmpDir, err := os.MkdirTemp("", "eclipse-run.*") if err != nil { return err } defer func() { maybeSetErr(os.RemoveAll(tmpDir)) }() runDir := filepath.Join(tmpDir, "run") if err := os.Mkdir(runDir, 0777); err != nil { return err } artifactsDir := filepath.Join(tmpDir, "artifacts") if err := os.Mkdir(artifactsDir, 0777); err != nil { return err } pipeR, pipeW := io.Pipe() defer func() { if err != nil { pipeW.CloseWithError(err) } }() writer := multipart.NewWriter(pipeW) req, err := http.NewRequest(http.MethodPost, jobURL, pipeR) if err != nil { return err } if apikey != "" { req.Header.Set("Authorization", "Bearer "+apikey) } // Tell the server that we're including a request body, even // though we don't yet know how big it will be. Forcing // "chunked" as apposed to other encodings allows better // interrupted-connection detection. req.Header.Set("Transfer-Encoding", "chunked") // This isn't actually for the server--without this the HTTP // client library would block waiting to upload the whole // body, even if it already got an error response back. (as // of Go 1.21.6) req.Header.Set("Expect", "100-continue") req.Header.Set("Content-Type", mime.FormatMediaType("multipart/mixed", map[string]string{ "boundary": writer.Boundary(), })) httpClient := http.DefaultClient log.Printf("dialing %q...", jobURL) resp, err := httpClient.Do(req) if err != nil { return err } log.Printf("... dialed") //////////////////////////////////////////////////////////////////////// if resp.StatusCode != http.StatusOK { err := fmt.Errorf("HTTP %s", resp.Status) body, _ := io.ReadAll(resp.Body) if len(body) > 0 { err = fmt.Errorf("%w\n%s", err, body) } return err } dec := json.NewDecoder(resp.Body) log.Printf("reading job description...") var job common.Job if err := dec.Decode(&job); err != nil { return err } log.Printf("... read") hdr := make(textproto.MIMEHeader) log.Printf("running command...") hdr.Set("Content-Disposition", `attachment; filename="log.txt"`) part, err := writer.CreatePart(hdr) if err != nil { return fmt.Errorf("create part: %w", err) } cmd := exec.Command("sh", "-c", job.Command) cmd.Stdout = io.MultiWriter(part, os.Stdout) cmd.Stderr = cmd.Stdout cmd.Dir = runDir cmd.Env = append(os.Environ(), "ECLIPSE_ARTIFACTSDIR="+artifactsDir) status := "0" if err := cmd.Run(); err != nil { status = err.Error() } log.Printf("... run") log.Printf("uploading artifacts...") artifactsFS := os.DirFS(artifactsDir) if err := fs.WalkDir(artifactsFS, ".", func(path string, d fs.DirEntry, err error) error { if err != nil { return err } if d.Type() != 0 { return nil } log.Printf(" -> %q...", path) hdr.Set("Content-Disposition", mime.FormatMediaType("attachment", map[string]string{ "filename": "artifacts/" + path, })) part, err := writer.CreatePart(hdr) if err != nil { return err } fh, err := artifactsFS.Open(path) if err != nil { return err } if _, err := io.Copy(part, fh); err != nil { _ = fh.Close() return err } if err := fh.Close(); err != nil { return err } return nil }); err != nil { return err } log.Printf("... uploaded") log.Printf("closing...") hdr.Set("Content-Disposition", mime.FormatMediaType("form-data", map[string]string{ "name": "status", })) part, err = writer.CreatePart(hdr) if err != nil { return fmt.Errorf("create part: %w", err) } if _, err := io.WriteString(part, status); err != nil { return fmt.Errorf("write status: %w", err) } if err := writer.Close(); err != nil { return fmt.Errorf("write multipart closer: %w", err) } if err := pipeW.Close(); err != nil { return fmt.Errorf("write EOF: %w", err) } log.Printf("... closed") log.Printf("reading confirmation...") var tailResp struct { Status int `json:"status"` Error string `json:"error"` } if err := dec.Decode(&tailResp); err != nil { return err } _, _ = io.Copy(io.Discard, resp.Body) _ = resp.Body.Close() log.Printf("... read") if tailResp.Status != http.StatusOK { err := fmt.Errorf("HTTP %d %s", tailResp.Status, http.StatusText(tailResp.Status)) if tailResp.Error != "" { err = fmt.Errorf("%w\n%s", err, tailResp.Error) } return err } return nil }