Go API Documentation

github.com/redis/go-redis/v9/internal/pool

No package summary is available.

Package

Files: 5. Third party imports: 0. Imports from organisation: 0. Tests: 0. Benchmarks: 0.

Constants

// Lifecycle states stored in StickyConnPool.state.
const (
	// stateDefault is the zero state.
	stateDefault = iota
	// stateInited is the state Reset swaps back to stateDefault.
	stateInited
	// stateClosed is the third state value (2).
	stateClosed
)

Vars

var (
	// ErrClosed is returned by any operation performed on a closed client.
	ErrClosed = errors.New("redis: client is closed")

	// ErrPoolExhausted is returned from a pool connection method when the
	// maximum number of connections in the pool has been reached.
	ErrPoolExhausted = errors.New("redis: connection pool exhausted")

	// ErrPoolTimeout is returned when waiting to get a connection from the
	// connection pool timed out.
	ErrPoolTimeout = errors.New("redis: connection pool timeout")
)
// Compile-time check that *ConnPool satisfies the Pooler interface.
var _ Pooler = (*ConnPool)(nil)
var (
	// errUnexpectedRead is a package-private sentinel error.
	// NOTE(review): presumably reported by connCheck when an idle socket
	// unexpectedly has data to read — confirm at the call site.
	errUnexpectedRead = errors.New("unexpected read from socket")

	// noDeadline is the zero time.Time.
	// NOTE(review): presumably passed to Set*Deadline to clear a deadline — confirm.
	noDeadline time.Time
)
// timers is a pool of reusable *time.Timer values. Every timer the pool
// creates is stopped before it is handed out, so the caller must Reset it
// to the duration it needs.
var timers = sync.Pool{
	New: func() any {
		timer := time.NewTimer(time.Hour)
		timer.Stop()
		return timer
	},
}

Types

BadConnError

This type doesn't have documentation.

// BadConnError is an error value (see its Error and Unwrap methods) that
// reports a connection in a bad state, optionally carrying the underlying
// cause.
type BadConnError struct {
	// wrapped is the underlying cause; it may be nil.
	wrapped error
}

Conn

This type doesn't have documentation.

// Conn bundles a net.Conn with the protocol reader and writer built on top
// of it (see NewConn) plus the bookkeeping fields used by the pool.
type Conn struct {
	// usedAt is the last-used time in Unix seconds; SetUsedAt and UsedAt
	// access it with atomic stores/loads.
	usedAt	int64	// atomic
	// netConn is the underlying network connection.
	netConn	net.Conn

	// rd reads from netConn.
	rd	*proto.Reader
	// bw buffers writes to netConn; wr writes into bw.
	bw	*bufio.Writer
	wr	*proto.Writer

	// NOTE(review): Inited is presumably set by callers once the connection
	// has been initialized — confirm against users of this package.
	Inited		bool
	// pooled is checked by ConnPool.Put: connections with pooled == false are
	// removed instead of being returned to the idle list.
	pooled		bool
	// createdAt is the time at which NewConn built this Conn.
	createdAt	time.Time
}

ConnPool

This type doesn't have documentation.

// ConnPool is a pool of Conn values; *ConnPool implements Pooler.
type ConnPool struct {
	// cfg holds the options the pool was created with (see NewConnPool).
	cfg	*Options

	dialErrorsNum	uint32	// atomic
	// NOTE(review): lastDialError presumably stores a *lastDialErrorWrap —
	// confirm in setLastDialError/getLastDialError.
	lastDialError	atomic.Value

	// queue is created with capacity Options.PoolSize.
	// NOTE(review): presumably the semaphore behind waitTurn/freeTurn — confirm.
	queue	chan struct{}

	// connsMu is held while conns, idleConns and idleConnsLen are accessed.
	connsMu		sync.Mutex
	conns		[]*Conn
	idleConns	[]*Conn

	poolSize	int
	idleConnsLen	int

	// stats accumulates the counters reported by Stats.
	stats	Stats

	_closed	uint32	// atomic
}

Options

This type doesn't have documentation.

// Options configures a ConnPool (see NewConnPool).
type Options struct {
	// Dialer returns a new net.Conn or an error.
	Dialer	func(context.Context) (net.Conn, error)

	PoolFIFO	bool
	// PoolSize is used by NewConnPool as the capacity of the pool's queue
	// channel and of its connection slices.
	PoolSize	int
	PoolTimeout	time.Duration
	MinIdleConns	int
	// MaxIdleConns caps the number of idle connections kept by ConnPool.Put;
	// zero means no cap.
	MaxIdleConns	int
	MaxActiveConns	int
	ConnMaxIdleTime	time.Duration
	ConnMaxLifetime	time.Duration
}

Pooler

This type doesn't have documentation.

// Pooler is the connection-pool interface; *ConnPool implements it.
type Pooler interface {
	NewConn(context.Context) (*Conn, error)
	CloseConn(*Conn) error

	// Get obtains a connection, Put hands it back to the pool, and Remove
	// discards it (the error argument carries the reason).
	Get(context.Context) (*Conn, error)
	Put(context.Context, *Conn)
	Remove(context.Context, *Conn, error)

	// Len and IdleLen report the total and idle connection counts.
	Len() int
	IdleLen() int
	Stats() *Stats

	Close() error
}

SingleConnPool

This type doesn't have documentation.

// SingleConnPool pairs a parent Pooler with a single Conn
// (see NewSingleConnPool).
type SingleConnPool struct {
	pool		Pooler
	cn		*Conn
	// stickyErr is left nil by NewSingleConnPool.
	// NOTE(review): presumably an error remembered for later calls — confirm.
	stickyErr	error
}

Stats

Stats contains pool state information and accumulated stats.

// Stats contains pool state information and accumulated stats.
type Stats struct {
	Hits		uint32	// number of times free connection was found in the pool
	Misses		uint32	// number of times free connection was NOT found in the pool
	Timeouts	uint32	// number of times a wait timeout occurred

	TotalConns	uint32	// number of total connections in the pool
	IdleConns	uint32	// number of idle connections in the pool
	StaleConns	uint32	// number of stale connections removed from the pool
}

StickyConnPool

This type doesn't have documentation.

// StickyConnPool wraps a Pooler and keeps a connection in a one-slot
// channel (see NewStickyConnPool and Reset).
type StickyConnPool struct {
	pool	Pooler
	// shared is incremented by every NewStickyConnPool call that returns
	// this pool.
	shared	int32	// atomic

	// state holds one of stateDefault, stateInited or stateClosed.
	state	uint32	// atomic
	// ch is created with capacity 1; Reset receives the connection from it.
	ch	chan *Conn

	// _badConnError stores a BadConnError value (see Reset).
	_badConnError	atomic.Value
}

lastDialErrorWrap

This type doesn't have documentation.

// lastDialErrorWrap boxes an error in a struct.
// NOTE(review): presumably so that ConnPool.lastDialError (an atomic.Value)
// always stores the same concrete type — confirm in setLastDialError.
type lastDialErrorWrap struct {
	err error
}

Functions

func NewConn

// NewConn wraps netConn in a Conn. It attaches a protocol reader, a buffered
// writer and a protocol writer on top of that buffer, records the creation
// time, and marks the connection as used now.
func NewConn(netConn net.Conn) *Conn {
	cn := &Conn{
		netConn:   netConn,
		createdAt: time.Now(),
		rd:        proto.NewReader(netConn),
		bw:        bufio.NewWriter(netConn),
	}
	// wr writes into the buffered writer, so it is wired up once bw exists.
	cn.wr = proto.NewWriter(cn.bw)
	cn.SetUsedAt(time.Now())
	return cn
}

Cognitive complexity: 1, Cyclomatic complexity: 1

Uses: bufio.NewWriter, proto.NewReader, proto.NewWriter, time.Now.

func NewConnPool

// NewConnPool builds a ConnPool configured by opt. The queue channel and
// both connection slices are sized to opt.PoolSize, and checkMinIdleConns
// is invoked once with connsMu held before the pool is returned.
func NewConnPool(opt *Options) *ConnPool {
	size := opt.PoolSize

	pool := &ConnPool{
		cfg:       opt,
		queue:     make(chan struct{}, size),
		conns:     make([]*Conn, 0, size),
		idleConns: make([]*Conn, 0, size),
	}

	pool.connsMu.Lock()
	pool.checkMinIdleConns()
	pool.connsMu.Unlock()

	return pool
}

Cognitive complexity: 2, Cyclomatic complexity: 1

func NewSingleConnPool

// NewSingleConnPool returns a SingleConnPool that holds the given parent
// pool and connection.
func NewSingleConnPool(pool Pooler, cn *Conn) *SingleConnPool {
	single := new(SingleConnPool)
	single.pool = pool
	single.cn = cn
	return single
}

Cognitive complexity: 1, Cyclomatic complexity: 1

func NewStickyConnPool

