不要通过共享内存来通信,而应通过通信来共享内存。
在“Go编程实战:博客备份”一文中,使用 Go 语言实现了博客备份的串行流程。本文,我们来学习使用 Go channel 的基于通信的并发编程。
并发编程模型
并发是一个很有趣也很有挑战性的话题。 CPU 设计已经朝多核方向发展多时,而并发是充分利用多核优势的编程模型。用《火影忍者》的术语,并发就相当于多重影分身术,可以同时分化出不计其数的鸣人来进行攻击和防御。
不过,并发是有一定难度的。与串行程序按照指令顺序执行不同,并发的指令执行顺序是不确定的,因此更容易出错,出现难以排查和难以解决的 BUG。
目前有两种主要的并发模型:
- 基于共享内存的并发模型。即多个线程可以同时对同一个内存区域进行读写。这种并发模型,必须非常小心地对共享内存进行同步访问,否则,就很可能出现各种非预期的问题。详情可阅:Java并发的若干基本陷阱、原理及解决方案。
- 基于通信的并发模型。多个线程或协程通过 channel 来通信,通过 channel 来协调多个线程或协程的执行顺序。这种并发模型,实际上隐式地依赖了共享内存,但通过限制共享内存的访问而降低了出错概率。channel,实际上就是共享阻塞队列,但这种队列只允许一个写,一个读,或者只能写或只能读。
Go 语言最令人激动的就是将并发内置在语言里,提供了基于 channel 通信的并发编程模型。当然,channel 让并发编程模型变得简单,并不代表并发的难度就降低了。不仔细处理,并发依然是容易出错的。下面给出基于 Go channel 的并发编程示例,读者可以慢慢体会并发编程的“魅力”。
基本尝试
如下代码所示。只是改了 SequentialRun2 和 WriteMarkdown。
- 声明了一个 等待组 sync.WaitGroup wg,可以看作是一个倒数计数器。
- 每拿到一个有效博文链接,就使用 wg.Add(1) 加一个计数; 每当执行完成一个 WriteMarkdown, 就用 wg.Done() 减一个计数(相当于 Add(-1));
- 使用一个 wg.Wait() 阻塞住主流程。
类比下:
- 有十个运动员准备短跑。每个运动员进场就计数一次。
- 发令枪一响,每个运动员都开始短跑。每一个运动员到达终点,就减一个计数。
- 当计数减为零时,比赛结束。
两个问题:
- 为什么 WriteMarkdown 调用需要用 go ? 因为 go 会起一个协程去异步执行任务,这样就使得每个博文的 WriteMarkdown 的执行是并发的。
- 为什么要有 wg.Wait() ?读者可以去掉试试。会发现程序很快就退出了,并且几乎什么都没打印。这是因为 main goroutine 退出时,整个程序就结束了,协程也就无法执行了。
并发就是这么简单! 真的吗?下面将揭示,并发编程里令人烧脑的地方。
func SequentialRun2(fpath string) {
blogRssp, err := ReadXml(fpath)
if err != nil {
os.Exit(2)
}
var wg sync.WaitGroup
mdlinksptr := GetMdLinksPtr(blogRssp)
for i:=0 ; i<len(*mdlinksptr); i++ {
linktrimed := strings.Trim((*mdlinksptr)[i].Link, " ")
if linktrimed == "" {
continue
}
wg.Add(1)
go WriteMarkdown((*mdlinksptr)[i], wg)
}
wg.Wait()
}
func WriteMarkdown(mdlink MarkdownFile, wg sync.WaitGroup) {
defer wg.Done()
// code...
}
并发问题
sync.WaitGroup 适合每个子任务都是相互独立无依赖的。如果任务之间是有依赖的关系,就不能这么处理了。
先来梳理下整个流程:
从博客备份文件中解析出博文链接列表 => 从每个博文链接中下载 HTML 并转换成 Markdown 文件。
假设我每解析出一个博文链接,就将这个博文链接通过 channel 输送给 WriteMarkdown 函数。并且,为了增大并发度,将这个 channel 声明成 buffered channel。
过早退出
先看下面这段代码。使用了一个叫做 mdchannel 的 buffered channel 来传递博文链接列表。每拿到一个博文链接,就通过 mdchannel 输送给 WriteMarkdownFromChannel。这个程序有什么问题?
blog_backup_con_bug_1.go
func sendMdLinks(blogRss *BlogRss, mdchannel chan MarkdownFile) {
blogchannelp := blogRss.Channel
blogitems := (*blogchannelp).Items
for _, item := range blogitems {
mdchannel <- MarkdownFile{Title: item.Title, Link: item.Link}
}
}
func WriteMarkdownFromChannel(mdchannel chan MarkdownFile) {
mdlink := <- mdchannel
fmt.Printf("%v", mdlink)
go WriteMarkdown(mdlink)
}
func ConRun(fpath string) {
blogRssp, err := ReadXml(fpath)
if err != nil {
os.Exit(2)
}
mdchannel := make(chan MarkdownFile, 6)
go sendMdLinks(blogRssp, mdchannel)
WriteMarkdownFromChannel(mdchannel)
}
func main() {
ConRun(GetFiles()[0])
}
你会发现,这个程序只打印了一条博文链接,而且没有生成任何 Markdown 文件。为什么会这样?
- 虽然 blogitems 通过 for-range 进行了遍历,但是 WriteMarkdownFromChannel 只执行了一次,
- 当 mdlink := <- mdchannel 获取到一条博文链接,开始启动一个协程来执行 WriteMarkdown 时,main goroutine 已经无阻塞地退出了,程序就结束了。因此不会生成任何 Markdown 文件。
这是基于 channel 并发编程遇到的第一个问题:main goroutine 过早退出。一切已无法挽回。
为了阻止 main goroutine 过早退出,必须想出一种办法来阻塞 main goroutine。我们想到了 sync.WaitGroup.Wait 方法。于是有了第二个版本。
第二个版本使用了 sync.WaitGroup 。遗憾的是,第二个版本犯了与第一个版本几乎相同的错误,即使使用了 sync.WaitGroup.Wait 也无济于事。
这里有个问题,已经使用了 defer wg.Done() 和 wg.Wait() 阻塞了 main goro