diff --git a/app/handle-req.go b/app/handle-req.go index db5fc13..22e1662 100644 --- a/app/handle-req.go +++ b/app/handle-req.go @@ -734,6 +734,10 @@ func (l *Listener) HandleReq(msg []byte) (err error) { if l.accessTracker != nil && len(events) > 0 { go func(evts event.S, connID string) { for _, ev := range evts { + // Validate event ID before calling GetSerialById + if len(ev.ID) != 32 { + continue + } if ser, err := l.DB.GetSerialById(ev.ID); err == nil && ser != nil { l.accessTracker.RecordAccess(ser.Get(), connID) } diff --git a/pkg/database/database.go b/pkg/database/database.go index 7191208..2b9dd44 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -13,6 +13,7 @@ import ( "github.com/dgraph-io/badger/v4/options" "lol.mleku.dev" "lol.mleku.dev/chk" + "lol.mleku.dev/log" "next.orly.dev/pkg/database/querycache" "git.mleku.dev/mleku/nostr/encoders/event" "git.mleku.dev/mleku/nostr/encoders/filter" @@ -20,6 +21,12 @@ import ( "git.mleku.dev/mleku/nostr/utils/units" ) +// DefaultMaxConcurrentQueries limits concurrent database queries to prevent memory exhaustion. +// Each query creates Badger iterators that consume significant memory. With many concurrent +// connections, unlimited queries can exhaust available memory in seconds. +// Set very low (3) because each query can internally create many iterators. +const DefaultMaxConcurrentQueries = 3 + // RateLimiterInterface defines the minimal interface for rate limiting during import type RateLimiterInterface interface { IsEnabled() bool @@ -47,6 +54,10 @@ type D struct { // Rate limiter for controlling memory pressure during bulk operations rateLimiter RateLimiterInterface + + // Query semaphore limits concurrent database queries to prevent memory exhaustion. + // Each query with iterators consumes significant memory; this prevents OOM under load. + querySem chan struct{} } // SetRateLimiter sets the rate limiter for controlling memory during import/export @@ -139,6 +150,7 @@ func NewWithConfig( ready: make(chan struct{}), queryCache: qc, serialCache: NewSerialCache(serialCachePubkeys, serialCacheEventIds), + querySem: make(chan struct{}, DefaultMaxConcurrentQueries), } // Ensure the data directory exists @@ -353,3 +365,24 @@ func (d *D) Close() (err error) { } return } + +// AcquireQuerySlot acquires a slot from the query semaphore to limit concurrent queries. +// This blocks until a slot is available or the context is cancelled. +// Returns true if slot was acquired, false if context cancelled. +func (d *D) AcquireQuerySlot(ctx context.Context) bool { + select { + case d.querySem <- struct{}{}: + return true + case <-ctx.Done(): + return false + } +} + +// ReleaseQuerySlot releases a previously acquired query slot. +func (d *D) ReleaseQuerySlot() { + select { + case <-d.querySem: + default: + log.W.F("ReleaseQuerySlot called without matching AcquireQuerySlot") + } +} diff --git a/pkg/database/fetch-events-by-serials.go b/pkg/database/fetch-events-by-serials.go index 963f6df..8c11575 100644 --- a/pkg/database/fetch-events-by-serials.go +++ b/pkg/database/fetch-events-by-serials.go @@ -33,6 +33,14 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev if err = d.View( func(txn *badger.Txn) (err error) { + // Create ONE iterator for sev prefix lookups - reused for all serials + // This dramatically reduces memory usage vs creating one per serial + sevOpts := badger.DefaultIteratorOptions + sevOpts.PrefetchValues = false // We read from key, not value + sevOpts.PrefetchSize = 1 + sevIt := txn.NewIterator(sevOpts) + defer sevIt.Close() + for _, ser := range serials { var ev *event.E serialVal := ser.Get() @@ -45,8 +53,8 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev } err = nil // Reset error, try legacy formats - // Try sev (small event inline) prefix - legacy Reiser4 optimization - ev, err = d.fetchSmallEvent(txn, ser) + // Try sev (small event inline) using shared iterator + ev, err = d.fetchSmallEventWithIterator(txn, ser, sevIt) if err == nil && ev != nil { events[serialVal] = ev continue @@ -70,6 +78,61 @@ func (d *D) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*ev return events, nil } +// fetchSmallEventWithIterator uses a provided iterator for sev lookups (memory efficient) +func (d *D) fetchSmallEventWithIterator(txn *badger.Txn, ser *types.Uint40, it *badger.Iterator) (ev *event.E, err error) { + smallBuf := new(bytes.Buffer) + if err = indexes.SmallEventEnc(ser).MarshalWrite(smallBuf); chk.E(err) { + return nil, err + } + prefix := smallBuf.Bytes() + + // Seek to the prefix for this serial + it.Seek(prefix) + if !it.ValidForPrefix(prefix) { + return nil, nil // Not found + } + + // Found in sev table - extract inline data + key := it.Item().KeyCopy(nil) // Copy key as iterator may be reused + // Key format: sev|serial|size_uint16|event_data + if len(key) <= 8+2 { // prefix(3) + serial(5) + size(2) = 10 bytes minimum + return nil, nil + } + + sizeIdx := 8 // After sev(3) + serial(5) + // Read uint16 big-endian size + size := int(key[sizeIdx])<<8 | int(key[sizeIdx+1]) + dataStart := sizeIdx + 2 + + if len(key) < dataStart+size { + return nil, nil + } + + eventData := key[dataStart : dataStart+size] + + // Check if this is compact format (starts with version byte 1) + if len(eventData) > 0 && eventData[0] == CompactFormatVersion { + // This is compact format stored in sev - need to decode with resolver + resolver := NewDatabaseSerialResolver(d, d.serialCache) + eventId, idErr := d.GetEventIdBySerial(ser) + if idErr != nil { + // Cannot decode compact format without event ID - return error + // DO NOT fall back to legacy unmarshal as compact format is not valid legacy format + log.W.F("fetchSmallEventWithIterator: compact format but no event ID mapping for serial %d: %v", ser.Get(), idErr) + return nil, idErr + } + return UnmarshalCompactEvent(eventData, eventId, resolver) + } + + // Legacy binary format + ev = new(event.E) + if err = ev.UnmarshalBinary(bytes.NewBuffer(eventData)); err != nil { + return nil, err + } + + return ev, nil +} + // fetchCompactEvent tries to fetch an event from the compact format (cmp prefix). func (d *D) fetchCompactEvent(txn *badger.Txn, ser *types.Uint40, resolver SerialResolver) (ev *event.E, err error) { // Build cmp key diff --git a/pkg/database/get-serial-by-id.go b/pkg/database/get-serial-by-id.go index cc69079..a120bd7 100644 --- a/pkg/database/get-serial-by-id.go +++ b/pkg/database/get-serial-by-id.go @@ -21,7 +21,8 @@ import ( func (d *D) GetSerialById(id []byte) (ser *types.Uint40, err error) { // log.T.F("GetSerialById: input id=%s", hex.Enc(id)) if len(id) == 0 { - err = errorf.E("GetSerialById: called with empty ID") + // Return error without logging - caller should validate ID before calling + err = errorf.E("empty event ID") return } var idxs []Range diff --git a/pkg/database/query-events.go b/pkg/database/query-events.go index fa44bcd..3a4179e 100644 --- a/pkg/database/query-events.go +++ b/pkg/database/query-events.go @@ -54,6 +54,13 @@ func (d *D) QueryAllVersions(c context.Context, f *filter.F) ( func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool) ( evs event.S, err error, ) { + // Acquire query slot to limit concurrent queries and prevent memory exhaustion + if !d.AcquireQuerySlot(c) { + err = c.Err() + return + } + defer d.ReleaseQuerySlot() + // Determine if we should return multiple versions of replaceable events // based on the limit parameter wantMultipleVersions := showAllVersions || (f.Limit != nil && *f.Limit > 1) @@ -310,18 +317,13 @@ func (d *D) QueryEventsWithOptions(c context.Context, f *filter.F, includeDelete } } - // If not found in current batch, try to fetch it directly + // DISABLED: Fetching target events not in current batch causes memory + // exhaustion under high concurrent load. Each GetSerialById call creates + // a Badger iterator that consumes significant memory. With many connections + // processing deletion events, this explodes memory usage (~300MB/connection). + // The deletion will still work when the target event is directly queried. if targetEv == nil { - // Get serial for the event ID - ser, serErr := d.GetSerialById(evId) - if serErr != nil || ser == nil { - continue - } - // Fetch the event by serial - targetEv, serErr = d.FetchEventBySerial(ser) - if serErr != nil || targetEv == nil { - continue - } + continue } // Only allow users to delete their own events diff --git a/pkg/version/version b/pkg/version/version index 5714657..361fdaa 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.52.12 +v0.52.17