Go API Documentation

github.com/TykTechnologies/tyk/rpc

No package summary is available.

Package

Files: 3. Third party imports: 5. Imports from organisation: 1. Tests: 0. Benchmarks: 0.

Constants

const ANALYTICS_KEYNAME = "tyk-system-analytics"
const (
	ClientSingletonCall	= "gorpcClientCall"
	FuncClientSingletonCall	= "gorpcDispatcherClientCall"
)

Vars

ErrRPCIsDown this is returned when we can't reach rpc server.

var ErrRPCIsDown = errors.New("RPCStorageHandler: rpc is either down or was not configured")
var (
	GlobalRPCCallTimeout	= 30 * time.Second
	GlobalRPCPingTimeout	= 60 * time.Second
	Log			= &logrus.Logger{}
	Instrument		*health.Stream

	clientSingleton		*gorpc.Client
	clientSingletonMu	sync.Mutex
	funcClientSingleton	*gorpc.DispatcherClient

	dispatcher	= gorpc.NewDispatcher()
	addedFuncs	= make(map[string]bool)

	getGroupLoginCallback		func(string, string) interface{}
	emergencyModeCallback		func()
	emergencyModeLoadedCallback	func()

	killChan	= make(chan int)
	killed		bool
	id		string

	rpcLoginMu	sync.Mutex

	rpcConnectMu	sync.Mutex

	// UseSyncLoginRPC for tests where we dont need to execute as a goroutine
	UseSyncLoginRPC	bool

	AnalyticsSerializers	[]serializer.AnalyticsSerializer
)
var (
	syncForcerInstance	*SyncronizerForcer
	syncForcerOnce		sync.Once
)
var errLogFailed = errors.New("Login incorrect")

rpc.Login is callend may places we only need one in flight at a time.

var loginFlight singleflight.Group
var rpcConnectionsPool []net.Conn
var values rpcOpts

Types

Config

This type doesn't have documentation.

Field name Field type Comment
UseSSL

bool

No comment on field.
SSLInsecureSkipVerify

bool

No comment on field.
SSLMinVersion

uint16

No comment on field.
SSLMaxVersion

uint16

No comment on field.
ConnectionString

string

No comment on field.
RPCKey

string

No comment on field.
APIKey

string

No comment on field.
GroupID

string

No comment on field.
CallTimeout

int

No comment on field.
PingTimeout

int

No comment on field.
RPCPoolSize

int

No comment on field.
type Config struct {
	UseSSL			bool	`json:"use_ssl"`
	SSLInsecureSkipVerify	bool	`json:"ssl_insecure_skip_verify"`
	SSLMinVersion		uint16	`json:"ssl_min_version"`
	SSLMaxVersion		uint16	`json:"ssl_max_version"`
	ConnectionString	string	`json:"connection_string"`
	RPCKey			string	`json:"rpc_key"`
	APIKey			string	`json:"api_key"`
	GroupID			string	`json:"group_id"`
	CallTimeout		int	`json:"call_timeout"`
	PingTimeout		int	`json:"ping_timeout"`
	RPCPoolSize		int	`json:"rpc_pool_size"`
}

Purger

RPCPurger will purge analytics data into a Mongo database, requires that the Mongo DB string is specified in the Config object

Field name Field type Comment
Store

storage.Handler

No comment on field.
type Purger struct {
	Store storage.Handler
}

SyncronizerForcer

This type doesn't have documentation.

Field name Field type Comment
store

*storage.RedisCluster

No comment on field.
getNodeDataFunc

func() []byte

No comment on field.
isFirstConnection

bool

No comment on field.
type SyncronizerForcer struct {
	store			*storage.RedisCluster
	getNodeDataFunc		func() []byte
	isFirstConnection	bool
}

rpcOpts

This type doesn't have documentation.

Field name Field type Comment
loadCounts

atomic.Value

This tracks how many times have successfully logged. If this is 0 then we are in cold start.

emergencyMode

atomic.Value

No comment on field.
emergencyModeLoaded

atomic.Value

No comment on field.
config

atomic.Value

