Heim >Backend-Entwicklung >Golang >Mehrere Implementierungsfälle zum Golang-Verbindungspool

Mehrere Implementierungsfälle zum Golang-Verbindungspool

藏色散人
藏色散人nach vorne
2020-12-14 14:41:442596Durchsuche

Die Kolumne „Golang-Tutorial“ stellt Ihnen mehrere Implementierungsfälle von „Golang“-Verbindungspooling vor. Ich hoffe, dass sie Freunden in Not hilfreich sein wird! Aufgrund des Drei-Wege-Handshakes von TCP und aus anderen Gründen ist der Verbindungsaufbau ein relativ kostspieliger Vorgang. Daher muss in einem Programm, das mehrmals mit einer bestimmten Entität interagieren muss, ein Verbindungspool mit wiederverwendbaren Verbindungen zur Wiederverwendung verwaltet werden.

Die grundlegendste Voraussetzung für die Aufrechterhaltung eines Verbindungspools ist: Mehrere Implementierungsfälle zum Golang-VerbindungspoolThread-Sicherheit

, insbesondere in einer Sprache wie Golang, deren Funktion Goroutine ist.


Implementieren eines einfachen Verbindungspools

type Pool struct {
    m sync.Mutex //保证多个goroutine访问时候,closed的线程安全
    res chan io.Closer //连接存储的chan
    factory func() (io.Closer,error) //新建连接的工厂方法
    closed bool //连接池关闭标志
}

Für diesen einfachen Verbindungspool verwenden wir chan, um die Verbindungen im Pool zu speichern. Die Methode zum Erstellen einer neuen Struktur ist relativ einfach:

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("size的值太小了。")
    }
    return &Pool{
        factory: fn,
        res:     make(chan io.Closer, size),
    }, nil
}
Sie müssen lediglich die entsprechende Factory-Funktion und die Größe des Verbindungspools angeben.

Stellen Sie die Verbindung her

Wie gewinnen wir also Ressourcen daraus? Da die Struktur unserer internen Speicherverbindung Chan ist, benötigen wir nur eine einfache Auswahl, um die Thread-Sicherheit zu gewährleisten: vorbereitet. Die Fabrikfunktion führt die Konstruktionsverbindung durch. Wenn wir gleichzeitig die Verbindung von res erhalten, verwenden wir

ok

, um zunächst festzustellen, ob der Verbindungspool geschlossen wurde. Wenn es geschlossen wurde, geben wir den Fehler „Bereits vorbereitete Verbindung geschlossen“ zurück.

Schließen Sie den Verbindungspool

Da wir also das Schließen des Verbindungspools erwähnt haben, wie schließen wir den Verbindungspool?

//从资源池里获取一个资源
func (p *Pool) Acquire() (io.Closer,error) {
    select {
    case r,ok := <-p.res:
        log.Println("Acquire:共享资源")
        if !ok {
            return nil,ErrPoolClosed
        }
        return r,nil
    default:
        log.Println("Acquire:新生成资源")
        return p.factory()
    }
}
Hier müssen wir zuerst die Sperroperation

p.m.Lock()* ausführen. Dies liegt daran, dass wir das *closed

in der Struktur lesen und schreiben müssen. Sie müssen zuerst dieses Flag setzen und dann den Res-Chan schließen, damit die Acquire-Methode keine neuen Verbindungen mehr erhalten kann. Anschließend führen wir den Vorgang „Schließen“ für die Verbindung im Kanal „res“ durch.

Verbindung freigeben

Für das Freigeben der Verbindung ist zunächst eine Voraussetzung erforderlich, das heißt, der Verbindungspool wurde nicht geschlossen. Wenn der Verbindungspool geschlossen wurde und Sie eine Verbindung an

res

senden, wird eine Panik ausgelöst.

//关闭资源池,释放资源
func (p *Pool) Close() {
    p.m.Lock()
    defer p.m.Unlock()
    if p.closed {
        return
    }
    p.closed = true
    //关闭通道,不让写入了
    close(p.res)
    //关闭通道里的资源
    for r:=range p.res {
        r.Close()
    }
}
Das Obige ist eine einfache und threadsichere Verbindungspool-Implementierung. Was wir sehen können, ist, dass der Verbindungspool zwar jetzt implementiert ist, aber immer noch einige kleine Mängel aufweist:

