Skip to content
This repository has been archived by the owner on Jan 2, 2025. It is now read-only.

Activity Feed #1636

Merged
merged 13 commits into from
Feb 14, 2024
58 changes: 58 additions & 0 deletions backend/core/crypto.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package core

import (
"crypto/aes"
"crypto/cipher"
"crypto/ed25519"
"crypto/rand"
"encoding"
Expand All @@ -13,6 +15,14 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

const (
// KeyBytes is the length of GCM key.
KeyBytes = 32

// NonceBytes is the length of GCM nonce.
NonceBytes = 12
)

// Ensure interface implementations.
var (
_ Verifier = PublicKey{}
Expand Down Expand Up @@ -225,6 +235,54 @@ func (kp KeyPair) SignatureSize() int {
}
}

// Encrypt uses private key of the key pair and performs AES-256 GCM encryption on plaintext.
func (kp KeyPair) Encrypt(plaintext []byte) ([]byte, error) {
privKey, err := kp.k.Raw()
if err != nil {
return nil, err
}
block, err := aes.NewCipher(privKey[:KeyBytes])
if err != nil {
return nil, err
}
aesgcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonce := make([]byte, NonceBytes)
if _, err := rand.Read(nonce); err != nil {
return nil, err
}
ciphertext := aesgcm.Seal(nil, nonce, plaintext, nil)
ciphertext = append(nonce[:], ciphertext...)
return ciphertext, nil
}

// Decrypt uses private key of the key pair to perform AES-256 GCM decryption on ciphertext.
func (kp KeyPair) Decrypt(ciphertext []byte) ([]byte, error) {
privKey, err := kp.k.Raw()
if err != nil {
return nil, err
}
block, err := aes.NewCipher(privKey[:KeyBytes])
if err != nil {
return nil, err
}
aesgcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
if len(ciphertext) < NonceBytes {
return nil, fmt.Errorf("malformed cipher text")
}
nonce := ciphertext[:NonceBytes]
plain, err := aesgcm.Open(nil, nonce, ciphertext[NonceBytes:], nil)
if err != nil {
return nil, err
}
return plain, nil
}

// Wrapped returns the wrapped libp2p key.
func (kp KeyPair) Wrapped() crypto.PrivKey {
return kp.k
Expand Down
143 changes: 143 additions & 0 deletions backend/daemon/api/activity/v1alpha/activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Package activity manages the activity feed.
package activity

import (
context "context"
"encoding/base64"
"fmt"
"math"
"mintter/backend/core"
activity "mintter/backend/genproto/activity/v1alpha"
"mintter/backend/pkg/dqb"
"mintter/backend/pkg/future"
"strconv"
"time"

"github.com/ipfs/go-cid"

"crawshaw.io/sqlite"
"crawshaw.io/sqlite/sqlitex"
"google.golang.org/protobuf/types/known/timestamppb"
)

// Repo is a subset of the [ondisk.OnDisk] used by this server.
type Repo interface {
Device() core.KeyPair
Identity() *future.ReadOnly[core.Identity]
CommitAccount(core.PublicKey) error
}

// Server implements the Activity gRPC API.
type Server struct {
me *future.ReadOnly[core.Identity]
db *sqlitex.Pool
startTime time.Time
}

// NewServer creates a new Server.
func NewServer(id *future.ReadOnly[core.Identity], db *sqlitex.Pool) *Server {
return &Server{
db: db,
startTime: time.Now(),
me: id,
}
}

// ListEvents list all the events seen locally.
func (srv *Server) ListEvents(ctx context.Context, req *activity.ListEventsRequest) (*activity.ListEventsResponse, error) {
me, ok := srv.me.Get()
if !ok {
return nil, fmt.Errorf("account is not initialized yet")
}
var cursorBlobID int64 = math.MaxInt32
var err error
if req.PageToken != "" {
pageTokenBytes, _ := base64.StdEncoding.DecodeString(req.PageToken)
if err != nil {
return nil, fmt.Errorf("Token encoding not valid: %w", err)
}
clearPageToken, err := me.DeviceKey().Decrypt(pageTokenBytes)
if err != nil {
return nil, fmt.Errorf("Token not valid: %w", err)
}
pageToken, err := strconv.ParseUint(string(clearPageToken), 10, 32)
if err != nil {
return nil, fmt.Errorf("Token not valid: %w", err)
}
cursorBlobID = int64(pageToken)
}
conn, cancel, err := srv.db.Conn(ctx)
if err != nil {
return nil, err
}
defer cancel()

var events []*activity.Event

var qGetEventsTrusted = dqb.Str(`
SELECT blobs.id, structural_blobs.type ,public_keys.principal, resources.iri, structural_blobs.ts, blobs.insert_time, blobs.multihash, blobs.codec
FROM structural_blobs
JOIN blobs ON blobs.id=structural_blobs.id
JOIN public_keys ON structural_blobs.author=public_keys.id
LEFT JOIN resources ON structural_blobs.resource=resources.id
JOIN trusted_accounts ON trusted_accounts.id=public_keys.id
WHERE blobs.id <= :idx ORDER BY blobs.id desc limit :page_token;
`)
var qGetEventsAll = dqb.Str(`
SELECT blobs.id, structural_blobs.type ,public_keys.principal, resources.iri, structural_blobs.ts, blobs.insert_time, blobs.multihash, blobs.codec
FROM structural_blobs
JOIN blobs ON blobs.id=structural_blobs.id
JOIN public_keys ON structural_blobs.author=public_keys.id
LEFT JOIN resources ON structural_blobs.resource=resources.id
WHERE blobs.id <= :idx ORDER BY blobs.id desc limit :page_token;
`)
query := qGetEventsAll()
if req.TrustedOnly {
query = qGetEventsTrusted()
}
var lastBlobID int64
err = sqlitex.Exec(conn, query, func(stmt *sqlite.Stmt) error {
lastBlobID = stmt.ColumnInt64(0)
eventType := stmt.ColumnText(1)
author := stmt.ColumnBytes(2)
resource := stmt.ColumnText(3)
eventTime := stmt.ColumnInt64(4) * 1000 //Its in microseconds and we need nanos
observeTime := stmt.ColumnInt64(5)
mhash := stmt.ColumnBytes(6)
codec := stmt.ColumnInt64(7)
accountID := core.Principal(author).String()
id := cid.NewCidV1(uint64(codec), mhash)
if eventType == "Comment" {
resource = "hm://c/" + id.String()
}
event := activity.Event{
Data: &activity.Event_NewBlob{NewBlob: &activity.NewBlobEvent{
Cid: id.String(),
BlobType: eventType,
Author: accountID,
Resource: resource,
}},
Account: accountID,
EventTime: &timestamppb.Timestamp{Seconds: eventTime / 1000000000, Nanos: int32(eventTime % 1000000000)},
ObserveTime: &timestamppb.Timestamp{Seconds: observeTime},
}
events = append(events, &event)
return nil
}, cursorBlobID, req.PageSize)
if err != nil {
return nil, fmt.Errorf("Problem collecting activity feed, Probably no feed or token out of range: %w", err)
}
var PageTokenStr string

pageToken, err := me.DeviceKey().Encrypt([]byte(strconv.Itoa(int(lastBlobID - 1))))
if err != nil {
return nil, err
}
if lastBlobID != 0 && req.PageSize == int32(len(events)) {
PageTokenStr = base64.StdEncoding.EncodeToString(pageToken)
}
return &activity.ListEventsResponse{
Events: events,
NextPageToken: PageTokenStr,
}, err
}
40 changes: 40 additions & 0 deletions backend/daemon/api/activity/v1alpha/activity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package activity

import (
context "context"
"mintter/backend/core"
"mintter/backend/core/coretest"
"mintter/backend/daemon/storage"
activity "mintter/backend/genproto/activity/v1alpha"
"mintter/backend/pkg/future"
"testing"

"github.com/stretchr/testify/require"
)

func TestListEvents(t *testing.T) {
alice := newTestServer(t, "alice")
ctx := context.Background()

req := &activity.ListEventsRequest{
PageSize: 5,
PageToken: "",
}
events, err := alice.ListEvents(ctx, req)
require.NoError(t, err)
require.NotNil(t, events)
require.Len(t, events.Events, 0)
}

// TODO: update profile idempotent no change

func newTestServer(t *testing.T, name string) *Server {
u := coretest.NewTester(name)
//repo := daemontest.MakeTestRepo(t, u)
db := storage.MakeTestDB(t)
//blobs := hyper.NewStorage(db, logging.New("mintter/hyper", "debug"))
fut := future.New[core.Identity]()
require.NoError(t, fut.Resolve(u.Identity))

return NewServer(fut.ReadOnly, db)
}
3 changes: 3 additions & 0 deletions backend/daemon/api/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
accounts "mintter/backend/daemon/api/accounts/v1alpha"
activity "mintter/backend/daemon/api/activity/v1alpha"
daemon "mintter/backend/daemon/api/daemon/v1alpha"
documents "mintter/backend/daemon/api/documents/v1alpha"
entities "mintter/backend/daemon/api/entities/v1alpha"
Expand All @@ -30,6 +31,7 @@ type Server struct {
Networking *networking.Server
Entities *entities.Server
Groups *groups.Server
Activity *activity.Server
}

// New creates a new API server.
Expand Down Expand Up @@ -61,6 +63,7 @@ func New(
documentsSrv := documents.NewServer(repo.Identity(), db, &lazyDiscoverer{sync: sync, net: node}, &lazyGwClient{net: node}, LogLevel)
return Server{
Accounts: accounts.NewServer(repo.Identity(), blobs),
Activity: activity.NewServer(repo.Identity(), db),
Daemon: daemon.NewServer(repo, blobs, wallet, doSync),
Documents: documentsSrv,
Networking: networking.NewServer(blobs, node),
Expand Down
2 changes: 2 additions & 0 deletions backend/daemon/api/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
accounts "mintter/backend/genproto/accounts/v1alpha"
activity "mintter/backend/genproto/activity/v1alpha"
daemon "mintter/backend/genproto/daemon/v1alpha"
documents "mintter/backend/genproto/documents/v1alpha"
entities "mintter/backend/genproto/entities/v1alpha"
Expand All @@ -22,6 +23,7 @@ func (s Server) Register(srv *grpc.Server) {
documents.RegisterChangesServer(srv, s.Documents)
documents.RegisterCommentsServer(srv, s.Documents)

activity.RegisterActivityFeedServer(srv, s.Activity)
networking.RegisterNetworkingServer(srv, s.Networking)
entities.RegisterEntitiesServer(srv, s.Entities)
groups.RegisterGroupsServer(srv, s.Groups)
Expand Down
Loading
Loading