Feature Store Architecture: Design Patterns for Feature Engineering in ML
Feature stores solve one of the most common problems in production ML: training-serving skew. When features are computed differently during training (batch, in notebooks) and during serving (real-time, in production), model performance degrades silently. A feature store ensures consistency by providing a single source of truth for computing and storing features.
The Training-Serving Skew Problem
Consider a churn-prediction model that uses "average order value over the last 30 days":
# During training (data scientist in a notebook)
df["avg_order_value_30d"] = df.groupby("customer_id")["order_value"].transform(
lambda x: x.rolling("30D").mean()
)
# During serving (engineer in production)
# SELECT AVG(order_value) FROM orders
# WHERE customer_id = ? AND created_at > NOW() - INTERVAL '30 days'
These two implementations will produce different results because of:
- Different handling of edge cases (nulls, the first 30 days)
- Different timestamp semantics (end-of-day snapshots vs. point-in-time, as the sketch below shows)
- Different data sources (data lake snapshot vs. live database)
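To make the timestamp-semantics point concrete, here is a minimal sketch with hypothetical data: the training path averages over calendar-day snapshots, while the serving path cuts off at NOW() minus 30 days, so the two windows capture different orders.
import pandas as pd
orders = pd.DataFrame({
    "order_value": [200.0, 40.0],
    "created_at": pd.to_datetime(["2026-01-01 09:00", "2026-01-15 09:00"]),
}).set_index("created_at")
# Training path: end-of-day snapshot logic, window = calendar days Jan 2 .. Jan 31
train_avg = orders.loc["2026-01-02":"2026-01-31", "order_value"].mean()  # 40.0
# Serving path: request arrives 2026-01-31 08:00, cutoff = NOW() - 30 days
cutoff = pd.Timestamp("2026-01-31 08:00") - pd.Timedelta(days=30)
serve_avg = orders.loc[orders.index > cutoff, "order_value"].mean()  # 120.0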
A feature store eliminates this problem by computing each feature once and serving it consistently.
Feature Store Architecture
A feature store has four fundamental components:
┌─────────────────────────────────────────────────────────────┐
│                 Feature Store Architecture                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Feature Definitions (Code)                                 │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ feature_views.py, Declarative feature definitions    │   │
│  │ entities.py, Entity definitions (customer, product)  │   │
│  │ sources.py, Data source connections                  │   │
│  └──────────────────────────────────────────────────────┘   │
│                            │                                │
│                ┌───────────┴───────────┐                    │
│                ▼                       ▼                    │
│      ┌───────────────────┐   ┌───────────────────┐          │
│      │   Offline Store   │   │   Online Store    │          │
│      │  (Training data)  │   │  (Serving data)   │          │
│      │                   │   │                   │          │
│      │ - BigQuery/S3     │   │ - Redis/DynamoDB  │          │
│      │ - Full history    │   │ - Latest values   │          │
│      │ - Batch reads     │   │ - Low latency     │          │
│      │ - Point-in-time   │   │ - < 10 ms reads   │          │
│      └───────────────────┘   └───────────────────┘          │
│                │                       │                    │
│                ▼                       ▼                    │
│      ┌───────────────────┐   ┌───────────────────┐          │
│      │ Training Pipeline │   │ Serving Endpoint  │          │
│      │ (Kubeflow/Airflow)│   │  (FastAPI/gRPC)   │          │
│      └───────────────────┘   └───────────────────┘          │
└─────────────────────────────────────────────────────────────┘
Offline Store
- Stores the complete history of feature values
- Used to generate training data with point-in-time correctness
- Built on data warehouses (BigQuery, Snowflake) or object storage (S3/Parquet)
- Batch reads and high throughput; higher latency is acceptable
Online Store
- Stores the most recent feature values for each entity
- Used for real-time serving (model inference)
- Built on low-latency key-value stores (Redis, DynamoDB)
- Millisecond-level reads required, typically under 10 ms (see the sketch below)
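Conceptually, an online read is a single key-value lookup per entity. A minimal sketch, assuming Redis and a hypothetical feature_view:entity_id key layout:
import redis
r = redis.from_url("redis://localhost:6379")
# One hash read returns all of the entity's latest feature values
r.hgetall("customer_behavior:1001")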
Implementation with Feast
Feast is the most popular open-source feature store. Here is a complete implementation:
Defining entities and sources
# features/entities.py
from feast import Entity
customer = Entity(
    name="customer",
    join_keys=["customer_id"],
    description="Unique customer identifier",
)
product = Entity(
    name="product",
    join_keys=["product_id"],
    description="Product SKU",
)
# features/sources.py
from feast import FileSource, BigQuerySource
from datetime import timedelta
# For development: file-based source
customer_source_file = FileSource(
path="data/customer_features.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)
# For production: BigQuery source
customer_source_bq = BigQuerySource(
table="ml_features.customer_daily_features",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)
# Real-time source for streaming features
transaction_source = FileSource(
path="data/transaction_features.parquet",
timestamp_field="event_timestamp",
)
Defining Feature Views
# features/feature_views.py
from feast import FeatureView, Field
from feast.types import Bool, Float64, Int64, String
from datetime import timedelta
from features.entities import customer, product
from features.sources import customer_source_bq, transaction_source
customer_profile_features = FeatureView(
name="customer_profile",
entities=[customer],
ttl=timedelta(days=1),
    schema=[
        Field(name="account_age_days", dtype=Int64),
        Field(name="total_orders", dtype=Int64),
        Field(name="lifetime_value", dtype=Float64),
        Field(name="preferred_category", dtype=String),
        Field(name="email_verified", dtype=Bool),
    ],
online=True,
source=customer_source_bq,
tags={"team": "ml-platform", "domain": "customer"},
)
customer_behavioral_features = FeatureView(
name="customer_behavior",
entities=[customer],
    ttl=timedelta(hours=6),  # Refreshed more frequently
    schema=[
        Field(name="purchases_last_7d", dtype=Int64),
        Field(name="purchases_last_30d", dtype=Int64),
        Field(name="avg_order_value_30d", dtype=Float64),
        Field(name="days_since_last_purchase", dtype=Int64),
        Field(name="cart_abandonment_rate_30d", dtype=Float64),
        Field(name="support_tickets_last_30d", dtype=Int64),
        Field(name="page_views_last_7d", dtype=Int64),
    ],
online=True,
source=customer_source_bq,
tags={"team": "ml-platform", "domain": "customer"},
)
Applying and materializing
# Apply the feature definitions
feast apply
# Materialize features into the online store
feast materialize 2026-01-01T00:00:00 2026-02-05T00:00:00
# Incremental materialization (run daily)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")
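In practice, the incremental command runs on a schedule. A minimal sketch of a daily Airflow DAG (Airflow 2.4+ assumed; the repo path and 02:00 UTC schedule are placeholders):
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG("feast_materialize", start_date=datetime(2026, 1, 1), schedule="0 2 * * *", catchup=False):
    BashOperator(
        task_id="materialize_incremental",
        # Push features computed since the last run into the online store
        bash_command='cd /opt/ml/features && feast materialize-incremental "$(date -u +%Y-%m-%dT%H:%M:%S)"',
    )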
Training: Point-in-Time Feature Retrieval
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path="features/")
# Entity DataFrame for training, with timestamps
entity_df = pd.DataFrame({
"customer_id": [1001, 1002, 1003, 1001, 1002],
"event_timestamp": pd.to_datetime([
"2026-01-15", "2026-01-15", "2026-01-15",
"2026-02-01", "2026-02-01",
]),
"label": [1, 0, 0, 1, 1], # etichete churn
})
# Fetch historical features with point-in-time correctness
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"customer_profile:account_age_days",
"customer_profile:total_orders",
"customer_profile:lifetime_value",
"customer_behavior:purchases_last_30d",
"customer_behavior:avg_order_value_30d",
"customer_behavior:days_since_last_purchase",
"customer_behavior:cart_abandonment_rate_30d",
],
).to_df()
print(training_df.head())
# customer_id event_timestamp label account_age_days total_orders ...
# 1001 2026-01-15 1 365 23 ...
# 1002         2026-01-15     0     180               8           ...
Point-in-time correctness means that for customer 1001 on 2026-01-15, the feature store returns the feature values as they were on that date, not the latest values. For example, if the offline store holds total_orders=23 as of 2026-01-10 and total_orders=27 as of 2026-01-20, the training row timestamped 2026-01-15 receives 23, even though 27 is newer. This prevents data leakage (using future information to predict past events).
Serving: Online Feature Retrieval
from feast import FeatureStore
store = FeatureStore(repo_path="features/")
# Fetch the latest features for a real-time prediction
online_features = store.get_online_features(
features=[
"customer_profile:account_age_days",
"customer_profile:total_orders",
"customer_profile:lifetime_value",
"customer_behavior:purchases_last_30d",
"customer_behavior:avg_order_value_30d",
"customer_behavior:days_since_last_purchase",
],
entity_rows=[{"customer_id": 1001}],
).to_dict()
print(online_features)
# {
# "customer_id": [1001],
# "account_age_days": [380],
# "total_orders": [27],
# "lifetime_value": [4523.50],
# "purchases_last_30d": [3],
# "avg_order_value_30d": [167.88],
# "days_since_last_purchase": [5],
# }
Integration with model serving
from fastapi import FastAPI
from feast import FeatureStore
import mlflow.pyfunc
app = FastAPI()
feature_store = FeatureStore(repo_path="features/")
model = mlflow.pyfunc.load_model("models:/churn-predictor/Production")
FEATURE_LIST = [
"customer_profile:account_age_days",
"customer_profile:total_orders",
"customer_profile:lifetime_value",
"customer_behavior:purchases_last_30d",
"customer_behavior:avg_order_value_30d",
"customer_behavior:days_since_last_purchase",
"customer_behavior:cart_abandonment_rate_30d",
]
@app.post("/predict/churn")
async def predict_churn(customer_id: int):
    # Fetch features from the online store
features = feature_store.get_online_features(
features=FEATURE_LIST,
entity_rows=[{"customer_id": customer_id}],
).to_df()
    # Run the prediction
prediction = model.predict(features.drop(columns=["customer_id"]))
return {
"customer_id": customer_id,
"churn_probability": float(prediction[0]),
"features_used": features.to_dict(orient="records")[0],
    }
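A quick sanity check of the endpoint with FastAPI's test client (the response values shown are illustrative):
from fastapi.testclient import TestClient
client = TestClient(app)
resp = client.post("/predict/churn", params={"customer_id": 1001})
print(resp.json())  # {"customer_id": 1001, "churn_probability": 0.12, ...}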
Design Pattern: On-Demand Features
Some features must be computed at request time (for example, "time since the last page view" changes every second). Use on-demand feature views:
import pandas as pd
from feast import on_demand_feature_view, Field
from feast.types import Float64
@on_demand_feature_view(
sources=[customer_behavioral_features],
schema=[
Field(name="recency_score", dtype=Float64),
Field(name="activity_score", dtype=Float64),
],
)
def customer_scores(inputs: pd.DataFrame) -> pd.DataFrame:
"""Calculeaza feature-uri derivate on demand."""
df = pd.DataFrame()
    # Recency score: 0-1, higher = more recent
df["recency_score"] = 1.0 / (1.0 + inputs["days_since_last_purchase"] / 30.0)
    # Activity score: combines several signals
df["activity_score"] = (
inputs["purchases_last_7d"] * 0.4
+ inputs["page_views_last_7d"] * 0.01
+ (1 - inputs["cart_abandonment_rate_30d"]) * 0.3
)
    return df
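On-demand feature views are requested exactly like regular ones (store is the FeatureStore instance from the serving example); the transformation runs at retrieval time:
scores = store.get_online_features(
    features=[
        "customer_scores:recency_score",
        "customer_scores:activity_score",
    ],
    entity_rows=[{"customer_id": 1001}],
).to_dict()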
Design Pattern: Streaming Features
For real-time features that update with every event (for example, a running count of today's purchases):
# Kafka → Flink/Spark Streaming → Online Store
# Flink job for computing streaming features
from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Read from Kafka
t_env.execute_sql("""
CREATE TABLE transactions (
customer_id BIGINT,
order_value DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'transactions',
    'properties.bootstrap.servers' = 'kafka:9092',  -- placeholder broker address
    'format' = 'json'
)
""")
# Compute streaming features (assumes a feature_store_sink table pointing at the online store is declared)
t_env.execute_sql("""
INSERT INTO feature_store_sink
SELECT
customer_id,
COUNT(*) OVER w AS purchases_today,
SUM(order_value) OVER w AS spend_today,
AVG(order_value) OVER w AS avg_order_today
FROM transactions
WINDOW w AS (
PARTITION BY customer_id
ORDER BY event_time
RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW
)
""")Implementare custom de Feature Store
Custom Feature Store Implementation
For teams that need more control, or whose requirements are simpler, here is a minimal custom feature store:
from typing import Any
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import redis
class SimpleFeatureStore:
"""Minimal feature store with offline (Parquet) + online (Redis) stores."""
def __init__(self, offline_path: str, redis_url: str):
self.offline_path = offline_path
self.redis = redis.from_url(redis_url)
    def materialize(self, feature_view: str, df: pd.DataFrame):
"""Write features to both offline and online stores."""
# Offline: append to Parquet
path = f"{self.offline_path}/{feature_view}"
        pq.write_to_dataset(pa.Table.from_pandas(df), path, partition_cols=["date"])
# Online: write latest values to Redis
for _, row in df.iterrows():
entity_key = f"{feature_view}:{row['entity_id']}"
features = row.drop(["entity_id", "event_timestamp", "date"]).to_dict()
self.redis.hset(entity_key, mapping={k: str(v) for k, v in features.items()})
self.redis.expire(entity_key, 86400 * 7) # 7-day TTL
def get_online_features(self, feature_view: str, entity_id: str) -> dict[str, Any]:
"""Fetch latest features from Redis."""
key = f"{feature_view}:{entity_id}"
raw = self.redis.hgetall(key)
return {k.decode(): float(v.decode()) for k, v in raw.items()}
    def get_historical_features(self, feature_view: str, entity_df: pd.DataFrame) -> pd.DataFrame:
        """Point-in-time join from the offline store."""
        features_df = pd.read_parquet(f"{self.offline_path}/{feature_view}")
        # Point-in-time join: for each entity+timestamp, take the most recent values at or before it
merged = pd.merge_asof(
entity_df.sort_values("event_timestamp"),
features_df.sort_values("event_timestamp"),
on="event_timestamp",
by="entity_id",
direction="backward",
)
        return merged
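Example usage of the sketch above, with hypothetical values and a local Redis:
import pandas as pd
fs = SimpleFeatureStore(offline_path="data/offline", redis_url="redis://localhost:6379")
df = pd.DataFrame({
    "entity_id": ["1001"],
    "purchases_last_30d": [3],
    "avg_order_value_30d": [167.88],
    "event_timestamp": [pd.Timestamp("2026-02-01")],
    "date": ["2026-02-01"],
})
fs.materialize("customer_behavior", df)
fs.get_online_features("customer_behavior", "1001")
# {'purchases_last_30d': 3.0, 'avg_order_value_30d': 167.88}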
Feature Store Comparison
| Feature Store | Type | Online Store | Offline Store | Streaming | Recommended for |
|---------------|------|--------------|---------------|-----------|-----------------|
| Feast | Open source | Redis, DynamoDB | BigQuery, S3 | Kafka → push | Self-hosted, any cloud |
| Tecton | Managed | DynamoDB | S3/Snowflake | Native | Enterprise, real-time |
| Hopsworks | Open source | RonDB | Hudi | Flink | On-prem, all-in-one |
| Databricks | Managed | DynamoDB | Delta Lake | Spark | Databricks users |
| Vertex AI | Managed | Bigtable | BigQuery | Dataflow | GCP users |
When Do You Need a Feature Store?
You need one if:
- Several models share the same features
- Training and serving read from different data sources
- You need point-in-time correctness for training data
- Feature computation is expensive and worth caching
- You need real-time features with latency under 50 ms
You probably do not need one if:
- You have a single model in production
- Your features are simple transformations of the raw input
- All of your models run in batch (no real-time serving)
- Your team has fewer than 3 ML engineers
Next Steps
- MLflow: Track experiments and manage models alongside the feature store
- Kubeflow Pipelines: Orchestrate feature computation and training pipelines
- MLOps Best Practices: Integrate feature stores into your ML workflow
- What is MLOps?: Understand where feature stores fit in the MLOps lifecycle
Building a feature store for your ML platform? DeviDevs designs and implements production feature stores with Feast, custom solutions, or managed platforms. Request a free assessment →