Go組件學習——手寫連接池並沒有那麼簡單

  • 2019 年 10 月 3 日
  • 筆記

1、背景

前段時間在看gorm,發現gorm是復用database/sql的連接池。

於是翻了下database/sql的數據庫連接池的代碼實現,看完代碼,好像也不是很複雜,但是總覺得理解不夠深刻,於是萌生了自己想寫個連接池的想法。(最後也驗證了,看源碼的理解確實不夠深刻,一看就會,一做就跪)

2、連接池的實現原理

什麼是連接池

  • 顧名思義是一個池子
  • 池子裏面存放有限數量即時可用的連接,減少創建連接和關閉連接的時間
  • 連接是有存活時間的

具體到數據庫連接池,我根據自己的理解畫了一張獲取連接的流程圖

從上圖我們可以看出,除了連接池的容量大小,我們還有一個最大連接數的限制。池子里的連接讓我們不用頻繁的創建和關閉連接,同時應該也要有最大連接的限制,避免無限制的創建連接導致服務器資源耗盡,拖垮服務不可用。

池子中的連接也有存活時間,如果超過存活時間則會銷毀連接。

3、實現連接池我們需要考慮哪些問題

3.1 功能點

  • 獲取連接

  • 釋放連接

  • Ping

  • 關閉連接池

  • 設置最大連接數和連接池容量(連接存活時間等等)

3.2 實現細節

  • 連接應該有哪些屬性,比如最大連接數、連接池容量、連接創建時間和存活時間
  • 如何模擬使用連接池以及超過最大連接數後等待其他連接釋放
  • 如何保證在多協程操作下數據的一致性
  • 如果實現連接的超時監聽和通知

4、具體實現

這裡的連接池實現包括

  • 設置最大連接數和連接池容量
  • 獲取連接
  • 釋放連接

4.1 結構定義

定義Conn結構體,這裡包含了幾乎所有的有關連接需要的信息屬性

