Skip to content

Commit

Permalink
chore: remove "tran_r_vehst" datasource and clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
rafoolin committed Jan 13, 2024
1 parent aba8c6b commit 39e8abc
Show file tree
Hide file tree
Showing 9 changed files with 13 additions and 210 deletions.
20 changes: 0 additions & 20 deletions project/pipeline/etl/extract/tran_r_vehst_extractor.py

This file was deleted.

6 changes: 2 additions & 4 deletions project/pipeline/etl/load/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@
import pipeline_utils.utils as util


def merge_data_to_sql(sdg_data, road_data, tran_data) -> pd.DataFrame:
def merge_data_to_sql(sdg_data, road_data) -> pd.DataFrame:
"""
Merge data from different sources into a single Pandas DataFrame.
Parameters:
- sdg_data (pd.DataFrame): DataFrame containing average CO2 emissions per km from new passenger cars.
- road_data (pd.DataFrame): New passenger cars by type of motor energy.
- tran_data (pd.DataFrame): Stock of vehicles by category and NUTS 2 regions.
Returns:
pd.DataFrame: Merged DataFrame containing data from both sources.
"""
# Merge based on the time and country code
merge_on = ["geo", "TIME_PERIOD"]
sdg_tran_data = pd.merge(sdg_data, tran_data, on=merge_on, how="inner")
merged_data = pd.merge(road_data, sdg_tran_data, on=merge_on, how="inner")
merged_data = pd.merge(sdg_data, road_data, on=merge_on, how="inner")
return merged_data.dropna()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,4 @@ def road_eqr_carpda_data_transformer(data_frame: pd.DataFrame) -> pd.DataFrame:
data_frame["TIME_PERIOD"] = pd.to_datetime(data_frame["TIME_PERIOD"], format='%Y')
# Reset indexes after changing and dropping rows
data_frame = data_frame.reset_index(drop=True)
print(data_frame)
return data_frame
45 changes: 0 additions & 45 deletions project/pipeline/etl/transform/tran_r_vehst_transformer.py

This file was deleted.

13 changes: 0 additions & 13 deletions project/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
import etl.extract.geo_extractor as geo_e
import etl.extract.motor_energy_extractor as motor_e
import etl.extract.unit_extractor as unit_e
import etl.extract.tran_r_vehst_extractor as tran_e
import etl.extract.road_eqr_carpda_extractor as road_e
import etl.transform.road_eqr_carpda_transformer as road_t
import etl.transform.tran_r_vehst_transformer as tran_t
import etl.transform.sdg_transformer as sdg_t
from etl.load import loader

Expand All @@ -34,10 +32,6 @@ def __extract(self):
self.unit = unit_e.unit_data_extractor()
print(colored("Extracting unit data source Finished!", "green"))

print(colored("Extracting Tran_r_vhest data source...", "green"))
self.tran = tran_e.tran_r_vehst_data_extractor()
print(colored("Extracting Tran_r_vhest data source Finished!", "green"))

print(colored("Extracting Road_eqr_catpda data source...", "green"))
self.road = road_e.road_eqr_carpda_data_extractor()
print(colored("Extracting Road_eqr_catpda data source Finished!", "green"))
Expand All @@ -50,10 +44,6 @@ def __transform(self):
self.cleaned_road = road_t.road_eqr_carpda_data_transformer(self.road)
print(colored("Transforming Road_eqr_catpda data source Finished!", "green"))

print(colored("Transforming tran_r_vehst data source...", "green"))
self.cleaned_tran = tran_t.tran_r_vehst_data_transformer(self.tran)
print(colored("Transforming tran_r_vehst data source Finished!", "green"))

print(colored("Transforming SDG data source...", "green"))
self.cleaned_sdg = sdg_t.sdg_data_transformer(self.sdg)
print(colored("Transforming SDG data source Finished!", "green"))
Expand All @@ -67,7 +57,6 @@ def __merge(self):
self.merged_data = loader.merge_data_to_sql(
sdg_data=self.cleaned_sdg,
road_data=self.cleaned_road,
tran_data=self.cleaned_tran,
)
# Convert column names to lowercase
self.merged_data.columns = self.merged_data.columns.str.lower()
Expand Down Expand Up @@ -104,8 +93,6 @@ def run_pipeline(self):
)
return

# Transform
# TODO:: rafoolin/made-template#14
try:
self.__transform()
except Exception as e:
Expand Down
25 changes: 0 additions & 25 deletions project/tests/etl/extract/tran_r_vehst_extractor_test.py

This file was deleted.

14 changes: 1 addition & 13 deletions project/tests/etl/load/loader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,17 @@ def test_merge_data_to_sql(self):
"D2": [3, 4],
}
)
mock_data_3 = pd.DataFrame(
{
"geo": ["A", "B"],
"TIME_PERIOD": [2023, 2029],
"D3": [5, 6],
}
)

