Go API Documentation

github.com/TykTechnologies/tyk/ee/middleware/streams

No package summary is available.

Package

Files: 6. Third party imports: 5. Imports from organisation: 0. Tests: 0. Benchmarks: 0.

Constants

const (
	// ExtensionTykStreaming is the OAS extension for Tyk streaming.
	ExtensionTykStreaming	= "x-tyk-streaming"
	// StreamGCInterval is the period of the ticker that triggers
	// Middleware.GC in the goroutine started by Middleware.Init.
	StreamGCInterval	= 1 * time.Minute
)

Vars

var (
	// ErrResponseWriterNotHijackable is a sentinel error stating that a
	// ResponseWriter is not hijackable.
	// NOTE(review): the code returning it is not visible here — presumably a
	// response-writer wrapper whose Hijack method cannot delegate; confirm.
	ErrResponseWriterNotHijackable = errors.New("ResponseWriter is not hijackable")
)

GlobalStreamCounter is used for testing.

// GlobalStreamCounter is used for testing. EnabledForSpec adds the number of
// configured streams to it, and Unload subtracts the number of streams it reset.
var GlobalStreamCounter atomic.Int64

Middleware implements model.Middleware.

// Compile-time assertion that *Middleware implements model.Middleware.
var _ model.Middleware = &Middleware{}
// unsafeComponents lists the names of stream components treated as unsafe,
// grouped by component kind. A name may appear in more than one group.
// NOTE(review): presumably consumed by removeUnsafe together with the
// allowedUnsafe list — confirm against that function.
var unsafeComponents = []string{
	// Inputs
	"csv", "dynamic", "file", "inproc", "socket", "socket_server", "stdin", "subprocess",

	// Processors
	"command", "subprocess", "wasm",

	// Outputs
	"file", "inproc", "socket",

	// Caches
	"file",
}

Types

APISpec

APISpec is a subset of gateway.APISpec for the values the middleware consumes.

Field name Field type Comment
APIID

string

No comment on field.
Name

string

No comment on field.
IsOAS

bool

No comment on field.
OAS

oas.OAS

No comment on field.
StripListenPath

model.StripListenPathFunc

No comment on field.
// APISpec is a subset of gateway.APISpec for the values the middleware consumes.
type APISpec struct {
	// APIID is the API identifier.
	APIID	string
	// Name is the API name; Unload includes it in its log message.
	Name	string
	// IsOAS reports whether the API is defined with OAS.
	IsOAS	bool
	// OAS is the OAS definition of the API.
	OAS	oas.OAS

	// StripListenPath is applied to the request URL path in ProcessRequest
	// before the path is matched against the stream routes.
	StripListenPath	model.StripListenPathFunc
}

BaseMiddleware

BaseMiddleware is the subset of BaseMiddleware APIs that the middleware uses.

Field name Field type Comment
type

any

No comment on field.
// BaseMiddleware is the subset of BaseMiddleware APIs that the middleware uses.
// Middleware.Logger derives its log entry from the embedded LoggerProvider.
type BaseMiddleware interface {
	model.LoggerProvider
}

Gateway

Gateway is the subset of Gateway APIs that the middleware uses.

Field name Field type Comment
type

any

No comment on field.
// Gateway is the subset of Gateway APIs that the middleware uses.
type Gateway interface {
	// ConfigProvider supplies GetConfig, which EnabledForSpec uses to read
	// the Streaming section of the gateway configuration.
	model.ConfigProvider
	// NOTE(review): no use of ReplaceTykVariables is visible in the exported
	// functions — presumably used when building the per-request streams
	// config; confirm.
	model.ReplaceTykVariables
}

HandleFuncAdapter

This type doesn't have documentation.

Field name Field type Comment
StreamID

string

No comment on field.
StreamManager

*Manager

No comment on field.
StreamMiddleware

*Middleware

No comment on field.
Muxer

*mux.Router

No comment on field.
Logger

*logrus.Entry

No comment on field.
// HandleFuncAdapter registers stream HTTP handlers on a mux.Router via its
// HandleFunc method, wrapping each handler with analytics and activity tracking.
type HandleFuncAdapter struct {
	// StreamID is passed to the analytics factory when creating the response writer.
	StreamID		string
	// StreamManager provides the route lock, analytics factory and activity counter.
	StreamManager		*Manager
	// StreamMiddleware must be non-nil for HandleFunc to register a handler.
	StreamMiddleware	*Middleware
	// Muxer is the router that handlers are registered on; it must be non-nil.
	Muxer			*mux.Router
	// Logger receives the registration debug and error messages.
	Logger			*logrus.Entry
}

Manager

Manager is responsible for creating a single stream.

Field name Field type Comment
streams

sync.Map

No comment on field.
routeLock

sync.Mutex

No comment on field.
muxer

*mux.Router

No comment on field.
mw

*Middleware

No comment on field.
dryRun

bool

No comment on field.
listenPaths

[]string

No comment on field.
activityCounter

atomic.Int32

No comment on field.
analyticsFactory

StreamAnalyticsFactory

No comment on field.
// Manager is responsible for creating a single stream.
type Manager struct {
	// streams is keyed by stream ID (string); GC and Unload range over it.
	streams			sync.Map
	// routeLock is held while registering routes on, or matching against, muxer.
	routeLock		sync.Mutex
	// muxer routes requests to the stream handlers.
	muxer			*mux.Router
	// mw is the Middleware that created this manager.
	mw			*Middleware
	// dryRun is true when the manager was created without a request.
	dryRun			bool
	// NOTE(review): presumably the paths consulted by hasPath — confirm.
	listenPaths		[]string
	activityCounter		atomic.Int32	// Counts active subscriptions, requests.
	// analyticsFactory creates the recorder and response writer per request.
	analyticsFactory	StreamAnalyticsFactory
}

Middleware

Middleware implements a streaming middleware.

Field name Field type Comment
Spec

*APISpec

No comment on field.
Gw

Gateway

No comment on field.
base

BaseMiddleware

No comment on field.
createStreamManagerLock

sync.Mutex

No comment on field.
StreamManagerCache

sync.Map

No comment on field.
ctx

context.Context

No comment on field.
cancel

context.CancelFunc

No comment on field.
allowedUnsafe

[]string

No comment on field.
defaultManager

*Manager

No comment on field.
analyticsFactory

StreamAnalyticsFactory

No comment on field.
// Middleware implements a streaming middleware.
type Middleware struct {
	// Spec is the API subset the middleware operates on.
	Spec	*APISpec
	// Gw gives access to the gateway configuration.
	Gw	Gateway

	// base supplies the logger that Logger decorates.
	base	BaseMiddleware

	// createStreamManagerLock serializes CreateStreamManager.
	createStreamManagerLock	sync.Mutex
	StreamManagerCache	sync.Map	// Map of payload hash to Manager

	// ctx and cancel are created by Init; cancelling ctx (done by Unload)
	// stops the garbage collection goroutine.
	ctx			context.Context
	cancel			context.CancelFunc
	// allowedUnsafe is copied from the streaming config by EnabledForSpec.
	allowedUnsafe		[]string
	// defaultManager is created by Init via CreateStreamManager(nil).
	defaultManager		*Manager
	// analyticsFactory is handed to the stream managers; see SetAnalyticsFactory.
	analyticsFactory	StreamAnalyticsFactory
}

NoopStreamAnalyticsFactory

This type doesn't have documentation.

// NoopStreamAnalyticsFactory is a StreamAnalyticsFactory that records nothing:
// it hands out no-op recorders and returns response writers unwrapped.
type NoopStreamAnalyticsFactory struct{}

NoopStreamAnalyticsRecorder

This type doesn't have documentation.

// NoopStreamAnalyticsRecorder is a StreamAnalyticsRecorder whose methods do nothing.
type NoopStreamAnalyticsRecorder struct{}

Stream

Stream is a wrapper around a service.Stream.

Field name Field type Comment
allowedUnsafe

[]string

No comment on field.
streamConfig

string

No comment on field.
stream

*service.Stream

No comment on field.
log

*logrus.Logger

No comment on field.
// Stream is a wrapper around a service.Stream.
type Stream struct {
	// allowedUnsafe is the list given to NewStream.
	allowedUnsafe	[]string
	// streamConfig holds the YAML the running stream was built from; Start
	// sets it and Stop clears it.
	streamConfig	string
	// stream is the built stream; nil until Start succeeds in building it
	// and again after Stop.
	stream		*service.Stream
	// log is the logger used by the stream wrapper; see SetLogger.
	log		*logrus.Logger
}

StreamAnalyticsFactory

This type doesn't have documentation.

Field name Field type Comment
type

any

No comment on field.
// StreamAnalyticsFactory creates the per-request analytics helpers used by
// the handlers that HandleFuncAdapter registers.
type StreamAnalyticsFactory interface {
	// CreateRecorder returns the recorder for request r.
	CreateRecorder(r *http.Request) StreamAnalyticsRecorder
	// CreateResponseWriter returns the response writer that the stream
	// handler writes to in place of w.
	CreateResponseWriter(w http.ResponseWriter, r *http.Request, streamID string, recorder StreamAnalyticsRecorder) http.ResponseWriter
}

StreamAnalyticsRecorder

This type doesn't have documentation.

Field name Field type Comment
type

any

No comment on field.
// StreamAnalyticsRecorder records analytics for a single request.
// NOTE(review): no call sites are visible here, so the expected call order of
// the two methods is not documented — confirm against the implementations.
type StreamAnalyticsRecorder interface {
	// PrepareRecord prepares a record for request r.
	PrepareRecord(r *http.Request)
	// RecordHit records a hit with the given status code and latency.
	RecordHit(statusCode int, latency analytics.Latency) error
}

StreamsConfig

StreamsConfig represents a stream configuration.

Field name Field type Comment
Info

struct { Version string `json:"version"` }

No comment on field.
Streams

map[string]any

No comment on field.
// StreamsConfig represents a stream configuration.
type StreamsConfig struct {
	// Info carries metadata about the configuration.
	Info	struct {
		Version string `json:"version"`
	}	`json:"info"`

	// Streams holds the stream definitions; EnabledForSpec treats streaming
	// as enabled for the spec only when this map is non-empty.
	Streams	map[string]any	`json:"streams"`
}

Functions

func GetHTTPPaths

GetHTTPPaths is the main function to get HTTP paths from the stream configuration.

// GetHTTPPaths collects the HTTP paths declared by the "input" and "output"
// sections of a stream configuration, including those nested under a
// "broker" entry. Each path appears once, in order of first occurrence; the
// result is nil when no paths are found.
func GetHTTPPaths(streamConfig map[string]interface{}) []string {
	var unique []string
	seen := map[string]struct{}{}

	// collect appends every not-yet-seen path, preserving encounter order.
	collect := func(candidates []string) {
		for _, p := range candidates {
			if _, dup := seen[p]; dup {
				continue
			}
			seen[p] = struct{}{}
			unique = append(unique, p)
		}
	}

	for _, section := range []string{"input", "output"} {
		sectionMap, isMap := streamConfig[section].(map[string]interface{})
		if !isMap {
			continue
		}
		collect(extractHTTPServerPaths(sectionMap))

		broker, hasBroker := sectionMap["broker"].(map[string]interface{})
		if hasBroker {
			collect(handleBroker(broker))
		}
	}
	return unique
}

Cognitive complexity: 20, Cyclomatic complexity: 6

func NewAPISpec

NewAPISpec creates a new APISpec object based on the required inputs. The resulting object is a subset of *gateway.APISpec.

// NewAPISpec builds an APISpec from the required inputs. The resulting
// object is a subset of *gateway.APISpec.
func NewAPISpec(id string, name string, isOasDef bool, oasDef oas.OAS, stripListenPath model.StripListenPathFunc) *APISpec {
	spec := new(APISpec)
	spec.APIID = id
	spec.Name = name
	spec.IsOAS = isOasDef
	spec.OAS = oasDef
	spec.StripListenPath = stripListenPath
	return spec
}

Cognitive complexity: 1, Cyclomatic complexity: 1

func NewMiddleware

NewMiddleware returns a new instance of Middleware.

// NewMiddleware returns a new Middleware wired to the given gateway, base
// middleware, API spec and analytics factory. Init must still be called to
// create the default stream manager and start garbage collection.
func NewMiddleware(gw Gateway, mw BaseMiddleware, spec *APISpec, analyticsFactory StreamAnalyticsFactory) *Middleware {
	m := new(Middleware)
	m.Gw = gw
	m.base = mw
	m.Spec = spec
	m.analyticsFactory = analyticsFactory
	return m
}

Cognitive complexity: 1, Cyclomatic complexity: 1

func NewStream

NewStream creates a new stream without initializing it

// NewStream creates a new stream without initializing it. The stream gets
// its own debug-level logrus logger writing to the standard log writer, and
// a warning is logged when any unsafe components are allowed.
func NewStream(allowUnsafe []string) *Stream {
	l := logrus.New()
	l.Level = logrus.DebugLevel
	l.Formatter = &logrus.TextFormatter{FullTimestamp: true}
	l.Out = log.Writer()

	if len(allowUnsafe) != 0 {
		l.Warnf("Allowing unsafe components: %v", allowUnsafe)
	}

	created := &Stream{
		allowedUnsafe:	allowUnsafe,
		log:		l,
	}
	return created
}

Cognitive complexity: 4, Cyclomatic complexity: 2

Uses: log.Writer, logrus.DebugLevel, logrus.New, logrus.TextFormatter.

func (*HandleFuncAdapter) HandleFunc

// HandleFunc registers f on the adapter's router for the given path.
//
// The registered handler wraps f: it creates an analytics recorder and
// response writer through the stream manager's analytics factory and keeps
// the manager's activity counter incremented for the duration of the call.
//
// Registration is skipped, with an error logged, when the middleware, the
// router or the stream manager is nil.
func (h *HandleFuncAdapter) HandleFunc(path string, f func(http.ResponseWriter, *http.Request)) {
	h.Logger.Debugf("Registering streaming handleFunc for path: %s", path)

	if h.StreamMiddleware == nil || h.Muxer == nil {
		h.Logger.Error("Middleware or muxer is nil")
		return
	}

	// The manager is dereferenced both below and inside the handler, so a nil
	// manager must be rejected up front instead of panicking.
	if h.StreamManager == nil {
		h.Logger.Error("Stream manager is nil")
		return
	}

	h.StreamManager.routeLock.Lock()
	h.Muxer.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
		recorder := h.StreamManager.analyticsFactory.CreateRecorder(r)
		analyticsResponseWriter := h.StreamManager.analyticsFactory.CreateResponseWriter(w, r, h.StreamID, recorder)

		// Mark the manager as active so GC does not remove it mid-request.
		h.StreamManager.activityCounter.Add(1)
		defer h.StreamManager.activityCounter.Add(-1)
		f(analyticsResponseWriter, r)
	})
	h.StreamManager.routeLock.Unlock()
	h.Logger.Debugf("Registered handler for path: %s", path)
}

Cognitive complexity: 3, Cyclomatic complexity: 3

Uses: http.Request, http.ResponseWriter.

func (*Manager) SetAnalyticsFactory

// SetAnalyticsFactory sets the analytics factory used by the manager's
// handlers. A nil factory is replaced by a NoopStreamAnalyticsFactory.
func (sm *Manager) SetAnalyticsFactory(factory StreamAnalyticsFactory) {
	if factory != nil {
		sm.analyticsFactory = factory
		return
	}
	sm.analyticsFactory = &NoopStreamAnalyticsFactory{}
}

Cognitive complexity: 3, Cyclomatic complexity: 2

func (*Middleware) CreateStreamManager

CreateStreamManager creates or retrieves a stream manager based on the request.

// CreateStreamManager creates or retrieves a stream manager based on the request.
//
// The streams configuration resolved for r is marshalled to JSON and hashed
// with SHA-256; the hex digest is the cache key. A cached manager is returned
// when one exists for that key. Otherwise a new manager is created and its
// streams initialized; it is stored in the cache only when r is non-nil.
// With a nil r the manager is created in dry-run mode.
func (s *Middleware) CreateStreamManager(r *http.Request) *Manager {
	streamsConfig := s.getStreamsConfig(r)
	configJSON, err := json.Marshal(streamsConfig)
	if err != nil {
		// Best effort: keep going, but make the failure visible. With an empty
		// payload every failing config hashes to the same cache key.
		s.Logger().WithError(err).Error("Failed to marshal streams config for cache key")
	}
	cacheKey := fmt.Sprintf("%x", sha256.Sum256(configJSON))

	s.createStreamManagerLock.Lock()
	defer s.createStreamManagerLock.Unlock()

	s.Logger().Debug("Attempting to load stream manager from cache")
	s.Logger().Debugf("Cache key: %s", cacheKey)
	if cachedManager, found := s.StreamManagerCache.Load(cacheKey); found {
		s.Logger().Debug("Found cached stream manager")
		return cachedManager.(*Manager)
	}

	newManager := &Manager{
		muxer:			mux.NewRouter(),
		mw:			s,
		dryRun:			r == nil,
		activityCounter:	atomic.Int32{},
		analyticsFactory:	&NoopStreamAnalyticsFactory{},
	}
	newManager.initStreams(r, streamsConfig)

	// Managers created without a request are never cached.
	if r != nil {
		s.StreamManagerCache.Store(cacheKey, newManager)
	}
	return newManager
}

Cognitive complexity: 7, Cyclomatic complexity: 3

Uses: atomic.Int32, fmt.Sprintf, json.Marshal, mux.NewRouter, sha256.Sum256.

func (*Middleware) EnabledForSpec

EnabledForSpec checks if streaming is enabled on the config.

// EnabledForSpec reports whether streaming applies to this spec: streaming
// must be enabled in the gateway config and the spec must define at least
// one stream. As a side effect it stores the allowed unsafe components and
// adds the number of configured streams to GlobalStreamCounter.
func (s *Middleware) EnabledForSpec() bool {
	s.Logger().Debug("Checking if streaming is enabled")

	streaming := s.Gw.GetConfig().Streaming
	s.Logger().Debugf("Streaming config: %+v", streaming)

	if !streaming.Enabled {
		s.Logger().Debug("Streaming is not enabled in the config")
		return false
	}

	s.Logger().Debug("Streaming is enabled in the config")
	s.allowedUnsafe = streaming.AllowUnsafe
	s.Logger().Debugf("Allowed unsafe components: %v", s.allowedUnsafe)

	streamCount := len(s.getStreamsConfig(nil).Streams)
	GlobalStreamCounter.Add(int64(streamCount))
	s.Logger().Debug("Total streams count: ", streamCount)

	return streamCount > 0
}

Cognitive complexity: 2, Cyclomatic complexity: 2

func (*Middleware) GC

GC removes inactive stream managers.

// GC removes inactive stream managers.
//
// Every cached manager other than the default one whose activity counter is
// zero or below has all of its streams removed and is then deleted from the
// cache. Cache entries and stream keys of an unexpected type are skipped
// with a warning instead of panicking, because GC runs in a background
// goroutine where a panic would take the process down.
func (s *Middleware) GC() {
	s.Logger().Debug("Starting garbage collection for inactive stream managers")

	s.StreamManagerCache.Range(func(key, value interface{}) bool {
		manager, ok := value.(*Manager)
		if !ok {
			s.Logger().Warnf("Skipping unexpected stream manager cache entry: %v", key)
			return true
		}

		// The default manager is never collected; active managers are kept.
		if manager == s.defaultManager || manager.activityCounter.Load() > 0 {
			return true
		}

		s.Logger().Infof("Removing inactive stream manager: %v", key)
		manager.streams.Range(func(streamKey, _ interface{}) bool {
			streamID, ok := streamKey.(string)
			if !ok {
				s.Logger().Warnf("Skipping stream with unexpected key: %v", streamKey)
				return true
			}
			if err := manager.removeStream(streamID); err != nil {
				s.Logger().WithError(err).Errorf("Error removing stream %s", streamID)
			}
			return true
		})
		s.StreamManagerCache.Delete(key)

		return true
	})
}

Cognitive complexity: 10, Cyclomatic complexity: 4

func (*Middleware) GetStreamManager

// GetStreamManager returns the default stream manager. The default manager
// is created by Init, so the result is nil before Init has run.
func (s *Middleware) GetStreamManager() *Manager {
	return s.defaultManager
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*Middleware) Init

Init initializes the middleware

// Init initializes the middleware: it creates the cancellable context, the
// default stream manager and the analytics factory, and starts a goroutine
// that runs GC every StreamGCInterval until the context is cancelled.
func (s *Middleware) Init() {
	s.Logger().Debug("Initializing Middleware")
	s.ctx, s.cancel = context.WithCancel(context.Background())

	s.Logger().Debug("Initializing default stream manager")
	s.defaultManager = s.CreateStreamManager(nil)

	s.Logger().Debug("Initializing stream analytics factory")
	// SetAnalyticsFactory substitutes the no-op factory for a nil argument,
	// so the configured factory can be passed through unconditionally.
	s.SetAnalyticsFactory(s.analyticsFactory)

	// Periodic garbage collection of inactive stream managers.
	go func() {
		gcTicker := time.NewTicker(StreamGCInterval)
		defer gcTicker.Stop()

		for {
			select {
			case <-s.ctx.Done():
				return
			case <-gcTicker.C:
				s.GC()
			}
		}
	}()
}

Cognitive complexity: 11, Cyclomatic complexity: 5

Uses: context.Background, context.WithCancel, time.NewTicker.

func (*Middleware) Logger

Logger returns a logger with middleware filled out.

// Logger returns the base middleware's log entry tagged with an "mw" field
// holding this middleware's name.
func (s *Middleware) Logger() *logrus.Entry {
	entry := s.base.Logger()
	return entry.WithField("mw", s.Name())
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*Middleware) Name

Name returns the name for the middleware.

// Name returns the name for the middleware. Logger uses it as the value of
// the "mw" log field.
func (s *Middleware) Name() string {
	return "StreamingMiddleware"
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*Middleware) ProcessRequest

ProcessRequest will handle the streaming functionality.

// ProcessRequest will handle the streaming functionality.
//
// The listen path is stripped from the request path. If the default manager
// does not know the resulting path, a "not found" error with 404 is returned.
// If the default manager's router has no route for it, the request is passed
// on with (nil, 200). Otherwise the stream manager for the request is
// resolved, its router is matched under the route lock, and the matched
// handler serves the request while the manager's activity counter is held;
// the result is then (nil, middleware.StatusRespond). A match that does not
// yield an http.HandlerFunc produces a 500.
func (s *Middleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int) {
	stripped := s.Spec.StripListenPath(r.URL.Path)
	if known := s.defaultManager.hasPath(stripped); !known {
		s.Logger().Debugf("Path not found: %s", stripped)
		return errors.New("not found"), http.StatusNotFound
	}

	s.Logger().Debugf("Processing request: %s, %s", r.URL.Path, stripped)

	// Minimal request carrying only what route matching needs.
	probeURL := &url.URL{Scheme: r.URL.Scheme, Host: r.URL.Host, Path: stripped}
	probe := &http.Request{Method: r.Method, URL: probeURL}

	var scratch mux.RouteMatch
	if routed := s.defaultManager.muxer.Match(probe, &scratch); !routed {
		return nil, http.StatusOK
	}

	manager := s.CreateStreamManager(r)
	manager.SetAnalyticsFactory(s.analyticsFactory)

	var match mux.RouteMatch
	manager.routeLock.Lock()
	manager.muxer.Match(probe, &match)
	manager.routeLock.Unlock()

	// direct Bento handler
	handler, isHandlerFunc := match.Handler.(http.HandlerFunc)
	if !isHandlerFunc {
		return errors.New("invalid route handler"), http.StatusInternalServerError
	}

	// Keep the manager marked active while the handler runs.
	manager.activityCounter.Add(1)
	defer manager.activityCounter.Add(-1)

	handler.ServeHTTP(w, r)

	return nil, middleware.StatusRespond
}

Cognitive complexity: 10, Cyclomatic complexity: 4

Uses: errors.New, http.HandlerFunc, http.Request, http.StatusInternalServerError, http.StatusNotFound, http.StatusOK, middleware.StatusRespond, mux.RouteMatch, url.URL.

func (*Middleware) SetAnalyticsFactory

// SetAnalyticsFactory sets the analytics factory on the middleware and on
// its default stream manager. A nil factory is replaced by a
// NoopStreamAnalyticsFactory.
//
// The default manager only exists after Init. When it is still nil, only the
// middleware's own factory is updated; Init later passes the stored factory
// on to the manager it creates.
func (s *Middleware) SetAnalyticsFactory(factory StreamAnalyticsFactory) {
	if factory == nil {
		factory = &NoopStreamAnalyticsFactory{}
	}
	s.analyticsFactory = factory
	if s.defaultManager != nil {
		s.defaultManager.SetAnalyticsFactory(factory)
	}
}

Cognitive complexity: 3, Cyclomatic complexity: 2

func (*Middleware) Unload

Unload closes and removes active streams. This method is called when the API is removed.

// Unload closes and removes active streams. This method is called when the
// API is removed.
//
// It cancels the middleware context (stopping the GC goroutine), resets every
// stream of every cached manager and of the default manager, empties the
// manager cache and subtracts the number of reset streams from
// GlobalStreamCounter. It is safe to call on a middleware that was never
// initialized.
func (s *Middleware) Unload() {
	s.Logger().Debugf("Unloading streaming middleware %s", s.Spec.Name)

	totalStreams := 0
	// cancel is only assigned by Init.
	if s.cancel != nil {
		s.cancel()
	}

	// resetStreams stops the underlying Bento instance of every stream owned
	// by the manager and counts it.
	resetStreams := func(manager *Manager) {
		manager.streams.Range(func(_, streamValue interface{}) bool {
			totalStreams++
			s.resetStream(streamValue)
			return true	// continue iterating
		})
	}

	// Reset cached streams and drop each cache entry. Entries are deleted one
	// by one instead of overwriting the sync.Map value, which is not safe
	// while other goroutines may still be using the map.
	s.StreamManagerCache.Range(func(key, value interface{}) bool {
		if manager, ok := value.(*Manager); ok {
			resetStreams(manager)
		}
		s.StreamManagerCache.Delete(key)
		return true
	})

	// Finally, reset the default manager and stop the underlying Bento instance
	if s.defaultManager != nil {
		resetStreams(s.defaultManager)
	}

	GlobalStreamCounter.Add(-int64(totalStreams))
	s.Logger().Info("All streams successfully removed")
}

Cognitive complexity: 9, Cyclomatic complexity: 2

Uses: sync.Map.

func (*NoopStreamAnalyticsFactory) CreateRecorder

// CreateRecorder returns a NoopStreamAnalyticsRecorder; the request is ignored.
func (n *NoopStreamAnalyticsFactory) CreateRecorder(r *http.Request) StreamAnalyticsRecorder {
	return &NoopStreamAnalyticsRecorder{}
}

Cognitive complexity: 1, Cyclomatic complexity: 1

func (*NoopStreamAnalyticsFactory) CreateResponseWriter

// CreateResponseWriter returns w unchanged; the other arguments are ignored.
func (n *NoopStreamAnalyticsFactory) CreateResponseWriter(w http.ResponseWriter, r *http.Request, streamID string, recorder StreamAnalyticsRecorder) http.ResponseWriter {
	return w
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*NoopStreamAnalyticsRecorder) PrepareRecord

// PrepareRecord does nothing.
func (n *NoopStreamAnalyticsRecorder) PrepareRecord(r *http.Request) {
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*NoopStreamAnalyticsRecorder) RecordHit

// RecordHit does nothing and always returns nil.
func (n *NoopStreamAnalyticsRecorder) RecordHit(statusCode int, latency analytics.Latency) error {
	return nil
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*Stream) GetConfig

GetConfig returns the configuration of the stream

// GetConfig returns the configuration of the stream: the YAML stored by
// Start, or the empty string when the stream was never started or was stopped.
func (s *Stream) GetConfig() string {
	return s.streamConfig
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*Stream) Reset

Reset stops the stream

// Reset stops the stream. It delegates to Stop and returns its result.
func (s *Stream) Reset() error {
	return s.Stop()
}

Cognitive complexity: 0, Cyclomatic complexity: 1

func (*Stream) SetLogger

SetLogger to be used by the stream

// SetLogger replaces the logger used by the stream. A nil logger is ignored
// and the current logger is kept.
func (s *Stream) SetLogger(logger *logrus.Logger) {
	if logger == nil {
		return
	}
	s.log = logger
}

Cognitive complexity: 2, Cyclomatic complexity: 2

func (*Stream) Start

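Start loads up the configuration and starts the stream. It does not block for the lifetime of the stream: the stream runs in a background goroutine, and Start waits at most 100ms for an early startup error before returning.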

// Start loads up the configuration and starts the stream.
//
// The config is marshalled to YAML, passed through removeUnsafe and used to
// build a stream, optionally bound to the given HTTP multiplexer. The stream
// then runs in a background goroutine. Start waits up to 100ms for the run
// to return: an error reported within that window is returned, otherwise the
// stream is assumed to have started and nil is returned.
func (s *Stream) Start(config map[string]interface{}, mux service.HTTPMultiplexer) error {
	s.log.Debugf("Starting stream")

	rawYAML, err := yaml.Marshal(config)
	if err != nil {
		s.log.Errorf("Failed to marshal config: %v", err)
		return err
	}
	sanitized := string(s.removeUnsafe(rawYAML))

	s.log.Debugf("Building new stream")
	builder := service.NewStreamBuilder()
	if err := builder.SetYAML(sanitized); err != nil {
		s.log.Errorf("Failed to set YAML: %v", err)
		return err
	}
	if mux != nil {
		builder.SetHTTPMux(mux)
	}

	built, err := builder.Build()
	if err != nil {
		s.log.Errorf("Failed to build stream: %v", err)
		return err
	}

	s.stream = built
	s.streamConfig = sanitized

	s.log.Debugf("Stream built successfully, starting it")

	// Buffered so the goroutine can always deliver its result and exit, even
	// when nobody is listening any more.
	runResult := make(chan error, 1)
	go func() {
		s.log.Infof("Starting stream")
		runResult <- built.Run(context.Background())
	}()

	select {
	case <-time.After(100 * time.Millisecond):
		// If no error after a short delay, assume stream started successfully
	case runErr := <-runResult:
		if runErr != nil {
			s.log.Errorf("Stream encountered an error: %v", runErr)
			return runErr
		}
	}

	s.log.Debugf("Stream started successfully")
	return nil
}

Cognitive complexity: 15, Cyclomatic complexity: 8

Uses: context.Background, service.NewStreamBuilder, time.After, time.Millisecond.

func (*Stream) Stop

Stop cleans up the stream

// Stop cleans up the stream.
//
// With no active stream it returns immediately. Otherwise the stream is asked
// to stop with a 5 second timeout; the outcome (success, error or timeout)
// is only logged. In every case the stored stream and its configuration are
// cleared and nil is returned.
func (s *Stream) Stop() error {
	s.log.Printf("Stopping stream")

	if s.stream == nil {
		s.log.Printf("No active stream to stop")
		return nil
	}

	ctx, release := context.WithTimeout(context.Background(), 5*time.Second)
	defer release()

	// Buffered so the stopping goroutine never blocks after a timeout.
	done := make(chan error, 1)
	go func() {
		done <- s.stream.Stop(ctx)
	}()

	select {
	case <-ctx.Done():
		s.log.Printf("Timeout while stopping stream")
	case stopErr := <-done:
		if stopErr == nil {
			s.log.Printf("Stream stopped successfully")
		} else {
			s.log.Printf("Error stopping stream: %v", stopErr)
		}
	}

	s.stream = nil
	s.streamConfig = ""

	return nil
}

Cognitive complexity: 10, Cyclomatic complexity: 5

Uses: context.Background, context.WithTimeout, time.Second.

Private functions

func extractHTTPServerPaths

extractHTTPServerPaths is a helper function to extract HTTP server paths from a given configuration.

extractHTTPServerPaths (config map[string]interface{}) []string

func extractPaths

Helper function to extract paths from an http_server configuration

extractPaths (httpConfig map[string]interface{}) []string

func handleBroker

handleBroker is a helper function to handle broker configurations.

handleBroker (brokerConfig map[string]interface{}) []string

func createStream

createStream creates a new stream

createStream (streamID string, config map[string]interface{}) error
References: fmt.Sprintf.

func hasPath

hasPath (path string) bool
References: strings.TrimPrefix.

func initStreams

initStreams (r *http.Request, config *StreamsConfig)
References: http.Request, http.ResponseWriter, mux.NewRouter.

func removeStream

removeStream removes a stream

removeStream (streamID string) error
References: fmt.Errorf.

func setUpOrDryRunStream

setUpOrDryRunStream (streamConfig any, streamID string)

func getStreamsConfig

getStreamsConfig (r *http.Request) *StreamsConfig

func processStreamsConfig

processStreamsConfig (r *http.Request, streams map[string]any, config *StreamsConfig)
References: json.Marshal, json.Unmarshal.

func resetStream

resetStream (streamValue any)

func removeUnsafe

removeUnsafe (yamlBytes []byte) []byte
References: fmt.Sprintf, regexp.MustCompile, regexp.QuoteMeta, strings.Contains.


Tests

Files: 1. Third party imports: 2. Imports from organisation: 0. Tests: 3. Benchmarks: 0.

Test functions

TestRemoveAndWhitelistUnsafeComponents

References: testing.T.

TestStreamStart

References: require.Error, require.NoError, require.NotNil, testing.T.

TestStreamStop

References: require.NoError, require.NotNil, testing.T.