California's privacy laws (CCPA/CPRA) require technical implementations for consumer rights, data handling, and privacy notices. This guide provides practical code for achieving and maintaining compliance.
CCPA/CPRA Requirements Overview
Compliance Framework
# ccpa_compliance_framework.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
class ConsumerRight(Enum):
    """Consumer rights that a privacy request can invoke.

    Each member's value is the string identifier for that right. Members
    tagged "CPRA" are marked in this file as CPRA additions.
    """

    RIGHT_TO_KNOW = "right_to_know"
    RIGHT_TO_DELETE = "right_to_delete"
    RIGHT_TO_CORRECT = "right_to_correct"  # CPRA addition
    RIGHT_TO_PORTABILITY = "right_to_portability"
    RIGHT_TO_OPT_OUT_SALE = "right_to_opt_out_sale"
    RIGHT_TO_OPT_OUT_SHARING = "right_to_opt_out_sharing"  # CPRA
    RIGHT_TO_LIMIT_SENSITIVE = "right_to_limit_sensitive"  # CPRA
class DataCategory(Enum):
    """Categories of personal information used to label inventory entries.

    Each member's value is the string emitted as the category name in
    generated privacy-notice content.
    """

    IDENTIFIERS = "identifiers"
    COMMERCIAL = "commercial_information"
    BIOMETRIC = "biometric_information"
    INTERNET_ACTIVITY = "internet_activity"
    GEOLOCATION = "geolocation_data"
    AUDIO_VISUAL = "audio_visual"
    PROFESSIONAL = "professional_information"
    EDUCATION = "education_information"
    INFERENCES = "inferences"
    SENSITIVE = "sensitive_personal_information"  # CPRA
@dataclass
class PersonalInfoCategory:
    """One data-inventory entry describing a category of personal information."""

    # Which category this entry describes.
    category: DataCategory
    # Human-readable examples of data in the category.
    examples: List[str]
    # Where the data comes from.
    sources: List[str]
    # Why the data is processed.
    purposes: List[str]
    # Recipients the data is disclosed to.
    third_parties_shared: List[str]
    # Free-text retention description (not a machine-readable duration).
    retention_period: str
    # Whether the category is sold.
    is_sold: bool
    # Whether the category is shared (CPRA).
    is_shared: bool  # CPRA
@dataclass
class ConsumerRequest:
    """Record of a single consumer rights request."""

    # Identifier of the request.
    request_id: str
    # Identifier of the consumer the request belongs to.
    consumer_id: str
    # Which right the consumer is exercising.
    request_type: ConsumerRight
    # When the request was submitted.
    submitted_at: datetime
    # Optional timestamp; presumably None until the request is verified -- confirm with callers.
    verified_at: Optional[datetime]
    # Optional timestamp; presumably None until the request is completed -- confirm with callers.
    completed_at: Optional[datetime]
    # Free-text status string.
    status: str
    # Name of the verification method used.
    verification_method: str
    # Optional response payload; defaults to None.
    response_data: Optional[Dict] = None
class CCPAComplianceManager:
    """Manage CCPA/CPRA compliance operations.

    Holds a database handle plus a static inventory of the personal-information
    categories collected, and renders that inventory as privacy-notice content.
    """

    def __init__(self, db_connection):
        """Store the database handle and build the data inventory.

        Args:
            db_connection: Database handle, kept as ``self.db``. Nothing in
                this class queries it.
        """
        self.db = db_connection
        self.data_inventory = self._build_data_inventory()

    def _build_data_inventory(self) -> "List[PersonalInfoCategory]":
        """Build inventory of personal information collected.

        Returns:
            A new list of hard-coded ``PersonalInfoCategory`` entries.
        """
        return [
            PersonalInfoCategory(
                category=DataCategory.IDENTIFIERS,
                examples=["Name", "Email", "IP address", "Account ID"],
                sources=["Directly from consumer", "Automatic collection"],
                purposes=["Provide services", "Customer support", "Security"],
                third_parties_shared=["Service providers", "Analytics"],
                retention_period="Duration of account + 3 years",
                is_sold=False,
                is_shared=True
            ),
            PersonalInfoCategory(
                category=DataCategory.COMMERCIAL,
                examples=["Purchase history", "Products viewed", "Payment info"],
                sources=["Directly from consumer", "Transaction records"],
                purposes=["Order fulfillment", "Recommendations"],
                third_parties_shared=["Payment processors", "Shipping"],
                retention_period="7 years (financial records)",
                is_sold=False,
                is_shared=False
            ),
            PersonalInfoCategory(
                category=DataCategory.INTERNET_ACTIVITY,
                examples=["Browsing history", "Search queries", "Clicks"],
                sources=["Automatic collection via cookies"],
                purposes=["Site improvement", "Personalization"],
                third_parties_shared=["Analytics providers", "Ad networks"],
                retention_period="2 years",
                is_sold=False,
                is_shared=True
            ),
            PersonalInfoCategory(
                category=DataCategory.GEOLOCATION,
                examples=["IP-based location", "GPS coordinates"],
                sources=["Automatic collection"],
                purposes=["Fraud prevention", "Localized content"],
                third_parties_shared=["Fraud prevention services"],
                retention_period="90 days",
                is_sold=False,
                is_shared=False
            ),
            PersonalInfoCategory(
                category=DataCategory.SENSITIVE,
                examples=["Social Security number", "Financial account", "Precise geolocation"],
                sources=["Directly from consumer"],
                purposes=["Identity verification", "Financial services"],
                third_parties_shared=["Verification services"],
                retention_period="As required by law",
                is_sold=False,
                is_shared=False
            )
        ]

    def generate_privacy_notice(self) -> Dict:
        """Generate CCPA-compliant privacy notice content.

        Returns:
            A dict with ``last_updated`` (ISO timestamp of this call),
            ``categories_collected`` (one entry per inventory item),
            ``consumer_rights`` and ``contact``.
        """
        categories = [
            {
                "category": cat.category.value,
                "examples": cat.examples,
                "sources": cat.sources,
                "purposes": cat.purposes,
                "shared_with": cat.third_parties_shared,
                "sold": cat.is_sold,
                # The inventory tracks "shared" separately from "sold";
                # surface it so the notice reports both flags.
                "shared": cat.is_shared,
                "retention": cat.retention_period
            }
            for cat in self.data_inventory
        ]
        return {
            # NOTE(review): this is the generation time, not the time the
            # policy text last changed -- confirm that is what is wanted.
            "last_updated": datetime.utcnow().isoformat(),
            "categories_collected": categories,
            "consumer_rights": [
                {
                    "right": "Right to Know",
                    "description": "Request disclosure of personal information collected"
                },
                {
                    "right": "Right to Delete",
                    "description": "Request deletion of personal information"
                },
                {
                    "right": "Right to Correct",
                    "description": "Request correction of inaccurate information"
                },
                {
                    "right": "Right to Opt-Out of Sale/Sharing",
                    "description": "Direct us not to sell or share your personal information"
                },
                {
                    "right": "Right to Limit Sensitive PI",
                    "description": "Limit use of sensitive personal information"
                },
                {
                    "right": "Right to Non-Discrimination",
                    "description": "Exercise rights without discriminatory treatment"
                }
            ],
            "contact": {
                "email": "privacy@example.com",
                "phone": "1-800-PRIVACY",
                "form_url": "/privacy/request"
            }
        }
# Consumer Rights Request Handling
Request Verification
# consumer_request_handler.py
import hashlib
import json
import secrets
from datetime import datetime, timedelta
from typing import Optional, Dict
class ConsumerRequestHandler:
    """Handle consumer rights requests per CCPA/CPRA.

    ``submit_request`` records a request and e-mails a one-time verification
    link, ``verify_request`` redeems that link, and ``process_request``
    dispatches a verified request to the handler for its request type.
    """

    VERIFICATION_METHODS = ["email", "sms", "knowledge_based", "authorized_agent"]

    def __init__(self, db, email_service, sms_service, data_inventory=None):
        """Store collaborators.

        Args:
            db: Database handle exposing ``fetchone`` and ``execute``.
            email_service: Object exposing ``send(to=, subject=, template=, data=)``.
            sms_service: Stored as ``self.sms``; not used by the methods here.
            data_inventory: Inventory entries read by the right-to-know
                handler. Defaults to the inventory that
                ``CCPAComplianceManager`` builds.
        """
        self.db = db
        self.email = email_service
        self.sms = sms_service
        # The right-to-know handler reads self.data_inventory, so it must
        # exist on every instance.
        if data_inventory is None:
            data_inventory = CCPAComplianceManager(db).data_inventory
        self.data_inventory = data_inventory

    def submit_request(
        self,
        email: str,
        request_type: "ConsumerRight",
        details: Optional[Dict] = None
    ) -> str:
        """Submit new consumer request.

        Args:
            email: Address the request is filed under and the verification
                link is sent to.
            request_type: The right being exercised; its ``.value`` is stored.
            details: Optional extra data, stored as JSON (``{}`` when omitted).

        Returns:
            The generated request ID (``CCPA-<YYYYMMDD>-<8 hex chars>``).
        """
        # One timestamp for both the ID's date part and submitted_at, so the
        # two cannot disagree across midnight.
        submitted_at = datetime.utcnow()
        day = submitted_at.strftime('%Y%m%d')
        request_id = f"CCPA-{day}-{secrets.token_hex(4).upper()}"

        # Look up consumer
        consumer = self.db.fetchone(
            "SELECT id, email, phone FROM users WHERE email = %s",
            (email,)
        )
        if not consumer:
            # Still create request for non-customers
            consumer = {"id": None, "email": email, "phone": None}

        # Create request record
        self.db.execute(
            "\n"
            "INSERT INTO ccpa_requests\n"
            "(request_id, consumer_id, email, request_type, status, submitted_at, details)\n"
            "VALUES (%s, %s, %s, %s, %s, %s, %s)\n",
            (
                request_id,
                consumer.get("id"),
                email,
                request_type.value,
                "pending_verification",
                submitted_at,
                json.dumps(details or {})
            )
        )

        # Initiate verification
        self._send_verification(request_id, email, consumer.get("phone"))
        return request_id

    def _send_verification(
        self,
        request_id: str,
        email: str,
        phone: Optional[str]
    ):
        """Send verification to consumer.

        Only the e-mail channel is implemented; ``phone`` is accepted but
        not used. Only the SHA-256 hash of the code is stored; the plain
        code appears solely in the e-mailed link.
        """
        # Generate verification code
        code = secrets.token_urlsafe(32)
        code_hash = hashlib.sha256(code.encode()).hexdigest()

        # Store verification
        self.db.execute(
            "\n"
            "INSERT INTO request_verifications\n"
            "(request_id, code_hash, method, expires_at)\n"
            "VALUES (%s, %s, %s, %s)\n",
            (
                request_id,
                code_hash,
                "email",
                datetime.utcnow() + timedelta(hours=24)
            )
        )

        # Send email
        verification_url = f"https://example.com/privacy/verify?request={request_id}&code={code}"
        self.email.send(
            to=email,
            subject="Verify Your Privacy Request",
            template="ccpa_verification",
            data={
                "request_id": request_id,
                "verification_url": verification_url,
                "expires_in": "24 hours"
            }
        )

    def verify_request(self, request_id: str, code: str) -> bool:
        """Verify consumer request.

        Returns:
            True when an unexpired verification row matches ``request_id``
            and the hash of ``code``; the request is then marked verified
            and its verification rows are deleted. False otherwise,
            including when either argument is empty or None.
        """
        # A missing code can never match; without this, None.encode() raises.
        if not request_id or not code:
            return False

        code_hash = hashlib.sha256(code.encode()).hexdigest()
        verification = self.db.fetchone(
            "\n"
            "SELECT * FROM request_verifications\n"
            "WHERE request_id = %s AND code_hash = %s AND expires_at > %s\n",
            (request_id, code_hash, datetime.utcnow())
        )
        if not verification:
            return False

        # Update request status
        self.db.execute(
            "\n"
            "UPDATE ccpa_requests\n"
            "SET status = 'verified', verified_at = %s\n"
            "WHERE request_id = %s\n",
            (datetime.utcnow(), request_id)
        )

        # Mark verification as used
        self.db.execute(
            "\n"
            "DELETE FROM request_verifications WHERE request_id = %s\n",
            (request_id,)
        )
        return True

    def process_request(self, request_id: str) -> Dict:
        """Process verified consumer request.

        Returns:
            The handler's result dict, also stored as JSON on the request.

        Raises:
            ValueError: If the request does not exist, is not in status
                ``verified``, or its type has no handler here.
        """
        request = self.db.fetchone(
            "SELECT * FROM ccpa_requests WHERE request_id = %s",
            (request_id,)
        )
        if not request or request["status"] != "verified":
            raise ValueError("Request not found or not verified")

        request_type = ConsumerRight(request["request_type"])
        # Kept as an if/elif chain so a handler attribute is only looked up
        # for the branch that is actually taken.
        if request_type == ConsumerRight.RIGHT_TO_KNOW:
            result = self._handle_right_to_know(request)
        elif request_type == ConsumerRight.RIGHT_TO_DELETE:
            result = self._handle_right_to_delete(request)
        elif request_type == ConsumerRight.RIGHT_TO_CORRECT:
            result = self._handle_right_to_correct(request)
        elif request_type == ConsumerRight.RIGHT_TO_PORTABILITY:
            result = self._handle_right_to_portability(request)
        elif request_type == ConsumerRight.RIGHT_TO_OPT_OUT_SALE:
            result = self._handle_opt_out_sale(request)
        else:
            raise ValueError(f"Unknown request type: {request_type}")

        # Update request
        self.db.execute(
            "\n"
            "UPDATE ccpa_requests\n"
            "SET status = 'completed', completed_at = %s, response_data = %s\n"
            "WHERE request_id = %s\n",
            (datetime.utcnow(), json.dumps(result), request_id)
        )
        return result
