-
Notifications
You must be signed in to change notification settings - Fork 0
/
ecom-visitor-logs.py
152 lines (112 loc) · 5.5 KB
/
ecom-visitor-logs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/python
import time
import datetime
import pytz
import numpy as np
import random
import gzip
import zipfile
import sys
import argparse
from faker import Faker
from random import randrange
from tzlocal import get_localzone
local = get_localzone()
import json
from kafka import KafkaProducer
class switch(object):
    """Emulate a C-style switch statement (pre-3.10 ``match`` workaround).

    Usage::

        for case in switch(value):
            if case('a'): ...
            if case('b'): ...
            if case():    ...   # default branch

    After the first successful ``case(...)`` call, ``self.fall`` is set so
    every later ``case(...)`` in the same loop also returns True, mimicking
    C fall-through.
    """

    def __init__(self, value):
        self.value = value
        self.fall = False

    def __iter__(self):
        # Yield the match method exactly once.  The generator must end by
        # returning: ``raise StopIteration`` inside a generator has been a
        # RuntimeError since Python 3.7 (PEP 479), which made the original
        # crash any loop that iterated past the first yield.
        yield self.match
        return

    def match(self, *args):
        # No args => default case; self.fall => an earlier case matched.
        if self.fall or not args:
            return True
        elif self.value in args:
            self.fall = True
            return True
        else:
            return False
# ---------------------------------------------------------------------------
# Command-line interface.
# ---------------------------------------------------------------------------
parser = argparse.ArgumentParser(__file__, description="User Visit Generator")
parser.add_argument(
    "--output", "-o",
    dest='output_type',
    help="Write to a Log file, a gzip file or to STDOUT",
    choices=['LOG', 'GZ', 'CONSOLE'],
)
parser.add_argument(
    "--log-format", "-l",
    dest='log_format',
    help="Log format, Common or Extended Log Format ",
    choices=['CLF', 'ELF', "common"],
)
parser.add_argument(
    "--num", "-n",
    dest='num_lines',
    help="Number of lines to generate (0 for infinite)",
    type=int,
    default=1,
)
parser.add_argument(
    "--prefix", "-p",
    dest='file_prefix',
    help="Prefix the output file name",
    type=str,
)
parser.add_argument(
    "--sleep", "-s",
    help="Sleep this long between lines (in seconds)",
    default=0.0,
    type=float,
)
# Dataproc cluster name; used below to derive the Kafka bootstrap host.
parser.add_argument(
    "--cluster", "-c",
    dest="cluster",
    help="Cluster Name",
    default="spark-etl",
    type=str,
)

args = parser.parse_args()

# Unpack into the module-level names the rest of the script reads.
log_lines = args.num_lines
file_prefix = args.file_prefix
output_type = args.output_type
log_format = args.log_format
cluster_name = args.cluster
# Faker instance used to fabricate the synthetic e-commerce log fields.
faker = Faker()

# Seed data from which the log records are generated.
timestr = time.strftime("%Y%m%d-%H%M%S")
otime = datetime.datetime.now()

response = ["200", "404", "500", "301"]
verb = ["GET", "POST", "DELETE", "PUT"]

# "cart" URIs -- selecting one of these represents an add-to-cart action
# (a product id gets appended to the trailing "pid=" in the main loop).
mens_wear_cart = [
    "/products/mens-wear/shoes/cart.jsp?pid=",
    "/products/mens-wear/formal-tshirts/cart.jsp?pid=",
    "/products/mens-wear/sports/cart.jsp?pid=",
    "/products/men/home-lifestyle/cart.jsp?pid=",
    "/products/men/home-gifting/cart.jsp?pid=",
    "/products/men/bags/cart.jsp?pid=",
]
womens_wear_cart = [
    "/products/womens-wear/shoes/cart.jsp?pid=",
    "/products/womens-wear/accessories/cart.jsp?pid=",
    "/products/womens-wear/grooming/cart.jsp?pid=",
    "/products/womens-wear/bags/cart.jsp?pid=",
    "/products/women/perfumes/cart.jsp?pid=",
    "/products/women/home-gifting/cart.jsp?pid=",
]

# Product-listing URIs (pure browsing, no cart action).
women_product_hits = [
    "/women-clothing/list/dresses/",
    "/women-clothing/list/leggings/",
    "/women-clothing/list/winter-clothing/",
    "/women-clothing/list/sports-tees/",
    "/women/list/perfumes/",
    "/women-clothing/list/pants/",
    "/women-clothing/list/accessories/",
    "/women-clothing/list/denims/",
]
mens_product_hits = [
    "/men-clothing/list/polo-tshirts/",
    "/men-clothing/list/sports-tshirts/",
    # NOTE(review): the two entries above also appear below, doubling their
    # pick weight under random.choice -- preserved from the original;
    # confirm whether the duplication is an intentional sampling weight.
    "/men-clothing/list/polo-tshirts/",
    "/men-clothing/list/sports-tshirts/",
    "/men-clothing/list/perfumes/",
    "/men-clothing/list/trousers/",
    "/men-clothing/list/accessories/",
    "/men-clothing/list/denims/",
]

# Flat pool sampled by the generator loop.  NOTE(review): mens_wear_cart is
# included twice, exactly as in the original, which skews sampling toward
# those cart URIs -- confirm this weighting is intended.
resources = (mens_wear_cart + mens_wear_cart + womens_wear_cart
             + mens_product_hits + women_product_hits)

# Candidate user-agent factories (only used by the commented-out code path).
ualist = [faker.firefox, faker.chrome, faker.safari,
          faker.internet_explorer, faker.opera]

flag = True
# ---------------------------------------------------------------------------
# Main generation loop: each iteration fabricates one visit record and
# publishes it as JSON to the 'user_browsing_logs' Kafka topic.
# ---------------------------------------------------------------------------

# Create the Kafka producer ONCE, outside the loop.  The original built a
# new KafkaProducer (with its broker connections and background thread) on
# every single iteration, leaking resources and throttling throughput.
producer = KafkaProducer(
    bootstrap_servers=[cluster_name + '-w-1:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
# producer = KafkaProducer(bootstrap_servers=[cluster_name+'-w-0:9092', cluster_name+'-w-1:9092'],value_serializer=lambda v: v.encode('utf-8'))

lines_sent = 0
while flag:
    # Advance the synthetic clock by the sleep interval (or 10-20 s).
    if args.sleep:
        increment = datetime.timedelta(seconds=args.sleep)
    else:
        increment = datetime.timedelta(seconds=random.randint(10, 20))
    otime += increment

    ip = faker.ipv4()            # fake client IP address
    fake_state = faker.state()   # fake US state
    dt = otime.strftime("%Y-%m-%d %H:%M:%S")
    tz = datetime.datetime.now(local).strftime('%z')  # local UTC offset

    # HTTP verb, weighted so GET dominates the traffic mix.
    vrb = np.random.choice(verb, p=[0.6, 0.1, 0.1, 0.2])
    uri = random.choice(resources)
    # Cart URIs end in "pid="; append a product id to mark an add-to-cart.
    if "products" in uri:
        uri += str(random.randint(1, 2000))

    # Split "/cat/sub/type[/cart.jsp?pid=N]" into its non-empty segments.
    uri_segments = [seg for seg in uri.split("/") if seg != '']
    if len(uri_segments) == 4:
        # Cart hit: the 4th segment is "cart.jsp?pid=N".
        pid = int(uri_segments[3].split("pid=", 1)[1])
    else:
        pid = None

    record = {
        "date_time": dt,
        "state": fake_state,
        "ip_address": ip,
        "category": uri_segments[0],
        'sub_cat': uri_segments[1],
        'type': uri_segments[2],
        "pid": pid,
    }
    # kafka-python requires the topic name to be str -- the original passed
    # bytes (b'user_browsing_logs'), which send() rejects.
    producer.send('user_browsing_logs', value=record)

    # Honour --num: stop after log_lines records (0 means run forever).
    # The original parsed this argument but never used it, looping forever.
    lines_sent += 1
    if log_lines and lines_sent >= log_lines:
        flag = False
        break

    if args.sleep:
        time.sleep(args.sleep)
    else:
        time.sleep(0.1)

# Push any records still buffered in the producer before exiting.
producer.flush()