// NewStickyConnPool returns a StickyConnPool on top of pool. When pool is
// already a *StickyConnPool it is reused rather than wrapped again. In both
// cases the pool's shared counter is incremented atomically.
func NewStickyConnPool(pool Pooler) *StickyConnPool {
	sticky, alreadySticky := pool.(*StickyConnPool)
	if !alreadySticky {
		sticky = &StickyConnPool{pool: pool, ch: make(chan *Conn, 1)}
	}

	atomic.AddInt32(&sticky.shared, 1)
	return sticky
}

Cognitive complexity: 3, Cyclomatic complexity: 2

Uses: atomic.AddInt32.

func (*Conn) Close

// Close closes the underlying network connection and returns the error, if
// any, reported by it.
func (cn *Conn) Close() error {
	closeErr := cn.netConn.Close()
	return closeErr
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*Conn) RemoteAddr

// RemoteAddr returns the remote address of the underlying network
// connection, or nil when no network connection is set.
func (cn *Conn) RemoteAddr() net.Addr {
	if cn.netConn == nil {
		return nil
	}
	return cn.netConn.RemoteAddr()
}

Cognitive complexity: 2, Cyclomatic complexity: 2

func (*Conn) SetNetConn

// SetNetConn swaps the underlying network connection and re-targets the
// reader and the buffered writer at it. Resetting the buffered writer
// discards any data that was buffered but not yet flushed.
func (cn *Conn) SetNetConn(netConn net.Conn) {
	cn.netConn = netConn

	// wr needs no update: it writes through bw, which is reset here.
	cn.rd.Reset(cn.netConn)
	cn.bw.Reset(cn.netConn)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*Conn) SetUsedAt

// SetUsedAt atomically records tm as the last-used time. Only whole Unix
// seconds are stored, so sub-second precision is dropped.
func (cn *Conn) SetUsedAt(tm time.Time) {
	seconds := tm.Unix()
	atomic.StoreInt64(&cn.usedAt, seconds)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

Uses: atomic.StoreInt64.

func (*Conn) UsedAt

// UsedAt atomically loads the last-used timestamp and returns it as a
// time.Time with one-second resolution.
func (cn *Conn) UsedAt() time.Time {
	return time.Unix(atomic.LoadInt64(&cn.usedAt), 0)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

Uses: atomic.LoadInt64, time.Unix.

func (*Conn) WithReader

// WithReader runs fn with the connection's protocol reader. When timeout is
// non-negative, a read deadline derived from ctx and timeout is set on the
// network connection first; a failure to set it is returned without calling
// fn. A negative timeout leaves the current deadline untouched.
func (cn *Conn) WithReader(
	ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
) error {
	if timeout >= 0 {
		readDeadline := cn.deadline(ctx, timeout)
		err := cn.netConn.SetReadDeadline(readDeadline)
		if err != nil {
			return err
		}
	}

	return fn(cn.rd)
}

Cognitive complexity: 4, Cyclomatic complexity: 3

func (*Conn) WithWriter

// WithWriter runs fn with the connection's protocol writer and then flushes
// the buffered writer. When timeout is non-negative, a write deadline
// derived from ctx and timeout is set first; a failure to set it is returned
// without calling fn. If fn fails, its error is returned and nothing is
// flushed.
func (cn *Conn) WithWriter(
	ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
) error {
	if timeout >= 0 {
		writeDeadline := cn.deadline(ctx, timeout)
		err := cn.netConn.SetWriteDeadline(writeDeadline)
		if err != nil {
			return err
		}
	}

	// Bytes still sitting in the buffer are dropped so that they are not
	// sent ahead of what fn is about to write.
	if pending := cn.bw.Buffered(); pending > 0 {
		cn.bw.Reset(cn.netConn)
	}

	err := fn(cn.wr)
	if err == nil {
		err = cn.bw.Flush()
	}
	return err
}

Cognitive complexity: 8, Cyclomatic complexity: 5

func (*Conn) Write

// Write sends b directly to the underlying network connection, bypassing
// the buffered writer, and returns the byte count and error it reports.
func (cn *Conn) Write(b []byte) (int, error) {
	written, err := cn.netConn.Write(b)
	return written, err
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*ConnPool) CloseConn

// CloseConn removes cn from the pool (taking the pool lock) and then closes
// it, returning the close error.
func (p *ConnPool) CloseConn(cn *Conn) error {
	p.removeConnWithLock(cn)

	closeErr := p.closeConn(cn)
	return closeErr
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*ConnPool) Filter

// Filter calls fn for every connection in the pool while holding the pool
// lock and closes each connection for which fn returns true. All matching
// connections are visited; the first close error encountered is returned.
func (p *ConnPool) Filter(fn func(*Conn) bool) error {
	p.connsMu.Lock()
	defer p.connsMu.Unlock()

	var firstErr error
	// Iterate over a snapshot of the slice header, as a range loop would.
	conns := p.conns
	for i := 0; i < len(conns); i++ {
		cn := conns[i]
		if !fn(cn) {
			continue
		}
		err := p.closeConn(cn)
		if firstErr == nil && err != nil {
			firstErr = err
		}
	}
	return firstErr
}

Cognitive complexity: 7, Cyclomatic complexity: 5

func (*ConnPool) Get

Get returns an existing connection from the pool or creates a new one.

// Get returns an existing idle connection from the pool or creates a new one.
//
// It returns ErrClosed when the pool is closed and propagates any error from
// waiting for a turn. After a turn has been acquired, every failure path
// releases it again with freeTurn. Idle connections that fail the health
// check are closed and the next idle connection is tried.
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
	if p.closed() {
		return nil, ErrClosed
	}

	if err := p.waitTurn(ctx); err != nil {
		return nil, err
	}

	for {
		p.connsMu.Lock()
		idle, popErr := p.popIdle()
		p.connsMu.Unlock()

		if popErr != nil {
			p.freeTurn()
			return nil, popErr
		}

		if idle == nil {
			// No idle connection left: count a miss and dial a fresh one.
			atomic.AddUint32(&p.stats.Misses, 1)

			fresh, dialErr := p.newConn(ctx, true)
			if dialErr != nil {
				p.freeTurn()
				return nil, dialErr
			}
			return fresh, nil
		}

		if p.isHealthyConn(idle) {
			atomic.AddUint32(&p.stats.Hits, 1)
			return idle, nil
		}

		// Unhealthy connection: drop it and try the next idle one.
		_ = p.CloseConn(idle)
	}
}

Cognitive complexity: 14, Cyclomatic complexity: 8

Uses: atomic.AddUint32.

func (*ConnPool) IdleLen

IdleLen returns number of idle connections.

// IdleLen returns the number of idle connections, read under the pool lock.
func (p *ConnPool) IdleLen() int {
	p.connsMu.Lock()
	defer p.connsMu.Unlock()

	return p.idleConnsLen
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*ConnPool) Len

Len returns total number of connections.

// Len returns the total number of connections, read under the pool lock.
func (p *ConnPool) Len() int {
	p.connsMu.Lock()
	defer p.connsMu.Unlock()

	return len(p.conns)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*ConnPool) Put

// Put hands cn back to the pool.
//
// A connection with unread data in its reader is logged and removed with a
// BadConnError; a non-pooled connection is removed as well. Otherwise the
// connection joins the idle list, unless MaxIdleConns is non-zero and already
// reached, in which case it is dropped from the pool and closed. The turn is
// released before the connection is closed.
func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
	switch {
	case cn.rd.Buffered() > 0:
		internal.Logger.Printf(ctx, "Conn has unread data")
		p.Remove(ctx, cn, BadConnError{})
		return
	case !cn.pooled:
		p.Remove(ctx, cn, nil)
		return
	}

	p.connsMu.Lock()
	maxIdle := p.cfg.MaxIdleConns
	keepIdle := maxIdle == 0 || p.idleConnsLen < maxIdle
	if keepIdle {
		p.idleConns = append(p.idleConns, cn)
		p.idleConnsLen++
	} else {
		p.removeConn(cn)
	}
	p.connsMu.Unlock()

	p.freeTurn()

	if !keepIdle {
		// Closing happens outside the lock and after the turn is released.
		_ = p.closeConn(cn)
	}
}

Cognitive complexity: 11, Cyclomatic complexity: 6

func (*ConnPool) Remove

// Remove drops cn from the pool, releases the caller's turn and closes the
// connection. The context and reason arguments are not used, and the error
// from closing the connection is discarded.
func (p *ConnPool) Remove(_ context.Context, cn *Conn, reason error) {
	// Bookkeeping first: take cn out of the pool and give the turn back
	// before the socket is touched.
	p.removeConnWithLock(cn)
	p.freeTurn()

	// Best-effort close; there is no caller to report the error to.
	_ = p.closeConn(cn)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*ConnPool) Stats