No comment on field.
clientIsConnected

atomic.Value

No comment on field.
type rpcOpts struct {
	// This tracks how many times have successfully logged. If this is 0 then we
	// are in cold start.
	loadCounts		atomic.Value
	emergencyMode		atomic.Value
	emergencyModeLoaded	atomic.Value
	config			atomic.Value
	clientIsConnected	atomic.Value
}

Functions

func CloseConnections

func CloseConnections() {
	for k, v := range rpcConnectionsPool {
		err := v.Close()
		if err != nil {
			Log.WithError(err).Error("closing connection")
		} else {
			rpcConnectionsPool = append(rpcConnectionsPool[:k], rpcConnectionsPool[k+1:]...)
		}
	}
}

Cognitive complexity: 7, Cyclomatic complexity: 3

func Connect

Connect will establish a connection to the RPC server specified in connection options

func Connect(
	connConfig Config,
	suppressRegister bool,
	dispatcherFuncs map[string]interface{},
	getGroupLoginFunc func(string, string) interface{},
	emergencyModeFunc func(),
	emergencyModeLoadedFunc func(),
) bool {
	rpcConnectMu.Lock()
	defer rpcConnectMu.Unlock()

	setupConnectionConfig(connConfig, getGroupLoginFunc, emergencyModeFunc, emergencyModeLoadedFunc)
	if values.ClientIsConnected() {
		Log.Debug("Using RPC singleton for connection")
		return true
	}

	if clientSingleton != nil {
		return !values.GetEmergencyMode()
	}

	Log.Info("Setting new RPC connection!")
	initializeClient()
	loadDispatcher(dispatcherFuncs)

	if funcClientSingleton == nil {
		funcClientSingleton = dispatcher.NewFuncClient(clientSingleton)
	}

	handleLogin()
	if !suppressRegister {
		register()
		go checkDisconnect()
	}

	return true
}

Cognitive complexity: 10, Cyclomatic complexity: 5

func Disconnect

