upstream

package
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func StateChangeNotifier

func StateChangeNotifier(nm *NotificationManager, serverName string) func(oldState, newState types.ConnectionState, info *types.ConnectionInfo)

StateChangeNotifier creates state change notifications based on state transitions

Types

type ClientFactory

type ClientFactory interface {
	// Create core client for basic operations
	CreateCoreClient(id string, config *config.ServerConfig) (MCPClient, error)

	// Create managed client with state machine
	CreateManagedClient(id string, config *config.ServerConfig) (StatefulClient, error)

	// Create CLI client for debugging
	CreateCLIClient(id string, config *config.ServerConfig) (MCPClient, error)
}

ClientFactory creates different types of clients

type ClientPool

type ClientPool interface {
	// Pool management
	AddClient(id string, client StatefulClient) error
	RemoveClient(id string)
	GetClient(id string) (StatefulClient, bool)
	GetAllClients() map[string]StatefulClient

	// Bulk operations
	ConnectAll(ctx context.Context) error
	DisconnectAll() error

	// Tool operations across all clients
	DiscoverTools(ctx context.Context) ([]*config.ToolMetadata, error)
	CallTool(ctx context.Context, toolName string, args map[string]interface{}) (*mcp.CallToolResult, error)
}

ClientPool manages multiple clients

type MCPClient

type MCPClient interface {
	// Connection management
	Connect(ctx context.Context) error
	Disconnect() error
	IsConnected() bool

	// MCP operations
	ListTools(ctx context.Context) ([]*config.ToolMetadata, error)
	CallTool(ctx context.Context, toolName string, args map[string]interface{}) (*mcp.CallToolResult, error)

	// Status and info
	GetConnectionInfo() types.ConnectionInfo
	GetServerInfo() *mcp.InitializeResult
}

MCPClient defines the core interface for MCP client operations

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages connections to multiple upstream MCP servers

func NewManager

func NewManager(logger *zap.Logger, globalConfig *config.Config, boltStorage *storage.BoltDB, secretResolver *secret.Resolver, storageMgr *storage.Manager) *Manager

NewManager creates a new upstream manager

func (*Manager) AddNotificationHandler

func (m *Manager) AddNotificationHandler(handler NotificationHandler)

AddNotificationHandler adds a notification handler to receive state change notifications

func (*Manager) AddServer

func (m *Manager) AddServer(id string, serverConfig *config.ServerConfig) error

AddServer adds a new upstream server and connects to it (legacy method)

func (*Manager) AddServerConfig

func (m *Manager) AddServerConfig(id string, serverConfig *config.ServerConfig) error

AddServerConfig adds a server configuration without connecting

func (*Manager) CallTool

func (m *Manager) CallTool(ctx context.Context, toolName string, args map[string]interface{}) (interface{}, error)

CallTool calls a tool on the appropriate upstream server

func (*Manager) ClearOAuthToken

func (m *Manager) ClearOAuthToken(serverName string) error

ClearOAuthToken clears the OAuth token for a specific server. This is called by the OAuth logout flow to remove stored credentials. Works for both explicit OAuth config and discovered OAuth (server-announced OAuth).

func (*Manager) ConnectAll

func (m *Manager) ConnectAll(ctx context.Context) error

ConnectAll connects to all configured servers that should retry

func (*Manager) DisconnectAll

func (m *Manager) DisconnectAll() error

DisconnectAll disconnects from all servers

func (*Manager) DisconnectServer

func (m *Manager) DisconnectServer(serverName string) error

DisconnectServer disconnects a specific server without removing its configuration. This is used after OAuth logout to force re-authentication on next connect.

func (*Manager) DiscoverTools

func (m *Manager) DiscoverTools(ctx context.Context) ([]*config.ToolMetadata, error)

DiscoverTools discovers all tools from all connected upstream servers. Security: Tools from quarantined servers are NOT discovered to prevent Tool Poisoning Attacks (TPA) from exposing potentially malicious tool descriptions.

func (*Manager) ForceCleanupAllContainers

func (m *Manager) ForceCleanupAllContainers()

