You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
272 lines
6.6 KiB
272 lines
6.6 KiB
package database |
|
|
|
import ( |
|
"bytes" |
|
"sort" |
|
|
|
"github.com/dgraph-io/badger/v4" |
|
"lol.mleku.dev/chk" |
|
"lol.mleku.dev/log" |
|
"next.orly.dev/pkg/database/indexes" |
|
"next.orly.dev/pkg/database/indexes/types" |
|
"next.orly.dev/pkg/encoders/event" |
|
"next.orly.dev/pkg/encoders/ints" |
|
) |
|
|
|
// currentVersion is the latest database schema version known to this code.
// Keep it equal to the highest migration step handled by RunMigrations.
const currentVersion uint32 = 2
|
|
|
func (d *D) RunMigrations() { |
|
var err error |
|
var dbVersion uint32 |
|
// first find the current version tag if any |
|
if err = d.View( |
|
func(txn *badger.Txn) (err error) { |
|
buf := new(bytes.Buffer) |
|
if err = indexes.VersionEnc(nil).MarshalWrite(buf); chk.E(err) { |
|
return |
|
} |
|
verPrf := new(bytes.Buffer) |
|
if _, err = indexes.VersionPrefix.Write(verPrf); chk.E(err) { |
|
return |
|
} |
|
it := txn.NewIterator( |
|
badger.IteratorOptions{ |
|
Prefix: verPrf.Bytes(), |
|
}, |
|
) |
|
defer it.Close() |
|
ver := indexes.VersionVars() |
|
for it.Rewind(); it.Valid(); it.Next() { |
|
// there should only be one |
|
item := it.Item() |
|
key := item.Key() |
|
if err = indexes.VersionDec(ver).UnmarshalRead( |
|
bytes.NewBuffer(key), |
|
); chk.E(err) { |
|
return |
|
} |
|
log.I.F("found version tag: %d", ver.Get()) |
|
dbVersion = ver.Get() |
|
} |
|
return |
|
}, |
|
); chk.E(err) { |
|
} |
|
if dbVersion == 0 { |
|
log.D.F("no version tag found, creating...") |
|
// write the version tag now (ensure any old tags are removed first) |
|
if err = d.writeVersionTag(currentVersion); chk.E(err) { |
|
return |
|
} |
|
} |
|
if dbVersion < 1 { |
|
log.I.F("migrating to version 1...") |
|
// the first migration is expiration tags |
|
d.UpdateExpirationTags() |
|
// bump to version 1 |
|
_ = d.writeVersionTag(1) |
|
} |
|
if dbVersion < 2 { |
|
log.I.F("migrating to version 2...") |
|
// backfill word indexes |
|
d.UpdateWordIndexes() |
|
// bump to version 2 |
|
_ = d.writeVersionTag(2) |
|
} |
|
} |
|
|
|
// writeVersionTag writes a new version tag key to the database (no value) |
|
func (d *D) writeVersionTag(ver uint32) (err error) { |
|
return d.Update( |
|
func(txn *badger.Txn) (err error) { |
|
// delete any existing version keys first (there should only be one, but be safe) |
|
verPrf := new(bytes.Buffer) |
|
if _, err = indexes.VersionPrefix.Write(verPrf); chk.E(err) { |
|
return |
|
} |
|
it := txn.NewIterator(badger.IteratorOptions{Prefix: verPrf.Bytes()}) |
|
defer it.Close() |
|
for it.Rewind(); it.Valid(); it.Next() { |
|
item := it.Item() |
|
key := item.KeyCopy(nil) |
|
if err = txn.Delete(key); chk.E(err) { |
|
return |
|
} |
|
} |
|
|
|
// now write the new version key |
|
buf := new(bytes.Buffer) |
|
vv := new(types.Uint32) |
|
vv.Set(ver) |
|
if err = indexes.VersionEnc(vv).MarshalWrite(buf); chk.E(err) { |
|
return |
|
} |
|
return txn.Set(buf.Bytes(), nil) |
|
}, |
|
) |
|
} |
|
|
|
func (d *D) UpdateWordIndexes() { |
|
log.T.F("updating word indexes...") |
|
var err error |
|
var wordIndexes [][]byte |
|
// iterate all events and generate word index keys from content and tags |
|
if err = d.View( |
|
func(txn *badger.Txn) (err error) { |
|
prf := new(bytes.Buffer) |
|
if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) { |
|
return |
|
} |
|
it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()}) |
|
defer it.Close() |
|
for it.Rewind(); it.Valid(); it.Next() { |
|
item := it.Item() |
|
var val []byte |
|
if val, err = item.ValueCopy(nil); chk.E(err) { |
|
continue |
|
} |
|
// decode the event |
|
ev := new(event.E) |
|
if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) { |
|
continue |
|
} |
|
// log.I.F("updating word indexes for event: %s", ev.Serialize()) |
|
// read serial from key |
|
key := item.Key() |
|
ser := indexes.EventVars() |
|
if err = indexes.EventDec(ser).UnmarshalRead(bytes.NewBuffer(key)); chk.E(err) { |
|
continue |
|
} |
|
// collect unique word hashes for this event |
|
seen := make(map[string]struct{}) |
|
// from content |
|
if len(ev.Content) > 0 { |
|
for _, h := range TokenHashes(ev.Content) { |
|
seen[string(h)] = struct{}{} |
|
} |
|
} |
|
// from all tag fields (key and values) |
|
if ev.Tags != nil && ev.Tags.Len() > 0 { |
|
for _, t := range *ev.Tags { |
|
for _, field := range t.T { |
|
if len(field) == 0 { |
|
continue |
|
} |
|
for _, h := range TokenHashes(field) { |
|
seen[string(h)] = struct{}{} |
|
} |
|
} |
|
} |
|
} |
|
// build keys |
|
for k := range seen { |
|
w := new(types.Word) |
|
w.FromWord([]byte(k)) |
|
buf := new(bytes.Buffer) |
|
if err = indexes.WordEnc( |
|
w, ser, |
|
).MarshalWrite(buf); chk.E(err) { |
|
continue |
|
} |
|
wordIndexes = append(wordIndexes, buf.Bytes()) |
|
} |
|
} |
|
return |
|
}, |
|
); chk.E(err) { |
|
return |
|
} |
|
// sort the indexes for ordered writes |
|
sort.Slice( |
|
wordIndexes, func(i, j int) bool { |
|
return bytes.Compare( |
|
wordIndexes[i], wordIndexes[j], |
|
) < 0 |
|
}, |
|
) |
|
// write in a batch |
|
batch := d.NewWriteBatch() |
|
for _, v := range wordIndexes { |
|
if err = batch.Set(v, nil); chk.E(err) { |
|
continue |
|
} |
|
} |
|
_ = batch.Flush() |
|
log.T.F("finished updating word indexes...") |
|
} |
|
|
|
func (d *D) UpdateExpirationTags() { |
|
log.T.F("updating expiration tag indexes...") |
|
var err error |
|
var expIndexes [][]byte |
|
// iterate all event records and decode and look for version tags |
|
if err = d.View( |
|
func(txn *badger.Txn) (err error) { |
|
prf := new(bytes.Buffer) |
|
if err = indexes.EventEnc(nil).MarshalWrite(prf); chk.E(err) { |
|
return |
|
} |
|
it := txn.NewIterator(badger.IteratorOptions{Prefix: prf.Bytes()}) |
|
defer it.Close() |
|
for it.Rewind(); it.Valid(); it.Next() { |
|
item := it.Item() |
|
var val []byte |
|
if val, err = item.ValueCopy(nil); chk.E(err) { |
|
continue |
|
} |
|
// decode the event |
|
ev := new(event.E) |
|
if err = ev.UnmarshalBinary(bytes.NewBuffer(val)); chk.E(err) { |
|
continue |
|
} |
|
expTag := ev.Tags.GetFirst([]byte("expiration")) |
|
if expTag == nil { |
|
continue |
|
} |
|
expTS := ints.New(0) |
|
if _, err = expTS.Unmarshal(expTag.Value()); chk.E(err) { |
|
continue |
|
} |
|
key := item.Key() |
|
ser := indexes.EventVars() |
|
if err = indexes.EventDec(ser).UnmarshalRead( |
|
bytes.NewBuffer(key), |
|
); chk.E(err) { |
|
continue |
|
} |
|
// create the expiration tag |
|
exp, _ := indexes.ExpirationVars() |
|
exp.Set(expTS.N) |
|
expBuf := new(bytes.Buffer) |
|
if err = indexes.ExpirationEnc( |
|
exp, ser, |
|
).MarshalWrite(expBuf); chk.E(err) { |
|
continue |
|
} |
|
expIndexes = append(expIndexes, expBuf.Bytes()) |
|
} |
|
return |
|
}, |
|
); chk.E(err) { |
|
return |
|
} |
|
// sort the indexes first so they're written in order, improving compaction |
|
// and iteration. |
|
sort.Slice( |
|
expIndexes, func(i, j int) bool { |
|
return bytes.Compare(expIndexes[i], expIndexes[j]) < 0 |
|
}, |
|
) |
|
// write the collected indexes |
|
batch := d.NewWriteBatch() |
|
for _, v := range expIndexes { |
|
if err = batch.Set(v, nil); chk.E(err) { |
|
continue |
|
} |
|
} |
|
if err = batch.Flush(); chk.E(err) { |
|
return |
|
} |
|
}
|
|
|