Skip to content

Commit

Permalink
feat(concurrency): add channel
Browse files Browse the repository at this point in the history
  • Loading branch information
shgopher committed Jan 18, 2024
1 parent 828b699 commit ba386a1
Showing 1 changed file with 58 additions and 0 deletions.
58 changes: 58 additions & 0 deletions 并发/channel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -874,11 +874,69 @@ func fanintflect(chs ...chan any) chan any {
```
反射版本这里有个小问题,从 cases 中取出数据,往 out 中添加,这个操作只能一个 case 一个 case 的取,但是你如果观察上文的递归实现模式,你会发现它开启了很多 goroutine 所以是并发的去取数据往 out 中送。
### fan-out
fan out 跟 fan in 模式是相反的,fan out 指的是拥有一个输入源,多个输出源,实际上这是一种广播机制,读取一个数据,广播给众多接受者。

下面看一下代码:

```go
func fanout(value chan any,out []chan any,async bool){

}
```

实际上这段代码有 goroutine 泄露的风险,如果 out 一直不被消费,value 的数据越来越多,那么这么多等待发送信息的 goroutine 就会发生泄露问题

解决方案有下面几种:

- 使用信号量去控制最多的 goroutine 数量
- 给每一个 goroutine 设置超时时间
- 控制发送的 value channel 的发送频率
- 使用 worker pool 去限制并发数量

那么让我们用代码去实现这些改进的方案:

方案一:使用信号量控制
```go
```

方案二:使用超时时间
```go
```

方案三:控制发送 value 的发送频率
```go
```

方案四:使用 worker 池复用 goroutine 去控制并发的 goroutine 数量
```go
```

按照我的工作经验,使用工作池和信号量控制 goroutine 数量的方法最为常用,他们都是保证最多同时存在 n 个 goroutine,这样就避免了 goroutine 泄漏问题
### map-reduce

### pipeline-流水线模式
pipeline 模式的核心思想就是顺序,单线模式,数据从头到尾,顺序执行,一个阶段的输出是下一阶段的输入。

```go
var a A
a.get().then().Download()
```
这就是 pipeline 的一个简单掩饰,它的底层可能是这样实现的

```go
type A struct{}
func (a *A)get()*A{return a}
func (a *A)then()*A{return a}
func (a *A)Download()*A{return a}
```

那么在并发的场景里,如何使 goroutine 实现 pipeline 模式呢?

这显然跟一般的流水线实现方法不同。

我们将 channel 比作一个 token,所谓令牌,只要我们控制获取令牌的顺序,那么就可以控制持有这些令牌的 goroutine 顺序。


### stream-流模式

### pipeline 流水线模式和 stream 流模式的对比
Expand Down

0 comments on commit ba386a1

Please sign in to comment.