websocket

package
v0.0.0-...-2e1155d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 21, 2025 License: MIT Imports: 42 Imported by: 0

README

WebSocket Transport for AG-UI Go SDK

The WebSocket transport provides RFC 6455 compliant WebSocket communication for the AG-UI Go SDK. It offers high-performance, bidirectional streaming with comprehensive message support, connection pooling, and advanced features like compression, security, and error recovery.

Features

  • RFC 6455 Compliance: Full WebSocket protocol implementation using gorilla/websocket
  • Message Integration: Native support for messages.Message interface types
  • Event System: Integration with the core event system
  • Connection Pooling: Advanced connection management with load balancing
  • Streaming Support: Bidirectional message and event streaming
  • Multiple Serialization: JSON and Protocol Buffer message support
  • Compression: Per-message compression with configurable levels
  • Security: TLS support, rate limiting, and message size constraints
  • Error Recovery: Automatic reconnection with exponential backoff
  • Performance: Optimized for high-throughput applications
  • Monitoring: Comprehensive metrics and health checking

Quick Start

Basic Usage
package main

import (
    "context"
    "log"

    "github.com/mattsp1290/ag-ui/go-sdk/pkg/transport/websocket"
    "github.com/mattsp1290/ag-ui/go-sdk/pkg/messages"
)

func main() {
    // Create transport configuration
    config := websocket.DefaultTransportConfig()
    config.URLs = []string{"ws://localhost:8080/ws"}
    
    // Create and start transport
    transport, err := websocket.NewTransport(config)
    if err != nil {
        log.Fatal(err)
    }
    
    ctx := context.Background()
    if err := transport.Start(ctx); err != nil {
        log.Fatal(err)
    }
    defer transport.Stop()
    
    // Create message integration
    messageIntegration := websocket.NewMessageIntegration(transport, websocket.FormatJSON)
    defer messageIntegration.Close()
    
    // Send a message
    msg := &messages.UserMessage{
        BaseMessage: messages.BaseMessage{
            Role: messages.RoleUser,
        },
        Content: stringPtr("Hello, WebSocket!"),
    }
    
    if err := messageIntegration.SendMessage(ctx, msg); err != nil {
        log.Fatal(err)
    }
}

func stringPtr(s string) *string { return &s }
With Message Handlers
// Set up message handling
messageIntegration.SetMessageHandler(websocket.MessageHandlerFunc(func(ctx context.Context, message messages.Message) error {
    fmt.Printf("Received message from %s: %v\n", message.GetRole(), message.GetContent())
    return nil
}))

// Set up event handling
messageIntegration.SetEventHandler(websocket.CoreEventHandlerFunc(func(ctx context.Context, event core.Event[any]) error {
    fmt.Printf("Received event: %s\n", event.Type())
    return nil
}))

Configuration

Basic Configuration
config := websocket.DefaultTransportConfig()
config.URLs = []string{"ws://localhost:8080/ws"}
config.DialTimeout = 30 * time.Second  // Timeout for establishing WebSocket connections
config.EventTimeout = 30 * time.Second
config.MaxEventSize = 1024 * 1024 // 1MB
Connection Pool Configuration
poolConfig := websocket.DefaultPoolConfig()
poolConfig.URLs = []string{
    "ws://server1:8080/ws",
    "ws://server2:8080/ws",
    "ws://server3:8080/ws",
}
poolConfig.MinConnections = 2
poolConfig.MaxConnections = 10
poolConfig.MaxRetries = 5
poolConfig.RetryInterval = 2 * time.Second
poolConfig.HealthCheckInterval = 30 * time.Second

config := websocket.DefaultTransportConfig()
config.PoolConfig = poolConfig
Advanced Configuration
poolConfig := &websocket.PoolConfig{
    URLs:                []string{"wss://secure-server:443/ws"},
    MinConnections:      3,
    MaxConnections:      15,
    ConnectionTimeout:   10 * time.Second,
    MaxRetries:          10,
    RetryInterval:       1 * time.Second,
    HealthCheckInterval: 20 * time.Second,
    IdleTimeout:         60 * time.Second,
    MaxIdleTime:         300 * time.Second,
    
    // TLS Configuration
    TLSConfig: &websocket.TLSConfig{
        InsecureSkipVerify: false,
        CertFile:          "/path/to/client.crt",
        KeyFile:           "/path/to/client.key",
        CAFile:            "/path/to/ca.crt",
    },
    
    // Compression Configuration
    CompressionConfig: &websocket.CompressionConfig{
        Enabled:              true,
        CompressionLevel:     6,
        CompressionThreshold: 1024,
        MaxCompressionRatio:  0.8,
        MaxMemoryUsage:       64 * 1024 * 1024, // 64MB
    },
    
    // Security Configuration
    SecurityConfig: &websocket.SecurityConfig{
        EnableRateLimiting:   true,
        RateLimitRequests:    100,
        RateLimitWindow:      time.Minute,
        MaxMessageSize:       5 * 1024 * 1024, // 5MB
        EnableMessageBuffer:  true,
        MessageBufferSize:    1000,
        EnableDDoSProtection: true,
        MaxConnectionsPerIP:  10,
    },
}

Message Formats

JSON Messages (Default)
messageIntegration := websocket.NewMessageIntegration(transport, websocket.FormatJSON)

// All message types are supported
userMsg := &messages.UserMessage{...}
assistantMsg := &messages.AssistantMessage{...}
systemMsg := &messages.SystemMessage{...}
toolMsg := &messages.ToolMessage{...}
devMsg := &messages.DeveloperMessage{...}
Protocol Buffer Messages
messageIntegration := websocket.NewMessageIntegration(transport, websocket.FormatProtobuf)

// Messages must implement proto.Message interface
// Custom protobuf definitions required

Streaming

Message Streaming
// Get message stream
messagesChan, err := messageIntegration.ReceiveMessages(ctx)
if err != nil {
    log.Fatal(err)
}

// Create stream wrapper
messageStream := websocket.NewMessageStreamWrapper(messagesChan, ctx)
defer messageStream.Close()

// Process streaming messages
for {
    event, err := messageStream.Next(ctx)
    if err != nil {
        log.Printf("Stream error: %v", err)
        break
    }
    
    if event == nil {
        break // Stream ended
    }
    
    fmt.Printf("Stream message: %v\n", event.Message)
}
Event Streaming
// Get event stream
eventsChan, err := messageIntegration.ReceiveEvents(ctx)
if err != nil {
    log.Fatal(err)
}

// Process streaming events
for event := range eventsChan {
    fmt.Printf("Event: %s - %v\n", event.Type(), event.Data())
}

Error Handling

Automatic Reconnection
poolConfig := websocket.DefaultPoolConfig()
poolConfig.MaxRetries = 10
poolConfig.RetryInterval = 2 * time.Second
poolConfig.HealthCheckInterval = 30 * time.Second

// Transport will automatically reconnect on connection failures
Error Recovery in Handlers
messageIntegration.SetMessageHandler(websocket.MessageHandlerFunc(func(ctx context.Context, message messages.Message) error {
    // Handle message processing errors gracefully
    if err := processMessage(message); err != nil {
        log.Printf("Failed to process message: %v", err)
        return err // Will be logged by the transport
    }
    return nil
}))
Circuit Breaker Pattern
// The transport includes built-in circuit breaker functionality
// Configure thresholds in SecurityConfig
securityConfig.EnableCircuitBreaker = true
securityConfig.CircuitBreakerThreshold = 10
securityConfig.CircuitBreakerTimeout = 30 * time.Second

Performance Optimization

High-Throughput Configuration
poolConfig := websocket.DefaultPoolConfig()
poolConfig.MinConnections = 10        // Pre-allocate connections
poolConfig.MaxConnections = 50        // Allow high concurrency
poolConfig.MessageBufferSize = 10000  // Large message buffer
poolConfig.HealthCheckInterval = 60 * time.Second // Reduce overhead

// Fast compression for large messages
poolConfig.CompressionConfig = &websocket.CompressionConfig{
    Enabled:              true,
    CompressionLevel:     1, // Fast compression
    CompressionThreshold: 1024,
}
Memory Management
// Configure memory limits
poolConfig.CompressionConfig.MaxMemoryUsage = 128 * 1024 * 1024 // 128MB
poolConfig.SecurityConfig.MaxMessageSize = 10 * 1024 * 1024     // 10MB per message
poolConfig.SecurityConfig.MessageBufferSize = 5000              // Limit buffered messages

Monitoring and Metrics

Transport Statistics
stats := transport.GetStats()
fmt.Printf("Events sent: %d\n", stats.EventsSent)
fmt.Printf("Events received: %d\n", stats.EventsReceived)
fmt.Printf("Bytes transferred: %d\n", stats.BytesTransferred)
fmt.Printf("Average latency: %v\n", stats.AverageLatency)
Connection Pool Health
poolStats := transport.pool.GetStats()
fmt.Printf("Active connections: %d\n", poolStats.ActiveConnections)
fmt.Printf("Total connections: %d\n", poolStats.TotalConnections)
fmt.Printf("Failed connections: %d\n", poolStats.FailedConnections)
Health Checks
if transport.pool.IsHealthy() {
    fmt.Println("Transport is healthy")
} else {
    fmt.Println("Transport has issues")
}

// Get detailed health status
healthStatus := transport.pool.GetHealthStatus()
for url, status := range healthStatus {
    fmt.Printf("Server %s: %v\n", url, status.Healthy)
}

Security

TLS Configuration
tlsConfig := &websocket.TLSConfig{
    InsecureSkipVerify: false,
    CertFile:          "/path/to/client.crt",
    KeyFile:           "/path/to/client.key",
    CAFile:            "/path/to/ca.crt",
    ServerName:        "secure-server.example.com",
}

poolConfig.TLSConfig = tlsConfig
Rate Limiting
securityConfig := &websocket.SecurityConfig{
    EnableRateLimiting: true,
    RateLimitRequests:  100,              // 100 requests
    RateLimitWindow:    time.Minute,      // per minute
    MaxMessageSize:     1024 * 1024,      // 1MB max message
    MaxConnectionsPerIP: 5,               // Per IP limit
}

poolConfig.SecurityConfig = securityConfig
DDoS Protection
securityConfig.EnableDDoSProtection = true
securityConfig.DDoSDetectionThreshold = 1000    // requests per window
securityConfig.DDoSBlockDuration = 10 * time.Minute

Best Practices

Connection Management
  1. Pre-allocate Connections: Set MinConnections to your expected base load
  2. Reasonable Limits: Don't set MaxConnections too high to avoid resource exhaustion
  3. Health Checks: Enable regular health checks for early problem detection
  4. Graceful Shutdown: Always call transport.Stop() and messageIntegration.Close()
Message Handling
  1. Handle Errors Gracefully: Always return appropriate errors from handlers
  2. Avoid Blocking: Keep message handlers fast and non-blocking
  3. Use Channels: Prefer channel-based communication for async processing
  4. Size Limits: Configure appropriate message size limits
Performance
  1. Buffer Sizes: Tune buffer sizes based on your message volume
  2. Compression: Enable compression for large messages
  3. Connection Pooling: Use multiple connections for high throughput
  4. Monitoring: Monitor transport statistics regularly
