California's privacy laws (CCPA/CPRA) require technical implementations for consumer rights, data handling, and privacy notices. This guide provides practical code for achieving and maintaining compliance.
CCPA/CPRA Requirements Overview
Compliance Framework
# ccpa_compliance_framework.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
class ConsumerRight(Enum):
RIGHT_TO_KNOW = "right_to_know"
RIGHT_TO_DELETE = "right_to_delete"
RIGHT_TO_CORRECT = "right_to_correct" # CPRA addition
RIGHT_TO_PORTABILITY = "right_to_portability"
RIGHT_TO_OPT_OUT_SALE = "right_to_opt_out_sale"
RIGHT_TO_OPT_OUT_SHARING = "right_to_opt_out_sharing" # CPRA
RIGHT_TO_LIMIT_SENSITIVE = "right_to_limit_sensitive" # CPRA
class DataCategory(Enum):
IDENTIFIERS = "identifiers"
COMMERCIAL = "commercial_information"
BIOMETRIC = "biometric_information"
INTERNET_ACTIVITY = "internet_activity"
GEOLOCATION = "geolocation_data"
AUDIO_VISUAL = "audio_visual"
PROFESSIONAL = "professional_information"
EDUCATION = "education_information"
INFERENCES = "inferences"
SENSITIVE = "sensitive_personal_information" # CPRA
@dataclass
class PersonalInfoCategory:
category: DataCategory
examples: List[str]
sources: List[str]
purposes: List[str]
third_parties_shared: List[str]
retention_period: str
is_sold: bool
is_shared: bool # CPRA
@dataclass
class ConsumerRequest:
request_id: str
consumer_id: str
request_type: ConsumerRight
submitted_at: datetime
verified_at: Optional[datetime]
completed_at: Optional[datetime]
status: str
verification_method: str
response_data: Optional[Dict] = None
class CCPAComplianceManager:
"""Manage CCPA/CPRA compliance operations"""
def __init__(self, db_connection):
self.db = db_connection
self.data_inventory = self._build_data_inventory()
def _build_data_inventory(self) -> List[PersonalInfoCategory]:
"""Build inventory of personal information collected"""
return [
PersonalInfoCategory(
category=DataCategory.IDENTIFIERS,
examples=["Name", "Email", "IP address", "Account ID"],
sources=["Directly from consumer", "Automatic collection"],
purposes=["Provide services", "Customer support", "Security"],
third_parties_shared=["Service providers", "Analytics"],
retention_period="Duration of account + 3 years",
is_sold=False,
is_shared=True
),
PersonalInfoCategory(
category=DataCategory.COMMERCIAL,
examples=["Purchase history", "Products viewed", "Payment info"],
sources=["Directly from consumer", "Transaction records"],
purposes=["Order fulfillment", "Recommendations"],
third_parties_shared=["Payment processors", "Shipping"],
retention_period="7 years (financial records)",
is_sold=False,
is_shared=False
),
PersonalInfoCategory(
category=DataCategory.INTERNET_ACTIVITY,
examples=["Browsing history", "Search queries", "Clicks"],
sources=["Automatic collection via cookies"],
purposes=["Site improvement", "Personalization"],
third_parties_shared=["Analytics providers", "Ad networks"],
retention_period="2 years",
is_sold=False,
is_shared=True
),
PersonalInfoCategory(
category=DataCategory.GEOLOCATION,
examples=["IP-based location", "GPS coordinates"],
sources=["Automatic collection"],
purposes=["Fraud prevention", "Localized content"],
third_parties_shared=["Fraud prevention services"],
retention_period="90 days",
is_sold=False,
is_shared=False
),
PersonalInfoCategory(
category=DataCategory.SENSITIVE,
examples=["Social Security number", "Financial account", "Precise geolocation"],
sources=["Directly from consumer"],
purposes=["Identity verification", "Financial services"],
third_parties_shared=["Verification services"],
retention_period="As required by law",
is_sold=False,
is_shared=False
)
]
def generate_privacy_notice(self) -> Dict:
"""Generate CCPA-compliant privacy notice content"""
return {
"last_updated": datetime.utcnow().isoformat(),
"categories_collected": [
{
"category": cat.category.value,
"examples": cat.examples,
"sources": cat.sources,
"purposes": cat.purposes,
"shared_with": cat.third_parties_shared,
"sold": cat.is_sold,
"retention": cat.retention_period
}
for cat in self.data_inventory
],
"consumer_rights": [
{
"right": "Right to Know",
"description": "Request disclosure of personal information collected"
},
{
"right": "Right to Delete",
"description": "Request deletion of personal information"
},
{
"right": "Right to Correct",
"description": "Request correction of inaccurate information"
},
{
"right": "Right to Opt-Out of Sale/Sharing",
"description": "Direct us not to sell or share your personal information"
},
{
"right": "Right to Limit Sensitive PI",
"description": "Limit use of sensitive personal information"
},
{
"right": "Right to Non-Discrimination",
"description": "Exercise rights without discriminatory treatment"
}
],
"contact": {
"email": "privacy@example.com",
"phone": "1-800-PRIVACY",
"form_url": "/privacy/request"
}
}Consumer Rights Request Handling
Request Verification
# consumer_request_handler.py
from datetime import datetime, timedelta
import secrets
import hashlib
from typing import Optional, Dict
class ConsumerRequestHandler:
"""Handle consumer rights requests per CCPA/CPRA"""
VERIFICATION_METHODS = ["email", "sms", "knowledge_based", "authorized_agent"]
def __init__(self, db, email_service, sms_service):
self.db = db
self.email = email_service
self.sms = sms_service
def submit_request(
self,
email: str,
request_type: ConsumerRight,
details: Dict = None
) -> str:
"""Submit new consumer request"""
# Generate request ID
request_id = f"CCPA-{datetime.utcnow().strftime('%Y%m%d')}-{secrets.token_hex(4).upper()}"
# Look up consumer
consumer = self.db.fetchone(
"SELECT id, email, phone FROM users WHERE email = %s",
(email,)
)
if not consumer:
# Still create request for non-customers
consumer = {"id": None, "email": email, "phone": None}
# Create request record
self.db.execute("""
INSERT INTO ccpa_requests
(request_id, consumer_id, email, request_type, status, submitted_at, details)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""", (
request_id,
consumer.get("id"),
email,
request_type.value,
"pending_verification",
datetime.utcnow(),
json.dumps(details or {})
))
# Initiate verification
self._send_verification(request_id, email, consumer.get("phone"))
return request_id
def _send_verification(
self,
request_id: str,
email: str,
phone: Optional[str]
):
"""Send verification to consumer"""
# Generate verification code
code = secrets.token_urlsafe(32)
code_hash = hashlib.sha256(code.encode()).hexdigest()
# Store verification
self.db.execute("""
INSERT INTO request_verifications
(request_id, code_hash, method, expires_at)
VALUES (%s, %s, %s, %s)
""", (
request_id,
code_hash,
"email",
datetime.utcnow() + timedelta(hours=24)
))
# Send email
verification_url = f"https://example.com/privacy/verify?request={request_id}&code={code}"
self.email.send(
to=email,
subject="Verify Your Privacy Request",
template="ccpa_verification",
data={
"request_id": request_id,
"verification_url": verification_url,
"expires_in": "24 hours"
}
)
def verify_request(self, request_id: str, code: str) -> bool:
"""Verify consumer request"""
code_hash = hashlib.sha256(code.encode()).hexdigest()
verification = self.db.fetchone("""
SELECT * FROM request_verifications
WHERE request_id = %s AND code_hash = %s AND expires_at > %s
""", (request_id, code_hash, datetime.utcnow()))
if not verification:
return False
# Update request status
self.db.execute("""
UPDATE ccpa_requests
SET status = 'verified', verified_at = %s
WHERE request_id = %s
""", (datetime.utcnow(), request_id))
# Mark verification as used
self.db.execute("""
DELETE FROM request_verifications WHERE request_id = %s
""", (request_id,))
return True
def process_request(self, request_id: str) -> Dict:
"""Process verified consumer request"""
request = self.db.fetchone(
"SELECT * FROM ccpa_requests WHERE request_id = %s",
(request_id,)
)
if not request or request["status"] != "verified":
raise ValueError("Request not found or not verified")
request_type = ConsumerRight(request["request_type"])
if request_type == ConsumerRight.RIGHT_TO_KNOW:
result = self._handle_right_to_know(request)
elif request_type == ConsumerRight.RIGHT_TO_DELETE:
result = self._handle_right_to_delete(request)
elif request_type == ConsumerRight.RIGHT_TO_CORRECT:
result = self._handle_right_to_correct(request)
elif request_type == ConsumerRight.RIGHT_TO_PORTABILITY:
result = self._handle_right_to_portability(request)
elif request_type == ConsumerRight.RIGHT_TO_OPT_OUT_SALE:
result = self._handle_opt_out_sale(request)
else:
raise ValueError(f"Unknown request type: {request_type}")
# Update request
self.db.execute("""
UPDATE ccpa_requests
SET status = 'completed', completed_at = %s, response_data = %s
WHERE request_id = %s
""", (datetime.utcnow(), json.dumps(result), request_id))
return resultRight to Know Implementation
def _handle_right_to_know(self, request: Dict) -> Dict:
"""Handle Right to Know request - disclose personal information"""
consumer_id = request["consumer_id"]
email = request["email"]
disclosure = {
"request_id": request["request_id"],
"generated_at": datetime.utcnow().isoformat(),
"categories_collected": [],
"specific_pieces": {},
"sources": [],
"purposes": [],
"third_parties": []
}
if consumer_id:
# Collect data from all tables
user_data = self.db.fetchone(
"SELECT * FROM users WHERE id = %s", (consumer_id,)
)
if user_data:
disclosure["specific_pieces"]["account"] = {
"name": user_data.get("name"),
"email": user_data.get("email"),
"phone": user_data.get("phone"),
"address": user_data.get("address"),
"created_at": user_data.get("created_at").isoformat() if user_data.get("created_at") else None
}
disclosure["categories_collected"].append("Identifiers")
disclosure["sources"].append("Directly from consumer")
# Orders
orders = self.db.fetchall(
"SELECT * FROM orders WHERE user_id = %s", (consumer_id,)
)
if orders:
disclosure["specific_pieces"]["orders"] = [
{
"order_id": o["id"],
"date": o["created_at"].isoformat(),
"total": float(o["total"]),
"items_count": o["items_count"]
}
for o in orders
]
disclosure["categories_collected"].append("Commercial Information")
# Activity logs
activity = self.db.fetchall("""
SELECT event_type, page_url, created_at
FROM activity_logs
WHERE user_id = %s
ORDER BY created_at DESC
LIMIT 100
""", (consumer_id,))
if activity:
disclosure["specific_pieces"]["internet_activity"] = [
{
"event": a["event_type"],
"page": a["page_url"],
"timestamp": a["created_at"].isoformat()
}
for a in activity
]
disclosure["categories_collected"].append("Internet Activity")
# Add sources and purposes from inventory
disclosure["sources"] = list(set(
source
for cat in self.data_inventory
for source in cat.sources
))
disclosure["purposes"] = list(set(
purpose
for cat in self.data_inventory
for purpose in cat.purposes
))
disclosure["third_parties"] = list(set(
party
for cat in self.data_inventory
for party in cat.third_parties_shared
))
return disclosureRight to Delete Implementation
def _handle_right_to_delete(self, request: Dict) -> Dict:
"""Handle Right to Delete request"""
consumer_id = request["consumer_id"]
email = request["email"]
deletion_log = {
"request_id": request["request_id"],
"deleted_at": datetime.utcnow().isoformat(),
"data_deleted": [],
"data_retained": [],
"service_providers_notified": []
}
if consumer_id:
# Delete from each table (or anonymize where deletion isn't possible)
# User profile - anonymize
self.db.execute("""
UPDATE users SET
name = 'Deleted User',
email = %s,
phone = NULL,
address = NULL,
deleted_at = %s
WHERE id = %s
""", (
f"deleted-{consumer_id}@deleted.example.com",
datetime.utcnow(),
consumer_id
))
deletion_log["data_deleted"].append("User profile (anonymized)")
# Activity logs - delete
self.db.execute(
"DELETE FROM activity_logs WHERE user_id = %s",
(consumer_id,)
)
deletion_log["data_deleted"].append("Activity logs")
# Marketing preferences - delete
self.db.execute(
"DELETE FROM marketing_preferences WHERE user_id = %s",
(consumer_id,)
)
deletion_log["data_deleted"].append("Marketing preferences")
# Orders - retain for legal compliance but anonymize PII
self.db.execute("""
UPDATE orders SET
shipping_name = 'Deleted',
shipping_address = 'Deleted',
billing_name = 'Deleted',
billing_address = 'Deleted'
WHERE user_id = %s
""", (consumer_id,))
deletion_log["data_retained"].append({
"category": "Order records",
"reason": "Legal/tax compliance - 7 year retention required",
"anonymized": True
})
# Notify service providers
service_providers = [
{"name": "Analytics Provider", "notified": True},
{"name": "Email Service", "notified": True},
{"name": "Payment Processor", "notified": True}
]
deletion_log["service_providers_notified"] = service_providers
return deletion_logDo Not Sell/Share Implementation
Opt-Out Mechanism
# opt_out_handler.py
class OptOutHandler:
"""Handle Do Not Sell/Share opt-outs"""
def __init__(self, db):
self.db = db
def process_opt_out(
self,
identifier: str,
identifier_type: str = "email",
opt_out_sale: bool = True,
opt_out_sharing: bool = True
) -> Dict:
"""Process opt-out request"""
# Record opt-out preference
self.db.execute("""
INSERT INTO opt_out_preferences
(identifier, identifier_type, opt_out_sale, opt_out_sharing, recorded_at)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (identifier, identifier_type)
DO UPDATE SET
opt_out_sale = EXCLUDED.opt_out_sale,
opt_out_sharing = EXCLUDED.opt_out_sharing,
recorded_at = EXCLUDED.recorded_at
""", (
identifier,
identifier_type,
opt_out_sale,
opt_out_sharing,
datetime.utcnow()
))
# Apply to known user if exists
if identifier_type == "email":
user = self.db.fetchone(
"SELECT id FROM users WHERE email = %s",
(identifier,)
)
if user:
self.db.execute("""
UPDATE users SET
opt_out_sale = %s,
opt_out_sharing = %s
WHERE id = %s
""", (opt_out_sale, opt_out_sharing, user["id"]))
# Notify ad partners to stop data sharing
if opt_out_sharing:
self._notify_ad_partners(identifier)
return {
"status": "success",
"identifier": identifier,
"opt_out_sale": opt_out_sale,
"opt_out_sharing": opt_out_sharing,
"effective_date": datetime.utcnow().isoformat()
}
def check_opt_out_status(self, identifier: str) -> Dict:
"""Check current opt-out status"""
record = self.db.fetchone("""
SELECT opt_out_sale, opt_out_sharing, recorded_at
FROM opt_out_preferences
WHERE identifier = %s
""", (identifier,))
if record:
return {
"opted_out_sale": record["opt_out_sale"],
"opted_out_sharing": record["opt_out_sharing"],
"since": record["recorded_at"].isoformat()
}
return {
"opted_out_sale": False,
"opted_out_sharing": False,
"since": None
}
def _notify_ad_partners(self, identifier: str):
"""Notify advertising partners of opt-out"""
# Implementation depends on ad partner APIs
pass
def generate_gpc_response_headers(self, gpc_signal: bool) -> Dict:
"""Generate response headers for Global Privacy Control"""
if gpc_signal:
return {
"Sec-GPC": "1",
"X-GPC-Acknowledged": "true"
}
return {}GPC (Global Privacy Control) Handler
# gpc_handler.py
from typing import Optional
class GPCHandler:
"""Handle Global Privacy Control signals"""
def __init__(self, opt_out_handler: OptOutHandler):
self.opt_out = opt_out_handler
def process_request(
self,
request_headers: Dict,
user_identifier: Optional[str] = None,
ip_address: str = None
) -> Dict:
"""Process incoming request with GPC signal"""
gpc_signal = request_headers.get("Sec-GPC") == "1"
result = {
"gpc_detected": gpc_signal,
"action_taken": None
}
if gpc_signal:
# Determine identifier
identifier = user_identifier or ip_address
if identifier:
# Auto-apply opt-out for GPC
self.opt_out.process_opt_out(
identifier=identifier,
identifier_type="email" if user_identifier else "ip",
opt_out_sale=True,
opt_out_sharing=True
)
result["action_taken"] = "opt_out_applied"
result["identifier_used"] = "user_id" if user_identifier else "ip_address"
# Disable third-party tracking for this request
result["tracking_disabled"] = True
result["response_headers"] = {
"X-GPC-Acknowledged": "true"
}
return resultSensitive Personal Information Handling
Limit Use of Sensitive PI
# sensitive_pi_handler.py
class SensitivePIHandler:
"""Handle sensitive personal information per CPRA"""
SENSITIVE_CATEGORIES = [
"social_security_number",
"drivers_license",
"passport_number",
"financial_account",
"precise_geolocation",
"racial_ethnic_origin",
"religious_beliefs",
"union_membership",
"genetic_data",
"biometric_data",
"health_data",
"sex_life_orientation"
]
def __init__(self, db):
self.db = db
def check_limit_preference(self, user_id: str) -> bool:
"""Check if user has limited sensitive PI use"""
pref = self.db.fetchone("""
SELECT limit_sensitive_pi FROM user_privacy_preferences
WHERE user_id = %s
""", (user_id,))
return pref["limit_sensitive_pi"] if pref else False
def set_limit_preference(self, user_id: str, limit: bool = True) -> Dict:
"""Set preference to limit sensitive PI use"""
self.db.execute("""
INSERT INTO user_privacy_preferences (user_id, limit_sensitive_pi, updated_at)
VALUES (%s, %s, %s)
ON CONFLICT (user_id)
DO UPDATE SET limit_sensitive_pi = EXCLUDED.limit_sensitive_pi,
updated_at = EXCLUDED.updated_at
""", (user_id, limit, datetime.utcnow()))
if limit:
# Apply restrictions
self._apply_sensitive_restrictions(user_id)
return {
"user_id": user_id,
"limit_sensitive_pi": limit,
"effective_at": datetime.utcnow().isoformat()
}
def _apply_sensitive_restrictions(self, user_id: str):
"""Apply restrictions on sensitive PI use"""
# Stop precise geolocation collection
self.db.execute("""
UPDATE user_settings
SET collect_precise_location = false
WHERE user_id = %s
""", (user_id,))
# Disable sensitive data for profiling
self.db.execute("""
UPDATE user_profiles
SET exclude_from_profiling = true
WHERE user_id = %s
""", (user_id,))
def filter_sensitive_data(self, data: Dict, user_id: str) -> Dict:
"""Filter sensitive data based on user preferences"""
if not self.check_limit_preference(user_id):
return data
# Remove sensitive fields
filtered = data.copy()
for category in self.SENSITIVE_CATEGORIES:
if category in filtered:
del filtered[category]
return filteredPrivacy Notice Components
Do Not Sell Link Component
// DoNotSellButton.tsx
import React, { useState, useEffect } from 'react';
interface OptOutStatus {
opted_out_sale: boolean;
opted_out_sharing: boolean;
since: string | null;
}
export function DoNotSellButton() {
const [status, setStatus] = useState<OptOutStatus | null>(null);
const [loading, setLoading] = useState(false);
useEffect(() => {
// Check current status
fetch('/api/privacy/opt-out-status')
.then(res => res.json())
.then(setStatus);
}, []);
const handleOptOut = async () => {
setLoading(true);
try {
const response = await fetch('/api/privacy/opt-out', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
opt_out_sale: true,
opt_out_sharing: true
})
});
if (response.ok) {
setStatus({
opted_out_sale: true,
opted_out_sharing: true,
since: new Date().toISOString()
});
}
} finally {
setLoading(false);
}
};
if (status?.opted_out_sale) {
return (
<div className="opt-out-status">
<span className="checkmark">✓</span>
You have opted out of the sale/sharing of your personal information.
<br />
<small>Since: {new Date(status.since!).toLocaleDateString()}</small>
</div>
);
}
return (
<button
onClick={handleOptOut}
disabled={loading}
className="do-not-sell-button"
>
{loading ? 'Processing...' : 'Do Not Sell or Share My Personal Information'}
</button>
);
}Summary
CCPA/CPRA compliance requires:
- Data inventory: Know what personal information you collect
- Consumer rights: Implement know, delete, correct, opt-out mechanisms
- Verification: Verify consumer identity before processing requests
- Timelines: Respond within 45 days (extendable to 90)
- Do Not Sell/Share: Prominent opt-out mechanism
- Sensitive PI: Additional controls for sensitive categories
- GPC support: Honor Global Privacy Control signals
Build these technical implementations into your privacy program, and conduct regular audits to maintain compliance as regulations evolve.
Is your AI system compliant with the EU AI Act? Free risk assessment - find out in 2 minutes →