You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
120 lines
2.8 KiB
120 lines
2.8 KiB
package app |
|
|
|
import ( |
|
"context" |
|
"errors" |
|
|
|
"encoders.orly/envelopes/closedenvelope" |
|
"encoders.orly/envelopes/eoseenvelope" |
|
"encoders.orly/envelopes/eventenvelope" |
|
"encoders.orly/envelopes/reqenvelope" |
|
"encoders.orly/event" |
|
"encoders.orly/filter" |
|
"encoders.orly/tag" |
|
"github.com/dgraph-io/badger/v4" |
|
"lol.mleku.dev/chk" |
|
"lol.mleku.dev/log" |
|
"utils.orly/normalize" |
|
"utils.orly/pointers" |
|
) |
|
|
|
func (l *Listener) HandleReq(c context.Context, msg []byte) ( |
|
err error, |
|
) { |
|
var rem []byte |
|
env := reqenvelope.New() |
|
if rem, err = env.Unmarshal(msg); chk.E(err) { |
|
return normalize.Error.Errorf(err.Error()) |
|
} |
|
if len(rem) > 0 { |
|
log.I.F("extra '%s'", rem) |
|
} |
|
var events event.S |
|
for _, f := range *env.Filters { |
|
if pointers.Present(f.Limit) { |
|
if *f.Limit == 0 { |
|
continue |
|
} |
|
} |
|
if events, err = l.QueryEvents(c, f); chk.E(err) { |
|
if errors.Is(err, badger.ErrDBClosed) { |
|
return |
|
} |
|
err = nil |
|
} |
|
} |
|
// write out the events to the socket |
|
seen := make(map[string]struct{}) |
|
for _, ev := range events { |
|
// track the IDs we've sent |
|
seen[string(ev.ID)] = struct{}{} |
|
var res *eventenvelope.Result |
|
if res, err = eventenvelope.NewResultWith( |
|
env.Subscription, ev, |
|
); chk.E(err) { |
|
return |
|
} |
|
if err = res.Write(l); chk.E(err) { |
|
return |
|
} |
|
} |
|
// write the EOSE to signal to the client that all events found have been |
|
// sent. |
|
if err = eoseenvelope.NewFrom(env.Subscription). |
|
Write(l); chk.E(err) { |
|
return |
|
} |
|
// if the query was for just Ids, we know there can't be any more results, |
|
// so cancel the subscription. |
|
cancel := true |
|
var subbedFilters filter.S |
|
for _, f := range *env.Filters { |
|
if f.Ids.Len() < 1 { |
|
cancel = false |
|
subbedFilters = append(subbedFilters, f) |
|
} else { |
|
// remove the IDs that we already sent |
|
var notFounds [][]byte |
|
for _, ev := range events { |
|
if _, ok := seen[string(ev.ID)]; ok { |
|
continue |
|
} |
|
notFounds = append(notFounds, ev.ID) |
|
} |
|
// if all were found, don't add to subbedFilters |
|
if len(notFounds) == 0 { |
|
continue |
|
} |
|
// rewrite the filter Ids to remove the ones we already sent |
|
f.Ids = tag.NewFromBytesSlice(notFounds...) |
|
// add the filter to the list of filters we're subscribing to |
|
subbedFilters = append(subbedFilters, f) |
|
} |
|
// also, if we received the limit number of events, subscription ded |
|
if pointers.Present(f.Limit) { |
|
if len(events) < int(*f.Limit) { |
|
cancel = false |
|
} |
|
} |
|
} |
|
receiver := make(event.C, 32) |
|
// if the subscription should be cancelled, do so |
|
if !cancel { |
|
l.publishers.Receive( |
|
&W{ |
|
Conn: l.conn, |
|
remote: l.remote, |
|
Id: string(env.Subscription), |
|
Receiver: receiver, |
|
Filters: env.Filters, |
|
}, |
|
) |
|
} else { |
|
if err = closedenvelope.NewFrom( |
|
env.Subscription, nil, |
|
).Write(l); chk.E(err) { |
|
return |
|
} |
|
} |
|
return |
|
}
|
|
|