ForceCleanupAllContainers is a public wrapper for emergency container cleanup This is called when graceful shutdown fails and containers must be force-removed Only removes containers owned by THIS instance (matching instance ID)

func (*Manager) ForceReconnectAll

func (m *Manager) ForceReconnectAll(reason string) *ReconnectResult

ForceReconnectAll triggers reconnection attempts for all managed clients. For Docker-based servers, this includes container health verification to catch frozen containers. Returns detailed results about which servers were reconnected, skipped, or failed.

func (*Manager) GetAllClients

func (m *Manager) GetAllClients() map[string]*managed.Client

GetAllClients returns all clients

func (*Manager) GetAllServerNames

func (m *Manager) GetAllServerNames() []string

GetAllServerNames returns a slice of all configured server names

func (*Manager) GetClient

func (m *Manager) GetClient(id string) (*managed.Client, bool)

GetClient returns a client by ID

func (*Manager) GetDockerRecoveryStatus

func (m *Manager) GetDockerRecoveryStatus() *storage.DockerRecoveryState

GetDockerRecoveryStatus returns the current Docker recovery status

func (*Manager) GetStats

func (m *Manager) GetStats() map[string]interface{}

GetStats returns statistics about upstream connections GetStats returns statistics about all managed clients. Phase 6 Fix: Lock-free implementation to prevent deadlock with async operations.

func (*Manager) GetTotalToolCount

func (m *Manager) GetTotalToolCount() int

GetTotalToolCount returns the total number of tools across all servers. Uses cached counts to avoid excessive network calls (2-minute cache per server). Phase 6 Fix: Lock-free implementation to prevent deadlock.

func (*Manager) HasDockerContainers

func (m *Manager) HasDockerContainers() bool

HasDockerContainers checks if any Docker containers owned by THIS instance are actually running

func (*Manager) InvalidateAllToolCountCaches

func (m *Manager) InvalidateAllToolCountCaches()

InvalidateAllToolCountCaches invalidates tool count caches for all clients This should be called when tools are known to have changed (e.g., after indexing)

func (*Manager) ListServers

func (m *Manager) ListServers() map[string]*config.ServerConfig

ListServers returns information about all registered servers

func (*Manager) RefreshOAuthToken

func (m *Manager) RefreshOAuthToken(serverName string) error

RefreshOAuthToken triggers a token refresh for a specific server. This is called by the RefreshManager for proactive token refresh. TODO: This will be fully implemented in Phase 3 (US1) with RefreshManager integration.

func (*Manager) RemoveServer

func (m *Manager) RemoveServer(id string)

RemoveServer removes an upstream server

func (*Manager) RetryConnection

func (m *Manager) RetryConnection(serverName string) error

RetryConnection triggers a connection retry for a specific server This is typically called after OAuth completion to immediately use new tokens

func (*Manager) SetLogConfig

func (m *Manager) SetLogConfig(logConfig *config.LogConfig)

SetLogConfig sets the logging configuration for upstream server loggers

func (*Manager) SetToolDiscoveryCallback

func (m *Manager) SetToolDiscoveryCallback(callback func(ctx context.Context, serverName string) error)

SetToolDiscoveryCallback sets the callback for triggering tool re-indexing when upstream servers send notifications/tools/list_changed notifications. This callback will be passed to all new clients created by the manager.

func (*Manager) SetUserLoggedOut

func (m *Manager) SetUserLoggedOut(serverName string, loggedOut bool) error

SetUserLoggedOut marks a server as explicitly logged out by the user. This prevents automatic reconnection until the user explicitly logs in again.

func (*Manager) ShutdownAll

func (m *Manager) ShutdownAll(ctx context.Context) error

ShutdownAll disconnects all clients and ensures all Docker containers are stopped This should be called during application shutdown to ensure clean exit

func (*Manager) StartManualOAuth

func (m *Manager) StartManualOAuth(serverName string, force bool) error

StartManualOAuth performs an in-process OAuth flow for the given server. This avoids cross-process DB locking by using the daemon's storage directly.

func (*Manager) StartManualOAuthQuick

func (m *Manager) StartManualOAuthQuick(serverName string) (*core.OAuthStartResult, error)

StartManualOAuthQuick starts OAuth and returns browser status immediately. Unlike StartManualOAuth (fully async, no result) or StartManualOAuthWithInfo (fully sync, blocks), this returns browser status quickly but continues the OAuth callback handling in background.

This is the recommended method for API endpoints that need to return browser_opened status.

func (*Manager) StartManualOAuthWithInfo

func (m *Manager) StartManualOAuthWithInfo(serverName string, force bool) (*core.OAuthStartResult, error)

StartManualOAuthWithInfo performs an in-process OAuth flow and returns the auth URL and browser status. This is used by Phase 3 (Spec 020) to return structured information about the OAuth flow start. Unlike StartManualOAuth, this method waits for the auth URL to be obtained before returning.

type Notification

type Notification struct {
	Level      NotificationLevel `json:"level"`
	Title      string            `json:"title"`
	Message    string            `json:"message"`
	ServerName string            `json:"server_name,omitempty"`
	Timestamp  time.Time         `json:"timestamp"`
}

Notification represents a notification to be sent to the UI

type NotificationHandler

type NotificationHandler interface {
	SendNotification(notification *Notification)
}

NotificationHandler defines the interface for handling notifications

type NotificationLevel

type NotificationLevel int

NotificationLevel represents the level of a notification

const (
	NotificationInfo NotificationLevel = iota
	NotificationWarning
	NotificationError
)

func (NotificationLevel) String

func (l NotificationLevel) String() string

String returns the string representation of the notification level

type NotificationManager

type NotificationManager struct {
	// contains filtered or unexported fields
}

NotificationManager manages notification handlers and provides convenience methods

func NewNotificationManager

func NewNotificationManager() *NotificationManager

NewNotificationManager creates a new notification manager

func (*NotificationManager) AddHandler

func (nm *NotificationManager) AddHandler(handler NotificationHandler)

AddHandler adds a notification handler

func (*NotificationManager) NotifyOAuthRequired

func (nm *NotificationManager) NotifyOAuthRequired(serverName string)

NotifyOAuthRequired sends a notification when OAuth authentication is required

func (*NotificationManager) NotifyServerConnected

func (nm *NotificationManager) NotifyServerConnected(serverName string)

NotifyServerConnected sends a notification when a server connects

func (*NotificationManager) NotifyServerConnecting

func (nm *NotificationManager) NotifyServerConnecting(serverName string)

NotifyServerConnecting sends a notification when a server starts connecting

func (*NotificationManager) NotifyServerDisconnected

func (nm *NotificationManager) NotifyServerDisconnected(serverName string, reason error)

NotifyServerDisconnected sends a notification when a server disconnects

func (*NotificationManager) NotifyServerError

func (nm *NotificationManager) NotifyServerError(serverName string, err error)

NotifyServerError sends a notification when a server encounters an error

func (*NotificationManager) SendNotification

func (nm *NotificationManager) SendNotification(notification *Notification)

SendNotification sends a notification to all registered handlers

type ReconnectResult

type ReconnectResult struct {
	TotalServers      int
	AttemptedServers  int
	SuccessfulServers []string
	FailedServers     map[string]error
	SkippedServers    map[string]string // server name -> skip reason
}

ReconnectResult tracks the results of a ForceReconnectAll operation

type StatefulClient

type StatefulClient interface {
	MCPClient

	// State management
	GetState() types.ConnectionState
	IsConnecting() bool

	// Advanced connection management
	ShouldRetry() bool
	SetStateChangeCallback(callback func(oldState, newState types.ConnectionState, info *types.ConnectionInfo))

	// Tool count optimization
	GetCachedToolCount(ctx context.Context) (int, error)
}

StatefulClient extends MCPClient with state management capabilities

type TransportClient

type TransportClient interface {
	// Transport-specific connection
	ConnectWithTransport(ctx context.Context, transportType string) error

	// Access to underlying transport details
	GetTransportType() string
	GetStderr() io.Reader // For stdio transport
}

TransportClient defines transport-specific client creation

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL