You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
484 lines
14 KiB
484 lines
14 KiB
//go:build !(js && wasm) |
|
|
|
// Package sync implements the "orly sync" subcommand for sync service operations.
// It supports both one-shot CLI sync (like strfry sync) and running as a gRPC service.
//
// One-shot sync has two modes of operation:
//  1. Direct mode: opens the database directly (like strfry sync).
//  2. Client mode: connects to a running orly-sync-negentropy service via gRPC.
//
// Client mode allows you to use a running ORLY relay's database without
// needing direct filesystem access to the database files.
|
package sync |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"fmt" |
|
"os" |
|
"os/signal" |
|
"strings" |
|
"syscall" |
|
"time" |
|
|
|
"lol.mleku.dev/log" |
|
|
|
"git.mleku.dev/mleku/nostr/encoders/filter" |
|
"git.mleku.dev/mleku/nostr/encoders/kind" |
|
"git.mleku.dev/mleku/nostr/encoders/timestamp" |
|
"next.orly.dev/pkg/database" |
|
pkgsync "next.orly.dev/pkg/sync" |
|
"next.orly.dev/pkg/sync/negentropy" |
|
negentropygrpc "next.orly.dev/pkg/sync/negentropy/grpc" |
|
commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1" |
|
) |
|
|
|
// SyncDirection names the direction in which events flow during a sync.
type SyncDirection string