func Disconnect() bool {
	values.clientIsConnected.Store(false)
	return true
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func EmitErrorEvent

func EmitErrorEvent(jobName string, funcName string, err error) {
	if Instrument == nil {
		return
	}

	job := Instrument.NewJob(jobName)
	if emitErr := job.EventErr(funcName, err); emitErr != nil {
		Log.WithError(emitErr).WithFields(logrus.Fields{
			"jobName":	jobName,
			"funcName":	funcName,
		})
	}
}

Cognitive complexity: 5, Cyclomatic complexity: 3

Uses: logrus.Fields.

func EmitErrorEventKv

func EmitErrorEventKv(jobName string, funcName string, err error, kv map[string]string) {
	if Instrument == nil {
		return
	}

	job := Instrument.NewJob(jobName)
	if emitErr := job.EventErrKv(funcName, err, kv); emitErr != nil {
		Log.WithError(emitErr).WithFields(logrus.Fields{
			"jobName":	jobName,
			"funcName":	funcName,
			"kv":		kv,
		})
	}
}

Cognitive complexity: 5, Cyclomatic complexity: 3

Uses: logrus.Fields.

func ForceConnected

ForceConnected only intended to be used in tests do not use it for any other thing

func ForceConnected(t *testing.T) {
	values.clientIsConnected.Store(true)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func FuncClientSingleton

FuncClientSingleton performs RPC call. This might be called before we have established RPC connection, in that case we perform a retry with exponential backoff ensuring indeed we can't connect to the rpc, this will eventually fall into emergency mode( That is handled outside of this function call)

func FuncClientSingleton(funcName string, request interface{}) (result interface{}, err error) {
	be := backoff.Retry(func() error {
		if !values.ClientIsConnected() {
			return ErrRPCIsDown
		}
		result, err = funcClientSingleton.CallTimeout(funcName, request, GlobalRPCCallTimeout)
		return nil
	}, backoff.WithMaxRetries(
		backoff.NewConstantBackOff(10*time.Millisecond), 3,
	))
	if be != nil {
		err = be
	}
	return
}

Cognitive complexity: 7, Cyclomatic complexity: 3

Uses: time.Millisecond.

func GroupLogin

func GroupLogin() bool {
	return doGroupLogin(groupLogin)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func IsEmergencyMode

func IsEmergencyMode() bool {
	return values.GetEmergencyMode()
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func LoadCount

func LoadCount() int {
	return values.GetLoadCounts()
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func Login

Login tries to login to the rpc sever. Returns true if it succeeds and false if it fails.

func Login() bool {
	// I know this is extreme but rpc.Login() appears about 17 times and the
	// methods appears to be sometimes called in goroutines.
	//
	// Unless someone audits to ensure all of where this appears the parent calls
	// are not concurrent, this is a much safer solution.
	v, _, _ := loginFlight.Do("Login", func() (interface{}, error) {
		return loginBase(), nil
	})
	return v.(bool)
}

Cognitive complexity: 2, Cyclomatic complexity: 1

func NewSyncForcer

NewSyncForcer returns a new syncforcer with a connected redis with a key prefix synchronizer-group- for group synchronization control.

func NewSyncForcer(controller *storage.ConnectionHandler, getNodeDataFunc func() []byte) *SyncronizerForcer {
	syncForcerOnce.Do(func() {
		sf := &SyncronizerForcer{}
		sf.store = &storage.RedisCluster{KeyPrefix: "synchronizer-group-", ConnectionHandler: controller}
		sf.store.Connect()
		sf.getNodeDataFunc = getNodeDataFunc
		sf.isFirstConnection = true

		syncForcerInstance = sf
	})

	if syncForcerInstance != nil {
		syncForcerInstance.getNodeDataFunc = getNodeDataFunc
	}

	return syncForcerInstance
}

Cognitive complexity: 5, Cyclomatic complexity: 2

Uses: storage.RedisCluster.

func Reset

func Reset() {
	clientSingleton.Stop()
	clientSingleton = nil
	funcClientSingleton = nil
	values.Reset()
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func ResetEmergencyMode

func ResetEmergencyMode() {
	values.SetEmergencyMode(false)
	values.SetEmergencyModeLoaded(false)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func SetEmergencyMode

SetEmergencyMode used in tests to force emergency mode

func SetEmergencyMode(t *testing.T, value bool) {
	t.Helper()
	values.SetEmergencyMode(value)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func SetLoadCounts

func SetLoadCounts(t *testing.T, value int) {
	t.Helper()
	values.SetLoadCounts(value)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*Purger) Connect

Connect Connects to RPC

func (r *Purger) Connect() {
	if !values.ClientIsConnected() {
		Log.Error("RPC client is not connected, use Connect method 1st")
	}

	// setup RPC func if needed
	if !addedFuncs["Ping"] {
		dispatcher.AddFunc("Ping", func() bool {
			return false
		})
		addedFuncs["Ping"] = true
	}
	if !addedFuncs["PurgeAnalyticsData"] {
		dispatcher.AddFunc("PurgeAnalyticsData", func(data string) error {
			return nil
		})
		addedFuncs["PurgeAnalyticsData"] = true
	}

	Log.Info("RPC Analytics client using singleton")
}

Cognitive complexity: 8, Cyclomatic complexity: 4

func (*Purger) PurgeCache

PurgeCache will pull the data from the in-memory store and drop it into the specified MongoDB collection

func (r *Purger) PurgeCache() {

	if !values.ClientIsConnected() {
		Log.Error("RPC client is not connected, use Connect method 1st")
	}

	if _, err := FuncClientSingleton("Ping", nil); err != nil {
		Log.WithError(err).Error("Can't purge cache, failed to ping RPC")
		return
	}

	for i := -1; i < 10; i++ {
		var analyticsKeyName string
		if i == -1 {
			//if it's the first iteration, we look for tyk-system-analytics to maintain backwards compatibility or if analytics_config.enable_multiple_analytics_keys is disabled in the gateway
			analyticsKeyName = ANALYTICS_KEYNAME
		} else {
			// keyname + serializationmethod
			analyticsKeyName = fmt.Sprintf("%v_%v", ANALYTICS_KEYNAME, i)
		}

		analyticsValues := r.Store.GetAndDeleteSet(analyticsKeyName)
		if len(analyticsValues) == 0 {
			continue
		}
		keys, failedRecords := processAnalyticsValues(analyticsValues)
		Log.Debugf("could not decode %v records", failedRecords)

		data, err := json.Marshal(keys)
		if err != nil {
			Log.WithError(err).Error("Failed to marshal analytics data")
			return
		}

		// Send keys to RPC
		if _, err := FuncClientSingleton("PurgeAnalyticsData", string(data)); err != nil {
			EmitErrorEvent(FuncClientSingletonCall, "PurgeAnalyticsData", err)
			Log.Warn("Failed to call purge, retrying: ", err)
		}

	}
}

Cognitive complexity: 16, Cyclomatic complexity: 8

Uses: fmt.Sprintf, json.Marshal.

func (*SyncronizerForcer) GetIsFirstConnection

func (sf *SyncronizerForcer) GetIsFirstConnection() bool {
	return sf.isFirstConnection
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*SyncronizerForcer) GroupLoginCallback

GroupLoginCallback checks if the groupID key exists in the storage to turn on/off ForceSync param. If the the key doesn't exists in the storage, it creates it and set ForceSync to true

func (sf *SyncronizerForcer) GroupLoginCallback(userKey string, groupID string) interface{} {
	shouldForce := sf.isFirstConnection
	sf.SetFirstConnection(false)

	_, err := sf.store.GetKey(groupID)
	if err != nil && errors.Is(err, storage.ErrKeyNotFound) {
		shouldForce = true

		err = sf.store.SetKey(groupID, "", 0)
		if err != nil {
			Log.Error("error setting syncforcer key", err)
		}
		Log.Info("Forcing MDCB synchronization for group:", groupID)
	}

	return apidef.GroupLoginRequest{
		UserKey:	userKey,
		GroupID:	groupID,
		Node:		sf.getNodeDataFunc(),
		ForceSync:	shouldForce,
	}
}

Cognitive complexity: 6, Cyclomatic complexity: 4

Uses: apidef.GroupLoginRequest, errors.Is, storage.ErrKeyNotFound.

func (*SyncronizerForcer) SetFirstConnection

func (sf *SyncronizerForcer) SetFirstConnection(isFirstConnection bool) {
	sf.isFirstConnection = isFirstConnection
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*rpcOpts) GetEmergencyMode

func (r *rpcOpts) GetEmergencyMode() bool {
	if v := r.emergencyMode.Load(); v != nil {
		return v.(bool)
	}
	return false
}

Cognitive complexity: 2, Cyclomatic complexity: 2

func (*rpcOpts) GetEmergencyModeLoaded

func (r *rpcOpts) GetEmergencyModeLoaded() bool {
	if v := r.emergencyModeLoaded.Load(); v != nil {
		return v.(bool)
	}
	return false
}

Cognitive complexity: 2, Cyclomatic complexity: 2

func (*rpcOpts) GetLoadCounts

func (r *rpcOpts) GetLoadCounts() int {
	if v := r.loadCounts.Load(); v != nil {
		return v.(int)
	}
	return 0
}

Cognitive complexity: 2, Cyclomatic complexity: 2

func (*rpcOpts) IncrLoadCounts

func (r *rpcOpts) IncrLoadCounts(n int) {
	if v := r.loadCounts.Load(); v != nil {
		r.loadCounts.Store(v.(int) + n)
	} else {
		r.loadCounts.Store(n)
	}
}

Cognitive complexity: 4, Cyclomatic complexity: 2

func (*rpcOpts) Reset

func (r *rpcOpts) Reset() {
	r.loadCounts.Store(0)
	r.emergencyMode.Store(false)
	r.emergencyModeLoaded.Store(false)
	r.clientIsConnected.Store(false)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*rpcOpts) SetEmergencyMode

func (r *rpcOpts) SetEmergencyMode(n bool) {
	r.emergencyMode.Store(n)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*rpcOpts) SetEmergencyModeLoaded

func (r *rpcOpts) SetEmergencyModeLoaded(n bool) {
	r.emergencyModeLoaded.Store(n)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*rpcOpts) SetLoadCounts

func (r *rpcOpts) SetLoadCounts(n int) {
	r.loadCounts.Store(n)
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (Purger) PurgeLoop

PurgeLoop starts the loop that will pull data out of the in-memory store and into RPC.

func (r Purger) PurgeLoop(ctx context.Context, interval time.Duration) {
	tick := time.NewTicker(interval * time.Second)

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			r.PurgeCache()
		}
	}
}

Cognitive complexity: 5, Cyclomatic complexity: 4

Uses: time.NewTicker, time.Second.

func (rpcOpts) ClientIsConnected

func (r rpcOpts) ClientIsConnected() bool {
	if v := r.clientIsConnected.Load(); v != nil {
		return v.(bool)
	}

	return false
}

Cognitive complexity: 2, Cyclomatic complexity: 2

func (rpcOpts) Config

func (r rpcOpts) Config() Config {
	if v := r.config.Load(); v != nil {
		return v.(Config)
	}
	return Config{}
}

Cognitive complexity: 3, Cyclomatic complexity: 2

Private functions

func checkDisconnect

checkDisconnect ()

func decodeAnalyticsRecord

decodeAnalyticsRecord (encoded interface{}) (analytics.AnalyticsRecord, error)
References: analytics.AnalyticsRecord, msgpack.Unmarshal.

func doGroupLogin

doGroupLogin (login func() error) bool

func doLoginWithRetries

doLoginWithRetries uses login as a login function by calling it with retries until it succeeds or ultimately fail.

hasAPIKey is called to check whether config.APIKey is set if this function returns false we exit the process.

isGroup returns true if the config.GroupID is set. If this returns true then we perform group login.

doLoginWithRetries (login,group func() error, hasAPIKey,isGroup func() bool) bool

func groupLogin

groupLogin () error

func handleLogin

handleLogin ()

func hasAPIKey

hasAPIKey () bool

func initializeClient

initializeClient ()
References: gorpc.NewTCPClient, gorpc.NewTLSClient, gorpc.NilErrorLogger, logrus.DebugLevel, tls.Config, uuid.New.

func isGroup

isGroup () bool

func loadDispatcher

loadDispatcher (dispatcherFuncs map[string]interface{})

func login

login () error

func loginBase

loginBase () bool

func onConnectFunc

onConnectFunc (conn net.Conn) (net.Conn, string, error)

func processAnalyticsValues

processAnalyticsValues (analyticsValues []interface{}) ([]interface{}, int)

func recoverOp

recoverOp (fn func() error) func() error

func register

register ()
References: uuid.New.

func setupConnectionConfig

setupConnectionConfig (connConfig Config, loginFunc func(string, string) interface{}, emergencyModeFunc func(), emergencyModeLoadedFunc func())

func setupDialFunction

setupDialFunction (connID string)
References: net.Conn, net.Dialer, strconv.FormatBool, time.Second, tls.Config, tls.DialWithDialer.


Tests

Files: 3. Third party imports: 2. Imports from organisation: 0. Tests: 8. Benchmarks: 0.

Vars

var rc *storage.ConnectionHandler

Test functions

TestClientIsConnected

References: testing.T.

TestDecodeAnalyticsRecord

References: analytics.AnalyticsRecord, assert.Nil, msgpack.Marshal, testing.T.

TestGetNodeDataFunc

References: assert.Equal.

TestGroupLoginCallback

References: apidef.GroupLoginRequest, assert.Equal, assert.True.

TestMain

References: config.New, context.Background, context.WithTimeout, os.Exit, storage.NewConnectionHandler, time.Second.

TestNewSyncForcer

References: assert.Equal, assert.True.

TestProcessAnalyticsValues

References: analytics.AnalyticsRecord, assert.Equal, assert.Nil, msgpack.Marshal, testing.T.

TestRecoveryFromEmergencyMode