You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
304 lines
11 KiB
304 lines
11 KiB
//go:build !(js && wasm) |
|
|
|
// Package database provides shared import utilities for events |
|
package database |
|
|
|
import ( |
|
"bufio" |
|
"context" |
|
"fmt" |
|
"io" |
|
"os" |
|
"runtime/debug" |
|
"strings" |
|
"time" |
|
|
|
"git.mleku.dev/mleku/nostr/encoders/event" |
|
"lol.mleku.dev/chk" |
|
"lol.mleku.dev/log" |
|
) |
|
|
|
// maxLen is the upper bound, in bytes (500 MB), on the line-scanner buffer
// used when importing JSONL data.
const maxLen = 500_000_000
|
|
|
// ImportEventsFromReader imports events from an io.Reader containing JSONL data |
|
func (d *D) ImportEventsFromReader(ctx context.Context, rr io.Reader) error { |
|
startTime := time.Now() |
|
log.I.F("================================================") |
|
log.I.F("[HTTP API IMPORT] Starting import operation") |
|
log.I.F("[HTTP API IMPORT] State: Initializing - preparing to receive file data") |
|
log.I.F("================================================") |
|
|
|
// store to disk so we can return fast |
|
tmpPath := os.TempDir() + string(os.PathSeparator) + "orly" |
|
os.MkdirAll(tmpPath, 0700) |
|
tmp, err := os.CreateTemp(tmpPath, "") |
|
if chk.E(err) { |
|
log.E.F("[HTTP API IMPORT] ERROR: Failed to create temp file: %v", err) |
|
log.E.F("[HTTP API IMPORT] Action: Check disk space and temp directory permissions") |
|
return err |
|
} |
|
defer os.Remove(tmp.Name()) // Clean up temp file when done |
|
|
|
log.I.F("[HTTP API IMPORT] State: Buffering - receiving file data from client") |
|
log.I.F("[HTTP API IMPORT] Temp file: %s", tmp.Name()) |
|
bufferStart := time.Now() |
|
bytesBuffered, err := io.Copy(tmp, rr) |
|
if chk.E(err) { |
|
log.E.F("[HTTP API IMPORT] ERROR: Failed to buffer file data: %v", err) |
|
log.E.F("[HTTP API IMPORT] Action: Check network connection and file integrity") |
|
return err |
|
} |
|
bufferElapsed := time.Since(bufferStart) |
|
fileSizeMB := float64(bytesBuffered) / 1024 / 1024 |
|
bufferSpeedMBps := fileSizeMB / bufferElapsed.Seconds() |
|
log.I.F("[HTTP API IMPORT] State: Buffering complete") |
|
log.I.F("[HTTP API IMPORT] File size: %.2f MB | Buffer time: %v | Speed: %.2f MB/sec", |
|
fileSizeMB, bufferElapsed.Round(time.Millisecond), bufferSpeedMBps) |
|
|
|
if _, err = tmp.Seek(0, 0); chk.E(err) { |
|
log.E.F("[HTTP API IMPORT] ERROR: Failed to rewind temp file: %v", err) |
|
return err |
|
} |
|
|
|
// Estimate number of events (rough estimate: ~500 bytes per event average) |
|
estimatedEvents := int(float64(bytesBuffered) / 500) |
|
log.I.F("[HTTP API IMPORT] State: Processing - reading and importing events") |
|
log.I.F("[HTTP API IMPORT] Estimated events: ~%d (based on file size)", estimatedEvents) |
|
log.I.F("[HTTP API IMPORT] Progress updates will appear every 5 seconds") |
|
log.I.F("================================================") |
|
processErr := d.processJSONLEvents(ctx, tmp, bytesBuffered) |
|
|
|
totalElapsed := time.Since(startTime) |
|
log.I.F("================================================") |
|
log.I.F("[HTTP API IMPORT] State: Completed") |
|
log.I.F("[HTTP API IMPORT] Total time: %v", totalElapsed.Round(time.Millisecond)) |
|
if processErr != nil { |
|
log.E.F("[HTTP API IMPORT] Result: FAILED") |
|
log.E.F("[HTTP API IMPORT] Error: %v", processErr) |
|
log.E.F("[HTTP API IMPORT] Action: Check error details above, verify file format is valid JSONL") |
|
log.I.F("================================================") |
|
} else { |
|
log.I.F("[HTTP API IMPORT] Result: SUCCESS") |
|
log.I.F("[HTTP API IMPORT] All events have been imported and are now available on the relay") |
|
log.I.F("================================================") |
|
} |
|
|
|
return processErr |
|
} |
|
|
|
// ImportEventsFromStrings imports events from a slice of JSON strings with policy filtering |
|
func (d *D) ImportEventsFromStrings(ctx context.Context, eventJSONs []string, policyManager interface { |
|
CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) |
|
}) error { |
|
// Create a reader from the string slice |
|
reader := strings.NewReader(strings.Join(eventJSONs, "\n")) |
|
// Estimate total bytes (rough estimate) |
|
totalBytes := int64(len(strings.Join(eventJSONs, "\n"))) |
|
return d.processJSONLEventsWithPolicy(ctx, reader, policyManager, totalBytes) |
|
} |
|
|
|
// processJSONLEvents processes JSONL events from a reader |
|
func (d *D) processJSONLEvents(ctx context.Context, rr io.Reader, totalBytes int64) error { |
|
return d.processJSONLEventsWithPolicy(ctx, rr, nil, totalBytes) |
|
} |
|
|
|
// processJSONLEventsWithPolicy processes JSONL events from a reader with optional policy filtering |
|
func (d *D) processJSONLEventsWithPolicy(ctx context.Context, rr io.Reader, policyManager interface { |
|
CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error) |
|
}, totalBytes int64) error { |
|
// Create a scanner to read the buffer line by line |
|
scan := bufio.NewScanner(rr) |
|
scanBuf := make([]byte, maxLen) |
|
scan.Buffer(scanBuf, maxLen) |
|
|
|
// Performance tracking |
|
startTime := time.Now() |
|
lastLogTime := startTime |
|
const logInterval = 5 * time.Second |
|
|
|
var count, bytesRead, skipped, policyRejected, unmarshalErrors, saveErrors int |
|
var kind3Count, largeEventCount int // Track noisy event types |
|
|
|
// Track bytes read for progress percentage |
|
bytesReadTotal := int64(0) |
|
|
|
for scan.Scan() { |
|
select { |
|
case <-ctx.Done(): |
|
log.I.F("[HTTP API IMPORT] State: Cancelled") |
|
log.I.F("[HTTP API IMPORT] Processed %d events before cancellation", count) |
|
return ctx.Err() |
|
default: |
|
} |
|
|
|
b := scan.Bytes() |
|
lineLen := len(b) + 1 // +1 for newline |
|
bytesReadTotal += int64(lineLen) |
|
bytesRead += lineLen |
|
|
|
if len(b) < 1 { |
|
skipped++ |
|
continue |
|
} |
|
|
|
ev := event.New() |
|
if _, err := ev.Unmarshal(b); err != nil { |
|
// return the pooled buffer on error |
|
ev.Free() |
|
unmarshalErrors++ |
|
// Only log unmarshal errors if they're frequent (not spam) |
|
if unmarshalErrors <= 10 || unmarshalErrors%100 == 0 { |
|
log.W.F("[HTTP API IMPORT] Failed to unmarshal event #%d: %v", unmarshalErrors, err) |
|
} |
|
continue |
|
} |
|
|
|
// Track event kinds for summary (but don't log each one) |
|
if ev.Kind == 3 { |
|
kind3Count++ |
|
} |
|
if len(b) > 100000 { // Events larger than 100KB |
|
largeEventCount++ |
|
} |
|
|
|
// Apply policy checking if policy manager is provided |
|
if policyManager != nil { |
|
// For sync imports, we treat events as coming from system/trusted source |
|
// Use nil pubkey and empty remote to indicate system-level import |
|
allowed, policyErr := policyManager.CheckPolicy("write", ev, nil, "") |
|
if policyErr != nil { |
|
ev.Free() |
|
policyRejected++ |
|
// Only log policy errors if they're frequent (not spam) |
|
if policyRejected <= 10 || policyRejected%100 == 0 { |
|
log.W.F("[HTTP API IMPORT] Policy check failed for event %x: %v", ev.ID, policyErr) |
|
} |
|
continue |
|
} |
|
if !allowed { |
|
ev.Free() |
|
policyRejected++ |
|
// Don't log individual policy rejections (too noisy) |
|
continue |
|
} |
|
} |
|
|
|
// Apply rate limiting before write operation if limiter is configured |
|
if d.rateLimiter != nil && d.rateLimiter.IsEnabled() { |
|
d.rateLimiter.Wait(ctx, WriteOpType) |
|
} |
|
|
|
if _, err := d.SaveEvent(ctx, ev); err != nil { |
|
// return the pooled buffer on error paths too |
|
ev.Free() |
|
saveErrors++ |
|
// Only log save errors if they're frequent (not spam) |
|
if saveErrors <= 10 || saveErrors%100 == 0 { |
|
log.W.F("[HTTP API IMPORT] Failed to save event #%d: %v", saveErrors, err) |
|
} |
|
continue |
|
} |
|
|
|
// return the pooled buffer after successful save |
|
ev.Free() |
|
b = nil |
|
count++ |
|
|
|
// Progress logging every logInterval with detailed information |
|
if time.Since(lastLogTime) >= logInterval { |
|
elapsed := time.Since(startTime) |
|
eventsPerSec := float64(count) / elapsed.Seconds() |
|
mbRead := float64(bytesRead) / 1024 / 1024 |
|
mbPerSec := mbRead / elapsed.Seconds() |
|
|
|
// Calculate progress percentage if we have total bytes |
|
progressPercent := "" |
|
eta := "" |
|
if totalBytes > 0 { |
|
percent := float64(bytesReadTotal) / float64(totalBytes) * 100 |
|
if percent > 100 { |
|
percent = 100 |
|
} |
|
progressPercent = fmt.Sprintf(" | Progress: %.1f%%", percent) |
|
|
|
// Estimate time remaining |
|
if percent > 0 && percent < 100 && eventsPerSec > 0 { |
|
remainingBytes := totalBytes - bytesReadTotal |
|
remainingMB := float64(remainingBytes) / 1024 / 1024 |
|
if mbPerSec > 0 { |
|
remainingSeconds := remainingMB / mbPerSec |
|
eta = fmt.Sprintf(" | ETA: ~%v", time.Duration(remainingSeconds)*time.Second) |
|
} |
|
} |
|
} |
|
|
|
log.I.F("[HTTP API IMPORT] Progress: %d events saved | %.2f MB read | %.0f events/sec | %.2f MB/sec%s%s", |
|
count, mbRead, eventsPerSec, mbPerSec, progressPercent, eta) |
|
|
|
// Show error summary if there are errors |
|
if unmarshalErrors > 0 || saveErrors > 0 || policyRejected > 0 { |
|
log.W.F("[HTTP API IMPORT] Errors so far: %d unmarshal, %d save, %d policy rejected", |
|
unmarshalErrors, saveErrors, policyRejected) |
|
} |
|
|
|
lastLogTime = time.Now() |
|
debug.FreeOSMemory() |
|
} |
|
} |
|
|
|
// Final summary with actionable information |
|
elapsed := time.Since(startTime) |
|
eventsPerSec := float64(count) / elapsed.Seconds() |
|
mbRead := float64(bytesRead) / 1024 / 1024 |
|
mbPerSec := mbRead / elapsed.Seconds() |
|
|
|
log.I.F("[HTTP API IMPORT] ================================================") |
|
log.I.F("[HTTP API IMPORT] Processing complete") |
|
log.I.F("[HTTP API IMPORT] Events saved: %d | Data processed: %.2f MB | Time: %v", |
|
count, mbRead, elapsed.Round(time.Millisecond)) |
|
log.I.F("[HTTP API IMPORT] Performance: %.0f events/sec | %.2f MB/sec", eventsPerSec, mbPerSec) |
|
|
|
// Show statistics |
|
if kind3Count > 0 || largeEventCount > 0 { |
|
log.I.F("[HTTP API IMPORT] Event types: %d kind-3 (follow lists), %d large events (>100KB)", |
|
kind3Count, largeEventCount) |
|
} |
|
|
|
// Show error summary with actionable information |
|
hasErrors := false |
|
if unmarshalErrors > 0 { |
|
log.W.F("[HTTP API IMPORT] WARNING: %d events failed to unmarshal", unmarshalErrors) |
|
log.W.F("[HTTP API IMPORT] Action: Check if file format is valid JSONL (one JSON object per line)") |
|
hasErrors = true |
|
} |
|
if saveErrors > 0 { |
|
log.W.F("[HTTP API IMPORT] WARNING: %d events failed to save", saveErrors) |
|
log.W.F("[HTTP API IMPORT] Action: Check database disk space and permissions") |
|
hasErrors = true |
|
} |
|
if policyRejected > 0 { |
|
log.W.F("[HTTP API IMPORT] WARNING: %d events rejected by policy", policyRejected) |
|
log.W.F("[HTTP API IMPORT] Action: Review ACL/policy settings if this is unexpected") |
|
hasErrors = true |
|
} |
|
if skipped > 0 { |
|
log.I.F("[HTTP API IMPORT] Info: %d empty lines skipped (this is normal)", skipped) |
|
} |
|
|
|
if !hasErrors { |
|
log.I.F("[HTTP API IMPORT] Result: All events imported successfully") |
|
log.I.F("[HTTP API IMPORT] Action: Events are now available on the relay") |
|
} else { |
|
log.W.F("[HTTP API IMPORT] Result: Import completed with errors (see warnings above)") |
|
log.W.F("[HTTP API IMPORT] Action: Review error messages and take appropriate action") |
|
} |
|
log.I.F("[HTTP API IMPORT] ================================================") |
|
|
|
if err := scan.Err(); err != nil { |
|
log.E.F("[HTTP API IMPORT] ERROR: Scanner error: %v", err) |
|
log.E.F("[HTTP API IMPORT] Action: Check file integrity and format") |
|
return err |
|
} |
|
|
|
return nil |
|
}
|
|
|