github.com/TykTechnologies/tyk/ee/middleware/streams
No package summary is available.
Package
Files: 6. Third party imports: 5. Imports from organisation: 0. Tests: 0. Benchmarks: 0.
Constants
const (
	// ExtensionTykStreaming is the OAS extension for Tyk streaming.
	ExtensionTykStreaming = "x-tyk-streaming"
	// StreamGCInterval is the period of the ticker that triggers
	// Middleware.GC from the goroutine started by Middleware.Init.
	StreamGCInterval = 1 * time.Minute
)
Vars
var (
	// ErrResponseWriterNotHijackable is the sentinel error for a
	// ResponseWriter that is not hijackable.
	// NOTE(review): the code returning it is not visible in this listing.
	ErrResponseWriterNotHijackable = errors.New("ResponseWriter is not hijackable")
)
GlobalStreamCounter is used for testing.
// GlobalStreamCounter is used for testing. EnabledForSpec adds the number of
// configured streams to it and Unload subtracts the number of streams it reset.
var GlobalStreamCounter atomic.Int64
Middleware implements model.Middleware.
// Compile-time check that *Middleware satisfies model.Middleware.
var _ model.Middleware = (*Middleware)(nil)
// unsafeComponents lists the component names treated as unsafe, grouped by
// component kind. A name is repeated when it exists for more than one kind.
// NOTE(review): presumably consumed by removeUnsafe together with the
// allowedUnsafe list — confirm against that function.
var unsafeComponents = []string{
	// Inputs
	"csv", "dynamic", "file", "inproc", "socket", "socket_server", "stdin", "subprocess",
	// Processors
	"command", "subprocess", "wasm",
	// Outputs
	"file", "inproc", "socket",
	// Caches
	"file",
}
Types
APISpec
APISpec is a subset of gateway.APISpec for the values the middleware consumes.
| Field name | Field type | Comment |
|---|---|---|
| APIID | string | No comment on field. |
| Name | string | No comment on field. |
| IsOAS | bool | No comment on field. |
| OAS | oas.OAS | No comment on field. |
| StripListenPath | model.StripListenPathFunc | No comment on field. |
// APISpec is a subset of gateway.APISpec for the values the middleware consumes.
// Instances are built by NewAPISpec.
type APISpec struct {
	// APIID is the API identifier.
	APIID string
	// Name is the API name; Unload includes it in its log message.
	Name string
	// IsOAS records whether the API is an OAS definition.
	IsOAS bool
	// OAS is the OAS definition of the API.
	OAS oas.OAS
	// StripListenPath is applied to the request URL path in ProcessRequest
	// before the path is matched against stream routes.
	StripListenPath model.StripListenPathFunc
}
BaseMiddleware
BaseMiddleware is the subset of BaseMiddleware APIs that the middleware uses.
| Field name | Field type | Comment |
|---|---|---|
| type |
|
No comment on field. |
// BaseMiddleware is the subset of BaseMiddleware APIs that the middleware uses.
// Only the logger is required; Middleware.Logger derives its entry from it.
type BaseMiddleware interface {
	model.LoggerProvider
}
Gateway
Gateway is the subset of Gateway APIs that the middleware uses.
| Field name | Field type | Comment |
|---|---|---|
| type |
|
No comment on field. |
// Gateway is the subset of Gateway APIs that the middleware uses.
type Gateway interface {
	// ConfigProvider supplies GetConfig, which EnabledForSpec uses to read
	// the streaming settings.
	model.ConfigProvider
	// NOTE(review): presumably used while processing the streams config;
	// the call site is not visible in this listing.
	model.ReplaceTykVariables
}
HandleFuncAdapter
This type doesn't have documentation.
| Field name | Field type | Comment |
|---|---|---|
| StreamID | string | No comment on field. |
| StreamManager | *Manager | No comment on field. |
| StreamMiddleware | *Middleware | No comment on field. |
| Muxer | *mux.Router | No comment on field. |
| Logger | *logrus.Entry | No comment on field. |
// HandleFuncAdapter registers stream HTTP handlers on a router through its
// HandleFunc method, wrapping each handler with analytics and activity
// counting.
type HandleFuncAdapter struct {
	// StreamID is passed to the analytics factory when a response writer is created.
	StreamID string
	// StreamManager supplies the route lock, the analytics factory and the
	// activity counter used by HandleFunc.
	StreamManager *Manager
	// StreamMiddleware must be non-nil for HandleFunc to register a route.
	StreamMiddleware *Middleware
	// Muxer is the router the handlers are registered on.
	Muxer *mux.Router
	// Logger receives the registration debug and error messages.
	Logger *logrus.Entry
}
Manager
Manager is responsible for the streams created from one streams configuration, together with the router their HTTP handlers are registered on.
| Field name | Field type | Comment |
|---|---|---|
| streams | sync.Map | No comment on field. |
| routeLock | sync.Mutex | No comment on field. |
| muxer | *mux.Router | No comment on field. |
| mw | *Middleware | No comment on field. |
| dryRun | bool | No comment on field. |
| listenPaths | []string | No comment on field. |
| activityCounter | atomic.Int32 | Counts active subscriptions, requests. |
| analyticsFactory | StreamAnalyticsFactory | No comment on field. |
// Manager holds the streams built from one streams configuration and the
// router on which their HTTP handlers are registered.
type Manager struct {
	// streams maps stream IDs (string keys, as asserted in Middleware.GC)
	// to the stream values handed to resetStream / removeStream.
	streams sync.Map
	// routeLock is held while routes are registered on, or matched against, muxer.
	routeLock sync.Mutex
	// muxer is the router stream handlers are matched against in ProcessRequest.
	muxer *mux.Router
	// mw is the middleware that created this manager.
	mw *Middleware
	// dryRun is true when the manager was created without a request
	// (CreateStreamManager(nil)).
	dryRun bool
	// NOTE(review): presumably the HTTP paths consulted by hasPath — confirm.
	listenPaths []string
	activityCounter atomic.Int32 // Counts active subscriptions, requests.
	// analyticsFactory creates the per-request recorder and response writer;
	// CreateStreamManager initialises it to the no-op factory.
	analyticsFactory StreamAnalyticsFactory
}
Middleware
Middleware implements a streaming middleware.
| Field name | Field type | Comment |
|---|---|---|
| Spec | *APISpec | No comment on field. |
| Gw | Gateway | No comment on field. |
| base | BaseMiddleware | No comment on field. |
| createStreamManagerLock | sync.Mutex | No comment on field. |
| StreamManagerCache | sync.Map | Map of payload hash to Manager |
| ctx | context.Context | No comment on field. |
| cancel | context.CancelFunc | No comment on field. |
| allowedUnsafe | []string | No comment on field. |
| defaultManager | *Manager | No comment on field. |
| analyticsFactory | StreamAnalyticsFactory | No comment on field. |
// Middleware implements a streaming middleware.
type Middleware struct {
	// Spec holds the API values the middleware consumes.
	Spec *APISpec
	// Gw gives access to the gateway configuration.
	Gw Gateway
	// base provides the logger that Logger decorates.
	base BaseMiddleware

	// createStreamManagerLock serialises CreateStreamManager.
	createStreamManagerLock sync.Mutex
	// The key is the hex SHA-256 of the JSON-encoded streams config.
	StreamManagerCache sync.Map // Map of payload hash to Manager
	// ctx and cancel are created by Init; cancelling stops the GC goroutine.
	ctx    context.Context
	cancel context.CancelFunc
	// allowedUnsafe is copied from the streaming config's AllowUnsafe
	// setting by EnabledForSpec.
	allowedUnsafe []string
	// defaultManager is the manager Init creates without a request.
	defaultManager *Manager
	// analyticsFactory is propagated to stream managers; a nil factory is
	// replaced by the no-op factory in SetAnalyticsFactory.
	analyticsFactory StreamAnalyticsFactory
}
NoopStreamAnalyticsFactory
This type doesn't have documentation.
// NoopStreamAnalyticsFactory is a StreamAnalyticsFactory that records nothing:
// it hands out no-op recorders and returns response writers unchanged.
type NoopStreamAnalyticsFactory struct{}
NoopStreamAnalyticsRecorder
This type doesn't have documentation.
// NoopStreamAnalyticsRecorder is a StreamAnalyticsRecorder whose methods do nothing.
type NoopStreamAnalyticsRecorder struct{}
Stream
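Stream is a wrapper around a service.Stream that also keeps the stream's configuration, logger, and list of allowed unsafe components.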
| Field name | Field type | Comment |
|---|---|---|
| allowedUnsafe | []string | No comment on field. |
| streamConfig | string | No comment on field. |
| stream | *service.Stream | No comment on field. |
| log | *logrus.Logger | No comment on field. |
// Stream is a wrapper around a service.Stream that also keeps its
// configuration and logger.
type Stream struct {
	// allowedUnsafe is the list passed to NewStream.
	allowedUnsafe []string
	// streamConfig is the YAML payload the current stream was built from;
	// Start sets it and Stop clears it.
	streamConfig string
	// stream is the underlying stream; nil until Start builds it and after Stop.
	stream *service.Stream
	// log receives all messages emitted by this wrapper.
	log *logrus.Logger
}
StreamAnalyticsFactory
This type doesn't have documentation.
| Field name | Field type | Comment |
|---|---|---|
| type |
|
No comment on field. |
// StreamAnalyticsFactory creates the per-request analytics objects used by
// stream HTTP handlers.
type StreamAnalyticsFactory interface {
	// CreateRecorder returns the recorder for request r.
	CreateRecorder(r *http.Request) StreamAnalyticsRecorder
	// CreateResponseWriter returns the writer that is handed to the stream
	// handler in place of w; it receives the stream ID and the recorder
	// created for the same request.
	CreateResponseWriter(w http.ResponseWriter, r *http.Request, streamID string, recorder StreamAnalyticsRecorder) http.ResponseWriter
}
StreamAnalyticsRecorder
This type doesn't have documentation.
| Field name | Field type | Comment |
|---|---|---|
| type |
|
No comment on field. |
// StreamAnalyticsRecorder records analytics for a single request.
// NOTE(review): no caller of these methods is visible in this listing; the
// descriptions below follow the method signatures only.
type StreamAnalyticsRecorder interface {
	// PrepareRecord prepares the analytics record from request r.
	PrepareRecord(r *http.Request)
	// RecordHit records a hit with the given status code and latency.
	RecordHit(statusCode int, latency analytics.Latency) error
}
StreamsConfig
StreamsConfig represents a stream configuration.
| Field name | Field type | Comment |
|---|---|---|
| Info | struct{ Version string } | No comment on field. |
| Streams | map[string]any | No comment on field. |
// StreamsConfig represents a stream configuration.
type StreamsConfig struct {
	// Info carries the configuration's version metadata.
	Info struct {
		Version string `json:"version"`
	} `json:"info"`

	// Streams holds the stream definitions; EnabledForSpec treats an empty
	// map as "streaming not enabled for this API".
	// NOTE(review): presumably keyed by stream ID — confirm against initStreams.
	Streams map[string]any `json:"streams"`
}
Functions
func GetHTTPPaths
GetHTTPPaths is the main function to get HTTP paths from the stream configuration.
// GetHTTPPaths collects the HTTP paths declared by the "input" and "output"
// sections of a stream configuration, including those nested under a
// "broker" entry, and returns them without duplicates in first-seen order.
// The result is nil when no path is found.
func GetHTTPPaths(streamConfig map[string]interface{}) []string {
	var collected []string
	for _, section := range []string{"input", "output"} {
		sectionCfg, isMap := streamConfig[section].(map[string]interface{})
		if !isMap {
			continue
		}
		collected = append(collected, extractHTTPServerPaths(sectionCfg)...)

		broker, hasBroker := sectionCfg["broker"].(map[string]interface{})
		if !hasBroker {
			continue
		}
		collected = append(collected, handleBroker(broker)...)
	}

	// Drop repeated paths while keeping the order of first appearance.
	var unique []string
	seen := map[string]struct{}{}
	for _, p := range collected {
		if _, dup := seen[p]; dup {
			continue
		}
		seen[p] = struct{}{}
		unique = append(unique, p)
	}
	return unique
}
Cognitive complexity: 20, Cyclomatic complexity: 6
func NewAPISpec
NewAPISpec creates a new APISpec object based on the required inputs.
The resulting object is a subset of *gateway.APISpec.
// NewAPISpec builds an APISpec from the individual values the streaming
// middleware needs. The result is a subset of *gateway.APISpec.
func NewAPISpec(id string, name string, isOasDef bool, oasDef oas.OAS, stripListenPath model.StripListenPathFunc) *APISpec {
	spec := new(APISpec)
	spec.APIID = id
	spec.Name = name
	spec.IsOAS = isOasDef
	spec.OAS = oasDef
	spec.StripListenPath = stripListenPath
	return spec
}
Cognitive complexity: 1, Cyclomatic complexity: 1
func NewMiddleware
NewMiddleware returns a new instance of Middleware.
// NewMiddleware returns a Middleware wired to the given gateway, base
// middleware, API spec and analytics factory. Init must still be called
// before the middleware handles requests.
func NewMiddleware(gw Gateway, mw BaseMiddleware, spec *APISpec, analyticsFactory StreamAnalyticsFactory) *Middleware {
	m := new(Middleware)
	m.Gw = gw
	m.base = mw
	m.Spec = spec
	m.analyticsFactory = analyticsFactory
	return m
}
Cognitive complexity: 1, Cyclomatic complexity: 1
func NewStream
NewStream creates a new stream without initializing it
// NewStream creates a Stream wrapper without building or starting the
// underlying stream. The wrapper gets its own debug-level logrus logger that
// writes to the standard library logger's output; a warning is logged when
// any unsafe components are allowed.
func NewStream(allowUnsafe []string) *Stream {
	streamLog := logrus.New()
	streamLog.Out = log.Writer()
	streamLog.Level = logrus.DebugLevel
	streamLog.Formatter = &logrus.TextFormatter{FullTimestamp: true}

	if len(allowUnsafe) != 0 {
		streamLog.Warnf("Allowing unsafe components: %v", allowUnsafe)
	}

	wrapper := &Stream{allowedUnsafe: allowUnsafe}
	wrapper.log = streamLog
	return wrapper
}
Cognitive complexity: 4, Cyclomatic complexity: 2
func (*HandleFuncAdapter) HandleFunc
// HandleFunc registers f on the adapter's router under path. The registered
// handler wraps f so that every request gets an analytics recorder and
// response writer from the manager's analytics factory, and so that the
// manager's activity counter reflects in-flight requests.
//
// Nothing is registered (an error is logged instead) when the middleware,
// the router or the stream manager is nil.
func (h *HandleFuncAdapter) HandleFunc(path string, f func(http.ResponseWriter, *http.Request)) {
	h.Logger.Debugf("Registering streaming handleFunc for path: %s", path)

	if h.StreamMiddleware == nil || h.Muxer == nil {
		h.Logger.Error("Middleware or muxer is nil")
		return
	}

	// The manager provides the route lock, the analytics factory and the
	// activity counter, so registering without one would panic.
	manager := h.StreamManager
	if manager == nil {
		h.Logger.Error("Stream manager is nil")
		return
	}

	manager.routeLock.Lock()
	// Deferred so the lock is released even if route registration panics.
	defer manager.routeLock.Unlock()

	h.Muxer.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
		recorder := manager.analyticsFactory.CreateRecorder(r)
		analyticsResponseWriter := manager.analyticsFactory.CreateResponseWriter(w, r, h.StreamID, recorder)

		// Mark the manager as busy for the lifetime of the request so that
		// garbage collection does not remove it.
		manager.activityCounter.Add(1)
		defer manager.activityCounter.Add(-1)

		f(analyticsResponseWriter, r)
	})
	h.Logger.Debugf("Registered handler for path: %s", path)
}
Cognitive complexity: 3, Cyclomatic complexity: 3
func (*Manager) SetAnalyticsFactory
// SetAnalyticsFactory sets the factory used to create analytics recorders
// and response writers for this manager. A nil factory selects the no-op
// implementation.
func (sm *Manager) SetAnalyticsFactory(factory StreamAnalyticsFactory) {
	if factory != nil {
		sm.analyticsFactory = factory
		return
	}
	sm.analyticsFactory = &NoopStreamAnalyticsFactory{}
}
Cognitive complexity: 3, Cyclomatic complexity: 2
func (*Middleware) CreateStreamManager
CreateStreamManager creates or retrieves a stream manager based on the request.
// CreateStreamManager creates or retrieves a stream manager based on the request.
//
// The streams configuration resolved for r is JSON-encoded and hashed with
// SHA-256; the hex digest is the cache key. A cached manager is returned when
// one exists for that key. Otherwise a new manager is created and its streams
// initialised. The new manager is stored in the cache only when r is non-nil;
// a nil request produces an uncached dry-run manager.
//
// Calls are serialised by createStreamManagerLock.
func (s *Middleware) CreateStreamManager(r *http.Request) *Manager {
	streamsConfig := s.getStreamsConfig(r)

	configJSON, err := json.Marshal(streamsConfig)
	if err != nil {
		// The key below is then the hash of an empty payload, so report the
		// failure instead of dropping it silently.
		s.Logger().WithError(err).Error("Failed to marshal streams config for cache key")
	}
	cacheKey := fmt.Sprintf("%x", sha256.Sum256(configJSON))

	s.createStreamManagerLock.Lock()
	defer s.createStreamManagerLock.Unlock()

	s.Logger().Debug("Attempting to load stream manager from cache")
	s.Logger().Debugf("Cache key: %s", cacheKey)
	if cachedManager, found := s.StreamManagerCache.Load(cacheKey); found {
		// Checked assertion: an entry of an unexpected type is replaced
		// below rather than causing a panic.
		if manager, ok := cachedManager.(*Manager); ok {
			s.Logger().Debug("Found cached stream manager")
			return manager
		}
	}

	// activityCounter is left at its zero value.
	newManager := &Manager{
		muxer:            mux.NewRouter(),
		mw:               s,
		dryRun:           r == nil,
		analyticsFactory: &NoopStreamAnalyticsFactory{},
	}
	newManager.initStreams(r, streamsConfig)

	if r != nil {
		s.StreamManagerCache.Store(cacheKey, newManager)
	}
	return newManager
}
Cognitive complexity: 7, Cyclomatic complexity: 3
func (*Middleware) EnabledForSpec
EnabledForSpec checks if streaming is enabled on the config.
// EnabledForSpec reports whether streaming applies to this API: streaming
// must be enabled in the gateway config and the API must define at least one
// stream. When streaming is enabled it also records the allowed unsafe
// components and adds the number of configured streams to GlobalStreamCounter.
func (s *Middleware) EnabledForSpec() bool {
	s.Logger().Debug("Checking if streaming is enabled")

	streamingConfig := s.Gw.GetConfig().Streaming
	s.Logger().Debugf("Streaming config: %+v", streamingConfig)

	if !streamingConfig.Enabled {
		s.Logger().Debug("Streaming is not enabled in the config")
		return false
	}

	s.Logger().Debug("Streaming is enabled in the config")
	s.allowedUnsafe = streamingConfig.AllowUnsafe
	s.Logger().Debugf("Allowed unsafe components: %v", s.allowedUnsafe)

	config := s.getStreamsConfig(nil)
	streamCount := len(config.Streams)
	GlobalStreamCounter.Add(int64(streamCount))
	s.Logger().Debug("Total streams count: ", streamCount)

	return streamCount != 0
}
Cognitive complexity: 2, Cyclomatic complexity: 2
func (*Middleware) GC
GC removes inactive stream managers.
// GC removes inactive stream managers.
//
// It walks StreamManagerCache. Every manager other than the default one whose
// activity counter is zero or less has all of its streams removed and is
// deleted from the cache. Errors from removing a stream are logged and do not
// stop the sweep. Cache entries and stream keys of an unexpected type are
// skipped rather than causing a panic.
func (s *Middleware) GC() {
	s.Logger().Debug("Starting garbage collection for inactive stream managers")

	s.StreamManagerCache.Range(func(key, value interface{}) bool {
		manager, ok := value.(*Manager)
		if !ok {
			return true
		}
		// The default manager is never collected, and neither is a manager
		// that still has active subscriptions or requests.
		if manager == s.defaultManager || manager.activityCounter.Load() > 0 {
			return true
		}

		s.Logger().Infof("Removing inactive stream manager: %v", key)
		manager.streams.Range(func(streamKey, _ interface{}) bool {
			streamID, ok := streamKey.(string)
			if !ok {
				return true
			}
			if err := manager.removeStream(streamID); err != nil {
				s.Logger().WithError(err).Errorf("Error removing stream %s", streamID)
			}
			return true
		})
		s.StreamManagerCache.Delete(key)

		return true
	})
}
Cognitive complexity: 10, Cyclomatic complexity: 4
func (*Middleware) GetStreamManager
// GetStreamManager returns the default stream manager. In the code shown
// here the default manager is assigned only by Init, so the result is nil
// before Init has run.
func (s *Middleware) GetStreamManager() *Manager {
	return s.defaultManager
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*Middleware) Init
Init initializes the middleware
// Init prepares the middleware for use: it creates the cancellable context,
// builds the default stream manager (without a request), installs the
// analytics factory and starts the background goroutine that runs GC every
// StreamGCInterval until the context is cancelled.
func (s *Middleware) Init() {
	s.Logger().Debug("Initializing Middleware")
	s.ctx, s.cancel = context.WithCancel(context.Background())

	s.Logger().Debug("Initializing default stream manager")
	s.defaultManager = s.CreateStreamManager(nil)

	s.Logger().Debug("Initializing stream analytics factory")
	// SetAnalyticsFactory substitutes the no-op factory for a nil argument,
	// so no separate nil branch is needed here.
	s.SetAnalyticsFactory(s.analyticsFactory)

	// Start garbage collection routine
	go func() {
		gcTicker := time.NewTicker(StreamGCInterval)
		defer gcTicker.Stop()

		for {
			select {
			case <-s.ctx.Done():
				return
			case <-gcTicker.C:
				s.GC()
			}
		}
	}()
}
Cognitive complexity: 11, Cyclomatic complexity: 5
func (*Middleware) Logger
Logger returns a logger with middleware filled out.
// Logger returns the base middleware's logger entry tagged with this
// middleware's name in the "mw" field.
func (s *Middleware) Logger() *logrus.Entry {
	entry := s.base.Logger()
	return entry.WithField("mw", s.Name())
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*Middleware) Name
Name returns the name for the middleware.
// Name returns the name for the middleware. Logger uses it as the value of
// the "mw" log field.
func (s *Middleware) Name() string {
	return "StreamingMiddleware"
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*Middleware) ProcessRequest
ProcessRequest will handle the streaming functionality.
// ProcessRequest will handle the streaming functionality.
//
// The listen path is stripped from the request path first. The call then
// resolves as follows:
//   - (error, 404) when the default manager does not know the stripped path;
//   - (nil, 200) when the default manager's router has no route for the
//     request method and stripped path, leaving the request unhandled;
//   - (error, 500) when the route found on the request's stream manager
//     does not carry an http.HandlerFunc;
//   - (nil, middleware.StatusRespond) after the stream handler has served
//     the request.
//
// The stream manager counts as active while the handler runs.
func (s *Middleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int) {
	path := s.Spec.StripListenPath(r.URL.Path)
	if known := s.defaultManager.hasPath(path); !known {
		s.Logger().Debugf("Path not found: %s", path)
		return errors.New("not found"), http.StatusNotFound
	}

	s.Logger().Debugf("Processing request: %s, %s", r.URL.Path, path)

	// Routes are registered without the listen path, so matching uses a
	// minimal request that carries only the method and the stripped URL.
	probeURL := &url.URL{Scheme: r.URL.Scheme, Host: r.URL.Host, Path: path}
	probe := &http.Request{Method: r.Method, URL: probeURL}

	var discarded mux.RouteMatch
	if matched := s.defaultManager.muxer.Match(probe, &discarded); !matched {
		return nil, http.StatusOK
	}

	manager := s.CreateStreamManager(r)
	manager.SetAnalyticsFactory(s.analyticsFactory)

	var routed mux.RouteMatch
	manager.routeLock.Lock()
	manager.muxer.Match(probe, &routed)
	manager.routeLock.Unlock()

	// direct Bento handler
	handler, isHandlerFunc := routed.Handler.(http.HandlerFunc)
	if !isHandlerFunc {
		return errors.New("invalid route handler"), http.StatusInternalServerError
	}

	manager.activityCounter.Add(1)
	defer manager.activityCounter.Add(-1)

	handler.ServeHTTP(w, r)
	return nil, middleware.StatusRespond
}
Cognitive complexity: 10, Cyclomatic complexity: 4
func (*Middleware) SetAnalyticsFactory
// SetAnalyticsFactory sets the analytics factory for the middleware and, when
// the default stream manager already exists, for that manager too. A nil
// factory selects the no-op implementation.
//
// It is safe to call before Init: the factory is stored and Init passes it on
// to the default manager once that manager has been created.
func (s *Middleware) SetAnalyticsFactory(factory StreamAnalyticsFactory) {
	if factory == nil {
		factory = &NoopStreamAnalyticsFactory{}
	}
	s.analyticsFactory = factory

	// defaultManager is only assigned by Init; skip propagation until then
	// instead of dereferencing a nil manager.
	if s.defaultManager != nil {
		s.defaultManager.SetAnalyticsFactory(factory)
	}
}
Cognitive complexity: 3, Cyclomatic complexity: 2
func (*Middleware) Unload
Unload closes and removes active streams. This method is called when the API is removed.
// Unload closes and removes active streams. This method is called when the
// API is removed.
//
// It cancels the middleware context (stopping the GC goroutine), resets every
// stream of every cached manager and of the default manager, empties the
// manager cache and subtracts the number of reset streams from
// GlobalStreamCounter. It is safe to call on a middleware whose Init never ran.
func (s *Middleware) Unload() {
	s.Logger().Debugf("Unloading streaming middleware %s", s.Spec.Name)

	totalStreams := 0
	// cancel is only set by Init.
	if s.cancel != nil {
		s.cancel()
	}

	// resetAll resets each stream held by manager and counts it.
	resetAll := func(manager *Manager) {
		manager.streams.Range(func(_, streamValue interface{}) bool {
			totalStreams++
			s.resetStream(streamValue)
			return true // continue iterating
		})
	}

	// Reset cached streams and stop the underlying Bento instances. Entries
	// are deleted one by one instead of overwriting the map, which may still
	// be in use by a GC sweep or a request in flight.
	s.StreamManagerCache.Range(func(key, value interface{}) bool {
		if manager, ok := value.(*Manager); ok {
			resetAll(manager)
		}
		s.StreamManagerCache.Delete(key)
		return true
	})

	// Finally, reset the default manager and stop the underlying Bento instance
	if s.defaultManager != nil {
		resetAll(s.defaultManager)
	}

	GlobalStreamCounter.Add(-int64(totalStreams))
	s.Logger().Info("All streams successfully removed")
}
Cognitive complexity: 9, Cyclomatic complexity: 2
func (*NoopStreamAnalyticsFactory) CreateRecorder
// CreateRecorder returns a recorder that ignores everything; r is not used.
func (n *NoopStreamAnalyticsFactory) CreateRecorder(r *http.Request) StreamAnalyticsRecorder {
	return new(NoopStreamAnalyticsRecorder)
}
Cognitive complexity: 1, Cyclomatic complexity: 1
func (*NoopStreamAnalyticsFactory) CreateResponseWriter
// CreateResponseWriter returns w unchanged; the request, stream ID and
// recorder are ignored.
func (n *NoopStreamAnalyticsFactory) CreateResponseWriter(w http.ResponseWriter, r *http.Request, streamID string, recorder StreamAnalyticsRecorder) http.ResponseWriter {
	return w
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*NoopStreamAnalyticsRecorder) PrepareRecord
// PrepareRecord does nothing.
func (n *NoopStreamAnalyticsRecorder) PrepareRecord(r *http.Request) {
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*NoopStreamAnalyticsRecorder) RecordHit
// RecordHit records nothing and always returns nil.
func (n *NoopStreamAnalyticsRecorder) RecordHit(statusCode int, latency analytics.Latency) error {
	return nil
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*Stream) GetConfig
GetConfig returns the configuration of the stream
// GetConfig returns the configuration of the stream: the YAML payload stored
// by Start, or an empty string when no stream has been built or after Stop.
func (s *Stream) GetConfig() string {
	return s.streamConfig
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*Stream) Reset
Reset stops the stream
// Reset stops the stream. It delegates to Stop and returns its result.
func (s *Stream) Reset() error {
	return s.Stop()
}
Cognitive complexity: 0, Cyclomatic complexity: 1
func (*Stream) SetLogger
SetLogger to be used by the stream
// SetLogger replaces the logger used by the stream. A nil logger is ignored
// and the current logger stays in place.
func (s *Stream) SetLogger(logger *logrus.Logger) {
	if logger == nil {
		return
	}
	s.log = logger
}
Cognitive complexity: 2, Cyclomatic complexity: 2
func (*Stream) Start
Start loads up the configuration and starts the stream. It is non-blocking.
// Start loads up the configuration and starts the stream. It is non-blocking.
//
// The config map is marshalled to YAML, passed through removeUnsafe and used
// to build a stream; when mux is non-nil the stream's HTTP endpoints are
// registered on it. The stream then runs in its own goroutine.
//
// Start waits up to 100ms for the run to end. An error reported within that
// window is returned to the caller. Otherwise the stream is assumed to have
// started and nil is returned; an error that ends the run later is logged,
// because no caller is left to receive it.
//
// Marshalling, YAML-parsing and build errors are logged and returned.
func (s *Stream) Start(config map[string]interface{}, mux service.HTTPMultiplexer) error {
	s.log.Debugf("Starting stream")

	configPayload, err := yaml.Marshal(config)
	if err != nil {
		s.log.Errorf("Failed to marshal config: %v", err)
		return err
	}

	configPayload = s.removeUnsafe(configPayload)

	s.log.Debugf("Building new stream")
	builder := service.NewStreamBuilder()

	err = builder.SetYAML(string(configPayload))
	if err != nil {
		s.log.Errorf("Failed to set YAML: %v", err)
		return err
	}

	if mux != nil {
		builder.SetHTTPMux(mux)
	}

	stream, err := builder.Build()
	if err != nil {
		s.log.Errorf("Failed to build stream: %v", err)
		return err
	}

	s.streamConfig = string(configPayload)
	s.stream = stream

	s.log.Debugf("Stream built successfully, starting it")

	// Buffered so the run goroutine can always deliver its result and exit.
	errChan := make(chan error, 1)
	go func() {
		s.log.Infof("Starting stream")
		errChan <- stream.Run(context.Background())
	}()

	startupWindow := time.NewTimer(100 * time.Millisecond)
	defer startupWindow.Stop()

	select {
	case err := <-errChan:
		if err != nil {
			s.log.Errorf("Stream encountered an error: %v", err)
			return err
		}
	case <-startupWindow.C:
		// If no error after a short delay, assume stream started successfully.
		// Keep draining the run result so that a failure after this point
		// is still reported instead of being dropped.
		go func() {
			if err := <-errChan; err != nil {
				s.log.Errorf("Stream encountered an error: %v", err)
			}
		}()
	}

	s.log.Debugf("Stream started successfully")
	return nil
}
Cognitive complexity: 15, Cyclomatic complexity: 8
func (*Stream) Stop
Stop cleans up the stream
// Stop cleans up the stream.
//
// When a stream is active it is asked to stop and given at most five seconds
// to do so. The outcome (success, error or timeout) is logged, and the
// wrapper then forgets the stream and its configuration in every case.
//
// Stopping is best effort: Stop always returns nil, including when the
// underlying stop fails or times out.
func (s *Stream) Stop() error {
	s.log.Printf("Stopping stream")

	// Work on a local copy so the goroutine below never reads s.stream
	// while this function sets it to nil on the timeout path.
	stream := s.stream
	if stream == nil {
		s.log.Printf("No active stream to stop")
		return nil
	}

	stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Buffered so the goroutine can finish even after a timeout.
	errChan := make(chan error, 1)
	go func() {
		errChan <- stream.Stop(stopCtx)
	}()

	select {
	case err := <-errChan:
		if err != nil {
			s.log.Printf("Error stopping stream: %v", err)
		} else {
			s.log.Printf("Stream stopped successfully")
		}
	case <-stopCtx.Done():
		s.log.Printf("Timeout while stopping stream")
	}

	s.streamConfig = ""
	s.stream = nil

	return nil
}
Cognitive complexity: 10, Cyclomatic complexity: 5
Private functions
func extractHTTPServerPaths
extractHTTPServerPaths is a helper function to extract HTTP server paths from a given configuration.
extractHTTPServerPaths (config map[string]interface{}) []string
func extractPaths
Helper function to extract paths from an http_server configuration
extractPaths (httpConfig map[string]interface{}) []string
func handleBroker
handleBroker is a helper function to handle broker configurations.
handleBroker (brokerConfig map[string]interface{}) []string
func createStream
createStream creates a new stream
createStream (streamID string, config map[string]interface{}) error
References: fmt.Sprintf.
func hasPath
hasPath (path string) bool
References: strings.TrimPrefix.
func initStreams
initStreams (r *http.Request, config *StreamsConfig)
References: http.Request, http.ResponseWriter, mux.NewRouter.
func removeStream
removeStream removes a stream
removeStream (streamID string) error
References: fmt.Errorf.
func setUpOrDryRunStream
setUpOrDryRunStream (streamConfig any, streamID string)
func getStreamsConfig
getStreamsConfig (r *http.Request) *StreamsConfig
func processStreamsConfig
processStreamsConfig (r *http.Request, streams map[string]any, config *StreamsConfig)
References: json.Marshal, json.Unmarshal.
func resetStream
resetStream (streamValue any)
func removeUnsafe
removeUnsafe (yamlBytes []byte) []byte
References: fmt.Sprintf, regexp.MustCompile, regexp.QuoteMeta, strings.Contains.
Tests
Files: 1. Third party imports: 2. Imports from organisation: 0. Tests: 3. Benchmarks: 0.