Documentation
¶
Index ¶
- Variables
- func EventuallyTrue(condition func() bool, timeout time.Duration, interval time.Duration) bool
- func IsRunningInCI() bool
- func NewProductionRateLimiter() *rate.Limiter
- func NewTestRateLimiter() *rate.Limiter
- func NewUnlimitedRateLimiter() *rate.Limiter
- func RunFastTest(t *testing.T, testFunc func(*MinimalTestHelper))
- func RunHeavyTest(t *testing.T, testFunc func(*MinimalTestHelper))
- func RunMediumTest(t *testing.T, testFunc func(*MinimalTestHelper))
- func TestMain(m *testing.M)
- func WithBudgetAwareScaling(testName string) (numGoroutines, operationsPerGoroutine int, shouldSkip bool, reason string)
- func WithReliableTimeout(t *testing.T, timeout time.Duration, testFunc func(context.Context))
- func WithResourceControl(t *testing.T, testName string, testFunc func())
- func WithSequentialExecution(t *testing.T, testName string, testFunc func())
- func WithTimeout(t *testing.T, timeout time.Duration, testFunc func(context.Context))
- type AdaptiveOptimizer
- type AuthContext
- type BackpressureConfig
- type BackpressureStats
- type BufferPool
- type CompressedMessage
- type CompressionConfig
- type CompressionManager
- func (cm *CompressionManager) CompressMessage(data []byte, messageType int) (*CompressedMessage, error)
- func (cm *CompressionManager) CreateUpgrader(baseUpgrader *websocket.Upgrader) *websocket.Upgrader
- func (cm *CompressionManager) DecompressMessage(data []byte, compressed bool) ([]byte, error)
- func (cm *CompressionManager) GetCompressionExtensions() []string
- func (cm *CompressionManager) GetCompressionRatio(data []byte) float64
- func (cm *CompressionManager) GetStats() *CompressionStats
- func (cm *CompressionManager) IsCompressionBeneficial(data []byte) bool
- func (cm *CompressionManager) ResetStats()
- func (cm *CompressionManager) Shutdown()
- func (cm *CompressionManager) SupportsExtension(extension string) bool
- type CompressionMiddleware
- type CompressionStats
- type ConcurrencyConfig
- type Connection
- func (c *Connection) Close() error
- func (c *Connection) Connect(ctx context.Context) error
- func (c *Connection) Disconnect() error
- func (c *Connection) ForceConnectionCheck()
- func (c *Connection) GetHeartbeat() *HeartbeatManager
- func (c *Connection) GetLastConnected() time.Time
- func (c *Connection) GetMetrics() ConnectionMetrics
- func (c *Connection) GetReconnectAttempts() int32
- func (c *Connection) GetURL() string
- func (c *Connection) IsConnected() bool
- func (c *Connection) IsReconnecting() bool
- func (c *Connection) LastError() error
- func (c *Connection) SendMessage(ctx context.Context, message []byte) error
- func (c *Connection) SendMessageSync(ctx context.Context, message []byte) error
- func (c *Connection) SetOnConnect(handler func())
- func (c *Connection) SetOnDisconnect(handler func(error))
- func (c *Connection) SetOnError(handler func(error))
- func (c *Connection) SetOnMessage(handler func([]byte))
- func (c *Connection) StartAutoReconnect(ctx context.Context)
- func (c *Connection) State() ConnectionState
- func (c *Connection) WaitForMessages(ctx context.Context, expectedCount int64) error
- type ConnectionBudget
- type ConnectionConfig
- type ConnectionMetrics
- type ConnectionPool
- func (p *ConnectionPool) FastStop() error
- func (p *ConnectionPool) GetActiveConnectionCount() int
- func (p *ConnectionPool) GetConnection(ctx context.Context) (*Connection, error)
- func (p *ConnectionPool) GetDetailedStatus() map[string]interface{}
- func (p *ConnectionPool) GetHealthyConnectionCount() int
- func (p *ConnectionPool) SendMessage(ctx context.Context, message []byte) error
- func (p *ConnectionPool) SetMessageHandler(handler func(data []byte))
- func (p *ConnectionPool) SetOnConnectionStateChange(handler func(connID string, state ConnectionState))
- func (p *ConnectionPool) SetOnHealthChange(handler func(connID string, healthy bool))
- func (p *ConnectionPool) Start(ctx context.Context) error
- func (p *ConnectionPool) Stats() PoolStats
- func (p *ConnectionPool) Stop() error
- type ConnectionPoolManager
- type ConnectionSlot
- type ConnectionState
- type DropActionType
- type EventHandler
- type EventHandlerWrapper
- type GoroutineInfo
- type GoroutineStats
- type HealthChecker
- type HeartbeatManager
- func (h *HeartbeatManager) GetConnectionHealth() float64
- func (h *HeartbeatManager) GetDetailedHealthStatus() map[string]interface{}
- func (h *HeartbeatManager) GetLastPingTime() time.Time
- func (h *HeartbeatManager) GetLastPongTime() time.Time
- func (h *HeartbeatManager) GetMissedPongCount() int32
- func (h *HeartbeatManager) GetPingPeriod() time.Duration
- func (h *HeartbeatManager) GetPongWait() time.Duration
- func (h *HeartbeatManager) GetState() HeartbeatState
- func (h *HeartbeatManager) GetStats() HeartbeatStats
- func (h *HeartbeatManager) IsHealthy() bool
- func (h *HeartbeatManager) OnPong()
- func (h *HeartbeatManager) Reset()
- func (h *HeartbeatManager) SetPingPeriod(period time.Duration)
- func (h *HeartbeatManager) SetPongWait(wait time.Duration)
- func (h *HeartbeatManager) Start(ctx context.Context)
- func (h *HeartbeatManager) Stop()
- type HeartbeatState
- type HeartbeatStats
- type ImprovedTransport
- func (t *ImprovedTransport) AddEventHandler(eventType string, handler EventHandler) string
- func (t *ImprovedTransport) GetStats() TransportStats
- func (t *ImprovedTransport) RemoveEventHandler(eventType string, handlerID string) error
- func (t *ImprovedTransport) SendEvent(ctx context.Context, event events.Event) error
- func (t *ImprovedTransport) Start(ctx context.Context) error
- func (t *ImprovedTransport) Stop() error
- func (t *ImprovedTransport) Subscribe(ctx context.Context, eventTypes []string, handler EventHandler) (*Subscription, error)
- func (t *ImprovedTransport) Unsubscribe(subscriptionID string) error
- type IsolatedTestRunner
- type JWTConfigExample
- type JWTTokenValidator
- func NewJWTTokenValidator(secretKey []byte, issuer string) *JWTTokenValidator
- func NewJWTTokenValidatorRSA(publicKey *rsa.PublicKey, issuer, audience string) *JWTTokenValidator
- func NewJWTTokenValidatorWithOptions(secretKey []byte, issuer, audience string, signingMethod jwt.SigningMethod) *JWTTokenValidator
- type LoadBalancingStrategy
- type LoadTestServer
- type MemoryManager
- func (mm *MemoryManager) AllocateBuffer(size int) []byte
- func (mm *MemoryManager) DeallocateBuffer(buf []byte)
- func (mm *MemoryManager) GetCurrentUsage() int64
- func (mm *MemoryManager) GetMemoryPressure() float64
- func (mm *MemoryManager) GetMonitoringInterval() time.Duration
- func (mm *MemoryManager) GetStats() map[string]int64
- func (mm *MemoryManager) Start(ctx context.Context, wg *sync.WaitGroup)
- func (mm *MemoryManager) TriggerCheck()
- type MessageBatcher
- type MessageSerializer
- type MetricsCollector
- func (mc *MetricsCollector) GetMetrics() *PerformanceMetrics
- func (mc *MetricsCollector) Start(ctx context.Context, wg *sync.WaitGroup)
- func (mc *MetricsCollector) TrackConnectionTime(duration time.Duration)
- func (mc *MetricsCollector) TrackError(errorType string)
- func (mc *MetricsCollector) TrackMessageLatency(latency time.Duration)
- func (mc *MetricsCollector) TrackMessageSize(size int)
- func (mc *MetricsCollector) TrackSerializationTime(duration time.Duration)
- type MinimalTestHelper
- type MinimalTestServer
- type MockEvent
- func (e *MockEvent) GetBaseEvent() *events.BaseEvent
- func (e *MockEvent) ID() string
- func (e *MockEvent) RunID() string
- func (e *MockEvent) SetTimestamp(timestamp int64)
- func (e *MockEvent) ThreadID() string
- func (e *MockEvent) Timestamp() *int64
- func (e *MockEvent) ToJSON() ([]byte, error)
- func (e *MockEvent) ToProtobuf() (*generated.Event, error)
- func (e *MockEvent) Type() events.EventType
- func (e *MockEvent) Validate() error
- type NoOpWSAuditLogger
- type OptimizedConnectionSlot
- type OptimizedTransport
- func (t *OptimizedTransport) AddEventHandlerOptimized(eventType string, handler EventHandler) string
- func (t *OptimizedTransport) RemoveEventHandlerOptimized(eventType string, handlerID string) error
- func (t *OptimizedTransport) SubscribeOptimized(ctx context.Context, eventTypes []string, handler EventHandler) (*Subscription, error)
- type PerfJSONSerializer
- type PerfOptimizedJSONSerializer
- type PerfProtobufSerializer
- type PerformanceConfig
- type PerformanceManager
- func (pm *PerformanceManager) BatchMessage(data []byte) error
- func (pm *PerformanceManager) FastStop() error
- func (pm *PerformanceManager) GetBuffer() []byte
- func (pm *PerformanceManager) GetConnectionSlot(ctx context.Context) (*ConnectionSlot, error)
- func (pm *PerformanceManager) GetMemoryUsage() int64
- func (pm *PerformanceManager) GetMetrics() *PerformanceMetrics
- func (pm *PerformanceManager) OptimizeMessage(event events.Event) ([]byte, error)
- func (pm *PerformanceManager) PutBuffer(buf []byte)
- func (pm *PerformanceManager) ReleaseConnectionSlot(slot *ConnectionSlot)
- func (pm *PerformanceManager) Start(ctx context.Context) error
- func (pm *PerformanceManager) Stop() error
- type PerformanceMetrics
- type PerformanceOptimizer
- type PoolConfig
- type PoolStats
- type Profiler
- type ReliableConnectionTester
- func (rt *ReliableConnectionTester) Cleanup()
- func (rt *ReliableConnectionTester) CreateConnection() *Connection
- func (rt *ReliableConnectionTester) TestConcurrentConnections(numConnections int, testFunc func(int, *Connection))
- func (rt *ReliableConnectionTester) TestConnection(testFunc func(*Connection))
- type ReliableMessageTester
- type ReliableTestServer
- func (s *ReliableTestServer) Close()
- func (s *ReliableTestServer) GetConnectionCount() int
- func (s *ReliableTestServer) GetStats() (connections, messages, errors int64)
- func (s *ReliableTestServer) SetDelay(delay time.Duration)
- func (s *ReliableTestServer) SetEchoMode(enabled bool)
- func (s *ReliableTestServer) URL() string
- type ResourceCleanupConfig
- type ResourceLimitedTest
- type SafeConnection
- func (sc *SafeConnection) Close() error
- func (sc *SafeConnection) IsClosed() bool
- func (sc *SafeConnection) ReadMessage() (messageType int, p []byte, err error)
- func (sc *SafeConnection) SetReadDeadline(t time.Time) error
- func (sc *SafeConnection) SetWriteDeadline(t time.Time) error
- func (sc *SafeConnection) WriteMessage(messageType int, data []byte) error
- type SecureConnection
- type SecurityConfig
- type SecurityEvent
- type SecurityManager
- func (sm *SecurityManager) CreateUpgrader() *websocket.Upgrader
- func (sm *SecurityManager) GetStats() map[string]interface{}
- func (sm *SecurityManager) SecureConnection(conn *websocket.Conn, authContext *AuthContext, r *http.Request) *SecureConnection
- func (sm *SecurityManager) Shutdown()
- func (sm *SecurityManager) ValidateUpgrade(w http.ResponseWriter, r *http.Request) (*AuthContext, error)
- type SequencedTestRunner
- type SerializerFactory
- type SerializerType
- type StaleConnectionError
- type Subscription
- type TestCategory
- type TestCleanupHelper
- func (h *TestCleanupHelper) CleanupAll()
- func (h *TestCleanupHelper) RegisterCleanupFunc(f func() error)
- func (h *TestCleanupHelper) RegisterConnection(conn *Connection)
- func (h *TestCleanupHelper) RegisterServer(server *ReliableTestServer)
- func (h *TestCleanupHelper) RegisterTransport(transport *Transport)
- type TestConfig
- type TestConnectionManager
- type TestResourceManager
- func (trm *TestResourceManager) AcquireBudget(testName string, requestedGoroutines int64) bool
- func (trm *TestResourceManager) AcquireHeavyTestSlot(t *testing.T, testName string) func()
- func (trm *TestResourceManager) GetBudgetStatus() (active, max int64, utilizationPercent float64)
- func (trm *TestResourceManager) ReleaseBudget(testName string)
- type TokenValidator
- type Transport
- func (t *Transport) AddEventHandler(eventType string, handler EventHandler) string
- func (t *Transport) CleanupEventHandlers()
- func (t *Transport) Close(ctx context.Context) error
- func (t *Transport) EnableAdaptiveOptimization()
- func (t *Transport) FastStop() error
- func (t *Transport) GetActiveConnectionCount() int
- func (t *Transport) GetBackpressureStats() BackpressureStats
- func (t *Transport) GetConnectionPoolStats() PoolStats
- func (t *Transport) GetDetailedStatus() map[string]interface{}
- func (t *Transport) GetEventHandlerCount() int
- func (t *Transport) GetEventHandlerStats() map[string]interface{}
- func (t *Transport) GetGoroutineStats() map[string]GoroutineStats
- func (t *Transport) GetHealthyConnectionCount() int
- func (t *Transport) GetMemoryUsage() int64
- func (t *Transport) GetPerformanceMetrics() *PerformanceMetrics
- func (t *Transport) GetSubscription(subscriptionID string) (*Subscription, error)
- func (t *Transport) IsConnected() bool
- func (t *Transport) ListSubscriptions() []*Subscription
- func (t *Transport) OnMemoryPressure(level int)
- func (t *Transport) OptimizeForLatency()
- func (t *Transport) OptimizeForMemory()
- func (t *Transport) OptimizeForThroughput()
- func (t *Transport) Ping(ctx context.Context) error
- func (t *Transport) RegisterCleanupFunc(cleanup func() error)
- func (t *Transport) RemoveEventHandler(eventType string, handlerID string) error
- func (t *Transport) ResetBackpressureStats()
- func (t *Transport) ResetForTesting() error
- func (t *Transport) SendEvent(ctx context.Context, event events.Event) error
- func (t *Transport) Start(ctx context.Context) error
- func (t *Transport) Stats() TransportStats
- func (t *Transport) Stop() error
- func (t *Transport) Subscribe(ctx context.Context, eventTypes []string, handler EventHandler) (*Subscription, error)
- func (t *Transport) Unsubscribe(subscriptionID string) error
- type TransportConfig
- type TransportStats
- type WSAuditLogger
- type ZeroCopyBuffer
Constants ¶
This section is empty.
Variables ¶
var (
ErrTokenExpired = errors.New("token is expired")
ErrTokenNotYetValid = errors.New("token is not valid yet")
ErrInvalidToken = errors.New("invalid token")
ErrEmptyToken = errors.New("empty token")
ErrInvalidIssuer = errors.New("invalid issuer")
ErrInvalidAudience = errors.New("invalid audience")
ErrMissingClaims = errors.New("missing required claims")
)
Common JWT validation errors
Functions ¶
func EventuallyTrue ¶
EventuallyTrue is a helper that checks a condition repeatedly
func IsRunningInCI ¶
func IsRunningInCI() bool
IsRunningInCI detects if we're running in a CI environment
func NewProductionRateLimiter ¶
NewProductionRateLimiter creates a rate limiter suitable for production use. It allows 100 messages per second with a burst of 10.
func NewTestRateLimiter ¶
NewTestRateLimiter creates a rate limiter suitable for testing scenarios. It allows 10,000 messages per second with a burst of 1000 for high-concurrency tests.
func NewUnlimitedRateLimiter ¶
NewUnlimitedRateLimiter creates a rate limiter with no practical limits. It is useful for load testing where rate limiting is not the focus.
func RunFastTest ¶
func RunFastTest(t *testing.T, testFunc func(*MinimalTestHelper))
RunFastTest runs a light test with minimal overhead
func RunHeavyTest ¶
func RunHeavyTest(t *testing.T, testFunc func(*MinimalTestHelper))
RunHeavyTest runs a resource-intensive test with proper sequencing
func RunMediumTest ¶
func RunMediumTest(t *testing.T, testFunc func(*MinimalTestHelper))
RunMediumTest runs a medium-complexity test with moderate resources
func WithBudgetAwareScaling ¶
func WithBudgetAwareScaling(testName string) (numGoroutines, operationsPerGoroutine int, shouldSkip bool, reason string)
WithBudgetAwareScaling provides budget-aware resource scaling for tests that don't use WithResourceControl. This is useful for tests that want to participate in resource budgeting without full resource control.
func WithReliableTimeout ¶
WithReliableTimeout wraps a test function with enhanced timeout, cleanup, and goroutine tracking
func WithResourceControl ¶
WithResourceControl wraps resource-intensive tests with resource management
func WithSequentialExecution ¶
WithSequentialExecution ensures that resource-intensive tests run one at a time. This prevents resource contention when running the full test suite.
Types ¶
type AdaptiveOptimizer ¶
type AdaptiveOptimizer struct {
// contains filtered or unexported fields
}
AdaptiveOptimizer automatically adjusts settings based on current performance
func NewAdaptiveOptimizer ¶
func NewAdaptiveOptimizer(manager *PerformanceManager) *AdaptiveOptimizer
NewAdaptiveOptimizer creates a new adaptive optimizer
func (*AdaptiveOptimizer) Disable ¶
func (ao *AdaptiveOptimizer) Disable()
Disable disables adaptive optimization
func (*AdaptiveOptimizer) Enable ¶
func (ao *AdaptiveOptimizer) Enable()
Enable enables adaptive optimization
func (*AdaptiveOptimizer) Start ¶
func (ao *AdaptiveOptimizer) Start(ctx context.Context, wg *sync.WaitGroup)
Start starts the adaptive optimizer
func (*AdaptiveOptimizer) TriggerAdaptation ¶
func (ao *AdaptiveOptimizer) TriggerAdaptation()
TriggerAdaptation manually triggers the adaptation process for testing
type AuthContext ¶
type AuthContext struct {
UserID string `json:"user_id"`
Username string `json:"username"`
Roles []string `json:"roles"`
Permissions []string `json:"permissions"`
ExpiresAt time.Time `json:"expires_at"`
Claims map[string]interface{} `json:"claims"`
}
AuthContext contains authentication information
type BackpressureConfig ¶
type BackpressureConfig struct {
// EventChannelBuffer is the buffer size for event channel
EventChannelBuffer int
// MaxDroppedEvents is the maximum number of events that can be dropped before taking action
MaxDroppedEvents int64
// DropActionType defines what to do when max dropped events is reached
DropActionType DropActionType
// EnableBackpressureLogging enables detailed logging of backpressure events
EnableBackpressureLogging bool
// BackpressureThresholdPercent is the percentage at which to start applying backpressure (0-100)
BackpressureThresholdPercent int
// EnableChannelMonitoring enables monitoring of channel usage
EnableChannelMonitoring bool
// MonitoringInterval is the interval for channel monitoring
MonitoringInterval time.Duration
}
BackpressureConfig configures backpressure behavior for WebSocket transport
func DefaultBackpressureConfig ¶
func DefaultBackpressureConfig() *BackpressureConfig
DefaultBackpressureConfig returns default backpressure configuration for WebSocket
type BackpressureStats ¶
type BackpressureStats struct {
DroppedEvents int64 `json:"dropped_events"`
BackpressureActive bool `json:"backpressure_active"`
EventChannelUsage float64 `json:"event_channel_usage_percent"`
LastDropTime time.Time `json:"last_drop_time"`
EventChannelCapacity int `json:"event_channel_capacity"`
ThresholdPercent int `json:"threshold_percent"`
}
BackpressureStats contains backpressure monitoring statistics
type BufferPool ¶
type BufferPool struct {
// contains filtered or unexported fields
}
BufferPool manages a pool of reusable buffers
func NewBufferPool ¶
func NewBufferPool(poolSize, maxSize int) *BufferPool
NewBufferPool creates a new buffer pool
func (*BufferPool) GetStats ¶
func (bp *BufferPool) GetStats() map[string]int64
GetStats returns buffer pool statistics
func (*BufferPool) Reset ¶
func (bp *BufferPool) Reset()
Reset clears all buffers from the pool for test isolation
type CompressedMessage ¶
type CompressedMessage struct {
Data []byte
OriginalSize int64
CompressedSize int64
CompressionRatio float64
IsCompressed bool
Extension string
}
CompressedMessage represents a compressed WebSocket message
type CompressionConfig ¶
type CompressionConfig struct {
// Enable compression
Enabled bool `json:"enabled"`
// Compression level (0-9, where 0 is no compression and 9 is best compression)
CompressionLevel int `json:"compression_level"`
// Compression threshold - messages smaller than this won't be compressed
CompressionThreshold int64 `json:"compression_threshold"`
// Maximum compression ratio - reject messages that compress poorly
MaxCompressionRatio float64 `json:"max_compression_ratio"`
// Memory limits
MaxMemoryUsage int64 `json:"max_memory_usage"`
WindowBits int `json:"window_bits"` // LZ77 sliding window size
MemLevel int `json:"mem_level"` // Memory level for compression
// Performance settings
UsePooledCompressors bool `json:"use_pooled_compressors"`
CompressorPoolSize int `json:"compressor_pool_size"`
// Fallback settings
FallbackEnabled bool `json:"fallback_enabled"` // Allow fallback to uncompressed
AutoDetectSupport bool `json:"auto_detect_support"` // Auto-detect client support
// Monitoring
CollectStatistics bool `json:"collect_statistics"`
StatisticsInterval time.Duration `json:"statistics_interval"`
}
CompressionConfig defines compression settings
func DefaultCompressionConfig ¶
func DefaultCompressionConfig() *CompressionConfig
DefaultCompressionConfig returns default compression settings
type CompressionManager ¶
type CompressionManager struct {
// contains filtered or unexported fields
}
CompressionManager handles WebSocket message compression
func NewCompressionManager ¶
func NewCompressionManager(config *CompressionConfig) *CompressionManager
NewCompressionManager creates a new compression manager
func (*CompressionManager) CompressMessage ¶
func (cm *CompressionManager) CompressMessage(data []byte, messageType int) (*CompressedMessage, error)
CompressMessage compresses a message if appropriate
func (*CompressionManager) CreateUpgrader ¶
func (cm *CompressionManager) CreateUpgrader(baseUpgrader *websocket.Upgrader) *websocket.Upgrader
CreateUpgrader creates an upgrader with compression support
func (*CompressionManager) DecompressMessage ¶
func (cm *CompressionManager) DecompressMessage(data []byte, compressed bool) ([]byte, error)
DecompressMessage decompresses a message if needed
func (*CompressionManager) GetCompressionExtensions ¶
func (cm *CompressionManager) GetCompressionExtensions() []string
GetCompressionExtensions returns supported compression extensions
func (*CompressionManager) GetCompressionRatio ¶
func (cm *CompressionManager) GetCompressionRatio(data []byte) float64
GetCompressionRatio estimates compression ratio for data
func (*CompressionManager) GetStats ¶
func (cm *CompressionManager) GetStats() *CompressionStats
GetStats returns compression statistics
func (*CompressionManager) IsCompressionBeneficial ¶
func (cm *CompressionManager) IsCompressionBeneficial(data []byte) bool
IsCompressionBeneficial determines if compression would be beneficial
func (*CompressionManager) ResetStats ¶
func (cm *CompressionManager) ResetStats()
ResetStats resets compression statistics
func (*CompressionManager) Shutdown ¶
func (cm *CompressionManager) Shutdown()
Shutdown gracefully shuts down the compression manager
func (*CompressionManager) SupportsExtension ¶
func (cm *CompressionManager) SupportsExtension(extension string) bool
SupportsExtension checks if an extension is supported
type CompressionMiddleware ¶
type CompressionMiddleware struct {
// contains filtered or unexported fields
}
CompressionMiddleware wraps a WebSocket connection with compression
func NewCompressionMiddleware ¶
func NewCompressionMiddleware(conn *websocket.Conn, manager *CompressionManager) *CompressionMiddleware
NewCompressionMiddleware creates a new compression middleware
func (*CompressionMiddleware) ReadMessage ¶
func (cm *CompressionMiddleware) ReadMessage() (messageType int, p []byte, err error)
ReadMessage reads a message with decompression
func (*CompressionMiddleware) WriteMessage ¶
func (cm *CompressionMiddleware) WriteMessage(messageType int, data []byte) error
WriteMessage writes a message with compression
type CompressionStats ¶
type CompressionStats struct {
// Message counts
TotalMessages int64 `json:"total_messages"`
CompressedMessages int64 `json:"compressed_messages"`
UncompressedMessages int64 `json:"uncompressed_messages"`
FailedCompressions int64 `json:"failed_compressions"`
// Byte counts
TotalBytesIn int64 `json:"total_bytes_in"`
TotalBytesOut int64 `json:"total_bytes_out"`
BytesSaved int64 `json:"bytes_saved"`
// Performance metrics
CompressionTime time.Duration `json:"compression_time"`
DecompressionTime time.Duration `json:"decompression_time"`
AverageCompressionRatio float64 `json:"average_compression_ratio"`
// Error tracking
CompressionErrors int64 `json:"compression_errors"`
DecompressionErrors int64 `json:"decompression_errors"`
// Memory usage
CurrentMemoryUsage int64 `json:"current_memory_usage"`
PeakMemoryUsage int64 `json:"peak_memory_usage"`
// contains filtered or unexported fields
}
CompressionStats holds compression statistics
type ConcurrencyConfig ¶
type ConcurrencyConfig struct {
NumGoroutines int
OperationsPerRoutine int
TimeoutScale float64
EnableSequential bool
}
ConcurrencyConfig holds the optimized concurrency configuration for tests, as returned by getConcurrencyConfig.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection represents a managed WebSocket connection
func CreateIsolatedConnection ¶
func CreateIsolatedConnection(t *testing.T, helper *TestCleanupHelper, config *ConnectionConfig) *Connection
CreateIsolatedConnection creates a connection with proper cleanup registration
func NewConnection ¶
func NewConnection(config *ConnectionConfig) (*Connection, error)
NewConnection creates a new managed WebSocket connection
func NewConnectionWithContext ¶
func NewConnectionWithContext(parentCtx context.Context, config *ConnectionConfig) (*Connection, error)
NewConnectionWithContext creates a new managed WebSocket connection with a parent context
func (*Connection) Close ¶
func (c *Connection) Close() error
Close permanently closes the connection and stops all goroutines
func (*Connection) Connect ¶
func (c *Connection) Connect(ctx context.Context) error
Connect establishes a WebSocket connection
func (*Connection) Disconnect ¶
func (c *Connection) Disconnect() error
Disconnect closes the WebSocket connection
func (*Connection) ForceConnectionCheck ¶
func (c *Connection) ForceConnectionCheck()
ForceConnectionCheck forces a connection check to detect disconnection
func (*Connection) GetHeartbeat ¶
func (c *Connection) GetHeartbeat() *HeartbeatManager
GetHeartbeat returns the heartbeat manager for this connection
func (*Connection) GetLastConnected ¶
func (c *Connection) GetLastConnected() time.Time
GetLastConnected returns the timestamp of the last successful connection
func (*Connection) GetMetrics ¶
func (c *Connection) GetMetrics() ConnectionMetrics
GetMetrics returns a copy of the connection metrics
func (*Connection) GetReconnectAttempts ¶
func (c *Connection) GetReconnectAttempts() int32
GetReconnectAttempts returns the current number of reconnection attempts
func (*Connection) IsConnected ¶
func (c *Connection) IsConnected() bool
IsConnected returns true if the connection is currently connected
func (*Connection) IsReconnecting ¶
func (c *Connection) IsReconnecting() bool
IsReconnecting returns true if the connection is currently reconnecting
func (*Connection) LastError ¶
func (c *Connection) LastError() error
LastError returns the last error encountered
func (*Connection) SendMessage ¶
func (c *Connection) SendMessage(ctx context.Context, message []byte) error
SendMessage sends a message through the WebSocket connection
func (*Connection) SendMessageSync ¶
func (c *Connection) SendMessageSync(ctx context.Context, message []byte) error
SendMessageSync sends a message synchronously and waits for it to be processed
func (*Connection) SetOnConnect ¶
func (c *Connection) SetOnConnect(handler func())
SetOnConnect sets the connect handler
func (*Connection) SetOnDisconnect ¶
func (c *Connection) SetOnDisconnect(handler func(error))
SetOnDisconnect sets the disconnect handler
func (*Connection) SetOnError ¶
func (c *Connection) SetOnError(handler func(error))
SetOnError sets the error handler
func (*Connection) SetOnMessage ¶
func (c *Connection) SetOnMessage(handler func([]byte))
SetOnMessage sets the message handler
func (*Connection) StartAutoReconnect ¶
func (c *Connection) StartAutoReconnect(ctx context.Context)
StartAutoReconnect starts automatic reconnection handling
func (*Connection) State ¶
func (c *Connection) State() ConnectionState
State returns the current connection state
func (*Connection) WaitForMessages ¶
func (c *Connection) WaitForMessages(ctx context.Context, expectedCount int64) error
WaitForMessages waits for all pending messages to be processed
type ConnectionBudget ¶
type ConnectionBudget struct {
// contains filtered or unexported fields
}
ConnectionBudget manages connection creation to prevent resource exhaustion
func NewConnectionBudget ¶
func NewConnectionBudget(maxConnections int) *ConnectionBudget
NewConnectionBudget creates a connection budget manager
func (*ConnectionBudget) AcquireConnection ¶
func (c *ConnectionBudget) AcquireConnection(ctx context.Context) error
AcquireConnection attempts to acquire a connection slot
func (*ConnectionBudget) GetActiveCount ¶
func (c *ConnectionBudget) GetActiveCount() int
GetActiveCount returns the number of active connections
func (*ConnectionBudget) ReleaseConnection ¶
func (c *ConnectionBudget) ReleaseConnection()
ReleaseConnection releases a connection slot
type ConnectionConfig ¶
type ConnectionConfig struct {
// URL is the WebSocket server URL
URL string
// MaxReconnectAttempts is the maximum number of reconnection attempts
// Set to 0 for unlimited retries
MaxReconnectAttempts int
// InitialReconnectDelay is the initial delay between reconnection attempts
InitialReconnectDelay time.Duration
// MaxReconnectDelay is the maximum delay between reconnection attempts
MaxReconnectDelay time.Duration
// ReconnectBackoffMultiplier is the multiplier for exponential backoff
ReconnectBackoffMultiplier float64
// DialTimeout is the timeout for establishing the connection
DialTimeout time.Duration
// HandshakeTimeout is the timeout for the WebSocket handshake
HandshakeTimeout time.Duration
// ReadTimeout is the timeout for reading messages
ReadTimeout time.Duration
// WriteTimeout is the timeout for writing messages
WriteTimeout time.Duration
// PingPeriod is the period between ping messages
PingPeriod time.Duration
// PongWait is the timeout for receiving pong messages
PongWait time.Duration
// MaxMessageSize is the maximum size of messages
MaxMessageSize int64
// WriteBufferSize is the size of the write buffer
WriteBufferSize int
// ReadBufferSize is the size of the read buffer
ReadBufferSize int
// EnableCompression enables message compression
EnableCompression bool
// Headers are additional headers to send during handshake
Headers map[string]string
// RateLimiter limits the rate of outgoing messages
RateLimiter *rate.Limiter
// Logger is the logger instance
Logger *zap.Logger
// TLSClientConfig is the TLS configuration for secure WebSocket connections
// If nil, default TLS settings are used
TLSClientConfig *tls.Config
}
ConnectionConfig contains configuration for WebSocket connections
func DefaultConnectionConfig ¶
func DefaultConnectionConfig() *ConnectionConfig
DefaultConnectionConfig returns a default configuration for WebSocket connections. It uses configurable timeouts that adapt to test/production environments.
type ConnectionMetrics ¶
type ConnectionMetrics struct {
ConnectAttempts int64
SuccessfulConnects int64
Disconnects int64
ReconnectAttempts int64
MessagesReceived int64
MessagesSent int64
BytesReceived int64
BytesSent int64
Errors int64
LastConnected time.Time
LastDisconnected time.Time
// contains filtered or unexported fields
}
ConnectionMetrics tracks connection statistics
type ConnectionPool ¶
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool manages a pool of WebSocket connections with load balancing
func NewConnectionPool ¶
func NewConnectionPool(config *PoolConfig) (*ConnectionPool, error)
NewConnectionPool creates a new connection pool
func (*ConnectionPool) FastStop ¶
func (p *ConnectionPool) FastStop() error
FastStop provides immediate shutdown for test scenarios
func (*ConnectionPool) GetActiveConnectionCount ¶
func (p *ConnectionPool) GetActiveConnectionCount() int
GetActiveConnectionCount returns the number of active connections
func (*ConnectionPool) GetConnection ¶
func (p *ConnectionPool) GetConnection(ctx context.Context) (*Connection, error)
GetConnection returns a connection based on the load balancing strategy
func (*ConnectionPool) GetDetailedStatus ¶
func (p *ConnectionPool) GetDetailedStatus() map[string]interface{}
GetDetailedStatus returns detailed status of all connections
func (*ConnectionPool) GetHealthyConnectionCount ¶
func (p *ConnectionPool) GetHealthyConnectionCount() int
GetHealthyConnectionCount returns the number of healthy connections
func (*ConnectionPool) SendMessage ¶
func (p *ConnectionPool) SendMessage(ctx context.Context, message []byte) error
SendMessage sends a message through the pool using load balancing
func (*ConnectionPool) SetMessageHandler ¶
func (p *ConnectionPool) SetMessageHandler(handler func(data []byte))
SetMessageHandler sets the message handler for all connections
func (*ConnectionPool) SetOnConnectionStateChange ¶
func (p *ConnectionPool) SetOnConnectionStateChange(handler func(connID string, state ConnectionState))
SetOnConnectionStateChange sets the connection state change handler
func (*ConnectionPool) SetOnHealthChange ¶
func (p *ConnectionPool) SetOnHealthChange(handler func(connID string, healthy bool))
SetOnHealthChange sets the health change handler
func (*ConnectionPool) Start ¶
func (p *ConnectionPool) Start(ctx context.Context) error
Start initializes the connection pool and establishes minimum connections
func (*ConnectionPool) Stats ¶
func (p *ConnectionPool) Stats() PoolStats
Stats returns a copy of the pool statistics
func (*ConnectionPool) Stop ¶
func (p *ConnectionPool) Stop() error
Stop gracefully shuts down the connection pool
type ConnectionPoolManager ¶
type ConnectionPoolManager struct {
// contains filtered or unexported fields
}
ConnectionPoolManager manages connection slots for optimal resource usage
func NewConnectionPoolManager ¶
func NewConnectionPoolManager(maxConnections int) *ConnectionPoolManager
NewConnectionPoolManager creates a new connection pool manager
func (*ConnectionPoolManager) AcquireSlot ¶
func (cpm *ConnectionPoolManager) AcquireSlot(ctx context.Context) (*ConnectionSlot, error)
AcquireSlot acquires a connection slot
func (*ConnectionPoolManager) GetStats ¶
func (cpm *ConnectionPoolManager) GetStats() map[string]int64
GetStats returns connection pool manager statistics
func (*ConnectionPoolManager) ReleaseSlot ¶
func (cpm *ConnectionPoolManager) ReleaseSlot(slot *ConnectionSlot)
ReleaseSlot releases a connection slot
type ConnectionSlot ¶
type ConnectionSlot struct {
ID string
AcquiredAt time.Time
InUse bool
// contains filtered or unexported fields
}
ConnectionSlot represents a connection slot
type ConnectionState ¶
type ConnectionState int32
ConnectionState represents the current state of a WebSocket connection
const (
// StateDisconnected indicates the connection is not active
StateDisconnected ConnectionState = iota
// StateConnecting indicates connection is being established
StateConnecting
// StateConnected indicates connection is active and healthy
StateConnected
// StateReconnecting indicates connection is being re-established
StateReconnecting
// StateClosing indicates connection is being closed
StateClosing
// StateClosed indicates connection is permanently closed
StateClosed
)
func (ConnectionState) String ¶
func (s ConnectionState) String() string
String returns the string representation of the connection state
type DropActionType ¶
type DropActionType int
DropActionType defines actions to take when events are dropped
const (
// DropActionLog logs dropped events but continues
DropActionLog DropActionType = iota
// DropActionReconnect attempts to reconnect
DropActionReconnect
// DropActionStop stops the transport
DropActionStop
// DropActionSlowDown applies flow control
DropActionSlowDown
)
type EventHandler ¶
EventHandler represents a function that handles events
type EventHandlerWrapper ¶
type EventHandlerWrapper struct {
ID string
Handler EventHandler
}
EventHandlerWrapper wraps an event handler with a unique ID
type GoroutineInfo ¶
type GoroutineInfo struct {
Name string
StartTime time.Time
LastSeen time.Time
Function string
Context context.Context
Cancel context.CancelFunc
}
GoroutineInfo tracks information about active goroutines
type GoroutineStats ¶
type GoroutineStats struct {
Name string `json:"name"`
StartTime time.Time `json:"start_time"`
LastSeen time.Time `json:"last_seen"`
Duration time.Duration `json:"duration"`
IdleTime time.Duration `json:"idle_time"`
}
GoroutineStats contains goroutine monitoring statistics
type HealthChecker ¶
type HealthChecker struct {
// contains filtered or unexported fields
}
HealthChecker monitors connection health and manages pool size
func NewHealthChecker ¶
func NewHealthChecker(pool *ConnectionPool, interval time.Duration) *HealthChecker
NewHealthChecker creates a new health checker
type HeartbeatManager ¶
type HeartbeatManager struct {
// contains filtered or unexported fields
}
HeartbeatManager manages the ping/pong heartbeat mechanism for WebSocket connections
func NewHeartbeatManager ¶
func NewHeartbeatManager(connection *Connection, pingPeriod, pongWait time.Duration) *HeartbeatManager
NewHeartbeatManager creates a new heartbeat manager
func (*HeartbeatManager) GetConnectionHealth ¶
func (h *HeartbeatManager) GetConnectionHealth() float64
GetConnectionHealth returns a health score between 0 and 1, where 1 = perfectly healthy and 0 = completely unhealthy.
func (*HeartbeatManager) GetDetailedHealthStatus ¶
func (h *HeartbeatManager) GetDetailedHealthStatus() map[string]interface{}
GetDetailedHealthStatus returns detailed health information
func (*HeartbeatManager) GetLastPingTime ¶
func (h *HeartbeatManager) GetLastPingTime() time.Time
GetLastPingTime returns the timestamp of the last sent ping
func (*HeartbeatManager) GetLastPongTime ¶
func (h *HeartbeatManager) GetLastPongTime() time.Time
GetLastPongTime returns the timestamp of the last received pong
func (*HeartbeatManager) GetMissedPongCount ¶
func (h *HeartbeatManager) GetMissedPongCount() int32
GetMissedPongCount returns the number of consecutive missed pongs
func (*HeartbeatManager) GetPingPeriod ¶
func (h *HeartbeatManager) GetPingPeriod() time.Duration
GetPingPeriod returns the ping period
func (*HeartbeatManager) GetPongWait ¶
func (h *HeartbeatManager) GetPongWait() time.Duration
GetPongWait returns the pong wait timeout
func (*HeartbeatManager) GetState ¶
func (h *HeartbeatManager) GetState() HeartbeatState
GetState returns the current heartbeat state
func (*HeartbeatManager) GetStats ¶
func (h *HeartbeatManager) GetStats() HeartbeatStats
GetStats returns a copy of the heartbeat statistics
func (*HeartbeatManager) IsHealthy ¶
func (h *HeartbeatManager) IsHealthy() bool
IsHealthy returns true if the connection is considered healthy
func (*HeartbeatManager) OnPong ¶
func (h *HeartbeatManager) OnPong()
OnPong is called when a pong message is received
func (*HeartbeatManager) Reset ¶
func (h *HeartbeatManager) Reset()
Reset resets the heartbeat state
func (*HeartbeatManager) SetPingPeriod ¶
func (h *HeartbeatManager) SetPingPeriod(period time.Duration)
SetPingPeriod updates the ping period
func (*HeartbeatManager) SetPongWait ¶
func (h *HeartbeatManager) SetPongWait(wait time.Duration)
SetPongWait updates the pong wait timeout
func (*HeartbeatManager) Start ¶
func (h *HeartbeatManager) Start(ctx context.Context)
Start begins the heartbeat mechanism
func (*HeartbeatManager) Stop ¶
func (h *HeartbeatManager) Stop()
Stop stops the heartbeat mechanism with enhanced aggressive shutdown timeout
type HeartbeatState ¶
type HeartbeatState int32
HeartbeatState represents the state of the heartbeat mechanism
const (
// HeartbeatStopped indicates the heartbeat is not running
HeartbeatStopped HeartbeatState = iota
// HeartbeatStarting indicates the heartbeat is starting
HeartbeatStarting
// HeartbeatRunning indicates the heartbeat is active
HeartbeatRunning
// HeartbeatStopping indicates the heartbeat is stopping
HeartbeatStopping
)
func (HeartbeatState) String ¶
func (s HeartbeatState) String() string
String returns the string representation of the heartbeat state
type HeartbeatStats ¶
type HeartbeatStats struct {
PingsSent int64
PongsReceived int64
MissedPongs int64
HealthChecks int64
UnhealthyPeriods int64
LastPingAt time.Time
LastPongAt time.Time
AverageRTT time.Duration
MinRTT time.Duration
MaxRTT time.Duration
// contains filtered or unexported fields
}
HeartbeatStats tracks heartbeat statistics
type ImprovedTransport ¶
type ImprovedTransport struct {
// contains filtered or unexported fields
}
ImprovedTransport implements the WebSocket transport with enhanced memory management
func NewImprovedTransport ¶
func NewImprovedTransport(config *TransportConfig) (*ImprovedTransport, error)
NewImprovedTransport creates a new WebSocket transport with improved memory management
func (*ImprovedTransport) AddEventHandler ¶
func (t *ImprovedTransport) AddEventHandler(eventType string, handler EventHandler) string
AddEventHandler adds an event handler for a specific event type and returns a handler ID
func (*ImprovedTransport) GetStats ¶
func (t *ImprovedTransport) GetStats() TransportStats
GetStats returns a copy of the transport statistics
func (*ImprovedTransport) RemoveEventHandler ¶
func (t *ImprovedTransport) RemoveEventHandler(eventType string, handlerID string) error
RemoveEventHandler removes an event handler by its ID
func (*ImprovedTransport) Start ¶
func (t *ImprovedTransport) Start(ctx context.Context) error
Start initializes the WebSocket transport
func (*ImprovedTransport) Stop ¶
func (t *ImprovedTransport) Stop() error
Stop gracefully shuts down the WebSocket transport
func (*ImprovedTransport) Subscribe ¶
func (t *ImprovedTransport) Subscribe(ctx context.Context, eventTypes []string, handler EventHandler) (*Subscription, error)
Subscribe creates a subscription for specific event types
func (*ImprovedTransport) Unsubscribe ¶
func (t *ImprovedTransport) Unsubscribe(subscriptionID string) error
Unsubscribe removes a subscription
type IsolatedTestRunner ¶
type IsolatedTestRunner struct {
// contains filtered or unexported fields
}
IsolatedTestRunner provides test isolation utilities
func NewIsolatedTestRunner ¶
func NewIsolatedTestRunner(t *testing.T) *IsolatedTestRunner
NewIsolatedTestRunner creates a new isolated test runner
func (*IsolatedTestRunner) GetCleanupHelper ¶
func (r *IsolatedTestRunner) GetCleanupHelper() *TestCleanupHelper
GetCleanupHelper returns the main cleanup helper
func (*IsolatedTestRunner) RunIsolated ¶
func (r *IsolatedTestRunner) RunIsolated(name string, timeout time.Duration, testFunc func(*TestCleanupHelper))
RunIsolated runs a test function with proper isolation
type JWTConfigExample ¶
type JWTConfigExample struct{}
JWTConfigExample shows how to configure JWT authentication for WebSocket transport
type JWTTokenValidator ¶
type JWTTokenValidator struct {
// contains filtered or unexported fields
}
JWTTokenValidator provides JWT token validation
func NewJWTTokenValidator ¶
func NewJWTTokenValidator(secretKey []byte, issuer string) *JWTTokenValidator
NewJWTTokenValidator creates a new JWT token validator
func NewJWTTokenValidatorRSA ¶
func NewJWTTokenValidatorRSA(publicKey *rsa.PublicKey, issuer, audience string) *JWTTokenValidator
NewJWTTokenValidatorRSA creates a new JWT token validator for RSA signatures
func NewJWTTokenValidatorWithOptions ¶
func NewJWTTokenValidatorWithOptions(secretKey []byte, issuer, audience string, signingMethod jwt.SigningMethod) *JWTTokenValidator
NewJWTTokenValidatorWithOptions creates a new JWT token validator with full options
func (*JWTTokenValidator) ValidateToken ¶
func (v *JWTTokenValidator) ValidateToken(ctx context.Context, tokenString string) (*AuthContext, error)
ValidateToken validates a JWT token
type LoadBalancingStrategy ¶
type LoadBalancingStrategy int
LoadBalancingStrategy defines different load balancing strategies
const (
// RoundRobin distributes requests evenly across connections
RoundRobin LoadBalancingStrategy = iota
// LeastConnections selects the connection with the fewest active requests
LeastConnections
// HealthBased selects the healthiest connection
HealthBased
// Random selects a random connection
Random
)
func (LoadBalancingStrategy) String ¶
func (s LoadBalancingStrategy) String() string
String returns the string representation of the load balancing strategy
type LoadTestServer ¶
type LoadTestServer struct {
// contains filtered or unexported fields
}
LoadTestServer provides a server for load testing scenarios
func NewLoadTestServer ¶
func NewLoadTestServer(t testing.TB) *LoadTestServer
func (*LoadTestServer) Close ¶
func (s *LoadTestServer) Close()
func (*LoadTestServer) GetStats ¶
func (s *LoadTestServer) GetStats() (connections, messages, errors int64)
func (*LoadTestServer) SetDropRate ¶
func (s *LoadTestServer) SetDropRate(rate float64)
func (*LoadTestServer) SetEchoMode ¶
func (s *LoadTestServer) SetEchoMode(enabled bool)
func (*LoadTestServer) URL ¶
func (s *LoadTestServer) URL() string
type MemoryManager ¶
type MemoryManager struct {
// contains filtered or unexported fields
}
MemoryManager manages memory usage and optimization
func NewMemoryManager ¶
func NewMemoryManager(maxMemory int64) *MemoryManager
NewMemoryManager creates a new memory manager
func (*MemoryManager) AllocateBuffer ¶
func (mm *MemoryManager) AllocateBuffer(size int) []byte
AllocateBuffer allocates a buffer with memory tracking
func (*MemoryManager) DeallocateBuffer ¶
func (mm *MemoryManager) DeallocateBuffer(buf []byte)
DeallocateBuffer deallocates a buffer
func (*MemoryManager) GetCurrentUsage ¶
func (mm *MemoryManager) GetCurrentUsage() int64
GetCurrentUsage returns current memory usage
func (*MemoryManager) GetMemoryPressure ¶
func (mm *MemoryManager) GetMemoryPressure() float64
GetMemoryPressure returns the current memory pressure percentage
func (*MemoryManager) GetMonitoringInterval ¶
func (mm *MemoryManager) GetMonitoringInterval() time.Duration
GetMonitoringInterval returns the current monitoring interval
func (*MemoryManager) GetStats ¶
func (mm *MemoryManager) GetStats() map[string]int64
GetStats returns memory manager statistics
func (*MemoryManager) Start ¶
func (mm *MemoryManager) Start(ctx context.Context, wg *sync.WaitGroup)
Start starts the memory manager with dynamic monitoring intervals
func (*MemoryManager) TriggerCheck ¶
func (mm *MemoryManager) TriggerCheck()
TriggerCheck triggers an immediate memory check and interval update
type MessageBatcher ¶
type MessageBatcher struct {
// contains filtered or unexported fields
}
MessageBatcher batches messages for optimized transmission
func NewMessageBatcher ¶
func NewMessageBatcher(batchSize int, batchTimeout time.Duration) *MessageBatcher
NewMessageBatcher creates a new message batcher
func (*MessageBatcher) AddMessage ¶
func (mb *MessageBatcher) AddMessage(data []byte) error
AddMessage adds a message to the batch
func (*MessageBatcher) Close ¶
func (mb *MessageBatcher) Close()
Close closes the message batcher and its channels
func (*MessageBatcher) GetBatch ¶
func (mb *MessageBatcher) GetBatch() [][]byte
GetBatch gets a batch of messages
func (*MessageBatcher) GetStats ¶
func (mb *MessageBatcher) GetStats() map[string]interface{}
GetStats returns batcher statistics
type MessageSerializer ¶
type MessageSerializer interface {
Serialize(event events.Event) ([]byte, error)
Deserialize(data []byte) (events.Event, error)
}
MessageSerializer defines interface for message serialization
type MetricsCollector ¶
type MetricsCollector struct {
// contains filtered or unexported fields
}
MetricsCollector collects and tracks performance metrics
func NewMetricsCollector ¶
func NewMetricsCollector(interval time.Duration) *MetricsCollector
NewMetricsCollector creates a new metrics collector
func (*MetricsCollector) GetMetrics ¶
func (mc *MetricsCollector) GetMetrics() *PerformanceMetrics
GetMetrics returns a copy of current metrics
func (*MetricsCollector) Start ¶
func (mc *MetricsCollector) Start(ctx context.Context, wg *sync.WaitGroup)
Start starts the metrics collector
func (*MetricsCollector) TrackConnectionTime ¶
func (mc *MetricsCollector) TrackConnectionTime(duration time.Duration)
TrackConnectionTime tracks connection establishment time
func (*MetricsCollector) TrackError ¶
func (mc *MetricsCollector) TrackError(errorType string)
TrackError tracks an error
func (*MetricsCollector) TrackMessageLatency ¶
func (mc *MetricsCollector) TrackMessageLatency(latency time.Duration)
TrackMessageLatency tracks message processing latency
func (*MetricsCollector) TrackMessageSize ¶
func (mc *MetricsCollector) TrackMessageSize(size int)
TrackMessageSize tracks message size
func (*MetricsCollector) TrackSerializationTime ¶
func (mc *MetricsCollector) TrackSerializationTime(duration time.Duration)
TrackSerializationTime tracks serialization time
type MinimalTestHelper ¶
type MinimalTestHelper struct {
// contains filtered or unexported fields
}
MinimalTestHelper provides the most basic resource management for tests
func NewMinimalTestHelper ¶
func NewMinimalTestHelper(t *testing.T) *MinimalTestHelper
NewMinimalTestHelper creates a minimal test helper
func (*MinimalTestHelper) Cleanup ¶
func (h *MinimalTestHelper) Cleanup()
Cleanup performs fast cleanup with aggressive timeouts
func (*MinimalTestHelper) CreateConnection ¶
func (h *MinimalTestHelper) CreateConnection(url string) *Connection
CreateConnection creates a minimal connection
func (*MinimalTestHelper) CreateServer ¶
func (h *MinimalTestHelper) CreateServer() *MinimalTestServer
CreateServer creates a minimal test server
type MinimalTestServer ¶
type MinimalTestServer struct {
// contains filtered or unexported fields
}
MinimalTestServer provides the simplest possible WebSocket test server
func NewMinimalTestServer ¶
func NewMinimalTestServer(t *testing.T) *MinimalTestServer
NewMinimalTestServer creates a minimal test server
func (*MinimalTestServer) URL ¶
func (s *MinimalTestServer) URL() string
URL returns the server's WebSocket URL
type MockEvent ¶
type MockEvent struct {
EventType events.EventType
Data interface{}
TimestampMs *int64 // Exported field for test initialization
ValidationFunc func() error // Optional validation function for testing
// contains filtered or unexported fields
}
MockEvent implements events.Event for websocket testing
func (*MockEvent) GetBaseEvent ¶
func (*MockEvent) SetTimestamp ¶
type NoOpWSAuditLogger ¶
type NoOpWSAuditLogger struct{}
NoOpWSAuditLogger provides a no-op audit logger for testing
func (*NoOpWSAuditLogger) LogSecurityEvent ¶
func (l *NoOpWSAuditLogger) LogSecurityEvent(ctx context.Context, event *SecurityEvent) error
LogSecurityEvent logs a security event (no-op)
type OptimizedConnectionSlot ¶
type OptimizedConnectionSlot struct {
ID string
Connection interface{}
InUse int32 // Use atomic operations instead of mutex
LastUsed int64 // Unix timestamp
}
OptimizedConnectionSlot reduces allocations in connection management
type OptimizedTransport ¶
type OptimizedTransport struct {
*Transport
// contains filtered or unexported fields
}
OptimizedTransport wraps the standard transport with performance optimizations
func NewOptimizedTransport ¶
func NewOptimizedTransport(config *TransportConfig) (*OptimizedTransport, error)
NewOptimizedTransport creates a new optimized transport
func (*OptimizedTransport) AddEventHandlerOptimized ¶
func (t *OptimizedTransport) AddEventHandlerOptimized(eventType string, handler EventHandler) string
AddEventHandlerOptimized adds an event handler with reduced allocations
func (*OptimizedTransport) RemoveEventHandlerOptimized ¶
func (t *OptimizedTransport) RemoveEventHandlerOptimized(eventType string, handlerID string) error
RemoveEventHandlerOptimized removes a handler with minimal allocations
func (*OptimizedTransport) SubscribeOptimized ¶
func (t *OptimizedTransport) SubscribeOptimized(ctx context.Context, eventTypes []string, handler EventHandler) (*Subscription, error)
SubscribeOptimized creates a subscription with reduced allocations
type PerfJSONSerializer ¶
type PerfJSONSerializer struct{}
PerfJSONSerializer implements standard JSON serialization
func (*PerfJSONSerializer) Deserialize ¶
func (js *PerfJSONSerializer) Deserialize(data []byte) (events.Event, error)
Deserialize deserializes JSON to an event
type PerfOptimizedJSONSerializer ¶
type PerfOptimizedJSONSerializer struct {
// contains filtered or unexported fields
}
PerfOptimizedJSONSerializer implements optimized JSON serialization
func (*PerfOptimizedJSONSerializer) Deserialize ¶
func (ojs *PerfOptimizedJSONSerializer) Deserialize(data []byte) (events.Event, error)
Deserialize deserializes optimized JSON to an event
type PerfProtobufSerializer ¶
type PerfProtobufSerializer struct{}
PerfProtobufSerializer implements Protocol Buffers serialization
func (*PerfProtobufSerializer) Deserialize ¶
func (ps *PerfProtobufSerializer) Deserialize(data []byte) (events.Event, error)
Deserialize deserializes Protocol Buffers to an event
type PerformanceConfig ¶
type PerformanceConfig struct {
// MaxConcurrentConnections is the maximum number of concurrent connections
MaxConcurrentConnections int
// MessageBatchSize is the number of messages to batch together
MessageBatchSize int
// MessageBatchTimeout is the timeout for message batching
MessageBatchTimeout time.Duration
// BufferPoolSize is the size of the buffer pool
BufferPoolSize int
// MaxBufferSize is the maximum size of individual buffers
MaxBufferSize int
// EnableZeroCopy enables zero-copy operations where possible
EnableZeroCopy bool
// EnableMemoryPooling enables memory pool management
EnableMemoryPooling bool
// EnableProfiling enables CPU and memory profiling
EnableProfiling bool
// ProfilingInterval is the interval for profiling snapshots
ProfilingInterval time.Duration
// MaxLatency is the maximum acceptable latency
MaxLatency time.Duration
// MaxMemoryUsage is the maximum memory usage for the given connection count
MaxMemoryUsage int64
// EnableMetrics enables performance metrics collection
EnableMetrics bool
// MetricsInterval is the interval for metrics collection
MetricsInterval time.Duration
// MessageSerializerType defines the serialization method to use
MessageSerializerType SerializerType
// Logger is the logger instance
Logger *zap.Logger
}
PerformanceConfig contains configuration for performance optimizations
func DefaultPerformanceConfig ¶
func DefaultPerformanceConfig() *PerformanceConfig
DefaultPerformanceConfig returns a default performance configuration. It uses configurable timeouts that adapt to test/production environments.
func HighConcurrencyPerformanceConfig ¶
func HighConcurrencyPerformanceConfig() *PerformanceConfig
HighConcurrencyPerformanceConfig returns a performance configuration optimized for high concurrency testing. It uses configurable timeouts that adapt to test/production environments.
type PerformanceManager ¶
type PerformanceManager struct {
// contains filtered or unexported fields
}
PerformanceManager manages performance optimizations for WebSocket transport
func NewPerformanceManager ¶
func NewPerformanceManager(config *PerformanceConfig) (*PerformanceManager, error)
NewPerformanceManager creates a new performance manager
func (*PerformanceManager) BatchMessage ¶
func (pm *PerformanceManager) BatchMessage(data []byte) error
BatchMessage adds a message to the batch for optimized transmission
func (*PerformanceManager) FastStop ¶
func (pm *PerformanceManager) FastStop() error
FastStop provides immediate shutdown for test scenarios
func (*PerformanceManager) GetBuffer ¶
func (pm *PerformanceManager) GetBuffer() []byte
GetBuffer gets a buffer from the pool
func (*PerformanceManager) GetConnectionSlot ¶
func (pm *PerformanceManager) GetConnectionSlot(ctx context.Context) (*ConnectionSlot, error)
GetConnectionSlot acquires a connection slot
func (*PerformanceManager) GetMemoryUsage ¶
func (pm *PerformanceManager) GetMemoryUsage() int64
GetMemoryUsage returns current memory usage
func (*PerformanceManager) GetMetrics ¶
func (pm *PerformanceManager) GetMetrics() *PerformanceMetrics
GetMetrics returns performance metrics
func (*PerformanceManager) OptimizeMessage ¶
func (pm *PerformanceManager) OptimizeMessage(event events.Event) ([]byte, error)
OptimizeMessage optimizes a message for transmission
func (*PerformanceManager) PutBuffer ¶
func (pm *PerformanceManager) PutBuffer(buf []byte)
PutBuffer returns a buffer to the pool
func (*PerformanceManager) ReleaseConnectionSlot ¶
func (pm *PerformanceManager) ReleaseConnectionSlot(slot *ConnectionSlot)
ReleaseConnectionSlot releases a connection slot
func (*PerformanceManager) Start ¶
func (pm *PerformanceManager) Start(ctx context.Context) error
Start initializes the performance manager
func (*PerformanceManager) Stop ¶
func (pm *PerformanceManager) Stop() error
Stop gracefully shuts down the performance manager
type PerformanceMetrics ¶
type PerformanceMetrics struct {
// Connection metrics
ActiveConnections int64
TotalConnections int64
ConnectionsPerSecond float64
AvgConnectionTime time.Duration
// Message metrics
MessagesPerSecond float64
MessagesSent int64
MessagesReceived int64
MessagesFailures int64
AvgMessageSize float64
// Latency metrics
AvgLatency time.Duration
MinLatency time.Duration
MaxLatency time.Duration
P95Latency time.Duration
P99Latency time.Duration
// Throughput metrics
BytesPerSecond float64
TotalBytesSent int64
TotalBytesReceived int64
// Memory metrics
MemoryUsage int64
BufferPoolUsage int64
GCPauses int64
AvgGCPause time.Duration
// Serialization metrics
SerializationTime time.Duration
SerializationPerSec float64
SerializationFailures int64
// Error metrics
ErrorRate float64
TotalErrors int64
ConnectionErrors int64
TimeoutErrors int64
// System metrics
CPUUsage float64
GoroutineCount int64
HeapSize int64
StackSize int64
// Timestamps
StartTime time.Time
LastUpdate time.Time
Uptime time.Duration
}
PerformanceMetrics contains various performance metrics
type PerformanceOptimizer ¶
type PerformanceOptimizer struct {
// contains filtered or unexported fields
}
PerformanceOptimizer provides high-level optimization methods
func NewPerformanceOptimizer ¶
func NewPerformanceOptimizer(manager *PerformanceManager) *PerformanceOptimizer
NewPerformanceOptimizer creates a new performance optimizer
func (*PerformanceOptimizer) OptimizeForLatency ¶
func (po *PerformanceOptimizer) OptimizeForLatency()
OptimizeForLatency optimizes configuration for minimum latency
func (*PerformanceOptimizer) OptimizeForMemory ¶
func (po *PerformanceOptimizer) OptimizeForMemory()
OptimizeForMemory optimizes configuration for minimum memory usage
func (*PerformanceOptimizer) OptimizeForThroughput ¶
func (po *PerformanceOptimizer) OptimizeForThroughput()
OptimizeForThroughput optimizes configuration for maximum throughput
type PoolConfig ¶
type PoolConfig struct {
// MinConnections is the minimum number of connections to maintain
MinConnections int
// MaxConnections is the maximum number of connections allowed
MaxConnections int
// ConnectionTimeout is the timeout for establishing connections
ConnectionTimeout time.Duration
// HealthCheckInterval is the interval for health checks
HealthCheckInterval time.Duration
// IdleTimeout is the timeout for idle connections
IdleTimeout time.Duration
// MaxIdleConnections is the maximum number of idle connections
MaxIdleConnections int
// LoadBalancingStrategy defines how connections are selected
LoadBalancingStrategy LoadBalancingStrategy
// ConnectionTemplate is the template configuration for new connections
ConnectionTemplate *ConnectionConfig
// URLs are the WebSocket URLs to connect to
URLs []string
// Logger is the logger instance
Logger *zap.Logger
}
PoolConfig contains configuration for the connection pool
func DefaultPoolConfig ¶
func DefaultPoolConfig() *PoolConfig
DefaultPoolConfig returns a default configuration for the connection pool
type PoolStats ¶
type PoolStats struct {
TotalConnections int64
ActiveConnections int64
IdleConnections int64
HealthyConnections int64
UnhealthyConnections int64
TotalRequests int64
FailedRequests int64
TotalBytesReceived int64
TotalBytesSent int64
AverageResponseTime time.Duration
// contains filtered or unexported fields
}
PoolStats tracks connection pool statistics
type Profiler ¶
type Profiler struct {
// contains filtered or unexported fields
}
Profiler handles CPU and memory profiling
func NewProfiler ¶
NewProfiler creates a new profiler
func (*Profiler) GetProfilingData ¶
GetProfilingData returns current profiling data
type ReliableConnectionTester ¶
type ReliableConnectionTester struct {
// contains filtered or unexported fields
}
ReliableConnectionTester provides utilities for testing WebSocket connections reliably
func NewReliableConnectionTester ¶
func NewReliableConnectionTester(t *testing.T) *ReliableConnectionTester
NewReliableConnectionTester creates a new connection tester
func (*ReliableConnectionTester) Cleanup ¶
func (rt *ReliableConnectionTester) Cleanup()
Cleanup cleans up test resources
func (*ReliableConnectionTester) CreateConnection ¶
func (rt *ReliableConnectionTester) CreateConnection() *Connection
CreateConnection creates a new connection with the test configuration
func (*ReliableConnectionTester) TestConcurrentConnections ¶
func (rt *ReliableConnectionTester) TestConcurrentConnections(numConnections int, testFunc func(int, *Connection))
TestConcurrentConnections tests multiple connections concurrently
func (*ReliableConnectionTester) TestConnection ¶
func (rt *ReliableConnectionTester) TestConnection(testFunc func(*Connection))
TestConnection tests a connection with retry logic
type ReliableMessageTester ¶
type ReliableMessageTester struct {
// contains filtered or unexported fields
}
ReliableMessageTester helps test message sending/receiving reliably
func NewReliableMessageTester ¶
func NewReliableMessageTester(conn *Connection) *ReliableMessageTester
NewReliableMessageTester creates a new message tester
func (*ReliableMessageTester) SendAndVerify ¶
func (mt *ReliableMessageTester) SendAndVerify(ctx context.Context, message []byte, timeout time.Duration) error
SendAndVerify sends a message and verifies it's echoed back
func (*ReliableMessageTester) Stop ¶
func (mt *ReliableMessageTester) Stop()
Stop stops the message tester
type ReliableTestServer ¶
type ReliableTestServer struct {
// contains filtered or unexported fields
}
ReliableTestServer provides a WebSocket server designed for reliable testing
func CreateIsolatedServer ¶
func CreateIsolatedServer(t *testing.T, helper *TestCleanupHelper) *ReliableTestServer
CreateIsolatedServer creates a server with proper cleanup registration
func NewReliableTestServer ¶
func NewReliableTestServer(t *testing.T) *ReliableTestServer
NewReliableTestServer creates a new reliable test server
func (*ReliableTestServer) Close ¶
func (s *ReliableTestServer) Close()
Close gracefully shuts down the server
func (*ReliableTestServer) GetConnectionCount ¶
func (s *ReliableTestServer) GetConnectionCount() int
GetConnectionCount returns the current number of active connections
func (*ReliableTestServer) GetStats ¶
func (s *ReliableTestServer) GetStats() (connections, messages, errors int64)
GetStats returns current server statistics
func (*ReliableTestServer) SetDelay ¶
func (s *ReliableTestServer) SetDelay(delay time.Duration)
SetDelay sets artificial delay for message processing
func (*ReliableTestServer) SetEchoMode ¶
func (s *ReliableTestServer) SetEchoMode(enabled bool)
SetEchoMode enables or disables echo mode
func (*ReliableTestServer) URL ¶
func (s *ReliableTestServer) URL() string
URL returns the WebSocket URL
type ResourceCleanupConfig ¶
type ResourceCleanupConfig struct {
// EnableGoroutineTracking enables tracking of goroutines
EnableGoroutineTracking bool
// CleanupInterval is the interval for resource cleanup
CleanupInterval time.Duration
// MaxGoroutineIdleTime is the maximum time a goroutine can be idle before cleanup
MaxGoroutineIdleTime time.Duration
// EnableResourceMonitoring enables monitoring of resource usage
EnableResourceMonitoring bool
}
ResourceCleanupConfig configures resource cleanup behavior
func DefaultResourceCleanupConfig ¶
func DefaultResourceCleanupConfig() *ResourceCleanupConfig
DefaultResourceCleanupConfig returns default resource cleanup configuration
type ResourceLimitedTest ¶
type ResourceLimitedTest struct {
// contains filtered or unexported fields
}
ResourceLimitedTest manages test resource consumption to prevent system exhaustion
func DefaultResourceLimits ¶
func DefaultResourceLimits() *ResourceLimitedTest
DefaultResourceLimits returns reasonable limits for websocket tests
func NewResourceLimitedTest ¶
func NewResourceLimitedTest(maxGoroutines int, maxMemoryMB int64) *ResourceLimitedTest
NewResourceLimitedTest creates a resource-limited test manager
func (*ResourceLimitedTest) RunWithLimits ¶
RunWithLimits executes a test function with resource limits
type SafeConnection ¶
type SafeConnection struct {
// contains filtered or unexported fields
}
SafeConnection wraps a WebSocket connection with additional safety checks
func NewSafeConnection ¶
func NewSafeConnection(conn *websocket.Conn) *SafeConnection
NewSafeConnection creates a new safe connection wrapper
func (*SafeConnection) Close ¶
func (sc *SafeConnection) Close() error
Close closes the connection safely
func (*SafeConnection) IsClosed ¶
func (sc *SafeConnection) IsClosed() bool
IsClosed returns true if the connection is closed
func (*SafeConnection) ReadMessage ¶
func (sc *SafeConnection) ReadMessage() (messageType int, p []byte, err error)
ReadMessage safely reads a message from the connection
func (*SafeConnection) SetReadDeadline ¶
func (sc *SafeConnection) SetReadDeadline(t time.Time) error
SetReadDeadline sets the read deadline
func (*SafeConnection) SetWriteDeadline ¶
func (sc *SafeConnection) SetWriteDeadline(t time.Time) error
SetWriteDeadline sets the write deadline
func (*SafeConnection) WriteMessage ¶
func (sc *SafeConnection) WriteMessage(messageType int, data []byte) error
WriteMessage safely writes a message to the connection
type SecureConnection ¶
type SecureConnection struct {
// contains filtered or unexported fields
}
SecureConnection represents a secured WebSocket connection
func (*SecureConnection) ValidateMessage ¶
func (sc *SecureConnection) ValidateMessage(messageType int, data []byte) error
ValidateMessage validates an incoming message
type SecurityConfig ¶
type SecurityConfig struct {
// Authentication
RequireAuth bool `json:"require_auth"`
AuthTimeout time.Duration `json:"auth_timeout"`
TokenValidator TokenValidator `json:"-"`
// Origin validation
AllowedOrigins []string `json:"allowed_origins"`
StrictOriginCheck bool `json:"strict_origin_check"`
// Subprotocol negotiation
SupportedProtocols []string `json:"supported_protocols"`
RequireSubprotocol bool `json:"require_subprotocol"`
// Rate limiting
GlobalRateLimit float64 `json:"global_rate_limit"` // requests per second
ClientRateLimit float64 `json:"client_rate_limit"` // requests per second per client
ClientBurstSize int `json:"client_burst_size"` // burst size for client rate limiting
MaxConnections int `json:"max_connections"` // maximum concurrent connections
// TLS/SSL settings
TLSConfig *tls.Config `json:"-"`
RequireTLS bool `json:"require_tls"`
MinTLSVersion uint16 `json:"min_tls_version"`
CertFile string `json:"cert_file"`
KeyFile string `json:"key_file"`
// Attack protection
MaxMessageSize int64 `json:"max_message_size"` // maximum message size in bytes
MaxFrameSize int64 `json:"max_frame_size"` // maximum frame size in bytes
ReadDeadline time.Duration `json:"read_deadline"` // read timeout
WriteDeadline time.Duration `json:"write_deadline"` // write timeout
PingInterval time.Duration `json:"ping_interval"` // ping interval for keepalive
PongTimeout time.Duration `json:"pong_timeout"` // pong timeout
// Audit logging
AuditLogger WSAuditLogger `json:"-"`
LogConnections bool `json:"log_connections"`
LogMessages bool `json:"log_messages"`
LogSecurityEvents bool `json:"log_security_events"`
}
SecurityConfig defines WebSocket security configuration
func DefaultSecurityConfig ¶
func DefaultSecurityConfig() *SecurityConfig
DefaultSecurityConfig returns a secure default configuration. It uses configurable timeouts that adapt to test/production environments.
func ExampleHMACConfig ¶
func ExampleHMACConfig() (*SecurityConfig, error)
ExampleHMACConfig demonstrates configuring JWT validation with HMAC (symmetric key)
func ExampleRSAConfig ¶
func ExampleRSAConfig() (*SecurityConfig, error)
ExampleRSAConfig demonstrates configuring JWT validation with RSA (asymmetric key)
type SecurityEvent ¶
type SecurityEvent struct {
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
ClientIP string `json:"client_ip"`
UserAgent string `json:"user_agent"`
UserID string `json:"user_id,omitempty"`
Details map[string]interface{} `json:"details,omitempty"`
Severity string `json:"severity"`
Message string `json:"message"`
}
SecurityEvent represents a security-related event
type SecurityManager ¶
type SecurityManager struct {
// contains filtered or unexported fields
}
SecurityManager handles WebSocket security enforcement
func NewSecurityManager ¶
func NewSecurityManager(config *SecurityConfig) *SecurityManager
NewSecurityManager creates a new WebSocket security manager
func (*SecurityManager) CreateUpgrader ¶
func (sm *SecurityManager) CreateUpgrader() *websocket.Upgrader
CreateUpgrader creates a configured WebSocket upgrader
func (*SecurityManager) GetStats ¶
func (sm *SecurityManager) GetStats() map[string]interface{}
GetStats returns security statistics
func (*SecurityManager) SecureConnection ¶
func (sm *SecurityManager) SecureConnection(conn *websocket.Conn, authContext *AuthContext, r *http.Request) *SecureConnection
SecureConnection wraps a WebSocket connection with security features
func (*SecurityManager) Shutdown ¶
func (sm *SecurityManager) Shutdown()
Shutdown gracefully shuts down the security manager
func (*SecurityManager) ValidateUpgrade ¶
func (sm *SecurityManager) ValidateUpgrade(w http.ResponseWriter, r *http.Request) (*AuthContext, error)
ValidateUpgrade validates a WebSocket upgrade request
type SequencedTestRunner ¶
type SequencedTestRunner struct {
// contains filtered or unexported fields
}
SequencedTestRunner manages test execution to prevent resource conflicts
func NewSequencedTestRunner ¶
func NewSequencedTestRunner(t *testing.T, category TestCategory, timeout time.Duration) *SequencedTestRunner
NewSequencedTestRunner creates a test runner with proper sequencing
func (*SequencedTestRunner) Run ¶
func (r *SequencedTestRunner) Run(testFunc func(*MinimalTestHelper))
Run executes the test with proper sequencing and resource management
type SerializerFactory ¶
type SerializerFactory struct {
// contains filtered or unexported fields
}
SerializerFactory creates message serializers
func NewSerializerFactory ¶
func NewSerializerFactory(serializerType SerializerType) *SerializerFactory
NewSerializerFactory creates a new serializer factory
func (*SerializerFactory) GetSerializer ¶
func (sf *SerializerFactory) GetSerializer() MessageSerializer
GetSerializer gets a serializer from the pool
func (*SerializerFactory) PutSerializer ¶
func (sf *SerializerFactory) PutSerializer(serializer MessageSerializer)
PutSerializer returns a serializer to the pool
type SerializerType ¶
type SerializerType int
SerializerType defines different serialization methods
const (
// JSONSerializer uses standard JSON serialization
JSONSerializer SerializerType = iota
// OptimizedJSONSerializer uses optimized JSON serialization
OptimizedJSONSerializer
// ProtobufSerializer uses Protocol Buffers serialization
ProtobufSerializer
)
func (SerializerType) String ¶
func (s SerializerType) String() string
String returns the string representation of the serializer type
type StaleConnectionError ¶
type StaleConnectionError struct {
Generation int64
}
StaleConnectionError indicates that a connection reference has become stale
func (*StaleConnectionError) Error ¶
func (e *StaleConnectionError) Error() string
type Subscription ¶
type Subscription struct {
ID string
EventTypes []string
Handler EventHandler
HandlerIDs []string // Track handler IDs for reliable removal
Context context.Context
Cancel context.CancelFunc
CreatedAt time.Time
LastEventAt time.Time
EventCount int64
// contains filtered or unexported fields
}
Subscription represents an event subscription
type TestCategory ¶
type TestCategory int
TestCategory defines the resource intensity of a test
const (
LightTest TestCategory = iota // Basic unit tests
MediumTest // Integration tests
HeavyTest // Load/performance tests
)
type TestCleanupHelper ¶
type TestCleanupHelper struct {
// contains filtered or unexported fields
}
TestCleanupHelper provides aggressive cleanup utilities for tests
func NewTestCleanupHelper ¶
func NewTestCleanupHelper(t *testing.T) *TestCleanupHelper
NewTestCleanupHelper creates a new cleanup helper
func (*TestCleanupHelper) CleanupAll ¶
func (h *TestCleanupHelper) CleanupAll()
CleanupAll performs aggressive cleanup of all registered resources with enhanced goroutine termination
func (*TestCleanupHelper) RegisterCleanupFunc ¶
func (h *TestCleanupHelper) RegisterCleanupFunc(f func() error)
RegisterCleanupFunc registers a custom cleanup function
func (*TestCleanupHelper) RegisterConnection ¶
func (h *TestCleanupHelper) RegisterConnection(conn *Connection)
RegisterConnection registers a connection for cleanup
func (*TestCleanupHelper) RegisterServer ¶
func (h *TestCleanupHelper) RegisterServer(server *ReliableTestServer)
RegisterServer registers a server for cleanup
func (*TestCleanupHelper) RegisterTransport ¶
func (h *TestCleanupHelper) RegisterTransport(transport *Transport)
RegisterTransport registers a transport for cleanup
type TestConfig ¶
TestConfig defines test timeout configurations
func FastTestConfig ¶
func FastTestConfig() TestConfig
FastTestConfig returns optimized timeout configurations for faster test execution
type TestConnectionManager ¶
type TestConnectionManager struct {
// contains filtered or unexported fields
}
TestConnectionManager helps manage connections in tests to prevent leaks
func NewTestConnectionManager ¶
func NewTestConnectionManager() *TestConnectionManager
NewTestConnectionManager creates a new connection manager for tests
func (*TestConnectionManager) AddConnection ¶
func (m *TestConnectionManager) AddConnection(conn *Connection)
AddConnection adds a connection to be managed
func (*TestConnectionManager) AddServer ¶
func (m *TestConnectionManager) AddServer(server *ReliableTestServer)
AddServer adds a test server to be managed
func (*TestConnectionManager) AddTransport ¶
func (m *TestConnectionManager) AddTransport(transport *Transport)
AddTransport adds a transport to be managed
func (*TestConnectionManager) CleanupAll ¶
func (m *TestConnectionManager) CleanupAll(t *testing.T)
CleanupAll closes all managed resources with timeout
type TestResourceManager ¶
type TestResourceManager struct {
// contains filtered or unexported fields
}
TestResourceManager provides centralized resource management for tests to prevent cumulative resource exhaustion when running the full test suite
func (*TestResourceManager) AcquireBudget ¶
func (trm *TestResourceManager) AcquireBudget(testName string, requestedGoroutines int64) bool
AcquireBudget attempts to acquire a portion of the global goroutine budget
func (*TestResourceManager) AcquireHeavyTestSlot ¶
func (trm *TestResourceManager) AcquireHeavyTestSlot(t *testing.T, testName string) func()
AcquireHeavyTestSlot acquires a slot for resource-intensive tests. This prevents too many heavy tests from running simultaneously.
func (*TestResourceManager) GetBudgetStatus ¶
func (trm *TestResourceManager) GetBudgetStatus() (active, max int64, utilizationPercent float64)
GetBudgetStatus returns current budget usage statistics
func (*TestResourceManager) ReleaseBudget ¶
func (trm *TestResourceManager) ReleaseBudget(testName string)
ReleaseBudget releases the goroutine budget for a test
type TokenValidator ¶
type TokenValidator interface {
ValidateToken(ctx context.Context, token string) (*AuthContext, error)
}
TokenValidator interface for authentication token validation
func ExampleCustomValidation ¶
func ExampleCustomValidation() TokenValidator
ExampleCustomValidation demonstrates custom JWT validation with additional claims
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport implements the WebSocket transport for the AG-UI protocol
func CreateIsolatedTransport ¶
func CreateIsolatedTransport(t *testing.T, helper *TestCleanupHelper, config *TransportConfig) *Transport
CreateIsolatedTransport creates a transport with proper cleanup registration
func NewTransport ¶
func NewTransport(config *TransportConfig) (*Transport, error)
NewTransport creates a new WebSocket transport
func (*Transport) AddEventHandler ¶
func (t *Transport) AddEventHandler(eventType string, handler EventHandler) string
AddEventHandler adds an event handler for a specific event type and returns a handler ID
func (*Transport) CleanupEventHandlers ¶
func (t *Transport) CleanupEventHandlers()
CleanupEventHandlers performs memory-pressure aware cleanup of event handlers
func (*Transport) EnableAdaptiveOptimization ¶
func (t *Transport) EnableAdaptiveOptimization()
EnableAdaptiveOptimization enables adaptive performance optimization
func (*Transport) GetActiveConnectionCount ¶
GetActiveConnectionCount returns the number of active connections
func (*Transport) GetBackpressureStats ¶
func (t *Transport) GetBackpressureStats() BackpressureStats
GetBackpressureStats returns current backpressure statistics
func (*Transport) GetConnectionPoolStats ¶
GetConnectionPoolStats returns the connection pool statistics
func (*Transport) GetDetailedStatus ¶
GetDetailedStatus returns detailed status information
func (*Transport) GetEventHandlerCount ¶
GetEventHandlerCount returns the number of event handlers
func (*Transport) GetEventHandlerStats ¶
GetEventHandlerStats returns statistics about event handlers for memory monitoring
func (*Transport) GetGoroutineStats ¶
func (t *Transport) GetGoroutineStats() map[string]GoroutineStats
GetGoroutineStats returns current goroutine statistics
func (*Transport) GetHealthyConnectionCount ¶
GetHealthyConnectionCount returns the number of healthy connections
func (*Transport) GetMemoryUsage ¶
GetMemoryUsage returns current memory usage
func (*Transport) GetPerformanceMetrics ¶
func (t *Transport) GetPerformanceMetrics() *PerformanceMetrics
GetPerformanceMetrics returns current performance metrics
func (*Transport) GetSubscription ¶
func (t *Transport) GetSubscription(subscriptionID string) (*Subscription, error)
GetSubscription returns a subscription by ID
func (*Transport) IsConnected ¶
IsConnected returns true if the transport has healthy connections
func (*Transport) ListSubscriptions ¶
func (t *Transport) ListSubscriptions() []*Subscription
ListSubscriptions returns all active subscriptions
func (*Transport) OnMemoryPressure ¶
OnMemoryPressure handles memory pressure events by cleaning up handlers
func (*Transport) OptimizeForLatency ¶
func (t *Transport) OptimizeForLatency()
OptimizeForLatency optimizes the transport for minimum latency
func (*Transport) OptimizeForMemory ¶
func (t *Transport) OptimizeForMemory()
OptimizeForMemory optimizes the transport for minimum memory usage
func (*Transport) OptimizeForThroughput ¶
func (t *Transport) OptimizeForThroughput()
OptimizeForThroughput optimizes the transport for maximum throughput
func (*Transport) RegisterCleanupFunc ¶
RegisterCleanupFunc registers a function to be called during shutdown
func (*Transport) RemoveEventHandler ¶
RemoveEventHandler removes an event handler by its ID with enhanced race condition protection
func (*Transport) ResetBackpressureStats ¶
func (t *Transport) ResetBackpressureStats()
ResetBackpressureStats resets backpressure statistics
func (*Transport) ResetForTesting ¶
ResetForTesting provides test isolation by clearing internal state
func (*Transport) Stats ¶
func (t *Transport) Stats() TransportStats
Stats returns a copy of the transport statistics
func (*Transport) Subscribe ¶
func (t *Transport) Subscribe(ctx context.Context, eventTypes []string, handler EventHandler) (*Subscription, error)
Subscribe creates a subscription for specific event types
func (*Transport) Unsubscribe ¶
Unsubscribe removes a subscription
type TransportConfig ¶
type TransportConfig struct {
// URLs are the WebSocket server URLs
URLs []string
// PoolConfig configures the connection pool
PoolConfig *PoolConfig
// PerformanceConfig configures performance optimizations
PerformanceConfig *PerformanceConfig
// SecurityConfig configures security settings
SecurityConfig *SecurityConfig
// DialTimeout is the timeout for establishing WebSocket connections
DialTimeout time.Duration
// EventTimeout is the timeout for event processing
EventTimeout time.Duration
// MaxEventSize is the maximum size of events
MaxEventSize int64
// EnableEventValidation enables event validation
EnableEventValidation bool
// EventValidator is the event validator instance
EventValidator *events.EventValidator
// Logger is the logger instance
Logger *zap.Logger
// BackpressureConfig configures backpressure behavior
BackpressureConfig *BackpressureConfig
// ResourceCleanupConfig configures resource cleanup
ResourceCleanupConfig *ResourceCleanupConfig
// ShutdownTimeout is the timeout for transport shutdown (default 5s, reduced for tests)
ShutdownTimeout time.Duration
}
TransportConfig contains configuration for the WebSocket transport
func DefaultTransportConfig ¶
func DefaultTransportConfig() *TransportConfig
DefaultTransportConfig returns a default configuration for the WebSocket transport
func ExampleFullConfig ¶
func ExampleFullConfig() (*TransportConfig, error)
ExampleFullConfig demonstrates a complete WebSocket transport configuration with JWT
func FastTransportConfig ¶
func FastTransportConfig() *TransportConfig
FastTransportConfig returns a transport configuration optimized for fast tests
func HighConcurrencyTransportConfig ¶
func HighConcurrencyTransportConfig() *TransportConfig
HighConcurrencyTransportConfig returns a transport configuration optimized for high concurrency testing
func OptimizedTransportConfig ¶
func OptimizedTransportConfig() *TransportConfig
OptimizedTransportConfig returns a transport configuration optimized for testing
type TransportStats ¶
type TransportStats struct {
EventsSent int64
EventsReceived int64
EventsProcessed int64
EventsFailed int64
EventsDropped int64
ActiveSubscriptions int64
TotalSubscriptions int64
BytesTransferred int64
AverageLatency time.Duration
ActiveGoroutines int64
BackpressureEvents int64
ResourceCleanups int64
// contains filtered or unexported fields
}
TransportStats tracks transport statistics
type WSAuditLogger ¶
type WSAuditLogger interface {
LogSecurityEvent(ctx context.Context, event *SecurityEvent) error
}
WSAuditLogger interface for security event logging
type ZeroCopyBuffer ¶
type ZeroCopyBuffer struct {
// contains filtered or unexported fields
}
ZeroCopyBuffer implements zero-copy buffer operations
func NewZeroCopyBuffer ¶
func NewZeroCopyBuffer(data []byte) *ZeroCopyBuffer
NewZeroCopyBuffer creates a new zero-copy buffer
func (*ZeroCopyBuffer) Advance ¶
func (zcb *ZeroCopyBuffer) Advance(n int)
Advance advances the buffer offset
func (*ZeroCopyBuffer) Bytes ¶
func (zcb *ZeroCopyBuffer) Bytes() []byte
Bytes returns the buffer data using zero-copy
func (*ZeroCopyBuffer) String ¶
func (zcb *ZeroCopyBuffer) String() string
String returns the buffer data as a string using zero-copy