Browse Source

Add context to `NewPublisher`, improve logging levels, dispatch events on publish, and refine envelope handling

fixes a panic from the nil context
main
mleku 4 months ago
parent
commit
898aa0cb63
No known key found for this signature in database
  1. 1
      app/handle-event.go
  2. 1
      app/handle-req.go
  3. 2
      app/handle-websocket.go
  4. 2
      app/main.go
  5. 2
      app/ok.go
  6. 18
      app/publisher.go

1
app/handle-event.go

@ -57,6 +57,7 @@ func (l *Listener) HandleEvent(c context.Context, msg []byte) (
if _, _, err = l.SaveEvent(c, env.E, false, nil); chk.E(err) { if _, _, err = l.SaveEvent(c, env.E, false, nil); chk.E(err) {
return return
} }
l.publishers.Deliver(env.E)
// Send a success response storing // Send a success response storing
if err = Ok.Ok(l, env, ""); chk.E(err) { if err = Ok.Ok(l, env, ""); chk.E(err) {
return return

1
app/handle-req.go

@ -60,6 +60,7 @@ func (l *Listener) HandleReq(c context.Context, msg []byte) (
} }
// write the EOSE to signal to the client that all events found have been // write the EOSE to signal to the client that all events found have been
// sent. // sent.
log.T.F("sending EOSE to %s", l.remote)
if err = eoseenvelope.NewFrom(env.Subscription). if err = eoseenvelope.NewFrom(env.Subscription).
Write(l); chk.E(err) { Write(l); chk.E(err) {
return return

2
app/handle-websocket.go

@ -80,7 +80,7 @@ whitelist:
} }
var typ websocket.MessageType var typ websocket.MessageType
var msg []byte var msg []byte
log.I.F("waiting for message from %s", remote) log.T.F("waiting for message from %s", remote)
if typ, msg, err = conn.Read(ctx); chk.E(err) { if typ, msg, err = conn.Read(ctx); chk.E(err) {
if strings.Contains( if strings.Contains(
err.Error(), "use of closed network connection", err.Error(), "use of closed network connection",

2
app/main.go

@ -28,7 +28,7 @@ func Run(
Ctx: ctx, Ctx: ctx,
Config: cfg, Config: cfg,
D: db, D: db,
publishers: publish.New(NewPublisher()), publishers: publish.New(NewPublisher(ctx)),
} }
addr := fmt.Sprintf("%s:%d", cfg.Listen, cfg.Port) addr := fmt.Sprintf("%s:%d", cfg.Listen, cfg.Port)
log.I.F("starting listener on http://%s", addr) log.I.F("starting listener on http://%s", addr)

2
app/ok.go

@ -40,7 +40,7 @@ var Ok = OKs{
params ...any, params ...any,
) (err error) { ) (err error) {
return okenvelope.NewFrom( return okenvelope.NewFrom(
env.Id(), true, nil, env.Id(), true, []byte{},
).Write(l) ).Write(l)
}, },
AuthRequired: func( AuthRequired: func(

18
app/publisher.go

@ -63,8 +63,9 @@ type P struct {
var _ publisher.I = &P{} var _ publisher.I = &P{}
func NewPublisher() (publisher *P) { func NewPublisher(c context.Context) (publisher *P) {
return &P{ return &P{
c: c,
Map: make(Map), Map: make(Map),
} }
} }
@ -113,7 +114,7 @@ func (p *P) Receive(msg typer.T) {
subs = make(map[string]Subscription) subs = make(map[string]Subscription)
subs[m.Id] = Subscription{S: m.Filters, remote: m.remote} subs[m.Id] = Subscription{S: m.Filters, remote: m.remote}
p.Map[m.Conn] = subs p.Map[m.Conn] = subs
log.T.C( log.D.C(
func() string { func() string {
return fmt.Sprintf( return fmt.Sprintf(
"created new subscription for %s, %s", "created new subscription for %s, %s",
@ -124,7 +125,7 @@ func (p *P) Receive(msg typer.T) {
) )
} else { } else {
subs[m.Id] = Subscription{S: m.Filters, remote: m.remote} subs[m.Id] = Subscription{S: m.Filters, remote: m.remote}
log.T.C( log.D.C(
func() string { func() string {
return fmt.Sprintf( return fmt.Sprintf(
"added subscription %s for %s", m.Id, "added subscription %s for %s", m.Id,
@ -151,7 +152,7 @@ func (p *P) Deliver(ev *event.E) {
var err error var err error
p.Mx.Lock() p.Mx.Lock()
defer p.Mx.Unlock() defer p.Mx.Unlock()
log.T.C( log.D.C(
func() string { func() string {
return fmt.Sprintf( return fmt.Sprintf(
"delivering event %0x to websocket subscribers %d", ev.ID, "delivering event %0x to websocket subscribers %d", ev.ID,
@ -160,13 +161,6 @@ func (p *P) Deliver(ev *event.E) {
}, },
) )
for w, subs := range p.Map { for w, subs := range p.Map {
log.T.C(
func() string {
return fmt.Sprintf(
"%v %s", subs,
)
},
)
for id, subscriber := range subs { for id, subscriber := range subs {
if !subscriber.Match(ev) { if !subscriber.Match(ev) {
continue continue
@ -185,7 +179,7 @@ func (p *P) Deliver(ev *event.E) {
); chk.E(err) { ); chk.E(err) {
continue continue
} }
log.T.C( log.D.C(
func() string { func() string {
return fmt.Sprintf( return fmt.Sprintf(
"dispatched event %0x to subscription %s, %s", "dispatched event %0x to subscription %s, %s",

Loading…
Cancel
Save