add brain

This commit is contained in:
2026-03-12 15:17:52 +07:00
parent fd9f558fa1
commit e7821a7a9d
355 changed files with 93784 additions and 24 deletions

View File

@@ -0,0 +1,970 @@
#!/usr/bin/env python3
"""
Tech Debt Dashboard
Takes historical debt inventories (multiple scans over time) and generates trend analysis,
debt velocity (accruing vs paying down), health score, and executive summary.
Usage:
python debt_dashboard.py historical_data.json
python debt_dashboard.py data1.json data2.json data3.json
python debt_dashboard.py --input-dir ./debt_scans/ --output dashboard_report.json
python debt_dashboard.py historical_data.json --period quarterly --team-size 8
"""
import json
import argparse
import sys
import os
from collections import defaultdict, Counter
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from statistics import mean, median, stdev
import re
@dataclass
class HealthMetrics:
    """Health metrics for a specific time period.

    All *_score fields are on a 0-100 scale where higher is better,
    except technical_risk_score where higher means more risk.
    """
    overall_score: float  # 0-100, weighted blend of the component scores
    debt_density: float  # debt items per file (lower is better)
    velocity_impact: float  # estimated team velocity reduction, percent
    quality_score: float  # 0-100, average of complexity and maintainability
    maintainability_score: float  # 0-100
    technical_risk_score: float  # 0-100, higher is riskier
@dataclass
class TrendAnalysis:
    """Trend analysis for a single debt metric over time.

    Produced by a simple linear regression over the metric's history.
    """
    metric_name: str
    trend_direction: str  # "improving", "declining", "stable"
    change_rate: float  # regression slope: change per analysis period
    correlation_strength: float  # Pearson-style correlation, -1 to 1
    forecast_next_period: float  # last value + slope
    confidence_interval: Tuple[float, float]  # forecast +/- residual std error
@dataclass
class DebtVelocity:
    """Debt velocity between two consecutive snapshots.

    Tracks how fast debt is being created vs resolved. Counts are an
    approximation derived from net item/effort deltas, not item identity.
    """
    period: str  # "YYYY-MM-DD to YYYY-MM-DD" label for the interval
    new_debt_items: int  # net items added (0 when the total shrank)
    resolved_debt_items: int  # net items removed (0 when the total grew)
    net_change: int  # new - resolved (signed)
    velocity_ratio: float  # resolved/new, >1 is good; capped for display
    effort_hours_added: float
    effort_hours_resolved: float
    net_effort_change: float  # signed effort delta between snapshots
class DebtDashboard:
    """Main dashboard class for debt trend analysis and reporting.

    Workflow: load one or more debt-inventory JSON snapshots
    (load_historical_data / load_from_directory), then call
    generate_dashboard() to produce trend analysis, debt velocity,
    health scores, forecasts, and an executive summary.
    """

    def __init__(self, team_size: int = 5):
        # team_size is stored for report metadata; it does not currently
        # affect any score computation in this class.
        self.team_size = team_size
        self.historical_data: List[Dict[str, Any]] = []   # raw snapshots, sorted by scan_date
        self.processed_snapshots: List[Dict[str, Any]] = []  # normalized per-snapshot aggregates
        self.trend_analyses: Dict[str, TrendAnalysis] = {}   # metric name -> TrendAnalysis
        self.health_history: List[Dict[str, Any]] = []       # asdict(HealthMetrics) + "date", per snapshot
        self.velocity_history: List[DebtVelocity] = []       # one entry per consecutive snapshot pair
        # Configuration for health scoring: weights for the overall score.
        # They sum to 1.0 so the overall score stays on the 0-100 scale.
        self.health_weights = {
            "debt_density": 0.25,
            "complexity_score": 0.20,
            "test_coverage_proxy": 0.15,
            "documentation_proxy": 0.10,
            "security_score": 0.15,
            "maintainability": 0.15
        }
        # Thresholds for categorization of the overall health score.
        self.thresholds = {
            "excellent": 85,
            "good": 70,
            "fair": 55,
            "poor": 40
        }

    def load_historical_data(self, file_paths: List[str]) -> bool:
        """Load multiple debt inventory files for historical analysis.

        Recognizes three formats per file: scanner output (dict with
        'debt_items'), prioritizer output (dict with 'prioritized_backlog'),
        or a raw list of debt items. Files that fail to load or parse are
        skipped with a printed error. Snapshots are sorted by scan date.

        Returns True when at least one file loaded successfully.
        """
        self.historical_data = []
        for file_path in file_paths:
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                # Normalize data format
                if isinstance(data, dict) and 'debt_items' in data:
                    # Scanner output format
                    snapshot = {
                        "file_path": file_path,
                        "scan_date": data.get("scan_metadata", {}).get("scan_date",
                            self._extract_date_from_filename(file_path)),
                        "debt_items": data["debt_items"],
                        "summary": data.get("summary", {}),
                        "file_statistics": data.get("file_statistics", {})
                    }
                elif isinstance(data, dict) and 'prioritized_backlog' in data:
                    # Prioritizer output format
                    snapshot = {
                        "file_path": file_path,
                        "scan_date": data.get("metadata", {}).get("analysis_date",
                            self._extract_date_from_filename(file_path)),
                        "debt_items": data["prioritized_backlog"],
                        "summary": data.get("insights", {}),
                        "file_statistics": {}
                    }
                elif isinstance(data, list):
                    # Raw debt items array
                    snapshot = {
                        "file_path": file_path,
                        "scan_date": self._extract_date_from_filename(file_path),
                        "debt_items": data,
                        "summary": {},
                        "file_statistics": {}
                    }
                else:
                    raise ValueError(f"Unrecognized data format in {file_path}")
                self.historical_data.append(snapshot)
            except Exception as e:
                # Best-effort load: report and continue with remaining files.
                print(f"Error loading {file_path}: {e}")
                continue
        if not self.historical_data:
            print("No valid data files loaded.")
            return False
        # Sort by date (ISO-8601 strings sort chronologically)
        self.historical_data.sort(key=lambda x: x["scan_date"])
        print(f"Loaded {len(self.historical_data)} historical snapshots")
        return True

    def load_from_directory(self, directory_path: str, pattern: str = "*.json") -> bool:
        """Load all JSON files from a directory.

        Non-recursive glob; delegates to load_historical_data().
        Returns False when the directory is missing or has no matches.
        """
        directory = Path(directory_path)
        if not directory.exists():
            print(f"Directory does not exist: {directory_path}")
            return False
        file_paths = []
        for file_path in directory.glob(pattern):
            if file_path.is_file():
                file_paths.append(str(file_path))
        if not file_paths:
            print(f"No matching files found in {directory_path}")
            return False
        return self.load_historical_data(file_paths)

    def _extract_date_from_filename(self, file_path: str) -> str:
        """Extract date from filename if possible, otherwise use current date.

        Tries YYYY-MM-DD, YYYYMMDD, then MM-DD-YYYY patterns; a matched date
        is validated with strptime and returned as "YYYY-MM-DDT12:00:00".
        Falls back to file mtime, then to now().

        NOTE(review): the MM-DD-YYYY match is validated against the
        "%Y-%m-%d" format, so it will always fail validation and fall
        through — presumably intentional as a lenient fallback; confirm.
        """
        filename = Path(file_path).name
        # Try to find date patterns in filename
        date_patterns = [
            r"(\d{4}-\d{2}-\d{2})",  # YYYY-MM-DD
            r"(\d{4}\d{2}\d{2})",    # YYYYMMDD
            r"(\d{2}-\d{2}-\d{4})",  # MM-DD-YYYY
        ]
        for pattern in date_patterns:
            match = re.search(pattern, filename)
            if match:
                date_str = match.group(1)
                try:
                    if len(date_str) == 8:  # YYYYMMDD -> insert dashes
                        date_str = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:]}"
                    datetime.strptime(date_str, "%Y-%m-%d")  # validate only
                    return date_str + "T12:00:00"
                except ValueError:
                    continue
        # Fallback to file modification time
        try:
            mtime = os.path.getmtime(file_path)
            return datetime.fromtimestamp(mtime).isoformat()
        except:  # NOTE(review): bare except also swallows KeyboardInterrupt; narrow to OSError
            return datetime.now().isoformat()

    def generate_dashboard(self, period: str = "monthly") -> Dict[str, Any]:
        """
        Generate comprehensive debt dashboard.

        Runs the full pipeline over previously loaded snapshots:
        normalization, health scoring, trend analysis, velocity,
        forecasts, executive summary, recommendations, and chart data.

        Args:
            period: Analysis period ("weekly", "monthly", "quarterly")
        Returns:
            Dictionary containing dashboard data and analysis
        """
        print(f"Generating debt dashboard for {len(self.historical_data)} snapshots...")
        print(f"Analysis period: {period}")
        print("=" * 50)
        # Step 1: Process historical snapshots
        self._process_snapshots()
        # Step 2: Calculate health metrics for each snapshot
        self._calculate_health_metrics()
        # Step 3: Analyze trends
        self._analyze_trends(period)
        # Step 4: Calculate debt velocity
        self._calculate_debt_velocity(period)
        # Step 5: Generate forecasts
        forecasts = self._generate_forecasts()
        # Step 6: Create executive summary
        executive_summary = self._generate_executive_summary()
        # Step 7: Generate recommendations
        recommendations = self._generate_strategic_recommendations()
        # Step 8: Create visualizations data
        visualizations = self._generate_visualization_data()
        dashboard_data = {
            "metadata": {
                "generated_date": datetime.now().isoformat(),
                "analysis_period": period,
                "snapshots_analyzed": len(self.historical_data),
                "date_range": {
                    "start": self.historical_data[0]["scan_date"] if self.historical_data else None,
                    "end": self.historical_data[-1]["scan_date"] if self.historical_data else None
                },
                "team_size": self.team_size
            },
            "executive_summary": executive_summary,
            "current_health": self.health_history[-1] if self.health_history else None,
            "trend_analysis": {name: asdict(trend) for name, trend in self.trend_analyses.items()},
            "debt_velocity": [asdict(v) for v in self.velocity_history],
            "forecasts": forecasts,
            "recommendations": recommendations,
            "visualizations": visualizations,
            "detailed_metrics": self._get_detailed_metrics()
        }
        return dashboard_data

    def _process_snapshots(self) -> None:
        """Process raw snapshots into standardized per-snapshot aggregates.

        Each processed entry carries item counts, Counter breakdowns by
        type/severity/category, total effort, and priority/security counts.
        """
        self.processed_snapshots = []
        for snapshot in self.historical_data:
            processed = {
                "date": snapshot["scan_date"],
                "total_debt_items": len(snapshot["debt_items"]),
                "debt_by_type": Counter(item.get("type", "unknown") for item in snapshot["debt_items"]),
                "debt_by_severity": Counter(item.get("severity", "medium") for item in snapshot["debt_items"]),
                "debt_by_category": Counter(self._categorize_debt_item(item) for item in snapshot["debt_items"]),
                # Prefer the scanner's declared file count; fall back to the
                # number of per-file statistics entries.
                "total_files": snapshot["summary"].get("total_files_scanned",
                                                       len(snapshot["file_statistics"])),
                "total_effort_estimate": self._calculate_total_effort(snapshot["debt_items"]),
                "high_priority_count": len([item for item in snapshot["debt_items"]
                                            if self._is_high_priority(item)]),
                "security_debt_count": len([item for item in snapshot["debt_items"]
                                            if self._is_security_related(item)]),
                "raw_data": snapshot
            }
            self.processed_snapshots.append(processed)

    def _categorize_debt_item(self, item: Dict[str, Any]) -> str:
        """Categorize a debt item's type into one high-level category.

        First matching category wins; unmatched types map to "other".
        NOTE(review): "missing_docstring" appears under both code_quality
        and documentation — dict order means code_quality always wins, so
        such items never count as documentation debt; confirm intent.
        """
        debt_type = item.get("type", "unknown")
        categories = {
            "code_quality": ["large_function", "high_complexity", "duplicate_code",
                             "long_line", "missing_docstring"],
            "architecture": ["architecture_debt", "large_file"],
            "security": ["security_risk", "hardcoded_secrets", "sql_injection_risk"],
            "testing": ["test_debt", "missing_tests", "low_coverage"],
            "maintenance": ["todo_comment", "commented_code"],
            "dependencies": ["dependency_debt", "outdated_packages"],
            "infrastructure": ["deployment_debt", "monitoring_gaps"],
            "documentation": ["missing_docstring", "outdated_docs"]
        }
        for category, types in categories.items():
            if debt_type in types:
                return category
        return "other"

    def _calculate_total_effort(self, debt_items: List[Dict[str, Any]]) -> float:
        """Calculate total estimated effort (hours) for a list of debt items.

        Uses an item's own "effort_estimate.hours_estimate" when present,
        otherwise estimates from type and severity.
        """
        total_effort = 0.0
        for item in debt_items:
            # Try to get effort from existing analysis
            if "effort_estimate" in item:
                total_effort += item["effort_estimate"].get("hours_estimate", 0)
            else:
                # Estimate based on debt type and severity
                effort = self._estimate_item_effort(item)
                total_effort += effort
        return total_effort

    def _estimate_item_effort(self, item: Dict[str, Any]) -> float:
        """Estimate effort (hours) for one debt item.

        Base hours by debt type (default 8 for unknown types), scaled by a
        severity multiplier (default 1.0 for unknown severities).
        """
        debt_type = item.get("type", "unknown")
        severity = item.get("severity", "medium")
        base_efforts = {
            "todo_comment": 2,
            "missing_docstring": 2,
            "long_line": 1,
            "large_function": 8,
            "high_complexity": 16,
            "duplicate_code": 12,
            "large_file": 32,
            "syntax_error": 4,
            "security_risk": 20,
            "architecture_debt": 80,
            "test_debt": 16
        }
        base_effort = base_efforts.get(debt_type, 8)
        severity_multipliers = {
            "low": 0.5,
            "medium": 1.0,
            "high": 1.5,
            "critical": 2.0
        }
        return base_effort * severity_multipliers.get(severity, 1.0)

    def _is_high_priority(self, item: Dict[str, Any]) -> bool:
        """Determine if a debt item is high priority.

        True when severity is high/critical, the item's priority_score is
        >= 7, or its type is inherently urgent.
        """
        severity = item.get("severity", "medium")
        priority_score = item.get("priority_score", 0)
        debt_type = item.get("type", "")
        return (severity in ["high", "critical"] or
                priority_score >= 7 or
                debt_type in ["security_risk", "syntax_error", "architecture_debt"])

    def _is_security_related(self, item: Dict[str, Any]) -> bool:
        """Determine if a debt item is security-related.

        Matches either by explicit security debt type or by keyword in the
        item's description (case-insensitive substring match — may yield
        false positives, e.g. "monkey" contains "key").
        """
        debt_type = item.get("type", "")
        description = item.get("description", "").lower()
        security_types = ["security_risk", "hardcoded_secrets", "sql_injection_risk"]
        security_keywords = ["password", "token", "key", "secret", "auth", "security"]
        return (debt_type in security_types or
                any(keyword in description for keyword in security_keywords))

    def _calculate_health_metrics(self) -> None:
        """Calculate health metrics for each processed snapshot.

        Each component score is clamped to [0, 100]; the overall score is
        the weighted sum using self.health_weights. Results are stored as
        dicts (asdict(HealthMetrics) + "date") in self.health_history.
        The scaling constants (20, 100, 150, 200, 120, ...) are heuristic
        sensitivity factors, not derived from data.
        """
        self.health_history = []
        for snapshot in self.processed_snapshots:
            # Debt density (lower is better); max(1, ...) avoids div-by-zero
            debt_density = snapshot["total_debt_items"] / max(1, snapshot["total_files"])
            debt_density_score = max(0, 100 - (debt_density * 20))  # Scale to 0-100
            # Complexity score (based on high complexity debt)
            complex_debt_ratio = (snapshot["debt_by_type"].get("high_complexity", 0) +
                                  snapshot["debt_by_type"].get("large_function", 0)) / max(1, snapshot["total_debt_items"])
            complexity_score = max(0, 100 - (complex_debt_ratio * 100))
            # Test coverage proxy (based on test debt)
            test_debt_ratio = snapshot["debt_by_category"].get("testing", 0) / max(1, snapshot["total_debt_items"])
            test_coverage_proxy = max(0, 100 - (test_debt_ratio * 150))
            # Documentation proxy (based on documentation debt)
            doc_debt_ratio = snapshot["debt_by_category"].get("documentation", 0) / max(1, snapshot["total_debt_items"])
            documentation_proxy = max(0, 100 - (doc_debt_ratio * 100))
            # Security score (based on security debt)
            security_debt_ratio = snapshot["security_debt_count"] / max(1, snapshot["total_debt_items"])
            security_score = max(0, 100 - (security_debt_ratio * 200))
            # Maintainability (based on architecture and code quality debt)
            maint_debt_count = (snapshot["debt_by_category"].get("architecture", 0) +
                                snapshot["debt_by_category"].get("code_quality", 0))
            maint_debt_ratio = maint_debt_count / max(1, snapshot["total_debt_items"])
            maintainability = max(0, 100 - (maint_debt_ratio * 120))
            # Calculate weighted overall score
            weights = self.health_weights
            overall_score = (
                debt_density_score * weights["debt_density"] +
                complexity_score * weights["complexity_score"] +
                test_coverage_proxy * weights["test_coverage_proxy"] +
                documentation_proxy * weights["documentation_proxy"] +
                security_score * weights["security_score"] +
                maintainability * weights["maintainability"]
            )
            # Velocity impact (estimated % reduction in team velocity, capped at 50)
            high_impact_ratio = snapshot["high_priority_count"] / max(1, snapshot["total_debt_items"])
            velocity_impact = min(50, high_impact_ratio * 30 + debt_density * 5)
            # Technical risk (0-100, higher is more risky)
            risk_factors = snapshot["security_debt_count"] + snapshot["debt_by_type"].get("architecture_debt", 0)
            technical_risk = min(100, risk_factors * 10 + (100 - security_score))
            health_metrics = HealthMetrics(
                overall_score=round(overall_score, 1),
                debt_density=round(debt_density, 2),
                velocity_impact=round(velocity_impact, 1),
                quality_score=round((complexity_score + maintainability) / 2, 1),
                maintainability_score=round(maintainability, 1),
                technical_risk_score=round(technical_risk, 1)
            )
            # Add timestamp and store as a plain dict
            health_entry = asdict(health_metrics)
            health_entry["date"] = snapshot["date"]
            self.health_history.append(health_entry)

    def _analyze_trends(self, period: str) -> None:
        """Analyze trends in the tracked health metrics.

        Requires at least two health entries; otherwise leaves
        self.trend_analyses empty.
        NOTE(review): the `period` argument is currently unused here —
        trends are computed over snapshot index, not calendar time.
        """
        self.trend_analyses = {}
        if len(self.health_history) < 2:
            return
        # Define metrics to analyze
        metrics_to_analyze = [
            "overall_score",
            "debt_density",
            "velocity_impact",
            "quality_score",
            "technical_risk_score"
        ]
        for metric in metrics_to_analyze:
            values = [entry[metric] for entry in self.health_history]
            # Tolerate trailing 'Z' (UTC) suffixes that fromisoformat rejects pre-3.11
            dates = [datetime.fromisoformat(entry["date"].replace('Z', '+00:00'))
                     for entry in self.health_history]
            trend = self._calculate_trend(values, dates, metric)
            self.trend_analyses[metric] = trend

    def _calculate_trend(self, values: List[float], dates: List[datetime], metric_name: str) -> TrendAnalysis:
        """Calculate trend analysis for a specific metric.

        Ordinary least-squares over snapshot index (0..n-1) — `dates` is
        accepted but not used as the x-axis; spacing between snapshots is
        assumed uniform. Direction is interpreted per metric: for
        overall_score/quality_score higher is better, otherwise higher is
        worse.
        """
        if len(values) < 2:
            # Degenerate case: no trend computable, forecast = last value
            return TrendAnalysis(metric_name, "stable", 0.0, 0.0, values[-1], (values[-1], values[-1]))
        # Calculate simple linear trend
        n = len(values)
        x = list(range(n))  # Time periods as numbers
        # Linear regression
        x_mean = mean(x)
        y_mean = mean(values)
        numerator = sum((x[i] - x_mean) * (values[i] - y_mean) for i in range(n))
        denominator = sum((x[i] - x_mean) ** 2 for i in range(n))
        if denominator == 0:
            slope = 0
        else:
            slope = numerator / denominator
        # Correlation strength (Pearson r); guarded against constant series
        if n > 2 and len(set(values)) > 1:
            try:
                correlation = numerator / (
                    (sum((x[i] - x_mean) ** 2 for i in range(n)) *
                     sum((values[i] - y_mean) ** 2 for i in range(n))) ** 0.5
                )
            except ZeroDivisionError:
                correlation = 0.0
        else:
            correlation = 0.0
        # Determine trend direction (|slope| < 0.1 counts as stable)
        if abs(slope) < 0.1:
            trend_direction = "stable"
        elif slope > 0:
            if metric_name in ["overall_score", "quality_score"]:
                trend_direction = "improving"  # Higher is better
            else:
                trend_direction = "declining"  # Higher is worse
        else:
            if metric_name in ["overall_score", "quality_score"]:
                trend_direction = "declining"
            else:
                trend_direction = "improving"
        # Forecast next period (one-step linear extrapolation)
        forecast = values[-1] + slope
        # Confidence interval (simple approach: +/- residual standard error)
        if n > 2:
            residuals = [values[i] - (y_mean + slope * (x[i] - x_mean)) for i in range(n)]
            std_error = (sum(r**2 for r in residuals) / (n - 2)) ** 0.5
            confidence_interval = (forecast - std_error, forecast + std_error)
        else:
            confidence_interval = (forecast, forecast)
        return TrendAnalysis(
            metric_name=metric_name,
            trend_direction=trend_direction,
            change_rate=round(slope, 3),
            correlation_strength=round(correlation, 3),
            forecast_next_period=round(forecast, 2),
            confidence_interval=(round(confidence_interval[0], 2), round(confidence_interval[1], 2))
        )

    def _calculate_debt_velocity(self, period: str) -> None:
        """Calculate debt velocity between consecutive snapshots.

        Approximation: compares only total counts/effort, so simultaneous
        creation and resolution within one interval cancel out.
        NOTE(review): `period` is unused, and the *_debt_ids sets below are
        built but never populated or compared — dead scaffolding for a
        future identity-based diff; confirm before removing.
        """
        self.velocity_history = []
        if len(self.processed_snapshots) < 2:
            return
        for i in range(1, len(self.processed_snapshots)):
            current = self.processed_snapshots[i]
            previous = self.processed_snapshots[i-1]
            # Track debt by unique identifiers when possible
            current_debt_ids = set()
            previous_debt_ids = set()
            current_effort = current["total_effort_estimate"]
            previous_effort = previous["total_effort_estimate"]
            # Simple approach: compare total counts and effort
            debt_change = current["total_debt_items"] - previous["total_debt_items"]
            effort_change = current_effort - previous_effort
            # Estimate new vs resolved (rough approximation)
            if debt_change >= 0:
                new_debt_items = debt_change
                resolved_debt_items = 0
            else:
                new_debt_items = 0
                resolved_debt_items = abs(debt_change)
            # Calculate velocity ratio (resolved per new; 1.0 when nothing changed)
            if new_debt_items > 0:
                velocity_ratio = resolved_debt_items / new_debt_items
            else:
                velocity_ratio = float('inf') if resolved_debt_items > 0 else 1.0
            velocity = DebtVelocity(
                period=f"{previous['date'][:10]} to {current['date'][:10]}",
                new_debt_items=new_debt_items,
                resolved_debt_items=resolved_debt_items,
                net_change=debt_change,
                velocity_ratio=min(10.0, velocity_ratio),  # Cap at 10 for display
                effort_hours_added=max(0, effort_change),
                effort_hours_resolved=max(0, -effort_change),
                net_effort_change=effort_change
            )
            self.velocity_history.append(velocity)

    def _generate_forecasts(self) -> Dict[str, Any]:
        """Generate forecasts based on trend analysis.

        Linear extrapolation of health/risk scores (clamped to [0, 100])
        and debt counts (clamped to >= 0). The 3/6 multipliers treat
        change_rate as a per-month rate regardless of the actual snapshot
        spacing.
        """
        if not self.trend_analyses:
            return {}
        forecasts = {}
        # Overall health forecast
        health_trend = self.trend_analyses.get("overall_score")
        if health_trend:
            current_score = self.health_history[-1]["overall_score"]
            forecasts["health_score_3_months"] = max(0, min(100,
                current_score + (health_trend.change_rate * 3)))
            forecasts["health_score_6_months"] = max(0, min(100,
                current_score + (health_trend.change_rate * 6)))
        # Debt accumulation forecast
        if self.velocity_history:
            avg_net_change = mean([v.net_change for v in self.velocity_history[-3:]])  # Last 3 periods
            current_debt = self.processed_snapshots[-1]["total_debt_items"]
            forecasts["debt_count_3_months"] = max(0, current_debt + (avg_net_change * 3))
            forecasts["debt_count_6_months"] = max(0, current_debt + (avg_net_change * 6))
        # Risk forecast
        risk_trend = self.trend_analyses.get("technical_risk_score")
        if risk_trend:
            current_risk = self.health_history[-1]["technical_risk_score"]
            forecasts["risk_score_3_months"] = max(0, min(100,
                current_risk + (risk_trend.change_rate * 3)))
        return forecasts

    def _generate_executive_summary(self) -> Dict[str, Any]:
        """Generate the executive summary for the latest snapshot.

        Maps the current health score onto self.thresholds, then collects
        insight strings from score deltas, velocity impact, technical risk,
        and the most recent debt velocity ratio.
        """
        if not self.health_history:
            return {}
        current_health = self.health_history[-1]
        # Determine overall status from configured thresholds
        score = current_health["overall_score"]
        if score >= self.thresholds["excellent"]:
            status = "excellent"
            status_message = "Code quality is excellent with minimal technical debt."
        elif score >= self.thresholds["good"]:
            status = "good"
            status_message = "Code quality is good with manageable technical debt."
        elif score >= self.thresholds["fair"]:
            status = "fair"
            status_message = "Code quality needs attention. Technical debt is accumulating."
        else:
            status = "poor"
            status_message = "Critical: High levels of technical debt requiring immediate action."
        # Key insights
        insights = []
        if len(self.health_history) > 1:
            prev_health = self.health_history[-2]
            score_change = current_health["overall_score"] - prev_health["overall_score"]
            if score_change > 5:
                insights.append("Health score improving significantly")
            elif score_change < -5:
                insights.append("Health score declining - attention needed")
        if current_health["velocity_impact"] > 20:
            insights.append("High velocity impact detected - development speed affected")
        if current_health["technical_risk_score"] > 70:
            insights.append("High technical risk - security and stability concerns")
        # Debt velocity insight (only the most recent interval)
        if self.velocity_history:
            recent_velocity = self.velocity_history[-1]
            if recent_velocity.velocity_ratio < 0.5:
                insights.append("Debt accumulating faster than resolution")
            elif recent_velocity.velocity_ratio > 1.5:
                insights.append("Good progress on debt reduction")
        return {
            "overall_status": status,
            "health_score": current_health["overall_score"],
            "status_message": status_message,
            "key_insights": insights,
            "total_debt_items": self.processed_snapshots[-1]["total_debt_items"] if self.processed_snapshots else 0,
            "estimated_effort_hours": self.processed_snapshots[-1]["total_effort_estimate"] if self.processed_snapshots else 0,
            "high_priority_items": self.processed_snapshots[-1]["high_priority_count"] if self.processed_snapshots else 0,
            "velocity_impact_percent": current_health["velocity_impact"]
        }

    def _generate_strategic_recommendations(self) -> List[Dict[str, Any]]:
        """Generate strategic recommendations for debt management.

        Rule-based: each recommendation is a dict with priority, category,
        title, description, impact, and effort. Rules fire independently
        based on current health, trends, top debt category, and velocity.
        """
        recommendations = []
        if not self.health_history:
            return recommendations
        current_health = self.health_history[-1]
        current_snapshot = self.processed_snapshots[-1] if self.processed_snapshots else {}
        # Health-based recommendations
        if current_health["overall_score"] < 50:
            recommendations.append({
                "priority": "critical",
                "category": "immediate_action",
                "title": "Initiate Emergency Debt Reduction",
                "description": "Current health score is critically low. Consider dedicating 50%+ of development capacity to debt reduction.",
                "impact": "high",
                "effort": "high"
            })
        # Velocity impact recommendations
        if current_health["velocity_impact"] > 25:
            recommendations.append({
                "priority": "high",
                "category": "productivity",
                "title": "Address Velocity Blockers",
                "description": f"Technical debt is reducing team velocity by {current_health['velocity_impact']:.1f}%. Focus on high-impact debt items first.",
                "impact": "high",
                "effort": "medium"
            })
        # Security recommendations
        if current_health["technical_risk_score"] > 70:
            recommendations.append({
                "priority": "high",
                "category": "security",
                "title": "Security Debt Review Required",
                "description": "High technical risk score indicates security vulnerabilities. Conduct immediate security debt audit.",
                "impact": "high",
                "effort": "medium"
            })
        # Trend-based recommendations
        health_trend = self.trend_analyses.get("overall_score")
        if health_trend and health_trend.trend_direction == "declining":
            recommendations.append({
                "priority": "medium",
                "category": "process",
                "title": "Implement Debt Prevention Measures",
                "description": "Health score is declining over time. Establish coding standards, automated quality gates, and regular debt reviews.",
                "impact": "medium",
                "effort": "medium"
            })
        # Category-specific recommendations (largest category over 10 items)
        if current_snapshot:
            debt_by_category = current_snapshot["debt_by_category"]
            top_category = debt_by_category.most_common(1)[0] if debt_by_category else None
            if top_category and top_category[1] > 10:
                category, count = top_category
                recommendations.append({
                    "priority": "medium",
                    "category": "focus_area",
                    "title": f"Focus on {category.replace('_', ' ').title()} Debt",
                    "description": f"{category.replace('_', ' ').title()} represents the largest debt category ({count} items). Consider targeted initiatives.",
                    "impact": "medium",
                    "effort": "medium"
                })
        # Velocity-based recommendations (average of up to last 3 intervals)
        if self.velocity_history:
            recent_velocities = self.velocity_history[-3:] if len(self.velocity_history) >= 3 else self.velocity_history
            avg_velocity_ratio = mean([v.velocity_ratio for v in recent_velocities])
            if avg_velocity_ratio < 0.8:
                recommendations.append({
                    "priority": "medium",
                    "category": "capacity",
                    "title": "Increase Debt Resolution Capacity",
                    "description": "Debt is accumulating faster than resolution. Consider increasing debt budget or improving resolution efficiency.",
                    "impact": "medium",
                    "effort": "low"
                })
        return recommendations

    def _generate_visualization_data(self) -> Dict[str, Any]:
        """Generate chart-ready data series for dashboard visualizations.

        Each series is a list of plain dicts; dates are truncated to
        YYYY-MM-DD for display.
        """
        visualizations = {}
        # Health score timeline
        visualizations["health_timeline"] = [
            {
                "date": entry["date"][:10],  # Date only
                "overall_score": entry["overall_score"],
                "quality_score": entry["quality_score"],
                "technical_risk": entry["technical_risk_score"]
            }
            for entry in self.health_history
        ]
        # Debt accumulation trend
        visualizations["debt_accumulation"] = [
            {
                "date": snapshot["date"][:10],
                "total_debt": snapshot["total_debt_items"],
                "high_priority": snapshot["high_priority_count"],
                "security_debt": snapshot["security_debt_count"]
            }
            for snapshot in self.processed_snapshots
        ]
        # Category distribution (latest snapshot)
        if self.processed_snapshots:
            latest_categories = self.processed_snapshots[-1]["debt_by_category"]
            visualizations["category_distribution"] = [
                {"category": category, "count": count}
                for category, count in latest_categories.items()
            ]
        # Velocity chart
        visualizations["debt_velocity"] = [
            {
                "period": velocity.period,
                "new_items": velocity.new_debt_items,
                "resolved_items": velocity.resolved_debt_items,
                "net_change": velocity.net_change,
                "velocity_ratio": velocity.velocity_ratio
            }
            for velocity in self.velocity_history
        ]
        # Effort estimation trend
        visualizations["effort_trend"] = [
            {
                "date": snapshot["date"][:10],
                "total_effort": snapshot["total_effort_estimate"]
            }
            for snapshot in self.processed_snapshots
        ]
        return visualizations

    def _get_detailed_metrics(self) -> Dict[str, Any]:
        """Get detailed breakdowns for the most recent snapshot.

        Returns {} when no snapshots have been processed.
        """
        if not self.processed_snapshots:
            return {}
        current = self.processed_snapshots[-1]
        return {
            "debt_breakdown": dict(current["debt_by_type"]),
            "severity_breakdown": dict(current["debt_by_severity"]),
            "category_breakdown": dict(current["debt_by_category"]),
            "files_analyzed": current["total_files"],
            "debt_density": current["total_debt_items"] / max(1, current["total_files"]),
            "average_effort_per_item": current["total_effort_estimate"] / max(1, current["total_debt_items"])
        }
def format_dashboard_report(dashboard_data: Dict[str, Any]) -> str:
    """Format dashboard data into a human-readable plain-text report.

    Args:
        dashboard_data: Output of DebtDashboard.generate_dashboard();
            must contain "metadata", "executive_summary", "current_health",
            "trend_analysis", and "recommendations" keys.
    Returns:
        Multi-line report string (sections: header, executive summary,
        current health, trend analysis, top 5 recommendations).
    """
    output = []
    # Header
    output.append("=" * 60)
    output.append("TECHNICAL DEBT DASHBOARD")
    output.append("=" * 60)
    metadata = dashboard_data["metadata"]
    output.append(f"Generated: {metadata['generated_date'][:19]}")
    output.append(f"Analysis Period: {metadata['analysis_period']}")
    output.append(f"Snapshots Analyzed: {metadata['snapshots_analyzed']}")
    if metadata["date_range"]["start"]:
        output.append(f"Date Range: {metadata['date_range']['start'][:10]} to {metadata['date_range']['end'][:10]}")
    output.append("")
    # Executive Summary
    exec_summary = dashboard_data["executive_summary"]
    output.append("EXECUTIVE SUMMARY")
    output.append("-" * 30)
    output.append(f"Overall Status: {exec_summary['overall_status'].upper()}")
    output.append(f"Health Score: {exec_summary['health_score']:.1f}/100")
    output.append(f"Status: {exec_summary['status_message']}")
    output.append("")
    output.append("Key Metrics:")
    output.append(f"  • Total Debt Items: {exec_summary['total_debt_items']}")
    output.append(f"  • High Priority Items: {exec_summary['high_priority_items']}")
    output.append(f"  • Estimated Effort: {exec_summary['estimated_effort_hours']:.1f} hours")
    output.append(f"  • Velocity Impact: {exec_summary['velocity_impact_percent']:.1f}%")
    output.append("")
    if exec_summary["key_insights"]:
        output.append("Key Insights:")
        for insight in exec_summary["key_insights"]:
            # Bullet marker restored for consistency with Key Metrics above
            output.append(f"  • {insight}")
        output.append("")
    # Current Health
    if dashboard_data["current_health"]:
        health = dashboard_data["current_health"]
        output.append("CURRENT HEALTH METRICS")
        output.append("-" * 30)
        output.append(f"Overall Score: {health['overall_score']:.1f}/100")
        output.append(f"Quality Score: {health['quality_score']:.1f}/100")
        output.append(f"Maintainability: {health['maintainability_score']:.1f}/100")
        output.append(f"Technical Risk: {health['technical_risk_score']:.1f}/100")
        output.append(f"Debt Density: {health['debt_density']:.2f} items/file")
        output.append("")
    # Trend Analysis
    trends = dashboard_data["trend_analysis"]
    if trends:
        output.append("TREND ANALYSIS")
        output.append("-" * 30)
        for metric, trend in trends.items():
            # Direction arrows restored (were lost as empty strings,
            # leaving a stray double space in the rendered line)
            direction_symbol = {
                "improving": "↑",
                "declining": "↓",
                "stable": "→"
            }.get(trend["trend_direction"], "")
            output.append(f"{metric.replace('_', ' ').title()}: {direction_symbol} {trend['trend_direction']}")
            output.append(f"  Change Rate: {trend['change_rate']:.3f} per period")
            output.append(f"  Forecast: {trend['forecast_next_period']:.1f}")
        output.append("")
    # Top Recommendations (first five only)
    recommendations = dashboard_data["recommendations"]
    if recommendations:
        output.append("TOP RECOMMENDATIONS")
        output.append("-" * 30)
        for i, rec in enumerate(recommendations[:5], 1):
            output.append(f"{i}. [{rec['priority'].upper()}] {rec['title']}")
            output.append(f"   {rec['description']}")
            output.append(f"   Impact: {rec['impact']}, Effort: {rec['effort']}")
        output.append("")
    return "\n".join(output)
def main():
    """CLI entry point: parse args, load snapshots, emit the dashboard.

    Exits with status 1 on missing input, failed loads, or a failed
    dashboard generation.
    """
    parser = argparse.ArgumentParser(description="Generate technical debt dashboard")
    parser.add_argument("files", nargs="*", help="Debt inventory files")
    parser.add_argument("--input-dir", help="Directory containing debt inventory files")
    parser.add_argument("--output", help="Output file path")
    parser.add_argument("--format", choices=["json", "text", "both"],
                        default="both", help="Output format")
    parser.add_argument("--period", choices=["weekly", "monthly", "quarterly"],
                        default="monthly", help="Analysis period")
    parser.add_argument("--team-size", type=int, default=5, help="Team size")
    opts = parser.parse_args()

    # Guard clause: some input source is mandatory.
    if not opts.input_dir and not opts.files:
        print("Error: Must specify either files or --input-dir")
        sys.exit(1)

    board = DebtDashboard(opts.team_size)
    # --input-dir takes precedence over positional files.
    loaded = (board.load_from_directory(opts.input_dir)
              if opts.input_dir
              else board.load_historical_data(opts.files))
    if not loaded:
        sys.exit(1)

    try:
        report_data = board.generate_dashboard(opts.period)
    except Exception as exc:
        print(f"Dashboard generation failed: {exc}")
        sys.exit(1)

    def _emit(content, suffix, kind, header):
        # Write to a file when --output was given, otherwise print to stdout.
        if opts.output:
            path = opts.output if opts.output.endswith(suffix) else f"{opts.output}{suffix}"
            with open(path, 'w') as fh:
                fh.write(content)
            print(f"{kind} dashboard written to: {path}")
        else:
            print(header)
            print("=" * 50)
            print(content)

    if opts.format in ["json", "both"]:
        _emit(json.dumps(report_data, indent=2, default=str),
              ".json", "JSON", "JSON DASHBOARD:")
    if opts.format in ["text", "both"]:
        _emit(format_dashboard_report(report_data),
              ".txt", "Text", "\nTEXT DASHBOARD:")

if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,857 @@
#!/usr/bin/env python3
"""
Tech Debt Prioritizer
Takes a debt inventory (from scanner or manual JSON) and calculates interest rate,
effort estimates, and produces a prioritized backlog with recommended sprint allocation.
Uses cost-of-delay vs effort scoring and various prioritization frameworks.
Usage:
python debt_prioritizer.py debt_inventory.json
python debt_prioritizer.py debt_inventory.json --output prioritized_backlog.json
python debt_prioritizer.py debt_inventory.json --team-size 6 --sprint-capacity 80
python debt_prioritizer.py debt_inventory.json --framework wsjf --output results.json
"""
import json
import argparse
import sys
import math
from collections import defaultdict, Counter
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
@dataclass
class EffortEstimate:
    """Represents effort estimation for a debt item.

    Combines sizing, calendar effort, and an execution-risk factor.
    """
    size_points: int  # story-point style sizing
    hours_estimate: float  # expected hours of work
    risk_factor: float  # 1.0 = low risk, 1.5 = medium, 2.0+ = high
    skill_level_required: str  # junior, mid, senior, expert
    confidence: float  # 0.0-1.0, confidence in the estimate
@dataclass
class BusinessImpact:
    """Business impact assessment for a debt item.

    All scores share a 1-10 scale; produced by
    ``DebtPrioritizer._assess_business_impact``.
    """
    customer_impact: int # 1-10 scale
    revenue_impact: int # 1-10 scale
    team_velocity_impact: int # 1-10 scale
    quality_impact: int # 1-10 scale
    security_impact: int # 1-10 scale
@dataclass
class InterestRate:
    """Interest-rate model for technical debt: how fast the cost grows
    while the item stays unfixed (see ``_calculate_cost_of_delay``)."""
    daily_cost: float # cost per day if left unfixed (in developer-hours)
    frequency_multiplier: float # how often this code is touched
    team_impact_multiplier: float # how many developers affected
    compound_rate: float # how quickly this debt makes other debt worse (monthly)
