gopl-zh.github.com/ch8/ch8-09.md

6.2 KiB
Raw Permalink Blame History

8.9. 并发的退出

有时候我们需要通知goroutine停止它正在干的事情比如一个正在执行计算的web服务然而它的客户端已经断开了和服务端的连接。

Go语言并没有提供在一个goroutine中终止另一个goroutine的方法由于这样会导致goroutine之间的共享变量落在未定义的状态上。在8.7节中的rocket launch程序中我们往名字叫abort的channel里发送了一个简单的值在countdown的goroutine中会把这个值理解为自己的退出信号。但是如果我们想要退出两个或者任意多个goroutine怎么办呢

一种可能的手段是向abort的channel里发送和goroutine数目一样多的事件来退出它们。如果这些goroutine中已经有一些自己退出了那么会导致我们的channel里的事件数比goroutine还多这样导致我们的发送直接被阻塞。另一方面如果这些goroutine又生成了其它的goroutine我们的channel里的数目又太少了所以有些goroutine可能会无法接收到退出消息。一般情况下我们是很难知道在某一个时刻具体有多少个goroutine在运行着的。另外当一个goroutine从abort channel中接收到一个值的时候他会消费掉这个值这样其它的goroutine就没法看到这条信息。为了能够达到我们退出goroutine的目的我们需要更靠谱的策略来通过一个channel把消息广播出去这样goroutine们能够看到这条事件消息并且在事件完成之后可以知道这件事已经发生过了。

回忆一下我们关闭了一个channel并且被消费掉了所有已发送的值操作channel之后的代码可以立即被执行并且会产生零值。我们可以将这个机制扩展一下来作为我们的广播机制不要向channel发送值而是用关闭一个channel来进行广播。

只要一些小修改我们就可以把退出逻辑加入到前一节的du程序。首先我们创建一个退出的channel不需要向这个channel发送任何值但其所在的闭包内要写明程序需要退出。我们同时还定义了一个工具函数cancelled这个函数在被调用的时候会轮询退出状态。

gopl.io/ch8/du4

var done = make(chan struct{})

func cancelled() bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}

下面我们创建一个从标准输入流中读取内容的goroutine这是一个比较典型的连接到终端的程序。每当有输入被读到比如用户按了回车键这个goroutine就会把取消消息通过关闭done的channel广播出去。

// Cancel traversal when input is detected.
go func() {
	os.Stdin.Read(make([]byte, 1)) // read a single byte
	close(done)
}()

现在我们需要使我们的goroutine来对取消进行响应。在main goroutine中我们添加了select的第三个case语句尝试从done channel中接收内容。如果这个case被满足的话在select到的时候即会返回但在结束之前我们需要把fileSizes channel中的内容“排”空在channel被关闭之前舍弃掉所有值。这样可以保证对walkDir的调用不要被向fileSizes发送信息阻塞住可以正确地完成。

for {
	select {
	case <-done:
		// Drain fileSizes to allow existing goroutines to finish.
		for range fileSizes {
			// Do nothing.
		}
		return
	case size, ok := <-fileSizes:
		// ...
	}
}

walkDir这个goroutine一启动就会轮询取消状态如果取消状态被设置的话会直接返回并且不做额外的事情。这样我们将所有在取消事件之后创建的goroutine改变为无操作。

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
	defer n.Done()
	if cancelled() {
		return
	}
	for _, entry := range dirents(dir) {
		// ...
	}
}

在walkDir函数的循环中我们对取消状态进行轮询可以带来明显的益处可以避免在取消事件发生时还去创建goroutine。取消本身是有一些代价的想要快速的响应需要对程序逻辑进行侵入式的修改。确保在取消发生之后不要有代价太大的操作可能会需要修改你代码里的很多地方但是在一些重要的地方去检查取消事件也确实能带来很大的好处。

对这个程序的一个简单的性能分析可以揭示瓶颈在dirents函数中获取一个信号量。下面的select可以让这种操作可以被取消并且可以将取消时的延迟从几百毫秒降低到几十毫秒。

func dirents(dir string) []os.FileInfo {
	select {
	case sema <- struct{}{}: // acquire token
	case <-done:
		return nil // cancelled
	}
	defer func() { <-sema }() // release token
	// ...read directory...
}

现在当取消发生时所有后台的goroutine都会迅速停止并且主函数会返回。当然当主函数返回时一个程序会退出而我们又无法在主函数退出的时候确认其已经释放了所有的资源译注因为程序都退出了你的代码都没法执行了。这里有一个方便的窍门我们可以一用取代掉直接从主函数返回我们调用一个panic然后runtime会把每一个goroutine的栈dump下来。如果main goroutine是唯一一个剩下的goroutine的话他会清理掉自己的一切资源。但是如果还有其它的goroutine没有退出他们可能没办法被正确地取消掉也有可能被取消但是取消操作会很花时间所以这里的一个调研还是很有必要的。我们用panic来获取到足够的信息来验证我们上面的判断看看最终到底是什么样的情况。

练习 8.10 HTTP请求可能会因http.Request结构体中Cancel channel的关闭而取消。修改8.6节中的web crawler来支持取消http请求。提示http.Get并没有提供方便地定制一个请求的方法。你可以用http.NewRequest来取而代之设置它的Cancel字段然后用http.DefaultClient.Do(req)来进行这个http请求。

练习 8.11 紧接着8.4.4中的mirroredQuery流程实现一个并发请求url的fetch的变种。当第一个请求返回时直接取消其它的请求。