-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcloudformation_parser.py
More file actions
109 lines (89 loc) · 3.28 KB
/
Copy pathcloudformation_parser.py
File metadata and controls
109 lines (89 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""CloudFormation change set parser."""
from __future__ import annotations
import json
from typing import Any
from .models import ChangeAction, ChangeSource, DeployPlan, ResourceChange
# Translation table from the ``Action`` string found in a CloudFormation
# change entry to the tool's own ChangeAction value.
CFN_ACTION_MAP: dict[str, ChangeAction] = {
    "Add": ChangeAction.CREATE,      # resource will be created
    "Modify": ChangeAction.UPDATE,   # resource will be changed
    "Remove": ChangeAction.DELETE,   # resource will be deleted
    "Import": ChangeAction.IMPORT,   # resource will be imported
}
# Translation table from the string form of a change's ``Replacement``
# value to a "counts as a replacement" flag. "Conditional" is grouped with
# the true spellings, so it is treated as a replacement.
CFN_REPLACEMENT_MAP: dict[str, bool] = {
    **dict.fromkeys(("True", "true", "Conditional"), True),
    **dict.fromkeys(("False", "false"), False),
}
def parse_cloudformation_changeset(changeset_json: str | dict[str, Any]) -> DeployPlan:
    """Parse a CloudFormation change set into a DeployPlan.

    Accepts the JSON output of `aws cloudformation describe-change-set`
    or a change set JSON file.

    Args:
        changeset_json: Raw JSON string, path to a JSON file, or an
            already-parsed dict. A string is first decoded as JSON text;
            only if that fails is it treated as a file path.

    Returns:
        DeployPlan with one ResourceChange per entry found under
        ``Changes`` (or ``changes``); the parsed input is kept as
        ``raw_data``.

    Raises:
        OSError: If a string argument is not valid JSON and cannot be
            opened as a file.
        json.JSONDecodeError: If the opened file does not contain valid JSON.
    """
    if isinstance(changeset_json, str):
        try:
            data = json.loads(changeset_json)
        except json.JSONDecodeError:
            # Not JSON text, so interpret the string as a path. The encoding
            # is pinned so the result does not depend on the platform default.
            with open(changeset_json, encoding="utf-8") as f:
                data = json.load(f)
    else:
        data = changeset_json

    changes: list[ResourceChange] = []

    # ``or`` (rather than a .get default) also covers keys that are present
    # but explicitly null, which would otherwise fail when iterated.
    changes_list = data.get("Changes") or data.get("changes") or []

    for change_entry in changes_list:
        resource_change_data = (
            change_entry.get("ResourceChange")
            or change_entry.get("resource_change")
            or {}
        )

        # The action may sit on the entry itself or inside the resource
        # change; unknown or missing actions are treated as an update.
        action_str = change_entry.get(
            "Action", resource_change_data.get("Action", "Modify")
        )
        action = CFN_ACTION_MAP.get(action_str, ChangeAction.UPDATE)

        # A replacement overrides the mapped action. str() lets real
        # booleans ("True"/"False") hit the same lookup table as strings.
        replacement = resource_change_data.get("Replacement", "")
        if CFN_REPLACEMENT_MAP.get(str(replacement), False):
            action = ChangeAction.REPLACE

        resource_type = resource_change_data.get(
            "Type", resource_change_data.get("ResourceType", "unknown")
        )
        resource_name = resource_change_data.get(
            "LogicalResourceId",
            resource_change_data.get("PhysicalResourceId", "unknown"),
        )
        # The logical ID doubles as the address; "<type>.<name>" is only
        # used when no logical ID is present.
        address = resource_change_data.get(
            "LogicalResourceId", f"{resource_type}.{resource_name}"
        )

        # Collect before/after values keyed by the target's Attribute.
        # NOTE(review): details sharing the same Attribute overwrite each
        # other, so only the last one per Attribute is kept.
        before: dict[str, Any] = {}
        after: dict[str, Any] = {}
        for detail in resource_change_data.get("Details") or []:
            target = detail.get("Target") or {}
            attr = target.get("Attribute", "")
            if attr:
                before[attr] = target.get("BeforeValue", "N/A")
                after[attr] = target.get("AfterValue", "N/A")

        changes.append(
            ResourceChange(
                address=address,
                action=action,
                resource_type=resource_type,
                resource_name=resource_name,
                source=ChangeSource.CLOUDFORMATION,
                # Empty dicts are passed as None.
                before=before or None,
                after=after or None,
                provider="aws",
            )
        )

    return DeployPlan(
        source=ChangeSource.CLOUDFORMATION,
        changes=changes,
        raw_data=data,
    )