github.com/TykTechnologies/tyk/internal/rate
No package summary is available.
Package
Files: 8. Third party imports: 0. Imports from organisation: 1. Tests: 0. Benchmarks: 0.
Constants
const (
// LimiterKeyPrefix serves as a standard prefix for generating rate limit keys.
LimiterKeyPrefix = "rate-limit-"
)
// The following constants enumerate implemented rate limiters.
const (
LimitLeakyBucket string = "leaky-bucket"
LimitTokenBucket string = "token-bucket"
LimitFixedWindow string = "fixed-window"
LimitSlidingWindow string = "sliding-window"
)
Vars
var (
// ErrLimitExhausted is returned when the request should be blocked.
ErrLimitExhausted = limiter.ErrLimitExhausted
)
ErrRedisClientProvider is returned if NewSlidingLog isn't passed a valid RedisClientProvider parameter.
var ErrRedisClientProvider = errors.New("Client doesn't implement RedisClientProvider")
var (
NewAllowance = model.NewAllowance
NewAllowanceFromMap = model.NewAllowanceFromMap
)
Compile time check that *AllowanceStore implements AllowanceRepository.
var _ AllowanceRepository = &AllowanceStore{}
Types
AllowanceStore
AllowanceStore implements AllowanceRepository.
| Field name | Field type | Comment |
|---|---|---|
| redis |
|
No comment on field. |
| cacheMu |
|
No comment on field. |
| cache |
|
No comment on field. |
| stats |
|
No comment on field. |
type AllowanceStore struct {
redis redis.UniversalClient
cacheMu sync.RWMutex
cache map[string][]byte
stats struct {
set int64
setErrors int64
get int64
getCached int64
getErrors int64
locker int64
}
}
SlidingLog
SlidingLog implements sliding log storage in redis.
| Field name | Field type | Comment |
|---|---|---|
| conn |
|
No comment on field. |
| pipeline |
|
No comment on field. |
| PipelineFn |
|
PipelineFn is exposed for black box tests in the same package. |
| smoothingFn |
|
smoothingFn will evaluate the current rate and must return true if the request should be blocked. It's required. |
type SlidingLog struct {
conn redis.UniversalClient
pipeline bool
// PipelineFn is exposed for black box tests in the same package.
PipelineFn func(context.Context, func(redis.Pipeliner) error) error
// smoothingFn will evaluate the current rate and must return true if
// the request should be blocked. It's required.
smoothingFn SmoothingFn
}
Smoothing
Smoothing implements rate limiter smoothing.
| Field name | Field type | Comment |
|---|---|---|
| allowanceStore |
|
No comment on field. |
type Smoothing struct {
allowanceStore AllowanceRepository
}
Allowance, AllowanceRepository, SmoothingFn
This type doesn't have documentation.
| Field name | Field type | Comment |
|---|---|---|
| type |
|
No comment on field. |
| type |
|
No comment on field. |
| type |
|
No comment on field. |
type (
Allowance = model.Allowance
AllowanceRepository = model.AllowanceRepository
SmoothingFn = model.SmoothingFn
)
Functions
func Limiter
Limiter returns the appropriate rate limiter as configured by gateway.
func Limiter(gwConfig *config.Config, redis redis.UniversalClient) limiter.LimiterFunc {
name, ok := LimiterKind(gwConfig)
if !ok {
return nil
}
res := limiter.NewLimiter(redis)
switch name {
case LimitLeakyBucket:
return res.LeakyBucket
case LimitTokenBucket:
return res.TokenBucket
case LimitFixedWindow:
return res.FixedWindow
case LimitSlidingWindow:
return res.SlidingWindow
}
return nil
}
Cognitive complexity: 8, Cyclomatic complexity: 7
func LimiterKey
LimiterKey returns a redis key name based on passed parameters. The key should be post-fixed if multiple keys are required (sentinel).
func LimiterKey(currentSession *user.SessionState, rateScope string, key string, useCustomKey bool) string {
if !useCustomKey && !currentSession.KeyHashEmpty() {
return Prefix(LimiterKeyPrefix, rateScope, currentSession.KeyHash())
}
return Prefix(LimiterKeyPrefix, rateScope, key)
}
Cognitive complexity: 2, Cyclomatic complexity: 3
func LimiterKind
LimiterKind returns the kind of rate limiter enabled by config. This function is used for release builds.
func LimiterKind(c *config.Config) (string, bool) {
if c.EnableFixedWindowRateLimiter {
return LimitFixedWindow, true
}
return "", false
}
Cognitive complexity: 2, Cyclomatic complexity: 2
func NewAllowanceStore
NewAllowanceStore will return a new instance of *AllowanceStore.
func NewAllowanceStore(redis redis.UniversalClient) *AllowanceStore {
return &AllowanceStore{
redis: redis,
cache: make(map[string][]byte),
}
}
Cognitive complexity: 1, Cyclomatic complexity: 1
func NewSlidingLog
NewSlidingLog creates a new SlidingLog instance with a storage.Handler. In case the storage is offline, it's expected to return nil and an error to handle.
func NewSlidingLog(client interface{}, pipeline bool, smoothingFn SmoothingFn) (*SlidingLog, error) {
cluster, ok := client.(model.RedisClientProvider)
if !ok {
return nil, ErrRedisClientProvider
}
conn, err := cluster.Client()
if err != nil {
return nil, err
}
return NewSlidingLogRedis(conn, pipeline, smoothingFn), nil
}
Cognitive complexity: 5, Cyclomatic complexity: 3
func NewSlidingLogRedis
NewSlidingLogRedis creates a new SlidingLog instance with a redis.UniversalClient.
func NewSlidingLogRedis(conn redis.UniversalClient, pipeline bool, smoothingFn SmoothingFn) *SlidingLog {
return &SlidingLog{
conn: conn,
pipeline: pipeline,
smoothingFn: smoothingFn,
}
}
Cognitive complexity: 1, Cyclomatic complexity: 1
func NewSmoothing
NewSmoothing will return a new instance of *Smoothing.
func NewSmoothing(redis redis.UniversalClient) *Smoothing {
return &Smoothing{
allowanceStore: NewAllowanceStore(redis),
}
}
Cognitive complexity: 1, Cyclomatic complexity: 1
func NewStorage
NewStorage provides a redis v9 client for rate limiter use.
func NewStorage(cfg *config.StorageOptionsConf) redis.UniversalClient {
// poolSize applies per cluster node and not for the whole cluster.
poolSize := 500
if cfg.MaxActive > 0 {
poolSize = cfg.MaxActive
}
timeout := 5 * time.Second
if cfg.Timeout > 0 {
timeout = time.Duration(cfg.Timeout) * time.Second
}
var tlsConfig *tls.Config
if cfg.UseSSL {
tlsConfig = &tls.Config{
InsecureSkipVerify: cfg.SSLInsecureSkipVerify,
}
}
opts := &redis.UniversalOptions{
Addrs: cfg.HostAddrs(),
MasterName: cfg.MasterName,
SentinelPassword: cfg.SentinelPassword,
Username: cfg.Username,
Password: cfg.Password,
DB: cfg.Database,
DialTimeout: timeout,
ReadTimeout: timeout,
WriteTimeout: timeout,
// IdleTimeout: 240 * timeout,
PoolSize: poolSize,
TLSConfig: tlsConfig,
}
if opts.MasterName != "" {
return redis.NewFailoverClient(opts.Failover())
}
if cfg.EnableCluster {
return redis.NewClusterClient(opts.Cluster())
}
return redis.NewClient(opts.Simple())
}
Cognitive complexity: 12, Cyclomatic complexity: 6
func Prefix
Prefix is a utility function to generate rate limiter redis key names.
func Prefix(params ...string) string {
var res strings.Builder
var written int
for _, p := range params {
p = strings.Trim(p, "-")
if p == "" {
continue
}
if written == 0 {
res.Write([]byte(p))
written++
continue
}
res.Write([]byte("-"))
res.Write([]byte(p))
}
return res.String()
}
Cognitive complexity: 7, Cyclomatic complexity: 4
func (*AllowanceStore) Get
Get retrieves and decodes an Allowance value from storage.
func (s *AllowanceStore) Get(ctx context.Context, key string) (*Allowance, error) {
atomic.AddInt64(&s.stats.get, 1)
result := s.get(key)
if result != nil {
atomic.AddInt64(&s.stats.getCached, 1)
return result, nil
}
hval, err := s.redis.HGetAll(ctx, Prefix(key, "allowance")).Result()
if err != nil {
atomic.AddInt64(&s.stats.getErrors, 1)
return nil, err
}
result = NewAllowanceFromMap(hval)
s.set(key, result)
return result, nil
}
Cognitive complexity: 4, Cyclomatic complexity: 3
func (*AllowanceStore) Locker
Locker returns a distributed locker, similar to a mutex.
func (s *AllowanceStore) Locker(key string) limiters.DistLocker {
atomic.AddInt64(&s.stats.locker, 1)
// Handle distributed lock for the write
return limiters.NewLockRedis(redis.NewPool(s.redis), Prefix(key, "lock"))
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*AllowanceStore) Set
Set will write the passed Allowance value to storage.
func (s *AllowanceStore) Set(ctx context.Context, key string, allowance *Allowance) error {
allowanceKey := Prefix(key, "allowance")
atomic.AddInt64(&s.stats.set, 1)
err := s.redis.HSet(ctx, allowanceKey, allowance.Map()).Err()
if err != nil {
atomic.AddInt64(&s.stats.setErrors, 1)
}
s.redis.Expire(ctx, allowanceKey, 2*allowance.GetDelay())
s.set(key, allowance)
return err
}
Cognitive complexity: 2, Cyclomatic complexity: 2
func (*AllowanceStore) String
String will return the stats for the AllowanceStore.
func (s *AllowanceStore) String() string {
var (
locker = atomic.LoadInt64(&s.stats.locker)
set = atomic.LoadInt64(&s.stats.set)
setErrors = atomic.LoadInt64(&s.stats.setErrors)
get = atomic.LoadInt64(&s.stats.get)
getCached = atomic.LoadInt64(&s.stats.getCached)
getErrors = atomic.LoadInt64(&s.stats.getErrors)
)
return fmt.Sprintf("locker=%d set=%d setErrors=%d get=%d getCached=%d getErrors=%d", locker, set, setErrors, get, getCached, getErrors)
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*SlidingLog) Do
Do will return two values, the first indicates if a request should be blocked, and the second
returns an error if any occurred. In case an error occurs, the first value will be true.
If there are issues with storage availability for example, requests will be blocked rather
than let through, as no rate limit can be enforced without storage.
func (r *SlidingLog) Do(ctx context.Context, now time.Time, key string, maxAllowedRate int64, per int64) (bool, error) {
currentRate, err := r.SetCount(ctx, now, key, per)
if err != nil {
return true, err
}
return r.smoothingFn(ctx, key, currentRate, maxAllowedRate), err
}
Cognitive complexity: 2, Cyclomatic complexity: 2
func (*SlidingLog) ExecPipeline
ExecPipeline will run a pipeline function in a pipeline or transaction.
func (r *SlidingLog) ExecPipeline(ctx context.Context, pipeFn func(redis.Pipeliner) error) error {
if r.PipelineFn != nil {
return r.PipelineFn(ctx, pipeFn)
}
return r.execPipeline(ctx, pipeFn)
}
Cognitive complexity: 2, Cyclomatic complexity: 2
func (*SlidingLog) Get
Get returns the items in the current sliding log window. The sliding log is trimmed removing older items.
func (r *SlidingLog) Get(ctx context.Context, now time.Time, keyName string, per int64) ([]string, error) {
onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)
var res *redis.StringSliceCmd
pipeFn := func(pipe redis.Pipeliner) error {
pipe.ZRemRangeByScore(ctx, keyName, "-inf", strconv.Itoa(int(onePeriodAgo.UnixNano())))
res = pipe.ZRange(ctx, keyName, 0, -1)
return nil
}
if err := r.ExecPipeline(ctx, pipeFn); err != nil {
return nil, err
}
return res.Result()
}
Cognitive complexity: 3, Cyclomatic complexity: 2
func (*SlidingLog) GetCount
GetCount returns the number of items in the current sliding log window. The sliding log is trimmed removing older items.
func (r *SlidingLog) GetCount(ctx context.Context, now time.Time, keyName string, per int64) (int64, error) {
onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)
var res *redis.IntCmd
pipeFn := func(pipe redis.Pipeliner) error {
pipe.ZRemRangeByScore(ctx, keyName, "-inf", strconv.Itoa(int(onePeriodAgo.UnixNano())))
res = pipe.ZCard(ctx, keyName)
return nil
}
if err := r.ExecPipeline(ctx, pipeFn); err != nil {
return 0, err
}
return res.Result()
}
Cognitive complexity: 3, Cyclomatic complexity: 2
func (*SlidingLog) Set
Set returns the items in the current sliding log window, before adding a new item.
The sliding log is trimmed removing older items, and a per seconds expiration is set on the complete log.
func (r *SlidingLog) Set(ctx context.Context, now time.Time, keyName string, per int64) ([]string, error) {
onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)
var res *redis.StringSliceCmd
pipeFn := func(pipe redis.Pipeliner) error {
pipe.ZRemRangeByScore(ctx, keyName, "-inf", strconv.Itoa(int(onePeriodAgo.UnixNano())))
res = pipe.ZRange(ctx, keyName, 0, -1)
element := redis.Z{
Score: float64(now.UnixNano()),
Member: strconv.Itoa(int(now.UnixNano())),
}
pipe.ZAdd(ctx, keyName, element)
pipe.Expire(ctx, keyName, time.Duration(per)*time.Second)
return nil
}
if err := r.ExecPipeline(ctx, pipeFn); err != nil {
return nil, err
}
return res.Result()
}
Cognitive complexity: 4, Cyclomatic complexity: 2
func (*SlidingLog) SetCount
SetCount returns the number of items in the current sliding log window, before adding a new item.
The sliding log is trimmed removing older items, and a per seconds expiration is set on the complete log.
func (r *SlidingLog) SetCount(ctx context.Context, now time.Time, keyName string, per int64) (int64, error) {
onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)
var res *redis.IntCmd
pipeFn := func(pipe redis.Pipeliner) error {
pipe.ZRemRangeByScore(ctx, keyName, "-inf", strconv.Itoa(int(onePeriodAgo.UnixNano())))
res = pipe.ZCard(ctx, keyName)
element := redis.Z{
Score: float64(now.UnixNano()),
Member: strconv.Itoa(int(now.UnixNano())),
}
pipe.ZAdd(ctx, keyName, element)
pipe.Expire(ctx, keyName, time.Duration(per)*time.Second)
return nil
}
if err := r.ExecPipeline(ctx, pipeFn); err != nil {
return 0, err
}
return res.Result()
}
Cognitive complexity: 4, Cyclomatic complexity: 2
func (*Smoothing) Do
Do processes the rate limit smoothing based on the provided session settings and current rate.
Internally it will get the current allowance, and if the update is allowed will acquire a lock, re-read the allowance, evaluate a smoothing change and write an updated allowance to redis.
An allowance may be returned together with an error. For example, if the distributed lock fails for some reason, the previous Get result that succeeded will be returned, alongside the error. If no error occured, the current allowance in effect is returned.
If an error occured writing an allowance, the previous allowance will be returned.
func (d *Smoothing) Do(r *http.Request, session *apidef.RateLimitSmoothing, key string, currentRate int64, maxAllowedRate int64) (*Allowance, error) {
// Rate limit smoothing is disabled or threshold is unset, no change, no error.
if !session.Valid() {
return nil, fmt.Errorf("smoothing invalid: %w", session.Err())
}
ctx := r.Context()
var createAllowance bool
allowance, err := d.allowanceStore.Get(ctx, key)
if err != nil {
return nil, fmt.Errorf("smoothing: getting allowance: %w", err)
}
if allowance.Delay == 0 {
// Set new allowance if none exists in storage.
// Starts with the Threshold (minimum allowance).
allowance = NewAllowance(session.Delay)
allowance.Current = session.Threshold
createAllowance = true
}
// Allowance can only be set once per defined interval
if !createAllowance && !allowance.Expired() {
return allowance, nil
}
// Handle distributed lock for the write
locker := d.allowanceStore.Locker(key)
// Lock protects get/set from a data race
if err := locker.Lock(ctx); err != nil {
return allowance, fmt.Errorf("smoothing: getting lock, skipping update: %w", err)
}
defer func() {
_ = locker.Unlock(ctx)
}()
// Create allowance
if createAllowance {
allowance.Touch()
if err := d.allowanceStore.Set(ctx, key, allowance); err != nil {
// return previous allowance and error
return allowance, fmt.Errorf("smoothing: can't set new allowance: %w", err)
}
return allowance, nil
}
// Re-read allowance behind the lock to have accurate state
allowance, err = d.allowanceStore.Get(ctx, key)
if err != nil {
return nil, fmt.Errorf("smoothing: getting allowance: %w", err)
}
// Allowance can only be set once per defined interval
if !allowance.Expired() {
return allowance, nil
}
// Get current allowed rate
allowedRate := allowance.Get()
// Increase allowance if necessary
if newAllowedRate, changed := increaseRateAllowance(session, allowedRate, currentRate, maxAllowedRate); changed {
newAllowance := NewAllowance(allowance.Delay)
newAllowance.Set(newAllowedRate)
if err := d.allowanceStore.Set(ctx, key, newAllowance); err != nil {
// return previous allowance and error
return allowance, fmt.Errorf("smoothing: can't set allowance increase: %w", err)
}
event.Add(r, event.RateLimitSmoothingUp)
return newAllowance, nil
}
// Decrease allowance if necessary
if newAllowedRate, changed := decreaseRateAllowance(session, allowedRate, currentRate, session.Threshold); changed {
newAllowance := NewAllowance(allowance.Delay)
newAllowance.Set(newAllowedRate)
if err := d.allowanceStore.Set(ctx, key, newAllowance); err != nil {
// return previous allowance and error
return allowance, fmt.Errorf("smoothing: can't set allowance decrease: %w", err)
}
event.Add(r, event.RateLimitSmoothingDown)
return newAllowance, nil
}
// return previous allowance (no smoothing performed)
return allowance, nil
}
Cognitive complexity: 27, Cyclomatic complexity: 15
func (*Smoothing) String
String returns the String output from the allowance store.
func (d *Smoothing) String() string {
return d.allowanceStore.String()
}
Cognitive complexity: 0, Cyclomatic complexity: 1
Private functions
func decreaseRateAllowance
decreaseRateAllowance (session *apidef.RateLimitSmoothing, allowedRate int64, currentRate int64, minAllowedRate int64) (int64, bool)
func increaseRateAllowance
increaseRateAllowance (session *apidef.RateLimitSmoothing, allowedRate int64, currentRate int64, maxAllowedRate int64) (int64, bool)
func get
get (key string) *Allowance
References: json.Unmarshal.
func set
set (key string, allowance *Allowance)
References: json.Marshal.
func execPipeline
execPipeline (ctx context.Context, pipeFn func(redis.Pipeliner) error) error
Tests
Files: 4. Third party imports: 2. Imports from organisation: 0. Tests: 10. Benchmarks: 0.