MLOps

Feature Store Architecture: ML Feature Engineering

Petru Constantin
9 min read
#feature store#feature engineering#MLOps#ML infrastructure#Feast#ML pipeline

Feature Store Architecture: Design Patterns for Feature Engineering in ML

Feature stores solve one of the most common problems in production ML: training-serving skew. When features are computed differently during training (batch, in notebooks) and during serving (real-time, in production), model performance degrades silently. A feature store ensures consistency by providing a single source of truth for feature computation and storage.

The Training-Serving Skew Problem

Consider a churn prediction model that uses "average order value over the last 30 days":

# During training (data scientist in a notebook)
df["avg_order_value_30d"] = df.groupby("customer_id")["order_value"].transform(
    lambda x: x.rolling("30D").mean()
)
 
# During serving (engineer in production)
# SELECT AVG(order_value) FROM orders
# WHERE customer_id = ? AND created_at > NOW() - INTERVAL '30 days'

These two implementations will produce different results because of:

  • Different handling of edge cases (nulls, the first 30 days)
  • Different timestamp semantics (end of day vs. point-in-time)
  • Different data sources (data lake snapshot vs. live database)

A feature store eliminates this problem by computing each feature once and serving it consistently.
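To make the skew concrete, here is a minimal, self-contained sketch (illustrative numbers, not the article's churn model) showing how the two timestamp semantics above diverge: the notebook version anchors the 30-day window at each order's timestamp, while the serving SQL anchors it at prediction time.

```python
import pandas as pd

orders = pd.DataFrame({
    "order_value": [100.0, 200.0],
    "created_at": pd.to_datetime(["2026-01-01", "2026-01-10"]),
}).set_index("created_at")

# "Training" version: rolling 30-day mean anchored at each ORDER timestamp.
train_value = orders["order_value"].rolling("30D").mean().iloc[-1]
# Window at 2026-01-10 covers both orders -> (100 + 200) / 2 = 150.0

# "Serving" version: the SQL query anchored at NOW() (prediction time).
now = pd.Timestamp("2026-02-05")
window = orders[orders.index > now - pd.Timedelta(days=30)]
serve_value = window["order_value"].mean()
# Window (2026-01-06, 2026-02-05] contains only the 200.0 order -> 200.0

print(train_value, serve_value)  # 150.0 200.0
```

Same customer, same nominal feature, two different values: exactly the silent skew a feature store is meant to remove.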

Feature Store Architecture

A feature store has four fundamental components:

┌─────────────────────────────────────────────────────────────┐
│                  Feature Store Architecture                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Feature Definitions (Code)                                 │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  feature_views.py, declarative feature definitions   │   │
│  │  entities.py, entity definitions (customer, product) │   │
│  │  sources.py, data source connections                 │   │
│  └──────────────────────────────────────────────────────┘   │
│                          │                                  │
│              ┌───────────┴───────────┐                      │
│              ▼                       ▼                      │
│  ┌───────────────────┐   ┌───────────────────┐              │
│  │   Offline Store   │   │   Online Store    │              │
│  │  (Training data)  │   │  (Serving data)   │              │
│  │                   │   │                   │              │
│  │  - BigQuery/S3    │   │  - Redis/DynamoDB │              │
│  │  - Full history   │   │  - Latest values  │              │
│  │  - Batch reads    │   │  - Low latency    │              │
│  │  - Point-in-time  │   │  - < 10ms reads   │              │
│  └───────────────────┘   └───────────────────┘              │
│              │                       │                      │
│              ▼                       ▼                      │
│  ┌───────────────────┐   ┌───────────────────┐              │
│  │ Training Pipeline │   │  Serving Endpoint │              │
│  │(Kubeflow/Airflow) │   │  (FastAPI/gRPC)   │              │
│  └───────────────────┘   └───────────────────┘              │
└─────────────────────────────────────────────────────────────┘

Offline Store

  • Stores the complete history of feature values
  • Used to generate training data with point-in-time correctness
  • Backed by data warehouses (BigQuery, Snowflake) or object storage (S3/Parquet)
  • Batch reads: high throughput, higher latency is acceptable

Online Store

  • Stores the latest feature values for each entity
  • Used for real-time serving (model inference)
  • Backed by low-latency key-value stores (Redis, DynamoDB)
  • Requires reads in the low single-digit milliseconds

Implementation with Feast

Feast is one of the most widely used open-source feature stores. Here is an end-to-end implementation:

Defining entities and sources

# features/entities.py
from feast import Entity, ValueType
 
customer = Entity(
    name="customer_id",
    value_type=ValueType.INT64,
    description="Unique customer identifier",
)
 
product = Entity(
    name="product_id",
    value_type=ValueType.STRING,
    description="Product SKU",
)
# features/sources.py
from feast import FileSource, BigQuerySource
from datetime import timedelta
 
# For development: file-based source
customer_source_file = FileSource(
    path="data/customer_features.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)
 
# For production: BigQuery source
customer_source_bq = BigQuerySource(
    table="ml_features.customer_daily_features",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)
 
# Real-time source for streaming features (file-backed here for simplicity)
transaction_source = FileSource(
    path="data/transaction_features.parquet",
    timestamp_field="event_timestamp",
)

Defining Feature Views

# features/feature_views.py
from datetime import timedelta

from feast import FeatureView, Field
from feast.types import Bool, Float64, Int64, String

from features.entities import customer, product
from features.sources import customer_source_bq, transaction_source
 
customer_profile_features = FeatureView(
    name="customer_profile",
    entities=[customer],
    ttl=timedelta(days=1),
    schema=[
        Field(name="account_age_days", dtype=Int64),
        Field(name="total_orders", dtype=Int64),
        Field(name="lifetime_value", dtype=Float64),
        Field(name="preferred_category", dtype=String),
        Field(name="email_verified", dtype=Bool),
    ],
    online=True,
    source=customer_source_bq,
    tags={"team": "ml-platform", "domain": "customer"},
)
 
customer_behavioral_features = FeatureView(
    name="customer_behavior",
    entities=[customer],
    ttl=timedelta(hours=6),  # Refreshed more frequently
    schema=[
        Field(name="purchases_last_7d", dtype=Int64),
        Field(name="purchases_last_30d", dtype=Int64),
        Field(name="avg_order_value_30d", dtype=Float64),
        Field(name="days_since_last_purchase", dtype=Int64),
        Field(name="cart_abandonment_rate_30d", dtype=Float64),
        Field(name="support_tickets_last_30d", dtype=Int64),
        Field(name="page_views_last_7d", dtype=Int64),
    ],
    online=True,
    source=customer_source_bq,
    tags={"team": "ml-platform", "domain": "customer"},
)

Apply and materialize

# Apply the feature definitions
feast apply
 
# Materialize features into the online store
feast materialize 2026-01-01T00:00:00 2026-02-05T00:00:00
 
# Incremental materialization (run daily)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")

Training: Point-in-Time Feature Retrieval

from feast import FeatureStore
import pandas as pd
 
store = FeatureStore(repo_path="features/")
 
# Entity DataFrame for training, with timestamps
entity_df = pd.DataFrame({
    "customer_id": [1001, 1002, 1003, 1001, 1002],
    "event_timestamp": pd.to_datetime([
        "2026-01-15", "2026-01-15", "2026-01-15",
        "2026-02-01", "2026-02-01",
    ]),
    "label": [1, 0, 0, 1, 1],  # churn labels
})
 
# Fetch historical features with point-in-time correctness
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_profile:account_age_days",
        "customer_profile:total_orders",
        "customer_profile:lifetime_value",
        "customer_behavior:purchases_last_30d",
        "customer_behavior:avg_order_value_30d",
        "customer_behavior:days_since_last_purchase",
        "customer_behavior:cart_abandonment_rate_30d",
    ],
).to_df()
 
print(training_df.head())
# customer_id  event_timestamp  label  account_age_days  total_orders  ...
# 1001         2026-01-15       1      365               23            ...
# 1002         2026-01-15       0      180               8             ...

Point-in-time correctness means: for customer 1001 on 2026-01-15, the feature store returns the feature values as they were on that date, not the latest values. This prevents data leakage (using future information to predict past events).
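Under the hood, a point-in-time join behaves like a backward as-of join: for each training row, take the most recent feature values at or before its timestamp. A minimal pandas sketch of these semantics (illustrative data, not Feast's actual implementation):

```python
import pandas as pd

# Historical feature values for customer 1001 (one row per materialization)
features = pd.DataFrame({
    "customer_id": [1001, 1001, 1001],
    "event_timestamp": pd.to_datetime(["2026-01-10", "2026-01-20", "2026-02-01"]),
    "total_orders": [20, 23, 27],
})

# Training rows: we want feature values AS OF each label timestamp
entity_df = pd.DataFrame({
    "customer_id": [1001, 1001],
    "event_timestamp": pd.to_datetime(["2026-01-15", "2026-02-03"]),
})

# Backward as-of join: latest feature row at or before each label timestamp
training_df = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    features.sort_values("event_timestamp"),
    on="event_timestamp",
    by="customer_id",
    direction="backward",
)
print(training_df["total_orders"].tolist())  # [20, 27]
```

The 2026-01-15 row gets the value materialized on 2026-01-10 (20 orders), never the later, "leaky" values.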

Serving: Online Feature Retrieval

from feast import FeatureStore
 
store = FeatureStore(repo_path="features/")
 
# Fetch the latest features for real-time prediction
online_features = store.get_online_features(
    features=[
        "customer_profile:account_age_days",
        "customer_profile:total_orders",
        "customer_profile:lifetime_value",
        "customer_behavior:purchases_last_30d",
        "customer_behavior:avg_order_value_30d",
        "customer_behavior:days_since_last_purchase",
    ],
    entity_rows=[{"customer_id": 1001}],
).to_dict()
 
print(online_features)
# {
#   "customer_id": [1001],
#   "account_age_days": [380],
#   "total_orders": [27],
#   "lifetime_value": [4523.50],
#   "purchases_last_30d": [3],
#   "avg_order_value_30d": [167.88],
#   "days_since_last_purchase": [5],
# }

Integration with model serving

from fastapi import FastAPI
from feast import FeatureStore
import mlflow.pyfunc
 
app = FastAPI()
feature_store = FeatureStore(repo_path="features/")
model = mlflow.pyfunc.load_model("models:/churn-predictor/Production")
 
FEATURE_LIST = [
    "customer_profile:account_age_days",
    "customer_profile:total_orders",
    "customer_profile:lifetime_value",
    "customer_behavior:purchases_last_30d",
    "customer_behavior:avg_order_value_30d",
    "customer_behavior:days_since_last_purchase",
    "customer_behavior:cart_abandonment_rate_30d",
]
 
@app.post("/predict/churn")
async def predict_churn(customer_id: int):
    # Fetch the features from the online store
    features = feature_store.get_online_features(
        features=FEATURE_LIST,
        entity_rows=[{"customer_id": customer_id}],
    ).to_df()
 
    # Run the prediction
    prediction = model.predict(features.drop(columns=["customer_id"]))
 
    return {
        "customer_id": customer_id,
        "churn_probability": float(prediction[0]),
        "features_used": features.to_dict(orient="records")[0],
    }

Design Pattern: On-Demand Features

Some features must be computed at request time (for example, "time since last page view" changes every second). Use on-demand feature views:

import pandas as pd

from feast import on_demand_feature_view, Field
from feast.types import Float64
 
@on_demand_feature_view(
    sources=[customer_behavioral_features],
    schema=[
        Field(name="recency_score", dtype=Float64),
        Field(name="activity_score", dtype=Float64),
    ],
)
def customer_scores(inputs: pd.DataFrame) -> pd.DataFrame:
    """Compute derived features on demand."""
    df = pd.DataFrame()
    # Recency score: 0-1, higher = more recent
    df["recency_score"] = 1.0 / (1.0 + inputs["days_since_last_purchase"] / 30.0)
    # Activity score: combines several signals
    df["activity_score"] = (
        inputs["purchases_last_7d"] * 0.4
        + inputs["page_views_last_7d"] * 0.01
        + (1 - inputs["cart_abandonment_rate_30d"]) * 0.3
    )
    return df
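A useful property of on-demand feature views is that their body is plain pandas, so the transformation can be unit-tested without a running feature store. A small sketch, using a hypothetical `compute_scores` helper that mirrors the view's logic:

```python
import pandas as pd

# Mirror of the on-demand view's transformation, testable in isolation
def compute_scores(inputs: pd.DataFrame) -> pd.DataFrame:
    df = pd.DataFrame()
    # Recency score: 0-1, higher = more recent
    df["recency_score"] = 1.0 / (1.0 + inputs["days_since_last_purchase"] / 30.0)
    # Activity score: weighted mix of purchase, browsing, and checkout signals
    df["activity_score"] = (
        inputs["purchases_last_7d"] * 0.4
        + inputs["page_views_last_7d"] * 0.01
        + (1 - inputs["cart_abandonment_rate_30d"]) * 0.3
    )
    return df

sample = pd.DataFrame({
    "days_since_last_purchase": [30],
    "purchases_last_7d": [2],
    "page_views_last_7d": [10],
    "cart_abandonment_rate_30d": [0.5],
})
scores = compute_scores(sample)
print(scores["recency_score"].iloc[0])   # 0.5, since 1 / (1 + 30/30)
print(scores["activity_score"].iloc[0])  # ≈ 1.05 (0.8 + 0.1 + 0.15)
```

Pinning the scoring logic down with tests like this matters precisely because on-demand features run in the serving path, where silent regressions are hardest to spot.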

Design Pattern: Streaming Features

For real-time features that update with every event (for example, a running count of today's purchases):

# Kafka → Flink/Spark Streaming → Online Store
 
# Flink job for computing streaming features
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
 
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
 
# Read from Kafka
t_env.execute_sql("""
    CREATE TABLE transactions (
        customer_id BIGINT,
        order_value DOUBLE,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'transactions',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")
 
# Compute streaming features
t_env.execute_sql("""
    INSERT INTO feature_store_sink
    SELECT
        customer_id,
        COUNT(*) OVER w AS purchases_today,
        SUM(order_value) OVER w AS spend_today,
        AVG(order_value) OVER w AS avg_order_today
    FROM transactions
    WINDOW w AS (
        PARTITION BY customer_id
        ORDER BY event_time
        RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW
    )
""")
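The essence of this pipeline, independent of Flink, is: maintain a rolling event window per entity and push the aggregates to the online store on every event. A self-contained Python sketch of that write path (an in-memory dict stands in for Redis/DynamoDB, and the `process_event` helper is hypothetical):

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

# In-memory stand-in for the online store (Redis/DynamoDB in production)
online_store: dict[int, dict[str, float]] = {}

# Per-customer event buffer for the 1-day window
windows: dict[int, deque] = defaultdict(deque)

def process_event(customer_id: int, order_value: float, event_time: datetime) -> None:
    """Update 1-day rolling features and push them to the online store."""
    window = windows[customer_id]
    window.append((event_time, order_value))
    # Evict events that fell out of the 1-day window
    while window and window[0][0] < event_time - timedelta(days=1):
        window.popleft()
    values = [v for _, v in window]
    online_store[customer_id] = {
        "purchases_today": len(values),
        "spend_today": sum(values),
        "avg_order_today": sum(values) / len(values),
    }

base = datetime(2026, 2, 5, 9, 0)
process_event(1001, 50.0, base)
process_event(1001, 150.0, base + timedelta(hours=2))
process_event(1001, 100.0, base + timedelta(days=2))  # older events are evicted
print(online_store[1001])
# {'purchases_today': 1, 'spend_today': 100.0, 'avg_order_today': 100.0}
```

In a real deployment the window state lives in Flink's managed state and the sink writes to the feature store's online store, but the per-event update-and-push shape is the same.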

A Custom Feature Store Implementation

For teams that need more control or have simpler requirements, here is a minimal custom feature store:

from typing import Any

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import redis
 
class SimpleFeatureStore:
    """Minimal feature store with offline (Parquet) + online (Redis) stores."""
 
    def __init__(self, offline_path: str, redis_url: str):
        self.offline_path = offline_path
        self.redis = redis.from_url(redis_url)
 
    def materialize(self, feature_view: str, df: pd.DataFrame) -> None:
        """Write features to both the offline and online stores."""
        # Offline: append to a date-partitioned Parquet dataset
        path = f"{self.offline_path}/{feature_view}"
        pq.write_to_dataset(pa.Table.from_pandas(df), path, partition_cols=["date"])
 
        # Online: write the latest values to Redis
        for _, row in df.iterrows():
            entity_key = f"{feature_view}:{row['entity_id']}"
            features = row.drop(["entity_id", "event_timestamp", "date"]).to_dict()
            self.redis.hset(entity_key, mapping={k: str(v) for k, v in features.items()})
            self.redis.expire(entity_key, 86400 * 7)  # 7-day TTL
 
    def get_online_features(self, feature_view: str, entity_id: str) -> dict[str, Any]:
        """Fetch the latest features from Redis."""
        key = f"{feature_view}:{entity_id}"
        raw = self.redis.hgetall(key)
        return {k.decode(): float(v.decode()) for k, v in raw.items()}
 
    def get_historical_features(self, feature_view: str, entity_df: pd.DataFrame) -> pd.DataFrame:
        """Point-in-time join against the offline store."""
        features_df = pd.read_parquet(f"{self.offline_path}/{feature_view}")
        # Point-in-time join: for each entity + timestamp, take the most recent values
        merged = pd.merge_asof(
            entity_df.sort_values("event_timestamp"),
            features_df.sort_values("event_timestamp"),
            on="event_timestamp",
            by="entity_id",
            direction="backward",
        )
        return merged

Feature Store Comparison

| Feature Store | Type | Online Store | Offline Store | Streaming | Best for |
|---------------|------|--------------|---------------|-----------|----------|
| Feast | Open source | Redis, DynamoDB | BigQuery, S3 | Kafka → push | Self-hosted, any cloud |
| Tecton | Managed | DynamoDB | S3/Snowflake | Native | Enterprise, real-time |
| Hopsworks | Open source | RonDB | Hudi | Flink | On-prem, all-in-one |
| Databricks | Managed | DynamoDB | Delta Lake | Spark | Databricks users |
| Vertex AI | Managed | Bigtable | BigQuery | Dataflow | GCP users |

When Do You Need a Feature Store?

You need one if:

  • Multiple models share the same features
  • Training and serving use different data sources
  • You need point-in-time correctness for training
  • Feature computation is expensive and should be cached
  • You need real-time features with < 50ms latency

You probably don't need one if:

  • You have a single model in production
  • Your features are simple transformations of raw input
  • All your models are batch (no real-time serving)
  • Your team has fewer than 3 ML engineers

Next Steps

  • MLflow: Track experiments and manage models alongside the feature store
  • Kubeflow Pipelines: Orchestrate feature computation and training pipelines
  • MLOps best practices: Integrate feature stores into your ML workflow
  • What is MLOps?: Understand where feature stores fit in the MLOps lifecycle

Building a feature store for your ML platform? DeviDevs designs and implements production feature stores with Feast, custom solutions, or managed platforms. Request a free assessment →

