preprocessor

package
v0.0.0-...-6dcb24f
Published: Mar 4, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package preprocessor contains the logic for tokenizing, aggregating, and sampling logs, including auto multiline detection and aggregation.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsMatch

func IsMatch(seqA []Token, seqB []Token, thresh float64) bool

IsMatch compares two sequences of tokens and returns true if they match within the given threshold. If the token sequences are different lengths, the shorter sequence is used for comparison. This function is optimized to exit early when a match is impossible, without having to compare all of the tokens.
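
For intuition, here is a self-contained sketch of the matching logic described above. The Token values and the isMatchSketch helper are illustrative stand-ins, not the package's actual implementation; the real IsMatch operates on the package's own Token type and may score matches differently.

```go
package main

import "fmt"

// Token mirrors the package's single-byte token type. These values are
// assumptions for illustration only; the real constants are defined in the
// preprocessor package.
type Token byte

const (
	Colon Token = iota // :
	Dash               // -
	D2                 // a run of 2 digits
	D4                 // a run of 4 digits
	C3                 // a run of 3 letters
)

// isMatchSketch compares two token sequences up to the shorter length and
// returns true if the fraction of equal positions meets thresh. The early
// exit mirrors the optimization described for IsMatch: once even a perfect
// remaining suffix could not reach the threshold, comparison stops.
func isMatchSketch(seqA, seqB []Token, thresh float64) bool {
	n := len(seqA)
	if len(seqB) < n {
		n = len(seqB)
	}
	if n == 0 {
		return false
	}
	required := thresh * float64(n)
	matches := 0
	for i := 0; i < n; i++ {
		if seqA[i] == seqB[i] {
			matches++
		}
		// Early exit: even if every remaining token matched, the
		// threshold could no longer be reached.
		if float64(matches+(n-1-i)) < required {
			return false
		}
	}
	return float64(matches) >= required
}

func main() {
	a := []Token{D4, Dash, D2, Dash, D2} // e.g. "2024-01-02"
	b := []Token{D4, Dash, D2, Colon, D2}
	fmt.Println(isMatchSketch(a, b, 0.75)) // 4 of 5 positions match
	fmt.Println(isMatchSketch(a, b, 0.9))
}
```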

func TokensToString

func TokensToString(tokens []Token) string

TokensToString converts a list of tokens to a debug string.

Types

type Aggregator

type Aggregator interface {
	// Process handles a log line and returns zero or more completed messages.
	// label is the result of labeling this message; aggregators that don't use it may ignore it.
	Process(msg *message.Message, label Label) []*message.Message

	// Flush returns any buffered messages and clears internal state.
	Flush() []*message.Message

	// IsEmpty returns true if the aggregator has no buffered data.
	IsEmpty() bool
}

Aggregator is the interface for log line combining strategies. An Aggregator receives individual log lines and may buffer them, returning zero or more completed log messages per input. Each call to Process may return nil (line was buffered) or one or more messages (a group was completed). The returned slice is only valid until the next call to Process or Flush.
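
As a sketch of this contract, the toy aggregator below implements the same Process/Flush/IsEmpty shape over plain strings. The real interface operates on *message.Message, and the Label constants here are assumed names for illustration, since the package's actual label values are unexported.

```go
package main

import "fmt"

// Label mirrors the package's Label type; these constant names are
// assumptions for this sketch.
type Label uint32

const (
	startGroup Label = iota
	aggregate
	noAggregate
)

// lineAggregator is a toy combining aggregator: startGroup flushes the
// current buffer and opens a new group, aggregate appends to the current
// group, and any other label emits the line on its own.
type lineAggregator struct{ buf []string }

func (a *lineAggregator) Process(line string, label Label) []string {
	switch label {
	case startGroup:
		out := a.Flush()
		a.buf = append(a.buf, line)
		return out
	case aggregate:
		a.buf = append(a.buf, line)
		return nil
	default: // noAggregate
		return append(a.Flush(), line)
	}
}

// Flush joins any buffered lines into one completed message and clears state.
func (a *lineAggregator) Flush() []string {
	if len(a.buf) == 0 {
		return nil
	}
	joined := a.buf[0]
	for _, l := range a.buf[1:] {
		joined += "\n" + l
	}
	a.buf = nil
	return []string{joined}
}

func (a *lineAggregator) IsEmpty() bool { return len(a.buf) == 0 }

func main() {
	agg := &lineAggregator{}
	var out []string
	out = append(out, agg.Process("ERROR boom", startGroup)...)
	out = append(out, agg.Process("  at frame 1", aggregate)...)
	out = append(out, agg.Process("INFO ok", startGroup)...)
	out = append(out, agg.Flush()...)
	fmt.Println(len(out)) // two completed messages
	fmt.Println(out[0])
}
```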

func NewCombiningAggregator

func NewCombiningAggregator(maxContentSize int, tagTruncatedLogs bool, tagMultiLineLogs bool, tailerInfo *status.InfoRegistry) Aggregator

NewCombiningAggregator creates a new combining aggregator.

func NewDetectingAggregator

func NewDetectingAggregator(tailerInfo *status.InfoRegistry) Aggregator

NewDetectingAggregator creates a new detecting aggregator.

type DiagnosticRow

type DiagnosticRow struct {
	TokenString string
	LabelString string

	Count     int64
	LastIndex int64
	// contains filtered or unexported fields
}

DiagnosticRow is a struct that represents a diagnostic view of a row in the PatternTable.

type Heuristic

type Heuristic interface {
	// ProcessAndContinue processes a log message and annotates the context with a label. It returns false if the message should be done processing.
	// Heuristic implementations may mutate the message context but must do so synchronously.
	ProcessAndContinue(*messageContext) bool
}

Heuristic is an interface representing a strategy to label log messages.

type IncrementalJSONValidator

type IncrementalJSONValidator struct {
	// contains filtered or unexported fields
}

IncrementalJSONValidator is a JSON validator that processes JSON messages incrementally.

func NewIncrementalJSONValidator

func NewIncrementalJSONValidator() *IncrementalJSONValidator

NewIncrementalJSONValidator creates a new IncrementalJSONValidator.

func (*IncrementalJSONValidator) Reset

func (d *IncrementalJSONValidator) Reset()

Reset resets the IncrementalJSONValidator.

func (*IncrementalJSONValidator) Write

func (d *IncrementalJSONValidator) Write(s []byte) JSONState

Write writes a byte slice to the IncrementalJSONValidator.

type JSONAggregator

type JSONAggregator interface {
	// Process handles one incoming message. It may buffer the message and return
	// nothing (incomplete JSON), return it unchanged (non-JSON or already complete),
	// or return a compacted single-line version (multi-part JSON).
	Process(msg *message.Message) []*message.Message

	// Flush returns any messages still buffered inside the aggregator.
	Flush() []*message.Message

	// IsEmpty reports whether the aggregator has no buffered state.
	IsEmpty() bool
}

JSONAggregator is the interface for the JSON aggregation stage of the Preprocessor. Implementations receive raw log lines and return zero or more complete messages.

func NewJSONAggregator

func NewJSONAggregator(tagCompleteJSON bool, maxContentSize int) JSONAggregator

NewJSONAggregator creates a new JSONAggregator.

type JSONDetector

type JSONDetector struct{}

JSONDetector is a heuristic to detect JSON messages.

func NewJSONDetector

func NewJSONDetector() *JSONDetector

NewJSONDetector returns a new JSON detection heuristic.

func (*JSONDetector) ProcessAndContinue

func (j *JSONDetector) ProcessAndContinue(context *messageContext) bool

ProcessAndContinue checks if a message is a JSON message. This implements the Heuristic interface - so we should stop processing if we detect a JSON message by returning false.

type JSONState

type JSONState int

JSONState represents the state of the JSON validator.

const (
	// Incomplete indicates that the JSON validator is still processing the JSON message.
	Incomplete JSONState = iota
	// Complete indicates that the JSON validator has processed the entire JSON message and the JSON is valid.
	Complete
	// Invalid indicates that the JSON is invalid.
	Invalid
)

type Label

type Label uint32

Label is a label for a log message.

type Labeler

type Labeler interface {
	Label(content []byte, tokens []Token, tokenIndices []int) Label
}

Labeler classifies a log line as startGroup, noAggregate, or aggregate. Tokens and tokenIndices are pre-computed by the Preprocessor's Tokenizer step and forwarded here so that heuristics can inspect them without re-tokenizing.

func NewLabeler

func NewLabeler(lablerHeuristics []Heuristic, analyticsHeuristics []Heuristic) Labeler

NewLabeler creates a new Labeler with the given heuristics. lablerHeuristics are used to mutate the label of a log message. analyticsHeuristics are used to analyze the log message and labeling process for the status page and telemetry.

type MatchContext

type MatchContext struct {
	// contains filtered or unexported fields
}

MatchContext is the context of a match.

type NoopJSONAggregator

type NoopJSONAggregator struct {
	// contains filtered or unexported fields
}

NoopJSONAggregator is a pass-through JSONAggregator that never buffers messages. Use this for pipeline paths where JSON aggregation is not needed (e.g. pass-through, regex multiline, detecting mode).

func NewNoopJSONAggregator

func NewNoopJSONAggregator() *NoopJSONAggregator

NewNoopJSONAggregator returns a new NoopJSONAggregator.

func (*NoopJSONAggregator) Flush

func (n *NoopJSONAggregator) Flush() []*message.Message

Flush is a no-op since NoopJSONAggregator never buffers.

func (*NoopJSONAggregator) IsEmpty

func (n *NoopJSONAggregator) IsEmpty() bool

IsEmpty always returns true since NoopJSONAggregator never buffers.

func (*NoopJSONAggregator) Process

func (n *NoopJSONAggregator) Process(msg *message.Message) []*message.Message

Process passes the message through unchanged. The returned slice is reused on each call — callers must not retain it across calls.

type NoopLabeler

type NoopLabeler struct{}

NoopLabeler is a Labeler that always returns noAggregate without any processing. Use this for pipeline paths that don't need auto-multiline detection (e.g. pass-through, regex multiline).

func NewNoopLabeler

func NewNoopLabeler() *NoopLabeler

NewNoopLabeler returns a new NoopLabeler.

func (*NoopLabeler) Label

func (l *NoopLabeler) Label(_ []byte, _ []Token, _ []int) Label

Label always returns noAggregate without inspecting content or tokens.

type NoopSampler

type NoopSampler struct{}

NoopSampler passes all messages through without modification. It is the default implementation used until adaptive sampling logic is added.

func NewNoopSampler

func NewNoopSampler() *NoopSampler

NewNoopSampler returns a new NoopSampler.

func (*NoopSampler) Flush

func (s *NoopSampler) Flush() *message.Message

Flush is a no-op since NoopSampler has no buffered state.

func (*NoopSampler) Process

func (s *NoopSampler) Process(msg *message.Message) *message.Message

Process returns the message unchanged.

type PassThroughAggregator

type PassThroughAggregator struct {
	// contains filtered or unexported fields
}

PassThroughAggregator is a stateless Aggregator that emits each message immediately after applying truncation handling. It is the equivalent of the decoder's SingleLineHandler.

func NewPassThroughAggregator

func NewPassThroughAggregator(lineLimit int) *PassThroughAggregator

NewPassThroughAggregator returns a new PassThroughAggregator with the given line size limit.

func (*PassThroughAggregator) Flush

func (a *PassThroughAggregator) Flush() []*message.Message

Flush returns nil since PassThroughAggregator has no buffered state.

func (*PassThroughAggregator) IsEmpty

func (a *PassThroughAggregator) IsEmpty() bool

IsEmpty always returns true since PassThroughAggregator is stateless.

func (*PassThroughAggregator) Process

func (a *PassThroughAggregator) Process(msg *message.Message, _ Label) []*message.Message

Process handles a log line, applying truncation flags if the content exceeds lineLimit, and returns it as a single-element slice. label is unused.

type PatternTable

type PatternTable struct {
	// contains filtered or unexported fields
}

PatternTable is a table of patterns that occur over time from a log source. The pattern table is always sorted by the frequency of the patterns. When the table becomes full, the least recently updated pattern is evicted.

func NewPatternTable

func NewPatternTable(maxTableSize int, matchThreshold float64, tailerInfo *status.InfoRegistry) *PatternTable

NewPatternTable returns a new PatternTable heuristic.

func (*PatternTable) DumpTable

func (p *PatternTable) DumpTable() []DiagnosticRow

DumpTable returns a slice of DiagnosticRow structs that represent the current state of the table.

func (*PatternTable) Info

func (p *PatternTable) Info() []string

Info returns a breakdown of the patterns in the table.

func (*PatternTable) InfoKey

func (p *PatternTable) InfoKey() string

InfoKey returns a string representing the key for the pattern table.

func (*PatternTable) ProcessAndContinue

func (p *PatternTable) ProcessAndContinue(context *messageContext) bool

ProcessAndContinue adds a pattern to the table and updates its label based on its frequency. This implements the Heuristic interface - so we should stop processing if the label was changed due to pattern detection.

type Preprocessor

type Preprocessor struct {
	// contains filtered or unexported fields
}

Preprocessor owns all preprocessor stages and wires them in the correct order: JSON aggregation → tokenization → labeling → aggregation → sampling → outputChan
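
The stage order can be sketched as a simple function chain. The signatures below are simplified to strings and ints; the real stages pass *message.Message values and a Label, and the names are assumptions for illustration.

```go
package main

import "fmt"

// Simplified stage signatures (assumptions for this sketch; the real stages
// operate on *message.Message).
type (
	jsonAggregate func(string) []string
	tokenizeFn    func(string) string
	labelFn       func(string) int
	aggregateFn   func(string, int) []string
	sampleFn      func(string) (string, bool)
)

// process wires the stages in the documented order:
// JSON aggregation -> tokenization -> labeling -> aggregation -> sampling -> out.
func process(line string, ja jsonAggregate, tk tokenizeFn, lb labelFn,
	ag aggregateFn, sm sampleFn, out chan<- string) {
	for _, msg := range ja(line) {
		tokens := tk(msg)
		label := lb(tokens)
		for _, complete := range ag(msg, label) {
			if s, keep := sm(complete); keep {
				out <- s
			}
		}
	}
}

func main() {
	out := make(chan string, 4)
	// Pass-through stages, analogous to the Noop implementations.
	passJSON := func(s string) []string { return []string{s} }
	noTok := func(s string) string { return s }
	noLabel := func(string) int { return 0 }
	passAgg := func(s string, _ int) []string { return []string{s} }
	keepAll := func(s string) (string, bool) { return s, true }

	process("hello world", passJSON, noTok, noLabel, passAgg, keepAll, out)
	fmt.Println(<-out)
}
```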

func NewPreprocessor

func NewPreprocessor(aggregator Aggregator, tokenizer *Tokenizer, labeler Labeler, sampler Sampler,
	outputChan chan *message.Message, jsonAggregator JSONAggregator, flushTimeout time.Duration) *Preprocessor

NewPreprocessor creates a new Preprocessor. Use NoopJSONAggregator for paths that don't aggregate JSON. Use NoopLabeler for paths that don't use auto multiline detection (regex, pass-through).

func (*Preprocessor) Flush

func (p *Preprocessor) Flush()

Flush flushes all preprocessor stages in order.

func (*Preprocessor) FlushChan

func (p *Preprocessor) FlushChan() <-chan time.Time

FlushChan returns a channel that signals when a flush should occur.

func (*Preprocessor) Process

func (p *Preprocessor) Process(msg *message.Message)

Process processes a message through all preprocessor stages in order, starting with JSON aggregation.

type RegexAggregator

type RegexAggregator struct {
	// contains filtered or unexported fields
}

RegexAggregator aggregates log lines into multiline messages using a regular expression to identify the start of a new log entry. It is the equivalent of the decoder's MultiLineHandler. The flush timer is managed externally by the Preprocessor.

func NewRegexAggregator

func NewRegexAggregator(newContentRe *regexp.Regexp, lineLimit int, telemetryEnabled bool, tailerInfo *status.InfoRegistry, multiLineTagValue string) *RegexAggregator

NewRegexAggregator returns a new RegexAggregator.

func (*RegexAggregator) CountInfo

func (a *RegexAggregator) CountInfo() *status.CountInfo

CountInfo returns the counter tracking multiline pattern matches. Used by the decoder to sync shared counters across multiple tailers for the same source.

func (*RegexAggregator) Flush

func (a *RegexAggregator) Flush() []*message.Message

Flush returns any buffered content as a completed message and resets state.

func (*RegexAggregator) IsEmpty

func (a *RegexAggregator) IsEmpty() bool

IsEmpty returns true if the aggregator has no buffered data.

func (*RegexAggregator) LinesCombinedInfo

func (a *RegexAggregator) LinesCombinedInfo() *status.CountInfo

LinesCombinedInfo returns the counter tracking lines combined into multiline messages. Used by the decoder to sync shared counters across multiple tailers for the same source.

func (*RegexAggregator) Process

func (a *RegexAggregator) Process(msg *message.Message, _ Label) []*message.Message

Process aggregates log lines using the regex to detect new log entry boundaries. Returns any completed messages (may be empty if the current line is buffered). label is unused.

func (*RegexAggregator) SetCountInfo

func (a *RegexAggregator) SetCountInfo(info *status.CountInfo)

SetCountInfo replaces the multiline match counter (used by decoder.syncSourceInfo).

func (*RegexAggregator) SetLinesCombinedInfo

func (a *RegexAggregator) SetLinesCombinedInfo(info *status.CountInfo)

SetLinesCombinedInfo replaces the lines-combined counter (used by decoder.syncSourceInfo).

type Sampler

type Sampler interface {
	// Process handles a completed log message and returns it, or nil to drop it.
	Process(msg *message.Message) *message.Message

	// Flush flushes any buffered state and returns a pending message, or nil if empty.
	Flush() *message.Message
}

Sampler is the final stage of the Preprocessor. It receives one completed log message and returns it unchanged or nil if the message should be dropped.

type TimestampDetector

type TimestampDetector struct {
	// contains filtered or unexported fields
}

TimestampDetector is a heuristic to detect timestamps.

func NewTimestampDetector

func NewTimestampDetector(matchThreshold float64) *TimestampDetector

NewTimestampDetector returns a new Timestamp detection heuristic.

func (*TimestampDetector) ProcessAndContinue

func (t *TimestampDetector) ProcessAndContinue(context *messageContext) bool

ProcessAndContinue checks if a message is likely to be a timestamp.

type Token

type Token byte

Token is the type that represents a single token.

const (
	Space Token = iota

	// Special Characters
	Colon        // :
	Semicolon    // ;
	Dash         // -
	Underscore   // _
	Fslash       // /
	Bslash       // \
	Period       // .
	Comma        // ,
	Singlequote  // '
	Doublequote  // "
	Backtick     // `
	Tilda        // ~
	Star         // *
	Plus         // +
	Equal        // =
	Parenopen    // (
	Parenclose   // )
	Braceopen    // {
	Braceclose   // }
	Bracketopen  // [
	Bracketclose // ]
	Ampersand    // &
	Exclamation  // !
	At           // @
	Pound        // #
	Dollar       // $
	Percent      // %
	Uparrow      // ^

	// Digit runs
	D1
	D2
	D3
	D4
	D5
	D6
	D7
	D8
	D9
	D10

	// Char runs
	C1
	C2
	C3
	C4
	C5
	C6
	C7
	C8
	C9
	C10

	// Special tokens
	Month
	Day
	Apm  // am or pm
	Zone // Represents a timezone
	T    // t (often `T`) denotes a time separator in many timestamp formats

	End // Not a valid token. Used to mark the end of the token list or as a terminator.
)

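
To illustrate how a log line maps onto this token alphabet, here is a toy tokenizer covering digit runs, letter runs, and a few of the special characters. It is a sketch only; the real Tokenizer also recognizes months, days, time zones, and the other special tokens listed above.

```go
package main

import (
	"fmt"
	"strings"
)

// tokenizeSketch emits token names in the spirit of the Token constants:
// runs of digits become Dn and runs of letters become Cn (capped at 10),
// while a handful of special characters map to their named tokens.
func tokenizeSketch(input string) []string {
	special := map[byte]string{
		' ': "Space", ':': "Colon", '-': "Dash", '.': "Period",
		'[': "Bracketopen", ']': "Bracketclose", '/': "Fslash",
	}
	var out []string
	for i := 0; i < len(input); {
		c := input[i]
		switch {
		case c >= '0' && c <= '9':
			n := 0
			for i < len(input) && input[i] >= '0' && input[i] <= '9' {
				i++
				n++
			}
			if n > 10 {
				n = 10 // runs longer than 10 saturate at D10
			}
			out = append(out, fmt.Sprintf("D%d", n))
		case isAlpha(c):
			n := 0
			for i < len(input) && isAlpha(input[i]) {
				i++
				n++
			}
			if n > 10 {
				n = 10 // runs longer than 10 saturate at C10
			}
			out = append(out, fmt.Sprintf("C%d", n))
		default:
			if name, ok := special[c]; ok {
				out = append(out, name)
			}
			i++
		}
	}
	return out
}

func isAlpha(c byte) bool {
	return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
}

func main() {
	fmt.Println(strings.Join(tokenizeSketch("2024-01-02 15:04:05 ERROR"), " "))
}
```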

type TokenGraph

type TokenGraph struct {
	// contains filtered or unexported fields
}

TokenGraph is a directed cyclic graph of tokens that model the relationship between any two tokens. It is used to calculate the probability of an unknown sequence of tokens being represented by the graph.

func NewTokenGraph

func NewTokenGraph(minimumTokenLength int, inputData [][]Token) *TokenGraph

NewTokenGraph returns a new TokenGraph.

func (*TokenGraph) MatchProbability

func (m *TokenGraph) MatchProbability(ts []Token) MatchContext

MatchProbability returns the probability of a sequence of tokens being represented by the graph.
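
One plausible way to compute such a probability is to score the fraction of adjacent token pairs in the sequence that appear as edges in the graph. This bigram sketch is an assumption about the approach, not the package's actual algorithm.

```go
package main

import "fmt"

// bigramGraph is a toy TokenGraph: it records which token-to-token
// transitions appear in training data, and scores an unknown sequence by
// the fraction of its adjacent pairs the graph has seen. The real
// MatchProbability returns a richer MatchContext.
type bigramGraph struct {
	edges map[[2]string]bool
}

func newBigramGraph(training [][]string) *bigramGraph {
	g := &bigramGraph{edges: map[[2]string]bool{}}
	for _, seq := range training {
		for i := 0; i+1 < len(seq); i++ {
			g.edges[[2]string{seq[i], seq[i+1]}] = true
		}
	}
	return g
}

// matchProbability returns the fraction of adjacent pairs in seq that are
// known edges, or 0 for sequences too short to contain a pair.
func (g *bigramGraph) matchProbability(seq []string) float64 {
	if len(seq) < 2 {
		return 0
	}
	hits := 0
	for i := 0; i+1 < len(seq); i++ {
		if g.edges[[2]string{seq[i], seq[i+1]}] {
			hits++
		}
	}
	return float64(hits) / float64(len(seq)-1)
}

func main() {
	// Trained on a timestamp-shaped token sequence.
	g := newBigramGraph([][]string{{"D4", "Dash", "D2", "Dash", "D2"}})
	fmt.Println(g.matchProbability([]string{"D4", "Dash", "D2"}))
	fmt.Println(g.matchProbability([]string{"C3", "Space", "C5"}))
}
```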

type Tokenizer

type Tokenizer struct {
	// contains filtered or unexported fields
}

Tokenizer is a heuristic to compute tokens from a log message. The tokenizer is used to convert a log message (string of bytes) into a list of tokens that represents the underlying structure of the log. The string of tokens is a compact slice of bytes that can be used to compare log messages structure. A tokenizer instance is not thread safe as buffers are reused to avoid allocations.

func NewTokenizer

func NewTokenizer(maxEvalBytes int) *Tokenizer

NewTokenizer returns a new Tokenizer detection heuristic.

func (*Tokenizer) Tokenize

func (t *Tokenizer) Tokenize(input []byte) ([]Token, []int)

Tokenize tokenizes the input bytes and returns tokens and their start indices. The caller is responsible for slicing the input to the desired length.

type UserSample

type UserSample struct {
	// contains filtered or unexported fields
}

UserSample represents a user-defined sample for auto multi-line detection.

type UserSamples

type UserSamples struct {
	// contains filtered or unexported fields
}

UserSamples is a heuristic that represents a collection of user-defined samples for auto multi-line aggregation.

func NewUserSamples

func NewUserSamples(cfgRdr model.Reader, sourceSamples []*config.AutoMultilineSample) *UserSamples

NewUserSamples creates a new UserSamples instance.

func (*UserSamples) ProcessAndContinue

func (j *UserSamples) ProcessAndContinue(context *messageContext) bool

ProcessAndContinue applies a user sample to a log message. If it matches, a label is assigned. This implements the Heuristic interface - so we should stop processing if we detect a user pattern by returning false.
