|
|
|
|
@ -1,6 +1,7 @@
@@ -1,6 +1,7 @@
|
|
|
|
|
package app |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"bytes" |
|
|
|
|
"context" |
|
|
|
|
"net/http" |
|
|
|
|
"strings" |
|
|
|
|
@ -38,6 +39,7 @@ type Listener struct {
@@ -38,6 +39,7 @@ type Listener struct {
|
|
|
|
|
messageQueue chan messageRequest // Buffered channel for message processing
|
|
|
|
|
processingDone chan struct{} // Closed when message processor exits
|
|
|
|
|
handlerWg sync.WaitGroup // Tracks spawned message handler goroutines
|
|
|
|
|
authProcessing sync.RWMutex // Ensures AUTH completes before other messages check authentication
|
|
|
|
|
// Flow control counters (atomic for concurrent access)
|
|
|
|
|
droppedMessages atomic.Int64 // Messages dropped due to full queue
|
|
|
|
|
// Diagnostics: per-connection counters
|
|
|
|
|
@ -218,9 +220,26 @@ func (l *Listener) messageProcessor() {
@@ -218,9 +220,26 @@ func (l *Listener) messageProcessor() {
|
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Process the message in a separate goroutine to avoid blocking
|
|
|
|
|
// This allows multiple messages to be processed concurrently (like khatru does)
|
|
|
|
|
// Track the goroutine so we can wait for it during cleanup
|
|
|
|
|
// Lock immediately to ensure AUTH is processed before subsequent messages
|
|
|
|
|
// are dequeued. This prevents race conditions where EVENT checks authentication
|
|
|
|
|
// before AUTH completes.
|
|
|
|
|
l.authProcessing.Lock() |
|
|
|
|
|
|
|
|
|
// Check if this is an AUTH message by looking for the ["AUTH" prefix
|
|
|
|
|
isAuthMessage := len(req.data) > 7 && bytes.HasPrefix(req.data, []byte(`["AUTH"`)) |
|
|
|
|
|
|
|
|
|
if isAuthMessage { |
|
|
|
|
// Process AUTH message synchronously while holding lock
|
|
|
|
|
// This blocks the messageProcessor from dequeuing the next message
|
|
|
|
|
// until authentication is complete and authedPubkey is set
|
|
|
|
|
log.D.F("ws->%s processing AUTH synchronously with lock", req.remote) |
|
|
|
|
l.HandleMessage(req.data, req.remote) |
|
|
|
|
// Unlock after AUTH completes so subsequent messages see updated authedPubkey
|
|
|
|
|
l.authProcessing.Unlock() |
|
|
|
|
} else { |
|
|
|
|
// Not AUTH - unlock immediately and process concurrently
|
|
|
|
|
// The next message can now be dequeued (possibly another non-AUTH to process concurrently)
|
|
|
|
|
l.authProcessing.Unlock() |
|
|
|
|
l.handlerWg.Add(1) |
|
|
|
|
go func(data []byte, remote string) { |
|
|
|
|
defer l.handlerWg.Done() |
|
|
|
|
@ -228,6 +247,7 @@ func (l *Listener) messageProcessor() {
@@ -228,6 +247,7 @@ func (l *Listener) messageProcessor() {
|
|
|
|
|
}(req.data, req.remote) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// getManagedACL returns the managed ACL instance if available
|
|
|
|
|
|