Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
alphadose committed Jul 26, 2022
1 parent 7fa4e6c commit 12be04b
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 53 deletions.
64 changes: 32 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,43 +169,43 @@ Computed from benchstat of 30 benchmarks each via go test -benchmem -bench=. ben

name time/op
_Chan_NumWriters1_InputSize600-8 23.4µs ± 1%
_ZenQ_NumWriters1_InputSize600-8 17.7µs ± 0%
_Chan_NumWriters3_InputSize60000-8 5.54ms ± 5%
_ZenQ_NumWriters3_InputSize60000-8 2.63ms ± 2%
_Chan_NumWriters8_InputSize6000000-8 687ms ± 2%
_ZenQ_NumWriters8_InputSize6000000-8 243ms ± 4%
_Chan_NumWriters100_InputSize6000000-8 1.59s ± 4%
_ZenQ_NumWriters100_InputSize6000000-8 296ms ± 2%
_Chan_NumWriters1000_InputSize7000000-8 1.97s ± 0%
_ZenQ_NumWriters1000_InputSize7000000-8 409ms ± 2%
_Chan_Million_Blocking_Writers-8 10.4s ± 4%
_ZenQ_Million_Blocking_Writers-8 1.83s ±10%
_ZenQ_NumWriters1_InputSize600-8 18.0µs ± 1%
_Chan_NumWriters3_InputSize60000-8 5.35ms ± 3%
_ZenQ_NumWriters3_InputSize60000-8 2.39ms ± 5%
_Chan_NumWriters8_InputSize6000000-8 674ms ± 2%
_ZenQ_NumWriters8_InputSize6000000-8 236ms ± 2%
_Chan_NumWriters100_InputSize6000000-8 1.58s ± 6%
_ZenQ_NumWriters100_InputSize6000000-8 312ms ± 2%
_Chan_NumWriters1000_InputSize7000000-8 1.97s ± 1%
_ZenQ_NumWriters1000_InputSize7000000-8 397ms ± 4%
_Chan_Million_Blocking_Writers-8 11.0s ± 2%
_ZenQ_Million_Blocking_Writers-8 2.59s ±10%

name alloc/op
_Chan_NumWriters1_InputSize600-8 0.00B
_ZenQ_NumWriters1_InputSize600-8 0.00B
_Chan_NumWriters3_InputSize60000-8 117B ±63%
_ZenQ_NumWriters3_InputSize60000-8 22.1B ±122%
_Chan_NumWriters8_InputSize6000000-8 1.01kB ±196%
_ZenQ_NumWriters8_InputSize6000000-8 1.12kB ±89%
_Chan_NumWriters100_InputSize6000000-8 42.6kB ±37%
_ZenQ_NumWriters100_InputSize6000000-8 11.3kB ±28%
_Chan_NumWriters1000_InputSize7000000-8 481kB ± 7%
_ZenQ_NumWriters1000_InputSize7000000-8 90.5kB ± 6%
_Chan_NumWriters1_InputSize600-8 0.00B
_ZenQ_NumWriters1_InputSize600-8 0.00B
_Chan_NumWriters3_InputSize60000-8 114B ±82%
_ZenQ_NumWriters3_InputSize60000-8 23.6B ±112%
_Chan_NumWriters8_InputSize6000000-8 733B ±260%
_ZenQ_NumWriters8_InputSize6000000-8 1.02kB ±121%
_Chan_NumWriters100_InputSize6000000-8 43.7kB ±40%
_ZenQ_NumWriters100_InputSize6000000-8 11.2kB ±54%
_Chan_NumWriters1000_InputSize7000000-8 474kB ± 7%
_ZenQ_NumWriters1000_InputSize7000000-8 90.0kB ± 6%
_Chan_Million_Blocking_Writers-8 553MB ± 0%
_ZenQ_Million_Blocking_Writers-8 123MB ± 4%
_ZenQ_Million_Blocking_Writers-8 121MB ± 4%

