from .base import BaseTransform, MinMaxOpTransform, BinaryOpTransform, ConstantOpTransform, UnaryOpTransform, SISOTransform, MISOTransform
import pandas as pd
import numpy as np
from finmlkit.utils.log import get_logger
import time
logger = get_logger(__name__)
class Feature:
    """
    Wraps a BaseTransform and enables mathematical operations between features.

    Operators and convenience methods do not modify this feature; each one builds
    a new ``Feature`` around a derived transform.
    """

    # Scalar types accepted as the constant operand of arithmetic and min/max operations.
    _SCALAR_TYPES = (int, float)

    def __init__(self, transform: "BaseTransform"):
        """
        :param transform: The transform to wrap; its ``output_name`` becomes the initial feature name
        """
        self.transform = transform
        self._name = transform.output_name

    def __repr__(self):
        return f"{type(self).__name__}(name={self._name!r})"

    def __call__(self, x: pd.DataFrame, *, cache: pd.DataFrame = None, backend="nb"):
        """
        Forward the call to the underlying transform.

        :param x: Input DataFrame handed to the wrapped transform
        :param cache: Optional DataFrame of already computed outputs. When it has a column whose label equals
            the transform's (string) output name, that column is returned and the transform is not run.
        :param backend: Backend identifier forwarded to the transform
        :return: The cached column, or whatever the wrapped transform returns
        """
        if cache is not None:
            output_name = self.transform.output_name
            # Only string output names (single-output transforms) are looked up in the cache.
            if isinstance(output_name, str) and output_name in cache.columns:
                logger.info(f"Using cached output for {output_name}")
                return cache[output_name]
        return self.transform(x, backend=backend)

    @property
    def name(self):
        """Get the feature name (initially the output name of the wrapped transform)"""
        return self._name

    @name.setter
    def name(self, output_name):
        """
        Set a custom name for the feature.

        The new name must have the same type as the current one and, for tuple/list names, the same length.
        """
        assert type(output_name) == type(self._name), "Same type"
        if isinstance(output_name, (tuple, list)):
            assert len(output_name) == len(self._name), "same length"
        self._name = output_name

    def apply(self, func, *args, suffix=None, **kwargs):
        """
        Apply an arbitrary function to the output of this feature.

        :param func: The function to apply to the feature output
        :param args: Additional positional arguments to pass to the function
        :param suffix: Optional suffix to add to the feature name (default is function name)
        :param kwargs: Additional keyword arguments to pass to the function
        :return: A new Feature with the function applied
        """
        if suffix is not None:
            func_name = suffix
        else:
            # Callables such as functools.partial objects carry no __name__; fall back to their type name.
            func_name = getattr(func, "__name__", type(func).__name__)

        # The new name is based on the current feature name, not on the input column.
        new_name = f"{self.name}_{func_name}"

        transform = UnaryOpTransform(self.transform, func_name, lambda x: func(x, *args, **kwargs))
        feature = Feature(transform)

        # Keep the transform's produces list and the feature name in sync.
        transform.produces = [new_name]
        feature.name = new_name
        return feature

    # ------------------------------------------------------------------
    # Internal builders shared by the operator overloads
    # ------------------------------------------------------------------
    def _combine(self, other, op_name, func):
        """Build ``self <op> other`` for a Feature or scalar ``other``; NotImplemented for anything else."""
        if isinstance(other, Feature):
            return Feature(BinaryOpTransform(self.transform, other.transform, op_name, func))
        if isinstance(other, self._SCALAR_TYPES):
            return Feature(ConstantOpTransform(self.transform, other, op_name, func))
        return NotImplemented

    def _combine_constant(self, other, op_name, func):
        """Build a constant operation for a scalar ``other``; NotImplemented for anything else."""
        if isinstance(other, self._SCALAR_TYPES):
            return Feature(ConstantOpTransform(self.transform, other, op_name, func))
        return NotImplemented

    @staticmethod
    def _nan_outside(x, valid, ufunc):
        """Evaluate ``ufunc`` on ``x`` after replacing every entry where ``valid`` is False with NaN."""
        return ufunc(x.where(valid))

    @staticmethod
    def _extremum(a, b, op_name, func):
        """Shared implementation of :meth:`min` and :meth:`max`."""
        scalars = Feature._SCALAR_TYPES
        if isinstance(a, Feature) and isinstance(b, Feature):
            return Feature(MinMaxOpTransform(a.transform, b.transform, op_name, func))
        if isinstance(a, Feature) and isinstance(b, scalars):
            return Feature(ConstantOpTransform(a.transform, b, op_name, func))
        if isinstance(b, Feature) and isinstance(a, scalars):
            return Feature(ConstantOpTransform(b.transform, a, op_name, func))
        # NotImplemented only has meaning for binary dunder methods; a regular call must fail loudly.
        raise TypeError(
            f"Feature.{op_name} expects at least one Feature and otherwise int/float operands, "
            f"got {type(a).__name__} and {type(b).__name__}"
        )

    # ------------------------------------------------------------------
    # Mathematical operations
    # ------------------------------------------------------------------
    def __add__(self, other):
        return self._combine(other, "add", lambda x, y: x + y)

    def __sub__(self, other):
        return self._combine(other, "sub", lambda x, y: x - y)

    def __mul__(self, other):
        return self._combine(other, "mul", lambda x, y: x * y)

    def __truediv__(self, other):
        return self._combine(other, "div", lambda x, y: x / y)

    def __abs__(self):
        return Feature(UnaryOpTransform(self.transform, "abs", lambda x: x.abs()))

    # Right-side operations (for const op feature cases)
    def __radd__(self, other):
        return self._combine_constant(other, "add", lambda x, c: x + c)

    def __rsub__(self, other):
        return self._combine_constant(other, "rsub", lambda x, c: c - x)

    def __rmul__(self, other):
        return self._combine_constant(other, "mul", lambda x, c: x * c)

    def __rtruediv__(self, other):
        return self._combine_constant(other, "rdiv", lambda x, c: c / x)

    # ------------------------------------------------------------------
    # Common operations that can be added as convenience methods
    # ------------------------------------------------------------------
    def clip(self, lower=None, upper=None):
        """
        Clip the values of the feature between lower and upper bounds.

        :param lower: Lower boundary (optional)
        :param upper: Upper boundary (optional)
        :return: A new Feature with clipped values
        """
        suffix = f"clip_{lower}_{upper}".replace("None", "")
        return self.apply(lambda x: x.clip(lower=lower, upper=upper), suffix=suffix)

    def abs(self):
        """
        Get the absolute values of the feature.

        :return: A new Feature with absolute values
        """
        return self.__abs__()

    def log(self):
        """
        Get the natural logarithm of the feature.

        Values that are not strictly positive yield NaN.

        :return: A new Feature with log values
        """
        return self.apply(lambda x: Feature._nan_outside(x, x > 0, np.log), suffix="log")

    def log1p(self):
        """
        Get the natural logarithm of one plus the feature, i.e. ``log(1 + x)``.

        Negative values yield NaN.

        :return: A new Feature with log1p values
        """
        return self.apply(lambda x: Feature._nan_outside(x, x >= 0, np.log1p), suffix="log1p")

    def exp(self):
        """
        Get the exponential of the feature.

        :return: A new Feature with exp values
        """
        return self.apply(lambda x: x.apply(np.exp), suffix="exp")

    def square(self):
        """
        Get the square of the feature.

        :return: A new Feature with squared values
        """
        return self.apply(lambda x: x ** 2, suffix="square")

    def sqrt(self):
        """
        Get the square root of the feature.

        Negative values yield NaN.

        :return: A new Feature with square root values
        """
        return self.apply(lambda x: Feature._nan_outside(x, x >= 0, np.sqrt), suffix="sqrt")

    def rolling_mean(self, window):
        """
        Calculate the rolling mean of the feature.

        :param window: Rolling window size
        :return: A new Feature with rolling mean values
        """
        return self.apply(lambda x: x.rolling(window=window).mean(), suffix=f"rmean{window}")

    def ema(self, span, adjust=True):
        """
        Calculate the Exponential Moving Average (EMA) of the feature.

        :param span: Span for the EMA calculation
        :param adjust: Whether to adjust the EMA calculation (default is True)
        :return: A new Feature with EMA values
        """
        return self.apply(lambda x: x.ewm(span=span, adjust=adjust).mean(), suffix=f"ema{span}")

    def rolling_sum(self, window):
        """
        Calculate the rolling sum of the feature.

        :param window: Rolling window size
        :return: A new Feature with rolling sum values
        """
        return self.apply(lambda x: x.rolling(window=window).sum(), suffix=f"rsum{window}")

    def rolling_std(self, window):
        """
        Calculate the rolling standard deviation of the feature.

        :param window: Rolling window size
        :return: A new Feature with rolling std values
        """
        return self.apply(lambda x: x.rolling(window=window).std(), suffix=f"rstd{window}")

    def lag(self, period):
        """
        Create a lagged version of the feature.

        :param period: Number of periods to lag
        :return: A new Feature with lagged values
        """
        return self.apply(lambda x: x.shift(period), suffix=f"lag{period}")

    @staticmethod
    def min(a, b):
        """
        Calculate the element-wise minimum between two features.

        :param a: First feature or scalar
        :param b: Second feature or scalar
        :return: A new Feature containing the element-wise minimum
        :raises TypeError: If neither argument is a Feature, or the other argument is not an int/float
        """
        return Feature._extremum(a, b, "min", lambda x, y: np.minimum(x, y))

    @staticmethod
    def max(a, b):
        """
        Calculate the element-wise maximum between two features.

        :param a: First feature or scalar
        :param b: Second feature or scalar
        :return: A new Feature containing the element-wise maximum
        :raises TypeError: If neither argument is a Feature, or the other argument is not an int/float
        """
        return Feature._extremum(a, b, "max", lambda x, y: np.maximum(x, y))
