FedRAMP Compliance for Cloud Services: Technical Implementation Guide
The Federal Risk and Authorization Management Program (FedRAMP) provides a standardized approach to security assessment for cloud services used by federal agencies. This guide covers technical implementation of FedRAMP controls and the authorization process.
Understanding FedRAMP
FedRAMP builds on NIST SP 800-53 controls with three impact levels:
- Low Impact: 125 controls for systems with low confidentiality, integrity, and availability requirements
- Moderate Impact: 325 controls for systems where loss would have serious adverse effects
- High Impact: 421 controls for systems where loss would have severe or catastrophic effects
Security Control Implementation
Access Control (AC Family)
# fedramp_access_control.py
"""
FedRAMP Access Control (AC) family implementation.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set
from datetime import datetime, timedelta
from enum import Enum
import hashlib
import secrets
import json
class ClearanceLevel(Enum):
"""Security clearance levels."""
PUBLIC = 0
CONTROLLED = 1
SECRET = 2
TOP_SECRET = 3
class AccessDecision(Enum):
"""Access control decision."""
ALLOW = "allow"
DENY = "deny"
REQUIRE_MFA = "require_mfa"
REQUIRE_APPROVAL = "require_approval"
@dataclass
class User:
"""User account with FedRAMP attributes."""
user_id: str
username: str
email: str
clearance_level: ClearanceLevel
roles: Set[str]
organization: str
last_login: Optional[datetime]
account_created: datetime
account_expires: Optional[datetime]
mfa_enabled: bool
mfa_methods: List[str]
training_completed: Dict[str, datetime]
access_agreements: Dict[str, datetime]
login_failures: int = 0
locked_until: Optional[datetime] = None
@dataclass
class Resource:
"""Protected resource."""
resource_id: str
resource_type: str
classification: ClearanceLevel
required_roles: Set[str]
required_training: List[str]
requires_mfa: bool
audit_all_access: bool
class FedRAMPAccessControl:
"""
Implements FedRAMP Access Control family.
Controls implemented:
- AC-1: Policy and Procedures
- AC-2: Account Management
- AC-3: Access Enforcement
- AC-4: Information Flow Enforcement
- AC-5: Separation of Duties
- AC-6: Least Privilege
- AC-7: Unsuccessful Login Attempts
- AC-8: System Use Notification
- AC-11: Session Lock
- AC-12: Session Termination
- AC-17: Remote Access
"""
# AC-7: Unsuccessful login attempt limits
MAX_LOGIN_FAILURES = 3
LOCKOUT_DURATION_MINUTES = 30
# AC-11: Session lock timeout
SESSION_LOCK_TIMEOUT = timedelta(minutes=15)
# AC-12: Session termination
MAX_SESSION_DURATION = timedelta(hours=8)
def __init__(self, audit_logger):
self.users: Dict[str, User] = {}
self.resources: Dict[str, Resource] = {}
self.sessions: Dict[str, dict] = {}
self.audit = audit_logger
# AC-5: Separation of duties matrix
self.incompatible_roles: List[tuple] = [
("system_admin", "security_auditor"),
("developer", "production_deployer"),
("data_owner", "data_custodian"),
]
def create_account(
self,
user_data: dict,
created_by: str,
justification: str
) -> User:
"""
AC-2: Account Management - Create account with required attributes.
"""
# Validate required fields
required = ['username', 'email', 'clearance_level', 'roles', 'organization']
for field in required:
if field not in user_data:
raise ValueError(f"Missing required field: {field}")
# AC-5: Check for incompatible role combinations
roles = set(user_data['roles'])
for role1, role2 in self.incompatible_roles:
if role1 in roles and role2 in roles:
raise ValueError(f"Incompatible roles: {role1} and {role2}")
user = User(
user_id=f"USR-{secrets.token_hex(8)}",
username=user_data['username'],
email=user_data['email'],
clearance_level=ClearanceLevel[user_data['clearance_level']],
roles=roles,
organization=user_data['organization'],
last_login=None,
account_created=datetime.utcnow(),
account_expires=datetime.utcnow() + timedelta(days=365),
mfa_enabled=False,
mfa_methods=[],
training_completed={},
access_agreements={}
)
self.users[user.user_id] = user
# Audit logging
self.audit.log({
'event_type': 'account_created',
'user_id': user.user_id,
'created_by': created_by,
'justification': justification,
'timestamp': datetime.utcnow().isoformat()
})
return user
def check_access(
self,
user_id: str,
resource_id: str,
action: str,
context: dict
) -> AccessDecision:
"""
AC-3: Access Enforcement - Enforce access policy.
"""
user = self.users.get(user_id)
resource = self.resources.get(resource_id)
if not user or not resource:
self._audit_access_denied(user_id, resource_id, "invalid_user_or_resource")
return AccessDecision.DENY
# Check account status
if user.locked_until and user.locked_until > datetime.utcnow():
self._audit_access_denied(user_id, resource_id, "account_locked")
return AccessDecision.DENY
if user.account_expires and user.account_expires < datetime.utcnow():
self._audit_access_denied(user_id, resource_id, "account_expired")
return AccessDecision.DENY
# AC-6: Least privilege - check clearance level
if user.clearance_level.value < resource.classification.value:
self._audit_access_denied(user_id, resource_id, "insufficient_clearance")
return AccessDecision.DENY
# Check role requirements
if not user.roles.intersection(resource.required_roles):
self._audit_access_denied(user_id, resource_id, "missing_role")
return AccessDecision.DENY
# Check training requirements
for training in resource.required_training:
if training not in user.training_completed:
self._audit_access_denied(user_id, resource_id, f"missing_training:{training}")
return AccessDecision.DENY
# Check training currency (annual recertification)
training_date = user.training_completed[training]
if datetime.utcnow() - training_date > timedelta(days=365):
self._audit_access_denied(user_id, resource_id, f"expired_training:{training}")
return AccessDecision.DENY
# AC-17: Remote access - require MFA
if context.get('remote_access', False) or resource.requires_mfa:
if not user.mfa_enabled:
return AccessDecision.REQUIRE_MFA
# Access granted
self._audit_access_granted(user_id, resource_id, action)
return AccessDecision.ALLOW
def record_login_failure(self, user_id: str, source_ip: str):
"""
AC-7: Unsuccessful Login Attempts.
"""
user = self.users.get(user_id)
if not user:
return
user.login_failures += 1
self.audit.log({
'event_type': 'login_failure',
'user_id': user_id,
'source_ip': source_ip,
'failure_count': user.login_failures,
'timestamp': datetime.utcnow().isoformat()
})
if user.login_failures >= self.MAX_LOGIN_FAILURES:
user.locked_until = datetime.utcnow() + timedelta(
minutes=self.LOCKOUT_DURATION_MINUTES
)
self.audit.log({
'event_type': 'account_locked',
'user_id': user_id,
'reason': 'max_login_failures',
'locked_until': user.locked_until.isoformat(),
'timestamp': datetime.utcnow().isoformat()
})
def record_login_success(self, user_id: str, source_ip: str) -> str:
"""Record successful login and create session."""
user = self.users.get(user_id)
if not user:
raise ValueError("User not found")
user.login_failures = 0
user.last_login = datetime.utcnow()
# Create session
session_id = secrets.token_urlsafe(32)
self.sessions[session_id] = {
'user_id': user_id,
'created_at': datetime.utcnow(),
'last_activity': datetime.utcnow(),
'source_ip': source_ip,
'locked': False
}
self.audit.log({
'event_type': 'login_success',
'user_id': user_id,
'source_ip': source_ip,
'session_id': session_id,
'timestamp': datetime.utcnow().isoformat()
})
return session_id
def check_session(self, session_id: str) -> tuple:
"""
AC-11 & AC-12: Session management.
"""
session = self.sessions.get(session_id)
if not session:
return False, "invalid_session"
now = datetime.utcnow()
# AC-12: Check maximum session duration
if now - session['created_at'] > self.MAX_SESSION_DURATION:
self._terminate_session(session_id, "max_duration_exceeded")
return False, "session_expired"
# AC-11: Check for session lock
if now - session['last_activity'] > self.SESSION_LOCK_TIMEOUT:
session['locked'] = True
return False, "session_locked"
if session['locked']:
return False, "session_locked"
# Update activity
session['last_activity'] = now
return True, "valid"
def unlock_session(self, session_id: str, mfa_verified: bool) -> bool:
"""Unlock a locked session after MFA verification."""
session = self.sessions.get(session_id)
if not session:
return False
if not mfa_verified:
return False
session['locked'] = False
session['last_activity'] = datetime.utcnow()
self.audit.log({
'event_type': 'session_unlocked',
'session_id': session_id,
'user_id': session['user_id'],
'timestamp': datetime.utcnow().isoformat()
})
return True
def _terminate_session(self, session_id: str, reason: str):
"""Terminate a session."""
session = self.sessions.pop(session_id, None)
if session:
self.audit.log({
'event_type': 'session_terminated',
'session_id': session_id,
'user_id': session['user_id'],
'reason': reason,
'timestamp': datetime.utcnow().isoformat()
})
def _audit_access_denied(self, user_id: str, resource_id: str, reason: str):
"""Log access denial."""
self.audit.log({
'event_type': 'access_denied',
'user_id': user_id,
'resource_id': resource_id,
'reason': reason,
'timestamp': datetime.utcnow().isoformat()
})
def _audit_access_granted(self, user_id: str, resource_id: str, action: str):
"""Log access grant."""
self.audit.log({
'event_type': 'access_granted',
'user_id': user_id,
'resource_id': resource_id,
'action': action,
'timestamp': datetime.utcnow().isoformat()
})
def get_system_use_notification(self) -> str:
"""
AC-8: System Use Notification.
"""
return """
*******************************************************************************
* U.S. GOVERNMENT SYSTEM *
*******************************************************************************
This is a U.S. Government information system. This system is provided for
authorized use only. By using this system, you acknowledge that:
1. You have no reasonable expectation of privacy regarding any communication
or data transiting or stored on this system.
2. At any time, and for any lawful Government purpose, the Government may
monitor, intercept, and search any communication or data transiting or
stored on this system.
3. Any communication or data transiting or stored on this system may be
disclosed or used for any lawful Government purpose.
4. Unauthorized use of this system is prohibited and may be subject to
criminal and civil penalties.
By proceeding, you consent to these terms.
*******************************************************************************
"""Audit and Accountability (AU Family)
# fedramp_audit.py
"""
FedRAMP Audit and Accountability (AU) family implementation.
"""
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from enum import Enum
import json
import hashlib
import gzip
import os
class AuditEventType(Enum):
"""Types of auditable events per AU-2."""
LOGIN_SUCCESS = "login_success"
LOGIN_FAILURE = "login_failure"
LOGOUT = "logout"
ACCESS_GRANTED = "access_granted"
ACCESS_DENIED = "access_denied"
PRIVILEGE_ESCALATION = "privilege_escalation"
ADMIN_ACTION = "admin_action"
SECURITY_CONFIG_CHANGE = "security_config_change"
DATA_ACCESS = "data_access"
DATA_MODIFICATION = "data_modification"
DATA_DELETION = "data_deletion"
SYSTEM_STARTUP = "system_startup"
SYSTEM_SHUTDOWN = "system_shutdown"
AUDIT_LOG_ACCESS = "audit_log_access"
@dataclass
class AuditRecord:
"""
Audit record structure per AU-3.
Required content:
- Type of event
- When the event occurred
- Where the event occurred
- Source of the event
- Outcome of the event
- Identity of individuals/subjects/objects
"""
record_id: str
event_type: AuditEventType
timestamp: datetime
source_system: str
source_component: str
source_ip: Optional[str]
user_id: Optional[str]
subject_type: str
object_id: Optional[str]
object_type: Optional[str]
action: str
outcome: str # success, failure
outcome_reason: Optional[str]
additional_data: Dict[str, Any]
integrity_hash: str
class FedRAMPAuditLogger:
"""
Implements FedRAMP Audit family controls.
Controls:
- AU-2: Audit Events
- AU-3: Content of Audit Records
- AU-4: Audit Storage Capacity
- AU-5: Response to Audit Processing Failures
- AU-6: Audit Review, Analysis, and Reporting
- AU-7: Audit Reduction and Report Generation
- AU-8: Time Stamps
- AU-9: Protection of Audit Information
- AU-10: Non-repudiation
- AU-11: Audit Record Retention
- AU-12: Audit Generation
"""
# AU-11: Retention period (minimum 90 days online, 1 year total)
ONLINE_RETENTION_DAYS = 90
TOTAL_RETENTION_DAYS = 365
def __init__(
self,
storage_backend,
alert_callback=None,
backup_backend=None
):
self.storage = storage_backend
self.alert_callback = alert_callback
self.backup_storage = backup_backend
self.previous_hash = "0" * 64
self.sequence_number = 0
self.failed_writes = 0
self.max_failed_writes = 5
def log(self, event_data: dict) -> str:
"""
AU-12: Audit Generation.
"""
# AU-8: Add trusted timestamp
timestamp = datetime.utcnow()
record = AuditRecord(
record_id=self._generate_record_id(),
event_type=AuditEventType(event_data.get('event_type', 'admin_action')),
timestamp=timestamp,
source_system=event_data.get('source_system', 'unknown'),
source_component=event_data.get('source_component', 'unknown'),
source_ip=event_data.get('source_ip'),
user_id=event_data.get('user_id'),
subject_type=event_data.get('subject_type', 'user'),
object_id=event_data.get('object_id'),
object_type=event_data.get('object_type'),
action=event_data.get('action', 'unknown'),
outcome=event_data.get('outcome', 'success'),
outcome_reason=event_data.get('outcome_reason'),
additional_data=event_data.get('additional_data', {}),
integrity_hash="" # Will be set below
)
# AU-10: Non-repudiation - create integrity hash
record.integrity_hash = self._calculate_integrity_hash(record)
# AU-12: Write to audit log
try:
self._write_record(record)
self.failed_writes = 0
except Exception as e:
self._handle_write_failure(record, e)
return record.record_id
def _write_record(self, record: AuditRecord):
"""Write audit record with integrity protection."""
record_dict = {
'record_id': record.record_id,
'event_type': record.event_type.value,
'timestamp': record.timestamp.isoformat(),
'source_system': record.source_system,
'source_component': record.source_component,
'source_ip': record.source_ip,
'user_id': record.user_id,
'subject_type': record.subject_type,
'object_id': record.object_id,
'object_type': record.object_type,
'action': record.action,
'outcome': record.outcome,
'outcome_reason': record.outcome_reason,
'additional_data': record.additional_data,
'integrity_hash': record.integrity_hash,
'previous_hash': self.previous_hash,
'sequence': self.sequence_number
}
self.storage.write(record_dict)
# Update chain
self.previous_hash = record.integrity_hash
self.sequence_number += 1
# AU-4: Check storage capacity
self._check_storage_capacity()
def _handle_write_failure(self, record: AuditRecord, error: Exception):
"""
AU-5: Response to Audit Processing Failures.
"""
self.failed_writes += 1
# Try backup storage
if self.backup_storage:
try:
self.backup_storage.write(record.__dict__)
except:
pass
# Alert on failures
if self.alert_callback:
self.alert_callback({
'alert_type': 'audit_write_failure',
'record_id': record.record_id,
'error': str(error),
'failed_writes': self.failed_writes
})
# AU-5(2): Generate real-time alert for critical
if self.failed_writes >= self.max_failed_writes:
if self.alert_callback:
self.alert_callback({
'alert_type': 'audit_system_critical',
'message': 'Audit logging has failed multiple times',
'failed_writes': self.failed_writes,
'severity': 'critical'
})
def _calculate_integrity_hash(self, record: AuditRecord) -> str:
"""
AU-9 & AU-10: Protect audit information and ensure non-repudiation.
"""
data_to_hash = json.dumps({
'record_id': record.record_id,
'event_type': record.event_type.value,
'timestamp': record.timestamp.isoformat(),
'user_id': record.user_id,
'action': record.action,
'outcome': record.outcome,
'previous_hash': self.previous_hash,
'sequence': self.sequence_number
}, sort_keys=True)
return hashlib.sha256(data_to_hash.encode()).hexdigest()
def _check_storage_capacity(self):
"""
AU-4: Audit Storage Capacity monitoring.
"""
capacity = self.storage.get_capacity()
if capacity['used_percent'] > 90:
if self.alert_callback:
self.alert_callback({
'alert_type': 'audit_storage_warning',
'used_percent': capacity['used_percent'],
'severity': 'warning'
})
if capacity['used_percent'] > 95:
# Archive old records
self._archive_old_records()
def _archive_old_records(self):
"""Archive records older than online retention."""
cutoff = datetime.utcnow() - timedelta(days=self.ONLINE_RETENTION_DAYS)
old_records = self.storage.get_records_before(cutoff)
if self.backup_storage:
for record in old_records:
self.backup_storage.archive(record)
self.storage.delete_records_before(cutoff)
def verify_integrity(self, start_sequence: int = 0) -> dict:
"""
AU-9: Verify audit log integrity.
"""
records = self.storage.get_all_records(start_sequence)
issues = []
prev_hash = "0" * 64 if start_sequence == 0 else None
prev_seq = start_sequence - 1
for record in records:
# Verify sequence
if record['sequence'] != prev_seq + 1:
issues.append({
'type': 'sequence_gap',
'expected': prev_seq + 1,
'found': record['sequence'],
'record_id': record['record_id']
})
# Verify chain
if prev_hash and record.get('previous_hash') != prev_hash:
issues.append({
'type': 'chain_break',
'record_id': record['record_id'],
'expected_prev': prev_hash,
'found_prev': record.get('previous_hash')
})
# Verify record hash
expected_hash = self._recalculate_hash(record)
if record['integrity_hash'] != expected_hash:
issues.append({
'type': 'hash_mismatch',
'record_id': record['record_id']
})
prev_hash = record['integrity_hash']
prev_seq = record['sequence']
return {
'verified': len(issues) == 0,
'records_checked': len(records),
'issues': issues
}
def _recalculate_hash(self, record: dict) -> str:
"""Recalculate hash for verification."""
data_to_hash = json.dumps({
'record_id': record['record_id'],
'event_type': record['event_type'],
'timestamp': record['timestamp'],
'user_id': record['user_id'],
'action': record['action'],
'outcome': record['outcome'],
'previous_hash': record['previous_hash'],
'sequence': record['sequence']
}, sort_keys=True)
return hashlib.sha256(data_to_hash.encode()).hexdigest()
def _generate_record_id(self) -> str:
"""Generate unique record ID."""
import secrets
return f"AUD-{datetime.utcnow().strftime('%Y%m%d')}-{secrets.token_hex(6)}"
def search_records(
self,
filters: dict,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: int = 1000
) -> List[dict]:
"""
AU-7: Audit Reduction and Report Generation.
"""
return self.storage.search(
filters=filters,
start_date=start_date,
end_date=end_date,
limit=limit
)
def generate_report(
self,
report_type: str,
start_date: datetime,
end_date: datetime
) -> dict:
"""
AU-6: Audit Review, Analysis, and Reporting.
"""
records = self.search_records(
filters={},
start_date=start_date,
end_date=end_date,
limit=100000
)
report = {
'report_type': report_type,
'generated_at': datetime.utcnow().isoformat(),
'period': {
'start': start_date.isoformat(),
'end': end_date.isoformat()
},
'total_events': len(records),
'events_by_type': {},
'events_by_outcome': {'success': 0, 'failure': 0},
'unique_users': set(),
'anomalies': []
}
for record in records:
event_type = record.get('event_type', 'unknown')
report['events_by_type'][event_type] = report['events_by_type'].get(event_type, 0) + 1
outcome = record.get('outcome', 'unknown')
if outcome in report['events_by_outcome']:
report['events_by_outcome'][outcome] += 1
if record.get('user_id'):
report['unique_users'].add(record['user_id'])
report['unique_users'] = len(report['unique_users'])
# Detect anomalies
failure_rate = report['events_by_outcome']['failure'] / max(report['total_events'], 1)
if failure_rate > 0.1:
report['anomalies'].append({
'type': 'high_failure_rate',
'value': failure_rate,
'threshold': 0.1
})
return reportContinuous Monitoring
# continuous_monitoring.py
"""
FedRAMP Continuous Monitoring implementation.
"""
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum
class MonitoringFrequency(Enum):
CONTINUOUS = "continuous"
DAILY = "daily"
WEEKLY = "weekly"
MONTHLY = "monthly"
QUARTERLY = "quarterly"
ANNUAL = "annual"
@dataclass
class ControlAssessment:
"""Assessment result for a security control."""
control_id: str
assessment_date: datetime
status: str # satisfied, partially_satisfied, not_satisfied
findings: List[str]
evidence: List[str]
assessor: str
next_assessment: datetime
class ContinuousMonitoring:
"""
Implements FedRAMP Continuous Monitoring requirements.
"""
# Assessment frequencies by control family
ASSESSMENT_SCHEDULE = {
'AC': MonitoringFrequency.MONTHLY, # Access Control
'AU': MonitoringFrequency.MONTHLY, # Audit
'CA': MonitoringFrequency.ANNUAL, # Assessment
'CM': MonitoringFrequency.MONTHLY, # Configuration
'CP': MonitoringFrequency.ANNUAL, # Contingency Planning
'IA': MonitoringFrequency.MONTHLY, # Identification/Auth
'IR': MonitoringFrequency.QUARTERLY, # Incident Response
'MA': MonitoringFrequency.QUARTERLY, # Maintenance
'MP': MonitoringFrequency.QUARTERLY, # Media Protection
'PE': MonitoringFrequency.ANNUAL, # Physical
'PL': MonitoringFrequency.ANNUAL, # Planning
'PS': MonitoringFrequency.QUARTERLY, # Personnel
'RA': MonitoringFrequency.QUARTERLY, # Risk Assessment
'SA': MonitoringFrequency.ANNUAL, # System Acquisition
'SC': MonitoringFrequency.MONTHLY, # System Communications
'SI': MonitoringFrequency.MONTHLY, # System Integrity
}
def __init__(self, conmon_storage, alert_service):
self.storage = conmon_storage
self.alerts = alert_service
self.assessments: Dict[str, ControlAssessment] = {}
def assess_control(
self,
control_id: str,
assessor: str,
evidence: List[str]
) -> ControlAssessment:
"""Perform control assessment."""
control_family = control_id.split('-')[0]
frequency = self.ASSESSMENT_SCHEDULE.get(control_family, MonitoringFrequency.QUARTERLY)
# Run automated checks
findings = self._run_automated_checks(control_id)
# Determine status
if not findings:
status = "satisfied"
elif any(f.get('severity') == 'high' for f in findings):
status = "not_satisfied"
else:
status = "partially_satisfied"
assessment = ControlAssessment(
control_id=control_id,
assessment_date=datetime.utcnow(),
status=status,
findings=[f['description'] for f in findings],
evidence=evidence,
assessor=assessor,
next_assessment=self._calculate_next_assessment(frequency)
)
self.assessments[control_id] = assessment
self.storage.save_assessment(assessment)
# Generate POA&M items for findings
if status != "satisfied":
self._create_poam_items(control_id, findings)
return assessment
def _run_automated_checks(self, control_id: str) -> List[dict]:
"""Run automated security checks for a control."""
findings = []
# Example checks by control
if control_id.startswith('AC-2'):
findings.extend(self._check_account_management())
elif control_id.startswith('AC-7'):
findings.extend(self._check_login_lockout())
elif control_id.startswith('AU-'):
findings.extend(self._check_audit_logging())
elif control_id.startswith('CM-'):
findings.extend(self._check_configuration_management())
elif control_id.startswith('SI-'):
findings.extend(self._check_system_integrity())
return findings
def _check_account_management(self) -> List[dict]:
"""AC-2 automated checks."""
findings = []
# Check for inactive accounts
inactive_threshold = datetime.utcnow() - timedelta(days=90)
# Would query user database
inactive_count = 0 # Placeholder
if inactive_count > 0:
findings.append({
'description': f'{inactive_count} accounts inactive for >90 days',
'severity': 'medium',
'control': 'AC-2(3)'
})
return findings
def _check_login_lockout(self) -> List[dict]:
"""AC-7 automated checks."""
findings = []
# Check lockout configuration
return findings
def _check_audit_logging(self) -> List[dict]:
"""AU family automated checks."""
findings = []
# Verify audit logging is operational
return findings
def _check_configuration_management(self) -> List[dict]:
"""CM family automated checks."""
findings = []
# Check for configuration drift
return findings
def _check_system_integrity(self) -> List[dict]:
"""SI family automated checks."""
findings = []
# Check for vulnerabilities, malware
return findings
def _calculate_next_assessment(self, frequency: MonitoringFrequency) -> datetime:
"""Calculate next assessment date."""
now = datetime.utcnow()
if frequency == MonitoringFrequency.CONTINUOUS:
return now + timedelta(hours=1)
elif frequency == MonitoringFrequency.DAILY:
return now + timedelta(days=1)
elif frequency == MonitoringFrequency.WEEKLY:
return now + timedelta(weeks=1)
elif frequency == MonitoringFrequency.MONTHLY:
return now + timedelta(days=30)
elif frequency == MonitoringFrequency.QUARTERLY:
return now + timedelta(days=90)
else: # ANNUAL
return now + timedelta(days=365)
def _create_poam_items(self, control_id: str, findings: List[dict]):
"""Create POA&M items for findings."""
for finding in findings:
self.storage.create_poam({
'control_id': control_id,
'weakness': finding['description'],
'severity': finding['severity'],
'identified_date': datetime.utcnow().isoformat(),
'status': 'open'
})
def generate_conmon_report(self) -> dict:
"""Generate monthly continuous monitoring report."""
return {
'report_period': {
'start': (datetime.utcnow() - timedelta(days=30)).isoformat(),
'end': datetime.utcnow().isoformat()
},
'control_status': self._summarize_control_status(),
'vulnerability_summary': self._get_vulnerability_summary(),
'poam_summary': self._get_poam_summary(),
'significant_changes': self._get_significant_changes(),
'incidents': self._get_incident_summary()
}
def _summarize_control_status(self) -> dict:
"""Summarize control assessment status."""
summary = {'satisfied': 0, 'partially_satisfied': 0, 'not_satisfied': 0}
for assessment in self.assessments.values():
summary[assessment.status] = summary.get(assessment.status, 0) + 1
return summary
def _get_vulnerability_summary(self) -> dict:
"""Get vulnerability scan summary."""
return {
'critical': 0,
'high': 0,
'medium': 0,
'low': 0,
'scan_date': datetime.utcnow().isoformat()
}
def _get_poam_summary(self) -> dict:
"""Get POA&M status summary."""
return {
'open': 0,
'in_progress': 0,
'closed_this_period': 0,
'overdue': 0
}
def _get_significant_changes(self) -> List[dict]:
"""Get significant system changes."""
return []
def _get_incident_summary(self) -> dict:
"""Get security incident summary."""
return {
'total_incidents': 0,
'by_severity': {}
}Conclusion
FedRAMP compliance requires comprehensive implementation of NIST 800-53 controls with continuous monitoring:
- Access control with MFA, session management, and least privilege
- Audit logging with integrity protection and retention
- Continuous monitoring with automated assessments
- POA&M management for tracking and remediating findings
Organizations should leverage automation wherever possible to maintain compliance efficiently.