-
Notifications
You must be signed in to change notification settings - Fork 13
/
ingress.py
66 lines (58 loc) · 2.23 KB
/
ingress.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import sys, time, os, getopt, argparse
start_time = time.time()
from modules.main import ArgParse
from modules import process as k8s
from modules.logging import Logger
from modules.get_ingress import K8sIngress
from modules.get_ns import K8sNameSpace
class _Ingress:
    """Hold the ingress objects fetched for a namespace and report on them.

    The ingress list is fetched once in ``__init__`` through
    ``K8sIngress.get_ingress`` and reused by every report method.
    """

    def __init__(self, namespace, logger):
        """Fetch the ingress list and stop the program when it is empty.

        Args:
            namespace: Value handed to ``K8sIngress.get_ingress``; any falsy
                value is replaced by ``'all'``.
            logger: Logger used for the warning below and passed on to the
                helper modules.

        Raises:
            SystemExit: When the fetched list has no items.
        """
        self.namespace = namespace or 'all'
        self.logger = logger
        self.k8s_object_list = K8sIngress.get_ingress(self.namespace, self.logger)
        # Plain truthiness instead of len(): an item list that is None is
        # treated as "nothing found" rather than raising TypeError.
        if not self.k8s_object_list.items:
            logger.warning("No ingress found!")
            sys.exit()
        self.k8s_object = 'ingress'

    @staticmethod
    def _count_per_namespace(namespace_names, ingress_namespaces):
        """Return ``[name, count]`` rows for names owning at least one ingress.

        Args:
            namespace_names: Iterable of namespace names; it fixes the order
                of the returned rows.
            ingress_namespaces: Iterable with one namespace name per ingress.

        Returns:
            A list of ``[name, count]`` lists. Names with a zero count are
            left out, and ingress namespaces absent from *namespace_names*
            are ignored.
        """
        # One pass over the ingresses instead of rescanning them for every
        # namespace.
        tally = {}
        for owner in ingress_namespaces:
            tally[owner] = tally.get(owner, 0) + 1
        return [[name, tally[name]] for name in namespace_names if tally.get(name)]

    def ingress_count(self, v, l):
        """Print a table of ingress counts per namespace plus a total row.

        Args:
            v: Not used by this method.
            l: Passed through as the last argument of
                ``k8s.Output.print_table``.
        """
        headers = ['NAMESPACE', 'INGRESS']
        ns_list = K8sNameSpace.get_ns(self.logger)
        data = self._count_per_namespace(
            (ns.metadata.name for ns in ns_list.items),
            (item.metadata.namespace for item in self.k8s_object_list.items),
        )
        # The total is taken before the separator row is appended, so only
        # real count rows contribute to it.
        total_ing = sum(count for _, count in data)
        data = k8s.Output.append_hyphen(data, '-------')
        data.append(["Total: ", total_ing])
        k8s.Output.print_table(data, headers, True, l)

    def list_ingress(self, v, l):
        """Delegate the detailed ingress listing to ``k8s.IngCheck.list_ingress``.

        Args:
            v: Forwarded unchanged to ``k8s.IngCheck.list_ingress``.
            l: Forwarded unchanged to ``k8s.IngCheck.list_ingress``.
        """
        headers = ['NAMESPACE', 'INGRESS', 'RULES', 'HOST [SERVICE:PORT]']
        k8s.IngCheck.list_ingress(
            self.k8s_object_list,
            self.k8s_object,
            headers,
            v,
            self.namespace,
            l,
            self.logger,
        )
def call_all(v, namespace, l, logger):
    """Run both ingress reports: the count table first, then the listing.

    Args:
        v: Forwarded to each report method as its first argument.
        namespace: Passed to the ``_Ingress`` constructor.
        l: Forwarded to each report method as its second argument.
        logger: Passed to the ``_Ingress`` constructor.
    """
    ingress = _Ingress(namespace, logger)
    for report in (ingress.ingress_count, ingress.list_ingress):
        report(v, l)
def main():
    """Parse the command line, build the logger and run the ingress reports.

    After the reports finish, ``k8s.Output.time_taken`` is called with the
    module-level ``start_time``.
    """
    args = ArgParse.arg_parse()
    # The parsed args object is read by attribute below: format, silent,
    # verbose, namespace and logging.
    logger = Logger.get_logger(args.format, args.silent)
    # NOTE(review): this check runs after args.format / args.silent were
    # already read, so it cannot protect those accesses.
    if not args:
        return
    call_all(args.verbose, args.namespace, args.logging, logger)
    k8s.Output.time_taken(start_time)
if __name__ == "__main__":
    # Script entry point: run main() and turn Ctrl-C into a short message
    # followed by an immediate exit instead of a traceback.
    try:
        main()
    except KeyboardInterrupt:
        # flush=True is required here: the process ends through os._exit(),
        # which does not flush stdio buffers, so without it the message (and
        # any earlier buffered output) can be lost when stdout is not a
        # terminal.
        print(
            k8s.Output.RED + "[ERROR] "
            + k8s.Output.RESET + 'Interrupted from keyboard!',
            flush=True,
        )
        # sys.exit(0) raises SystemExit, which is caught right here, so the
        # process always terminates via os._exit(0). Exit status stays 0.
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)