You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

2238 lines
64 KiB

package app
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"net/http/httputil"
	"net/url"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"git.mleku.dev/mleku/nostr/encoders/bech32encoding"
	"git.mleku.dev/mleku/nostr/encoders/event"
	"git.mleku.dev/mleku/nostr/encoders/filter"
	"git.mleku.dev/mleku/nostr/encoders/hex"
	"git.mleku.dev/mleku/nostr/encoders/tag"
	"git.mleku.dev/mleku/nostr/encoders/timestamp"
	"git.mleku.dev/mleku/nostr/httpauth"
	"git.mleku.dev/mleku/nostr/interfaces/signer/p8k"
	"git.mleku.dev/mleku/nostr/protocol/auth"
	"github.com/gorilla/websocket"
	"lol.mleku.dev/chk"
	"next.orly.dev/app/branding"
	"next.orly.dev/app/config"
	"next.orly.dev/pkg/acl"
	"next.orly.dev/pkg/archive"
	"next.orly.dev/pkg/blossom"
	"next.orly.dev/pkg/bunker"
	"next.orly.dev/pkg/database"
	domainevents "next.orly.dev/pkg/domain/events"
	"next.orly.dev/pkg/domain/events/subscribers"
	"next.orly.dev/pkg/event/authorization"
	"next.orly.dev/pkg/event/ingestion"
	"next.orly.dev/pkg/event/processing"
	"next.orly.dev/pkg/event/routing"
	"next.orly.dev/pkg/event/specialkinds"
	"next.orly.dev/pkg/event/validation"
	acliface "next.orly.dev/pkg/interfaces/acl"
	"next.orly.dev/pkg/policy"
	"next.orly.dev/pkg/protocol/graph"
	"next.orly.dev/pkg/protocol/nip43"
	"next.orly.dev/pkg/protocol/nrc"
	"next.orly.dev/pkg/protocol/publish"
	"next.orly.dev/pkg/ratelimit"
	"next.orly.dev/pkg/spider"
	"next.orly.dev/pkg/storage"
	dsync "next.orly.dev/pkg/sync"
	"next.orly.dev/pkg/transport"
	"next.orly.dev/pkg/wireguard"
)
// Server is the relay's HTTP entry point. It implements http.Handler (see
// ServeHTTP) and bundles the request mux, configuration, database handle and
// the optional subsystem managers that the handlers in this file consult.
// Most subsystem fields are pointers that handlers nil-check before use.
type Server struct {
// mux routes non-websocket HTTP requests; it is created lazily by UserInterface.
mux *http.ServeMux
// Config holds the relay configuration.
// Deprecated: Use GetConfig() method instead of accessing directly.
Config *config.C
// Ctx holds the server context.
// Deprecated: Use Context() method instead of accessing directly.
Ctx context.Context
publishers *publish.S
// Admins holds the admin pubkeys.
// Deprecated: Use IsAdmin() method instead of accessing directly.
Admins [][]byte
// Owners holds the owner pubkeys.
// Deprecated: Use IsOwner() method instead of accessing directly.
Owners [][]byte
// DB holds the database instance.
// Deprecated: Use Database() method instead of accessing directly.
DB database.Database
// optional reverse proxy for dev web server
devProxy *httputil.ReverseProxy
// Per-IP connection tracking to prevent resource exhaustion
connPerIPMu sync.RWMutex
connPerIP map[string]int
// Challenge storage for HTTP UI authentication.
// Keys are hex-encoded challenges; challengeMutex guards the map.
challengeMutex sync.RWMutex
challenges map[string][]byte
// Message processing pause mutex for policy/follow list updates
// Use RLock() for normal message processing, Lock() for updates
messagePauseMutex sync.RWMutex
paymentProcessor *PaymentProcessor
sprocketManager *SprocketManager
policyManager *policy.P
spiderManager *spider.Spider
directorySpider *spider.DirectorySpider
// syncManager, when non-nil, enables the /api/sync endpoints.
syncManager *dsync.Manager
relayGroupMgr *dsync.RelayGroupManager
// clusterManager, when non-nil, enables the /cluster endpoints.
clusterManager *dsync.ClusterManager
// blossomServer, when non-nil, enables the blossom blob routes.
blossomServer *blossom.Server
InviteManager *nip43.InviteManager
graphExecutor *graph.Executor
rateLimiter *ratelimit.Limiter
// NOTE(review): cfg and db look like duplicates of Config and DB above;
// confirm which pair is authoritative before relying on either.
cfg *config.C
db database.Database // Changed from *database.D to interface
// Domain services for event handling
eventValidator *validation.Service
eventAuthorizer *authorization.Service
eventRouter *routing.DefaultRouter
eventProcessor *processing.Service
eventDispatcher *domainevents.Dispatcher
ingestionService *ingestion.Service
specialKinds *specialkinds.Registry
// aclRegistry is the injected ACL registry returned by ACLRegistry().
aclRegistry acliface.Registry
// WireGuard VPN and NIP-46 Bunker
wireguardServer *wireguard.Server
bunkerServer *bunker.Server
subnetPool *wireguard.SubnetPool
// NRC (Nostr Relay Connect) bridge for remote relay access
nrcBridge *nrc.Bridge
// Archive relay and storage management
archiveManager *archive.Manager
accessTracker *storage.AccessTracker
garbageCollector *storage.GarbageCollector
// Transport manager for network transports (TCP, TLS, Tor, etc.)
transportMgr *transport.Manager
// Branding/white-label customization; nil disables custom assets.
brandingMgr *branding.Manager
}
// =============================================================================
// Server Accessor Methods
// =============================================================================
// GetConfig exposes the relay configuration stored in the Config field.
// The result is nil when no configuration has been assigned.
func (s *Server) GetConfig() *config.C {
	cfg := s.Config
	return cfg
}
// Context exposes the server-wide context stored in the Ctx field.
func (s *Server) Context() context.Context {
	ctx := s.Ctx
	return ctx
}
// Database exposes the database handle stored in the DB field.
func (s *Server) Database() database.Database {
	db := s.DB
	return db
}
// IsAdmin reports whether pubkey is byte-for-byte equal to one of the
// configured admin pubkeys in s.Admins.
//
// The raw key bytes are compared directly. This yields the same answer as
// comparing their hex encodings, without encoding every key on every call.
// A nil or empty pubkey only matches an empty admin entry.
func (s *Server) IsAdmin(pubkey []byte) bool {
	for _, admin := range s.Admins {
		// string(a) == string(b) is compiled to an allocation-free comparison.
		if string(admin) == string(pubkey) {
			return true
		}
	}
	return false
}
// IsOwner reports whether pubkey is byte-for-byte equal to one of the
// configured owner pubkeys in s.Owners.
//
// The raw key bytes are compared directly. This yields the same answer as
// comparing their hex encodings, without encoding every key on every call.
// A nil or empty pubkey only matches an empty owner entry.
func (s *Server) IsOwner(pubkey []byte) bool {
	for _, owner := range s.Owners {
		// string(a) == string(b) is compiled to an allocation-free comparison.
		if string(owner) == string(pubkey) {
			return true
		}
	}
	return false
}
// ACLRegistry exposes the ACL registry injected into this server.
// Handing the registry out through the server lets tests supply their own
// implementation instead of depending on package-level state.
func (s *Server) ACLRegistry() acliface.Registry {
	registry := s.aclRegistry
	return registry
}
// isIPBlacklisted reports whether the peer address remote is blocked, either
// by the static Config.IPBlacklist or, when Config.ACLMode is "managed", by
// the managed ACL instance found in acl.Registry.
//
// remote may be "host:port", "[ipv6]:port" or a bare host. The host part is
// extracted with net.SplitHostPort so IPv6 peers are handled correctly;
// splitting on the first ':' would reduce "[::1]:1234" to "[".
//
// Static entries are matched as plain string prefixes of the host
// (e.g. "192.168." covers 192.168.0.0/16); empty entries are ignored.
func (s *Server) isIPBlacklisted(remote string) bool {
	remoteIP := remote
	if host, _, err := net.SplitHostPort(remote); err == nil {
		remoteIP = host
	} else {
		// No port present: accept a bare host, dropping IPv6 brackets if any.
		remoteIP = strings.TrimSuffix(strings.TrimPrefix(remote, "["), "]")
	}

	// Static blacklist from configuration is consulted first.
	for _, blocked := range s.Config.IPBlacklist {
		if blocked != "" && strings.HasPrefix(remoteIP, blocked) {
			return true
		}
	}

	// The managed ACL is only consulted when that mode is active.
	if s.Config.ACLMode != "managed" {
		return false
	}
	for _, aclInstance := range acl.Registry.ACLs() {
		if aclInstance.Type() != "managed" {
			continue
		}
		if managed, ok := aclInstance.(*acl.Managed); ok {
			return managed.IsIPBlocked(remoteIP)
		}
	}
	return false
}
// isAllowedCORSOrigin decides whether a request Origin may receive CORS
// headers.
//
// An empty origin is never allowed. Otherwise the origin is allowed when
// Config.CORSOrigins is empty (no restriction configured), contains the
// wildcard "*", or contains the origin verbatim.
func (s *Server) isAllowedCORSOrigin(origin string) bool {
	if origin == "" {
		return false
	}
	configured := s.Config.CORSOrigins
	if len(configured) == 0 {
		// Nothing configured means every origin is accepted.
		return true
	}
	for i := range configured {
		if entry := configured[i]; entry == "*" || entry == origin {
			return true
		}
	}
	return false
}
// ServeHTTP is the relay's top-level HTTP dispatcher. In order, it:
//
//  1. Adds permissive CORS headers to blossom paths and answers their
//     OPTIONS preflights directly.
//  2. When Config.CORSEnabled is set, adds origin-specific CORS headers to
//     /api/ paths and NIP-11 relay-info requests, and answers their OPTIONS
//     preflights directly.
//  3. Hands any other OPTIONS request that is not on a blossom or /api/ path
//     to the mux (or replies 200 when no mux exists).
//  4. Routes websocket upgrade requests to HandleWebsocket, except non-root
//     paths while the dev proxy is configured, which go to the mux.
//  5. Serves relay info for "Accept: application/nostr+json".
//  6. Falls through to the mux, or replies 426 when no mux exists.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	path := r.URL.Path

	// Blossom-related paths always get CORS headers.
	isBlossomPath := path == "/upload" || path == "/media" ||
		path == "/mirror" || path == "/report" ||
		strings.HasPrefix(path, "/list/") ||
		strings.HasPrefix(path, "/blossom/") ||
		(len(path) == 65 && path[0] == '/') // /<sha256> blob downloads
	if isBlossomPath {
		h := w.Header()
		h.Set("Access-Control-Allow-Origin", "*")
		h.Set("Access-Control-Allow-Methods", "GET, HEAD, PUT, DELETE, OPTIONS")
		h.Set("Access-Control-Allow-Headers", "Authorization, authorization, Content-Type, content-type, X-SHA-256, x-sha-256, X-Content-Length, x-content-length, X-Content-Type, x-content-type, Accept, accept")
		h.Set("Access-Control-Expose-Headers", "X-Reason, Content-Length, Content-Type, Accept-Ranges")
		h.Set("Access-Control-Max-Age", "86400")
		// Preflight for blossom paths ends here.
		if r.Method == http.MethodOptions {
			w.WriteHeader(http.StatusOK)
			return
		}
	}

	// CORS for API endpoints (standalone dashboard mode) and for NIP-11
	// relay info requests on the root path.
	isAPIPath := strings.HasPrefix(path, "/api/")
	isRelayInfoRequest := path == "/" && r.Header.Get("Accept") == "application/nostr+json"
	if s.Config != nil && s.Config.CORSEnabled && (isAPIPath || isRelayInfoRequest) {
		// The CORS headers below depend on the request's Origin, so shared
		// caches must not reuse this response for a different origin.
		w.Header().Add("Vary", "Origin")
		origin := r.Header.Get("Origin")
		if s.isAllowedCORSOrigin(origin) {
			h := w.Header()
			h.Set("Access-Control-Allow-Origin", origin)
			h.Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
			h.Set("Access-Control-Allow-Headers", "Authorization, Content-Type, Accept")
			h.Set("Access-Control-Allow-Credentials", "true")
			h.Set("Access-Control-Max-Age", "86400")
		}
		// Preflight for API paths ends here, whether or not the origin matched.
		if r.Method == http.MethodOptions {
			w.WriteHeader(http.StatusOK)
			return
		}
	}

	// OPTIONS on any other path is left to the mux when there is one.
	if r.Method == http.MethodOptions && !isBlossomPath && !isAPIPath {
		if s.mux != nil {
			s.mux.ServeHTTP(w, r)
			return
		}
		w.WriteHeader(http.StatusOK)
		return
	}

	// The Upgrade token is case-insensitive, so "WebSocket" must be treated
	// the same as "websocket".
	if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
		// Proxy information is only logged for websocket requests to avoid spam.
		LogProxyInfo(r, "HTTP request")
		// Only the relay root is intercepted while the dev proxy is in use, so
		// other websocket paths (e.g. Vite HMR) reach the dev server via the mux.
		if s.mux != nil && s.Config != nil && s.Config.WebDisableEmbedded && s.Config.WebDevProxyURL != "" && r.URL.Path != "/" {
			s.mux.ServeHTTP(w, r)
			return
		}
		s.HandleWebsocket(w, r)
		return
	}

	if r.Header.Get("Accept") == "application/nostr+json" {
		s.HandleRelayInfo(w, r)
		return
	}
	if s.mux == nil {
		http.Error(w, "Upgrade required", http.StatusUpgradeRequired)
		return
	}
	s.mux.ServeHTTP(w, r)
}
// ServiceURL returns the HTTP base URL of this relay without a trailing slash
// on the configured form.
//
// When Config.RelayURL is set it wins: a trailing "/" is trimmed and
// "http://" is prepended if the value has no http/https scheme. Otherwise the
// URL is built from the request: scheme from X-Forwarded-Proto (falling back
// to https when req.TLS is set, else http) and host from X-Forwarded-Host
// (falling back to req.Host).
func (s *Server) ServiceURL(req *http.Request) (url string) {
	if s.Config != nil && s.Config.RelayURL != "" {
		base := strings.TrimSuffix(s.Config.RelayURL, "/")
		hasScheme := strings.HasPrefix(base, "http://") || strings.HasPrefix(base, "https://")
		if !hasScheme {
			base = "http://" + base
		}
		return base
	}

	scheme := req.Header.Get("X-Forwarded-Proto")
	switch {
	case scheme != "":
		// A forwarded scheme is used as-is.
	case req.TLS != nil:
		scheme = "https"
	default:
		scheme = "http"
	}

	authority := req.Header.Get("X-Forwarded-Host")
	if authority == "" {
		authority = req.Host
	}
	return scheme + "://" + authority
}
// WebSocketURL returns the websocket URL of the relay root as seen by the
// client, always ending in "/".
//
// The scheme comes from X-Forwarded-Proto with "https"/"http" mapped to
// "wss"/"ws" (any other forwarded value is used unchanged); without that
// header it is "wss" when req.TLS is set, else "ws". The host comes from
// X-Forwarded-Host, falling back to req.Host, with trailing slashes removed.
func (s *Server) WebSocketURL(req *http.Request) (url string) {
	scheme := req.Header.Get("X-Forwarded-Proto")
	switch scheme {
	case "":
		if req.TLS != nil {
			scheme = "wss"
		} else {
			scheme = "ws"
		}
	case "https":
		scheme = "wss"
	case "http":
		scheme = "ws"
	}

	authority := req.Header.Get("X-Forwarded-Host")
	if authority == "" {
		authority = req.Host
	}
	authority = strings.TrimRight(authority, "/")
	return scheme + "://" + authority + "/"
}
// DashboardURL returns the dashboard address: the result of ServiceURL for
// the same request with a "/" appended.
func (s *Server) DashboardURL(req *http.Request) (url string) {
	base := s.ServiceURL(req)
	return base + "/"
}
// UserInterface prepares the HTTP side of the relay: it creates the mux if
// needed, optionally configures the development reverse proxy, initialises
// the login challenge store and registers every HTTP route.
//
// The dev proxy is only set up when Config.WebDisableEmbedded is true and
// Config.WebDevProxyURL is non-empty. Sync, blossom and cluster routes are
// registered only when their respective managers are non-nil; WireGuard,
// bunker and NRC routes are always registered.
func (s *Server) UserInterface() {
	if s.mux == nil {
		s.mux = http.NewServeMux()
	}

	// Optional reverse proxy to a development web server.
	if cfg := s.Config; cfg != nil && cfg.WebDisableEmbedded && cfg.WebDevProxyURL != "" {
		rawTarget := cfg.WebDevProxyURL
		// A missing scheme would otherwise fail with:
		// proxy error: unsupported protocol scheme ""
		if !strings.Contains(rawTarget, "://") {
			rawTarget = "http://" + rawTarget
		}
		upstream, err := url.Parse(rawTarget)
		switch {
		case chk.E(err):
			// Unparseable URL: leave the dev proxy unset.
		case upstream.Scheme == "" || upstream.Host == "":
			// Parsed but unusable: leave the dev proxy unset.
			log.Printf(
				"invalid ORLY_WEB_DEV_PROXY_URL: %q — disabling dev proxy\n",
				cfg.WebDevProxyURL,
			)
		default:
			proxy := httputil.NewSingleHostReverseProxy(upstream)
			s.devProxy = proxy
			// Some dev servers care that Host names the upstream.
			baseDirector := proxy.Director
			proxy.Director = func(out *http.Request) {
				baseDirector(out)
				out.Host = upstream.Host
			}
			// Requests cancelled by the browser (e.g. navigation) are normal
			// and are neither logged nor answered.
			proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
				if r.Context().Err() == context.Canceled {
					return
				}
				log.Printf("proxy error: %v", err)
				http.Error(w, "Bad Gateway", http.StatusBadGateway)
			}
		}
	}

	// Challenge storage for the HTTP login flow.
	if s.challenges == nil {
		s.challengeMutex.Lock()
		s.challenges = make(map[string][]byte)
		s.challengeMutex.Unlock()
	}

	type route struct {
		pattern string
		handler func(http.ResponseWriter, *http.Request)
	}
	register := func(routes ...route) {
		for _, rt := range routes {
			s.mux.HandleFunc(rt.pattern, rt.handler)
		}
	}

	register(
		// Static assets, branding and the web UI (or dev proxy).
		route{"/favicon.ico", s.handleFavicon},
		route{"/branding/", s.handleBrandingAsset},
		route{"/orly.png", s.handleLogo},
		route{"/", s.handleLoginInterface},
		// Authentication.
		route{"/api/auth/challenge", s.handleAuthChallenge},
		route{"/api/auth/login", s.handleAuthLogin},
		route{"/api/auth/status", s.handleAuthStatus},
		route{"/api/auth/logout", s.handleAuthLogout},
		route{"/api/permissions/", s.handlePermissions},
		// Export, own events, import and streaming to a remote relay.
		route{"/api/export", s.handleExport},
		route{"/api/events/mine", s.handleEventsMine},
		route{"/api/import", s.handleImport},
		route{"/api/stream-to-relay", s.handleStreamToRelay},
		// Sprocket management.
		route{"/api/sprocket/status", s.handleSprocketStatus},
		route{"/api/sprocket/update", s.handleSprocketUpdate},
		route{"/api/sprocket/restart", s.handleSprocketRestart},
		route{"/api/sprocket/versions", s.handleSprocketVersions},
		route{"/api/sprocket/delete-version", s.handleSprocketDeleteVersion},
		route{"/api/sprocket/config", s.handleSprocketConfig},
		// NIP-86 management and ACL mode.
		route{"/api/nip86", s.handleNIP86Management},
		route{"/api/acl-mode", s.handleACLMode},
		// Log viewer.
		route{"/api/logs", s.handleGetLogs},
		route{"/api/logs/clear", s.handleClearLogs},
		route{"/api/logs/level", s.handleLogLevel},
	)

	// Distributed synchronization.
	if s.syncManager != nil {
		register(
			route{"/api/sync/current", s.handleSyncCurrent},
			route{"/api/sync/event-ids", s.handleSyncEventIDs},
		)
		log.Printf("Distributed sync API enabled at /api/sync")
	}

	// Blossom blob storage, under /blossom/ and at the root for clients that
	// expect it there (like Jumble).
	if s.blossomServer == nil {
		log.Printf("WARNING: Blossom server is nil, routes not registered")
	} else {
		register(
			route{"/blossom/", s.blossomHandler},
			route{"/upload", s.blossomRootHandler},
			route{"/list/", s.blossomRootHandler},
			route{"/media", s.blossomRootHandler},
			route{"/mirror", s.blossomRootHandler},
			route{"/report", s.blossomRootHandler},
		)
		log.Printf("Blossom blob storage API enabled at /blossom and root")
	}

	// Cluster replication.
	if s.clusterManager != nil {
		register(
			route{"/cluster/latest", s.clusterManager.HandleLatestSerial},
			route{"/cluster/events", s.clusterManager.HandleEventsRange},
		)
		log.Printf("Cluster replication API enabled at /cluster")
	}

	register(
		// WireGuard VPN and bunker: always registered; the handlers report an
		// error themselves when the feature is not enabled.
		route{"/api/wireguard/config", s.handleWireGuardConfig},
		route{"/api/wireguard/regenerate", s.handleWireGuardRegenerate},
		route{"/api/wireguard/status", s.handleWireGuardStatus},
		route{"/api/wireguard/audit", s.handleWireGuardAudit},
		route{"/api/bunker/url", s.handleBunkerURL},
		route{"/api/bunker/info", s.handleBunkerInfo},
		// NRC (Nostr Relay Connect) management.
		route{"/api/nrc/connections", s.handleNRCConnectionsRouter},
		route{"/api/nrc/connections/", s.handleNRCConnectionsRouter},
		route{"/api/nrc/config", s.handleNRCConfig},
	)
}
// handleFavicon answers /favicon.ico.
//
// Resolution order: the dev proxy when configured; 404 when the embedded web
// UI is disabled; a custom "favicon" branding asset when one exists; finally
// the embedded favicon.png, served with an image/png content type.
func (s *Server) handleFavicon(w http.ResponseWriter, r *http.Request) {
	switch {
	case s.devProxy != nil:
		// Dev mode: the dev server owns all assets.
		s.devProxy.ServeHTTP(w, r)
		return
	case s.Config != nil && s.Config.WebDisableEmbedded:
		// Web UI disabled and no proxy to fall back on.
		http.NotFound(w, r)
		return
	}

	const oneDay = "public, max-age=86400"

	// A custom favicon takes precedence over the embedded one.
	if mgr := s.brandingMgr; mgr != nil {
		if payload, contentType, found := mgr.GetAsset("favicon"); found {
			w.Header().Set("Content-Type", contentType)
			w.Header().Set("Cache-Control", oneDay)
			w.Write(payload)
			return
		}
	}

	w.Header().Set("Content-Type", "image/png")
	w.Header().Set("Cache-Control", oneDay)
	// The embedded app only ships favicon.png, so serve it through a
	// synthetic request for that path.
	pngReq := new(http.Request)
	pngReq.Method = "GET"
	pngReq.URL = &url.URL{Path: "/favicon.png"}
	ServeEmbeddedWeb(w, pngReq)
}
// handleLogo answers /orly.png.
//
// Resolution order: the dev proxy when configured; a custom "logo" branding
// asset when one exists; otherwise the embedded file for the requested path,
// served with an image/png content type.
func (s *Server) handleLogo(w http.ResponseWriter, r *http.Request) {
	if proxy := s.devProxy; proxy != nil {
		proxy.ServeHTTP(w, r)
		return
	}

	const oneDay = "public, max-age=86400"
	header := w.Header()

	if mgr := s.brandingMgr; mgr != nil {
		if payload, contentType, found := mgr.GetAsset("logo"); found {
			header.Set("Content-Type", contentType)
			header.Set("Cache-Control", oneDay)
			w.Write(payload)
			return
		}
	}

	// No custom logo: serve the embedded orly.png.
	header.Set("Content-Type", "image/png")
	header.Set("Cache-Control", oneDay)
	ServeEmbeddedWeb(w, r)
}
// handleLoginInterface is the catch-all handler for the web UI.
//
// Resolution order: the dev proxy when configured; 404 when the embedded web
// UI is disabled; the branded index page for "/" and "/index.html" when a
// branding manager exists; otherwise the embedded web application.
func (s *Server) handleLoginInterface(w http.ResponseWriter, r *http.Request) {
	switch {
	case s.devProxy != nil:
		s.devProxy.ServeHTTP(w, r)
	case s.Config != nil && s.Config.WebDisableEmbedded:
		http.NotFound(w, r)
	case s.brandingMgr != nil && (r.URL.Path == "/" || r.URL.Path == "/index.html"):
		// Only the index page receives branding customizations.
		s.serveModifiedIndex(w, r)
	default:
		ServeEmbeddedWeb(w, r)
	}
}
// serveModifiedIndex writes the embedded index.html after passing it through
// the branding manager's ModifyIndexHTML.
//
// If the file cannot be opened or read, or the branding step fails, the
// request is served by ServeEmbeddedWeb instead. The branded page is sent as
// text/html with "Cache-Control: no-cache".
func (s *Server) serveModifiedIndex(w http.ResponseWriter, r *http.Request) {
	fallback := func() { ServeEmbeddedWeb(w, r) }

	indexFile, err := GetReactAppFS().Open("index.html")
	if err != nil {
		fallback()
		return
	}
	defer indexFile.Close()

	rawIndex, err := io.ReadAll(indexFile)
	if err != nil {
		fallback()
		return
	}

	branded, err := s.brandingMgr.ModifyIndexHTML(rawIndex)
	if err != nil {
		fallback()
		return
	}

	header := w.Header()
	header.Set("Content-Type", "text/html; charset=utf-8")
	header.Set("Cache-Control", "no-cache")
	w.Write(branded)
}
// handleBrandingAsset serves everything under /branding/.
//
// Without a branding manager every request is a 404. Recognised names are:
//   - custom.css: the manager's combined custom CSS (404 on error)
//   - manifest.json: the embedded manifest customised by the manager, or the
//     unmodified embedded manifest when customisation fails
//   - logo.png, favicon.png, icon-192.png, icon-512.png: the branding asset
//     of the same base name
//
// Any other name is a 404.
func (s *Server) handleBrandingAsset(w http.ResponseWriter, r *http.Request) {
	if s.brandingMgr == nil {
		http.NotFound(w, r)
		return
	}

	// /branding/logo.png -> logo.png
	asset := strings.TrimPrefix(r.URL.Path, "/branding/")

	if asset == "custom.css" {
		stylesheet, err := s.brandingMgr.GetCustomCSS()
		if err != nil {
			http.NotFound(w, r)
			return
		}
		w.Header().Set("Content-Type", "text/css; charset=utf-8")
		w.Header().Set("Cache-Control", "public, max-age=3600")
		w.Write(stylesheet)
		return
	}

	if asset == "manifest.json" {
		// Start from the manifest shipped with the embedded web app.
		manifestFile, err := GetReactAppFS().Open("manifest.json")
		if err != nil {
			http.NotFound(w, r)
			return
		}
		defer manifestFile.Close()
		embedded, err := io.ReadAll(manifestFile)
		if err != nil {
			http.NotFound(w, r)
			return
		}
		w.Header().Set("Content-Type", "application/manifest+json")
		customised, err := s.brandingMgr.GetManifest(embedded)
		if err != nil {
			// Customisation failed: send the original, without cache headers.
			w.Write(embedded)
			return
		}
		w.Header().Set("Cache-Control", "public, max-age=3600")
		w.Write(customised)
		return
	}

	// File name under /branding/ -> branding asset name.
	imageAssets := map[string]string{
		"logo.png":     "logo",
		"favicon.png":  "favicon",
		"icon-192.png": "icon-192",
		"icon-512.png": "icon-512",
	}
	if name, known := imageAssets[asset]; known {
		s.serveBrandingAsset(w, name)
		return
	}
	http.NotFound(w, r)
}
// serveBrandingAsset writes the branding asset called name to w with the
// MIME type reported by the branding manager and a one-day cache lifetime.
// It replies 404 when there is no branding manager or no such asset.
func (s *Server) serveBrandingAsset(w http.ResponseWriter, name string) {
	// Same response http.NotFound produces; written out explicitly because
	// this helper has no request to pass along.
	notFound := func() {
		http.Error(w, "404 page not found", http.StatusNotFound)
	}

	mgr := s.brandingMgr
	if mgr == nil {
		notFound()
		return
	}
	payload, contentType, found := mgr.GetAsset(name)
	if !found {
		notFound()
		return
	}

	header := w.Header()
	header.Set("Content-Type", contentType)
	header.Set("Cache-Control", "public, max-age=86400")
	w.Write(payload)
}
// handleAuthChallenge issues a fresh login challenge (GET only).
//
// The challenge is stored in s.challenges under its hex encoding and returned
// to the client as {"challenge": "<hex>"}. Each stored challenge is removed
// again after five minutes if it has not been consumed by a login.
//
// Expiry uses time.AfterFunc rather than a sleeping goroutine: this endpoint
// is unauthenticated, and a timer is much cheaper to hold for five minutes
// than a parked goroutine per request.
func (s *Server) handleAuthChallenge(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	// How long an unused challenge stays valid.
	const challengeTTL = 5 * time.Minute

	challenge := auth.GenerateChallenge()
	challengeHex := hex.Enc(challenge)

	s.challengeMutex.Lock()
	if s.challenges == nil {
		s.challenges = make(map[string][]byte)
	}
	s.challenges[challengeHex] = challenge
	s.challengeMutex.Unlock()

	// Drop the challenge once it expires; deleting an already-consumed
	// challenge is a harmless no-op.
	time.AfterFunc(challengeTTL, func() {
		s.challengeMutex.Lock()
		delete(s.challenges, challengeHex)
		s.challengeMutex.Unlock()
	})

	response := struct {
		Challenge string `json:"challenge"`
	}{
		Challenge: challengeHex,
	}
	jsonData, err := json.Marshal(response)
	if chk.E(err) {
		http.Error(
			w, "Error generating challenge", http.StatusInternalServerError,
		)
		return
	}
	w.Write(jsonData)
}
// handleAuthLogin completes the HTTP login flow (POST only).
//
// The request body is a signed event carrying a "challenge" tag. The handler
// checks that the challenge was issued by handleAuthChallenge and is still
// stored, consumes it, validates the event against the relay's websocket URL
// and, on success, sets the "orly_auth" cookie to the hex pubkey.
//
// Every outcome after the method check is reported as JSON:
// {"success": true} or {"success": false, "error": "..."}.
//
// Differences from a naive implementation that matter here:
//   - The challenge is looked up and deleted under one lock, so two
//     concurrent requests cannot both consume the same challenge.
//   - The validation error text is JSON-encoded before being embedded, so
//     quotes or backslashes in it cannot produce malformed JSON.
//   - The body read is capped, since this endpoint is unauthenticated.
func (s *Server) handleAuthLogin(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	// Upper bound for the login event; far larger than a signed auth event
	// with a handful of tags needs.
	const maxLoginBodyBytes = 1 << 20

	body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maxLoginBodyBytes))
	if chk.E(err) {
		w.Write([]byte(`{"success": false, "error": "Failed to read request body"}`))
		return
	}

	// Parse the signed event.
	var evt event.E
	if err = json.Unmarshal(body, &evt); chk.E(err) {
		w.Write([]byte(`{"success": false, "error": "Invalid event format"}`))
		return
	}

	// The challenge tag identifies which stored challenge this login answers.
	challengeTag := evt.Tags.GetFirst([]byte("challenge"))
	if challengeTag == nil {
		w.Write([]byte(`{"success": false, "error": "Challenge tag missing from event"}`))
		return
	}
	challengeHex := string(challengeTag.Value())

	// Check and consume atomically: a challenge is single-use.
	s.challengeMutex.Lock()
	_, exists := s.challenges[challengeHex]
	if exists {
		delete(s.challenges, challengeHex)
	}
	s.challengeMutex.Unlock()
	if !exists {
		w.Write([]byte(`{"success": false, "error": "Invalid or expired challenge"}`))
		return
	}

	relayURL := s.WebSocketURL(r)
	// The challenge in the event tag is hex-encoded, so the hex string is
	// what gets passed as the expected challenge bytes.
	ok, err := auth.Validate(&evt, []byte(challengeHex), relayURL)
	if chk.E(err) || !ok {
		errorMsg := "Authentication validation failed"
		if err != nil {
			errorMsg = err.Error()
		}
		// Encode the message as a JSON string so arbitrary error text stays
		// well-formed inside the response.
		quoted, mErr := json.Marshal(errorMsg)
		if mErr != nil {
			quoted = []byte(`"Authentication validation failed"`)
		}
		w.Write([]byte(`{"success": false, "error": ` + string(quoted) + `}`))
		return
	}

	// NOTE(review): the session cookie is the bare hex pubkey with no
	// signature or server-side session. handleAuthStatus only hex-decodes
	// it, so a client can present any pubkey. Confirm nothing privileged
	// relies on this cookie alone.
	cookie := &http.Cookie{
		Name:     "orly_auth",
		Value:    hex.Enc(evt.Pubkey),
		Path:     "/",
		HttpOnly: true,
		SameSite: http.SameSiteLaxMode,
		MaxAge:   60 * 60 * 24 * 30, // 30 days
	}
	http.SetCookie(w, cookie)
	w.Write([]byte(`{"success": true}`))
}
// handleAuthStatus reports the login state of the caller (GET only).
//
// It reads the "orly_auth" cookie, hex-decodes its value as a pubkey and
// looks up that pubkey's access level in acl.Registry. The reply is
// {"authenticated": false} when the cookie is missing, empty or not valid
// hex (or when encoding the reply fails); otherwise it carries
// authenticated=true, the cookie's pubkey and the permission string.
func (s *Server) handleAuthStatus(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	notAuthenticated := func() {
		w.Write([]byte(`{"authenticated": false}`))
	}

	authCookie, err := r.Cookie("orly_auth")
	if err != nil || authCookie.Value == "" {
		notAuthenticated()
		return
	}

	// The cookie value must at least be well-formed hex.
	pubkey, err := hex.Dec(authCookie.Value)
	if chk.E(err) {
		notAuthenticated()
		return
	}

	type statusReply struct {
		Authenticated bool   `json:"authenticated"`
		Pubkey        string `json:"pubkey"`
		Permission    string `json:"permission"`
	}
	reply := statusReply{
		Authenticated: true,
		Pubkey:        authCookie.Value,
		Permission:    acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr),
	}

	encoded, err := json.Marshal(reply)
	if chk.E(err) {
		notAuthenticated()
		return
	}
	w.Write(encoded)
}
// handleAuthLogout ends the HTTP login session (POST only) by overwriting the
// "orly_auth" cookie with an empty, already-expired one, then replies
// {"success": true}.
func (s *Server) handleAuthLogout(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	// MaxAge < 0 tells the browser to delete the cookie immediately.
	http.SetCookie(w, &http.Cookie{
		Name:     "orly_auth",
		Value:    "",
		Path:     "/",
		HttpOnly: true,
		SameSite: http.SameSiteLaxMode,
		MaxAge:   -1,
	})
	w.Write([]byte(`{"success": true}`))
}
// handlePermissions answers GET /api/permissions/<hex pubkey> with the access
// level that acl.Registry reports for that pubkey and the caller's remote
// address, as {"permission": "<level>"}.
//
// It replies 400 when the pubkey segment is missing or not valid hex, and 500
// when the reply cannot be encoded.
func (s *Server) handlePermissions(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Everything after the route prefix is the hex pubkey.
	hexKey := strings.TrimPrefix(r.URL.Path, "/api/permissions/")
	switch hexKey {
	case "", "/":
		http.Error(w, "Invalid pubkey", http.StatusBadRequest)
		return
	}

	pubkey, err := hex.Dec(hexKey)
	if chk.E(err) {
		http.Error(w, "Invalid pubkey format", http.StatusBadRequest)
		return
	}

	level := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)

	w.Header().Set("Content-Type", "application/json")
	type permissionReply struct {
		Permission string `json:"permission"`
	}
	encoded, err := json.Marshal(permissionReply{Permission: level})
	if chk.E(err) {
		http.Error(
			w, "Error generating response", http.StatusInternalServerError,
		)
		return
	}
	w.Write(encoded)
}
// handleExport streams stored events to the client as JSONL (NDJSON).
//
// Access: unless the ACL mode is "none", the request must carry valid NIP-98
// authentication and the authenticated pubkey must have write, admin or owner
// access.
//
// Filtering: pubkeys may be supplied as repeated "pubkey" query parameters
// (GET) or as {"pubkeys": [...]} in the JSON body (POST). Empty or non-hex
// entries are skipped. If the POST body is not valid JSON, no filter is
// applied and everything is exported.
//
// The download filename is prefixed "all-events-", "my-events-" or
// "filtered-events-" for zero, one or several pubkeys respectively.
func (s *Server) handleExport(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet, http.MethodPost:
	default:
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Open relays (ACL "none") skip authentication and permission checks.
	if acl.Registry.GetMode() != "none" {
		valid, pubkey, err := httpauth.CheckAuth(r)
		if chk.E(err) || !valid {
			errorMsg := "NIP-98 authentication validation failed"
			if err != nil {
				errorMsg = err.Error()
			}
			http.Error(w, errorMsg, http.StatusUnauthorized)
			return
		}
		switch acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) {
		case "write", "admin", "owner":
			// Sufficient access.
		default:
			http.Error(
				w, "Write, admin, or owner permission required",
				http.StatusForbidden,
			)
			return
		}
	}

	// Collect the requested pubkeys in hex form from body or query string.
	var hexKeys []string
	if r.Method == http.MethodPost {
		var requestBody struct {
			Pubkeys []string `json:"pubkeys"`
		}
		// A body that fails to parse simply means "no filter".
		if err := json.NewDecoder(r.Body).Decode(&requestBody); err == nil {
			hexKeys = requestBody.Pubkeys
		}
	} else {
		hexKeys = r.URL.Query()["pubkey"]
	}

	var pks [][]byte
	for _, pkHex := range hexKeys {
		if pkHex == "" {
			continue
		}
		if pk, err := hex.Dec(pkHex); !chk.E(err) {
			pks = append(pks, pk)
		}
	}

	// The filename reflects how the export was filtered.
	var prefix string
	switch len(pks) {
	case 0:
		prefix = "all-events-"
	case 1:
		prefix = "my-events-"
	default:
		prefix = "filtered-events-"
	}
	filename := prefix + time.Now().UTC().Format("20060102-150405Z") + ".jsonl"

	header := w.Header()
	header.Set("Content-Type", "application/x-ndjson")
	header.Set(
		"Content-Disposition", "attachment; filename=\""+filename+"\"",
	)
	header.Set("X-Content-Type-Options", "nosniff")

	// Push the headers out so the client sees the download start right away.
	if flusher, ok := w.(http.Flusher); ok {
		flusher.Flush()
	}

	s.DB.Export(s.Ctx, w, pks...)
}
// handleEventsMine returns one page of the authenticated user's own events as
// JSON (GET only, NIP-98 authentication required).
//
// Query parameters:
//   - limit: page size, 1..100; anything else falls back to the default of 50
//   - offset: number of events to skip, >= 0; anything else falls back to 0
//
// The reply is {"events": [...], "total": N, "limit": L, "offset": O}, where
// total is the number of events the query returned before pagination.
//
// Every event returned by the query is released with Free() once the handler
// is done, not just the events on the requested page.
func (s *Server) handleEventsMine(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Validate NIP-98 authentication.
	valid, pubkey, err := httpauth.CheckAuth(r)
	if chk.E(err) || !valid {
		errorMsg := "NIP-98 authentication validation failed"
		if err != nil {
			errorMsg = err.Error()
		}
		http.Error(w, errorMsg, http.StatusUnauthorized)
		return
	}

	// Pagination parameters; invalid values silently keep the defaults.
	query := r.URL.Query()
	limit := 50
	if l := query.Get("limit"); l != "" {
		if parsed, err := strconv.Atoi(l); err == nil && parsed > 0 && parsed <= 100 {
			limit = parsed
		}
	}
	offset := 0
	if o := query.Get("offset"); o != "" {
		if parsed, err := strconv.Atoi(o); err == nil && parsed >= 0 {
			offset = parsed
		}
	}

	// Query all events authored by the authenticated pubkey.
	f := &filter.F{
		Authors: tag.NewFromBytesSlice(pubkey),
	}
	log.Printf("DEBUG: Querying events for pubkey: %s", hex.Enc(pubkey))
	events, err := s.DB.QueryEvents(s.Ctx, f)
	if chk.E(err) {
		log.Printf("DEBUG: QueryEvents failed: %v", err)
		http.Error(w, "Failed to query events", http.StatusInternalServerError)
		return
	}
	log.Printf("DEBUG: QueryEvents returned %d events", len(events))

	// Release every queried event on the way out, including those that fall
	// outside the requested page and are never serialised.
	defer func(all event.S) {
		for _, ev := range all {
			if ev != nil {
				ev.Free()
			}
		}
	}(events)

	// Apply pagination. An empty (non-nil) slice keeps "events" encoded as
	// [] rather than null when the offset is past the end.
	totalEvents := len(events)
	page := event.S{}
	if offset < totalEvents {
		end := offset + limit
		if end > totalEvents {
			end = totalEvents
		}
		page = events[offset:end]
	}

	w.Header().Set("Content-Type", "application/json")
	response := struct {
		Events []*event.E `json:"events"`
		Total  int        `json:"total"`
		Limit  int        `json:"limit"`
		Offset int        `json:"offset"`
	}{
		Events: page,
		Total:  totalEvents,
		Limit:  limit,
		Offset: offset,
	}

	jsonData, err := json.Marshal(response)
	if chk.E(err) {
		http.Error(
			w, "Error generating response", http.StatusInternalServerError,
		)
		return
	}
	w.Write(jsonData)
}
// authenticateLocalhost authenticates a request that originates from the
// local machine by deriving a pubkey from the NOSTR_PRIVATE_KEY environment
// variable (an nsec).
//
// It returns the derived pubkey on success. It returns an error when the
// request's remote host is not 127.0.0.1, ::1 or localhost, when the
// environment variable is unset, or when the key cannot be decoded or loaded
// into a signer.
//
// The remote host is extracted with net.SplitHostPort so that IPv6 loopback
// peers ("[::1]:port") are recognised; splitting on the first ':' would
// reduce such an address to "[".
//
// NOTE(review): the decision is based on r.RemoteAddr alone. If the relay is
// deployed behind a reverse proxy on the same host, proxied requests will
// presumably also arrive from a loopback address. Confirm this path is not
// reachable in that setup.
func (s *Server) authenticateLocalhost(r *http.Request) ([]byte, error) {
	remoteHost := r.RemoteAddr
	if host, _, splitErr := net.SplitHostPort(r.RemoteAddr); splitErr == nil {
		remoteHost = host
	}
	if remoteHost != "127.0.0.1" && remoteHost != "::1" && remoteHost != "localhost" {
		return nil, fmt.Errorf("not a localhost request")
	}

	// The signing key comes from the process environment.
	nsec := os.Getenv("NOSTR_PRIVATE_KEY")
	if nsec == "" {
		return nil, fmt.Errorf("NOSTR_PRIVATE_KEY environment variable not set")
	}

	// Decode the nsec into raw private key bytes.
	secretBytes, err := bech32encoding.NsecToBytes([]byte(nsec))
	if err != nil {
		return nil, fmt.Errorf("failed to decode nsec: %w", err)
	}

	// Load the key into a signer to obtain the matching public key.
	signer, err := p8k.New()
	if err != nil {
		return nil, fmt.Errorf("failed to create signer: %w", err)
	}
	if err = signer.InitSec(secretBytes); err != nil {
		return nil, fmt.Errorf("failed to initialize signer: %w", err)
	}
	return signer.Pub(), nil
}
// handleImport accepts a JSONL/NDJSON upload, either as a multipart "file"
// field or as the raw request body, and hands it to the database importer.
//
// Only POST is accepted. Unless the ACL mode is "none", the caller must
// authenticate (localhost key first, NIP-98 as fallback) and hold the write,
// admin, or owner access level. When the ?folder= query parameter is present
// the request is delegated to handleImportFolder instead. On success the
// handler replies 202 Accepted with a small JSON body.
func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Open relay mode (ACL "none") skips authentication and permission checks.
	if acl.Registry.GetMode() != "none" {
		// Localhost authentication is attempted first; NIP-98 is the fallback.
		pubkey, localErr := s.authenticateLocalhost(r)
		if localErr != nil {
			valid, pk, authErr := httpauth.CheckAuth(r)
			if chk.E(authErr) || !valid {
				msg := "NIP-98 authentication validation failed"
				if authErr != nil {
					msg = authErr.Error()
				}
				http.Error(w, msg, http.StatusUnauthorized)
				return
			}
			pubkey = pk
		}

		switch acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) {
		case "write", "admin", "owner":
			// Sufficient privileges.
		default:
			http.Error(
				w, "Write, admin, or owner permission required", http.StatusForbidden,
			)
			return
		}
	}

	// Folder imports are handled by a dedicated handler.
	if folder := r.URL.Query().Get("folder"); folder != "" {
		s.handleImportFolder(w, r, folder)
		return
	}

	if strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data") {
		// Up to 32MB is kept in memory; the remainder spills to temp files.
		if parseErr := r.ParseMultipartForm(32 << 20); chk.E(parseErr) {
			http.Error(w, "Failed to parse form", http.StatusBadRequest)
			return
		}
		upload, _, formErr := r.FormFile("file")
		if chk.E(formErr) {
			http.Error(w, "Missing file", http.StatusBadRequest)
			return
		}
		defer upload.Close()
		s.DB.Import(upload)
	} else {
		if r.Body == nil {
			http.Error(w, "Empty request body", http.StatusBadRequest)
			return
		}
		s.DB.Import(r.Body)
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	w.Write([]byte(`{"success": true, "message": "Import started"}`))
}
// handleImportFolder imports every .jsonl file found directly inside
// folderPath and reports a per-file result as JSON with status 202.
//
// folderPath comes from the request query string and is therefore untrusted.
// It is cleaned and must equal, or be located below, one of the allowed
// relative export directories; the comparison is made on whole path
// components so that "scripts/exports/../../etc" and "scripts/exports-evil"
// are both rejected. Absolute paths are always refused.
//
// NOTE(review): symlinks inside the allowed directory are not resolved, and
// each file is closed right after s.DB.Import returns — confirm that Import
// has finished reading by then.
func (s *Server) handleImportFolder(w http.ResponseWriter, r *http.Request, folderPath string) {
	// Security: only relative paths under these directories are accepted.
	allowedPrefixes := []string{
		"../scripts/exports",
		"./scripts/exports",
		"scripts/exports",
	}
	// Clean collapses "." and ".." segments so traversal sequences cannot
	// hide behind an allowed prefix.
	cleaned := filepath.Clean(folderPath)
	isAllowed := false
	for _, prefix := range allowedPrefixes {
		root := filepath.Clean(prefix)
		if cleaned == root || strings.HasPrefix(cleaned, root+string(filepath.Separator)) {
			isAllowed = true
			break
		}
	}
	// Absolute paths get a dedicated message; they are never allowed for now.
	if !isAllowed && filepath.IsAbs(folderPath) {
		http.Error(w, "Absolute paths not allowed for security reasons", http.StatusForbidden)
		return
	}
	if !isAllowed {
		http.Error(w, "Folder path not in allowed directories", http.StatusForbidden)
		return
	}

	// Resolve the folder path.
	absPath, err := filepath.Abs(cleaned)
	if err != nil {
		http.Error(w, fmt.Sprintf("Invalid folder path: %v", err), http.StatusBadRequest)
		return
	}

	// The path must exist and be a directory.
	info, err := os.Stat(absPath)
	if err != nil {
		http.Error(w, fmt.Sprintf("Folder not found: %v", err), http.StatusNotFound)
		return
	}
	if !info.IsDir() {
		http.Error(w, "Path is not a directory", http.StatusBadRequest)
		return
	}

	// Collect the .jsonl files (case-insensitive extension, non-recursive).
	entries, err := os.ReadDir(absPath)
	if err != nil {
		http.Error(w, fmt.Sprintf("Failed to read folder: %v", err), http.StatusInternalServerError)
		return
	}
	var jsonlFiles []string
	for _, entry := range entries {
		if !entry.IsDir() && strings.HasSuffix(strings.ToLower(entry.Name()), ".jsonl") {
			jsonlFiles = append(jsonlFiles, filepath.Join(absPath, entry.Name()))
		}
	}
	if len(jsonlFiles) == 0 {
		http.Error(w, "No .jsonl files found in folder", http.StatusNotFound)
		return
	}

	// Process each file sequentially, recording whether it could be opened.
	type fileResult struct {
		File    string `json:"file"`
		Success bool   `json:"success"`
		Error   string `json:"error,omitempty"`
	}
	results := make([]fileResult, 0, len(jsonlFiles))
	for _, filePath := range jsonlFiles {
		file, err := os.Open(filePath)
		if err != nil {
			results = append(results, fileResult{
				File:    filePath,
				Success: false,
				Error:   err.Error(),
			})
			continue
		}
		s.DB.Import(file)
		file.Close()
		results = append(results, fileResult{
			File:    filePath,
			Success: true,
		})
	}

	// Encode before committing the status line so that an encoding failure
	// can still be reported as an error.
	response := map[string]any{
		"success": true,
		"message": fmt.Sprintf("Import started for %d files", len(jsonlFiles)),
		"files":   results,
	}
	jsonData, err := json.Marshal(response)
	if chk.E(err) {
		http.Error(w, "Error generating response", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	w.Write(jsonData)
}
// handleStreamToRelay starts a background job that streams events from the
// local relay to a remote relay over WebSocket and immediately answers with
// 202 Accepted.
//
// Only POST is accepted. Unless the ACL mode is "none", the caller must
// authenticate (localhost key first, NIP-98 as fallback) and hold the write,
// admin, or owner access level.
//
// Query parameters:
//   - target: remote relay URL; defaults to wss://orly-relay.imwald.eu.
//     http:// and https:// are rewritten to ws:// and wss://, and a URL
//     without a scheme gets a wss:// prefix.
//   - use_negentropy: "true" selects negentropy sync (NIP-77) when a sync
//     manager is configured; otherwise plain EVENT streaming is used.
//
// The background job runs under the server context so it is cancelled when
// the server shuts down instead of outliving it.
func (s *Server) handleStreamToRelay(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	var pubkey []byte
	var err error
	// Authenticate request (localhost or NIP-98).
	if acl.Registry.GetMode() != "none" {
		pubkey, err = s.authenticateLocalhost(r)
		if err != nil {
			valid, pk, authErr := httpauth.CheckAuth(r)
			if chk.E(authErr) || !valid {
				errorMsg := "NIP-98 authentication validation failed"
				if authErr != nil {
					errorMsg = authErr.Error()
				}
				http.Error(w, errorMsg, http.StatusUnauthorized)
				return
			}
			pubkey = pk
		}
		// Check permissions.
		accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
		if accessLevel != "write" && accessLevel != "admin" && accessLevel != "owner" {
			http.Error(w, "Write, admin, or owner permission required", http.StatusForbidden)
			return
		}
	}

	// Get target relay URL (default: wss://orly-relay.imwald.eu).
	targetURL := r.URL.Query().Get("target")
	if targetURL == "" {
		targetURL = "wss://orly-relay.imwald.eu"
	}
	// Normalize to a WebSocket URL.
	switch {
	case strings.HasPrefix(targetURL, "http://"):
		targetURL = "ws://" + strings.TrimPrefix(targetURL, "http://")
	case strings.HasPrefix(targetURL, "https://"):
		targetURL = "wss://" + strings.TrimPrefix(targetURL, "https://")
	case !strings.HasPrefix(targetURL, "ws://") && !strings.HasPrefix(targetURL, "wss://"):
		targetURL = "wss://" + targetURL
	}

	// Check if negentropy sync is requested.
	useNegentropy := r.URL.Query().Get("use_negentropy") == "true"

	// Encode the reply first so a failure is reported before any work starts.
	response := map[string]any{
		"success":        true,
		"message":        "Streaming started",
		"target":         targetURL,
		"use_negentropy": useNegentropy,
	}
	jsonData, err := json.Marshal(response)
	if chk.E(err) {
		http.Error(w, "Error generating response", http.StatusInternalServerError)
		return
	}

	// Bind the job to the server's lifetime; the request context cannot be
	// used because it is cancelled as soon as this handler returns.
	ctx := s.Ctx
	if ctx == nil {
		ctx = context.Background()
	}
	go func() {
		if useNegentropy && s.syncManager != nil {
			if _, err := s.streamWithNegentropy(ctx, targetURL); err != nil {
				log.Printf("Negentropy stream to %s failed: %v", targetURL, err)
			}
			return
		}
		if err := s.streamEvents(ctx, targetURL); err != nil {
			log.Printf("Event stream to %s failed: %v", targetURL, err)
		}
	}()

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	w.Write(jsonData)
}
// streamEvents pushes events from the local database to the relay at
// targetURL as ["EVENT", event] WebSocket messages.
//
// Events are fetched in batches of batchLimit using a moving "since"
// timestamp, and every event is freed after it has been written (or when the
// batch is abandoned because of a write error). The function returns nil once
// a batch shorter than batchLimit has been sent, and an error when dialing,
// querying or writing fails, when ctx is cancelled, or when a full batch makes
// no progress.
//
// NOTE(review): the next batch starts at created_at+1 of the last event sent,
// so events sharing that second but not yet returned are skipped, and the
// scheme presumably relies on QueryEvents returning results in ascending
// created_at order — confirm against the database implementation.
func (s *Server) streamEvents(ctx context.Context, targetURL string) error {
	// Connect to remote relay.
	dialer := websocket.Dialer{
		HandshakeTimeout: 30 * time.Second,
	}
	conn, _, err := dialer.DialContext(ctx, targetURL, http.Header{})
	if err != nil {
		return fmt.Errorf("failed to connect to relay: %w", err)
	}
	defer conn.Close()
	log.Printf("Connected to remote relay: %s", targetURL)

	const (
		batchLimit = 1000                   // events requested per query
		batchDelay = 100 * time.Millisecond // pause between batches
	)
	var since *timestamp.T
	var totalSent int64
	for {
		// Stop promptly when the caller no longer wants the stream.
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("event streaming cancelled: %w", err)
		}

		// Create filter for this batch.
		f := filter.New()
		if since != nil {
			f.Since = since
		}
		limit := uint(batchLimit)
		f.Limit = &limit

		events, err := s.DB.QueryEvents(ctx, f)
		if err != nil {
			return fmt.Errorf("failed to query events: %w", err)
		}
		if len(events) == 0 {
			// No more events.
			break
		}

		// Stream this batch, making sure every event is freed.
		processedCount := 0
		for i, ev := range events {
			if ev == nil {
				continue
			}
			// Capture timestamp before freeing.
			createdAt := ev.CreatedAt
			// Send EVENT message: ["EVENT", event].
			eventMsg := []any{"EVENT", ev}
			if err := conn.WriteJSON(eventMsg); err != nil {
				// Free this event and all remaining events in the batch.
				ev.Free()
				for j := i + 1; j < len(events); j++ {
					if events[j] != nil {
						events[j].Free()
					}
				}
				return fmt.Errorf("failed to send event: %w", err)
			}
			// Free the event to return it to the pool.
			ev.Free()
			processedCount++
			totalSent++
			if totalSent%1000 == 0 {
				log.Printf("Streamed %d events to %s", totalSent, targetURL)
			}
			// Advance the cursor (created_at + 1 to avoid resending).
			since = timestamp.FromUnix(createdAt + 1)
		}

		// Fewer events than the limit means the end has been reached.
		if len(events) < batchLimit {
			break
		}
		// A full batch in which nothing was sent leaves "since" unchanged;
		// the identical query would then repeat forever.
		if processedCount == 0 {
			return fmt.Errorf(
				"event streaming stalled: batch of %d events contained no sendable event",
				len(events),
			)
		}

		// Small delay between batches to avoid overwhelming the remote
		// relay, interruptible by cancellation.
		select {
		case <-ctx.Done():
			return fmt.Errorf("event streaming cancelled: %w", ctx.Err())
		case <-time.After(batchDelay):
		}
	}
	log.Printf("Successfully streamed %d events to %s", totalSent, targetURL)
	return nil
}
// streamWithNegentropy is the entry point for negentropy (NIP-77) based
// streaming to targetURL. It is currently a stub: neither argument is used
// and the call always yields a zero count together with an explanatory error.
//
// A complete implementation would have to:
//  1. build negentropy storage from the local events,
//  2. create a negentropy instance,
//  3. run the NIP-77 protocol exchange, and
//  4. fetch and push the events that differ.
//
// Until then the sync manager's existing peer infrastructure is the supported
// way to perform negentropy sync.
func (s *Server) streamWithNegentropy(ctx context.Context, targetURL string) (int64, error) {
	var sent int64
	notImplemented := fmt.Errorf("negentropy sync via HTTP endpoint not yet fully implemented. Use the sync manager's peer configuration or implement direct negentropy protocol")
	return sent, notImplemented
}
// handleSprocketStatus writes the current status of the sprocket script as
// JSON.
//
// Only GET is accepted. The caller must pass NIP-98 authentication (401
// otherwise) and hold the "owner" access level (403 otherwise). When no
// sprocket manager is configured the handler answers 503 instead of
// dereferencing a nil manager.
func (s *Server) handleSprocketStatus(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	// Validate NIP-98 authentication.
	valid, pubkey, err := httpauth.CheckAuth(r)
	if chk.E(err) || !valid {
		errorMsg := "NIP-98 authentication validation failed"
		if err != nil {
			errorMsg = err.Error()
		}
		http.Error(w, errorMsg, http.StatusUnauthorized)
		return
	}
	// Check permissions - require owner level.
	accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
	if accessLevel != "owner" {
		http.Error(w, "Owner permission required", http.StatusForbidden)
		return
	}
	// The manager is optional (wrapSprocketChecker guards against nil too).
	if s.sprocketManager == nil {
		http.Error(
			w, "Sprocket manager not initialized", http.StatusServiceUnavailable,
		)
		return
	}
	status := s.sprocketManager.GetSprocketStatus()
	jsonData, err := json.Marshal(status)
	if chk.E(err) {
		http.Error(
			w, "Error generating response", http.StatusInternalServerError,
		)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(jsonData)
}
// handleSprocketUpdate replaces the sprocket script with the request body by
// passing it to the sprocket manager's UpdateSprocket.
//
// Only POST is accepted. The caller must pass NIP-98 authentication (401
// otherwise) and hold the "owner" access level (403 otherwise). When no
// sprocket manager is configured the handler answers 503 instead of
// dereferencing a nil manager. A failed update is reported as 500 including
// the underlying error text.
func (s *Server) handleSprocketUpdate(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	// Validate NIP-98 authentication.
	valid, pubkey, err := httpauth.CheckAuth(r)
	if chk.E(err) || !valid {
		errorMsg := "NIP-98 authentication validation failed"
		if err != nil {
			errorMsg = err.Error()
		}
		http.Error(w, errorMsg, http.StatusUnauthorized)
		return
	}
	// Check permissions - require owner level.
	accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
	if accessLevel != "owner" {
		http.Error(w, "Owner permission required", http.StatusForbidden)
		return
	}
	// The manager is optional (wrapSprocketChecker guards against nil too).
	if s.sprocketManager == nil {
		http.Error(
			w, "Sprocket manager not initialized", http.StatusServiceUnavailable,
		)
		return
	}
	// The whole body is the new script.
	body, err := io.ReadAll(r.Body)
	if chk.E(err) {
		http.Error(w, "Failed to read request body", http.StatusBadRequest)
		return
	}
	if err := s.sprocketManager.UpdateSprocket(string(body)); chk.E(err) {
		http.Error(
			w, fmt.Sprintf("Failed to update sprocket: %v", err),
			http.StatusInternalServerError,
		)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write([]byte(`{"success": true, "message": "Sprocket updated successfully"}`))
}
// handleSprocketRestart asks the sprocket manager to restart the sprocket
// script.
//
// Only POST is accepted. The caller must pass NIP-98 authentication (401
// otherwise) and hold the "owner" access level (403 otherwise). When no
// sprocket manager is configured the handler answers 503 instead of
// dereferencing a nil manager. A failed restart is reported as 500 including
// the underlying error text.
func (s *Server) handleSprocketRestart(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	// Validate NIP-98 authentication.
	valid, pubkey, err := httpauth.CheckAuth(r)
	if chk.E(err) || !valid {
		errorMsg := "NIP-98 authentication validation failed"
		if err != nil {
			errorMsg = err.Error()
		}
		http.Error(w, errorMsg, http.StatusUnauthorized)
		return
	}
	// Check permissions - require owner level.
	accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
	if accessLevel != "owner" {
		http.Error(w, "Owner permission required", http.StatusForbidden)
		return
	}
	// The manager is optional (wrapSprocketChecker guards against nil too).
	if s.sprocketManager == nil {
		http.Error(
			w, "Sprocket manager not initialized", http.StatusServiceUnavailable,
		)
		return
	}
	if err := s.sprocketManager.RestartSprocket(); chk.E(err) {
		http.Error(
			w, fmt.Sprintf("Failed to restart sprocket: %v", err),
			http.StatusInternalServerError,
		)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write([]byte(`{"success": true, "message": "Sprocket restarted successfully"}`))
}
// handleSprocketVersions writes the list of sprocket script versions, as
// returned by the sprocket manager, as JSON.
//
// Only GET is accepted. The caller must pass NIP-98 authentication (401
// otherwise) and hold the "owner" access level (403 otherwise). When no
// sprocket manager is configured the handler answers 503 instead of
// dereferencing a nil manager.
func (s *Server) handleSprocketVersions(
	w http.ResponseWriter, r *http.Request,
) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	// Validate NIP-98 authentication.
	valid, pubkey, err := httpauth.CheckAuth(r)
	if chk.E(err) || !valid {
		errorMsg := "NIP-98 authentication validation failed"
		if err != nil {
			errorMsg = err.Error()
		}
		http.Error(w, errorMsg, http.StatusUnauthorized)
		return
	}
	// Check permissions - require owner level.
	accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
	if accessLevel != "owner" {
		http.Error(w, "Owner permission required", http.StatusForbidden)
		return
	}
	// The manager is optional (wrapSprocketChecker guards against nil too).
	if s.sprocketManager == nil {
		http.Error(
			w, "Sprocket manager not initialized", http.StatusServiceUnavailable,
		)
		return
	}
	versions, err := s.sprocketManager.GetSprocketVersions()
	if chk.E(err) {
		http.Error(
			w, fmt.Sprintf("Failed to get sprocket versions: %v", err),
			http.StatusInternalServerError,
		)
		return
	}
	jsonData, err := json.Marshal(versions)
	if chk.E(err) {
		http.Error(
			w, "Error generating response", http.StatusInternalServerError,
		)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(jsonData)
}
// handleSprocketDeleteVersion deletes one stored sprocket script version.
//
// Only POST is accepted. The request body must be a JSON object with a
// non-empty "filename" field (400 otherwise). The caller must pass NIP-98
// authentication (401 otherwise) and hold the "owner" access level (403
// otherwise). When no sprocket manager is configured the handler answers 503
// instead of dereferencing a nil manager.
//
// NOTE(review): the filename is forwarded unchanged to
// DeleteSprocketVersion — confirm that it rejects path separators.
func (s *Server) handleSprocketDeleteVersion(
	w http.ResponseWriter, r *http.Request,
) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	// Validate NIP-98 authentication.
	valid, pubkey, err := httpauth.CheckAuth(r)
	if chk.E(err) || !valid {
		errorMsg := "NIP-98 authentication validation failed"
		if err != nil {
			errorMsg = err.Error()
		}
		http.Error(w, errorMsg, http.StatusUnauthorized)
		return
	}
	// Check permissions - require owner level.
	accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
	if accessLevel != "owner" {
		http.Error(w, "Owner permission required", http.StatusForbidden)
		return
	}
	// The manager is optional (wrapSprocketChecker guards against nil too).
	if s.sprocketManager == nil {
		http.Error(
			w, "Sprocket manager not initialized", http.StatusServiceUnavailable,
		)
		return
	}
	// Read and decode the request body.
	body, err := io.ReadAll(r.Body)
	if chk.E(err) {
		http.Error(w, "Failed to read request body", http.StatusBadRequest)
		return
	}
	var request struct {
		Filename string `json:"filename"`
	}
	if err := json.Unmarshal(body, &request); chk.E(err) {
		http.Error(w, "Invalid JSON in request body", http.StatusBadRequest)
		return
	}
	if request.Filename == "" {
		http.Error(w, "Filename is required", http.StatusBadRequest)
		return
	}
	// Delete the sprocket version.
	if err := s.sprocketManager.DeleteSprocketVersion(request.Filename); chk.E(err) {
		http.Error(
			w, fmt.Sprintf("Failed to delete sprocket version: %v", err),
			http.StatusInternalServerError,
		)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write([]byte(`{"success": true, "message": "Sprocket version deleted successfully"}`))
}
// handleSprocketConfig reports, as a JSON object with a single "enabled"
// field, whether the sprocket feature is switched on in the relay
// configuration. Only GET is accepted; no authentication is performed.
func (s *Server) handleSprocketConfig(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	var payload struct {
		Enabled bool `json:"enabled"`
	}
	payload.Enabled = s.Config.SprocketEnabled

	encoded, marshalErr := json.Marshal(payload)
	if chk.E(marshalErr) {
		http.Error(
			w, "Error generating response", http.StatusInternalServerError,
		)
		return
	}
	w.Write(encoded)
}
// handleACLMode reports the value of acl.Registry.Type() as a JSON object
// with a single "acl_mode" field. Only GET is accepted; no authentication is
// performed.
func (s *Server) handleACLMode(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	var payload struct {
		ACLMode string `json:"acl_mode"`
	}
	payload.ACLMode = acl.Registry.Type()

	encoded, marshalErr := json.Marshal(payload)
	if chk.E(marshalErr) {
		http.Error(
			w, "Error generating response", http.StatusInternalServerError,
		)
		return
	}
	w.Write(encoded)
}
// handleSyncCurrent serves a peer's request for the current serial number.
// It answers 503 when no sync manager is configured, lets validatePeerRequest
// reject unauthenticated or unauthorized peers, and otherwise delegates to the
// sync manager.
func (s *Server) handleSyncCurrent(w http.ResponseWriter, r *http.Request) {
	switch {
	case s.syncManager == nil:
		http.Error(
			w, "Sync manager not initialized", http.StatusServiceUnavailable,
		)
	case s.validatePeerRequest(w, r):
		// NIP-98 authentication and peer authorization both passed.
		s.syncManager.HandleCurrentRequest(w, r)
	}
}
// handleSyncEventIDs serves a peer's request for event IDs together with
// their serial numbers. It answers 503 when no sync manager is configured,
// lets validatePeerRequest reject unauthenticated or unauthorized peers, and
// otherwise delegates to the sync manager.
func (s *Server) handleSyncEventIDs(w http.ResponseWriter, r *http.Request) {
	switch {
	case s.syncManager == nil:
		http.Error(
			w, "Sync manager not initialized", http.StatusServiceUnavailable,
		)
	case s.validatePeerRequest(w, r):
		// NIP-98 authentication and peer authorization both passed.
		s.syncManager.HandleEventIDsRequest(w, r)
	}
}
// validatePeerRequest checks that r carries valid NIP-98 authentication and
// that the authenticated pubkey belongs to one of the configured sync peers.
//
// It returns true when the request may proceed. In every other case it has
// already written the error response (401 for authentication problems, 503
// when no sync manager exists, 403 for an unknown peer) and returns false. On
// success the peer admin ACL is refreshed as a side effect.
func (s *Server) validatePeerRequest(
	w http.ResponseWriter, r *http.Request,
) bool {
	valid, pubkey, err := httpauth.CheckAuth(r)
	switch {
	case err != nil:
		log.Printf("NIP-98 auth validation error: %v", err)
		http.Error(
			w, "Authentication validation failed", http.StatusUnauthorized,
		)
		return false
	case !valid:
		http.Error(w, "NIP-98 authentication required", http.StatusUnauthorized)
		return false
	case s.syncManager == nil:
		log.Printf("Sync manager not available for peer validation")
		http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
		return false
	}

	// The request does not name the relay it comes from, so the pubkey is
	// compared against every configured peer.
	requester := hex.Enc(pubkey)
	for _, peer := range s.syncManager.GetPeers() {
		if !s.syncManager.IsAuthorizedPeer(peer, requester) {
			continue
		}
		// A recognized peer also gets admin access through the ACL.
		s.updatePeerAdminACL(pubkey)
		return true
	}

	log.Printf("Unauthorized sync request from pubkey: %s", requester)
	http.Error(w, "Unauthorized peer", http.StatusForbidden)
	return false
}
// updatePeerAdminACL pushes the pubkeys of all configured sync peers into the
// first registered ACL of type "managed" that is an *acl.Managed, granting
// those peers admin access. Peers whose pubkey lookup fails are left out. The
// peerPubkey argument is not consulted; the full peer list is rebuilt instead.
func (s *Server) updatePeerAdminACL(peerPubkey []byte) {
	for _, candidate := range acl.Registry.ACLs() {
		if candidate.Type() != "managed" {
			continue
		}
		managed, isManaged := candidate.(*acl.Managed)
		if !isManaged {
			continue
		}

		// Gather the pubkeys of every peer that can currently be resolved.
		var admins [][]byte
		for _, peer := range s.syncManager.GetPeers() {
			pk, lookupErr := s.syncManager.GetPeerPubkey(peer)
			if lookupErr != nil {
				continue
			}
			admins = append(admins, []byte(pk))
		}
		managed.UpdatePeerAdmins(admins)
		return
	}
}
// =============================================================================
// Event Service Initialization
// =============================================================================
// InitEventServices builds and wires the domain services used for event
// handling: validator, authorizer, router, processor, domain event
// dispatcher, special-kinds registry and finally the ingestion service that
// ties them together. Optional collaborators (rate limiter, sync manager,
// relay group manager, cluster manager) are attached only when present.
//
// Call it after the Server has been created and before connections are
// accepted.
func (s *Server) InitEventServices() {
	// Validation: events may be dated at most one hour into the future.
	s.eventValidator = validation.NewWithConfig(&validation.Config{
		MaxFutureSeconds: 3600,
	})

	// Authorization.
	s.eventAuthorizer = authorization.New(
		&authorization.Config{
			AuthRequired:    s.Config.AuthRequired,
			AuthToWrite:     s.Config.AuthToWrite,
			NIP46BypassAuth: s.Config.NIP46BypassAuth,
			Admins:          s.Admins,
			Owners:          s.Owners,
		},
		s.wrapAuthACLRegistry(),
		s.wrapAuthPolicyManager(),
		s.wrapAuthSyncManager(),
	)

	// Routing, including the handler for ephemeral events (kinds 20000-29999).
	s.eventRouter = routing.New()
	s.eventRouter.RegisterKindCheck(
		"ephemeral",
		routing.IsEphemeral,
		routing.MakeEphemeralHandler(s.publishers),
	)

	// Processing, plus whichever optional dependencies are configured.
	s.eventProcessor = processing.New(
		&processing.Config{
			Admins:       s.Admins,
			Owners:       s.Owners,
			WriteTimeout: 30 * time.Second,
		},
		s.wrapDB(),
		s.publishers,
	)
	if s.rateLimiter != nil {
		s.eventProcessor.SetRateLimiter(s.wrapRateLimiter())
	}
	if s.syncManager != nil {
		s.eventProcessor.SetSyncManager(s.wrapSyncManager())
	}
	if s.relayGroupMgr != nil {
		s.eventProcessor.SetRelayGroupManager(s.wrapRelayGroupManager())
	}
	if s.clusterManager != nil {
		s.eventProcessor.SetClusterManager(s.wrapClusterManager())
	}
	s.eventProcessor.SetACLRegistry(s.wrapACLRegistry())

	// Domain event dispatcher with a logging subscriber for analytics. The
	// subscriber level is "trace" for trace, "info" for info/warn/error and
	// "debug" for everything else.
	s.eventDispatcher = domainevents.NewDispatcher(domainevents.DefaultDispatcherConfig())
	var subscriberLevel string
	switch s.Config.LogLevel {
	case "trace":
		subscriberLevel = "trace"
	case "info", "warn", "error":
		subscriberLevel = "info"
	default:
		subscriberLevel = "debug"
	}
	s.eventDispatcher.Subscribe(subscribers.NewLoggingSubscriber(subscriberLevel))
	s.eventProcessor.SetEventDispatcher(s.eventDispatcher)

	// Special kinds registry and its handlers.
	s.specialKinds = specialkinds.NewRegistry()
	s.registerSpecialKindHandlers()

	// Ingestion service combining all of the above.
	s.ingestionService = ingestion.NewService(
		s.eventValidator,
		s.eventAuthorizer,
		s.eventRouter,
		s.eventProcessor,
		ingestion.Config{
			SprocketChecker: s.wrapSprocketChecker(),
			SpecialKinds:    s.specialKinds,
			ACLMode:         acl.Registry.GetMode,
		},
	)
}
// sprocketCheckerWrapper adapts a *SprocketManager to the
// ingestion.SprocketChecker interface.
type sprocketCheckerWrapper struct {
	sm *SprocketManager
}

// wrapSprocketChecker returns the server's sprocket manager wrapped as an
// ingestion.SprocketChecker, or a nil interface when no manager is set.
func (s *Server) wrapSprocketChecker() ingestion.SprocketChecker {
	if mgr := s.sprocketManager; mgr != nil {
		return &sprocketCheckerWrapper{sm: mgr}
	}
	return nil
}

// IsEnabled reports whether a manager is present and says it is enabled.
func (w *sprocketCheckerWrapper) IsEnabled() bool {
	if w.sm == nil {
		return false
	}
	return w.sm.IsEnabled()
}

// IsDisabled forwards to the manager's IsDisabled.
func (w *sprocketCheckerWrapper) IsDisabled() bool {
	disabled := w.sm.IsDisabled()
	return disabled
}

// IsRunning forwards to the manager's IsRunning.
func (w *sprocketCheckerWrapper) IsRunning() bool {
	running := w.sm.IsRunning()
	return running
}

// ProcessEvent runs ev through the manager and copies the Action and Msg of
// its response into an ingestion.SprocketResponse. A manager error is
// returned unchanged with a nil response.
func (w *sprocketCheckerWrapper) ProcessEvent(ev *event.E) (*ingestion.SprocketResponse, error) {
	verdict, err := w.sm.ProcessEvent(ev)
	if err != nil {
		return nil, err
	}
	out := new(ingestion.SprocketResponse)
	out.Action = verdict.Action
	out.Msg = verdict.Msg
	return out, nil
}
// processingDBWrapper adapts a database.Database to the processing.Database
// interface by forwarding each call unchanged.
type processingDBWrapper struct {
db database.Database
}
// wrapDB returns the server's database (s.DB) wrapped as a processing.Database.
func (s *Server) wrapDB() processing.Database {
return &processingDBWrapper{db: s.DB}
}
// SaveEvent forwards to the wrapped database's SaveEvent and returns its
// results as-is.
func (w *processingDBWrapper) SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error) {
return w.db.SaveEvent(ctx, ev)
}
// CheckForDeleted forwards to the wrapped database's CheckForDeleted and
// returns its error as-is.
func (w *processingDBWrapper) CheckForDeleted(ev *event.E, adminOwners [][]byte) error {
return w.db.CheckForDeleted(ev, adminOwners)
}
// processingRateLimiterWrapper adapts a *ratelimit.Limiter to the
// processing.RateLimiter interface.
type processingRateLimiterWrapper struct {
rl *ratelimit.Limiter
}
// wrapRateLimiter returns the server's rate limiter wrapped as a
// processing.RateLimiter. The limiter is not checked for nil here; the caller
// in InitEventServices only wraps it when it is set.
func (s *Server) wrapRateLimiter() processing.RateLimiter {
return &processingRateLimiterWrapper{rl: s.rateLimiter}
}
// IsEnabled forwards to the wrapped limiter's IsEnabled.
func (w *processingRateLimiterWrapper) IsEnabled() bool {
return w.rl.IsEnabled()
}
// Wait calls the wrapped limiter's Wait and then always returns nil.
// NOTE(review): whatever the limiter's Wait returns is discarded, so context
// cancellation is never surfaced to the caller — confirm this is intended.
func (w *processingRateLimiterWrapper) Wait(ctx context.Context, opType int) error {
w.rl.Wait(ctx, opType)
return nil
}
// processingSyncManagerWrapper adapts a *dsync.Manager to the
// processing.SyncManager interface.
type processingSyncManagerWrapper struct {
sm *dsync.Manager
}
// wrapSyncManager returns the server's sync manager wrapped as a
// processing.SyncManager. The manager is not checked for nil here; the caller
// in InitEventServices only wraps it when it is set.
func (s *Server) wrapSyncManager() processing.SyncManager {
return &processingSyncManagerWrapper{sm: s.syncManager}
}
// UpdateSerial forwards to the wrapped sync manager's UpdateSerial.
func (w *processingSyncManagerWrapper) UpdateSerial() {
w.sm.UpdateSerial()
}
// processingRelayGroupManagerWrapper adapts a *dsync.RelayGroupManager to the
// processing.RelayGroupManager interface.
type processingRelayGroupManagerWrapper struct {
	rgm *dsync.RelayGroupManager
}

// wrapRelayGroupManager returns the server's relay group manager wrapped as a
// processing.RelayGroupManager.
func (s *Server) wrapRelayGroupManager() processing.RelayGroupManager {
	wrapper := &processingRelayGroupManagerWrapper{rgm: s.relayGroupMgr}
	return wrapper
}

// ValidateRelayGroupEvent forwards to the wrapped manager and returns its
// error as-is.
func (w *processingRelayGroupManagerWrapper) ValidateRelayGroupEvent(ev *event.E) error {
	err := w.rgm.ValidateRelayGroupEvent(ev)
	return err
}

// HandleRelayGroupEvent forwards ev to the wrapped manager, but only when
// syncMgr is a *dsync.Manager; any other value makes the call a no-op.
func (w *processingRelayGroupManagerWrapper) HandleRelayGroupEvent(ev *event.E, syncMgr any) {
	manager, isManager := syncMgr.(*dsync.Manager)
	if !isManager {
		return
	}
	w.rgm.HandleRelayGroupEvent(ev, manager)
}
// processingClusterManagerWrapper adapts a *dsync.ClusterManager to the
// processing.ClusterManager interface.
type processingClusterManagerWrapper struct {
cm *dsync.ClusterManager
}
// wrapClusterManager returns the server's cluster manager wrapped as a
// processing.ClusterManager. The manager is not checked for nil here; the
// caller in InitEventServices only wraps it when it is set.
func (s *Server) wrapClusterManager() processing.ClusterManager {
return &processingClusterManagerWrapper{cm: s.clusterManager}
}
// HandleMembershipEvent forwards to the wrapped cluster manager and returns
// its error as-is.
func (w *processingClusterManagerWrapper) HandleMembershipEvent(ev *event.E) error {
return w.cm.HandleMembershipEvent(ev)
}
// processingACLRegistryWrapper exposes the package-level acl.Registry through
// the processing.ACLRegistry interface. It carries no state of its own.
type processingACLRegistryWrapper struct{}
// wrapACLRegistry returns a new processingACLRegistryWrapper as a
// processing.ACLRegistry.
func (s *Server) wrapACLRegistry() processing.ACLRegistry {
return &processingACLRegistryWrapper{}
}
// Configure forwards cfg to acl.Registry.Configure and returns its error.
func (w *processingACLRegistryWrapper) Configure(cfg ...any) error {
return acl.Registry.Configure(cfg...)
}
// Active returns the value of acl.Registry.GetMode().
func (w *processingACLRegistryWrapper) Active() string {
return acl.Registry.GetMode()
}
// =============================================================================
// Authorization Service Wrappers
// =============================================================================
// authACLRegistryWrapper exposes the package-level acl.Registry through the
// authorization.ACLRegistry interface. It carries no state of its own.
type authACLRegistryWrapper struct{}
// wrapAuthACLRegistry returns a new authACLRegistryWrapper as an
// authorization.ACLRegistry.
func (s *Server) wrapAuthACLRegistry() authorization.ACLRegistry {
return &authACLRegistryWrapper{}
}
// GetAccessLevel forwards to acl.Registry.GetAccessLevel for the given pubkey
// and remote address.
func (w *authACLRegistryWrapper) GetAccessLevel(pub []byte, address string) string {
return acl.Registry.GetAccessLevel(pub, address)
}
// CheckPolicy forwards ev to acl.Registry.CheckPolicy and returns its results
// as-is.
func (w *authACLRegistryWrapper) CheckPolicy(ev *event.E) (bool, error) {
return acl.Registry.CheckPolicy(ev)
}
// Active returns the value of acl.Registry.GetMode().
func (w *authACLRegistryWrapper) Active() string {
return acl.Registry.GetMode()
}
// authPolicyManagerWrapper adapts a *policy.P to the
// authorization.PolicyManager interface.
type authPolicyManagerWrapper struct {
	pm *policy.P
}

// wrapAuthPolicyManager returns the server's policy manager wrapped as an
// authorization.PolicyManager, or a nil interface when no manager is set.
func (s *Server) wrapAuthPolicyManager() authorization.PolicyManager {
	if mgr := s.policyManager; mgr != nil {
		return &authPolicyManagerWrapper{pm: mgr}
	}
	return nil
}

// IsEnabled forwards to the wrapped policy manager's IsEnabled.
func (w *authPolicyManagerWrapper) IsEnabled() bool {
	enabled := w.pm.IsEnabled()
	return enabled
}

// CheckPolicy forwards all arguments to the wrapped policy manager's
// CheckPolicy and returns its results as-is.
func (w *authPolicyManagerWrapper) CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) {
	allowed, err := w.pm.CheckPolicy(action, ev, pubkey, remote)
	return allowed, err
}
// authSyncManagerWrapper adapts a *dsync.Manager to the
// authorization.SyncManager interface.
type authSyncManagerWrapper struct {
	sm *dsync.Manager
}

// wrapAuthSyncManager returns the server's sync manager wrapped as an
// authorization.SyncManager, or a nil interface when no manager is set.
func (s *Server) wrapAuthSyncManager() authorization.SyncManager {
	if mgr := s.syncManager; mgr != nil {
		return &authSyncManagerWrapper{sm: mgr}
	}
	return nil
}

// GetPeers forwards to the wrapped sync manager's GetPeers.
func (w *authSyncManagerWrapper) GetPeers() []string {
	peers := w.sm.GetPeers()
	return peers
}

// IsAuthorizedPeer forwards to the wrapped sync manager's IsAuthorizedPeer.
func (w *authSyncManagerWrapper) IsAuthorizedPeer(url, pubkey string) bool {
	authorized := w.sm.IsAuthorizedPeer(url, pubkey)
	return authorized
}
// =============================================================================
// Message Processing Pause/Resume for Policy and Follow List Updates
// =============================================================================
// PauseMessageProcessing takes the write side of messagePauseMutex, pausing
// all message processing that goes through AcquireMessageProcessingLock.
// Call it before updating policy configuration or follow lists, and pair every
// call with ResumeMessageProcessing once the update is complete.
func (s *Server) PauseMessageProcessing() {
// Blocks until the write lock is obtained.
s.messagePauseMutex.Lock()
}
// ResumeMessageProcessing releases the write side of messagePauseMutex taken
// by PauseMessageProcessing so that message processing can continue.
// Call it after policy configuration or follow list updates are complete.
// NOTE(review): must only be called after a matching PauseMessageProcessing;
// presumably unlocking an unheld mutex is a fatal error — confirm the mutex type.
func (s *Server) ResumeMessageProcessing() {
s.messagePauseMutex.Unlock()
}
// AcquireMessageProcessingLock takes the read side of messagePauseMutex for
// normal message processing. Read locks can be held concurrently, so messages
// are processed in parallel, while a pause taken through
// PauseMessageProcessing holds them back.
// Pair every call with ReleaseMessageProcessingLock when processing is done.
func (s *Server) AcquireMessageProcessingLock() {
s.messagePauseMutex.RLock()
}
// ReleaseMessageProcessingLock releases the read side of messagePauseMutex
// taken by AcquireMessageProcessingLock once message processing has finished.
func (s *Server) ReleaseMessageProcessingLock() {
s.messagePauseMutex.RUnlock()
}