name allocs/op
_Chan_NumWriters1_InputSize600-8 0.00
_ZenQ_NumWriters1_InputSize600-8 0.00
_Chan_NumWriters3_InputSize60000-8 0.00
_ZenQ_NumWriters3_InputSize60000-8 0.00
_Chan_NumWriters8_InputSize6000000-8 3.43 ±162%
_ZenQ_NumWriters8_InputSize6000000-8 5.23 ±53%
_Chan_NumWriters100_InputSize6000000-8 158 ±20%
_ZenQ_NumWriters100_InputSize6000000-8 26.3 ±29%
_Chan_NumWriters1000_InputSize7000000-8 1.76k ± 2%
_ZenQ_NumWriters1000_InputSize7000000-8 48.3 ±28%
_Chan_NumWriters1_InputSize600-8 0.00
_ZenQ_NumWriters1_InputSize600-8 0.00
_Chan_NumWriters3_InputSize60000-8 0.00
_ZenQ_NumWriters3_InputSize60000-8 0.00
_Chan_NumWriters8_InputSize6000000-8 2.18 ±175%
_ZenQ_NumWriters8_InputSize6000000-8 5.13 ±56%
_Chan_NumWriters100_InputSize6000000-8 157 ±30%
_ZenQ_NumWriters100_InputSize6000000-8 26.3 ±56%
_Chan_NumWriters1000_InputSize7000000-8 1.76k ± 4%
_ZenQ_NumWriters1000_InputSize7000000-8 47.1 ±29%
_Chan_Million_Blocking_Writers-8 2.00M ± 0%
_ZenQ_Million_Blocking_Writers-8 1.00M ± 0%
```
Expand Down
41 changes: 41 additions & 0 deletions bench_reports/darwin_arm64_m1/2.7.0.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name time/op
_Chan_NumWriters1_InputSize600-8 23.4µs ± 1%
_ZenQ_NumWriters1_InputSize600-8 18.0µs ± 1%
_Chan_NumWriters3_InputSize60000-8 5.35ms ± 3%
_ZenQ_NumWriters3_InputSize60000-8 2.39ms ± 5%
_Chan_NumWriters8_InputSize6000000-8 674ms ± 2%
_ZenQ_NumWriters8_InputSize6000000-8 236ms ± 2%
_Chan_NumWriters100_InputSize6000000-8 1.58s ± 6%
_ZenQ_NumWriters100_InputSize6000000-8 312ms ± 2%
_Chan_NumWriters1000_InputSize7000000-8 1.97s ± 1%
_ZenQ_NumWriters1000_InputSize7000000-8 397ms ± 4%
_Chan_Million_Blocking_Writers-8 11.0s ± 2%
_ZenQ_Million_Blocking_Writers-8 2.59s ±10%

name alloc/op
_Chan_NumWriters1_InputSize600-8 0.00B
_ZenQ_NumWriters1_InputSize600-8 0.00B
_Chan_NumWriters3_InputSize60000-8 114B ±82%
_ZenQ_NumWriters3_InputSize60000-8 23.6B ±112%
_Chan_NumWriters8_InputSize6000000-8 733B ±260%
_ZenQ_NumWriters8_InputSize6000000-8 1.02kB ±121%
_Chan_NumWriters100_InputSize6000000-8 43.7kB ±40%
_ZenQ_NumWriters100_InputSize6000000-8 11.2kB ±54%
_Chan_NumWriters1000_InputSize7000000-8 474kB ± 7%
_ZenQ_NumWriters1000_InputSize7000000-8 90.0kB ± 6%
_Chan_Million_Blocking_Writers-8 553MB ± 0%
_ZenQ_Million_Blocking_Writers-8 121MB ± 4%

name allocs/op
_Chan_NumWriters1_InputSize600-8 0.00
_ZenQ_NumWriters1_InputSize600-8 0.00
_Chan_NumWriters3_InputSize60000-8 0.00
_ZenQ_NumWriters3_InputSize60000-8 0.00
_Chan_NumWriters8_InputSize6000000-8 2.18 ±175%
_ZenQ_NumWriters8_InputSize6000000-8 5.13 ±56%
_Chan_NumWriters100_InputSize6000000-8 157 ±30%
_ZenQ_NumWriters100_InputSize6000000-8 26.3 ±56%
_Chan_NumWriters1000_InputSize7000000-8 1.76k ± 4%
_ZenQ_NumWriters1000_InputSize7000000-8 47.1 ±29%
_Chan_Million_Blocking_Writers-8 2.00M ± 0%
_ZenQ_Million_Blocking_Writers-8 1.00M ± 0%
62 changes: 62 additions & 0 deletions bench_reports/ubuntu_intel_xeon.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
.-/+oossssoo+/-. manas@dell-Precision-Tower-5810
`:+ssssssssssssssssss+:` -------------------------------
-+ssssssssssssssssssyyssss+- OS: Ubuntu 20.04.3 LTS x86_64
.ossssssssssssssssssdMMMNysssso. Host: Precision Tower 5810
/ssssssssssshdmmNNmmyNMMMMhssssss/ Kernel: 5.11.0-27-generic
+ssssssssshmydMMMMMMMNddddyssssssss+ Uptime: 2 hours, 49 mins
/sssssssshNMMMyhhyyyyhmNMMMNhssssssss/ Packages: 3061 (dpkg), 9 (snap)
.ssssssssdMMMNhsssssssssshNMMMdssssssss. Shell: zsh 5.8
+sssshhhyNMMNyssssssssssssyNMMMysssssss+ Resolution: 1920x1080
ossyNMMMNyMMhsssssssssssssshmmmhssssssso DE: Plasma
ossyNMMMNyMMhsssssssssssssshmmmhssssssso WM: KWin
+sssshhhyNMMNyssssssssssssyNMMMysssssss+ Theme: Breeze [Plasma], Breeze [GTK2/3]
.ssssssssdMMMNhsssssssssshNMMMdssssssss. Icons: breeze [Plasma], breeze [GTK2/3]
/sssssssshNMMMyhhyyyyhdNMMMNhssssssss/ Terminal: terminator
+sssssssssdmydMMMMMMMMddddyssssssss+ CPU: Intel Xeon E5-1620 v4 (8) @ 3.800GHz
/ssssssssssshdmNNNNmyNMMMMhssssss/ GPU: NVIDIA GeForce GTX 1080
.ossssssssssssssssssdMMMNysssso. Memory: 2875MiB / 64244MiB
-+sssssssssssssssssyyyssss+-
`:+ssssssssssssssssss+:`
.-/+oossssoo+/-.

name time/op
_Chan_NumWriters1_InputSize600-8 90.8µs ± 7%
_ZenQ_NumWriters1_InputSize600-8 52.8µs ±15%
_Chan_NumWriters3_InputSize60000-8 12.2ms ± 5%
_ZenQ_NumWriters3_InputSize60000-8 5.14ms ± 3%
_Chan_NumWriters8_InputSize6000000-8 1.48s ± 9%
_ZenQ_NumWriters8_InputSize6000000-8 429ms ± 3%
_Chan_NumWriters100_InputSize6000000-8 1.78s ± 7%
_ZenQ_NumWriters100_InputSize6000000-8 453ms ± 4%
_Chan_NumWriters1000_InputSize7000000-8 3.95s ± 5%
_ZenQ_NumWriters1000_InputSize7000000-8 545ms ± 4%
_Chan_Million_Blocking_Writers-8 7.49s ± 1%
_ZenQ_Million_Blocking_Writers-8 2.04s ± 5%

name alloc/op
_Chan_NumWriters1_InputSize600-8 0.00B
_ZenQ_NumWriters1_InputSize600-8 0.00B
_Chan_NumWriters3_InputSize60000-8 202B ±81%
_ZenQ_NumWriters3_InputSize60000-8 94.4B ±64%
_Chan_NumWriters8_InputSize6000000-8 333B ±104%
_ZenQ_NumWriters8_InputSize6000000-8 1.62kB ±124%
_Chan_NumWriters100_InputSize6000000-8 41.6kB ±28%
_ZenQ_NumWriters100_InputSize6000000-8 15.4kB ±46%
_Chan_NumWriters1000_InputSize7000000-8 485kB ± 8%
_ZenQ_NumWriters1000_InputSize7000000-8 136kB ± 8%
_Chan_Million_Blocking_Writers-8 553MB ± 0%
_ZenQ_Million_Blocking_Writers-8 123MB ± 3%

name allocs/op
_Chan_NumWriters1_InputSize600-8 0.00
_ZenQ_NumWriters1_InputSize600-8 0.00
_Chan_NumWriters3_InputSize60000-8 0.00
_ZenQ_NumWriters3_InputSize60000-8 0.00
_Chan_NumWriters8_InputSize6000000-8 3.59 ±123%
_ZenQ_NumWriters8_InputSize6000000-8 8.24 ±46%
_Chan_NumWriters100_InputSize6000000-8 156 ±19%
_ZenQ_NumWriters100_InputSize6000000-8 36.2 ±46%
_Chan_NumWriters1000_InputSize7000000-8 1.80k ± 4%
_ZenQ_NumWriters1000_InputSize7000000-8 76.4 ±31%
_Chan_Million_Blocking_Writers-8 2.00M ± 0%
_ZenQ_Million_Blocking_Writers-8 1.00M ± 0%
43 changes: 22 additions & 21 deletions zenq.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ const (

type (
slot[T any] struct {
State uint32
WriteParker *ThreadParker[T]
Item T
state uint32
writeParker *ThreadParker[T]
item T
}

selectFactory struct {
Expand Down Expand Up @@ -101,7 +101,7 @@ func New[T any](size uint32) *ZenQ[T] {
for idx := uint32(0); idx < queueSize; idx++ {
n := parkPool.Get().(*parkSpot[T])
n.threadPtr, n.next = nil, nil
contents[idx].WriteParker = NewThreadParker[T](unsafe.Pointer(n))
contents[idx].writeParker = NewThreadParker[T](unsafe.Pointer(n))
}
zenq := &ZenQ[T]{
strideLength: unsafe.Sizeof(slot[T]{}),
Expand Down Expand Up @@ -153,14 +153,14 @@ direct_send:
slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents)))

// CAS -> change slot_state to busy if slot_state == empty
for !atomic.CompareAndSwapUint32(&slot.State, SlotEmpty, SlotBusy) {
switch atomic.LoadUint32(&slot.State) {
for !atomic.CompareAndSwapUint32(&slot.state, SlotEmpty, SlotBusy) {
switch atomic.LoadUint32(&slot.state) {
case SlotBusy:
wait()
case SlotCommitted:
n := self.alloc().(*parkSpot[T])
n.threadPtr, n.next, n.value = GetG(), nil, value
slot.WriteParker.Park(unsafe.Pointer(n))
slot.writeParker.Park(unsafe.Pointer(n))
mcall(fast_park)
return
case SlotEmpty:
Expand All @@ -169,8 +169,8 @@ direct_send:
return
}
}
slot.Item = value
atomic.StoreUint32(&slot.State, SlotCommitted)
slot.item = value
atomic.StoreUint32(&slot.state, SlotCommitted)
return
}

Expand All @@ -179,13 +179,13 @@ func (self *ZenQ[T]) Read() (data T, queueOpen bool) {
slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.readerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents)))

// CAS -> change slot_state to busy if slot_state == committed
for !atomic.CompareAndSwapUint32(&slot.State, SlotCommitted, SlotBusy) {
switch atomic.LoadUint32(&slot.State) {
for !atomic.CompareAndSwapUint32(&slot.state, SlotCommitted, SlotBusy) {
switch atomic.LoadUint32(&slot.state) {
case SlotBusy:
wait()
case SlotEmpty:
var freeable *parkSpot[T]
if data, queueOpen, freeable = slot.WriteParker.Ready(); queueOpen {
if data, queueOpen, freeable = slot.writeParker.Ready(); queueOpen {
if freeable != nil {
self.free(freeable)
}
Expand All @@ -199,7 +199,7 @@ func (self *ZenQ[T]) Read() (data T, queueOpen bool) {
return
}
case SlotClosed:
if atomic.CompareAndSwapUint32(&slot.State, SlotClosed, SlotEmpty) {
if atomic.CompareAndSwapUint32(&slot.state, SlotClosed, SlotEmpty) {
atomic.StoreUint32(&self.globalState, StateFullyClosed)
}
queueOpen = false
Expand All @@ -208,8 +208,8 @@ func (self *ZenQ[T]) Read() (data T, queueOpen bool) {
continue
}
}
data, queueOpen = slot.Item, true
atomic.StoreUint32(&slot.State, SlotEmpty)
data, queueOpen = slot.item, true
atomic.StoreUint32(&slot.state, SlotEmpty)
return
}

Expand All @@ -228,8 +228,8 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) {
slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents)))

// CAS -> change slot_state to busy if slot_state == empty
for !atomic.CompareAndSwapUint32(&slot.State, SlotEmpty, SlotBusy) {
switch atomic.LoadUint32(&slot.State) {
for !atomic.CompareAndSwapUint32(&slot.state, SlotEmpty, SlotBusy) {
switch atomic.LoadUint32(&slot.state) {
case SlotBusy, SlotCommitted:
mcall(gosched_m)
case SlotEmpty:
Expand All @@ -239,7 +239,7 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) {
}
}
// Closing commit
atomic.StoreUint32(&slot.State, SlotClosed)
atomic.StoreUint32(&slot.state, SlotClosed)
return
}

Expand Down Expand Up @@ -293,9 +293,10 @@ func (self *ZenQ[T]) Reset() {
// Unsafe to be called from multiple goroutines
func (self *ZenQ[T]) Dump() {
fmt.Printf("writerIndex: %3d, readerIndex: %3d\n contents:-\n\n", self.writerIndex, self.readerIndex)
// for idx := range self.contents {
// fmt.Printf("%5v : State -> %5v, Item -> %5v\n", idx, self.contents[idx].State, self.contents[idx].Item)
// }
for idx := uintptr(0); idx <= uintptr(self.indexMask); idx++ {
slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + idx*unsafe.Sizeof(slot[T]{})))
fmt.Printf("Slot -> %#v\n", *slot)
}
}

// selectSender is an auxillary thread which remains parked by default
Expand Down

0 comments on commit 12be04b

Please sign in to comment.