Documentation ¶
Overview ¶
Package preprocessor contains the logic for tokenizing, aggregating, and sampling log messages, including auto multiline detection and aggregation.
Index ¶
- func IsMatch(seqA []Token, seqB []Token, thresh float64) bool
- func TokensToString(tokens []Token) string
- type Aggregator
- type DiagnosticRow
- type Heuristic
- type IncrementalJSONValidator
- type JSONAggregator
- type JSONDetector
- type JSONState
- type Label
- type Labeler
- type MatchContext
- type NoopJSONAggregator
- type NoopLabeler
- type NoopSampler
- type PassThroughAggregator
- type PatternTable
- type Preprocessor
- type RegexAggregator
- func (a *RegexAggregator) CountInfo() *status.CountInfo
- func (a *RegexAggregator) Flush() []*message.Message
- func (a *RegexAggregator) IsEmpty() bool
- func (a *RegexAggregator) LinesCombinedInfo() *status.CountInfo
- func (a *RegexAggregator) Process(msg *message.Message, _ Label) []*message.Message
- func (a *RegexAggregator) SetCountInfo(info *status.CountInfo)
- func (a *RegexAggregator) SetLinesCombinedInfo(info *status.CountInfo)
- type Sampler
- type TimestampDetector
- type Token
- type TokenGraph
- type Tokenizer
- type UserSample
- type UserSamples
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsMatch ¶
IsMatch compares two sequences of tokens and returns true if they match within the given threshold. If the token sequences have different lengths, the shorter one is used for comparison. This function is optimized to exit early when a match is impossible, without having to compare all of the tokens.
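The early-exit idea can be sketched as follows. This is a minimal illustration, not the package's implementation: the `Token` stand-in and the `isMatchSketch` name are assumptions, and only positional equality is compared.

```go
package main

import "fmt"

// Token mirrors the package's byte-sized token type (assumption: values are
// opaque codes; only equality matters here).
type Token byte

// isMatchSketch compares two token sequences over the length of the shorter
// one and reports whether the fraction of equal positions meets thresh.
func isMatchSketch(seqA, seqB []Token, thresh float64) bool {
	n := len(seqA)
	if len(seqB) < n {
		n = len(seqB)
	}
	if n == 0 {
		return false
	}
	required := int(thresh * float64(n)) // minimum matches needed (floored)
	matches := 0
	for i := 0; i < n; i++ {
		if seqA[i] == seqB[i] {
			matches++
		}
		// Early exit: even if every remaining token matched, the
		// required count could no longer be reached.
		if matches+(n-1-i) < required {
			return false
		}
	}
	return float64(matches)/float64(n) >= thresh
}

func main() {
	a := []Token{1, 2, 3, 4}
	b := []Token{1, 2, 3, 9}
	fmt.Println(isMatchSketch(a, b, 0.75)) // 3 of 4 positions match
}
```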
func TokensToString ¶
TokensToString converts a list of tokens to a debug string.
Types ¶
type Aggregator ¶
type Aggregator interface {
// Process handles a log line and returns zero or more completed messages.
// label is the result of labeling this message; aggregators that don't use it may ignore it.
Process(msg *message.Message, label Label) []*message.Message
// Flush returns any buffered messages and clears internal state.
Flush() []*message.Message
// IsEmpty returns true if the aggregator has no buffered data.
IsEmpty() bool
}
Aggregator is the interface for log line combining strategies. An Aggregator receives individual log lines and may buffer them, returning zero or more completed log messages per input. Each call to Process may return nil (line was buffered) or one or more messages (a group was completed). The returned slice is only valid until the next call to Process or Flush.
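The Process/Flush/IsEmpty contract can be illustrated with a toy implementation that groups lines until a blank line arrives. Everything here is a stand-in (the `Message` struct, `Label` type, and blank-line strategy are assumptions for illustration, not part of the package):

```go
package main

import "fmt"

// Message is a stand-in for message.Message (only content matters here).
type Message struct{ Content []byte }

// Label is a stand-in for the package's label type.
type Label int

// blankLineAggregator sketches the Aggregator contract: lines are buffered
// until an empty line arrives, then the group is emitted as one message.
// Process returns nil while buffering; Flush drains any leftovers.
type blankLineAggregator struct{ buf []byte }

func (a *blankLineAggregator) Process(msg *Message, _ Label) []*Message {
	if len(msg.Content) == 0 && len(a.buf) > 0 {
		return a.Flush() // blank line completes the current group
	}
	if len(a.buf) > 0 {
		a.buf = append(a.buf, '\n')
	}
	a.buf = append(a.buf, msg.Content...)
	return nil
}

func (a *blankLineAggregator) Flush() []*Message {
	if len(a.buf) == 0 {
		return nil
	}
	out := []*Message{{Content: a.buf}}
	a.buf = nil
	return out
}

func (a *blankLineAggregator) IsEmpty() bool { return len(a.buf) == 0 }

func main() {
	agg := &blankLineAggregator{}
	agg.Process(&Message{Content: []byte("line one")}, 0)
	agg.Process(&Message{Content: []byte("line two")}, 0)
	out := agg.Process(&Message{}, 0) // blank line completes the group
	fmt.Println(string(out[0].Content))
}
```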
func NewCombiningAggregator ¶
func NewCombiningAggregator(maxContentSize int, tagTruncatedLogs bool, tagMultiLineLogs bool, tailerInfo *status.InfoRegistry) Aggregator
NewCombiningAggregator creates a new combining aggregator.
func NewDetectingAggregator ¶
func NewDetectingAggregator(tailerInfo *status.InfoRegistry) Aggregator
NewDetectingAggregator creates a new detecting aggregator.
type DiagnosticRow ¶
type DiagnosticRow struct {
TokenString string
LabelString string
Count int64
LastIndex int64
// contains filtered or unexported fields
}
DiagnosticRow is a struct that represents a diagnostic view of a row in the PatternTable.
type Heuristic ¶
type Heuristic interface {
// ProcessAndContinue processes a log message and annotates the context with a label. It returns false if processing of the message should stop.
// Heuristic implementations may mutate the message context but must do so synchronously.
ProcessAndContinue(*messageContext) bool
}
Heuristic is an interface representing a strategy to label log messages.
type IncrementalJSONValidator ¶
type IncrementalJSONValidator struct {
// contains filtered or unexported fields
}
IncrementalJSONValidator is a JSON validator that processes JSON messages incrementally.
func NewIncrementalJSONValidator ¶
func NewIncrementalJSONValidator() *IncrementalJSONValidator
NewIncrementalJSONValidator creates a new IncrementalJSONValidator.
func (*IncrementalJSONValidator) Reset ¶
func (d *IncrementalJSONValidator) Reset()
Reset resets the IncrementalJSONValidator.
func (*IncrementalJSONValidator) Write ¶
func (d *IncrementalJSONValidator) Write(s []byte) JSONState
Write writes a byte slice to the IncrementalJSONValidator.
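The incremental idea (consume chunks, report whether the accumulated input is still incomplete, complete, or invalid JSON) can be sketched as below. This is a simplified model, not the package's validator: the state names and bracket-depth strategy are assumptions, and the final check leans on `encoding/json`'s `json.Valid`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JSONState mirrors the package's validator states (names are assumptions).
type JSONState int

const (
	Incomplete JSONState = iota
	Complete
	Invalid
)

// incValidator accumulates written chunks and tracks bracket depth outside
// of strings. Once depth returns to zero, the full buffer is validated.
type incValidator struct {
	buf      []byte
	depth    int
	inString bool
	escaped  bool
}

func (v *incValidator) Reset() { *v = incValidator{} }

func (v *incValidator) Write(s []byte) JSONState {
	v.buf = append(v.buf, s...)
	for _, c := range s {
		switch {
		case v.escaped:
			v.escaped = false
		case v.inString:
			if c == '\\' {
				v.escaped = true
			} else if c == '"' {
				v.inString = false
			}
		case c == '"':
			v.inString = true
		case c == '{' || c == '[':
			v.depth++
		case c == '}' || c == ']':
			v.depth--
		}
	}
	if v.depth < 0 {
		return Invalid
	}
	if v.depth == 0 && !v.inString {
		if json.Valid(v.buf) {
			return Complete
		}
		return Invalid
	}
	return Incomplete
}

func main() {
	v := &incValidator{}
	fmt.Println(v.Write([]byte(`{"level": "error",`)) == Complete) // still incomplete
	fmt.Println(v.Write([]byte(`"msg": "boom"}`)) == Complete)     // now valid JSON
}
```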
type JSONAggregator ¶
type JSONAggregator interface {
// Process handles one incoming message. It may buffer the message and return
// nothing (incomplete JSON), return it unchanged (non-JSON or already complete),
// or return a compacted single-line version (multi-part JSON).
Process(msg *message.Message) []*message.Message
// Flush returns any messages still buffered inside the aggregator.
Flush() []*message.Message
// IsEmpty reports whether the aggregator has no buffered state.
IsEmpty() bool
}
JSONAggregator is the interface for the JSON aggregation stage of the Preprocessor. Implementations receive raw log lines and return zero or more complete messages.
func NewJSONAggregator ¶
func NewJSONAggregator(tagCompleteJSON bool, maxContentSize int) JSONAggregator
NewJSONAggregator creates a new JSONAggregator.
type JSONDetector ¶
type JSONDetector struct{}
JSONDetector is a heuristic to detect JSON messages.
func NewJSONDetector ¶
func NewJSONDetector() *JSONDetector
NewJSONDetector returns a new JSON detection heuristic.
func (*JSONDetector) ProcessAndContinue ¶
func (j *JSONDetector) ProcessAndContinue(context *messageContext) bool
ProcessAndContinue checks if a message is a JSON message. This implements the Heuristic interface, so it returns false to stop further processing once a JSON message is detected.
type Labeler ¶
Labeler classifies a log line as startGroup, noAggregate, or aggregate. Tokens and tokenIndices are pre-computed by the Preprocessor's Tokenizer step and forwarded here so that heuristics can inspect them without re-tokenizing.
func NewLabeler ¶
NewLabeler creates a new Labeler with the given heuristics. labelerHeuristics are used to mutate the label of a log message. analyticsHeuristics are used to analyze the log message and the labeling process for the status page and telemetry.
type MatchContext ¶
type MatchContext struct {
// contains filtered or unexported fields
}
MatchContext is the context of a match.
type NoopJSONAggregator ¶
type NoopJSONAggregator struct {
// contains filtered or unexported fields
}
NoopJSONAggregator is a pass-through JSONAggregator that never buffers messages. Use this for pipeline paths where JSON aggregation is not needed (e.g. pass-through, regex multiline, detecting mode).
func NewNoopJSONAggregator ¶
func NewNoopJSONAggregator() *NoopJSONAggregator
NewNoopJSONAggregator returns a new NoopJSONAggregator.
func (*NoopJSONAggregator) Flush ¶
func (n *NoopJSONAggregator) Flush() []*message.Message
Flush is a no-op since NoopJSONAggregator never buffers.
func (*NoopJSONAggregator) IsEmpty ¶
func (n *NoopJSONAggregator) IsEmpty() bool
IsEmpty always returns true since NoopJSONAggregator never buffers.
type NoopLabeler ¶
type NoopLabeler struct{}
NoopLabeler is a Labeler that always returns noAggregate without any processing. Use this for pipeline paths that don't need auto-multiline detection (e.g. pass-through, regex multiline).
type NoopSampler ¶
type NoopSampler struct{}
NoopSampler passes all messages through without modification. It is the default implementation used until adaptive sampling logic is added.
func (*NoopSampler) Flush ¶
func (s *NoopSampler) Flush() *message.Message
Flush is a no-op since NoopSampler has no buffered state.
type PassThroughAggregator ¶
type PassThroughAggregator struct {
// contains filtered or unexported fields
}
PassThroughAggregator is a stateless Aggregator that emits each message immediately after applying truncation handling. It is the equivalent of the decoder's SingleLineHandler.
func NewPassThroughAggregator ¶
func NewPassThroughAggregator(lineLimit int) *PassThroughAggregator
NewPassThroughAggregator returns a new PassThroughAggregator with the given line size limit.
func (*PassThroughAggregator) Flush ¶
func (a *PassThroughAggregator) Flush() []*message.Message
Flush returns nil since PassThroughAggregator has no buffered state.
func (*PassThroughAggregator) IsEmpty ¶
func (a *PassThroughAggregator) IsEmpty() bool
IsEmpty always returns true since PassThroughAggregator is stateless.
type PatternTable ¶
type PatternTable struct {
// contains filtered or unexported fields
}
PatternTable is a table of patterns that occur over time from a log source. The pattern table is always sorted by the frequency of the patterns. When the table becomes full, the least recently updated pattern is evicted.
func NewPatternTable ¶
func NewPatternTable(maxTableSize int, matchThreshold float64, tailerInfo *status.InfoRegistry) *PatternTable
NewPatternTable returns a new PatternTable heuristic.
func (*PatternTable) DumpTable ¶
func (p *PatternTable) DumpTable() []DiagnosticRow
DumpTable returns a slice of DiagnosticRow structs that represent the current state of the table.
func (*PatternTable) Info ¶
func (p *PatternTable) Info() []string
Info returns a breakdown of the patterns in the table.
func (*PatternTable) InfoKey ¶
func (p *PatternTable) InfoKey() string
InfoKey returns a string representing the key for the pattern table.
func (*PatternTable) ProcessAndContinue ¶
func (p *PatternTable) ProcessAndContinue(context *messageContext) bool
ProcessAndContinue adds a pattern to the table and updates its label based on its frequency. This implements the Heuristic interface, so it returns false to stop further processing when the label was changed due to pattern detection.
type Preprocessor ¶
type Preprocessor struct {
// contains filtered or unexported fields
}
Preprocessor owns all preprocessor stages and wires them in the correct order: JSON aggregation → tokenization → labeling → aggregation → sampling → outputChan
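The stage order above can be sketched with function-typed stages. All names and signatures here are simplified stand-ins for illustration, not the package's real API:

```go
package main

import "fmt"

// Message is a stand-in for message.Message.
type Message struct{ Content []byte }

// pipelineSketch wires the Preprocessor stages in the documented order.
// Any stage may absorb a message (return nothing) or emit several.
type pipelineSketch struct {
	jsonAgg   func(*Message) []*Message      // 1. JSON aggregation
	label     func(*Message) int             // 2-3. tokenization + labeling
	aggregate func(*Message, int) []*Message // 4. multiline aggregation
	sample    func(*Message) *Message        // 5. sampling (nil = dropped)
	output    chan *Message                  // 6. outputChan
}

func (p *pipelineSketch) process(msg *Message) {
	for _, m := range p.jsonAgg(msg) {
		l := p.label(m)
		for _, out := range p.aggregate(m, l) {
			if s := p.sample(out); s != nil {
				p.output <- s
			}
		}
	}
}

func main() {
	out := make(chan *Message, 1)
	// Identity stages: every stage passes the message straight through.
	p := &pipelineSketch{
		jsonAgg:   func(m *Message) []*Message { return []*Message{m} },
		label:     func(m *Message) int { return 0 },
		aggregate: func(m *Message, _ int) []*Message { return []*Message{m} },
		sample:    func(m *Message) *Message { return m },
		output:    out,
	}
	p.process(&Message{Content: []byte("hello")})
	fmt.Println(string((<-out).Content)) // message traversed all stages
}
```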
func NewPreprocessor ¶
func NewPreprocessor(aggregator Aggregator, tokenizer *Tokenizer, labeler Labeler, sampler Sampler, outputChan chan *message.Message, jsonAggregator JSONAggregator, flushTimeout time.Duration) *Preprocessor
NewPreprocessor creates a new Preprocessor. Use NoopJSONAggregator for paths that don't aggregate JSON. Use NoopLabeler for paths that don't use auto multiline detection (regex, pass-through).
func (*Preprocessor) Flush ¶
func (p *Preprocessor) Flush()
Flush flushes all preprocessor stages in order.
func (*Preprocessor) FlushChan ¶
func (p *Preprocessor) FlushChan() <-chan time.Time
FlushChan returns a channel that signals when a flush should occur.
func (*Preprocessor) Process ¶
func (p *Preprocessor) Process(msg *message.Message)
Process processes a message through all preprocessor stages in order, beginning with JSON aggregation.
type RegexAggregator ¶
type RegexAggregator struct {
// contains filtered or unexported fields
}
RegexAggregator aggregates log lines into multiline messages using a regular expression to identify the start of a new log entry. It is the equivalent of the decoder's MultiLineHandler. The flush timer is managed externally by the Preprocessor.
func NewRegexAggregator ¶
func NewRegexAggregator(newContentRe *regexp.Regexp, lineLimit int, telemetryEnabled bool, tailerInfo *status.InfoRegistry, multiLineTagValue string) *RegexAggregator
NewRegexAggregator returns a new RegexAggregator.
func (*RegexAggregator) CountInfo ¶
func (a *RegexAggregator) CountInfo() *status.CountInfo
CountInfo returns the counter tracking multiline pattern matches. Used by the decoder to sync shared counters across multiple tailers for the same source.
func (*RegexAggregator) Flush ¶
func (a *RegexAggregator) Flush() []*message.Message
Flush returns any buffered content as a completed message and resets state.
func (*RegexAggregator) IsEmpty ¶
func (a *RegexAggregator) IsEmpty() bool
IsEmpty returns true if the aggregator has no buffered data.
func (*RegexAggregator) LinesCombinedInfo ¶
func (a *RegexAggregator) LinesCombinedInfo() *status.CountInfo
LinesCombinedInfo returns the counter tracking lines combined into multiline messages. Used by the decoder to sync shared counters across multiple tailers for the same source.
func (*RegexAggregator) Process ¶
Process aggregates log lines using the regex to detect new log entry boundaries. Returns any completed messages (may be empty if the current line is buffered). label is unused.
func (*RegexAggregator) SetCountInfo ¶
func (a *RegexAggregator) SetCountInfo(info *status.CountInfo)
SetCountInfo replaces the multiline match counter (used by decoder.syncSourceInfo).
func (*RegexAggregator) SetLinesCombinedInfo ¶
func (a *RegexAggregator) SetLinesCombinedInfo(info *status.CountInfo)
SetLinesCombinedInfo replaces the lines-combined counter (used by decoder.syncSourceInfo).
type Sampler ¶
type Sampler interface {
// Process handles a completed log message and returns it, or nil to drop it.
Process(msg *message.Message) *message.Message
// Flush flushes any buffered state and returns a pending message, or nil if empty.
Flush() *message.Message
}
Sampler is the final stage of the Preprocessor. It receives one completed log message and returns it unchanged or nil if the message should be dropped.
type TimestampDetector ¶
type TimestampDetector struct {
// contains filtered or unexported fields
}
TimestampDetector is a heuristic to detect timestamps.
func NewTimestampDetector ¶
func NewTimestampDetector(matchThreshold float64) *TimestampDetector
NewTimestampDetector returns a new Timestamp detection heuristic.
func (*TimestampDetector) ProcessAndContinue ¶
func (t *TimestampDetector) ProcessAndContinue(context *messageContext) bool
ProcessAndContinue checks if a message is likely to be a timestamp.
type Token ¶
type Token byte
Token is the type that represents a single token.
const (
	Space Token = iota

	// Special Characters
	Colon        // :
	Semicolon    // ;
	Dash         // -
	Underscore   // _
	Fslash       // /
	Bslash       // \
	Period       // .
	Comma        // ,
	Singlequote  // '
	Doublequote  // "
	Backtick     // `
	Tilda        // ~
	Star         // *
	Plus         // +
	Equal        // =
	Parenopen    // (
	Parenclose   // )
	Braceopen    // {
	Braceclose   // }
	Bracketopen  // [
	Bracketclose // ]
	Ampersand    // &
	Exclamation  // !
	At           // @
	Pound        // #
	Dollar       // $
	Percent      // %
	Uparrow      // ^

	// Digit runs
	D1
	D2
	D3
	D4
	D5
	D6
	D7
	D8
	D9
	D10

	// Char runs
	C1
	C2
	C3
	C4
	C5
	C6
	C7
	C8
	C9
	C10

	// Special tokens
	Month
	Day
	Apm  // am or pm
	Zone // Represents a timezone
	T    // t (often `T`) denotes a time separator in many timestamp formats
	End  // Not a valid token. Used to mark the end of the token list or as a terminator.
)
Disable linter since the token list is self-explanatory, or documented where needed.
type TokenGraph ¶
type TokenGraph struct {
// contains filtered or unexported fields
}
TokenGraph is a directed cyclic graph of tokens that model the relationship between any two tokens. It is used to calculate the probability of an unknown sequence of tokens being represented by the graph.
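A simplified way to picture this: training sequences record which token-to-token transitions occur, and the match probability of a new sequence is the fraction of its adjacent pairs already present in the graph. The sketch below uses unweighted bigrams; all names are assumptions, and the real graph also weights by counts and honors a minimum token length:

```go
package main

import "fmt"

// Token mirrors the package's byte-sized token type.
type Token byte

// tokenGraphSketch records which token transitions were seen in training data.
type tokenGraphSketch struct {
	edges map[[2]Token]bool
}

func newTokenGraphSketch(inputData [][]Token) *tokenGraphSketch {
	g := &tokenGraphSketch{edges: map[[2]Token]bool{}}
	for _, seq := range inputData {
		for i := 0; i+1 < len(seq); i++ {
			g.edges[[2]Token{seq[i], seq[i+1]}] = true
		}
	}
	return g
}

// matchProbability returns the fraction of adjacent pairs in ts that the
// graph has seen before.
func (g *tokenGraphSketch) matchProbability(ts []Token) float64 {
	if len(ts) < 2 {
		return 0
	}
	hits := 0
	for i := 0; i+1 < len(ts); i++ {
		if g.edges[[2]Token{ts[i], ts[i+1]}] {
			hits++
		}
	}
	return float64(hits) / float64(len(ts)-1)
}

func main() {
	g := newTokenGraphSketch([][]Token{{1, 2, 3, 4}, {1, 2, 5}})
	fmt.Println(g.matchProbability([]Token{1, 2, 3})) // both pairs are known
	fmt.Println(g.matchProbability([]Token{7, 8, 9})) // no known pairs
}
```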
func NewTokenGraph ¶
func NewTokenGraph(minimumTokenLength int, inputData [][]Token) *TokenGraph
NewTokenGraph returns a new TokenGraph.
func (*TokenGraph) MatchProbability ¶
func (m *TokenGraph) MatchProbability(ts []Token) MatchContext
MatchProbability returns the probability of a sequence of tokens being represented by the graph.
type Tokenizer ¶
type Tokenizer struct {
// contains filtered or unexported fields
}
Tokenizer is a heuristic to compute tokens from a log message. The tokenizer is used to convert a log message (string of bytes) into a list of tokens that represents the underlying structure of the log. The string of tokens is a compact slice of bytes that can be used to compare log messages structure. A tokenizer instance is not thread safe as buffers are reused to avoid allocations.
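The structure-capturing idea can be sketched as follows: runs of digits and letters collapse into run-length tokens (capped at 10, matching the D1..D10 and C1..C10 constants), and each special character becomes its own token. This sketch emits readable strings instead of the package's compact byte codes, and omits the month/day/timezone special cases:

```go
package main

import (
	"fmt"
	"unicode"
)

// tokenizeSketch converts a log line into structural tokens: "D4" for a
// run of 4 digits, "C5" for a run of 5 letters, and the character itself
// for separators.
func tokenizeSketch(line string) []string {
	var tokens []string
	run := 0
	var kind byte // 'D' for digits, 'C' for letters
	flush := func() {
		if run == 0 {
			return
		}
		if run > 10 {
			run = 10 // runs are capped, matching D1..D10 / C1..C10
		}
		tokens = append(tokens, fmt.Sprintf("%c%d", kind, run))
		run = 0
	}
	for _, r := range line {
		switch {
		case unicode.IsDigit(r):
			if kind != 'D' {
				flush()
				kind = 'D'
			}
			run++
		case unicode.IsLetter(r):
			if kind != 'C' {
				flush()
				kind = 'C'
			}
			run++
		default:
			flush()
			tokens = append(tokens, string(r)) // special char is its own token
		}
	}
	flush()
	return tokens
}

func main() {
	// Two logs with the same structure produce the same token sequence,
	// which is what makes structural comparison possible.
	fmt.Println(tokenizeSketch("2024-05-01 ERROR boom"))
	fmt.Println(tokenizeSketch("2025-12-31 FATAL oops"))
}
```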
func NewTokenizer ¶
NewTokenizer returns a new Tokenizer detection heuristic.
type UserSample ¶
type UserSample struct {
// contains filtered or unexported fields
}
UserSample represents a user-defined sample for auto multi-line detection.
type UserSamples ¶
type UserSamples struct {
// contains filtered or unexported fields
}
UserSamples is a heuristic that represents a collection of user-defined samples for auto multi-line aggregation.
func NewUserSamples ¶
func NewUserSamples(cfgRdr model.Reader, sourceSamples []*config.AutoMultilineSample) *UserSamples
NewUserSamples creates a new UserSamples instance.
func (*UserSamples) ProcessAndContinue ¶
func (j *UserSamples) ProcessAndContinue(context *messageContext) bool
ProcessAndContinue applies a user sample to a log message. If it matches, a label is assigned. This implements the Heuristic interface, so it returns false to stop further processing once a user pattern is detected.