Golang channel 使用小结

以常规方式编写并发程序,需要对共享变量作正确的访问控制,处理起来很困难。而 golang 提出一种不同的方式,即共享变量通过 channel 传递,共享变量从不被各个独立运行的线程(goroutine)同时享有,在任一时刻,共享变量仅可被一个 goroutine 访问。所以,不会产生数据竞争。并发编程,golang 鼓励以此种方式进行思考,精简为一句口号——“勿通过共享内存来进行通信,而应通过通信来进行内存共享”。

1 Unbuffered channels 与 Buffered channels

Unbuffered channels 的接收者阻塞直至收到消息,发送者阻塞直至接收者接收到消息,该机制可用于两个 goroutine 的状态同步。Buffered channels 在缓冲区未满时,发送者仅在值拷贝到缓冲区之前是阻塞的,而在缓冲区已满时,发送者会阻塞,直至接收者取走了消息,缓冲区有了空余。

1.1 Unbuffered channels

如下代码使用 Unbuffered channel 作同步控制。给定一个整型数组,在主 routine 启动另一个 goroutine 将该数组排序,当其完成时,给 done channel 发送完成消息,主 routine 会一直等待直至排序完成,打印结果。

package main

import (
    "fmt"
    "sort"
    "time"
)

func main() {
    done := make(chan bool)
    nums := []int{2, 1, 3, 5, 4}
    go func() {
        time.Sleep(time.Second)
        sort.Ints(nums)
        done <- true
    }()
    <-done
    fmt.Println(nums)
}

1.2 Buffered channels

如下代码中,messages chan 的缓冲区大小为 2,因其为 Buffered channel,所以消息发送与接收无须分开到两个并发的 goroutine 中。

package main

import (
    "fmt"
)

func main() {
    messages := make(chan string, 2)
    messages <- "hello"
    messages <- "world"
    fmt.Println(<-messages, <-messages)
}

2 配套使用

2.1 指明 channel direction

函数封装时,对仅作消息接收或仅作消息发送的 chan 标识 direction 可以借用编译器检查增强类型使用安全。如下代码中,ping 函数中 pings chan 仅用来接收消息,所以参数列表中将其标识为接收者。pong 函数中,pings chan 仅用来发送消息,pongs chan 仅用来接收消息,所以参数列表中二者分别标识为发送者与接收者。

package main

import "fmt"

func ping(pings chan<- string, msg string) {
    pings <- msg
}

func pong(pings <-chan string, pongs chan<- string) {
    pongs <- <-pings
}

func main() {
    pings, pongs := make(chan string, 1), make(chan string, 1)
    ping(pings, "ping")
    pong(pings, pongs)
    fmt.Println(<-pongs)
}

2.2 select

使用 select 可以用来等待多个 channel 的消息,如下代码,创建两个 chan,启动两个 goroutine 耗费不等时间计算结果,主 routine 监听消息,使用两次 select,第一次接收到了 ch2 的消息,第二次接收到了 ch1 的消息,用时 2.000521146s。

package main

import (
    "fmt"
    "time"
)

func main() {
    c1, c2 := make(chan int, 1), make(chan int, 1)
    go func() {
        time.Sleep(2 * time.Second)
        c1 <- 1
    }()
    go func() {
        time.Sleep(time.Second)
        c2 <- 2
    }()
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received msg from c1", msg1)
        case msg2 := <-c2:
            fmt.Println("received msg from c2", msg2)
        }
    }
}

2.3 select with default

select with default 可以用来处理非阻塞式消息发送、接收及多路选择。如下代码中,第一个 select 为非阻塞式消息接收,若收到消息,则落入<-messages case,否则落入 default。第二个 select 为非阻塞式消息发送,与非阻塞式消息接收类似,因 messages chan 为 Unbuffered channel 且无异步消息接收者,因此落入 default case。第三个 select 为多路非阻塞式消息接收。

package main

import "fmt"

func main() {
    messages := make(chan string)
    signal := make(chan bool)

    // receive with default
    select {
    case <-messages:
        fmt.Println("message received")
    default:
        fmt.Println("no message received")
    }

    // send with default
    select {
    case messages <- "message":
        fmt.Println("message sent successfully")
    default:
        fmt.Println("message sent failed")
    }

    // muti-way select
    select {
    case <-messages:
        fmt.Println("message received")
    case <-signal:
        fmt.Println("signal received")
    default:
        fmt.Println("no message or signal received")
    }
}

2.4 close

当无需再给 channel 发送消息时,可将其 close。如下代码中,创建一个 Buffered channel,首先启动一个异步 goroutine 循环消费消息,然后主 routine 完成消息发送后关闭 chan,消费 goroutine 检测到 chan 关闭后,退出循环。

package main

import "fmt"

func main() {
    messages := make(chan int, 10)
    done := make(chan bool)

    // consumer
    go func() {
        for {
            msg, more := <-messages
            if !more {
                fmt.Println("no more message")
                done <- true
                break
            }
            fmt.Println("message received", msg)
        }
    }()

    // producer
    for i := 0; i < 5; i++ {
        messages <- i
    }
    close(messages)
    <-done
}

2.5 for range

for range 语法不仅可对基础数据结构(slice、map 等)作迭代,还可对 channel 作消息接收迭代。如下代码中,给 messages chan 发送两条消息后将其关闭,然后迭代 messages chan 打印消息。

package main

import "fmt"

func main() {
    messages := make(chan string, 2)
    messages <- "hello"
    messages <- "world"
    close(messages)

    for msg := range messages {
        fmt.Println(msg)
    }
}

3 应用场景

3.1 超时控制

资源访问、网络请求等场景作超时控制是非常必要的,可以使用 channel 结合 select 来实现。如下代码,对常规 sum 函数增加超时限制,sumWithTimeout 函数中,select 的 v := <-rlt 在等待计算结果,若在时限范围内计算完成,则正常返回计算结果,若超过时限则落入<-time.After(timeout) case,抛出 timeout error。

package main

import (
    "errors"
    "fmt"
    "time"
)

func sum(nums []int) int {
    rlt := 0
    for _, num := range nums {
        rlt += num
    }
    return rlt
}

func sumWithTimeout(nums []int, timeout time.Duration) (int, error) {
    rlt := make(chan int)
    go func() {
        time.Sleep(2 * time.Second)
        rlt <- sum(nums)
    }()
    select {
    case v := <-rlt:
        return v, nil
    case <-time.After(timeout):
        return 0, errors.New("timeout")
    }
}

func main() {
    nums := []int{1, 2, 3, 4, 5}
    timeout := 3 * time.Second // time.Second
    rlt, err := sumWithTimeout(nums, timeout)
    if nil != err {
        fmt.Println("error", err)
        return
    }
    fmt.Println(rlt)
}

本文代码托管地址:https://github.com/olzhy/go-exercises/tree/master/channels

参考资料

[1] https://golang.org/doc/effective_go.html#channels

[2] https://gobyexample.com/channel-synchronization

[3] https://gobyexample.com/channel-buffering

[4] https://gobyexample.com/channel-directions

[5] https://gobyexample.com/select

[6] https://gobyexample.com/non-blocking-channel-operations

[7] https://gobyexample.com/closing-channels

[8] https://gobyexample.com/range-over-channels

[9] https://gobyexample.com/timeouts

创作不易,如果我的文章确实帮助到了您,请我喝点东西就是一种莫大的支持!Thanks!
微信支付宝