603 lines
19 KiB
Go
603 lines
19 KiB
Go
// Copyright (C) 2023-2024 Umorpha Systems
|
|
// SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"embed"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"html/template"
|
|
"io"
|
|
"io/fs"
|
|
"log"
|
|
"mime"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.lukeshu.com/go/containers/typedsync"
|
|
"github.com/datawire/ocibuild/pkg/cliutil"
|
|
"github.com/golang/gddo/httputil"
|
|
"github.com/spf13/cobra"
|
|
|
|
source "git.mothstuff.lol/lukeshu/eclipse"
|
|
"git.mothstuff.lol/lukeshu/eclipse/lib/apikeys"
|
|
"git.mothstuff.lol/lukeshu/eclipse/lib/common"
|
|
"git.mothstuff.lol/lukeshu/eclipse/lib/leader"
|
|
)
|
|
|
|
func main() {
|
|
argparser := &cobra.Command{
|
|
Use: os.Args[0] + " [flags]",
|
|
Short: "The leader HTTP server",
|
|
Args: cliutil.WrapPositionalArgs(cobra.NoArgs),
|
|
|
|
SilenceErrors: true, // we'll handle this ourselves after .ExecuteContext()
|
|
SilenceUsage: true, // FlagErrorFunc will handle this
|
|
|
|
CompletionOptions: cobra.CompletionOptions{
|
|
DisableDefaultCmd: true,
|
|
},
|
|
}
|
|
argparser.SetFlagErrorFunc(cliutil.FlagErrorFunc)
|
|
argparser.SetHelpTemplate(cliutil.HelpTemplate)
|
|
|
|
var cfgFile string
|
|
argparser.Flags().StringVar(&cfgFile, "config", source.DefaultLeaderConfigFile,
|
|
"Config file to use")
|
|
_ = argparser.MarkFlagFilename("config", "yml", "yaml")
|
|
|
|
var socket string
|
|
argparser.Flags().StringVar(&socket, "socket", "tcp::8080",
|
|
"the socket to listen on; a \"family:address\" pair, where the\n"+
|
|
"\"family\" is one of \"tcp\", \"tcp4\", \"tcp6\", \"unix\", \"unixpacket\"\n"+
|
|
"(the usual Go net.Listen() network names), or \"fd\" (to listen\n"+
|
|
"on an already-open file descriptor). The \"address\" part for\n"+
|
|
"the \"fd\" family is one of\n"+
|
|
" 1. a file descriptor number (or the special names \"stdin\",\n"+
|
|
" \"stdout\", or \"stderr\" to refer to 0, 1, or 2\n"+
|
|
" respectively),\n"+
|
|
" 2. \"systemd:N\" to refer to the Nth file descriptor passed in\n"+
|
|
" via systemd (or \"systemd\" as shorthand for \"systemd:0\")\n"+
|
|
" (see sd_listend_fds(3)),\n"+
|
|
" 3. \"systemd:NAME\" to refer to a named file descriptor passed\n"+
|
|
" in via systemd (see sd_listend_fds_with_names(3)).\n")
|
|
|
|
argparser.RunE = func(cmd *cobra.Command, args []string) error {
|
|
stype, saddr, ok := strings.Cut(socket, ":")
|
|
if !ok {
|
|
return cliutil.FlagErrorFunc(cmd, fmt.Errorf("invalid address: %q", socket))
|
|
}
|
|
return Serve(cfgFile, stype, saddr)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
if err := argparser.ExecuteContext(ctx); err != nil {
|
|
fmt.Fprintf(os.Stderr, "%v: error: %v\n", argparser.CommandPath(), err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func Serve(cfgFile, stype, saddr string) (err error) {
|
|
maybeSetErr := func(_err error) {
|
|
if err == nil && _err != nil {
|
|
err = _err
|
|
}
|
|
}
|
|
|
|
cfg, err := leader.LoadConfig(cfgFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
pubKey, err := apikeys.LoadLeaderPubKey(cfg.PubKeyFile)
|
|
if err != nil {
|
|
return fmt.Errorf("load pubkey: %w", err)
|
|
}
|
|
|
|
db, err := cfg.DB()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { maybeSetErr(db.Close()) }()
|
|
|
|
sock, err := netListen(stype, saddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Do this *after* listening, so that the socket can act as a
|
|
// mutex.
|
|
if err := db.GCRunningJobs(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Even tnough most of the routing is happening in big
|
|
// `switch` statements, the ServeMux is important because it
|
|
// cleans the paths, which would sort of be a pain to do
|
|
// ourselves.
|
|
router := http.NewServeMux()
|
|
router.Handle("/", &ServerState{db: db, pubKey: pubKey})
|
|
router.Handle("/webhooks/", http.StripPrefix("/webhooks/", &Webhooks{pubKey: pubKey, cfgFile: cfgFile}))
|
|
|
|
log.Printf("Serving on %v...", sock.Addr())
|
|
return http.Serve(sock, router)
|
|
}
|
|
|
|
// HTTP utils //////////////////////////////////////////////////////////////////
|
|
|
|
func httpAddSlash(w http.ResponseWriter, r *http.Request) {
|
|
u := &url.URL{Path: r.URL.Path + "/", RawQuery: r.URL.RawQuery}
|
|
http.Redirect(w, r, u.String(), http.StatusMovedPermanently)
|
|
}
|
|
|
|
type _httpError struct {
|
|
Err error
|
|
Code int
|
|
}
|
|
|
|
func (e *_httpError) Error() string {
|
|
return fmt.Sprintf("HTTP %d: %v", e.Code, e.Err)
|
|
}
|
|
|
|
func httpError(err error, code int) *_httpError {
|
|
return &_httpError{Code: code, Err: err}
|
|
}
|
|
|
|
const httpStatusWriteError = -100
|
|
|
|
func writeError(err error) *_httpError {
|
|
return &_httpError{Code: httpStatusWriteError, Err: err}
|
|
}
|
|
|
|
func writeOrServerError(err error) *_httpError {
|
|
return &_httpError{Code: http.StatusInternalServerError, Err: err}
|
|
}
|
|
|
|
func httpNotFound() *_httpError {
|
|
return httpError(errors.New("404 page not found"), http.StatusNotFound)
|
|
}
|
|
|
|
func requireAuthorization(pubKey apikeys.PublicKey, w http.ResponseWriter, r *http.Request) *_httpError {
|
|
key := strings.TrimSpace(strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer "))
|
|
var keyErr error
|
|
if key == "" {
|
|
keyErr = fmt.Errorf("no apikey token")
|
|
} else {
|
|
keyErr = apikeys.ValidateAPIKey(pubKey, key)
|
|
}
|
|
if keyErr == nil {
|
|
return nil
|
|
}
|
|
w.Header().Set("WWW-Authenticate", "Bearer")
|
|
return httpError(keyErr, http.StatusUnauthorized)
|
|
}
|
|
|
|
// ServerState /////////////////////////////////////////////////////////////////
|
|
|
|
type ServerState struct {
|
|
db *leader.DB
|
|
pubKey apikeys.PublicKey
|
|
running typedsync.Map[common.JobID, chan struct{}]
|
|
}
|
|
|
|
//go:embed style.css
|
|
var fileStyle []byte
|
|
|
|
//go:embed favicon.ico
|
|
var fileFavicon []byte
|
|
|
|
//go:embed html
|
|
var tmplFS embed.FS
|
|
|
|
func pageTemplate(pageName string) *template.Template {
|
|
return template.Must(template.New(pageName).
|
|
Funcs(template.FuncMap{
|
|
"fmtDatetime": func(t time.Time) template.HTML {
|
|
return template.HTML(fmt.Sprintf(`<time datetime="%s" title="%s">%v</time>`,
|
|
t.Format(time.RFC3339),
|
|
t.Format("2006-01-02 15:04:05Z07:00"),
|
|
t))
|
|
},
|
|
// html/template strips comments by default, so we have to do this to wrap
|
|
// it in [template.HTML] instead of putting it directly in
|
|
// `./html/lib/page.html.tmpl`. (We could instead wrap just the stuff
|
|
// inside the <script> tag in [template.JS].
|
|
"jsLicense": func() template.HTML {
|
|
return template.HTML(`<script>
|
|
/* @licstart The following is the entire license notice for the JavaScript
|
|
* code in this page.
|
|
*
|
|
* Copyright (C) 2023-2024 Umorpha Systems
|
|
*
|
|
* This JavaScript code in this page is free software: you can redistribute
|
|
* it and/or modify it under the terms of the GNU Affero General Public
|
|
* License as published by the Free Software Foundation, either version 3 of
|
|
* the License, or (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful, but
|
|
* WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
|
|
* General Public License for more details.
|
|
*
|
|
* @licend The above is the entire license notice for the JavaScript code in
|
|
* this page.
|
|
*/
|
|
</script>`)
|
|
},
|
|
}).
|
|
ParseFS(tmplFS, "html/pages/"+pageName, "html/lib/*.html.tmpl"))
|
|
}
|
|
|
|
var (
|
|
tmplIndex = pageTemplate("index.html.tmpl")
|
|
tmplJobs = pageTemplate("jobs.html.tmpl")
|
|
tmplJob = pageTemplate("job.html.tmpl")
|
|
)
|
|
|
|
func (o *ServerState) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
if err := o.serveHTTP(w, r); err != nil {
|
|
if err.Code != httpStatusWriteError {
|
|
http.Error(w, err.Error(), err.Code)
|
|
}
|
|
if err.Code/100 == 5 {
|
|
log.Printf("error: %q: %v", r.URL, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (o *ServerState) serveHTTP(w http.ResponseWriter, r *http.Request) *_httpError {
|
|
switch {
|
|
case r.URL.Path == "/":
|
|
if r.Method != http.MethodGet {
|
|
return httpError(errors.New("only GET is supported"), http.StatusMethodNotAllowed)
|
|
}
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
if err := tmplIndex.Execute(w, nil); err != nil {
|
|
return writeOrServerError(err)
|
|
}
|
|
case r.URL.Path == "/style.css":
|
|
if r.Method != http.MethodGet {
|
|
return httpError(errors.New("only GET is supported"), http.StatusMethodNotAllowed)
|
|
}
|
|
w.Header().Set("Content-Type", "text/css")
|
|
if _, err := w.Write(fileStyle); err != nil {
|
|
return writeError(err)
|
|
}
|
|
case r.URL.Path == "/favicon.ico":
|
|
if r.Method != http.MethodGet {
|
|
return httpError(errors.New("only GET is supported"), http.StatusMethodNotAllowed)
|
|
}
|
|
w.Header().Set("Content-Type", "image/vnd.microsoft.icon")
|
|
if _, err := w.Write(fileFavicon); err != nil {
|
|
return writeError(err)
|
|
}
|
|
case r.URL.Path == "/eclipse.tar":
|
|
if r.Method != http.MethodGet {
|
|
return httpError(errors.New("only GET is supported"), http.StatusMethodNotAllowed)
|
|
}
|
|
w.Header().Set("Content-Type", "application/x-tar")
|
|
if _, err := w.Write(source.Tarball); err != nil {
|
|
return writeError(err)
|
|
}
|
|
case r.URL.Path == "/jobs":
|
|
httpAddSlash(w, r)
|
|
case r.URL.Path == "/jobs/":
|
|
switch r.Method {
|
|
case http.MethodGet:
|
|
params := r.URL.Query()
|
|
status := common.JobStatus(-1)
|
|
_ = status.UnmarshalText([]byte(params.Get("status"))) // allowed to fail
|
|
wait, _ := strconv.ParseBool(params.Get("wait")) // allowed to fail
|
|
|
|
var jobs []common.Job
|
|
for i := 0; i == 0 || (wait && len(jobs) == 0); i++ {
|
|
var err error
|
|
jobs, err = o.db.ListJobs(status)
|
|
if err != nil {
|
|
return httpError(err, http.StatusInternalServerError)
|
|
}
|
|
if wait && len(jobs) == 0 {
|
|
if err := r.Context().Err(); err != nil {
|
|
return httpError(err, http.StatusRequestTimeout)
|
|
}
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|
|
switch httputil.NegotiateContentType(r, []string{"application/json", "text/html"}, "application/json") {
|
|
case "text/html":
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
if err := tmplJobs.Execute(w, map[string]any{"Jobs": jobs}); err != nil {
|
|
return writeOrServerError(err)
|
|
}
|
|
case "application/json":
|
|
jsonBytes, err := json.Marshal(jobs)
|
|
if err != nil {
|
|
return httpError(err, http.StatusInternalServerError)
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
if _, err := w.Write(jsonBytes); err != nil {
|
|
return writeError(err)
|
|
}
|
|
default:
|
|
return httpError(errors.New("only text/html and application/json are supported"), http.StatusNotAcceptable)
|
|
}
|
|
case http.MethodPost:
|
|
return httpError(errors.New("TODO: allow POSTing to /jobs/ to create a job"), http.StatusNotImplemented)
|
|
default:
|
|
return httpError(errors.New("only GET and POST are supported"), http.StatusMethodNotAllowed)
|
|
}
|
|
case strings.HasPrefix(r.URL.Path, "/jobs/"):
|
|
rest := strings.TrimPrefix(r.URL.Path, "/jobs/")
|
|
jobIDStr, file, haveSlash := strings.Cut(rest, "/")
|
|
jobID, err := strconv.Atoi(jobIDStr)
|
|
if err != nil {
|
|
return httpNotFound()
|
|
}
|
|
job, err := o.db.GetJob(common.JobID(jobID))
|
|
if err != nil {
|
|
return httpNotFound()
|
|
}
|
|
if !haveSlash {
|
|
httpAddSlash(w, r)
|
|
return nil
|
|
}
|
|
switch file {
|
|
case "":
|
|
switch r.Method {
|
|
case http.MethodGet:
|
|
dirfs := job.DirFS()
|
|
artifactNames := []string{}
|
|
if err := fs.WalkDir(dirfs, "artifacts", func(path string, d fs.DirEntry, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if d.Type() == 0 {
|
|
artifactNames = append(artifactNames, strings.TrimPrefix(path, "artifacts/"))
|
|
}
|
|
return nil
|
|
}); err != nil && !os.IsNotExist(err) {
|
|
return httpError(err, http.StatusInternalServerError)
|
|
}
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
if err := tmplJob.Execute(w, map[string]any{
|
|
"Job": job,
|
|
"ArtifactNames": artifactNames,
|
|
}); err != nil {
|
|
return writeOrServerError(err)
|
|
}
|
|
case http.MethodPost:
|
|
if err := requireAuthorization(o.pubKey, w, r); err != nil {
|
|
return err
|
|
}
|
|
return o.runJob(w, r, job)
|
|
default:
|
|
return httpError(errors.New("only GET and POST are supported"), http.StatusMethodNotAllowed)
|
|
}
|
|
default:
|
|
if r.Method != http.MethodGet {
|
|
return httpError(errors.New("only GET is supported"), http.StatusMethodNotAllowed)
|
|
}
|
|
if file == "log.txt" && job.Status == common.StatusRunning {
|
|
ch, _ := o.running.Load(job.ID)
|
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
|
if err := tailf(w, ch, filepath.Join(job.Dir, "log.txt")); err != nil {
|
|
return writeOrServerError(err)
|
|
}
|
|
} else {
|
|
r.URL.Path = "/" + file
|
|
http.FileServer(http.Dir(job.Dir)).ServeHTTP(w, r)
|
|
}
|
|
}
|
|
default:
|
|
return httpNotFound()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// tailf is like the `tail -f --retry -n +1`, but a Go function.
|
|
func tailf(w http.ResponseWriter, cancel <-chan struct{}, filename string) error {
|
|
ctl := http.NewResponseController(w)
|
|
|
|
fh, _ := os.Open(filename)
|
|
if fh != nil {
|
|
if _, err := io.Copy(w, fh); err != nil {
|
|
return err
|
|
}
|
|
if err := ctl.Flush(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
for keepGoing := true; keepGoing; {
|
|
select {
|
|
case <-cancel:
|
|
keepGoing = false
|
|
case <-ticker.C:
|
|
}
|
|
if fh == nil {
|
|
fh, _ = os.Open(filename)
|
|
}
|
|
if fh != nil {
|
|
if _, err := io.Copy(w, fh); err != nil {
|
|
return err
|
|
}
|
|
if err := ctl.Flush(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (o *ServerState) runJob(w http.ResponseWriter, r *http.Request, job common.Job) (_reterr *_httpError) {
|
|
log.Printf("running job: %q", r.URL)
|
|
|
|
ctl := http.NewResponseController(w)
|
|
if err := ctl.EnableFullDuplex(); err != nil {
|
|
return httpError(err, http.StatusInternalServerError)
|
|
}
|
|
|
|
wroteHeader := false
|
|
defer func() {
|
|
if wroteHeader && _reterr != nil {
|
|
bs, _ := json.Marshal(map[string]any{
|
|
"status": _reterr.Code,
|
|
"error": _reterr.Err.Error(),
|
|
})
|
|
_, _ = w.Write(bs)
|
|
_ = ctl.Flush()
|
|
if _reterr.Code/100 == 5 {
|
|
log.Printf("error: %q: %v", r.URL, _reterr)
|
|
}
|
|
_reterr = nil
|
|
}
|
|
}()
|
|
|
|
ch := make(chan struct{})
|
|
if _, conflict := o.running.LoadOrStore(job.ID, ch); conflict {
|
|
return httpError(errors.New("job has already started (ch)"), http.StatusConflict)
|
|
}
|
|
defer func() {
|
|
close(ch)
|
|
o.running.Delete(job.ID)
|
|
}()
|
|
swapped, err := o.db.SwapJobStatus(job.ID, common.StatusNew, common.StatusRunning, "")
|
|
if err != nil {
|
|
return httpError(err, http.StatusInternalServerError)
|
|
}
|
|
if !swapped {
|
|
return httpError(errors.New("job has already started (db)"), http.StatusConflict)
|
|
}
|
|
defer func() {
|
|
if _reterr != nil {
|
|
if _, err := o.db.SwapJobStatus(job.ID, common.StatusRunning, common.StatusError, _reterr.Err.Error()); err != nil {
|
|
_reterr = httpError(err, http.StatusInternalServerError)
|
|
}
|
|
}
|
|
}()
|
|
|
|
mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
|
|
if err != nil {
|
|
return httpError(fmt.Errorf("invalid Content-Type: %q: %w", r.Header.Get("Content-Type"), err), http.StatusBadRequest)
|
|
}
|
|
if mediaType != "multipart/mixed" {
|
|
return httpError(fmt.Errorf("invalid Content-Type: %q", r.Header.Get("Content-Type")), http.StatusBadRequest)
|
|
}
|
|
reader := multipart.NewReader(r.Body, params["boundary"])
|
|
|
|
respBytes1, err := json.Marshal(job)
|
|
if err != nil {
|
|
return httpError(fmt.Errorf("encoding job description: %w", err), http.StatusInternalServerError)
|
|
}
|
|
|
|
respBytes2, err := json.Marshal(map[string]any{"status": 200})
|
|
if err != nil {
|
|
return httpError(fmt.Errorf("encoding description: %w", err), http.StatusInternalServerError)
|
|
}
|
|
|
|
if strings.Contains(r.Header.Get("Expect"), "100-continue") {
|
|
// The stdlib net/http server is supposed to automatically do this if needed, but it
|
|
// triggers this based on us reading the request body. Normally that would be a
|
|
// good trigger; usually you *must* have read the body before you start writing the
|
|
// response... but ctl.EnableFullDuplex() broke that assumption. If we start
|
|
// writing before we read, the net/http server would skip straight to the "HTTP 200"
|
|
// response, skipping the require "HTTP 100" response. (as of Go 1.21.6)
|
|
//
|
|
// So manually emit the "HTTP 100" response if it looks like we need it. Our
|
|
// strings.Contains() check is a little loose--it's OK for is to emit this even if
|
|
// it wasn't asked for.
|
|
w.WriteHeader(http.StatusContinue)
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
|
// This is the line between "can" and "can't" return a status code ///////////////
|
|
wroteHeader = true
|
|
if _, err := w.Write(respBytes1); err != nil {
|
|
return writeError(fmt.Errorf("write job description: %w", err))
|
|
}
|
|
if err := ctl.Flush(); err != nil {
|
|
return writeError(fmt.Errorf("write job description: %w", err))
|
|
}
|
|
|
|
var status string
|
|
for {
|
|
part, err := reader.NextPart()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
return httpError(fmt.Errorf("read next part: %w", err), http.StatusBadRequest)
|
|
}
|
|
log.Printf("part: %q", part.Header.Get("Content-Disposition"))
|
|
disposition, params, err := mime.ParseMediaType(part.Header.Get("Content-Disposition"))
|
|
if err != nil {
|
|
return httpError(fmt.Errorf("invalid Content-Disposition: %q: %w", part.Header.Get("Content-Disposition"), err), http.StatusBadRequest)
|
|
}
|
|
filename := path.Clean(params["filename"])
|
|
switch {
|
|
case disposition == "attachment" && filename == "log.txt" || strings.HasPrefix(filename, "artifacts/"):
|
|
fsFilename := filepath.Join(job.Dir, filepath.FromSlash(filename))
|
|
if err := os.MkdirAll(filepath.Dir(fsFilename), 0755); err != nil {
|
|
return httpError(fmt.Errorf("saving file: %q: %w", filename, err), http.StatusInternalServerError)
|
|
}
|
|
fh, err := os.Create(fsFilename)
|
|
if err != nil {
|
|
return httpError(fmt.Errorf("saving file: %q: %w", filename, err), http.StatusInternalServerError)
|
|
}
|
|
if _, err := io.Copy(fh, part); err != nil {
|
|
return writeOrServerError(fmt.Errorf("saving file: %q: %w", filename, err))
|
|
}
|
|
if err := fh.Close(); err != nil {
|
|
return httpError(fmt.Errorf("saving file: %q: %w", filename, err), http.StatusInternalServerError)
|
|
}
|
|
case disposition == "form-data" && params["name"] == "status":
|
|
statusBytes, err := io.ReadAll(part)
|
|
if err != nil {
|
|
return httpError(fmt.Errorf("reading status: %w (got: %q)", err, statusBytes), http.StatusInternalServerError)
|
|
}
|
|
status = strings.TrimSpace(string(statusBytes))
|
|
default:
|
|
return httpError(fmt.Errorf("invalid Content-Disposition: %q", part.Header.Get("Content-Disposition")), http.StatusBadRequest)
|
|
}
|
|
if err := part.Close(); err != nil {
|
|
return httpError(fmt.Errorf("closing part: %w", err), http.StatusBadRequest)
|
|
}
|
|
}
|
|
|
|
switch status {
|
|
case "0":
|
|
if _, err := o.db.SwapJobStatus(job.ID, common.StatusRunning, common.StatusSucceeded, ""); err != nil {
|
|
return httpError(err, http.StatusInternalServerError)
|
|
}
|
|
case "":
|
|
return httpError(fmt.Errorf("no status: %w", io.ErrUnexpectedEOF), http.StatusBadRequest)
|
|
default:
|
|
if _, err := o.db.SwapJobStatus(job.ID, common.StatusRunning, common.StatusFailed, status); err != nil {
|
|
return httpError(err, http.StatusInternalServerError)
|
|
}
|
|
}
|
|
|
|
if _, err := w.Write(respBytes2); err != nil {
|
|
return writeError(fmt.Errorf("write tailing status: %w", err))
|
|
}
|
|
if err := ctl.Flush(); err != nil {
|
|
return writeError(fmt.Errorf("write tailing status: %w", err))
|
|
}
|
|
|
|
return nil
|
|
}
|