-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: producer.py
47 lines (39 loc) · 1.51 KB
/
producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from kafka import KafkaProducer
import json
import ijson
import time
def produce_dataset(file_path, producer, topics, batch_size=20, pause_secs=5):
    """Stream a JSON array from *file_path* and publish it to Kafka in batches.

    The file is parsed incrementally with ijson so arbitrarily large
    datasets can be produced without loading them fully into memory.
    Each batch is JSON-encoded once and sent to every topic in *topics*.

    Args:
        file_path: Path to a JSON file whose top level is an array.
        producer: A KafkaProducer-like object exposing send() and flush().
        topics: Iterable of topic names that each batch is fanned out to.
        batch_size: Number of objects accumulated before a send (default 20).
        pause_secs: Seconds to sleep between full batches (default 5),
            throttling the producer so consumers can keep up.

    Errors are reported to stdout rather than raised: a missing file and
    any other failure are caught and printed (best-effort behavior kept
    from the original).
    """
    try:
        with open(file_path, "r") as file:
            # ijson.items streams each element of the top-level array
            # ('item' prefix) lazily instead of parsing the whole file.
            # NOTE(review): ijson yields Decimal for JSON numbers, which
            # json.dumps rejects by default — confirm the dataset is
            # pre-processed to plain ints/floats/strings.
            objects = ijson.items(file, 'item')
            batch = []
            for obj in objects:
                batch.append(obj)
                if len(batch) >= batch_size:
                    # Serialize the batch once, then fan out to all topics.
                    payload = json.dumps(batch).encode('utf-8')
                    for topic in topics:
                        producer.send(topic, payload)
                    # One flush per batch is enough to force delivery.
                    producer.flush()
                    print(f"Sent {len(batch)} objects to {topics}")
                    batch = []
                    time.sleep(pause_secs)
            # Send any trailing partial batch, logging it like the others.
            if batch:
                payload = json.dumps(batch).encode('utf-8')
                for topic in topics:
                    producer.send(topic, payload)
                producer.flush()
                print(f"Sent {len(batch)} objects to {topics}")
    except FileNotFoundError:
        print(f"File '{file_path}' not found.")
    except Exception as e:
        # Broad catch kept deliberately: the script prefers a printed
        # diagnostic over crashing mid-production.
        print(f"An error occurred: {e}")
# --- script configuration ---------------------------------------------
bootstrap_servers = 'localhost:9092'                # Kafka broker address
dataset_file = 'data/preprocessed_dataset.json'     # JSON array of records
topics = ['Apriori', 'PCY', 'Custom']               # one topic per algorithm

if __name__ == "__main__":
    # Guard so importing this module does not start producing.
    # initialize the producer
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        # start producing the dataset
        produce_dataset(dataset_file, producer, topics)
    finally:
        # Always release the client's network resources, even on failure.
        producer.close()