github.com/TykTechnologies/tyk/rpc
No package summary is available.
Package
Files: 3. Third party imports: 5. Imports from organisation: 1. Tests: 0. Benchmarks: 0.
Constants
const ANALYTICS_KEYNAME = "tyk-system-analytics"
const (
ClientSingletonCall = "gorpcClientCall"
FuncClientSingletonCall = "gorpcDispatcherClientCall"
)
Vars
ErrRPCIsDown this is returned when we can't reach rpc server.
var ErrRPCIsDown = errors.New("RPCStorageHandler: rpc is either down or was not configured")
var (
GlobalRPCCallTimeout = 30 * time.Second
GlobalRPCPingTimeout = 60 * time.Second
Log = &logrus.Logger{}
Instrument *health.Stream
clientSingleton *gorpc.Client
clientSingletonMu sync.Mutex
funcClientSingleton *gorpc.DispatcherClient
dispatcher = gorpc.NewDispatcher()
addedFuncs = make(map[string]bool)
getGroupLoginCallback func(string, string) interface{}
emergencyModeCallback func()
emergencyModeLoadedCallback func()
killChan = make(chan int)
killed bool
id string
rpcLoginMu sync.Mutex
rpcConnectMu sync.Mutex
// UseSyncLoginRPC for tests where we dont need to execute as a goroutine
UseSyncLoginRPC bool
AnalyticsSerializers []serializer.AnalyticsSerializer
)
var (
syncForcerInstance *SyncronizerForcer
syncForcerOnce sync.Once
)
var errLogFailed = errors.New("Login incorrect")
rpc.Login is callend may places we only need one in flight at a time.
var loginFlight singleflight.Group
var rpcConnectionsPool []net.Conn
var values rpcOpts
Types
Config
This type doesn't have documentation.
| Field name | Field type | Comment |
|---|---|---|
| UseSSL |
|
No comment on field. |
| SSLInsecureSkipVerify |
|
No comment on field. |
| SSLMinVersion |
|
No comment on field. |
| SSLMaxVersion |
|
No comment on field. |
| ConnectionString |
|
No comment on field. |
| RPCKey |
|
No comment on field. |
| APIKey |
|
No comment on field. |
| GroupID |
|
No comment on field. |
| CallTimeout |
|
No comment on field. |
| PingTimeout |
|
No comment on field. |
| RPCPoolSize |
|
No comment on field. |
type Config struct {
UseSSL bool `json:"use_ssl"`
SSLInsecureSkipVerify bool `json:"ssl_insecure_skip_verify"`
SSLMinVersion uint16 `json:"ssl_min_version"`
SSLMaxVersion uint16 `json:"ssl_max_version"`
ConnectionString string `json:"connection_string"`
RPCKey string `json:"rpc_key"`
APIKey string `json:"api_key"`
GroupID string `json:"group_id"`
CallTimeout int `json:"call_timeout"`
PingTimeout int `json:"ping_timeout"`
RPCPoolSize int `json:"rpc_pool_size"`
}
Purger
RPCPurger will purge analytics data into a Mongo database, requires that the Mongo DB string is specified in the Config object
| Field name | Field type | Comment |
|---|---|---|
| Store |
|
No comment on field. |
type Purger struct {
Store storage.Handler
}
SyncronizerForcer
This type doesn't have documentation.
| Field name | Field type | Comment |
|---|---|---|
| store |
|
No comment on field. |
| getNodeDataFunc |
|
No comment on field. |
| isFirstConnection |
|
No comment on field. |
type SyncronizerForcer struct {
store *storage.RedisCluster
getNodeDataFunc func() []byte
isFirstConnection bool
}
rpcOpts
This type doesn't have documentation.
| Field name | Field type | Comment |
|---|---|---|
| loadCounts |
|
This tracks how many times have successfully logged. If this is 0 then we are in cold start. |
| emergencyMode |
|
No comment on field. |
| emergencyModeLoaded |
|
No comment on field. |
| config |
|
No comment on field. |
| clientIsConnected |
|
No comment on field. |
type rpcOpts struct {
// This tracks how many times have successfully logged. If this is 0 then we
// are in cold start.
loadCounts atomic.Value
emergencyMode atomic.Value
emergencyModeLoaded atomic.Value
config atomic.Value
clientIsConnected atomic.Value
}
Functions
func CloseConnections
func CloseConnections() {
for k, v := range rpcConnectionsPool {
err := v.Close()
if err != nil {
Log.WithError(err).Error("closing connection")
} else {
rpcConnectionsPool = append(rpcConnectionsPool[:k], rpcConnectionsPool[k+1:]...)
}
}
}
Cognitive complexity: 7, Cyclomatic complexity: 3
func Connect
Connect will establish a connection to the RPC server specified in connection options
func Connect(
connConfig Config,
suppressRegister bool,
dispatcherFuncs map[string]interface{},
getGroupLoginFunc func(string, string) interface{},
emergencyModeFunc func(),
emergencyModeLoadedFunc func(),
) bool {
rpcConnectMu.Lock()
defer rpcConnectMu.Unlock()
setupConnectionConfig(connConfig, getGroupLoginFunc, emergencyModeFunc, emergencyModeLoadedFunc)
if values.ClientIsConnected() {
Log.Debug("Using RPC singleton for connection")
return true
}
if clientSingleton != nil {
return !values.GetEmergencyMode()
}
Log.Info("Setting new RPC connection!")
initializeClient()
loadDispatcher(dispatcherFuncs)
if funcClientSingleton == nil {
funcClientSingleton = dispatcher.NewFuncClient(clientSingleton)
}
handleLogin()
if !suppressRegister {
register()
go checkDisconnect()
}
return true
}
Cognitive complexity: 10, Cyclomatic complexity: 5
func Disconnect
func Disconnect() bool {
values.clientIsConnected.Store(false)
return true
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func EmitErrorEvent
func EmitErrorEvent(jobName string, funcName string, err error) {
if Instrument == nil {
return
}
job := Instrument.NewJob(jobName)
if emitErr := job.EventErr(funcName, err); emitErr != nil {
Log.WithError(emitErr).WithFields(logrus.Fields{
"jobName": jobName,
"funcName": funcName,
})
}
}
Cognitive complexity: 5, Cyclomatic complexity: 3
func EmitErrorEventKv
func EmitErrorEventKv(jobName string, funcName string, err error, kv map[string]string) {
if Instrument == nil {
return
}
job := Instrument.NewJob(jobName)
if emitErr := job.EventErrKv(funcName, err, kv); emitErr != nil {
Log.WithError(emitErr).WithFields(logrus.Fields{
"jobName": jobName,
"funcName": funcName,
"kv": kv,
})
}
}
Cognitive complexity: 5, Cyclomatic complexity: 3
func ForceConnected
ForceConnected only intended to be used in tests do not use it for any other thing
func ForceConnected(t *testing.T) {
values.clientIsConnected.Store(true)
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func FuncClientSingleton
FuncClientSingleton performs RPC call. This might be called before we have established RPC connection, in that case we perform a retry with exponential backoff ensuring indeed we can't connect to the rpc, this will eventually fall into emergency mode( That is handled outside of this function call)
func FuncClientSingleton(funcName string, request interface{}) (result interface{}, err error) {
be := backoff.Retry(func() error {
if !values.ClientIsConnected() {
return ErrRPCIsDown
}
result, err = funcClientSingleton.CallTimeout(funcName, request, GlobalRPCCallTimeout)
return nil
}, backoff.WithMaxRetries(
backoff.NewConstantBackOff(10*time.Millisecond), 3,
))
if be != nil {
err = be
}
return
}
Cognitive complexity: 7, Cyclomatic complexity: 3
func GroupLogin
func GroupLogin() bool {
return doGroupLogin(groupLogin)
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func IsEmergencyMode
func IsEmergencyMode() bool {
return values.GetEmergencyMode()
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func LoadCount
func LoadCount() int {
return values.GetLoadCounts()
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func Login
Login tries to login to the rpc sever. Returns true if it succeeds and false if it fails.
func Login() bool {
// I know this is extreme but rpc.Login() appears about 17 times and the
// methods appears to be sometimes called in goroutines.
//
// Unless someone audits to ensure all of where this appears the parent calls
// are not concurrent, this is a much safer solution.
v, _, _ := loginFlight.Do("Login", func() (interface{}, error) {
return loginBase(), nil
})
return v.(bool)
}
Cognitive complexity: 2, Cyclomatic complexity: 1
func NewSyncForcer
NewSyncForcer returns a new syncforcer with a connected redis with a key prefix synchronizer-group- for group synchronization control.
func NewSyncForcer(controller *storage.ConnectionHandler, getNodeDataFunc func() []byte) *SyncronizerForcer {
syncForcerOnce.Do(func() {
sf := &SyncronizerForcer{}
sf.store = &storage.RedisCluster{KeyPrefix: "synchronizer-group-", ConnectionHandler: controller}
sf.store.Connect()
sf.getNodeDataFunc = getNodeDataFunc
sf.isFirstConnection = true
syncForcerInstance = sf
})
if syncForcerInstance != nil {
syncForcerInstance.getNodeDataFunc = getNodeDataFunc
}
return syncForcerInstance
}
Cognitive complexity: 5, Cyclomatic complexity: 2
func Reset
func Reset() {
clientSingleton.Stop()
clientSingleton = nil
funcClientSingleton = nil
values.Reset()
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func ResetEmergencyMode
func ResetEmergencyMode() {
values.SetEmergencyMode(false)
values.SetEmergencyModeLoaded(false)
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func SetEmergencyMode
SetEmergencyMode used in tests to force emergency mode
func SetEmergencyMode(t *testing.T, value bool) {
t.Helper()
values.SetEmergencyMode(value)
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func SetLoadCounts
func SetLoadCounts(t *testing.T, value int) {
t.Helper()
values.SetLoadCounts(value)
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*Purger) Connect
Connect Connects to RPC
func (r *Purger) Connect() {
if !values.ClientIsConnected() {
Log.Error("RPC client is not connected, use Connect method 1st")
}
// setup RPC func if needed
if !addedFuncs["Ping"] {
dispatcher.AddFunc("Ping", func() bool {
return false
})
addedFuncs["Ping"] = true
}
if !addedFuncs["PurgeAnalyticsData"] {
dispatcher.AddFunc("PurgeAnalyticsData", func(data string) error {
return nil
})
addedFuncs["PurgeAnalyticsData"] = true
}
Log.Info("RPC Analytics client using singleton")
}
Cognitive complexity: 8, Cyclomatic complexity: 4
func (*Purger) PurgeCache
PurgeCache will pull the data from the in-memory store and drop it into the specified MongoDB collection
func (r *Purger) PurgeCache() {
if !values.ClientIsConnected() {
Log.Error("RPC client is not connected, use Connect method 1st")
}
if _, err := FuncClientSingleton("Ping", nil); err != nil {
Log.WithError(err).Error("Can't purge cache, failed to ping RPC")
return
}
for i := -1; i < 10; i++ {
var analyticsKeyName string
if i == -1 {
//if it's the first iteration, we look for tyk-system-analytics to maintain backwards compatibility or if analytics_config.enable_multiple_analytics_keys is disabled in the gateway
analyticsKeyName = ANALYTICS_KEYNAME
} else {
// keyname + serializationmethod
analyticsKeyName = fmt.Sprintf("%v_%v", ANALYTICS_KEYNAME, i)
}
analyticsValues := r.Store.GetAndDeleteSet(analyticsKeyName)
if len(analyticsValues) == 0 {
continue
}
keys, failedRecords := processAnalyticsValues(analyticsValues)
Log.Debugf("could not decode %v records", failedRecords)
data, err := json.Marshal(keys)
if err != nil {
Log.WithError(err).Error("Failed to marshal analytics data")
return
}
// Send keys to RPC
if _, err := FuncClientSingleton("PurgeAnalyticsData", string(data)); err != nil {
EmitErrorEvent(FuncClientSingletonCall, "PurgeAnalyticsData", err)
Log.Warn("Failed to call purge, retrying: ", err)
}
}
}
Cognitive complexity: 16, Cyclomatic complexity: 8
func (*SyncronizerForcer) GetIsFirstConnection
func (sf *SyncronizerForcer) GetIsFirstConnection() bool {
return sf.isFirstConnection
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*SyncronizerForcer) GroupLoginCallback
GroupLoginCallback checks if the groupID key exists in the storage to turn on/off ForceSync param. If the the key doesn't exists in the storage, it creates it and set ForceSync to true
func (sf *SyncronizerForcer) GroupLoginCallback(userKey string, groupID string) interface{} {
shouldForce := sf.isFirstConnection
sf.SetFirstConnection(false)
_, err := sf.store.GetKey(groupID)
if err != nil && errors.Is(err, storage.ErrKeyNotFound) {
shouldForce = true
err = sf.store.SetKey(groupID, "", 0)
if err != nil {
Log.Error("error setting syncforcer key", err)
}
Log.Info("Forcing MDCB synchronization for group:", groupID)
}
return apidef.GroupLoginRequest{
UserKey: userKey,
GroupID: groupID,
Node: sf.getNodeDataFunc(),
ForceSync: shouldForce,
}
}
Cognitive complexity: 6, Cyclomatic complexity: 4
func (*SyncronizerForcer) SetFirstConnection
func (sf *SyncronizerForcer) SetFirstConnection(isFirstConnection bool) {
sf.isFirstConnection = isFirstConnection
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*rpcOpts) GetEmergencyMode
func (r *rpcOpts) GetEmergencyMode() bool {
if v := r.emergencyMode.Load(); v != nil {
return v.(bool)
}
return false
}
Cognitive complexity: 2, Cyclomatic complexity: 2
func (*rpcOpts) GetEmergencyModeLoaded
func (r *rpcOpts) GetEmergencyModeLoaded() bool {
if v := r.emergencyModeLoaded.Load(); v != nil {
return v.(bool)
}
return false
}
Cognitive complexity: 2, Cyclomatic complexity: 2
func (*rpcOpts) GetLoadCounts
func (r *rpcOpts) GetLoadCounts() int {
if v := r.loadCounts.Load(); v != nil {
return v.(int)
}
return 0
}
Cognitive complexity: 2, Cyclomatic complexity: 2
func (*rpcOpts) IncrLoadCounts
func (r *rpcOpts) IncrLoadCounts(n int) {
if v := r.loadCounts.Load(); v != nil {
r.loadCounts.Store(v.(int) + n)
} else {
r.loadCounts.Store(n)
}
}
Cognitive complexity: 4, Cyclomatic complexity: 2
func (*rpcOpts) Reset
func (r *rpcOpts) Reset() {
r.loadCounts.Store(0)
r.emergencyMode.Store(false)
r.emergencyModeLoaded.Store(false)
r.clientIsConnected.Store(false)
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*rpcOpts) SetEmergencyMode
func (r *rpcOpts) SetEmergencyMode(n bool) {
r.emergencyMode.Store(n)
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*rpcOpts) SetEmergencyModeLoaded
func (r *rpcOpts) SetEmergencyModeLoaded(n bool) {
r.emergencyModeLoaded.Store(n)
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*rpcOpts) SetLoadCounts
func (r *rpcOpts) SetLoadCounts(n int) {
r.loadCounts.Store(n)
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (Purger) PurgeLoop
PurgeLoop starts the loop that will pull data out of the in-memory store and into RPC.
func (r Purger) PurgeLoop(ctx context.Context, interval time.Duration) {
tick := time.NewTicker(interval * time.Second)
for {
select {
case <-ctx.Done():
return
case <-tick.C:
r.PurgeCache()
}
}
}
Cognitive complexity: 5, Cyclomatic complexity: 4
func (rpcOpts) ClientIsConnected
func (r rpcOpts) ClientIsConnected() bool {
if v := r.clientIsConnected.Load(); v != nil {
return v.(bool)
}
return false
}
Cognitive complexity: 2, Cyclomatic complexity: 2
func (rpcOpts) Config
func (r rpcOpts) Config() Config {
if v := r.config.Load(); v != nil {
return v.(Config)
}
return Config{}
}
Cognitive complexity: 3, Cyclomatic complexity: 2
Private functions
func checkDisconnect
checkDisconnect ()
func decodeAnalyticsRecord
decodeAnalyticsRecord (encoded interface{}) (analytics.AnalyticsRecord, error)
References: analytics.AnalyticsRecord, msgpack.Unmarshal.
func doGroupLogin
doGroupLogin (login func() error) bool
func doLoginWithRetries
doLoginWithRetries uses login as a login function by calling it with retries until it succeeds or ultimately fail.
hasAPIKey is called to check whether config.APIKey is set if this function returns false we exit the process.
isGroup returns true if the config.GroupID is set. If this returns true then we perform group login.
doLoginWithRetries (login,group func() error, hasAPIKey,isGroup func() bool) bool
func groupLogin
groupLogin () error
func handleLogin
handleLogin ()
func hasAPIKey
hasAPIKey () bool
func initializeClient
initializeClient ()
References: gorpc.NewTCPClient, gorpc.NewTLSClient, gorpc.NilErrorLogger, logrus.DebugLevel, tls.Config, uuid.New.
func isGroup
isGroup () bool
func loadDispatcher
loadDispatcher (dispatcherFuncs map[string]interface{})
func login
login () error
func loginBase
loginBase () bool
func onConnectFunc
onConnectFunc (conn net.Conn) (net.Conn, string, error)
func processAnalyticsValues
processAnalyticsValues (analyticsValues []interface{}) ([]interface{}, int)
func recoverOp
recoverOp (fn func() error) func() error
func register
register ()
References: uuid.New.
func setupConnectionConfig
setupConnectionConfig (connConfig Config, loginFunc func(string, string) interface{}, emergencyModeFunc func(), emergencyModeLoadedFunc func())
func setupDialFunction
setupDialFunction (connID string)
References: net.Conn, net.Dialer, strconv.FormatBool, time.Second, tls.Config, tls.DialWithDialer.
Tests
Files: 3. Third party imports: 2. Imports from organisation: 0. Tests: 8. Benchmarks: 0.
Vars
var rc *storage.ConnectionHandler