Implementing parallelism
How we added threading and multicore support to TinyGo
chan, select
sync
sync/atomic
println, runtime.NumCPU, etc.
What even is a futex?
wait(address *atomic.Uint32, expected uint32)
wakeOne(address *atomic.Uint32)
wakeAll(address *atomic.Uint32)
// In the kernel:

// waitingThreads records, per futex address, the threads queued on it.
var waitingThreads = make(map[*atomic.Uint32][]*OSThread)

// wait queues the current thread on address, but only if the value stored
// there still equals expected.
func wait(address *atomic.Uint32, expected uint32) {
	// The comparison and the enqueue below are done atomically.
	if address.Load() != expected {
		// The value is no longer the expected one: do not queue the thread.
		return
	}
	queue := waitingThreads[address]
	waitingThreads[address] = append(queue, currentThread())
	// and now wait
}
Platform | API |
---|---|
Linux | see futex(2) |
macOS | __ulock_wait2, __ulock_wake |
Windows | WaitOnAddress, WakeByAddressSingle, WakeByAddressAll |
WebAssembly | memory.atomic.wait, memory.atomic.notify |
More information:
https://outerproduct.net/futex-dictionary.html
// Futex is an atomic.Uint32 with methods that forward to the wait, wakeOne
// and wakeAll primitives, passing the address of the embedded value.
type Futex struct {
	atomic.Uint32
}

// Wait calls wait on the embedded value with the given expected value.
func (f *Futex) Wait(expected uint32) {
	word := &f.Uint32
	wait(word, expected)
}

// Wake calls wakeOne on the embedded value.
func (f *Futex) Wake() {
	word := &f.Uint32
	wakeOne(word)
}

// WakeAll calls wakeAll on the embedded value.
func (f *Futex) WakeAll() {
	word := &f.Uint32
	wakeAll(word)
}
sync.WaitGroup
// WaitGroup keeps its counter directly in a futex, so that waiters can block
// on the counter value itself.
type WaitGroup struct {
	futex task.Futex // wrapped atomic.Uint32
}

// Add adds delta to the counter and calls WakeAll on the futex when the
// resulting counter is zero.
func (wg *WaitGroup) Add(delta int) {
	// A negative delta wraps around in the uint32 conversion, so the unsigned
	// addition decrements the counter.
	updated := wg.futex.Add(uint32(delta))
	if updated == 0 {
		wg.futex.WakeAll()
	}
}

// Wait returns once the counter is observed to be zero.
func (wg *WaitGroup) Wait() {
	// Each round hands the value just loaded to futex.Wait, then loads the
	// counter again to decide whether another round is needed.
	for counter := wg.futex.Load(); counter != 0; counter = wg.futex.Load() {
		wg.futex.Wait(counter)
	}
}

// Done decrements the counter by one; it is shorthand for Add(-1).
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
// The runtime implementation of the Go 'chan' type.
//
// NOTE(review): the per-field notes below are inferred from field names and
// types only; the code that reads and writes these fields is not shown here,
// so each note should be confirmed against the implementation.
type channel struct {
closed bool // presumably set once the channel has been closed — confirm
selectLocked bool // presumably marks the channel as locked by a select — confirm
elementSize uintptr // presumably the size in bytes of one element — confirm
bufCap uintptr // 'cap'
bufLen uintptr // 'len'
bufHead uintptr // presumably a position in buf; which end it tracks is not visible — TODO confirm
bufTail uintptr // presumably the opposite position in buf — TODO confirm
senders chanQueue // presumably the queue of blocked senders — confirm
receivers chanQueue // presumably the queue of blocked receivers — confirm
lock task.PMutex // presumably guards the fields of this struct — confirm
buf unsafe.Pointer // presumably points at the element buffer — confirm
}
select
// chan1 and chan2 are two unbuffered int channels used by both foo and bar.
var chan1 = make(chan int)
var chan2 = make(chan int)
// foo selects on a receive from chan1 and a send on chan2, listed in that
// order.
func foo() {
select {
case <-chan1:
case chan2 <- 1: // deadlock!
}
}
// bar selects on the same two operations as foo, but lists them in the
// opposite order (chan2 first, then chan1).
//
// NOTE(review): the "deadlock!" markers presumably show what happens when a
// select implementation locks its channels in case order while foo and bar
// run in parallel, so that each holds one channel and waits for the other —
// confirm against the accompanying talk.
func bar() {
select {
case chan2 <- 1:
case <-chan1: // deadlock!
}
}
Mark phase:
Before | After |
---|---|
// Futex pairs an atomic.Uint32 with the goroutines currently waiting on it.
type Futex struct {
atomic.Uint32
waiters Stack // linked list of waiting goroutines
}
// Wait pauses the current goroutine if the futex value still equals expected.
//
// The comparison and the push onto the waiters list both happen while the
// spinlock is held. If the value no longer equals expected, Wait releases the
// spinlock and returns without pausing.
func (f *Futex) Wait(expected uint32) {
	spinlockTake()
	if f.Uint32.Load() != expected {
		// The value is not the expected one: nothing to wait for.
		spinlockRelease()
		return
	}
	f.waiters.Push(Current())
	// The spinlock is released before pausing, so it is not held while this
	// goroutine is paused.
	spinlockRelease()
	Pause()
}
// Wake pops one goroutine off the waiters list, if there is one, and hands it
// to scheduleGoroutine. The pop and the scheduling call both happen while the
// spinlock is held.
func (f *Futex) Wake() {
	spinlockTake()
	waiter := f.waiters.Pop()
	if waiter != nil {
		scheduleGoroutine(waiter)
	}
	spinlockRelease()
}
How to find me:
@ayke@hachyderm.io
@aykevl
@aykevl