class Compose(BaseTransform):
    """
    Chain several transforms so that each one consumes the output of the previous one.

    The first transform determines the source column. The composed output name is the first
    transform's output name joined by ``_`` with the first ``produces`` entry of every later transform.
    """

    def __init__(self, *transforms: SISOTransform|MISOTransform):
        """
        :param transforms: Transforms to chain, in application order (at least one)
        :raises ValueError: If no transform is given
        """
        if not transforms:
            raise ValueError("Compose requires at least one transform")

        head, *tail = transforms
        requires = head.requires[0]  # First tfs determines the source column
        produces = "_".join([head.output_name] + [t.produces[0] for t in tail])
        super().__init__(requires, produces)
        self.transforms = transforms

    def _validate_input(self, x: pd.DataFrame) -> bool:
        """
        Validate that the input DataFrame contains the source column of the chain.

        :param x: DataFrame to validate
        :return: True if the input is valid
        :raises TypeError: If ``x`` is not a pandas DataFrame
        :raises ValueError: If the required source column is missing
        """
        if not isinstance(x, pd.DataFrame):
            raise TypeError("Input must be a pandas DataFrame")
        if self.requires[0] not in x.columns:
            raise ValueError(f"Input column {self.requires} not found in DataFrame")
        return True

    @property
    def output_name(self) -> str:
        """
        Get the output name of the composed transform.

        The output name is a combination of the first transform's output and the subsequent transforms' produces.

        :return: Output name
        """
        return self.produces[0]

    def _run_pipeline(self, x: pd.DataFrame, *, backend) -> pd.Series:
        """
        Apply the composed transforms to the input DataFrame.

        :param x: DataFrame to transform
        :param backend: Backend forwarded to every transform in the chain
        :return: Transformed Series, named after the composed output name
        """
        self._validate_input(x)

        head, *tail = self.transforms
        first_product = head.produces[0]
        if first_product in x.columns:
            # The first product is often already present in the DataFrame; reuse it instead of recomputing.
            series_out = x[first_product]
        else:
            # Forward the requested backend here as well, consistent with the later stages.
            series_out = head(x, backend=backend)

        for tfs in tail:
            # Present the previous output under the column name the next transform expects.
            stage_input = pd.DataFrame(series_out.values, index=series_out.index, columns=[tfs.requires[0]])
            series_out = tfs(stage_input, backend=backend)

        # Return the final output Series with the composed name
        series_out.name = self.output_name
        return series_out

    def __call__(self, x: pd.DataFrame, *, backend="nb") -> pd.Series:
        """
        Apply the composed transforms to the input DataFrame.

        :param x: DataFrame to transform
        :param backend: Backend to use for the transform. Can be "pd" or "nb". Default is "nb".
        :return: Transformed Series
        """
        assert backend in ("pd", "nb"), "Backend must be either 'pd' or 'nb'."
        return self._run_pipeline(x, backend=backend)