class DebtPrioritizer:
"""Main class for prioritizing technical debt items."""
def __init__(self, team_size: int = 5, sprint_capacity_hours: int = 80):
self.team_size = team_size
self.sprint_capacity_hours = sprint_capacity_hours
self.debt_items = []
self.prioritized_items = []
# Prioritization framework weights
self.framework_weights = {
"cost_of_delay": {
"business_value": 0.3,
"urgency": 0.3,
"risk_reduction": 0.2,
"team_productivity": 0.2
},
"wsjf": {
"business_value": 0.25,
"time_criticality": 0.25,
"risk_reduction": 0.25,
"effort": 0.25
},
"rice": {
"reach": 0.25,
"impact": 0.25,
"confidence": 0.25,
"effort": 0.25
}
}
def load_debt_inventory(self, file_path: str) -> bool:
"""Load debt inventory from JSON file."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Handle different input formats
if isinstance(data, dict) and 'debt_items' in data:
self.debt_items = data['debt_items']
elif isinstance(data, list):
self.debt_items = data
else:
raise ValueError("Invalid debt inventory format")
print(f"Loaded {len(self.debt_items)} debt items from {file_path}")
return True
except Exception as e:
print(f"Error loading debt inventory: {e}")
return False
    def analyze_and_prioritize(self, framework: str = "cost_of_delay") -> Dict[str, Any]:
        """
        Analyze debt items and create prioritized backlog.

        Enriches every loaded item (effort, impact, interest-rate,
        cost-of-delay), scores it with the chosen framework, sorts the
        result into ``self.prioritized_items``, then derives the sprint
        plan, insights, chart series and recommendations.

        Args:
            framework: Prioritization framework to use
                ("cost_of_delay", "wsjf", or "rice")
        Returns:
            Dictionary containing prioritized backlog and analysis
        Raises:
            ValueError: If ``framework`` is not a supported name.
        """
        print(f"Analyzing {len(self.debt_items)} debt items...")
        print(f"Using {framework} prioritization framework")
        print("=" * 50)
        # Step 1: Enrich debt items with estimates
        enriched_items = []
        for item in self.debt_items:
            enriched_item = self._enrich_debt_item(item)
            enriched_items.append(enriched_item)
        # Step 2: Calculate prioritization scores
        for item in enriched_items:
            if framework == "cost_of_delay":
                item["priority_score"] = self._calculate_cost_of_delay_score(item)
            elif framework == "wsjf":
                item["priority_score"] = self._calculate_wsjf_score(item)
            elif framework == "rice":
                item["priority_score"] = self._calculate_rice_score(item)
            else:
                raise ValueError(f"Unknown prioritization framework: {framework}")
        # Step 3: Sort by priority score (highest score = fix first)
        self.prioritized_items = sorted(enriched_items,
                                      key=lambda x: x["priority_score"],
                                      reverse=True)
        # Step 4: Generate sprint allocation recommendations
        sprint_allocation = self._generate_sprint_allocation()
        # Step 5: Generate insights and recommendations
        insights = self._generate_insights()
        # Step 6: Create visualization data
        charts_data = self._generate_charts_data()
        return {
            "metadata": {
                "analysis_date": datetime.now().isoformat(),
                "framework_used": framework,
                "team_size": self.team_size,
                "sprint_capacity_hours": self.sprint_capacity_hours,
                "total_items_analyzed": len(self.debt_items)
            },
            "prioritized_backlog": self.prioritized_items,
            "sprint_allocation": sprint_allocation,
            "insights": insights,
            "charts_data": charts_data,
            "recommendations": self._generate_recommendations()
        }
def _enrich_debt_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
"""Enrich debt item with detailed estimates and impact analysis."""
enriched = item.copy()
# Generate effort estimate
effort = self._estimate_effort(item)
enriched["effort_estimate"] = asdict(effort)
# Generate business impact assessment
business_impact = self._assess_business_impact(item)
enriched["business_impact"] = asdict(business_impact)
# Calculate interest rate
interest_rate = self._calculate_interest_rate(item, business_impact)
enriched["interest_rate"] = asdict(interest_rate)
# Calculate cost of delay
enriched["cost_of_delay"] = self._calculate_cost_of_delay(interest_rate, effort)
# Assign categories and tags
enriched["category"] = self._categorize_debt_item(item)
enriched["impact_tags"] = self._generate_impact_tags(item, business_impact)
return enriched
def _estimate_effort(self, item: Dict[str, Any]) -> EffortEstimate:
"""Estimate effort required to fix debt item."""
debt_type = item.get("type", "unknown")
severity = item.get("severity", "medium")
# Base effort estimation by debt type
base_efforts = {
"todo_comment": (1, 2),
"missing_docstring": (1, 4),
"long_line": (0.5, 1),
"large_function": (4, 16),
"high_complexity": (8, 32),
"duplicate_code": (6, 24),
"large_file": (16, 64),
"syntax_error": (2, 8),
"security_risk": (4, 40),
"architecture_debt": (40, 160),
"test_debt": (8, 40),
"dependency_debt": (4, 24)
}
min_hours, max_hours = base_efforts.get(debt_type, (4, 16))
# Adjust by severity
severity_multipliers = {
"low": 0.5,
"medium": 1.0,
"high": 1.5,
"critical": 2.0
}
multiplier = severity_multipliers.get(severity, 1.0)
hours_estimate = (min_hours + max_hours) / 2 * multiplier
# Convert to story points (assuming 6 hours per point)
size_points = max(1, round(hours_estimate / 6))
# Determine risk factor
risk_factor = 1.0
if debt_type in ["architecture_debt", "security_risk", "large_file"]:
risk_factor = 1.8
elif debt_type in ["high_complexity", "duplicate_code"]:
risk_factor = 1.4
elif debt_type in ["syntax_error", "dependency_debt"]:
risk_factor = 1.2
# Determine skill level required
skill_requirements = {
"architecture_debt": "expert",
"security_risk": "senior",
"high_complexity": "senior",
"large_function": "mid",
"duplicate_code": "mid",
"dependency_debt": "mid",
"test_debt": "mid",
"todo_comment": "junior",
"missing_docstring": "junior",
"long_line": "junior"
}
skill_level = skill_requirements.get(debt_type, "mid")
# Confidence based on debt type clarity
confidence_levels = {
"todo_comment": 0.9,
"missing_docstring": 0.9,
"long_line": 0.95,
"syntax_error": 0.8,
"large_function": 0.7,
"duplicate_code": 0.6,
"high_complexity": 0.5,
"architecture_debt": 0.3,
"security_risk": 0.4
}
confidence = confidence_levels.get(debt_type, 0.6)
return EffortEstimate(
size_points=size_points,
hours_estimate=hours_estimate,
risk_factor=risk_factor,
skill_level_required=skill_level,
confidence=confidence
)
def _assess_business_impact(self, item: Dict[str, Any]) -> BusinessImpact:
"""Assess business impact of debt item."""
debt_type = item.get("type", "unknown")
severity = item.get("severity", "medium")
# Base impact scores by debt type (1-10 scale)
impact_profiles = {
"security_risk": (9, 8, 7, 9, 10), # customer, revenue, velocity, quality, security
"architecture_debt": (6, 7, 9, 8, 4),
"large_function": (3, 4, 7, 6, 2),
"high_complexity": (4, 5, 8, 7, 3),
"duplicate_code": (3, 4, 6, 6, 2),
"syntax_error": (7, 6, 8, 9, 3),
"test_debt": (5, 5, 7, 8, 3),
"dependency_debt": (6, 5, 6, 7, 7),
"todo_comment": (1, 1, 2, 2, 1),
"missing_docstring": (2, 2, 4, 3, 1)
}
base_impacts = impact_profiles.get(debt_type, (3, 3, 5, 5, 3))
# Adjust by severity
severity_adjustments = {
"low": 0.6,
"medium": 1.0,
"high": 1.4,
"critical": 1.8
}
adjustment = severity_adjustments.get(severity, 1.0)
# Apply adjustment and cap at 10
adjusted_impacts = [min(10, max(1, round(impact * adjustment)))
for impact in base_impacts]
return BusinessImpact(
customer_impact=adjusted_impacts[0],
revenue_impact=adjusted_impacts[1],
team_velocity_impact=adjusted_impacts[2],
quality_impact=adjusted_impacts[3],
security_impact=adjusted_impacts[4]
)
def _calculate_interest_rate(self, item: Dict[str, Any],
business_impact: BusinessImpact) -> InterestRate:
"""Calculate interest rate for technical debt."""
# Base daily cost calculation
velocity_impact = business_impact.team_velocity_impact
quality_impact = business_impact.quality_impact
# Daily cost in "developer hours lost"
daily_cost = (velocity_impact * 0.5) + (quality_impact * 0.3)
# Frequency multiplier based on code location and type
file_path = item.get("file_path", "")
debt_type = item.get("type", "unknown")
# Estimate frequency based on file path patterns
frequency_multiplier = 1.0
if any(pattern in file_path.lower() for pattern in ["main", "core", "auth", "api"]):
frequency_multiplier = 2.0
elif any(pattern in file_path.lower() for pattern in ["util", "helper", "common"]):
frequency_multiplier = 1.5
elif any(pattern in file_path.lower() for pattern in ["test", "spec", "config"]):
frequency_multiplier = 0.5
# Team impact multiplier
team_impact_multiplier = min(self.team_size, 8) / 5.0 # Normalize around team of 5
# Compound rate - how this debt creates more debt
compound_rates = {
"architecture_debt": 0.1, # Creates 10% more debt monthly
"duplicate_code": 0.08,
"high_complexity": 0.05,
"large_function": 0.03,
"test_debt": 0.04,
"security_risk": 0.02, # Doesn't compound much, but high initial impact
"todo_comment": 0.01
}
compound_rate = compound_rates.get(debt_type, 0.02)
return InterestRate(
daily_cost=daily_cost,
frequency_multiplier=frequency_multiplier,
team_impact_multiplier=team_impact_multiplier,
compound_rate=compound_rate
)
def _calculate_cost_of_delay(self, interest_rate: InterestRate,
effort: EffortEstimate) -> float:
"""Calculate total cost of delay if debt is not fixed."""
# Estimate delay in days (assuming debt gets fixed eventually)
estimated_delay_days = effort.hours_estimate / (self.sprint_capacity_hours / 14) # 2-week sprints
# Calculate cumulative cost
daily_cost = (interest_rate.daily_cost *
interest_rate.frequency_multiplier *
interest_rate.team_impact_multiplier)
# Add compound interest effect
compound_effect = (1 + interest_rate.compound_rate) ** (estimated_delay_days / 30)
total_cost = daily_cost * estimated_delay_days * compound_effect
return round(total_cost, 2)
def _categorize_debt_item(self, item: Dict[str, Any]) -> str:
"""Categorize debt item into high-level categories."""
debt_type = item.get("type", "unknown")
categories = {
"code_quality": ["large_function", "high_complexity", "duplicate_code",
"long_line", "missing_docstring"],
"architecture": ["architecture_debt", "large_file"],
"security": ["security_risk", "hardcoded_secrets"],
"testing": ["test_debt", "missing_tests"],
"maintenance": ["todo_comment", "commented_code"],
"dependencies": ["dependency_debt", "outdated_packages"],
"infrastructure": ["deployment_debt", "monitoring_gaps"],
"documentation": ["missing_docstring", "outdated_docs"]
}
for category, types in categories.items():
if debt_type in types:
return category
return "other"
def _generate_impact_tags(self, item: Dict[str, Any],
business_impact: BusinessImpact) -> List[str]:
"""Generate impact tags for debt item."""
tags = []
if business_impact.security_impact >= 7:
tags.append("security-critical")
if business_impact.customer_impact >= 7:
tags.append("customer-facing")
if business_impact.revenue_impact >= 7:
tags.append("revenue-impact")
if business_impact.team_velocity_impact >= 7:
tags.append("velocity-blocker")
if business_impact.quality_impact >= 7:
tags.append("quality-risk")
# Add effort-based tags
effort_hours = item.get("effort_estimate", {}).get("hours_estimate", 0)
if effort_hours <= 4:
tags.append("quick-win")
elif effort_hours >= 40:
tags.append("major-initiative")
return tags
def _calculate_cost_of_delay_score(self, item: Dict[str, Any]) -> float:
"""Calculate priority score using cost-of-delay framework."""
business_impact = item["business_impact"]
effort = item["effort_estimate"]
# Business value (weighted average of impacts)
business_value = (
business_impact["customer_impact"] * 0.3 +
business_impact["revenue_impact"] * 0.3 +
business_impact["quality_impact"] * 0.2 +
business_impact["team_velocity_impact"] * 0.2
)
# Urgency (how quickly value decreases)
urgency = item["interest_rate"]["daily_cost"] * 10 # Scale to 1-10
urgency = min(10, max(1, urgency))
# Risk reduction
risk_reduction = business_impact["security_impact"] * 0.6 + business_impact["quality_impact"] * 0.4
# Team productivity impact
team_productivity = business_impact["team_velocity_impact"]
# Combine with weights
weights = self.framework_weights["cost_of_delay"]
numerator = (
business_value * weights["business_value"] +
urgency * weights["urgency"] +
risk_reduction * weights["risk_reduction"] +
team_productivity * weights["team_productivity"]
)
# Divide by effort (adjusted for risk)
effort_adjusted = effort["hours_estimate"] * effort["risk_factor"]
denominator = max(1, effort_adjusted / 8) # Normalize to story points
return round(numerator / denominator, 2)
def _calculate_wsjf_score(self, item: Dict[str, Any]) -> float:
"""Calculate priority score using Weighted Shortest Job First (WSJF)."""
business_impact = item["business_impact"]
effort = item["effort_estimate"]
# Business value
business_value = (
business_impact["customer_impact"] * 0.4 +
business_impact["revenue_impact"] * 0.6
)
# Time criticality
time_criticality = item["cost_of_delay"] / 10 # Normalize
time_criticality = min(10, max(1, time_criticality))
# Risk reduction
risk_reduction = (
business_impact["security_impact"] * 0.5 +
business_impact["quality_impact"] * 0.5
)
# Job size (effort)
job_size = effort["size_points"]
# WSJF calculation
numerator = business_value + time_criticality + risk_reduction
denominator = max(1, job_size)
return round(numerator / denominator, 2)
def _calculate_rice_score(self, item: Dict[str, Any]) -> float:
"""Calculate priority score using RICE framework."""
business_impact = item["business_impact"]
effort = item["effort_estimate"]
# Reach (how many developers/users affected)
reach = min(10, self.team_size * business_impact["team_velocity_impact"] / 5)
# Impact
impact = (
business_impact["customer_impact"] * 0.3 +
business_impact["revenue_impact"] * 0.3 +
business_impact["quality_impact"] * 0.4
)
# Confidence
confidence = effort["confidence"] * 10
# Effort
effort_score = effort["size_points"]
# RICE calculation
rice_score = (reach * impact * confidence) / max(1, effort_score)
return round(rice_score, 2)
def _generate_sprint_allocation(self) -> Dict[str, Any]:
"""Generate sprint allocation recommendations."""
# Calculate total effort needed
total_effort_hours = sum(item["effort_estimate"]["hours_estimate"]
for item in self.prioritized_items)
# Assume 20% of sprint capacity goes to tech debt
debt_capacity_per_sprint = self.sprint_capacity_hours * 0.2
# Allocate items to sprints
sprints = []
current_sprint = {"sprint_number": 1, "items": [], "total_hours": 0, "capacity_used": 0}
for item in self.prioritized_items:
item_effort = item["effort_estimate"]["hours_estimate"]
if current_sprint["total_hours"] + item_effort <= debt_capacity_per_sprint:
current_sprint["items"].append(item)
current_sprint["total_hours"] += item_effort
current_sprint["capacity_used"] = current_sprint["total_hours"] / debt_capacity_per_sprint
else:
# Start new sprint
sprints.append(current_sprint)
current_sprint = {
"sprint_number": len(sprints) + 1,
"items": [item],
"total_hours": item_effort,
"capacity_used": item_effort / debt_capacity_per_sprint
}
# Add the last sprint
if current_sprint["items"]:
sprints.append(current_sprint)
# Calculate summary statistics
total_sprints_needed = len(sprints)
high_priority_items = len([item for item in self.prioritized_items
if item.get("priority", "medium") in ["high", "critical"]])
return {
"total_debt_hours": round(total_effort_hours, 1),
"debt_capacity_per_sprint": debt_capacity_per_sprint,
"total_sprints_needed": total_sprints_needed,
"high_priority_items": high_priority_items,
"sprint_plan": sprints[:6], # Show first 6 sprints
"recommendations": [
f"Allocate {debt_capacity_per_sprint} hours per sprint to tech debt",
f"Focus on {high_priority_items} high-priority items first",
f"Estimated {total_sprints_needed} sprints to clear current backlog"
]
}
def _generate_insights(self) -> Dict[str, Any]:
"""Generate insights from the prioritized debt analysis."""
# Category distribution
categories = Counter(item["category"] for item in self.prioritized_items)
# Effort distribution
total_effort = sum(item["effort_estimate"]["hours_estimate"]
for item in self.prioritized_items)
effort_by_category = defaultdict(float)
for item in self.prioritized_items:
effort_by_category[item["category"]] += item["effort_estimate"]["hours_estimate"]
# Priority distribution
priorities = Counter()
for item in self.prioritized_items:
score = item["priority_score"]
if score >= 8:
priorities["critical"] += 1
elif score >= 5:
priorities["high"] += 1
elif score >= 2:
priorities["medium"] += 1
else:
priorities["low"] += 1
# Risk analysis
high_risk_items = [item for item in self.prioritized_items
if item["effort_estimate"]["risk_factor"] >= 1.5]
# Quick wins identification
quick_wins = [item for item in self.prioritized_items
if (item["effort_estimate"]["hours_estimate"] <= 8 and
item["priority_score"] >= 3)]
# Cost analysis
total_cost_of_delay = sum(item["cost_of_delay"] for item in self.prioritized_items)
avg_interest_rate = sum(item["interest_rate"]["daily_cost"]
for item in self.prioritized_items) / len(self.prioritized_items)
return {
"category_distribution": dict(categories),
"total_effort_hours": round(total_effort, 1),
"effort_by_category": {k: round(v, 1) for k, v in effort_by_category.items()},
"priority_distribution": dict(priorities),
"high_risk_items_count": len(high_risk_items),
"quick_wins_count": len(quick_wins),
"total_cost_of_delay": round(total_cost_of_delay, 1),
"average_daily_interest_rate": round(avg_interest_rate, 2),
"top_categories_by_effort": sorted(effort_by_category.items(),
key=lambda x: x[1], reverse=True)[:3]
}
def _generate_charts_data(self) -> Dict[str, Any]:
"""Generate data for charts and visualizations."""
# Priority vs Effort scatter plot data
scatter_data = []
for item in self.prioritized_items:
scatter_data.append({
"x": item["effort_estimate"]["hours_estimate"],
"y": item["priority_score"],
"label": item.get("description", "")[:50],
"category": item["category"],
"size": item["cost_of_delay"]
})
# Category effort distribution (pie chart)
effort_by_category = defaultdict(float)
for item in self.prioritized_items:
effort_by_category[item["category"]] += item["effort_estimate"]["hours_estimate"]
pie_data = [{"category": k, "effort": round(v, 1)}
for k, v in effort_by_category.items()]
# Priority timeline (bar chart)
timeline_data = []
cumulative_effort = 0
for i, item in enumerate(self.prioritized_items[:20]): # Top 20 items
cumulative_effort += item["effort_estimate"]["hours_estimate"]
timeline_data.append({
"item_rank": i + 1,
"description": item.get("description", "")[:30],
"effort": item["effort_estimate"]["hours_estimate"],
"cumulative_effort": round(cumulative_effort, 1),
"priority_score": item["priority_score"]
})
# Interest rate trend (line chart data structure)
interest_trend_data = []
for i, item in enumerate(self.prioritized_items):
interest_trend_data.append({
"item_index": i,
"daily_cost": item["interest_rate"]["daily_cost"],
"category": item["category"]
})
return {
"priority_effort_scatter": scatter_data,
"category_effort_distribution": pie_data,
"priority_timeline": timeline_data,
"interest_rate_trend": interest_trend_data[:50] # Limit for performance
}
def _generate_recommendations(self) -> List[str]:
"""Generate actionable recommendations based on analysis."""
recommendations = []
insights = self._generate_insights()
# Quick wins recommendation
if insights["quick_wins_count"] > 0:
recommendations.append(
f"Start with {insights['quick_wins_count']} quick wins to build momentum "
"and demonstrate immediate value from tech debt reduction efforts."
)
# High-risk items
if insights["high_risk_items_count"] > 5:
recommendations.append(
f"Plan careful execution for {insights['high_risk_items_count']} high-risk items. "
"Consider pair programming, extra testing, and incremental approaches."
)
# Category focus
top_category = insights["top_categories_by_effort"][0][0]
recommendations.append(
f"Focus initial efforts on '{top_category}' category debt, which represents "
f"the largest effort investment ({insights['top_categories_by_effort'][0][1]:.1f} hours)."
)
# Cost of delay urgency
if insights["average_daily_interest_rate"] > 5:
recommendations.append(
f"High average daily interest rate ({insights['average_daily_interest_rate']:.1f}) "
"suggests urgent action needed. Consider increasing tech debt budget allocation."
)
# Sprint planning
sprints_needed = len(self.prioritized_items) / 10 # Rough estimate
if sprints_needed > 12:
recommendations.append(
"Large debt backlog detected. Consider dedicating entire sprints to debt reduction "
"rather than trying to fit debt work around features."
)
# Team capacity
total_effort = insights["total_effort_hours"]
weeks_needed = total_effort / (self.sprint_capacity_hours * 0.2)
if weeks_needed > 26: # Half a year
recommendations.append(
f"With current capacity allocation, debt backlog will take {weeks_needed:.0f} weeks. "
"Consider increasing tech debt budget or focusing on highest-impact items only."
)
return recommendations
def format_prioritized_report(analysis_result: Dict[str, Any]) -> str:
    """Format the prioritization analysis in human-readable format.

    Renders header, executive summary, sprint plan (first 3 sprints,
    top 3 items each), top-10 backlog and recommendations.

    BUG FIX: item descriptions are read with ``.get("description", "")``
    — the original indexed ``item['description']`` directly and raised
    KeyError for items without a description, while the rest of the file
    (e.g. the charts data) treats the key as optional.
    """
    output = []
    # Header
    output.append("=" * 60)
    output.append("TECHNICAL DEBT PRIORITIZATION REPORT")
    output.append("=" * 60)
    metadata = analysis_result["metadata"]
    output.append(f"Analysis Date: {metadata['analysis_date']}")
    output.append(f"Framework: {metadata['framework_used'].upper()}")
    output.append(f"Team Size: {metadata['team_size']}")
    output.append(f"Sprint Capacity: {metadata['sprint_capacity_hours']} hours")
    output.append("")
    # Executive Summary
    insights = analysis_result["insights"]
    output.append("EXECUTIVE SUMMARY")
    output.append("-" * 30)
    output.append(f"Total Debt Items: {metadata['total_items_analyzed']}")
    output.append(f"Total Effort Required: {insights['total_effort_hours']} hours")
    output.append(f"Total Cost of Delay: ${insights['total_cost_of_delay']:,.0f}")
    output.append(f"Quick Wins Available: {insights['quick_wins_count']}")
    output.append(f"High-Risk Items: {insights['high_risk_items_count']}")
    output.append("")
    # Sprint Plan
    sprint_plan = analysis_result["sprint_allocation"]
    output.append("SPRINT ALLOCATION PLAN")
    output.append("-" * 30)
    output.append(f"Sprints Needed: {sprint_plan['total_sprints_needed']}")
    output.append(f"Hours per Sprint: {sprint_plan['debt_capacity_per_sprint']}")
    output.append("")
    for sprint in sprint_plan["sprint_plan"][:3]:  # Show first 3 sprints
        output.append(f"Sprint {sprint['sprint_number']} ({sprint['capacity_used']:.0%} capacity):")
        for item in sprint["items"][:3]:  # Top 3 items per sprint
            output.append(f"• {item.get('description', '')[:50]}...")
            output.append(f"  Effort: {item['effort_estimate']['hours_estimate']:.1f}h, "
                          f"Priority: {item['priority_score']}")
        output.append("")
    # Top Priority Items
    output.append("TOP 10 PRIORITY ITEMS")
    output.append("-" * 30)
    for i, item in enumerate(analysis_result["prioritized_backlog"][:10], 1):
        output.append(f"{i}. [{item['priority_score']:.1f}] {item.get('description', '')}")
        output.append(f"   Category: {item['category']}, "
                      f"Effort: {item['effort_estimate']['hours_estimate']:.1f}h, "
                      f"Cost of Delay: ${item['cost_of_delay']:.0f}")
        if item["impact_tags"]:
            output.append(f"   Tags: {', '.join(item['impact_tags'])}")
        output.append("")
    # Recommendations
    output.append("RECOMMENDATIONS")
    output.append("-" * 30)
    for i, rec in enumerate(analysis_result["recommendations"], 1):
        output.append(f"{i}. {rec}")
    output.append("")
    return "\n".join(output)
def main():
    """CLI entry point: parse args, prioritize the inventory, emit reports.

    Exits with status 1 if the inventory cannot be loaded or the
    analysis raises.
    """
    parser = argparse.ArgumentParser(description="Prioritize technical debt backlog")
    parser.add_argument("inventory_file", help="Path to debt inventory JSON file")
    parser.add_argument("--output", help="Output file path")
    parser.add_argument("--format", choices=["json", "text", "both"],
                        default="both", help="Output format")
    parser.add_argument("--framework", choices=["cost_of_delay", "wsjf", "rice"],
                        default="cost_of_delay", help="Prioritization framework")
    parser.add_argument("--team-size", type=int, default=5, help="Team size")
    parser.add_argument("--sprint-capacity", type=int, default=80,
                        help="Sprint capacity in hours")
    args = parser.parse_args()

    prioritizer = DebtPrioritizer(args.team_size, args.sprint_capacity)
    if not prioritizer.load_debt_inventory(args.inventory_file):
        sys.exit(1)
    try:
        results = prioritizer.analyze_and_prioritize(args.framework)
    except Exception as exc:
        print(f"Analysis failed: {exc}")
        sys.exit(1)

    # Emit whichever report flavors were requested; with --output the
    # filename extension is normalized per format, otherwise print to stdout.
    if args.format in ("json", "both"):
        payload = json.dumps(results, indent=2, default=str)
        if args.output:
            target = args.output if args.output.endswith('.json') else f"{args.output}.json"
            with open(target, 'w') as handle:
                handle.write(payload)
            print(f"JSON report written to: {target}")
        else:
            print("JSON REPORT:")
            print("=" * 50)
            print(payload)
    if args.format in ("text", "both"):
        report = format_prioritized_report(results)
        if args.output:
            target = args.output if args.output.endswith('.txt') else f"{args.output}.txt"
            with open(target, 'w') as handle:
                handle.write(report)
            print(f"Text report written to: {target}")
        else:
            print("\nTEXT REPORT:")
            print("=" * 50)
            print(report)
# Script entry point: run the prioritizer CLI only when executed directly.
if __name__ == "__main__":
    main()

# --- pasted-diff artifact (web UI "View File" / hunk header) removed: ---
# --- the lines below begin a separate script, debt_scanner.py        ---
#!/usr/bin/env python3
"""
Tech Debt Scanner
Scans a codebase directory for tech debt signals using AST parsing (Python) and
regex patterns (any language). Detects various forms of technical debt and generates
both JSON inventory and human-readable reports.
Usage:
python debt_scanner.py /path/to/codebase
python debt_scanner.py /path/to/codebase --config config.json
python debt_scanner.py /path/to/codebase --output report.json --format both
"""
import ast
import json
import argparse
import os
import re
import sys
from collections import defaultdict, Counter
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional, Set, Tuple
class DebtScanner:
"""Main scanner class for detecting technical debt in codebases."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
self.config = self._load_default_config()
if config:
self.config.update(config)
self.debt_items = []
self.stats = defaultdict(int)
self.file_stats = {}
# Compile regex patterns for performance
self._compile_patterns()
def _load_default_config(self) -> Dict[str, Any]:
"""Load default configuration for debt detection."""
return {
"max_function_length": 50,
"max_complexity": 10,
"max_nesting_depth": 4,
"max_file_size_lines": 500,
"min_duplicate_lines": 3,
"ignore_patterns": [
"*.pyc", "__pycache__", ".git", ".svn", "node_modules",
"build", "dist", "*.min.js", "*.map"
],
"file_extensions": {
"python": [".py"],
"javascript": [".js", ".jsx", ".ts", ".tsx"],
"java": [".java"],
"csharp": [".cs"],
"cpp": [".cpp", ".cc", ".cxx", ".c", ".h", ".hpp"],
"ruby": [".rb"],
"php": [".php"],
"go": [".go"],
"rust": [".rs"],
"kotlin": [".kt"]
},
"comment_patterns": {
"todo": r"(?i)(TODO|FIXME|HACK|XXX|BUG)[\s:]*(.+)",
"commented_code": r"^\s*#.*[=(){}\[\];].*",
"magic_numbers": r"\b\d{2,}\b",
"long_strings": r'["\'](.{100,})["\']'
},
"severity_weights": {
"critical": 10,
"high": 7,
"medium": 5,
"low": 2,
"info": 1
}
}
def _compile_patterns(self):
"""Compile regex patterns for better performance."""
self.comment_regexes = {}
for name, pattern in self.config["comment_patterns"].items():
self.comment_regexes[name] = re.compile(pattern)
# Common code smells patterns
self.smell_patterns = {
"empty_catch": re.compile(r"except[^:]*:\s*pass\s*$", re.MULTILINE),
"print_debug": re.compile(r"print\s*\([^)]*debug[^)]*\)", re.IGNORECASE),
"hardcoded_paths": re.compile(r'["\'][/\\][^"\']*[/\\][^"\']*["\']'),
"sql_injection_risk": re.compile(r'["\'].*%s.*["\'].*execute', re.IGNORECASE),
}
    def scan_directory(self, directory: str) -> Dict[str, Any]:
        """
        Scan a directory for tech debt.

        Resets all scanner state, walks the tree (honoring the configured
        ignore patterns), scans each file, then runs cross-file
        post-processing (duplicate detection, priority scoring).

        Args:
            directory: Path to the directory to scan
        Returns:
            Dictionary containing debt inventory and statistics
        Raises:
            ValueError: If ``directory`` does not exist.
        """
        directory_path = Path(directory)
        if not directory_path.exists():
            raise ValueError(f"Directory does not exist: {directory}")
        print(f"Scanning directory: {directory}")
        print("=" * 50)
        # Reset state so repeated scans don't accumulate results
        self.debt_items = []
        self.stats = defaultdict(int)
        self.file_stats = {}
        # Walk through directory
        for root, dirs, files in os.walk(directory):
            # Filter out ignored directories (in-place so os.walk prunes them)
            dirs[:] = [d for d in dirs if not self._should_ignore(d)]
            for file in files:
                if self._should_ignore(file):
                    continue
                file_path = os.path.join(root, file)
                relative_path = os.path.relpath(file_path, directory)
                try:
                    self._scan_file(file_path, relative_path)
                except Exception as e:
                    # A single unreadable/broken file must not abort the scan
                    print(f"Error scanning {relative_path}: {e}")
                    self.stats["scan_errors"] += 1
        # Post-process results
        self._detect_duplicates(directory)
        self._calculate_priorities()
        return self._generate_report(directory)
