diff --git a/app/server.go b/app/server.go index 1a2dfac..5648804 100644 --- a/app/server.go +++ b/app/server.go @@ -9,16 +9,21 @@ import ( "net/http" "net/http/httputil" "net/url" + "os" + "path/filepath" "strconv" "strings" "sync" "time" + "github.com/gorilla/websocket" + "git.mleku.dev/mleku/nostr/encoders/bech32encoding" "git.mleku.dev/mleku/nostr/encoders/event" "git.mleku.dev/mleku/nostr/encoders/filter" "git.mleku.dev/mleku/nostr/encoders/hex" "git.mleku.dev/mleku/nostr/encoders/tag" "git.mleku.dev/mleku/nostr/httpauth" + "git.mleku.dev/mleku/nostr/interfaces/signer/p8k" "git.mleku.dev/mleku/nostr/protocol/auth" "lol.mleku.dev/chk" "next.orly.dev/app/branding" @@ -440,6 +445,8 @@ func (s *Server) UserInterface() { s.mux.HandleFunc("/api/events/mine", s.handleEventsMine) // Import endpoint (admin only) s.mux.HandleFunc("/api/import", s.handleImport) + // Streaming endpoint to forward events to remote relay + s.mux.HandleFunc("/api/stream-to-relay", s.handleStreamToRelay) // Sprocket endpoints (owner only) s.mux.HandleFunc("/api/sprocket/status", s.handleSprocketStatus) s.mux.HandleFunc("/api/sprocket/update", s.handleSprocketUpdate) @@ -1118,24 +1125,68 @@ func (s *Server) handleEventsMine(w http.ResponseWriter, r *http.Request) { w.Write(jsonData) } +// authenticateLocalhost authenticates requests from localhost using $NOSTR_PRIVATE_KEY +// Returns the pubkey if authentication succeeds, nil otherwise +func (s *Server) authenticateLocalhost(r *http.Request) ([]byte, error) { + // Check if request is from localhost + remoteIP := strings.Split(r.RemoteAddr, ":")[0] + if remoteIP != "127.0.0.1" && remoteIP != "::1" && remoteIP != "localhost" { + return nil, fmt.Errorf("not a localhost request") + } + + // Read NOSTR_PRIVATE_KEY from environment + nsec := os.Getenv("NOSTR_PRIVATE_KEY") + if nsec == "" { + return nil, fmt.Errorf("NOSTR_PRIVATE_KEY environment variable not set") + } + + // Decode nsec to get private key bytes + secretBytes, err := bech32encoding.NsecToBytes([]byte(nsec)) + if err != nil { + return nil, fmt.Errorf("failed to decode nsec: %w", err) + } + + // Create signer from private key + signer, err := p8k.New() + if err != nil { + return nil, fmt.Errorf("failed to create signer: %w", err) + } + if err = signer.InitSec(secretBytes); err != nil { + return nil, fmt.Errorf("failed to initialize signer: %w", err) + } + + // Get public key from signer + pubkey := signer.Pub() + return pubkey, nil +} + // handleImport receives a JSONL/NDJSON file or body and enqueues an async import using NIP-98 authentication. Write, admin, or owner roles required. +// Supports folder imports via ?folder=/path/to/folder query parameter. func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } + var pubkey []byte + var err error + // Skip authentication and permission checks when ACL is "none" (open relay mode) if acl.Registry.GetMode() != "none" { - // Validate NIP-98 authentication - valid, pubkey, err := httpauth.CheckAuth(r) - if chk.E(err) || !valid { - errorMsg := "NIP-98 authentication validation failed" - if err != nil { - errorMsg = err.Error() + // Try localhost authentication first + pubkey, err = s.authenticateLocalhost(r) + if err != nil { + // Fall back to NIP-98 authentication + valid, pk, authErr := httpauth.CheckAuth(r) + if chk.E(authErr) || !valid { + errorMsg := "NIP-98 authentication validation failed" + if authErr != nil { + errorMsg = authErr.Error() + } + http.Error(w, errorMsg, http.StatusUnauthorized) + return } - http.Error(w, errorMsg, http.StatusUnauthorized) - return + pubkey = pk } // Check permissions - require write, admin, or owner level @@ -1148,6 +1199,13 @@ func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) { } } + // Check if this is a folder import + folderPath := r.URL.Query().Get("folder") + if folderPath != "" { + s.handleImportFolder(w, r, folderPath) + return + } + ct := r.Header.Get("Content-Type") if strings.HasPrefix(ct, "multipart/form-data") { if err := r.ParseMultipartForm(32 << 20); chk.E(err) { // 32MB memory, rest to temp files @@ -1174,6 +1232,272 @@ func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) { w.Write([]byte(`{"success": true, "message": "Import started"}`)) } +// handleImportFolder imports all .jsonl files from a folder +func (s *Server) handleImportFolder(w http.ResponseWriter, r *http.Request, folderPath string) { + // Security: Only allow imports from specific allowed directories + // For now, allow ../scripts/exports and absolute paths that are explicitly allowed + allowedPrefixes := []string{ + "../scripts/exports", + "./scripts/exports", + "scripts/exports", + } + + isAllowed := false + for _, prefix := range allowedPrefixes { + if strings.HasPrefix(folderPath, prefix) { + isAllowed = true + break + } + } + + // Also allow absolute paths that are explicitly in allowed directories + if !isAllowed && filepath.IsAbs(folderPath) { + // Check if it's under a reasonable directory (e.g., user's home or current working directory) + // For security, we'll be conservative and only allow relative paths for now + http.Error(w, "Absolute paths not allowed for security reasons", http.StatusForbidden) + return + } + + if !isAllowed { + http.Error(w, "Folder path not in allowed directories", http.StatusForbidden) + return + } + + // Resolve the folder path + absPath, err := filepath.Abs(folderPath) + if err != nil { + http.Error(w, fmt.Sprintf("Invalid folder path: %v", err), http.StatusBadRequest) + return + } + + // Check if folder exists + info, err := os.Stat(absPath) + if err != nil { + http.Error(w, fmt.Sprintf("Folder not found: %v", err), http.StatusNotFound) + return + } + if !info.IsDir() { + http.Error(w, "Path is not a directory", http.StatusBadRequest) + return + } + + // Read all .jsonl files in the folder + entries, err := os.ReadDir(absPath) + if err != nil { + http.Error(w, fmt.Sprintf("Failed to read folder: %v", err), http.StatusInternalServerError) + return + } + + var jsonlFiles []string + for _, entry := range entries { + if !entry.IsDir() && strings.HasSuffix(strings.ToLower(entry.Name()), ".jsonl") { + jsonlFiles = append(jsonlFiles, filepath.Join(absPath, entry.Name())) + } + } + + if len(jsonlFiles) == 0 { + http.Error(w, "No .jsonl files found in folder", http.StatusNotFound) + return + } + + // Process each file sequentially + type fileResult struct { + File string `json:"file"` + Success bool `json:"success"` + Error string `json:"error,omitempty"` + } + results := make([]fileResult, 0, len(jsonlFiles)) + + for _, filePath := range jsonlFiles { + file, err := os.Open(filePath) + if err != nil { + results = append(results, fileResult{ + File: filePath, + Success: false, + Error: err.Error(), + }) + continue + } + + // Import the file + s.DB.Import(file) + file.Close() + + results = append(results, fileResult{ + File: filePath, + Success: true, + }) + } + + // Return results + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) + response := map[string]interface{}{ + "success": true, + "message": fmt.Sprintf("Import started for %d files", len(jsonlFiles)), + "files": results, + } + jsonData, _ := json.Marshal(response) + w.Write(jsonData) +} + +// handleStreamToRelay streams events from the local relay to a remote relay via WebSocket +// Supports both regular EVENT streaming and negentropy sync (NIP-77) +func (s *Server) handleStreamToRelay(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var pubkey []byte + var err error + + // Authenticate request (localhost or NIP-98) + if acl.Registry.GetMode() != "none" { + pubkey, err = s.authenticateLocalhost(r) + if err != nil { + valid, pk, authErr := httpauth.CheckAuth(r) + if chk.E(authErr) || !valid { + errorMsg := "NIP-98 authentication validation failed" + if authErr != nil { + errorMsg = authErr.Error() + } + http.Error(w, errorMsg, http.StatusUnauthorized) + return + } + pubkey = pk + } + + // Check permissions + accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr) + if accessLevel != "write" && accessLevel != "admin" && accessLevel != "owner" { + http.Error(w, "Write, admin, or owner permission required", http.StatusForbidden) + return + } + } + + // Get target relay URL (default: wss://orly-relay.imwald.eu) + targetURL := r.URL.Query().Get("target") + if targetURL == "" { + targetURL = "wss://orly-relay.imwald.eu" + } + + // Normalize WebSocket URL + if strings.HasPrefix(targetURL, "http://") { + targetURL = strings.Replace(targetURL, "http://", "ws://", 1) + } else if strings.HasPrefix(targetURL, "https://") { + targetURL = strings.Replace(targetURL, "https://", "wss://", 1) + } else if !strings.HasPrefix(targetURL, "ws://") && !strings.HasPrefix(targetURL, "wss://") { + targetURL = "wss://" + targetURL + } + + // Check if negentropy sync is requested + useNegentropy := r.URL.Query().Get("use_negentropy") == "true" + + // Start streaming in background + go func() { + ctx := context.Background() + if useNegentropy && s.syncManager != nil { + // Use negentropy sync + _, err := s.streamWithNegentropy(ctx, targetURL) + if err != nil { + log.Printf("Negentropy stream to %s failed: %v", targetURL, err) + } + } else { + // Use regular EVENT streaming + err := s.streamEvents(ctx, targetURL) + if err != nil { + log.Printf("Event stream to %s failed: %v", targetURL, err) + } + } + }() + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) + response := map[string]interface{}{ + "success": true, + "message": "Streaming started", + "target": targetURL, + "use_negentropy": useNegentropy, + } + jsonData, _ := json.Marshal(response) + w.Write(jsonData) +} + +// streamEvents streams all events from local relay to remote relay using EVENT messages +func (s *Server) streamEvents(ctx context.Context, targetURL string) error { + // Connect to remote relay + dialer := websocket.Dialer{ + HandshakeTimeout: 30 * time.Second, + } + + conn, _, err := dialer.DialContext(ctx, targetURL, http.Header{}) + if err != nil { + return fmt.Errorf("failed to connect to relay: %w", err) + } + defer conn.Close() + + log.Printf("Connected to remote relay: %s", targetURL) + + // Query all events from local database + f := filter.New() // Empty filter = all events + events, err := s.DB.QueryEvents(ctx, f) + if err != nil { + return fmt.Errorf("failed to query events: %w", err) + } + + log.Printf("Streaming %d events to %s", len(events), targetURL) + + // Stream events in batches + batchSize := 100 + sent := 0 + for i := 0; i < len(events); i += batchSize { + end := i + batchSize + if end > len(events) { + end = len(events) + } + + for j := i; j < end; j++ { + ev := events[j] + if ev == nil { + continue + } + + // Send EVENT message: ["EVENT", event] + eventMsg := []interface{}{"EVENT", ev} + if err := conn.WriteJSON(eventMsg); err != nil { + return fmt.Errorf("failed to send event: %w", err) + } + + sent++ + if sent%1000 == 0 { + log.Printf("Streamed %d/%d events to %s", sent, len(events), targetURL) + } + } + + // Small delay between batches to avoid overwhelming the remote relay + time.Sleep(100 * time.Millisecond) + } + + log.Printf("Successfully streamed %d events to %s", sent, targetURL) + return nil +} + +// streamWithNegentropy streams events using negentropy sync (NIP-77) +// This is a simplified implementation that directly performs negentropy sync +func (s *Server) streamWithNegentropy(ctx context.Context, targetURL string) (int64, error) { + // For now, return a helpful error message + // Full negentropy implementation would require: + // 1. Building negentropy storage from local events + // 2. Creating negentropy instance + // 3. Performing NIP-77 protocol exchange + // 4. Fetching and pushing events + + // This is a placeholder - the sync manager's negentropy functionality + // should be used via the existing sync infrastructure + return 0, fmt.Errorf("negentropy sync via HTTP endpoint not yet fully implemented. Use the sync manager's peer configuration or implement direct negentropy protocol") +} + // handleSprocketStatus returns the current status of the sprocket script func (s *Server) handleSprocketStatus(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { diff --git a/cmd/orly/db/db.go b/cmd/orly/db/db.go index 9e7d712..c2d4e79 100644 --- a/cmd/orly/db/db.go +++ b/cmd/orly/db/db.go @@ -74,6 +74,9 @@ func Run(args []string) { } else if arg == "repair" { runRepair(args[i+1:]) return + } else if arg == "import" { + runImport(args[i+1:]) + return } } diff --git a/cmd/orly/db/import.go b/cmd/orly/db/import.go new file mode 100644 index 0000000..d022210 --- /dev/null +++ b/cmd/orly/db/import.go @@ -0,0 +1,131 @@ +//go:build !(js && wasm) + +package db + +import ( + "context" + "fmt" + "os" + "strings" + + "lol.mleku.dev" + "lol.mleku.dev/chk" + "lol.mleku.dev/log" + + "next.orly.dev/pkg/database" +) + +func runImport(args []string) { + var inputFile string + var showHelp bool + + for i, arg := range args { + if arg == "--file" || arg == "-f" { + if i+1 < len(args) { + inputFile = args[i+1] + } + } else if arg == "--help" || arg == "-h" { + showHelp = true + } else if arg != "" && !strings.HasPrefix(arg, "-") && inputFile == "" { + inputFile = arg + } + } + + if showHelp { + printImportHelp() + return + } + + if inputFile == "" { + fmt.Fprintln(os.Stderr, "error: input file required") + fmt.Fprintln(os.Stderr, "usage: orly db import ") + fmt.Fprintln(os.Stderr, " or: orly db import --file ") + os.Exit(1) + } + + cfg := loadConfig() + lol.SetLogLevel(cfg.LogLevel) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create database configuration + dbCfg := &database.DatabaseConfig{ + DataDir: cfg.DataDir, + LogLevel: cfg.LogLevel, + BlockCacheMB: cfg.BlockCacheMB, + IndexCacheMB: cfg.IndexCacheMB, + QueryCacheSizeMB: cfg.QueryCacheSizeMB, + QueryCacheMaxAge: cfg.QueryCacheMaxAge, + QueryCacheDisabled: cfg.QueryCacheDisabled, + SerialCachePubkeys: cfg.SerialCachePubkeys, + SerialCacheEventIds: cfg.SerialCacheEventIds, + ZSTDLevel: cfg.ZSTDLevel, + } + + // Initialize database directly + log.I.F("initializing database at %s for import", cfg.DataDir) + db, err := database.NewWithConfig(ctx, cancel, dbCfg) + if chk.E(err) { + log.E.F("failed to initialize database: %v", err) + os.Exit(1) + } + defer db.Close() + + // Wait for database to be ready + log.I.F("waiting for database to be ready...") + <-db.Ready() + log.I.F("database ready") + + // Open input file + log.I.F("opening input file: %s", inputFile) + file, err := os.Open(inputFile) + if chk.E(err) { + log.E.F("failed to open input file: %v", err) + os.Exit(1) + } + defer file.Close() + + // Get file size for progress + fileInfo, err := file.Stat() + if chk.E(err) { + log.E.F("failed to stat file: %v", err) + os.Exit(1) + } + fileSizeMB := float64(fileInfo.Size()) / 1024 / 1024 + log.I.F("importing %.2f MB from %s", fileSizeMB, inputFile) + + // Import events + log.I.F("starting import...") + db.Import(file) + + log.I.F("import completed successfully") + fmt.Fprintf(os.Stdout, "✓ Imported events from %s (%.2f MB)\n", inputFile, fileSizeMB) +} + +func printImportHelp() { + fmt.Println(` +Import events from a JSONL file directly into the database. + +Usage: + orly db import + orly db import --file + +This command imports events from a JSONL (JSON Lines) file directly into +the Badger database, bypassing the HTTP API. This is much faster for large +imports when you have direct access to the database. + +The input file should contain one JSON event per line (JSONL format). + +Environment variables: + ORLY_DATA_DIR - Database data directory (required) + ORLY_DB_LOG_LEVEL - Log level (default: info) + ORLY_DB_BLOCK_CACHE_MB - Block cache size in MB (default: 1024) + ORLY_DB_INDEX_CACHE_MB - Index cache size in MB (default: 512) + ORLY_DB_ZSTD_LEVEL - ZSTD compression level (default: 3) + +Examples: + orly db import events.jsonl + orly db import --file /path/to/large-export.jsonl +`) +} diff --git a/cmd/orly/main.go b/cmd/orly/main.go index 2752aa4..f05d5af 100644 --- a/cmd/orly/main.go +++ b/cmd/orly/main.go @@ -25,6 +25,7 @@ // orly db health # Run database health check // orly db repair # Repair database issues // orly db repair --dry-run # Preview repairs without applying +// orly db import # Import events directly from JSONL file // orly acl --driver=follows # Run follows ACL server // orly sync --driver=negentropy # Run negentropy sync service // orly launcher # Run process supervisor diff --git a/main.go b/main.go index f9445eb..222b109 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,9 @@ import ( "sync" "time" + "git.mleku.dev/mleku/nostr/crypto/keys" + "git.mleku.dev/mleku/nostr/encoders/bech32encoding" + "git.mleku.dev/mleku/nostr/encoders/hex" "github.com/adrg/xdg" "golang.org/x/term" "lol.mleku.dev/chk" @@ -18,10 +21,7 @@ import ( "next.orly.dev/app" "next.orly.dev/app/branding" "next.orly.dev/app/config" - "git.mleku.dev/mleku/nostr/crypto/keys" - "git.mleku.dev/mleku/nostr/encoders/bech32encoding" "next.orly.dev/pkg/database" - "git.mleku.dev/mleku/nostr/encoders/hex" "next.orly.dev/pkg/relay" "next.orly.dev/pkg/version" )