# Right to Know Implementation
def _handle_right_to_know(self, request: Dict) -> Dict:
"""Handle Right to Know request - disclose personal information"""
consumer_id = request["consumer_id"]
email = request["email"]
disclosure = {
"request_id": request["request_id"],
"generated_at": datetime.utcnow().isoformat(),
"categories_collected": [],
"specific_pieces": {},
"sources": [],
"purposes": [],
"third_parties": []
}
if consumer_id:
# Collect data from all tables
user_data = self.db.fetchone(
"SELECT * FROM users WHERE id = %s", (consumer_id,)
)
if user_data:
disclosure["specific_pieces"]["account"] = {
"name": user_data.get("name"),
"email": user_data.get("email"),
"phone": user_data.get("phone"),
"address": user_data.get("address"),
"created_at": user_data.get("created_at").isoformat() if user_data.get("created_at") else None
}
disclosure["categories_collected"].append("Identifiers")
disclosure["sources"].append("Directly from consumer")
# Orders
orders = self.db.fetchall(
"SELECT * FROM orders WHERE user_id = %s", (consumer_id,)
)
if orders:
disclosure["specific_pieces"]["orders"] = [
{
"order_id": o["id"],
"date": o["created_at"].isoformat(),
"total": float(o["total"]),
"items_count": o["items_count"]
}
for o in orders
]
disclosure["categories_collected"].append("Commercial Information")
# Activity logs
activity = self.db.fetchall("""
SELECT event_type, page_url, created_at
FROM activity_logs
WHERE user_id = %s
ORDER BY created_at DESC
LIMIT 100
""", (consumer_id,))
if activity:
disclosure["specific_pieces"]["internet_activity"] = [
{
"event": a["event_type"],
"page": a["page_url"],
"timestamp": a["created_at"].isoformat()
}
for a in activity
]
disclosure["categories_collected"].append("Internet Activity")
# Add sources and purposes from inventory
disclosure["sources"] = list(set(
source
for cat in self.data_inventory
for source in cat.sources
))
disclosure["purposes"] = list(set(
purpose
for cat in self.data_inventory
for purpose in cat.purposes
))
disclosure["third_parties"] = list(set(
party
for cat in self.data_inventory
for party in cat.third_parties_shared
))
return disclosureRight to Delete Implementation
def _handle_right_to_delete(self, request: Dict) -> Dict:
"""Handle Right to Delete request"""
consumer_id = request["consumer_id"]
email = request["email"]
deletion_log = {
"request_id": request["request_id"],
"deleted_at": datetime.utcnow().isoformat(),
"data_deleted": [],
"data_retained": [],
"service_providers_notified": []
}
if consumer_id:
# Delete from each table (or anonymize where deletion isn't possible)
# User profile - anonymize
self.db.execute("""
UPDATE users SET
name = 'Deleted User',
email = %s,
phone = NULL,
address = NULL,
deleted_at = %s
WHERE id = %s
""", (
f"deleted-{consumer_id}@deleted.example.com",
datetime.utcnow(),
consumer_id
))
deletion_log["data_deleted"].append("User profile (anonymized)")
# Activity logs - delete
self.db.execute(
"DELETE FROM activity_logs WHERE user_id = %s",
(consumer_id,)
)
deletion_log["data_deleted"].append("Activity logs")
# Marketing preferences - delete
self.db.execute(
"DELETE FROM marketing_preferences WHERE user_id = %s",
(consumer_id,)
)
deletion_log["data_deleted"].append("Marketing preferences")
# Orders - retain for legal compliance but anonymize PII
self.db.execute("""
UPDATE orders SET
shipping_name = 'Deleted',
shipping_address = 'Deleted',
billing_name = 'Deleted',
billing_address = 'Deleted'
WHERE user_id = %s
""", (consumer_id,))
deletion_log["data_retained"].append({
"category": "Order records",
"reason": "Legal/tax compliance - 7 year retention required",
"anonymized": True
})
# Notify service providers
service_providers = [
{"name": "Analytics Provider", "notified": True},
{"name": "Email Service", "notified": True},
{"name": "Payment Processor", "notified": True}
]
deletion_log["service_providers_notified"] = service_providers
return deletion_logDo Not Sell/Share Implementation
Opt-Out Mechanism
# opt_out_handler.py
class OptOutHandler:
    """Handle Do Not Sell/Share opt-outs."""

    def __init__(self, db):
        """Store the database handle (must expose ``execute`` and ``fetchone``)."""
        self.db = db

    def process_opt_out(
        self,
        identifier: str,
        identifier_type: str = "email",
        opt_out_sale: bool = True,
        opt_out_sharing: bool = True
    ) -> Dict:
        """Process opt-out request.

        Upserts the preference keyed on ``(identifier, identifier_type)``.
        For ``identifier_type == "email"`` the flags are also copied onto a
        matching ``users`` row. Ad partners are notified when sharing is
        opted out.

        Returns:
            A summary dict whose ``effective_date`` is the ISO form of the
            stored ``recorded_at`` timestamp.
        """
        # One timestamp, so the returned effective_date equals recorded_at.
        recorded_at = datetime.utcnow()

        # Record opt-out preference
        self.db.execute(
            "\n"
            "INSERT INTO opt_out_preferences\n"
            "(identifier, identifier_type, opt_out_sale, opt_out_sharing, recorded_at)\n"
            "VALUES (%s, %s, %s, %s, %s)\n"
            "ON CONFLICT (identifier, identifier_type)\n"
            "DO UPDATE SET\n"
            "opt_out_sale = EXCLUDED.opt_out_sale,\n"
            "opt_out_sharing = EXCLUDED.opt_out_sharing,\n"
            "recorded_at = EXCLUDED.recorded_at\n",
            (
                identifier,
                identifier_type,
                opt_out_sale,
                opt_out_sharing,
                recorded_at
            )
        )

        # Apply to known user if exists
        if identifier_type == "email":
            user = self.db.fetchone(
                "SELECT id FROM users WHERE email = %s",
                (identifier,)
            )
            if user:
                self.db.execute(
                    "\n"
                    "UPDATE users SET\n"
                    "opt_out_sale = %s,\n"
                    "opt_out_sharing = %s\n"
                    "WHERE id = %s\n",
                    (opt_out_sale, opt_out_sharing, user["id"])
                )

        # Notify ad partners to stop data sharing
        if opt_out_sharing:
            self._notify_ad_partners(identifier)

        return {
            "status": "success",
            "identifier": identifier,
            "opt_out_sale": opt_out_sale,
            "opt_out_sharing": opt_out_sharing,
            "effective_date": recorded_at.isoformat()
        }

    def check_opt_out_status(
        self,
        identifier: str,
        identifier_type: Optional[str] = None
    ) -> Dict:
        """Check current opt-out status.

        Args:
            identifier: The identifier the preference was recorded under.
            identifier_type: When given, only a preference recorded with
                this type is considered. Preferences are keyed on
                ``(identifier, identifier_type)``, so omitting it may match
                a row of a different type.

        Returns:
            ``opted_out_sale``, ``opted_out_sharing`` and ``since`` (ISO
            timestamp), or both flags False and ``since`` None when no
            preference is stored.
        """
        query = (
            "\n"
            "SELECT opt_out_sale, opt_out_sharing, recorded_at\n"
            "FROM opt_out_preferences\n"
            "WHERE identifier = %s\n"
        )
        params = (identifier,)
        if identifier_type is not None:
            query += "AND identifier_type = %s\n"
            params = (identifier, identifier_type)

        record = self.db.fetchone(query, params)
        if record:
            return {
                "opted_out_sale": record["opt_out_sale"],
                "opted_out_sharing": record["opt_out_sharing"],
                "since": record["recorded_at"].isoformat()
            }
        return {
            "opted_out_sale": False,
            "opted_out_sharing": False,
            "since": None
        }

    def _notify_ad_partners(self, identifier: str):
        """Notify advertising partners of opt-out (placeholder; does nothing)."""
        # Implementation depends on ad partner APIs
        pass

    def generate_gpc_response_headers(self, gpc_signal: bool) -> Dict:
        """Generate response headers for Global Privacy Control.

        Returns:
            The acknowledgement headers when ``gpc_signal`` is truthy,
            otherwise an empty dict.
        """
        if gpc_signal:
            # NOTE(review): Sec-GPC is normally a request header; echoing it
            # in a response is kept for compatibility -- confirm a caller
            # needs it.
            return {
                "Sec-GPC": "1",
                "X-GPC-Acknowledged": "true"
            }
        return {}