class FeatureKit:
    """
    Evaluate a list of features against a DataFrame and collect their outputs into one DataFrame.
    """

    def __init__(self, features: "list[Feature]", retain: "list[str] | None" = None):
        """
        :param features: Features to evaluate, in order
        :param retain: Columns of the input DataFrame to copy into the output (default: none)
        """
        self.features = features
        self.retain = retain or []

    def build(self, df, *, backend="nb", timeit=False):
        """
        Compute every feature and return the results as a new DataFrame.

        Each result is also written to a working copy of ``df`` that is passed to the following
        features as both input and cache, so later features can reuse earlier outputs. The caller's
        ``df`` is not modified.

        :param df: Input DataFrame
        :param backend: Backend identifier forwarded to every feature
        :param timeit: If True, measure each feature's run time and print a bar chart to stdout
        :return: DataFrame holding the retained columns followed by the feature outputs
        :raises TypeError: If a feature returns something other than a Series or a tuple
        """
        out = df[self.retain].copy()
        df = df.copy()
        timing_info = {}

        for feat in self.features:
            # perf_counter is monotonic, so the measurement cannot be skewed by wall-clock adjustments.
            start_time = time.perf_counter() if timeit else None
            res = feat(df, cache=df, backend=backend)
            if timeit:
                # Multi-output features have tuple/list names; stringify so the name is usable
                # both as a dict key and with the width format spec in the report.
                timing_info[str(feat.name)] = time.perf_counter() - start_time

            if isinstance(res, pd.Series):
                # Single output transform case
                out[feat.name] = res
                df[feat.transform.output_name] = res  # cache the result in the DataFrame (for compose transforms)
            elif isinstance(res, tuple):
                # Multi output transform case
                for item in res:
                    out[item.name] = item
                    df[item.name] = item  # cache the result in the DataFrame (for compose transforms)
            else:
                raise TypeError(f"Transform {feat} returned unexpected type: {type(res)}")

        if timeit:
            self._print_timing_report(timing_info)

        return out

    @staticmethod
    def _print_timing_report(timing_info, max_bar_length=50):
        """Print a console bar chart of per-feature run times, slowest first."""
        print("\nFeature Timing Analysis:")
        print("=======================")

        # Sort features by execution time
        sorted_times = sorted(timing_info.items(), key=lambda entry: entry[1], reverse=True)
        # The longest run time scales the bars
        max_time = max((elapsed for _, elapsed in sorted_times), default=0)

        for feature_name, time_taken in sorted_times:
            bar_length = int((time_taken / max_time) * max_bar_length) if max_time > 0 else 0
            bar = '█' * bar_length
            print(f"{feature_name:<30} | {bar} {time_taken:.4f}s")