queue

package
v0.1.41
Published: Mar 21, 2026 License: LGPL-2.1 Imports: 13 Imported by: 0

README

Queue Package

The queue layer drives source and destination traversal and copy using pkg/db. Two queues (src and dst), plus a copy queue in copy mode, perform breadth-first work in rounds, one round per depth. The destination queue is gated by QueueCoordinator.

Frontier today is database-backed: pending work is pulled from DuckDB in batches (~10k rows, ID keyset pagination) into an in-memory pendingBuff; workers lease tasks from that buffer. Completed work is flushed via db.SealLevel (optionally through SealBuffer for async seal). There is no separate NodeCache / LevelCache source file—the model is “DB + pending buffer + seal,” not a pure memory-first level cache.


High-level flow

  1. Pull – PullTraversalTasks / PullRetryTasks / PullCopyTasks refill pendingBuff from SQL (ListNodesByDepthKeyset, ListDstBatchWithSrcChildren, copy keysets, etc.). Only one pull runs at a time (getPulling / setPulling).
  2. Lease – Workers take tasks from pendingBuff into inProgress.
  3. Complete – ReportTaskResult updates state and enqueues seal work (nodes + per-depth stats) through the DB layer.
  4. Coordinator – DST may start round N only when SRC has completed rounds N and N+1 (or SRC traversal is done). SRC is not round-gated against DST; work is pulled and sealed to DuckDB in batches, so SRC can advance as fast as workers allow.
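
The pull, lease, and complete steps above can be sketched with a small in-memory model. This is a minimal illustration, not the package's implementation: miniQueue, task, pull, lease, complete, and drain are hypothetical names, and a sorted slice stands in for the pending rows held in DuckDB.

```go
package main

import "fmt"

// task is a hypothetical stand-in for the package's TaskBase.
type task struct{ ID int }

// miniQueue is an illustrative in-memory model, not the package's Queue.
type miniQueue struct {
	store      []task       // stands in for pending rows in DuckDB, sorted by ID
	cursor     int          // keyset cursor: highest ID pulled so far
	pending    []task       // analogous to pendingBuff
	inProgress map[int]task // leased tasks, keyed by ID
	sealed     []int        // IDs handed to the seal step
}

// pull moves up to batch rows with ID > cursor into pending and returns the count.
func (q *miniQueue) pull(batch int) int {
	n := 0
	for _, t := range q.store {
		if n == batch {
			break
		}
		if t.ID > q.cursor {
			q.pending = append(q.pending, t)
			q.cursor = t.ID
			n++
		}
	}
	return n
}

// lease moves the oldest pending task into inProgress.
func (q *miniQueue) lease() (task, bool) {
	if len(q.pending) == 0 {
		return task{}, false
	}
	t := q.pending[0]
	q.pending = q.pending[1:]
	q.inProgress[t.ID] = t
	return t, true
}

// complete drops the task from inProgress and records it for sealing.
func (q *miniQueue) complete(t task) {
	delete(q.inProgress, t.ID)
	q.sealed = append(q.sealed, t.ID)
}

// drain repeats pull -> lease -> complete until a pull returns no rows.
func drain(q *miniQueue, batch int) {
	for {
		if q.pull(batch) == 0 {
			return
		}
		for {
			t, ok := q.lease()
			if !ok {
				break
			}
			q.complete(t)
		}
	}
}

func main() {
	q := &miniQueue{
		store:      []task{{1}, {2}, {3}, {4}, {5}},
		inProgress: map[int]task{},
	}
	drain(q, 2)
	fmt.Println(q.sealed, len(q.pending), len(q.inProgress)) // prints: [1 2 3 4 5] 0 0
}
```

The real queue runs leases concurrently across workers and guards pulls with getPulling / setPulling; the sketch is single-threaded to keep the data flow visible.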

Core components

File                                      Responsibility
queue.go                                  Queue, Run, round advancement, seal handoff, Lease, ReportTaskResult, InitializeWithContext
queue_accessors.go                        Thread-safe getters/setters, keyset cursors, traversal “cache loaded” flags
queue_batch.go                            BuildExpectedMapsFromDstWithChildren, batch expected children for DST
mode_traversal.go                         PullTraversalTasks (DB keyset → tasks)
mode_retry.go                             PullRetryTasks; DST cleanup on SRC folder complete in retry mode
mode_copy.go                              PullCopyTasks, copy completion, CheckCopyCompletion
worker_traversal.go                       List children / compare → ReportTaskResult
worker_copy.go                            Folder/file copy → ReportTaskResult; optional dst list precheck when resuming partial copy (one anchor round only)
worker/interface.go                       Worker interface
task.go                                   TaskBase, task types, ChildResult
seeding.go                                Root seeding helpers used with pkg/db
coordinator.go                            QueueCoordinator, CanDstStartRound, round tracking for SRC/DST
observer.go                               Polls stats / queues, writes queue_stats
queue_watchdog.go / progress_watchdog.go  Timeouts / progress

Modes

  • QueueModeTraversal – Normal BFS; pull pending traversal tasks by depth.
  • QueueModeRetry – Pending/failed across depths; SRC folder success triggers DST child cleanup (see mode_retry.go).
  • QueueModeCopy / QueueModeCopyRetry – Copy phase and failed-only retry; pulls by copy status and node type.

Relationship with pkg/db

  • Reads: Keyset lists, joins (ListDstBatchWithSrcChildren), stats helpers for completion / observer.
  • Writes: SealLevel (bulk node append + per-depth stats snapshot), plus transactional writers for status events, review, deletes, etc. (RunWrite / Writer).

See pkg/db/README.md for schema (src_status_events, dst_status_events, node tables, stats, migrations, …).


Resumption

Resume uses the same DuckDB file: initializeQueues in pkg/migration/run.go restores rounds and cursors from DB state so pulls continue from the correct frontier.


Summary

  • DB-backed pulls into pendingBuff; seal persists levels/stats.
  • Coordinator enforces DST lag behind SRC (two-round lead); SRC has no coordinator throttle.
  • Modes: traversal, retry, copy, copy-retry.

Documentation

Index

Constants

const (
	TaskTypeSrcTraversal = "src-traversal"
	TaskTypeDstTraversal = "dst-traversal"
	TaskTypeUpload       = "upload"
	TaskTypeCopy         = "copy"
	TaskTypeCopyFolder   = "copy-folder" // Copy phase: create folder
	TaskTypeCopyFile     = "copy-file"   // Copy phase: copy file with streaming
)

Task types

Variables

This section is empty.

Functions

func BatchLoadExpectedChildrenByDSTIDs

func BatchLoadExpectedChildrenByDSTIDs(database *db.DB, dstParentIDs []string, dstIDToPath map[string]string) (
	expectedFoldersMap map[string][]types.Folder,
	expectedFilesMap map[string][]types.File,
	srcIDMap map[string]map[string]string,
	srcIDToMeta map[string]SrcNodeMeta,
	err error,
)

BatchLoadExpectedChildrenByDSTIDs loads SRC children for the given DST folder IDs by joining on parent_path = dst.path. Prefer ListDstBatchWithSrcChildren + BuildExpectedMapsFromDstWithChildren for keyset-based DST pull (no IN query). Returns expectedFoldersMap, expectedFilesMap (keyed by DST task ID), srcIDMap (Type+DisplayName -> SRC node ID per DST ID), and srcIDToMeta (SRC ID -> meta).

func BatchLoadRetryDstCleanup

func BatchLoadRetryDstCleanup(database *db.DB, srcFolderIDs []string) (map[string]*RetryDstCleanup, error)

BatchLoadRetryDstCleanup loads DST counterpart and children meta for each SRC folder ID (for retry mode DST cleanup).

func BuildExpectedMapsFromDstWithChildren

func BuildExpectedMapsFromDstWithChildren(dstBatch []db.FetchResult, childrenByDstID map[string][]*db.NodeState) (
	expectedFoldersMap map[string][]types.Folder,
	expectedFilesMap map[string][]types.File,
	srcIDMap map[string]map[string]string,
	srcIDToMeta map[string]SrcNodeMeta,
)

BuildExpectedMapsFromDstWithChildren builds expectedFoldersMap, expectedFilesMap, srcIDMap, and srcIDToMeta from a DST batch and its SRC children (e.g. from ListDstBatchWithSrcChildren). Keyed by DST node ID.

func SeedRootTask

func SeedRootTask(queueType string, rootFolder types.Folder, rootNodeID string, database *db.DB) error

SeedRootTask inserts the initial root folder task into DuckDB to kickstart traversal. For src: sets traversal_status='Pending' and copy_status='Successful' (root is not a copy task). For dst: sets traversal_status='Pending'. Ensures the stats bucket exists and is updated with root task counts. rootNodeID should be a deterministic ID generated by db.DeterministicNodeID().

func SeedRootTaskWithSrcID

func SeedRootTaskWithSrcID(queueType string, rootFolder types.Folder, rootNodeID string, srcID string, database *db.DB) error

SeedRootTaskWithSrcID inserts the initial root folder task into DuckDB with a pre-set SrcID. This is used for DST root nodes that need to be matched to SRC root nodes. rootNodeID should be a deterministic ID generated by db.DeterministicNodeID().

func SeedRootTasks

func SeedRootTasks(srcRoot types.Folder, dstRoot types.Folder, database *db.DB) error

SeedRootTasks is a convenience function to seed both src and dst root tasks at once. Writes root tasks to DuckDB. Generates deterministic IDs for both root nodes and matches DST root to SRC root.

Types

type ChildResult

type ChildResult struct {
	Folder        types.Folder // Folder info (if folder)
	File          types.File   // File info (if file)
	Status        string       // "pending", "successful", "missing", "not_on_src"
	IsFile        bool         // true if this is a file, false if folder
	SrcID         string       // ULID of corresponding SRC node (for DST nodes only, set during matching)
	SrcCopyStatus string       // Copy status to update on SRC node (if SrcID is set and match found): "pending" or "successful", empty if no update needed
}

ChildResult represents a discovered child node with its traversal status.

type CompletionCheckOptions

type CompletionCheckOptions struct {
	CheckRoundComplete     bool // Check if current round is complete
	CheckFinalCompletion   bool // Check if traversal/copy is complete (mode-specific logic in CheckTraversalCompletion / CheckCopyCompletion)
	AdvanceRoundIfComplete bool // Advance to next round if current round is complete
}

CompletionCheckOptions configures what actions to take during completion checks.

type CopyTask

type CopyTask struct {
	TaskBase
	SrcId  string
	DstId  string
	DstCtx types.ServiceContext
}

CopyTask represents a generic copy operation.

type CopyWorker

type CopyWorker struct {
	// contains filtered or unexported fields
}

CopyWorker executes copy tasks by creating folders or streaming files from source to destination. Each worker runs independently in its own goroutine, continuously polling the queue for work.

func NewCopyWorker

func NewCopyWorker(
	id string,
	queue *Queue,
	srcAdapter types.FSAdapter,
	dstAdapter types.FSAdapter,
	shutdownCtx context.Context,
) *CopyWorker

NewCopyWorker creates a worker that executes copy tasks. shutdownCtx is optional - if provided, the worker will check for cancellation and exit on shutdown.

func (*CopyWorker) Run

func (w *CopyWorker) Run()

Run is the main worker loop. It continuously polls the queue for tasks. When a task is found, it leases it, executes it, and reports the result. When no work is available or queue is paused, it briefly sleeps before polling again. When queue is exhausted, the worker exits.

type ExternalQueueMetrics

type ExternalQueueMetrics struct {
	// Monotonic counters (traversal phase)
	FilesDiscoveredTotal   int64 `json:"files_discovered_total"`
	FoldersDiscoveredTotal int64 `json:"folders_discovered_total"`

	// EMA-smoothed rates (2-5 second window) - traversal phase
	DiscoveryRateItemsPerSec float64 `json:"discovery_rate_items_per_sec"`

	// Verification counts (for O(1) stats bucket lookups)
	TotalDiscovered int64 `json:"total_discovered"` // files + folders
	TotalPending    int   `json:"total_pending"`    // pending across all rounds (from DB)
	TotalFailed     int   `json:"total_failed"`     // failed across all rounds

	// Copy phase metrics (monotonic counters)
	Folders int64 `json:"folders"` // Total folders created
	Files   int64 `json:"files"`   // Total files created
	Total   int64 `json:"total"`   // Total items (folders + files)
	Bytes   int64 `json:"bytes"`   // Total bytes transferred

	// Copy phase rates (EMA-smoothed)
	ItemsPerSecond float64 `json:"items_per_second"` // Items/sec (folders + files)
	BytesPerSecond float64 `json:"bytes_per_second"` // Bytes/sec

	// Current state (for API)
	QueueStats
	Round int `json:"round"`
}

ExternalQueueMetrics contains user-facing metrics published to DuckDB for API access.
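
The rate fields are described as EMA-smoothed. A minimal sketch of exponential moving average smoothing follows; the ema type, its alpha value, and the seeding rule are assumptions for illustration, since the package does not document how its smoothing window is derived.

```go
package main

import "fmt"

// ema is a hypothetical exponential moving average; alpha must be in (0, 1].
// A larger alpha weights recent samples more heavily (a shorter effective window).
type ema struct {
	alpha  float64
	value  float64
	seeded bool
}

// update folds one rate sample into the average and returns the new value.
// The first sample seeds the average directly (an assumption of this sketch).
func (e *ema) update(sample float64) float64 {
	if !e.seeded {
		e.value, e.seeded = sample, true
		return e.value
	}
	e.value = e.alpha*sample + (1-e.alpha)*e.value
	return e.value
}

func main() {
	rate := &ema{alpha: 0.5}
	for _, itemsPerSec := range []float64{10, 20, 20} {
		fmt.Println(rate.update(itemsPerSec)) // prints 10, then 15, then 17.5
	}
}
```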

type InternalQueueMetrics

type InternalQueueMetrics struct {
	// State-based time tracking (additive counters)
	TimeProcessing          time.Duration
	TimeWaitingOnQueue      time.Duration
	TimeWaitingOnFS         time.Duration
	TimeRateLimited         time.Duration
	TimePausedRoundBoundary time.Duration
	TimeIdleNoWork          time.Duration

	// Capacity metrics
	TasksCompletedWhileActive int64
	ActiveProcessingTime      time.Duration

	// Utilization metrics
	WallClockTime       time.Duration
	LastState           QueueState
	LastStateChangeTime time.Time
}

InternalQueueMetrics contains control system metrics stored in memory for autoscaling decisions.

type ProgressWatchdog added in v0.1.41

type ProgressWatchdog struct {
	// contains filtered or unexported fields
}

ProgressWatchdog detects stalled tasks by requiring progress (Beat) within a timeout. If no progress occurs, it cancels the context so adapter operations abort.

func NewProgressWatchdog added in v0.1.41

func NewProgressWatchdog(parent context.Context, timeout time.Duration) (*ProgressWatchdog, context.Context)

NewProgressWatchdog creates a watchdog and a child context. When no Beat() occurs within timeout, the context is cancelled. Call Stop() when the task ends to release resources.

func (*ProgressWatchdog) Beat added in v0.1.41

func (w *ProgressWatchdog) Beat()

Beat records progress. Call whenever meaningful work completes.

func (*ProgressWatchdog) Stop added in v0.1.41

func (w *ProgressWatchdog) Stop()

Stop cancels the context and stops the monitor goroutine.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue maintains round-based task queues for BFS traversal coordination. It handles task leasing, retry logic, and cross-queue task propagation. All operational state lives in DuckDB, flushed via per-queue buffers.

func NewQueue

func NewQueue(name string, maxRetries int, workerCount int, coordinator *QueueCoordinator) *Queue

NewQueue creates a new Queue instance.

func (*Queue) Add

func (q *Queue) Add(task *TaskBase) bool

Add enqueues a task into the pending buffer. Returns false if task is nil, has empty ID, or is already in progress.

func (*Queue) AddWorker

func (q *Queue) AddWorker(worker Worker)

AddWorker registers a worker with this queue for reference. Workers manage their own lifecycle - this is just for tracking/debugging.

func (*Queue) AdvanceCopyRound

func (q *Queue) AdvanceCopyRound()

AdvanceCopyRound handles copy-specific round advancement logic. Round completion is determined by lastPullWasPartial (memory/keyset only); we never query the DB for in-round advancement. When called, the current round has just completed - we always advance to currentRound+1.

func (*Queue) AdvanceTraversalRound

func (q *Queue) AdvanceTraversalRound()

AdvanceTraversalRound handles traversal/retry-specific round advancement logic. For traversal/retry modes, simply increments the round by 1.

func (*Queue) CheckCopyCompletion

func (q *Queue) CheckCopyCompletion(currentRound int) bool

CheckCopyCompletion checks if the copy phase should switch passes or complete. Only called when we're past maxKnownDepth - the pass has exhausted itself round-by-round. Trust the per-round logic; no re-checking of pending/inProgress/wasFirstPull.

func (*Queue) CheckTraversalCompletion

func (q *Queue) CheckTraversalCompletion(currentRound int) bool

CheckTraversalCompletion checks if traversal/retry phase should complete. DB-backed: no pending at depth (from GetPendingTraversalCountAtDepthFromLive), attempted pull, first pull returned 0.

func (*Queue) Clear

func (q *Queue) Clear()

Clear removes all tasks from DuckDB and resets in-progress tracking. Note: This is a destructive operation - use with caution.

func (*Queue) Close

func (q *Queue) Close()

Close stops the queue and cleans up resources: stats publishing loop, output buffer, and sets state to Stopped if not already Completed or Stopped.

func (*Queue) CompleteCopyTask

func (q *Queue) CompleteCopyTask(task *TaskBase, executionDelta time.Duration)

CompleteCopyTask handles successful completion of copy tasks. Updates copy status, creates DST node entry, and updates join-lookup mapping.

func (*Queue) CompleteTraversalTask

func (q *Queue) CompleteTraversalTask(task *TaskBase, executionDelta time.Duration)

CompleteTraversalTask handles successful completion of traversal/retry tasks. This includes child discovery, status updates, and buffer operations.

func (*Queue) EnsureRoundExpectedFromStats

func (q *Queue) EnsureRoundExpectedFromStats()

EnsureRoundExpectedFromStats sets Expected for the current round from the stats bucket (O(1) lookup). Call after SetRound (e.g. on init or resume) so Expected reflects actual pending count and survives restarts.

func (*Queue) FailCopyTask

func (q *Queue) FailCopyTask(task *TaskBase, executionDelta time.Duration)

FailCopyTask handles failure of copy tasks. Updates copy status to failed if max retries exceeded, or back to pending if retrying.

func (*Queue) FailTraversalTask

func (q *Queue) FailTraversalTask(task *TaskBase, executionDelta time.Duration)

FailTraversalTask handles failure of traversal/retry tasks. Retries up to maxRetries, then marks as failed.
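
The retry decision can be sketched as a single comparison. The helper below is hypothetical, and the exact boundary (whether the attempt counter includes the execution that just failed) is an assumption, not something the documentation states.

```go
package main

import "fmt"

// statusAfterFailure is a hypothetical helper. It assumes attempts already
// counts the execution that just failed, and that a task is given up on
// once attempts reaches maxRetries.
func statusAfterFailure(attempts, maxRetries int) string {
	if attempts >= maxRetries {
		return "failed" // retries exhausted: terminal status
	}
	return "pending" // eligible for another lease
}

func main() {
	const maxRetries = 3
	for attempts := 1; attempts <= maxRetries; attempts++ {
		fmt.Println(attempts, statusAfterFailure(attempts, maxRetries))
	}
}
```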

func (*Queue) GetAverageExecutionTime

func (q *Queue) GetAverageExecutionTime() time.Duration

GetAverageExecutionTime returns the current average task execution time.

func (*Queue) GetBytesTransferredTotal

func (q *Queue) GetBytesTransferredTotal() int64

GetBytesTransferredTotal returns the total bytes transferred during copy phase.

func (*Queue) GetCopyPass

func (q *Queue) GetCopyPass() int

GetCopyPass returns the current copy pass.

func (*Queue) GetExecutionTimeBufferSize

func (q *Queue) GetExecutionTimeBufferSize() int

GetExecutionTimeBufferSize returns the current size of the execution time buffer.

func (*Queue) GetExecutionTimeDeltas

func (q *Queue) GetExecutionTimeDeltas() []time.Duration

GetExecutionTimeDeltas returns a copy of the execution time deltas buffer.

func (*Queue) GetFilesCreatedTotal

func (q *Queue) GetFilesCreatedTotal() int64

GetFilesCreatedTotal returns the total files created during copy phase.

func (*Queue) GetFilesDiscoveredTotal

func (q *Queue) GetFilesDiscoveredTotal() int64

func (*Queue) GetFoldersCreatedTotal

func (q *Queue) GetFoldersCreatedTotal() int64

GetFoldersCreatedTotal returns the total folders created during copy phase.

func (*Queue) GetFoldersDiscoveredTotal

func (q *Queue) GetFoldersDiscoveredTotal() int64

func (*Queue) GetLastPullWasPartial added in v0.1.41

func (q *Queue) GetLastPullWasPartial() bool

func (*Queue) GetMode

func (q *Queue) GetMode() QueueMode

GetMode returns the current queue mode.

func (*Queue) GetPendingCount

func (q *Queue) GetPendingCount() int

GetPendingCount returns the number of tasks in the pending buffer.

func (*Queue) GetRound

func (q *Queue) GetRound() int

GetRound returns the current BFS round.

func (*Queue) GetRoundStats

func (q *Queue) GetRoundStats(round int) *RoundStats

GetRoundStats returns the statistics for a specific round. Returns nil if the round has no stats yet.

func (*Queue) GetTotalCompleted

func (q *Queue) GetTotalCompleted() int

GetTotalCompleted returns the total number of completed tasks across all rounds.

func (*Queue) GetTotalDiscovered

func (q *Queue) GetTotalDiscovered() int64

GetTotalDiscovered returns the total number of items discovered (files + folders).

func (*Queue) GetTotalFailed

func (q *Queue) GetTotalFailed() int

GetTotalFailed returns the total number of failed tasks across all rounds.

func (*Queue) GetWorkerCount added in v0.1.41

func (q *Queue) GetWorkerCount() int

GetWorkerCount returns the number of workers registered with this queue.

func (*Queue) InProgressCount

func (q *Queue) InProgressCount() int

InProgressCount returns the number of tasks currently being executed.

func (*Queue) InitializeCopyWithContext

func (q *Queue) InitializeCopyWithContext(database *db.DB, srcAdapter, dstAdapter types.FSAdapter, shutdownCtx context.Context)

InitializeCopyWithContext sets up a copy queue with both source and destination adapters. This is specifically for copy mode which requires both adapters.

func (*Queue) InitializeWithContext

func (q *Queue) InitializeWithContext(database *db.DB, adapter types.FSAdapter, shutdownCtx context.Context)

InitializeWithContext sets up the queue with DuckDB, context, and filesystem adapter references. Creates and starts workers immediately - they'll poll for tasks autonomously. shutdownCtx is optional - if provided, workers will check for cancellation and exit on shutdown. For copy mode, InitializeCopyWithContext should be used instead to provide both adapters.

func (*Queue) IsExhausted

func (q *Queue) IsExhausted() bool

IsExhausted returns true if the queue has finished all traversal or has been stopped.

func (*Queue) IsPaused

func (q *Queue) IsPaused() bool

IsPaused returns true if the queue is paused.

func (*Queue) Lease

func (q *Queue) Lease() *TaskBase

Lease attempts to lease a task for execution atomically. Returns nil if no tasks are available, queue is paused, or completed.

func (*Queue) Name

func (q *Queue) Name() string

Name returns the queue's name.

func (*Queue) Pause

func (q *Queue) Pause()

Pause pauses the queue (workers will not lease new tasks).

func (*Queue) PullCopyTasks

func (q *Queue) PullCopyTasks(force bool)

PullCopyTasks pulls copy tasks from DuckDB for the current round. Pulls from SRC copy status buckets, filters by pass (folders vs files), and skips round 0. Uses getter/setter methods - no direct mutex access.

func (*Queue) PullRetryTasks

func (q *Queue) PullRetryTasks(force bool)

PullRetryTasks pulls retry tasks from failed/pending status buckets. Checks maxKnownDepth and scans all known levels up to maxKnownDepth, then uses normal traversal logic for deeper levels. Uses getter/setter methods - no direct mutex access.

func (*Queue) PullTasksIfNeeded

func (q *Queue) PullTasksIfNeeded(force bool)

func (*Queue) PullTraversalTasks

func (q *Queue) PullTraversalTasks(force bool)

PullTraversalTasks refills the queue from DuckDB for the current round (ID keyset pagination, ~10K batch). SRC: ListNodesByDepthKeyset; DST: ListDstBatchWithSrcChildren (join for expected children). Pulled tasks are pushed directly to the queue.
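
Keyset pagination resumes each pull from the last ID seen instead of using a row offset. The sketch below models that over a sorted slice; pullAfter is a hypothetical helper, and the partial flag mirrors the idea behind GetLastPullWasPartial (a batch smaller than the limit suggests the round has no more rows), which is an interpretation rather than documented behaviour.

```go
package main

import (
	"fmt"
	"sort"
)

// pullAfter returns up to limit IDs strictly greater than cursor from a
// sorted slice. It is the in-memory analogue of
// "WHERE id > ? ORDER BY id LIMIT ?".
func pullAfter(sortedIDs []string, cursor string, limit int) (batch []string, next string, partial bool) {
	start := sort.SearchStrings(sortedIDs, cursor)
	if start < len(sortedIDs) && sortedIDs[start] == cursor {
		start++ // skip the cursor row itself
	}
	end := start + limit
	if end > len(sortedIDs) {
		end = len(sortedIDs)
	}
	batch = sortedIDs[start:end]
	next = cursor
	if len(batch) > 0 {
		next = batch[len(batch)-1] // the cursor advances to the last ID returned
	}
	return batch, next, len(batch) < limit
}

func main() {
	ids := []string{"a", "b", "c", "d", "e"}
	cursor := ""
	for {
		batch, next, partial := pullAfter(ids, cursor, 2)
		fmt.Println(batch, partial)
		cursor = next
		if partial {
			break // a short batch means nothing is left to pull
		}
	}
}
```

Because each pull filters on the ID rather than skipping rows, its cost does not grow with how far into the round the queue has advanced.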

func (*Queue) ReportTaskResult

func (q *Queue) ReportTaskResult(task *TaskBase, result TaskExecutionResult)

ReportTaskResult reports the result of a task execution and handles post-processing. This is the event-driven entry point that replaces separate Complete()/Fail() calls. After processing the result, it checks if we need to pull more tasks or advance rounds.

func (*Queue) Resume

func (q *Queue) Resume()

Resume resumes the queue after a pause.

func (*Queue) Run

func (q *Queue) Run()

Run is the main queue coordination loop. It has an outer loop for rounds and an inner loop for each round. The outer loop checks coordinator gates before starting each round (DST only). The inner loop processes tasks until the round is complete.

func (*Queue) SetCopyPass

func (q *Queue) SetCopyPass(pass int)

SetCopyPass sets the current copy pass.

func (*Queue) SetCopyResumeDstExistenceWindow added in v0.1.41

func (q *Queue) SetCopyResumeDstExistenceWindow(anchorPass, anchorRound int)

SetCopyResumeDstExistenceWindow enables the copy worker dst ListChildren precheck until the anchor pass+round is left (see AdvanceCopyRound). Only for normal copy mode (not copy-retry); call from RunCopyPhase when resuming a partially completed copy (Successful>0 and Pending>0 in status events).

func (*Queue) SetMaxKnownDepth

func (q *Queue) SetMaxKnownDepth(depth int)

SetMaxKnownDepth sets the maximum depth for traversal/copy. Set to -1 to auto-detect.

func (*Queue) SetMode

func (q *Queue) SetMode(mode QueueMode)

SetMode sets the queue mode (traversal, retry, copy, or copy-retry).

func (*Queue) SetObserver

func (q *Queue) SetObserver(observer *QueueObserver)

SetObserver registers this queue with an observer for DuckDB stats publishing. The observer will poll this queue directly for statistics.

func (*Queue) SetRound

func (q *Queue) SetRound(round int)

SetRound sets the queue's current round. Used for resume operations.

func (*Queue) SetShutdownContext

func (q *Queue) SetShutdownContext(ctx context.Context)

SetShutdownContext sets the shutdown context for the queue.

func (*Queue) SetState

func (q *Queue) SetState(state QueueState)

SetState sets the queue lifecycle state.

func (*Queue) SetStatsChannel

func (q *Queue) SetStatsChannel(ch chan QueueStats)

SetStatsChannel sets the channel for publishing queue statistics for UDP logging. The queue will periodically publish stats to this channel.

func (*Queue) SetTraversalCacheLoaded

func (q *Queue) SetTraversalCacheLoaded(loaded bool)

SetTraversalCacheLoaded sets whether we have completed the first pull for the current round. Until true, CheckTraversalCompletion returns false so the queue does not complete before the first pull.

func (*Queue) SetWorkers

func (q *Queue) SetWorkers(workers []Worker)

SetWorkers sets the workers associated with this queue.

func (*Queue) Shutdown

func (q *Queue) Shutdown()

Shutdown stops the stats publishing loop and cleans up resources.

func (*Queue) State

func (q *Queue) State() QueueState

State returns the current queue lifecycle state.

func (*Queue) Stats

func (q *Queue) Stats() QueueStats

Stats returns a snapshot of the queue's current state. Uses only in-memory counters - no database queries.

func (*Queue) TotalTracked

func (q *Queue) TotalTracked() int

TotalTracked returns the total number of tasks across all rounds (pending + in-progress).

type QueueCoordinator

type QueueCoordinator struct {
	// contains filtered or unexported fields
}

QueueCoordinator manages round advancement gates for dual-BFS traversal. It enforces: DST cannot advance to round N until SRC has completed rounds N and N+1 (or SRC traversal is done). SRC is not level-gated relative to DST; frontier is streamed to the database in chunks.

func NewQueueCoordinator

func NewQueueCoordinator() *QueueCoordinator

NewQueueCoordinator creates a new coordinator.

func (*QueueCoordinator) CanDstStartRound

func (c *QueueCoordinator) CanDstStartRound(targetRound int) bool

CanDstStartRound returns true if DST can start processing the specified round. DST can start round N if:

  • SRC has completed traversal entirely (DST can proceed freely), OR
  • SRC has completed rounds N and N+1 (SRC is at round N+2 or higher)

Note: Since DST can now freely advance to a round and then pause, the check is N+2 (if DST wants to start round 4, SRC needs to have completed rounds 4 and 5, so SRC >= 6). Once SRC is completed, DST can proceed at full speed with no restrictions.
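
The gate rule reduces to one comparison. The function below is a minimal sketch with hypothetical parameters (the real method reads SRC state from the coordinator rather than taking it as arguments).

```go
package main

import "fmt"

// canDstStartRound sketches the documented rule: DST may start targetRound
// when SRC is done, or when SRC has completed rounds targetRound and
// targetRound+1, i.e. SRC's current round is at least targetRound+2.
func canDstStartRound(srcRound int, srcCompleted bool, targetRound int) bool {
	if srcCompleted {
		return true
	}
	return srcRound >= targetRound+2
}

func main() {
	fmt.Println(canDstStartRound(5, false, 4)) // false: SRC has not finished round 5
	fmt.Println(canDstStartRound(6, false, 4)) // true: SRC completed rounds 4 and 5
	fmt.Println(canDstStartRound(0, true, 4))  // true: SRC traversal is done
}
```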

func (*QueueCoordinator) GetRound

func (c *QueueCoordinator) GetRound(which string) int

GetRound returns the current round for SRC or DST.

func (*QueueCoordinator) IsCompleted

func (c *QueueCoordinator) IsCompleted(queueType string) bool

IsCompleted returns true if the specified queue ("src", "dst", or "both") has completed traversal.

func (*QueueCoordinator) MarkCompleted

func (c *QueueCoordinator) MarkCompleted(which string)

MarkCompleted marks SRC or DST as completed based on the argument ("src" or "dst").

func (*QueueCoordinator) UpdateRound

func (c *QueueCoordinator) UpdateRound(which string, round int)

UpdateRound updates the current round for SRC or DST.

func (*QueueCoordinator) WaitSealBackpressure

func (c *QueueCoordinator) WaitSealBackpressure(_ string, round int, database *db.DB) bool

WaitSealBackpressure ensures the seal buffer has flushed through the given round before the caller drops that level from node cache. Call after enqueueing the round's seal data and before dropping the level. Prevents the DB flush buffer from growing unbounded. Returns false if flushing fails; callers should fail closed (do not drop level or advance round).

type QueueMode

type QueueMode string

QueueMode represents the operation mode of a queue.

const (
	QueueModeTraversal QueueMode = "traversal"  // Normal BFS traversal
	QueueModeRetry     QueueMode = "retry"      // Retry failed tasks sweep
	QueueModeCopy      QueueMode = "copy"       // Copy phase (folders then files)
	QueueModeCopyRetry QueueMode = "copy-retry" // Copy retry: only copy_status = failed, max-depth guarded completion
)

type QueueObserver

type QueueObserver struct {
	// contains filtered or unexported fields
}

QueueObserver collects statistics from queues by polling them directly and publishes them to DuckDB periodically. Similar to QueueCoordinator, but focused on observability rather than coordination.

func NewQueueObserver

func NewQueueObserver(database *db.DB, updateInterval time.Duration) *QueueObserver

NewQueueObserver creates a new observer that will publish stats to DuckDB. updateInterval is how often stats are written to DuckDB (default: 200ms).

func (*QueueObserver) RegisterQueue

func (o *QueueObserver) RegisterQueue(queueName string, queue *Queue)

RegisterQueue registers a queue with the observer. The observer will poll this queue directly for statistics.

func (*QueueObserver) Start

func (o *QueueObserver) Start()

Start begins the observer loop that publishes stats to DuckDB. This is called automatically when the first queue is registered, but can be called manually.

func (*QueueObserver) Stop

func (o *QueueObserver) Stop()

Stop stops the observer loop and cleans up resources. This should only be called once. Calling it multiple times is safe but has no effect.

func (*QueueObserver) UnregisterQueue

func (o *QueueObserver) UnregisterQueue(queueName string)

UnregisterQueue removes a queue from the observer.

type QueueState

type QueueState string

QueueState represents the lifecycle state of a queue.

const (
	QueueStateRunning   QueueState = "running"   // Queue is active and processing
	QueueStatePaused    QueueState = "paused"    // Queue is paused
	QueueStateStopped   QueueState = "stopped"   // Queue is stopped
	QueueStateWaiting   QueueState = "waiting"   // Queue is waiting for coordinator to allow advancement (DST only)
	QueueStateCompleted QueueState = "completed" // Traversal complete (max depth reached)
)

type QueueStateSnapshot

type QueueStateSnapshot struct {
	State              QueueState
	Round              int
	PendingCount       int
	InProgressCount    int
	Pulling            bool
	LastPullWasPartial bool
	FirstPullForRound  bool
	PullLowWM          int
	Database           *db.DB
	Mode               QueueMode
}

QueueStateSnapshot is a snapshot of queue state for use in logic functions (see getStateSnapshot).

type QueueStats

type QueueStats struct {
	Name         string
	Round        int
	Pending      int
	InProgress   int
	TotalTracked int
	Workers      int
}

QueueStats holds current queue statistics, as returned by Queue.Stats.

type QueueWatchdog added in v0.1.41

type QueueWatchdog struct {
	// contains filtered or unexported fields
}

QueueWatchdog monitors queue progress and dumps state when stalled. A stall is detected when no tasks complete for the configured timeout while tasks remain in-progress or pending.

func NewQueueWatchdog added in v0.1.41

func NewQueueWatchdog(q *Queue, stallTimeout time.Duration) *QueueWatchdog

NewQueueWatchdog creates a watchdog for the given queue.

func (*QueueWatchdog) Beat added in v0.1.41

func (wd *QueueWatchdog) Beat()

Beat records progress (call when a task completes).

func (*QueueWatchdog) Start added in v0.1.41

func (wd *QueueWatchdog) Start()

Start begins monitoring in a background goroutine.

func (*QueueWatchdog) Stop added in v0.1.41

func (wd *QueueWatchdog) Stop()

Stop stops the watchdog monitoring.

type RetryDstChild

type RetryDstChild struct {
	ID              string
	Depth           int
	TraversalStatus string
}

RetryDstChild holds DST child node meta for retry DST cleanup; populated at pull to avoid per-child DB lookups.

type RetryDstCleanup

type RetryDstCleanup struct {
	DstID        string
	DstDepth     int
	DstOldStatus string
	Children     []RetryDstChild
}

RetryDstCleanup holds DST counterpart and its children meta for SRC folder tasks in retry mode; populated at pull.

type RoundInfo

type RoundInfo struct {
	Round           int       // Round number
	PullCount       int       // Number of pull operations (queries) performed this round
	ItemsYielded    int       // Pulled amount: total items actually returned from DB queries this round (like completed count but for pulls)
	ExpectedCount   int       // Expected items from DB (if known)
	TasksCompleted  int       // Successfully completed tasks
	TasksFailed     int       // Failed tasks
	StartTime       time.Time // When this round started
	LastPullTime    time.Time // Timestamp of last pull operation
	AvgTasksPerSec  float64   // Rolling average tasks/sec
	LastPartialPull bool      // Whether the last pull was partial (< batch size)
}

RoundInfo tracks statistics and metadata for a specific BFS round.

type RoundStats

type RoundStats struct {
	Expected  int // Expected tasks for this round (folder children inserted)
	Completed int // Tasks completed in this round (successful + failed)
	Failed    int // Tasks failed in this round
}

RoundStats tracks statistics for a specific round.

type SrcNodeMeta

type SrcNodeMeta struct {
	Depth           int
	TraversalStatus string
	CopyStatus      string
}

SrcNodeMeta holds Depth, TraversalStatus, and CopyStatus for an SRC node; used by DST tasks at completion to avoid per-child DB lookups.

type TaskBase

type TaskBase struct {
	ID                  string                 // ULID for internal tracking (database keys)
	Type                string                 // Task type: "src-traversal", "dst-traversal", "upload", etc.
	Folder              types.Folder           // Folder to process (if applicable)
	File                types.File             // File to process (if applicable)
	Locked              bool                   // Whether this task is currently leased by a worker
	Attempts            int                    // Number of execution attempts
	Status              string                 // Execution result: "successful", "failed" (set by queue)
	WorkerResult        string                 // Worker execution result: "success", "error" (set by worker before ReportTaskResult)
	LastError           string                 // Error message from last execution attempt (set by worker, written by queue)
	ExpectedFolders     []types.Folder         // Expected folders (dst tasks only)
	ExpectedFiles       []types.File           // Expected files (dst tasks only)
	ExpectedSrcIDMap    map[string]string      // Map of Type+Name -> SRC node ID for matching (dst tasks only)
	ExpectedSrcNodeMeta map[string]SrcNodeMeta // SRC node Depth/CopyStatus keyed by SRC ID (dst tasks only, populated at pull)
	RetryDstCleanup     *RetryDstCleanup       // DST counterpart + children meta for SRC folder in retry mode (populated at pull)
	DiscoveredChildren  []ChildResult          // Children discovered during execution
	Round               int                    // The round this task belongs to (for buffer coordination)
	LeaseTime           time.Time              // Time when task was leased (for execution time tracking)
	CopyStatus          string                 // Current SRC copy status from DB (used to preserve copy_status on traversal completion events)
	// Copy phase specific fields
	CopyPass           int    // Copy pass number (1 for folders, 2 for files)
	SrcTraversalStatus string // SRC node traversal_status at pull time (preserved when writing copy_status events)
	BytesTransferred   int64  // Bytes transferred for file copy tasks
	DstParentID        string // Destination parent folder ID for creation
}

TaskBase represents the foundational structure for all task types. Workers lease tasks, mark them Locked, and attempt execution. Tasks are identified by ULID (ID) for internal tracking.
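The ExpectedSrcIDMap and ExpectedSrcNodeMeta fields suggest a two-step, in-memory lookup for DST tasks: first resolve a child to its SRC node ID, then fetch that node's metadata. The sketch below shows the idea under stated assumptions: the key format built by matchKey (type, colon, name) is invented for this example, since the documentation only says "Type+Name", and lookupSrcMeta is a hypothetical helper.

```go
package main

import "fmt"

// srcNodeMeta mirrors the documented SrcNodeMeta fields.
type srcNodeMeta struct {
	Depth           int
	TraversalStatus string
	CopyStatus      string
}

// matchKey builds a lookup key from node type and name.
// The ":" separator is an assumption made for this example.
func matchKey(nodeType, name string) string {
	return nodeType + ":" + name
}

// lookupSrcMeta resolves a DST child to its SRC node ID and metadata using
// only the maps carried on the task, so no database round trip is needed.
func lookupSrcMeta(
	idMap map[string]string,
	meta map[string]srcNodeMeta,
	nodeType, name string,
) (string, srcNodeMeta, bool) {
	id, ok := idMap[matchKey(nodeType, name)]
	if !ok {
		return "", srcNodeMeta{}, false
	}
	m, ok := meta[id]
	if !ok {
		return id, srcNodeMeta{}, false
	}
	return id, m, true
}

func main() {
	idMap := map[string]string{matchKey("folder", "photos"): "src-1"}
	meta := map[string]srcNodeMeta{"src-1": {Depth: 2, TraversalStatus: "successful", CopyStatus: "pending"}}
	id, m, ok := lookupSrcMeta(idMap, meta, "folder", "photos")
	fmt.Println(id, m.Depth, m.CopyStatus, ok)
}
```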

func (*TaskBase) Identifier

func (t *TaskBase) Identifier() string

Identifier returns the unique identifier for this task (absolute path).

func (*TaskBase) IsFile

func (t *TaskBase) IsFile() bool

IsFile returns whether this task represents a file operation.

func (*TaskBase) IsFolder

func (t *TaskBase) IsFolder() bool

IsFolder reports whether this task represents a folder traversal. In the copy phase it checks the task type (copy-folder), so folder tasks are recognized even when ServiceID is empty. Traversal and retry tasks, whose Type is neither copy-folder nor copy-file, still rely on Folder.ServiceID.
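The decision described for IsFolder can be sketched as a switch on the task type with a fallback to the folder's service ID. This is an illustration on local stand-in types, not the package source; in particular, treating a non-empty Folder.ServiceID as "is a folder" is an assumption about the fallback.

```go
package main

import "fmt"

// folder and task are minimal stand-ins for types.Folder and TaskBase.
type folder struct{ ServiceID string }

type task struct {
	Type   string
	Folder folder
}

// isFolder follows the documented rule: copy tasks are classified by Type,
// all other tasks by whether a folder service ID is present (assumed).
func (t *task) isFolder() bool {
	switch t.Type {
	case "copy-folder":
		return true
	case "copy-file":
		return false
	default:
		return t.Folder.ServiceID != ""
	}
}

func main() {
	copyTask := &task{Type: "copy-folder"} // ServiceID intentionally empty
	travTask := &task{Type: "src-traversal", Folder: folder{ServiceID: "f-42"}}
	fmt.Println(copyTask.isFolder(), travTask.isFolder())
}
```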

func (*TaskBase) LocationPath

func (t *TaskBase) LocationPath() string

LocationPath returns the logical, root-relative path for this task.

type TaskExecutionResult

type TaskExecutionResult string

TaskExecutionResult represents the result of a task execution.

const (
	TaskExecutionResultSuccessful TaskExecutionResult = "successful"
	TaskExecutionResultFailed     TaskExecutionResult = "failed"
)
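The TaskBase field comments describe two related values: WorkerResult ("success" or "error", set by the worker) and Status ("successful" or "failed", set by the queue). A queue could translate one into the other roughly as follows. The resultFor function is hypothetical, and treating any unrecognized worker result as a failure is an assumption of this sketch.

```go
package main

import "fmt"

// TaskExecutionResult and its constants are redeclared here so the example
// compiles on its own; the values match the documented constants.
type TaskExecutionResult string

const (
	TaskExecutionResultSuccessful TaskExecutionResult = "successful"
	TaskExecutionResultFailed     TaskExecutionResult = "failed"
)

// resultFor maps a worker-reported result to the queue-level result.
// Anything other than "success" is treated as a failure (assumption).
func resultFor(workerResult string) TaskExecutionResult {
	if workerResult == "success" {
		return TaskExecutionResultSuccessful
	}
	return TaskExecutionResultFailed
}

func main() {
	fmt.Println(resultFor("success"), resultFor("error"))
}
```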

type TraversalWorker

type TraversalWorker struct {
	// contains filtered or unexported fields
}

TraversalWorker executes traversal tasks by listing children and recording them to DuckDB. Each worker runs independently in its own goroutine, continuously polling the queue for work.

func NewTraversalWorker

func NewTraversalWorker(
	id string,
	queue *Queue,
	adapter types.FSAdapter,
	queueName string,
	shutdownCtx context.Context,
) *TraversalWorker

NewTraversalWorker creates a worker that executes traversal tasks. shutdownCtx is optional; if provided, the worker checks it for cancellation and exits on shutdown.

func (*TraversalWorker) Run

func (w *TraversalWorker) Run()

Run is the main worker loop. It continuously polls the queue for tasks. When a task is found, the worker leases it, executes it, and reports the result. When no work is available or the queue is paused, the worker sleeps briefly before polling again. When the queue is exhausted, the worker exits.

type UploadTask

type UploadTask struct {
	TaskBase
	SrcId  string // Source file identifier
	DstId  string // Destination parent folder identifier
	DstCtx types.ServiceContext
}

UploadTask represents a task to upload a file from source to destination.

type Worker

type Worker interface {
	Run() // Main execution loop - polls queue and processes tasks
}

Worker represents a concurrent task executor. Each worker independently polls its queue for work, leases tasks, executes them, and reports results back to the queue and database.
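Since Worker exposes only Run, a caller can start any mix of workers in their own goroutines and wait for all of them to exit. The sketch below shows one way to do that with sync.WaitGroup. The Worker interface is redeclared locally so the example is self-contained, and runAll and stubWorker are hypothetical names for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Worker matches the documented interface.
type Worker interface {
	Run()
}

// stubWorker stands in for a real worker; it only counts that it ran.
type stubWorker struct{ ran *int64 }

func (s stubWorker) Run() { atomic.AddInt64(s.ran, 1) }

// runAll starts each worker in its own goroutine and blocks until every
// Run call has returned.
func runAll(workers []Worker) {
	var wg sync.WaitGroup
	for _, w := range workers {
		wg.Add(1)
		go func(w Worker) {
			defer wg.Done()
			w.Run()
		}(w)
	}
	wg.Wait()
}

func main() {
	var ran int64
	runAll([]Worker{stubWorker{&ran}, stubWorker{&ran}, stubWorker{&ran}})
	fmt.Println("workers finished:", ran)
}
```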

