// eclipse/lib/leader/db.go (186 lines, 5.2 KiB, Go)

// Copyright (C) 2024 Umorpha Systems
// SPDX-License-Identifier: AGPL-3.0-or-later
package leader
import (
"database/sql"
"encoding/json"
"fmt"
"path/filepath"
"time"
_ "github.com/mattn/go-sqlite3"
"git.mothstuff.lol/lukeshu/eclipse/lib/common"
)
// DB bundles the SQL database handle used for trigger and job state with the
// directory under which per-job directories are located.
type DB struct {
// db is the handle every query and statement in this file is issued on.
db *sql.DB
// jobsdir is the parent directory of job directories; a job's Dir is this
// path joined with the job's ID.
jobsdir string
}
// DB opens the SQLite database "state.sqlite" inside cfg.StateDir and makes
// sure the "triggers" and "jobs" tables exist, creating them if necessary.
//
// The returned *DB also records the "jobfiles" directory under cfg.StateDir
// as the parent of per-job directories.  If creating the tables fails, the
// freshly opened handle is closed before the error is returned.
func (cfg Config) DB() (*DB, error) {
	// The schema is applied on every open; IF NOT EXISTS makes it a no-op
	// once the tables are already there.
	const schema = `
CREATE TABLE IF NOT EXISTS triggers (
expression TEXT NOT NULL PRIMARY KEY,
value TEXT NOT NULL
) STRICT;
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY,
trigger_expressions TEXT NOT NULL, -- JSON array of strings
trigger_values TEXT NOT NULL, -- JSON array of strings
command TEXT NOT NULL,
status INTEGER,
status_msg TEXT,
created_at INTEGER, -- seconds since the Unix epoch
updated_at INTEGER -- seconds since the Unix epoch
) STRICT;
`

	dbPath := filepath.Join(cfg.StateDir, "state.sqlite")
	conn, err := sql.Open("sqlite3", dbPath)
	if err != nil {
		return nil, fmt.Errorf("statedb: open: %w", err)
	}

	if _, err := conn.Exec(schema); err != nil {
		// Best-effort cleanup; the migration failure is the error worth reporting.
		_ = conn.Close()
		return nil, fmt.Errorf("statedb: migrate: %w", err)
	}

	state := &DB{
		db:      conn,
		jobsdir: filepath.Join(cfg.StateDir, "jobfiles"),
	}
	return state, nil
}
// SQL /////////////////////////////////////////////////////////////////////////
// Close closes the underlying database handle and reports whatever error
// that operation returns.
func (s *DB) Close() error {
	closeErr := s.db.Close()
	return closeErr
}
// GetTrigger returns the value stored in the triggers table for the given
// expression.
//
// Any failure from the lookup (including the row not being found, which
// database/sql reports from Scan) is wrapped with %w, so callers can still
// inspect the underlying error.
func (s *DB) GetTrigger(expr string) (string, error) {
	const query = `SELECT value FROM triggers WHERE expression = ?`

	row := s.db.QueryRow(query, expr)

	var stored string
	if scanErr := row.Scan(&stored); scanErr != nil {
		return "", fmt.Errorf("statedb: get trigger: %q: %w", expr, scanErr)
	}
	return stored, nil
}
// SetTrigger stores val as the value for the trigger expression expr,
// replacing any row that already exists for that expression.
func (s *DB) SetTrigger(expr string, val string) error {
	const stmt = `INSERT OR REPLACE INTO triggers(expression, value) VALUES(?, ?)`

	_, err := s.db.Exec(stmt, expr, val)
	if err == nil {
		return nil
	}
	return fmt.Errorf("statedb: set trigger: %q => %q: %w", expr, val, err)
}
// NewJob inserts a new row into the jobs table and returns the ID the
// database assigned to it.
//
// triggerExprs and triggerVals are each stored as a JSON-encoded array in
// their own column; cmd is stored verbatim.  The job starts out with status
// common.StatusNew, and created_at and updated_at are both set to the current
// time in seconds since the Unix epoch.
//
// Every error returned is wrapped with the "statedb: create job" prefix,
// including failures to encode the trigger lists.
func (s *DB) NewJob(triggerExprs []string, triggerVals []string, cmd string) (common.JobID, error) {
	triggerExprsJSON, err := json.Marshal(triggerExprs)
	if err != nil {
		return 0, fmt.Errorf("statedb: create job: encode trigger expressions: %w", err)
	}
	triggerValsJSON, err := json.Marshal(triggerVals)
	if err != nil {
		return 0, fmt.Errorf("statedb: create job: encode trigger values: %w", err)
	}

	var id common.JobID
	now := time.Now().Unix()
	// RETURNING hands back the generated primary key from the INSERT itself,
	// so it is read with QueryRow/Scan rather than Exec.
	if err := s.db.QueryRow(`INSERT INTO jobs(trigger_expressions, trigger_values, command, status, created_at, updated_at) VALUES(?, ?, ?, ?, ?, ?) RETURNING id`,
		string(triggerExprsJSON), string(triggerValsJSON), cmd, common.StatusNew, now, now).Scan(&id); err != nil {
		return 0, fmt.Errorf("statedb: create job: %w", err)
	}
	return id, nil
}
// scanJob decodes one row of the jobs table into a common.Job.
//
// row must yield the columns in table order: id, trigger_expressions,
// trigger_values, command, status, status_msg, created_at, updated_at.
// The two trigger columns are parsed as JSON, a NULL status_msg becomes the
// empty string, and the timestamps (seconds since the Unix epoch) are
// converted to UTC times.  Job.Dir is not set here.
//
// Errors are returned unwrapped; callers add their own context.
func scanJob(row interface{ Scan(...any) error }) (common.Job, error) {
	var (
		parsed     common.Job
		rawExprs   []byte
		rawVals    []byte
		msg        sql.NullString
		createdSec int64
		updatedSec int64
	)

	scanErr := row.Scan(
		&parsed.ID,
		&rawExprs,
		&rawVals,
		&parsed.Command,
		&parsed.Status,
		&msg,
		&createdSec,
		&updatedSec,
	)
	if scanErr != nil {
		return common.Job{}, scanErr
	}

	if err := json.Unmarshal(rawExprs, &parsed.TriggerExpressions); err != nil {
		return common.Job{}, err
	}
	if err := json.Unmarshal(rawVals, &parsed.TriggerValues); err != nil {
		return common.Job{}, err
	}

	parsed.StatusMsg = msg.String
	parsed.CreatedAt = time.Unix(createdSec, 0).UTC()
	parsed.UpdatedAt = time.Unix(updatedSec, 0).UTC()
	return parsed, nil
}
// ListJobs returns jobs ordered by descending ID.
//
// A negative status selects every job; any other value restricts the result
// to jobs whose status column equals it.  Each returned job has Dir set to
// the job's ID joined onto the DB's jobs directory.
//
// If closing the result set fails and no earlier error occurred, the close
// error is what gets returned.
func (s *DB) ListJobs(status common.JobStatus) (_ []common.Job, err error) {
	query := `SELECT * FROM jobs ORDER BY id DESC`
	var args []any
	if status >= 0 {
		query = `SELECT * FROM jobs WHERE status = ? ORDER BY id DESC`
		args = append(args, status)
	}

	rows, err := s.db.Query(query, args...)
	if err != nil {
		return nil, fmt.Errorf("statedb: list jobs: %w", err)
	}
	defer func() {
		// Surface a Close failure only when nothing else has gone wrong.
		if closeErr := rows.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	var found []common.Job
	for rows.Next() {
		entry, scanErr := scanJob(rows)
		if scanErr != nil {
			return nil, fmt.Errorf("statedb: list jobs: %w", scanErr)
		}
		entry.Dir = filepath.Join(s.jobsdir, fmt.Sprint(entry.ID))
		found = append(found, entry)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("statedb: list jobs: %w", iterErr)
	}
	return found, nil
}
// GetJob looks up the single job with the given id.
//
// The returned job has Dir set to the job's ID joined onto the DB's jobs
// directory.  Any failure to read or decode the row is wrapped with %w and
// labelled "statedb: get job" together with the requested id.
func (s *DB) GetJob(id common.JobID) (common.Job, error) {
	job, err := scanJob(s.db.QueryRow(`SELECT * FROM jobs WHERE id = ?`, id))
	if err != nil {
		// The label names this operation, not the list query.
		return common.Job{}, fmt.Errorf("statedb: get job: %v: %w", id, err)
	}
	job.Dir = filepath.Join(s.jobsdir, fmt.Sprintf("%v", job.ID))
	return job, nil
}
// SwapJobStatus changes job id from oldStatus to newStatus, but only if the
// row's status is currently oldStatus.
//
// When the row is updated, status_msg is set to statusMsg and updated_at to
// the current time (seconds since the Unix epoch).  ok reports whether any
// row was changed; it is false when no job matched both id and oldStatus.
func (s *DB) SwapJobStatus(id common.JobID, oldStatus, newStatus common.JobStatus, statusMsg string) (ok bool, _ error) {
	// Both failure paths report the same context, so build it in one place.
	wrap := func(cause error) error {
		return fmt.Errorf("statedb: update job: %v: %v => %v (%q): %w", id, oldStatus, newStatus, statusMsg, cause)
	}

	stamp := time.Now().Unix()
	result, err := s.db.Exec(`UPDATE jobs SET status = ?, status_msg = ?, updated_at = ? WHERE id = ? and status = ?`,
		newStatus, statusMsg, stamp, id, oldStatus)
	if err != nil {
		return false, wrap(err)
	}

	affected, err := result.RowsAffected()
	if err != nil {
		return false, wrap(err)
	}
	return affected != 0, nil
}
// GCRunningJobs moves every job whose status is common.StatusRunning to
// common.StatusError and sets its updated_at to the current time (seconds
// since the Unix epoch).  status_msg is left as it was.
//
// NOTE(review): presumably meant for jobs left "running" by a previous
// process that is no longer around; confirm against the caller.
func (s *DB) GCRunningJobs() error {
	const stmt = `UPDATE jobs SET status = ?, updated_at = ? WHERE status = ?`

	stamp := time.Now().Unix()
	_, err := s.db.Exec(stmt, common.StatusError, stamp, common.StatusRunning)
	if err != nil {
		return fmt.Errorf("statedb: gc running jobs: %w", err)
	}
	return nil
}