From 12be04b09b6c0fd2261604717cc2967f15879f54 Mon Sep 17 00:00:00 2001 From: Anish Mukherjee Date: Tue, 26 Jul 2022 21:21:32 +0530 Subject: [PATCH] refactor --- README.md | 64 ++++++++++++------------- bench_reports/darwin_arm64_m1/2.7.0.txt | 41 ++++++++++++++++ bench_reports/ubuntu_intel_xeon.txt | 62 ++++++++++++++++++++++++ zenq.go | 43 +++++++++-------- 4 files changed, 157 insertions(+), 53 deletions(-) create mode 100644 bench_reports/darwin_arm64_m1/2.7.0.txt create mode 100644 bench_reports/ubuntu_intel_xeon.txt diff --git a/README.md b/README.md index 45baa4f..5d84bc9 100644 --- a/README.md +++ b/README.md @@ -169,43 +169,43 @@ Computed from benchstat of 30 benchmarks each via go test -benchmem -bench=. ben name time/op _Chan_NumWriters1_InputSize600-8 23.4µs ± 1% -_ZenQ_NumWriters1_InputSize600-8 17.7µs ± 0% -_Chan_NumWriters3_InputSize60000-8 5.54ms ± 5% -_ZenQ_NumWriters3_InputSize60000-8 2.63ms ± 2% -_Chan_NumWriters8_InputSize6000000-8 687ms ± 2% -_ZenQ_NumWriters8_InputSize6000000-8 243ms ± 4% -_Chan_NumWriters100_InputSize6000000-8 1.59s ± 4% -_ZenQ_NumWriters100_InputSize6000000-8 296ms ± 2% -_Chan_NumWriters1000_InputSize7000000-8 1.97s ± 0% -_ZenQ_NumWriters1000_InputSize7000000-8 409ms ± 2% -_Chan_Million_Blocking_Writers-8 10.4s ± 4% -_ZenQ_Million_Blocking_Writers-8 1.83s ±10% +_ZenQ_NumWriters1_InputSize600-8 18.0µs ± 1% +_Chan_NumWriters3_InputSize60000-8 5.35ms ± 3% +_ZenQ_NumWriters3_InputSize60000-8 2.39ms ± 5% +_Chan_NumWriters8_InputSize6000000-8 674ms ± 2% +_ZenQ_NumWriters8_InputSize6000000-8 236ms ± 2% +_Chan_NumWriters100_InputSize6000000-8 1.58s ± 6% +_ZenQ_NumWriters100_InputSize6000000-8 312ms ± 2% +_Chan_NumWriters1000_InputSize7000000-8 1.97s ± 1% +_ZenQ_NumWriters1000_InputSize7000000-8 397ms ± 4% +_Chan_Million_Blocking_Writers-8 11.0s ± 2% +_ZenQ_Million_Blocking_Writers-8 2.59s ±10% name alloc/op -_Chan_NumWriters1_InputSize600-8 0.00B -_ZenQ_NumWriters1_InputSize600-8 0.00B -_Chan_NumWriters3_InputSize60000-8 117B ±63% -_ZenQ_NumWriters3_InputSize60000-8 22.1B ±122% -_Chan_NumWriters8_InputSize6000000-8 1.01kB ±196% -_ZenQ_NumWriters8_InputSize6000000-8 1.12kB ±89% -_Chan_NumWriters100_InputSize6000000-8 42.6kB ±37% -_ZenQ_NumWriters100_InputSize6000000-8 11.3kB ±28% -_Chan_NumWriters1000_InputSize7000000-8 481kB ± 7% -_ZenQ_NumWriters1000_InputSize7000000-8 90.5kB ± 6% +_Chan_NumWriters1_InputSize600-8 0.00B +_ZenQ_NumWriters1_InputSize600-8 0.00B +_Chan_NumWriters3_InputSize60000-8 114B ±82% +_ZenQ_NumWriters3_InputSize60000-8 23.6B ±112% +_Chan_NumWriters8_InputSize6000000-8 733B ±260% +_ZenQ_NumWriters8_InputSize6000000-8 1.02kB ±121% +_Chan_NumWriters100_InputSize6000000-8 43.7kB ±40% +_ZenQ_NumWriters100_InputSize6000000-8 11.2kB ±54% +_Chan_NumWriters1000_InputSize7000000-8 474kB ± 7% +_ZenQ_NumWriters1000_InputSize7000000-8 90.0kB ± 6% _Chan_Million_Blocking_Writers-8 553MB ± 0% -_ZenQ_Million_Blocking_Writers-8 123MB ± 4% +_ZenQ_Million_Blocking_Writers-8 121MB ± 4% name allocs/op -_Chan_NumWriters1_InputSize600-8 0.00 -_ZenQ_NumWriters1_InputSize600-8 0.00 -_Chan_NumWriters3_InputSize60000-8 0.00 -_ZenQ_NumWriters3_InputSize60000-8 0.00 -_Chan_NumWriters8_InputSize6000000-8 3.43 ±162% -_ZenQ_NumWriters8_InputSize6000000-8 5.23 ±53% -_Chan_NumWriters100_InputSize6000000-8 158 ±20% -_ZenQ_NumWriters100_InputSize6000000-8 26.3 ±29% -_Chan_NumWriters1000_InputSize7000000-8 1.76k ± 2% -_ZenQ_NumWriters1000_InputSize7000000-8 48.3 ±28% +_Chan_NumWriters1_InputSize600-8 0.00 +_ZenQ_NumWriters1_InputSize600-8 0.00 +_Chan_NumWriters3_InputSize60000-8 0.00 +_ZenQ_NumWriters3_InputSize60000-8 0.00 +_Chan_NumWriters8_InputSize6000000-8 2.18 ±175% +_ZenQ_NumWriters8_InputSize6000000-8 5.13 ±56% +_Chan_NumWriters100_InputSize6000000-8 157 ±30% +_ZenQ_NumWriters100_InputSize6000000-8 26.3 ±56% +_Chan_NumWriters1000_InputSize7000000-8 1.76k ± 4% +_ZenQ_NumWriters1000_InputSize7000000-8 47.1 ±29% _Chan_Million_Blocking_Writers-8 2.00M ± 0% _ZenQ_Million_Blocking_Writers-8 1.00M ± 0% ``` diff --git a/bench_reports/darwin_arm64_m1/2.7.0.txt b/bench_reports/darwin_arm64_m1/2.7.0.txt new file mode 100644 index 0000000..fadd190 --- /dev/null +++ b/bench_reports/darwin_arm64_m1/2.7.0.txt @@ -0,0 +1,41 @@ +name time/op +_Chan_NumWriters1_InputSize600-8 23.4µs ± 1% +_ZenQ_NumWriters1_InputSize600-8 18.0µs ± 1% +_Chan_NumWriters3_InputSize60000-8 5.35ms ± 3% +_ZenQ_NumWriters3_InputSize60000-8 2.39ms ± 5% +_Chan_NumWriters8_InputSize6000000-8 674ms ± 2% +_ZenQ_NumWriters8_InputSize6000000-8 236ms ± 2% +_Chan_NumWriters100_InputSize6000000-8 1.58s ± 6% +_ZenQ_NumWriters100_InputSize6000000-8 312ms ± 2% +_Chan_NumWriters1000_InputSize7000000-8 1.97s ± 1% +_ZenQ_NumWriters1000_InputSize7000000-8 397ms ± 4% +_Chan_Million_Blocking_Writers-8 11.0s ± 2% +_ZenQ_Million_Blocking_Writers-8 2.59s ±10% + +name alloc/op +_Chan_NumWriters1_InputSize600-8 0.00B +_ZenQ_NumWriters1_InputSize600-8 0.00B +_Chan_NumWriters3_InputSize60000-8 114B ±82% +_ZenQ_NumWriters3_InputSize60000-8 23.6B ±112% +_Chan_NumWriters8_InputSize6000000-8 733B ±260% +_ZenQ_NumWriters8_InputSize6000000-8 1.02kB ±121% +_Chan_NumWriters100_InputSize6000000-8 43.7kB ±40% +_ZenQ_NumWriters100_InputSize6000000-8 11.2kB ±54% +_Chan_NumWriters1000_InputSize7000000-8 474kB ± 7% +_ZenQ_NumWriters1000_InputSize7000000-8 90.0kB ± 6% +_Chan_Million_Blocking_Writers-8 553MB ± 0% +_ZenQ_Million_Blocking_Writers-8 121MB ± 4% + +name allocs/op +_Chan_NumWriters1_InputSize600-8 0.00 +_ZenQ_NumWriters1_InputSize600-8 0.00 +_Chan_NumWriters3_InputSize60000-8 0.00 +_ZenQ_NumWriters3_InputSize60000-8 0.00 +_Chan_NumWriters8_InputSize6000000-8 2.18 ±175% +_ZenQ_NumWriters8_InputSize6000000-8 5.13 ±56% +_Chan_NumWriters100_InputSize6000000-8 157 ±30% +_ZenQ_NumWriters100_InputSize6000000-8 26.3 ±56% +_Chan_NumWriters1000_InputSize7000000-8 1.76k ± 4% +_ZenQ_NumWriters1000_InputSize7000000-8 47.1 ±29% +_Chan_Million_Blocking_Writers-8 2.00M ± 0% +_ZenQ_Million_Blocking_Writers-8 1.00M ± 0% diff --git a/bench_reports/ubuntu_intel_xeon.txt b/bench_reports/ubuntu_intel_xeon.txt new file mode 100644 index 0000000..fd2dc2d --- /dev/null +++ b/bench_reports/ubuntu_intel_xeon.txt @@ -0,0 +1,62 @@ + .-/+oossssoo+/-. manas@dell-Precision-Tower-5810 + `:+ssssssssssssssssss+:` ------------------------------- + -+ssssssssssssssssssyyssss+- OS: Ubuntu 20.04.3 LTS x86_64 + .ossssssssssssssssssdMMMNysssso. Host: Precision Tower 5810 + /ssssssssssshdmmNNmmyNMMMMhssssss/ Kernel: 5.11.0-27-generic + +ssssssssshmydMMMMMMMNddddyssssssss+ Uptime: 2 hours, 49 mins + /sssssssshNMMMyhhyyyyhmNMMMNhssssssss/ Packages: 3061 (dpkg), 9 (snap) +.ssssssssdMMMNhsssssssssshNMMMdssssssss. Shell: zsh 5.8 ++sssshhhyNMMNyssssssssssssyNMMMysssssss+ Resolution: 1920x1080 +ossyNMMMNyMMhsssssssssssssshmmmhssssssso DE: Plasma +ossyNMMMNyMMhsssssssssssssshmmmhssssssso WM: KWin ++sssshhhyNMMNyssssssssssssyNMMMysssssss+ Theme: Breeze [Plasma], Breeze [GTK2/3] +.ssssssssdMMMNhsssssssssshNMMMdssssssss. Icons: breeze [Plasma], breeze [GTK2/3] + /sssssssshNMMMyhhyyyyhdNMMMNhssssssss/ Terminal: terminator + +sssssssssdmydMMMMMMMMddddyssssssss+ CPU: Intel Xeon E5-1620 v4 (8) @ 3.800GHz + /ssssssssssshdmNNNNmyNMMMMhssssss/ GPU: NVIDIA GeForce GTX 1080 + .ossssssssssssssssssdMMMNysssso. Memory: 2875MiB / 64244MiB + -+sssssssssssssssssyyyssss+- + `:+ssssssssssssssssss+:` + .-/+oossssoo+/-. + +name time/op +_Chan_NumWriters1_InputSize600-8 90.8µs ± 7% +_ZenQ_NumWriters1_InputSize600-8 52.8µs ±15% +_Chan_NumWriters3_InputSize60000-8 12.2ms ± 5% +_ZenQ_NumWriters3_InputSize60000-8 5.14ms ± 3% +_Chan_NumWriters8_InputSize6000000-8 1.48s ± 9% +_ZenQ_NumWriters8_InputSize6000000-8 429ms ± 3% +_Chan_NumWriters100_InputSize6000000-8 1.78s ± 7% +_ZenQ_NumWriters100_InputSize6000000-8 453ms ± 4% +_Chan_NumWriters1000_InputSize7000000-8 3.95s ± 5% +_ZenQ_NumWriters1000_InputSize7000000-8 545ms ± 4% +_Chan_Million_Blocking_Writers-8 7.49s ± 1% +_ZenQ_Million_Blocking_Writers-8 2.04s ± 5% + +name alloc/op +_Chan_NumWriters1_InputSize600-8 0.00B +_ZenQ_NumWriters1_InputSize600-8 0.00B +_Chan_NumWriters3_InputSize60000-8 202B ±81% +_ZenQ_NumWriters3_InputSize60000-8 94.4B ±64% +_Chan_NumWriters8_InputSize6000000-8 333B ±104% +_ZenQ_NumWriters8_InputSize6000000-8 1.62kB ±124% +_Chan_NumWriters100_InputSize6000000-8 41.6kB ±28% +_ZenQ_NumWriters100_InputSize6000000-8 15.4kB ±46% +_Chan_NumWriters1000_InputSize7000000-8 485kB ± 8% +_ZenQ_NumWriters1000_InputSize7000000-8 136kB ± 8% +_Chan_Million_Blocking_Writers-8 553MB ± 0% +_ZenQ_Million_Blocking_Writers-8 123MB ± 3% + +name allocs/op +_Chan_NumWriters1_InputSize600-8 0.00 +_ZenQ_NumWriters1_InputSize600-8 0.00 +_Chan_NumWriters3_InputSize60000-8 0.00 +_ZenQ_NumWriters3_InputSize60000-8 0.00 +_Chan_NumWriters8_InputSize6000000-8 3.59 ±123% +_ZenQ_NumWriters8_InputSize6000000-8 8.24 ±46% +_Chan_NumWriters100_InputSize6000000-8 156 ±19% +_ZenQ_NumWriters100_InputSize6000000-8 36.2 ±46% +_Chan_NumWriters1000_InputSize7000000-8 1.80k ± 4% +_ZenQ_NumWriters1000_InputSize7000000-8 76.4 ±31% +_Chan_Million_Blocking_Writers-8 2.00M ± 0% +_ZenQ_Million_Blocking_Writers-8 1.00M ± 0% diff --git a/zenq.go b/zenq.go index af9ebd8..8fdfc14 100644 --- a/zenq.go +++ b/zenq.go @@ -52,9 +52,9 @@ const ( type ( slot[T any] struct { - State uint32 - WriteParker *ThreadParker[T] - Item T + state uint32 + writeParker *ThreadParker[T] + item T } selectFactory struct { @@ -101,7 +101,7 @@ func New[T any](size uint32) *ZenQ[T] { for idx := uint32(0); idx < queueSize; idx++ { n := parkPool.Get().(*parkSpot[T]) n.threadPtr, n.next = nil, nil - contents[idx].WriteParker = NewThreadParker[T](unsafe.Pointer(n)) + contents[idx].writeParker = NewThreadParker[T](unsafe.Pointer(n)) } zenq := &ZenQ[T]{ strideLength: unsafe.Sizeof(slot[T]{}), @@ -153,14 +153,14 @@ direct_send: slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents))) // CAS -> change slot_state to busy if slot_state == empty - for !atomic.CompareAndSwapUint32(&slot.State, SlotEmpty, SlotBusy) { - switch atomic.LoadUint32(&slot.State) { + for !atomic.CompareAndSwapUint32(&slot.state, SlotEmpty, SlotBusy) { + switch atomic.LoadUint32(&slot.state) { case SlotBusy: wait() case SlotCommitted: n := self.alloc().(*parkSpot[T]) n.threadPtr, n.next, n.value = GetG(), nil, value - slot.WriteParker.Park(unsafe.Pointer(n)) + slot.writeParker.Park(unsafe.Pointer(n)) mcall(fast_park) return case SlotEmpty: @@ -169,8 +169,8 @@ direct_send: return } } - slot.Item = value - atomic.StoreUint32(&slot.State, SlotCommitted) + slot.item = value + atomic.StoreUint32(&slot.state, SlotCommitted) return } @@ -179,13 +179,13 @@ func (self *ZenQ[T]) Read() (data T, queueOpen bool) { slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.readerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents))) // CAS -> change slot_state to busy if slot_state == committed - for !atomic.CompareAndSwapUint32(&slot.State, SlotCommitted, SlotBusy) { - switch atomic.LoadUint32(&slot.State) { + for !atomic.CompareAndSwapUint32(&slot.state, SlotCommitted, SlotBusy) { + switch atomic.LoadUint32(&slot.state) { case SlotBusy: wait() case SlotEmpty: var freeable *parkSpot[T] - if data, queueOpen, freeable = slot.WriteParker.Ready(); queueOpen { + if data, queueOpen, freeable = slot.writeParker.Ready(); queueOpen { if freeable != nil { self.free(freeable) } @@ -199,7 +199,7 @@ func (self *ZenQ[T]) Read() (data T, queueOpen bool) { return } case SlotClosed: - if atomic.CompareAndSwapUint32(&slot.State, SlotClosed, SlotEmpty) { + if atomic.CompareAndSwapUint32(&slot.state, SlotClosed, SlotEmpty) { atomic.StoreUint32(&self.globalState, StateFullyClosed) } queueOpen = false @@ -208,8 +208,8 @@ func (self *ZenQ[T]) Read() (data T, queueOpen bool) { continue } } - data, queueOpen = slot.Item, true - atomic.StoreUint32(&slot.State, SlotEmpty) + data, queueOpen = slot.item, true + atomic.StoreUint32(&slot.state, SlotEmpty) return } @@ -228,8 +228,8 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) { slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents))) // CAS -> change slot_state to busy if slot_state == empty - for !atomic.CompareAndSwapUint32(&slot.State, SlotEmpty, SlotBusy) { - switch atomic.LoadUint32(&slot.State) { + for !atomic.CompareAndSwapUint32(&slot.state, SlotEmpty, SlotBusy) { + switch atomic.LoadUint32(&slot.state) { case SlotBusy, SlotCommitted: mcall(gosched_m) case SlotEmpty: @@ -239,7 +239,7 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) { } } // Closing commit - atomic.StoreUint32(&slot.State, SlotClosed) + atomic.StoreUint32(&slot.state, SlotClosed) return } @@ -293,9 +293,10 @@ func (self *ZenQ[T]) Reset() { // Unsafe to be called from multiple goroutines func (self *ZenQ[T]) Dump() { fmt.Printf("writerIndex: %3d, readerIndex: %3d\n contents:-\n\n", self.writerIndex, self.readerIndex) - // for idx := range self.contents { - // fmt.Printf("%5v : State -> %5v, Item -> %5v\n", idx, self.contents[idx].State, self.contents[idx].Item) - // } + for idx := uintptr(0); idx <= uintptr(self.indexMask); idx++ { + slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + idx*unsafe.Sizeof(slot[T]{}))) + fmt.Printf("Slot -> %#v\n", *slot) + } } // selectSender is an auxillary thread which remains parked by default