Go API Documentation

github.com/TykTechnologies/tyk/internal/rate

No package summary is available.

Package

Files: 8. Third party imports: 0. Imports from organisation: 1. Tests: 0. Benchmarks: 0.

Constants

const (
	// LimiterKeyPrefix serves as a standard prefix for generating rate limit keys.
	LimiterKeyPrefix = "rate-limit-"
)
// The following constants enumerate implemented rate limiters.
const (
	LimitLeakyBucket	string	= "leaky-bucket"
	LimitTokenBucket	string	= "token-bucket"
	LimitFixedWindow	string	= "fixed-window"
	LimitSlidingWindow	string	= "sliding-window"
)

Vars

var (
	// ErrLimitExhausted is returned when the request should be blocked.
	ErrLimitExhausted = limiter.ErrLimitExhausted
)

ErrRedisClientProvider is returned if NewSlidingLog isn't passed a valid RedisClientProvider parameter.

var ErrRedisClientProvider = errors.New("Client doesn't implement RedisClientProvider")
var (
	NewAllowance		= model.NewAllowance
	NewAllowanceFromMap	= model.NewAllowanceFromMap
)

Compile time check that *AllowanceStore implements AllowanceRepository.

var _ AllowanceRepository = &AllowanceStore{}

Types

AllowanceStore

AllowanceStore implements AllowanceRepository.

Field name	Field type	Comment
redis	redis.UniversalClient	No comment on field.
cacheMu	sync.RWMutex	No comment on field.
cache	map[string][]byte	No comment on field.
stats	struct { set int64 setErrors int64 get int64 getCached int64 getErrors int64 locker int64 }	No comment on field.
type AllowanceStore struct {
	redis	redis.UniversalClient

	cacheMu	sync.RWMutex
	cache	map[string][]byte

	stats	struct {
		set		int64
		setErrors	int64

		get		int64
		getCached	int64
		getErrors	int64

		locker	int64
	}
}

SlidingLog

SlidingLog implements sliding log storage in redis.

Field name	Field type	Comment
conn	redis.UniversalClient	No comment on field.
pipeline	bool	No comment on field.
PipelineFn	func(context.Context, func(redis.Pipeliner) error) error	PipelineFn is exposed for black box tests in the same package.
smoothingFn	SmoothingFn	smoothingFn will evaluate the current rate and must return true if the request should be blocked. It's required.

type SlidingLog struct {
	conn		redis.UniversalClient
	pipeline	bool

	// PipelineFn is exposed for black box tests in the same package.
	PipelineFn	func(context.Context, func(redis.Pipeliner) error) error

	// smoothingFn will evaluate the current rate and must return true if
	// the request should be blocked. It's required.
	smoothingFn	SmoothingFn
}

Smoothing

Smoothing implements rate limiter smoothing.

Field name	Field type	Comment
allowanceStore	AllowanceRepository	No comment on field.
type Smoothing struct {
	allowanceStore AllowanceRepository
}

Allowance, AllowanceRepository, SmoothingFn

These type aliases don't have separate documentation.

Alias	Target type
Allowance	model.Allowance
AllowanceRepository	model.AllowanceRepository
SmoothingFn	model.SmoothingFn
type (
	Allowance		= model.Allowance
	AllowanceRepository	= model.AllowanceRepository
	SmoothingFn		= model.SmoothingFn
)

Functions

func Limiter

Limiter returns the appropriate rate limiter as configured by gateway.

func Limiter(gwConfig *config.Config, redis redis.UniversalClient) limiter.LimiterFunc {
	name, ok := LimiterKind(gwConfig)
	if !ok {
		return nil
	}

	res := limiter.NewLimiter(redis)

	switch name {
	case LimitLeakyBucket:
		return res.LeakyBucket
	case LimitTokenBucket:
		return res.TokenBucket
	case LimitFixedWindow:
		return res.FixedWindow
	case LimitSlidingWindow:
		return res.SlidingWindow
	}

	return nil
}

Cognitive complexity: 8, Cyclomatic complexity: 7

Uses: limiter.NewLimiter.
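
A minimal usage sketch; gwConfig and redisClient are assumed to already exist, and the fallback behaviour in the comment is the caller's concern, not part of this package:

limiterFunc := rate.Limiter(gwConfig, redisClient)
if limiterFunc == nil {
	// No alternative rate limiter is enabled in config; fall back to the
	// gateway's default rate limiting behaviour.
}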

func LimiterKey

LimiterKey returns a redis key name based on the passed parameters. The key should be suffixed if multiple keys are required (sentinel).

func LimiterKey(currentSession *user.SessionState, rateScope string, key string, useCustomKey bool) string {
	if !useCustomKey && !currentSession.KeyHashEmpty() {
		return Prefix(LimiterKeyPrefix, rateScope, currentSession.KeyHash())
	}

	return Prefix(LimiterKeyPrefix, rateScope, key)
}

Cognitive complexity: 2, Cyclomatic complexity: 3
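
For illustration, a hedged example of the resulting key names; the scope and custom key values are hypothetical:

// With useCustomKey set, the custom key is used as-is:
key := rate.LimiterKey(session, "scope", "customer-123", true)
// key == "rate-limit-scope-customer-123"

// Without a custom key, and with a non-empty session hash, the hashed
// session key is used instead, yielding "rate-limit-scope-<KeyHash()>".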

func LimiterKind

LimiterKind returns the kind of rate limiter enabled by config. This function is used for release builds.

func LimiterKind(c *config.Config) (string, bool) {
	if c.EnableFixedWindowRateLimiter {
		return LimitFixedWindow, true
	}
	return "", false
}

Cognitive complexity: 2, Cyclomatic complexity: 2

func NewAllowanceStore

NewAllowanceStore will return a new instance of *AllowanceStore.

func NewAllowanceStore(redis redis.UniversalClient) *AllowanceStore {
	return &AllowanceStore{
		redis:	redis,
		cache:	make(map[string][]byte),
	}
}

Cognitive complexity: 1, Cyclomatic complexity: 1

func NewSlidingLog

NewSlidingLog creates a new SlidingLog instance from a storage.Handler. If the storage is offline, it is expected to return nil and an error for the caller to handle.

func NewSlidingLog(client interface{}, pipeline bool, smoothingFn SmoothingFn) (*SlidingLog, error) {
	cluster, ok := client.(model.RedisClientProvider)
	if !ok {
		return nil, ErrRedisClientProvider
	}

	conn, err := cluster.Client()
	if err != nil {
		return nil, err
	}

	return NewSlidingLogRedis(conn, pipeline, smoothingFn), nil
}

Cognitive complexity: 5, Cyclomatic complexity: 3

Uses: model.RedisClientProvider.
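
A hedged construction sketch; storageHandler is a hypothetical value whose type implements model.RedisClientProvider (it exposes the Client() method used above), and the callback signature is inferred from how SlidingLog.Do invokes SmoothingFn:

smoothingFn := func(ctx context.Context, key string, currentRate, maxAllowedRate int64) bool {
	// Block once the observed rate exceeds the allowed rate.
	return currentRate > maxAllowedRate
}

slog, err := rate.NewSlidingLog(storageHandler, true, smoothingFn)
if errors.Is(err, rate.ErrRedisClientProvider) {
	// storageHandler does not expose a redis client.
}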

func NewSlidingLogRedis

NewSlidingLogRedis creates a new SlidingLog instance with a redis.UniversalClient.

func NewSlidingLogRedis(conn redis.UniversalClient, pipeline bool, smoothingFn SmoothingFn) *SlidingLog {
	return &SlidingLog{
		conn:		conn,
		pipeline:	pipeline,
		smoothingFn:	smoothingFn,
	}
}

Cognitive complexity: 1, Cyclomatic complexity: 1

func NewSmoothing

NewSmoothing will return a new instance of *Smoothing.

func NewSmoothing(redis redis.UniversalClient) *Smoothing {
	return &Smoothing{
		allowanceStore: NewAllowanceStore(redis),
	}
}

Cognitive complexity: 1, Cyclomatic complexity: 1

func NewStorage

NewStorage provides a redis v9 client for rate limiter use.

func NewStorage(cfg *config.StorageOptionsConf) redis.UniversalClient {
	// poolSize applies per cluster node and not for the whole cluster.
	poolSize := 500
	if cfg.MaxActive > 0 {
		poolSize = cfg.MaxActive
	}

	timeout := 5 * time.Second

	if cfg.Timeout > 0 {
		timeout = time.Duration(cfg.Timeout) * time.Second
	}

	var tlsConfig *tls.Config

	if cfg.UseSSL {
		tlsConfig = &tls.Config{
			InsecureSkipVerify: cfg.SSLInsecureSkipVerify,
		}
	}

	opts := &redis.UniversalOptions{
		Addrs:			cfg.HostAddrs(),
		MasterName:		cfg.MasterName,
		SentinelPassword:	cfg.SentinelPassword,
		Username:		cfg.Username,
		Password:		cfg.Password,
		DB:			cfg.Database,
		DialTimeout:		timeout,
		ReadTimeout:		timeout,
		WriteTimeout:		timeout,
		//		IdleTimeout:      240 * timeout,
		PoolSize:	poolSize,
		TLSConfig:	tlsConfig,
	}

	if opts.MasterName != "" {
		return redis.NewFailoverClient(opts.Failover())
	}

	if cfg.EnableCluster {
		return redis.NewClusterClient(opts.Cluster())
	}

	return redis.NewClient(opts.Simple())
}

Cognitive complexity: 12, Cyclomatic complexity: 6

Uses: redis.NewClient, redis.NewClusterClient, redis.NewFailoverClient, redis.UniversalOptions, time.Duration, time.Second, tls.Config.
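
A hedged sketch of building the client from storage options; only MaxActive and Timeout are shown, since they are the fields referenced above, and the connection details are assumed to come from the rest of the gateway configuration:

cfg := &config.StorageOptionsConf{
	MaxActive:	200,	// pool size, applied per cluster node
	Timeout:	10,	// seconds, used for dial/read/write timeouts
}

client := rate.NewStorage(cfg)
defer client.Close()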

func Prefix

Prefix is a utility function to generate rate limiter redis key names.

func Prefix(params ...string) string {
	var res strings.Builder
	var written int

	for _, p := range params {
		p = strings.Trim(p, "-")
		if p == "" {
			continue
		}

		if written == 0 {
			res.Write([]byte(p))
			written++
			continue
		}

		res.Write([]byte("-"))
		res.Write([]byte(p))
	}
	return res.String()
}

Cognitive complexity: 7, Cyclomatic complexity: 4

Uses: strings.Builder, strings.Trim.
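
Example outputs; empty parts are skipped and surrounding dashes are trimmed before joining:

rate.Prefix("rate-limit-", "scope", "key")	// "rate-limit-scope-key"
rate.Prefix("rate-limit-", "", "key")		// "rate-limit-key"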

func (*AllowanceStore) Get

Get retrieves and decodes an Allowance value from storage.

func (s *AllowanceStore) Get(ctx context.Context, key string) (*Allowance, error) {
	atomic.AddInt64(&s.stats.get, 1)

	result := s.get(key)
	if result != nil {
		atomic.AddInt64(&s.stats.getCached, 1)
		return result, nil
	}

	hval, err := s.redis.HGetAll(ctx, Prefix(key, "allowance")).Result()
	if err != nil {
		atomic.AddInt64(&s.stats.getErrors, 1)
		return nil, err
	}

	result = NewAllowanceFromMap(hval)

	s.set(key, result)

	return result, nil
}

Cognitive complexity: 4, Cyclomatic complexity: 3

Uses: atomic.AddInt64.
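
A hedged read example; redisClient, ctx and the key are assumed, and the Expired check mirrors how Smoothing.Do consumes the value:

store := rate.NewAllowanceStore(redisClient)

allowance, err := store.Get(ctx, "api-key-123")
if err != nil {
	// Storage error; no allowance is available.
}
if allowance != nil && allowance.Expired() {
	// The allowance is due for a smoothing update.
}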

func (*AllowanceStore) Locker

Locker returns a distributed locker, similar to a mutex.

func (s *AllowanceStore) Locker(key string) limiters.DistLocker {
	atomic.AddInt64(&s.stats.locker, 1)
	// Handle distributed lock for the write
	return limiters.NewLockRedis(redis.NewPool(s.redis), Prefix(key, "lock"))
}

Cognitive complexity: 0, Cyclomatic complexity: 1

Uses: atomic.AddInt64, limiters.NewLockRedis, redis.NewPool.
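
Smoothing.Do uses this locker to guard its read-modify-write cycle; a minimal sketch of the same pattern, assuming store and ctx are in scope:

locker := store.Locker("api-key-123")
if err := locker.Lock(ctx); err != nil {
	// Could not acquire the distributed lock; skip the update.
}
defer func() {
	_ = locker.Unlock(ctx)
}()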

func (*AllowanceStore) Set

Set will write the passed Allowance value to storage.

func (s *AllowanceStore) Set(ctx context.Context, key string, allowance *Allowance) error {
	allowanceKey := Prefix(key, "allowance")

	atomic.AddInt64(&s.stats.set, 1)
	err := s.redis.HSet(ctx, allowanceKey, allowance.Map()).Err()
	if err != nil {
		atomic.AddInt64(&s.stats.setErrors, 1)
	}
	s.redis.Expire(ctx, allowanceKey, 2*allowance.GetDelay())
	s.set(key, allowance)
	return err
}

Cognitive complexity: 2, Cyclomatic complexity: 2

Uses: atomic.AddInt64.

func (*AllowanceStore) String

String will return the stats for the AllowanceStore.

func (s *AllowanceStore) String() string {
	var (
		locker		= atomic.LoadInt64(&s.stats.locker)
		set		= atomic.LoadInt64(&s.stats.set)
		setErrors	= atomic.LoadInt64(&s.stats.setErrors)
		get		= atomic.LoadInt64(&s.stats.get)
		getCached	= atomic.LoadInt64(&s.stats.getCached)
		getErrors	= atomic.LoadInt64(&s.stats.getErrors)
	)
	return fmt.Sprintf("locker=%d set=%d setErrors=%d get=%d getCached=%d getErrors=%d", locker, set, setErrors, get, getCached, getErrors)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

Uses: atomic.LoadInt64, fmt.Sprintf.

func (*SlidingLog) Do

Do returns two values: the first indicates whether the request should be blocked, the second is any error that occurred. If an error occurs, the first value will be true; when storage is unavailable, for example, requests are blocked rather than let through, since no rate limit can be enforced without storage.

func (r *SlidingLog) Do(ctx context.Context, now time.Time, key string, maxAllowedRate int64, per int64) (bool, error) {
	currentRate, err := r.SetCount(ctx, now, key, per)
	if err != nil {
		return true, err
	}
	return r.smoothingFn(ctx, key, currentRate, maxAllowedRate), err
}

Cognitive complexity: 2, Cyclomatic complexity: 2
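
An end-to-end hedged sketch: build the log with NewSlidingLogRedis and let Do decide whether to block a request. redisClient, ctx and the key are assumed, and smoothingFn is the callback sketched under NewSlidingLog:

slog := rate.NewSlidingLogRedis(redisClient, true, smoothingFn)

// Allow at most 100 requests per 60 second window for this key.
blocked, err := slog.Do(ctx, time.Now(), "rate-limit-scope-key", 100, 60)
if err != nil {
	// Storage failed; blocked is true, so the request is rejected anyway.
}
if blocked {
	// Reject the request with a rate limit response.
}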

func (*SlidingLog) ExecPipeline

ExecPipeline will run a pipeline function in a pipeline or transaction.

func (r *SlidingLog) ExecPipeline(ctx context.Context, pipeFn func(redis.Pipeliner) error) error {
	if r.PipelineFn != nil {
		return r.PipelineFn(ctx, pipeFn)
	}

	return r.execPipeline(ctx, pipeFn)
}

Cognitive complexity: 2, Cyclomatic complexity: 2

func (*SlidingLog) Get

Get returns the items in the current sliding log window. The sliding log is trimmed, removing older items.

func (r *SlidingLog) Get(ctx context.Context, now time.Time, keyName string, per int64) ([]string, error) {
	onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)

	var res *redis.StringSliceCmd

	pipeFn := func(pipe redis.Pipeliner) error {
		pipe.ZRemRangeByScore(ctx, keyName, "-inf", strconv.Itoa(int(onePeriodAgo.UnixNano())))
		res = pipe.ZRange(ctx, keyName, 0, -1)

		return nil
	}

	if err := r.ExecPipeline(ctx, pipeFn); err != nil {
		return nil, err
	}

	return res.Result()
}

Cognitive complexity: 3, Cyclomatic complexity: 2

Uses: redis.Pipeliner, redis.StringSliceCmd, strconv.Itoa, time.Duration, time.Second.

func (*SlidingLog) GetCount

GetCount returns the number of items in the current sliding log window. The sliding log is trimmed, removing older items.

func (r *SlidingLog) GetCount(ctx context.Context, now time.Time, keyName string, per int64) (int64, error) {
	onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)

	var res *redis.IntCmd

	pipeFn := func(pipe redis.Pipeliner) error {
		pipe.ZRemRangeByScore(ctx, keyName, "-inf", strconv.Itoa(int(onePeriodAgo.UnixNano())))
		res = pipe.ZCard(ctx, keyName)

		return nil
	}

	if err := r.ExecPipeline(ctx, pipeFn); err != nil {
		return 0, err
	}

	return res.Result()
}

Cognitive complexity: 3, Cyclomatic complexity: 2

Uses: redis.IntCmd, redis.Pipeliner, strconv.Itoa, time.Duration, time.Second.
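
GetCount is useful for read-only inspection, since it does not append a new entry; a short hedged example assuming slog and ctx are in scope:

// Count the requests recorded in the last 60 seconds without adding one.
count, err := slog.GetCount(ctx, time.Now(), "rate-limit-scope-key", 60)
if err == nil && count > 0 {
	// Report or log the current request rate.
}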

func (*SlidingLog) Set

Set returns the items in the current sliding log window before adding a new item. The sliding log is trimmed, removing older items, and an expiration of per seconds is set on the complete log.

func (r *SlidingLog) Set(ctx context.Context, now time.Time, keyName string, per int64) ([]string, error) {
	onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)

	var res *redis.StringSliceCmd

	pipeFn := func(pipe redis.Pipeliner) error {
		pipe.ZRemRangeByScore(ctx, keyName, "-inf", strconv.Itoa(int(onePeriodAgo.UnixNano())))
		res = pipe.ZRange(ctx, keyName, 0, -1)

		element := redis.Z{
			Score:	float64(now.UnixNano()),
			Member:	strconv.Itoa(int(now.UnixNano())),
		}

		pipe.ZAdd(ctx, keyName, element)
		pipe.Expire(ctx, keyName, time.Duration(per)*time.Second)

		return nil
	}

	if err := r.ExecPipeline(ctx, pipeFn); err != nil {
		return nil, err
	}

	return res.Result()
}

Cognitive complexity: 4, Cyclomatic complexity: 2

Uses: redis.Pipeliner, redis.StringSliceCmd, redis.Z, strconv.Itoa, time.Duration, time.Second.

func (*SlidingLog) SetCount

SetCount returns the number of items in the current sliding log window before adding a new item. The sliding log is trimmed, removing older items, and an expiration of per seconds is set on the complete log.

func (r *SlidingLog) SetCount(ctx context.Context, now time.Time, keyName string, per int64) (int64, error) {
	onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)

	var res *redis.IntCmd

	pipeFn := func(pipe redis.Pipeliner) error {
		pipe.ZRemRangeByScore(ctx, keyName, "-inf", strconv.Itoa(int(onePeriodAgo.UnixNano())))
		res = pipe.ZCard(ctx, keyName)

		element := redis.Z{
			Score:	float64(now.UnixNano()),
			Member:	strconv.Itoa(int(now.UnixNano())),
		}

		pipe.ZAdd(ctx, keyName, element)
		pipe.Expire(ctx, keyName, time.Duration(per)*time.Second)

		return nil
	}

	if err := r.ExecPipeline(ctx, pipeFn); err != nil {
		return 0, err
	}

	return res.Result()
}

Cognitive complexity: 4, Cyclomatic complexity: 2

Uses: redis.IntCmd, redis.Pipeliner, redis.Z, strconv.Itoa, time.Duration, time.Second.

func (*Smoothing) Do

Do processes the rate limit smoothing based on the provided session settings and current rate.

Internally it will get the current allowance, and if the update is allowed will acquire a lock, re-read the allowance, evaluate a smoothing change and write an updated allowance to redis.

An allowance may be returned together with an error. For example, if the distributed lock fails for some reason, the previous successful Get result is returned alongside the error. If no error occurred, the allowance currently in effect is returned.

If an error occurred writing an allowance, the previous allowance will be returned.

func (d *Smoothing) Do(r *http.Request, session *apidef.RateLimitSmoothing, key string, currentRate int64, maxAllowedRate int64) (*Allowance, error) {
	// Rate limit smoothing is disabled or threshold is unset, no change, no error.
	if !session.Valid() {
		return nil, fmt.Errorf("smoothing invalid: %w", session.Err())
	}

	ctx := r.Context()

	var createAllowance bool

	allowance, err := d.allowanceStore.Get(ctx, key)
	if err != nil {
		return nil, fmt.Errorf("smoothing: getting allowance: %w", err)
	}

	if allowance.Delay == 0 {
		// Set new allowance if none exists in storage.
		// Starts with the Threshold (minimum allowance).
		allowance = NewAllowance(session.Delay)
		allowance.Current = session.Threshold
		createAllowance = true
	}

	// Allowance can only be set once per defined interval
	if !createAllowance && !allowance.Expired() {
		return allowance, nil
	}

	// Handle distributed lock for the write
	locker := d.allowanceStore.Locker(key)

	// Lock protects get/set from a data race
	if err := locker.Lock(ctx); err != nil {
		return allowance, fmt.Errorf("smoothing: getting lock, skipping update: %w", err)
	}
	defer func() {
		_ = locker.Unlock(ctx)
	}()

	// Create allowance
	if createAllowance {
		allowance.Touch()
		if err := d.allowanceStore.Set(ctx, key, allowance); err != nil {
			// return previous allowance and error
			return allowance, fmt.Errorf("smoothing: can't set new allowance: %w", err)
		}
		return allowance, nil
	}

	// Re-read allowance behind the lock to have accurate state
	allowance, err = d.allowanceStore.Get(ctx, key)
	if err != nil {
		return nil, fmt.Errorf("smoothing: getting allowance: %w", err)
	}

	// Allowance can only be set once per defined interval
	if !allowance.Expired() {
		return allowance, nil
	}

	// Get current allowed rate
	allowedRate := allowance.Get()

	// Increase allowance if necessary
	if newAllowedRate, changed := increaseRateAllowance(session, allowedRate, currentRate, maxAllowedRate); changed {
		newAllowance := NewAllowance(allowance.Delay)
		newAllowance.Set(newAllowedRate)

		if err := d.allowanceStore.Set(ctx, key, newAllowance); err != nil {
			// return previous allowance and error
			return allowance, fmt.Errorf("smoothing: can't set allowance increase: %w", err)
		}

		event.Add(r, event.RateLimitSmoothingUp)
		return newAllowance, nil
	}

	// Decrease allowance if necessary
	if newAllowedRate, changed := decreaseRateAllowance(session, allowedRate, currentRate, session.Threshold); changed {
		newAllowance := NewAllowance(allowance.Delay)
		newAllowance.Set(newAllowedRate)

		if err := d.allowanceStore.Set(ctx, key, newAllowance); err != nil {
			// return previous allowance and error
			return allowance, fmt.Errorf("smoothing: can't set allowance decrease: %w", err)
		}

		event.Add(r, event.RateLimitSmoothingDown)
		return newAllowance, nil
	}

	// return previous allowance (no smoothing performed)
	return allowance, nil
}

Cognitive complexity: 27, Cyclomatic complexity: 15

Uses: event.Add, event.RateLimitSmoothingDown, event.RateLimitSmoothingUp, fmt.Errorf.
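
A hedged call-site sketch; req, smoothingConf, currentRate and maxAllowedRate are assumed to come from the request, the session's smoothing settings and a sliding log count respectively:

smoother := rate.NewSmoothing(redisClient)

allowance, err := smoother.Do(req, smoothingConf, "api-key-123", currentRate, maxAllowedRate)
if err != nil {
	// An allowance may still be returned alongside the error; if non-nil,
	// it reflects the last value that was successfully read.
}
if allowance != nil {
	// Use the current allowance as the effective rate to enforce.
	effectiveLimit := allowance.Get()
	_ = effectiveLimit
}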

func (*Smoothing) String

String returns the stats output from the underlying allowance store.

func (d *Smoothing) String() string {
	return d.allowanceStore.String()
}

Cognitive complexity: 0, Cyclomatic complexity: 1

Private functions

func decreaseRateAllowance

decreaseRateAllowance (session *apidef.RateLimitSmoothing, allowedRate int64, currentRate int64, minAllowedRate int64) (int64, bool)

func increaseRateAllowance

increaseRateAllowance (session *apidef.RateLimitSmoothing, allowedRate int64, currentRate int64, maxAllowedRate int64) (int64, bool)

func get

get (key string) *Allowance

References: json.Unmarshal.

func set

set (key string, allowance *Allowance)

References: json.Marshal.

func execPipeline

execPipeline (ctx context.Context, pipeFn func(redis.Pipeliner) error) error


Tests

Files: 4. Third party imports: 2. Imports from organisation: 0. Tests: 10. Benchmarks: 0.

Test functions

TestAllowanceStore_Get

References: assert.Equal, assert.NoError, assert.NotNil, context.Background, time.Now, time.RFC3339Nano.

TestAllowanceStore_Locker

References: assert.NotNil.

TestAllowanceStore_Set

References: assert.Equal, assert.NoError, context.Background, time.Now, time.RFC3339Nano.

TestDecreaseRateAllowance

References: apidef.RateLimitSmoothing, assert.Equal, testing.T.

TestIncreaseRateAllowance

References: apidef.RateLimitSmoothing, assert.Equal, testing.T.

TestNewAllowanceStore

References: assert.Equal, assert.NotNil.

TestNewSmoothing

References: assert.NotEmpty, assert.NotNil.

TestNewStorage

References: assert.NoError, assert.NotNil, config.NewDefaultWithEnv.

TestPrefix

References: assert.Equal.

TestSmoothing_Do

References: apidef.RateLimitSmoothing, assert.EqualError, assert.NoError, context.Background, errors.New, http.NewRequestWithContext, mock.AllowanceStore, testing.T.