add rocksdb (#12)
* add rocksdb

* move codes

* dir update and misc fixes

* fix tests

* Update db.go

Co-Authored-By: Anton Kaliaev <anton.kalyaev@gmail.com>
Stumble and melekes committed Sep 18, 2019
1 parent 94017c8 commit 37a6176
Showing 3 changed files with 376 additions and 0 deletions.
5 changes: 5 additions & 0 deletions db.go
@@ -32,6 +32,11 @@ const (
	// - may be faster in some use-cases (random reads - indexer)
	// - use boltdb build tag (go build -tags boltdb)
	BoltDBBackend DBBackendType = "boltdb"
	// RocksDBBackend represents rocksdb (uses the github.com/stumble/gorocksdb fork of the tecbot/gorocksdb wrapper)
	// - EXPERIMENTAL
	// - requires gcc
	// - use rocksdb build tag (go build -tags rocksdb)
	RocksDBBackend DBBackendType = "rocksdb"
)

type dbCreator func(name string, dir string) (DB, error)
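A minimal usage sketch of the new backend (not part of this commit's diff): it assumes the package import path github.com/tendermint/tm-db and a binary built with the rocksdb tag; only NewDB, RocksDBBackend, Set, Get, and Close from this package are used.

// Sketch: selecting the rocksdb backend at runtime.
// Assumptions: the import path below is correct for this package, and the
// binary is built with `go build -tags rocksdb` so the backend is registered.
package main

import (
	"fmt"

	db "github.com/tendermint/tm-db" // assumed import path
)

func main() {
	// NewDB panics if the rocksdb backend was not compiled in.
	store := db.NewDB("example", db.RocksDBBackend, "/tmp/rocksdb-example")
	defer store.Close()

	store.Set([]byte("key"), []byte("value"))
	fmt.Printf("%s\n", store.Get([]byte("key")))
}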
341 changes: 341 additions & 0 deletions rocks_db.go
@@ -0,0 +1,341 @@
// +build rocksdb

package db

import (
	"bytes"
	"fmt"
	"path/filepath"
	"runtime"

	"github.com/stumble/gorocksdb"
)

func init() {
	dbCreator := func(name string, dir string) (DB, error) {
		return NewRocksDB(name, dir)
	}
	registerDBCreator(RocksDBBackend, dbCreator, false)
}

var _ DB = (*RocksDB)(nil)

type RocksDB struct {
	db     *gorocksdb.DB
	ro     *gorocksdb.ReadOptions
	wo     *gorocksdb.WriteOptions
	woSync *gorocksdb.WriteOptions
}

func NewRocksDB(name string, dir string) (*RocksDB, error) {
	// Default RocksDB options, good enough for most cases, including heavy workloads:
	// 1 GB table cache, 512 MB write buffer (may use ~50% more on heavy workloads).
	// Compression: snappy by default; requires linking with -lsnappy to enable.
	bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
	bbto.SetBlockCache(gorocksdb.NewLRUCache(1 << 30))
	bbto.SetFilterPolicy(gorocksdb.NewBloomFilter(10))

	opts := gorocksdb.NewDefaultOptions()
	opts.SetBlockBasedTableFactory(bbto)
	opts.SetCreateIfMissing(true)
	opts.IncreaseParallelism(runtime.NumCPU())
	// 1.5 GB maximum memory use for the write buffers.
	opts.OptimizeLevelStyleCompaction(512 * 1024 * 1024)
	return NewRocksDBWithOptions(name, dir, opts)
}

func NewRocksDBWithOptions(name string, dir string, opts *gorocksdb.Options) (*RocksDB, error) {
	dbPath := filepath.Join(dir, name+".db")
	db, err := gorocksdb.OpenDb(opts, dbPath)
	if err != nil {
		return nil, err
	}
	ro := gorocksdb.NewDefaultReadOptions()
	wo := gorocksdb.NewDefaultWriteOptions()
	woSync := gorocksdb.NewDefaultWriteOptions()
	woSync.SetSync(true)
	database := &RocksDB{
		db:     db,
		ro:     ro,
		wo:     wo,
		woSync: woSync,
	}
	return database, nil
}

// Implements DB.
func (db *RocksDB) Get(key []byte) []byte {
	key = nonNilBytes(key)
	res, err := db.db.Get(db.ro, key)
	if err != nil {
		panic(err)
	}
	return moveSliceToBytes(res)
}

// Implements DB.
func (db *RocksDB) Has(key []byte) bool {
	return db.Get(key) != nil
}

// Implements DB.
func (db *RocksDB) Set(key []byte, value []byte) {
	key = nonNilBytes(key)
	value = nonNilBytes(value)
	err := db.db.Put(db.wo, key, value)
	if err != nil {
		panic(err)
	}
}

// Implements DB.
func (db *RocksDB) SetSync(key []byte, value []byte) {
	key = nonNilBytes(key)
	value = nonNilBytes(value)
	err := db.db.Put(db.woSync, key, value)
	if err != nil {
		panic(err)
	}
}

// Implements DB.
func (db *RocksDB) Delete(key []byte) {
	key = nonNilBytes(key)
	err := db.db.Delete(db.wo, key)
	if err != nil {
		panic(err)
	}
}

// Implements DB.
func (db *RocksDB) DeleteSync(key []byte) {
	key = nonNilBytes(key)
	err := db.db.Delete(db.woSync, key)
	if err != nil {
		panic(err)
	}
}

func (db *RocksDB) DB() *gorocksdb.DB {
	return db.db
}

// Implements DB.
func (db *RocksDB) Close() {
	db.ro.Destroy()
	db.wo.Destroy()
	db.woSync.Destroy()
	db.db.Close()
}

// Implements DB.
func (db *RocksDB) Print() {
	itr := db.Iterator(nil, nil)
	defer itr.Close()
	for ; itr.Valid(); itr.Next() {
		key := itr.Key()
		value := itr.Value()
		fmt.Printf("[%X]:\t[%X]\n", key, value)
	}
}

// Implements DB.
func (db *RocksDB) Stats() map[string]string {
	keys := []string{"rocksdb.stats"}
	stats := make(map[string]string, len(keys))
	for _, key := range keys {
		stats[key] = db.db.GetProperty(key)
	}
	return stats
}

//----------------------------------------
// Batch

// Implements DB.
func (db *RocksDB) NewBatch() Batch {
	batch := gorocksdb.NewWriteBatch()
	return &rocksDBBatch{db, batch}
}

type rocksDBBatch struct {
	db    *RocksDB
	batch *gorocksdb.WriteBatch
}

// Implements Batch.
func (mBatch *rocksDBBatch) Set(key, value []byte) {
	mBatch.batch.Put(key, value)
}

// Implements Batch.
func (mBatch *rocksDBBatch) Delete(key []byte) {
	mBatch.batch.Delete(key)
}

// Implements Batch.
func (mBatch *rocksDBBatch) Write() {
	err := mBatch.db.db.Write(mBatch.db.wo, mBatch.batch)
	if err != nil {
		panic(err)
	}
}

// Implements Batch.
func (mBatch *rocksDBBatch) WriteSync() {
	err := mBatch.db.db.Write(mBatch.db.woSync, mBatch.batch)
	if err != nil {
		panic(err)
	}
}

// Implements Batch.
func (mBatch *rocksDBBatch) Close() {
	mBatch.batch.Destroy()
}

//----------------------------------------
// Iterator
// NOTE: This is almost identical to db/go_level_db.Iterator.
// Before creating a third version, refactor.

func (db *RocksDB) Iterator(start, end []byte) Iterator {
	itr := db.db.NewIterator(db.ro)
	return newRocksDBIterator(itr, start, end, false)
}

func (db *RocksDB) ReverseIterator(start, end []byte) Iterator {
	itr := db.db.NewIterator(db.ro)
	return newRocksDBIterator(itr, start, end, true)
}

var _ Iterator = (*rocksDBIterator)(nil)

type rocksDBIterator struct {
	source     *gorocksdb.Iterator
	start, end []byte
	isReverse  bool
	isInvalid  bool
}

func newRocksDBIterator(source *gorocksdb.Iterator, start, end []byte, isReverse bool) *rocksDBIterator {
	if isReverse {
		if end == nil {
			source.SeekToLast()
		} else {
			source.Seek(end)
			if source.Valid() {
				eoakey := moveSliceToBytes(source.Key()) // end or after key
				if bytes.Compare(end, eoakey) <= 0 {
					source.Prev()
				}
			} else {
				source.SeekToLast()
			}
		}
	} else {
		if start == nil {
			source.SeekToFirst()
		} else {
			source.Seek(start)
		}
	}
	return &rocksDBIterator{
		source:    source,
		start:     start,
		end:       end,
		isReverse: isReverse,
		isInvalid: false,
	}
}

func (itr rocksDBIterator) Domain() ([]byte, []byte) {
	return itr.start, itr.end
}

// Valid uses a pointer receiver so that the isInvalid caching below
// actually persists across calls.
func (itr *rocksDBIterator) Valid() bool {
	// Once invalid, forever invalid.
	if itr.isInvalid {
		return false
	}

	// Panic on DB error. No way to recover.
	itr.assertNoError()

	// If source is invalid, invalid.
	if !itr.source.Valid() {
		itr.isInvalid = true
		return false
	}

	// If key is end or past it, invalid.
	var start = itr.start
	var end = itr.end
	var key = moveSliceToBytes(itr.source.Key())
	if itr.isReverse {
		if start != nil && bytes.Compare(key, start) < 0 {
			itr.isInvalid = true
			return false
		}
	} else {
		if end != nil && bytes.Compare(end, key) <= 0 {
			itr.isInvalid = true
			return false
		}
	}

	// It's valid.
	return true
}

func (itr rocksDBIterator) Key() []byte {
	itr.assertNoError()
	itr.assertIsValid()
	return moveSliceToBytes(itr.source.Key())
}

func (itr rocksDBIterator) Value() []byte {
	itr.assertNoError()
	itr.assertIsValid()
	return moveSliceToBytes(itr.source.Value())
}

func (itr rocksDBIterator) Next() {
	itr.assertNoError()
	itr.assertIsValid()
	if itr.isReverse {
		itr.source.Prev()
	} else {
		itr.source.Next()
	}
}

func (itr rocksDBIterator) Close() {
	itr.source.Close()
}

func (itr rocksDBIterator) assertNoError() {
	if err := itr.source.Err(); err != nil {
		panic(err)
	}
}

func (itr rocksDBIterator) assertIsValid() {
	if !itr.Valid() {
		panic("rocksDBIterator is invalid")
	}
}

// moveSliceToBytes will free the slice and copy out a Go []byte.
// This function can be applied to a *Slice returned from Key() and Value()
// of an Iterator, because they are marked as freed.
func moveSliceToBytes(s *gorocksdb.Slice) []byte {
	defer s.Free()
	if !s.Exists() {
		return nil
	}
	v := make([]byte, len(s.Data()))
	copy(v, s.Data())
	return v
}
30 changes: 30 additions & 0 deletions rocks_db_test.go
@@ -0,0 +1,30 @@
// +build rocksdb

package db

import (
	"fmt"
	"os"
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestRocksDBBackend(t *testing.T) {
	name := fmt.Sprintf("test_%x", randStr(12))
	dir := os.TempDir()
	db := NewDB(name, RocksDBBackend, dir)
	defer cleanupDBDir(dir, name)

	_, ok := db.(*RocksDB)
	assert.True(t, ok)
}

func TestRocksDBStats(t *testing.T) {
	name := fmt.Sprintf("test_%x", randStr(12))
	dir := os.TempDir()
	db := NewDB(name, RocksDBBackend, dir)
	defer cleanupDBDir(dir, name)

	assert.NotEmpty(t, db.Stats())
}