Wir haben keine Begrenzung für die maximale Anzahl von Verbindungen. Wenn der Thread-Pool leer ist, erstellen wir direkt einen Neue Verbindung standardmäßig zurückgegeben. Sobald die Parallelität hoch ist, werden weiterhin neue Verbindungen erstellt, was leicht zu zu vielen Verbindungen-Fehlern führen kann (insbesondere bei MySQL).

Da wir die maximale Anzahl verfügbarer Verbindungen sicherstellen müssen, möchten wir nicht, dass die Anzahl zu starr ist. Wir hoffen, dass wir im Leerlauf eine bestimmte Anzahl inaktiver Verbindungen aufrechterhalten können, hoffen aber auch, dass wir die maximale Anzahl verfügbarer Verbindungen auf maxNum begrenzen können.

Die erste Situation ist zu viel Parallelität. Was ist, wenn die Parallelität zu gering ist? Nachdem wir nun eine neue Verbindung erstellt und zurückgegeben haben, verwenden wir diese Verbindung für längere Zeit nicht mehr. Dann dürfte diese Verbindung schon vor ein paar Stunden oder noch länger hergestellt worden sein. Wir können die Verfügbarkeit einer Verbindung, die längere Zeit nicht genutzt wurde, nicht garantieren. Es ist möglich, dass die Verbindung, die wir das nächste Mal erhalten, eine abgelaufene Verbindung ist.

Dann können wir uns die MySQL-Verbindungspoolbibliothek und die Redis-Verbindungspoolbibliothek ansehen, die bereits ausgereift eingesetzt wurden, um zu sehen, wie sie diese Probleme lösen.
    SQL-Verbindungspool der Golang-Standardbibliothek
  1. Der Verbindungspool von Golang ist unter der Standardbibliothek database/sql/sql.go implementiert. Wenn wir Folgendes ausführen: too many connections的报错发生。

  2. 既然我们需要保证最大可获取连接数量,那么我们就不希望数量定的太死。希望空闲的时候可以维护一定的空闲连接数量idleNum,但是又希望我们能限制最大可获取连接数量maxNum。

  3. 第一种情况是并发过多的情况,那么如果并发量过少呢?现在我们在新建一个连接并且归还后,我们很长一段时间不再使用这个连接。那么这个连接很有可能在几个小时甚至更长时间之前就已经建立的了。长时间闲置的连接我们并没有办法保证它的可用性。便有可能我们下次获取的连接是已经失效的连接。

那么我们可以从已经成熟使用的MySQL连接池库和Redis连接池库中看看,它们是怎么解决这些问题的。

Golang标准库的Sql连接池

Golang的连接池实现在标准库 database/sql/sql.go下。当我们运行:

func (p *Pool) Release(r io.Closer){
    //保证该操作和Close方法的操作是安全的
    p.m.Lock()
    defer p.m.Unlock()
    //资源池都关闭了,就省这一个没有释放的资源了,释放即可
    if p.closed {
        r.Close()
        return
    }
    select {
    case p.res <- r:
        log.Println("资源释放到池子里了")
    default:
        log.Println("资源池满了,释放这个资源吧")
        r.Close()
    }
}

的时候,就会打开一个连接池。我们可以看看返回的db的结构体:

db, err := sql.Open("mysql", "xxxx")

上面省去了一些暂时不需要关注的field。我们可以看的,DB这个连接池内部存储连接的结构freeConn,并不是我们之前使用的chan,而是[]*driverConn,一个连接切片。同时我们还可以看到,里面有maxIdle等相关变量来控制空闲连接数量。值得注意的是,DB的初始化函数Open函数并没有新建数据库连接。而新建连接在哪个函数呢?我们可以在Query方法一路往回找,我们可以看到这个函数: func(db*DB)conn(ctx context.Context,strategy connReuseStrategy)(*driverConn,error)

