manager

package
v0.0.0-...-ae8f3be Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 1, 2026 License: MIT Imports: 8 Imported by: 0

README

流管理器 (Stream Manager)

提供流生命周期管理功能,包括状态机、事件系统、健康检查和自动重连。

特性

  • 流状态机: 完整的流生命周期状态管理
  • 事件系统: 实时流状态变更通知
  • 健康检查: 自动检测流健康状况
  • 资源限制: 最大流数量限制
  • 自动重连: 可配置的重连策略

快速开始

mgr := manager.New(manager.Config{
    MaxStreams: 100,
    HealthCheckInterval: 10 * time.Second,
})

// 监听事件
mgr.OnEvent(func(e manager.Event) {
    log.Printf("Stream %s: %s", e.StreamID, e.Type)
})

mgr.Start()
defer mgr.Stop()

// 添加流
streamID, err := mgr.AddStream(manager.StreamConfig{
    ID: "stream-1",
    Input: "rtsp://camera/live",
    Output: "rtmp://server/live",
})

流状态

const (
    StateIdle           StreamState = iota
    StateStarting
    StateRunning
    StatePaused
    StateReconnecting
    StateStopping
    StateStopped
    StateFailed
)

事件类型

事件 说明
EventStreamAdded 流已添加
EventStreamStarted 流已启动
EventStreamStopped 流已停止
EventStreamFailed 流失败
EventStreamReconnecting 流正在重连
EventHealthCheckFailed 健康检查失败
EventResourceLimit 资源限制

文件说明

文件 说明
manager.go 管理器核心实现
stream.go 流生命周期管理

Documentation

Overview

Package manager 提供流管理器功能

流管理器负责管理多个流的生命周期,包括:

  • 流状态机管理
  • 事件系统
  • 健康检查
  • 资源限制
  • 自动重连策略

使用示例:

mgr := manager.New(manager.Config{
    MaxStreams: 100,
    HealthCheckInterval: 10 * time.Second,
})
mgr.OnEvent(func(e manager.Event) {
    log.Printf("Stream %s: %s", e.StreamID, e.Type)
})
mgr.Start()
defer mgr.Stop()

streamID, err := mgr.AddStream(streamConfig)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// 资源限制
	MaxStreams     int     // 最大流数量(0 表示无限制)
	MaxBandwidth   int64   // 最大总带宽(bps,0 表示无限制)
	MaxCPUPercent  float64 // 最大 CPU 使用率(0-100,0 表示无限制)
	MaxMemoryBytes int64   // 最大内存使用(字节,0 表示无限制)

	// 健康检查
	HealthCheckInterval time.Duration // 健康检查间隔
	HealthCheckTimeout  time.Duration // 健康检查超时
	UnhealthyThreshold  int           // 连续失败次数阈值

	// 重连策略
	AutoReconnect     bool          // 是否自动重连
	ReconnectDelay    time.Duration // 重连延迟
	MaxReconnectDelay time.Duration // 最大重连延迟
	MaxReconnects     int           // 最大重连次数(0 表示无限制)

	// 清理策略
	IdleTimeout  time.Duration // 空闲超时(自动停止)
	StaleTimeout time.Duration // 过期超时(自动移除)
}

Config 管理器配置

func DefaultConfig

func DefaultConfig() Config

DefaultConfig 返回默认配置

type Event

type Event struct {
	Type      EventType
	StreamID  string
	Stream    *ManagedStream
	Timestamp time.Time
	Data      map[string]interface{}
	Error     error
}

Event 事件

type EventHandler

type EventHandler func(Event)

EventHandler 事件处理器

type EventType

type EventType string

EventType 事件类型

const (
	EventStreamAdded        EventType = "stream_added"
	EventStreamRemoved      EventType = "stream_removed"
	EventStreamStarted      EventType = "stream_started"
	EventStreamStopped      EventType = "stream_stopped"
	EventStreamPaused       EventType = "stream_paused"
	EventStreamResumed      EventType = "stream_resumed"
	EventStreamFailed       EventType = "stream_failed"
	EventStreamReconnecting EventType = "stream_reconnecting"
	EventStreamReconnected  EventType = "stream_reconnected"
	EventHealthCheckFailed  EventType = "health_check_failed"
	EventResourceLimit      EventType = "resource_limit"
	EventError              EventType = "error"
)

type ManagedStream

type ManagedStream struct {
	ID       string
	Name     string
	Config   StreamConfig
	Tags     map[string]string
	Metadata map[string]interface{}
	// contains filtered or unexported fields
}

ManagedStream 被管理的流

func (*ManagedStream) CheckHealth

func (s *ManagedStream) CheckHealth(timeout time.Duration) bool

CheckHealth 检查健康状态

func (*ManagedStream) Pause

func (s *ManagedStream) Pause() error

Pause 暂停流

func (*ManagedStream) Restart

func (s *ManagedStream) Restart() error

Restart 重启流

func (*ManagedStream) Resume

func (s *ManagedStream) Resume() error

Resume 恢复流

func (*ManagedStream) Start

func (s *ManagedStream) Start() error

Start 启动流

func (*ManagedStream) State

func (s *ManagedStream) State() StreamState

State 返回状态

func (*ManagedStream) Stats

func (s *ManagedStream) Stats() StreamStats

Stats 返回统计

func (*ManagedStream) Stop

func (s *ManagedStream) Stop() error

Stop 停止流

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager 流管理器

func New

func New(config Config) *Manager

New 创建流管理器

func (*Manager) ActiveStreamCount

func (m *Manager) ActiveStreamCount() int

ActiveStreamCount 返回活跃流数量

func (*Manager) AddStream

func (m *Manager) AddStream(config StreamConfig) (string, error)

AddStream 添加流

func (*Manager) GetStream

func (m *Manager) GetStream(id string) *ManagedStream

GetStream 获取流

func (*Manager) IsRunning

func (m *Manager) IsRunning() bool

IsRunning 检查是否运行中

func (*Manager) ListStreams

func (m *Manager) ListStreams() []*ManagedStream

ListStreams 列出所有流

func (*Manager) ListStreamsByState

func (m *Manager) ListStreamsByState(state StreamState) []*ManagedStream

ListStreamsByState 按状态列出流

func (*Manager) ListStreamsByTag

func (m *Manager) ListStreamsByTag(key, value string) []*ManagedStream

ListStreamsByTag 按标签列出流

func (*Manager) OnEvent

func (m *Manager) OnEvent(handler EventHandler)

OnEvent 注册事件处理器

func (*Manager) PauseStream

func (m *Manager) PauseStream(id string) error

PauseStream 暂停流

func (*Manager) RemoveStream

func (m *Manager) RemoveStream(id string) error

RemoveStream 移除流

func (*Manager) ResumeStream

func (m *Manager) ResumeStream(id string) error

ResumeStream 恢复流

func (*Manager) Start

func (m *Manager) Start() error

Start 启动管理器

func (*Manager) StartStream

func (m *Manager) StartStream(id string) error

StartStream 启动流

func (*Manager) Stats

func (m *Manager) Stats() ManagerStats

Stats 返回统计信息

func (*Manager) Stop

func (m *Manager) Stop() error

Stop 停止管理器

func (*Manager) StopStream

func (m *Manager) StopStream(id string) error

StopStream 停止流

func (*Manager) StreamCount

func (m *Manager) StreamCount() int

StreamCount 返回流数量

type ManagerStats

type ManagerStats struct {
	TotalStreams    int64         // 总流数
	ActiveStreams   int64         // 活跃流数
	FailedStreams   int64         // 失败流数
	TotalBandwidth  int64         // 总带宽
	TotalBytesIn    int64         // 总输入字节
	TotalBytesOut   int64         // 总输出字节
	TotalReconnects int64         // 总重连次数
	Uptime          time.Duration // 运行时间
	StartTime       time.Time     // 启动时间
}

ManagerStats 管理器统计

type StreamConfig

type StreamConfig struct {
	ID         string                 // 流 ID(为空则自动生成)
	Name       string                 // 流名称
	InputURL   string                 // 输入 URL
	OutputURLs []string               // 输出 URL 列表
	Tags       map[string]string      // 标签
	Metadata   map[string]interface{} // 元数据

	// 重连设置(覆盖管理器默认值)
	AutoReconnect     *bool
	MaxReconnects     *int
	ReconnectDelay    *time.Duration
	MaxReconnectDelay *time.Duration
}

StreamConfig 流配置

type StreamState

type StreamState int32

StreamState 流状态

const (
	StateIdle StreamState = iota
	StateStarting
	StateRunning
	StatePaused
	StateReconnecting
	StateStopping
	StateStopped
	StateFailed
)

func (StreamState) IsActive

func (s StreamState) IsActive() bool

IsActive 检查是否为活跃状态

func (StreamState) IsTerminal

func (s StreamState) IsTerminal() bool

IsTerminal 检查是否为终止状态

func (StreamState) String

func (s StreamState) String() string

String 返回状态名称

type StreamStats

type StreamStats struct {
	BytesIn     int64
	BytesOut    int64
	PacketsIn   int64
	PacketsOut  int64
	FramesIn    int64
	FramesOut   int64
	Errors      int64
	Reconnects  int64
	Duration    time.Duration
	Bitrate     float64
	FPS         float64
	LastError   error
	LastErrorAt time.Time
}

StreamStats 流统计

Source Files

  • manager.go
  • stream.go

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL