183 lines
4.1 KiB
Go
183 lines
4.1 KiB
Go
// Copyright (C) 2024 Umorpha Systems
|
|
// SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
package store
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
// Store is the handle through which the triggers and jobs tables are read
// and written. Values are created by Open and released with Close.
type Store struct {
	// db is the database/sql handle that every Store method issues its
	// statements on.
	db *sql.DB
}
|
|
|
|
// JobStatus is the state of a job. It is written to and read back from the
// integer status column of the jobs table, so the numeric value of each
// constant below is part of the on-disk format.
type JobStatus int

// The constants are numbered with iota starting at zero. Because the numbers
// are persisted, new statuses must be appended rather than inserted.
const (
	// StatusNew is the status NewJob gives to a freshly inserted job. It is
	// the only status for which HasLog reports false.
	StatusNew JobStatus = iota
	// StatusRunning is neither new nor finished: HasLog reports true and
	// Finished reports false.
	StatusRunning
	// StatusSucceeded is one of the three statuses Finished reports as done.
	StatusSucceeded
	// StatusFailed is one of the three statuses Finished reports as done.
	// NOTE(review): how "failed" differs from "error" is decided by callers
	// outside this file — confirm before relying on the distinction.
	StatusFailed
	// StatusError is one of the three statuses Finished reports as done.
	StatusError
)
|
|
|
|
func (s JobStatus) String() string {
|
|
switch s {
|
|
case StatusNew:
|
|
return "new"
|
|
case StatusRunning:
|
|
return "running"
|
|
case StatusSucceeded:
|
|
return "succeeded"
|
|
case StatusFailed:
|
|
return "failed"
|
|
case StatusError:
|
|
return "error"
|
|
default:
|
|
return fmt.Sprintf("InvalidStatus(%d)", s)
|
|
}
|
|
}
|
|
|
|
func (s JobStatus) HasLog() bool {
|
|
return s != StatusNew
|
|
}
|
|
|
|
func (s JobStatus) Finished() bool {
|
|
switch s {
|
|
case StatusSucceeded, StatusFailed, StatusError:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// JobID identifies a row of the jobs table. NewJob obtains it from the
// INSERT's RETURNING id clause and GetJob uses it as the lookup key.
type JobID int
|
|
|
|
func Open(filename string) (*Store, error) {
|
|
db, err := sql.Open("sqlite3", filename)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("statedb: open: %w", err)
|
|
}
|
|
|
|
if _, err := db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS triggers (
|
|
expression TEXT NOT NULL PRIMARY KEY,
|
|
value TEXT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS jobs (
|
|
id INTEGER PRIMARY KEY,
|
|
trigger_expressions TEXT NOT NULL, -- JSON array of strings
|
|
trigger_values TEXT NOT NULL, -- JSON array of strings
|
|
command TEXT NOT NULL,
|
|
status INTEGER
|
|
);
|
|
`); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("statedb: migrate: %w", err)
|
|
}
|
|
|
|
return &Store{
|
|
db: db,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Store) Close() error {
|
|
return s.db.Close()
|
|
}
|
|
|
|
func (s *Store) GetTrigger(expr string) (string, error) {
|
|
var val string
|
|
err := s.db.QueryRow(`SELECT value FROM triggers WHERE expression = ?`,
|
|
expr).Scan(&val)
|
|
if err != nil {
|
|
return "", fmt.Errorf("statedb: get trigger: %q: %w", expr, err)
|
|
}
|
|
return val, nil
|
|
}
|
|
|
|
func (s *Store) SetTrigger(expr string, val string) error {
|
|
if _, err := s.db.Exec(`INSERT OR REPLACE INTO triggers(expression, value) VALUES(?, ?)`,
|
|
expr, val); err != nil {
|
|
return fmt.Errorf("statedb: set trigger: %q => %q: %w", expr, val, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) NewJob(triggerExprs []string, triggerVals []string, cmd string) (JobID, error) {
|
|
triggerExprsJSON, err := json.Marshal(triggerExprs)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
triggerValsJSON, err := json.Marshal(triggerVals)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
var id JobID
|
|
if err := s.db.QueryRow(`INSERT INTO jobs(trigger_expressions, trigger_values, command, status) VALUES(?, ?, ?, ?) RETURNING id`,
|
|
triggerExprsJSON, triggerValsJSON, cmd, StatusNew).Scan(&id); err != nil {
|
|
return 0, fmt.Errorf("statedb: create job: %w", err)
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
type Job struct {
|
|
ID JobID
|
|
TriggerExpressions []string
|
|
TriggerValues []string
|
|
Command string
|
|
Status JobStatus
|
|
}
|
|
|
|
func scanJob(row interface{ Scan(...any) error }) (Job, error) {
|
|
var job Job
|
|
var exprsJSON, valsJSON []byte
|
|
if err := row.Scan(&job.ID, &exprsJSON, &valsJSON, &job.Command, &job.Status); err != nil {
|
|
return Job{}, err
|
|
}
|
|
if err := json.Unmarshal(exprsJSON, &job.TriggerExpressions); err != nil {
|
|
return Job{}, err
|
|
}
|
|
if err := json.Unmarshal(valsJSON, &job.TriggerValues); err != nil {
|
|
return Job{}, err
|
|
}
|
|
return job, nil
|
|
}
|
|
|
|
func (s *Store) ListJobs() ([]Job, error) {
|
|
rows, err := s.db.Query(`SELECT id, trigger_expressions, trigger_values, command, status FROM jobs ORDER BY id DESC`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("statedb: list jobs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var jobs []Job
|
|
for rows.Next() {
|
|
job, err := scanJob(rows)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("statedb: list jobs: %w", err)
|
|
}
|
|
jobs = append(jobs, job)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("statedb: list jobs: %w", err)
|
|
}
|
|
|
|
return jobs, nil
|
|
}
|
|
|
|
func (s *Store) GetJob(id JobID) (Job, error) {
|
|
job, err := scanJob(s.db.QueryRow(`SELECT id, trigger_expressions, trigger_values, command, status FROM jobs WHERE id = ?`,
|
|
id))
|
|
if err != nil {
|
|
return Job{}, fmt.Errorf("statedb: list jobs: %v: %w", id, err)
|
|
}
|
|
return job, nil
|
|
}
|