Skip to content

Commit

Permalink
ingest add metrics ingested rows (#129)
Browse files Browse the repository at this point in the history
Co-authored-by: YongHao Hu <yonghao.hu@grabtaxi.com>
  • Loading branch information
atlas-comstock and hiohiograb authored May 12, 2023
1 parent cbdbb69 commit 63b422f
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions internal/server/server_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t
return nil, errors.Internal("unable to read the block", err)
}

rowCount := 0
// If table supports streaming, then stream
if streamer, ok := t.(storage.Streamer); ok {
s := stream.Publish(streamer, s.monitor)
Expand All @@ -60,6 +61,7 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t
if err != nil {
return nil, err
}
rowCount += len(rows)
for _, row := range rows {
_, err := s(row)
if err != nil {
Expand All @@ -71,13 +73,13 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t
}

// Append all of the blocks
for _, block := range blocks {
if err := appender.Append(block); err != nil {
for _, blk := range blocks {
if err := appender.Append(blk); err != nil {
s.monitor.Count1(ctxTag, ingestErrorKey, "type:append")
return nil, err
}
}

s.monitor.Count("server", fmt.Sprintf("%s.ingest.row.count", t.Name()), int64(rowCount))
s.monitor.Count("server", fmt.Sprintf("%s.ingest.count", t.Name()), int64(len(blocks)))
}

Expand Down

0 comments on commit 63b422f

Please sign in to comment.