gopl-zh.github.com/ch9/ch9-07.md

345 lines
15 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

## 9.7. 示例: 并发的非阻塞缓存
本节中我们会做一个无阻塞的缓存这种工具可以帮助我们来解决现实世界中并发程序出现但没有现成的库可以解决的问题。这个问题叫作缓存memoizing函数译注Memoization的定义 memoization 一词是Donald Michie 根据拉丁语memorandum杜撰的一个词。相应的动词、过去分词、ing形式有memoiz、memoized、memoizing也就是说我们需要缓存函数的返回结果这样在对函数进行调用的时候我们就只需要一次计算之后只要返回计算的结果就可以了。我们的解决方案会是并发安全且会避免对整个缓存加锁而导致所有操作都去争一个锁的设计。
我们将使用下面的httpGetBody函数作为我们需要缓存的函数的一个样例。这个函数会去进行HTTP GET请求并且获取http响应body。对这个函数的调用本身开销是比较大的所以我们尽量避免在不必要的时候反复调用。
```go
func httpGetBody(url string) (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
```
最后一行稍微隐藏了一些细节。ReadAll会返回两个结果一个[]byte数组和一个错误不过这两个对象可以被赋值给httpGetBody的返回声明里的interface{}和error类型所以我们也就可以这样返回结果并且不需要额外的工作了。我们在httpGetBody中选用这种返回类型是为了使其可以与缓存匹配。
下面是我们要设计的cache的第一个“草稿”
<u><i>gopl.io/ch9/memo1</i></u>
```go
// Package memo provides a concurrency-unsafe
// memoization of a function of type Func.
package memo
// A Memo caches the results of calling a Func.
type Memo struct {
f Func
cache map[string]result
}
// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]result)}
}
// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
return res.value, res.err
}
```
Memo实例会记录需要缓存的函数f类型为Func以及缓存内容里面是一个string到result映射的map。每一个result都是简单的函数返回的值对儿——一个值和一个错误值。继续下去我们会展示一些Memo的变种不过所有的例子都会遵循上面的这些方面。
下面是一个使用Memo的例子。对于流入的URL的每一个元素我们都会调用Get并打印调用延时以及其返回的数据大小的log
```go
m := memo.New(httpGetBody)
for url := range incomingURLs() {
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
fmt.Printf("%s, %s, %d bytes\n",
url, time.Since(start), len(value.([]byte)))
}
```
我们可以使用测试包第11章的主题来系统地鉴定缓存的效果。从下面的测试输出我们可以看到URL流包含了一些重复的情况尽管我们第一次对每一个URL的`(*Memo).Get`的调用都会花上几百毫秒但第二次就只需要花1毫秒就可以返回完整的数据了。
```
$ go test -v gopl.io/ch9/memo1
=== RUN Test
https://golang.org, 175.026418ms, 7537 bytes
https://godoc.org, 172.686825ms, 6878 bytes
https://play.golang.org, 115.762377ms, 5767 bytes
http://gopl.io, 749.887242ms, 2856 bytes
https://golang.org, 721ns, 7537 bytes
https://godoc.org, 152ns, 6878 bytes
https://play.golang.org, 205ns, 5767 bytes
http://gopl.io, 326ns, 2856 bytes
--- PASS: Test (1.21s)
PASS
ok gopl.io/ch9/memo1 1.257s
```
这个测试是顺序地去做所有的调用的。
由于这种彼此独立的HTTP请求可以很好地并发我们可以把这个测试改成并发形式。可以使用sync.WaitGroup来等待所有的请求都完成之后再返回。
```go
m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
n.Add(1)
go func(url string) {
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
fmt.Printf("%s, %s, %d bytes\n",
url, time.Since(start), len(value.([]byte)))
n.Done()
}(url)
}
n.Wait()
```
这次测试跑起来更快了然而不幸的是貌似这个测试不是每次都能够正常工作。我们注意到有一些意料之外的cache miss缓存未命中或者命中了缓存但却返回了错误的值或者甚至会直接崩溃。
但更糟糕的是有时候这个程序还是能正确的运行也就是最让人崩溃的偶发bug所以我们甚至可能都不会意识到这个程序有bug。但是我们可以使用-race这个flag来运行程序竞争检测器§9.6)会打印像下面这样的报告:
```
$ go test -run=TestConcurrent -race -v gopl.io/ch9/memo1
=== RUN TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36:
runtime.mapassign1()
~/go/src/runtime/hashmap.go:411 +0x0
gopl.io/ch9/memo1.(*Memo).Get()
~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Previous write by goroutine 35:
runtime.mapassign1()
~/go/src/runtime/hashmap.go:411 +0x0
gopl.io/ch9/memo1.(*Memo).Get()
~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Found 1 data race(s)
FAIL gopl.io/ch9/memo1 2.393s
```
memo.go的32行出现了两次说明有两个goroutine在没有同步干预的情况下更新了cache map。这表明Get不是并发安全的存在数据竞争。
```go
28 func (memo *Memo) Get(key string) (interface{}, error) {
29 res, ok := memo.cache(key)
30 if !ok {
31 res.value, res.err = memo.f(key)
32 memo.cache[key] = res
33 }
34 return res.value, res.err
35 }
```
最简单的使cache并发安全的方式是使用基于监控的同步。只要给Memo加上一个mutex在Get的一开始获取互斥锁return的时候释放锁就可以让cache的操作发生在临界区内了
<u><i>gopl.io/ch9/memo2</i></u>
```go
type Memo struct {
f Func
mu sync.Mutex // guards cache
cache map[string]result
}
// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
memo.mu.Unlock()
return res.value, res.err
}
```
测试依然并发进行但这回竞争检查器“沉默”了。不幸的是对于Memo的这一点改变使我们完全丧失了并发的性能优点。每次对f的调用期间都会持有锁Get将本来可以并行运行的I/O操作串行化了。我们本章的目的是完成一个无锁缓存而不是现在这样的将所有请求串行化的函数的缓存。
下一个Get的实现调用Get的goroutine会两次获取锁查找阶段获取一次如果查找没有返回任何内容那么进入更新阶段会再次获取。在这两次获取锁的中间阶段其它goroutine可以随意使用cache。
<u><i>gopl.io/ch9/memo3</i></u>
```go
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
res, ok := memo.cache[key]
memo.mu.Unlock()
if !ok {
res.value, res.err = memo.f(key)
// Between the two critical sections, several goroutines
// may race to compute f(key) and update the map.
memo.mu.Lock()
memo.cache[key] = res
memo.mu.Unlock()
}
return res.value, res.err
}
```
这些修改使性能再次得到了提升但有一些URL被获取了两次。这种情况在两个以上的goroutine同一时刻调用Get来请求同样的URL时会发生。多个goroutine一起查询cache发现没有值然后一起调用f这个慢不拉叽的函数。在得到结果后也都会去更新map。其中一个获得的结果会覆盖掉另一个的结果。
理想情况下是应该避免掉多余的工作的。而这种“避免”工作一般被称为duplicate suppression重复抑制/避免。下面版本的Memo每一个map元素都是指向一个条目的指针。每一个entry包含对函数f调用结果的内容缓存。与之前不同的是这次entry还包含了一个叫ready的channel。在entry的res字段被设置之后这个channel就会被关闭以向其它goroutine广播§8.9去读取该entry内的结果是安全的了。
<u><i>gopl.io/ch9/memo4</i></u>
```go
type entry struct {
res result
ready chan struct{} // closed when res is ready
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]*entry)}
}
type Memo struct {
f Func
mu sync.Mutex // guards cache
cache map[string]*entry
}
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
e := memo.cache[key]
if e == nil {
// This is the first request for this key.
// This goroutine becomes responsible for computing
// the value and broadcasting the ready condition.
e = &entry{ready: make(chan struct{})}
memo.cache[key] = e
memo.mu.Unlock()
e.res.value, e.res.err = memo.f(key)
close(e.ready) // broadcast ready condition
} else {
// This is a repeat request for this key.
memo.mu.Unlock()
<-e.ready // wait for ready condition
}
return e.res.value, e.res.err
}
```
现在Get函数包括下面这些步骤了获取互斥锁来保护共享变量cache map查询map中是否存在指定条目如果没有找到那么分配空间插入一个新条目释放互斥锁。如果存在条目的话且其值没有写入完成也就是有其它的goroutine在调用f这个慢函数goroutine必须等待值ready之后才能读到条目的结果。而想知道是否ready的话可以直接从ready channel中读取由于这个读取操作在channel关闭之前一直是阻塞。
如果没有条目的话需要向map中插入一个没有准备好的条目当前正在调用的goroutine就需要负责调用慢函数、更新条目以及向其它所有goroutine广播条目已经ready可读的消息了。
条目中的e.res.value和e.res.err变量是在多个goroutine之间共享的。创建条目的goroutine同时也会设置条目的值其它goroutine在收到"ready"的广播消息之后立刻会去读取条目的值。尽管会被多个goroutine同时访问但却并不需要互斥锁。ready channel的关闭一定会发生在其它goroutine接收到广播事件之前因此第一个goroutine对这些变量的写操作是一定发生在这些读操作之前的。不会发生数据竞争。
这样并发、不重复、无阻塞的cache就完成了。
上面这样Memo的实现使用了一个互斥量来保护多个goroutine调用Get时的共享map变量。不妨把这种设计和前面提到的把map变量限制在一个单独的monitor goroutine的方案做一些对比后者在调用Get时需要发消息。
Func、result和entry的声明和之前保持一致
```go
// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)
// A result is the result of calling a Func.
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} // closed when res is ready
}
```
然而Memo类型现在包含了一个叫做requests的channelGet的调用方用这个channel来和monitor goroutine来通信。requests channel中的元素类型是request。Get的调用方会把这个结构中的两组key都填充好实际上用这两个变量来对函数进行缓存的。另一个叫response的channel会被拿来发送响应结果。这个channel只会传回一个单独的值。
<u><i>gopl.io/ch9/memo5</i></u>
```go
// A request is a message requesting that the Func be applied to key.
type request struct {
key string
response chan<- result // the client wants a single result
}
type Memo struct{ requests chan request }
// New returns a memoization of f. Clients must subsequently call Close.
func New(f Func) *Memo {
memo := &Memo{requests: make(chan request)}
go memo.server(f)
return memo
}
func (memo *Memo) Get(key string) (interface{}, error) {
response := make(chan result)
memo.requests <- request{key, response}
res := <-response
return res.value, res.err
}
func (memo *Memo) Close() { close(memo.requests) }
```
上面的Get方法会创建一个response channel把它放进request结构中然后发送给monitor goroutine然后马上又会接收它。
cache变量被限制在了monitor goroutine ``(*Memo).server`中下面会看到。monitor会在循环中一直读取请求直到request channel被Close方法关闭。每一个请求都会去查询cache如果没有找到条目的话那么就会创建/插入一个新的条目。
```go
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
for req := range memo.requests {
e := cache[req.key]
if e == nil {
// This is the first request for this key.
e = &entry{ready: make(chan struct{})}
cache[req.key] = e
go e.call(f, req.key) // call f(key)
}
go e.deliver(req.response)
}
}
func (e *entry) call(f Func, key string) {
// Evaluate the function.
e.res.value, e.res.err = f(key)
// Broadcast the ready condition.
close(e.ready)
}
func (e *entry) deliver(response chan<- result) {
// Wait for the ready condition.
<-e.ready
// Send the result to the client.
response <- e.res
}
```
和基于互斥量的版本类似第一个对某个key的请求需要负责去调用函数f并传入这个key将结果存在条目里并关闭ready channel来广播条目的ready消息。使用`(*entry).call`来完成上述工作。
紧接着对同一个key的请求会发现map中已经有了存在的条目然后会等待结果变为ready并将结果从response发送给客户端的goroutine。上述工作是用`(*entry).deliver`来完成的。对call和deliver方法的调用必须让它们在自己的goroutine中进行以确保monitor goroutines不会因此而被阻塞住而没法处理新的请求。
这个例子说明我们无论用上锁,还是通信来建立并发程序都是可行的。
上面的两种方案并不好说特定情境下哪种更好不过了解他们还是有价值的。有时候从一种方式切换到另一种可以使你的代码更为简洁。译注不是说好的golang推崇通信并发么。
**练习 9.3** 扩展Func类型和`(*Memo).Get`方法支持调用方提供一个可选的done channel使其具备通过该channel来取消整个操作的能力§8.9。一个被取消了的Func的调用结果不应该被缓存。