Security
  1. TLS: Always use TLS in production (wss:// URLs)
  2. Rate Limiting: Configure appropriate rate limits
  3. Message Validation: Validate message content and size
  4. Authentication: Implement proper authentication mechanisms

Troubleshooting

Common Issues
  1. Connection Failures: Check network connectivity and server availability
  2. High Latency: Monitor network conditions and server performance
  3. Memory Usage: Check message sizes and buffer configurations
  4. Rate Limiting: Verify rate limit settings match your usage patterns
Debug Logging
import "go.uber.org/zap"

// Enable debug logging
logger, _ := zap.NewDevelopment()
config.Logger = logger

// Transport will log detailed information about operations
Metrics Collection
// Implement custom metrics collection
type CustomMetricsCollector struct{}

func (c *CustomMetricsCollector) RecordEvent(eventType string, size int64, latency time.Duration) {
    // Send metrics to your monitoring system
}

// Set custom metrics collector
transport.SetMetricsCollector(&CustomMetricsCollector{})

Integration Examples

See the examples.go file for comprehensive examples including:

  • Basic usage patterns
  • Connection pooling
  • Streaming messages
  • Error handling
  • Advanced configuration
  • Performance optimization

API Reference

Core Types
  • Transport: Main WebSocket transport implementation
  • MessageIntegration: Message layer integration
  • TransportConfig: Transport configuration
  • PoolConfig: Connection pool configuration
  • MessageFormat: Message serialization format
Interfaces
  • MessageHandler: Handles incoming messages
  • CoreEventHandler: Handles incoming events
  • MessageStream: Streaming message interface

For complete API documentation, see the generated Go documentation.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTokenExpired     = errors.New("token is expired")
	ErrTokenNotYetValid = errors.New("token is not valid yet")
	ErrInvalidToken     = errors.New("invalid token")
	ErrEmptyToken       = errors.New("empty token")
	ErrInvalidIssuer    = errors.New("invalid issuer")
	ErrInvalidAudience  = errors.New("invalid audience")
	ErrMissingClaims    = errors.New("missing required claims")
)

Common JWT validation errors

Functions

func EventuallyTrue

func EventuallyTrue(condition func() bool, timeout time.Duration, interval time.Duration) bool

EventuallyTrue is a helper that checks a condition repeatedly

func IsRunningInCI

func IsRunningInCI() bool

IsRunningInCI detects if we're running in a CI environment

func NewProductionRateLimiter

func NewProductionRateLimiter() *rate.Limiter

NewProductionRateLimiter creates a rate limiter suitable for production use Allows 100 messages per second with burst of 10

func NewTestRateLimiter

func NewTestRateLimiter() *rate.Limiter

NewTestRateLimiter creates a rate limiter suitable for testing scenarios Allows 10,000 messages per second with burst of 1000 for high concurrency tests

func NewUnlimitedRateLimiter

func NewUnlimitedRateLimiter() *rate.Limiter

NewUnlimitedRateLimiter creates a rate limiter with no practical limits Useful for load testing where rate limiting is not the focus

func RunFastTest

func RunFastTest(t *testing.T, testFunc func(*MinimalTestHelper))

RunFastTest runs a light test with minimal overhead

func RunHeavyTest

func RunHeavyTest(t *testing.T, testFunc func(*MinimalTestHelper))

RunHeavyTest runs a resource-intensive test with proper sequencing

func RunMediumTest

func RunMediumTest(t *testing.T, testFunc func(*MinimalTestHelper))

RunMediumTest runs a medium-complexity test with moderate resources

func TestMain

func TestMain(m *testing.M)

TestMain sets up global test configuration to prevent hanging tests

func WithBudgetAwareScaling

func WithBudgetAwareScaling(testName string) (numGoroutines, operationsPerGoroutine int, shouldSkip bool, reason string)

WithBudgetAwareScaling provides budget-aware resource scaling for tests that don't use WithResourceControl This is useful for tests that want to participate in resource budgeting without full resource control

func WithReliableTimeout

func WithReliableTimeout(t *testing.T, timeout time.Duration, testFunc func(context.Context))

WithReliableTimeout wraps a test function with enhanced timeout, cleanup, and goroutine tracking

func WithResourceControl

func WithResourceControl(t *testing.T, testName string, testFunc func())

WithResourceControl wraps resource-intensive tests with resource management

func WithSequentialExecution

func WithSequentialExecution(t *testing.T, testName string, testFunc func())

WithSequentialExecution ensures that resource-intensive tests run one at a time This prevents resource contention when running the full test suite

func WithTimeout

func WithTimeout(t *testing.T, timeout time.Duration, testFunc func(context.Context))

WithTimeout executes a test function with a timeout and proper cleanup

Types

type AdaptiveOptimizer

type AdaptiveOptimizer struct {
	// contains filtered or unexported fields
}

AdaptiveOptimizer automatically adjusts settings based on current performance

func NewAdaptiveOptimizer

func NewAdaptiveOptimizer(manager *PerformanceManager) *AdaptiveOptimizer

NewAdaptiveOptimizer creates a new adaptive optimizer

func (*AdaptiveOptimizer) Disable

func (ao *AdaptiveOptimizer) Disable()

Disable disables adaptive optimization

func (*AdaptiveOptimizer) Enable

func (ao *AdaptiveOptimizer) Enable()

Enable enables adaptive optimization

func (*AdaptiveOptimizer) Start

func (ao *AdaptiveOptimizer) Start(ctx context.Context, wg *sync.WaitGroup)

Start starts the adaptive optimizer

func (*AdaptiveOptimizer) TriggerAdaptation

func (ao *AdaptiveOptimizer) TriggerAdaptation()

TriggerAdaptation manually triggers the adaptation process for testing

type AuthContext

type AuthContext struct {
	UserID      string                 `json:"user_id"`
	Username    string                 `json:"username"`
	Roles       []string               `json:"roles"`
	Permissions []string               `json:"permissions"`
	ExpiresAt   time.Time              `json:"expires_at"`
	Claims      map[string]interface{} `json:"claims"`
}

AuthContext contains authentication information

type BackpressureConfig

type BackpressureConfig struct {
	// EventChannelBuffer is the buffer size for event channel
	EventChannelBuffer int

	// MaxDroppedEvents is the maximum number of events that can be dropped before taking action
	MaxDroppedEvents int64

	// DropActionType defines what to do when max dropped events is reached
	DropActionType DropActionType

	// EnableBackpressureLogging enables detailed logging of backpressure events
	EnableBackpressureLogging bool

	// BackpressureThresholdPercent is the percentage at which to start applying backpressure (0-100)
	BackpressureThresholdPercent int

	// EnableChannelMonitoring enables monitoring of channel usage
	EnableChannelMonitoring bool

	// MonitoringInterval is the interval for channel monitoring
	MonitoringInterval time.Duration
}

BackpressureConfig configures backpressure behavior for WebSocket transport

func DefaultBackpressureConfig

func DefaultBackpressureConfig() *BackpressureConfig

DefaultBackpressureConfig returns default backpressure configuration for WebSocket

type BackpressureStats

type BackpressureStats struct {
	DroppedEvents        int64     `json:"dropped_events"`
	BackpressureActive   bool      `json:"backpressure_active"`
	EventChannelUsage    float64   `json:"event_channel_usage_percent"`
	LastDropTime         time.Time `json:"last_drop_time"`
	EventChannelCapacity int       `json:"event_channel_capacity"`
	ThresholdPercent     int       `json:"threshold_percent"`
}

BackpressureStats contains backpressure monitoring statistics

type BufferPool

type BufferPool struct {
	// contains filtered or unexported fields
}

BufferPool manages a pool of reusable buffers

func NewBufferPool

func NewBufferPool(poolSize, maxSize int) *BufferPool

NewBufferPool creates a new buffer pool

func (*BufferPool) Get

func (bp *BufferPool) Get() []byte

Get retrieves a buffer from the pool

func (*BufferPool) GetStats

func (bp *BufferPool) GetStats() map[string]int64

GetStats returns buffer pool statistics

func (*BufferPool) Put

func (bp *BufferPool) Put(buf []byte)

Put returns a buffer to the pool

func (*BufferPool) Reset

func (bp *BufferPool) Reset()

Reset clears all buffers from the pool for test isolation

type CompressedMessage

type CompressedMessage struct {
	Data             []byte
	OriginalSize     int64
	CompressedSize   int64
	CompressionRatio float64
	IsCompressed     bool
	Extension        string
}

CompressedMessage represents a compressed WebSocket message

type CompressionConfig

type CompressionConfig struct {
	// Enable compression
	Enabled bool `json:"enabled"`

	// Compression levels (0-9, where 0 is no compression, 9 is best compression)
	CompressionLevel int `json:"compression_level"`

	// Compression threshold - messages smaller than this won't be compressed
	CompressionThreshold int64 `json:"compression_threshold"`

	// Maximum compression ratio - reject messages that compress poorly
	MaxCompressionRatio float64 `json:"max_compression_ratio"`

	// Memory limits
	MaxMemoryUsage int64 `json:"max_memory_usage"`
	WindowBits     int   `json:"window_bits"` // LZ77 sliding window size
	MemLevel       int   `json:"mem_level"`   // Memory level for compression

	// Performance settings
	UsePooledCompressors bool `json:"use_pooled_compressors"`
	CompressorPoolSize   int  `json:"compressor_pool_size"`

	// Fallback settings
	FallbackEnabled   bool `json:"fallback_enabled"`    // Allow fallback to uncompressed
	AutoDetectSupport bool `json:"auto_detect_support"` // Auto-detect client support

	// Monitoring
	CollectStatistics  bool          `json:"collect_statistics"`
	StatisticsInterval time.Duration `json:"statistics_interval"`
}

CompressionConfig defines compression settings

func DefaultCompressionConfig

func DefaultCompressionConfig() *CompressionConfig

DefaultCompressionConfig returns default compression settings

type CompressionManager

type CompressionManager struct {
	// contains filtered or unexported fields
}

CompressionManager handles WebSocket message compression

func NewCompressionManager

func NewCompressionManager(config *CompressionConfig) *CompressionManager

NewCompressionManager creates a new compression manager

func (*CompressionManager) CompressMessage

func (cm *CompressionManager) CompressMessage(data []byte, messageType int) (*CompressedMessage, error)

CompressMessage compresses a message if appropriate

func (*CompressionManager) CreateUpgrader

func (cm *CompressionManager) CreateUpgrader(baseUpgrader *websocket.Upgrader) *websocket.Upgrader

CreateUpgrader creates an upgrader with compression support

func (*CompressionManager) DecompressMessage

func (cm *CompressionManager) DecompressMessage(data []byte, compressed bool) ([]byte, error)

DecompressMessage decompresses a message if needed

func (*CompressionManager) GetCompressionExtensions

func (cm *CompressionManager) GetCompressionExtensions() []string

GetCompressionExtensions returns supported compression extensions

func (*CompressionManager) GetCompressionRatio

func (cm *CompressionManager) GetCompressionRatio(data []byte) float64

GetCompressionRatio estimates compression ratio for data

func (*CompressionManager) GetStats

func (cm *CompressionManager) GetStats() *CompressionStats

GetStats returns compression statistics

func (*CompressionManager) IsCompressionBeneficial