type DB struct {
    waitDuration int64 // Total time waited for new connections.
    mu           sync.Mutex // protects following fields
    freeConn     []*driverConn
    connRequests map[uint64]chan connRequest
    nextRequest  uint64 // Next key to use in connRequests.
    numOpen      int    // number of opened and pending open connections
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh          chan struct{}
    closed            bool
    maxIdle           int                    // zero means defaultMaxIdleConns; negative means 0
    maxOpen           int                    // <= 0 means unlimited
    maxLifetime       time.Duration          // maximum amount of time a connection may be reused
    cleanerCh         chan struct{}
    waitCount         int64 // Total number of connections waited for.
    maxIdleClosed     int64 // Total number of connections closed due to idle.
    maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
}

, wird ein Verbindungspool geöffnet. Wir können einen Blick auf die Struktur der zurückgegebenen 🎜db🎜 werfen: 🎜
// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    // 先判断db是否已经关闭。
    db.mu.Lock()
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    // 注意检测context是否已经被超时等原因被取消。
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime
    // 这边如果在freeConn这个切片有空闲连接的话,就left pop一个出列。注意的是,这边因为是切片操作,所以需要前面需要加锁且获取后进行解锁操作。同时判断返回的连接是否已经过期。
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        // Lock around reading lastErr to ensure the session resetter finished.
        conn.Lock()
        err := conn.lastErr
        conn.Unlock()
        if err == driver.ErrBadConn {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }
    // 这边就是等候获取连接的重点了。当空闲的连接为空的时候,这边将会新建一个request(的等待连接 的请求)并且开始等待
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // 下面的动作相当于往connRequests这个map插入自己的号码牌。
        // 插入号码牌之后这边就不需要阻塞等待继续往下走逻辑。
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.waitCount++
        db.mu.Unlock()
        waitStart := time.Now()
        // Timeout the connection request with the context.
        select {
        case <-ctx.Done():
            // context取消操作的时候,记得从connRequests这个map取走自己的号码牌。
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            select {
            default:
            case ret, ok := <-req:
                // 这边值得注意了,因为现在已经被context取消了。但是刚刚放了自己的号码牌进去排队里面。意思是说不定已经发了连接了,所以得注意归还!
                if ok && ret.conn != nil {
                    db.putConn(ret.conn, ret.err, false)
                }
            }
            return nil, ctx.Err()
        case ret, ok := <-req:
            // 下面是已经获得连接后的操作了。检测一下获得连接的状况。因为有可能已经过期了等等。
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            if !ok {
                return nil, errDBClosed
            }
            if ret.err == nil && ret.conn.expired(lifetime) {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            if ret.conn == nil {
                return nil, ret.err
            }
            ret.conn.Lock()
            err := ret.conn.lastErr
            ret.conn.Unlock()
            if err == driver.ErrBadConn {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }
    // 下面就是如果上面说的限制情况不存在,可以创建先连接时候,要做的创建连接操作了。
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.connector.Connect(ctx)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
        inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
}
复制代码
🎜 Einige Felder, die vorerst keine Aufmerksamkeit erfordern, werden weggelassen. Wir können sehen, dass die Struktur 🎜freeConn🎜 im DB-Verbindungspool zum Speichern von Verbindungen nicht der Kanal ist, den wir zuvor verwendet haben, sondern 🎜[]*driverConn🎜, ein Verbindungs-Slice. Gleichzeitig können wir auch sehen, dass es verwandte Variablen wie 🎜maxIdle🎜 gibt, um die Anzahl der inaktiven Verbindungen zu steuern. Es ist erwähnenswert, dass die Initialisierungsfunktion 🎜Open🎜 von 🎜DB🎜 keine neue Datenbankverbindung erstellt. Mit welcher Funktion wird eine neue Verbindung erstellt? Wir können ganz zurück in die Methode 🎜Query🎜 gehen und diese Funktion sehen: func(db*DB)conn(ctx context.Context,strategy connReuseStrategy)(*driverConn,error) . Und unsere Methode zum Abrufen von Verbindungen aus dem Verbindungspool beginnt hier: 🎜

获取连接

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    // 先判断db是否已经关闭。
    db.mu.Lock()
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    // 注意检测context是否已经被超时等原因被取消。
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime
    // 这边如果在freeConn这个切片有空闲连接的话,就left pop一个出列。注意的是,这边因为是切片操作,所以需要前面需要加锁且获取后进行解锁操作。同时判断返回的连接是否已经过期。
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        // Lock around reading lastErr to ensure the session resetter finished.
        conn.Lock()
        err := conn.lastErr
        conn.Unlock()
        if err == driver.ErrBadConn {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }
    // 这边就是等候获取连接的重点了。当空闲的连接为空的时候,这边将会新建一个request(的等待连接 的请求)并且开始等待
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // 下面的动作相当于往connRequests这个map插入自己的号码牌。
        // 插入号码牌之后这边就不需要阻塞等待继续往下走逻辑。
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.waitCount++
        db.mu.Unlock()
        waitStart := time.Now()
        // Timeout the connection request with the context.
        select {
        case <-ctx.Done():
            // context取消操作的时候,记得从connRequests这个map取走自己的号码牌。
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            select {
            default:
            case ret, ok := <-req:
                // 这边值得注意了,因为现在已经被context取消了。但是刚刚放了自己的号码牌进去排队里面。意思是说不定已经发了连接了,所以得注意归还!
                if ok && ret.conn != nil {
                    db.putConn(ret.conn, ret.err, false)
                }
            }
            return nil, ctx.Err()
        case ret, ok := <-req:
            // 下面是已经获得连接后的操作了。检测一下获得连接的状况。因为有可能已经过期了等等。
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            if !ok {
                return nil, errDBClosed
            }
            if ret.err == nil && ret.conn.expired(lifetime) {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            if ret.conn == nil {
                return nil, ret.err
            }
            ret.conn.Lock()
            err := ret.conn.lastErr
            ret.conn.Unlock()
            if err == driver.ErrBadConn {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }
    // 下面就是如果上面说的限制情况不存在,可以创建先连接时候,要做的创建连接操作了。
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.connector.Connect(ctx)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
        inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
}
复制代码

简单来说,DB结构体除了用的是slice来存储连接,还加了一个类似排队机制的connRequests来解决获取等待连接的过程。同时在判断连接健康性都有很好的兼顾。那么既然有了排队机制,归还连接的时候是怎么做的呢?

释放连接

我们可以直接找到 func(db*DB)putConnDBLocked(dc*driverConn,err error)bool这个方法。就像注释说的,这个方法主要的目的是:

Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.

我们主要来看看里面重点那几行:

...
    // 如果已经超过最大打开数量了,就不需要在回归pool了
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 这边是重点了,基本来说就是从connRequest这个map里面随机抽一个在排队等着的请求。取出来后发给他。就不用归还池子了。
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {
            break
        }
        delete(db.connRequests, reqKey) // 删除这个在排队的请求。
        if err == nil {
            dc.inUse = true
        }
        // 把连接给这个正在排队的连接。
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed {
        // 既然没人排队,就看看到了最大连接数目没有。没到就归还给freeConn。
        if db.maxIdleConnsLocked() > len(db.freeConn) {
            db.freeConn = append(db.freeConn, dc)
            db.startCleanerLocked()
            return true
        }
        db.maxIdleClosed++
    }
...

我们可以看到,当归还连接时候,如果有在排队轮候的请求就不归还给池子直接发给在轮候的人了。

现在基本就解决前面说的小问题了。不会出现连接太多导致无法控制too many connections的情况。也很好了维持了连接池的最小数量。同时也做了相关对于连接健康性的检查操作。

值得注意的是,作为标准库的代码,相关注释和代码都非常完美,真的可以看的神清气爽。

redis Golang实现的Redis客户端

这个Golang实现的Redis客户端,是怎么实现连接池的。这边的思路非常奇妙,还是能学习到不少好思路。当然了,由于代码注释比较少,啃起来第一下还是有点迷糊的。相关代码地址在https://github.com/go-redis/redis/blob/master/internal/pool/pool.go 可以看到。

而它的连接池结构如下

type ConnPool struct {
    ...
    queue chan struct{}
    connsMu      sync.Mutex
    conns        []*Conn
    idleConns    []*Conn
    poolSize     int
    idleConnsLen int
    stats Stats
    _closed  uint32 // atomic
    closedCh chan struct{}
}

我们可以看到里面存储连接的结构还是slice。但是我们可以重点看看 queueconnsidleConns这几个变量,后面会提及到。但是值得注意的是!我们可以看到,这里有两个[]*Conn结构:connsidleConns,那么问题来了:

到底连接存在哪里?

新建连接池连接

我们先从新建连接池连接开始看:

func NewConnPool(opt *Options) *ConnPool {
    ....
    p.checkMinIdleConns()
    if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
        go p.reaper(opt.IdleCheckFrequency)
    }
    ....
}

初始化连接池的函数有个和前面两个不同的地方。

  1. checkMinIdleConns方法,在连接池初始化的时候就会往连接池填满空闲的连接。

  2. go p.reaper(opt.IdleCheckFrequency)则会在初始化连接池的时候就会起一个go程,周期性的淘汰连接池里面要被淘汰的连接。

获取连接

func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
    if p.closed() {
        return nil, ErrClosed
    }
    //这边和前面sql获取连接函数的流程不同。sql是先看看连接池有没有空闲连接,有的话先获取不到再排队。这边是直接先排队获取令牌,排队函数后面会分析。
    err := p.waitTurn(ctx)
    if err != nil {
        return nil, err
    }
    //前面没出error的话,就已经排队轮候到了。接下来就是获取的流程。
    for {
        p.connsMu.Lock()
        //从空闲连接里面先获取一个空闲连接。
        cn := p.popIdle()
        p.connsMu.Unlock()
        if cn == nil {
            // 没有空闲连接时候直接跳出循环。
            break
        }
        // 判断是否已经过时,是的话close掉了然后继续取出。
        if p.isStaleConn(cn) {
            _ = p.CloseConn(cn)
            continue
        }
        atomic.AddUint32(&p.stats.Hits, 1)
        return cn, nil
    }
    atomic.AddUint32(&p.stats.Misses, 1)
    // 如果没有空闲连接的话,这边就直接新建连接了。
    newcn, err := p.newConn(ctx, true)
    if err != nil {
        // 归还令牌。
        p.freeTurn()
        return nil, err
    }
    return newcn, nil
}

