-
Notifications
You must be signed in to change notification settings - Fork 1
/
find_task.py
122 lines (86 loc) · 3.1 KB
/
find_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/python3
"""
find tasks in large codebase
"""
import argparse
import textwrap
import pprint
import glob
import yaml
import os.path
import re
"""
ignore vaulted items.
"""
class Vault(yaml.YAMLObject):
yaml_tag = u'!vault'
def __init__(self,value):
pass
def __new__(self,tag,value):
return value
def __repr__(self, value):
#print('%s' % self.name)
return "%s(value=%)" % ( self.__class__.__name__, self.value )
yaml.SafeLoader.add_constructor(u'!vault', Vault)
yaml.SafeLoader.add_constructor(u'!vault-encrypted', Vault)
def find_yaml(directory):
yaml_files = []
shell_scripts = []
python_files = []
if os.path.isfile(directory):
yaml_files.append( directory )
else:
excluded = set(['.git'])
for root, dirs, files in os.walk(directory, topdown=True):
dirs[:] = [d for d in dirs if d not in excluded]
for file in files:
if file.endswith((".yaml",".yml",".json",".jsn")):
yaml_files.append( os.path.join(root, file) )
return yaml_files
def yaml_print( output ):
print( yaml.dump(output[0]) )
print( yaml.dump(output[1]) )
def search_tasks(yaml_content, pattern, filepath):
ansible_tasks = []
if isinstance(yaml_content, dict):
for key, value in yaml_content.items():
if isinstance(value, list):
for task in value:
if isinstance(task, dict):
for taskitem in task:
if pattern.search(str(task[taskitem])):
ansible_tasks.append((filepath, task))
elif isinstance(value, (dict, list)):
for taskitem in value:
if pattern.search(str(value[taskitem])):
ansible_tasks.append((filepath, value))
elif isinstance(yaml_content, list):
for item in yaml_content:
if isinstance(item, (dict, list)):
ansible_tasks.extend(search_tasks(item, pattern, filepath))
return ansible_tasks
def find_ansible_tasks(directory, regex):
"""
Find all YAML files in the directory, although.. could be json!?
"""
yaml_files = find_yaml(directory)
ansible_tasks = []
# Compile the regular expression
pattern = re.compile(regex)
for yaml_file in yaml_files:
with open(yaml_file, 'r') as file:
yaml_content = yaml.safe_load(file)
ansible_tasks.extend(search_tasks(yaml_content,pattern,yaml_file) )
return ansible_tasks
def main():
parser = argparse.ArgumentParser(description='Find Ansible tasks in YAML files.')
parser.add_argument('-p', '--path', required=True, help='Path to directory containing YAML files')
parser.add_argument('-r', '--regex', required=True, help='Regular expression pattern to match tasks')
args = parser.parse_args()
tasks = find_ansible_tasks(args.path, args.regex)
print("Matching Ansible tasks:")
for task in tasks:
yaml_print(task)
if __name__ == "__main__":
main()