compression

package
v0.0.0-...-621b1f7 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AdaptiveCompressor

type AdaptiveCompressor struct {
	// contains filtered or unexported fields
}

AdaptiveCompressor manages adaptive compression level selection

func NewAdaptiveCompressor

func NewAdaptiveCompressor(config *AdaptiveConfig, logger *zap.Logger) *AdaptiveCompressor

NewAdaptiveCompressor creates a new adaptive compressor

func (*AdaptiveCompressor) GetStats

func (ac *AdaptiveCompressor) GetStats() map[string]interface{}

GetStats returns adaptive compression statistics

func (*AdaptiveCompressor) RecordCompressionResult

func (ac *AdaptiveCompressor) RecordCompressionResult(ratio float64, duration time.Duration)

RecordCompressionResult records metrics from a compression operation

func (*AdaptiveCompressor) SelectCompressionLevel

func (ac *AdaptiveCompressor) SelectCompressionLevel() zstd.EncoderLevel

SelectCompressionLevel selects optimal compression level based on current conditions

func (*AdaptiveCompressor) ShouldCompress

func (ac *AdaptiveCompressor) ShouldCompress(dataSize int) bool

ShouldCompress determines if data should be compressed based on characteristics

type AdaptiveConfig

// AdaptiveConfig holds the configuration for adaptive compression.
// Every field is exported and carries matching snake_case json and yaml tags.
type AdaptiveConfig struct {
	Enabled             bool          `json:"enabled" yaml:"enabled"`
	MinCompressionRatio float64       `json:"min_compression_ratio" yaml:"min_compression_ratio"`
	TargetRatio         float64       `json:"target_ratio" yaml:"target_ratio"`
	// NOTE(review): whether MaxCPUUtilization is a fraction (0-1) or a
	// percentage (0-100) is not visible from this declaration — confirm.
	MaxCPUUtilization   float64       `json:"max_cpu_utilization" yaml:"max_cpu_utilization"`
	AdjustInterval      time.Duration `json:"adjust_interval" yaml:"adjust_interval"`
	// NOTE(review): RatioWindow is presumably a count of recent ratio
	// samples to consider — confirm against the implementation.
	RatioWindow         int           `json:"ratio_window" yaml:"ratio_window"`
}

AdaptiveConfig holds the configuration for adaptive compression

func DefaultAdaptiveConfig

func DefaultAdaptiveConfig() *AdaptiveConfig

DefaultAdaptiveConfig returns sensible defaults

type AdaptiveDeltaComputer

type AdaptiveDeltaComputer struct {
	// contains filtered or unexported fields
}

AdaptiveDeltaComputer selects the best algorithm based on data characteristics

func NewAdaptiveDeltaComputer

func NewAdaptiveDeltaComputer() *AdaptiveDeltaComputer

func (*AdaptiveDeltaComputer) ApplyDelta

func (a *AdaptiveDeltaComputer) ApplyDelta(baseline, delta []byte) ([]byte, error)

func (*AdaptiveDeltaComputer) ComputeDelta

func (a *AdaptiveDeltaComputer) ComputeDelta(baseline, current []byte) ([]byte, error)

func (*AdaptiveDeltaComputer) Name

func (a *AdaptiveDeltaComputer) Name() string

type BSDiffDeltaComputer

type BSDiffDeltaComputer struct{}

BSDiffDeltaComputer implements bsdiff algorithm for binary diffs

func (*BSDiffDeltaComputer) ApplyDelta

func (b *BSDiffDeltaComputer) ApplyDelta(baseline, delta []byte) ([]byte, error)

func (*BSDiffDeltaComputer) ComputeDelta

func (b *BSDiffDeltaComputer) ComputeDelta(baseline, current []byte) ([]byte, error)

func (*BSDiffDeltaComputer) Name

func (b *BSDiffDeltaComputer) Name() string

type BaselineState

// BaselineState represents a baseline snapshot for delta computation.
type BaselineState struct {
	// NOTE(review): Data is presumably the raw snapshot bytes that deltas
	// are computed against — confirm.
	Data       []byte
	Timestamp  time.Time
	DeltaCount int // Number of deltas computed from this baseline
}

BaselineState represents a baseline snapshot for delta computation

type BaselineSyncConfig

// BaselineSyncConfig holds the configuration for baseline synchronization.
// Every field is exported and carries matching snake_case json and yaml tags.
type BaselineSyncConfig struct {
	Enabled            bool          `json:"enabled" yaml:"enabled"`
	SyncInterval       time.Duration `json:"sync_interval" yaml:"sync_interval"`
	MaxStaleness       time.Duration `json:"max_staleness" yaml:"max_staleness"`
	EnableVersioning   bool          `json:"enable_versioning" yaml:"enable_versioning"`
	// NOTE(review): "lww" presumably means last-writer-wins; how it differs
	// from "newest" is not visible from this declaration — confirm.
	ConflictResolution string        `json:"conflict_resolution" yaml:"conflict_resolution"` // "lww", "newest", "manual"
}

BaselineSyncConfig holds the configuration for baseline synchronization

func DefaultBaselineSyncConfig

func DefaultBaselineSyncConfig() *BaselineSyncConfig

DefaultBaselineSyncConfig returns sensible defaults

type BaselineSynchronizer

type BaselineSynchronizer struct {
	// contains filtered or unexported fields
}

BaselineSynchronizer manages baseline state synchronization across cluster nodes

func NewBaselineSynchronizer

func NewBaselineSynchronizer(config *BaselineSyncConfig, logger *zap.Logger) *BaselineSynchronizer

NewBaselineSynchronizer creates a new baseline synchronizer

func (*BaselineSynchronizer) CleanupDeletedVMs

func (bs *BaselineSynchronizer) CleanupDeletedVMs(activeVMIDs []string) int

CleanupDeletedVMs removes baselines for VMs that no longer exist

func (*BaselineSynchronizer) Close

func (bs *BaselineSynchronizer) Close() error

Close stops the sync scheduler and releases resources

func (*BaselineSynchronizer) DeleteBaseline

func (bs *BaselineSynchronizer) DeleteBaseline(key string)

DeleteBaseline removes a baseline

func (*BaselineSynchronizer) ExportBaselines

func (bs *BaselineSynchronizer) ExportBaselines() ([]byte, error)

ExportBaselines exports all baselines for backup

func (*BaselineSynchronizer) GetBaseline

func (bs *BaselineSynchronizer) GetBaseline(key string) (*BaselineState, bool)

GetBaseline retrieves a baseline locally

func (*BaselineSynchronizer) GetStats

func (bs *BaselineSynchronizer) GetStats() map[string]interface{}

GetStats returns synchronization statistics

func (*BaselineSynchronizer) ImportBaselines

func (bs *BaselineSynchronizer) ImportBaselines(data []byte) error

ImportBaselines imports baselines from backup

func (*BaselineSynchronizer) MigrateBaseline

func (bs *BaselineSynchronizer) MigrateBaseline(key string, oldVersion, newVersion int) error

MigrateBaseline migrates a baseline to a new version format

func (*BaselineSynchronizer) RegisterNode

func (bs *BaselineSynchronizer) RegisterNode(nodeID, address string) error

RegisterNode registers a remote node for baseline synchronization

func (*BaselineSynchronizer) ResolveConflict

func (bs *BaselineSynchronizer) ResolveConflict(local, remote *BaselineState) *BaselineState

ResolveConflict resolves baseline conflicts using configured strategy

func (*BaselineSynchronizer) SetBaseline

func (bs *BaselineSynchronizer) SetBaseline(key string, baseline *BaselineState)

SetBaseline stores a baseline locally

func (*BaselineSynchronizer) SyncWithCluster

func (bs *BaselineSynchronizer) SyncWithCluster(ctx context.Context) error

SyncWithCluster synchronizes all baselines with cluster nodes

func (*BaselineSynchronizer) UnregisterNode

func (bs *BaselineSynchronizer) UnregisterNode(nodeID string)

UnregisterNode removes a remote node

type BaselineVersion

// BaselineVersion represents a versioned baseline for conflict resolution.
// Fields carry json tags only (no yaml tags).
type BaselineVersion struct {
	Version   int       `json:"version"`
	Data      []byte    `json:"data"`
	Timestamp time.Time `json:"timestamp"`
	// NOTE(review): NodeID presumably identifies the node that produced
	// this version — confirm.
	NodeID    string    `json:"node_id"`
}

BaselineVersion represents a versioned baseline for conflict resolution

type CompressionMetrics

type CompressionMetrics struct {
	// Compression stats
	TotalOperations      uint64
	TotalBytesOriginal   uint64
	TotalBytesCompressed uint64
	TotalBytesDelta      uint64

	// Delta encoding stats
	DeltaHits         uint64
	DeltaMisses       uint64
	BaselineRefreshes uint64
	BaselineCount     int

	// Dictionary stats
	DictionaryHits       uint64
	DictionaryMisses     uint64
	DictionariesTrained  int
	LastDictionaryUpdate time.Time

	// Algorithm usage
	XORDeltaCount    uint64
	RSyncDeltaCount  uint64
	BSDiffDeltaCount uint64
	AutoSelectCount  uint64

	// Performance
	TotalCompressionTime   time.Duration
	TotalDecompressionTime time.Duration
	FastestCompression     time.Duration
	SlowestCompression     time.Duration

	// Adaptive compression
	LevelAdjustments    uint64
	IncompressibleSkips uint64
	// contains filtered or unexported fields
}

CompressionMetrics tracks detailed compression performance metrics

func NewCompressionMetrics

func NewCompressionMetrics() *CompressionMetrics

NewCompressionMetrics creates a new metrics tracker

func (*CompressionMetrics) GetAverageCompressionTime

func (m *CompressionMetrics) GetAverageCompressionTime() time.Duration

GetAverageCompressionTime returns average compression duration

func (*CompressionMetrics) GetAverageDecompressionTime

func (m *CompressionMetrics) GetAverageDecompressionTime() time.Duration

GetAverageDecompressionTime returns average decompression duration

func (*CompressionMetrics) GetCompressionRatio

func (m *CompressionMetrics) GetCompressionRatio() float64

GetCompressionRatio returns the overall compression ratio

func (*CompressionMetrics) GetDeltaHitRate

func (m *CompressionMetrics) GetDeltaHitRate() float64

GetDeltaHitRate returns the delta hit rate as a percentage

func (*CompressionMetrics) GetDictionaryEfficiency

func (m *CompressionMetrics) GetDictionaryEfficiency() float64

GetDictionaryEfficiency returns dictionary hit rate as a percentage

func (*CompressionMetrics) GetSnapshot

func (m *CompressionMetrics) GetSnapshot() map[string]interface{}

GetSnapshot returns a snapshot of current metrics

func (*CompressionMetrics) RecordBaselineRefresh

func (m *CompressionMetrics) RecordBaselineRefresh()

RecordBaselineRefresh records a baseline refresh

func (*CompressionMetrics) RecordCompression

func (m *CompressionMetrics) RecordCompression(originalSize, compressedSize int, duration time.Duration, isDelta bool, usedDict bool)

RecordCompression records a compression operation

func (*CompressionMetrics) RecordDecompression

func (m *CompressionMetrics) RecordDecompression(duration time.Duration)

RecordDecompression records a decompression operation

func (*CompressionMetrics) RecordDeltaAlgorithm

func (m *CompressionMetrics) RecordDeltaAlgorithm(algorithm string)

RecordDeltaAlgorithm records which delta algorithm was used

func (*CompressionMetrics) RecordDictionaryUpdate

func (m *CompressionMetrics) RecordDictionaryUpdate(count int)

RecordDictionaryUpdate records dictionary training

func (*CompressionMetrics) RecordIncompressibleSkip

func (m *CompressionMetrics) RecordIncompressibleSkip()

RecordIncompressibleSkip records when data was skipped due to poor compressibility

func (*CompressionMetrics) RecordLevelAdjustment

func (m *CompressionMetrics) RecordLevelAdjustment()

RecordLevelAdjustment records an adaptive compression level change

func (*CompressionMetrics) Reset

func (m *CompressionMetrics) Reset()

Reset resets all metrics to zero

func (*CompressionMetrics) UpdateBaselineCount

func (m *CompressionMetrics) UpdateBaselineCount(count int)

UpdateBaselineCount updates the current baseline count

type DeltaAlgorithm

type DeltaAlgorithm string

DeltaAlgorithm represents a delta encoding algorithm

// Named DeltaAlgorithm values. The underlying strings are "xor", "rsync",
// "bsdiff" and "auto".
const (
	DeltaAlgorithmXOR    DeltaAlgorithm = "xor"
	DeltaAlgorithmRSync  DeltaAlgorithm = "rsync"
	DeltaAlgorithmBSDiff DeltaAlgorithm = "bsdiff"
	DeltaAlgorithmAuto   DeltaAlgorithm = "auto"
)

type DeltaComputer

// DeltaComputer is the interface for delta computation algorithms.
type DeltaComputer interface {
	// ComputeDelta takes a baseline and the current data and returns the
	// computed delta bytes, or an error.
	ComputeDelta(baseline, current []byte) ([]byte, error)
	// ApplyDelta takes a baseline and a delta and returns a byte slice, or
	// an error. NOTE(review): the result is presumably the reconstructed
	// current data — confirm against the implementations.
	ApplyDelta(baseline, delta []byte) ([]byte, error)
	// Name returns a string; presumably the algorithm's identifier — confirm.
	Name() string
}

DeltaComputer is the interface implemented by delta computation algorithms

func DeltaAlgorithmFactory

func DeltaAlgorithmFactory(algorithm DeltaAlgorithm) DeltaComputer

DeltaAlgorithmFactory creates delta computers based on algorithm type

type DeltaEncoder

type DeltaEncoder struct {
	// contains filtered or unexported fields
}

DeltaEncoder implements hierarchical delta encoding with baseline state management. Phase 1: Production-ready with dictionary training and advanced algorithms.

func NewDeltaEncoder

func NewDeltaEncoder(config *DeltaEncodingConfig, logger *zap.Logger) (*DeltaEncoder, error)

NewDeltaEncoder creates a new delta encoder with Phase 1 features

func (*DeltaEncoder) Close

func (de *DeltaEncoder) Close() error

Close releases resources including Phase 1 components

func (*DeltaEncoder) Decode

func (de *DeltaEncoder) Decode(stateKey string, encoded *EncodedData) ([]byte, error)

Decode decompresses and applies delta reconstruction

func (*DeltaEncoder) Encode

func (de *DeltaEncoder) Encode(stateKey string, data []byte) (*EncodedData, error)

Encode applies delta encoding and compression to data with Phase 1 optimizations. stateKey identifies the resource (e.g., "vm-123-memory").

func (*DeltaEncoder) GetDetailedMetrics

func (de *DeltaEncoder) GetDetailedMetrics() map[string]interface{}

GetDetailedMetrics returns comprehensive Phase 1 metrics

func (*DeltaEncoder) GetMetrics

func (de *DeltaEncoder) GetMetrics() map[string]interface{}

GetMetrics returns current encoding metrics

func (*DeltaEncoder) PruneOldBaselines

func (de *DeltaEncoder) PruneOldBaselines() int

PruneOldBaselines removes baselines that haven't been used recently

func (*DeltaEncoder) TrainDictionaries

func (de *DeltaEncoder) TrainDictionaries() error

TrainDictionaries manually triggers dictionary training

type DeltaEncodingConfig

// DeltaEncodingConfig holds the configuration for delta encoding.
// Every field is exported and carries matching snake_case json and yaml tags.
type DeltaEncodingConfig struct {
	Enabled          bool          `json:"enabled" yaml:"enabled"`
	BaselineInterval time.Duration `json:"baseline_interval" yaml:"baseline_interval"`
	MaxBaselineAge   time.Duration `json:"max_baseline_age" yaml:"max_baseline_age"`
	MaxDeltaChain    int           `json:"max_delta_chain" yaml:"max_delta_chain"`
	// NOTE(review): how this integer maps onto a zstd encoder level is not
	// visible from this declaration; confirm that 0-9 is the accepted range.
	CompressionLevel int           `json:"compression_level" yaml:"compression_level"` // Zstandard: 0-9
	EnableDictionary bool          `json:"enable_dictionary" yaml:"enable_dictionary"`

	// Phase 1: Advanced features
	DeltaAlgorithm      string  `json:"delta_algorithm" yaml:"delta_algorithm"`             // "xor", "rsync", "bsdiff", "auto"
	AdaptiveThreshold   float64 `json:"adaptive_threshold" yaml:"adaptive_threshold"`       // Auto-adjust compression if ratio < threshold
	MinCompressionRatio float64 `json:"min_compression_ratio" yaml:"min_compression_ratio"` // Skip compression if ratio < this
	EnableAdaptive      bool    `json:"enable_adaptive" yaml:"enable_adaptive"`             // Enable adaptive compression
	EnableBaselineSync  bool    `json:"enable_baseline_sync" yaml:"enable_baseline_sync"`   // Enable cluster sync
}

DeltaEncodingConfig holds the configuration for delta encoding

func DefaultDeltaEncodingConfig

func DefaultDeltaEncodingConfig() *DeltaEncodingConfig

DefaultDeltaEncodingConfig returns sensible defaults for Phase 1

type DictionaryTrainer

type DictionaryTrainer struct {
	// contains filtered or unexported fields
}

DictionaryTrainer manages Zstandard dictionary training for improved compression

func NewDictionaryTrainer

func NewDictionaryTrainer(config *DictionaryTrainingConfig, logger *zap.Logger) (*DictionaryTrainer, error)

NewDictionaryTrainer creates a new dictionary trainer

func (*DictionaryTrainer) AddSample

func (dt *DictionaryTrainer) AddSample(resourceType string, data []byte)

AddSample collects a sample for dictionary training

func (*DictionaryTrainer) Close

func (dt *DictionaryTrainer) Close() error

Close stops the training scheduler and releases resources

func (*DictionaryTrainer) GetDictionary

func (dt *DictionaryTrainer) GetDictionary(resourceType string) ([]byte, bool)

GetDictionary retrieves a trained dictionary for a resource type

func (*DictionaryTrainer) GetStats

func (dt *DictionaryTrainer) GetStats() map[string]interface{}

GetStats returns dictionary statistics

func (*DictionaryTrainer) TrainAllDictionaries

func (dt *DictionaryTrainer) TrainAllDictionaries() error

TrainAllDictionaries trains dictionaries for all resource types with samples

func (*DictionaryTrainer) TrainDictionary

func (dt *DictionaryTrainer) TrainDictionary(resourceType string) error

TrainDictionary trains a dictionary for a specific resource type

type DictionaryTrainingConfig

// DictionaryTrainingConfig holds the configuration for dictionary training.
// Every field is exported and carries matching snake_case json and yaml tags.
type DictionaryTrainingConfig struct {
	Enabled        bool          `json:"enabled" yaml:"enabled"`
	UpdateInterval time.Duration `json:"update_interval" yaml:"update_interval"`
	MaxSamples     int           `json:"max_samples" yaml:"max_samples"`
	// NOTE(review): MinSampleSize and MaxDictSize are presumably byte
	// counts — confirm against the implementation.
	MinSampleSize  int           `json:"min_sample_size" yaml:"min_sample_size"`
	MaxDictSize    int           `json:"max_dict_size" yaml:"max_dict_size"`
	StoragePath    string        `json:"storage_path" yaml:"storage_path"`
}

DictionaryTrainingConfig holds the configuration for dictionary training

func DefaultDictionaryTrainingConfig

func DefaultDictionaryTrainingConfig() *DictionaryTrainingConfig

DefaultDictionaryTrainingConfig returns sensible defaults

type EncodedData

// EncodedData represents compressed and optionally delta-encoded data.
type EncodedData struct {
	Data           []byte
	OriginalSize   int
	CompressedSize int
	// NOTE(review): IsDelta presumably marks Data as a delta against the
	// baseline identified by BaselineKey — confirm.
	IsDelta        bool
	BaselineKey    string
	Timestamp      time.Time
}

EncodedData represents compressed and optionally delta-encoded data

func (*EncodedData) CompressionRatio

func (ed *EncodedData) CompressionRatio() float64

CompressionRatio returns the compression ratio achieved

type NodeStatus

type NodeStatus string

NodeStatus represents the status of a remote node

// Named NodeStatus values. The underlying strings are "online", "offline"
// and "syncing".
const (
	NodeStatusOnline  NodeStatus = "online"
	NodeStatusOffline NodeStatus = "offline"
	NodeStatusSyncing NodeStatus = "syncing"
)

type RSyncDeltaComputer

type RSyncDeltaComputer struct {
	// contains filtered or unexported fields
}

RSyncDeltaComputer implements rsync-style rolling checksums

func NewRSyncDeltaComputer

func NewRSyncDeltaComputer(blockSize int) *RSyncDeltaComputer

func (*RSyncDeltaComputer) ApplyDelta

func (r *RSyncDeltaComputer) ApplyDelta(baseline, delta []byte) ([]byte, error)

func (*RSyncDeltaComputer) ComputeDelta

func (r *RSyncDeltaComputer) ComputeDelta(baseline, current []byte) ([]byte, error)

func (*RSyncDeltaComputer) Name

func (r *RSyncDeltaComputer) Name() string

type RemoteNode

// RemoteNode represents a remote cluster node.
type RemoteNode struct {
	NodeID        string
	Address       string
	// NOTE(review): LastSync is presumably the time of the most recent
	// sync with this node — confirm.
	LastSync      time.Time
	BaselineCount int
	// Status holds a NodeStatus value.
	Status        NodeStatus
}

RemoteNode represents a remote cluster node

type TrainedDictionary

// TrainedDictionary represents a trained Zstandard dictionary.
// Fields carry json tags only (no yaml tags).
type TrainedDictionary struct {
	ResourceType string    `json:"resource_type"`
	Dictionary   []byte    `json:"dictionary"`
	// NOTE(review): SampleCount is presumably the number of samples used
	// to train this dictionary — confirm.
	SampleCount  int       `json:"sample_count"`
	TrainedAt    time.Time `json:"trained_at"`
	Version      int       `json:"version"`
}

TrainedDictionary represents a trained Zstandard dictionary

type XORDeltaComputer

type XORDeltaComputer struct{}

XORDeltaComputer implements simple XOR-based delta encoding

func (*XORDeltaComputer) ApplyDelta

func (x *XORDeltaComputer) ApplyDelta(baseline, delta []byte) ([]byte, error)

func (*XORDeltaComputer) ComputeDelta

func (x *XORDeltaComputer) ComputeDelta(baseline, current []byte) ([]byte, error)

func (*XORDeltaComputer) Name

func (x *XORDeltaComputer) Name() string