You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
256 lines
6.9 KiB
256 lines
6.9 KiB
package neo4j |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
|
|
"next.orly.dev/pkg/database/indexes/types" |
|
"git.mleku.dev/mleku/nostr/encoders/event" |
|
"git.mleku.dev/mleku/nostr/encoders/filter" |
|
"git.mleku.dev/mleku/nostr/encoders/hex" |
|
) |
|
|
|
// SaveEvent stores a Nostr event in the Neo4j database. |
|
// It creates event nodes and relationships for authors, tags, and references. |
|
// This method leverages Neo4j's graph capabilities to model Nostr's social graph naturally. |
|
func (n *N) SaveEvent(c context.Context, ev *event.E) (exists bool, err error) { |
|
eventID := hex.Enc(ev.ID[:]) |
|
|
|
// Check if event already exists |
|
checkCypher := "MATCH (e:Event {id: $id}) RETURN e.id AS id" |
|
checkParams := map[string]any{"id": eventID} |
|
|
|
result, err := n.ExecuteRead(c, checkCypher, checkParams) |
|
if err != nil { |
|
return false, fmt.Errorf("failed to check event existence: %w", err) |
|
} |
|
|
|
// Check if we got a result |
|
ctx := context.Background() |
|
if result.Next(ctx) { |
|
return true, nil // Event already exists |
|
} |
|
|
|
// Get next serial number |
|
serial, err := n.getNextSerial() |
|
if err != nil { |
|
return false, fmt.Errorf("failed to get serial number: %w", err) |
|
} |
|
|
|
// Build and execute Cypher query to create event with all relationships |
|
cypher, params := n.buildEventCreationCypher(ev, serial) |
|
|
|
if _, err = n.ExecuteWrite(c, cypher, params); err != nil { |
|
return false, fmt.Errorf("failed to save event: %w", err) |
|
} |
|
|
|
return false, nil |
|
} |
|
|
|
// buildEventCreationCypher constructs a Cypher query to create an event node with all relationships |
|
// This is a single atomic operation that creates: |
|
// - Event node with all properties |
|
// - Author node and AUTHORED_BY relationship |
|
// - Tag nodes and TAGGED_WITH relationships |
|
// - Reference relationships (REFERENCES for 'e' tags, MENTIONS for 'p' tags) |
|
func (n *N) buildEventCreationCypher(ev *event.E, serial uint64) (string, map[string]any) { |
|
params := make(map[string]any) |
|
|
|
// Event properties |
|
eventID := hex.Enc(ev.ID[:]) |
|
authorPubkey := hex.Enc(ev.Pubkey[:]) |
|
|
|
params["eventId"] = eventID |
|
params["serial"] = serial |
|
params["kind"] = int64(ev.Kind) |
|
params["createdAt"] = ev.CreatedAt |
|
params["content"] = string(ev.Content) |
|
params["sig"] = hex.Enc(ev.Sig[:]) |
|
params["pubkey"] = authorPubkey |
|
|
|
// Serialize tags as JSON string for storage |
|
tagsJSON, _ := ev.Tags.MarshalJSON() |
|
params["tags"] = string(tagsJSON) |
|
|
|
// Start building the Cypher query |
|
// Use MERGE to ensure idempotency for author nodes |
|
cypher := ` |
|
// Create or match author node |
|
MERGE (a:Author {pubkey: $pubkey}) |
|
|
|
// Create event node |
|
CREATE (e:Event { |
|
id: $eventId, |
|
serial: $serial, |
|
kind: $kind, |
|
created_at: $createdAt, |
|
content: $content, |
|
sig: $sig, |
|
pubkey: $pubkey, |
|
tags: $tags |
|
}) |
|
|
|
// Link event to author |
|
CREATE (e)-[:AUTHORED_BY]->(a) |
|
` |
|
|
|
// Process tags to create relationships |
|
// Different tag types create different relationship patterns |
|
tagNodeIndex := 0 |
|
eTagIndex := 0 |
|
pTagIndex := 0 |
|
|
|
for _, tagItem := range *ev.Tags { |
|
if len(tagItem.T) < 2 { |
|
continue |
|
} |
|
|
|
tagType := string(tagItem.T[0]) |
|
tagValue := string(tagItem.T[1]) |
|
|
|
switch tagType { |
|
case "e": // Event reference - creates REFERENCES relationship |
|
// Create reference to another event (if it exists) |
|
paramName := fmt.Sprintf("eTag_%d", eTagIndex) |
|
params[paramName] = tagValue |
|
|
|
cypher += fmt.Sprintf(` |
|
// Reference to event (e-tag) |
|
OPTIONAL MATCH (ref%d:Event {id: $%s}) |
|
FOREACH (ignoreMe IN CASE WHEN ref%d IS NOT NULL THEN [1] ELSE [] END | |
|
CREATE (e)-[:REFERENCES]->(ref%d) |
|
) |
|
`, eTagIndex, paramName, eTagIndex, eTagIndex) |
|
|
|
eTagIndex++ |
|
|
|
case "p": // Pubkey mention - creates MENTIONS relationship |
|
// Create mention to another author |
|
paramName := fmt.Sprintf("pTag_%d", pTagIndex) |
|
params[paramName] = tagValue |
|
|
|
cypher += fmt.Sprintf(` |
|
// Mention of author (p-tag) |
|
MERGE (mentioned%d:Author {pubkey: $%s}) |
|
CREATE (e)-[:MENTIONS]->(mentioned%d) |
|
`, pTagIndex, paramName, pTagIndex) |
|
|
|
pTagIndex++ |
|
|
|
default: // Other tags - creates Tag nodes and TAGGED_WITH relationships |
|
// Create tag node and relationship |
|
typeParam := fmt.Sprintf("tagType_%d", tagNodeIndex) |
|
valueParam := fmt.Sprintf("tagValue_%d", tagNodeIndex) |
|
params[typeParam] = tagType |
|
params[valueParam] = tagValue |
|
|
|
cypher += fmt.Sprintf(` |
|
// Generic tag relationship |
|
MERGE (tag%d:Tag {type: $%s, value: $%s}) |
|
CREATE (e)-[:TAGGED_WITH]->(tag%d) |
|
`, tagNodeIndex, typeParam, valueParam, tagNodeIndex) |
|
|
|
tagNodeIndex++ |
|
} |
|
} |
|
|
|
// Return the created event |
|
cypher += ` |
|
RETURN e.id AS id` |
|
|
|
return cypher, params |
|
} |
|
|
|
// GetSerialsFromFilter returns event serials matching a filter |
|
func (n *N) GetSerialsFromFilter(f *filter.F) (serials types.Uint40s, err error) { |
|
// Use QueryForSerials with background context |
|
return n.QueryForSerials(context.Background(), f) |
|
} |
|
|
|
// WouldReplaceEvent checks if an event would replace existing events |
|
// This handles replaceable events (kinds 0, 3, and 10000-19999) |
|
// and parameterized replaceable events (kinds 30000-39999) |
|
func (n *N) WouldReplaceEvent(ev *event.E) (bool, types.Uint40s, error) { |
|
// Check for replaceable events (kinds 0, 3, and 10000-19999) |
|
isReplaceable := ev.Kind == 0 || ev.Kind == 3 || (ev.Kind >= 10000 && ev.Kind < 20000) |
|
|
|
// Check for parameterized replaceable events (kinds 30000-39999) |
|
isParameterizedReplaceable := ev.Kind >= 30000 && ev.Kind < 40000 |
|
|
|
if !isReplaceable && !isParameterizedReplaceable { |
|
return false, nil, nil |
|
} |
|
|
|
authorPubkey := hex.Enc(ev.Pubkey[:]) |
|
ctx := context.Background() |
|
|
|
var cypher string |
|
params := map[string]any{ |
|
"pubkey": authorPubkey, |
|
"kind": int64(ev.Kind), |
|
"createdAt": ev.CreatedAt, |
|
} |
|
|
|
if isParameterizedReplaceable { |
|
// For parameterized replaceable events, we need to match on d-tag as well |
|
dTag := ev.Tags.GetFirst([]byte{'d'}) |
|
if dTag == nil { |
|
return false, nil, nil |
|
} |
|
|
|
dValue := "" |
|
if len(dTag.T) >= 2 { |
|
dValue = string(dTag.T[1]) |
|
} |
|
|
|
params["dValue"] = dValue |
|
|
|
// Query for existing parameterized replaceable events with same kind, pubkey, and d-tag |
|
cypher = ` |
|
MATCH (e:Event {kind: $kind, pubkey: $pubkey})-[:TAGGED_WITH]->(t:Tag {type: 'd', value: $dValue}) |
|
WHERE e.created_at < $createdAt |
|
RETURN e.serial AS serial, e.created_at AS created_at |
|
ORDER BY e.created_at DESC` |
|
|
|
} else { |
|
// Query for existing replaceable events with same kind and pubkey |
|
cypher = ` |
|
MATCH (e:Event {kind: $kind, pubkey: $pubkey}) |
|
WHERE e.created_at < $createdAt |
|
RETURN e.serial AS serial, e.created_at AS created_at |
|
ORDER BY e.created_at DESC` |
|
} |
|
|
|
result, err := n.ExecuteRead(ctx, cypher, params) |
|
if err != nil { |
|
return false, nil, fmt.Errorf("failed to query replaceable events: %w", err) |
|
} |
|
|
|
// Parse results |
|
var serials types.Uint40s |
|
wouldReplace := false |
|
|
|
for result.Next(ctx) { |
|
record := result.Record() |
|
if record == nil { |
|
continue |
|
} |
|
|
|
serialRaw, found := record.Get("serial") |
|
if !found { |
|
continue |
|
} |
|
|
|
serialVal, ok := serialRaw.(int64) |
|
if !ok { |
|
continue |
|
} |
|
|
|
wouldReplace = true |
|
serial := types.Uint40{} |
|
serial.Set(uint64(serialVal)) |
|
serials = append(serials, &serial) |
|
} |
|
|
|
return wouldReplace, serials, nil |
|
}
|
|
|