// Copyright (C) 2024 Umorpha Systems // SPDX-License-Identifier: AGPL-3.0-or-later package leader import ( "database/sql" "encoding/json" "fmt" "path/filepath" "time" _ "github.com/mattn/go-sqlite3" "git.mothstuff.lol/lukeshu/eclipse/lib/common" ) type DB struct { db *sql.DB jobsdir string } func (cfg Config) DB() (*DB, error) { db, err := sql.Open("sqlite3", filepath.Join(cfg.StateDir, "state.sqlite")) if err != nil { return nil, fmt.Errorf("statedb: open: %w", err) } if _, err := db.Exec(` CREATE TABLE IF NOT EXISTS triggers ( expression TEXT NOT NULL PRIMARY KEY, value TEXT NOT NULL ) STRICT; CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY, trigger_expressions TEXT NOT NULL, -- JSON array of strings trigger_values TEXT NOT NULL, -- JSON array of strings command TEXT NOT NULL, status INTEGER, status_msg TEXT, created_at INTEGER, -- seconds since the Unix epoch updated_at INTEGER -- seconds since the Unix epoch ) STRICT; `); err != nil { _ = db.Close() return nil, fmt.Errorf("statedb: migrate: %w", err) } return &DB{ db: db, jobsdir: filepath.Join(cfg.StateDir, "jobfiles"), }, nil } // SQL ///////////////////////////////////////////////////////////////////////// func (s *DB) Close() error { return s.db.Close() } func (s *DB) GetTrigger(expr string) (string, error) { var val string err := s.db.QueryRow(`SELECT value FROM triggers WHERE expression = ?`, expr).Scan(&val) if err != nil { return "", fmt.Errorf("statedb: get trigger: %q: %w", expr, err) } return val, nil } func (s *DB) SetTrigger(expr string, val string) error { if _, err := s.db.Exec(`INSERT OR REPLACE INTO triggers(expression, value) VALUES(?, ?)`, expr, val); err != nil { return fmt.Errorf("statedb: set trigger: %q => %q: %w", expr, val, err) } return nil } func (s *DB) NewJob(triggerExprs []string, triggerVals []string, cmd string) (common.JobID, error) { triggerExprsJSON, err := json.Marshal(triggerExprs) if err != nil { return 0, err } triggerValsJSON, err := json.Marshal(triggerVals) if err != nil { return 0, err } var id common.JobID now := time.Now().Unix() if err := s.db.QueryRow(`INSERT INTO jobs(trigger_expressions, trigger_values, command, status, created_at, updated_at) VALUES(?, ?, ?, ?, ?, ?) RETURNING id`, string(triggerExprsJSON), string(triggerValsJSON), cmd, common.StatusNew, now, now).Scan(&id); err != nil { return 0, fmt.Errorf("statedb: create job: %w", err) } return id, nil } func scanJob(row interface{ Scan(...any) error }) (common.Job, error) { var job common.Job var exprsJSON, valsJSON []byte var created, updated int64 if err := row.Scan(&job.ID, &exprsJSON, &valsJSON, &job.Command, &job.Status, &job.StatusMsg, &created, &updated); err != nil { return common.Job{}, err } if err := json.Unmarshal(exprsJSON, &job.TriggerExpressions); err != nil { return common.Job{}, err } if err := json.Unmarshal(valsJSON, &job.TriggerValues); err != nil { return common.Job{}, err } job.CreatedAt = time.Unix(created, 0) job.UpdatedAt = time.Unix(updated, 0) return job, nil } func (s *DB) ListJobs(status common.JobStatus) (_ []common.Job, err error) { maybeSetErr := func(_err error) { if err == nil && _err != nil { err = _err } } var rows *sql.Rows if status < 0 { rows, err = s.db.Query(`SELECT * FROM jobs ORDER BY id DESC`) } else { rows, err = s.db.Query(`SELECT * FROM jobs WHERE status = ? ORDER BY id DESC`, status) } if err != nil { return nil, fmt.Errorf("statedb: list jobs: %w", err) } defer func() { maybeSetErr(rows.Close()) }() var jobs []common.Job for rows.Next() { job, err := scanJob(rows) if err != nil { return nil, fmt.Errorf("statedb: list jobs: %w", err) } job.Dir = filepath.Join(s.jobsdir, fmt.Sprintf("%v", job.ID)) jobs = append(jobs, job) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("statedb: list jobs: %w", err) } return jobs, nil } func (s *DB) GetJob(id common.JobID) (common.Job, error) { job, err := scanJob(s.db.QueryRow(`SELECT * FROM jobs WHERE id = ?`, id)) if err != nil { return common.Job{}, fmt.Errorf("statedb: list jobs: %v: %w", id, err) } job.Dir = filepath.Join(s.jobsdir, fmt.Sprintf("%v", job.ID)) return job, nil } func (s *DB) SwapJobStatus(id common.JobID, oldStatus, newStatus common.JobStatus, statusMsg string) (ok bool, _ error) { now := time.Now().Unix() res, err := s.db.Exec(`UPDATE jobs SET status = ?, status_msg = ?, updated_at = ? WHERE id = ? and status = ?`, newStatus, statusMsg, now, id, oldStatus) if err != nil { return false, fmt.Errorf("statedb: update job: %v: %v => %v (%q): %w", id, oldStatus, newStatus, statusMsg, err) } n, err := res.RowsAffected() if err != nil { return false, fmt.Errorf("statedb: update job: %v: %v => %v (%q): %w", id, oldStatus, newStatus, statusMsg, err) } return n != 0, nil } func (s *DB) GCRunningJobs() error { now := time.Now().Unix() if _, err := s.db.Exec(`UPDATE jobs SET status = ?, updated_at = ? WHERE status = ?`, common.StatusError, now, common.StatusRunning); err != nil { return fmt.Errorf("statedb: gc running jobs: %w", err) } return nil }