// Stats returns a freshly allocated snapshot of the pool counters. The
// accumulated counters are loaded atomically; the total and idle connection
// counts come from Len and IdleLen, each of which takes the pool lock
// separately, so the snapshot is not taken at a single instant.
func (p *ConnPool) Stats() *Stats {
	snapshot := new(Stats)

	snapshot.Hits = atomic.LoadUint32(&p.stats.Hits)
	snapshot.Misses = atomic.LoadUint32(&p.stats.Misses)
	snapshot.Timeouts = atomic.LoadUint32(&p.stats.Timeouts)

	snapshot.TotalConns = uint32(p.Len())
	snapshot.IdleConns = uint32(p.IdleLen())
	snapshot.StaleConns = atomic.LoadUint32(&p.stats.StaleConns)

	return snapshot
}

Cognitive complexity: 1, Cyclomatic complexity: 1

Uses: atomic.LoadUint32.

func (*StickyConnPool) Reset

// Reset discards the pool's connection after a bad-connection error.
//
// It does nothing when no bad-connection error is recorded. Otherwise it
// takes the connection from the channel without blocking, removes it from
// the parent pool, clears the recorded error and moves the state from
// stateInited back to stateDefault. It returns ErrClosed when the channel is
// closed, an error when no connection is available, and an error describing
// the current state when the state transition fails.
func (p *StickyConnPool) Reset(ctx context.Context) error {
	if p.badConnError() == nil {
		return nil
	}

	select {
	case cn, open := <-p.ch:
		if !open {
			return ErrClosed
		}
		p.pool.Remove(ctx, cn, ErrClosed)
		p._badConnError.Store(BadConnError{})
	default:
		return errors.New("redis: StickyConnPool does not have a Conn")
	}

	if atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
		return nil
	}
	return fmt.Errorf("redis: invalid StickyConnPool state: %d", atomic.LoadUint32(&p.state))
}

Cognitive complexity: 9, Cyclomatic complexity: 5

Uses: atomic.CompareAndSwapUint32, atomic.LoadUint32, errors.New, fmt.Errorf.

func (BadConnError) Error

// Error implements the error interface. The message always starts with a
// fixed prefix; when an underlying error is wrapped, its message is appended
// after ": ".
func (e BadConnError) Error() string {
	const prefix = "redis: Conn is in a bad state"
	if e.wrapped == nil {
		return prefix
	}
	return prefix + ": " + e.wrapped.Error()
}

Cognitive complexity: 2, Cyclomatic complexity: 2

func (BadConnError) Unwrap

// Unwrap returns the wrapped underlying error, which may be nil, so that
// errors.Is and errors.As can inspect it.
func (e BadConnError) Unwrap() error {
	cause := e.wrapped
	return cause
}

Cognitive complexity: 0, Cyclomatic complexity: 1

Private functions

func connCheck

connCheck (conn net.Conn) error
References: io.EOF, syscall.Conn, syscall.EAGAIN, syscall.EWOULDBLOCK, syscall.Read, time.Time.

func deadline

deadline (ctx context.Context, timeout time.Duration) time.Time
References: time.Now.

func addIdleConn

addIdleConn () error
References: context.TODO.

func checkMinIdleConns

checkMinIdleConns ()

func closeConn

closeConn (cn *Conn) error

func closed

closed () bool
References: atomic.LoadUint32.

func dialConn

dialConn (ctx context.Context, pooled bool) (*Conn, error)
References: atomic.AddUint32, atomic.LoadUint32.

func freeTurn

freeTurn ()

func getLastDialError

getLastDialError () error

func isHealthyConn

isHealthyConn (cn *Conn) bool
References: time.Now.

func newConn

newConn (ctx context.Context, pooled bool) (*Conn, error)

func popIdle

popIdle () (*Conn, error)

func removeConn

removeConn (cn *Conn)
References: atomic.AddUint32.

func removeConnWithLock

removeConnWithLock (cn *Conn)

func setLastDialError

setLastDialError (err error)

func tryDial

tryDial ()
References: atomic.StoreUint32, context.Background, time.Second, time.Sleep.

func waitTurn

waitTurn (ctx context.Context) error
References: atomic.AddUint32, time.Timer.

func badConnError

badConnError () error

func freeConn

freeConn (ctx context.Context, cn *Conn)


Tests

Files: 2. Third party imports: 2. Imports from organisation: 0. Tests: 0. Benchmarks: 0.

Vars

// Package-level variables declared by the test files; all start out as
// their zero values.
var (
	conn net.Conn
	err  error
	ts   *httptest.Server
)