Browse Source

implement mass-import function

imwald-v0.58.5
silberengel 4 months ago
parent
commit
4bd9421a1d
  1. 340
      app/server.go
  2. 3
      cmd/orly/db/db.go
  3. 131
      cmd/orly/db/import.go
  4. 1
      cmd/orly/main.go
  5. 6
      main.go

340
app/server.go

@ -9,16 +9,21 @@ import (
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"net/url" "net/url"
"os"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/gorilla/websocket"
"git.mleku.dev/mleku/nostr/encoders/bech32encoding"
"git.mleku.dev/mleku/nostr/encoders/event" "git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter" "git.mleku.dev/mleku/nostr/encoders/filter"
"git.mleku.dev/mleku/nostr/encoders/hex" "git.mleku.dev/mleku/nostr/encoders/hex"
"git.mleku.dev/mleku/nostr/encoders/tag" "git.mleku.dev/mleku/nostr/encoders/tag"
"git.mleku.dev/mleku/nostr/httpauth" "git.mleku.dev/mleku/nostr/httpauth"
"git.mleku.dev/mleku/nostr/interfaces/signer/p8k"
"git.mleku.dev/mleku/nostr/protocol/auth" "git.mleku.dev/mleku/nostr/protocol/auth"
"lol.mleku.dev/chk" "lol.mleku.dev/chk"
"next.orly.dev/app/branding" "next.orly.dev/app/branding"
@ -440,6 +445,8 @@ func (s *Server) UserInterface() {
s.mux.HandleFunc("/api/events/mine", s.handleEventsMine) s.mux.HandleFunc("/api/events/mine", s.handleEventsMine)
// Import endpoint (admin only) // Import endpoint (admin only)
s.mux.HandleFunc("/api/import", s.handleImport) s.mux.HandleFunc("/api/import", s.handleImport)
// Streaming endpoint to forward events to remote relay
s.mux.HandleFunc("/api/stream-to-relay", s.handleStreamToRelay)
// Sprocket endpoints (owner only) // Sprocket endpoints (owner only)
s.mux.HandleFunc("/api/sprocket/status", s.handleSprocketStatus) s.mux.HandleFunc("/api/sprocket/status", s.handleSprocketStatus)
s.mux.HandleFunc("/api/sprocket/update", s.handleSprocketUpdate) s.mux.HandleFunc("/api/sprocket/update", s.handleSprocketUpdate)
@ -1118,24 +1125,68 @@ func (s *Server) handleEventsMine(w http.ResponseWriter, r *http.Request) {
w.Write(jsonData) w.Write(jsonData)
} }
// authenticateLocalhost authenticates requests from localhost using $NOSTR_PRIVATE_KEY
// Returns the pubkey if authentication succeeds, nil otherwise
func (s *Server) authenticateLocalhost(r *http.Request) ([]byte, error) {
// Check if request is from localhost
remoteIP := strings.Split(r.RemoteAddr, ":")[0]
if remoteIP != "127.0.0.1" && remoteIP != "::1" && remoteIP != "localhost" {
return nil, fmt.Errorf("not a localhost request")
}
// Read NOSTR_PRIVATE_KEY from environment
nsec := os.Getenv("NOSTR_PRIVATE_KEY")
if nsec == "" {
return nil, fmt.Errorf("NOSTR_PRIVATE_KEY environment variable not set")
}
// Decode nsec to get private key bytes
secretBytes, err := bech32encoding.NsecToBytes([]byte(nsec))
if err != nil {
return nil, fmt.Errorf("failed to decode nsec: %w", err)
}
// Create signer from private key
signer, err := p8k.New()
if err != nil {
return nil, fmt.Errorf("failed to create signer: %w", err)
}
if err = signer.InitSec(secretBytes); err != nil {
return nil, fmt.Errorf("failed to initialize signer: %w", err)
}
// Get public key from signer
pubkey := signer.Pub()
return pubkey, nil
}
// handleImport receives a JSONL/NDJSON file or body and enqueues an async import using NIP-98 authentication. Write, admin, or owner roles required. // handleImport receives a JSONL/NDJSON file or body and enqueues an async import using NIP-98 authentication. Write, admin, or owner roles required.
// Supports folder imports via ?folder=/path/to/folder query parameter.
func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) { func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost { if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return return
} }
var pubkey []byte
var err error
// Skip authentication and permission checks when ACL is "none" (open relay mode) // Skip authentication and permission checks when ACL is "none" (open relay mode)
if acl.Registry.GetMode() != "none" { if acl.Registry.GetMode() != "none" {
// Validate NIP-98 authentication // Try localhost authentication first
valid, pubkey, err := httpauth.CheckAuth(r) pubkey, err = s.authenticateLocalhost(r)
if chk.E(err) || !valid { if err != nil {
errorMsg := "NIP-98 authentication validation failed" // Fall back to NIP-98 authentication
if err != nil { valid, pk, authErr := httpauth.CheckAuth(r)
errorMsg = err.Error() if chk.E(authErr) || !valid {
errorMsg := "NIP-98 authentication validation failed"
if authErr != nil {
errorMsg = authErr.Error()
}
http.Error(w, errorMsg, http.StatusUnauthorized)
return
} }
http.Error(w, errorMsg, http.StatusUnauthorized) pubkey = pk
return
} }
// Check permissions - require write, admin, or owner level // Check permissions - require write, admin, or owner level
@ -1148,6 +1199,13 @@ func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) {
} }
} }
// Check if this is a folder import
folderPath := r.URL.Query().Get("folder")
if folderPath != "" {
s.handleImportFolder(w, r, folderPath)
return
}
ct := r.Header.Get("Content-Type") ct := r.Header.Get("Content-Type")
if strings.HasPrefix(ct, "multipart/form-data") { if strings.HasPrefix(ct, "multipart/form-data") {
if err := r.ParseMultipartForm(32 << 20); chk.E(err) { // 32MB memory, rest to temp files if err := r.ParseMultipartForm(32 << 20); chk.E(err) { // 32MB memory, rest to temp files
@ -1174,6 +1232,272 @@ func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"success": true, "message": "Import started"}`)) w.Write([]byte(`{"success": true, "message": "Import started"}`))
} }
// handleImportFolder imports all .jsonl files from a folder
func (s *Server) handleImportFolder(w http.ResponseWriter, r *http.Request, folderPath string) {
// Security: Only allow imports from specific allowed directories
// For now, allow ../scripts/exports and absolute paths that are explicitly allowed
allowedPrefixes := []string{
"../scripts/exports",
"./scripts/exports",
"scripts/exports",
}
isAllowed := false
for _, prefix := range allowedPrefixes {
if strings.HasPrefix(folderPath, prefix) {
isAllowed = true
break
}
}
// Also allow absolute paths that are explicitly in allowed directories
if !isAllowed && filepath.IsAbs(folderPath) {
// Check if it's under a reasonable directory (e.g., user's home or current working directory)
// For security, we'll be conservative and only allow relative paths for now
http.Error(w, "Absolute paths not allowed for security reasons", http.StatusForbidden)
return
}
if !isAllowed {
http.Error(w, "Folder path not in allowed directories", http.StatusForbidden)
return
}
// Resolve the folder path
absPath, err := filepath.Abs(folderPath)
if err != nil {
http.Error(w, fmt.Sprintf("Invalid folder path: %v", err), http.StatusBadRequest)
return
}
// Check if folder exists
info, err := os.Stat(absPath)
if err != nil {
http.Error(w, fmt.Sprintf("Folder not found: %v", err), http.StatusNotFound)
return
}
if !info.IsDir() {
http.Error(w, "Path is not a directory", http.StatusBadRequest)
return
}
// Read all .jsonl files in the folder
entries, err := os.ReadDir(absPath)
if err != nil {
http.Error(w, fmt.Sprintf("Failed to read folder: %v", err), http.StatusInternalServerError)
return
}
var jsonlFiles []string
for _, entry := range entries {
if !entry.IsDir() && strings.HasSuffix(strings.ToLower(entry.Name()), ".jsonl") {
jsonlFiles = append(jsonlFiles, filepath.Join(absPath, entry.Name()))
}
}
if len(jsonlFiles) == 0 {
http.Error(w, "No .jsonl files found in folder", http.StatusNotFound)
return
}
// Process each file sequentially
type fileResult struct {
File string `json:"file"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}
results := make([]fileResult, 0, len(jsonlFiles))
for _, filePath := range jsonlFiles {
file, err := os.Open(filePath)
if err != nil {
results = append(results, fileResult{
File: filePath,
Success: false,
Error: err.Error(),
})
continue
}
// Import the file
s.DB.Import(file)
file.Close()
results = append(results, fileResult{
File: filePath,
Success: true,
})
}
// Return results
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
response := map[string]interface{}{
"success": true,
"message": fmt.Sprintf("Import started for %d files", len(jsonlFiles)),
"files": results,
}
jsonData, _ := json.Marshal(response)
w.Write(jsonData)
}
// handleStreamToRelay streams events from the local relay to a remote relay via WebSocket
// Supports both regular EVENT streaming and negentropy sync (NIP-77)
func (s *Server) handleStreamToRelay(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var pubkey []byte
var err error
// Authenticate request (localhost or NIP-98)
if acl.Registry.GetMode() != "none" {
pubkey, err = s.authenticateLocalhost(r)
if err != nil {
valid, pk, authErr := httpauth.CheckAuth(r)
if chk.E(authErr) || !valid {
errorMsg := "NIP-98 authentication validation failed"
if authErr != nil {
errorMsg = authErr.Error()
}
http.Error(w, errorMsg, http.StatusUnauthorized)
return
}
pubkey = pk
}
// Check permissions
accessLevel := acl.Registry.GetAccessLevel(pubkey, r.RemoteAddr)
if accessLevel != "write" && accessLevel != "admin" && accessLevel != "owner" {
http.Error(w, "Write, admin, or owner permission required", http.StatusForbidden)
return
}
}
// Get target relay URL (default: wss://orly-relay.imwald.eu)
targetURL := r.URL.Query().Get("target")
if targetURL == "" {
targetURL = "wss://orly-relay.imwald.eu"
}
// Normalize WebSocket URL
if strings.HasPrefix(targetURL, "http://") {
targetURL = strings.Replace(targetURL, "http://", "ws://", 1)
} else if strings.HasPrefix(targetURL, "https://") {
targetURL = strings.Replace(targetURL, "https://", "wss://", 1)
} else if !strings.HasPrefix(targetURL, "ws://") && !strings.HasPrefix(targetURL, "wss://") {
targetURL = "wss://" + targetURL
}
// Check if negentropy sync is requested
useNegentropy := r.URL.Query().Get("use_negentropy") == "true"
// Start streaming in background
go func() {
ctx := context.Background()
if useNegentropy && s.syncManager != nil {
// Use negentropy sync
_, err := s.streamWithNegentropy(ctx, targetURL)
if err != nil {
log.Printf("Negentropy stream to %s failed: %v", targetURL, err)
}
} else {
// Use regular EVENT streaming
err := s.streamEvents(ctx, targetURL)
if err != nil {
log.Printf("Event stream to %s failed: %v", targetURL, err)
}
}
}()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
response := map[string]interface{}{
"success": true,
"message": "Streaming started",
"target": targetURL,
"use_negentropy": useNegentropy,
}
jsonData, _ := json.Marshal(response)
w.Write(jsonData)
}
// streamEvents streams all events from local relay to remote relay using EVENT messages
func (s *Server) streamEvents(ctx context.Context, targetURL string) error {
// Connect to remote relay
dialer := websocket.Dialer{
HandshakeTimeout: 30 * time.Second,
}
conn, _, err := dialer.DialContext(ctx, targetURL, http.Header{})
if err != nil {
return fmt.Errorf("failed to connect to relay: %w", err)
}
defer conn.Close()
log.Printf("Connected to remote relay: %s", targetURL)
// Query all events from local database
f := filter.New() // Empty filter = all events
events, err := s.DB.QueryEvents(ctx, f)
if err != nil {
return fmt.Errorf("failed to query events: %w", err)
}
log.Printf("Streaming %d events to %s", len(events), targetURL)
// Stream events in batches
batchSize := 100
sent := 0
for i := 0; i < len(events); i += batchSize {
end := i + batchSize
if end > len(events) {
end = len(events)
}
for j := i; j < end; j++ {
ev := events[j]
if ev == nil {
continue
}
// Send EVENT message: ["EVENT", event]
eventMsg := []interface{}{"EVENT", ev}
if err := conn.WriteJSON(eventMsg); err != nil {
return fmt.Errorf("failed to send event: %w", err)
}
sent++
if sent%1000 == 0 {
log.Printf("Streamed %d/%d events to %s", sent, len(events), targetURL)
}
}
// Small delay between batches to avoid overwhelming the remote relay
time.Sleep(100 * time.Millisecond)
}
log.Printf("Successfully streamed %d events to %s", sent, targetURL)
return nil
}
// streamWithNegentropy streams events using negentropy sync (NIP-77)
// This is a simplified implementation that directly performs negentropy sync
func (s *Server) streamWithNegentropy(ctx context.Context, targetURL string) (int64, error) {
// For now, return a helpful error message
// Full negentropy implementation would require:
// 1. Building negentropy storage from local events
// 2. Creating negentropy instance
// 3. Performing NIP-77 protocol exchange
// 4. Fetching and pushing events
// This is a placeholder - the sync manager's negentropy functionality
// should be used via the existing sync infrastructure
return 0, fmt.Errorf("negentropy sync via HTTP endpoint not yet fully implemented. Use the sync manager's peer configuration or implement direct negentropy protocol")
}
// handleSprocketStatus returns the current status of the sprocket script // handleSprocketStatus returns the current status of the sprocket script
func (s *Server) handleSprocketStatus(w http.ResponseWriter, r *http.Request) { func (s *Server) handleSprocketStatus(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet { if r.Method != http.MethodGet {

3
cmd/orly/db/db.go

@ -74,6 +74,9 @@ func Run(args []string) {
} else if arg == "repair" { } else if arg == "repair" {
runRepair(args[i+1:]) runRepair(args[i+1:])
return return
} else if arg == "import" {
runImport(args[i+1:])
return
} }
} }

131
cmd/orly/db/import.go

@ -0,0 +1,131 @@
//go:build !(js && wasm)
package db
import (
"context"
"fmt"
"os"
"strings"
"lol.mleku.dev"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database"
)
func runImport(args []string) {
var inputFile string
var showHelp bool
for i, arg := range args {
if arg == "--file" || arg == "-f" {
if i+1 < len(args) {
inputFile = args[i+1]
}
} else if arg == "--help" || arg == "-h" {
showHelp = true
} else if arg != "" && !strings.HasPrefix(arg, "-") && inputFile == "" {
inputFile = arg
}
}
if showHelp {
printImportHelp()
return
}
if inputFile == "" {
fmt.Fprintln(os.Stderr, "error: input file required")
fmt.Fprintln(os.Stderr, "usage: orly db import <file.jsonl>")
fmt.Fprintln(os.Stderr, " or: orly db import --file <file.jsonl>")
os.Exit(1)
}
cfg := loadConfig()
lol.SetLogLevel(cfg.LogLevel)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create database configuration
dbCfg := &database.DatabaseConfig{
DataDir: cfg.DataDir,
LogLevel: cfg.LogLevel,
BlockCacheMB: cfg.BlockCacheMB,
IndexCacheMB: cfg.IndexCacheMB,
QueryCacheSizeMB: cfg.QueryCacheSizeMB,
QueryCacheMaxAge: cfg.QueryCacheMaxAge,
QueryCacheDisabled: cfg.QueryCacheDisabled,
SerialCachePubkeys: cfg.SerialCachePubkeys,
SerialCacheEventIds: cfg.SerialCacheEventIds,
ZSTDLevel: cfg.ZSTDLevel,
}
// Initialize database directly
log.I.F("initializing database at %s for import", cfg.DataDir)
db, err := database.NewWithConfig(ctx, cancel, dbCfg)
if chk.E(err) {
log.E.F("failed to initialize database: %v", err)
os.Exit(1)
}
defer db.Close()
// Wait for database to be ready
log.I.F("waiting for database to be ready...")
<-db.Ready()
log.I.F("database ready")
// Open input file
log.I.F("opening input file: %s", inputFile)
file, err := os.Open(inputFile)
if chk.E(err) {
log.E.F("failed to open input file: %v", err)
os.Exit(1)
}
defer file.Close()
// Get file size for progress
fileInfo, err := file.Stat()
if chk.E(err) {
log.E.F("failed to stat file: %v", err)
os.Exit(1)
}
fileSizeMB := float64(fileInfo.Size()) / 1024 / 1024
log.I.F("importing %.2f MB from %s", fileSizeMB, inputFile)
// Import events
log.I.F("starting import...")
db.Import(file)
log.I.F("import completed successfully")
fmt.Fprintf(os.Stdout, "✓ Imported events from %s (%.2f MB)\n", inputFile, fileSizeMB)
}
func printImportHelp() {
fmt.Println(`
Import events from a JSONL file directly into the database.
Usage:
orly db import <file.jsonl>
orly db import --file <file.jsonl>
This command imports events from a JSONL (JSON Lines) file directly into
the Badger database, bypassing the HTTP API. This is much faster for large
imports when you have direct access to the database.
The input file should contain one JSON event per line (JSONL format).
Environment variables:
ORLY_DATA_DIR - Database data directory (required)
ORLY_DB_LOG_LEVEL - Log level (default: info)
ORLY_DB_BLOCK_CACHE_MB - Block cache size in MB (default: 1024)
ORLY_DB_INDEX_CACHE_MB - Index cache size in MB (default: 512)
ORLY_DB_ZSTD_LEVEL - ZSTD compression level (default: 3)
Examples:
orly db import events.jsonl
orly db import --file /path/to/large-export.jsonl
`)
}

1
cmd/orly/main.go

@ -25,6 +25,7 @@
// orly db health # Run database health check // orly db health # Run database health check
// orly db repair # Repair database issues // orly db repair # Repair database issues
// orly db repair --dry-run # Preview repairs without applying // orly db repair --dry-run # Preview repairs without applying
// orly db import <file.jsonl> # Import events directly from JSONL file
// orly acl --driver=follows # Run follows ACL server // orly acl --driver=follows # Run follows ACL server
// orly sync --driver=negentropy # Run negentropy sync service // orly sync --driver=negentropy # Run negentropy sync service
// orly launcher # Run process supervisor // orly launcher # Run process supervisor

6
main.go

@ -11,6 +11,9 @@ import (
"sync" "sync"
"time" "time"
"git.mleku.dev/mleku/nostr/crypto/keys"
"git.mleku.dev/mleku/nostr/encoders/bech32encoding"
"git.mleku.dev/mleku/nostr/encoders/hex"
"github.com/adrg/xdg" "github.com/adrg/xdg"
"golang.org/x/term" "golang.org/x/term"
"lol.mleku.dev/chk" "lol.mleku.dev/chk"
@ -18,10 +21,7 @@ import (
"next.orly.dev/app" "next.orly.dev/app"
"next.orly.dev/app/branding" "next.orly.dev/app/branding"
"next.orly.dev/app/config" "next.orly.dev/app/config"
"git.mleku.dev/mleku/nostr/crypto/keys"
"git.mleku.dev/mleku/nostr/encoders/bech32encoding"
"next.orly.dev/pkg/database" "next.orly.dev/pkg/database"
"git.mleku.dev/mleku/nostr/encoders/hex"
"next.orly.dev/pkg/relay" "next.orly.dev/pkg/relay"
"next.orly.dev/pkg/version" "next.orly.dev/pkg/version"
) )

Loading…
Cancel
Save