我们可以试着回答开头那个问题:连接到底存在哪里?答案是从 cn:=p.popIdle()这句话可以看出,获取连接这个动作,是从 idleConns里面获取的,而里面的函数也证明了这一点。同时我的理解是:

  1. sql的排队意味着我对连接池申请连接后,把自己的编号告诉连接池。连接那边一看到有空闲了,就叫我的号。我答应了一声,然后连接池就直接给个连接给我。我如果不归还,连接池就一直不叫下一个号。

  2. redis这边的意思是,我去和连接池申请的不是连接而是令牌。我就一直排队等着,连接池给我令牌了,我才去仓库里面找空闲连接或者自己新建一个连接。用完了连接除了归还连接外,还得归还令牌。当然了,如果我自己新建连接出错了,我哪怕拿不到连接回家,我也得把令牌给回连接池,不然连接池的令牌数少了,最大连接数也会变小。

而:

func (p *ConnPool) freeTurn() {
    <-p.queue
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
...
    case p.queue <- struct{}{}:
        return nil
...
}

就是在靠queue这个chan来维持令牌数量。

那么 conns的作用是什么呢?我们可以来看看新建连接这个函数:

新建连接

func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
    cn, err := p.dialConn(ctx, pooled)
    if err != nil {
        return nil, err
    }
    p.connsMu.Lock()
    p.conns = append(p.conns, cn)
    if pooled {
        // 如果连接池满了,会在后面移除。
        if p.poolSize >= p.opt.PoolSize {
            cn.pooled = false
        } else {
            p.poolSize++
        }
    }
    p.connsMu.Unlock()
    return cn, nil
}

