Authors: Marco Balletti, Francesco Marino
Folder containing the input dataset as a CSV file (dataset.csv).
Folder containing scripts and files for a container-based execution of the project architecture:
- start-dockers.sh creates the Kafka cluster and the necessary Kafka topics,
- stop-dockers.sh deletes the created topics, then stops and removes the Kafka cluster and
- docker-compose.yml is the Docker Compose file used to create the container infrastructure.
Folder containing benchmark results (under the Benchmark directory), the project report and the presentation slides.
Folder containing Flink computation results as CSV files:
- query1_daily.csv containing the output of the first query evaluated over daily windows,
- query1_weekly.csv containing the output of the first query evaluated over weekly windows,
- query1_monthly.csv containing the output of the first query evaluated over monthly windows,
- query2_daily.csv containing the output of the second query evaluated over daily windows,
- query2_weekly.csv containing the output of the second query evaluated over weekly windows,
- query3_daily.csv containing the output of the third query evaluated over daily windows and
- query3_weekly.csv containing the output of the third query evaluated over weekly windows.

Results are computed over the entire dataset.
This directory's subdirectories contain Java code for:
- creation of a Kafka topic producer for input data,
- creation of a Flink topology to run a DSP analysis of the three queries,
- creation of a Kafka Streams topology to run an alternative DSP analysis of the same three queries and
- creation of several Kafka topic consumers for DSP output saving.
It is recommended to open the entire directory with an IDE for better code navigation; the Java part of the project was developed using JetBrains' IntelliJ IDEA.
The main folder contains the processing architecture launchers:
- ConsumersLauncher.java launches the consumers for Kafka Streams and Flink outputs,
- FlinkDSPMain.java starts the Flink data stream processing,
- KafkaStreamsDSPMain.java starts the Kafka Streams processing and
- ProducerLauncher.java starts a producer that reads from file and publishes tuples to Kafka topics, simulating a real-time data source (a minimal producer sketch follows the list).
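For illustration only, a minimal sketch of such a producer, assuming a plain-string topic named bus-data, a local single broker and a relative dataset path (all three are assumptions, not the project's actual configuration):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

public class SimpleCsvProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local single-broker cluster
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // Path and topic name are hypothetical placeholders.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             BufferedReader reader = new BufferedReader(new FileReader("data/dataset.csv"))) {
            String line = reader.readLine(); // skip the CSV header
            while ((line = reader.readLine()) != null) {
                producer.send(new ProducerRecord<>("bus-data", line));
            }
        }
    }
}
```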
This package contains classes for building and executing the queries' topologies using Flink as the DSP framework:
- AverageDelayAggregator.java aggregates data for the first query over daily, weekly and monthly windows,
- AverageDelayOutcome.java represents the aggregation result,
- AverageDelayProcessWindow.java sets the windows' start times correctly,
- MonthlyWindowAssigner.java contains a custom tumbling window assigner that separates tuples by event-time month, necessary because months differ in duration (see the sketch after this list), and
- Query1TopologyBuilder.java builds the topology of the first query.
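A minimal sketch of a month-aligned tumbling window assigner, assuming the Flink 1.x DataStream API and UTC event times (class name and time zone are assumptions):

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;

// Assigns each element to a single tumbling window spanning its event-time month.
public class MonthlyWindowAssignerSketch extends WindowAssigner<Object, TimeWindow> {

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp,
                                                WindowAssignerContext context) {
        ZonedDateTime eventTime = Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC);
        ZonedDateTime monthStart = eventTime.toLocalDate().withDayOfMonth(1).atStartOfDay(ZoneOffset.UTC);
        ZonedDateTime monthEnd = monthStart.plusMonths(1); // handles 28/29/30/31-day months
        return Collections.singletonList(new TimeWindow(
                monthStart.toInstant().toEpochMilli(), monthEnd.toInstant().toEpochMilli()));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create(); // fire when the watermark passes the end of the month
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
```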
For the second query:
- ReasonRankingAggregator.java aggregates data over daily and weekly windows,
- ReasonRankingOutcome.java represents the aggregation result,
- ReasonRankingProcessWindow.java sets the windows' start times correctly and
- Query2TopologyBuilder.java builds the topology of the second query.
For the third query:
- CompanyRankingAggregator.java aggregates data over daily and weekly windows,
- CompanyRankingOutcome.java represents the aggregation result,
- CompanyRankingProcessWindow.java sets the windows' start times correctly and
- Query3TopologyBuilder.java builds the topology of the third query.
This package contains configurations for the Kafka publish-subscribe service and classes for instantiating consumers and producers:
- KafkaClusterConfig.java contains the topic names and the properties builders for publishers and subscribers (a configuration sketch follows the list),
- KafkaParametricConsumer.java creates and starts consumers subscribed to the Kafka topics carrying the DSP outputs and
- KafkaSingleProducer.java creates a producer that publishes DSP input tuples to Kafka topics.
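A minimal sketch of such a properties builder, assuming string serialization and a local single-broker cluster (all values and names are assumptions):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Hypothetical properties builder in the spirit of KafkaClusterConfig.
public final class ClusterConfigSketch {
    public static final String BOOTSTRAP_SERVERS = "localhost:9092"; // assumption

    private ClusterConfigSketch() { }

    public static Properties producerProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        return props;
    }

    public static Properties consumerProperties(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}
```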
This package contains classes for building and executing the queries' topologies using Kafka Streams as the DSP library, together with KafkaStreamsConfig.java, which provides the properties needed to run the stream processing library.
This package contains classes for creating the queries' topologies (a minimal wiring sketch follows the list):
- Query1TopologyBuilder.java builds the topology of the first query,
- Query2TopologyBuilder.java builds the topology of the second query and
- Query3TopologyBuilder.java builds the topology of the third query.
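A minimal sketch of how such a builder wires a Kafka Streams topology; the topic names and application id are assumptions, and the real builders parse the tuples and aggregate them per window between source and sink:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamsTopologySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "query1-sketch");       // assumption
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumption
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("bus-data");
        // A real topology builder would parse each CSV line, key the stream
        // and aggregate it over the custom windows here.
        source.to("query1-output");

        new KafkaStreams(builder.build(), props).start();
    }
}
```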
This package contains custom Kafka Streams windows (a daily-window sketch follows the list):
- CustomTimeWindows.java is an abstract class representing a generic custom-duration time window,
- DailyTimeWindows.java implements a daily time window aligned to a given time zone,
- MonthlyTimeWindows.java implements a monthly time window aligned to the first day of the month in a given time zone and
- WeeklyTimeWindows.java implements a weekly time window that starts on Monday and ends on Sunday, aligned to a given time zone.
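A minimal sketch of a time-zone-aligned daily window, assuming Kafka Streams 2.x and following the shape of Confluent's well-known DailyTimeWindows example (class and field names are assumptions):

```java
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Map;

public final class DailyTimeWindowsSketch extends Windows<TimeWindow> {

    private final ZoneId zoneId;
    private final long graceMs;

    public DailyTimeWindowsSketch(ZoneId zoneId, Duration grace) {
        this.zoneId = zoneId;
        this.graceMs = grace.toMillis();
    }

    @Override
    public Map<Long, TimeWindow> windowsFor(long timestamp) {
        // Align the window to local midnight in the configured time zone.
        ZonedDateTime dayStart = Instant.ofEpochMilli(timestamp).atZone(zoneId)
                .toLocalDate().atStartOfDay(zoneId);
        long start = dayStart.toInstant().toEpochMilli();
        long end = dayStart.plusDays(1).toInstant().toEpochMilli();
        return Collections.singletonMap(start, new TimeWindow(start, end));
    }

    @Override
    public long size() {
        return Duration.ofDays(1).toMillis();
    }

    @Override
    public long gracePeriodMs() {
        return graceMs;
    }
}
```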
This package contains classes supporting query execution, in particular:
- BusData.java is the structure representing the tuple information needed for evaluation (a sketch follows the list),
- DataCommonTransformation.java contains common methods needed for query processing and
- OutputFormatter.java formats query outcomes so they can be published to Kafka.
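A minimal sketch of such a tuple structure; the exact fields are assumptions inferred from the three queries (event time, delay, neighbourhood, delay reason and company):

```java
import java.time.LocalDateTime;

// Hypothetical per-tuple structure in the spirit of BusData; fields are assumptions.
public class BusDataSketch {
    private final LocalDateTime eventTime;  // occurrence time, used for windowing
    private final double delayMinutes;      // parsed delay (first and third queries)
    private final String neighbourhood;     // grouping key for the first query
    private final String reason;            // delay reason (second and third queries)
    private final String company;           // bus company (third query)

    public BusDataSketch(LocalDateTime eventTime, double delayMinutes,
                         String neighbourhood, String reason, String company) {
        this.eventTime = eventTime;
        this.delayMinutes = delayMinutes;
        this.neighbourhood = neighbourhood;
        this.reason = reason;
        this.company = company;
    }

    public LocalDateTime getEventTime() { return eventTime; }
    public double getDelayMinutes() { return delayMinutes; }
    public String getNeighbourhood() { return neighbourhood; }
    public String getReason() { return reason; }
    public String getCompany() { return company; }
}
```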
This package contains classes used as accumulators by both the Flink and the Kafka Streams processing (see the accumulator sketch below):
- AverageDelayAccumulator.java accumulates average delay statistics grouped by neighbourhood (first query),
- AverageDelayStatistics.java maintains the per-neighbourhood delay information (first query),
- CompanyRankingAccumulator.java accumulates the delay-based company name ranking (third query) and
- ReasonRankingAccumulator.java accumulates the delay reason rankings (second query).
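A minimal sketch of the first query's accumulator under these assumptions: a per-neighbourhood running sum and count, plus the merge operation that incremental window aggregation requires (all names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class AverageDelayAccumulatorSketch {

    private static class Stats {
        double delaySum;  // total delay in minutes
        long count;       // number of contributing tuples
    }

    private final Map<String, Stats> perNeighbourhood = new HashMap<>();

    // Fold one tuple into the accumulator.
    public void add(String neighbourhood, double delayMinutes) {
        Stats s = perNeighbourhood.computeIfAbsent(neighbourhood, k -> new Stats());
        s.delaySum += delayMinutes;
        s.count++;
    }

    // Combine two partial accumulators (needed by both Flink and Kafka Streams aggregations).
    public AverageDelayAccumulatorSketch merge(AverageDelayAccumulatorSketch other) {
        other.perNeighbourhood.forEach((k, v) -> {
            Stats s = perNeighbourhood.computeIfAbsent(k, x -> new Stats());
            s.delaySum += v.delaySum;
            s.count += v.count;
        });
        return this;
    }

    public double averageFor(String neighbourhood) {
        Stats s = perNeighbourhood.get(neighbourhood);
        return s == null || s.count == 0 ? 0.0 : s.delaySum / s.count;
    }
}
```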
This package contains utilities for latency and throughput evaluation (sketched below):
- BenchmarkFlinkSink.java is a sink that can be added to the Flink topology to evaluate performance and
- SynchronizedCounter.java is a static counter for benchmark evaluation (it counts tuples and elapsed time).
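A minimal sketch of the counting pattern, assuming the sink forwards every tuple to a static synchronized counter; all names and the reporting interval are assumptions:

```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Hypothetical static counter in the spirit of SynchronizedCounter.
public final class SynchronizedCounterSketch {

    private static long tupleCount = 0;
    private static long firstTupleMillis = -1;

    private SynchronizedCounterSketch() { }

    public static synchronized void incrementCounter() {
        long now = System.currentTimeMillis();
        if (firstTupleMillis < 0) {
            firstTupleMillis = now; // start the clock at the first tuple
        }
        tupleCount++;
        if (tupleCount % 100_000 == 0) {
            double elapsedSeconds = (now - firstTupleMillis) / 1000.0;
            if (elapsedSeconds > 0) {
                System.out.printf("throughput: %.2f tuples/s%n", tupleCount / elapsedSeconds);
            }
        }
    }
}

// Hypothetical benchmark sink: counts tuples instead of writing them anywhere.
class BenchmarkSinkSketch implements SinkFunction<String> {
    @Override
    public void invoke(String value) {
        SynchronizedCounterSketch.incrementCounter();
    }
}
```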
This package contains utilities for delay string parsing and delay type ranking (a parsing sketch follows the list):
- DelayFixes.java is an enum correcting wrongly converted strings in the dataset,
- DelayFormatException.java is a custom Java exception thrown when no information can be extracted from a delay string,
- DelayInfo.java represents a single parsed piece of delay information,
- DelayParsingUtility.java contains the delay string parsing logic and
- DelayScorer.java assigns a score based on delay and reason (third query).
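A minimal sketch of the parsing idea, assuming free-text delay fields such as "30 mins" or "1 hour"; the regular expression and the error policy are assumptions:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class DelayParsingSketch {

    // Matches a number followed by a minute or hour unit, e.g. "30 mins", "1 hr".
    private static final Pattern DELAY =
            Pattern.compile("(\\d+)\\s*(min|hour|hr)", Pattern.CASE_INSENSITIVE);

    private DelayParsingSketch() { }

    public static double parseMinutes(String raw) throws DelayFormatExceptionSketch {
        Matcher m = DELAY.matcher(raw);
        if (!m.find()) {
            throw new DelayFormatExceptionSketch("unparsable delay: " + raw);
        }
        double value = Double.parseDouble(m.group(1));
        // Normalize everything to minutes.
        return m.group(2).toLowerCase().startsWith("min") ? value : value * 60;
    }

    // Stand-in for the project's custom DelayFormatException.
    public static class DelayFormatExceptionSketch extends Exception {
        DelayFormatExceptionSketch(String message) { super(message); }
    }
}
```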
This package contains data serialization and deserialization utilities (a Serde sketch follows the list):
- FlinkStringToKafkaSerializer.java serializes Flink output strings for publication to Kafka topics,
- JsonPOJODeserializer.java deserializes custom objects from JSON,
- JsonPOJOSerializer.java serializes custom objects to JSON and
- SerDesBuilders.java builds ser-des for Kafka Streams.
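A minimal sketch of a Jackson-backed Serde builder in the spirit of SerDesBuilders, assuming Kafka 2.x clients (class and method names are assumptions):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public final class JsonSerdeSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private JsonSerdeSketch() { }

    // Builds a Serde that maps a POJO to and from JSON bytes via Jackson.
    public static <T> Serde<T> serdeFor(Class<T> type) {
        Serializer<T> serializer = (topic, data) -> {
            try {
                return data == null ? null : MAPPER.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new RuntimeException("JSON serialization failed", e);
            }
        };
        Deserializer<T> deserializer = (topic, bytes) -> {
            try {
                return bytes == null ? null : MAPPER.readValue(bytes, type);
            } catch (Exception e) {
                throw new RuntimeException("JSON deserialization failed", e);
            }
        };
        return Serdes.serdeFrom(serializer, deserializer);
    }
}
```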