From 78475986df2784f3a34374e356ac100e40019700 Mon Sep 17 00:00:00 2001 From: ehlxr Date: Fri, 22 Dec 2017 17:44:04 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AB=99=E7=82=B9=E6=9B=B4=E6=96=B0=EF=BC=9A20?= =?UTF-8?q?17-12-22=2017:44:04?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pool.go | 77 +++++++++++++++++++++++++++++++++++ common/pool_test.go | 72 ++++++++++++++++++++++++++++++++ common/runner.go | 2 +- common/runner_test.go | 2 +- common/sync/sync_pool_test.go | 61 +++++++++++++++++++++++++++ common/sync_map.go | 49 ++++++++++++++++++++++ 6 files changed, 261 insertions(+), 2 deletions(-) create mode 100644 common/pool.go create mode 100644 common/pool_test.go create mode 100644 common/sync/sync_pool_test.go create mode 100644 common/sync_map.go diff --git a/common/pool.go b/common/pool.go new file mode 100644 index 0000000..707c3df --- /dev/null +++ b/common/pool.go @@ -0,0 +1,77 @@ +package common + +import ( + "errors" + "io" + "log" + "sync" +) + +//一个安全的资源池,被管理的资源必须都实现io.Close接口 +type Pool struct { + m sync.Mutex + res chan io.Closer + factory func() (io.Closer, error) + closed bool +} + +var ErrPoolClosed = errors.New("资源池已经被关闭。") + +//创建一个资源池 +func NewPool(fn func() (io.Closer, error), size uint) (*Pool, error) { + if size <= 0 { + return nil, errors.New("size的值太小了。") + } + return &Pool{ + factory: fn, + res: make(chan io.Closer, size), + }, nil +} + +//从资源池里获取一个资源 +func (p *Pool) Acquire() (io.Closer, error) { + select { + case r, ok := <-p.res: + log.Println("Acquire:共享资源") + if !ok { + return nil, ErrPoolClosed + } + return r, nil + default: + log.Println("Acquire:新生成资源") + return p.factory() + } +} + +//关闭资源池,释放资源 +func (p *Pool) Close() { + p.m.Lock() + defer p.m.Unlock() + if p.closed { + return + } + p.closed = true + //关闭通道,不让写入了 + close(p.res) + //关闭通道里的资源 + for r := range p.res { + r.Close() + } +} +func (p *Pool) Release(r io.Closer) { + //保证该操作和Close方法的操作是安全的 + p.m.Lock() + defer p.m.Unlock() + //资源池都关闭了,就省这一个没有释放的资源了,释放即可 + if p.closed { + r.Close() + return + } + select { + case p.res <- r: + log.Println("资源释放到池子里了") + default: + log.Println("资源池满了,释放这个资源吧") + r.Close() + } +} diff --git a/common/pool_test.go b/common/pool_test.go new file mode 100644 index 0000000..cfa35e5 --- /dev/null +++ b/common/pool_test.go @@ -0,0 +1,72 @@ +package common + +import ( + "io" + "log" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" +) + +const ( + //模拟的最大goroutine + maxGoroutine = 5 + //资源池的大小 + poolRes = 2 +) + +func TestPool(t *testing.T) { + //等待任务完成 + var wg sync.WaitGroup + wg.Add(maxGoroutine) + p, err := NewPool(createConnection, poolRes) + if err != nil { + log.Println(err) + return + } + //模拟好几个goroutine同时使用资源池查询数据 + for query := 0; query < maxGoroutine; query++ { + go func(q int) { + dbQuery(q, p) + wg.Done() + }(query) + } + wg.Wait() + log.Println("开始关闭资源池") + p.Close() +} + +//模拟数据库查询 +func dbQuery(query int, pool *Pool) { + conn, err := pool.Acquire() + if err != nil { + log.Println(err) + return + } + defer pool.Release(conn) + //模拟查询 + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.(*dbConnection).ID) +} + +//数据库连接 +type dbConnection struct { + ID int32 //连接的标志 +} + +//实现io.Closer接口 +func (db *dbConnection) Close() error { + log.Println("关闭连接", db.ID) + return nil +} + +var idCounter int32 + +//生成数据库连接的方法,以供资源池使用 +func createConnection() (io.Closer, error) { + //并发安全,给数据库连接生成唯一标志 + id := atomic.AddInt32(&idCounter, 1) + return &dbConnection{id}, nil +} diff --git a/common/runner.go b/common/runner.go index 28c4d08..21bbe9b 100644 --- a/common/runner.go +++ b/common/runner.go @@ -19,7 +19,7 @@ type Runner struct { interrupt chan os.Signal //可以控制强制终止的信号 } -func New(tm time.Duration) *Runner { +func NewRunner(tm time.Duration) *Runner { return &Runner{ complete: make(chan error), timeout: time.After(tm), diff --git a/common/runner_test.go b/common/runner_test.go index 8a82b29..786cc3b 100644 --- a/common/runner_test.go +++ b/common/runner_test.go @@ -11,7 +11,7 @@ func TestRunner(t *testing.T) { log.Println("...开始执行任务...") timeout := 3 * time.Second - r := New(timeout) + r := NewRunner(timeout) r.Add(createTask(), createTask(), createTask()) if err := r.Start(); err != nil { switch err { diff --git a/common/sync/sync_pool_test.go b/common/sync/sync_pool_test.go new file mode 100644 index 0000000..e0ad4de --- /dev/null +++ b/common/sync/sync_pool_test.go @@ -0,0 +1,61 @@ +package sync + +import ( + "log" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" +) + +const ( + //模拟的最大goroutine + maxGoroutine = 5 +) + +func TestPool(t *testing.T) { + //等待任务完成 + var wg sync.WaitGroup + wg.Add(maxGoroutine) + p := &sync.Pool{ + New: createConnection, + } + //模拟好几个goroutine同时使用资源池查询数据 + for query := 0; query < maxGoroutine; query++ { + go func(q int) { + dbQuery(q, p) + wg.Done() + }(query) + } + wg.Wait() +} + +//模拟数据库查询 +func dbQuery(query int, pool *sync.Pool) { + conn := pool.Get().(*dbConnection) + defer pool.Put(conn) + //模拟查询 + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.ID) +} + +//数据库连接 +type dbConnection struct { + ID int32 //连接的标志 +} + +//实现io.Closer接口 +func (db *dbConnection) Close() error { + log.Println("关闭连接", db.ID) + return nil +} + +var idCounter int32 + +//生成数据库连接的方法,以供资源池使用 +func createConnection() interface{} { + //并发安全,给数据库连接生成唯一标志 + id := atomic.AddInt32(&idCounter, 1) + return &dbConnection{ID: id} +} diff --git a/common/sync_map.go b/common/sync_map.go new file mode 100644 index 0000000..b4072cc --- /dev/null +++ b/common/sync_map.go @@ -0,0 +1,49 @@ +package common + +import ( + "sync" +) + +//安全的Map +type SynchronizedMap struct { + rw *sync.RWMutex + data map[interface{}]interface{} +} + +//存储操作 +func (sm *SynchronizedMap) Put(k, v interface{}) { + sm.rw.Lock() + defer sm.rw.Unlock() + sm.data[k] = v +} + +//获取操作 +func (sm *SynchronizedMap) Get(k interface{}) interface{} { + sm.rw.RLock() + defer sm.rw.RUnlock() + return sm.data[k] +} + +//删除操作 +func (sm *SynchronizedMap) Delete(k interface{}) { + sm.rw.Lock() + defer sm.rw.Unlock() + delete(sm.data, k) +} + +//遍历Map,并且把遍历的值给回调函数,可以让调用者控制做任何事情 +func (sm *SynchronizedMap) Each(cb func(interface{}, interface{})) { + sm.rw.RLock() + defer sm.rw.RUnlock() + for k, v := range sm.data { + cb(k, v) + } +} + +//生成初始化一个SynchronizedMap +func NewSynchronizedMap() *SynchronizedMap { + return &SynchronizedMap{ + rw: new(sync.RWMutex), + data: make(map[interface{}]interface{}), + } +}