func (cm *CompressionManager) IsCompressionBeneficial(data []byte) bool

IsCompressionBeneficial determines if compression would be beneficial

func (*CompressionManager) ResetStats

func (cm *CompressionManager) ResetStats()

ResetStats resets compression statistics

func (*CompressionManager) Shutdown

func (cm *CompressionManager) Shutdown()

Shutdown gracefully shuts down the compression manager

func (*CompressionManager) SupportsExtension

func (cm *CompressionManager) SupportsExtension(extension string) bool

SupportsExtension checks if an extension is supported

type CompressionMiddleware

type CompressionMiddleware struct {
	// contains filtered or unexported fields
}

CompressionMiddleware wraps a WebSocket connection with compression

func NewCompressionMiddleware

func NewCompressionMiddleware(conn *websocket.Conn, manager *CompressionManager) *CompressionMiddleware

NewCompressionMiddleware creates a new compression middleware

func (*CompressionMiddleware) ReadMessage

func (cm *CompressionMiddleware) ReadMessage() (messageType int, p []byte, err error)

ReadMessage reads a message with decompression

func (*CompressionMiddleware) WriteMessage

func (cm *CompressionMiddleware) WriteMessage(messageType int, data []byte) error

WriteMessage writes a message with compression

type CompressionStats

type CompressionStats struct {

	// Message counts
	TotalMessages        int64 `json:"total_messages"`
	CompressedMessages   int64 `json:"compressed_messages"`
	UncompressedMessages int64 `json:"uncompressed_messages"`
	FailedCompressions   int64 `json:"failed_compressions"`

	// Byte counts
	TotalBytesIn  int64 `json:"total_bytes_in"`
	TotalBytesOut int64 `json:"total_bytes_out"`
	BytesSaved    int64 `json:"bytes_saved"`

	// Performance metrics
	CompressionTime         time.Duration `json:"compression_time"`
	DecompressionTime       time.Duration `json:"decompression_time"`
	AverageCompressionRatio float64       `json:"average_compression_ratio"`

	// Error tracking
	CompressionErrors   int64 `json:"compression_errors"`
	DecompressionErrors int64 `json:"decompression_errors"`

	// Memory usage
	CurrentMemoryUsage int64 `json:"current_memory_usage"`
	PeakMemoryUsage    int64 `json:"peak_memory_usage"`
	// contains filtered or unexported fields
}

CompressionStats holds compression statistics

type ConcurrencyConfig

type ConcurrencyConfig struct {
	NumGoroutines        int
	OperationsPerRoutine int
	TimeoutScale         float64
	EnableSequential     bool
}

getConcurrencyConfig returns optimized concurrency configuration for tests

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents a managed WebSocket connection

func CreateIsolatedConnection

func CreateIsolatedConnection(t *testing.T, helper *TestCleanupHelper, config *ConnectionConfig) *Connection

CreateIsolatedConnection creates a connection with proper cleanup registration

func NewConnection

func NewConnection(config *ConnectionConfig) (*Connection, error)

NewConnection creates a new managed WebSocket connection

func NewConnectionWithContext

func NewConnectionWithContext(parentCtx context.Context, config *ConnectionConfig) (*Connection, error)

NewConnectionWithContext creates a new managed WebSocket connection with a parent context

func (*Connection) Close

func (c *Connection) Close() error

Close permanently closes the connection and stops all goroutines

func (*Connection) Connect

func (c *Connection) Connect(ctx context.Context) error

Connect establishes a WebSocket connection

func (*Connection) Disconnect

func (c *Connection) Disconnect() error

Disconnect closes the WebSocket connection

func (*Connection) ForceConnectionCheck

func (c *Connection) ForceConnectionCheck()

ForceConnectionCheck forces a connection check to detect disconnection

func (*Connection) GetHeartbeat

func (c *Connection) GetHeartbeat() *HeartbeatManager

GetHeartbeat returns the heartbeat manager for this connection

func (*Connection) GetLastConnected

func (c *Connection) GetLastConnected() time.Time

GetLastConnected returns the timestamp of the last successful connection

func (*Connection) GetMetrics

func (c *Connection) GetMetrics() ConnectionMetrics

GetMetrics returns a copy of the connection metrics

func (*Connection) GetReconnectAttempts

func (c *Connection) GetReconnectAttempts() int32

GetReconnectAttempts returns the current number of reconnection attempts

func (*Connection) GetURL

func (c *Connection) GetURL() string

GetURL returns the WebSocket URL

func (*Connection) IsConnected

func (c *Connection) IsConnected() bool

IsConnected returns true if the connection is currently connected

func (*Connection) IsReconnecting

func (c *Connection) IsReconnecting() bool

IsReconnecting returns true if the connection is currently reconnecting

func (*Connection) LastError

func (c *Connection) LastError() error

LastError returns the last error encountered

func (*Connection) SendMessage

func (c *Connection) SendMessage(ctx context.Context, message []byte) error

SendMessage sends a message through the WebSocket connection

func (*Connection) SendMessageSync

func (c *Connection) SendMessageSync(ctx context.Context, message []byte) error

SendMessageSync sends a message synchronously and waits for it to be processed

func (*Connection) SetOnConnect

func (c *Connection) SetOnConnect(handler func())

SetOnConnect sets the connect handler

func (*Connection) SetOnDisconnect

func (c *Connection) SetOnDisconnect(handler func(error))

SetOnDisconnect sets the disconnect handler

func (*Connection) SetOnError

func (c *Connection) SetOnError(handler func(error))

SetOnError sets the error handler

func (*Connection) SetOnMessage

func (c *Connection) SetOnMessage(handler func([]byte))

SetOnMessage sets the message handler

func (*Connection) StartAutoReconnect

func (c *Connection) StartAutoReconnect(ctx context.Context)

StartAutoReconnect starts automatic reconnection handling

func (*Connection) State

func (c *Connection) State() ConnectionState

State returns the current connection state

func (*Connection) WaitForMessages

func (c *Connection) WaitForMessages(ctx context.Context, expectedCount int64) error

WaitForMessages waits for all pending messages to be processed

type ConnectionBudget

type ConnectionBudget struct {
	// contains filtered or unexported fields
}

ConnectionBudget manages connection creation to prevent resource exhaustion

func NewConnectionBudget

func NewConnectionBudget(maxConnections int) *ConnectionBudget

NewConnectionBudget creates a connection budget manager

func (*ConnectionBudget) AcquireConnection

func (c *ConnectionBudget) AcquireConnection(ctx context.Context) error

AcquireConnection attempts to acquire a connection slot

func (*ConnectionBudget) GetActiveCount

func (c *ConnectionBudget) GetActiveCount() int

GetActiveCount returns the number of active connections

func (*ConnectionBudget) ReleaseConnection

func (c *ConnectionBudget) ReleaseConnection()

ReleaseConnection releases a connection slot

type ConnectionConfig

type ConnectionConfig struct {
	// URL is the WebSocket server URL
	URL string

	// MaxReconnectAttempts is the maximum number of reconnection attempts
	// Set to 0 for unlimited retries
	MaxReconnectAttempts int

	// InitialReconnectDelay is the initial delay between reconnection attempts
	InitialReconnectDelay time.Duration

	// MaxReconnectDelay is the maximum delay between reconnection attempts
	MaxReconnectDelay time.Duration

	// ReconnectBackoffMultiplier is the multiplier for exponential backoff
	ReconnectBackoffMultiplier float64

	// DialTimeout is the timeout for establishing the connection
	DialTimeout time.Duration

	// HandshakeTimeout is the timeout for the WebSocket handshake
	HandshakeTimeout time.Duration

	// ReadTimeout is the timeout for reading messages
	ReadTimeout time.Duration

	// WriteTimeout is the timeout for writing messages
	WriteTimeout time.Duration

	// PingPeriod is the period between ping messages
	PingPeriod time.Duration

	// PongWait is the timeout for receiving pong messages
	PongWait time.Duration

	// MaxMessageSize is the maximum size of messages
	MaxMessageSize int64

	// WriteBufferSize is the size of the write buffer
	WriteBufferSize int

	// ReadBufferSize is the size of the read buffer
	ReadBufferSize int

	// EnableCompression enables message compression
	EnableCompression bool

	// Headers are additional headers to send during handshake
	Headers map[string]string

	// RateLimiter limits the rate of outgoing messages
	RateLimiter *rate.Limiter

	// Logger is the logger instance
	Logger *zap.Logger

	// TLSClientConfig is the TLS configuration for secure WebSocket connections
	// If nil, default TLS settings are used
	TLSClientConfig *tls.Config
}

ConnectionConfig contains configuration for WebSocket connections

func DefaultConnectionConfig

func DefaultConnectionConfig() *ConnectionConfig

DefaultConnectionConfig returns a default configuration for WebSocket connections Uses configurable timeouts that adapt to test/production environments

type ConnectionMetrics

type ConnectionMetrics struct {
	ConnectAttempts    int64
	SuccessfulConnects int64
	Disconnects        int64
	ReconnectAttempts  int64
	MessagesReceived   int64
	MessagesSent       int64
	BytesReceived      int64
	BytesSent          int64
	Errors             int64
	LastConnected      time.Time
	LastDisconnected   time.Time
	// contains filtered or unexported fields
}

ConnectionMetrics tracks connection statistics

type ConnectionPool

type ConnectionPool struct {
	// contains filtered or unexported fields
}

ConnectionPool manages a pool of WebSocket connections with load balancing

func NewConnectionPool

func NewConnectionPool(config *PoolConfig) (*ConnectionPool, error)

NewConnectionPool creates a new connection pool

func (*ConnectionPool) FastStop

func (p *ConnectionPool) FastStop() error

FastStop provides immediate shutdown for test scenarios

func (*ConnectionPool) GetActiveConnectionCount

func (p *ConnectionPool) GetActiveConnectionCount() int

GetActiveConnectionCount returns the number of active connections

func (*ConnectionPool) GetConnection

func (p *ConnectionPool) GetConnection(ctx context.Context) (*Connection, error)

GetConnection returns a connection based on the load balancing strategy

func (*ConnectionPool) GetDetailedStatus

func (p *ConnectionPool) GetDetailedStatus() map[string]interface{}

GetDetailedStatus returns detailed status of all connections

func (*ConnectionPool) GetHealthyConnectionCount

func (p *ConnectionPool) GetHealthyConnectionCount() int

GetHealthyConnectionCount returns the number of healthy connections

func (*ConnectionPool) SendMessage

func (p *ConnectionPool) SendMessage(ctx context.Context, message []byte) error

SendMessage sends a message through the pool using load balancing

func (*ConnectionPool) SetMessageHandler

func (p *ConnectionPool) SetMessageHandler(handler func(data []byte))

SetMessageHandler sets the message handler for all connections

func (*ConnectionPool) SetOnConnectionStateChange

func (p *ConnectionPool) SetOnConnectionStateChange(handler func(connID string, state ConnectionState))

SetOnConnectionStateChange sets the connection state change handler

func (*ConnectionPool) SetOnHealthChange

func (p *ConnectionPool) SetOnHealthChange(handler func(connID string, healthy bool))

SetOnHealthChange sets the health change handler

func (*ConnectionPool) Start

func (p *ConnectionPool) Start(ctx context.Context) error

Start initializes the connection pool and establishes minimum connections

func (*ConnectionPool) Stats

func (p *ConnectionPool) Stats() PoolStats

GetStats returns a copy of the pool statistics

func (*ConnectionPool) Stop

func (p *ConnectionPool) Stop() error

Stop gracefully shuts down the connection pool

type ConnectionPoolManager

type ConnectionPoolManager struct {
	// contains filtered or unexported fields
}

ConnectionPoolManager manages connection slots for optimal resource usage

func NewConnectionPoolManager

func NewConnectionPoolManager(maxConnections int) *ConnectionPoolManager

NewConnectionPoolManager creates a new connection pool manager

func (*ConnectionPoolManager) AcquireSlot

func (cpm *ConnectionPoolManager) AcquireSlot(ctx context.Context) (*ConnectionSlot, error)

AcquireSlot acquires a connection slot

func (*ConnectionPoolManager) GetStats

func (cpm *ConnectionPoolManager) GetStats() map[string]int64

GetStats returns connection pool manager statistics

func (*ConnectionPoolManager) ReleaseSlot

func (cpm *ConnectionPoolManager) ReleaseSlot(slot *ConnectionSlot)

ReleaseSlot releases a connection slot

type ConnectionSlot

type ConnectionSlot struct {
	ID         string
	AcquiredAt time.Time
	InUse      bool
	// contains filtered or unexported fields
}

ConnectionSlot represents a connection slot

type ConnectionState

type ConnectionState int32

ConnectionState represents the current state of a WebSocket connection

const (
	// StateDisconnected indicates the connection is not active
	StateDisconnected ConnectionState = iota
	// StateConnecting indicates connection is being established
	StateConnecting
	// StateConnected indicates connection is active and healthy
	StateConnected
	// StateReconnecting indicates connection is being re-established
	StateReconnecting
	// StateClosing indicates connection is being closed
	StateClosing
	// StateClosed indicates connection is permanently closed
	StateClosed
)

func (ConnectionState) String

func (s ConnectionState) String() string

String returns the string representation of the connection state

type DropActionType

type DropActionType int

DropActionType defines actions to take when events are dropped

const (
	// DropActionLog logs dropped events but continues
	DropActionLog DropActionType = iota

	// DropActionReconnect attempts to reconnect
	DropActionReconnect

	// DropActionStop stops the transport
	DropActionStop

	// DropActionSlowDown applies flow control
	DropActionSlowDown
)

type EventHandler

type EventHandler func(ctx context.Context, event events.Event) error

EventHandler represents a function that handles events

type EventHandlerWrapper

type EventHandlerWrapper struct {
	ID      string
	Handler EventHandler
}

EventHandlerWrapper wraps an event handler with a unique ID

type GoroutineInfo

type GoroutineInfo struct {
	Name      string
	StartTime time.Time
	LastSeen  time.Time
	Function  string
	Context   context.Context
	Cancel    context.CancelFunc
}

GoroutineInfo tracks information about active goroutines

type GoroutineStats

type GoroutineStats struct {
	Name      string        `json:"name"`
	StartTime time.Time     `json:"start_time"`
	LastSeen  time.Time     `json:"last_seen"`
	Duration  time.Duration `json:"duration"`
	IdleTime  time.Duration `json:"idle_time"`
}

GoroutineStats contains goroutine monitoring statistics

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

HealthChecker monitors connection health and manages pool size

func NewHealthChecker

func NewHealthChecker(pool *ConnectionPool, interval time.Duration) *HealthChecker

NewHealthChecker creates a new health checker

func (*HealthChecker) Start

func (h *HealthChecker) Start(ctx context.Context, wg *sync.WaitGroup)

Start begins health checking

type HeartbeatManager

type HeartbeatManager struct {
	// contains filtered or unexported fields
}

HeartbeatManager manages the ping/pong heartbeat mechanism for WebSocket connections

func NewHeartbeatManager

func NewHeartbeatManager(connection *Connection, pingPeriod, pongWait time.Duration) *HeartbeatManager

NewHeartbeatManager creates a new heartbeat manager

func (*HeartbeatManager) GetConnectionHealth

func (h *HeartbeatManager) GetConnectionHealth() float64

GetConnectionHealth returns a health score between 0 and 1 1 = perfectly healthy, 0 = completely unhealthy

func (*HeartbeatManager) GetDetailedHealthStatus

func (h *HeartbeatManager) GetDetailedHealthStatus() map[string]interface{}

GetDetailedHealthStatus returns detailed health information

func (*HeartbeatManager) GetLastPingTime

func (h *HeartbeatManager) GetLastPingTime() time.Time

GetLastPingTime returns the timestamp of the last sent ping

func (*HeartbeatManager) GetLastPongTime

func (h *HeartbeatManager) GetLastPongTime() time.Time

GetLastPongTime returns the timestamp of the last received pong

func (*HeartbeatManager) GetMissedPongCount

func (h *HeartbeatManager) GetMissedPongCount() int32

GetMissedPongCount returns the number of consecutive missed pongs

func (*HeartbeatManager) GetPingPeriod

func (h *HeartbeatManager) GetPingPeriod() time.Duration

GetPingPeriod returns the ping period

func (*HeartbeatManager) GetPongWait

func (h *HeartbeatManager) GetPongWait() time.Duration

GetPongWait returns the pong wait timeout

func (*HeartbeatManager) GetState

func (h *HeartbeatManager) GetState() HeartbeatState

GetState returns the current heartbeat state

func (*HeartbeatManager) GetStats

func (h *HeartbeatManager) GetStats() HeartbeatStats

GetStats returns a copy of the heartbeat statistics

func (*HeartbeatManager) IsHealthy

func (h *HeartbeatManager) IsHealthy() bool

IsHealthy returns true if the connection is considered healthy

func (*HeartbeatManager) OnPong

func (h *HeartbeatManager) OnPong()

OnPong is called when a pong message is received

func (*HeartbeatManager) Reset

func (h *HeartbeatManager) Reset()

Reset resets the heartbeat state

func (*HeartbeatManager) SetPingPeriod

func (h *HeartbeatManager) SetPingPeriod(period time.Duration)

SetPingPeriod updates the ping period

func (*HeartbeatManager) SetPongWait

func (h *HeartbeatManager) SetPongWait(wait time.Duration)

SetPongWait updates the pong wait timeout

func (*HeartbeatManager) Start

func (h *HeartbeatManager) Start(ctx context.Context)

Start begins the heartbeat mechanism

func (*HeartbeatManager) Stop

func (h *HeartbeatManager) Stop()

Stop stops the heartbeat mechanism with enhanced aggressive shutdown timeout

type HeartbeatState

type HeartbeatState int32

HeartbeatState represents the state of the heartbeat mechanism

const (
	// HeartbeatStopped indicates the heartbeat is not running
	HeartbeatStopped HeartbeatState = iota
	// HeartbeatStarting indicates the heartbeat is starting
	HeartbeatStarting
	// HeartbeatRunning indicates the heartbeat is active
	HeartbeatRunning
	// HeartbeatStopping indicates the heartbeat is stopping
	HeartbeatStopping
)

func (HeartbeatState) String

func (s HeartbeatState) String() string

String returns the string representation of the heartbeat state

type HeartbeatStats

type HeartbeatStats struct {
	PingsSent        int64
	PongsReceived    int64
	MissedPongs      int64
	HealthChecks     int64
	UnhealthyPeriods int64
	LastPingAt       time.Time
	LastPongAt       time.Time
	AverageRTT       time.Duration
	MinRTT           time.Duration
	MaxRTT           time.Duration
	// contains filtered or unexported fields
}

HeartbeatStats tracks heartbeat statistics

type ImprovedTransport

type ImprovedTransport struct {
	// contains filtered or unexported fields
}

ImprovedTransport implements the WebSocket transport with enhanced memory management

func NewImprovedTransport

func NewImprovedTransport(config *TransportConfig) (*ImprovedTransport, error)

NewImprovedTransport creates a new WebSocket transport with improved memory management

func (*ImprovedTransport) AddEventHandler

func (t *ImprovedTransport) AddEventHandler(eventType string, handler EventHandler) string

AddEventHandler adds an event handler for a specific event type and returns a handler ID

func (*ImprovedTransport) GetStats

func (t *ImprovedTransport) GetStats() TransportStats

GetStats returns a copy of the transport statistics

func (*ImprovedTransport) RemoveEventHandler

func (t *ImprovedTransport) RemoveEventHandler(eventType string, handlerID string) error

RemoveEventHandler removes an event handler by its ID

func (*ImprovedTransport) SendEvent

func (t *ImprovedTransport) SendEvent(ctx context.Context, event events.Event) error

SendEvent sends an event through the WebSocket transport

func (*ImprovedTransport) Start

func (t *ImprovedTransport) Start(ctx context.Context) error

Start initializes the WebSocket transport

func (*ImprovedTransport) Stop

func (t *ImprovedTransport) Stop() error

Stop gracefully shuts down the WebSocket transport

func (*ImprovedTransport) Subscribe

func (t *ImprovedTransport) Subscribe(ctx context.Context, eventTypes []string, handler EventHandler) (*Subscription, error)

Subscribe creates a subscription for specific event types

func (*ImprovedTransport) Unsubscribe

func (t *ImprovedTransport) Unsubscribe(subscriptionID string) error

Unsubscribe removes a subscription

type IsolatedTestRunner

type IsolatedTestRunner struct {
	// contains filtered or unexported fields
}

IsolatedTestRunner provides test isolation utilities

func NewIsolatedTestRunner

func NewIsolatedTestRunner(t *testing.T) *IsolatedTestRunner

NewIsolatedTestRunner creates a new isolated test runner

func (*IsolatedTestRunner) GetCleanupHelper

func (r *IsolatedTestRunner) GetCleanupHelper() *TestCleanupHelper

GetCleanupHelper returns the main cleanup helper

func (*IsolatedTestRunner) RunIsolated

func (r *IsolatedTestRunner) RunIsolated(name string, timeout time.Duration, testFunc func(*TestCleanupHelper))

RunIsolated runs a test function with proper isolation

type JWTConfigExample

type JWTConfigExample struct{}

JWTConfigExample shows how to configure JWT authentication for WebSocket transport

type JWTTokenValidator

type JWTTokenValidator struct {
	// contains filtered or unexported fields
}

JWTTokenValidator provides JWT token validation

func NewJWTTokenValidator

func NewJWTTokenValidator(secretKey []byte, issuer string) *JWTTokenValidator

NewJWTTokenValidator creates a new JWT token validator

func NewJWTTokenValidatorRSA

func NewJWTTokenValidatorRSA(publicKey *rsa.PublicKey, issuer, audience string) *JWTTokenValidator

NewJWTTokenValidatorRSA creates a new JWT token validator for RSA signatures

func NewJWTTokenValidatorWithOptions

func NewJWTTokenValidatorWithOptions(secretKey []byte, issuer, audience string, signingMethod jwt.SigningMethod) *JWTTokenValidator

NewJWTTokenValidatorWithOptions creates a new JWT token validator with full options

func (*JWTTokenValidator) ValidateToken

func (v *JWTTokenValidator) ValidateToken(ctx context.Context, tokenString string) (*AuthContext, error)

ValidateToken validates a JWT token

type LoadBalancingStrategy

type LoadBalancingStrategy int

LoadBalancingStrategy defines different load balancing strategies

const (
	// RoundRobin distributes requests evenly across connections
	RoundRobin LoadBalancingStrategy = iota
	// LeastConnections selects the connection with the fewest active requests
	LeastConnections
	// HealthBased selects the healthiest connection
	HealthBased
	// Random selects a random connection
	Random
)

func (LoadBalancingStrategy) String

func (s LoadBalancingStrategy) String() string

String returns the string representation of the load balancing strategy

type LoadTestServer

type LoadTestServer struct {
	// contains filtered or unexported fields
}

LoadTestServer provides a server for load testing scenarios

func NewLoadTestServer

func NewLoadTestServer(t testing.TB) *LoadTestServer

func (*LoadTestServer) Close

func (s *LoadTestServer) Close()

func (*LoadTestServer) GetStats

func (s *LoadTestServer) GetStats() (connections, messages, errors int64)

func (*LoadTestServer) SetDropRate

func (s *LoadTestServer) SetDropRate(rate float64)

func (*LoadTestServer) SetEchoMode

func (s *LoadTestServer) SetEchoMode(enabled bool)

func (*LoadTestServer) URL

func (s *LoadTestServer) URL() string

type MemoryManager

type MemoryManager struct {
	// contains filtered or unexported fields
}

MemoryManager manages memory usage and optimization

func NewMemoryManager

func NewMemoryManager(maxMemory int64) *MemoryManager

NewMemoryManager creates a new memory manager

func (*MemoryManager) AllocateBuffer

func (mm *MemoryManager) AllocateBuffer(size int) []byte

AllocateBuffer allocates a buffer with memory tracking

func (*MemoryManager) DeallocateBuffer

func (mm *MemoryManager) DeallocateBuffer(buf []byte)

DeallocateBuffer deallocates a buffer

func (*MemoryManager) GetCurrentUsage

func (mm *MemoryManager) GetCurrentUsage() int64

GetCurrentUsage returns current memory usage

func (*MemoryManager) GetMemoryPressure

func (mm *MemoryManager) GetMemoryPressure() float64

GetMemoryPressure returns the current memory pressure percentage

func (*MemoryManager) GetMonitoringInterval

func (mm *MemoryManager) GetMonitoringInterval() time.Duration

GetMonitoringInterval returns the current monitoring interval

func (*MemoryManager) GetStats

func (mm *MemoryManager) GetStats() map[string]int64

GetStats returns memory manager statistics

func (*MemoryManager) Start

func (mm *MemoryManager) Start(ctx context.Context, wg *sync.WaitGroup)

Start starts the memory manager with dynamic monitoring intervals

func (*MemoryManager) TriggerCheck

func (mm *MemoryManager) TriggerCheck()

TriggerCheck triggers an immediate memory check and interval update

type MessageBatcher

type MessageBatcher struct {
	// contains filtered or unexported fields
}

MessageBatcher batches messages for optimized transmission

func NewMessageBatcher

func NewMessageBatcher(batchSize int, batchTimeout time.Duration) *MessageBatcher

NewMessageBatcher creates a new message batcher

func (*MessageBatcher) AddMessage

func (mb *MessageBatcher) AddMessage(data []byte) error

AddMessage adds a message to the batch

func (*MessageBatcher) Close

func (mb *MessageBatcher) Close()

Close closes the message batcher and its channels

func (*MessageBatcher) GetBatch

func (mb *MessageBatcher) GetBatch() [][]byte

GetBatch gets a batch of messages

func (*MessageBatcher) GetStats

func (mb *MessageBatcher) GetStats() map[string]interface{}

GetStats returns batcher statistics

func (*MessageBatcher) Start

func (mb *MessageBatcher) Start(ctx context.Context, wg *sync.WaitGroup)

Start starts the message batcher

type MessageSerializer

type MessageSerializer interface {
	Serialize(event events.Event) ([]byte, error)
	Deserialize(data []byte) (events.Event, error)
}

MessageSerializer defines interface for message serialization

type MetricsCollector

type MetricsCollector struct {
	// contains filtered or unexported fields
}

MetricsCollector collects and tracks performance metrics

func NewMetricsCollector

func NewMetricsCollector(interval time.Duration) *MetricsCollector

NewMetricsCollector creates a new metrics collector

func (*MetricsCollector) GetMetrics

func (mc *MetricsCollector) GetMetrics() *PerformanceMetrics

GetMetrics returns a copy of current metrics

func (*MetricsCollector) Start

func (mc *MetricsCollector) Start(ctx context.Context, wg *sync.WaitGroup)

Start starts the metrics collector

func (*MetricsCollector) TrackConnectionTime

func (mc *MetricsCollector) TrackConnectionTime(duration time.Duration)

TrackConnectionTime tracks connection establishment time

func (*MetricsCollector) TrackError

func (mc *MetricsCollector) TrackError(errorType string)

TrackError tracks an error

func (*MetricsCollector) TrackMessageLatency

func (mc *MetricsCollector) TrackMessageLatency(latency time.Duration)

TrackMessageLatency tracks message processing latency

func (*MetricsCollector) TrackMessageSize

func (mc *MetricsCollector) TrackMessageSize(size int)

TrackMessageSize tracks message size

func (*MetricsCollector) TrackSerializationTime

func (mc *MetricsCollector) TrackSerializationTime(duration time.Duration)

TrackSerializationTime tracks serialization time

type MinimalTestHelper

type MinimalTestHelper struct {
	// contains filtered or unexported fields
}

MinimalTestHelper provides the most basic resource management for tests

func NewMinimalTestHelper

func NewMinimalTestHelper(t *testing.T) *MinimalTestHelper

NewMinimalTestHelper creates a minimal test helper

func (*MinimalTestHelper) Cleanup

func (h *MinimalTestHelper) Cleanup()

Cleanup performs fast cleanup with aggressive timeouts

func (*MinimalTestHelper) CreateConnection

func (h *MinimalTestHelper) CreateConnection(url string) *Connection

CreateConnection creates a minimal connection

func (*MinimalTestHelper) CreateServer

func (h *MinimalTestHelper) CreateServer() *MinimalTestServer

CreateServer creates a minimal test server

type MinimalTestServer

type MinimalTestServer struct {
	// contains filtered or unexported fields
}

MinimalTestServer provides the simplest possible WebSocket test server

func NewMinimalTestServer

func NewMinimalTestServer(t *testing.T) *MinimalTestServer

NewMinimalTestServer creates a minimal test server

func (*MinimalTestServer) Close

func (s *MinimalTestServer) Close()

Close closes the server

func (*MinimalTestServer) URL

func (s *MinimalTestServer) URL() string

URL returns the server's WebSocket URL

type MockEvent

type MockEvent struct {
	EventType   events.EventType
	Data        interface{}
	TimestampMs *int64 // Exported field for test initialization

	ValidationFunc func() error // Optional validation function for testing
	// contains filtered or unexported fields
}

MockEvent implements events.Event for websocket testing

func (*MockEvent) GetBaseEvent

func (e *MockEvent) GetBaseEvent() *events.BaseEvent

func (*MockEvent) ID

func (e *MockEvent) ID() string

func (*MockEvent) RunID

func (e *MockEvent) RunID() string

func (*MockEvent) SetTimestamp

func (e *MockEvent) SetTimestamp(timestamp int64)

func (*MockEvent) ThreadID

func (e *MockEvent) ThreadID() string

func (*MockEvent) Timestamp

func (e *MockEvent) Timestamp() *int64

func (*MockEvent) ToJSON

func (e *MockEvent) ToJSON() ([]byte, error)

func (*MockEvent) ToProtobuf

func (e *MockEvent) ToProtobuf() (*generated.Event, error)

func (*MockEvent) Type

func (e *MockEvent) Type() events.EventType

func (*MockEvent) Validate

func (e *MockEvent) Validate() error

type NoOpWSAuditLogger

type NoOpWSAuditLogger struct{}

NoOpWSAuditLogger provides a no-op audit logger for testing

func (*NoOpWSAuditLogger) LogSecurityEvent

func (l *NoOpWSAuditLogger) LogSecurityEvent(ctx context.Context, event *SecurityEvent) error

LogSecurityEvent logs a security event (no-op)

type OptimizedConnectionSlot

type OptimizedConnectionSlot struct {
	ID         string
	Connection interface{}
	InUse      int32 // Use atomic operations instead of mutex
	LastUsed   int64 // Unix timestamp
}

OptimizedConnectionSlot reduces allocations in connection management

type OptimizedTransport

type OptimizedTransport struct {
	*Transport
	// contains filtered or unexported fields
}

OptimizedTransport wraps the standard transport with performance optimizations

func NewOptimizedTransport

func NewOptimizedTransport(config *TransportConfig) (*OptimizedTransport, error)

NewOptimizedTransport creates a new optimized transport

func (*OptimizedTransport) AddEventHandlerOptimized

func (t *OptimizedTransport) AddEventHandlerOptimized(eventType string, handler EventHandler) string

AddEventHandlerOptimized adds an event handler with reduced allocations

func (*OptimizedTransport) RemoveEventHandlerOptimized

func (t *OptimizedTransport) RemoveEventHandlerOptimized(eventType string, handlerID string) error

RemoveEventHandlerOptimized removes a handler with minimal allocations

func (*OptimizedTransport) SubscribeOptimized

func (t *OptimizedTransport) SubscribeOptimized(ctx context.Context, eventTypes []string, handler EventHandler) (*Subscription, error)

SubscribeOptimized creates a subscription with reduced allocations

type PerfJSONSerializer

type PerfJSONSerializer struct{}

PerfJSONSerializer implements standard JSON serialization

func (*PerfJSONSerializer) Deserialize

func (js *PerfJSONSerializer) Deserialize(data []byte) (events.Event, error)

Deserialize deserializes JSON to an event

func (*PerfJSONSerializer) Serialize

func (js *PerfJSONSerializer) Serialize(event events.Event) ([]byte, error)

Serialize serializes an event to JSON

type PerfOptimizedJSONSerializer

type PerfOptimizedJSONSerializer struct {
	// contains filtered or unexported fields
}

PerfOptimizedJSONSerializer implements optimized JSON serialization

func (*PerfOptimizedJSONSerializer) Deserialize

func (ojs *PerfOptimizedJSONSerializer) Deserialize(data []byte) (events.Event, error)

Deserialize deserializes optimized JSON to an event

func (*PerfOptimizedJSONSerializer) Serialize

func (ojs *PerfOptimizedJSONSerializer) Serialize(event events.Event) ([]byte, error)

Serialize serializes an event to optimized JSON

type PerfProtobufSerializer

type PerfProtobufSerializer struct{}

PerfProtobufSerializer implements Protocol Buffers serialization

func (*PerfProtobufSerializer) Deserialize

func (ps *PerfProtobufSerializer) Deserialize(data []byte) (events.Event, error)

Deserialize deserializes Protocol Buffers to an event

func (*PerfProtobufSerializer) Serialize

func (ps *PerfProtobufSerializer) Serialize(event events.Event) ([]byte, error)

Serialize serializes an event to Protocol Buffers

type PerformanceConfig

type PerformanceConfig struct {
	// MaxConcurrentConnections is the maximum number of concurrent connections
	MaxConcurrentConnections int

	// MessageBatchSize is the number of messages to batch together
	MessageBatchSize int

	// MessageBatchTimeout is the timeout for message batching
	MessageBatchTimeout time.Duration

	// BufferPoolSize is the size of the buffer pool
	BufferPoolSize int

	// MaxBufferSize is the maximum size of individual buffers
	MaxBufferSize int

	// EnableZeroCopy enables zero-copy operations where possible
	EnableZeroCopy bool

	// EnableMemoryPooling enables memory pool management
	EnableMemoryPooling bool

	// EnableProfiling enables CPU and memory profiling
	EnableProfiling bool

	// ProfilingInterval is the interval for profiling snapshots
	ProfilingInterval time.Duration

	// MaxLatency is the maximum acceptable latency
	MaxLatency time.Duration

	// MaxMemoryUsage is the maximum memory usage for the given connection count
	MaxMemoryUsage int64

	// EnableMetrics enables performance metrics collection
	EnableMetrics bool

	// MetricsInterval is the interval for metrics collection
	MetricsInterval time.Duration

	// MessageSerializerType defines the serialization method to use
	MessageSerializerType SerializerType

	// Logger is the logger instance
	Logger *zap.Logger
}

PerformanceConfig contains configuration for performance optimizations

func DefaultPerformanceConfig

func DefaultPerformanceConfig() *PerformanceConfig

DefaultPerformanceConfig returns a default performance configuration Uses configurable timeouts that adapt to test/production environments

func HighConcurrencyPerformanceConfig

func HighConcurrencyPerformanceConfig() *PerformanceConfig

HighConcurrencyPerformanceConfig returns a performance configuration optimized for high concurrency testing Uses configurable timeouts that adapt to test/production environments

type PerformanceManager

type PerformanceManager struct {
	// contains filtered or unexported fields
}

PerformanceManager manages performance optimizations for WebSocket transport

func NewPerformanceManager

func NewPerformanceManager(config *PerformanceConfig) (*PerformanceManager, error)

NewPerformanceManager creates a new performance manager

func (*PerformanceManager) BatchMessage

func (pm *PerformanceManager) BatchMessage(data []byte) error

BatchMessage adds a message to the batch for optimized transmission

func (*PerformanceManager) FastStop

func (pm *PerformanceManager) FastStop() error

FastStop provides immediate shutdown for test scenarios

func (*PerformanceManager) GetBuffer

func (pm *PerformanceManager) GetBuffer() []byte

GetBuffer gets a buffer from the pool

func (*PerformanceManager) GetConnectionSlot

func (pm *PerformanceManager) GetConnectionSlot(ctx context.Context) (*ConnectionSlot, error)

GetConnectionSlot acquires a connection slot

func (*PerformanceManager) GetMemoryUsage

func (pm *PerformanceManager) GetMemoryUsage() int64

GetMemoryUsage returns current memory usage

func (*PerformanceManager) GetMetrics

func (pm *PerformanceManager) GetMetrics() *PerformanceMetrics

GetMetrics returns performance metrics

func (*PerformanceManager) OptimizeMessage

func (pm *PerformanceManager) OptimizeMessage(event events.Event) ([]byte, error)

OptimizeMessage optimizes a message for transmission

func (*PerformanceManager) PutBuffer

func (pm *PerformanceManager) PutBuffer(buf []byte)

PutBuffer returns a buffer to the pool

func (*PerformanceManager) ReleaseConnectionSlot

func (pm *PerformanceManager) ReleaseConnectionSlot(slot *ConnectionSlot)

ReleaseConnectionSlot releases a connection slot

func (*PerformanceManager) Start

func (pm *PerformanceManager) Start(ctx context.Context) error

Start initializes the performance manager

func (*PerformanceManager) Stop

func (pm *PerformanceManager) Stop() error

Stop gracefully shuts down the performance manager

type PerformanceMetrics

type PerformanceMetrics struct {
	// Connection metrics
	ActiveConnections    int64
	TotalConnections     int64
	ConnectionsPerSecond float64
	AvgConnectionTime    time.Duration

	// Message metrics
	MessagesPerSecond float64
	MessagesSent      int64
	MessagesReceived  int64
	MessagesFailures  int64
	AvgMessageSize    float64

	// Latency metrics
	AvgLatency time.Duration
	MinLatency time.Duration
	MaxLatency time.Duration
	P95Latency time.Duration
	P99Latency time.Duration

	// Throughput metrics
	BytesPerSecond     float64
	TotalBytesSent     int64
	TotalBytesReceived int64

	// Memory metrics
	MemoryUsage     int64
	BufferPoolUsage int64
	GCPauses        int64
	AvgGCPause      time.Duration

	// Serialization metrics
	SerializationTime     time.Duration
	SerializationPerSec   float64
	SerializationFailures int64

	// Error metrics
	ErrorRate        float64
	TotalErrors      int64
	ConnectionErrors int64
	TimeoutErrors    int64

	// System metrics
	CPUUsage       float64
	GoroutineCount int64
	HeapSize       int64
	StackSize      int64

	// Timestamps
	StartTime  time.Time
	LastUpdate time.Time
	Uptime     time.Duration
}

PerformanceMetrics contains various performance metrics

type PerformanceOptimizer

type PerformanceOptimizer struct {
	// contains filtered or unexported fields
}

PerformanceOptimizer provides high-level optimization methods

func NewPerformanceOptimizer

func NewPerformanceOptimizer(manager *PerformanceManager) *PerformanceOptimizer

NewPerformanceOptimizer creates a new performance optimizer

func (*PerformanceOptimizer) OptimizeForLatency

func (po *PerformanceOptimizer) OptimizeForLatency()

OptimizeForLatency optimizes configuration for minimum latency

func (*PerformanceOptimizer) OptimizeForMemory

func (po *PerformanceOptimizer) OptimizeForMemory()

OptimizeForMemory optimizes configuration for minimum memory usage

func (*PerformanceOptimizer) OptimizeForThroughput

func (po *PerformanceOptimizer) OptimizeForThroughput()

OptimizeForThroughput optimizes configuration for maximum throughput

type PoolConfig

type PoolConfig struct {
	// MinConnections is the minimum number of connections to maintain
	MinConnections int

	// MaxConnections is the maximum number of connections allowed
	MaxConnections int

	// ConnectionTimeout is the timeout for establishing connections
	ConnectionTimeout time.Duration

	// HealthCheckInterval is the interval for health checks
	HealthCheckInterval time.Duration

	// IdleTimeout is the timeout for idle connections
	IdleTimeout time.Duration

	// MaxIdleConnections is the maximum number of idle connections
	MaxIdleConnections int

	// LoadBalancingStrategy defines how connections are selected
	LoadBalancingStrategy LoadBalancingStrategy

	// ConnectionTemplate is the template configuration for new connections
	ConnectionTemplate *ConnectionConfig

	// URLs are the WebSocket URLs to connect to
	URLs []string

	// Logger is the logger instance
	Logger *zap.Logger
}

PoolConfig contains configuration for the connection pool

func DefaultPoolConfig

func DefaultPoolConfig() *PoolConfig

DefaultPoolConfig returns a default configuration for the connection pool

type PoolStats

type PoolStats struct {
	TotalConnections     int64
	ActiveConnections    int64
	IdleConnections      int64
	HealthyConnections   int64
	UnhealthyConnections int64
	TotalRequests        int64
	FailedRequests       int64
	TotalBytesReceived   int64
	TotalBytesSent       int64
	AverageResponseTime  time.Duration
	// contains filtered or unexported fields
}

PoolStats tracks connection pool statistics

type Profiler

type Profiler struct {
	// contains filtered or unexported fields
}

Profiler handles CPU and memory profiling

func NewProfiler

func NewProfiler(interval time.Duration) *Profiler

NewProfiler creates a new profiler

func (*Profiler) Disable

func (p *Profiler) Disable()

Disable disables profiling

func (*Profiler) Enable

func (p *Profiler) Enable()

Enable enables profiling

func (*Profiler) GetProfilingData

func (p *Profiler) GetProfilingData() map[string]interface{}

GetProfilingData returns current profiling data

func (*Profiler) Start

func (p *Profiler) Start(ctx context.Context, wg *sync.WaitGroup)

Start starts the profiler

type ReliableConnectionTester

type ReliableConnectionTester struct {
	// contains filtered or unexported fields
}

ReliableConnectionTester provides utilities for testing WebSocket connections reliably

func NewReliableConnectionTester

func NewReliableConnectionTester(t *testing.T) *ReliableConnectionTester

NewReliableConnectionTester creates a new connection tester

func (*ReliableConnectionTester) Cleanup

func (rt *ReliableConnectionTester) Cleanup()

Cleanup cleans up test resources

func (*ReliableConnectionTester) CreateConnection

func (rt *ReliableConnectionTester) CreateConnection() *Connection

CreateConnection creates a new connection with the test configuration

func (*ReliableConnectionTester) TestConcurrentConnections

func (rt *ReliableConnectionTester) TestConcurrentConnections(numConnections int, testFunc func(int, *Connection))

TestConcurrentConnections tests multiple connections concurrently

func (*ReliableConnectionTester) TestConnection

func (rt *ReliableConnectionTester) TestConnection(testFunc func(*Connection))

TestConnection tests a connection with retry logic

type ReliableMessageTester

type ReliableMessageTester struct {
	// contains filtered or unexported fields
}

ReliableMessageTester helps test message sending/receiving reliably

func NewReliableMessageTester

func NewReliableMessageTester(conn *Connection) *ReliableMessageTester

NewReliableMessageTester creates a new message tester

func (*ReliableMessageTester) SendAndVerify

func (mt *ReliableMessageTester) SendAndVerify(ctx context.Context, message []byte, timeout time.Duration) error

SendAndVerify sends a message and verifies it's echoed back

func (*ReliableMessageTester) Stop

func (mt *ReliableMessageTester) Stop()

Stop stops the message tester

type ReliableTestServer

type ReliableTestServer struct {
	// contains filtered or unexported fields
}

ReliableTestServer provides a WebSocket server designed for reliable testing

func CreateIsolatedServer

func CreateIsolatedServer(t *testing.T, helper *TestCleanupHelper) *ReliableTestServer

CreateIsolatedServer creates a server with proper cleanup registration

func NewReliableTestServer

func NewReliableTestServer(t *testing.T) *ReliableTestServer

NewReliableTestServer creates a new reliable test server

func (*ReliableTestServer) Close

func (s *ReliableTestServer) Close()

Close gracefully shuts down the server

func (*ReliableTestServer) GetConnectionCount

func (s *ReliableTestServer) GetConnectionCount() int

GetConnectionCount returns the current number of active connections

func (*ReliableTestServer) GetStats

func (s *ReliableTestServer) GetStats() (connections, messages, errors int64)

GetStats returns current server statistics

func (*ReliableTestServer) SetDelay

func (s *ReliableTestServer) SetDelay(delay time.Duration)

SetDelay sets artificial delay for message processing

func (*ReliableTestServer) SetEchoMode

func (s *ReliableTestServer) SetEchoMode(enabled bool)

SetEchoMode enables or disables echo mode

func (*ReliableTestServer) URL

func (s *ReliableTestServer) URL() string

URL returns the WebSocket URL

type ResourceCleanupConfig

type ResourceCleanupConfig struct {
	// EnableGoroutineTracking enables tracking of goroutines
	EnableGoroutineTracking bool

	// CleanupInterval is the interval for resource cleanup
	CleanupInterval time.Duration

	// MaxGoroutineIdleTime is the maximum time a goroutine can be idle before cleanup
	MaxGoroutineIdleTime time.Duration

	// EnableResourceMonitoring enables monitoring of resource usage
	EnableResourceMonitoring bool
}

ResourceCleanupConfig configures resource cleanup behavior

func DefaultResourceCleanupConfig

func DefaultResourceCleanupConfig() *ResourceCleanupConfig

DefaultResourceCleanupConfig returns default resource cleanup configuration

type ResourceLimitedTest

type ResourceLimitedTest struct {
	// contains filtered or unexported fields
}

ResourceLimitedTest manages test resource consumption to prevent system exhaustion

func DefaultResourceLimits

func DefaultResourceLimits() *ResourceLimitedTest

DefaultResourceLimits returns reasonable limits for websocket tests

func NewResourceLimitedTest

func NewResourceLimitedTest(maxGoroutines int, maxMemoryMB int64) *ResourceLimitedTest

NewResourceLimitedTest creates a resource-limited test manager

func (*ResourceLimitedTest) RunWithLimits

func (r *ResourceLimitedTest) RunWithLimits(t *testing.T, name string, testFunc func(t *testing.T))

RunWithLimits executes a test function with resource limits

type SafeConnection

type SafeConnection struct {
	// contains filtered or unexported fields
}

SafeConnection wraps a WebSocket connection with additional safety checks

func NewSafeConnection

func NewSafeConnection(conn *websocket.Conn) *SafeConnection

NewSafeConnection creates a new safe connection wrapper

func (*SafeConnection) Close

func (sc *SafeConnection) Close() error

Close closes the connection safely

func (*SafeConnection) IsClosed

func (sc *SafeConnection) IsClosed() bool

IsClosed returns true if the connection is closed

func (*SafeConnection) ReadMessage

func (sc *SafeConnection) ReadMessage() (messageType int, p []byte, err error)

ReadMessage safely reads a message from the connection

func (*SafeConnection) SetReadDeadline

func (sc *SafeConnection) SetReadDeadline(t time.Time) error

SetReadDeadline sets the read deadline

func (*SafeConnection) SetWriteDeadline

func (sc *SafeConnection) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the write deadline

func (*SafeConnection) WriteMessage

func (sc *SafeConnection) WriteMessage(messageType int, data []byte) error

WriteMessage safely writes a message to the connection

type SecureConnection

type SecureConnection struct {
	// contains filtered or unexported fields
}

SecureConnection represents a secured WebSocket connection

func (*SecureConnection) ValidateMessage

func (sc *SecureConnection) ValidateMessage(messageType int, data []byte) error

ValidateMessage validates an incoming message

type SecurityConfig

type SecurityConfig struct {
	// Authentication
	RequireAuth    bool           `json:"require_auth"`
	AuthTimeout    time.Duration  `json:"auth_timeout"`
	TokenValidator TokenValidator `json:"-"`

	// Origin validation
	AllowedOrigins    []string `json:"allowed_origins"`
	StrictOriginCheck bool     `json:"strict_origin_check"`

	// Subprotocol negotiation
	SupportedProtocols []string `json:"supported_protocols"`
	RequireSubprotocol bool     `json:"require_subprotocol"`

	// Rate limiting
	GlobalRateLimit float64 `json:"global_rate_limit"` // requests per second
	ClientRateLimit float64 `json:"client_rate_limit"` // requests per second per client
	ClientBurstSize int     `json:"client_burst_size"` // burst size for client rate limiting
	MaxConnections  int     `json:"max_connections"`   // maximum concurrent connections

	// TLS/SSL settings
	TLSConfig     *tls.Config `json:"-"`
	RequireTLS    bool        `json:"require_tls"`
	MinTLSVersion uint16      `json:"min_tls_version"`
	CertFile      string      `json:"cert_file"`
	KeyFile       string      `json:"key_file"`

	// Attack protection
	MaxMessageSize int64         `json:"max_message_size"` // maximum message size in bytes
	MaxFrameSize   int64         `json:"max_frame_size"`   // maximum frame size in bytes
	ReadDeadline   time.Duration `json:"read_deadline"`    // read timeout
	WriteDeadline  time.Duration `json:"write_deadline"`   // write timeout
	PingInterval   time.Duration `json:"ping_interval"`    // ping interval for keepalive
	PongTimeout    time.Duration `json:"pong_timeout"`     // pong timeout

	// Audit logging
	AuditLogger       WSAuditLogger `json:"-"`
	LogConnections    bool          `json:"log_connections"`
	LogMessages       bool          `json:"log_messages"`
	LogSecurityEvents bool          `json:"log_security_events"`
}

SecurityConfig defines WebSocket security configuration

func DefaultSecurityConfig

func DefaultSecurityConfig() *SecurityConfig

DefaultSecurityConfig returns secure default configuration Uses configurable timeouts that adapt to test/production environments

func ExampleHMACConfig

func ExampleHMACConfig() (*SecurityConfig, error)

ExampleHMACConfig demonstrates configuring JWT validation with HMAC (symmetric key)

func ExampleRSAConfig

func ExampleRSAConfig() (*SecurityConfig, error)

ExampleRSAConfig demonstrates configuring JWT validation with RSA (asymmetric key)

type SecurityEvent

type SecurityEvent struct {
	Type      string                 `json:"type"`
	Timestamp time.Time              `json:"timestamp"`
	ClientIP  string                 `json:"client_ip"`
	UserAgent string                 `json:"user_agent"`
	UserID    string                 `json:"user_id,omitempty"`
	Details   map[string]interface{} `json:"details,omitempty"`
	Severity  string                 `json:"severity"`
	Message   string                 `json:"message"`
}

SecurityEvent represents a security-related event

type SecurityManager

type SecurityManager struct {
	// contains filtered or unexported fields
}

SecurityManager handles WebSocket security enforcement

func NewSecurityManager

func NewSecurityManager(config *SecurityConfig) *SecurityManager

NewSecurityManager creates a new WebSocket security manager

func (*SecurityManager) CreateUpgrader

func (sm *SecurityManager) CreateUpgrader() *websocket.Upgrader

CreateUpgrader creates a configured WebSocket upgrader

func (*SecurityManager) GetStats

func (sm *SecurityManager) GetStats() map[string]interface{}

GetStats returns security statistics

func (*SecurityManager) SecureConnection

func (sm *SecurityManager) SecureConnection(conn *websocket.Conn, authContext *AuthContext, r *http.Request) *SecureConnection

SecureConnection wraps a WebSocket connection with security features

func (*SecurityManager) Shutdown

func (sm *SecurityManager) Shutdown()

Shutdown gracefully shuts down the security manager

func (*SecurityManager) ValidateUpgrade

func (sm *SecurityManager) ValidateUpgrade(w http.ResponseWriter, r *http.Request) (*AuthContext, error)

ValidateUpgrade validates a WebSocket upgrade request

type SequencedTestRunner

type SequencedTestRunner struct {
	// contains filtered or unexported fields
}

SequencedTestRunner manages test execution to prevent resource conflicts

func NewSequencedTestRunner

func NewSequencedTestRunner(t *testing.T, category TestCategory, timeout time.Duration) *SequencedTestRunner

NewSequencedTestRunner creates a test runner with proper sequencing

func (*SequencedTestRunner) Run

func (r *SequencedTestRunner) Run(testFunc func(*MinimalTestHelper))

Run executes the test with proper sequencing and resource management

type SerializerFactory

type SerializerFactory struct {
	// contains filtered or unexported fields
}

SerializerFactory creates message serializers

func NewSerializerFactory

func NewSerializerFactory(serializerType SerializerType) *SerializerFactory

NewSerializerFactory creates a new serializer factory

func (*SerializerFactory) GetSerializer

func (sf *SerializerFactory) GetSerializer() MessageSerializer

GetSerializer gets a serializer from the pool

func (*SerializerFactory) PutSerializer

func (sf *SerializerFactory) PutSerializer(serializer MessageSerializer)

PutSerializer returns a serializer to the pool

type SerializerType

type SerializerType int

SerializerType defines different serialization methods

const (
	// JSONSerializer uses standard JSON serialization
	JSONSerializer SerializerType = iota
	// OptimizedJSONSerializer uses optimized JSON serialization
	OptimizedJSONSerializer
	// ProtobufSerializer uses Protocol Buffers serialization
	ProtobufSerializer
)

func (SerializerType) String

func (s SerializerType) String() string

String returns the string representation of the serializer type

type StaleConnectionError

type StaleConnectionError struct {
	Generation int64
}

StaleConnectionError indicates that a connection reference has become stale

func (*StaleConnectionError) Error

func (e *StaleConnectionError) Error() string

type Subscription

type Subscription struct {
	ID          string
	EventTypes  []string
	Handler     EventHandler
	HandlerIDs  []string // Track handler IDs for reliable removal
	Context     context.Context
	Cancel      context.CancelFunc
	CreatedAt   time.Time
	LastEventAt time.Time
	EventCount  int64
	// contains filtered or unexported fields
}

Subscription represents an event subscription

type TestCategory

type TestCategory int

TestCategory defines the resource intensity of a test

const (
	LightTest  TestCategory = iota // Basic unit tests
	MediumTest                     // Integration tests
	HeavyTest                      // Load/performance tests
)

type TestCleanupHelper

type TestCleanupHelper struct {
	// contains filtered or unexported fields
}

TestCleanupHelper provides aggressive cleanup utilities for tests

func NewTestCleanupHelper

func NewTestCleanupHelper(t *testing.T) *TestCleanupHelper

NewTestCleanupHelper creates a new cleanup helper

func (*TestCleanupHelper) CleanupAll

func (h *TestCleanupHelper) CleanupAll()

CleanupAll performs aggressive cleanup of all registered resources with enhanced goroutine termination

func (*TestCleanupHelper) RegisterCleanupFunc

func (h *TestCleanupHelper) RegisterCleanupFunc(f func() error)

RegisterCleanupFunc registers a custom cleanup function

func (*TestCleanupHelper) RegisterConnection

func (h *TestCleanupHelper) RegisterConnection(conn *Connection)

RegisterConnection registers a connection for cleanup

func (*TestCleanupHelper) RegisterServer

func (h *TestCleanupHelper) RegisterServer(server *ReliableTestServer)

RegisterServer registers a server for cleanup

func (*TestCleanupHelper) RegisterTransport

func (h *TestCleanupHelper) RegisterTransport(transport *Transport)

RegisterTransport registers a transport for cleanup

type TestConfig

type TestConfig struct {
	ShortTest  time.Duration
	MediumTest time.Duration
	LongTest   time.Duration
}

TestConfig defines test timeout configurations

func FastTestConfig

func FastTestConfig() TestConfig

FastTestConfig returns optimized timeout configurations for faster test execution

type TestConnectionManager

type TestConnectionManager struct {
	// contains filtered or unexported fields
}

TestConnectionManager helps manage connections in tests to prevent leaks

func NewTestConnectionManager

func NewTestConnectionManager() *TestConnectionManager

NewTestConnectionManager creates a new connection manager for tests

func (*TestConnectionManager) AddConnection

func (m *TestConnectionManager) AddConnection(conn *Connection)

AddConnection adds a connection to be managed

func (*TestConnectionManager) AddServer

func (m *TestConnectionManager) AddServer(server *ReliableTestServer)

AddServer adds a test server to be managed

func (*TestConnectionManager) AddTransport

func (m *TestConnectionManager) AddTransport(transport *Transport)

AddTransport adds a transport to be managed

func (*TestConnectionManager) CleanupAll

func (m *TestConnectionManager) CleanupAll(t *testing.T)

CleanupAll closes all managed resources with timeout

type TestResourceManager

type TestResourceManager struct {
	// contains filtered or unexported fields
}

TestResourceManager provides centralized resource management for tests to prevent cumulative resource exhaustion when running the full test suite

func (*TestResourceManager) AcquireBudget

func (trm *TestResourceManager) AcquireBudget(testName string, requestedGoroutines int64) bool

AcquireBudget attempts to acquire a portion of the global goroutine budget

func (*TestResourceManager) AcquireHeavyTestSlot

func (trm *TestResourceManager) AcquireHeavyTestSlot(t *testing.T, testName string) func()

AcquireHeavyTestSlot acquires a slot for resource-intensive tests This prevents too many heavy tests from running simultaneously

func (*TestResourceManager) GetBudgetStatus

func (trm *TestResourceManager) GetBudgetStatus() (active, max int64, utilizationPercent float64)

GetBudgetStatus returns current budget usage statistics

func (*TestResourceManager) ReleaseBudget

func (trm *TestResourceManager) ReleaseBudget(testName string)

ReleaseBudget releases the goroutine budget for a test

type TokenValidator

type TokenValidator interface {
	ValidateToken(ctx context.Context, token string) (*AuthContext, error)
}

TokenValidator interface for authentication token validation

func ExampleCustomValidation

func ExampleCustomValidation() TokenValidator

ExampleCustomValidation demonstrates custom JWT validation with additional claims

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport implements the WebSocket transport for the AG-UI protocol

func CreateIsolatedTransport

func CreateIsolatedTransport(t *testing.T, helper *TestCleanupHelper, config *TransportConfig) *Transport

CreateIsolatedTransport creates a transport with proper cleanup registration

func NewTransport

func NewTransport(config *TransportConfig) (*Transport, error)

NewTransport creates a new WebSocket transport

func (*Transport) AddEventHandler

func (t *Transport) AddEventHandler(eventType string, handler EventHandler) string

AddEventHandler adds an event handler for a specific event type and returns a handler ID

func (*Transport) CleanupEventHandlers

func (t *Transport) CleanupEventHandlers()

CleanupEventHandlers performs memory-pressure aware cleanup of event handlers

func (*Transport) Close

func (t *Transport) Close(ctx context.Context) error

Close closes the transport and releases all resources

func (*Transport) EnableAdaptiveOptimization

func (t *Transport) EnableAdaptiveOptimization()

EnableAdaptiveOptimization enables adaptive performance optimization

func (*Transport) FastStop

func (t *Transport) FastStop() error

FastStop provides immediate shutdown for test scenarios

func (*Transport) GetActiveConnectionCount

func (t *Transport) GetActiveConnectionCount() int

GetActiveConnectionCount returns the number of active connections

func (*Transport) GetBackpressureStats

func (t *Transport) GetBackpressureStats() BackpressureStats

GetBackpressureStats returns current backpressure statistics

func (*Transport) GetConnectionPoolStats

func (t *Transport) GetConnectionPoolStats() PoolStats

GetConnectionPoolStats returns the connection pool statistics

func (*Transport) GetDetailedStatus

func (t *Transport) GetDetailedStatus() map[string]interface{}

GetDetailedStatus returns detailed status information

func (*Transport) GetEventHandlerCount

func (t *Transport) GetEventHandlerCount() int

GetEventHandlerCount returns the number of event handlers

func (*Transport) GetEventHandlerStats

func (t *Transport) GetEventHandlerStats() map[string]interface{}

GetEventHandlerStats returns statistics about event handlers for memory monitoring

func (*Transport) GetGoroutineStats

func (t *Transport) GetGoroutineStats() map[string]GoroutineStats

GetGoroutineStats returns current goroutine statistics

func (*Transport) GetHealthyConnectionCount

func (t *Transport) GetHealthyConnectionCount() int

GetHealthyConnectionCount returns the number of healthy connections

func (*Transport) GetMemoryUsage

func (t *Transport) GetMemoryUsage() int64

GetMemoryUsage returns current memory usage

func (*Transport) GetPerformanceMetrics

func (t *Transport) GetPerformanceMetrics() *PerformanceMetrics

GetPerformanceMetrics returns current performance metrics

func (*Transport) GetSubscription

func (t *Transport) GetSubscription(subscriptionID string) (*Subscription, error)

GetSubscription returns a subscription by ID

func (*Transport) IsConnected

func (t *Transport) IsConnected() bool

IsConnected returns true if the transport has healthy connections

func (*Transport) ListSubscriptions

func (t *Transport) ListSubscriptions() []*Subscription

ListSubscriptions returns all active subscriptions

func (*Transport) OnMemoryPressure

func (t *Transport) OnMemoryPressure(level int)

OnMemoryPressure handles memory pressure events by cleaning up handlers

func (*Transport) OptimizeForLatency

func (t *Transport) OptimizeForLatency()

OptimizeForLatency optimizes the transport for minimum latency

func (*Transport) OptimizeForMemory

func (t *Transport) OptimizeForMemory()

OptimizeForMemory optimizes the transport for minimum memory usage

func (*Transport) OptimizeForThroughput

func (t *Transport) OptimizeForThroughput()

OptimizeForThroughput optimizes the transport for maximum throughput

func (*Transport) Ping

func (t *Transport) Ping(ctx context.Context) error

Ping sends a ping through all connections

func (*Transport) RegisterCleanupFunc

func (t *Transport) RegisterCleanupFunc(cleanup func() error)

RegisterCleanupFunc registers a function to be called during shutdown

func (*Transport) RemoveEventHandler

func (t *Transport) RemoveEventHandler(eventType string, handlerID string) error

RemoveEventHandler removes an event handler by its ID with enhanced race condition protection

func (*Transport) ResetBackpressureStats

func (t *Transport) ResetBackpressureStats()

ResetBackpressureStats resets backpressure statistics

func (*Transport) ResetForTesting

func (t *Transport) ResetForTesting() error

ResetForTesting provides test isolation by clearing internal state

func (*Transport) SendEvent

func (t *Transport) SendEvent(ctx context.Context, event events.Event) error

SendEvent sends an event through the WebSocket transport

func (*Transport) Start

func (t *Transport) Start(ctx context.Context) error

Start initializes the WebSocket transport

func (*Transport) Stats

func (t *Transport) Stats() TransportStats

GetStats returns a copy of the transport statistics

func (*Transport) Stop

func (t *Transport) Stop() error

Stop gracefully shuts down the WebSocket transport

func (*Transport) Subscribe

func (t *Transport) Subscribe(ctx context.Context, eventTypes []string, handler EventHandler) (*Subscription, error)

Subscribe creates a subscription for specific event types

func (*Transport) Unsubscribe

func (t *Transport) Unsubscribe(subscriptionID string) error

Unsubscribe removes a subscription

type TransportConfig

type TransportConfig struct {
	// URLs are the WebSocket server URLs
	URLs []string

	// PoolConfig configures the connection pool
	PoolConfig *PoolConfig

	// PerformanceConfig configures performance optimizations
	PerformanceConfig *PerformanceConfig

	// SecurityConfig configures security settings
	SecurityConfig *SecurityConfig

	// DialTimeout is the timeout for establishing WebSocket connections
	DialTimeout time.Duration

	// EventTimeout is the timeout for event processing
	EventTimeout time.Duration

	// MaxEventSize is the maximum size of events
	MaxEventSize int64

	// EnableEventValidation enables event validation
	EnableEventValidation bool

	// EventValidator is the event validator instance
	EventValidator *events.EventValidator

	// Logger is the logger instance
	Logger *zap.Logger

	// BackpressureConfig configures backpressure behavior
	BackpressureConfig *BackpressureConfig

	// ResourceCleanupConfig configures resource cleanup
	ResourceCleanupConfig *ResourceCleanupConfig

	// ShutdownTimeout is the timeout for transport shutdown (default 5s, reduced for tests)
	ShutdownTimeout time.Duration
}

TransportConfig contains configuration for the WebSocket transport

func DefaultTransportConfig

func DefaultTransportConfig() *TransportConfig

DefaultTransportConfig returns a default configuration for the WebSocket transport

func ExampleFullConfig

func ExampleFullConfig() (*TransportConfig, error)

ExampleFullConfig demonstrates a complete WebSocket transport configuration with JWT

func FastTransportConfig

func FastTransportConfig() *TransportConfig

FastTransportConfig returns a transport configuration optimized for fast tests

func HighConcurrencyTransportConfig

func HighConcurrencyTransportConfig() *TransportConfig

HighConcurrencyTransportConfig returns a transport configuration optimized for high concurrency testing

func OptimizedTransportConfig

func OptimizedTransportConfig() *TransportConfig

OptimizedTransportConfig returns a transport configuration optimized for testing

type TransportStats

type TransportStats struct {
	EventsSent          int64
	EventsReceived      int64
	EventsProcessed     int64
	EventsFailed        int64
	EventsDropped       int64
	ActiveSubscriptions int64
	TotalSubscriptions  int64
	BytesTransferred    int64
	AverageLatency      time.Duration
	ActiveGoroutines    int64
	BackpressureEvents  int64
	ResourceCleanups    int64
	// contains filtered or unexported fields
}

TransportStats tracks transport statistics

type WSAuditLogger

type WSAuditLogger interface {
	LogSecurityEvent(ctx context.Context, event *SecurityEvent) error
}

WSAuditLogger interface for security event logging

type ZeroCopyBuffer

type ZeroCopyBuffer struct {
	// contains filtered or unexported fields
}

ZeroCopyBuffer implements zero-copy buffer operations

func NewZeroCopyBuffer

func NewZeroCopyBuffer(data []byte) *ZeroCopyBuffer

NewZeroCopyBuffer creates a new zero-copy buffer

func (*ZeroCopyBuffer) Advance

func (zcb *ZeroCopyBuffer) Advance(n int)

Advance advances the buffer offset

func (*ZeroCopyBuffer) Bytes

func (zcb *ZeroCopyBuffer) Bytes() []byte

Bytes returns the buffer data using zero-copy

func (*ZeroCopyBuffer) Len

func (zcb *ZeroCopyBuffer) Len() int

Len returns the remaining length

func (*ZeroCopyBuffer) Reset

func (zcb *ZeroCopyBuffer) Reset()

Reset resets the buffer

func (*ZeroCopyBuffer) String

func (zcb *ZeroCopyBuffer) String() string

String returns the buffer data as a string using zero-copy

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL