package app import ( "bufio" "bytes" "context" "encoding/json" "fmt" "io" "log" "net/http" "net/http/httputil" "net/url" "os" "path/filepath" "strconv" "strings" "sync" "time" "git.mleku.dev/mleku/nostr/encoders/bech32encoding" "git.mleku.dev/mleku/nostr/encoders/event" "git.mleku.dev/mleku/nostr/encoders/filter" "git.mleku.dev/mleku/nostr/encoders/hex" "git.mleku.dev/mleku/nostr/encoders/tag" "git.mleku.dev/mleku/nostr/encoders/timestamp" "git.mleku.dev/mleku/nostr/httpauth" "git.mleku.dev/mleku/nostr/interfaces/signer/p8k" "git.mleku.dev/mleku/nostr/protocol/auth" "github.com/gorilla/websocket" "lol.mleku.dev/chk" "next.orly.dev/app/branding" "next.orly.dev/app/config" "next.orly.dev/pkg/acl" "next.orly.dev/pkg/archive" "next.orly.dev/pkg/blossom" "next.orly.dev/pkg/bunker" "next.orly.dev/pkg/database" domainevents "next.orly.dev/pkg/domain/events" "next.orly.dev/pkg/domain/events/subscribers" "next.orly.dev/pkg/event/authorization" "next.orly.dev/pkg/event/ingestion" "next.orly.dev/pkg/event/processing" "next.orly.dev/pkg/event/routing" "next.orly.dev/pkg/event/specialkinds" "next.orly.dev/pkg/event/validation" acliface "next.orly.dev/pkg/interfaces/acl" "next.orly.dev/pkg/policy" "next.orly.dev/pkg/protocol/graph" "next.orly.dev/pkg/protocol/nip43" "next.orly.dev/pkg/protocol/nrc" "next.orly.dev/pkg/protocol/publish" "next.orly.dev/pkg/ratelimit" "next.orly.dev/pkg/spider" "next.orly.dev/pkg/storage" dsync "next.orly.dev/pkg/sync" "next.orly.dev/pkg/transport" "next.orly.dev/pkg/wireguard" ) type Server struct { mux *http.ServeMux // Config holds the relay configuration. // Deprecated: Use GetConfig() method instead of accessing directly. Config *config.C // Ctx holds the server context. // Deprecated: Use Context() method instead of accessing directly. Ctx context.Context publishers *publish.S // Admins holds the admin pubkeys. // Deprecated: Use IsAdmin() method instead of accessing directly. Admins [][]byte // Owners holds the owner pubkeys. // Deprecated: Use IsOwner() method instead of accessing directly. Owners [][]byte // DB holds the database instance. // Deprecated: Use Database() method instead of accessing directly. DB database.Database // optional reverse proxy for dev web server devProxy *httputil.ReverseProxy // Per-IP connection tracking to prevent resource exhaustion connPerIPMu sync.RWMutex connPerIP map[string]int // Challenge storage for HTTP UI authentication challengeMutex sync.RWMutex challenges map[string][]byte // Message processing pause mutex for policy/follow list updates // Use RLock() for normal message processing, Lock() for updates messagePauseMutex sync.RWMutex paymentProcessor *PaymentProcessor sprocketManager *SprocketManager policyManager *policy.P spiderManager *spider.Spider directorySpider *spider.DirectorySpider syncManager *dsync.Manager relayGroupMgr *dsync.RelayGroupManager clusterManager *dsync.ClusterManager blossomServer *blossom.Server InviteManager *nip43.InviteManager graphExecutor *graph.Executor rateLimiter *ratelimit.Limiter cfg *config.C db database.Database // Changed from *database.D to interface // Domain services for event handling eventValidator *validation.Service eventAuthorizer *authorization.Service eventRouter *routing.DefaultRouter eventProcessor *processing.Service eventDispatcher *domainevents.Dispatcher ingestionService *ingestion.Service specialKinds *specialkinds.Registry aclRegistry acliface.Registry // WireGuard VPN and NIP-46 Bunker wireguardServer *wireguard.Server bunkerServer *bunker.Server subnetPool *wireguard.SubnetPool // NRC (Nostr Relay Connect) bridge for remote relay access nrcBridge *nrc.Bridge // Archive relay and storage management archiveManager *archive.Manager accessTracker *storage.AccessTracker garbageCollector *storage.GarbageCollector // Transport manager for network transports (TCP, TLS, Tor, etc.) transportMgr *transport.Manager // Branding/white-label customization brandingMgr *branding.Manager } // ============================================================================= // Server Accessor Methods // ============================================================================= // GetConfig returns the relay configuration. func (s *Server) GetConfig() *config.C { return s.Config } // Context returns the server context. func (s *Server) Context() context.Context { return s.Ctx } // Database returns the database instance. func (s *Server) Database() database.Database { return s.DB } // IsAdmin returns true if the given pubkey is an admin. func (s *Server) IsAdmin(pubkey []byte) bool { pubHex := string(hex.Enc(pubkey)) for _, admin := range s.Admins { if string(hex.Enc(admin)) == pubHex { return true } } return false } // IsOwner returns true if the given pubkey is an owner. func (s *Server) IsOwner(pubkey []byte) bool { pubHex := string(hex.Enc(pubkey)) for _, owner := range s.Owners { if string(hex.Enc(owner)) == pubHex { return true } } return false } // ACLRegistry returns the ACL registry instance. // This enables dependency injection for testing and removes reliance on global state. func (s *Server) ACLRegistry() acliface.Registry { return s.aclRegistry } // isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system func (s *Server) isIPBlacklisted(remote string) bool { // Extract IP from remote address (e.g., "192.168.1.1:12345" -> "192.168.1.1") remoteIP := strings.Split(remote, ":")[0] // Check static IP blacklist from config first if len(s.Config.IPBlacklist) > 0 { for _, blocked := range s.Config.IPBlacklist { // Allow simple prefix matching for subnets (e.g., "192.168" matches 192.168.0.0/16) if blocked != "" && strings.HasPrefix(remoteIP, blocked) { return true } } } // Check if managed ACL is available and active if s.Config.ACLMode == "managed" { for _, aclInstance := range acl.Registry.ACLs() { if aclInstance.Type() == "managed" { if managed, ok := aclInstance.(*acl.Managed); ok { return managed.IsIPBlocked(remoteIP) } } } } return false } // isAllowedCORSOrigin checks if the given origin is allowed for CORS requests. // Returns true if: // - CORSOrigins contains "*" (allow all) // - CORSOrigins is empty (allow all when CORS is enabled) // - The origin matches one of the configured origins func (s *Server) isAllowedCORSOrigin(origin string) bool { if origin == "" { return false } // If no specific origins configured, allow all if len(s.Config.CORSOrigins) == 0 { return true } for _, allowed := range s.Config.CORSOrigins { if allowed == "*" { return true } if allowed == origin { return true } } return false } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Check if this is a blossom-related path (needs CORS headers) path := r.URL.Path isBlossomPath := path == "/upload" || path == "/media" || path == "/mirror" || path == "/report" || strings.HasPrefix(path, "/list/") || strings.HasPrefix(path, "/blossom/") || (len(path) == 65 && path[0] == '/') // / blob downloads // Set CORS headers for all blossom-related requests if isBlossomPath { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, PUT, DELETE, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Authorization, authorization, Content-Type, content-type, X-SHA-256, x-sha-256, X-Content-Length, x-content-length, X-Content-Type, x-content-type, Accept, accept") w.Header().Set("Access-Control-Expose-Headers", "X-Reason, Content-Length, Content-Type, Accept-Ranges") w.Header().Set("Access-Control-Max-Age", "86400") // Handle preflight OPTIONS requests for blossom paths if r.Method == "OPTIONS" { w.WriteHeader(http.StatusOK) return } } // Set CORS headers for API endpoints when enabled (for standalone dashboard mode) // Also allow root path for NIP-11 relay info requests isAPIPath := strings.HasPrefix(path, "/api/") isRelayInfoRequest := path == "/" && r.Header.Get("Accept") == "application/nostr+json" if s.Config != nil && s.Config.CORSEnabled && (isAPIPath || isRelayInfoRequest) { origin := r.Header.Get("Origin") if s.isAllowedCORSOrigin(origin) { w.Header().Set("Access-Control-Allow-Origin", origin) w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, Accept") w.Header().Set("Access-Control-Allow-Credentials", "true") w.Header().Set("Access-Control-Max-Age", "86400") } // Handle preflight OPTIONS requests for API paths if r.Method == "OPTIONS" { w.WriteHeader(http.StatusOK) return } } if r.Method == "OPTIONS" && !isBlossomPath && !isAPIPath { // Handle OPTIONS for other paths if s.mux != nil { s.mux.ServeHTTP(w, r) return } w.WriteHeader(http.StatusOK) return } // Log proxy information for debugging (only for WebSocket requests to avoid spam) if r.Header.Get("Upgrade") == "websocket" { LogProxyInfo(r, "HTTP request") } // If this is a websocket request, only intercept the relay root path. // This allows other websocket paths (e.g., Vite HMR) to be handled by the dev proxy when enabled. if r.Header.Get("Upgrade") == "websocket" { if s.mux != nil && s.Config != nil && s.Config.WebDisableEmbedded && s.Config.WebDevProxyURL != "" && r.URL.Path != "/" { // forward to mux (which will proxy to dev server) s.mux.ServeHTTP(w, r) return } s.HandleWebsocket(w, r) return } if r.Header.Get("Accept") == "application/nostr+json" { s.HandleRelayInfo(w, r) return } if s.mux == nil { http.Error(w, "Upgrade required", http.StatusUpgradeRequired) return } s.mux.ServeHTTP(w, r) } func (s *Server) ServiceURL(req *http.Request) (url string) { // Use configured RelayURL if available if s.Config != nil && s.Config.RelayURL != "" { relayURL := strings.TrimSuffix(s.Config.RelayURL, "/") // Ensure it has a protocol if !strings.HasPrefix(relayURL, "http://") && !strings.HasPrefix(relayURL, "https://") { relayURL = "http://" + relayURL } return relayURL } proto := req.Header.Get("X-Forwarded-Proto") if proto == "" { if req.TLS != nil { proto = "https" } else { proto = "http" } } host := req.Header.Get("X-Forwarded-Host") if host == "" { host = req.Host } return proto + "://" + host } func (s *Server) WebSocketURL(req *http.Request) (url string) { proto := req.Header.Get("X-Forwarded-Proto") if proto == "" { if req.TLS != nil { proto = "wss" } else { proto = "ws" } } else { // Convert HTTP scheme to WebSocket scheme if proto == "https" { proto = "wss" } else if proto == "http" { proto = "ws" } } host := req.Header.Get("X-Forwarded-Host") if host == "" { host = req.Host } return proto + "://" + strings.TrimRight(host, "/") + "/" } func (s *Server) DashboardURL(req *http.Request) (url string) { return s.ServiceURL(req) + "/" } // UserInterface sets up a basic Nostr NDK interface that allows users to log into the relay user interface func (s *Server) UserInterface() { if s.mux == nil { s.mux = http.NewServeMux() } // If dev proxy is configured, initialize it if s.Config != nil && s.Config.WebDisableEmbedded && s.Config.WebDevProxyURL != "" { proxyURL := s.Config.WebDevProxyURL // Add default scheme if missing to avoid: proxy error: unsupported protocol scheme "" if !strings.Contains(proxyURL, "://") { proxyURL = "http://" + proxyURL } if target, err := url.Parse(proxyURL); !chk.E(err) { if target.Scheme == "" || target.Host == "" { // invalid URL, disable proxy log.Printf( "invalid ORLY_WEB_DEV_PROXY_URL: %q — disabling dev proxy\n", s.Config.WebDevProxyURL, ) } else { s.devProxy = httputil.NewSingleHostReverseProxy(target) // Ensure Host header points to upstream for dev servers that care origDirector := s.devProxy.Director s.devProxy.Director = func(req *http.Request) { origDirector(req) req.Host = target.Host } // Suppress noisy "context canceled" errors from browser navigation s.devProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { if r.Context().Err() == context.Canceled { // Browser canceled the request - this is normal, don't log it return } log.Printf("proxy error: %v", err) http.Error(w, "Bad Gateway", http.StatusBadGateway) } } } } // Initialize challenge storage if not already done if s.challenges == nil { s.challengeMutex.Lock() s.challenges = make(map[string][]byte) s.challengeMutex.Unlock() } // Serve favicon.ico by serving favicon.png s.mux.HandleFunc("/favicon.ico", s.handleFavicon) // Branding/white-label endpoints (custom assets, CSS, manifest) s.mux.HandleFunc("/branding/", s.handleBrandingAsset) // Intercept /orly.png to serve custom logo if branding is active s.mux.HandleFunc("/orly.png", s.handleLogo) // Serve the main login interface (and static assets) or proxy in dev mode s.mux.HandleFunc("/", s.handleLoginInterface) // API endpoints for authentication s.mux.HandleFunc("/api/auth/challenge", s.handleAuthChallenge) s.mux.HandleFunc("/api/auth/login", s.handleAuthLogin) s.mux.HandleFunc("/api/auth/status", s.handleAuthStatus) s.mux.HandleFunc("/api/auth/logout", s.handleAuthLogout) s.mux.HandleFunc("/api/permissions/", s.handlePermissions) // Export endpoint s.mux.HandleFunc("/api/export", s.handleExport) // Events endpoints s.mux.HandleFunc("/api/events/mine", s.handleEventsMine) // Import endpoint (admin only) s.mux.HandleFunc("/api/import", s.handleImport) // Streaming endpoint to forward events to remote relay s.mux.HandleFunc("/api/stream-to-relay", s.handleStreamToRelay) // Sprocket endpoints (owner only) s.mux.HandleFunc("/api/sprocket/status", s.handleSprocketStatus) s.mux.HandleFunc("/api/sprocket/update", s.handleSprocketUpdate) s.mux.HandleFunc("/api/sprocket/restart", s.handleSprocketRestart) s.mux.HandleFunc("/api/sprocket/versions", s.handleSprocketVersions) s.mux.HandleFunc( "/api/sprocket/delete-version", s.handleSprocketDeleteVersion, ) s.mux.HandleFunc("/api/sprocket/config", s.handleSprocketConfig) // NIP-86 management endpoint s.mux.HandleFunc("/api/nip86", s.handleNIP86Management) // ACL mode endpoint s.mux.HandleFunc("/api/acl-mode", s.handleACLMode) // Log viewer endpoints (owner only) s.mux.HandleFunc("/api/logs", s.handleGetLogs) s.mux.HandleFunc("/api/logs/clear", s.handleClearLogs) s.mux.HandleFunc("/api/logs/level", s.handleLogLevel) // Sync endpoints for distributed synchronization if s.syncManager != nil { s.mux.HandleFunc("/api/sync/current", s.handleSyncCurrent) s.mux.HandleFunc("/api/sync/event-ids", s.handleSyncEventIDs) log.Printf("Distributed sync API enabled at /api/sync") } // Blossom blob storage API endpoint if s.blossomServer != nil { // Primary routes under /blossom/ s.mux.HandleFunc("/blossom/", s.blossomHandler) // Root-level routes for clients that expect blossom at root (like Jumble) s.mux.HandleFunc("/upload", s.blossomRootHandler) s.mux.HandleFunc("/list/", s.blossomRootHandler) s.mux.HandleFunc("/media", s.blossomRootHandler) s.mux.HandleFunc("/mirror", s.blossomRootHandler) s.mux.HandleFunc("/report", s.blossomRootHandler) log.Printf("Blossom blob storage API enabled at /blossom and root") } else { log.Printf("WARNING: Blossom server is nil, routes not registered") } // Cluster replication API endpoints if s.clusterManager != nil { s.mux.HandleFunc("/cluster/latest", s.clusterManager.HandleLatestSerial) s.mux.HandleFunc("/cluster/events", s.clusterManager.HandleEventsRange) log.Printf("Cluster replication API enabled at /cluster") } // WireGuard VPN and Bunker API endpoints // These are always registered but will return errors if not enabled s.mux.HandleFunc("/api/wireguard/config", s.handleWireGuardConfig) s.mux.HandleFunc("/api/wireguard/regenerate", s.handleWireGuardRegenerate) s.mux.HandleFunc("/api/wireguard/status", s.handleWireGuardStatus) s.mux.HandleFunc("/api/wireguard/audit", s.handleWireGuardAudit) s.mux.HandleFunc("/api/bunker/url", s.handleBunkerURL) s.mux.HandleFunc("/api/bunker/info", s.handleBunkerInfo) // NRC (Nostr Relay Connect) management endpoints s.mux.HandleFunc("/api/nrc/connections", s.handleNRCConnectionsRouter) s.mux.HandleFunc("/api/nrc/connections/", s.handleNRCConnectionsRouter) s.mux.HandleFunc("/api/nrc/config", s.handleNRCConfig) } // handleFavicon serves favicon.png as favicon.ico func (s *Server) handleFavicon(w http.ResponseWriter, r *http.Request) { // In dev mode with proxy configured, forward to dev server if s.devProxy != nil { s.devProxy.ServeHTTP(w, r) return } // If web UI is disabled without a proxy, return 404 if s.Config != nil && s.Config.WebDisableEmbedded { http.NotFound(w, r) return } // Check for custom branding favicon first if s.brandingMgr != nil { if data, mimeType, ok := s.brandingMgr.GetAsset("favicon"); ok { w.Header().Set("Content-Type", mimeType) w.Header().Set("Cache-Control", "public, max-age=86400") w.Write(data) return } } // Serve favicon.png as favicon.ico from embedded web app w.Header().Set("Content-Type", "image/png") w.Header().Set("Cache-Control", "public, max-age=86400") // Cache for 1 day // Create a request for favicon.png and serve it faviconReq := &http.Request{ Method: "GET", URL: &url.URL{Path: "/favicon.png"}, } ServeEmbeddedWeb(w, faviconReq) } // handleLogo serves the logo image, using custom branding if available func (s *Server) handleLogo(w http.ResponseWriter, r *http.Request) { // In dev mode with proxy configured, forward to dev server if s.devProxy != nil { s.devProxy.ServeHTTP(w, r) return } // Check for custom branding logo first if s.brandingMgr != nil { if data, mimeType, ok := s.brandingMgr.GetAsset("logo"); ok { w.Header().Set("Content-Type", mimeType) w.Header().Set("Cache-Control", "public, max-age=86400") w.Write(data) return } } // Fall back to embedded orly.png w.Header().Set("Content-Type", "image/png") w.Header().Set("Cache-Control", "public, max-age=86400") ServeEmbeddedWeb(w, r) } // handleLoginInterface serves the main user interface for login func (s *Server) handleLoginInterface(w http.ResponseWriter, r *http.Request) { // In dev mode with proxy configured, forward to dev server if s.devProxy != nil { s.devProxy.ServeHTTP(w, r) return } // If web UI is disabled without a proxy, return 404 if s.Config != nil && s.Config.WebDisableEmbedded { http.NotFound(w, r) return } // If branding is enabled and this is the index page, inject customizations if s.brandingMgr != nil && (r.URL.Path == "/" || r.URL.Path == "/index.html") { s.serveModifiedIndex(w, r) return } // Serve embedded web interface ServeEmbeddedWeb(w, r) } // serveModifiedIndex serves the index.html with branding modifications injected func (s *Server) serveModifiedIndex(w http.ResponseWriter, r *http.Request) { // Read the embedded index.html fs := GetReactAppFS() file, err := fs.Open("index.html") if err != nil { // Fallback to embedded serving ServeEmbeddedWeb(w, r) return } defer file.Close() originalHTML, err := io.ReadAll(file) if err != nil { ServeEmbeddedWeb(w, r) return } // Apply branding modifications modifiedHTML, err := s.brandingMgr.ModifyIndexHTML(originalHTML) if err != nil { ServeEmbeddedWeb(w, r) return } w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Cache-Control", "no-cache") w.Write(modifiedHTML) } // handleBrandingAsset serves custom branding assets (logo, icons, CSS, manifest) func (s *Server) handleBrandingAsset(w http.ResponseWriter, r *http.Request) { // Extract asset name from path: /branding/logo.png -> logo.png path := strings.TrimPrefix(r.URL.Path, "/branding/") // If no branding manager, return 404 if s.brandingMgr == nil { http.NotFound(w, r) return } switch path { case "custom.css": // Serve combined custom CSS css, err := s.brandingMgr.GetCustomCSS() if err != nil { http.NotFound(w, r) return } w.Header().Set("Content-Type", "text/css; charset=utf-8") w.Header().Set("Cache-Control", "public, max-age=3600") w.Write(css) case "manifest.json": // Serve customized manifest.json // First read the embedded manifest fs := GetReactAppFS() file, err := fs.Open("manifest.json") if err != nil { http.NotFound(w, r) return } defer file.Close() originalManifest, err := io.ReadAll(file) if err != nil { http.NotFound(w, r) return } manifest, err := s.brandingMgr.GetManifest(originalManifest) if err != nil { // Fallback to original w.Header().Set("Content-Type", "application/manifest+json") w.Write(originalManifest) return } w.Header().Set("Content-Type", "application/manifest+json") w.Header().Set("Cache-Control", "public, max-age=3600") w.Write(manifest) case "logo.png": s.serveBrandingAsset(w, "logo") case "favicon.png": s.serveBrandingAsset(w, "favicon") case "icon-192.png": s.serveBrandingAsset(w, "icon-192") case "icon-512.png": s.serveBrandingAsset(w, "icon-512") default: http.NotFound(w, r) } } // serveBrandingAsset serves a specific branding asset by name func (s *Server) serveBrandingAsset(w http.ResponseWriter, name string) { if s.brandingMgr == nil { http.NotFound(w, nil) return } data, mimeType, ok := s.brandingMgr.GetAsset(name) if !ok { http.NotFound(w, nil) return } w.Header().Set("Content-Type", mimeType) w.Header().Set("Cache-Control", "public, max-age=86400") w.Write(data) } // handleAuthChallenge generates a new authentication challenge func (s *Server) handleAuthChallenge(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } w.Header().Set("Content-Type", "application/json") // Generate a new challenge challenge := auth.GenerateChallenge() challengeHex := hex.Enc(challenge) // Store the challenge with expiration (5 minutes) s.challengeMutex.Lock() if s.challenges == nil { s.challenges = make(map[string][]byte) } s.challenges[challengeHex] = challenge s.challengeMutex.Unlock() // Clean up expired challenges go func() { time.Sleep(5 * time.Minute) s.challengeMutex.Lock() delete(s.challenges, challengeHex) s.challengeMutex.Unlock() }() // Return the challenge response := struct { Challenge string `json:"challenge"` }{ Challenge: challengeHex, } jsonData, err := json.Marshal(response) if chk.E(err) { http.Error( w, "Error generating challenge", http.StatusInternalServerError, ) return } w.Write(jsonData) } // handleAuthLogin processes authentication requests func (s *Server) handleAuthLogin(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } w.Header().Set("Content-Type", "application/json") // Read the request body body, err := io.ReadAll(r.Body) if chk.E(err) { w.Write([]byte(`{"success": false, "error": "Failed to read request body"}`)) return } // Parse the signed event var evt event.E if err = json.Unmarshal(body, &evt); chk.E(err) { w.Write([]byte(`{"success": false, "error": "Invalid event format"}`)) return } // Extract the challenge from the event to look up the stored challenge challengeTag := evt.Tags.GetFirst([]byte("challenge")) if challengeTag == nil { w.Write([]byte(`{"success": false, "error": "Challenge tag missing from event"}`)) return } challengeHex := string(challengeTag.Value()) // Retrieve the stored challenge s.challengeMutex.RLock() _, exists := s.challenges[challengeHex] s.challengeMutex.RUnlock() if !exists { w.Write([]byte(`{"success": false, "error": "Invalid or expired challenge"}`)) return } // Clean up the used challenge s.challengeMutex.Lock() delete(s.challenges, challengeHex) s.challengeMutex.Unlock() relayURL := s.WebSocketURL(r) // Validate the authentication event with the correct challenge // The challenge in the event tag is hex-encoded, so we need to pass the hex string as bytes ok, err := auth.Validate(&evt, []byte(challengeHex), relayURL) if chk.E(err) || !ok { errorMsg := "Authentication validation failed" if err != nil { errorMsg = err.Error() } w.Write([]byte(`{"success": false, "error": "` + errorMsg + `"}`)) return } // Authentication successful: set a simple session cookie with the pubkey cookie := &http.Cookie{ Name: "orly_auth", Value: hex.Enc(evt.Pubkey), Path: "/", HttpOnly: true, SameSite: http.SameSiteLaxMode, MaxAge: 60 * 60 * 24 * 30, // 30 days } http.SetCookie(w, cookie) w.Write([]byte(`{"success": true}`)) } // handleAuthStatus checks if the user is authenticated func (s *Server) handleAuthStatus(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } w.Header().Set("Content-Type", "application/json") // Check for auth cookie c, err := r.Cookie("orly_auth") if err != nil || c.Value == "" { w.Write([]byte(`{"authenticated": false}`)) return } // Validate the pubkey format pubkey, err := hex.Dec(c.Value) if chk.E(err) { w.Write([]byte(`{"authenticated": false}`)) return } // Get user permissions permission := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) response := struct { Authenticated bool `json:"authenticated"` Pubkey string `json:"pubkey"` Permission string `json:"permission"` }{ Authenticated: true, Pubkey: c.Value, Permission: permission, } jsonData, err := json.Marshal(response) if chk.E(err) { w.Write([]byte(`{"authenticated": false}`)) return } w.Write(jsonData) } // handleAuthLogout clears the authentication cookie func (s *Server) handleAuthLogout(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } w.Header().Set("Content-Type", "application/json") // Clear the auth cookie cookie := &http.Cookie{ Name: "orly_auth", Value: "", Path: "/", HttpOnly: true, SameSite: http.SameSiteLaxMode, MaxAge: -1, // Expire immediately } http.SetCookie(w, cookie) w.Write([]byte(`{"success": true}`)) } // handlePermissions returns the permission level for a given pubkey func (s *Server) handlePermissions(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // Extract pubkey from URL path pubkeyHex := strings.TrimPrefix(r.URL.Path, "/api/permissions/") if pubkeyHex == "" || pubkeyHex == "/" { http.Error(w, "Invalid pubkey", http.StatusBadRequest) return } // Convert hex to binary pubkey pubkey, err := hex.Dec(pubkeyHex) if chk.E(err) { http.Error(w, "Invalid pubkey format", http.StatusBadRequest) return } // Get access level using acl registry permission := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) // Set content type and write JSON response w.Header().Set("Content-Type", "application/json") // Format response as proper JSON response := struct { Permission string `json:"permission"` }{ Permission: permission, } // Marshal and write the response jsonData, err := json.Marshal(response) if chk.E(err) { http.Error( w, "Error generating response", http.StatusInternalServerError, ) return } w.Write(jsonData) } // handleExport streams events as JSONL (NDJSON) using NIP-98 authentication. // Supports both GET (query params) and POST (JSON body) for pubkey filtering. func (s *Server) handleExport(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet && r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // Skip authentication and permission checks when ACL is "none" (open relay mode) if acl.Registry.GetMode() != "none" { // Validate NIP-98 authentication valid, pubkey, err := httpauth.CheckAuth(r) if chk.E(err) || !valid { errorMsg := "NIP-98 authentication validation failed" if err != nil { errorMsg = err.Error() } http.Error(w, errorMsg, http.StatusUnauthorized) return } // Check permissions - require write, admin, or owner level accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) if accessLevel != "write" && accessLevel != "admin" && accessLevel != "owner" { http.Error( w, "Write, admin, or owner permission required", http.StatusForbidden, ) return } } // Parse pubkeys from request var pks [][]byte if r.Method == http.MethodPost { // Parse JSON body for pubkeys var requestBody struct { Pubkeys []string `json:"pubkeys"` } if err := json.NewDecoder(r.Body).Decode(&requestBody); err == nil { // If JSON parsing succeeds, use pubkeys from body for _, pkHex := range requestBody.Pubkeys { if pkHex == "" { continue } if pk, err := hex.Dec(pkHex); !chk.E(err) { pks = append(pks, pk) } } } // If JSON parsing fails, fall back to empty pubkeys (export all) } else { // GET method - parse query parameters q := r.URL.Query() for _, pkHex := range q["pubkey"] { if pkHex == "" { continue } if pk, err := hex.Dec(pkHex); !chk.E(err) { pks = append(pks, pk) } } } // Determine filename based on whether filtering by pubkeys var filename string if len(pks) == 0 { filename = "all-events-" + time.Now().UTC().Format("20060102-150405Z") + ".jsonl" } else if len(pks) == 1 { filename = "my-events-" + time.Now().UTC().Format("20060102-150405Z") + ".jsonl" } else { filename = "filtered-events-" + time.Now().UTC().Format("20060102-150405Z") + ".jsonl" } w.Header().Set("Content-Type", "application/x-ndjson") w.Header().Set( "Content-Disposition", "attachment; filename=\""+filename+"\"", ) w.Header().Set("X-Content-Type-Options", "nosniff") // Flush headers to start streaming immediately if flusher, ok := w.(http.Flusher); ok { flusher.Flush() } // Stream export s.DB.Export(s.Ctx, w, pks...) } // handleEventsMine returns the authenticated user's events in JSON format with pagination using NIP-98 authentication. func (s *Server) handleEventsMine(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // Validate NIP-98 authentication valid, pubkey, err := httpauth.CheckAuth(r) if chk.E(err) || !valid { errorMsg := "NIP-98 authentication validation failed" if err != nil { errorMsg = err.Error() } http.Error(w, errorMsg, http.StatusUnauthorized) return } // Parse pagination parameters query := r.URL.Query() limit := 50 // default limit if l := query.Get("limit"); l != "" { if parsed, err := strconv.Atoi(l); err == nil && parsed > 0 && parsed <= 100 { limit = parsed } } offset := 0 if o := query.Get("offset"); o != "" { if parsed, err := strconv.Atoi(o); err == nil && parsed >= 0 { offset = parsed } } // Use QueryEvents with filter for this user's events f := &filter.F{ Authors: tag.NewFromBytesSlice(pubkey), } log.Printf("DEBUG: Querying events for pubkey: %s", hex.Enc(pubkey)) events, err := s.DB.QueryEvents(s.Ctx, f) if chk.E(err) { log.Printf("DEBUG: QueryEvents failed: %v", err) http.Error(w, "Failed to query events", http.StatusInternalServerError) return } log.Printf("DEBUG: QueryEvents returned %d events", len(events)) // Apply pagination totalEvents := len(events) if offset >= totalEvents { events = event.S{} // Empty slice } else { end := offset + limit if end > totalEvents { end = totalEvents } events = events[offset:end] } // Set content type and write JSON response w.Header().Set("Content-Type", "application/json") // Format response as proper JSON response := struct { Events []*event.E `json:"events"` Total int `json:"total"` Limit int `json:"limit"` Offset int `json:"offset"` }{ Events: events, Total: totalEvents, Limit: limit, Offset: offset, } // Marshal and write the response jsonData, err := json.Marshal(response) if chk.E(err) { // Free events before returning error for _, ev := range events { if ev != nil { ev.Free() } } http.Error( w, "Error generating response", http.StatusInternalServerError, ) return } w.Write(jsonData) // Free events after successfully sending response for _, ev := range events { if ev != nil { ev.Free() } } } // authenticateLocalhost authenticates requests from localhost using $NOSTR_PRIVATE_KEY // Returns the pubkey if authentication succeeds, nil otherwise func (s *Server) authenticateLocalhost(r *http.Request) ([]byte, error) { // Check if request is from localhost remoteIP := strings.Split(r.RemoteAddr, ":")[0] if remoteIP != "127.0.0.1" && remoteIP != "::1" && remoteIP != "localhost" { return nil, fmt.Errorf("not a localhost request") } // Read NOSTR_PRIVATE_KEY from environment nsec := os.Getenv("NOSTR_PRIVATE_KEY") if nsec == "" { return nil, fmt.Errorf("NOSTR_PRIVATE_KEY environment variable not set") } // Decode nsec to get private key bytes secretBytes, err := bech32encoding.NsecToBytes([]byte(nsec)) if err != nil { return nil, fmt.Errorf("failed to decode nsec: %w", err) } // Create signer from private key signer, err := p8k.New() if err != nil { return nil, fmt.Errorf("failed to create signer: %w", err) } if err = signer.InitSec(secretBytes); err != nil { return nil, fmt.Errorf("failed to initialize signer: %w", err) } // Get public key from signer pubkey := signer.Pub() return pubkey, nil } // handleImport receives a JSONL/NDJSON file or body and enqueues an async import using NIP-98 authentication. Write, admin, or owner roles required. // Supports folder imports via ?folder=/path/to/folder query parameter. func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } var pubkey []byte var err error // Skip authentication and permission checks when ACL is "none" (open relay mode) if acl.Registry.GetMode() != "none" { // Try localhost authentication first (uses NOSTR_PRIVATE_KEY env var, no AUTH header required) pubkey, err = s.authenticateLocalhost(r) if err != nil { // If NOSTR_PRIVATE_KEY is set, allow the request without NIP-98 auth // This is useful for containerized environments where requests come from within the container if nsec := os.Getenv("NOSTR_PRIVATE_KEY"); nsec != "" { // Try to get pubkey from NOSTR_PRIVATE_KEY even if not localhost secretBytes, decodeErr := bech32encoding.NsecToBytes([]byte(nsec)) if decodeErr == nil { signer, signerErr := p8k.New() if signerErr == nil { if initErr := signer.InitSec(secretBytes); initErr == nil { pubkey = signer.Pub() log.Printf("[HTTP API IMPORT] Authenticated using NOSTR_PRIVATE_KEY (no NIP-98 header required)") } } } } // If we still don't have a pubkey, try NIP-98 authentication if pubkey == nil { valid, pk, authErr := httpauth.CheckAuth(r) if chk.E(authErr) || !valid { errorMsg := "Authentication required. Use NOSTR_PRIVATE_KEY env var or NIP-98 Authorization header" if authErr != nil { errorMsg = authErr.Error() } http.Error(w, errorMsg, http.StatusUnauthorized) return } pubkey = pk } } else { log.Printf("[HTTP API IMPORT] Authenticated via localhost using NOSTR_PRIVATE_KEY") } // Check permissions - require write, admin, or owner level accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) if accessLevel != "write" && accessLevel != "admin" && accessLevel != "owner" { http.Error( w, "Write, admin, or owner permission required", http.StatusForbidden, ) return } } else { // Open relay mode - no authentication required log.Printf("[HTTP API IMPORT] Open relay mode - no authentication required") } // Check if this is a folder import folderPath := r.URL.Query().Get("folder") if folderPath != "" { s.handleImportFolder(w, r, folderPath) return } // Check if importing directly to a remote relay targetURL := r.URL.Query().Get("target") if targetURL != "" { // Normalize WebSocket URL if strings.HasPrefix(targetURL, "http://") { targetURL = strings.Replace(targetURL, "http://", "ws://", 1) } else if strings.HasPrefix(targetURL, "https://") { targetURL = strings.Replace(targetURL, "https://", "wss://", 1) } else if !strings.HasPrefix(targetURL, "ws://") && !strings.HasPrefix(targetURL, "wss://") { targetURL = "wss://" + targetURL } // Get the file/body reader var reader io.Reader ct := r.Header.Get("Content-Type") if strings.HasPrefix(ct, "multipart/form-data") { if err := r.ParseMultipartForm(32 << 20); chk.E(err) { http.Error(w, "Failed to parse form", http.StatusBadRequest) return } file, _, err := r.FormFile("file") if chk.E(err) { http.Error(w, "Missing file", http.StatusBadRequest) return } defer file.Close() reader = file } else { if r.Body == nil { http.Error(w, "Empty request body", http.StatusBadRequest) return } reader = r.Body } // Check if we should wait for completion (default: yes, for sequential processing) waitForCompletion := r.URL.Query().Get("async") != "true" if waitForCompletion { // Run synchronously - wait for completion before returning // This allows the script to process files sequentially if err := s.importToRemoteRelay(context.Background(), reader, targetURL); err != nil { http.Error(w, fmt.Sprintf("Import to remote relay failed: %v", err), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) w.Write([]byte(`{"success": true, "message": "Import to remote relay completed", "target": "` + targetURL + `"}`)) } else { // Run asynchronously in background // IMPORTANT: Read the body into memory first, as r.Body will be closed when handler returns bodyBytes, err := io.ReadAll(reader) if err != nil { log.Printf("ERROR: Failed to read request body for remote import: %v", err) http.Error(w, fmt.Sprintf("Failed to read request body: %v", err), http.StatusInternalServerError) return } log.Printf("Starting async import to remote relay %s (body size: %d bytes)", targetURL, len(bodyBytes)) go func() { bodyReader := bytes.NewReader(bodyBytes) if err := s.importToRemoteRelay(context.Background(), bodyReader, targetURL); err != nil { log.Printf("ERROR: Import to remote relay %s failed: %v", targetURL, err) } }() w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusAccepted) w.Write([]byte(`{"success": true, "message": "Import to remote relay started", "target": "` + targetURL + `"}`)) } return } // Default: import to local database // Check if we should wait for completion (default: yes, for reliability) waitForCompletion := r.URL.Query().Get("async") != "true" ct := r.Header.Get("Content-Type") var reader io.Reader if strings.HasPrefix(ct, "multipart/form-data") { if err := r.ParseMultipartForm(32 << 20); chk.E(err) { // 32MB memory, rest to temp files http.Error(w, "Failed to parse form", http.StatusBadRequest) return } file, _, err := r.FormFile("file") if chk.E(err) { http.Error(w, "Missing file", http.StatusBadRequest) return } defer file.Close() reader = file } else { if r.Body == nil { http.Error(w, "Empty request body", http.StatusBadRequest) return } reader = r.Body } if waitForCompletion { // Run synchronously - wait for completion before returning // This ensures the import is complete and we can return proper status log.Printf("================================================") log.Printf("[HTTP API IMPORT] Starting synchronous import") log.Printf("[HTTP API IMPORT] This may take several minutes for large files") log.Printf("================================================") s.DB.Import(reader) log.Printf("================================================") log.Printf("[HTTP API IMPORT] Import completed successfully") log.Printf("================================================") w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) w.Write([]byte(`{"success": true, "message": "Import completed"}`)) } else { // Run asynchronously in background // IMPORTANT: Read the body into memory first, as r.Body will be closed when handler returns bodyBytes, err := io.ReadAll(reader) if err != nil { log.Printf("ERROR: Failed to read request body for async import: %v", err) http.Error(w, fmt.Sprintf("Failed to read request body: %v", err), http.StatusInternalServerError) return } log.Printf("Starting async import (body size: %d bytes)", len(bodyBytes)) go func() { bodyReader := bytes.NewReader(bodyBytes) s.DB.Import(bodyReader) log.Printf("Async import completed") }() w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusAccepted) w.Write([]byte(`{"success": true, "message": "Import started"}`)) } } // importToRemoteRelay reads JSONL from a reader and sends events directly to a remote relay func (s *Server) importToRemoteRelay(ctx context.Context, reader io.Reader, targetURL string) error { log.Printf("Starting import to remote relay: %s", targetURL) // Connect to remote relay dialer := websocket.Dialer{ HandshakeTimeout: 30 * time.Second, } log.Printf("Attempting WebSocket connection to: %s", targetURL) conn, _, err := dialer.DialContext(ctx, targetURL, http.Header{}) if err != nil { log.Printf("ERROR: Failed to connect to remote relay %s: %v", targetURL, err) return fmt.Errorf("failed to connect to relay: %w", err) } defer conn.Close() log.Printf("Successfully connected to remote relay: %s", targetURL) // Read JSONL line by line and send events scanner := bufio.NewScanner(reader) buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, 10*1024*1024) // 10MB max line size var totalSent, totalAccepted, totalRejected int64 rejectedKinds := make(map[uint16]int) rejectionReasons := make(map[string]int) // reason -> count pendingEvents := make(map[string]uint16) // eventID -> kind (for matching OK responses) // Start reading OK responses in background type okResponse struct { EventID string Accepted bool Reason string } okChan := make(chan okResponse, 10000) doneReading := make(chan struct{}) go func() { defer close(doneReading) for { var raw []interface{} if err := conn.ReadJSON(&raw); err != nil { return } if len(raw) >= 3 && raw[0] == "OK" { eventID, _ := raw[1].(string) accepted, _ := raw[2].(bool) reason := "" if len(raw) >= 4 { reason, _ = raw[3].(string) } select { case okChan <- okResponse{EventID: eventID, Accepted: accepted, Reason: reason}: default: } } } }() // Process events line by line for scanner.Scan() { select { case <-ctx.Done(): return ctx.Err() default: } line := scanner.Bytes() if len(line) == 0 { continue } // Parse event ev := event.New() if _, err := ev.Unmarshal(line); err != nil { ev.Free() continue } // Serialize event for sending eventJSON := ev.Serialize() var eventMap map[string]interface{} if err := json.Unmarshal(eventJSON, &eventMap); err != nil { ev.Free() continue } // Send EVENT message eventMsg := []interface{}{"EVENT", eventMap} if err := conn.WriteJSON(eventMsg); err != nil { ev.Free() return fmt.Errorf("failed to send event: %w", err) } eventID := hex.Enc(ev.ID) eventKind := ev.Kind totalSent++ // Store pending event for OK response matching pendingEvents[eventID] = eventKind // Process any available OK responses (non-blocking) for { select { case ok := <-okChan: if kind, exists := pendingEvents[ok.EventID]; exists { if ok.Accepted { totalAccepted++ } else { totalRejected++ rejectedKinds[kind]++ if ok.Reason != "" { rejectionReasons[ok.Reason]++ } } delete(pendingEvents, ok.EventID) } default: // No more responses available right now goto continueLoop } } continueLoop: ev.Free() // Progress logging every 1000 events if totalSent%1000 == 0 { log.Printf("Import to %s: %d sent (accepted: %d, rejected: %d, pending: %d)", targetURL, totalSent, totalAccepted, totalRejected, len(pendingEvents)) } } if err := scanner.Err(); err != nil { return fmt.Errorf("scanner error: %w", err) } // Wait for remaining OK responses (up to 30 seconds for large batches) timeout := time.After(30 * time.Second) waitStart := time.Now() for len(pendingEvents) > 0 && time.Since(waitStart) < 30*time.Second { select { case ok := <-okChan: if kind, exists := pendingEvents[ok.EventID]; exists { if ok.Accepted { totalAccepted++ } else { totalRejected++ rejectedKinds[kind]++ if ok.Reason != "" { rejectionReasons[ok.Reason]++ } } delete(pendingEvents, ok.EventID) } case <-timeout: log.Printf("Timeout waiting for OK responses, %d events still pending", len(pendingEvents)) break } } log.Printf("Import to %s completed: %d sent (accepted: %d, rejected: %d, no response: %d)", targetURL, totalSent, totalAccepted, totalRejected, len(pendingEvents)) if totalRejected > 0 { log.Printf("Rejected events by kind: %v", rejectedKinds) if len(rejectionReasons) > 0 { log.Printf("Rejection reasons:") for reason, count := range rejectionReasons { log.Printf(" %s: %d", reason, count) } } } if len(pendingEvents) > 0 { log.Printf("Warning: %d events sent but no OK response received (may have been accepted silently)", len(pendingEvents)) } return nil } // handleImportFolder imports all .jsonl files from a folder func (s *Server) handleImportFolder(w http.ResponseWriter, r *http.Request, folderPath string) { // Security: Only allow imports from specific allowed directories // For now, allow ../scripts/exports and absolute paths that are explicitly allowed allowedPrefixes := []string{ "../scripts/exports", "./scripts/exports", "scripts/exports", } isAllowed := false for _, prefix := range allowedPrefixes { if strings.HasPrefix(folderPath, prefix) { isAllowed = true break } } // Also allow absolute paths that are explicitly in allowed directories if !isAllowed && filepath.IsAbs(folderPath) { // Check if it's under a reasonable directory (e.g., user's home or current working directory) // For security, we'll be conservative and only allow relative paths for now http.Error(w, "Absolute paths not allowed for security reasons", http.StatusForbidden) return } if !isAllowed { http.Error(w, "Folder path not in allowed directories", http.StatusForbidden) return } // Resolve the folder path absPath, err := filepath.Abs(folderPath) if err != nil { http.Error(w, fmt.Sprintf("Invalid folder path: %v", err), http.StatusBadRequest) return } // Check if folder exists info, err := os.Stat(absPath) if err != nil { http.Error(w, fmt.Sprintf("Folder not found: %v", err), http.StatusNotFound) return } if !info.IsDir() { http.Error(w, "Path is not a directory", http.StatusBadRequest) return } // Read all .jsonl files in the folder entries, err := os.ReadDir(absPath) if err != nil { http.Error(w, fmt.Sprintf("Failed to read folder: %v", err), http.StatusInternalServerError) return } var jsonlFiles []string for _, entry := range entries { if !entry.IsDir() && strings.HasSuffix(strings.ToLower(entry.Name()), ".jsonl") { jsonlFiles = append(jsonlFiles, filepath.Join(absPath, entry.Name())) } } if len(jsonlFiles) == 0 { http.Error(w, "No .jsonl files found in folder", http.StatusNotFound) return } // Process each file sequentially type fileResult struct { File string `json:"file"` Success bool `json:"success"` Error string `json:"error,omitempty"` } results := make([]fileResult, 0, len(jsonlFiles)) for _, filePath := range jsonlFiles { file, err := os.Open(filePath) if err != nil { results = append(results, fileResult{ File: filePath, Success: false, Error: err.Error(), }) continue } // Import the file s.DB.Import(file) file.Close() results = append(results, fileResult{ File: filePath, Success: true, }) } // Return results w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusAccepted) response := map[string]interface{}{ "success": true, "message": fmt.Sprintf("Import started for %d files", len(jsonlFiles)), "files": results, } jsonData, _ := json.Marshal(response) w.Write(jsonData) } // handleStreamToRelay streams events from the local relay to a remote relay via WebSocket // Supports both regular EVENT streaming and negentropy sync (NIP-77) func (s *Server) handleStreamToRelay(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } var pubkey []byte var err error // Authenticate request (localhost or NIP-98) if acl.Registry.GetMode() != "none" { pubkey, err = s.authenticateLocalhost(r) if err != nil { valid, pk, authErr := httpauth.CheckAuth(r) if chk.E(authErr) || !valid { errorMsg := "NIP-98 authentication validation failed" if authErr != nil { errorMsg = authErr.Error() } http.Error(w, errorMsg, http.StatusUnauthorized) return } pubkey = pk } // Check permissions accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) if accessLevel != "write" && accessLevel != "admin" && accessLevel != "owner" { http.Error(w, "Write, admin, or owner permission required", http.StatusForbidden) return } } // Get target relay URL (default: wss://orly-relay.imwald.eu) targetURL := r.URL.Query().Get("target") if targetURL == "" { targetURL = "wss://orly-relay.imwald.eu" } // Normalize WebSocket URL if strings.HasPrefix(targetURL, "http://") { targetURL = strings.Replace(targetURL, "http://", "ws://", 1) } else if strings.HasPrefix(targetURL, "https://") { targetURL = strings.Replace(targetURL, "https://", "wss://", 1) } else if !strings.HasPrefix(targetURL, "ws://") && !strings.HasPrefix(targetURL, "wss://") { targetURL = "wss://" + targetURL } // Check if negentropy sync is requested useNegentropy := r.URL.Query().Get("use_negentropy") == "true" // Start streaming in background go func() { ctx := context.Background() if useNegentropy && s.syncManager != nil { // Use negentropy sync _, err := s.streamWithNegentropy(ctx, targetURL) if err != nil { log.Printf("Negentropy stream to %s failed: %v", targetURL, err) } } else { // Use regular EVENT streaming err := s.streamEvents(ctx, targetURL) if err != nil { log.Printf("Event stream to %s failed: %v", targetURL, err) } } }() w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusAccepted) response := map[string]interface{}{ "success": true, "message": "Streaming started", "target": targetURL, "use_negentropy": useNegentropy, } jsonData, _ := json.Marshal(response) w.Write(jsonData) } // streamEvents streams all events from local relay to remote relay using EVENT messages // Uses pagination to avoid loading all events into memory at once func (s *Server) streamEvents(ctx context.Context, targetURL string) error { // Connect to remote relay dialer := websocket.Dialer{ HandshakeTimeout: 30 * time.Second, } conn, _, err := dialer.DialContext(ctx, targetURL, http.Header{}) if err != nil { return fmt.Errorf("failed to connect to relay: %w", err) } defer conn.Close() log.Printf("Connected to remote relay: %s", targetURL) // Stream events in batches to avoid memory exhaustion // Query events in chunks using timestamp-based pagination const batchLimit = 1000 // Query 1000 events at a time var since *timestamp.T var totalSent int64 for { // Create filter for this batch f := filter.New() if since != nil { f.Since = since } limit := uint(batchLimit) f.Limit = &limit // Query batch of events events, err := s.DB.QueryEvents(ctx, f) if err != nil { return fmt.Errorf("failed to query events: %w", err) } if len(events) == 0 { // No more events break } // Stream this batch - ensure all events are freed processedCount := 0 for i, ev := range events { if ev == nil { continue } // Capture timestamp before freeing createdAt := ev.CreatedAt // Send EVENT message: ["EVENT", event] eventMsg := []interface{}{"EVENT", ev} if err := conn.WriteJSON(eventMsg); err != nil { // Free this event and all remaining events in the batch ev.Free() for j := i + 1; j < len(events); j++ { if events[j] != nil { events[j].Free() } } return fmt.Errorf("failed to send event: %w", err) } // Free the event to return it to the pool ev.Free() processedCount++ totalSent++ if totalSent%1000 == 0 { log.Printf("Streamed %d events to %s", totalSent, targetURL) } // Update since timestamp for next batch (use created_at + 1 to avoid duplicates) since = timestamp.FromUnix(createdAt + 1) } // If we got fewer events than the limit, we've reached the end if len(events) < batchLimit { break } // Small delay between batches to avoid overwhelming the remote relay time.Sleep(100 * time.Millisecond) } log.Printf("Successfully streamed %d events to %s", totalSent, targetURL) return nil } // streamWithNegentropy streams events using negentropy sync (NIP-77) // This is a simplified implementation that directly performs negentropy sync func (s *Server) streamWithNegentropy(ctx context.Context, targetURL string) (int64, error) { // For now, return a helpful error message // Full negentropy implementation would require: // 1. Building negentropy storage from local events // 2. Creating negentropy instance // 3. Performing NIP-77 protocol exchange // 4. Fetching and pushing events // This is a placeholder - the sync manager's negentropy functionality // should be used via the existing sync infrastructure return 0, fmt.Errorf("negentropy sync via HTTP endpoint not yet fully implemented. Use the sync manager's peer configuration or implement direct negentropy protocol") } // handleSprocketStatus returns the current status of the sprocket script func (s *Server) handleSprocketStatus(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // Validate NIP-98 authentication valid, pubkey, err := httpauth.CheckAuth(r) if chk.E(err) || !valid { errorMsg := "NIP-98 authentication validation failed" if err != nil { errorMsg = err.Error() } http.Error(w, errorMsg, http.StatusUnauthorized) return } // Check permissions - require owner level accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) if accessLevel != "owner" { http.Error(w, "Owner permission required", http.StatusForbidden) return } status := s.sprocketManager.GetSprocketStatus() w.Header().Set("Content-Type", "application/json") jsonData, err := json.Marshal(status) if chk.E(err) { http.Error( w, "Error generating response", http.StatusInternalServerError, ) return } w.Write(jsonData) } // handleSprocketUpdate updates the sprocket script and restarts it func (s *Server) handleSprocketUpdate(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // Validate NIP-98 authentication valid, pubkey, err := httpauth.CheckAuth(r) if chk.E(err) || !valid { errorMsg := "NIP-98 authentication validation failed" if err != nil { errorMsg = err.Error() } http.Error(w, errorMsg, http.StatusUnauthorized) return } // Check permissions - require owner level accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) if accessLevel != "owner" { http.Error(w, "Owner permission required", http.StatusForbidden) return } // Read the request body body, err := io.ReadAll(r.Body) if chk.E(err) { http.Error(w, "Failed to read request body", http.StatusBadRequest) return } // Update the sprocket script if err := s.sprocketManager.UpdateSprocket(string(body)); chk.E(err) { http.Error( w, fmt.Sprintf("Failed to update sprocket: %v", err), http.StatusInternalServerError, ) return } w.Header().Set("Content-Type", "application/json") w.Write([]byte(`{"success": true, "message": "Sprocket updated successfully"}`)) } // handleSprocketRestart restarts the sprocket script func (s *Server) handleSprocketRestart(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // Validate NIP-98 authentication valid, pubkey, err := httpauth.CheckAuth(r) if chk.E(err) || !valid { errorMsg := "NIP-98 authentication validation failed" if err != nil { errorMsg = err.Error() } http.Error(w, errorMsg, http.StatusUnauthorized) return } // Check permissions - require owner level accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) if accessLevel != "owner" { http.Error(w, "Owner permission required", http.StatusForbidden) return } // Restart the sprocket script if err := s.sprocketManager.RestartSprocket(); chk.E(err) { http.Error( w, fmt.Sprintf("Failed to restart sprocket: %v", err), http.StatusInternalServerError, ) return } w.Header().Set("Content-Type", "application/json") w.Write([]byte(`{"success": true, "message": "Sprocket restarted successfully"}`)) } // handleSprocketVersions returns all sprocket script versions func (s *Server) handleSprocketVersions( w http.ResponseWriter, r *http.Request, ) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // Validate NIP-98 authentication valid, pubkey, err := httpauth.CheckAuth(r) if chk.E(err) || !valid { errorMsg := "NIP-98 authentication validation failed" if err != nil { errorMsg = err.Error() } http.Error(w, errorMsg, http.StatusUnauthorized) return } // Check permissions - require owner level accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) if accessLevel != "owner" { http.Error(w, "Owner permission required", http.StatusForbidden) return } versions, err := s.sprocketManager.GetSprocketVersions() if chk.E(err) { http.Error( w, fmt.Sprintf("Failed to get sprocket versions: %v", err), http.StatusInternalServerError, ) return } w.Header().Set("Content-Type", "application/json") jsonData, err := json.Marshal(versions) if chk.E(err) { http.Error( w, "Error generating response", http.StatusInternalServerError, ) return } w.Write(jsonData) } // handleSprocketDeleteVersion deletes a specific sprocket version func (s *Server) handleSprocketDeleteVersion( w http.ResponseWriter, r *http.Request, ) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // Validate NIP-98 authentication valid, pubkey, err := httpauth.CheckAuth(r) if chk.E(err) || !valid { errorMsg := "NIP-98 authentication validation failed" if err != nil { errorMsg = err.Error() } http.Error(w, errorMsg, http.StatusUnauthorized) return } // Check permissions - require owner level accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) if accessLevel != "owner" { http.Error(w, "Owner permission required", http.StatusForbidden) return } // Read the request body body, err := io.ReadAll(r.Body) if chk.E(err) { http.Error(w, "Failed to read request body", http.StatusBadRequest) return } var request struct { Filename string `json:"filename"` } if err := json.Unmarshal(body, &request); chk.E(err) { http.Error(w, "Invalid JSON in request body", http.StatusBadRequest) return } if request.Filename == "" { http.Error(w, "Filename is required", http.StatusBadRequest) return } // Delete the sprocket version if err := s.sprocketManager.DeleteSprocketVersion(request.Filename); chk.E(err) { http.Error( w, fmt.Sprintf("Failed to delete sprocket version: %v", err), http.StatusInternalServerError, ) return } w.Header().Set("Content-Type", "application/json") w.Write([]byte(`{"success": true, "message": "Sprocket version deleted successfully"}`)) } // handleSprocketConfig returns the sprocket configuration status func (s *Server) handleSprocketConfig(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } w.Header().Set("Content-Type", "application/json") response := struct { Enabled bool `json:"enabled"` }{ Enabled: s.Config.SprocketEnabled, } jsonData, err := json.Marshal(response) if chk.E(err) { http.Error( w, "Error generating response", http.StatusInternalServerError, ) return } w.Write(jsonData) } // handleACLMode returns the current ACL mode func (s *Server) handleACLMode(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } w.Header().Set("Content-Type", "application/json") response := struct { ACLMode string `json:"acl_mode"` }{ ACLMode: acl.Registry.Type(), } jsonData, err := json.Marshal(response) if chk.E(err) { http.Error( w, "Error generating response", http.StatusInternalServerError, ) return } w.Write(jsonData) } // handleSyncCurrent handles requests for the current serial number func (s *Server) handleSyncCurrent(w http.ResponseWriter, r *http.Request) { if s.syncManager == nil { http.Error( w, "Sync manager not initialized", http.StatusServiceUnavailable, ) return } // Validate NIP-98 authentication and check peer authorization if !s.validatePeerRequest(w, r) { return } s.syncManager.HandleCurrentRequest(w, r) } // handleSyncEventIDs handles requests for event IDs with their serial numbers func (s *Server) handleSyncEventIDs(w http.ResponseWriter, r *http.Request) { if s.syncManager == nil { http.Error( w, "Sync manager not initialized", http.StatusServiceUnavailable, ) return } // Validate NIP-98 authentication and check peer authorization if !s.validatePeerRequest(w, r) { return } s.syncManager.HandleEventIDsRequest(w, r) } // validatePeerRequest validates NIP-98 authentication and checks if the requesting peer is authorized func (s *Server) validatePeerRequest( w http.ResponseWriter, r *http.Request, ) bool { // Validate NIP-98 authentication valid, pubkey, err := httpauth.CheckAuth(r) if err != nil { log.Printf("NIP-98 auth validation error: %v", err) http.Error( w, "Authentication validation failed", http.StatusUnauthorized, ) return false } if !valid { http.Error(w, "NIP-98 authentication required", http.StatusUnauthorized) return false } if s.syncManager == nil { log.Printf("Sync manager not available for peer validation") http.Error(w, "Service unavailable", http.StatusServiceUnavailable) return false } // Extract the relay URL from the request (this should be in the request body) // For now, we'll check against all configured peers peerPubkeyHex := hex.Enc(pubkey) // Check if this pubkey matches any of our configured peer relays' NIP-11 pubkeys for _, peerURL := range s.syncManager.GetPeers() { if s.syncManager.IsAuthorizedPeer(peerURL, peerPubkeyHex) { // Also update ACL to grant admin access to this peer pubkey s.updatePeerAdminACL(pubkey) return true } } log.Printf("Unauthorized sync request from pubkey: %s", peerPubkeyHex) http.Error(w, "Unauthorized peer", http.StatusForbidden) return false } // updatePeerAdminACL grants admin access to peer relay identity pubkeys func (s *Server) updatePeerAdminACL(peerPubkey []byte) { // Find the managed ACL instance and update peer admins for _, aclInstance := range acl.Registry.ACLs() { if aclInstance.Type() == "managed" { if managed, ok := aclInstance.(*acl.Managed); ok { // Collect all current peer pubkeys var peerPubkeys [][]byte for _, peerURL := range s.syncManager.GetPeers() { if pubkey, err := s.syncManager.GetPeerPubkey(peerURL); err == nil { peerPubkeys = append(peerPubkeys, []byte(pubkey)) } } managed.UpdatePeerAdmins(peerPubkeys) break } } } } // ============================================================================= // Event Service Initialization // ============================================================================= // InitEventServices initializes the domain services for event handling. // This should be called after the Server is created but before accepting connections. func (s *Server) InitEventServices() { // Initialize validation service s.eventValidator = validation.NewWithConfig(&validation.Config{ MaxFutureSeconds: 3600, // 1 hour }) // Initialize authorization service authCfg := &authorization.Config{ AuthRequired: s.Config.AuthRequired, AuthToWrite: s.Config.AuthToWrite, NIP46BypassAuth: s.Config.NIP46BypassAuth, Admins: s.Admins, Owners: s.Owners, } s.eventAuthorizer = authorization.New( authCfg, s.wrapAuthACLRegistry(), s.wrapAuthPolicyManager(), s.wrapAuthSyncManager(), ) // Initialize router with handlers for special event kinds s.eventRouter = routing.New() // Register ephemeral event handler (kinds 20000-29999) s.eventRouter.RegisterKindCheck( "ephemeral", routing.IsEphemeral, routing.MakeEphemeralHandler(s.publishers), ) // Initialize processing service procCfg := &processing.Config{ Admins: s.Admins, Owners: s.Owners, WriteTimeout: 30 * time.Second, } s.eventProcessor = processing.New(procCfg, s.wrapDB(), s.publishers) // Wire up optional dependencies to processing service if s.rateLimiter != nil { s.eventProcessor.SetRateLimiter(s.wrapRateLimiter()) } if s.syncManager != nil { s.eventProcessor.SetSyncManager(s.wrapSyncManager()) } if s.relayGroupMgr != nil { s.eventProcessor.SetRelayGroupManager(s.wrapRelayGroupManager()) } if s.clusterManager != nil { s.eventProcessor.SetClusterManager(s.wrapClusterManager()) } s.eventProcessor.SetACLRegistry(s.wrapACLRegistry()) // Initialize domain event dispatcher s.eventDispatcher = domainevents.NewDispatcher(domainevents.DefaultDispatcherConfig()) // Register logging subscriber for analytics logLevel := "debug" if s.Config.LogLevel == "trace" { logLevel = "trace" } else if s.Config.LogLevel == "info" || s.Config.LogLevel == "warn" || s.Config.LogLevel == "error" { logLevel = "info" } s.eventDispatcher.Subscribe(subscribers.NewLoggingSubscriber(logLevel)) // Wire dispatcher to processing service s.eventProcessor.SetEventDispatcher(s.eventDispatcher) // Initialize special kinds registry and register handlers s.specialKinds = specialkinds.NewRegistry() s.registerSpecialKindHandlers() // Initialize ingestion service s.ingestionService = ingestion.NewService( s.eventValidator, s.eventAuthorizer, s.eventRouter, s.eventProcessor, ingestion.Config{ SprocketChecker: s.wrapSprocketChecker(), SpecialKinds: s.specialKinds, ACLMode: acl.Registry.GetMode, }, ) } // SprocketChecker wrapper for ingestion.SprocketChecker interface type sprocketCheckerWrapper struct { sm *SprocketManager } func (s *Server) wrapSprocketChecker() ingestion.SprocketChecker { if s.sprocketManager == nil { return nil } return &sprocketCheckerWrapper{sm: s.sprocketManager} } func (w *sprocketCheckerWrapper) IsEnabled() bool { return w.sm != nil && w.sm.IsEnabled() } func (w *sprocketCheckerWrapper) IsDisabled() bool { return w.sm.IsDisabled() } func (w *sprocketCheckerWrapper) IsRunning() bool { return w.sm.IsRunning() } func (w *sprocketCheckerWrapper) ProcessEvent(ev *event.E) (*ingestion.SprocketResponse, error) { resp, err := w.sm.ProcessEvent(ev) if err != nil { return nil, err } return &ingestion.SprocketResponse{ Action: resp.Action, Msg: resp.Msg, }, nil } // Database wrapper for processing.Database interface type processingDBWrapper struct { db database.Database } func (s *Server) wrapDB() processing.Database { return &processingDBWrapper{db: s.DB} } func (w *processingDBWrapper) SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error) { return w.db.SaveEvent(ctx, ev) } func (w *processingDBWrapper) CheckForDeleted(ev *event.E, adminOwners [][]byte) error { return w.db.CheckForDeleted(ev, adminOwners) } // RateLimiter wrapper for processing.RateLimiter interface type processingRateLimiterWrapper struct { rl *ratelimit.Limiter } func (s *Server) wrapRateLimiter() processing.RateLimiter { return &processingRateLimiterWrapper{rl: s.rateLimiter} } func (w *processingRateLimiterWrapper) IsEnabled() bool { return w.rl.IsEnabled() } func (w *processingRateLimiterWrapper) Wait(ctx context.Context, opType int) error { w.rl.Wait(ctx, opType) return nil } // SyncManager wrapper for processing.SyncManager interface type processingSyncManagerWrapper struct { sm *dsync.Manager } func (s *Server) wrapSyncManager() processing.SyncManager { return &processingSyncManagerWrapper{sm: s.syncManager} } func (w *processingSyncManagerWrapper) UpdateSerial() { w.sm.UpdateSerial() } // RelayGroupManager wrapper for processing.RelayGroupManager interface type processingRelayGroupManagerWrapper struct { rgm *dsync.RelayGroupManager } func (s *Server) wrapRelayGroupManager() processing.RelayGroupManager { return &processingRelayGroupManagerWrapper{rgm: s.relayGroupMgr} } func (w *processingRelayGroupManagerWrapper) ValidateRelayGroupEvent(ev *event.E) error { return w.rgm.ValidateRelayGroupEvent(ev) } func (w *processingRelayGroupManagerWrapper) HandleRelayGroupEvent(ev *event.E, syncMgr any) { if sm, ok := syncMgr.(*dsync.Manager); ok { w.rgm.HandleRelayGroupEvent(ev, sm) } } // ClusterManager wrapper for processing.ClusterManager interface type processingClusterManagerWrapper struct { cm *dsync.ClusterManager } func (s *Server) wrapClusterManager() processing.ClusterManager { return &processingClusterManagerWrapper{cm: s.clusterManager} } func (w *processingClusterManagerWrapper) HandleMembershipEvent(ev *event.E) error { return w.cm.HandleMembershipEvent(ev) } // ACLRegistry wrapper for processing.ACLRegistry interface type processingACLRegistryWrapper struct{} func (s *Server) wrapACLRegistry() processing.ACLRegistry { return &processingACLRegistryWrapper{} } func (w *processingACLRegistryWrapper) Configure(cfg ...any) error { return acl.Registry.Configure(cfg...) } func (w *processingACLRegistryWrapper) Active() string { return acl.Registry.GetMode() } // ============================================================================= // Authorization Service Wrappers // ============================================================================= // ACLRegistry wrapper for authorization.ACLRegistry interface type authACLRegistryWrapper struct{} func (s *Server) wrapAuthACLRegistry() authorization.ACLRegistry { return &authACLRegistryWrapper{} } func (w *authACLRegistryWrapper) GetAccessLevel(pub []byte, address string) string { return acl.Registry.GetAccessLevel(pub, address) } func (w *authACLRegistryWrapper) CheckPolicy(ev *event.E) (bool, error) { return acl.Registry.CheckPolicy(ev) } func (w *authACLRegistryWrapper) Active() string { return acl.Registry.GetMode() } // PolicyManager wrapper for authorization.PolicyManager interface type authPolicyManagerWrapper struct { pm *policy.P } func (s *Server) wrapAuthPolicyManager() authorization.PolicyManager { if s.policyManager == nil { return nil } return &authPolicyManagerWrapper{pm: s.policyManager} } func (w *authPolicyManagerWrapper) IsEnabled() bool { return w.pm.IsEnabled() } func (w *authPolicyManagerWrapper) CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) { return w.pm.CheckPolicy(action, ev, pubkey, remote) } // SyncManager wrapper for authorization.SyncManager interface type authSyncManagerWrapper struct { sm *dsync.Manager } func (s *Server) wrapAuthSyncManager() authorization.SyncManager { if s.syncManager == nil { return nil } return &authSyncManagerWrapper{sm: s.syncManager} } func (w *authSyncManagerWrapper) GetPeers() []string { return w.sm.GetPeers() } func (w *authSyncManagerWrapper) IsAuthorizedPeer(url, pubkey string) bool { return w.sm.IsAuthorizedPeer(url, pubkey) } // ============================================================================= // Message Processing Pause/Resume for Policy and Follow List Updates // ============================================================================= // PauseMessageProcessing acquires an exclusive lock to pause all message processing. // This should be called before updating policy configuration or follow lists. // Call ResumeMessageProcessing to release the lock after updates are complete. func (s *Server) PauseMessageProcessing() { s.messagePauseMutex.Lock() } // ResumeMessageProcessing releases the exclusive lock to resume message processing. // This should be called after policy configuration or follow list updates are complete. func (s *Server) ResumeMessageProcessing() { s.messagePauseMutex.Unlock() } // AcquireMessageProcessingLock acquires a read lock for normal message processing. // This allows concurrent message processing while blocking during policy updates. // Call ReleaseMessageProcessingLock when message processing is complete. func (s *Server) AcquireMessageProcessingLock() { s.messagePauseMutex.RLock() } // ReleaseMessageProcessingLock releases the read lock after message processing. func (s *Server) ReleaseMessageProcessingLock() { s.messagePauseMutex.RUnlock() }