Files
2026-03-12 15:17:52 +07:00

1109 lines
49 KiB
Python

#!/usr/bin/env python3
"""
Rollback Generator - Generate comprehensive rollback procedures for migrations
This tool takes a migration plan and generates detailed rollback procedures for each phase,
including data rollback scripts, service rollback steps, validation checks, and communication
templates to ensure safe and reliable migration reversals.
Author: Migration Architect Skill
Version: 1.0.0
License: MIT
"""
import json
import argparse
import sys
import datetime
import hashlib
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
class RollbackTrigger(Enum):
"""Types of rollback triggers"""
MANUAL = "manual"
AUTOMATED = "automated"
THRESHOLD_BASED = "threshold_based"
TIME_BASED = "time_based"
class RollbackUrgency(Enum):
"""Rollback urgency levels"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
EMERGENCY = "emergency"
@dataclass
class RollbackStep:
"""Individual rollback step"""
step_id: str
name: str
description: str
script_type: str # sql, bash, api, manual
script_content: str
estimated_duration_minutes: int
dependencies: List[str]
validation_commands: List[str]
success_criteria: List[str]
failure_escalation: str
rollback_order: int
@dataclass
class RollbackPhase:
"""Rollback phase containing multiple steps"""
phase_name: str
description: str
urgency_level: str
estimated_duration_minutes: int
prerequisites: List[str]
steps: List[RollbackStep]
validation_checkpoints: List[str]
communication_requirements: List[str]
risk_level: str
@dataclass
class RollbackTriggerCondition:
"""Conditions that trigger automatic rollback"""
trigger_id: str
name: str
condition: str
metric_threshold: Optional[Dict[str, Any]]
evaluation_window_minutes: int
auto_execute: bool
escalation_contacts: List[str]
@dataclass
class DataRecoveryPlan:
"""Data recovery and restoration plan"""
recovery_method: str # backup_restore, point_in_time, event_replay
backup_location: str
recovery_scripts: List[str]
data_validation_queries: List[str]
estimated_recovery_time_minutes: int
recovery_dependencies: List[str]
@dataclass
class CommunicationTemplate:
"""Communication template for rollback scenarios"""
template_type: str # start, progress, completion, escalation
audience: str # technical, business, executive, customers
subject: str
body: str
urgency: str
delivery_methods: List[str]
@dataclass
class RollbackRunbook:
"""Complete rollback runbook"""
runbook_id: str
migration_id: str
created_at: str
rollback_phases: List[RollbackPhase]
trigger_conditions: List[RollbackTriggerCondition]
data_recovery_plan: DataRecoveryPlan
communication_templates: List[CommunicationTemplate]
escalation_matrix: Dict[str, Any]
validation_checklist: List[str]
post_rollback_procedures: List[str]
emergency_contacts: List[Dict[str, str]]
class RollbackGenerator:
"""Main rollback generator class"""
def __init__(self):
self.rollback_templates = self._load_rollback_templates()
self.validation_templates = self._load_validation_templates()
self.communication_templates = self._load_communication_templates()
def _load_rollback_templates(self) -> Dict[str, Any]:
"""Load rollback script templates for different migration types"""
return {
"database": {
"schema_rollback": {
"drop_table": "DROP TABLE IF EXISTS {table_name};",
"drop_column": "ALTER TABLE {table_name} DROP COLUMN IF EXISTS {column_name};",
"restore_column": "ALTER TABLE {table_name} ADD COLUMN {column_definition};",
"revert_type": "ALTER TABLE {table_name} ALTER COLUMN {column_name} TYPE {original_type};",
"drop_constraint": "ALTER TABLE {table_name} DROP CONSTRAINT {constraint_name};",
"add_constraint": "ALTER TABLE {table_name} ADD CONSTRAINT {constraint_name} {constraint_definition};"
},
"data_rollback": {
"restore_backup": "pg_restore -d {database_name} -c {backup_file}",
"point_in_time_recovery": "SELECT pg_create_restore_point('pre_migration_{timestamp}');",
"delete_migrated_data": "DELETE FROM {table_name} WHERE migration_batch_id = '{batch_id}';",
"restore_original_values": "UPDATE {table_name} SET {column_name} = backup_{column_name} WHERE migration_flag = true;"
}
},
"service": {
"deployment_rollback": {
"rollback_blue_green": "kubectl patch service {service_name} -p '{\"spec\":{\"selector\":{\"version\":\"blue\"}}}'",
"rollback_canary": "kubectl scale deployment {service_name}-canary --replicas=0",
"restore_previous_version": "kubectl rollout undo deployment/{service_name} --to-revision={revision_number}",
"update_load_balancer": "aws elbv2 modify-rule --rule-arn {rule_arn} --actions Type=forward,TargetGroupArn={original_target_group}"
},
"configuration_rollback": {
"restore_config_map": "kubectl apply -f {original_config_file}",
"revert_feature_flags": "curl -X PUT {feature_flag_api}/flags/{flag_name} -d '{\"enabled\": false}'",
"restore_environment_vars": "kubectl set env deployment/{deployment_name} {env_var_name}={original_value}"
}
},
"infrastructure": {
"cloud_rollback": {
"revert_terraform": "terraform apply -target={resource_name} {rollback_plan_file}",
"restore_dns": "aws route53 change-resource-record-sets --hosted-zone-id {zone_id} --change-batch file://{rollback_dns_changes}",
"rollback_security_groups": "aws ec2 authorize-security-group-ingress --group-id {group_id} --protocol {protocol} --port {port} --cidr {cidr}",
"restore_iam_policies": "aws iam put-role-policy --role-name {role_name} --policy-name {policy_name} --policy-document file://{original_policy}"
},
"network_rollback": {
"restore_routing": "aws ec2 replace-route --route-table-id {route_table_id} --destination-cidr-block {cidr} --gateway-id {original_gateway}",
"revert_load_balancer": "aws elbv2 modify-load-balancer --load-balancer-arn {lb_arn} --scheme {original_scheme}",
"restore_firewall_rules": "aws ec2 revoke-security-group-ingress --group-id {group_id} --protocol {protocol} --port {port} --source-group {source_group}"
}
}
}
def _load_validation_templates(self) -> Dict[str, List[str]]:
"""Load validation command templates"""
return {
"database": [
"SELECT COUNT(*) FROM {table_name};",
"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}';",
"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = '{column_name}';",
"SELECT COUNT(DISTINCT {primary_key}) FROM {table_name};",
"SELECT MAX({timestamp_column}) FROM {table_name};"
],
"service": [
"curl -f {health_check_url}",
"kubectl get pods -l app={service_name} --field-selector=status.phase=Running",
"kubectl logs deployment/{service_name} --tail=100 | grep -i error",
"curl -f {service_endpoint}/api/v1/status"
],
"infrastructure": [
"aws ec2 describe-instances --instance-ids {instance_id} --query 'Reservations[*].Instances[*].State.Name'",
"nslookup {domain_name}",
"curl -I {load_balancer_url}",
"aws elbv2 describe-target-health --target-group-arn {target_group_arn}"
]
}
def _load_communication_templates(self) -> Dict[str, Dict[str, str]]:
"""Load communication templates"""
return {
"rollback_start": {
"technical": {
"subject": "ROLLBACK INITIATED: {migration_name}",
"body": """Team,
We have initiated rollback for migration: {migration_name}
Rollback ID: {rollback_id}
Start Time: {start_time}
Estimated Duration: {estimated_duration}
Reason: {rollback_reason}
Current Status: Rolling back phase {current_phase}
Next Updates: Every 15 minutes or upon phase completion
Actions Required:
- Monitor system health dashboards
- Stand by for escalation if needed
- Do not make manual changes during rollback
Incident Commander: {incident_commander}
"""
},
"business": {
"subject": "System Rollback In Progress - {system_name}",
"body": """Business Stakeholders,
We are currently performing a planned rollback of the {system_name} migration due to {rollback_reason}.
Impact: {business_impact}
Expected Resolution: {estimated_completion_time}
Affected Services: {affected_services}
We will provide updates every 30 minutes.
Contact: {business_contact}
"""
},
"executive": {
"subject": "EXEC ALERT: Critical System Rollback - {system_name}",
"body": """Executive Team,
A critical rollback is in progress for {system_name}.
Summary:
- Rollback Reason: {rollback_reason}
- Business Impact: {business_impact}
- Expected Resolution: {estimated_completion_time}
- Customer Impact: {customer_impact}
We are following established procedures and will update hourly.
Escalation: {escalation_contact}
"""
}
},
"rollback_complete": {
"technical": {
"subject": "ROLLBACK COMPLETED: {migration_name}",
"body": """Team,
Rollback has been successfully completed for migration: {migration_name}
Summary:
- Start Time: {start_time}
- End Time: {end_time}
- Duration: {actual_duration}
- Phases Completed: {completed_phases}
Validation Results:
{validation_results}
System Status: {system_status}
Next Steps:
- Continue monitoring for 24 hours
- Post-rollback review scheduled for {review_date}
- Root cause analysis to begin
All clear to resume normal operations.
Incident Commander: {incident_commander}
"""
}
}
}
def generate_rollback_runbook(self, migration_plan: Dict[str, Any]) -> RollbackRunbook:
"""Generate comprehensive rollback runbook from migration plan"""
runbook_id = f"rb_{hashlib.md5(str(migration_plan).encode()).hexdigest()[:8]}"
migration_id = migration_plan.get("migration_id", "unknown")
migration_type = migration_plan.get("migration_type", "unknown")
# Generate rollback phases (reverse order of migration phases)
rollback_phases = self._generate_rollback_phases(migration_plan)
# Generate trigger conditions
trigger_conditions = self._generate_trigger_conditions(migration_plan)
# Generate data recovery plan
data_recovery_plan = self._generate_data_recovery_plan(migration_plan)
# Generate communication templates
communication_templates = self._generate_communication_templates(migration_plan)
# Generate escalation matrix
escalation_matrix = self._generate_escalation_matrix(migration_plan)
# Generate validation checklist
validation_checklist = self._generate_validation_checklist(migration_plan)
# Generate post-rollback procedures
post_rollback_procedures = self._generate_post_rollback_procedures(migration_plan)
# Generate emergency contacts
emergency_contacts = self._generate_emergency_contacts(migration_plan)
return RollbackRunbook(
runbook_id=runbook_id,
migration_id=migration_id,
created_at=datetime.datetime.now().isoformat(),
rollback_phases=rollback_phases,
trigger_conditions=trigger_conditions,
data_recovery_plan=data_recovery_plan,
communication_templates=communication_templates,
escalation_matrix=escalation_matrix,
validation_checklist=validation_checklist,
post_rollback_procedures=post_rollback_procedures,
emergency_contacts=emergency_contacts
)
def _generate_rollback_phases(self, migration_plan: Dict[str, Any]) -> List[RollbackPhase]:
"""Generate rollback phases from migration plan"""
migration_phases = migration_plan.get("phases", [])
migration_type = migration_plan.get("migration_type", "unknown")
rollback_phases = []
# Reverse the order of migration phases for rollback
for i, phase in enumerate(reversed(migration_phases)):
if isinstance(phase, dict):
phase_name = phase.get("name", f"phase_{i}")
phase_duration = phase.get("duration_hours", 2) * 60 # Convert to minutes
phase_risk = phase.get("risk_level", "medium")
else:
phase_name = str(phase)
phase_duration = 120 # Default 2 hours
phase_risk = "medium"
rollback_steps = self._generate_rollback_steps(phase_name, migration_type, i)
rollback_phase = RollbackPhase(
phase_name=f"rollback_{phase_name}",
description=f"Rollback changes made during {phase_name} phase",
urgency_level=self._calculate_urgency(phase_risk),
estimated_duration_minutes=phase_duration // 2, # Rollback typically faster
prerequisites=self._get_rollback_prerequisites(phase_name, i),
steps=rollback_steps,
validation_checkpoints=self._get_validation_checkpoints(phase_name, migration_type),
communication_requirements=self._get_communication_requirements(phase_name, phase_risk),
risk_level=phase_risk
)
rollback_phases.append(rollback_phase)
return rollback_phases
def _generate_rollback_steps(self, phase_name: str, migration_type: str, phase_index: int) -> List[RollbackStep]:
"""Generate specific rollback steps for a phase"""
steps = []
templates = self.rollback_templates.get(migration_type, {})
if migration_type == "database":
if "migration" in phase_name.lower() or "cutover" in phase_name.lower():
# Data rollback steps
steps.extend([
RollbackStep(
step_id=f"rb_data_{phase_index}_01",
name="Stop data migration processes",
description="Halt all ongoing data migration processes",
script_type="sql",
script_content="-- Stop migration processes\nSELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE query LIKE '%migration%';",
estimated_duration_minutes=5,
dependencies=[],
validation_commands=["SELECT COUNT(*) FROM pg_stat_activity WHERE query LIKE '%migration%';"],
success_criteria=["No active migration processes"],
failure_escalation="Contact DBA immediately",
rollback_order=1
),
RollbackStep(
step_id=f"rb_data_{phase_index}_02",
name="Restore from backup",
description="Restore database from pre-migration backup",
script_type="bash",
script_content=templates.get("data_rollback", {}).get("restore_backup", "pg_restore -d {database_name} -c {backup_file}"),
estimated_duration_minutes=30,
dependencies=[f"rb_data_{phase_index}_01"],
validation_commands=["SELECT COUNT(*) FROM information_schema.tables;"],
success_criteria=["Database restored successfully", "All expected tables present"],
failure_escalation="Escalate to senior DBA and infrastructure team",
rollback_order=2
)
])
if "preparation" in phase_name.lower():
# Schema rollback steps
steps.append(
RollbackStep(
step_id=f"rb_schema_{phase_index}_01",
name="Drop migration artifacts",
description="Remove temporary migration tables and procedures",
script_type="sql",
script_content="-- Drop migration artifacts\nDROP TABLE IF EXISTS migration_log;\nDROP PROCEDURE IF EXISTS migrate_data();",
estimated_duration_minutes=5,
dependencies=[],
validation_commands=["SELECT COUNT(*) FROM information_schema.tables WHERE table_name LIKE '%migration%';"],
success_criteria=["No migration artifacts remain"],
failure_escalation="Manual cleanup required",
rollback_order=1
)
)
elif migration_type == "service":
if "cutover" in phase_name.lower():
# Service rollback steps
steps.extend([
RollbackStep(
step_id=f"rb_service_{phase_index}_01",
name="Redirect traffic back to old service",
description="Update load balancer to route traffic back to previous service version",
script_type="bash",
script_content=templates.get("deployment_rollback", {}).get("update_load_balancer", "aws elbv2 modify-rule --rule-arn {rule_arn} --actions Type=forward,TargetGroupArn={original_target_group}"),
estimated_duration_minutes=2,
dependencies=[],
validation_commands=["curl -f {health_check_url}"],
success_criteria=["Traffic routing to original service", "Health checks passing"],
failure_escalation="Emergency procedure - manual traffic routing",
rollback_order=1
),
RollbackStep(
step_id=f"rb_service_{phase_index}_02",
name="Rollback service deployment",
description="Revert to previous service deployment version",
script_type="bash",
script_content=templates.get("deployment_rollback", {}).get("restore_previous_version", "kubectl rollout undo deployment/{service_name} --to-revision={revision_number}"),
estimated_duration_minutes=10,
dependencies=[f"rb_service_{phase_index}_01"],
validation_commands=["kubectl get pods -l app={service_name} --field-selector=status.phase=Running"],
success_criteria=["Previous version deployed", "All pods running"],
failure_escalation="Manual pod management required",
rollback_order=2
)
])
elif migration_type == "infrastructure":
steps.extend([
RollbackStep(
step_id=f"rb_infra_{phase_index}_01",
name="Revert infrastructure changes",
description="Apply terraform plan to revert infrastructure to previous state",
script_type="bash",
script_content=templates.get("cloud_rollback", {}).get("revert_terraform", "terraform apply -target={resource_name} {rollback_plan_file}"),
estimated_duration_minutes=15,
dependencies=[],
validation_commands=["terraform plan -detailed-exitcode"],
success_criteria=["Infrastructure matches previous state", "No planned changes"],
failure_escalation="Manual infrastructure review required",
rollback_order=1
),
RollbackStep(
step_id=f"rb_infra_{phase_index}_02",
name="Restore DNS configuration",
description="Revert DNS changes to point back to original infrastructure",
script_type="bash",
script_content=templates.get("cloud_rollback", {}).get("restore_dns", "aws route53 change-resource-record-sets --hosted-zone-id {zone_id} --change-batch file://{rollback_dns_changes}"),
estimated_duration_minutes=10,
dependencies=[f"rb_infra_{phase_index}_01"],
validation_commands=["nslookup {domain_name}"],
success_criteria=["DNS resolves to original endpoints"],
failure_escalation="Contact DNS administrator",
rollback_order=2
)
])
# Add generic validation step for all migration types
steps.append(
RollbackStep(
step_id=f"rb_validate_{phase_index}_final",
name="Validate rollback completion",
description=f"Comprehensive validation that {phase_name} rollback completed successfully",
script_type="manual",
script_content="Execute validation checklist for this phase",
estimated_duration_minutes=10,
dependencies=[step.step_id for step in steps],
validation_commands=self.validation_templates.get(migration_type, []),
success_criteria=[f"{phase_name} fully rolled back", "All validation checks pass"],
failure_escalation=f"Investigate {phase_name} rollback failures",
rollback_order=99
)
)
return steps
def _generate_trigger_conditions(self, migration_plan: Dict[str, Any]) -> List[RollbackTriggerCondition]:
"""Generate automatic rollback trigger conditions"""
triggers = []
migration_type = migration_plan.get("migration_type", "unknown")
# Generic triggers for all migration types
triggers.extend([
RollbackTriggerCondition(
trigger_id="error_rate_spike",
name="Error Rate Spike",
condition="error_rate > baseline * 5 for 5 minutes",
metric_threshold={
"metric": "error_rate",
"operator": "greater_than",
"value": "baseline_error_rate * 5",
"duration_minutes": 5
},
evaluation_window_minutes=5,
auto_execute=True,
escalation_contacts=["on_call_engineer", "migration_lead"]
),
RollbackTriggerCondition(
trigger_id="response_time_degradation",
name="Response Time Degradation",
condition="p95_response_time > baseline * 3 for 10 minutes",
metric_threshold={
"metric": "p95_response_time",
"operator": "greater_than",
"value": "baseline_p95 * 3",
"duration_minutes": 10
},
evaluation_window_minutes=10,
auto_execute=False,
escalation_contacts=["performance_team", "migration_lead"]
),
RollbackTriggerCondition(
trigger_id="availability_drop",
name="Service Availability Drop",
condition="availability < 95% for 2 minutes",
metric_threshold={
"metric": "availability",
"operator": "less_than",
"value": 0.95,
"duration_minutes": 2
},
evaluation_window_minutes=2,
auto_execute=True,
escalation_contacts=["sre_team", "incident_commander"]
)
])
# Migration-type specific triggers
if migration_type == "database":
triggers.extend([
RollbackTriggerCondition(
trigger_id="data_integrity_failure",
name="Data Integrity Check Failure",
condition="data_validation_failures > 0",
metric_threshold={
"metric": "data_validation_failures",
"operator": "greater_than",
"value": 0,
"duration_minutes": 1
},
evaluation_window_minutes=1,
auto_execute=True,
escalation_contacts=["dba_team", "data_team"]
),
RollbackTriggerCondition(
trigger_id="migration_progress_stalled",
name="Migration Progress Stalled",
condition="migration_progress unchanged for 30 minutes",
metric_threshold={
"metric": "migration_progress_rate",
"operator": "equals",
"value": 0,
"duration_minutes": 30
},
evaluation_window_minutes=30,
auto_execute=False,
escalation_contacts=["migration_team", "dba_team"]
)
])
elif migration_type == "service":
triggers.extend([
RollbackTriggerCondition(
trigger_id="cpu_utilization_spike",
name="CPU Utilization Spike",
condition="cpu_utilization > 90% for 15 minutes",
metric_threshold={
"metric": "cpu_utilization",
"operator": "greater_than",
"value": 0.90,
"duration_minutes": 15
},
evaluation_window_minutes=15,
auto_execute=False,
escalation_contacts=["devops_team", "infrastructure_team"]
),
RollbackTriggerCondition(
trigger_id="memory_leak_detected",
name="Memory Leak Detected",
condition="memory_usage increasing continuously for 20 minutes",
metric_threshold={
"metric": "memory_growth_rate",
"operator": "greater_than",
"value": "1MB/minute",
"duration_minutes": 20
},
evaluation_window_minutes=20,
auto_execute=True,
escalation_contacts=["development_team", "sre_team"]
)
])
return triggers
def _generate_data_recovery_plan(self, migration_plan: Dict[str, Any]) -> DataRecoveryPlan:
"""Generate data recovery plan"""
migration_type = migration_plan.get("migration_type", "unknown")
if migration_type == "database":
return DataRecoveryPlan(
recovery_method="point_in_time",
backup_location="/backups/pre_migration_{migration_id}_{timestamp}.sql",
recovery_scripts=[
"pg_restore -d production -c /backups/pre_migration_backup.sql",
"SELECT pg_create_restore_point('rollback_point');",
"VACUUM ANALYZE; -- Refresh statistics after restore"
],
data_validation_queries=[
"SELECT COUNT(*) FROM critical_business_table;",
"SELECT MAX(created_at) FROM audit_log;",
"SELECT COUNT(DISTINCT user_id) FROM user_sessions;",
"SELECT SUM(amount) FROM financial_transactions WHERE date = CURRENT_DATE;"
],
estimated_recovery_time_minutes=45,
recovery_dependencies=["database_instance_running", "backup_file_accessible"]
)
else:
return DataRecoveryPlan(
recovery_method="backup_restore",
backup_location="/backups/pre_migration_state",
recovery_scripts=[
"# Restore configuration files from backup",
"cp -r /backups/pre_migration_state/config/* /app/config/",
"# Restart services with previous configuration",
"systemctl restart application_service"
],
data_validation_queries=[
"curl -f http://localhost:8080/health",
"curl -f http://localhost:8080/api/status"
],
estimated_recovery_time_minutes=20,
recovery_dependencies=["service_stopped", "backup_accessible"]
)
def _generate_communication_templates(self, migration_plan: Dict[str, Any]) -> List[CommunicationTemplate]:
"""Generate communication templates for rollback scenarios"""
templates = []
base_templates = self.communication_templates
# Rollback start notifications
for audience in ["technical", "business", "executive"]:
if audience in base_templates["rollback_start"]:
template_data = base_templates["rollback_start"][audience]
templates.append(CommunicationTemplate(
template_type="rollback_start",
audience=audience,
subject=template_data["subject"],
body=template_data["body"],
urgency="high" if audience == "executive" else "medium",
delivery_methods=["email", "slack"] if audience == "technical" else ["email"]
))
# Rollback completion notifications
for audience in ["technical", "business"]:
if audience in base_templates.get("rollback_complete", {}):
template_data = base_templates["rollback_complete"][audience]
templates.append(CommunicationTemplate(
template_type="rollback_complete",
audience=audience,
subject=template_data["subject"],
body=template_data["body"],
urgency="medium",
delivery_methods=["email", "slack"] if audience == "technical" else ["email"]
))
# Emergency escalation template
templates.append(CommunicationTemplate(
template_type="emergency_escalation",
audience="executive",
subject="CRITICAL: Rollback Emergency - {migration_name}",
body="""CRITICAL SITUATION - IMMEDIATE ATTENTION REQUIRED
Migration: {migration_name}
Issue: Rollback procedure has encountered critical failures
Current Status: {current_status}
Failed Components: {failed_components}
Business Impact: {business_impact}
Customer Impact: {customer_impact}
Immediate Actions:
1. Emergency response team activated
2. {emergency_action_1}
3. {emergency_action_2}
War Room: {war_room_location}
Bridge Line: {conference_bridge}
Next Update: {next_update_time}
Incident Commander: {incident_commander}
Executive On-Call: {executive_on_call}
""",
urgency="emergency",
delivery_methods=["email", "sms", "phone_call"]
))
return templates
def _generate_escalation_matrix(self, migration_plan: Dict[str, Any]) -> Dict[str, Any]:
"""Generate escalation matrix for different failure scenarios"""
return {
"level_1": {
"trigger": "Single component failure",
"response_time_minutes": 5,
"contacts": ["on_call_engineer", "migration_lead"],
"actions": ["Investigate issue", "Attempt automated remediation", "Monitor closely"]
},
"level_2": {
"trigger": "Multiple component failures or single critical failure",
"response_time_minutes": 2,
"contacts": ["senior_engineer", "team_lead", "devops_lead"],
"actions": ["Initiate rollback", "Establish war room", "Notify stakeholders"]
},
"level_3": {
"trigger": "System-wide failure or data corruption",
"response_time_minutes": 1,
"contacts": ["engineering_manager", "cto", "incident_commander"],
"actions": ["Emergency rollback", "All hands on deck", "Executive notification"]
},
"emergency": {
"trigger": "Business-critical failure with customer impact",
"response_time_minutes": 0,
"contacts": ["ceo", "cto", "head_of_operations"],
"actions": ["Emergency procedures", "Customer communication", "Media preparation if needed"]
}
}
def _generate_validation_checklist(self, migration_plan: Dict[str, Any]) -> List[str]:
"""Generate comprehensive validation checklist"""
migration_type = migration_plan.get("migration_type", "unknown")
base_checklist = [
"Verify system is responding to health checks",
"Confirm error rates are within normal parameters",
"Validate response times meet SLA requirements",
"Check all critical business processes are functioning",
"Verify monitoring and alerting systems are operational",
"Confirm no data corruption has occurred",
"Validate security controls are functioning properly",
"Check backup systems are working correctly",
"Verify integration points with downstream systems",
"Confirm user authentication and authorization working"
]
if migration_type == "database":
base_checklist.extend([
"Validate database schema matches expected state",
"Confirm referential integrity constraints",
"Check database performance metrics",
"Verify data consistency across related tables",
"Validate indexes and statistics are optimal",
"Confirm transaction logs are clean",
"Check database connections and connection pooling"
])
elif migration_type == "service":
base_checklist.extend([
"Verify service discovery is working correctly",
"Confirm load balancing is distributing traffic properly",
"Check service-to-service communication",
"Validate API endpoints are responding correctly",
"Confirm feature flags are in correct state",
"Check resource utilization (CPU, memory, disk)",
"Verify container orchestration is healthy"
])
elif migration_type == "infrastructure":
base_checklist.extend([
"Verify network connectivity between components",
"Confirm DNS resolution is working correctly",
"Check firewall rules and security groups",
"Validate load balancer configuration",
"Confirm SSL/TLS certificates are valid",
"Check storage systems are accessible",
"Verify backup and disaster recovery systems"
])
return base_checklist
def _generate_post_rollback_procedures(self, migration_plan: Dict[str, Any]) -> List[str]:
"""Generate post-rollback procedures"""
return [
"Monitor system stability for 24-48 hours post-rollback",
"Conduct thorough post-rollback testing of all critical paths",
"Review and analyze rollback metrics and timing",
"Document lessons learned and rollback procedure improvements",
"Schedule post-mortem meeting with all stakeholders",
"Update rollback procedures based on actual experience",
"Communicate rollback completion to all stakeholders",
"Archive rollback logs and artifacts for future reference",
"Review and update monitoring thresholds if needed",
"Plan for next migration attempt with improved procedures",
"Conduct security review to ensure no vulnerabilities introduced",
"Update disaster recovery procedures if affected by rollback",
"Review capacity planning based on rollback resource usage",
"Update documentation with rollback experience and timings"
]
def _generate_emergency_contacts(self, migration_plan: Dict[str, Any]) -> List[Dict[str, str]]:
"""Generate emergency contact list"""
return [
{
"role": "Incident Commander",
"name": "TBD - Assigned during migration",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "incident.commander@company.com",
"backup_contact": "backup.commander@company.com"
},
{
"role": "Technical Lead",
"name": "TBD - Migration technical owner",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "tech.lead@company.com",
"backup_contact": "senior.engineer@company.com"
},
{
"role": "Business Owner",
"name": "TBD - Business stakeholder",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "business.owner@company.com",
"backup_contact": "product.manager@company.com"
},
{
"role": "On-Call Engineer",
"name": "Current on-call rotation",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "oncall@company.com",
"backup_contact": "backup.oncall@company.com"
},
{
"role": "Executive Escalation",
"name": "CTO/VP Engineering",
"primary_phone": "+1-XXX-XXX-XXXX",
"email": "cto@company.com",
"backup_contact": "vp.engineering@company.com"
}
]
def _calculate_urgency(self, risk_level: str) -> str:
"""Calculate rollback urgency based on risk level"""
risk_to_urgency = {
"low": "low",
"medium": "medium",
"high": "high",
"critical": "emergency"
}
return risk_to_urgency.get(risk_level, "medium")
def _get_rollback_prerequisites(self, phase_name: str, phase_index: int) -> List[str]:
"""Get prerequisites for rollback phase"""
prerequisites = [
"Incident commander assigned and briefed",
"All team members notified of rollback initiation",
"Monitoring systems confirmed operational",
"Backup systems verified and accessible"
]
if phase_index > 0:
prerequisites.append("Previous rollback phase completed successfully")
if "cutover" in phase_name.lower():
prerequisites.extend([
"Traffic redirection capabilities confirmed",
"Load balancer configuration backed up",
"DNS changes prepared for quick execution"
])
if "data" in phase_name.lower() or "migration" in phase_name.lower():
prerequisites.extend([
"Database backup verified and accessible",
"Data validation queries prepared",
"Database administrator on standby"
])
return prerequisites
def _get_validation_checkpoints(self, phase_name: str, migration_type: str) -> List[str]:
"""Get validation checkpoints for rollback phase"""
checkpoints = [
f"{phase_name} rollback steps completed",
"System health checks passing",
"No critical errors in logs",
"Key metrics within acceptable ranges"
]
validation_commands = self.validation_templates.get(migration_type, [])
checkpoints.extend([f"Validation command passed: {cmd[:50]}..." for cmd in validation_commands[:3]])
return checkpoints
def _get_communication_requirements(self, phase_name: str, risk_level: str) -> List[str]:
"""Get communication requirements for rollback phase"""
base_requirements = [
"Notify incident commander of phase start/completion",
"Update rollback status dashboard",
"Log all actions and decisions"
]
if risk_level in ["high", "critical"]:
base_requirements.extend([
"Notify all stakeholders of phase progress",
"Update executive team if rollback extends beyond expected time",
"Prepare customer communication if needed"
])
if "cutover" in phase_name.lower():
base_requirements.append("Immediate notification when traffic is redirected")
return base_requirements
def generate_human_readable_runbook(self, runbook: RollbackRunbook) -> str:
"""Generate human-readable rollback runbook"""
output = []
output.append("=" * 80)
output.append(f"ROLLBACK RUNBOOK: {runbook.runbook_id}")
output.append("=" * 80)
output.append(f"Migration ID: {runbook.migration_id}")
output.append(f"Created: {runbook.created_at}")
output.append("")
# Emergency Contacts
output.append("EMERGENCY CONTACTS")
output.append("-" * 40)
for contact in runbook.emergency_contacts:
output.append(f"{contact['role']}: {contact['name']}")
output.append(f" Phone: {contact['primary_phone']}")
output.append(f" Email: {contact['email']}")
output.append(f" Backup: {contact['backup_contact']}")
output.append("")
# Escalation Matrix
output.append("ESCALATION MATRIX")
output.append("-" * 40)
for level, details in runbook.escalation_matrix.items():
output.append(f"{level.upper()}:")
output.append(f" Trigger: {details['trigger']}")
output.append(f" Response Time: {details['response_time_minutes']} minutes")
output.append(f" Contacts: {', '.join(details['contacts'])}")
output.append(f" Actions: {', '.join(details['actions'])}")
output.append("")
# Rollback Trigger Conditions
output.append("AUTOMATIC ROLLBACK TRIGGERS")
output.append("-" * 40)
for trigger in runbook.trigger_conditions:
output.append(f"{trigger.name}")
output.append(f" Condition: {trigger.condition}")
output.append(f" Auto-Execute: {'Yes' if trigger.auto_execute else 'No'}")
output.append(f" Evaluation Window: {trigger.evaluation_window_minutes} minutes")
output.append(f" Contacts: {', '.join(trigger.escalation_contacts)}")
output.append("")
# Rollback Phases
output.append("ROLLBACK PHASES")
output.append("-" * 40)
for i, phase in enumerate(runbook.rollback_phases, 1):
output.append(f"{i}. {phase.phase_name.upper()}")
output.append(f" Description: {phase.description}")
output.append(f" Urgency: {phase.urgency_level.upper()}")
output.append(f" Duration: {phase.estimated_duration_minutes} minutes")
output.append(f" Risk Level: {phase.risk_level.upper()}")
if phase.prerequisites:
output.append(" Prerequisites:")
for prereq in phase.prerequisites:
output.append(f"{prereq}")
output.append(" Steps:")
for step in sorted(phase.steps, key=lambda x: x.rollback_order):
output.append(f" {step.rollback_order}. {step.name}")
output.append(f" Duration: {step.estimated_duration_minutes} min")
output.append(f" Type: {step.script_type}")
if step.script_content and step.script_type != "manual":
output.append(" Script:")
for line in step.script_content.split('\n')[:3]: # Show first 3 lines
output.append(f" {line}")
if len(step.script_content.split('\n')) > 3:
output.append(" ...")
output.append(f" Success Criteria: {', '.join(step.success_criteria)}")
output.append("")
if phase.validation_checkpoints:
output.append(" Validation Checkpoints:")
for checkpoint in phase.validation_checkpoints:
output.append(f"{checkpoint}")
output.append("")
# Data Recovery Plan
output.append("DATA RECOVERY PLAN")
output.append("-" * 40)
drp = runbook.data_recovery_plan
output.append(f"Recovery Method: {drp.recovery_method}")
output.append(f"Backup Location: {drp.backup_location}")
output.append(f"Estimated Recovery Time: {drp.estimated_recovery_time_minutes} minutes")
output.append("Recovery Scripts:")
for script in drp.recovery_scripts:
output.append(f"{script}")
output.append("Validation Queries:")
for query in drp.data_validation_queries:
output.append(f"{query}")
output.append("")
# Validation Checklist
output.append("POST-ROLLBACK VALIDATION CHECKLIST")
output.append("-" * 40)
for i, item in enumerate(runbook.validation_checklist, 1):
output.append(f"{i:2d}. ☐ {item}")
output.append("")
# Post-Rollback Procedures
output.append("POST-ROLLBACK PROCEDURES")
output.append("-" * 40)
for i, procedure in enumerate(runbook.post_rollback_procedures, 1):
output.append(f"{i:2d}. {procedure}")
output.append("")
return "\n".join(output)
def main():
"""Main function with command line interface"""
parser = argparse.ArgumentParser(description="Generate comprehensive rollback runbooks from migration plans")
parser.add_argument("--input", "-i", required=True, help="Input migration plan file (JSON)")
parser.add_argument("--output", "-o", help="Output file for rollback runbook (JSON)")
parser.add_argument("--format", "-f", choices=["json", "text", "both"], default="both", help="Output format")
args = parser.parse_args()
try:
# Load migration plan
with open(args.input, 'r') as f:
migration_plan = json.load(f)
# Validate required fields
if "migration_id" not in migration_plan and "source" not in migration_plan:
print("Error: Migration plan must contain migration_id or source field", file=sys.stderr)
return 1
# Generate rollback runbook
generator = RollbackGenerator()
runbook = generator.generate_rollback_runbook(migration_plan)
# Output results
if args.format in ["json", "both"]:
runbook_dict = asdict(runbook)
if args.output:
with open(args.output, 'w') as f:
json.dump(runbook_dict, f, indent=2)
print(f"Rollback runbook saved to {args.output}")
else:
print(json.dumps(runbook_dict, indent=2))
if args.format in ["text", "both"]:
human_runbook = generator.generate_human_readable_runbook(runbook)
text_output = args.output.replace('.json', '.txt') if args.output else None
if text_output:
with open(text_output, 'w') as f:
f.write(human_runbook)
print(f"Human-readable runbook saved to {text_output}")
else:
print("\n" + "="*80)
print("HUMAN-READABLE ROLLBACK RUNBOOK")
print("="*80)
print(human_runbook)
except FileNotFoundError:
print(f"Error: Input file '{args.input}' not found", file=sys.stderr)
return 1
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in input file: {e}", file=sys.stderr)
return 1
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
sys.exit(main())