def _should_ignore(self, name: str) -> bool:
"""Check if file/directory should be ignored."""
for pattern in self.config["ignore_patterns"]:
if "*" in pattern:
if re.match(pattern.replace("*", ".*"), name):
return True
elif pattern in name:
return True
return False
    def _scan_file(self, file_path: str, relative_path: str):
        """Scan a single file for tech debt.

        Records per-file metadata in ``self.file_stats``, flags oversized
        files, then dispatches to the AST-based Python scanner or the
        generic pattern scanner, followed by the language-agnostic checks.
        Unreadable files are reported and skipped.
        """
        try:
            # errors='ignore' drops undecodable bytes rather than failing the scan
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
                lines = content.splitlines()
        except Exception as e:
            print(f"Cannot read {relative_path}: {e}")
            return
        file_ext = Path(file_path).suffix.lower()
        file_info = {
            "path": relative_path,
            "lines": len(lines),
            "size_kb": os.path.getsize(file_path) / 1024,
            "language": self._detect_language(file_ext),
            "debt_count": 0
        }
        self.stats["files_scanned"] += 1
        self.stats["total_lines"] += len(lines)
        # File size debt (NOTE: only this check increments debt_count here)
        if len(lines) > self.config["max_file_size_lines"]:
            self._add_debt_item(
                "large_file",
                f"File is too large: {len(lines)} lines",
                relative_path,
                "medium",
                {"lines": len(lines), "recommended_max": self.config["max_file_size_lines"]}
            )
            file_info["debt_count"] += 1
        # Language-specific analysis
        if file_info["language"] == "python" and file_ext == ".py":
            self._scan_python_file(relative_path, content, lines)
        else:
            self._scan_generic_file(relative_path, content, lines, file_info["language"])
        # Common patterns for all languages
        self._scan_common_patterns(relative_path, content, lines)
        self.file_stats[relative_path] = file_info
def _detect_language(self, file_ext: str) -> str:
"""Detect programming language from file extension."""
for lang, extensions in self.config["file_extensions"].items():
if file_ext in extensions:
return lang
return "unknown"
def _scan_python_file(self, file_path: str, content: str, lines: List[str]):
"""Scan Python files using AST parsing."""
try:
tree = ast.parse(content)
analyzer = PythonASTAnalyzer(self.config)
debt_items = analyzer.analyze(tree, file_path, lines)
self.debt_items.extend(debt_items)
self.stats["python_files"] += 1
except SyntaxError as e:
self._add_debt_item(
"syntax_error",
f"Python syntax error: {e}",
file_path,
"high",
{"line": e.lineno, "error": str(e)}
)
def _scan_generic_file(self, file_path: str, content: str, lines: List[str], language: str):
"""Scan non-Python files using pattern matching."""
# Detect long lines
for i, line in enumerate(lines):
if len(line) > 120:
self._add_debt_item(
"long_line",
f"Line too long: {len(line)} characters",
file_path,
"low",
{"line_number": i + 1, "length": len(line)}
)
# Detect deep nesting (approximate)
for i, line in enumerate(lines):
indent_level = len(line) - len(line.lstrip())
if language in ["python"]:
indent_level = indent_level // 4 # Python uses 4-space indents
elif language in ["javascript", "java", "csharp", "cpp"]:
# Count braces for brace-based languages
brace_level = content[:content.find('\n'.join(lines[:i+1]))].count('{') - content[:content.find('\n'.join(lines[:i+1]))].count('}')
if brace_level > self.config["max_nesting_depth"]:
self._add_debt_item(
"deep_nesting",
f"Deep nesting detected: {brace_level} levels",
file_path,
"medium",
{"line_number": i + 1, "nesting_level": brace_level}
)
def _scan_common_patterns(self, file_path: str, content: str, lines: List[str]):
"""Scan for common patterns across all file types."""
# TODO/FIXME comments
for i, line in enumerate(lines):
for pattern_name, regex in self.comment_regexes.items():
match = regex.search(line)
if match:
if pattern_name == "todo":
self._add_debt_item(
"todo_comment",
f"TODO/FIXME comment: {match.group(0)}",
file_path,
"low",
{"line_number": i + 1, "comment": match.group(0).strip()}
)
# Code smells
for smell_name, pattern in self.smell_patterns.items():
matches = pattern.finditer(content)
for match in matches:
line_num = content[:match.start()].count('\n') + 1
self._add_debt_item(
smell_name,
f"Code smell detected: {smell_name}",
file_path,
"medium",
{"line_number": line_num, "pattern": match.group(0)[:100]}
)
def _detect_duplicates(self, directory: str):
"""Detect duplicate code blocks across files."""
# Simple duplicate detection based on exact line matches
line_hashes = defaultdict(list)
for file_path, file_info in self.file_stats.items():
try:
full_path = os.path.join(directory, file_path)
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
for i in range(len(lines) - self.config["min_duplicate_lines"] + 1):
block = ''.join(lines[i:i + self.config["min_duplicate_lines"]])
block_hash = hash(block.strip())
if len(block.strip()) > 50: # Only consider substantial blocks
line_hashes[block_hash].append((file_path, i + 1, block))
except Exception:
continue
# Report duplicates
for block_hash, occurrences in line_hashes.items():
if len(occurrences) > 1:
for file_path, line_num, block in occurrences:
self._add_debt_item(
"duplicate_code",
f"Duplicate code block found in {len(occurrences)} files",
file_path,
"medium",
{
"line_number": line_num,
"duplicate_count": len(occurrences),
"other_files": [f[0] for f in occurrences if f[0] != file_path]
}
)
def _calculate_priorities(self):
"""Calculate priority scores for debt items."""
severity_weights = self.config["severity_weights"]
for item in self.debt_items:
base_score = severity_weights.get(item["severity"], 1)
# Adjust based on debt type
type_multipliers = {
"syntax_error": 2.0,
"security_risk": 1.8,
"large_function": 1.5,
"high_complexity": 1.4,
"duplicate_code": 1.3,
"todo_comment": 0.5
}
multiplier = type_multipliers.get(item["type"], 1.0)
item["priority_score"] = int(base_score * multiplier)
# Set priority category
if item["priority_score"] >= 15:
item["priority"] = "critical"
elif item["priority_score"] >= 10:
item["priority"] = "high"
elif item["priority_score"] >= 5:
item["priority"] = "medium"
else:
item["priority"] = "low"
def _add_debt_item(self, debt_type: str, description: str, file_path: str,
severity: str, metadata: Dict[str, Any]):
"""Add a debt item to the inventory."""
item = {
"id": f"DEBT-{len(self.debt_items) + 1:04d}",
"type": debt_type,
"description": description,
"file_path": file_path,
"severity": severity,
"metadata": metadata,
"detected_date": datetime.now().isoformat(),
"status": "identified"
}
self.debt_items.append(item)
self.stats[f"debt_{debt_type}"] += 1
self.stats["total_debt_items"] += 1
if file_path in self.file_stats:
self.file_stats[file_path]["debt_count"] += 1
def _generate_report(self, directory: str) -> Dict[str, Any]:
"""Generate the final debt report."""
# Sort debt items by priority score
self.debt_items.sort(key=lambda x: x.get("priority_score", 0), reverse=True)
# Calculate summary statistics
priority_counts = Counter(item["priority"] for item in self.debt_items)
type_counts = Counter(item["type"] for item in self.debt_items)
# Calculate health score (0-100, higher is better)
total_files = self.stats.get("files_scanned", 1)
debt_density = len(self.debt_items) / total_files
health_score = max(0, 100 - (debt_density * 10))
report = {
"scan_metadata": {
"directory": directory,
"scan_date": datetime.now().isoformat(),
"scanner_version": "1.0.0",
"config": self.config
},
"summary": {
"total_files_scanned": self.stats.get("files_scanned", 0),
"total_lines_scanned": self.stats.get("total_lines", 0),
"total_debt_items": len(self.debt_items),
"health_score": round(health_score, 1),
"debt_density": round(debt_density, 2),
"priority_breakdown": dict(priority_counts),
"type_breakdown": dict(type_counts)
},
"debt_items": self.debt_items,
"file_statistics": self.file_stats,
"recommendations": self._generate_recommendations()
}
return report
def _generate_recommendations(self) -> List[str]:
"""Generate actionable recommendations based on findings."""
recommendations = []
# Priority-based recommendations
high_priority_count = len([item for item in self.debt_items
if item.get("priority") in ["critical", "high"]])
if high_priority_count > 10:
recommendations.append(
f"Address {high_priority_count} high-priority debt items immediately - "
"they pose significant risk to code quality and maintainability."
)
# Type-specific recommendations
type_counts = Counter(item["type"] for item in self.debt_items)
if type_counts.get("large_function", 0) > 5:
recommendations.append(
"Consider refactoring large functions into smaller, more focused units. "
"This will improve readability and testability."
)
if type_counts.get("duplicate_code", 0) > 3:
recommendations.append(
"Extract duplicate code into reusable functions or modules. "
"This reduces maintenance burden and potential for inconsistent changes."
)
if type_counts.get("todo_comment", 0) > 20:
recommendations.append(
"Review and address TODO/FIXME comments. Consider creating proper "
"tickets for substantial work items."
)
# General recommendations
total_files = self.stats.get("files_scanned", 1)
if len(self.debt_items) / total_files > 2:
recommendations.append(
"High debt density detected. Consider establishing coding standards "
"and regular code review processes to prevent debt accumulation."
)
if not recommendations:
recommendations.append("Code quality looks good! Continue current practices.")
return recommendations
class PythonASTAnalyzer(ast.NodeVisitor):
    """AST analyzer for Python-specific debt detection.

    Walks a parsed module and collects debt items for oversized
    functions and classes, missing docstrings, high cyclomatic
    complexity, and long parameter lists.

    Fix over the previous version: ``async def`` functions and methods
    were silently skipped (NodeVisitor dispatches them to
    visit_AsyncFunctionDef, which did not exist, and the large_class
    method count only matched ast.FunctionDef). They now receive the
    same checks as synchronous definitions.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.debt_items: List[Dict[str, Any]] = []
        self.current_file = ""
        self.lines: List[str] = []
        # Names of enclosing functions during the walk (for context).
        self.function_stack: List[str] = []

    def analyze(self, tree: ast.AST, file_path: str, lines: List[str]) -> List[Dict[str, Any]]:
        """Analyze a parsed Python AST and return the debt items found."""
        self.debt_items = []
        self.current_file = file_path
        self.lines = lines
        self.function_stack = []
        self.visit(tree)
        return self.debt_items

    def visit_FunctionDef(self, node: ast.FunctionDef):
        """Check one function for length, docstring, complexity, and parameters."""
        self.function_stack.append(node.name)
        # end_lineno is populated by ast.parse on Python 3.8+.
        func_length = node.end_lineno - node.lineno + 1
        if func_length > self.config["max_function_length"]:
            self._add_debt(
                "large_function",
                f"Function '{node.name}' is too long: {func_length} lines",
                node.lineno,
                "medium",
                {"function_name": node.name, "length": func_length}
            )
        if not ast.get_docstring(node):
            self._add_debt(
                "missing_docstring",
                f"Function '{node.name}' missing docstring",
                node.lineno,
                "low",
                {"function_name": node.name}
            )
        complexity = self._calculate_complexity(node)
        if complexity > self.config["max_complexity"]:
            self._add_debt(
                "high_complexity",
                f"Function '{node.name}' has high complexity: {complexity}",
                node.lineno,
                "high",
                {"function_name": node.name, "complexity": complexity}
            )
        # Positional parameters only; *args/**kwargs and keyword-only
        # parameters are not counted (matches the original check).
        param_count = len(node.args.args)
        if param_count > 5:
            self._add_debt(
                "too_many_parameters",
                f"Function '{node.name}' has too many parameters: {param_count}",
                node.lineno,
                "medium",
                {"function_name": node.name, "parameter_count": param_count}
            )
        self.generic_visit(node)
        self.function_stack.pop()

    # Apply identical checks to async functions (previously skipped).
    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_ClassDef(self, node: ast.ClassDef):
        """Check one class for a docstring and an excessive method count."""
        if not ast.get_docstring(node):
            self._add_debt(
                "missing_docstring",
                f"Class '{node.name}' missing docstring",
                node.lineno,
                "low",
                {"class_name": node.name}
            )
        methods = [n for n in node.body
                   if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))]
        if len(methods) > 20:
            self._add_debt(
                "large_class",
                f"Class '{node.name}' has too many methods: {len(methods)}",
                node.lineno,
                "medium",
                {"class_name": node.name, "method_count": len(methods)}
            )
        self.generic_visit(node)

    def _calculate_complexity(self, node: ast.FunctionDef) -> int:
        """Cyclomatic complexity: 1 + branches + handlers + extra boolean operands."""
        complexity = 1  # base path
        for child in ast.walk(node):
            if isinstance(child, (ast.If, ast.While, ast.For, ast.AsyncFor)):
                complexity += 1
            elif isinstance(child, ast.ExceptHandler):
                complexity += 1
            elif isinstance(child, ast.BoolOp):
                # "a and b and c" adds two decision points, not one.
                complexity += len(child.values) - 1
        return complexity

    def _add_debt(self, debt_type: str, description: str, line_number: int,
                  severity: str, metadata: Dict[str, Any]):
        """Record one debt finding for the current file.

        NOTE(review): IDs restart at DEBT-0001 per analyzed file, so they can
        collide with scanner-level IDs once merged — confirm whether consumers
        rely on the exact ID format before changing the scheme.
        """
        item = {
            "id": f"DEBT-{len(self.debt_items) + 1:04d}",
            "type": debt_type,
            "description": description,
            "file_path": self.current_file,
            "line_number": line_number,
            "severity": severity,
            "metadata": metadata,
            "detected_date": datetime.now().isoformat(),
            "status": "identified"
        }
        self.debt_items.append(item)
def format_human_readable_report(report: Dict[str, Any]) -> str:
    """Render the scan report as plain text for terminal display.

    Sections: header, summary, priority breakdown, the top ten debt
    items, and the recommendations list.
    """
    meta = report["scan_metadata"]
    summary = report["summary"]
    parts: List[str] = [
        "=" * 60,
        "TECHNICAL DEBT SCAN REPORT",
        "=" * 60,
        f"Directory: {meta['directory']}",
        f"Scan Date: {meta['scan_date']}",
        f"Scanner Version: {meta['scanner_version']}",
        "",
        "SUMMARY",
        "-" * 30,
        f"Files Scanned: {summary['total_files_scanned']}",
        f"Lines Scanned: {summary['total_lines_scanned']:,}",
        f"Total Debt Items: {summary['total_debt_items']}",
        f"Health Score: {summary['health_score']}/100",
        f"Debt Density: {summary['debt_density']} items/file",
        "",
        "PRIORITY BREAKDOWN",
        "-" * 30,
    ]
    for priority, count in summary["priority_breakdown"].items():
        parts.append(f"{priority.capitalize()}: {count}")
    parts += ["", "TOP DEBT ITEMS", "-" * 30]
    for rank, item in enumerate(report["debt_items"][:10], start=1):
        parts.append(f"{rank}. [{item['priority'].upper()}] {item['description']}")
        parts.append(f" File: {item['file_path']}")
        if 'line_number' in item:
            parts.append(f" Line: {item['line_number']}")
        parts.append("")
    parts += ["RECOMMENDATIONS", "-" * 30]
    for rank, rec in enumerate(report["recommendations"], start=1):
        parts.append(f"{rank}. {rec}")
    parts.append("")
    return "\n".join(parts)
def main():
    """CLI entry point: parse arguments, run the scan, emit the report(s).

    Exits with status 1 when the config file cannot be loaded or the
    scan itself fails. Output goes to files when --output is given
    (with .json/.txt suffixes enforced), otherwise to stdout.
    """
    parser = argparse.ArgumentParser(description="Scan codebase for technical debt")
    parser.add_argument("directory", help="Directory to scan")
    parser.add_argument("--config", help="Configuration file (JSON)")
    parser.add_argument("--output", help="Output file path")
    parser.add_argument("--format", choices=["json", "text", "both"],
                        default="both", help="Output format")
    args = parser.parse_args()

    config = None
    if args.config:
        try:
            with open(args.config, 'r') as handle:
                config = json.load(handle)
        except Exception as exc:
            print(f"Error loading config: {exc}")
            sys.exit(1)

    scanner = DebtScanner(config)
    try:
        report = scanner.scan_directory(args.directory)
    except Exception as exc:
        print(f"Scan failed: {exc}")
        sys.exit(1)

    if args.format in ("json", "both"):
        rendered = json.dumps(report, indent=2, default=str)
        if args.output:
            target = args.output if args.output.endswith('.json') else f"{args.output}.json"
            with open(target, 'w') as handle:
                handle.write(rendered)
            print(f"JSON report written to: {target}")
        else:
            print("\nJSON REPORT:")
            print("=" * 50)
            print(rendered)

    if args.format in ("text", "both"):
        rendered = format_human_readable_report(report)
        if args.output:
            target = args.output if args.output.endswith('.txt') else f"{args.output}.txt"
            with open(target, 'w') as handle:
                handle.write(rendered)
            print(f"Text report written to: {target}")
        else:
            print("\nTEXT REPORT:")
            print("=" * 50)
            print(rendered)
# Script entry point: run the CLI only when executed directly, so the
# module stays importable without side effects.
if __name__ == "__main__":
    main()