github.com/redis/go-redis/v9/internal/pool
No package summary is available.
Package
Files: 5. Third party imports: 0. Imports from organisation: 0. Tests: 0. Benchmarks: 0.
Constants
// Lifecycle states stored in StickyConnPool.state.
const (
// stateDefault is the zero state; Reset moves the pool back to it from stateInited.
stateDefault = 0
// stateInited is the state Reset expects to find before it swaps back to stateDefault.
// NOTE(review): presumably set once the sticky pool holds a connection — confirm.
stateInited = 1
// NOTE(review): stateClosed presumably marks a closed sticky pool — confirm against Close.
stateClosed = 2
)
Vars
var (
// ErrClosed is returned when an operation is performed on a closed client.
ErrClosed = errors.New("redis: client is closed")
// ErrPoolExhausted is returned from a pool connection method
// when the maximum number of database connections in the pool has been reached.
ErrPoolExhausted = errors.New("redis: connection pool exhausted")
// ErrPoolTimeout is returned when waiting to get a connection from the
// connection pool times out.
ErrPoolTimeout = errors.New("redis: connection pool timeout")
)
// Compile-time assertion that *ConnPool satisfies the Pooler interface.
var _ Pooler = (*ConnPool)(nil)
// errUnexpectedRead signals that data was read from a socket when none was expected.
// NOTE(review): presumably returned by connCheck for idle connections — confirm.
var errUnexpectedRead = errors.New("unexpected read from socket")
// noDeadline is the zero time.Time.
// NOTE(review): presumably passed to Set*Deadline to clear a deadline — confirm in deadline().
var noDeadline = time.Time{}
// timers recycles *time.Timer values. Every timer created by the pool is
// handed out already stopped; the one-hour duration is only a placeholder
// needed to construct it.
var timers = sync.Pool{
	New: func() interface{} {
		stopped := time.NewTimer(time.Hour)
		stopped.Stop()
		return stopped
	},
}
Types
BadConnError
This type doesn't have documentation.
// BadConnError reports that a connection is in a bad state.
// It may carry an underlying cause, exposed through Unwrap.
type BadConnError struct {
// wrapped is the optional underlying error; nil when there is none.
wrapped error
}
Conn
This type doesn't have documentation.
// Conn couples a net.Conn with the protocol reader and buffered writer that
// NewConn builds on top of it.
type Conn struct {
// usedAt is the last-used time in Unix seconds; access it via SetUsedAt/UsedAt.
usedAt int64 // atomic
// netConn is the underlying network connection.
netConn net.Conn
// rd reads from netConn.
rd *proto.Reader
// bw buffers writes to netConn, and wr writes into bw.
bw *bufio.Writer
wr *proto.Writer
// NOTE(review): Inited is presumably set by the client once the connection is initialized — confirm.
Inited bool
// pooled is false for connections that ConnPool.Put removes instead of keeping idle.
pooled bool
// createdAt is set to the construction time by NewConn.
createdAt time.Time
}
ConnPool
This type doesn't have documentation.
// ConnPool is the Pooler implementation that tracks all of its connections
// and keeps a separate list of idle ones.
type ConnPool struct {
// cfg holds the options passed to NewConnPool.
cfg *Options
// NOTE(review): dialErrorsNum presumably counts dial failures — confirm in dialConn.
dialErrorsNum uint32 // atomic
// NOTE(review): lastDialError presumably stores a *lastDialErrorWrap — confirm in setLastDialError.
lastDialError atomic.Value
// queue is created with capacity PoolSize.
// NOTE(review): presumably the semaphore behind waitTurn/freeTurn — confirm.
queue chan struct{}
// connsMu guards conns, idleConns and idleConnsLen.
connsMu sync.Mutex
// conns is every connection owned by the pool; idleConns is the idle subset.
conns []*Conn
idleConns []*Conn
// NOTE(review): poolSize is not touched by the code visible here — confirm its role.
poolSize int
// idleConnsLen is the idle connection count reported by IdleLen.
idleConnsLen int
// stats holds the counters reported by Stats; updated atomically.
stats Stats
// NOTE(review): _closed is presumably the flag read by closed() — confirm.
_closed uint32 // atomic
}
Options
This type doesn't have documentation.
// Options configures a ConnPool.
type Options struct {
// NOTE(review): Dialer is presumably called to establish each new connection — confirm in dialConn.
Dialer func(context.Context) (net.Conn, error)
// NOTE(review): PoolFIFO presumably selects FIFO instead of LIFO reuse of idle connections — confirm in popIdle.
PoolFIFO bool
// PoolSize is the capacity of the pool's queue channel and the initial
// capacity of its connection slices.
PoolSize int
// NOTE(review): PoolTimeout is presumably the longest Get waits for a turn — confirm in waitTurn.
PoolTimeout time.Duration
// NOTE(review): MinIdleConns is presumably the idle-connection floor kept by checkMinIdleConns — confirm.
MinIdleConns int
// MaxIdleConns caps the idle list in Put; 0 means no cap.
MaxIdleConns int
// NOTE(review): MaxActiveConns is not used by the code visible here — confirm its role.
MaxActiveConns int
// NOTE(review): ConnMaxIdleTime and ConnMaxLifetime presumably drive isHealthyConn — confirm.
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration
}
Pooler
This type doesn't have documentation.
// Pooler is the connection-pool abstraction; *ConnPool implements it, and
// SingleConnPool and StickyConnPool each wrap one.
type Pooler interface {
// NewConn and CloseConn create and close a connection.
NewConn(context.Context) (*Conn, error)
CloseConn(*Conn) error
// Get obtains a connection; Put hands it back; Remove discards it,
// with the error argument giving the reason.
Get(context.Context) (*Conn, error)
Put(context.Context, *Conn)
Remove(context.Context, *Conn, error)
// Len and IdleLen report the total and idle connection counts.
Len() int
IdleLen() int
// Stats returns a snapshot of the pool counters.
Stats() *Stats
Close() error
}
SingleConnPool
This type doesn't have documentation.
// SingleConnPool pairs a Pooler with one Conn, as supplied to NewSingleConnPool.
type SingleConnPool struct {
// pool is the wrapped Pooler.
pool Pooler
// cn is the single connection given at construction.
cn *Conn
// NOTE(review): stickyErr is presumably an error remembered across calls — its use is not visible here.
stickyErr error
}
Stats
Stats contains pool state information and accumulated stats.
// Stats contains pool state information and accumulated stats.
type Stats struct {
Hits uint32 // number of times free connection was found in the pool
Misses uint32 // number of times free connection was NOT found in the pool
Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // number of total connections in the pool
IdleConns uint32 // number of idle connections in the pool
StaleConns uint32 // number of stale connections removed from the pool
}
StickyConnPool
This type doesn't have documentation.
// StickyConnPool wraps a Pooler and holds at most one Conn in a channel.
type StickyConnPool struct {
// pool is the wrapped Pooler; Reset returns bad connections to it via Remove.
pool Pooler
// shared is incremented by every NewStickyConnPool call that yields this pool.
shared int32 // atomic
// state holds one of the state* constants.
state uint32 // atomic
// ch has capacity 1; Reset receives the held Conn from it.
ch chan *Conn
// _badConnError stores a BadConnError value; Reset overwrites it with an empty one.
_badConnError atomic.Value
}
lastDialErrorWrap
This type doesn't have documentation.
// lastDialErrorWrap boxes an error in a struct.
// NOTE(review): presumably so differently-typed errors can share one atomic.Value,
// which requires a consistent concrete type — confirm in setLastDialError.
type lastDialErrorWrap struct {
err error
}
Functions
func NewConn
// NewConn builds a Conn around netConn. It records the creation time, layers
// a protocol reader and a buffered protocol writer over the connection, and
// marks the Conn as used now.
func NewConn(netConn net.Conn) *Conn {
	created := time.Now()
	reader := proto.NewReader(netConn)
	buffered := bufio.NewWriter(netConn)

	cn := &Conn{
		netConn:   netConn,
		createdAt: created,
		rd:        reader,
		bw:        buffered,
		wr:        proto.NewWriter(buffered),
	}
	cn.SetUsedAt(time.Now())
	return cn
}
Cognitive complexity: 1
, Cyclomatic complexity: 1
func NewConnPool
// NewConnPool creates a ConnPool configured by opt. The queue channel and both
// connection slices are sized from opt.PoolSize, and checkMinIdleConns is run
// once under the pool lock before the pool is returned.
func NewConnPool(opt *Options) *ConnPool {
	size := opt.PoolSize

	pool := new(ConnPool)
	pool.cfg = opt
	pool.queue = make(chan struct{}, size)
	pool.conns = make([]*Conn, 0, size)
	pool.idleConns = make([]*Conn, 0, size)

	pool.connsMu.Lock()
	pool.checkMinIdleConns()
	pool.connsMu.Unlock()

	return pool
}
Cognitive complexity: 2
, Cyclomatic complexity: 1
func NewSingleConnPool
// NewSingleConnPool returns a SingleConnPool that wraps pool and holds cn.
func NewSingleConnPool(pool Pooler, cn *Conn) *SingleConnPool {
	single := new(SingleConnPool)
	single.pool = pool
	single.cn = cn
	return single
}
Cognitive complexity: 1
, Cyclomatic complexity: 1
func NewStickyConnPool
// NewStickyConnPool returns a StickyConnPool backed by pool. If pool already
// is a *StickyConnPool it is reused rather than wrapped again. In both cases
// the pool's shared counter is incremented.
func NewStickyConnPool(pool Pooler) *StickyConnPool {
	var sticky *StickyConnPool
	if existing, isSticky := pool.(*StickyConnPool); isSticky {
		sticky = existing
	} else {
		sticky = &StickyConnPool{
			pool: pool,
			ch:   make(chan *Conn, 1),
		}
	}
	atomic.AddInt32(&sticky.shared, 1)
	return sticky
}
Cognitive complexity: 3
, Cyclomatic complexity: 2
func (*Conn) Close
// Close closes the underlying network connection and returns its error.
//
// A Conn that has no network connection is treated as already closed and
// Close returns nil instead of panicking; RemoteAddr tolerates a nil
// connection in the same way.
func (cn *Conn) Close() error {
	if cn.netConn == nil {
		return nil
	}
	return cn.netConn.Close()
}
Cognitive complexity: 0
, Cyclomatic complexity: 1
func (*Conn) RemoteAddr
// RemoteAddr returns the remote address of the underlying network connection,
// or nil when the Conn has no network connection.
func (cn *Conn) RemoteAddr() net.Addr {
	if cn.netConn == nil {
		return nil
	}
	return cn.netConn.RemoteAddr()
}
Cognitive complexity: 2
, Cyclomatic complexity: 2
func (*Conn) SetNetConn
// SetNetConn swaps the underlying network connection for netConn and points
// the reader and the buffered writer at it. Resetting the buffered writer
// discards any data that was buffered but not yet flushed.
func (cn *Conn) SetNetConn(netConn net.Conn) {
	// The three updates are independent of one another.
	cn.rd.Reset(netConn)
	cn.bw.Reset(netConn)
	cn.netConn = netConn
}
Cognitive complexity: 0
, Cyclomatic complexity: 1
func (*Conn) SetUsedAt
// SetUsedAt atomically records tm as the last-used time. Only whole Unix
// seconds are kept; any sub-second part of tm is dropped.
func (cn *Conn) SetUsedAt(tm time.Time) {
	seconds := tm.Unix()
	atomic.StoreInt64(&cn.usedAt, seconds)
}
Cognitive complexity: 0
, Cyclomatic complexity: 1
func (*Conn) UsedAt
// UsedAt atomically loads the last-used time and returns it as a time.Time
// with one-second resolution.
func (cn *Conn) UsedAt() time.Time {
	return time.Unix(atomic.LoadInt64(&cn.usedAt), 0)
}
Cognitive complexity: 0
, Cyclomatic complexity: 1
func (*Conn) WithReader
// WithReader runs fn with the connection's protocol reader and returns fn's
// error. When timeout is non-negative, the read deadline is first set from
// ctx and timeout; a failure to set it is returned without calling fn. A
// negative timeout leaves the current deadline untouched.
func (cn *Conn) WithReader(
	ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
) error {
	if timeout < 0 {
		return fn(cn.rd)
	}

	readDeadline := cn.deadline(ctx, timeout)
	if err := cn.netConn.SetReadDeadline(readDeadline); err != nil {
		return err
	}
	return fn(cn.rd)
}
Cognitive complexity: 4
, Cyclomatic complexity: 3
func (*Conn) WithWriter
// WithWriter runs fn with the connection's protocol writer and then flushes
// the buffered writer. When timeout is non-negative, the write deadline is
// first set from ctx and timeout. The first error from setting the deadline,
// from fn, or from the flush is returned.
func (cn *Conn) WithWriter(
	ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
) error {
	if timeout >= 0 {
		writeDeadline := cn.deadline(ctx, timeout)
		if err := cn.netConn.SetWriteDeadline(writeDeadline); err != nil {
			return err
		}
	}

	// Discard bytes still sitting in the buffer so they are not sent
	// ahead of what fn writes.
	if pending := cn.bw.Buffered(); pending > 0 {
		cn.bw.Reset(cn.netConn)
	}

	err := fn(cn.wr)
	if err == nil {
		err = cn.bw.Flush()
	}
	return err
}
Cognitive complexity: 8
, Cyclomatic complexity: 5
func (*Conn) Write
// Write sends b straight to the underlying network connection, bypassing the
// buffered writer, and returns the byte count and error it reports.
func (cn *Conn) Write(b []byte) (int, error) {
	written, err := cn.netConn.Write(b)
	return written, err
}
Cognitive complexity: 0
, Cyclomatic complexity: 1
func (*ConnPool) CloseConn
// CloseConn takes cn out of the pool's bookkeeping (acquiring the pool lock
// to do so) and then closes it, returning the error from the close.
func (p *ConnPool) CloseConn(cn *Conn) error {
	p.removeConnWithLock(cn)

	closeErr := p.closeConn(cn)
	return closeErr
}
Cognitive complexity: 0
, Cyclomatic complexity: 1
func (*ConnPool) Filter
// Filter calls closeConn on every pool connection for which fn returns true
// and returns the first close error, or nil if there was none. The pool lock
// is held for the whole call, including while fn runs. This method does not
// itself call removeConn on the connections it closes.
func (p *ConnPool) Filter(fn func(*Conn) bool) error {
	p.connsMu.Lock()
	defer p.connsMu.Unlock()

	var firstErr error
	all := p.conns
	for i := 0; i < len(all); i++ {
		cn := all[i]
		if !fn(cn) {
			continue
		}
		err := p.closeConn(cn)
		if firstErr == nil && err != nil {
			firstErr = err
		}
	}
	return firstErr
}
Cognitive complexity: 7
, Cyclomatic complexity: 5
func (*ConnPool) Get
Get returns an existing connection from the pool or creates a new one.
// Get returns an idle connection from the pool or, when none is available,
// creates a new one.
//
// It fails with ErrClosed on a closed pool and with the waitTurn error when
// no turn can be obtained. Idle connections that fail the health check are
// closed and skipped. A reused connection counts as a hit and a newly created
// one as a miss. The turn is released again on every error path after it was
// acquired.
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
	if p.closed() {
		return nil, ErrClosed
	}

	if err := p.waitTurn(ctx); err != nil {
		return nil, err
	}

	for {
		p.connsMu.Lock()
		idle, popErr := p.popIdle()
		p.connsMu.Unlock()

		switch {
		case popErr != nil:
			p.freeTurn()
			return nil, popErr

		case idle == nil:
			// Nothing idle is left: fall back to a fresh connection.
			atomic.AddUint32(&p.stats.Misses, 1)
			fresh, dialErr := p.newConn(ctx, true)
			if dialErr != nil {
				p.freeTurn()
				return nil, dialErr
			}
			return fresh, nil

		case p.isHealthyConn(idle):
			atomic.AddUint32(&p.stats.Hits, 1)
			return idle, nil

		default:
			// Unhealthy: drop it and try the next idle connection.
			_ = p.CloseConn(idle)
		}
	}
}
Cognitive complexity: 14
, Cyclomatic complexity: 8
func (*ConnPool) IdleLen
IdleLen returns number of idle connections.
// IdleLen reports the number of idle connections, read under the pool lock.
func (p *ConnPool) IdleLen() int {
	p.connsMu.Lock()
	defer p.connsMu.Unlock()
	return p.idleConnsLen
}
Cognitive complexity: 0
, Cyclomatic complexity: 1
func (*ConnPool) Len
Len returns total number of connections.
// Len reports the total number of connections, read under the pool lock.
func (p *ConnPool) Len() int {
	p.connsMu.Lock()
	defer p.connsMu.Unlock()
	return len(p.conns)
}
Cognitive complexity: 0
, Cyclomatic complexity: 1
func (*ConnPool) Put
// Put hands cn back to the pool.
//
// A connection with unread buffered data is logged and removed with a
// BadConnError; a connection that is not pooled is removed as well. Otherwise
// cn joins the idle list, unless MaxIdleConns is non-zero and already
// reached, in which case cn is dropped from the pool and closed. The turn is
// released before the surplus connection is closed.
func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
	switch {
	case cn.rd.Buffered() > 0:
		internal.Logger.Printf(ctx, "Conn has unread data")
		p.Remove(ctx, cn, BadConnError{})
		return
	case !cn.pooled:
		p.Remove(ctx, cn, nil)
		return
	}

	p.connsMu.Lock()
	limit := p.cfg.MaxIdleConns
	surplus := limit != 0 && p.idleConnsLen >= limit
	if surplus {
		p.removeConn(cn)
	} else {
		p.idleConns = append(p.idleConns, cn)
		p.idleConnsLen++
	}
	p.connsMu.Unlock()

	p.freeTurn()

	if surplus {
		_ = p.closeConn(cn)
	}
}
Cognitive complexity: 11
, Cyclomatic complexity: 6
func (*ConnPool) Remove
// Remove drops cn from the pool, releases the caller's turn and closes cn.
// The context and reason arguments are not used by this implementation, and
// the error from closing the connection is deliberately discarded.
func (p *ConnPool) Remove(_ context.Context, cn *Conn, reason error) {
p.removeConnWithLock(cn)
p.freeTurn()
// Best-effort close: the connection is being discarded anyway.
_ = p.closeConn(cn)
}
Cognitive complexity: 0
, Cyclomatic complexity: 1
func (*ConnPool) Stats
// Stats returns a freshly allocated snapshot of the pool statistics. The
// counters are loaded atomically and the connection totals come from Len and
// IdleLen, so the fields are not captured at one single instant.
func (p *ConnPool) Stats() *Stats {
	snapshot := new(Stats)
	snapshot.Hits = atomic.LoadUint32(&p.stats.Hits)
	snapshot.Misses = atomic.LoadUint32(&p.stats.Misses)
	snapshot.Timeouts = atomic.LoadUint32(&p.stats.Timeouts)
	snapshot.TotalConns = uint32(p.Len())
	snapshot.IdleConns = uint32(p.IdleLen())
	snapshot.StaleConns = atomic.LoadUint32(&p.stats.StaleConns)
	return snapshot
}
Cognitive complexity: 1
, Cyclomatic complexity: 1
func (*StickyConnPool) Reset
// Reset discards the held connection after a bad-connection error.
//
// It is a no-op returning nil when no bad-connection error is recorded.
// Otherwise it takes the Conn from the channel without blocking, removes it
// from the underlying pool, replaces the recorded error with an empty
// BadConnError and moves the state from stateInited to stateDefault.
//
// It returns ErrClosed when the channel is closed, an error when no Conn is
// held, and an error naming the current state when the state transition
// fails.
func (p *StickyConnPool) Reset(ctx context.Context) error {
	if p.badConnError() == nil {
		return nil
	}

	select {
	case held, open := <-p.ch:
		if !open {
			return ErrClosed
		}
		p.pool.Remove(ctx, held, ErrClosed)
		p._badConnError.Store(BadConnError{wrapped: nil})
	default:
		return errors.New("redis: StickyConnPool does not have a Conn")
	}

	if atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
		return nil
	}
	return fmt.Errorf("redis: invalid StickyConnPool state: %d", atomic.LoadUint32(&p.state))
}
Cognitive complexity: 9
, Cyclomatic complexity: 5
func (BadConnError) Error
// Error implements the error interface. The message is a fixed prefix,
// followed by ": " and the wrapped error's message when one is present.
func (e BadConnError) Error() string {
	const prefix = "redis: Conn is in a bad state"
	if e.wrapped == nil {
		return prefix
	}
	return prefix + ": " + e.wrapped.Error()
}
Cognitive complexity: 2
, Cyclomatic complexity: 2
func (BadConnError) Unwrap
// Unwrap returns the wrapped error, or nil when there is none, so that
// errors.Is and errors.As can inspect the cause.
func (e BadConnError) Unwrap() error {
	cause := e.wrapped
	return cause
}
Cognitive complexity: 0
, Cyclomatic complexity: 1
Private functions
func connCheck
connCheck (conn net.Conn) error
References: io.EOF, syscall.Conn, syscall.EAGAIN, syscall.EWOULDBLOCK, syscall.Read, time.Time.
func deadline
deadline (ctx context.Context, timeout time.Duration) time.Time
References: time.Now.
func addIdleConn
addIdleConn () error
References: context.TODO.
func checkMinIdleConns
checkMinIdleConns ()
func closeConn
closeConn (cn *Conn) error
func closed
closed () bool
References: atomic.LoadUint32.
func dialConn
dialConn (ctx context.Context, pooled bool) (*Conn, error)
References: atomic.AddUint32, atomic.LoadUint32.
func freeTurn
freeTurn ()
func getLastDialError
getLastDialError () error
func isHealthyConn
isHealthyConn (cn *Conn) bool
References: time.Now.
func newConn
newConn (ctx context.Context, pooled bool) (*Conn, error)
func popIdle
popIdle () (*Conn, error)
func removeConn
removeConn (cn *Conn)
References: atomic.AddUint32.
func removeConnWithLock
removeConnWithLock (cn *Conn)
func setLastDialError
setLastDialError (err error)
func tryDial
tryDial ()
References: atomic.StoreUint32, context.Background, time.Second, time.Sleep.
func waitTurn
waitTurn (ctx context.Context) error
References: atomic.AddUint32, time.Timer.
func badConnError
badConnError () error
func freeConn
freeConn (ctx context.Context, cn *Conn)
Tests
Files: 2. Third party imports: 2. Imports from organisation: 0. Tests: 0. Benchmarks: 0.
Vars
// Package-level variables declared in the test files.
// NOTE(review): presumably shared fixtures set up by the test suite — their use is not visible here.
var conn net.Conn
var err error
var ts *httptest.Server