// Copyright (C) 2023-2024 Umorpha Systems // SPDX-License-Identifier: AGPL-3.0-or-later package main import ( "context" "embed" "encoding/json" "errors" "fmt" "html/template" "io" "io/fs" "log" "mime" "mime/multipart" "net/http" "net/url" "os" "path" "path/filepath" "strconv" "strings" "time" "git.lukeshu.com/go/containers/typedsync" "github.com/datawire/ocibuild/pkg/cliutil" "github.com/golang/gddo/httputil" "github.com/spf13/cobra" source "git.mothstuff.lol/lukeshu/eclipse" "git.mothstuff.lol/lukeshu/eclipse/lib/apikeys" "git.mothstuff.lol/lukeshu/eclipse/lib/common" "git.mothstuff.lol/lukeshu/eclipse/lib/leader" ) func main() { argparser := &cobra.Command{ Use: os.Args[0] + " [flags]", Short: "The leader HTTP server", Args: cliutil.WrapPositionalArgs(cobra.NoArgs), SilenceErrors: true, // we'll handle this ourselves after .ExecuteContext() SilenceUsage: true, // FlagErrorFunc will handle this CompletionOptions: cobra.CompletionOptions{ DisableDefaultCmd: true, }, } argparser.SetFlagErrorFunc(cliutil.FlagErrorFunc) argparser.SetHelpTemplate(cliutil.HelpTemplate) var cfgFile string argparser.Flags().StringVar(&cfgFile, "config", source.DefaultLeaderConfigFile, "Config file to use") _ = argparser.MarkFlagFilename("config", "yml", "yaml") var socket string argparser.Flags().StringVar(&socket, "socket", "tcp::8080", "the socket to listen on; a \"family:address\" pair, where the\n"+ "\"family\" is one of \"tcp\", \"tcp4\", \"tcp6\", \"unix\", \"unixpacket\"\n"+ "(the usual Go net.Listen() network names), or \"fd\" (to listen\n"+ "on an already-open file descriptor). The \"address\" part for\n"+ "the \"fd\" family is one of\n"+ " 1. a file descriptor number (or the special names \"stdin\",\n"+ " \"stdout\", or \"stderr\" to refer to 0, 1, or 2\n"+ " respectively),\n"+ " 2. \"systemd:N\" to refer to the Nth file descriptor passed in\n"+ " via systemd (or \"systemd\" as shorthand for \"systemd:0\")\n"+ " (see sd_listend_fds(3)),\n"+ " 3. \"systemd:NAME\" to refer to a named file descriptor passed\n"+ " in via systemd (see sd_listend_fds_with_names(3)).\n") argparser.RunE = func(cmd *cobra.Command, args []string) error { stype, saddr, ok := strings.Cut(socket, ":") if !ok { return cliutil.FlagErrorFunc(cmd, fmt.Errorf("invalid address: %q", socket)) } return Serve(cfgFile, stype, saddr) } ctx := context.Background() if err := argparser.ExecuteContext(ctx); err != nil { fmt.Fprintf(os.Stderr, "%v: error: %v\n", argparser.CommandPath(), err) os.Exit(1) } } func Serve(cfgFile, stype, saddr string) (err error) { maybeSetErr := func(_err error) { if err == nil && _err != nil { err = _err } } cfg, err := leader.LoadConfig(cfgFile) if err != nil { return err } pubKey, err := apikeys.LoadLeaderPubKey(cfg.PubKeyFile) if err != nil { return fmt.Errorf("load pubkey: %w", err) } db, err := cfg.DB() if err != nil { return err } defer func() { maybeSetErr(db.Close()) }() sock, err := netListen(stype, saddr) if err != nil { return err } // Do this *after* listening, so that the socket can act as a // mutex. if err := db.GCRunningJobs(); err != nil { return err } // Even tnough most of the routing is happening in big // `switch` statements, the ServeMux is important because it // cleans the paths, which would sort of be a pain to do // ourselves. router := http.NewServeMux() router.Handle("/", &ServerState{db: db, pubKey: pubKey}) router.Handle("/webhooks/", http.StripPrefix("/webhooks/", &Webhooks{pubKey: pubKey, cfgFile: cfgFile})) log.Printf("Serving on %v...", sock.Addr()) return http.Serve(sock, router) } // HTTP utils ////////////////////////////////////////////////////////////////// func httpAddSlash(w http.ResponseWriter, r *http.Request) { u := &url.URL{Path: r.URL.Path + "/", RawQuery: r.URL.RawQuery} http.Redirect(w, r, u.String(), http.StatusMovedPermanently) } type _httpError struct { Err error Code int } func (e *_httpError) Error() string { return fmt.Sprintf("HTTP %d: %v", e.Code, e.Err) } func httpError(err error, code int) *_httpError { return &_httpError{Code: code, Err: err} } const httpStatusWriteError = -100 func writeError(err error) *_httpError { return &_httpError{Code: httpStatusWriteError, Err: err} } func writeOrServerError(err error) *_httpError { return &_httpError{Code: http.StatusInternalServerError, Err: err} } func httpNotFound() *_httpError { return httpError(errors.New("404 page not found"), http.StatusNotFound) } func requireAuthorization(pubKey apikeys.PublicKey, w http.ResponseWriter, r *http.Request) *_httpError { key := strings.TrimSpace(strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")) var keyErr error if key == "" { keyErr = fmt.Errorf("no apikey token") } else { keyErr = apikeys.ValidateAPIKey(pubKey, key) } if keyErr == nil { return nil } w.Header().Set("WWW-Authenticate", "Bearer") return httpError(keyErr, http.StatusUnauthorized) } // ServerState ///////////////////////////////////////////////////////////////// type ServerState struct { db *leader.DB pubKey apikeys.PublicKey running typedsync.Map[common.JobID, chan struct{}] } //go:embed style.css var fileStyle []byte //go:embed favicon.ico var fileFavicon []byte //go:embed html var tmplFS embed.FS func pageTemplate(pageName string) *template.Template { return template.Must(template.New(pageName). Funcs(template.FuncMap{ "fmtDatetime": func(t time.Time) template.HTML { return template.HTML(fmt.Sprintf(``, t.Format(time.RFC3339), t.Format("2006-01-02 15:04:05Z07:00"), t)) }, // html/template strips comments by default, so we have to do this to wrap // it in [template.HTML] instead of putting it directly in // `./html/lib/page.html.tmpl`. (We could instead wrap just the stuff // inside the `) }, }). ParseFS(tmplFS, "html/pages/"+pageName, "html/lib/*.html.tmpl")) } var ( tmplIndex = pageTemplate("index.html.tmpl") tmplJobs = pageTemplate("jobs.html.tmpl") tmplJob = pageTemplate("job.html.tmpl") ) func (o *ServerState) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err := o.serveHTTP(w, r); err != nil { if err.Code != httpStatusWriteError { http.Error(w, err.Error(), err.Code) } if err.Code/100 == 5 { log.Printf("error: %q: %v", r.URL, err) } } } func (o *ServerState) serveHTTP(w http.ResponseWriter, r *http.Request) *_httpError { switch { case r.URL.Path == "/": if r.Method != http.MethodGet { return httpError(errors.New("only GET is supported"), http.StatusMethodNotAllowed) } w.Header().Set("Content-Type", "text/html; charset=utf-8") if err := tmplIndex.Execute(w, nil); err != nil { return writeOrServerError(err) } case r.URL.Path == "/style.css": if r.Method != http.MethodGet { return httpError(errors.New("only GET is supported"), http.StatusMethodNotAllowed) } w.Header().Set("Content-Type", "text/css") if _, err := w.Write(fileStyle); err != nil { return writeError(err) } case r.URL.Path == "/favicon.ico": if r.Method != http.MethodGet { return httpError(errors.New("only GET is supported"), http.StatusMethodNotAllowed) } w.Header().Set("Content-Type", "image/vnd.microsoft.icon") if _, err := w.Write(fileFavicon); err != nil { return writeError(err) } case r.URL.Path == "/eclipse.tar": if r.Method != http.MethodGet { return httpError(errors.New("only GET is supported"), http.StatusMethodNotAllowed) } w.Header().Set("Content-Type", "application/x-tar") if _, err := w.Write(source.Tarball); err != nil { return writeError(err) } case r.URL.Path == "/jobs": httpAddSlash(w, r) case r.URL.Path == "/jobs/": switch r.Method { case http.MethodGet: params := r.URL.Query() status := common.JobStatus(-1) _ = status.UnmarshalText([]byte(params.Get("status"))) // allowed to fail wait, _ := strconv.ParseBool(params.Get("wait")) // allowed to fail var jobs []common.Job for i := 0; i == 0 || (wait && len(jobs) == 0); i++ { var err error jobs, err = o.db.ListJobs(status) if err != nil { return httpError(err, http.StatusInternalServerError) } if wait && len(jobs) == 0 { if err := r.Context().Err(); err != nil { return httpError(err, http.StatusRequestTimeout) } time.Sleep(1 * time.Second) } } switch httputil.NegotiateContentType(r, []string{"application/json", "text/html"}, "application/json") { case "text/html": w.Header().Set("Content-Type", "text/html; charset=utf-8") if err := tmplJobs.Execute(w, map[string]any{"Jobs": jobs}); err != nil { return writeOrServerError(err) } case "application/json": jsonBytes, err := json.Marshal(jobs) if err != nil { return httpError(err, http.StatusInternalServerError) } w.Header().Set("Content-Type", "application/json") if _, err := w.Write(jsonBytes); err != nil { return writeError(err) } default: return httpError(errors.New("only text/html and application/json are supported"), http.StatusNotAcceptable) } case http.MethodPost: return httpError(errors.New("TODO: allow POSTing to /jobs/ to create a job"), http.StatusNotImplemented) default: return httpError(errors.New("only GET and POST are supported"), http.StatusMethodNotAllowed) } case strings.HasPrefix(r.URL.Path, "/jobs/"): rest := strings.TrimPrefix(r.URL.Path, "/jobs/") jobIDStr, file, haveSlash := strings.Cut(rest, "/") jobID, err := strconv.Atoi(jobIDStr) if err != nil { return httpNotFound() } job, err := o.db.GetJob(common.JobID(jobID)) if err != nil { return httpNotFound() } if !haveSlash { httpAddSlash(w, r) return nil } switch file { case "": switch r.Method { case http.MethodGet: dirfs := job.DirFS() artifactNames := []string{} if err := fs.WalkDir(dirfs, "artifacts", func(path string, d fs.DirEntry, err error) error { if err != nil { return err } if d.Type() == 0 { artifactNames = append(artifactNames, strings.TrimPrefix(path, "artifacts/")) } return nil }); err != nil && !os.IsNotExist(err) { return httpError(err, http.StatusInternalServerError) } w.Header().Set("Content-Type", "text/html; charset=utf-8") if err := tmplJob.Execute(w, map[string]any{ "Job": job, "ArtifactNames": artifactNames, }); err != nil { return writeOrServerError(err) } case http.MethodPost: if err := requireAuthorization(o.pubKey, w, r); err != nil { return err } return o.runJob(w, r, job) default: return httpError(errors.New("only GET and POST are supported"), http.StatusMethodNotAllowed) } default: if r.Method != http.MethodGet { return httpError(errors.New("only GET is supported"), http.StatusMethodNotAllowed) } if file == "log.txt" && job.Status == common.StatusRunning { ch, _ := o.running.Load(job.ID) w.Header().Set("Content-Type", "text/plain; charset=utf-8") if err := tailf(w, ch, filepath.Join(job.Dir, "log.txt")); err != nil { return writeOrServerError(err) } } else { r.URL.Path = "/" + file http.FileServer(http.Dir(job.Dir)).ServeHTTP(w, r) } } default: return httpNotFound() } return nil } // tailf is like the `tail -f --retry -n +1`, but a Go function. func tailf(w http.ResponseWriter, cancel <-chan struct{}, filename string) error { ctl := http.NewResponseController(w) fh, _ := os.Open(filename) if fh != nil { if _, err := io.Copy(w, fh); err != nil { return err } if err := ctl.Flush(); err != nil { return err } } ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for keepGoing := true; keepGoing; { select { case <-cancel: keepGoing = false case <-ticker.C: } if fh == nil { fh, _ = os.Open(filename) } if fh != nil { if _, err := io.Copy(w, fh); err != nil { return err } if err := ctl.Flush(); err != nil { return err } } } return nil } func (o *ServerState) runJob(w http.ResponseWriter, r *http.Request, job common.Job) (_reterr *_httpError) { log.Printf("running job: %q", r.URL) ctl := http.NewResponseController(w) if err := ctl.EnableFullDuplex(); err != nil { return httpError(err, http.StatusInternalServerError) } wroteHeader := false defer func() { if wroteHeader && _reterr != nil { bs, _ := json.Marshal(map[string]any{ "status": _reterr.Code, "error": _reterr.Err.Error(), }) _, _ = w.Write(bs) _ = ctl.Flush() if _reterr.Code/100 == 5 { log.Printf("error: %q: %v", r.URL, _reterr) } _reterr = nil } }() ch := make(chan struct{}) if _, conflict := o.running.LoadOrStore(job.ID, ch); conflict { return httpError(errors.New("job has already started (ch)"), http.StatusConflict) } defer func() { close(ch) o.running.Delete(job.ID) }() swapped, err := o.db.SwapJobStatus(job.ID, common.StatusNew, common.StatusRunning, "") if err != nil { return httpError(err, http.StatusInternalServerError) } if !swapped { return httpError(errors.New("job has already started (db)"), http.StatusConflict) } defer func() { if _reterr != nil { if _, err := o.db.SwapJobStatus(job.ID, common.StatusRunning, common.StatusError, _reterr.Err.Error()); err != nil { _reterr = httpError(err, http.StatusInternalServerError) } } }() mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { return httpError(fmt.Errorf("invalid Content-Type: %q: %w", r.Header.Get("Content-Type"), err), http.StatusBadRequest) } if mediaType != "multipart/mixed" { return httpError(fmt.Errorf("invalid Content-Type: %q", r.Header.Get("Content-Type")), http.StatusBadRequest) } reader := multipart.NewReader(r.Body, params["boundary"]) respBytes1, err := json.Marshal(job) if err != nil { return httpError(fmt.Errorf("encoding job description: %w", err), http.StatusInternalServerError) } respBytes2, err := json.Marshal(map[string]any{"status": 200}) if err != nil { return httpError(fmt.Errorf("encoding description: %w", err), http.StatusInternalServerError) } if strings.Contains(r.Header.Get("Expect"), "100-continue") { // The stdlib net/http server is supposed to automatically do this if needed, but it // triggers this based on us reading the request body. Normally that would be a // good trigger; usually you *must* have read the body before you start writing the // response... but ctl.EnableFullDuplex() broke that assumption. If we start // writing before we read, the net/http server would skip straight to the "HTTP 200" // response, skipping the require "HTTP 100" response. (as of Go 1.21.6) // // So manually emit the "HTTP 100" response if it looks like we need it. Our // strings.Contains() check is a little loose--it's OK for is to emit this even if // it wasn't asked for. w.WriteHeader(http.StatusContinue) } w.Header().Set("Content-Type", "application/json") // This is the line between "can" and "can't" return a status code /////////////// wroteHeader = true if _, err := w.Write(respBytes1); err != nil { return writeError(fmt.Errorf("write job description: %w", err)) } if err := ctl.Flush(); err != nil { return writeError(fmt.Errorf("write job description: %w", err)) } var status string for { part, err := reader.NextPart() if err != nil { if err == io.EOF { break } return httpError(fmt.Errorf("read next part: %w", err), http.StatusBadRequest) } log.Printf("part: %q", part.Header.Get("Content-Disposition")) disposition, params, err := mime.ParseMediaType(part.Header.Get("Content-Disposition")) if err != nil { return httpError(fmt.Errorf("invalid Content-Disposition: %q: %w", part.Header.Get("Content-Disposition"), err), http.StatusBadRequest) } filename := path.Clean(params["filename"]) switch { case disposition == "attachment" && filename == "log.txt" || strings.HasPrefix(filename, "artifacts/"): fsFilename := filepath.Join(job.Dir, filepath.FromSlash(filename)) if err := os.MkdirAll(filepath.Dir(fsFilename), 0755); err != nil { return httpError(fmt.Errorf("saving file: %q: %w", filename, err), http.StatusInternalServerError) } fh, err := os.Create(fsFilename) if err != nil { return httpError(fmt.Errorf("saving file: %q: %w", filename, err), http.StatusInternalServerError) } if _, err := io.Copy(fh, part); err != nil { return writeOrServerError(fmt.Errorf("saving file: %q: %w", filename, err)) } if err := fh.Close(); err != nil { return httpError(fmt.Errorf("saving file: %q: %w", filename, err), http.StatusInternalServerError) } case disposition == "form-data" && params["name"] == "status": statusBytes, err := io.ReadAll(part) if err != nil { return httpError(fmt.Errorf("reading status: %w (got: %q)", err, statusBytes), http.StatusInternalServerError) } status = strings.TrimSpace(string(statusBytes)) default: return httpError(fmt.Errorf("invalid Content-Disposition: %q", part.Header.Get("Content-Disposition")), http.StatusBadRequest) } if err := part.Close(); err != nil { return httpError(fmt.Errorf("closing part: %w", err), http.StatusBadRequest) } } switch status { case "0": if _, err := o.db.SwapJobStatus(job.ID, common.StatusRunning, common.StatusSucceeded, ""); err != nil { return httpError(err, http.StatusInternalServerError) } case "": return httpError(fmt.Errorf("no status: %w", io.ErrUnexpectedEOF), http.StatusBadRequest) default: if _, err := o.db.SwapJobStatus(job.ID, common.StatusRunning, common.StatusFailed, status); err != nil { return httpError(err, http.StatusInternalServerError) } } if _, err := w.Write(respBytes2); err != nil { return writeError(fmt.Errorf("write tailing status: %w", err)) } if err := ctl.Flush(); err != nil { return writeError(fmt.Errorf("write tailing status: %w", err)) } return nil }