From a7dfa1da6435a90e12b1aa66fac49f7272680d06 Mon Sep 17 00:00:00 2001 From: kevin <35275952+kaladinlight@users.noreply.github.com> Date: Mon, 16 Sep 2024 12:39:50 -0600 Subject: [PATCH] feat: index affiliate fees across multiple addresses (#1037) --- go/coinstacks/thorchain/api/affiliateFees.go | 150 ++++++++++--------- go/coinstacks/thorchain/api/handler.go | 8 +- go/coinstacks/thorchain/api/swagger.json | 13 +- 3 files changed, 94 insertions(+), 77 deletions(-) diff --git a/go/coinstacks/thorchain/api/affiliateFees.go b/go/coinstacks/thorchain/api/affiliateFees.go index 5df41e876..501f44978 100644 --- a/go/coinstacks/thorchain/api/affiliateFees.go +++ b/go/coinstacks/thorchain/api/affiliateFees.go @@ -23,20 +23,20 @@ import ( ) const ( - blockWorkers = 10 - resultWorkers = 100 - pageSize = 50 - affiliateAddress = "thor1xmaggkcln5m5fnha2780xrdrulmplvfrz6wj3l" + blockWorkers = 10 + resultWorkers = 100 + pageSize = 50 ) type AffiliateFeeIndexer struct { - AffiliateFees []*AffiliateFee - conf cosmos.Config - httpClients []*cosmos.HTTPClient - mu sync.Mutex - pageChs []chan int - resultChs []chan *coretypes.ResultBlockSearch - wg sync.WaitGroup + AffiliateAddresses []string + AffiliateFees []*AffiliateFee + conf cosmos.Config + httpClients []*cosmos.HTTPClient + mu sync.Mutex + pageChs []chan int + resultChs []chan *coretypes.ResultBlockSearch + wg sync.WaitGroup } type AffiliateFee struct { @@ -50,22 +50,25 @@ type AffiliateFee struct { } func NewAffiliateFeeIndexer(conf cosmos.Config, httpClients []*cosmos.HTTPClient) *AffiliateFeeIndexer { - pageChs := make([]chan int, len(httpClients)) + affiliateAddresses := []string{"thor1xmaggkcln5m5fnha2780xrdrulmplvfrz6wj3l", "thor1crs0y53jfg224mettqeg883e6ume49tllktg2s"} + + pageChs := make([]chan int, len(httpClients)*len(affiliateAddresses)) for i := range pageChs { pageChs[i] = make(chan int) } - resultChs := make([]chan *coretypes.ResultBlockSearch, len(httpClients)) + resultChs := make([]chan *coretypes.ResultBlockSearch, len(httpClients)*len(affiliateAddresses)) for i := range resultChs { resultChs[i] = make(chan *coretypes.ResultBlockSearch) } return &AffiliateFeeIndexer{ - AffiliateFees: []*AffiliateFee{}, - conf: conf, - httpClients: httpClients, - pageChs: pageChs, - resultChs: resultChs, + AffiliateAddresses: affiliateAddresses, + AffiliateFees: []*AffiliateFee{}, + conf: conf, + httpClients: httpClients, + pageChs: pageChs, + resultChs: resultChs, } } @@ -85,56 +88,64 @@ func (i *AffiliateFeeIndexer) Sync() error { g := new(errgroup.Group) - for j, httpClient := range i.httpClients { - httpClient := httpClient - resultCh := i.resultChs[j] - pageCh := i.pageChs[j] + idx := 0 + for _, affiliateAddress := range i.AffiliateAddresses { + affiliateAddress := affiliateAddress - g.Go(func() error { - for w := 0; w < resultWorkers; w++ { - go func() { - for result := range resultCh { - for _, b := range result.Blocks { - block := b.Block - i.handleBlock(httpClient, block) + for _, httpClient := range i.httpClients { + httpClient := httpClient + + resultCh := i.resultChs[idx] + pageCh := i.pageChs[idx] + + idx++ + + g.Go(func() error { + for w := 0; w < resultWorkers; w++ { + go func() { + for result := range resultCh { + for _, b := range result.Blocks { + block := b.Block + i.handleBlock(httpClient, block, affiliateAddress) + } } - } - }() - } + }() + } - result, err := httpClient.BlockSearch(fmt.Sprintf(`"outbound.to='%s'"`, "thor1xmaggkcln5m5fnha2780xrdrulmplvfrz6wj3l"), 1, pageSize) - if err != nil { - return err - } + result, err := httpClient.BlockSearch(fmt.Sprintf(`"outbound.to='%s'"`, affiliateAddress), 1, pageSize) + if err != nil { + return err + } - maxPages := int(math.Ceil(float64(result.TotalCount) / float64(pageSize))) - resultCh <- result + maxPages := int(math.Ceil(float64(result.TotalCount) / float64(pageSize))) + resultCh <- result - i.wg.Add(blockWorkers) - for w := 0; w < blockWorkers; w++ { - go i.fetchBlocks(httpClient, pageCh, resultCh) - } + i.wg.Add(blockWorkers) + for w := 0; w < blockWorkers; w++ { + go i.fetchBlocks(httpClient, affiliateAddress, pageCh, resultCh) + } - go func() { - page := 2 - for { - if page > maxPages { - close(pageCh) - break + go func() { + page := 2 + for { + if page > maxPages { + close(pageCh) + break + } + pageCh <- page + page++ } - pageCh <- page - page++ - } - }() + }() - i.wg.Wait() + i.wg.Wait() - return nil - }) - } + return nil + }) + } - if err := g.Wait(); err != nil { - return err + if err := g.Wait(); err != nil { + return err + } } logger.Infof("Finished indexing affiliate fees (%s)", time.Since(start)) @@ -211,11 +222,11 @@ func (i *AffiliateFeeIndexer) listen() error { return nil } -func (i *AffiliateFeeIndexer) fetchBlocks(httpClient *cosmos.HTTPClient, pageCh <-chan int, resultCh chan<- *coretypes.ResultBlockSearch) { +func (i *AffiliateFeeIndexer) fetchBlocks(httpClient *cosmos.HTTPClient, affiliateAddress string, pageCh <-chan int, resultCh chan<- *coretypes.ResultBlockSearch) { defer i.wg.Done() for page := range pageCh { - result, err := httpClient.BlockSearch(fmt.Sprintf(`"outbound.to='%s'"`, "thor1xmaggkcln5m5fnha2780xrdrulmplvfrz6wj3l"), page, pageSize) + result, err := httpClient.BlockSearch(fmt.Sprintf(`"outbound.to='%s'"`, affiliateAddress), page, pageSize) if err != nil { logger.Panicf("failed to fetch blocks for page: %d: %+v", page, err) } @@ -224,7 +235,7 @@ func (i *AffiliateFeeIndexer) fetchBlocks(httpClient *cosmos.HTTPClient, pageCh } } -func (i *AffiliateFeeIndexer) handleBlock(httpClient *cosmos.HTTPClient, block *tmtypes.Block) { +func (i *AffiliateFeeIndexer) handleBlock(httpClient *cosmos.HTTPClient, block *tmtypes.Block, affiliateAddress string) { blockResult, err := httpClient.BlockResults(int(block.Height)) if err != nil { logger.Panicf("failed to handle block: %d: %+v", block.Height, err) @@ -234,7 +245,7 @@ func (i *AffiliateFeeIndexer) handleBlock(httpClient *cosmos.HTTPClient, block * Block: block, } - i.processAffiliateFees(b, blockResult.EndBlockEvents) + i.processAffiliateFees(b, blockResult.EndBlockEvents, []string{affiliateAddress}) } func (i *AffiliateFeeIndexer) handleNewBlockHeader(newBlockHeader types.EventDataNewBlockHeader) { @@ -242,10 +253,10 @@ func (i *AffiliateFeeIndexer) handleNewBlockHeader(newBlockHeader types.EventDat EventDataNewBlockHeader: newBlockHeader, } - i.processAffiliateFees(b, newBlockHeader.ResultEndBlock.Events) + i.processAffiliateFees(b, newBlockHeader.ResultEndBlock.Events, i.AffiliateAddresses) } -func (i *AffiliateFeeIndexer) processAffiliateFees(block thorchain.Block, endBlockEvents []abci.Event) { +func (i *AffiliateFeeIndexer) processAffiliateFees(block thorchain.Block, endBlockEvents []abci.Event, affiliateAddresses []string) { _, typedEvents, err := thorchain.ParseBlockEvents(endBlockEvents) if err != nil { logger.Panicf("failed to parse block events for block: %d: %+v", block.Height(), err) @@ -269,10 +280,13 @@ func (i *AffiliateFeeIndexer) processAffiliateFees(block thorchain.Block, endBlo continue } - if affiliateFee.Address == affiliateAddress { - i.mu.Lock() - i.AffiliateFees = append(i.AffiliateFees, affiliateFee) - i.mu.Unlock() + for _, affiliateAddress := range affiliateAddresses { + if affiliateFee.Address == affiliateAddress { + i.mu.Lock() + i.AffiliateFees = append(i.AffiliateFees, affiliateFee) + i.mu.Unlock() + break + } } } } diff --git a/go/coinstacks/thorchain/api/handler.go b/go/coinstacks/thorchain/api/handler.go index 8f151e7f4..a399bc61b 100644 --- a/go/coinstacks/thorchain/api/handler.go +++ b/go/coinstacks/thorchain/api/handler.go @@ -85,9 +85,9 @@ func (h *Handler) GetTxHistory(pubkey string, cursor string, pageSize int) (api. // Contains info about the affiliate revenue earned // swagger:model AffiliateRevenue type AffiliateRevenue struct { - // Affiliate address + // Affiliate addresses // required: true - Address string `json:"address"` + Addresses []string `json:"addresses"` // Amount earned (RUNE) // required: true Amount string `json:"amount"` @@ -104,8 +104,8 @@ func (h *Handler) GetAffiliateRevenue(start int, end int) (*AffiliateRevenue, er } a := &AffiliateRevenue{ - Address: affiliateAddress, - Amount: total.String(), + Addresses: h.indexer.AffiliateAddresses, + Amount: total.String(), } return a, nil diff --git a/go/coinstacks/thorchain/api/swagger.json b/go/coinstacks/thorchain/api/swagger.json index fbd6366ec..e618c3dc8 100644 --- a/go/coinstacks/thorchain/api/swagger.json +++ b/go/coinstacks/thorchain/api/swagger.json @@ -334,14 +334,17 @@ "description": "Contains info about the affiliate revenue earned", "type": "object", "required": [ - "address", + "addresses", "amount" ], "properties": { - "address": { - "description": "Affiliate address", - "type": "string", - "x-go-name": "Address" + "addresses": { + "description": "Affiliate addresses", + "type": "array", + "items": { + "type": "string" + }, + "x-go-name": "Addresses" }, "amount": { "description": "Amount earned (RUNE)",