# GPC (Global Privacy Control) Handler
# gpc_handler.py
from typing import Optional
class GPCHandler:
    """Handle Global Privacy Control signals."""

    def __init__(self, opt_out_handler: "OptOutHandler"):
        """Store the handler used to record opt-outs."""
        self.opt_out = opt_out_handler

    @staticmethod
    def _gpc_enabled(request_headers) -> bool:
        """Return True when the headers carry ``Sec-GPC: 1``.

        HTTP header names are case-insensitive and some servers lower-case
        them, so a plain dict may hold the signal under ``sec-gpc``.
        """
        if not request_headers:
            return False
        value = request_headers.get("Sec-GPC")
        if value is None:
            value = next(
                (v for k, v in request_headers.items()
                 if str(k).lower() == "sec-gpc"),
                None
            )
        return isinstance(value, str) and value.strip() == "1"

    def process_request(
        self,
        request_headers: Dict,
        user_identifier: Optional[str] = None,
        ip_address: Optional[str] = None
    ) -> Dict:
        """Process incoming request with GPC signal.

        Args:
            request_headers: Mapping of request header names to values.
            user_identifier: Identifier of the signed-in user, if any;
                preferred over ``ip_address``.
            ip_address: Fallback identifier for anonymous requests.

        Returns:
            A dict with ``gpc_detected`` and ``action_taken``. When the
            signal is present it also has ``tracking_disabled`` and
            ``response_headers``, plus ``identifier_used`` if an opt-out
            was recorded.
        """
        gpc_signal = self._gpc_enabled(request_headers)
        result = {
            "gpc_detected": gpc_signal,
            "action_taken": None
        }

        if gpc_signal:
            # Determine identifier
            identifier = user_identifier or ip_address
            if identifier:
                # Auto-apply opt-out for GPC
                # NOTE(review): the user identifier is recorded with type
                # "email" but reported below as "user_id" -- confirm which
                # one callers actually pass.
                self.opt_out.process_opt_out(
                    identifier=identifier,
                    identifier_type="email" if user_identifier else "ip",
                    opt_out_sale=True,
                    opt_out_sharing=True
                )
                result["action_taken"] = "opt_out_applied"
                result["identifier_used"] = "user_id" if user_identifier else "ip_address"

            # Disable third-party tracking for this request
            result["tracking_disabled"] = True
            result["response_headers"] = {
                "X-GPC-Acknowledged": "true"
            }
        return result
# Sensitive Personal Information Handling
Limit Use of Sensitive PI
# sensitive_pi_handler.py
class SensitivePIHandler:
    """Handle sensitive personal information per CPRA"""

    # Dict keys treated as sensitive by filter_sensitive_data.
    SENSITIVE_CATEGORIES = [
        "social_security_number",
        "drivers_license",
        "passport_number",
        "financial_account",
        "precise_geolocation",
        "racial_ethnic_origin",
        "religious_beliefs",
        "union_membership",
        "genetic_data",
        "biometric_data",
        "health_data",
        "sex_life_orientation"
    ]

    def __init__(self, db):
        self.db = db

    def check_limit_preference(self, user_id: str) -> bool:
        """Check if user has limited sensitive PI use"""
        row = self.db.fetchone(
            "\n"
            "SELECT limit_sensitive_pi FROM user_privacy_preferences\n"
            "WHERE user_id = %s\n",
            (user_id,),
        )
        # No stored preference means the user has not limited anything.
        if not row:
            return False
        return row["limit_sensitive_pi"]

    def set_limit_preference(self, user_id: str, limit: bool = True) -> Dict:
        """Set preference to limit sensitive PI use"""
        upsert_params = (user_id, limit, datetime.utcnow())
        self.db.execute(
            "\n"
            "INSERT INTO user_privacy_preferences (user_id, limit_sensitive_pi, updated_at)\n"
            "VALUES (%s, %s, %s)\n"
            "ON CONFLICT (user_id)\n"
            "DO UPDATE SET limit_sensitive_pi = EXCLUDED.limit_sensitive_pi,\n"
            "updated_at = EXCLUDED.updated_at\n",
            upsert_params,
        )

        # Restrictions are applied when limiting; turning the limit off
        # does not undo them here.
        if limit:
            self._apply_sensitive_restrictions(user_id)

        summary = {"user_id": user_id, "limit_sensitive_pi": limit}
        summary["effective_at"] = datetime.utcnow().isoformat()
        return summary

    def _apply_sensitive_restrictions(self, user_id: str):
        """Apply restrictions on sensitive PI use"""
        who = (user_id,)

        # Stop precise geolocation collection
        self.db.execute(
            "\n"
            "UPDATE user_settings\n"
            "SET collect_precise_location = false\n"
            "WHERE user_id = %s\n",
            who,
        )

        # Disable sensitive data for profiling
        self.db.execute(
            "\n"
            "UPDATE user_profiles\n"
            "SET exclude_from_profiling = true\n"
            "WHERE user_id = %s\n",
            who,
        )

    def filter_sensitive_data(self, data: Dict, user_id: str) -> Dict:
        """Filter sensitive data based on user preferences"""
        if self.check_limit_preference(user_id):
            # Copy first so the caller's dict is left untouched, then drop
            # every top-level key that names a sensitive category.
            kept = data.copy()
            for key in self.SENSITIVE_CATEGORIES:
                kept.pop(key, None)
            return kept
        return data
