You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

957 lines
22 KiB

package main
import (
"context"
"fmt"
"net"
"os"
"os/exec"
"sync"
"syscall"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
orlyaclv1 "next.orly.dev/pkg/proto/orlyacl/v1"
)
// Supervisor manages the database, ACL, sync, and relay processes.
//
// Start launches them in dependency order and spawns one monitor goroutine
// per process that restarts it if it exits unexpectedly; Stop shuts them down
// in reverse dependency order.
type Supervisor struct {
// cfg holds binary paths, listen addresses, enable flags and timeouts.
cfg *Config
// ctx is passed to exec.CommandContext for every child process and is
// watched by the readiness probes and monitor goroutines.
ctx context.Context
// cancel is stored alongside ctx. NOTE(review): it is not called by the
// code visible here — confirm whether callers rely on it.
cancel context.CancelFunc
// Handles to the most recently started core processes. The start* methods
// assign these while holding mu, including when a monitor restarts one.
dbProc *Process
aclProc *Process
relayProc *Process
// Sync service processes
distributedSyncProc *Process
clusterSyncProc *Process
relayGroupProc *Process
negentropyProc *Process
// wg counts the monitor goroutines started by Start; Stop waits on it.
wg sync.WaitGroup
// mu serializes the start* methods and guards closed.
mu sync.Mutex
// closed is set once by Stop so monitor goroutines stop restarting processes.
closed bool
}
// Process represents a managed subprocess.
type Process struct {
// name is the label used in log messages and in ProcessStatus.Name.
name string
// cmd is the started command; cmd.Process is used for the PID and to
// signal or kill the child.
cmd *exec.Cmd
// restarts is incremented by monitorProcess on each unexpected exit and
// reported in ProcessStatus.Restarts.
restarts int
exited chan struct{} // closed when process exits
// mu is held by stopProcess and getProcessStatus while reading cmd and
// restarts.
mu sync.Mutex
}
// NewSupervisor returns a Supervisor configured by cfg. ctx bounds the
// lifetime of every child process and background goroutine; cancel is kept
// alongside it. No process is started until Start is called.
func NewSupervisor(ctx context.Context, cancel context.CancelFunc, cfg *Config) *Supervisor {
	s := new(Supervisor)
	s.cfg, s.ctx, s.cancel = cfg, ctx, cancel
	return s
}
// Start starts the database, optional ACL server, sync services, and relay processes.
//
// Processes are launched in dependency order:
//
//  1. the database server, then it waits up to cfg.DBReadyTimeout for its port;
//  2. the ACL server (only when cfg.ACLEnabled), then it waits for its Ready RPC;
//  3. every enabled sync service, in parallel;
//  4. the relay, configured to use the gRPC backends started above.
//
// If a step fails, the processes already started are stopped and the wrapped
// error is returned. On success one monitor goroutine per process is started
// to restart that process if it exits unexpectedly.
func (s *Supervisor) Start() error {
	// 1. Start database server
	if err := s.startDB(); err != nil {
		return fmt.Errorf("failed to start database: %w", err)
	}

	// 2. Wait for DB to be ready (health check on gRPC port)
	if err := s.waitForDBReady(s.cfg.DBReadyTimeout); err != nil {
		s.stopDB()
		return fmt.Errorf("database not ready: %w", err)
	}
	log.I.F("database is ready")

	// 3. Start ACL server if enabled
	if s.cfg.ACLEnabled {
		if err := s.startACL(); err != nil {
			s.stopDB()
			return fmt.Errorf("failed to start ACL server: %w", err)
		}
		// Wait for ACL to be ready
		if err := s.waitForACLReady(s.cfg.ACLReadyTimeout); err != nil {
			s.stopACL()
			s.stopDB()
			return fmt.Errorf("ACL server not ready: %w", err)
		}
		log.I.F("ACL server is ready")
	}

	// stopBackends tears down the sync services, ACL server and database when
	// a later start-up step fails.
	stopBackends := func() {
		s.stopSyncServices()
		if s.cfg.ACLEnabled {
			s.stopACL()
		}
		s.stopDB()
	}

	// 4. Start sync services in parallel (they all depend on DB)
	if err := s.startSyncServices(); err != nil {
		stopBackends()
		return fmt.Errorf("failed to start sync services: %w", err)
	}

	// 5. Start relay with gRPC backend(s)
	if err := s.startRelay(); err != nil {
		stopBackends()
		return fmt.Errorf("failed to start relay: %w", err)
	}

	// 6. Start monitoring goroutines. The list is built first so that the
	// WaitGroup count always matches the number of goroutines spawned.
	type monitor struct {
		proc     *Process
		procType string
		restart  func() error
	}
	monitors := []monitor{{s.dbProc, "db", s.startDB}}
	if s.cfg.ACLEnabled {
		monitors = append(monitors, monitor{s.aclProc, "acl", s.startACL})
	}
	if s.cfg.DistributedSyncEnabled {
		monitors = append(monitors, monitor{s.distributedSyncProc, "distributed-sync", s.startDistributedSync})
	}
	if s.cfg.ClusterSyncEnabled {
		monitors = append(monitors, monitor{s.clusterSyncProc, "cluster-sync", s.startClusterSync})
	}
	if s.cfg.RelayGroupEnabled {
		monitors = append(monitors, monitor{s.relayGroupProc, "relaygroup", s.startRelayGroup})
	}
	if s.cfg.NegentropyEnabled {
		monitors = append(monitors, monitor{s.negentropyProc, "negentropy", s.startNegentropy})
	}
	monitors = append(monitors, monitor{s.relayProc, "relay", s.startRelay})

	s.wg.Add(len(monitors))
	for _, m := range monitors {
		// Arguments are evaluated here, so each goroutine gets its own values.
		go s.monitorProcess(m.proc, m.procType, m.restart)
	}
	return nil
}
// Stop stops all managed processes gracefully.
//
// Order is the reverse of start-up: relay, sync services, ACL server (when
// enabled), then the database with cfg.StopTimeout so it can flush. Stop first
// marks the supervisor closed so monitor goroutines stop restarting processes,
// and then waits for those goroutines to exit. A monitor sitting in a restart
// backoff (up to 30s) delays that wait unless s.ctx is cancelled.
// Calling Stop more than once is a no-op. It always returns nil.
func (s *Supervisor) Stop() error {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return nil
	}
	s.closed = true
	s.mu.Unlock()

	// current reads a process handle under s.mu: monitor goroutines replace
	// these handles (while holding s.mu) when they restart a process.
	current := func(field **Process) *Process {
		s.mu.Lock()
		defer s.mu.Unlock()
		return *field
	}

	// Stop relay first (it depends on sync services, ACL, and DB)
	log.I.F("stopping relay...")
	s.stopProcess(current(&s.relayProc), 5*time.Second)

	// Stop sync services in parallel (they depend on DB)
	log.I.F("stopping sync services...")
	s.stopSyncServices()

	// Stop ACL if enabled (it depends on DB)
	if s.cfg.ACLEnabled {
		if acl := current(&s.aclProc); acl != nil {
			log.I.F("stopping ACL server...")
			s.stopProcess(acl, 5*time.Second)
		}
	}

	// Stop DB with longer timeout for flush
	log.I.F("stopping database...")
	s.stopProcess(current(&s.dbProc), s.cfg.StopTimeout)

	// Wait for monitor goroutines to exit (they will exit when they see closed=true)
	s.wg.Wait()

	// A monitor that passed its closed check just before closed was set may
	// have spawned a replacement after we stopped the original. With every
	// monitor gone no further restarts can happen, so stop anything that is
	// still running. Processes that already exited are skipped.
	stopIfRunning := func(p *Process, timeout time.Duration) {
		if p == nil || p.exited == nil {
			return
		}
		select {
		case <-p.exited:
			return
		default:
		}
		s.stopProcess(p, timeout)
	}
	stopIfRunning(current(&s.relayProc), 5*time.Second)
	stopIfRunning(current(&s.distributedSyncProc), 5*time.Second)
	stopIfRunning(current(&s.clusterSyncProc), 5*time.Second)
	stopIfRunning(current(&s.relayGroupProc), 5*time.Second)
	stopIfRunning(current(&s.negentropyProc), 5*time.Second)
	stopIfRunning(current(&s.aclProc), 5*time.Second)
	stopIfRunning(current(&s.dbProc), s.cfg.StopTimeout)
	return nil
}
// startDB launches the database binary with the supervisor's environment plus
// its listen address, data directory and log level. It records the new
// process in s.dbProc and reaps it in the background, closing the process's
// exited channel once it terminates. s.mu is held for the whole call.
func (s *Supervisor) startDB() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	env := append(os.Environ(),
		fmt.Sprintf("ORLY_DB_LISTEN=%s", s.cfg.DBListen),
		fmt.Sprintf("ORLY_DATA_DIR=%s", s.cfg.DataDir),
		fmt.Sprintf("ORLY_DB_LOG_LEVEL=%s", s.cfg.LogLevel),
	)
	cmd := exec.CommandContext(s.ctx, s.cfg.DBBinary)
	cmd.Env = env
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	if err := cmd.Start(); chk.E(err) {
		return err
	}

	done := make(chan struct{})
	s.dbProc = &Process{name: "orly-db", cmd: cmd, exited: done}
	// Reap the child; closing done broadcasts its exit to all waiters.
	go func() {
		defer close(done)
		_ = cmd.Wait() // exit status is not inspected
	}()
	log.I.F("started database server (pid %d)", cmd.Process.Pid)
	return nil
}
// startACL launches the ACL server binary, pointing it at the database over
// gRPC and passing its listen address, ACL mode and log level. It records the
// new process in s.aclProc and reaps it in the background, closing the
// process's exited channel once it terminates. s.mu is held for the whole call.
func (s *Supervisor) startACL() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	env := append(os.Environ(),
		fmt.Sprintf("ORLY_ACL_LISTEN=%s", s.cfg.ACLListen),
		"ORLY_ACL_DB_TYPE=grpc",
		fmt.Sprintf("ORLY_ACL_GRPC_DB_SERVER=%s", s.cfg.DBListen),
		fmt.Sprintf("ORLY_ACL_MODE=%s", s.cfg.ACLMode),
		fmt.Sprintf("ORLY_ACL_LOG_LEVEL=%s", s.cfg.LogLevel),
	)
	cmd := exec.CommandContext(s.ctx, s.cfg.ACLBinary)
	cmd.Env = env
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	if err := cmd.Start(); chk.E(err) {
		return err
	}

	done := make(chan struct{})
	s.aclProc = &Process{name: "orly-acl", cmd: cmd, exited: done}
	// Reap the child; closing done broadcasts its exit to all waiters.
	go func() {
		defer close(done)
		_ = cmd.Wait() // exit status is not inspected
	}()
	log.I.F("started ACL server (pid %d)", cmd.Process.Pid)
	return nil
}
// waitForACLReady polls the ACL server every 250ms until its Ready RPC
// reports true. Each attempt first checks that the TCP port accepts
// connections, then (lazily, once) opens a gRPC client connection and calls
// Ready with a one-second timeout. It returns an error when timeout elapses
// or s.ctx is cancelled. The gRPC connection is closed on every exit path.
func (s *Supervisor) waitForACLReady(timeout time.Duration) error {
	var (
		conn   *grpc.ClientConn
		client orlyaclv1.ACLServiceClient
	)
	defer func() {
		if conn != nil {
			conn.Close()
		}
	}()

	deadline := time.Now().Add(timeout)
	tick := time.NewTicker(250 * time.Millisecond)
	defer tick.Stop()

	for {
		select {
		case <-s.ctx.Done():
			return s.ctx.Err()
		case <-tick.C:
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timeout waiting for ACL server")
		}

		// Cheap liveness probe before involving gRPC.
		probe, err := net.DialTimeout("tcp", s.cfg.ACLListen, time.Second)
		if err != nil {
			continue
		}
		probe.Close()

		if conn == nil {
			c, dialErr := grpc.DialContext(s.ctx, s.cfg.ACLListen,
				grpc.WithTransportCredentials(insecure.NewCredentials()),
			)
			if dialErr != nil {
				continue
			}
			conn = c
			client = orlyaclv1.NewACLServiceClient(conn)
		}

		// The port being open is not enough: Ready reports full configuration.
		callCtx, cancel := context.WithTimeout(s.ctx, time.Second)
		resp, callErr := client.Ready(callCtx, &orlyaclv1.Empty{})
		cancel()
		if callErr == nil && resp.Ready {
			return nil
		}
	}
}
// stopACL stops the ACL server, giving it five seconds to exit after SIGTERM
// before it is killed. Nothing happens if the ACL server was never started.
func (s *Supervisor) stopACL() {
	const aclStopTimeout = 5 * time.Second
	s.stopProcess(s.aclProc, aclStopTimeout)
}
// startRelay launches the relay binary configured to use the database over
// gRPC. When the ACL server is enabled the relay is pointed at it; otherwise
// it runs as an open relay with no ACL restrictions. Each enabled sync service
// is passed to the relay by its listen address. The new process is recorded
// in s.relayProc and reaped in the background. s.mu is held for the whole call.
func (s *Supervisor) startRelay() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	cfg := s.cfg
	env := append(os.Environ(),
		"ORLY_DB_TYPE=grpc",
		fmt.Sprintf("ORLY_GRPC_SERVER=%s", cfg.DBListen),
		fmt.Sprintf("ORLY_LOG_LEVEL=%s", cfg.LogLevel),
	)

	// ACL backend: remote gRPC ACL server, or open mode without restrictions.
	if cfg.ACLEnabled {
		env = append(env,
			"ORLY_ACL_TYPE=grpc",
			fmt.Sprintf("ORLY_GRPC_ACL_SERVER=%s", cfg.ACLListen),
			fmt.Sprintf("ORLY_ACL_MODE=%s", cfg.ACLMode),
		)
	} else {
		env = append(env, "ORLY_ACL_TYPE=local", "ORLY_ACL_MODE=none")
	}

	// Sync service endpoints.
	if cfg.DistributedSyncEnabled {
		env = append(env,
			"ORLY_SYNC_TYPE=grpc",
			fmt.Sprintf("ORLY_GRPC_SYNC_DISTRIBUTED=%s", cfg.DistributedSyncListen),
		)
	}
	if cfg.ClusterSyncEnabled {
		env = append(env, fmt.Sprintf("ORLY_GRPC_SYNC_CLUSTER=%s", cfg.ClusterSyncListen))
	}
	if cfg.RelayGroupEnabled {
		env = append(env, fmt.Sprintf("ORLY_GRPC_SYNC_RELAYGROUP=%s", cfg.RelayGroupListen))
	}
	if cfg.NegentropyEnabled {
		env = append(env,
			"ORLY_NEGENTROPY_ENABLED=true",
			fmt.Sprintf("ORLY_GRPC_SYNC_NEGENTROPY=%s", cfg.NegentropyListen),
		)
	}

	cmd := exec.CommandContext(s.ctx, cfg.RelayBinary)
	cmd.Env = env
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	if err := cmd.Start(); chk.E(err) {
		return err
	}

	done := make(chan struct{})
	s.relayProc = &Process{name: "orly", cmd: cmd, exited: done}
	// Reap the child; closing done broadcasts its exit to all waiters.
	go func() {
		defer close(done)
		_ = cmd.Wait() // exit status is not inspected
	}()
	log.I.F("started relay server (pid %d)", cmd.Process.Pid)
	return nil
}
// waitForDBReady polls the database listen address every 250ms until a TCP
// connection succeeds. It returns an error once timeout has elapsed or when
// s.ctx is cancelled.
func (s *Supervisor) waitForDBReady(timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	tick := time.NewTicker(250 * time.Millisecond)
	defer tick.Stop()

	for {
		select {
		case <-s.ctx.Done():
			return s.ctx.Err()
		case <-tick.C:
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timeout waiting for database")
		}
		conn, err := net.DialTimeout("tcp", s.cfg.DBListen, time.Second)
		if err != nil {
			continue // not accepting connections yet
		}
		conn.Close()
		return nil
	}
}
// stopDB stops the database server, allowing cfg.StopTimeout for it to flush
// and exit after SIGTERM before it is killed.
func (s *Supervisor) stopDB() {
	grace := s.cfg.StopTimeout
	s.stopProcess(s.dbProc, grace)
}
// stopProcess sends SIGTERM to p and waits up to timeout for it to exit. If it
// is still running after that, it is killed and stopProcess waits for the
// exit. A nil p, or one that was never started, is ignored. If SIGTERM cannot
// be delivered (for example because the process already exited), it logs at
// debug level and returns without waiting.
func (s *Supervisor) stopProcess(p *Process, timeout time.Duration) {
	if p == nil {
		return
	}
	p.mu.Lock()
	started := p.cmd != nil && p.cmd.Process != nil
	p.mu.Unlock()
	if !started {
		return
	}

	proc := p.cmd.Process
	if err := proc.Signal(syscall.SIGTERM); err != nil {
		// Process may have already exited
		log.D.F("%s already exited: %v", p.name, err)
		return
	}

	grace := time.NewTimer(timeout)
	defer grace.Stop()
	select {
	case <-p.exited:
		log.I.F("%s stopped gracefully", p.name)
	case <-grace.C:
		log.W.F("%s did not stop in time, killing", p.name)
		proc.Kill()
		<-p.exited // the reaper goroutine closes this after the kill lands
	}
}
// monitorProcess watches a managed process and restarts it whenever it exits
// while the supervisor is still running. Start runs it as one goroutine per
// process; it calls s.wg.Done when it returns.
//
// p is the process to watch initially. procType ("db", "acl",
// "distributed-sync", "cluster-sync", "relaygroup", "negentropy"; anything
// else means the relay) selects which Supervisor field holds the current
// handle. restart is the start function that replaces that handle.
//
// Restarts back off linearly, 1s per recorded restart, capped at 30s. The
// restart count is copied onto each replacement Process, so the backoff keeps
// growing across repeated crashes and status reports stay accurate. If another
// caller (such as RestartAll) replaced the process while this monitor was
// backing off, the monitor adopts that process instead of starting a
// duplicate. The goroutine exits once Stop has been called, when s.ctx is
// cancelled, or when there is no process to watch.
func (s *Supervisor) monitorProcess(p *Process, procType string, restart func() error) {
	defer s.wg.Done()

	// current returns the handle the supervisor tracks for procType.
	// The caller must hold s.mu.
	current := func() *Process {
		switch procType {
		case "db":
			return s.dbProc
		case "acl":
			return s.aclProc
		case "distributed-sync":
			return s.distributedSyncProc
		case "cluster-sync":
			return s.clusterSyncProc
		case "relaygroup":
			return s.relayGroupProc
		case "negentropy":
			return s.negentropyProc
		default:
			return s.relayProc
		}
	}
	// isClosed reports whether Stop has been called.
	isClosed := func() bool {
		s.mu.Lock()
		defer s.mu.Unlock()
		return s.closed
	}

	for {
		// Check if we're shutting down
		if isClosed() {
			return
		}
		select {
		case <-s.ctx.Done():
			return
		default:
		}
		if p == nil || p.exited == nil {
			return
		}

		// Wait for process to exit
		select {
		case <-p.exited:
		case <-s.ctx.Done():
			return
		}

		// The process may have been stopped intentionally by Stop.
		if isClosed() {
			return
		}

		// Process exited unexpectedly. Update the count under p.mu because
		// getProcessStatus reads it under the same lock.
		p.mu.Lock()
		p.restarts++
		restarts := p.restarts
		p.mu.Unlock()
		log.W.F("%s exited unexpectedly, restart count: %d", p.name, restarts)

		// Backoff before restart
		backoff := time.Duration(restarts) * time.Second
		if backoff > 30*time.Second {
			backoff = 30 * time.Second
		}
		select {
		case <-s.ctx.Done():
			return
		case <-time.After(backoff):
		}

		// Check one more time before restarting, and see whether someone else
		// already replaced the process while we were backing off.
		s.mu.Lock()
		closed := s.closed
		cur := current()
		s.mu.Unlock()
		if closed {
			return
		}
		if cur != nil && cur != p {
			p = cur // adopt the replacement instead of starting a duplicate
			continue
		}

		if err := restart(); err != nil {
			// p still refers to the exited process, so the next iteration
			// counts another failure and backs off further before retrying.
			log.E.F("failed to restart %s: %v", p.name, err)
			continue
		}

		// Switch to the new process, carrying the restart count over so the
		// backoff keeps growing if it keeps crashing.
		s.mu.Lock()
		next := current()
		s.mu.Unlock()
		if next != nil && next != p {
			next.mu.Lock()
			next.restarts = restarts
			next.mu.Unlock()
			p = next
		}
	}
}
// startSyncServices starts all enabled sync services in parallel.
//
// Each enabled service is started and then probed until it accepts TCP
// connections on its listen address, or until cfg.SyncReadyTimeout elapses.
// Every enabled service is attempted even if another fails. The returned
// error wraps every failure, in configuration order (distributed, cluster,
// relaygroup, negentropy). A single failure is returned as-is. It returns nil
// when no sync service is enabled or all of them became ready.
func (s *Supervisor) startSyncServices() error {
	type syncService struct {
		enabled bool
		label   string // prefix for log and error messages
		listen  string
		start   func() error
	}
	services := []syncService{
		{s.cfg.DistributedSyncEnabled, "distributed sync", s.cfg.DistributedSyncListen, s.startDistributedSync},
		{s.cfg.ClusterSyncEnabled, "cluster sync", s.cfg.ClusterSyncListen, s.startClusterSync},
		{s.cfg.RelayGroupEnabled, "relaygroup", s.cfg.RelayGroupListen, s.startRelayGroup},
		{s.cfg.NegentropyEnabled, "negentropy", s.cfg.NegentropyListen, s.startNegentropy},
	}

	// One slot per service: each goroutine writes only its own index, and
	// wg.Wait orders those writes before the reads below, so no mutex is needed.
	errs := make([]error, len(services))
	var wg sync.WaitGroup
	for i, svc := range services {
		if !svc.enabled {
			continue
		}
		wg.Add(1)
		go func(i int, svc syncService) {
			defer wg.Done()
			if err := svc.start(); err != nil {
				errs[i] = fmt.Errorf("%s: %w", svc.label, err)
				return
			}
			if err := s.waitForServiceReady(svc.listen, s.cfg.SyncReadyTimeout); err != nil {
				errs[i] = fmt.Errorf("%s not ready: %w", svc.label, err)
				return
			}
			log.I.F("%s service is ready", svc.label)
		}(i, svc)
	}
	wg.Wait()

	// Combine failures deterministically; a lone failure is returned unchanged.
	var joined error
	for _, err := range errs {
		switch {
		case err == nil:
		case joined == nil:
			joined = err
		default:
			joined = fmt.Errorf("%w; %w", joined, err)
		}
	}
	return joined
}
// stopSyncServices stops all sync services.
//
// Every sync service that has been started is stopped concurrently, each with
// a five-second grace period after SIGTERM before it is killed. The call
// returns once all of them have been handled. The process handles are read
// under s.mu because monitor goroutines replace them (under s.mu) when they
// restart a crashed service.
func (s *Supervisor) stopSyncServices() {
	s.mu.Lock()
	procs := []*Process{
		s.distributedSyncProc,
		s.clusterSyncProc,
		s.relayGroupProc,
		s.negentropyProc,
	}
	s.mu.Unlock()

	var wg sync.WaitGroup
	for _, p := range procs {
		if p == nil {
			continue
		}
		wg.Add(1)
		go func(p *Process) {
			defer wg.Done()
			s.stopProcess(p, 5*time.Second)
		}(p)
	}
	wg.Wait()
}
// waitForServiceReady waits for a gRPC service to be accepting connections.
//
// It attempts a TCP connection to address every 250ms. It returns nil on the
// first success, and an error once timeout has elapsed or s.ctx is cancelled.
func (s *Supervisor) waitForServiceReady(address string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	tick := time.NewTicker(250 * time.Millisecond)
	defer tick.Stop()

	for {
		select {
		case <-s.ctx.Done():
			return s.ctx.Err()
		case <-tick.C:
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timeout waiting for service at %s", address)
		}
		if conn, err := net.DialTimeout("tcp", address, time.Second); err == nil {
			conn.Close()
			return nil
		}
	}
}
// startDistributedSync launches the distributed sync service, backed by the
// database over gRPC. The new process is recorded in s.distributedSyncProc and
// reaped in the background. s.mu is held for the whole call.
func (s *Supervisor) startDistributedSync() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	env := append(os.Environ(),
		fmt.Sprintf("ORLY_SYNC_DISTRIBUTED_LISTEN=%s", s.cfg.DistributedSyncListen),
		"ORLY_SYNC_DISTRIBUTED_DB_TYPE=grpc",
		fmt.Sprintf("ORLY_SYNC_DISTRIBUTED_DB_SERVER=%s", s.cfg.DBListen),
		fmt.Sprintf("ORLY_SYNC_DISTRIBUTED_LOG_LEVEL=%s", s.cfg.LogLevel),
	)
	cmd := exec.CommandContext(s.ctx, s.cfg.DistributedSyncBinary)
	cmd.Env = env
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	if err := cmd.Start(); chk.E(err) {
		return err
	}

	done := make(chan struct{})
	s.distributedSyncProc = &Process{name: "orly-sync-distributed", cmd: cmd, exited: done}
	go func() {
		defer close(done)
		_ = cmd.Wait() // exit status is not inspected
	}()
	log.I.F("started distributed sync service (pid %d)", cmd.Process.Pid)
	return nil
}
// startClusterSync launches the cluster sync service, backed by the database
// over gRPC. The new process is recorded in s.clusterSyncProc and reaped in the
// background. s.mu is held for the whole call.
func (s *Supervisor) startClusterSync() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	env := append(os.Environ(),
		fmt.Sprintf("ORLY_SYNC_CLUSTER_LISTEN=%s", s.cfg.ClusterSyncListen),
		"ORLY_SYNC_CLUSTER_DB_TYPE=grpc",
		fmt.Sprintf("ORLY_SYNC_CLUSTER_DB_SERVER=%s", s.cfg.DBListen),
		fmt.Sprintf("ORLY_SYNC_CLUSTER_LOG_LEVEL=%s", s.cfg.LogLevel),
	)
	cmd := exec.CommandContext(s.ctx, s.cfg.ClusterSyncBinary)
	cmd.Env = env
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	if err := cmd.Start(); chk.E(err) {
		return err
	}

	done := make(chan struct{})
	s.clusterSyncProc = &Process{name: "orly-sync-cluster", cmd: cmd, exited: done}
	go func() {
		defer close(done)
		_ = cmd.Wait() // exit status is not inspected
	}()
	log.I.F("started cluster sync service (pid %d)", cmd.Process.Pid)
	return nil
}
// startRelayGroup launches the relaygroup sync service, backed by the database
// over gRPC. The new process is recorded in s.relayGroupProc and reaped in the
// background. s.mu is held for the whole call.
func (s *Supervisor) startRelayGroup() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	env := append(os.Environ(),
		fmt.Sprintf("ORLY_SYNC_RELAYGROUP_LISTEN=%s", s.cfg.RelayGroupListen),
		"ORLY_SYNC_RELAYGROUP_DB_TYPE=grpc",
		fmt.Sprintf("ORLY_SYNC_RELAYGROUP_DB_SERVER=%s", s.cfg.DBListen),
		fmt.Sprintf("ORLY_SYNC_RELAYGROUP_LOG_LEVEL=%s", s.cfg.LogLevel),
	)
	cmd := exec.CommandContext(s.ctx, s.cfg.RelayGroupBinary)
	cmd.Env = env
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	if err := cmd.Start(); chk.E(err) {
		return err
	}

	done := make(chan struct{})
	s.relayGroupProc = &Process{name: "orly-sync-relaygroup", cmd: cmd, exited: done}
	go func() {
		defer close(done)
		_ = cmd.Wait() // exit status is not inspected
	}()
	log.I.F("started relaygroup service (pid %d)", cmd.Process.Pid)
	return nil
}
// startNegentropy launches the negentropy sync service, backed by the database
// over gRPC. The new process is recorded in s.negentropyProc and reaped in the
// background. s.mu is held for the whole call.
func (s *Supervisor) startNegentropy() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	env := append(os.Environ(),
		fmt.Sprintf("ORLY_SYNC_NEGENTROPY_LISTEN=%s", s.cfg.NegentropyListen),
		"ORLY_SYNC_NEGENTROPY_DB_TYPE=grpc",
		fmt.Sprintf("ORLY_SYNC_NEGENTROPY_DB_SERVER=%s", s.cfg.DBListen),
		fmt.Sprintf("ORLY_SYNC_NEGENTROPY_LOG_LEVEL=%s", s.cfg.LogLevel),
	)
	cmd := exec.CommandContext(s.ctx, s.cfg.NegentropyBinary)
	cmd.Env = env
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	if err := cmd.Start(); chk.E(err) {
		return err
	}

	done := make(chan struct{})
	s.negentropyProc = &Process{name: "orly-sync-negentropy", cmd: cmd, exited: done}
	go func() {
		defer close(done)
		_ = cmd.Wait() // exit status is not inspected
	}()
	log.I.F("started negentropy service (pid %d)", cmd.Process.Pid)
	return nil
}
// GetProcessStatuses returns the status of all managed processes.
//
// Entries appear in dependency order: database, ACL server, the sync services,
// relay. A process is included only if it has been started and, for optional
// services, is enabled in the configuration. The result is nil when nothing
// has been started. s.mu is held for the whole call.
func (s *Supervisor) GetProcessStatuses() []ProcessStatus {
	s.mu.Lock()
	defer s.mu.Unlock()

	cfg := s.cfg
	candidates := []struct {
		include bool
		proc    *Process
		binary  string
	}{
		{true, s.dbProc, cfg.DBBinary},
		{cfg.ACLEnabled, s.aclProc, cfg.ACLBinary},
		{cfg.DistributedSyncEnabled, s.distributedSyncProc, cfg.DistributedSyncBinary},
		{cfg.ClusterSyncEnabled, s.clusterSyncProc, cfg.ClusterSyncBinary},
		{cfg.RelayGroupEnabled, s.relayGroupProc, cfg.RelayGroupBinary},
		{cfg.NegentropyEnabled, s.negentropyProc, cfg.NegentropyBinary},
		{true, s.relayProc, cfg.RelayBinary},
	}

	var statuses []ProcessStatus
	for _, c := range candidates {
		if !c.include || c.proc == nil {
			continue
		}
		statuses = append(statuses, s.getProcessStatus(c.proc, c.binary))
	}
	return statuses
}
// getProcessStatus builds a ProcessStatus for p while holding p.mu. A process
// is reported "running" with its PID only if it was started and its exited
// channel is not yet closed. Otherwise it is "stopped" with PID 0. Version is
// left empty for the caller to fill in.
func (s *Supervisor) getProcessStatus(p *Process, binaryPath string) ProcessStatus {
	p.mu.Lock()
	defer p.mu.Unlock()

	st := ProcessStatus{
		Name:     p.name,
		Binary:   binaryPath,
		Version:  "", // Will be filled by caller if needed
		Status:   "stopped",
		Restarts: p.restarts,
	}
	if p.cmd == nil || p.cmd.Process == nil {
		return st
	}
	select {
	case <-p.exited:
		// already exited: keep "stopped"
	default:
		st.Status = "running"
		st.PID = p.cmd.Process.Pid
	}
	return st
}
// RestartAll stops all processes and starts them again.
//
// Processes are stopped in reverse dependency order (relay, sync services, ACL
// server when enabled, database) and restarted in dependency order. It waits
// for the database and, if enabled, the ACL server to become ready before
// starting the services that depend on them.
//
// It returns an error without touching anything if Stop has already been
// called: processes started after Stop would have no monitors and would never
// be stopped. If a start step fails, the error is returned immediately and the
// processes started before the failure are left running.
func (s *Supervisor) RestartAll() error {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return fmt.Errorf("cannot restart processes: supervisor is stopped")
	}
	// Snapshot under the lock: monitor goroutines may replace these handles.
	relay, acl, db := s.relayProc, s.aclProc, s.dbProc
	s.mu.Unlock()

	log.I.F("restarting all processes...")

	// Stop in reverse dependency order (stopProcess ignores nil handles).
	s.stopProcess(relay, 5*time.Second)
	s.stopSyncServices()
	if s.cfg.ACLEnabled {
		s.stopProcess(acl, 5*time.Second)
	}
	s.stopProcess(db, s.cfg.StopTimeout)

	// Small delay to ensure ports are released
	time.Sleep(500 * time.Millisecond)

	// Start again in dependency order
	if err := s.startDB(); err != nil {
		return fmt.Errorf("failed to restart database: %w", err)
	}
	if err := s.waitForDBReady(s.cfg.DBReadyTimeout); err != nil {
		return fmt.Errorf("database not ready after restart: %w", err)
	}
	if s.cfg.ACLEnabled {
		if err := s.startACL(); err != nil {
			return fmt.Errorf("failed to restart ACL: %w", err)
		}
		if err := s.waitForACLReady(s.cfg.ACLReadyTimeout); err != nil {
			return fmt.Errorf("ACL not ready after restart: %w", err)
		}
	}
	if err := s.startSyncServices(); err != nil {
		return fmt.Errorf("failed to restart sync services: %w", err)
	}
	if err := s.startRelay(); err != nil {
		return fmt.Errorf("failed to restart relay: %w", err)
	}
	log.I.F("all processes restarted successfully")
	return nil
}