3 changed files with 161 additions and 0 deletions
@ -0,0 +1,86 @@ |
|||||||
|
// Package eoseenvelope provides an encoder for the EOSE (End Of Stored
|
||||||
|
// Events) event that signifies that a REQ has found all stored events and
|
||||||
|
// from here on the request morphs into a subscription, until the limit, if
|
||||||
|
// requested, or until CLOSE or CLOSED.
|
||||||
|
package eoseenvelope |
||||||
|
|
||||||
|
import ( |
||||||
|
"io" |
||||||
|
|
||||||
|
"lol.mleku.dev/chk" |
||||||
|
"next.orly.dev/pkg/encoders/envelopes" |
||||||
|
"next.orly.dev/pkg/encoders/text" |
||||||
|
"next.orly.dev/pkg/interfaces/codec" |
||||||
|
) |
||||||
|
|
||||||
|
// L is the label associated with this type of codec.Envelope.
|
||||||
|
const L = "EOSE" |
||||||
|
|
||||||
|
// T is an EOSE envelope (End of Stored Events), that signals the end of events
|
||||||
|
// that are stored and the beginning of a subscription. This is necessitated by
|
||||||
|
// the confusing multiplexing of websockets for multiple requests, and an ugly
|
||||||
|
// merging of two distinct API calls, filter and subscribe.
|
||||||
|
type T struct { |
||||||
|
Subscription []byte |
||||||
|
} |
||||||
|
|
||||||
|
var _ codec.Envelope = (*T)(nil) |
||||||
|
|
||||||
|
// New creates a new eoseenvelope.T with a standard form subscription.Id.
|
||||||
|
func New() *T { |
||||||
|
return new(T) |
||||||
|
} |
||||||
|
|
||||||
|
// NewFrom creates a new eoseenvelope.T using a provided subscription.Id.
|
||||||
|
func NewFrom[V []byte | string](id V) *T { return &T{Subscription: []byte(id)} } |
||||||
|
|
||||||
|
// Label returns the label of a EOSE envelope.
|
||||||
|
func (en *T) Label() string { return L } |
||||||
|
|
||||||
|
// Write the eoseenvelope.T to a provided io.Writer.
|
||||||
|
func (en *T) Write(w io.Writer) (err error) { |
||||||
|
_, err = w.Write(en.Marshal(nil)) |
||||||
|
return |
||||||
|
} |
||||||
|
|
||||||
|
// Marshal a eoseenvelope.T envelope in minified JSON, appending to a provided
|
||||||
|
// destination slice.
|
||||||
|
func (en *T) Marshal(dst []byte) (b []byte) { |
||||||
|
var err error |
||||||
|
b = dst |
||||||
|
b = envelopes.Marshal( |
||||||
|
b, L, |
||||||
|
func(bst []byte) (o []byte) { |
||||||
|
o = bst |
||||||
|
o = append(o, '"') |
||||||
|
o = append(o, en.Subscription...) |
||||||
|
o = append(o, '"') |
||||||
|
return |
||||||
|
}, |
||||||
|
) |
||||||
|
_ = err |
||||||
|
return |
||||||
|
} |
||||||
|
|
||||||
|
// Unmarshal a eoseenvelope.T from minified JSON, returning the remainder after
|
||||||
|
// the end of the envelope.
|
||||||
|
func (en *T) Unmarshal(b []byte) (r []byte, err error) { |
||||||
|
r = b |
||||||
|
if en.Subscription, r, err = text.UnmarshalQuoted(r); chk.E(err) { |
||||||
|
return |
||||||
|
} |
||||||
|
if r, err = envelopes.SkipToTheEnd(r); chk.E(err) { |
||||||
|
return |
||||||
|
} |
||||||
|
return |
||||||
|
} |
||||||
|
|
||||||
|
// Parse reads a EOSE envelope in minified JSON into a newly allocated
|
||||||
|
// eoseenvelope.T.
|
||||||
|
func Parse(b []byte) (t *T, rem []byte, err error) { |
||||||
|
t = New() |
||||||
|
if rem, err = t.Unmarshal(b); chk.E(err) { |
||||||
|
return |
||||||
|
} |
||||||
|
return |
||||||
|
} |
||||||
@ -0,0 +1,66 @@ |
|||||||
|
package eoseenvelope |
||||||
|
|
||||||
|
import ( |
||||||
|
"testing" |
||||||
|
|
||||||
|
"lol.mleku.dev/chk" |
||||||
|
"next.orly.dev/pkg/encoders/envelopes" |
||||||
|
"next.orly.dev/pkg/utils" |
||||||
|
) |
||||||
|
|
||||||
|
func TestMarshalUnmarshal(t *testing.T) { |
||||||
|
var err error |
||||||
|
rb, rb1, rb2 := make([]byte, 0, 65535), make([]byte, 0, 65535), make( |
||||||
|
[]byte, 0, 65535, |
||||||
|
) |
||||||
|
for i := range 1000 { |
||||||
|
s := utils.NewSubscription(i) |
||||||
|
req := NewFrom(s) |
||||||
|
rb = req.Marshal(rb) |
||||||
|
// log.I.Ln(req.ID)
|
||||||
|
rb1 = rb1[:len(rb)] |
||||||
|
copy(rb1, rb) |
||||||
|
var rem []byte |
||||||
|
var l string |
||||||
|
if l, rb, err = envelopes.Identify(rb); chk.E(err) { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
if l != L { |
||||||
|
t.Fatalf("invalid sentinel %s, expect %s", l, L) |
||||||
|
} |
||||||
|
req2 := New() |
||||||
|
if rem, err = req2.Unmarshal(rb); chk.E(err) { |
||||||
|
t.Fatal(err) |
||||||
|
} |
||||||
|
// log.I.Ln(req2.ID)
|
||||||
|
if len(rem) > 0 { |
||||||
|
t.Fatalf( |
||||||
|
"unmarshal failed, remainder\n%d %s", |
||||||
|
len(rem), rem, |
||||||
|
) |
||||||
|
} |
||||||
|
rb2 = req2.Marshal(rb2) |
||||||
|
if !utils.FastEqual(rb1, rb2) { |
||||||
|
if len(rb1) != len(rb2) { |
||||||
|
t.Fatalf( |
||||||
|
"unmarshal failed, different lengths\n%d %s\n%d %s\n", |
||||||
|
len(rb1), rb1, len(rb2), rb2, |
||||||
|
) |
||||||
|
} |
||||||
|
for i := range rb1 { |
||||||
|
if rb1[i] != rb2[i] { |
||||||
|
t.Fatalf( |
||||||
|
"unmarshal failed, difference at position %d\n%d %s\n%s\n%d %s\n%s\n", |
||||||
|
i, len(rb1), rb1[:i], rb1[i:], len(rb2), rb2[:i], |
||||||
|
rb2[i:], |
||||||
|
) |
||||||
|
} |
||||||
|
} |
||||||
|
t.Fatalf( |
||||||
|
"unmarshal failed\n%d %s\n%d %s\n", |
||||||
|
len(rb1), rb1, len(rb2), rb2, |
||||||
|
) |
||||||
|
} |
||||||
|
rb, rb1, rb2 = rb[:0], rb1[:0], rb2[:0] |
||||||
|
} |
||||||
|
} |
||||||
Loading…
Reference in new issue