基本逻辑出来了。就是如果新建连接的话,我并不会直接放在 idleConns里面,而是先放 conns里面。同时先看池子满了没有。满的话后面归还的时候会标记,后面会删除。那么这个后面会删除,指的是什么时候呢?那就是下面说的归还连接的时候了。

归还连接

func (p *ConnPool) Put(cn *Conn) {
    if cn.rd.Buffered() > 0 {
        internal.Logger.Printf("Conn has unread data")
        p.Remove(cn, BadConnError{})
        return
    }
    //这就是我们刚刚说的后面了,前面标记过不要入池的,这边就删除了。当然了,里面也会进行freeTurn操作。
    if !cn.pooled {
        p.Remove(cn, nil)
        return
    }
    p.connsMu.Lock()
    p.idleConns = append(p.idleConns, cn)
    p.idleConnsLen++
    p.connsMu.Unlock()
    //我们可以看到很明显的这个归还号码牌的动作。
    p.freeTurn()
}

其实归还的过程,就是从 conns转移到 idleConns的过程。当然了,如果新建这个连接时候发现已经 超卖了,后面归还时候就不转移,直接删除了。

等等,上面的逻辑似乎有点不对?我们来理一下获取连接流程:

  1. 先 waitTurn,拿到令牌。而令牌数量是根据pool里面的 queue决定的。

  2. 拿到令牌了,去库房 idleConns里面拿空闲的连接。没有的话就自己 newConn一个,并且把他记录到 conns里面。

  3. Wenn Sie mit der Verwendung fertig sind, rufen Sie put auf, um zurückzukehren: das heißt, von conns nach idleConns zu übertragen. Überprüfen Sie bei der Rücksendung, ob bei newConn die Überverkauft-Markierung vorgenommen wurde. Wenn ja, wird es nicht an idleConns übertragen. put归还:也就是从 conns转移到 idleConns。归还的时候就检查在 newConn时候是不是已经做了超卖标记了。是的话就不转移到 idleConns

