You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

304 lines
11 KiB

//go:build !(js && wasm)
// Package database provides shared import utilities for events
package database
import (
"bufio"
"context"
"fmt"
"io"
"os"
"runtime/debug"
"strings"
"time"
"git.mleku.dev/mleku/nostr/encoders/event"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
)
// maxLen is the scanner buffer limit in bytes: processJSONLEventsWithPolicy
// passes it to bufio.Scanner.Buffer as the maximum size of a single input line.
const maxLen = 500000000
// ImportEventsFromReader imports events from an io.Reader containing JSONL data.
//
// The whole of rr is first copied into a temporary file under
// <os.TempDir()>/orly, so the source reader is fully consumed before any event
// is processed. The file is then rewound and handed to processJSONLEvents.
// The temporary file is closed and removed before the function returns.
//
// It returns the first error hit while creating, filling or rewinding the
// temporary file; otherwise it returns whatever processJSONLEvents returns
// (nil on success).
func (d *D) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
	startTime := time.Now()
	log.I.F("================================================")
	log.I.F("[HTTP API IMPORT] Starting import operation")
	log.I.F("[HTTP API IMPORT] State: Initializing - preparing to receive file data")
	log.I.F("================================================")

	// Spool the input to disk so the source reader is drained before the
	// (potentially slow) event processing starts.
	tmpPath := os.TempDir() + string(os.PathSeparator) + "orly"
	if err := os.MkdirAll(tmpPath, 0700); chk.E(err) {
		log.E.F("[HTTP API IMPORT] ERROR: Failed to create temp directory %s: %v", tmpPath, err)
		log.E.F("[HTTP API IMPORT] Action: Check disk space and temp directory permissions")
		return err
	}
	tmp, err := os.CreateTemp(tmpPath, "")
	if chk.E(err) {
		log.E.F("[HTTP API IMPORT] ERROR: Failed to create temp file: %v", err)
		log.E.F("[HTTP API IMPORT] Action: Check disk space and temp directory permissions")
		return err
	}
	defer func() {
		// Close before removing so the file descriptor is released. Both
		// calls are best-effort cleanup; a failure here must not mask the
		// import result, so their errors are deliberately ignored.
		_ = tmp.Close()
		_ = os.Remove(tmp.Name())
	}()

	log.I.F("[HTTP API IMPORT] State: Buffering - receiving file data from client")
	log.I.F("[HTTP API IMPORT] Temp file: %s", tmp.Name())
	bufferStart := time.Now()
	bytesBuffered, err := io.Copy(tmp, rr)
	if chk.E(err) {
		log.E.F("[HTTP API IMPORT] ERROR: Failed to buffer file data: %v", err)
		log.E.F("[HTTP API IMPORT] Action: Check network connection and file integrity")
		return err
	}
	bufferElapsed := time.Since(bufferStart)
	fileSizeMB := float64(bytesBuffered) / 1024 / 1024
	bufferSpeedMBps := fileSizeMB / bufferElapsed.Seconds()
	log.I.F("[HTTP API IMPORT] State: Buffering complete")
	log.I.F("[HTTP API IMPORT] File size: %.2f MB | Buffer time: %v | Speed: %.2f MB/sec",
		fileSizeMB, bufferElapsed.Round(time.Millisecond), bufferSpeedMBps)

	// Rewind so processing reads the spooled data from the beginning.
	if _, err = tmp.Seek(0, 0); chk.E(err) {
		log.E.F("[HTTP API IMPORT] ERROR: Failed to rewind temp file: %v", err)
		return err
	}

	// Estimate number of events (rough estimate: ~500 bytes per event average)
	estimatedEvents := int(float64(bytesBuffered) / 500)
	log.I.F("[HTTP API IMPORT] State: Processing - reading and importing events")
	log.I.F("[HTTP API IMPORT] Estimated events: ~%d (based on file size)", estimatedEvents)
	log.I.F("[HTTP API IMPORT] Progress updates will appear every 5 seconds")
	log.I.F("================================================")

	processErr := d.processJSONLEvents(ctx, tmp, bytesBuffered)

	totalElapsed := time.Since(startTime)
	log.I.F("================================================")
	log.I.F("[HTTP API IMPORT] State: Completed")
	log.I.F("[HTTP API IMPORT] Total time: %v", totalElapsed.Round(time.Millisecond))
	if processErr != nil {
		log.E.F("[HTTP API IMPORT] Result: FAILED")
		log.E.F("[HTTP API IMPORT] Error: %v", processErr)
		log.E.F("[HTTP API IMPORT] Action: Check error details above, verify file format is valid JSONL")
		log.I.F("================================================")
	} else {
		log.I.F("[HTTP API IMPORT] Result: SUCCESS")
		log.I.F("[HTTP API IMPORT] All events have been imported and are now available on the relay")
		log.I.F("================================================")
	}
	return processErr
}
// ImportEventsFromStrings imports events from a slice of JSON strings with
// policy filtering.
//
// The strings are joined with newlines into a single JSONL payload and passed
// to processJSONLEventsWithPolicy together with policyManager. A nil
// policyManager disables the per-event policy check. The payload length is
// supplied as the total byte count used for progress reporting.
//
// It returns whatever processJSONLEventsWithPolicy returns.
func (d *D) ImportEventsFromStrings(ctx context.Context, eventJSONs []string, policyManager interface {
	CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error)
}) error {
	// Join once and reuse the result for both the reader and the byte count,
	// instead of building the full payload twice.
	payload := strings.Join(eventJSONs, "\n")
	return d.processJSONLEventsWithPolicy(ctx, strings.NewReader(payload), policyManager, int64(len(payload)))
}
// processJSONLEvents processes JSONL events from rr without policy filtering.
// It delegates to processJSONLEventsWithPolicy with a nil policy manager, which
// skips the per-event policy check. totalBytes is passed through unchanged and
// is used there for progress reporting.
func (d *D) processJSONLEvents(ctx context.Context, rr io.Reader, totalBytes int64) error {
return d.processJSONLEventsWithPolicy(ctx, rr, nil, totalBytes)
}
// processJSONLEventsWithPolicy reads newline-delimited JSON events from rr and
// saves each one with d.SaveEvent, optionally filtering through policyManager.
//
// Parameters:
//   - ctx: checked before each line; once done, processing stops and
//     ctx.Err() is returned.
//   - rr: the JSONL source; a single line may be up to maxLen bytes.
//   - policyManager: when non-nil, CheckPolicy("write", ev, nil, "") is called
//     for every decoded event. Events for which it returns an error or false
//     are counted as policy-rejected and skipped.
//   - totalBytes: expected input size, used only for the progress percentage
//     and ETA in log output; values <= 0 disable both.
//
// Per-event failures (unmarshal, policy, save) are counted and logged at a
// throttled rate but never abort the import. The function returns ctx.Err()
// on cancellation, the scanner's error if reading rr failed (for example a
// line longer than maxLen), and nil otherwise.
func (d *D) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, policyManager interface {
	CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error)
}, totalBytes int64) error {
	// Start with a modest buffer and let the scanner grow it on demand up to
	// maxLen, rather than allocating the full maxLen bytes for every import.
	const initialScanBuf = 1 << 20
	scan := bufio.NewScanner(rr)
	scan.Buffer(make([]byte, 0, initialScanBuf), maxLen)

	// Performance tracking
	startTime := time.Now()
	lastLogTime := startTime
	const logInterval = 5 * time.Second
	var count, skipped, policyRejected, unmarshalErrors, saveErrors int
	var kind3Count, largeEventCount int // Track noisy event types
	// Bytes consumed so far; drives both throughput and progress percentage.
	var bytesRead int64

	for scan.Scan() {
		// Non-blocking cancellation check before handling the next line.
		select {
		case <-ctx.Done():
			log.I.F("[HTTP API IMPORT] State: Cancelled")
			log.I.F("[HTTP API IMPORT] Processed %d events before cancellation", count)
			return ctx.Err()
		default:
		}

		b := scan.Bytes()
		bytesRead += int64(len(b)) + 1 // +1 for newline
		if len(b) < 1 {
			skipped++
			continue
		}

		ev := event.New()
		if _, err := ev.Unmarshal(b); err != nil {
			// return the pooled buffer on error
			ev.Free()
			unmarshalErrors++
			// Only log unmarshal errors if they're frequent (not spam)
			if unmarshalErrors <= 10 || unmarshalErrors%100 == 0 {
				log.W.F("[HTTP API IMPORT] Failed to unmarshal event #%d: %v", unmarshalErrors, err)
			}
			continue
		}

		// Track event kinds for summary (but don't log each one)
		if ev.Kind == 3 {
			kind3Count++
		}
		if len(b) > 100000 { // Events larger than 100KB
			largeEventCount++
		}

		// Apply policy checking if policy manager is provided
		if policyManager != nil {
			// For sync imports, we treat events as coming from system/trusted source
			// Use nil pubkey and empty remote to indicate system-level import
			allowed, policyErr := policyManager.CheckPolicy("write", ev, nil, "")
			if policyErr != nil {
				policyRejected++
				// Log before Free: ev.ID must not be read after the event has
				// been released (presumably it can reference pooled memory).
				// Only log policy errors if they're frequent (not spam)
				if policyRejected <= 10 || policyRejected%100 == 0 {
					log.W.F("[HTTP API IMPORT] Policy check failed for event %x: %v", ev.ID, policyErr)
				}
				ev.Free()
				continue
			}
			if !allowed {
				ev.Free()
				policyRejected++
				// Don't log individual policy rejections (too noisy)
				continue
			}
		}

		// Apply rate limiting before write operation if limiter is configured
		if d.rateLimiter != nil && d.rateLimiter.IsEnabled() {
			d.rateLimiter.Wait(ctx, WriteOpType)
		}

		_, err := d.SaveEvent(ctx, ev)
		// The event is released whether or not the save succeeded.
		ev.Free()
		if err != nil {
			saveErrors++
			// Only log save errors if they're frequent (not spam)
			if saveErrors <= 10 || saveErrors%100 == 0 {
				log.W.F("[HTTP API IMPORT] Failed to save event #%d: %v", saveErrors, err)
			}
			continue
		}
		count++

		// Progress logging every logInterval with detailed information
		if time.Since(lastLogTime) >= logInterval {
			elapsed := time.Since(startTime)
			eventsPerSec := float64(count) / elapsed.Seconds()
			mbRead := float64(bytesRead) / 1024 / 1024
			mbPerSec := mbRead / elapsed.Seconds()

			// Calculate progress percentage if we have total bytes
			progressPercent := ""
			eta := ""
			if totalBytes > 0 {
				// bytesRead is approximate (it assumes one newline byte per
				// line), so clamp to 100%.
				percent := float64(bytesRead) / float64(totalBytes) * 100
				if percent > 100 {
					percent = 100
				}
				progressPercent = fmt.Sprintf(" | Progress: %.1f%%", percent)
				// Estimate time remaining
				if percent > 0 && percent < 100 && eventsPerSec > 0 && mbPerSec > 0 {
					remainingMB := float64(totalBytes-bytesRead) / 1024 / 1024
					remainingSeconds := remainingMB / mbPerSec
					eta = fmt.Sprintf(" | ETA: ~%v", time.Duration(remainingSeconds)*time.Second)
				}
			}
			log.I.F("[HTTP API IMPORT] Progress: %d events saved | %.2f MB read | %.0f events/sec | %.2f MB/sec%s%s",
				count, mbRead, eventsPerSec, mbPerSec, progressPercent, eta)
			// Show error summary if there are errors
			if unmarshalErrors > 0 || saveErrors > 0 || policyRejected > 0 {
				log.W.F("[HTTP API IMPORT] Errors so far: %d unmarshal, %d save, %d policy rejected",
					unmarshalErrors, saveErrors, policyRejected)
			}
			lastLogTime = time.Now()
			debug.FreeOSMemory()
		}
	}

	// Fetch the scanner's terminal error up front so the summary below cannot
	// report success for an input that was only partially read.
	scanErr := scan.Err()

	// Final summary with actionable information
	elapsed := time.Since(startTime)
	eventsPerSec := float64(count) / elapsed.Seconds()
	mbRead := float64(bytesRead) / 1024 / 1024
	mbPerSec := mbRead / elapsed.Seconds()
	log.I.F("[HTTP API IMPORT] ================================================")
	log.I.F("[HTTP API IMPORT] Processing complete")
	log.I.F("[HTTP API IMPORT] Events saved: %d | Data processed: %.2f MB | Time: %v",
		count, mbRead, elapsed.Round(time.Millisecond))
	log.I.F("[HTTP API IMPORT] Performance: %.0f events/sec | %.2f MB/sec", eventsPerSec, mbPerSec)
	// Show statistics
	if kind3Count > 0 || largeEventCount > 0 {
		log.I.F("[HTTP API IMPORT] Event types: %d kind-3 (follow lists), %d large events (>100KB)",
			kind3Count, largeEventCount)
	}

	// Show error summary with actionable information
	hasErrors := false
	if unmarshalErrors > 0 {
		log.W.F("[HTTP API IMPORT] WARNING: %d events failed to unmarshal", unmarshalErrors)
		log.W.F("[HTTP API IMPORT] Action: Check if file format is valid JSONL (one JSON object per line)")
		hasErrors = true
	}
	if saveErrors > 0 {
		log.W.F("[HTTP API IMPORT] WARNING: %d events failed to save", saveErrors)
		log.W.F("[HTTP API IMPORT] Action: Check database disk space and permissions")
		hasErrors = true
	}
	if policyRejected > 0 {
		log.W.F("[HTTP API IMPORT] WARNING: %d events rejected by policy", policyRejected)
		log.W.F("[HTTP API IMPORT] Action: Review ACL/policy settings if this is unexpected")
		hasErrors = true
	}
	if scanErr != nil {
		log.E.F("[HTTP API IMPORT] ERROR: Scanner error: %v", scanErr)
		log.E.F("[HTTP API IMPORT] Action: Check file integrity and format")
		hasErrors = true
	}
	if skipped > 0 {
		log.I.F("[HTTP API IMPORT] Info: %d empty lines skipped (this is normal)", skipped)
	}
	if !hasErrors {
		log.I.F("[HTTP API IMPORT] Result: All events imported successfully")
		log.I.F("[HTTP API IMPORT] Action: Events are now available on the relay")
	} else {
		log.W.F("[HTTP API IMPORT] Result: Import completed with errors (see warnings above)")
		log.W.F("[HTTP API IMPORT] Action: Review error messages and take appropriate action")
	}
	log.I.F("[HTTP API IMPORT] ================================================")

	// scanErr is a plain error value, so this is nil when reading succeeded.
	return scanErr
}