# Privacy Notice Components
Do Not Sell Link Component
// DoNotSellButton.tsx
import React, { useState, useEffect } from 'react';
/** Opt-out state as returned by the opt-out status endpoint. */
interface OptOutStatus {
  /** True when the user has opted out of the sale of their data. */
  opted_out_sale: boolean;
  /** True when the user has opted out of the sharing of their data. */
  opted_out_sharing: boolean;
  /** Timestamp string of the opt-out, or null when there is none. */
  since: string | null;
}
/**
 * "Do Not Sell or Share" control.
 *
 * Loads the current opt-out status on mount. If the user has already opted
 * out of sale it shows a confirmation; otherwise it shows a button that
 * posts an opt-out for both sale and sharing.
 */
export function DoNotSellButton() {
  const [status, setStatus] = useState<OptOutStatus | null>(null);
  const [loading, setLoading] = useState(false);

  useEffect(() => {
    // Abort the request on unmount so state is never set on a component
    // that is gone.
    const controller = new AbortController();

    // Check current status
    fetch('/api/privacy/opt-out-status', { signal: controller.signal })
      .then(res => {
        // An error response body is not an OptOutStatus; do not store it.
        if (!res.ok) {
          throw new Error(`Opt-out status request failed: ${res.status}`);
        }
        return res.json();
      })
      .then(setStatus)
      .catch(err => {
        if (err?.name !== 'AbortError') {
          console.error('Failed to load opt-out status', err);
        }
      });

    return () => controller.abort();
  }, []);

  const handleOptOut = async () => {
    setLoading(true);
    try {
      const response = await fetch('/api/privacy/opt-out', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          opt_out_sale: true,
          opt_out_sharing: true
        })
      });
      if (response.ok) {
        setStatus({
          opted_out_sale: true,
          opted_out_sharing: true,
          since: new Date().toISOString()
        });
      }
    } catch (err) {
      // A network failure leaves the button in place so the user can retry.
      console.error('Opt-out request failed', err);
    } finally {
      setLoading(false);
    }
  };

  if (status?.opted_out_sale) {
    return (
      <div className="opt-out-status">
        <span className="checkmark">✓</span>
        You have opted out of the sale/sharing of your personal information.
        {/* `since` may be null; new Date(null) would render 1 Jan 1970. */}
        {status.since && (
          <>
            <br />
            <small>Since: {new Date(status.since).toLocaleDateString()}</small>
          </>
        )}
      </div>
    );
  }

  return (
    <button
      onClick={handleOptOut}
      disabled={loading}
      className="do-not-sell-button"
    >
      {loading ? 'Processing...' : 'Do Not Sell or Share My Personal Information'}
    </button>
  );
}
// Summary
CCPA/CPRA compliance requires:
- Data inventory: Know what personal information you collect
- Consumer rights: Implement know, delete, correct, opt-out mechanisms
- Verification: Verify consumer identity before processing requests
- Timelines: Respond within 45 days (extendable to 90)
- Do Not Sell/Share: Prominent opt-out mechanism
- Sensitive PI: Additional controls for sensitive categories
- GPC support: Honor Global Privacy Control signals
Build these technical implementations into your privacy program, and conduct regular audits to maintain compliance as regulations evolve.