Skip to content

Commit

Permalink
runtime: make channels parallelism-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
aykevl committed Dec 4, 2024
1 parent 2588bf7 commit ac06c05
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 30 deletions.
7 changes: 6 additions & 1 deletion src/internal/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Task struct {
}

// DataUint32 returns the Data field as a uint32. The value is only valid after
// setting it through SetDataUint32.
// setting it through SetDataUint32 or by storing to it using DataAtomicUint32.
func (t *Task) DataUint32() uint32 {
	// Reinterpret the storage of the Data field as a uint32 and load it.
	ptr := (*uint32)(unsafe.Pointer(&t.Data))
	return *ptr
}
Expand All @@ -38,6 +38,11 @@ func (t *Task) SetDataUint32(val uint32) {
*(*uint32)(unsafe.Pointer(&t.Data)) = val
}

// DataAtomicUint32 exposes the Data field as an atomic-if-needed Uint32. The
// returned pointer refers to the storage of t.Data itself, so a value stored
// through it can afterwards be read back with DataUint32.
func (t *Task) DataAtomicUint32() *Uint32 {
	dataPtr := unsafe.Pointer(&t.Data)
	return (*Uint32)(dataPtr)
}

// getGoroutineStackSize is a compiler intrinsic that returns the stack size for
// the given function and falls back to the default stack size. It is replaced
// with a load from a special section just before codegen.
Expand Down
120 changes: 93 additions & 27 deletions src/runtime/chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ package runtime
// non-select operations) so that the select operation knows which case did
// proceed.
// The value is at the same time also a way that goroutines can be the first
// (and only) goroutine to 'take' a channel operation to change it from
// 'waiting' to any other value. This is important for the select statement
// because multiple goroutines could try to let different channels in the
// select statement proceed at the same time. By using Task.Data, only a
// single channel operation in the select statement can proceed.
// (and only) goroutine to 'take' a channel operation using an atomic CAS
// operation to change it from 'waiting' to any other value. This is important
// for the select statement because multiple goroutines could try to let
// different channels in the select statement proceed at the same time. By
// using Task.Data, only a single channel operation in the select statement
// can proceed.
// - It is possible for the channel queues to contain already-processed senders
// or receivers. This can happen when the select statement managed to proceed
// but the goroutine doing the select has not yet cleaned up the stale queue
Expand All @@ -49,15 +50,17 @@ import (

// The runtime implementation of the Go 'chan' type.
type channel struct {
// NOTE(review): the field list appears twice in this view; this looks like
// the before/after sides of the diff rendered back to back — confirm against
// the actual file. The comments below annotate the second list.
closed bool
elementSize uintptr
bufCap uintptr // 'cap'
bufLen uintptr // 'len'
bufHead uintptr
bufTail uintptr
senders chanQueue
receivers chanQueue
buf unsafe.Pointer
closed bool // set to true by chanClose; closing an already closed channel panics
selectLocked bool // set by lockAllStates while it holds 'lock', so a channel listed in several select cases is locked only once
elementSize uintptr // multiplied with bufHead/bufTail to compute element addresses inside buf
bufCap uintptr // 'cap'
bufLen uintptr // 'len'
bufHead uintptr // index of the buffer slot that bufferPush writes to
bufTail uintptr // index of the buffer slot that bufferPop reads from
senders chanQueue // send operations queued by chanSend when they cannot proceed immediately
receivers chanQueue // receive operations queued by chanRecv when they cannot proceed immediately
lock task.PMutex // taken by chanSend, chanRecv, chanClose and lockAllStates around channel operations
buf unsafe.Pointer // base address of the buffer that bufferPush/bufferPop index into
}

const (
Expand All @@ -73,7 +76,8 @@ type chanQueue struct {

// Push the next channel operation to the queue. All appropriate fields must have
// been initialized already.
// This function must be called with interrupts disabled.
// This function must be called with interrupts disabled and the channel lock
// held.
func (q *chanQueue) push(node *channelOp) {
node.next = q.first
q.first = node
Expand All @@ -99,16 +103,17 @@ func (q *chanQueue) pop(chanOp uint32) *channelOp {
newDataValue := chanOp | popped.index<<2

// Try to be the first to proceed with this goroutine.
if popped.task.DataUint32() == chanOperationWaiting {
popped.task.SetDataUint32(newDataValue)
swapped := popped.task.DataAtomicUint32().CompareAndSwap(0, newDataValue)
if swapped {
return popped
}
}
}

// Remove the given to-be-removed node from the queue if it is part of the
// queue. If there are multiple, only one will be removed.
// This function must be called with interrupts disabled.
// This function must be called with interrupts disabled and the channel lock
// held.
func (q *chanQueue) remove(remove *channelOp) {
n := &q.first
for *n != nil {
Expand Down Expand Up @@ -159,8 +164,8 @@ func chanCap(c *channel) int {
}

// Push the value to the channel buffer array, for a send operation.
// This function may only be called when interrupts are disabled and it is known
// there is space available in the buffer.
// This function may only be called when interrupts are disabled, the channel is
// locked and it is known there is space available in the buffer.
func (ch *channel) bufferPush(value unsafe.Pointer) {
elemAddr := unsafe.Add(ch.buf, ch.bufHead*ch.elementSize)
ch.bufLen++
Expand All @@ -174,8 +179,8 @@ func (ch *channel) bufferPush(value unsafe.Pointer) {

// Pop a value from the channel buffer and store it in the 'value' pointer, for
// a receive operation.
// This function may only be called when interrupts are disabled and it is known
// there is at least one value available in the buffer.
// This function may only be called when interrupts are disabled, the channel is
// locked and it is known there is at least one value available in the buffer.
func (ch *channel) bufferPop(value unsafe.Pointer) {
elemAddr := unsafe.Add(ch.buf, ch.bufTail*ch.elementSize)
ch.bufLen--
Expand All @@ -191,7 +196,8 @@ func (ch *channel) bufferPop(value unsafe.Pointer) {
}

// Try to proceed with this send operation without blocking, and return whether
// the send succeeded. Interrupts must be disabled when calling this function.
// the send succeeded. Interrupts must be disabled and the lock must be held
// when calling this function.
func (ch *channel) trySend(value unsafe.Pointer) bool {
// To make sure we send values in the correct order, we can only send
// directly to a receiver when there are no values in the buffer.
Expand Down Expand Up @@ -230,9 +236,11 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
}

mask := interrupt.Disable()
ch.lock.Lock()

// See whether we can proceed immediately, and if so, return early.
if ch.trySend(value) {
ch.lock.Unlock()
interrupt.Restore(mask)
return
}
Expand All @@ -244,9 +252,12 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
op.index = 0
op.value = value
ch.senders.push(op)
ch.lock.Unlock()
interrupt.Restore(mask)

// Wait until this goroutine is resumed.
// It might be resumed after Unlock() and before Pause(). In that case,
// because we use semaphores, the Pause() will continue immediately.
task.Pause()

// Check whether the send happened normally (not because the channel was
Expand All @@ -258,8 +269,8 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
}

// Try to proceed with this receive operation without blocking, and return
// whether the receive operation succeeded. Interrupts must be disabled when
// calling this function.
// whether the receive operation succeeded. Interrupts must be disabled and the
// lock must be held when calling this function.
func (ch *channel) tryRecv(value unsafe.Pointer) (received, ok bool) {
// To make sure we keep the values in the channel in the correct order, we
// first have to read values from the buffer before we can look at the
Expand Down Expand Up @@ -303,8 +314,10 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
}

mask := interrupt.Disable()
ch.lock.Lock()

if received, ok := ch.tryRecv(value); received {
ch.lock.Unlock()
interrupt.Restore(mask)
return ok
}
Expand All @@ -317,6 +330,7 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
op.task = t
op.index = 0
ch.receivers.push(op)
ch.lock.Unlock()
interrupt.Restore(mask)

// Wait until the goroutine is resumed.
Expand All @@ -335,9 +349,11 @@ func chanClose(ch *channel) {
}

mask := interrupt.Disable()
ch.lock.Lock()

if ch.closed {
// Not allowed by the language spec.
ch.lock.Unlock()
interrupt.Restore(mask)
runtimePanic("close of closed channel")
}
Expand Down Expand Up @@ -370,14 +386,56 @@ func chanClose(ch *channel) {

ch.closed = true

ch.lock.Unlock()
interrupt.Restore(mask)
}

// We currently use a global select lock to avoid deadlocks while locking each
// individual channel in the select. Without this global lock, two select
// operations that have a different order of the same channels could end up in a
// deadlock. This global lock is inefficient if there are many select operations
// happening in parallel, but gets the job done.
//
// If this becomes a performance issue, we can see how the Go runtime does this.
// I think it does this by sorting all states by channel address and then
// locking them in that order to avoid this deadlock.
var chanSelectLock task.PMutex

// lockAllStates takes the lock of every channel referenced by the given select
// states. States with a nil channel are skipped, and a channel that occurs in
// more than one state is locked only once: its selectLocked flag records that
// the lock is already held. Without parallelism this is a no-op.
func lockAllStates(states []chanSelectState) {
	if !hasParallelism {
		return
	}
	for i := range states {
		ch := states[i].ch
		if ch == nil || ch.selectLocked {
			// Nothing to lock, or already locked through an earlier state.
			continue
		}
		ch.lock.Lock()
		ch.selectLocked = true
	}
}

// unlockAllStates releases the lock of every channel referenced by the given
// select states. States with a nil channel are skipped, and a channel that
// occurs in more than one state is unlocked only once: its selectLocked flag
// is cleared on the first unlock. Without parallelism this is a no-op.
func unlockAllStates(states []chanSelectState) {
	if !hasParallelism {
		return
	}
	for i := range states {
		ch := states[i].ch
		if ch == nil || !ch.selectLocked {
			// Nothing to unlock, or already unlocked through an earlier state.
			continue
		}
		ch.lock.Unlock()
		ch.selectLocked = false
	}
}

// chanSelect implements blocking or non-blocking select operations.
// The 'ops' slice must be set if (and only if) this is a blocking select.
func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelOp) (uint32, bool) {
mask := interrupt.Disable()

// Lock everything.
chanSelectLock.Lock()
lockAllStates(states)

const selectNoIndex = ^uint32(0)
selectIndex := selectNoIndex
selectOk := true
Expand Down Expand Up @@ -409,6 +467,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
// return early.
blocking := len(ops) != 0
if selectIndex != selectNoIndex || !blocking {
unlockAllStates(states)
chanSelectLock.Unlock()
interrupt.Restore(mask)
return selectIndex, selectOk
}
Expand All @@ -417,8 +477,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
// become more complicated.
// We add ourselves as a sender/receiver to every channel, and wait for the
// first one to complete. Only one will successfully complete, because
// senders and receivers will check t.Data for the state so that only one
// will be able to "take" this select operation.
// senders and receivers use a compare-and-exchange atomic operation on
// t.Data so that only one will be able to "take" this select operation.
t := task.Current()
t.Ptr = recvbuf
t.SetDataUint32(chanOperationWaiting)
Expand All @@ -438,13 +498,17 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
}

// Now we wait until one of the send/receive operations can proceed.
unlockAllStates(states)
chanSelectLock.Unlock()
interrupt.Restore(mask)
task.Pause()

// Resumed, so one channel operation must have progressed.

// Make sure all channel ops are removed from the senders/receivers
// queue before we return and the memory of them becomes invalid.
chanSelectLock.Lock()
lockAllStates(states)
for i, state := range states {
if state.ch == nil {
continue
Expand All @@ -458,6 +522,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
}
interrupt.Restore(mask)
}
unlockAllStates(states)
chanSelectLock.Unlock()

// Pull the return values out of t.Data (which contains two bitfields).
selectIndex = t.DataUint32() >> 2
Expand Down
8 changes: 6 additions & 2 deletions src/runtime/scheduler_cooperative.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (
// queue a new scheduler invocation using setTimeout.
const asyncScheduler = GOOS == "js"

const hasScheduler = true

// Concurrency is not parallelism. While the cooperative scheduler has
// concurrency, it does not have parallelism.
const hasParallelism = false

// Queues used by the scheduler.
var (
runqueue task.Queue
Expand Down Expand Up @@ -248,5 +254,3 @@ func run() {
}()
scheduler(false)
}

const hasScheduler = true
3 changes: 3 additions & 0 deletions src/runtime/scheduler_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import "internal/task"

const hasScheduler = false

// No goroutines are allowed, so there's no parallelism anywhere.
const hasParallelism = false

// run is called by the program entry point to execute the go program.
// With the "none" scheduler, init and the main function are invoked directly.
func run() {
Expand Down

0 comments on commit ac06c05

Please sign in to comment.