You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
731 lines
27 KiB
731 lines
27 KiB
package main |
|
|
|
import ( |
|
"context" |
|
"io" |
|
|
|
"google.golang.org/grpc/codes" |
|
"google.golang.org/grpc/status" |
|
"lol.mleku.dev/chk" |
|
"lol.mleku.dev/log" |
|
|
|
"next.orly.dev/pkg/database" |
|
orlydbv1 "next.orly.dev/pkg/proto/orlydb/v1" |
|
) |
|
|
|
// DatabaseService implements the orlydbv1.DatabaseServiceServer interface. |
|
type DatabaseService struct { |
|
orlydbv1.UnimplementedDatabaseServiceServer |
|
db database.Database |
|
cfg *Config |
|
} |
|
|
|
// NewDatabaseService creates a new database service. |
|
func NewDatabaseService(db database.Database, cfg *Config) *DatabaseService { |
|
return &DatabaseService{ |
|
db: db, |
|
cfg: cfg, |
|
} |
|
} |
|
|
|
// === Lifecycle Methods === |
|
|
|
func (s *DatabaseService) GetPath(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.PathResponse, error) { |
|
return &orlydbv1.PathResponse{Path: s.db.Path()}, nil |
|
} |
|
|
|
func (s *DatabaseService) Sync(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) { |
|
if err := s.db.Sync(); err != nil { |
|
return nil, status.Errorf(codes.Internal, "sync failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) Ready(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ReadyResponse, error) { |
|
// Check if ready channel is closed |
|
select { |
|
case <-s.db.Ready(): |
|
return &orlydbv1.ReadyResponse{Ready: true}, nil |
|
default: |
|
return &orlydbv1.ReadyResponse{Ready: false}, nil |
|
} |
|
} |
|
|
|
func (s *DatabaseService) SetLogLevel(ctx context.Context, req *orlydbv1.SetLogLevelRequest) (*orlydbv1.Empty, error) { |
|
s.db.SetLogLevel(req.Level) |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
// === Event Storage === |
|
|
|
func (s *DatabaseService) SaveEvent(ctx context.Context, req *orlydbv1.SaveEventRequest) (*orlydbv1.SaveEventResponse, error) { |
|
ev := orlydbv1.ProtoToEvent(req.Event) |
|
exists, err := s.db.SaveEvent(ctx, ev) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "save event failed: %v", err) |
|
} |
|
return &orlydbv1.SaveEventResponse{Exists: exists}, nil |
|
} |
|
|
|
func (s *DatabaseService) GetSerialsFromFilter(ctx context.Context, req *orlydbv1.GetSerialsFromFilterRequest) (*orlydbv1.SerialList, error) { |
|
f := orlydbv1.ProtoToFilter(req.Filter) |
|
serials, err := s.db.GetSerialsFromFilter(f) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get serials failed: %v", err) |
|
} |
|
return orlydbv1.Uint40sToProto(serials), nil |
|
} |
|
|
|
func (s *DatabaseService) WouldReplaceEvent(ctx context.Context, req *orlydbv1.WouldReplaceEventRequest) (*orlydbv1.WouldReplaceEventResponse, error) { |
|
ev := orlydbv1.ProtoToEvent(req.Event) |
|
wouldReplace, replacedSerials, err := s.db.WouldReplaceEvent(ev) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "would replace check failed: %v", err) |
|
} |
|
resp := &orlydbv1.WouldReplaceEventResponse{ |
|
WouldReplace: wouldReplace, |
|
} |
|
for _, ser := range replacedSerials { |
|
resp.ReplacedSerials = append(resp.ReplacedSerials, ser.Get()) |
|
} |
|
return resp, nil |
|
} |
|
|
|
// === Event Queries (Streaming) === |
|
|
|
func (s *DatabaseService) QueryEvents(req *orlydbv1.QueryEventsRequest, stream orlydbv1.DatabaseService_QueryEventsServer) error { |
|
f := orlydbv1.ProtoToFilter(req.Filter) |
|
events, err := s.db.QueryEvents(stream.Context(), f) |
|
if err != nil { |
|
return status.Errorf(codes.Internal, "query events failed: %v", err) |
|
} |
|
return s.streamEvents(orlydbv1.EventsToProto(events), stream) |
|
} |
|
|
|
func (s *DatabaseService) QueryAllVersions(req *orlydbv1.QueryEventsRequest, stream orlydbv1.DatabaseService_QueryAllVersionsServer) error { |
|
f := orlydbv1.ProtoToFilter(req.Filter) |
|
events, err := s.db.QueryAllVersions(stream.Context(), f) |
|
if err != nil { |
|
return status.Errorf(codes.Internal, "query all versions failed: %v", err) |
|
} |
|
return s.streamEvents(orlydbv1.EventsToProto(events), stream) |
|
} |
|
|
|
func (s *DatabaseService) QueryEventsWithOptions(req *orlydbv1.QueryEventsWithOptionsRequest, stream orlydbv1.DatabaseService_QueryEventsWithOptionsServer) error { |
|
f := orlydbv1.ProtoToFilter(req.Filter) |
|
events, err := s.db.QueryEventsWithOptions(stream.Context(), f, req.IncludeDeleteEvents, req.ShowAllVersions) |
|
if err != nil { |
|
return status.Errorf(codes.Internal, "query events with options failed: %v", err) |
|
} |
|
return s.streamEvents(orlydbv1.EventsToProto(events), stream) |
|
} |
|
|
|
func (s *DatabaseService) QueryDeleteEventsByTargetId(req *orlydbv1.QueryDeleteEventsByTargetIdRequest, stream orlydbv1.DatabaseService_QueryDeleteEventsByTargetIdServer) error { |
|
events, err := s.db.QueryDeleteEventsByTargetId(stream.Context(), req.TargetEventId) |
|
if err != nil { |
|
return status.Errorf(codes.Internal, "query delete events failed: %v", err) |
|
} |
|
return s.streamEvents(orlydbv1.EventsToProto(events), stream) |
|
} |
|
|
|
func (s *DatabaseService) QueryForSerials(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.SerialList, error) { |
|
f := orlydbv1.ProtoToFilter(req.Filter) |
|
serials, err := s.db.QueryForSerials(ctx, f) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "query for serials failed: %v", err) |
|
} |
|
return orlydbv1.Uint40sToProto(serials), nil |
|
} |
|
|
|
func (s *DatabaseService) QueryForIds(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.IdPkTsList, error) { |
|
f := orlydbv1.ProtoToFilter(req.Filter) |
|
idPkTs, err := s.db.QueryForIds(ctx, f) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "query for ids failed: %v", err) |
|
} |
|
return orlydbv1.IdPkTsListToProto(idPkTs), nil |
|
} |
|
|
|
func (s *DatabaseService) CountEvents(ctx context.Context, req *orlydbv1.QueryEventsRequest) (*orlydbv1.CountEventsResponse, error) { |
|
f := orlydbv1.ProtoToFilter(req.Filter) |
|
count, approximate, err := s.db.CountEvents(ctx, f) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "count events failed: %v", err) |
|
} |
|
return &orlydbv1.CountEventsResponse{ |
|
Count: int32(count), |
|
Approximate: approximate, |
|
}, nil |
|
} |
|
|
|
// === Event Retrieval by Serial === |
|
|
|
func (s *DatabaseService) FetchEventBySerial(ctx context.Context, req *orlydbv1.FetchEventBySerialRequest) (*orlydbv1.FetchEventBySerialResponse, error) { |
|
ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial}) |
|
ev, err := s.db.FetchEventBySerial(ser) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "fetch event by serial failed: %v", err) |
|
} |
|
return &orlydbv1.FetchEventBySerialResponse{ |
|
Event: orlydbv1.EventToProto(ev), |
|
Found: ev != nil, |
|
}, nil |
|
} |
|
|
|
func (s *DatabaseService) FetchEventsBySerials(ctx context.Context, req *orlydbv1.FetchEventsBySerialRequest) (*orlydbv1.EventMap, error) { |
|
serials := orlydbv1.ProtoToUint40s(&orlydbv1.SerialList{Serials: req.Serials}) |
|
events, err := s.db.FetchEventsBySerials(serials) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "fetch events by serials failed: %v", err) |
|
} |
|
return orlydbv1.EventMapToProto(events), nil |
|
} |
|
|
|
func (s *DatabaseService) GetSerialById(ctx context.Context, req *orlydbv1.GetSerialByIdRequest) (*orlydbv1.GetSerialByIdResponse, error) { |
|
ser, err := s.db.GetSerialById(req.Id) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get serial by id failed: %v", err) |
|
} |
|
if ser == nil { |
|
return &orlydbv1.GetSerialByIdResponse{Found: false}, nil |
|
} |
|
return &orlydbv1.GetSerialByIdResponse{ |
|
Serial: ser.Get(), |
|
Found: true, |
|
}, nil |
|
} |
|
|
|
func (s *DatabaseService) GetSerialsByIds(ctx context.Context, req *orlydbv1.GetSerialsByIdsRequest) (*orlydbv1.SerialMap, error) { |
|
// Convert request IDs to tag format |
|
ids := orlydbv1.BytesToTag(req.Ids) |
|
serials, err := s.db.GetSerialsByIds(ids) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get serials by ids failed: %v", err) |
|
} |
|
result := &orlydbv1.SerialMap{ |
|
Serials: make(map[string]uint64), |
|
} |
|
for k, v := range serials { |
|
if v != nil { |
|
result.Serials[k] = v.Get() |
|
} |
|
} |
|
return result, nil |
|
} |
|
|
|
func (s *DatabaseService) GetSerialsByRange(ctx context.Context, req *orlydbv1.GetSerialsByRangeRequest) (*orlydbv1.SerialList, error) { |
|
r := orlydbv1.ProtoToRange(req.Range) |
|
serials, err := s.db.GetSerialsByRange(r) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get serials by range failed: %v", err) |
|
} |
|
return orlydbv1.Uint40sToProto(serials), nil |
|
} |
|
|
|
func (s *DatabaseService) GetFullIdPubkeyBySerial(ctx context.Context, req *orlydbv1.GetFullIdPubkeyBySerialRequest) (*orlydbv1.IdPkTs, error) { |
|
ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial}) |
|
idPkTs, err := s.db.GetFullIdPubkeyBySerial(ser) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get full id pubkey by serial failed: %v", err) |
|
} |
|
return orlydbv1.IdPkTsToProto(idPkTs), nil |
|
} |
|
|
|
func (s *DatabaseService) GetFullIdPubkeyBySerials(ctx context.Context, req *orlydbv1.GetFullIdPubkeyBySerialsRequest) (*orlydbv1.IdPkTsList, error) { |
|
serials := orlydbv1.ProtoToUint40s(&orlydbv1.SerialList{Serials: req.Serials}) |
|
idPkTs, err := s.db.GetFullIdPubkeyBySerials(serials) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get full id pubkey by serials failed: %v", err) |
|
} |
|
return orlydbv1.IdPkTsListToProto(idPkTs), nil |
|
} |
|
|
|
// === Event Deletion === |
|
|
|
func (s *DatabaseService) DeleteEvent(ctx context.Context, req *orlydbv1.DeleteEventRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.DeleteEvent(ctx, req.EventId); err != nil { |
|
return nil, status.Errorf(codes.Internal, "delete event failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) DeleteEventBySerial(ctx context.Context, req *orlydbv1.DeleteEventBySerialRequest) (*orlydbv1.Empty, error) { |
|
ser := orlydbv1.ProtoToUint40(&orlydbv1.Uint40{Value: req.Serial}) |
|
ev := orlydbv1.ProtoToEvent(req.Event) |
|
if err := s.db.DeleteEventBySerial(ctx, ser, ev); err != nil { |
|
return nil, status.Errorf(codes.Internal, "delete event by serial failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) DeleteExpired(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) { |
|
s.db.DeleteExpired() |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) ProcessDelete(ctx context.Context, req *orlydbv1.ProcessDeleteRequest) (*orlydbv1.Empty, error) { |
|
ev := orlydbv1.ProtoToEvent(req.Event) |
|
if err := s.db.ProcessDelete(ev, req.Admins); err != nil { |
|
return nil, status.Errorf(codes.Internal, "process delete failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) CheckForDeleted(ctx context.Context, req *orlydbv1.CheckForDeletedRequest) (*orlydbv1.Empty, error) { |
|
ev := orlydbv1.ProtoToEvent(req.Event) |
|
if err := s.db.CheckForDeleted(ev, req.Admins); err != nil { |
|
return nil, status.Errorf(codes.Internal, "check for deleted failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
// === Import/Export === |
|
|
|
func (s *DatabaseService) Import(stream orlydbv1.DatabaseService_ImportServer) error { |
|
pr, pw := io.Pipe() |
|
|
|
// Goroutine to read from gRPC stream and write to pipe |
|
go func() { |
|
defer pw.Close() |
|
for { |
|
chunk, err := stream.Recv() |
|
if err == io.EOF { |
|
return |
|
} |
|
if err != nil { |
|
log.E.F("import stream error: %v", err) |
|
pw.CloseWithError(err) |
|
return |
|
} |
|
if _, err := pw.Write(chunk.Data); chk.E(err) { |
|
return |
|
} |
|
} |
|
}() |
|
|
|
// Import from pipe |
|
s.db.Import(pr) |
|
|
|
return stream.SendAndClose(&orlydbv1.ImportResponse{ |
|
EventsImported: 0, // TODO: Track count |
|
EventsSkipped: 0, |
|
}) |
|
} |
|
|
|
func (s *DatabaseService) Export(req *orlydbv1.ExportRequest, stream orlydbv1.DatabaseService_ExportServer) error { |
|
pr, pw := io.Pipe() |
|
|
|
// Goroutine to export to pipe |
|
go func() { |
|
defer pw.Close() |
|
s.db.Export(stream.Context(), pw, req.Pubkeys...) |
|
}() |
|
|
|
// Read from pipe and send to stream |
|
buf := make([]byte, 64*1024) // 64KB chunks |
|
for { |
|
n, err := pr.Read(buf) |
|
if err == io.EOF { |
|
return nil |
|
} |
|
if err != nil { |
|
return status.Errorf(codes.Internal, "export failed: %v", err) |
|
} |
|
if err := stream.Send(&orlydbv1.ExportChunk{Data: buf[:n]}); err != nil { |
|
return err |
|
} |
|
} |
|
} |
|
|
|
func (s *DatabaseService) ImportEventsFromStrings(ctx context.Context, req *orlydbv1.ImportEventsFromStringsRequest) (*orlydbv1.ImportResponse, error) { |
|
// Note: We can't pass policy manager over gRPC, so we pass nil |
|
if err := s.db.ImportEventsFromStrings(ctx, req.EventJsons, nil); err != nil { |
|
return nil, status.Errorf(codes.Internal, "import events from strings failed: %v", err) |
|
} |
|
return &orlydbv1.ImportResponse{ |
|
EventsImported: int64(len(req.EventJsons)), |
|
}, nil |
|
} |
|
|
|
// === Relay Identity === |
|
|
|
func (s *DatabaseService) GetRelayIdentitySecret(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.GetRelayIdentitySecretResponse, error) { |
|
secret, err := s.db.GetRelayIdentitySecret() |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get relay identity secret failed: %v", err) |
|
} |
|
return &orlydbv1.GetRelayIdentitySecretResponse{SecretKey: secret}, nil |
|
} |
|
|
|
func (s *DatabaseService) SetRelayIdentitySecret(ctx context.Context, req *orlydbv1.SetRelayIdentitySecretRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.SetRelayIdentitySecret(req.SecretKey); err != nil { |
|
return nil, status.Errorf(codes.Internal, "set relay identity secret failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) GetOrCreateRelayIdentitySecret(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.GetRelayIdentitySecretResponse, error) { |
|
secret, err := s.db.GetOrCreateRelayIdentitySecret() |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get or create relay identity secret failed: %v", err) |
|
} |
|
return &orlydbv1.GetRelayIdentitySecretResponse{SecretKey: secret}, nil |
|
} |
|
|
|
// === Markers === |
|
|
|
func (s *DatabaseService) SetMarker(ctx context.Context, req *orlydbv1.SetMarkerRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.SetMarker(req.Key, req.Value); err != nil { |
|
return nil, status.Errorf(codes.Internal, "set marker failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) GetMarker(ctx context.Context, req *orlydbv1.GetMarkerRequest) (*orlydbv1.GetMarkerResponse, error) { |
|
value, err := s.db.GetMarker(req.Key) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get marker failed: %v", err) |
|
} |
|
return &orlydbv1.GetMarkerResponse{ |
|
Value: value, |
|
Found: value != nil, |
|
}, nil |
|
} |
|
|
|
func (s *DatabaseService) HasMarker(ctx context.Context, req *orlydbv1.HasMarkerRequest) (*orlydbv1.HasMarkerResponse, error) { |
|
exists := s.db.HasMarker(req.Key) |
|
return &orlydbv1.HasMarkerResponse{Exists: exists}, nil |
|
} |
|
|
|
func (s *DatabaseService) DeleteMarker(ctx context.Context, req *orlydbv1.DeleteMarkerRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.DeleteMarker(req.Key); err != nil { |
|
return nil, status.Errorf(codes.Internal, "delete marker failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
// === Subscriptions === |
|
|
|
func (s *DatabaseService) GetSubscription(ctx context.Context, req *orlydbv1.GetSubscriptionRequest) (*orlydbv1.Subscription, error) { |
|
sub, err := s.db.GetSubscription(req.Pubkey) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get subscription failed: %v", err) |
|
} |
|
return orlydbv1.SubscriptionToProto(sub, req.Pubkey), nil |
|
} |
|
|
|
func (s *DatabaseService) IsSubscriptionActive(ctx context.Context, req *orlydbv1.IsSubscriptionActiveRequest) (*orlydbv1.IsSubscriptionActiveResponse, error) { |
|
active, err := s.db.IsSubscriptionActive(req.Pubkey) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "is subscription active failed: %v", err) |
|
} |
|
return &orlydbv1.IsSubscriptionActiveResponse{Active: active}, nil |
|
} |
|
|
|
func (s *DatabaseService) ExtendSubscription(ctx context.Context, req *orlydbv1.ExtendSubscriptionRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.ExtendSubscription(req.Pubkey, int(req.Days)); err != nil { |
|
return nil, status.Errorf(codes.Internal, "extend subscription failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) RecordPayment(ctx context.Context, req *orlydbv1.RecordPaymentRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.RecordPayment(req.Pubkey, req.Amount, req.Invoice, req.Preimage); err != nil { |
|
return nil, status.Errorf(codes.Internal, "record payment failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) GetPaymentHistory(ctx context.Context, req *orlydbv1.GetPaymentHistoryRequest) (*orlydbv1.PaymentList, error) { |
|
payments, err := s.db.GetPaymentHistory(req.Pubkey) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get payment history failed: %v", err) |
|
} |
|
return orlydbv1.PaymentListToProto(payments), nil |
|
} |
|
|
|
func (s *DatabaseService) ExtendBlossomSubscription(ctx context.Context, req *orlydbv1.ExtendBlossomSubscriptionRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.ExtendBlossomSubscription(req.Pubkey, req.Tier, req.StorageMb, int(req.DaysExtended)); err != nil { |
|
return nil, status.Errorf(codes.Internal, "extend blossom subscription failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) GetBlossomStorageQuota(ctx context.Context, req *orlydbv1.GetBlossomStorageQuotaRequest) (*orlydbv1.GetBlossomStorageQuotaResponse, error) { |
|
quota, err := s.db.GetBlossomStorageQuota(req.Pubkey) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get blossom storage quota failed: %v", err) |
|
} |
|
return &orlydbv1.GetBlossomStorageQuotaResponse{QuotaMb: quota}, nil |
|
} |
|
|
|
func (s *DatabaseService) IsFirstTimeUser(ctx context.Context, req *orlydbv1.IsFirstTimeUserRequest) (*orlydbv1.IsFirstTimeUserResponse, error) { |
|
firstTime, err := s.db.IsFirstTimeUser(req.Pubkey) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "is first time user failed: %v", err) |
|
} |
|
return &orlydbv1.IsFirstTimeUserResponse{FirstTime: firstTime}, nil |
|
} |
|
|
|
// === NIP-43 === |
|
|
|
func (s *DatabaseService) AddNIP43Member(ctx context.Context, req *orlydbv1.AddNIP43MemberRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.AddNIP43Member(req.Pubkey, req.InviteCode); err != nil { |
|
return nil, status.Errorf(codes.Internal, "add NIP-43 member failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) RemoveNIP43Member(ctx context.Context, req *orlydbv1.RemoveNIP43MemberRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.RemoveNIP43Member(req.Pubkey); err != nil { |
|
return nil, status.Errorf(codes.Internal, "remove NIP-43 member failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) IsNIP43Member(ctx context.Context, req *orlydbv1.IsNIP43MemberRequest) (*orlydbv1.IsNIP43MemberResponse, error) { |
|
isMember, err := s.db.IsNIP43Member(req.Pubkey) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "is NIP-43 member failed: %v", err) |
|
} |
|
return &orlydbv1.IsNIP43MemberResponse{IsMember: isMember}, nil |
|
} |
|
|
|
func (s *DatabaseService) GetNIP43Membership(ctx context.Context, req *orlydbv1.GetNIP43MembershipRequest) (*orlydbv1.NIP43Membership, error) { |
|
membership, err := s.db.GetNIP43Membership(req.Pubkey) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get NIP-43 membership failed: %v", err) |
|
} |
|
return orlydbv1.NIP43MembershipToProto(membership), nil |
|
} |
|
|
|
func (s *DatabaseService) GetAllNIP43Members(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.PubkeyList, error) { |
|
members, err := s.db.GetAllNIP43Members() |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get all NIP-43 members failed: %v", err) |
|
} |
|
return &orlydbv1.PubkeyList{Pubkeys: members}, nil |
|
} |
|
|
|
func (s *DatabaseService) StoreInviteCode(ctx context.Context, req *orlydbv1.StoreInviteCodeRequest) (*orlydbv1.Empty, error) { |
|
expiresAt := orlydbv1.TimeFromUnix(req.ExpiresAt) |
|
if err := s.db.StoreInviteCode(req.Code, expiresAt); err != nil { |
|
return nil, status.Errorf(codes.Internal, "store invite code failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) ValidateInviteCode(ctx context.Context, req *orlydbv1.ValidateInviteCodeRequest) (*orlydbv1.ValidateInviteCodeResponse, error) { |
|
valid, err := s.db.ValidateInviteCode(req.Code) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "validate invite code failed: %v", err) |
|
} |
|
return &orlydbv1.ValidateInviteCodeResponse{Valid: valid}, nil |
|
} |
|
|
|
func (s *DatabaseService) DeleteInviteCode(ctx context.Context, req *orlydbv1.DeleteInviteCodeRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.DeleteInviteCode(req.Code); err != nil { |
|
return nil, status.Errorf(codes.Internal, "delete invite code failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) PublishNIP43MembershipEvent(ctx context.Context, req *orlydbv1.PublishNIP43MembershipEventRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.PublishNIP43MembershipEvent(int(req.Kind), req.Pubkey); err != nil { |
|
return nil, status.Errorf(codes.Internal, "publish NIP-43 membership event failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
// === Query Cache === |
|
|
|
func (s *DatabaseService) GetCachedJSON(ctx context.Context, req *orlydbv1.GetCachedJSONRequest) (*orlydbv1.GetCachedJSONResponse, error) { |
|
f := orlydbv1.ProtoToFilter(req.Filter) |
|
jsonItems, found := s.db.GetCachedJSON(f) |
|
return &orlydbv1.GetCachedJSONResponse{ |
|
JsonItems: jsonItems, |
|
Found: found, |
|
}, nil |
|
} |
|
|
|
func (s *DatabaseService) CacheMarshaledJSON(ctx context.Context, req *orlydbv1.CacheMarshaledJSONRequest) (*orlydbv1.Empty, error) { |
|
f := orlydbv1.ProtoToFilter(req.Filter) |
|
s.db.CacheMarshaledJSON(f, req.JsonItems) |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) GetCachedEvents(ctx context.Context, req *orlydbv1.GetCachedEventsRequest) (*orlydbv1.GetCachedEventsResponse, error) { |
|
f := orlydbv1.ProtoToFilter(req.Filter) |
|
events, found := s.db.GetCachedEvents(f) |
|
return &orlydbv1.GetCachedEventsResponse{ |
|
Events: orlydbv1.EventsToProto(events), |
|
Found: found, |
|
}, nil |
|
} |
|
|
|
func (s *DatabaseService) CacheEvents(ctx context.Context, req *orlydbv1.CacheEventsRequest) (*orlydbv1.Empty, error) { |
|
f := orlydbv1.ProtoToFilter(req.Filter) |
|
events := orlydbv1.ProtoToEvents(req.Events) |
|
s.db.CacheEvents(f, events) |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) InvalidateQueryCache(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) { |
|
s.db.InvalidateQueryCache() |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
// === Access Tracking === |
|
|
|
func (s *DatabaseService) RecordEventAccess(ctx context.Context, req *orlydbv1.RecordEventAccessRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.RecordEventAccess(req.Serial, req.ConnectionId); err != nil { |
|
return nil, status.Errorf(codes.Internal, "record event access failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) GetEventAccessInfo(ctx context.Context, req *orlydbv1.GetEventAccessInfoRequest) (*orlydbv1.GetEventAccessInfoResponse, error) { |
|
lastAccess, accessCount, err := s.db.GetEventAccessInfo(req.Serial) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get event access info failed: %v", err) |
|
} |
|
return &orlydbv1.GetEventAccessInfoResponse{ |
|
LastAccess: lastAccess, |
|
AccessCount: accessCount, |
|
}, nil |
|
} |
|
|
|
func (s *DatabaseService) GetLeastAccessedEvents(ctx context.Context, req *orlydbv1.GetLeastAccessedEventsRequest) (*orlydbv1.SerialList, error) { |
|
serials, err := s.db.GetLeastAccessedEvents(int(req.Limit), req.MinAgeSec) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get least accessed events failed: %v", err) |
|
} |
|
return &orlydbv1.SerialList{Serials: serials}, nil |
|
} |
|
|
|
// === Utility === |
|
|
|
func (s *DatabaseService) EventIdsBySerial(ctx context.Context, req *orlydbv1.EventIdsBySerialRequest) (*orlydbv1.EventIdsBySerialResponse, error) { |
|
eventIds, err := s.db.EventIdsBySerial(req.Start, int(req.Count)) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "event ids by serial failed: %v", err) |
|
} |
|
return &orlydbv1.EventIdsBySerialResponse{EventIds: eventIds}, nil |
|
} |
|
|
|
func (s *DatabaseService) RunMigrations(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.Empty, error) { |
|
s.db.RunMigrations() |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
// === Blob Storage (Blossom) === |
|
|
|
func (s *DatabaseService) SaveBlob(ctx context.Context, req *orlydbv1.SaveBlobRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.SaveBlob(req.Sha256Hash, req.Data, req.Pubkey, req.MimeType, req.Extension); err != nil { |
|
return nil, status.Errorf(codes.Internal, "save blob failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) GetBlob(ctx context.Context, req *orlydbv1.GetBlobRequest) (*orlydbv1.GetBlobResponse, error) { |
|
data, metadata, err := s.db.GetBlob(req.Sha256Hash) |
|
if err != nil { |
|
// Return not found as a response, not an error |
|
return &orlydbv1.GetBlobResponse{Found: false}, nil |
|
} |
|
return &orlydbv1.GetBlobResponse{ |
|
Found: true, |
|
Data: data, |
|
Metadata: orlydbv1.BlobMetadataToProto(metadata), |
|
}, nil |
|
} |
|
|
|
func (s *DatabaseService) HasBlob(ctx context.Context, req *orlydbv1.HasBlobRequest) (*orlydbv1.HasBlobResponse, error) { |
|
exists, err := s.db.HasBlob(req.Sha256Hash) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "has blob failed: %v", err) |
|
} |
|
return &orlydbv1.HasBlobResponse{Exists: exists}, nil |
|
} |
|
|
|
func (s *DatabaseService) DeleteBlob(ctx context.Context, req *orlydbv1.DeleteBlobRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.DeleteBlob(req.Sha256Hash, req.Pubkey); err != nil { |
|
return nil, status.Errorf(codes.Internal, "delete blob failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) ListBlobs(ctx context.Context, req *orlydbv1.ListBlobsRequest) (*orlydbv1.ListBlobsResponse, error) { |
|
descriptors, err := s.db.ListBlobs(req.Pubkey, req.Since, req.Until) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "list blobs failed: %v", err) |
|
} |
|
return &orlydbv1.ListBlobsResponse{ |
|
Descriptors: orlydbv1.BlobDescriptorListToProto(descriptors), |
|
}, nil |
|
} |
|
|
|
func (s *DatabaseService) GetBlobMetadata(ctx context.Context, req *orlydbv1.GetBlobMetadataRequest) (*orlydbv1.BlobMetadata, error) { |
|
metadata, err := s.db.GetBlobMetadata(req.Sha256Hash) |
|
if err != nil { |
|
return nil, status.Errorf(codes.NotFound, "blob metadata not found: %v", err) |
|
} |
|
return orlydbv1.BlobMetadataToProto(metadata), nil |
|
} |
|
|
|
func (s *DatabaseService) GetTotalBlobStorageUsed(ctx context.Context, req *orlydbv1.GetTotalBlobStorageUsedRequest) (*orlydbv1.GetTotalBlobStorageUsedResponse, error) { |
|
totalMB, err := s.db.GetTotalBlobStorageUsed(req.Pubkey) |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "get total blob storage used failed: %v", err) |
|
} |
|
return &orlydbv1.GetTotalBlobStorageUsedResponse{TotalMb: totalMB}, nil |
|
} |
|
|
|
func (s *DatabaseService) SaveBlobReport(ctx context.Context, req *orlydbv1.SaveBlobReportRequest) (*orlydbv1.Empty, error) { |
|
if err := s.db.SaveBlobReport(req.Sha256Hash, req.ReportData); err != nil { |
|
return nil, status.Errorf(codes.Internal, "save blob report failed: %v", err) |
|
} |
|
return &orlydbv1.Empty{}, nil |
|
} |
|
|
|
func (s *DatabaseService) ListAllBlobUserStats(ctx context.Context, req *orlydbv1.Empty) (*orlydbv1.ListAllBlobUserStatsResponse, error) { |
|
stats, err := s.db.ListAllBlobUserStats() |
|
if err != nil { |
|
return nil, status.Errorf(codes.Internal, "list all blob user stats failed: %v", err) |
|
} |
|
return &orlydbv1.ListAllBlobUserStatsResponse{ |
|
Stats: orlydbv1.UserBlobStatsListToProto(stats), |
|
}, nil |
|
} |
|
|
|
// === Helper Methods === |
|
|
|
// streamEvents is a helper to stream events in batches. |
|
type eventStreamer interface { |
|
Send(*orlydbv1.EventBatch) error |
|
Context() context.Context |
|
} |
|
|
|
func (s *DatabaseService) streamEvents(events []*orlydbv1.Event, stream eventStreamer) error { |
|
batchSize := s.cfg.StreamBatchSize |
|
if batchSize == 0 { |
|
batchSize = 100 |
|
} |
|
|
|
for i := 0; i < len(events); i += batchSize { |
|
end := i + batchSize |
|
if end > len(events) { |
|
end = len(events) |
|
} |
|
|
|
batch := &orlydbv1.EventBatch{ |
|
Events: events[i:end], |
|
} |
|
|
|
if err := stream.Send(batch); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
return nil |
|
}
|
|
|