You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
89 lines
2.0 KiB
89 lines
2.0 KiB
package neo4j |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"sync" |
|
) |
|
|
|
// Serial number management |
|
// We use a special Marker node in Neo4j to track the next available serial number |
|
|
|
// serialCounterKey is the value of the `key` property that identifies the
// Marker node holding the serial counter.
const serialCounterKey = "serial_counter"

// serialMutex is held by getNextSerial for the duration of its
// read-then-write sequence so that calls within this process do not
// interleave.
var serialMutex sync.Mutex
|
|
|
// getNextSerial atomically increments and returns the next serial number |
|
func (n *N) getNextSerial() (uint64, error) { |
|
serialMutex.Lock() |
|
defer serialMutex.Unlock() |
|
|
|
ctx := context.Background() |
|
|
|
// Query current serial value |
|
cypher := "MATCH (m:Marker {key: $key}) RETURN m.value AS value" |
|
params := map[string]any{"key": serialCounterKey} |
|
|
|
result, err := n.ExecuteRead(ctx, cypher, params) |
|
if err != nil { |
|
return 0, fmt.Errorf("failed to query serial counter: %w", err) |
|
} |
|
|
|
var currentSerial uint64 = 1 |
|
if result.Next(ctx) { |
|
record := result.Record() |
|
if record != nil { |
|
valueRaw, found := record.Get("value") |
|
if found { |
|
if value, ok := valueRaw.(int64); ok { |
|
currentSerial = uint64(value) |
|
} |
|
} |
|
} |
|
} |
|
|
|
// Increment serial |
|
nextSerial := currentSerial + 1 |
|
|
|
// Update counter |
|
updateCypher := ` |
|
MERGE (m:Marker {key: $key}) |
|
SET m.value = $value` |
|
updateParams := map[string]any{ |
|
"key": serialCounterKey, |
|
"value": int64(nextSerial), |
|
} |
|
|
|
_, err = n.ExecuteWrite(ctx, updateCypher, updateParams) |
|
if err != nil { |
|
return 0, fmt.Errorf("failed to update serial counter: %w", err) |
|
} |
|
|
|
return currentSerial, nil |
|
} |
|
|
|
// initSerialCounter initializes the serial counter if it doesn't exist |
|
// Uses MERGE to be idempotent - safe to call multiple times |
|
func (n *N) initSerialCounter() error { |
|
ctx := context.Background() |
|
|
|
// Use MERGE with ON CREATE to initialize only if it doesn't exist |
|
// This is idempotent and avoids race conditions |
|
initCypher := ` |
|
MERGE (m:Marker {key: $key}) |
|
ON CREATE SET m.value = $value` |
|
initParams := map[string]any{ |
|
"key": serialCounterKey, |
|
"value": int64(1), |
|
} |
|
|
|
_, err := n.ExecuteWrite(ctx, initCypher, initParams) |
|
if err != nil { |
|
return fmt.Errorf("failed to initialize serial counter: %w", err) |
|
} |
|
|
|
n.Logger.Debugf("serial counter initialized/verified") |
|
return nil |
|
}
|
|
|