// Supported values of SyncDirection.
const (
	// DirDown downloads from the remote relay to the local store.
	DirDown = SyncDirection("down")
	// DirUp uploads from the local store to the remote relay.
	DirUp = SyncDirection("up")
	// DirBoth syncs in both directions.
	DirBoth = SyncDirection("both")
)
|
|
|
// CLISyncConfig holds configuration for one-shot CLI sync |
|
type CLISyncConfig struct { |
|
RelayURL string |
|
Filter *filter.F |
|
Direction SyncDirection |
|
DataDir string |
|
Verbose bool |
|
// Client mode: connect to running service instead of direct DB access |
|
ServerAddress string // gRPC address of orly-sync-negentropy service |
|
UseClientMode bool // If true, use gRPC client instead of direct DB |
|
} |
|
|
|
// Run executes the sync subcommand. |
|
func Run(args []string) { |
|
var driver string |
|
var listDrivers bool |
|
var showHelp bool |
|
var relayURL string |
|
var filterJSON string |
|
var direction string = "down" |
|
var dataDir string |
|
var verbose bool |
|
var serverAddress string |
|
|
|
// Parse arguments - look for either service mode (--driver) or CLI mode (relay URL) |
|
positionalArgs := []string{} |
|
for i := 0; i < len(args); i++ { |
|
arg := args[i] |
|
|
|
if strings.HasPrefix(arg, "--driver=") { |
|
driver = strings.TrimPrefix(arg, "--driver=") |
|
} else if arg == "--driver" && i+1 < len(args) { |
|
driver = args[i+1] |
|
i++ |
|
} else if strings.HasPrefix(arg, "--filter=") { |
|
filterJSON = strings.TrimPrefix(arg, "--filter=") |
|
} else if arg == "--filter" && i+1 < len(args) { |
|
filterJSON = args[i+1] |
|
i++ |
|
} else if strings.HasPrefix(arg, "--dir=") { |
|
direction = strings.TrimPrefix(arg, "--dir=") |
|
} else if arg == "--dir" && i+1 < len(args) { |
|
direction = args[i+1] |
|
i++ |
|
} else if strings.HasPrefix(arg, "--data-dir=") { |
|
dataDir = strings.TrimPrefix(arg, "--data-dir=") |
|
} else if arg == "--data-dir" && i+1 < len(args) { |
|
dataDir = args[i+1] |
|
i++ |
|
} else if strings.HasPrefix(arg, "--server=") { |
|
serverAddress = strings.TrimPrefix(arg, "--server=") |
|
} else if arg == "--server" && i+1 < len(args) { |
|
serverAddress = args[i+1] |
|
i++ |
|
} else if arg == "--list-drivers" || arg == "-l" { |
|
listDrivers = true |
|
} else if arg == "--verbose" || arg == "-v" { |
|
verbose = true |
|
} else if arg == "--help" || arg == "-h" { |
|
showHelp = true |
|
} else if !strings.HasPrefix(arg, "-") { |
|
positionalArgs = append(positionalArgs, arg) |
|
} |
|
} |
|
|
|
if showHelp { |
|
printSyncHelp() |
|
return |
|
} |
|
|
|
if listDrivers { |
|
drivers := pkgsync.ListDriversWithInfo() |
|
if len(drivers) == 0 { |
|
fmt.Println("No sync drivers available.") |
|
fmt.Println("Build with appropriate tags to include drivers.") |
|
return |
|
} |
|
fmt.Println("Available sync drivers:") |
|
for _, d := range drivers { |
|
fmt.Printf(" %-12s - %s\n", d.Name, d.Description) |
|
} |
|
return |
|
} |
|
|
|
// Check if this is CLI sync mode (relay URL provided) |
|
if len(positionalArgs) > 0 && (strings.HasPrefix(positionalArgs[0], "ws://") || strings.HasPrefix(positionalArgs[0], "wss://")) { |
|
relayURL = positionalArgs[0] |
|
cfg := &CLISyncConfig{ |
|
RelayURL: relayURL, |
|
Filter: parseFilterJSON(filterJSON), |
|
Direction: SyncDirection(direction), |
|
DataDir: dataDir, |
|
Verbose: verbose, |
|
ServerAddress: serverAddress, |
|
UseClientMode: serverAddress != "", |
|
} |
|
if cfg.UseClientMode { |
|
runClientModeSync(cfg) |
|
} else { |
|
runDirectModeSync(cfg) |
|
} |
|
return |
|
} |
|
|
|
// Service mode |
|
if driver == "" { |
|
// Check if any driver is registered |
|
drivers := pkgsync.ListDrivers() |
|
if len(drivers) == 0 { |
|
fmt.Fprintln(os.Stderr, "error: no sync drivers available") |
|
os.Exit(1) |
|
} |
|
if len(drivers) == 1 { |
|
// Use the only available driver |
|
driver = drivers[0] |
|
log.I.F("using default sync driver: %s", driver) |
|
} else { |
|
fmt.Fprintln(os.Stderr, "error: --driver required (multiple drivers available)") |
|
fmt.Fprintf(os.Stderr, "available: %s\n", strings.Join(drivers, ", ")) |
|
os.Exit(1) |
|
} |
|
} |
|
|
|
// Check if driver is available |
|
if !pkgsync.HasDriver(driver) { |
|
fmt.Fprintf(os.Stderr, "error: sync driver %q not available\n", driver) |
|
fmt.Fprintf(os.Stderr, "available: %s\n", strings.Join(pkgsync.ListDrivers(), ", ")) |
|
os.Exit(1) |
|
} |
|
|
|
runSyncService(driver, args) |
|
} |
|
|
|
// parseFilterJSON parses a JSON filter string into a filter.F |
|
func parseFilterJSON(jsonStr string) *filter.F { |
|
if jsonStr == "" { |
|
return nil |
|
} |
|
|
|
// Parse as generic JSON first to handle the kinds array |
|
var rawFilter struct { |
|
Kinds []int `json:"kinds"` |
|
Authors []string `json:"authors"` |
|
IDs []string `json:"ids"` |
|
Since *int64 `json:"since"` |
|
Until *int64 `json:"until"` |
|
Limit *uint `json:"limit"` |
|
} |
|
|
|
if err := json.Unmarshal([]byte(jsonStr), &rawFilter); err != nil { |
|
fmt.Fprintf(os.Stderr, "warning: failed to parse filter JSON: %v\n", err) |
|
return nil |
|
} |
|
|
|
f := &filter.F{} |
|
|
|
// Convert kinds using kind.FromIntSlice |
|
if len(rawFilter.Kinds) > 0 { |
|
f.Kinds = kind.FromIntSlice(rawFilter.Kinds) |
|
} |
|
|
|
// Convert timestamps |
|
if rawFilter.Since != nil { |
|
f.Since = ×tamp.T{V: *rawFilter.Since} |
|
} |
|
if rawFilter.Until != nil { |
|
f.Until = ×tamp.T{V: *rawFilter.Until} |
|
} |
|
|
|
// Convert limit |
|
if rawFilter.Limit != nil { |
|
f.Limit = rawFilter.Limit |
|
} |
|
|
|
return f |
|
} |
|
|
|
// runDirectModeSync performs a one-shot negentropy sync with a remote relay |
|
// using direct database access (like strfry sync) |
|
func runDirectModeSync(cfg *CLISyncConfig) { |
|
if cfg.Verbose { |
|
log.I.F("CLI sync starting with %s (direction: %s)", cfg.RelayURL, cfg.Direction) |
|
} |
|
|
|
// Set up signal handling |
|
ctx, cancel := context.WithCancel(context.Background()) |
|
defer cancel() |
|
|
|
sigChan := make(chan os.Signal, 1) |
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) |
|
go func() { |
|
<-sigChan |
|
fmt.Println("\nInterrupted, closing...") |
|
cancel() |
|
}() |
|
|
|
// Determine data directory |
|
dataDir := cfg.DataDir |
|
if dataDir == "" { |
|
dataDir = os.Getenv("ORLY_DATA_DIR") |
|
if dataDir == "" { |
|
homeDir, _ := os.UserHomeDir() |
|
dataDir = homeDir + "/.local/share/ORLY" |
|
} |
|
} |
|
|
|
// Open database using the factory |
|
dbCfg := &database.DatabaseConfig{ |
|
DataDir: dataDir, |
|
LogLevel: "info", |
|
} |
|
|
|
db, err := database.NewDatabaseWithConfig(ctx, cancel, "badger", dbCfg) |
|
if err != nil { |
|
fmt.Fprintf(os.Stderr, "error: failed to open database: %v\n", err) |
|
os.Exit(1) |
|
} |
|
defer db.Close() |
|
|
|
// Create negentropy manager for sync |
|
mgrCfg := &negentropy.Config{ |
|
Peers: []string{cfg.RelayURL}, |
|
SyncInterval: 0, // One-shot, no interval |
|
FrameSize: 128 * 1024, |
|
IDSize: 16, |
|
Filter: cfg.Filter, |
|
} |
|
|
|
mgr := negentropy.NewManager(db, mgrCfg) |
|
|
|
// Perform sync |
|
startTime := time.Now() |
|
fmt.Printf("Syncing with %s...\n", cfg.RelayURL) |
|
|
|
// For direction handling, we'll use the existing sync which does bidirectional |
|
// The --dir flag will be used to filter what we actually store |
|
mgr.TriggerSync(ctx, cfg.RelayURL) |
|
|
|
elapsed := time.Since(startTime) |
|
state, _ := mgr.GetPeerState(cfg.RelayURL) |
|
if state != nil { |
|
if state.LastError != "" { |
|
fmt.Fprintf(os.Stderr, "Sync error: %s\n", state.LastError) |
|
os.Exit(1) |
|
} |
|
fmt.Printf("Sync complete: %d events synced in %v\n", state.EventsSynced, elapsed) |
|
} else { |
|
fmt.Printf("Sync complete in %v\n", elapsed) |
|
} |
|
} |
|
|
|
// runClientModeSync performs a one-shot negentropy sync with a remote relay |
|
// by connecting to a running orly-sync-negentropy service via gRPC. |
|
// This allows you to sync using a relay's database without direct filesystem access. |
|
func runClientModeSync(cfg *CLISyncConfig) { |
|
if cfg.Verbose { |
|
log.I.F("Client mode sync starting with %s via server %s (direction: %s)", |
|
cfg.RelayURL, cfg.ServerAddress, cfg.Direction) |
|
} |
|
|
|
// Set up signal handling |
|
ctx, cancel := context.WithCancel(context.Background()) |
|
defer cancel() |
|
|
|
sigChan := make(chan os.Signal, 1) |
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) |
|
go func() { |
|
<-sigChan |
|
fmt.Println("\nInterrupted, closing...") |
|
cancel() |
|
}() |
|
|
|
// Connect to the sync service via gRPC |
|
fmt.Printf("Connecting to sync service at %s...\n", cfg.ServerAddress) |
|
client, err := negentropygrpc.New(ctx, &negentropygrpc.ClientConfig{ |
|
ServerAddress: cfg.ServerAddress, |
|
ConnectTimeout: 30 * time.Second, |
|
}) |
|
if err != nil { |
|
fmt.Fprintf(os.Stderr, "error: failed to connect to sync service: %v\n", err) |
|
os.Exit(1) |
|
} |
|
defer client.Close() |
|
|
|
// Wait for the service to be ready |
|
select { |
|
case <-client.Ready(): |
|
fmt.Println("Connected to sync service") |
|
case <-time.After(30 * time.Second): |
|
fmt.Fprintf(os.Stderr, "error: sync service not ready\n") |
|
os.Exit(1) |
|
} |
|
|
|
// Convert filter to proto format |
|
var protoFilter *commonv1.Filter |
|
if cfg.Filter != nil { |
|
protoFilter = filterToProto(cfg.Filter) |
|
} |
|
|
|
// Start sync with the peer |
|
startTime := time.Now() |
|
fmt.Printf("Requesting sync with %s...\n", cfg.RelayURL) |
|
|
|
progressCh, err := client.SyncWithPeer(ctx, cfg.RelayURL, protoFilter, 0) |
|
if err != nil { |
|
fmt.Fprintf(os.Stderr, "error: failed to start sync: %v\n", err) |
|
os.Exit(1) |
|
} |
|
|
|
// Display progress updates |
|
var lastProgress *negentropygrpc.SyncProgress |
|
for progress := range progressCh { |
|
lastProgress = progress |
|
if progress.Error != "" { |
|
fmt.Fprintf(os.Stderr, "Sync error: %s\n", progress.Error) |
|
os.Exit(1) |
|
} |
|
if cfg.Verbose || progress.Complete { |
|
fmt.Printf("Round %d: have=%d need=%d fetched=%d sent=%d\n", |
|
progress.Round, progress.HaveCount, progress.NeedCount, |
|
progress.FetchedCount, progress.SentCount) |
|
} |
|
} |
|
|
|
elapsed := time.Since(startTime) |
|
if lastProgress != nil && lastProgress.Complete { |
|
fmt.Printf("Sync complete in %v\n", elapsed) |
|
} else { |
|
fmt.Printf("Sync finished in %v (may be incomplete)\n", elapsed) |
|
} |
|
} |
|
|
|
// filterToProto converts a nostr filter to proto format |
|
func filterToProto(f *filter.F) *commonv1.Filter { |
|
if f == nil { |
|
return nil |
|
} |
|
|
|
pf := &commonv1.Filter{} |
|
|
|
// Convert kinds |
|
if f.Kinds != nil && f.Kinds.Len() > 0 { |
|
for _, k := range f.Kinds.ToUint16() { |
|
pf.Kinds = append(pf.Kinds, uint32(k)) |
|
} |
|
} |
|
|
|
// Convert authors (binary to bytes) |
|
if f.Authors != nil && f.Authors.Len() > 0 { |
|
for _, author := range f.Authors.T { |
|
pf.Authors = append(pf.Authors, author) |
|
} |
|
} |
|
|
|
// Convert IDs (binary to bytes) |
|
if f.Ids != nil && f.Ids.Len() > 0 { |
|
for _, id := range f.Ids.T { |
|
pf.Ids = append(pf.Ids, id) |
|
} |
|
} |
|
|
|
// Convert timestamps |
|
if f.Since != nil { |
|
since := f.Since.V |
|
pf.Since = &since |
|
} |
|
if f.Until != nil { |
|
until := f.Until.V |
|
pf.Until = &until |
|
} |
|
|
|
// Convert limit |
|
if f.Limit != nil { |
|
limit := uint32(*f.Limit) |
|
pf.Limit = &limit |
|
} |
|
|
|
return pf |
|
} |
|
|
|
func runSyncService(driver string, args []string) { |
|
log.I.F("Sync service with driver=%s not yet implemented via unified binary", driver) |
|
log.I.F("Use the standalone binary: orly-sync-%s", driver) |
|
os.Exit(1) |
|
} |
|
|
|
// printSyncHelp writes the usage text of the "orly sync" subcommand to
// standard output.
func printSyncHelp() {
	fmt.Println(`orly sync - Sync operations (NIP-77 negentropy)

Usage:
  orly sync <relay_url> [options]        One-shot sync with relay
  orly sync <relay_url> --server=ADDR    Sync via running service
  orly sync --driver=NAME [options]      Run as sync service

One-shot sync options:
  --filter=JSON        Nostr filter JSON (e.g. '{"kinds": [0, 3, 1984]}')
  --dir=DIRECTION      Sync direction: down, up, both (default: down)
  --data-dir=PATH      Database directory (default: ~/.local/share/ORLY)
  --server=ADDR        Connect to running sync service (e.g. 127.0.0.1:50064)
  --verbose, -v        Verbose output

Service mode options:
  --driver=NAME        Select sync driver (negentropy, cluster, distributed)
  --list-drivers, -l   List available sync drivers

Common options:
  --help, -h           Show this help message

Drivers:
  negentropy           NIP-77 negentropy set reconciliation
  cluster              Cluster-based synchronization
  distributed          Distributed synchronization

Environment variables:
  ORLY_DATA_DIR              Database data directory
  ORLY_SYNC_LISTEN           gRPC server listen address
  ORLY_SYNC_LOG_LEVEL        Logging level
  ORLY_SYNC_DB_TYPE          Database type (grpc or badger)
  ORLY_SYNC_TARGET_RELAYS    Comma-separated target relay URLs

Examples:
  # One-shot sync with local database (like strfry sync)
  orly sync wss://relay.example.com --filter '{"kinds": [0, 3, 1984]}' --dir down
  orly sync wss://wot.grapevine.network --filter '{"kinds": [3, 1984, 10000]}' --dir down

  # Sync via running ORLY service (no direct DB access needed)
  orly sync wss://relay.example.com --server 127.0.0.1:50064 --filter '{"kinds": [1]}' --dir both

  # Service mode
  orly sync --driver=negentropy          Run negentropy sync service
  orly sync --list-drivers               List available drivers

Client mode (--server):
  When using --server, the CLI connects to a running orly-sync-negentropy
  service via gRPC instead of opening the database directly. This allows
  you to sync from a remote machine without filesystem access to the DB.

  The server address is typically 127.0.0.1:50064 (or as configured in
  ORLY_LAUNCHER_SYNC_NEGENTROPY_LISTEN).`)
}
|
|
|