type Conn struct {  	maxConn       int                     // 最大連接數  	maxIdle       int                     // 最大可用連接數  	freeConn      int                     // 線程池空閑連接數  	connPool      []int                   // 連接池  	openCount     int                     // 已經打開的連接數  	waitConn      map[int]chan Permission // 排隊等待的連接隊列  	waitCount     int                     // 等待個數  	lock          sync.Mutex              // 鎖  	nextConnIndex NextConnIndex						// 下一個連接的ID標識(用於區分每個ID)  	freeConns     map[int]Permission 			// 連接池的連接  }  

  

這裡並不會創建一個真正的數據庫連接,而是使用一個非空的Permission表示拿到了連接。拿到一個非空的Permission才有資格執行後面類似增刪改查的操作。

Permission對應的結構體如下

type Permission struct {  	NextConnIndex								 // 對應Conn中的NextConnIndex  	Content     string					 // 通行證的具體內容,比如"PASSED"表示成功獲取  	CreatedAt   time.Time				 // 創建時間,即連接的創建時間  	MaxLifeTime time.Duration    // 連接的存活時間,本次沒有用到這個屬性,保留  }  

  

NextConnIndex對應的結構體如下

type NextConnIndex struct {  	Index int  }  

  

還有一個用來設置最大連接數以及連接池最大連接數的Config

type Config struct {  	MaxConn int  	MaxIdle int  }  

  

4.2 初始化連接池參數

func Prepare(ctx context.Context, config *Config) (conn *Conn) {  	// go func() {  		//for {  		//conn.expiredCh = make(chan string, len(conn.freeConns))  		//for _, value := range conn.freeConns {  		//	if value.CreatedAt.Add(value.MaxLifeTime).Before(nowFunc()) {  		//		conn.expiredCh <- "CLOSE"  		//	}  		//}  	// }()  	return &Conn{  		maxConn:   config.MaxConn,  		maxIdle:   config.MaxIdle,  		openCount: 0,  		connPool:  []int{},  		waitConn:  make(map[int]chan Permission),  		waitCount: 0,  		freeConns: make(map[int]Permission),  	}  }  

  

這裡主要是初始化上面的Conn結構體參數。

注釋的部分,主要想通過啟動一個監聽協程,用於監聽已經過期的連接,並通過channel發送。(這塊還有一些細節沒有想清楚,先擱置)

4.3 設置MaxConn和MaxIdle

在main.go中添加代碼

ctx := context.Background()  	config := &custom_pool.Config{  		MaxConn: 2,  		MaxIdle: 1,  	}  

  

這裡意味連接池只能緩存一個連接,最大新建連接數為2,超過則要加入等待隊列。

4.4 獲取連接

// 創建連接  func (conn *Conn) New(ctx context.Context) (permission Permission, err error) {  	/**  	1、如果當前連接池已滿,即len(freeConns)=0  	2、判定openConn是否大於maxConn,如果大於,則丟棄獲取加入隊列進行等待  	3、如果小於,則考慮創建新連接  	*/  	conn.lock.Lock()    	select {  	default:  	case <-ctx.Done():	// context取消或超時,則退出  		conn.lock.Unlock()    		return Permission{}, errors.New("new conn failed, context cancelled!")  	}      // 連接池不為空,從連接池獲取連接  	if len(conn.freeConns) > 0 {  		var (  			popPermission Permission  			popReqKey     int  		)        // 獲取其中一個連接  		for popReqKey, popPermission = range conn.freeConns {  			break  		}      // 從連接池刪除  		delete(conn.freeConns, popReqKey)  		fmt.Println("log", "use free conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)  			conn.lock.Unlock()  		return popPermission, nil  	}    	if conn.openCount >= conn.maxConn { // 當前連接數大於上限,則加入等待隊列  		nextConnIndex := getNextConnIndex(conn)    		req := make(chan Permission, 1)  		conn.waitConn[nextConnIndex] = req  		conn.waitCount++  		conn.lock.Unlock()    		select {        // 如果在等待指定超時時間後,仍然無法獲取釋放連接,則放棄獲取連接,這裡如果不在超時時間後退出會一直阻塞  		case <-time.After(time.Second * time.Duration(3)):  			fmt.Println("超時,通知主線程退出")  			return  		case ret, ok := <-req: // 有放回的連接, 直接拿來用  			if !ok {  				return Permission{}, errors.New("new conn failed, no available conn release")  			}  			fmt.Println("log", "received released conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)  			return ret, nil  		}  		return Permission{}, errors.New("new conn failed")  	}    	// 新建連接  	conn.openCount++  	conn.lock.Unlock()  	permission = Permission{NextConnIndex: NextConnIndex{nextConnIndex},  		Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}  	fmt.Println("log", "create conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns)  	return permission, nil  }  

  

這裡主要分為三個部分

  • 如果連接池不為空,則直接從池子裏面獲取連接使用即可

  • 如果連接池為空,且當前的連接數已經超過最大連接數maxConn,則會將當前任務加入等待隊列,同時監聽是否有釋放的可用連接,如果有則拿來直接用,如果超過指定等待時間後仍然取不到連接則退出阻塞返回。

  • 如果連接池為空,且尚未達到最大連接數maxConn,則新建一個新連接。

getNextConnIndex函數

func getNextConnIndex(conn *Conn) int {  	currentIndex := conn.nextConnIndex.Index  	conn.nextConnIndex.Index = currentIndex + 1  	return conn.nextConnIndex.Index  }  

  

4.5 釋放連接

// 釋放連接  func (conn *Conn) Release(ctx context.Context) (result bool, err error) {  	conn.lock.Lock()    // 如果等待隊列有等待任務,則通知正在阻塞等待獲取連接的進程(即New方法中"<-req"邏輯)    // 這裡沒有做指定連接的釋放,只是保證釋放的連接會被利用起來  	if len(conn.waitConn) > 0 {  		var req chan Permission  		var reqKey int  		for reqKey, req = range conn.waitConn {  			break  		}  		// 假定釋放的連接就是下面新建的連接  		permission := Permission{NextConnIndex: NextConnIndex{reqKey},  			Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}  		req <- permission  		conn.waitCount--  		delete(conn.waitConn, reqKey)  		conn.lock.Unlock()  	} else {  		if conn.openCount > 0 {  			conn.openCount--    			if len(conn.freeConns) < conn.maxIdle {	// 確保連接池大小不會超過maxIdle  				nextConnIndex := getNextConnIndex(conn)  				permission := Permission{NextConnIndex: NextConnIndex{nextConnIndex},  					Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5}  				conn.freeConns[nextConnIndex] = permission  			}  		}  		conn.lock.Unlock()  	}  	return  }  

  

這裡主要分為兩部分

  • 如果釋放連接的時候發現等待隊列有任務在等待,則將釋放的連接通過channel發送,給正在等待連接釋放的阻塞任務使用,同時從等待隊列中刪除該任務。
  • 如果當前無等待任務,則將連接放入連接池

這裡的nowFunc

var nowFunc = time.Now  

  

5、Case模擬

5.1 無釋放創建連接

即只有創建連接,拿到連接也不會釋放連接

package main    import (  	"context"  	custom_pool "go-demo/main/src/custom-pool"  )    func main() {    	ctx := context.Background()  	config := &custom_pool.Config{  		MaxConn: 2,  		MaxIdle: 1,  	}  	conn := custom_pool.Prepare(ctx, config)  	if _, err := conn.New(ctx); err != nil {  		return  	}  	if _, err := conn.New(ctx); err != nil {  		return  	}  	if _, err := conn.New(ctx); err != nil {  		return  	}  	if _, err := conn.New(ctx); err != nil {  		return  	}  	if _, err := conn.New(ctx); err != nil {  		return  	}  }  

  

執行結果如下

注意上面代碼都是一直在獲取連接,在獲取連接後沒有釋放連接。

第一次獲取,連接池為空,則新建連接

第二次獲取,連接池為空,繼續新建連接

第三次獲取,連接池為空,同時已有連接數>=maxConn,所以會阻塞等待釋放連接,但是因為沒有連接釋放,所以一直等待,直到3秒超時後退出。

所以第三次、第四次和第五次都是超時退出

5.2 釋放連接

如果我們釋放連接會怎麼樣,我們可以通過新啟一個協程用於釋放一個連接如下

package main    import (  	"context"  	custom_pool "go-demo/main/src/custom-pool"  )    func main() {    	ctx := context.Background()  	config := &custom_pool.Config{  		MaxConn: 2,  		MaxIdle: 1,  	}  	conn := custom_pool.Prepare(ctx, config)  	if _, err := conn.New(ctx); err != nil {  		return  	}  	if _, err := conn.New(ctx); err != nil {  		return  	}  	go conn.Release(ctx)  	if _, err := conn.New(ctx); err != nil {  		return  	}  	if _, err := conn.New(ctx); err != nil {  		return  	}  	if _, err := conn.New(ctx); err != nil {  		return  	}  }  

  

執行結果如下

log create conn!!!!! openCount:  1  freeConns:  map[]  log create conn!!!!! openCount:  2  freeConns:  map[]  log received released conn!!!!! openCount:  2  freeConns:  map[]  超時,通知主線程退出  超時,通知主線程退出  

  

前兩次和上面一樣,但是第三次獲取的時候,會收到一個釋放的連接,所以可以直接復用釋放的連接返回。

但是第四次和第五次創建,因為沒有釋放的連接,所以都會因為等待超時後退出。

5.3 使用連接池

上面的兩個case是在MaxConn=2,MaxIdle=1的情況下執行的。

下面我們看看如果基於以上兩個參數設定,模擬出正好使用連接池的情況。

package main    import (  	"context"  	custom_pool "go-demo/main/src/custom-pool"  )    func main() {    	ctx := context.Background()  	config := &custom_pool.Config{  		MaxConn: 2,  		MaxIdle: 1,  	}  	conn := custom_pool.Prepare(ctx, config)  	if _, err := conn.New(ctx); err != nil {  		return  	}  	go conn.Release(ctx)  	if _, err := conn.New(ctx); err != nil {  		return  	}  	go conn.Release(ctx)  	if _, err := conn.New(ctx); err != nil {  		return  	}  	go conn.Release(ctx)  	if _, err := conn.New(ctx); err != nil {  		return  	}  	go conn.Release(ctx)  	if _, err := conn.New(ctx); err != nil {  		return  	}  }  

  

即除了第一次,後面都會有連接釋放。

執行結果可能情況如下

log create conn!!!!! openCount:  1  freeConns:  map[]  log create conn!!!!! openCount:  2  freeConns:  map[]  log use free conn!!!!! openCount:  1  freeConns:  map[]  log use free conn!!!!! openCount:  0  freeConns:  map[]  log create conn!!!!! openCount:  1  freeConns:  map[]  

  

從執行結果可以看出,這裡有兩次使用了連接池中的連接。

注意:因為釋放是新啟協程執行,所以無法保證執行順序,不同的執行順序,會有不同的執行結果。上面只是執行結果的一種。

以上完整代碼參見https://github.com/DMinerJackie/go-demo/tree/master/main/src/custom-pool

6、總結和展望

6.1 總結

  • 通過手寫連接池加深對於連接池實現的理解
  • 學會使用channel和協程
  • 學會如何在channel阻塞指定時間後退出(設立超時時間)
  • 學會對於共享資源加鎖,比如nextConnIndex的獲取和更新需要加鎖

6.2 展望

  • Close和Ping沒有寫(實現不難)
  • 連接池連接需要有存活時間,並在連接過期的時候從連接池刪除
  • 實現使用的是普通的map集合,可以考慮並發安全的syncMap
  • 代碼實現比較簡陋不夠優雅,可以繼續完善保證職責單一

如果您覺得閱讀本文對您有幫助,請點一下“推薦”按鈕,您的“推薦”將是我最大的寫作動力!如果您想持續關注我的文章,請掃描二維碼,關注JackieZheng的微信公眾號,我會將我的文章推送給您,並和您一起分享我日常閱讀過的優質文章。