我当时疑惑了好久,既然始终都需要获得令牌才能得到连接,令牌数量是定的。为什么还会超卖呢?翻了一下源码,我的答案是:

虽然 Get方法获取连接是 newConn这个私用方法,受到令牌管制导致不会出现超卖。但是这个方法接受传参:pooledbool。所以我猜是担心其他人调用这个方法时候,不管三七二十一就传了true,导致poolSize越来越大。

总的来说,redis这个连接池的连接数控制,还是在 queue

Ich war lange verwirrt. Da man für eine Verbindung immer einen Token benötigt, ist die Anzahl der Token festgelegt. Warum ist es immer noch überverkauft? Nachdem ich den Quellcode durchgesehen habe, lautet meine Antwort:

Obwohl die Get-Methode zum Erhalten der Verbindung eine private Methode von newConn ist, unterliegt sie der Token-Kontrolle und wird dies tun nicht zu einem Überverkauf führen. Aber diese Methode akzeptiert Parameter: pooledbool. Daher mache ich mir wohl Sorgen, dass andere Leute, wenn sie diese Methode aufrufen, unabhängig von der Situation true übergeben, was dazu führt, dass poolSize immer größer wird.

Im Allgemeinen wird die Anzahl der Verbindungen im Redis-Verbindungspool durch den queue-Kanal gesteuert, den ich Token nenne.

Zusammenfassung


Wie Sie oben sehen können, ist die Thread-Sicherheit beim Herstellen der Verbindung die grundlegendste Garantie des Verbindungspools. Bei der Implementierung vieler zusätzlicher Funktionen werden diese jedoch aus unterschiedlichen Blickwinkeln implementiert. Immer noch sehr interessant. Aber unabhängig davon, ob die Speicherstruktur Chan oder Slice ist, kann dies sehr gut erreicht werden. Wenn Sie Slice zum Speichern von Verbindungen wie SQL oder Redis verwenden, müssen Sie eine Struktur beibehalten, um den Effekt der Warteschlange darzustellen.

🎜🎜Autor dieses Artikels: Xiao Ling für viel Glück Datum: 28.02.2020 Ursprünglicher Link: https://juejin.im/post/5e58e3b7f265da57537eb7ed🎜🎜🎜🎜🎜

Das obige ist der detaillierte Inhalt vonMehrere Implementierungsfälle zum Golang-Verbindungspool. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:csdn.net. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen