telemetry

package
v0.0.0-...-4d9b869 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MetricToolCallsTotal         = "tool_calls_total"
	MetricToolDurationSeconds    = "tool_duration_seconds"
	MetricModelRequestsTotal     = "model_requests_total"
	MetricModelLatencySeconds    = "model_latency_seconds"
	MetricStorageOperationsTotal = "storage_operations_total"
	MetricStorageErrorsTotal     = "storage_errors_total"
	MetricActiveSessions         = "active_sessions"
	MetricMemoryUsageBytes       = "memory_usage_bytes"
)

Predefined metric names for Buckley.

View Source
const (
	// DefaultPersistBatchSize is the max events to buffer before writing.
	DefaultPersistBatchSize = 100
	// DefaultPersistFlushInterval is how often to flush buffered events.
	DefaultPersistFlushInterval = 500 * time.Millisecond
)
View Source
const (
	// DefaultEventQueueSize is the default buffer size for the event queue.
	DefaultEventQueueSize = 1000
	// DefaultBatchSize is the default number of events to batch before flushing.
	DefaultBatchSize = 100
	// DefaultFlushInterval is the default interval to flush batched events.
	DefaultFlushInterval = 100 * time.Millisecond
	// DefaultRateLimit is the default rate limit for events per second.
	DefaultRateLimit = 1000
	// DefaultSubscriberChannelSize is the default buffer size for subscriber channels.
	DefaultSubscriberChannelSize = 64
)

Variables

View Source
var DefaultHistogramBuckets = []float64{
	0.001,
	0.005,
	0.01,
	0.025,
	0.05,
	0.1,
	0.25,
	0.5,
	1.0,
	2.5,
	5.0,
	10.0,
}

DefaultHistogramBuckets are the default latency buckets, expressed in seconds (the unit Histogram.Observe expects). Buckets: 1ms, 5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2.5s, 5s, 10s

View Source
var DefaultRegistry = NewRegistry()

DefaultRegistry is the default global registry.

Functions

func DecActiveSessions

func DecActiveSessions()

DecActiveSessions decrements the active sessions count.

func GetMemoryStatsJSON

func GetMemoryStatsJSON() ([]byte, error)

GetMemoryStatsJSON returns memory statistics as JSON.

func IncActiveSessions

func IncActiveSessions()

IncActiveSessions increments the active sessions count.

func RecordMemoryStats

func RecordMemoryStats()

RecordMemoryStats records current memory statistics.

func RecordModelLatency

func RecordModelLatency(duration time.Duration)

RecordModelLatency records the latency of a model request.

func RecordModelRequest

func RecordModelRequest(model string)

RecordModelRequest records a model request.

func RecordStorageError

func RecordStorageError(operation string)

RecordStorageError records a storage error.

func RecordStorageOperation

func RecordStorageOperation(operation string)

RecordStorageOperation records a storage operation.

func RecordToolCall

func RecordToolCall(toolName string)

RecordToolCall records a tool call.

func RecordToolDuration

func RecordToolDuration(toolName string, duration time.Duration)

RecordToolDuration records the duration of a tool execution.

func SetActiveSessions

func SetActiveSessions(count int64)

SetActiveSessions sets the number of active sessions.

func StartCPUProfile

func StartCPUProfile(w io.Writer) error

StartCPUProfile starts CPU profiling and writes to the given writer. Returns an error if profiling is already started.

func StopCPUProfile

func StopCPUProfile()

StopCPUProfile stops the CPU profile started by StartCPUProfile.

func WriteHeapProfile

func WriteHeapProfile(w io.Writer) error

WriteHeapProfile writes the current heap profile to the given writer.

Types

type Config

type Config struct {
	// EventQueueSize is the buffer size for the internal event queue.
	// Events are dropped if the queue is full.
	EventQueueSize int
	// BatchSize is the number of events to accumulate before flushing to subscribers.
	BatchSize int
	// FlushInterval is the maximum time to wait before flushing batched events.
	FlushInterval time.Duration
	// RateLimit is the maximum number of events per second.
	// Events exceeding this rate are dropped.
	RateLimit int
	// SubscriberChannelSize is the buffer size for individual subscriber channels.
	SubscriberChannelSize int
}

Config holds configuration options for the Hub.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a default configuration.

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

Counter is a monotonically increasing metric.

func NewCounter

func NewCounter(name string, labels Labels) *Counter

NewCounter creates a new counter with the given name and labels.

func RegisterCounter

func RegisterCounter(name string, labels Labels) *Counter

RegisterCounter registers a counter in the default registry.

func (*Counter) Add

func (c *Counter) Add(delta int64)

Add adds the given value to the counter.

func (*Counter) Get

func (c *Counter) Get() int64

Get returns the current value.

func (*Counter) Inc

func (c *Counter) Inc()

Inc increments the counter by 1.

func (*Counter) Labels

func (c *Counter) Labels() Labels

Labels returns the metric labels.

func (*Counter) MarshalJSON

func (c *Counter) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler.

func (*Counter) Name

func (c *Counter) Name() string

Name returns the metric name.

func (*Counter) String

func (c *Counter) String() string

String returns a human-readable representation.

func (*Counter) Type

func (c *Counter) Type() MetricType

Type returns the metric type.

type Event

type Event struct {
	Type      EventType      `json:"type"`
	Timestamp time.Time      `json:"timestamp"`
	SessionID string         `json:"sessionId,omitempty"`
	PlanID    string         `json:"planId,omitempty"`
	TaskID    string         `json:"taskId,omitempty"`
	Data      map[string]any `json:"data,omitempty"`
}

Event describes workflow telemetry that UIs and IPC clients can consume.

type EventType

type EventType string

EventType identifies the kind of telemetry event.

const (
	EventPlanCreated                EventType = "plan.created"
	EventPlanUpdated                EventType = "plan.updated"
	EventTaskStarted                EventType = "task.started"
	EventTaskCompleted              EventType = "task.completed"
	EventTaskFailed                 EventType = "task.failed"
	EventResearchStarted            EventType = "research.started"
	EventResearchCompleted          EventType = "research.completed"
	EventResearchFailed             EventType = "research.failed"
	EventBuilderStarted             EventType = "builder.started"
	EventBuilderCompleted           EventType = "builder.completed"
	EventBuilderFailed              EventType = "builder.failed"
	EventCostUpdated                EventType = "cost.updated"
	EventTokenUsageUpdated          EventType = "tokens.updated"
	EventShellCommandStarted        EventType = "shell.started"
	EventShellCommandCompleted      EventType = "shell.completed"
	EventShellCommandFailed         EventType = "shell.failed"
	EventToolStarted                EventType = "tool.started"
	EventToolCompleted              EventType = "tool.completed"
	EventToolFailed                 EventType = "tool.failed"
	EventModelStreamStarted         EventType = "model.stream_start"
	EventModelStreamEnded           EventType = "model.stream_end"
	EventIndexStarted               EventType = "index.started"
	EventIndexCompleted             EventType = "index.completed"
	EventIndexFailed                EventType = "index.failed"
	EventEditorInline               EventType = "editor.inline"
	EventEditorPropose              EventType = "editor.propose"
	EventEditorApply                EventType = "editor.apply"
	EventUICommand                  EventType = "ui.command"
	EventExperimentStarted          EventType = "experiment.started"
	EventExperimentCompleted        EventType = "experiment.completed"
	EventExperimentFailed           EventType = "experiment.failed"
	EventExperimentVariantStarted   EventType = "experiment.variant.started"
	EventExperimentVariantCompleted EventType = "experiment.variant.completed"
	EventExperimentVariantFailed    EventType = "experiment.variant.failed"
	EventRLMIteration               EventType = "rlm.iteration"
	EventCircuitFailure             EventType = "circuit.failure"
	EventCircuitStateChange         EventType = "circuit.state_change"

	// RLM transparency events
	EventRLMEscalation    EventType = "rlm.escalation"     // Weight tier escalation
	EventRLMToolCall      EventType = "rlm.tool_call"      // Sub-agent tool execution
	EventRLMReasoning     EventType = "rlm.reasoning"      // Coordinator reasoning trace
	EventRLMBudgetWarning EventType = "rlm.budget_warning" // Token/time budget alerts

	// Browser runtime events
	EventBrowserSessionCreated EventType = "browser.session_created"
	EventBrowserSessionClosed  EventType = "browser.session_closed"
	EventBrowserNavigate       EventType = "browser.navigate"
	EventBrowserObserve        EventType = "browser.observe"
	EventBrowserAction         EventType = "browser.action"
	EventBrowserActionFailed   EventType = "browser.action_failed"
	EventBrowserFrameDelivered EventType = "browser.frame_delivered"
	EventBrowserStreamEvent    EventType = "browser.stream_event"

	// Machine state events
	EventMachineSpawned   EventType = "machine.spawned"
	EventMachineState     EventType = "machine.state"
	EventMachineCompleted EventType = "machine.completed"
	EventMachineFailed    EventType = "machine.failed"
	EventMachineSteering  EventType = "machine.steering"

	// Machine lock events
	EventMachineLockAcquired EventType = "machine.lock.acquired"
	EventMachineLockWaiting  EventType = "machine.lock.waiting"
	EventMachineLockReleased EventType = "machine.lock.released"
	EventMachineLockStale    EventType = "machine.lock.stale"

	// Machine tool events
	EventMachineToolStart    EventType = "machine.tool.start"
	EventMachineToolComplete EventType = "machine.tool.complete"

	// Machine review events
	EventMachineReview    EventType = "machine.review"
	EventMachineIteration EventType = "machine.iteration"
)

type Gauge

type Gauge struct {
	// contains filtered or unexported fields
}

Gauge is a metric that can go up and down.

func NewGauge

func NewGauge(name string, labels Labels) *Gauge

NewGauge creates a new gauge with the given name and labels.

func RegisterGauge

func RegisterGauge(name string, labels Labels) *Gauge

RegisterGauge registers a gauge in the default registry.

func (*Gauge) Add

func (g *Gauge) Add(delta int64)

Add adds the given value to the gauge.

func (*Gauge) Dec

func (g *Gauge) Dec()

Dec decrements the gauge by 1.

func (*Gauge) Get

func (g *Gauge) Get() int64

Get returns the current value.

func (*Gauge) GetFloat64

func (g *Gauge) GetFloat64() float64

GetFloat64 returns the current value as float64.

func (*Gauge) Inc

func (g *Gauge) Inc()

Inc increments the gauge by 1.

func (*Gauge) Labels

func (g *Gauge) Labels() Labels

Labels returns the metric labels.

func (*Gauge) MarshalJSON

func (g *Gauge) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler.

func (*Gauge) Name

func (g *Gauge) Name() string

Name returns the metric name.

func (*Gauge) Set

func (g *Gauge) Set(value int64)

Set sets the gauge to the given value.

func (*Gauge) SetFloat64

func (g *Gauge) SetFloat64(value float64)

SetFloat64 sets the gauge to the given float64 value (stored as nanoseconds for time values).

func (*Gauge) String

func (g *Gauge) String() string

String returns a human-readable representation.

func (*Gauge) Type

func (g *Gauge) Type() MetricType

Type returns the metric type.

type Histogram

type Histogram struct {
	// contains filtered or unexported fields
}

Histogram is a metric that samples observations and counts them in buckets.

func NewHistogram

func NewHistogram(name string, labels Labels, buckets []float64) *Histogram

NewHistogram creates a new histogram with the given name, labels, and buckets. If buckets is nil, DefaultHistogramBuckets is used.

func RegisterHistogram

func RegisterHistogram(name string, labels Labels, buckets []float64) *Histogram

RegisterHistogram registers a histogram in the default registry.

func (*Histogram) GetBuckets

func (h *Histogram) GetBuckets() []int64

GetBuckets returns the bucket counts.

func (*Histogram) GetCount

func (h *Histogram) GetCount() int64

GetCount returns the total number of observations.

func (*Histogram) GetSum

func (h *Histogram) GetSum() float64

GetSum returns the sum of all observed values (in seconds).

func (*Histogram) Labels

func (h *Histogram) Labels() Labels

Labels returns the metric labels.

func (*Histogram) MarshalJSON

func (h *Histogram) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler.

func (*Histogram) Name

func (h *Histogram) Name() string

Name returns the metric name.

func (*Histogram) Observe

func (h *Histogram) Observe(value float64)

Observe records a value in the histogram. Value should be in seconds (float64).

func (*Histogram) ObserveDuration

func (h *Histogram) ObserveDuration(duration time.Duration)

ObserveDuration records a duration observation.

func (*Histogram) P50

func (h *Histogram) P50() float64

P50 returns the 50th percentile (median).

func (*Histogram) P90

func (h *Histogram) P90() float64

P90 returns the 90th percentile.

func (*Histogram) P99

func (h *Histogram) P99() float64

P99 returns the 99th percentile.

func (*Histogram) Percentile

func (h *Histogram) Percentile(p float64) float64

Percentile returns the estimated value at the given percentile (0-1). Returns 0 if no observations have been recorded.

func (*Histogram) String

func (h *Histogram) String() string

String returns a human-readable representation.

func (*Histogram) Type

func (h *Histogram) Type() MetricType

Type returns the metric type.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is an optimized telemetry event hub that supports:

  - Non-blocking event publishing with a buffered queue
  - Event batching for high-frequency scenarios
  - Rate limiting using a token bucket algorithm
  - Thread-safe subscriber management
  - Graceful shutdown with event flushing

func NewHub

func NewHub() *Hub

NewHub creates a new telemetry hub with default configuration.

func NewHubWithConfig

func NewHubWithConfig(config *Config) *Hub

NewHubWithConfig creates a new telemetry hub with custom configuration.

func (*Hub) Close

func (h *Hub) Close()

Close unsubscribes all listeners and prevents future publications. It is equivalent to calling Stop() followed by Wait() and is kept for backward compatibility.

func (*Hub) DroppedEvents

func (h *Hub) DroppedEvents() int64

DroppedEvents returns the total number of events dropped due to rate limiting or full subscriber channels.

func (*Hub) Flush

func (h *Hub) Flush()

Flush forces an immediate flush of batched events. This is useful for tests or when immediate delivery is required.

func (*Hub) GetStats

func (h *Hub) GetStats() Stats

GetStats returns current hub statistics.

func (*Hub) Publish

func (h *Hub) Publish(event Event)

Publish notifies all subscribers of an event. This method is non-blocking; events are dropped if the queue is full. Maintains backward compatibility with the original Hub.Publish signature.

func (*Hub) Stop

func (h *Hub) Stop()

Stop initiates graceful shutdown and flushes remaining events. This method returns immediately; use Wait() to block until complete.

func (*Hub) Subscribe

func (h *Hub) Subscribe() (<-chan Event, func())

Subscribe returns a channel that will receive future events and a cleanup func. Maintains backward compatibility with the original Hub.Subscribe signature.

func (*Hub) SubscribeWithID

func (h *Hub) SubscribeWithID() (<-chan Event, string)

SubscribeWithID returns a channel that will receive future events and a subscriber ID. The returned ID can be used with Unsubscribe(id) for explicit unsubscription.

func (*Hub) Unsubscribe

func (h *Hub) Unsubscribe(id string)

Unsubscribe removes a subscriber by ID and closes its channel. This is thread-safe and can be called concurrently.

func (*Hub) Wait

func (h *Hub) Wait()

Wait blocks until the hub has finished processing all events and shut down. Call Stop() before Wait() to initiate graceful shutdown.

type Labels

type Labels map[string]string

Labels represents a set of dimensional labels for metrics.

func (Labels) String

func (l Labels) String() string

String returns a string representation of the labels, suitable for use as a map key.

type MemoryStats

type MemoryStats struct {
	Alloc        uint64 `json:"alloc"`
	TotalAlloc   uint64 `json:"total_alloc"`
	Sys          uint64 `json:"sys"`
	NumGC        uint32 `json:"num_gc"`
	HeapAlloc    uint64 `json:"heap_alloc"`
	HeapSys      uint64 `json:"heap_sys"`
	HeapIdle     uint64 `json:"heap_idle"`
	HeapInuse    uint64 `json:"heap_inuse"`
	HeapReleased uint64 `json:"heap_released"`
	HeapObjects  uint64 `json:"heap_objects"`
	StackInuse   uint64 `json:"stack_inuse"`
	StackSys     uint64 `json:"stack_sys"`
	Goroutines   int    `json:"goroutines"`
	Timestamp    int64  `json:"timestamp"`
}

MemoryStats holds key memory statistics.

func GetMemoryStats

func GetMemoryStats() MemoryStats

GetMemoryStats returns current memory statistics.

type Metric

type Metric interface {
	Name() string
	Type() MetricType
	String() string
}

Metric is the common interface for all metric types.

type MetricType

type MetricType string

MetricType identifies the kind of metric.

const (
	MetricTypeCounter   MetricType = "counter"
	MetricTypeGauge     MetricType = "gauge"
	MetricTypeHistogram MetricType = "histogram"
)

type Persister

type Persister struct {
	// contains filtered or unexported fields
}

Persister subscribes to a Hub and durably writes events to a SQLiteStore. Events are batched for write efficiency.

func NewPersister

func NewPersister(hub *Hub, store *SQLiteStore, streamID string) *Persister

NewPersister creates a Persister that subscribes to the Hub and writes events to the store under the given streamID (typically a session ID).

func (*Persister) Flush

func (p *Persister) Flush()

Flush forces an immediate write of buffered events to SQLite.

func (*Persister) Stop

func (p *Persister) Stop()

Stop unsubscribes from the Hub, flushes remaining events, and shuts down.

type ProfileConfig

type ProfileConfig struct {
	MemoryInterval time.Duration // Interval for recording memory stats
}

ProfileConfig holds configuration for continuous profiling.

func DefaultProfileConfig

func DefaultProfileConfig() *ProfileConfig

DefaultProfileConfig returns default profiling configuration.

type ProfileRecorder

type ProfileRecorder struct {
	// contains filtered or unexported fields
}

ProfileRecorder handles continuous profiling.

func NewProfileRecorder

func NewProfileRecorder(config *ProfileConfig) *ProfileRecorder

NewProfileRecorder creates a new profile recorder.

func (*ProfileRecorder) Start

func (pr *ProfileRecorder) Start()

Start begins continuous profiling.

func (*ProfileRecorder) Stop

func (pr *ProfileRecorder) Stop()

Stop stops continuous profiling.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages all metrics.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new metric registry.

func (*Registry) Export

func (r *Registry) Export() map[string]any

Export exports all metrics as a map suitable for JSON serialization.

func (*Registry) ExportJSON

func (r *Registry) ExportJSON() ([]byte, error)

ExportJSON exports all metrics as JSON.

func (*Registry) GetAllCounters

func (r *Registry) GetAllCounters() []*Counter

GetAllCounters returns all registered counters.

func (*Registry) GetAllGauges

func (r *Registry) GetAllGauges() []*Gauge

GetAllGauges returns all registered gauges.

func (*Registry) GetAllHistograms

func (r *Registry) GetAllHistograms() []*Histogram

GetAllHistograms returns all registered histograms.

func (*Registry) GetCounter

func (r *Registry) GetCounter(name string, labels Labels) (*Counter, bool)

GetCounter retrieves a counter by name and labels.

func (*Registry) GetGauge

func (r *Registry) GetGauge(name string, labels Labels) (*Gauge, bool)

GetGauge retrieves a gauge by name and labels.

func (*Registry) GetHistogram

func (r *Registry) GetHistogram(name string, labels Labels) (*Histogram, bool)

GetHistogram retrieves a histogram by name and labels.

func (*Registry) RegisterCounter

func (r *Registry) RegisterCounter(name string, labels Labels) *Counter

RegisterCounter registers a counter metric.

func (*Registry) RegisterGauge

func (r *Registry) RegisterGauge(name string, labels Labels) *Gauge

RegisterGauge registers a gauge metric.

func (*Registry) RegisterHistogram

func (r *Registry) RegisterHistogram(name string, labels Labels, buckets []float64) *Histogram

RegisterHistogram registers a histogram metric.

func (*Registry) WriteTo

func (r *Registry) WriteTo(w io.Writer) (int64, error)

WriteTo writes all metrics to the given writer, implementing io.WriterTo.

type SQLiteStore

type SQLiteStore struct {
	// contains filtered or unexported fields
}

SQLiteStore persists telemetry events to SQLite for replay and analysis.

func NewSQLiteStore

func NewSQLiteStore(dbPath string) (*SQLiteStore, error)

NewSQLiteStore creates a new SQLite-backed event store. Accepts a file path or ":memory:" for in-memory storage.

func (*SQLiteStore) Append

func (s *SQLiteStore) Append(ctx context.Context, streamID string, events []Event) error

Append writes events to the store. Each event gets an auto-incremented version within the stream for ordering and replay.

func (*SQLiteStore) Close

func (s *SQLiteStore) Close() error

Close closes the database connection.

func (*SQLiteStore) Read

func (s *SQLiteStore) Read(ctx context.Context, streamID string, fromVersion int64) ([]Event, error)

Read returns events for a stream starting from the given version (inclusive). Pass fromVersion=0 to read all events.

func (*SQLiteStore) ReadByType

func (s *SQLiteStore) ReadByType(ctx context.Context, streamID string, eventType EventType) ([]Event, error)

ReadByType returns events for a stream filtered by event type.

type Stats

type Stats struct {
	SubscriberCount int
	QueueSize       int
	BatchSize       int
	RateLimit       int
}

Stats holds current hub statistics for monitoring.

type Timer

type Timer struct {
	// contains filtered or unexported fields
}

Timer is a helper for timing operations.

func NewTimer

func NewTimer() *Timer

NewTimer creates a new timer.

func (*Timer) Elapsed

func (t *Timer) Elapsed() time.Duration

Elapsed returns the elapsed time.

func (*Timer) Observe

func (t *Timer) Observe(h *Histogram)

Observe records the elapsed time in a histogram.

func (*Timer) Start

func (t *Timer) Start()

Start resets and starts the timer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL