import pandas as pd
import numpy as np
import json
import time
from .base import BaseTransform, MinMaxOpTransform, BinaryOpTransform, ConstantOpTransform, UnaryOpTransform, SISOTransform, MISOTransform
from .utils import transform_to_config, transform_from_config, ComputationGraph, build_feature_graph
from finmlkit.utils.log import get_logger
# Module-level logger; Feature.__call__ uses it to report when a cached column is returned.
logger = get_logger(__name__)
[docs]
class Feature:
r"""High-level wrapper for data transformations enabling intuitive mathematical operations and fluent feature engineering.
This class provides a user-friendly interface for financial feature engineering by wrapping :class:`BaseTransform`
instances and enabling mathematical operations, function composition, and chainable transformations using familiar
Python operators and methods. It serves as the primary building block for constructing complex feature engineering
pipelines through an intuitive, expression-based syntax that mirrors mathematical notation.
**Core Design Philosophy:**
The Feature class implements a **fluent interface** design pattern that enables natural mathematical expressions
for feature engineering. Instead of manually composing transform objects, users can write feature engineering
logic using familiar mathematical operators and method chaining:
.. code-block:: python
# Traditional transform composition (verbose)
ma_transform = SimpleMovingAverageTransform('close', 'sma_20')
ratio_transform = BinaryOpTransform(price_transform, ma_transform, 'div', lambda x, y: x / y)
# Feature-based composition (intuitive)
price = Feature(PriceTransform('close'))
ma20 = Feature(SimpleMovingAverageTransform('close', 'sma_20'))
price_to_ma_ratio = price / ma20
**Mathematical Operations Framework:**
The class overloads Python's mathematical operators to create new Feature instances with automatically
composed transformations. Supported operations include:
- **Binary Operations**: Addition (+), subtraction (-), multiplication (*), division (/)
- **Unary Operations**: Absolute value (abs()), negation
- **Comparison Operations**: Element-wise minimum and maximum (static methods)
- **Constant Operations**: Mathematical operations with scalar values (e.g., `feature * 2`)
- **Reverse Operations**: Enable natural syntax like `3 + feature`
Each mathematical operation creates a new Feature instance wrapping an appropriate transform that
performs the mathematical computation during evaluation.
**Function Application and Composition:**
The :meth:`apply` method enables applying arbitrary functions to feature outputs, supporting:
- **Custom Functions**: User-defined lambda functions or named functions
- **Built-in Methods**: Pandas methods like rolling operations, transformations
- **Mathematical Functions**: NumPy mathematical functions (log, exp, sqrt, etc.)
- **Automatic Naming**: Generates descriptive names based on function names and parameters
**Performance Optimization Features:**
- **Caching Support**: Optional caching mechanism to avoid recomputing expensive transformations
- **Backend Selection**: Supports both pandas ("pd") and Numba ("nb") computational backends
- **Lazy Evaluation**: Transformations are only computed when :meth:`__call__` is invoked
- **Transform Reuse**: Wrapped transforms can be shared across multiple Feature instances
**Built-in Convenience Methods:**
The class provides pre-implemented methods for common financial operations:
- **Statistical Functions**: Rolling mean, standard deviation, exponential moving average
- **Mathematical Transforms**: Logarithms, exponentials, square roots, clipping
- **Time Series Operations**: Lagging, shifting, rolling aggregations
- **Risk/Return Metrics**: Log returns, volatility calculations, normalized features
**Feature Naming and Metadata:**
Features maintain intelligent naming schemes that:
- **Preserve Traceability**: Names reflect the sequence of operations applied
- **Support Customization**: Allow manual name overrides for semantic clarity
- **Enable Pipeline Integration**: Generate column names suitable for DataFrame integration
- **Maintain Consistency**: Ensure naming conventions across mathematical operations
**Integration with Transform Hierarchy:**
The Feature class seamlessly integrates with the transform ecosystem:
- **Transform Wrapping**: Can wrap any :class:`BaseTransform` subclass (SISO, MISO, SIMO, MIMO)
- **Operation Transforms**: Automatically creates appropriate operation transforms for mathematical expressions
- **Backend Compatibility**: Supports both pandas and Numba backends through wrapped transforms
- **Validation Inheritance**: Inherits input validation and error handling from underlying transforms
.. note::
Feature instances are designed to be **immutable** - mathematical operations create new Feature
objects rather than modifying existing ones. This design promotes functional programming patterns
and prevents unintended side effects in complex feature engineering pipelines.
.. tip::
The caching mechanism is particularly valuable for expensive transformations that are reused
across multiple features. Consider enabling caching for computationally intensive operations
like rolling correlations, technical indicators, or statistical decompositions.
Args:
transform (BaseTransform): The underlying transform to wrap. Can be any subclass of
:class:`BaseTransform` including SISO, MISO, SIMO, or MIMO transforms.
Raises:
AttributeError: If the wrapped transform doesn't have the required `output_name` attribute.
TypeError: If name setter receives incompatible types during custom name assignment.
AssertionError: If custom names have mismatched lengths for multi-output transforms.
Examples:
Basic feature creation and mathematical operations:
>>> # doctest: +SKIP
>>> # Create base features
>>> dates = pd.date_range('2023-01-01', periods=20, freq='D')
>>> data = pd.DataFrame({
... 'close': 100 + np.random.randn(20).cumsum(),
... 'volume': np.random.randint(1000, 5000, 20)
... }, index=dates)
>>>
>>> # Wrap transforms as features
>>> price = Feature(SimpleMovingAverageTransform('close', 'sma_20')) # doctest: +SKIP
>>> volume = Feature(VolumeTransform('volume')) # doctest: +SKIP
>>>
>>> # Mathematical operations
>>> price_vol_ratio = price / volume # doctest: +SKIP
>>> normalized_price = (price - price.rolling_mean(10)) / price.rolling_std(10) # doctest: +SKIP
Advanced function application:
>>> # doctest: +SKIP
>>> # Custom function application
>>> log_returns = price.apply(lambda x: x.pct_change().apply(np.log), suffix='log_ret') # doctest: +SKIP
>>>
>>> # Built-in convenience methods
>>> clipped_returns = log_returns.clip(lower=-0.1, upper=0.1) # doctest: +SKIP
>>> volatility = log_returns.rolling_std(30) # doctest: +SKIP
>>>
>>> # Composite feature engineering
>>> momentum = price / price.lag(20) - 1 # doctest: +SKIP
>>> momentum_signal = Feature.max(Feature.min(momentum, 0.2), -0.2) # doctest: +SKIP
Performance optimization with caching:
>>> # doctest: +SKIP
>>> # Create expensive computation
>>> complex_indicator = Feature(ComplexTechnicalIndicator('close', window=100)) # doctest: +SKIP
>>>
>>> # Use caching for repeated calculations
>>> cache = pd.DataFrame() # doctest: +SKIP
>>> result1 = complex_indicator(data, cache=cache) # doctest: +SKIP
>>> result2 = complex_indicator(data, cache=cache) # Uses cached result # doctest: +SKIP
Feature pipeline construction:
.. code-block:: python
# Build a comprehensive feature set
base_price = Feature(PriceTransform('close'))
# Technical indicators
sma_20 = base_price.rolling_mean(20)
sma_50 = base_price.rolling_mean(50)
rsi = Feature(RSITransform('close', 14))
# Derived features
price_momentum = base_price / base_price.lag(10) - 1
sma_ratio = sma_20 / sma_50
mean_reversion = (base_price - sma_20) / base_price.rolling_std(20)
# Composite signals
trend_signal = Feature.max(Feature.min(sma_ratio - 1, 0.1), -0.1)
momentum_signal = price_momentum.clip(lower=-0.2, upper=0.2)
combined_signal = (trend_signal + momentum_signal) / 2
See Also:
- :class:`BaseTransform`: The underlying transform interface that Feature wraps.
- :class:`CoreTransform`: Base class for dual-backend transforms used within Features.
- :class:`BinaryOpTransform`: Transform class created for binary mathematical operations.
- :class:`UnaryOpTransform`: Transform class created for unary mathematical operations.
- :class:`ConstantOpTransform`: Transform class created for operations with scalar constants.
References:
.. _`Fluent Interface Pattern`: https://martinfowler.com/bliki/FluentInterface.html
.. _`Feature Engineering for Machine Learning`: https://www.oreilly.com/library/view/feature-engineering-for/9781491953235/
.. _`Operator Overloading in Python`: https://docs.python.org/3/reference/datamodel.html#emulating-numeric-types
"""
[docs]
def __init__(self, transform: BaseTransform):
self.transform = transform
self._name = transform.output_name
def __call__(self, x: pd.DataFrame, *, cache: pd.DataFrame = None, backend="nb"):
"""Forward the call to the underlying transform"""
if cache is not None:
if isinstance(self.transform.output_name, str) and self.transform.output_name in cache.columns:
logger.info(f"Using cached output for {self.transform.output_name}")
return cache[self.transform.output_name]
return self.transform(x, backend=backend)
@property
def name(self):
"""Get the output name from the wrapped transform"""
return self._name
@name.setter
def name(self, output_name):
"""Set a custom name from the feature"""
assert type(output_name) == type(self._name), "Same type"
if isinstance(output_name, (tuple, list)):
assert len(output_name) == len(self._name), "same length"
self._name = output_name
# --- Serialization -----------------------------------------------------
[docs]
def to_config(self) -> dict:
"""Serialize this Feature (and underlying transform) to a JSON-serializable dict.
Note: custom arbitrary functions used via Feature.apply may not be fully reconstructable
unless their op_name is recognized (abs, log, log1p, exp, square, sqrt, clip_*).
"""
return {
"name": self._name if isinstance(self._name, str) else list(self._name),
"transform": transform_to_config(self.transform),
}
[docs]
@staticmethod
def from_config(cfg: dict) -> "Feature":
t = transform_from_config(cfg["transform"]) # type: ignore[index]
f = Feature(t)
# Preserve custom name if provided
name = cfg.get("name")
if name is not None:
f.name = name
return f
[docs]
def apply(self, func, *args, suffix=None, **kwargs):
"""
Apply an arbitrary function to the output of this feature.
:param func: The function to apply to the feature output
:param args: Additional positional arguments to pass to the function
:param suffix: Optional suffix to add to the feature name (default is function name)
:param kwargs: Additional keyword arguments to pass to the function
:return: A new Feature with the function applied
"""
func_name = suffix if suffix is not None else func.__name__
# Use the current feature name as the base name instead of the input column
base_name = str(self.name) # Convert to string to be safe
# Combine the base name with the function name
new_name = f"{base_name}_{func_name}"
# Create the transform with the operation
transform = UnaryOpTransform(self.transform, func_name, lambda x: func(x, *args, **kwargs))
# Create and initialize the new feature
feature = Feature(transform)
# Update the name in both places - crucial for consistency
transform.produces = [new_name] # Update the transform's produces list to contain the new name
feature.name = new_name # Set the name directly on the feature
return feature
# Mathematical operations
def __add__(self, other):
if isinstance(other, Feature):
return Feature(BinaryOpTransform(self.transform, other.transform, "add", lambda x, y: x + y))
elif isinstance(other, (int, float)):
return Feature(ConstantOpTransform(self.transform, other, "add", lambda x, c: x + c))
return NotImplemented
def __sub__(self, other):
if isinstance(other, Feature):
return Feature(BinaryOpTransform(self.transform, other.transform, "sub", lambda x, y: x - y))
elif isinstance(other, (int, float)):
return Feature(ConstantOpTransform(self.transform, other, "sub", lambda x, c: x - c))
return NotImplemented
def __mul__(self, other):
if isinstance(other, Feature):
return Feature(BinaryOpTransform(self.transform, other.transform, "mul", lambda x, y: x * y))
elif isinstance(other, (int, float)):
return Feature(ConstantOpTransform(self.transform, other, "mul", lambda x, c: x * c))
return NotImplemented
def __truediv__(self, other):
if isinstance(other, Feature):
return Feature(BinaryOpTransform(self.transform, other.transform, "div", lambda x, y: x / y))
elif isinstance(other, (int, float)):
return Feature(ConstantOpTransform(self.transform, other, "div", lambda x, c: x / c))
return NotImplemented
def __abs__(self):
return Feature(UnaryOpTransform(self.transform, "abs", lambda x: x.abs()))
# Right-side operations (for const op feature cases)
def __radd__(self, other):
if isinstance(other, (int, float)):
return Feature(ConstantOpTransform(self.transform, other, "add", lambda x, c: x + c))
return NotImplemented
def __rsub__(self, other):
if isinstance(other, (int, float)):
return Feature(ConstantOpTransform(self.transform, other, "rsub", lambda x, c: c - x))
return NotImplemented
def __rmul__(self, other):
if isinstance(other, (int, float)):
return Feature(ConstantOpTransform(self.transform, other, "mul", lambda x, c: x * c))
return NotImplemented
def __rtruediv__(self, other):
if isinstance(other, (int, float)):
return Feature(ConstantOpTransform(self.transform, other, "rdiv", lambda x, c: c / x))
return NotImplemented
# Common operations that can be added as convenience methods
[docs]
def clip(self, lower=None, upper=None):
"""
Clip the values of the feature between lower and upper bounds.
:param lower: Lower boundary (optional)
:param upper: Upper boundary (optional)
:return: A new Feature with clipped values
"""
suffix = f"clip_{lower}_{upper}".replace("None", "")
return self.apply(lambda x: x.clip(lower=lower, upper=upper), suffix=suffix)
[docs]
def abs(self):
"""
Get the absolute values of the feature.
:return: A new Feature with absolute values
"""
return Feature(UnaryOpTransform(self.transform, "abs", lambda x: x.abs()))
[docs]
def log(self):
"""
Get the natural logarithm of the feature.
:return: A new Feature with log values
"""
return self.apply(lambda x: x.apply(lambda v: np.log(v) if v > 0 else np.nan), suffix="log")
[docs]
def log1p(self):
"""
Get the natural logarithm of the feature.
:return: A new Feature with log values
"""
return self.apply(lambda x: x.apply(lambda v: np.log1p(v) if v >= 0 else np.nan), suffix="log1p")
[docs]
def exp(self):
"""
Get the exponential of the feature.
:return: A new Feature with exp values
"""
return self.apply(lambda x: x.apply(np.exp), suffix="exp")
[docs]
def square(self):
"""
Get the square of the feature.
:return: A new Feature with squared values
"""
return self.apply(lambda x: x ** 2, suffix="square")
[docs]
def sqrt(self):
"""
Get the square root of the feature.
:return: A new Feature with square root values
"""
return self.apply(lambda x: x.apply(lambda v: np.sqrt(v) if v >= 0 else np.nan), suffix="sqrt")
[docs]
def rolling_mean(self, window):
"""
Calculate the rolling mean of the feature.
:param window: Rolling window size
:return: A new Feature with rolling mean values
"""
return self.apply(lambda x: x.rolling(window=window).mean(), suffix=f"rmean{window}")
[docs]
def ema(self, span, adjust=True):
"""
Calculate the Exponential Moving Average (EMA) of the feature.
:param span: Span for the EMA calculation
:param adjust: Whether to adjust the EMA calculation (default is True)
:return: A new Feature with EMA values
"""
return self.apply(lambda x: x.ewm(span=span, adjust=adjust).mean(), suffix=f"ema{span}")
[docs]
def rolling_sum(self, window):
"""
Calculate the rolling sum of the feature.
:param window: Rolling window size
:return: A new Feature with rolling sum values
"""
return self.apply(lambda x: x.rolling(window=window).sum(), suffix=f"rsum{window}")
[docs]
def rolling_std(self, window):
"""
Calculate the rolling standard deviation of the feature.
:param window: Rolling window size
:return: A new Feature with rolling std values
"""
return self.apply(lambda x: x.rolling(window=window).std(), suffix=f"rstd{window}")
[docs]
def lag(self, period):
"""
Create a lagged version of the feature.
:param period: Number of periods to lag
:return: A new Feature with lagged values
"""
return self.apply(lambda x: x.shift(period), suffix=f"lag{period}")
[docs]
@staticmethod
def min(a, b):
"""
Calculate the element-wise minimum between two features.
:param a: First feature or scalar
:param b: Second feature or scalar
:return: A new Feature containing the element-wise minimum
"""
if isinstance(a, Feature) and isinstance(b, Feature):
return Feature(MinMaxOpTransform(a.transform, b.transform, "min", lambda x, y: np.minimum(x, y)))
elif isinstance(a, Feature) and isinstance(b, (int, float)):
return Feature(ConstantOpTransform(a.transform, b, "min", lambda x, c: np.minimum(x, c)))
elif isinstance(b, Feature) and isinstance(a, (int, float)):
return Feature(ConstantOpTransform(b.transform, a, "min", lambda x, c: np.minimum(x, c)))
return NotImplemented
[docs]
@staticmethod
def max(a, b):
"""
Calculate the element-wise maximum between two features.
:param a: First feature or scalar
:param b: Second feature or scalar
:return: A new Feature containing the element-wise maximum
"""
if isinstance(a, Feature) and isinstance(b, Feature):
return Feature(MinMaxOpTransform(a.transform, b.transform, "max", lambda x, y: np.maximum(x, y)))
elif isinstance(a, Feature) and isinstance(b, (int, float)):
return Feature(ConstantOpTransform(a.transform, b, "max", lambda x, c: np.maximum(x, c)))
elif isinstance(b, Feature) and isinstance(a, (int, float)):
return Feature(ConstantOpTransform(b.transform, a, "max", lambda x, c: np.maximum(x, c)))
return NotImplemented
class Compose(BaseTransform):
    r"""Composite transform that chains multiple single-output transforms into a sequential processing pipeline.

    This class implements the **Composite Pattern** for data transformations, enabling the creation of complex
    feature engineering pipelines by sequentially chaining :class:`SISOTransform` and :class:`MISOTransform`
    instances. It provides a unified interface for executing multi-step transformations while maintaining
    the same interface as individual transforms, enabling seamless integration with the broader transform ecosystem.

    **Pipeline Composition Framework:**

    The Compose class creates a linear processing pipeline where each transform's output becomes the input
    to the next transform in the sequence. For a composition of transforms :math:`T_1, T_2, \ldots, T_n`,
    the overall transformation is:

    .. math::

        Y = T_n(T_{n-1}(\ldots T_2(T_1(X)) \ldots))

    where :math:`X` is the input DataFrame and :math:`Y` is the final output Series. This composition
    enables building sophisticated feature engineering workflows from simple, reusable transform components.

    **Key Design Features:**

    - **Sequential Processing**: Transforms are applied in the order specified during initialization
    - **Single-Output Steps**: Intended for SISO and MISO transforms whose single output feeds the next step
      (this is not enforced at runtime)
    - **Automatic Naming**: Generates descriptive output names by concatenating all transform identifiers
    - **Input Validation**: Validates the input DataFrame before any step runs
    - **Backend Consistency**: Maintains the same computational backend across all pipeline stages
    - **Caching Optimization**: Skips any step whose output already exists in the input DataFrame

    **Pipeline Execution Logic:**

    The composition handles several execution scenarios:

    1. **Full Short-Circuit**: If the final composed output already exists in the input DataFrame, it is returned directly
    2. **Partial Caching**: If any step's output already exists in the input DataFrame, that column is reused instead of recomputed
    3. **Fresh Computation**: Otherwise the first transform runs on the full input DataFrame
    4. **Intermediate Processing**: Each subsequent transform receives a single-column DataFrame holding its required input,
       taken from the input DataFrame when present, else from the previous step's output
    5. **Result Propagation**: Intermediate results are passed through the pipeline until the final output is produced

    **Naming Convention:**

    Output names are constructed by concatenating the first transform's output name with all subsequent transforms'
    `produces` identifiers:

    .. math::

        \text{output\_name} = \text{first\_output} + \texttt{"\_"} + \text{produces}_2 + \texttt{"\_"} + \ldots + \texttt{"\_"} + \text{produces}_n

    For example, composing a moving average transform (producing `'ma20'`), RSI transform (producing `'rsi14'`),
    and signal transform (producing `'signal'`) results in the output name `'ma20_rsi14_signal'`.

    **Performance Considerations:**

    - **Memory Efficiency**: Only one intermediate Series is maintained at a time, minimizing memory footprint
    - **Backend Optimization**: All transforms use the same computational backend for consistency
    - **Caching Benefits**: Can leverage pre-computed results to avoid redundant calculations
    - **Error Propagation**: Validation errors are caught early before expensive computations begin

    **Use Cases in Financial Engineering:**

    - **Technical Indicator Chains**: Price → Moving Average → RSI → Trading Signal
    - **Risk Metric Pipelines**: Returns → Volatility → Value-at-Risk → Risk Adjusted Return
    - **Factor Construction**: Raw Data → Normalization → Winsorization → Z-Score → Factor Score
    - **Signal Processing**: Price → Log Returns → Smoothing → Momentum → Regime Classification

    .. important::

        All transforms in the composition must produce single outputs (SISO or MISO only). For transforms
        producing multiple outputs (SIMO, MIMO), use explicit pipeline construction with intermediate
        DataFrame management instead of the Compose class.

    Args:
        *transforms (SISOTransform|MISOTransform): Variable number of transforms to compose into a pipeline.
            Must be single-output transforms with compatible input/output column specifications.
            The first transform determines the pipeline's input requirements.

    Raises:
        ValueError: If no transforms are given.
        TypeError: If input is not a pandas DataFrame during validation.
        ValueError: If the required input column is not found in the DataFrame.
        AssertionError: If backend parameter is not "pd" or "nb".
        AttributeError: If transforms don't have required attributes (requires, produces, output_name).

    Examples:
        Creating a technical analysis pipeline:

        .. code-block:: python

            from your_module import Compose, SimpleMovingAverageTransform, RSITransform, SignalTransform

            # Create individual transforms
            ma_transform = SimpleMovingAverageTransform('close', 'ma20')
            rsi_transform = RSITransform('ma20', 'rsi14')  # RSI of moving average
            signal_transform = SignalTransform('rsi14', 'signal')  # Trading signal from RSI

            # Compose into pipeline
            pipeline = Compose(ma_transform, rsi_transform, signal_transform)
            print(f"Pipeline input: {pipeline.requires}")  # ['close']
            print(f"Pipeline output: {pipeline.output_name}")  # 'ma20_rsi14_signal'

        Using the composed pipeline:

        >>> # doctest: +SKIP
        >>> # Sample price data
        >>> dates = pd.date_range('2023-01-01', periods=100, freq='D')
        >>> data = pd.DataFrame({
        ...     'close': 100 + np.random.randn(100).cumsum()
        ... }, index=dates)
        >>>
        >>> # Apply the complete pipeline
        >>> pipeline = Compose(ma_transform, rsi_transform, signal_transform)  # doctest: +SKIP
        >>> result = pipeline(data, backend='pd')  # doctest: +SKIP
        >>> print(f"Pipeline output type: {type(result)}")  # doctest: +SKIP
        Pipeline output type: <class 'pandas.core.series.Series'>
        >>> print(f"Output name: {result.name}")  # doctest: +SKIP
        Output name: ma20_rsi14_signal

        Advanced pipeline with caching optimization:

        >>> # doctest: +SKIP
        >>> # Data with pre-computed moving average
        >>> data_with_ma = data.copy()
        >>> data_with_ma['ma20'] = data['close'].rolling(20).mean()  # doctest: +SKIP
        >>>
        >>> # Pipeline will skip the first transform and use cached MA
        >>> result_cached = pipeline(data_with_ma, backend='nb')  # doctest: +SKIP
        >>> # First transform is skipped, starts with RSI calculation

        Multi-step risk analysis pipeline:

        .. code-block:: python

            # Risk analysis pipeline: Returns → Volatility → VaR → Risk Score
            returns_transform = ReturnsTransform('close', 'returns')
            volatility_transform = VolatilityTransform('returns', 'vol30')
            var_transform = VaRTransform('vol30', 'var95')
            risk_score_transform = RiskScoreTransform('var95', 'risk_score')

            risk_pipeline = Compose(
                returns_transform,
                volatility_transform,
                var_transform,
                risk_score_transform
            )

            # Single call computes entire risk analysis chain
            risk_metrics = risk_pipeline(price_data, backend='nb')

        Integration with Feature class:

        .. code-block:: python

            # Compose can be wrapped in Feature for mathematical operations
            from your_module import Feature

            technical_pipeline = Compose(ma_transform, rsi_transform)
            technical_feature = Feature(technical_pipeline)

            # Mathematical operations on the composed pipeline
            normalized_signal = (technical_feature - 50) / 50  # Normalize RSI
            combined_signal = technical_feature * volume_feature

    See Also:
        - :class:`BaseTransform`: The base interface that Compose implements and extends.
        - :class:`SISOTransform`: Single-input, single-output transforms that can be composed.
        - :class:`MISOTransform`: Multiple-input, single-output transforms that can be composed.
        - :class:`Feature`: High-level wrapper that can encapsulate Compose instances for mathematical operations.
        - :class:`Pipeline`: Alternative approach for more complex multi-branch transformation workflows.

    References:
        .. _`Composite Pattern`: https://refactoring.guru/design-patterns/composite
        .. _`Pipeline Pattern in Data Processing`: https://martinfowler.com/articles/collection-pipeline/
        .. _`Functional Composition`: https://en.wikipedia.org/wiki/Function_composition_(computer_science)
    """

    def __init__(self, *transforms: SISOTransform | MISOTransform):
        """Build the pipeline from one or more single-output transforms.

        :param transforms: Transforms applied in order; the first one's ``requires[0]`` becomes the
            pipeline's source column.
        :raises ValueError: If no transforms are given.
        """
        if not transforms:
            raise ValueError("Compose requires at least one transform.")
        first = transforms[0]
        requires = first.requires[0]  # The first transform determines the source column
        produces = "_".join([first.output_name] + [t.produces[0] for t in transforms[1:]])
        super().__init__(requires, produces)
        self.transforms = transforms

    @property
    def output_name(self) -> str:
        """
        Get the output name of the composed transform.

        The output name is a combination of the first transform's output and the subsequent transforms' produces.

        :return: Output name
        """
        return self.produces[0]

    def _run_pipeline(self, x: pd.DataFrame, *, backend) -> pd.Series:
        """
        Apply the composed transforms to the input DataFrame with caching/optimization:

        - If the final output already exists in the input DataFrame, return it immediately.
        - For each step, if its output exists in the DataFrame, reuse it rather than recomputing.
        - Prefer DataFrame-provided required column(s) when available; else use the prior step's output.

        The input DataFrame is never modified.

        :param x: DataFrame to transform
        :param backend: Backend forwarded to every step ("pd" or "nb")
        :return: Transformed Series named :attr:`output_name`
        """
        self._validate_input(x)

        final_name = self.output_name
        # Short-circuit if the final output is already present in the DataFrame.
        if final_name in x.columns:
            return x[final_name]

        current_series = None
        for i, tfs in enumerate(self.transforms):
            out_name = tfs.produces[0]
            # Reuse this step's output if the DataFrame already carries it.
            # NOTE(review): the lookup uses the step's bare produces[0] (e.g. "rsi14"), not the composed
            # intermediate name, so an unrelated column with the same name would be reused — confirm intent.
            if out_name in x.columns:
                current_series = x[out_name]
                continue

            if i == 0:
                # First transform: operate on the full DataFrame.
                current_series = tfs(x, backend=backend)
                continue

            # Subsequent transforms: prefer the required column from the DataFrame if available,
            # otherwise feed the prior step's series under the required column name.
            req_col = tfs.requires[0]
            if req_col in x.columns:
                df_in = x[[req_col]]
            else:
                df_in = current_series.to_frame(name=req_col)
            current_series = tfs(df_in, backend=backend)

        # rename() returns a new Series, so a column taken from ``x`` (or an object owned by a
        # transform) is not renamed in place.
        return current_series.rename(final_name)

    def __call__(self, x: pd.DataFrame, *, backend="nb") -> pd.Series:
        """
        Apply the composed transforms to the input DataFrame.

        :param x: DataFrame to transform
        :param backend: Backend to use for the transform. Can be "pd" or "nb". Default is "nb".
        :return: Transformed Series
        :raises AssertionError: If ``backend`` is not "pd" or "nb".
        """
        assert backend in ("pd", "nb"), "Backend must be either 'pd' or 'nb'."
        return self._run_pipeline(x, backend=backend)
[docs]
class FeatureKit:
    r"""High-level orchestration framework for executing collections of Feature objects in financial machine learning pipelines.

    This class serves as the primary interface for building comprehensive feature engineering workflows by coordinating
    multiple :class:`Feature` instances, managing computational resources, and providing performance diagnostics.
    It represents the culmination of the feature engineering framework, enabling practitioners to construct, execute,
    and analyze complex feature sets with minimal boilerplate code and maximum computational efficiency.

    **Pipeline Orchestration Architecture:**

    FeatureKit implements a **batch processing pattern** for feature computation, where multiple Feature objects are
    executed sequentially against a shared working DataFrame. This approach enables several optimization strategies:

    - **Incremental Caching**: Intermediate feature outputs are cached within the working DataFrame (a private copy
      of the input), enabling dependent features to reuse previously computed results without redundant calculations
    - **Selective Retention**: Original DataFrame columns can be preserved alongside computed features, maintaining
      data lineage and enabling hybrid analytical workflows
    - **Performance Profiling**: Optional timing analysis identifies computational bottlenecks and guides optimization efforts
    - **Backend Consistency**: All features execute with the same computational backend for consistent performance characteristics

    **Mathematical Processing Framework:**

    For a collection of features :math:`F_1, F_2, \ldots, F_n` applied to input DataFrame :math:`D`, the working
    DataFrame evolves as follows (``retain`` only selects which input columns are copied into the output):

    .. math::

        D_0 &= D \\
        D_i &= D_{i-1} \cup \{F_i(D_{i-1})\}, \quad i = 1, \ldots, n \\
        \text{output} &= D[\text{retain}] \cup \{F_1(D_0), F_2(D_1), \ldots, F_n(D_{n-1})\}

    where :math:`\cup` represents column-wise DataFrame concatenation and each :math:`F_i(D_{i-1})` can access
    all previously computed features through the caching mechanism.

    This iterative approach enables:

    - **Memory Efficiency**: A single working copy of the input is extended with each feature, rather than building
      one intermediate DataFrame per feature
    - **Computational Reuse**: Expensive intermediate calculations are preserved for downstream feature computations

    **Performance Analysis System:**

    When timing analysis is enabled, FeatureKit generates detailed performance metrics:

    .. math::

        \text{Relative Performance} = \frac{t_i}{\max(t_1, t_2, \ldots, t_n)} \times 100\%

    where :math:`t_i` is the execution time for feature :math:`i`. Results are visualized using ASCII bar charts
    that provide immediate visual feedback on computational bottlenecks.

    **Integration with Feature Ecosystem:**

    FeatureKit seamlessly integrates with the complete feature engineering framework:

    - **Feature Objects**: Accepts any Feature instance, regardless of the underlying transform type (SISO, MISO, SIMO, MIMO)
    - **Mathematical Expressions**: Can execute features created through mathematical operations (addition, multiplication, etc.)
    - **Composed Transforms**: Supports features built using the :class:`Compose` class for complex transformation chains
    - **Custom Functions**: Works with features created using the :meth:`Feature.apply` method for arbitrary function application

    **Production Deployment Considerations:**

    The class is designed for both research and production environments:

    - **Scalability**: Efficient memory management enables processing of large datasets without excessive resource consumption
    - **Reproducibility**: Deterministic execution order ensures consistent results across runs
    - **Debugging**: Timing analysis and clear error messages facilitate troubleshooting in complex pipelines
    - **Flexibility**: Support for both pandas and Numba backends enables optimization for different deployment scenarios

    **Caching Strategy and Optimization:**

    The caching mechanism provides significant performance benefits:

    1. **Intermediate Result Reuse**: Features that depend on common sub-computations automatically benefit from cached results
    2. **Memory Efficiency**: Results are stored directly in the working DataFrame, minimizing memory overhead
    3. **Cache Coherence**: The cache is updated incrementally, ensuring all features see consistent intermediate state

    Args:
        features (list[Feature]): Ordered list of Feature instances to execute. With ``order="defined"`` this order
            is the execution sequence and affects caching behavior for interdependent features.
        retain (list[str], optional): Column names from the input DataFrame to preserve in the output unchanged.
            If None or empty, only computed features are included in the output DataFrame.

    Raises:
        TypeError: If any feature returns an unexpected type (not Series or tuple of Series).
        KeyError: If retained columns are not present in the input DataFrame.
        ValueError: If :meth:`build` is called with an ``order`` other than ``"defined"`` or ``"topo"``.
        AttributeError: If Feature objects lack required attributes or methods.

    Examples:
        Basic feature pipeline construction:

        >>> # doctest: +SKIP
        >>> # Prepare sample financial data
        >>> dates = pd.date_range('2023-01-01', periods=100, freq='D')
        >>> np.random.seed(42)
        >>> prices = 100 + np.random.randn(100).cumsum()
        >>> data = pd.DataFrame({'close': prices, 'volume': np.random.randint(1000, 10000, 100)}, index=dates)
        >>>
        >>> # Create individual features
        >>> price_feature = Feature(SimpleMovingAverageTransform('close', 'sma_20'))
        >>> rsi_feature = Feature(RSITransform('close', 'rsi_14'))
        >>>
        >>> # Build feature pipeline
        >>> feature_kit = FeatureKit([price_feature, rsi_feature], retain=['close', 'volume'])
        >>> result_df = feature_kit.build(data, backend='nb', timeit=False)
        >>> print(f"Output shape: {result_df.shape}")  # doctest: +SKIP
        Output shape: (100, 4)
        >>> print(f"Columns: {list(result_df.columns)}")  # doctest: +SKIP
        Columns: ['close', 'volume', 'close_sma_20', 'close_rsi_14']

        Advanced pipeline with interdependent features:

        >>> # doctest: +SKIP
        >>> # Create features with dependencies
        >>> base_price = Feature(PriceTransform('close'))
        >>> sma_20 = base_price.rolling_mean(20)
        >>> price_to_sma_ratio = base_price / sma_20  # Depends on sma_20
        >>> momentum_signal = price_to_sma_ratio.clip(lower=0.8, upper=1.2)  # Depends on ratio
        >>>
        >>> advanced_kit = FeatureKit([
        ...     base_price,
        ...     sma_20,
        ...     price_to_sma_ratio,
        ...     momentum_signal
        ... ], retain=['close'])
        >>>
        >>> # Execute with performance profiling
        >>> advanced_result = advanced_kit.build(data, backend='nb', timeit=True)  # doctest: +SKIP

        Production-scale feature engineering:

        .. code-block:: python

            # Large-scale feature construction
            import pandas as pd
            from finmlkit.features import Feature, FeatureKit
            from finmlkit.transforms import *

            # Load large dataset
            large_data = pd.read_csv('large_financial_dataset.csv', index_col='timestamp', parse_dates=True)

            # Comprehensive feature set
            features = []

            # Price-based features
            price = Feature(PriceTransform('close'))
            features.extend([
                price.rolling_mean(10),
                price.rolling_mean(20),
                price.rolling_mean(50),
                price.rolling_std(20),
                price.log().diff(),  # Log returns
            ])

            # Volume-based features
            volume = Feature(VolumeTransform('volume'))
            features.extend([
                volume.rolling_mean(20),
                (price * volume).rolling_mean(20),  # Dollar volume
            ])

            # Technical indicators
            features.extend([
                Feature(RSITransform('close', 'rsi_14')),
                Feature(MACDTransform('close', 'macd')),
                Feature(BollingerBandsTransform('close', 'bb')),
            ])

            # Cross-asset features (if multiple assets)
            if 'close_spy' in large_data.columns:
                spy_price = Feature(PriceTransform('close_spy'))
                beta = Feature(BetaTransform(['close', 'close_spy'], 'beta_spy'))
                features.append(beta)

            # Create comprehensive feature kit
            production_kit = FeatureKit(features, retain=['close', 'volume', 'open', 'high', 'low'])

            # Execute with timing for optimization analysis
            feature_matrix = production_kit.build(large_data, backend='nb', timeit=True)

            # Save results for model training
            feature_matrix.to_parquet('feature_matrix.parquet')

        Reproducibility and config I/O:

        .. code-block:: python

            # Save and reload feature pipeline configuration
            kit = FeatureKit(features, retain=['close', 'volume'])
            kit.save_config('featurekit.json')
            kit2 = FeatureKit.from_config('featurekit.json')
            df2 = kit2.build(large_data, backend='pd', order='defined')

        Execution order and dependency graph:

        .. code-block:: python

            # Compute in topological order to resolve dependencies automatically
            df_topo = kit.build(large_data, backend='pd', order='topo')
            # Visualize the graph
            print(kit.build_graph().visualize())

        External functions (e.g., NumPy/TA-Lib) via ExternalFunction:

        .. code-block:: python

            from finmlkit.feature.transforms import ExternalFunction

            # Single-output example using NumPy (passes numpy arrays to function)
            log_close = Feature(ExternalFunction('numpy.log', input_cols='close', output_cols='log_close', pass_numpy=True))

            # TA-Lib example (if talib is installed)
            # rsi14 = Feature(ExternalFunction('talib.RSI', input_cols='close', output_cols='ta_rsi14', args=[14], pass_numpy=True))

            kit_ext = FeatureKit([log_close], retain=['close'])
            df_ext = kit_ext.build(large_data, backend='pd')

    See Also:
        - :class:`Feature`: Core wrapper class for individual transformations with mathematical operations.
        - :class:`BaseTransform`: Abstract base class for all transformation implementations.
        - :class:`Compose`: Pipeline composition class for chaining single-output transforms.
        - :class:`SISOTransform`, :class:`MISOTransform`, :class:`SIMOTransform`, :class:`MIMOTransform`: Concrete transform base classes.

    References:
        .. _`Feature Engineering for Machine Learning`: https://www.oreilly.com/library/view/feature-engineering-for/9781491953235/
        .. _`Pipeline Pattern in Data Processing`: https://martinfowler.com/articles/collection-pipeline/
        .. _`Efficient Feature Computation`: https://papers.nips.cc/paper/2019/hash/496e05e1aea0a9c4655800e8a7b9ea28-Abstract.html
    """

    def __init__(self, features: "list[Feature]", retain: "list[str] | None" = None):
        """Store the features to execute and the input columns to copy into the output.

        :param features: Feature instances; kept by reference, in the given order.
        :param retain: Input column names copied unchanged into the output; ``None`` means none.
        """
        self.features = features
        self.retain = retain or []

    # --- Serialization / Reproducibility ---------------------------------

    def to_config(self) -> dict:
        """Return a JSON-serializable dict with the retained columns and each feature's config."""
        return {
            "retain": list(self.retain),
            "features": [f.to_config() for f in self.features],
        }

    def save_config(self, path: str):
        """Write :meth:`to_config` to ``path`` as UTF-8 JSON (indented, non-ASCII kept verbatim)."""
        with open(path, "w", encoding="utf-8") as f:
            json.dump(self.to_config(), f, ensure_ascii=False, indent=2)

    @classmethod
    def from_dict(cls, cfg: dict) -> "FeatureKit":
        """Rebuild a kit from a dict produced by :meth:`to_config`.

        A classmethod (rather than a staticmethod), so subclasses round-trip to their own type.
        ``FeatureKit.from_dict(cfg)`` behaves exactly as before.

        :param cfg: Mapping with optional ``"features"`` (list of feature configs) and ``"retain"`` keys.
        :return: A new instance of ``cls``.
        """
        feats = [Feature.from_config(fc) for fc in cfg.get("features", [])]
        retain = cfg.get("retain", [])
        return cls(feats, retain=retain)

    @classmethod
    def from_config(cls, path: str) -> "FeatureKit":
        """Load a kit from a JSON file written by :meth:`save_config`."""
        with open(path, "r", encoding="utf-8") as f:
            cfg = json.load(f)
        return cls.from_dict(cfg)

    # --- Graph API --------------------------------------------------------

    def build_graph(self) -> "ComputationGraph":
        """Return the dependency graph of this kit's features (delegates to ``build_feature_graph``)."""
        return build_feature_graph(self.features)

    def topological_order(self) -> list[str]:
        """Return distinct feature names ordered so that producers come before their consumers.

        Only edges between feature nodes of :meth:`build_graph` are considered; input nodes are ignored.
        Ties are broken by the order in which features were given, so the result is reproducible
        across processes. Names that cannot be ordered (e.g. because of a cycle) are appended in
        their original order.

        :return: Each distinct ``str(feature.name)`` exactly once.
        """
        from collections import deque

        graph = self.build_graph()
        # Distinct names in definition order. Dicts (not sets) are used below because set
        # iteration order for strings changes between runs under hash randomization.
        names = list(dict.fromkeys(str(f.name) for f in self.features))
        known = set(names)
        successors = {n: {} for n in names}  # dict used as an insertion-ordered set
        indegree = dict.fromkeys(names, 0)
        for src, dests in graph.edges.items():
            if src not in known:
                continue
            for dst in dests:
                # Count each distinct edge once; duplicates would otherwise inflate the
                # in-degree so the node never becomes ready.
                if dst in known and dst not in successors[src]:
                    successors[src][dst] = None
                    indegree[dst] += 1

        # Kahn's algorithm on the feature-only subgraph.
        ready = deque(n for n in names if indegree[n] == 0)
        order = []
        while ready:
            node = ready.popleft()
            order.append(node)
            for dst in successors[node]:
                indegree[dst] -= 1
                if indegree[dst] == 0:
                    ready.append(dst)

        # Fallback: anything left unplaced keeps its original relative order.
        placed = set(order)
        return order + [n for n in names if n not in placed]

    def _execution_sequence(self, order: str) -> list:
        """Return the features in the order ``build`` should run them; raise ValueError for unknown ``order``."""
        if order == "defined":
            return list(self.features)
        if order != "topo":
            raise ValueError(f"order must be 'defined' or 'topo', got {order!r}")
        # Group by name so that features sharing a name all run (as they do with order="defined").
        by_name = {}
        for feat in self.features:
            by_name.setdefault(str(feat.name), []).append(feat)
        sequence, scheduled = [], set()
        for name in self.topological_order():
            if name in by_name and name not in scheduled:
                scheduled.add(name)
                sequence.extend(by_name[name])
        # Defensive: keep anything the topological order did not mention, in original order.
        sequence.extend(f for f in self.features if str(f.name) not in scheduled)
        return sequence

    @staticmethod
    def _print_timing_report(timing_info: dict, max_bar_length: int = 50) -> None:
        """Print per-feature timings, slowest first, as ASCII bars scaled to the slowest feature."""
        print("\nFeature Timing Analysis:")
        print("=======================")
        sorted_times = sorted(timing_info.items(), key=lambda item: item[1], reverse=True)
        # Sorted descending, so the first entry holds the maximum.
        max_time = sorted_times[0][1] if sorted_times else 0
        for feature_name, time_taken in sorted_times:
            bar_length = int((time_taken / max_time) * max_bar_length) if max_time > 0 else 0
            bar = '█' * bar_length
            print(f"{feature_name:<30} | {bar} {time_taken:.4f}s")

    def build(self, df, *, backend="nb", timeit=False, order: str = "defined"):
        """Execute all Features and return a DataFrame with retained and computed columns.

        Parameters:
            df (pd.DataFrame): Input DataFrame containing raw columns required by features.
                It is not modified; intermediate results are cached in a private copy.
            backend (str): Computational backend for all features. "pd" for pandas, "nb" for numba. Default "nb".
            timeit (bool): If True, prints a timing analysis for each feature after execution.
            order (str): Execution order for features:
                - "defined" (default): Run features in the order they were provided to FeatureKit
                - "topo": Run features in topological order based on dependencies inferred
                  from their underlying transforms. This helps when the list order doesn't already
                  respect dependencies (e.g., when a feature uses the output of another).

        Returns:
            pd.DataFrame: A DataFrame that contains retained columns and all computed feature columns.

        Raises:
            ValueError: If ``order`` is neither "defined" nor "topo".
            KeyError: If a retained column is missing from ``df``.
            TypeError: If a feature returns something other than a Series or a tuple of Series.
        """
        # Validate/resolve the order first so a typo fails fast instead of silently running "defined".
        features_seq = self._execution_sequence(order)

        out = df[self.retain].copy()
        work = df.copy()  # private working frame that accumulates cached feature outputs

        timing_info = {}
        for feat in features_seq:
            if timeit:
                start_time = time.perf_counter()  # monotonic, high-resolution clock for durations
            res = feat(work, cache=work, backend=backend)
            if timeit:
                timing_info[str(feat.name)] = time.perf_counter() - start_time

            if isinstance(res, pd.Series):
                # Single output transform case
                out[feat.name] = res
                work[feat.transform.output_name] = res  # cache the result (for compose transforms)
            elif isinstance(res, tuple):
                # Multi output transform case
                for item in res:
                    out[item.name] = item
                    work[item.name] = item  # cache the result (for compose transforms)
            else:
                raise TypeError(f"Transform {feat} returned unexpected type: {type(res)}")

        if timeit:
            self._print_timing_report(timing_info)

        return out