This repository has been archived by the owner on Oct 16, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathKafkaParametricConsumer.java
160 lines (142 loc) · 4.66 KB
/
KafkaParametricConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package kafka_pubsub;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import javax.annotation.Nullable;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
/**
* Class used to create consumers, subscribe them to topics and run them
*/
public class KafkaParametricConsumer implements Runnable {
private final static boolean PRINT_KAFKA_ON_CONSOLE = true;
private final static int POLL_WAIT_TIME = 1000;
private final static int CYCLE_INTERVAL_TIME = 5000;
private String CONSUMER_GROUP_ID = "-topics-consumers";
private final Consumer<String, String> consumer;
private final int id;
private final String topic;
private final boolean flink;
private String path;
private boolean running = true;
/**
* Create a new consumer using the properties
* @return the created consumer
*/
private Consumer<String, String> createConsumer() {
Properties props = KafkaClusterConfig.getKafkaParametricConsumerProperties(CONSUMER_GROUP_ID);
return new KafkaConsumer<>(props);
}
/**
* Subscribe a consumer to a topic
* @param consumer to be subscribe
* @param topic chosen
*/
private static void subscribeToTopic(Consumer<String, String> consumer, String topic) {
consumer.subscribe(Collections.singletonList(topic));
}
/**
* Create a parametric consumer based on the arguments
* @param id consumer's id
* @param topic name
* @param flink boolean, if true is a flink consumer, in the other case it's a kafka one
* @param path where to store result if it's a flink consumer, null if it isn't
*/
public KafkaParametricConsumer(int id, String topic, boolean flink, @Nullable String path) {
this.flink = flink;
if (flink) {
CONSUMER_GROUP_ID = "flink" + CONSUMER_GROUP_ID;
this.path = Objects.requireNonNull(path);
} else {
CONSUMER_GROUP_ID = "kafka-streams" + CONSUMER_GROUP_ID;
}
this.id = id;
this.topic = topic;
// create the consumer
consumer = createConsumer();
// subscribe the consumer to the topic
subscribeToTopic(consumer, topic);
}
@Override
public void run() {
if (flink) {
runFlinkConsumer();
} else {
runKafkaStreamsConsumer();
}
}
/**
* Create a consumer that prints kafka streams queries' result if PRINT_KAFKA_ON_CONSOLE is set to true
*/
@SuppressWarnings({"BusyWait"})
private void runKafkaStreamsConsumer() {
System.out.println("Kafka Consumer " + CONSUMER_GROUP_ID + "-ID" + id + " running...");
try {
while (running) {
Thread.sleep(CYCLE_INTERVAL_TIME);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(POLL_WAIT_TIME));
if (PRINT_KAFKA_ON_CONSOLE) {
for (ConsumerRecord<String, String> record : records) {
System.out.println("[" + id + "] Consuming Kafka record at topic: " + topic +
"\n(key=" + record.key() + ", val=" + record.value() + ")");
}
}
}
} catch (InterruptedException ignored) {
// ignored
} finally {
// close consumer
consumer.close();
System.out.println("Kafka Consumer " + CONSUMER_GROUP_ID + "-ID" + id + " stopped");
}
}
/**
* Create a consumer that write flink queries' result to the csv in Results directory
*/
@SuppressWarnings({"BusyWait", "ResultOfMethodCallIgnored"})
private void runFlinkConsumer() {
System.out.println("Flink Consumer " + CONSUMER_GROUP_ID + "-ID" + id + " running...");
try {
while (running) {
Thread.sleep(CYCLE_INTERVAL_TIME);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(POLL_WAIT_TIME));
if (!records.isEmpty()) {
File file = new File(path);
if (!file.exists()) {
// creates the file if it does not exist
file.createNewFile();
}
// append to existing version of the same file
FileWriter writer = new FileWriter(file, true);
BufferedWriter bw = new BufferedWriter(writer);
for (ConsumerRecord<String, String> record : records) {
bw.append(record.value());
bw.append("\n");
}
// close both buffered writer and file writer
bw.close();
writer.close();
}
}
} catch (InterruptedException ignored) {
// ignored
} catch (IOException e) {
e.printStackTrace();
System.err.println("Could not export result to " + path);
} finally {
// close consumer
consumer.close();
System.out.println("Flink Consumer " + CONSUMER_GROUP_ID + "-ID" + id + " stopped");
}
}
public void stop() {
this.running = false;
}
}