expected_result = pd.DataFrame(
{
"geo": ["A"],
"TIME_PERIOD": [2023],
"D1": [1],
"D2": [3],
"D3": [5],
},
)

result = loader.merge_data_to_sql(
sdg_data=mock_data_1,
road_data=mock_data_2,
tran_data=mock_data_3,
)
result = loader.merge_data_to_sql(sdg_data=mock_data_1, road_data=mock_data_2)
# The extra columns are deleted and the DFs are merged
pd.testing.assert_frame_equal(result, expected_result, check_like=True)

Expand Down
48 changes: 0 additions & 48 deletions project/tests/etl/transform/tran_r_vehst_transformer_test.py

This file was deleted.

51 changes: 10 additions & 41 deletions project/tests/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,36 +47,6 @@ def setUp(self) -> None:
}
)

self.mock_tran_data = pd.DataFrame(
{
"DATAFLOW": [
"ESTAT:SDG_12_30(1.0)",
"ESTAT:SDG_12_30(1.0)",
"ESTAT:SDG_12_30(1.0)",
"ESTAT:SDG_12_30(1.0)",
"ESTAT:SDG_12_30(1.0)",
"ESTAT:SDG_12_30(1.0)",
"ESTAT:SDG_12_30(1.0)",
],
"LAST UPDATE": [
"13/03/23 23:00:00",
"13/03/23 23:00:00",
"13/03/23 23:00:00",
"13/03/23 23:00:00",
"13/03/23 23:00:00",
"13/03/23 23:00:00",
"13/03/23 23:00:00",
],
"freq": ["A", "A", "A", "A", "A", "A", "A"],
"vehicle": ["BUS_TOT", "CAR", "CAR", "CAR", "CAR", "CAR", "CAR"],
"unit": ["NR", "NR", "NR", "NR", "NR", "NR", "NR"],
"geo": ["AT", "AT", "AT", "AT", "AT", "AT", "AT"],
"TIME_PERIOD": [2000, 2001, 2002, 2003, 2004, 2005, 2006],
"OBS_VALUE": [6583, 7146, 7535, 7627, 7867, 6583, 7146],
"OBS_FLAG": ["", "", "", "", "", "", ""],
}
)

self.mock_road_data = pd.DataFrame(
{
"DATAFLOW": [
Expand Down Expand Up @@ -109,12 +79,10 @@ def setUp(self) -> None:

self.expected_result = pd.DataFrame(
{
"geo": ["AT", "AT", "AT", "AT", "AT", "AT"],
"vehicles_unit": ["NR", "NR", "NR", "NR", "NR", "NR"],
"n_vehicles": [7146, 7535, 7627, 7867, 6583, 7146],
"n_passenger_cars": [4935, 5703, 4114, 1285, 2074, 2389],
"emitted_co2": [165, 164, 163, 161, 162, 163],
"mot_nrg": ["ALT", "ALT", "ALT", "ALT", "ALT", "ALT"],
"geo": ["AT", "AT", "AT", "AT", "AT", "AT", "AT"],
"n_passenger_cars": [3757, 4935, 5703, 4114, 1285, 2074, 2389],
"emitted_co2": [168, 165, 164, 163, 161, 162, 163],
"mot_nrg": ["ALT", "ALT", "ALT", "ALT", "ALT", "ALT", "ALT"],
}
)
self.db_name = "pipeline"
Expand All @@ -127,28 +95,29 @@ def test_run_pipeline(self) -> None:
self.assertTrue(os.path.exists(self.db_path))

@patch("etl.extract.sdg_extractor.sdg_data_extractor")
@patch("etl.extract.tran_r_vehst_extractor.tran_r_vehst_data_extractor")
@patch("etl.extract.road_eqr_carpda_extractor.road_eqr_carpda_data_extractor")
def test_data_after_run_pipeline(
self,
mock_road_extractor,
mock_tran_extractor,
mock_sdg_extractor,
) -> None:
# Set mocks
mock_sdg_extractor.return_value = self.mock_sdg_data
mock_tran_extractor.return_value = self.mock_tran_data
mock_road_extractor.return_value = self.mock_road_data
Pipeline().run_pipeline()
# Check calls
mock_sdg_extractor.assert_called_once()
mock_tran_extractor.assert_called_once()
mock_road_extractor.assert_called_once()
# Connection
conn = sqlite3.connect(self.db_path)
query = f"SELECT * FROM {self.table_name}"
result = pd.read_sql_query(query, conn)
pd.testing.assert_frame_equal(result.drop('time_period', axis=1), self.expected_result, check_like=True, check_dtype=False)
pd.testing.assert_frame_equal(
result.drop("time_period", axis=1),
self.expected_result,
check_like=True,
check_dtype=False,
)
conn.commit()
conn.close()

Expand Down

0 comments on commit 39e8abc

Please sign in to comment.