strategy

package
v1.7.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package strategy provides built-in assignment strategy implementations.

Assignment strategies determine how partitions are distributed across workers. The package includes three built-in strategies:

  • WeightedConsistentHash: Weighted consistent hashing with extreme partition handling and soft load caps (recommended for weighted workloads)
  • ConsistentHash: Standard consistent hashing with virtual nodes (recommended for equal-weight partitions)
  • RoundRobin: Simple round-robin distribution

Strategy Selection Guide

WeightedConsistentHash:

  • Use when partitions have significantly different processing costs
  • Balances cache affinity with load distribution
  • Handles extreme partitions (2x+ average weight) via round-robin
  • Applies soft load caps to prevent worker overload
  • Configuration: virtual nodes, hash seed, overload threshold, extreme threshold

ConsistentHash:

  • Use when all partitions have equal or similar weights
  • Maximizes cache affinity during scaling events
  • Minimal configuration: virtual nodes, hash seed

RoundRobin:

  • Use for simple, stateless workloads
  • Guarantees even distribution
  • No cache affinity preservation

Custom strategies can be implemented by satisfying the types.AssignmentStrategy interface.

Index

Constants

This section is empty.

Variables

View Source
var ErrNoWorkers = errors.New("no workers available for assignment")

ErrNoWorkers indicates that no workers were provided for assignment.

Functions

This section is empty.

Types

type ConsistentHash

type ConsistentHash struct {
	// contains filtered or unexported fields
}

ConsistentHash implements consistent hashing with virtual nodes.

func NewConsistentHash

func NewConsistentHash(opts ...ConsistentHashOption) *ConsistentHash

NewConsistentHash creates a new consistent hash strategy.

The strategy uses a hash ring with virtual nodes to distribute partitions evenly across workers while minimizing partition movement during scaling. Achieves >80% cache affinity during rebalancing.

Parameters:

  • opts: Optional configuration (WithVirtualNodes, WithHashSeed)

Returns:

  • *ConsistentHash: Initialized consistent hash strategy

Example:

strategy := strategy.NewConsistentHash(
    strategy.WithVirtualNodes(300),
)
js, _ := jetstream.New(conn)
mgr, err := parti.NewManager(&cfg, js, src, strategy)
if err != nil { /* handle */ }

func (*ConsistentHash) Assign

func (ch *ConsistentHash) Assign(workers []string, partitions []types.Partition) (map[string][]types.Partition, error)

Assign calculates partition assignments using consistent hashing.

The algorithm:

  1. Build hash ring with virtual nodes for each worker
  2. Place each partition on ring based on hash of partition keys
  3. Assign partition to nearest clockwise virtual node
  4. Apply weight balancing if partition weights differ

Parameters:

  • workers: List of worker IDs (e.g., ["worker-0", "worker-1"])
  • partitions: List of partitions to assign

Returns:

  • map[string][]types.Partition: Map from workerID to assigned partitions
  • error: Assignment error (e.g., no workers available)

Example:

assignments, err := strategy.Assign(
    []string{"worker-0", "worker-1"},
    partitions,
)

type ConsistentHashOption

type ConsistentHashOption func(*ConsistentHash)

ConsistentHashOption configures a ConsistentHash strategy.

func WithHashSeed

func WithHashSeed(seed uint64) ConsistentHashOption

WithHashSeed sets a custom hash seed for consistent hashing.

Parameters:

  • seed: Hash seed value

Returns:

  • consistentHashOption: Configuration option

func WithVirtualNodes

func WithVirtualNodes(nodes int) ConsistentHashOption

WithVirtualNodes sets the number of virtual nodes per worker.

Higher values provide better distribution but increase memory usage. Recommended range: 100-300 (default: 150). Values below 1 are clamped to 1.

Parameters:

  • nodes: Number of virtual nodes per worker (minimum: 1)

Returns:

  • consistentHashOption: Configuration option

type RoundRobin

type RoundRobin struct{}

RoundRobin implements simple round-robin partition assignment.

func NewRoundRobin

func NewRoundRobin() *RoundRobin

NewRoundRobin creates a new round-robin strategy.

The strategy distributes partitions evenly across workers in a simple round-robin fashion. This provides predictable assignment but does not preserve cache affinity during scaling.

Returns:

  • *RoundRobin: Initialized round-robin strategy

Example:

strategy := strategy.NewRoundRobin()
js, _ := jetstream.New(conn)
mgr, err := parti.NewManager(&cfg, js, src, strategy)
if err != nil { /* handle */ }

func (*RoundRobin) Assign

func (rr *RoundRobin) Assign(workers []string, partitions []types.Partition) (map[string][]types.Partition, error)

Assign calculates partition assignments using round-robin distribution.

The algorithm:

  1. Sort workers and partitions for deterministic assignment
  2. Distribute partitions evenly in round-robin fashion

Parameters:

  • workers: List of worker IDs (e.g., ["worker-0", "worker-1"])
  • partitions: List of partitions to assign

Returns:

  • map[string][]types.Partition: Map from workerID to assigned partitions
  • error: Assignment error (e.g., no workers available)

Example:

assignments, err := strategy.Assign(
    []string{"worker-0", "worker-1"},
    partitions,
)

type WeightedConsistentHash

type WeightedConsistentHash struct {
	// contains filtered or unexported fields
}

WeightedConsistentHash implements weighted consistent hashing with extreme partition handling.

func NewWeightedConsistentHash

func NewWeightedConsistentHash(opts ...WeightedConsistentHashOption) *WeightedConsistentHash

NewWeightedConsistentHash creates a new weighted consistent hash strategy.

Parameters:

  • opts: Optional configuration (WithWeightedVirtualNodes, WithWeightedHashSeed, WithOverloadThreshold, WithExtremeThreshold, WithMinPartitionCount, WithDefaultWeight, WithWeightedLogger)

Returns:

  • *WeightedConsistentHash: Initialized weighted consistent hash strategy ready for use.

func (*WeightedConsistentHash) Assign

func (wch *WeightedConsistentHash) Assign(workers []string, partitions []types.Partition) (map[string][]types.Partition, error)

Assign calculates partition assignments using weighted consistent hashing with extreme partition handling.

The algorithm balances two competing goals:

  1. Cache affinity - Keep partitions on the same workers across rebalancing (via consistent hashing)
  2. Load balance - Prevent workers from being overloaded by heavy partitions

Algorithm Overview:

  1. Validation - Check for workers and normalize partition weights
  2. Equal-weight fast path - When all partitions have the same weight, use pure consistent hashing
  3. Two-phase weighted assignment: a. Extreme partitions - Distribute heavy partitions (weight > avgWeight * extremeThreshold) round-robin b. Normal partitions - Assign remaining partitions using consistent hashing with soft load cap

The soft load cap (avgWeight * overloadThreshold) allows some imbalance to preserve cache affinity, but reassigns partitions to the lightest worker when the cap is exceeded.

Parameters:

  • workers: List of worker IDs to assign partitions to
  • partitions: List of partitions to distribute across workers

Returns:

  • map[string][]types.Partition: Worker ID → assigned partitions
  • error: ErrNoWorkers if workers list is empty, nil otherwise

Example:

strategy := NewWeightedConsistentHash(
    WithOverloadThreshold(1.3),     // Allow 30% overload
    WithExtremeThreshold(2.0),      // Partitions 2x average are "extreme"
)
assignments, err := strategy.Assign(workers, partitions)
if err != nil {
    log.Fatal(err)
}

type WeightedConsistentHashOption

type WeightedConsistentHashOption func(*WeightedConsistentHash)

WeightedConsistentHashOption configures a WeightedConsistentHash strategy.

func WithDefaultWeight

func WithDefaultWeight(weight int64) WeightedConsistentHashOption

WithDefaultWeight sets the default weight applied when a partition reports zero weight.

func WithExtremeThreshold

func WithExtremeThreshold(threshold float64) WeightedConsistentHashOption

WithExtremeThreshold sets the multiplier used to classify extreme partitions.

func WithMinPartitionCount

func WithMinPartitionCount(factor float64) WeightedConsistentHashOption

WithMinPartitionCount sets the minimum percentage of average partition count that a worker must accept before load shedding occurs. factor: 0.0 to 1.0 (e.g., 0.3 means 30% of avg count)

func WithOverloadThreshold

func WithOverloadThreshold(threshold float64) WeightedConsistentHashOption

WithOverloadThreshold sets the maximum allowed load variance per worker.

func WithWeightedHashSeed

func WithWeightedHashSeed(seed uint64) WeightedConsistentHashOption

WithWeightedHashSeed sets a custom hash seed for consistent hashing.

func WithWeightedLogger

func WithWeightedLogger(logger types.Logger) WeightedConsistentHashOption

WithWeightedLogger sets the logger used for configuration warnings and debug diagnostics.

func WithWeightedVirtualNodes

func WithWeightedVirtualNodes(nodes int) WeightedConsistentHashOption

WithWeightedVirtualNodes sets the number of virtual nodes per worker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL