diff --git a/pkg/database/get-indexes-for-event.go b/pkg/database/get-indexes-for-event.go index 72757bb..3043d68 100644 --- a/pkg/database/get-indexes-for-event.go +++ b/pkg/database/get-indexes-for-event.go @@ -143,22 +143,26 @@ func GetIndexesForEvent(ev *event.E, serial uint64) ( } } } - kind := new(Uint16) - kind.Set(uint16(ev.Kind)) + kindType := new(Uint16) + kindType.Set(uint16(ev.Kind)) // Kind index - kindIndex := indexes.KindEnc(kind, createdAt, ser) + kindIndex := indexes.KindEnc(kindType, createdAt, ser) if err = appendIndexBytes(&idxs, kindIndex, buf); chk.E(err) { return } // KindPubkey index // Using the correct parameters based on the function signature kindPubkeyIndex := indexes.KindPubkeyEnc( - kind, pubHash, createdAt, ser, + kindType, pubHash, createdAt, ser, ) if err = appendIndexBytes(&idxs, kindPubkeyIndex, buf); chk.E(err) { return } + // NOTE: AddressableEvent index for parameterized replaceable events (kinds 30000-39999) + // is written separately in save-event.go with the serial as value for O(1) direct lookup. + // This enables fast NIP-33 queries by pubkey + kind + d-tag. + // Word token indexes (from content) if len(ev.Content) > 0 { for _, h := range TokenHashes(ev.Content) { diff --git a/pkg/database/query-addressable.go b/pkg/database/query-addressable.go new file mode 100644 index 0000000..5a64e3e --- /dev/null +++ b/pkg/database/query-addressable.go @@ -0,0 +1,149 @@ +//go:build !(js && wasm) + +package database + +import ( + "bytes" + + "github.com/dgraph-io/badger/v4" + "lol.mleku.dev/chk" + "lol.mleku.dev/log" + "next.orly.dev/pkg/database/bufpool" + "next.orly.dev/pkg/database/indexes" + "next.orly.dev/pkg/database/indexes/types" + "git.mleku.dev/mleku/nostr/encoders/filter" + "git.mleku.dev/mleku/nostr/encoders/kind" +) + +// IsAddressableEventQuery checks if a filter matches the NIP-33 addressable event +// query pattern: exactly one kind (30000-39999), one author, and one d-tag. +// This pattern uniquely identifies a single parameterized replaceable event. +func IsAddressableEventQuery(f *filter.F) bool { + // Must have exactly one kind + if f.Kinds == nil || f.Kinds.Len() != 1 { + return false + } + // Kind must be parameterized replaceable (30000-39999) + kindVal := f.Kinds.K[0].K + if !kind.IsParameterizedReplaceable(kindVal) { + return false + } + // Must have exactly one author + if f.Authors == nil || f.Authors.Len() != 1 { + return false + } + // Must have a d-tag filter + if f.Tags == nil { + return false + } + dTagFilter := f.Tags.GetFirst([]byte("#d")) + if dTagFilter == nil || dTagFilter.Len() != 2 { + return false + } + // Must not have IDs filter (would bypass this optimization) + if f.Ids != nil && f.Ids.Len() > 0 { + return false + } + return true +} + +// QueryForAddressableEvent performs a direct O(1) lookup for a NIP-33 parameterized +// replaceable event using the AddressableEvent index. +// Returns the serial if found, nil if not found, or an error. +func (d *D) QueryForAddressableEvent(f *filter.F) (serial *types.Uint40, err error) { + if !IsAddressableEventQuery(f) { + return nil, nil + } + + // Extract components from filter + kindVal := f.Kinds.K[0].K + author := f.Authors.T[0] + dTagFilter := f.Tags.GetFirst([]byte("#d")) + dTagValue := dTagFilter.T[1] + + // Build pubkey hash + pubHash := new(types.PubHash) + if err = pubHash.FromPubkey(author); chk.E(err) { + return nil, err + } + + // Build kind type + kindType := new(types.Uint16) + kindType.Set(kindVal) + + // Build d-tag hash + dTagHash := new(types.Ident) + dTagHash.FromIdent(dTagValue) + + // Build the AddressableEvent index key + aevKey := indexes.AddressableEventEnc(pubHash, kindType, dTagHash) + keyBuf := bufpool.GetSmall() + defer bufpool.PutSmall(keyBuf) + if err = aevKey.MarshalWrite(keyBuf); chk.E(err) { + return nil, err + } + keyBytes := bufpool.CopyBytes(keyBuf) + + // Direct key lookup - O(1) + err = d.View(func(txn *badger.Txn) error { + item, err := txn.Get(keyBytes) + if err == badger.ErrKeyNotFound { + // Not found - this is not an error + return nil + } + if err != nil { + return err + } + + // Read the serial from the value + return item.Value(func(val []byte) error { + if len(val) < 5 { + log.W.F("QueryForAddressableEvent: invalid value length %d", len(val)) + return nil + } + serial = new(types.Uint40) + rdr := bytes.NewReader(val) + if err := serial.UnmarshalRead(rdr); err != nil { + log.W.F("QueryForAddressableEvent: failed to read serial: %v", err) + serial = nil + return nil + } + return nil + }) + }) + + if err != nil { + return nil, err + } + + if serial != nil { + log.T.F("QueryForAddressableEvent: found serial %d for kind=%d author=%x d=%s", + serial.Get(), kindVal, author[:8], string(dTagValue)) + } + + return serial, nil +} + +// BuildAddressableEventKey builds the key for an AddressableEvent index entry. +// This is used by both save-event.go (for writing) and deletion (for cleanup). +func BuildAddressableEventKey(pubkey []byte, eventKind uint16, dTagValue []byte) ([]byte, error) { + pubHash := new(types.PubHash) + if err := pubHash.FromPubkey(pubkey); err != nil { + return nil, err + } + + kindType := new(types.Uint16) + kindType.Set(eventKind) + + dTagHash := new(types.Ident) + dTagHash.FromIdent(dTagValue) + + aevKey := indexes.AddressableEventEnc(pubHash, kindType, dTagHash) + keyBuf := bufpool.GetSmall() + defer bufpool.PutSmall(keyBuf) + if err := aevKey.MarshalWrite(keyBuf); err != nil { + return nil, err + } + + return bufpool.CopyBytes(keyBuf), nil +} diff --git a/pkg/database/query-addressable_test.go b/pkg/database/query-addressable_test.go new file mode 100644 index 0000000..abdcc1e --- /dev/null +++ b/pkg/database/query-addressable_test.go @@ -0,0 +1,326 @@ +//go:build !(js && wasm) + +package database + +import ( + "context" + "os" + "testing" + "time" + + "git.mleku.dev/mleku/nostr/encoders/event" + "git.mleku.dev/mleku/nostr/encoders/filter" + "git.mleku.dev/mleku/nostr/encoders/kind" + "git.mleku.dev/mleku/nostr/encoders/tag" + "git.mleku.dev/mleku/nostr/interfaces/signer/p8k" + "lol.mleku.dev/chk" +) + +func TestIsAddressableEventQuery(t *testing.T) { + // Generate a test keypair + signer := p8k.MustNew() + if err := signer.Generate(); chk.E(err) { + t.Fatal(err) + } + pub := signer.Pub() + + tests := []struct { + name string + filter *filter.F + expected bool + }{ + { + name: "valid NIP-33 query - kind 30000", + filter: &filter.F{ + Kinds: kind.NewS(kind.New(30000)), + Authors: tag.NewFromBytesSlice(pub), + Tags: tag.NewS(tag.NewFromAny("#d", []byte("test-d-tag"))), + }, + expected: true, + }, + { + name: "valid NIP-33 query - kind 30382", + filter: &filter.F{ + Kinds: kind.NewS(kind.New(30382)), + Authors: tag.NewFromBytesSlice(pub), + Tags: tag.NewS(tag.NewFromAny("#d", []byte("some-identifier"))), + }, + expected: true, + }, + { + name: "invalid - kind 1 (not parameterized replaceable)", + filter: &filter.F{ + Kinds: kind.NewS(kind.New(1)), + Authors: tag.NewFromBytesSlice(pub), + Tags: tag.NewS(tag.NewFromAny("#d", []byte("test"))), + }, + expected: false, + }, + { + name: "invalid - kind 10000 (replaceable, not parameterized)", + filter: &filter.F{ + Kinds: kind.NewS(kind.New(10000)), + Authors: tag.NewFromBytesSlice(pub), + Tags: tag.NewS(tag.NewFromAny("#d", []byte("test"))), + }, + expected: false, + }, + { + name: "invalid - missing d-tag", + filter: &filter.F{ + Kinds: kind.NewS(kind.New(30000)), + Authors: tag.NewFromBytesSlice(pub), + }, + expected: false, + }, + { + name: "invalid - multiple kinds", + filter: &filter.F{ + Kinds: kind.NewS(kind.New(30000), kind.New(30001)), + Authors: tag.NewFromBytesSlice(pub), + Tags: tag.NewS(tag.NewFromAny("#d", []byte("test"))), + }, + expected: false, + }, + { + name: "invalid - no authors", + filter: &filter.F{ + Kinds: kind.NewS(kind.New(30000)), + Tags: tag.NewS(tag.NewFromAny("#d", []byte("test"))), + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := IsAddressableEventQuery(tt.filter) + if result != tt.expected { + t.Errorf("IsAddressableEventQuery() = %v, want %v", result, tt.expected) + } + }) + } +} + +func TestQueryForAddressableEvent(t *testing.T) { + // Create temporary database + tempDir, err := os.MkdirTemp("", "test-addressable-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := New(ctx, cancel, tempDir, "info") + if err != nil { + t.Fatalf("failed to create database: %v", err) + } + defer db.Close() + + // Generate a test keypair + signer := p8k.MustNew() + if err := signer.Generate(); chk.E(err) { + t.Fatal(err) + } + pub := signer.Pub() + + // Create and save a parameterized replaceable event (kind 30382) + dTagValue := []byte("test-identifier-12345") + ev := &event.E{ + Kind: 30382, + Pubkey: pub, + CreatedAt: time.Now().Unix(), + Content: []byte("Test content for addressable event"), + Tags: tag.NewS(tag.NewFromAny("d", dTagValue)), + } + + // Sign the event + if err := ev.Sign(signer); err != nil { + t.Fatalf("failed to sign event: %v", err) + } + + // Save the event + _, err = db.SaveEvent(ctx, ev) + if err != nil { + t.Fatalf("failed to save event: %v", err) + } + + // Query using the fast path + queryFilter := &filter.F{ + Kinds: kind.NewS(kind.New(30382)), + Authors: tag.NewFromBytesSlice(pub), + Tags: tag.NewS(tag.NewFromAny("#d", dTagValue)), + } + + // Test IsAddressableEventQuery + if !IsAddressableEventQuery(queryFilter) { + t.Errorf("Expected IsAddressableEventQuery to return true for valid NIP-33 filter") + } + + // Test QueryForAddressableEvent + serial, err := db.QueryForAddressableEvent(queryFilter) + if err != nil { + t.Fatalf("QueryForAddressableEvent failed: %v", err) + } + if serial == nil { + t.Fatalf("QueryForAddressableEvent returned nil serial, expected to find event") + } + + // Fetch the event and verify it matches + fetchedEv, err := db.FetchEventBySerial(serial) + if err != nil { + t.Fatalf("FetchEventBySerial failed: %v", err) + } + if fetchedEv == nil { + t.Fatalf("FetchEventBySerial returned nil event") + } + + // Verify it's the same event + if string(fetchedEv.ID[:]) != string(ev.ID[:]) { + t.Errorf("Fetched event ID doesn't match: got %x, want %x", fetchedEv.ID, ev.ID) + } + + // Test that QueryEvents also uses the fast path + evs, err := db.QueryEvents(ctx, queryFilter) + if err != nil { + t.Fatalf("QueryEvents failed: %v", err) + } + if len(evs) != 1 { + t.Fatalf("QueryEvents returned %d events, expected 1", len(evs)) + } + if string(evs[0].ID[:]) != string(ev.ID[:]) { + t.Errorf("QueryEvents returned wrong event: got %x, want %x", evs[0].ID, ev.ID) + } + + t.Logf("Successfully queried addressable event via fast path: kind=%d, d=%s", ev.Kind, string(dTagValue)) +} + +func TestQueryForAddressableEventNotFound(t *testing.T) { + // Create temporary database + tempDir, err := os.MkdirTemp("", "test-addressable-notfound-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := New(ctx, cancel, tempDir, "info") + if err != nil { + t.Fatalf("failed to create database: %v", err) + } + defer db.Close() + + // Generate a test keypair + signer := p8k.MustNew() + if err := signer.Generate(); chk.E(err) { + t.Fatal(err) + } + pub := signer.Pub() + + // Query for non-existent event + queryFilter := &filter.F{ + Kinds: kind.NewS(kind.New(30000)), + Authors: tag.NewFromBytesSlice(pub), + Tags: tag.NewS(tag.NewFromAny("#d", []byte("non-existent-d-tag"))), + } + + serial, err := db.QueryForAddressableEvent(queryFilter) + if err != nil { + t.Fatalf("QueryForAddressableEvent failed: %v", err) + } + if serial != nil { + t.Errorf("Expected nil serial for non-existent event, got %d", serial.Get()) + } +} + +func TestAddressableEventReplacement(t *testing.T) { + // Create temporary database + tempDir, err := os.MkdirTemp("", "test-addressable-replace-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := New(ctx, cancel, tempDir, "info") + if err != nil { + t.Fatalf("failed to create database: %v", err) + } + defer db.Close() + + // Generate a test keypair + signer := p8k.MustNew() + if err := signer.Generate(); chk.E(err) { + t.Fatal(err) + } + pub := signer.Pub() + + dTagValue := []byte("replaceable-event") + baseTime := time.Now().Unix() + + // Create and save the first event + ev1 := &event.E{ + Kind: 30000, + Pubkey: pub, + CreatedAt: baseTime, + Content: []byte("First version"), + Tags: tag.NewS(tag.NewFromAny("d", dTagValue)), + } + if err := ev1.Sign(signer); err != nil { + t.Fatalf("failed to sign event 1: %v", err) + } + if _, err := db.SaveEvent(ctx, ev1); err != nil { + t.Fatalf("failed to save event 1: %v", err) + } + + // Create and save a newer replacement event + ev2 := &event.E{ + Kind: 30000, + Pubkey: pub, + CreatedAt: baseTime + 1000, // Newer + Content: []byte("Second version - replacement"), + Tags: tag.NewS(tag.NewFromAny("d", dTagValue)), + } + if err := ev2.Sign(signer); err != nil { + t.Fatalf("failed to sign event 2: %v", err) + } + if _, err := db.SaveEvent(ctx, ev2); err != nil { + t.Fatalf("failed to save event 2: %v", err) + } + + // Query and verify we get the newer event via fast path + queryFilter := &filter.F{ + Kinds: kind.NewS(kind.New(30000)), + Authors: tag.NewFromBytesSlice(pub), + Tags: tag.NewS(tag.NewFromAny("#d", dTagValue)), + } + + serial, err := db.QueryForAddressableEvent(queryFilter) + if err != nil { + t.Fatalf("QueryForAddressableEvent failed: %v", err) + } + if serial == nil { + t.Fatalf("QueryForAddressableEvent returned nil serial") + } + + fetchedEv, err := db.FetchEventBySerial(serial) + if err != nil { + t.Fatalf("FetchEventBySerial failed: %v", err) + } + + // Should be the second (newer) event + if string(fetchedEv.ID[:]) != string(ev2.ID[:]) { + t.Errorf("Expected to get newer event (ev2), got different event") + } + if string(fetchedEv.Content) != "Second version - replacement" { + t.Errorf("Expected content 'Second version - replacement', got '%s'", string(fetchedEv.Content)) + } + + t.Logf("Replacement event correctly indexed: %x", ev2.ID) +} diff --git a/pkg/database/query-events.go b/pkg/database/query-events.go index 3a4179e..2672e98 100644 --- a/pkg/database/query-events.go +++ b/pkg/database/query-events.go @@ -61,6 +61,51 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete } defer d.ReleaseQuerySlot() + // Fast path for NIP-33 addressable event queries (kind + author + d-tag) + // This provides O(1) direct lookup instead of index scanning + if !showAllVersions && IsAddressableEventQuery(f) { + var serial *types.Uint40 + if serial, err = d.QueryForAddressableEvent(f); err != nil { + log.W.F("QueryEvents: addressable event fast path error: %v", err) + // Fall through to regular query path + err = nil + } else if serial != nil { + // Found via fast path - fetch the event + ev, fetchErr := d.FetchEventBySerial(serial) + if fetchErr == nil && ev != nil { + // Apply time filters if present + if f.Since != nil && ev.CreatedAt < f.Since.V { + log.T.F("QueryEvents: addressable event fast path - event before 'since' filter") + return // Return empty result + } + if f.Until != nil && ev.CreatedAt > f.Until.V { + log.T.F("QueryEvents: addressable event fast path - event after 'until' filter") + return // Return empty result + } + // Check deletion status (skip in open relay mode) + if !mode.IsOpen() { + if derr := d.CheckForDeleted(ev, nil); derr != nil { + log.T.F("QueryEvents: addressable event fast path - event deleted: %v", derr) + return // Return empty result + } + } + // Check expiration + if !mode.IsOpen() && CheckExpiration(ev) { + log.T.F("QueryEvents: addressable event fast path - event expired") + return // Return empty result + } + log.T.F("QueryEvents: addressable event fast path SUCCESS - kind=%d d=%s", + ev.Kind, string(ev.Tags.GetFirst([]byte("d")).Value())) + evs = append(evs, ev) + return + } + // Fetch failed - fall through to regular path + log.T.F("QueryEvents: addressable event fast path - fetch failed, falling back") + } + // Not found via fast path - fall through to regular query path + // This handles the case where the index doesn't exist yet (migration) + } + // Determine if we should return multiple versions of replaceable events // based on the limit parameter wantMultipleVersions := showAllVersions || (f.Limit != nil && *f.Limit > 1) diff --git a/pkg/database/save-event.go b/pkg/database/save-event.go index d61ea1b..f34759e 100644 --- a/pkg/database/save-event.go +++ b/pkg/database/save-event.go @@ -73,6 +73,38 @@ func (d *D) WouldReplaceEvent(ev *event.E) (bool, types.Uint40s, error) { return false, nil, nil } + // Fast path for parameterized replaceable events using AddressableEvent index + if kind.IsParameterizedReplaceable(ev.Kind) { + dTag := ev.Tags.GetFirst([]byte("d")) + if dTag == nil { + return false, nil, ErrMissingDTag + } + + // Build filter for direct lookup + f := &filter.F{ + Authors: tag.NewFromBytesSlice(ev.Pubkey), + Kinds: kind.NewS(kind.New(ev.Kind)), + Tags: tag.NewS( + tag.NewFromAny("#d", dTag.Value()), + ), + } + + // Try direct O(1) lookup via AddressableEvent index + if serial, err := d.QueryForAddressableEvent(f); err == nil && serial != nil { + oldEv, ferr := d.FetchEventBySerial(serial) + if ferr == nil && oldEv != nil { + if ev.CreatedAt < oldEv.CreatedAt { + return false, nil, ErrOlderThanExisting + } + // Candidate is newer or same age - it should replace + return true, nil, nil + } + // Fetch failed - fall through to slow path + } + // Index miss (migration pending) - fall through to slow path + } + + // Standard path for replaceable events and fallback for parameterized replaceable var f *filter.F if kind.IsReplaceable(ev.Kind) { f = &filter.F{ @@ -80,7 +112,7 @@ func (d *D) WouldReplaceEvent(ev *event.E) (bool, types.Uint40s, error) { Kinds: kind.NewS(kind.New(ev.Kind)), } } else { - // parameterized replaceable requires 'd' tag + // parameterized replaceable - build filter for slow path dTag := ev.Tags.GetFirst([]byte("d")) if dTag == nil { return false, nil, ErrMissingDTag @@ -319,6 +351,30 @@ func (d *D) SaveEvent(c context.Context, ev *event.E) ( // Cache the event ID mapping d.serialCache.CacheEventId(serial, ev.ID) + // Write AddressableEvent index for parameterized replaceable events (kinds 30000-39999) + // This enables O(1) direct lookup by pubkey + kind + d-tag for NIP-33 queries + // Key: aev|pubkey_hash|kind|dtag_hash, Value: serial (5 bytes) + if kind.IsParameterizedReplaceable(ev.Kind) { + dTag := ev.Tags.GetFirst([]byte("d")) + if dTag != nil { + aevKey, keyErr := BuildAddressableEventKey(ev.Pubkey, ev.Kind, dTag.Value()) + if keyErr == nil { + // Serialize the serial as the value + serBuf := bufpool.GetSmall() + if err = ser.MarshalWrite(serBuf); chk.E(err) { + bufpool.PutSmall(serBuf) + return + } + if err = txn.Set(aevKey, bufpool.CopyBytes(serBuf)); chk.E(err) { + bufpool.PutSmall(serBuf) + return + } + bufpool.PutSmall(serBuf) + log.T.F("SaveEvent: wrote AddressableEvent index for kind=%d d=%s", ev.Kind, string(dTag.Value())) + } + } + } + // Store compact event with cmp prefix // Format: cmp|serial|compact_event_data // This is the only storage format - legacy evt/sev/aev/rev prefixes