-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
44 lines (36 loc) · 1.21 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import json
from faker import Faker
import time
import random
from kafka import KafkaProducer
# Module-level Faker instance; generate_data() uses it for country codes and city names.
faker = Faker()
def generate_data():
    """Build and return one randomly generated e-commerce event record.

    The record is a plain dict with the keys ``user_id``, ``event_type``,
    ``country_code``, ``city``, ``device_type``, ``product_id``, ``price``,
    ``quantity`` and ``timestamp``. This function only creates the record;
    publishing it is left to the caller.
    """
    # Values are evaluated top to bottom, so the random generators are
    # consumed in the same order as the keys appear.
    return {
        'user_id': random.getrandbits(32),
        'event_type': random.choice(['click', 'view', 'download']),
        'country_code': faker.country_code(),
        'city': faker.city(),
        'device_type': random.choice(['desktop', 'mobile']),
        'product_id': random.getrandbits(32),
        # Price between 5.00 and 500.00, rounded to two decimals.
        'price': round(random.uniform(5.0, 500.0), 2),
        'quantity': random.randint(1, 100),
        # Current Unix time rounded to whole seconds.
        'timestamp': round(time.time()),
    }
if __name__ == '__main__':
    # Script entry point: publish a fixed batch of randomly generated
    # e-commerce events to a Kafka topic as UTF-8 encoded JSON.
    topic_name = 'ecommerce_event_data'
    record_count = 50  # number of records to generate and send

    producer = KafkaProducer(
        bootstrap_servers=['localhost:29092', 'localhost:39092'],
        # Serialize each record dict to UTF-8 encoded JSON bytes.
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    try:
        for _ in range(record_count):
            producer.send(topic_name, value=generate_data())
        # send() only queues the record; block here until everything
        # buffered has actually been delivered to the brokers.
        producer.flush()
    finally:
        # Always release the producer's connections, even if sending failed.
        producer.close()