Skip to content

Commit

Permalink
Merge pull request #13 from tryfix/index_store_fixes
Browse files Browse the repository at this point in the history
Index store fixes
  • Loading branch information
gmbyapa authored Jun 11, 2020
2 parents 0cbd561 + 5f4e981 commit 2935d43
Show file tree
Hide file tree
Showing 29 changed files with 770 additions and 166 deletions.
35 changes: 21 additions & 14 deletions backend/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ type memoryRecord struct {
}

type config struct {
MetricsReporter metrics.Reporter
Logger log.Logger
ExpiredRecordCleanupInterval time.Duration
MetricsReporter metrics.Reporter
Logger log.Logger
}

func NewConfig() *config {
Expand All @@ -35,6 +36,10 @@ func NewConfig() *config {
}

func (c *config) parse() {
if c.ExpiredRecordCleanupInterval == time.Duration(0) {
c.ExpiredRecordCleanupInterval = time.Second
}

if c.Logger == nil {
c.Logger = log.NewNoopLogger()
}
Expand All @@ -45,9 +50,10 @@ func (c *config) parse() {
}

type memory struct {
records *sync.Map
logger log.Logger
metrics struct {
expiredRecordCleanupInterval time.Duration
records *sync.Map
logger log.Logger
metrics struct {
readLatency metrics.Observer
updateLatency metrics.Observer
deleteLatency metrics.Observer
Expand All @@ -57,29 +63,30 @@ type memory struct {

func Builder(config *config) backend.Builder {
return func(name string) (backend backend.Backend, err error) {
return NewMemoryBackend(config.Logger, config.MetricsReporter), nil
return NewMemoryBackend(config), nil
}
}

func NewMemoryBackend(logger log.Logger, reporter metrics.Reporter) backend.Backend {
func NewMemoryBackend(config *config) backend.Backend {

m := &memory{
logger: logger,
records: new(sync.Map),
expiredRecordCleanupInterval: config.ExpiredRecordCleanupInterval,
logger: config.Logger,
records: new(sync.Map),
}

labels := []string{`name`, `type`}
m.metrics.readLatency = reporter.Observer(metrics.MetricConf{Path: `backend_read_latency_microseconds`, Labels: labels})
m.metrics.updateLatency = reporter.Observer(metrics.MetricConf{Path: `backend_update_latency_microseconds`, Labels: labels})
m.metrics.storageSize = reporter.Gauge(metrics.MetricConf{Path: `backend_storage_size`, Labels: labels})
m.metrics.deleteLatency = reporter.Observer(metrics.MetricConf{Path: `backend_delete_latency_microseconds`, Labels: labels})
m.metrics.readLatency = config.MetricsReporter.Observer(metrics.MetricConf{Path: `backend_read_latency_microseconds`, Labels: labels})
m.metrics.updateLatency = config.MetricsReporter.Observer(metrics.MetricConf{Path: `backend_update_latency_microseconds`, Labels: labels})
m.metrics.storageSize = config.MetricsReporter.Gauge(metrics.MetricConf{Path: `backend_storage_size`, Labels: labels})
m.metrics.deleteLatency = config.MetricsReporter.Observer(metrics.MetricConf{Path: `backend_delete_latency_microseconds`, Labels: labels})

go m.runCleaner()
return m
}

func (m *memory) runCleaner() {
ticker := time.NewTicker(1 * time.Millisecond)
ticker := time.NewTicker(m.expiredRecordCleanupInterval)
for range ticker.C {
records := m.snapshot()
for _, record := range records {
Expand Down
20 changes: 16 additions & 4 deletions backend/memory/memory_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import (
)

func BenchmarkMemory_Set(b *testing.B) {
backend := NewMemoryBackend(log.Constructor.Log(), metrics.NoopReporter())
conf := NewConfig()
conf.Logger = log.NewNoopLogger()
conf.MetricsReporter = metrics.NoopReporter()
backend := NewMemoryBackend(conf)

b.ResetTimer()
b.ReportAllocs()
Expand All @@ -30,7 +33,10 @@ func BenchmarkMemory_Set(b *testing.B) {
}

func BenchmarkMemory_Get(b *testing.B) {
backend := NewMemoryBackend(log.Constructor.Log(), metrics.NoopReporter())
conf := NewConfig()
conf.Logger = log.NewNoopLogger()
conf.MetricsReporter = metrics.NoopReporter()
backend := NewMemoryBackend(conf)
numOfRecs := 1000000
for i := 1; i <= numOfRecs; i++ {
if err := backend.Set([]byte(fmt.Sprint(i)), []byte(`100`), 0); err != nil {
Expand All @@ -53,7 +59,10 @@ func BenchmarkMemory_Get(b *testing.B) {
}

func BenchmarkMemory_GetSet(b *testing.B) {
backend := NewMemoryBackend(log.Constructor.Log(), metrics.NoopReporter())
conf := NewConfig()
conf.Logger = log.NewNoopLogger()
conf.MetricsReporter = metrics.NoopReporter()
backend := NewMemoryBackend(conf)

for i := 1; i <= 99999; i++ {
if err := backend.Set([]byte(fmt.Sprint(rand.Intn(1000)+1)), []byte(`100`), 0); err != nil {
Expand All @@ -79,7 +88,10 @@ func BenchmarkMemory_GetSet(b *testing.B) {
}

func BenchmarkMemory_Iterator(b *testing.B) {
backend := NewMemoryBackend(log.Constructor.Log(), metrics.NoopReporter())
conf := NewConfig()
conf.Logger = log.NewNoopLogger()
conf.MetricsReporter = metrics.NoopReporter()
backend := NewMemoryBackend(conf)

for i := 1; i <= 999999; i++ {
if err := backend.Set([]byte(fmt.Sprint(rand.Intn(999999)+1)), []byte(`100`), 0); err != nil {
Expand Down
16 changes: 13 additions & 3 deletions backend/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ import (
)

func TestMemory_Set_Expiry(t *testing.T) {
backend := NewMemoryBackend(log.Constructor.Log(), metrics.NoopReporter())
conf := NewConfig()
conf.ExpiredRecordCleanupInterval = 1 * time.Millisecond
conf.Logger = log.NewNoopLogger()
conf.MetricsReporter = metrics.NoopReporter()
backend := NewMemoryBackend(conf)
if err := backend.Set([]byte(`100`), []byte(`100`), 10*time.Millisecond); err != nil {
log.Fatal(err)
}
Expand All @@ -34,7 +38,10 @@ func TestMemory_Set_Expiry(t *testing.T) {
}

func TestMemory_Get(t *testing.T) {
backend := NewMemoryBackend(log.Constructor.Log(), metrics.NoopReporter())
conf := NewConfig()
conf.Logger = log.NewNoopLogger()
conf.MetricsReporter = metrics.NoopReporter()
backend := NewMemoryBackend(conf)

for i := 1; i <= 1000; i++ {
if err := backend.Set([]byte(fmt.Sprint(i)), []byte(`100`), 0); err != nil {
Expand All @@ -56,7 +63,10 @@ func TestMemory_Get(t *testing.T) {
}

func TestMemory_Delete(t *testing.T) {
backend := NewMemoryBackend(log.Constructor.Log(), metrics.NoopReporter())
conf := NewConfig()
conf.Logger = log.NewNoopLogger()
conf.MetricsReporter = metrics.NoopReporter()
backend := NewMemoryBackend(conf)

if err := backend.Set([]byte(`100`), []byte(`100`), 0); err != nil {
t.Fatal(err)
Expand Down
5 changes: 4 additions & 1 deletion backend/memory/partition_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ func NewPartitionMemoryBackend(partitions int, logger log.Logger, reporter metri
}

for i := 0; i < partitions; i++ {
backend := NewMemoryBackend(logger, reporter)
conf := NewConfig()
conf.Logger = logger
conf.MetricsReporter = reporter
backend := NewMemoryBackend(conf)
partitionedBackend.partitions[i] = backend
}

Expand Down
2 changes: 2 additions & 0 deletions examples/example_1/encoders/encoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

var KeyEncoder = func() encoding.Encoder { return Int64Encoder{} }

var UuidKeyEncoder = func() encoding.Encoder { return UuidEncoder{} }

var TransactionReceivedEncoder = func() encoding.Encoder { return TransactionEncoder{} }

var AccountCreditedEncoder = func() encoding.Encoder { return events.AccountCredited{} }
Expand Down
28 changes: 23 additions & 5 deletions examples/example_1/encoders/int64_encoder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package encoders

import (
"github.com/google/uuid"
"github.com/tryfix/errors"
"reflect"
"strconv"
Expand All @@ -19,10 +20,7 @@ func (Int64Encoder) Encode(v interface{}) ([]byte, error) {
i = int64(j)
}

/*byt := make([]byte, 4)
binary.BigEndian.PutUint32(byt, uint32(i))*/

return []byte(strconv.FormatInt(i,10)), nil
return []byte(strconv.FormatInt(i, 10)), nil
}

func (Int64Encoder) Decode(data []byte) (interface{}, error) {
Expand All @@ -32,5 +30,25 @@ func (Int64Encoder) Decode(data []byte) (interface{}, error) {
}

return i, nil
//return int(binary.BigEndian.Uint32(data)), nil
}

type UuidEncoder struct{}

func (UuidEncoder) Encode(v interface{}) ([]byte, error) {
i, ok := v.(uuid.UUID)
if !ok {
return nil, errors.Errorf(`invalid type [%v] expected int64`, reflect.TypeOf(v))
}

return i.MarshalText()
}

func (UuidEncoder) Decode(data []byte) (interface{}, error) {
uid := uuid.UUID{}
err := uid.UnmarshalText(data)
if err != nil {
return nil, errors.WithPrevious(err, `cannot decode data`)
}

return uid, nil
}
26 changes: 14 additions & 12 deletions examples/example_1/events/account_details_updated.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
package events

import "encoding/json"
import (
"encoding/json"
"github.com/google/uuid"
)

type AccountDetailsUpdated struct {
ID string `json:"id"`
Type string `json:"type"`
Body struct {
AccountNo int64 `json:"account_no"`
AccountType string `json:"account_type"`
CustomerID int64 `json:"customer_id"`
Branch string `json:"branch"`
BranchCode int `json:"branch_code"`
UpdatedAt int64 `json:"updated_at"`
AccountNo int64 `json:"account_no"`
AccountType string `json:"account_type"`
CustomerID uuid.UUID `json:"customer_id"`
Branch string `json:"branch"`
BranchCode int `json:"branch_code"`
UpdatedAt int64 `json:"updated_at"`
} `json:"body"`
Timestamp int64 `json:"timestamp"`
}

func (a AccountDetailsUpdated) Encode(data interface{}) ([]byte, error) {
b, err := json.Marshal(data)
if err != nil{
return nil,err
if err != nil {
return nil, err
}

return b, nil
Expand All @@ -28,9 +31,8 @@ func (a AccountDetailsUpdated) Encode(data interface{}) ([]byte, error) {
func (a AccountDetailsUpdated) Decode(data []byte) (interface{}, error) {
ad := AccountDetailsUpdated{}
err := json.Unmarshal(data, &ad)
if err != nil{
return nil,err
if err != nil {
return nil, err
}
return ad, nil
}

26 changes: 14 additions & 12 deletions examples/example_1/events/customer_profile.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
package events

import "encoding/json"
import (
"encoding/json"
"github.com/google/uuid"
)

type CustomerProfileUpdated struct {
ID string `json:"id"`
Type string `json:"type"`
ID uuid.UUID `json:"id"`
Type string `json:"type"`
Body struct {
CustomerID int64 `json:"customer_id"`
CustomerName string `json:"customer_name"`
NIC string `json:"nic"`
CustomerID uuid.UUID `json:"customer_id"`
CustomerName string `json:"customer_name"`
NIC string `json:"nic"`
ContactDetails struct {
Phone string `json:"phone"`
Email string `json:"email"`
Address string `json:"address"`
} `json:"contact_details"`
DateOfBirth string `json:"date_of_birth"`
UpdatedAt int64 `json:"updated_at"`
UpdatedAt int64 `json:"updated_at"`
} `json:"body"`
Timestamp int64 `json:"timestamp"`
}

func (c CustomerProfileUpdated) Encode(data interface{}) ([]byte, error) {
b, err := json.Marshal(data)
if err != nil{
return nil,err
if err != nil {
return nil, err
}

return b, nil
Expand All @@ -32,9 +35,8 @@ func (c CustomerProfileUpdated) Encode(data interface{}) ([]byte, error) {
func (c CustomerProfileUpdated) Decode(data []byte) (interface{}, error) {
cp := CustomerProfileUpdated{}
err := json.Unmarshal(data, &cp)
if err != nil{
return nil,err
if err != nil {
return nil, err
}
return cp, nil
}

24 changes: 13 additions & 11 deletions examples/example_1/events/message.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
package events

import "encoding/json"
import (
"encoding/json"
"github.com/google/uuid"
)

type MessageCreated struct {
ID string `json:"id"`
Type string `json:"type"`
Body struct {
CustomerID int64 `json:"customer_id"`
Text string `json:"text"`
Phone string `json:"phone"`
Email string `json:"email"`
Address string `json:"address"`
CustomerID uuid.UUID `json:"customer_id"`
Text string `json:"text"`
Phone string `json:"phone"`
Email string `json:"email"`
Address string `json:"address"`
} `json:"body"`
Timestamp int64 `json:"timestamp"`
}

func (m MessageCreated) Encode(data interface{}) ([]byte, error) {
b, err := json.Marshal(data)
if err != nil{
return nil,err
if err != nil {
return nil, err
}

return b, nil
Expand All @@ -27,9 +30,8 @@ func (m MessageCreated) Encode(data interface{}) ([]byte, error) {
func (m MessageCreated) Decode(data []byte) (interface{}, error) {
mc := MessageCreated{}
err := json.Unmarshal(data, &mc)
if err != nil{
return nil,err
if err != nil {
return nil, err
}
return mc, nil
}

1 change: 0 additions & 1 deletion examples/example_1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,5 @@ package main
import "github.com/tryfix/kstream/examples/example_1/stream"

func main() {

stream.Init()
}
Loading

0 comments on commit 2935d43

Please sign in to comment.