Skip to content

Commit

Permalink
feat(concurrency): add optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
shgopher committed Mar 2, 2024
1 parent a413586 commit fd54f64
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 3 deletions.
4 changes: 3 additions & 1 deletion 并发/channel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* @Author: shgopher [email protected]
* @Date: 2023-05-14 23:08:19
* @LastEditors: shgopher [email protected]
* @LastEditTime: 2024-02-20 20:36:37
* @LastEditTime: 2024-03-02 15:18:19
* @FilePath: /GOFamily/并发/channel/README.md
* @Description:
*
Expand Down Expand Up @@ -693,6 +693,8 @@ func serve(addr string,handler http.Handler,stop <-chan struct{})error {
// 信号传递和关闭服务结合
// 这里不能使用 os.Exit()
// os.exit 会立刻全部停止,不优雅,比较暴力
// log.Fetal 这个函数通常不要使用,因为它的底层调用的就是os.Exit()
// 如果要是使用这个 log.Fetal 函数,只用在程序的起始位置,比如main的开头,或者init函数中
go func(){
<- stop
s.Shutdown(contex.Background())
Expand Down
155 changes: 153 additions & 2 deletions 并发/并发优化/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* @Author: shgopher [email protected]
* @Date: 2024-02-26 00:23:06
* @LastEditors: shgopher [email protected]
* @LastEditTime: 2024-03-02 01:39:11
* @LastEditTime: 2024-03-02 17:43:37
* @FilePath: /GOFamily/并发/并发优化/README.md
* @Description:
*
Expand Down Expand Up @@ -51,7 +51,71 @@ func main(){


## 优先使用 channel + context 的方法去优雅关闭
核心就是把众多启动的 goroutine 改成 worker pool + context 的模型

在 channel 中我们讲过 worker pool 的实现,为什么一定要优先使用 worker pool?因为很多时候你启动了很多的 goroutine,不知不觉就会造成混乱,以及 goroutine 的泄露问题,我们使用 worker pool 的方式,可以很好的控制 goroutine 的并发数量,以及优雅的关闭 goroutine

看一个优秀的例子:
```go

type Tracker struct {
ch chan string // 作为工作池子让worker消费
stop chan struct{}
}

func NewTracker() *Tracker {
return &Tracker{
ch: make(chan string, 10),
stop: make(chan struct{}, 2),
}
}
func (t *Tracker) Event(ctx context.Context, data string) error {
select {
case t.ch <- data:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (t *Tracker) Run() {
for data := range t.ch {
// 模拟消费
time.Sleep(time.Second * 5)
fmt.Println(data)
}
// run 数据结束之后就可以发送信号了
t.stop <- struct{}{}
}

// shutDown 通过关闭ch 让 run中的range结束,进而再发送stop信号,让shutdown函数退出
func (t *Tracker) Shutdown(ctx context.Context) {
close(t.ch)
select {
case <-t.stop:
case <-ctx.Done():
}
}
```
```go
func main() {
tr := NewTracker()
// 开启两个消费者
go tr.Run()
go tr.Run()

tr.Event(context.Background(), "test")
tr.Event(context.Background(), "test2")
tr.Event(context.Background(), "test3")
tr.Event(context.Background(), "test4")
tr.Event(context.Background(), "tes5")
tr.Event(context.Background(), "test6")

time.Sleep(10 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()
tr.Shutdown(ctx)
}
```
## 使用方去决定是否并发
```go
func ListDirectory(dir string) chan string
Expand Down Expand Up @@ -139,4 +203,91 @@ for _, name := range names {
}
```

可以看到这个函数的实现完成了基本的功能,既可以并发,又可以不并发
可以看到这个函数的实现完成了基本的功能,既可以并发,又可以不并发
## 必须让发送方决定 channel 的关闭
如果不能让发送方决定 channel 的关闭,而是不控制或者是接收方去控制,那么数据的丢失就不可避免了

```go
package main

import (
"fmt"
"sync"
)

func main() {
ch := make(chan int)
wg := sync.WaitGroup{}

// 发送者
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 发送方关闭channel
}()

// 接收者
wg.Add(1)
go func() {
defer wg.Done()
for {
if data, ok := <-ch; ok {
fmt.Println(data)
} else {
break // channel已关闭
}
}
}()

wg.Wait()
}
```
那么让我看看一下几个反例

```go
// 反例1:接收方关闭channel
func main() {
ch := make(chan int)

go func() {
for i := 0; i < 5; i++ {
ch <- i
}
}()

for {
fmt.Println(<-ch)
// 当你接收方去关闭channel的时候
// 发送方如果不知道的话,向一个close的channel发送数据会panic的
if xx {
close(ch) // 接收方关闭channel
break
}
}
}
```
```go
// 反例2:没有关闭channel
func main() {
ch := make(chan int)

go func() {
for i := 0; i < 5; i++ {
ch <- i
}
}()

for {
// 这里 ok 永远都不会false,所以 这里会死循环
if data, ok := <-ch; ok {
fmt.Println(data)
} else {
break
}
}
}
```

0 comments on commit fd54f64

Please sign in to comment.