From 39e8abc4585a248e9b7c6ebaa3897198e747cc6e Mon Sep 17 00:00:00 2001
From: Rafoolin
Date: Sat, 13 Jan 2024 18:33:45 +0100
Subject: [PATCH] chore: remove "tran_r_vehst" datasource and clean code

---
 .../etl/extract/tran_r_vehst_extractor.py     | 20 --------
 project/pipeline/etl/load/loader.py           |  8 ++---
 .../transform/road_eqr_carpda_transformer.py  |  1 -
 .../etl/transform/tran_r_vehst_transformer.py | 45 ----------------
 project/pipeline/pipeline.py                  | 13 -----
 .../extract/tran_r_vehst_extractor_test.py    | 25 ---------
 project/tests/etl/load/loader_test.py         | 14 +----
 .../tran_r_vehst_transformer_test.py          | 48 -----------------
 project/tests/pipeline_test.py                | 51 ++++---------------
 9 files changed, 14 insertions(+), 211 deletions(-)
 delete mode 100644 project/pipeline/etl/extract/tran_r_vehst_extractor.py
 delete mode 100644 project/pipeline/etl/transform/tran_r_vehst_transformer.py
 delete mode 100644 project/tests/etl/extract/tran_r_vehst_extractor_test.py
 delete mode 100644 project/tests/etl/transform/tran_r_vehst_transformer_test.py

diff --git a/project/pipeline/etl/extract/tran_r_vehst_extractor.py b/project/pipeline/etl/extract/tran_r_vehst_extractor.py
deleted file mode 100644
index 48b7fb8dfa..0000000000
--- a/project/pipeline/etl/extract/tran_r_vehst_extractor.py
+++ /dev/null
@@ -1,20 +0,0 @@
-import pandas as pd
-from pipeline_utils.zip_helper import GZipFileHelper
-
-
-def tran_r_vehst_data_extractor(compressed=True):
-    """
-    Extracts stock of vehicles by category and NUTS 2 regions data from the data source URL.
-
-    Parameters:
-    - compressed (bool, optional): Flag indicating whether the file is compressed. Both are possible from the data provider.
-      When True (default), the function assumes the file is compressed and uses GZip decompression.
-      When False, the file is assumed to be uncompressed.
-
-    Returns:
-    - DataFrame: A DataFrame containing the extracted data.
-    """
-    zip_helper = GZipFileHelper()
-    url = f"https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/data/tran_r_vehst/?format=SDMX-CSV&compressed={compressed}"
-    csv_file_path = zip_helper.download_and_extract_url_file(url)
-    return pd.read_csv(csv_file_path)
diff --git a/project/pipeline/etl/load/loader.py b/project/pipeline/etl/load/loader.py
index 41ca65fb60..2a3a2012ba 100644
--- a/project/pipeline/etl/load/loader.py
+++ b/project/pipeline/etl/load/loader.py
@@ -3,21 +3,19 @@ import pipeline_utils.utils as util
 
 
-def merge_data_to_sql(sdg_data, road_data, tran_data) -> pd.DataFrame:
+def merge_data_to_sql(sdg_data, road_data) -> pd.DataFrame:
     """
     Merge data from different sources into a single Pandas DataFrame.
 
     Parameters:
     - sdg_data (pd.DataFrame): DataFrame containing average CO2 emissions per km from new passenger cars.
     - road_data (pd.DataFrame): New passenger cars by type of motor energy.
-    - tran_data (pd.DataFrame): Stock of vehicles by category and NUTS 2 regions.
 
     Returns:
-        pd.DataFrame: Merged DataFrame containing data from all three sources.
+        pd.DataFrame: Merged DataFrame containing data from both sources.
     """
     # Merge based on the time and country code
     merge_on = ["geo", "TIME_PERIOD"]
-    sdg_tran_data = pd.merge(sdg_data, tran_data, on=merge_on, how="inner")
-    merged_data = pd.merge(road_data, sdg_tran_data, on=merge_on, how="inner")
+    merged_data = pd.merge(sdg_data, road_data, on=merge_on, how="inner")
     return merged_data.dropna()
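
For context, a minimal sketch of the slimmed-down merge (not part of the patch; the frame contents are modeled on the loader test fixtures further down): the inner join on ["geo", "TIME_PERIOD"] keeps only the key pairs present in both inputs, and the import path assumes the package layout the project's tests use.

    import pandas as pd

    from etl.load import loader  # assumed to be importable, as in pipeline.py

    sdg = pd.DataFrame({"geo": ["A", "B"], "TIME_PERIOD": [2023, 2024], "D1": [1, 2]})
    road = pd.DataFrame({"geo": ["A", "B"], "TIME_PERIOD": [2023, 2027], "D2": [3, 4]})

    # Only ("A", 2023) appears in both frames, so a single row survives the inner merge.
    print(loader.merge_data_to_sql(sdg_data=sdg, road_data=road))
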
""" # Merge based on the time and country code merge_on = ["geo", "TIME_PERIOD"] - sdg_tran_data = pd.merge(sdg_data, tran_data, on=merge_on, how="inner") - merged_data = pd.merge(road_data, sdg_tran_data, on=merge_on, how="inner") + merged_data = pd.merge(sdg_data, road_data, on=merge_on, how="inner") return merged_data.dropna() diff --git a/project/pipeline/etl/transform/road_eqr_carpda_transformer.py b/project/pipeline/etl/transform/road_eqr_carpda_transformer.py index 50d35ddff6..2347b1d6b7 100644 --- a/project/pipeline/etl/transform/road_eqr_carpda_transformer.py +++ b/project/pipeline/etl/transform/road_eqr_carpda_transformer.py @@ -39,5 +39,4 @@ def road_eqr_carpda_data_transformer(data_frame: pd.DataFrame) -> pd.DataFrame: data_frame["TIME_PERIOD"] = pd.to_datetime(data_frame["TIME_PERIOD"], format='%Y') # Reset indexes after changing and dropping rows data_frame = data_frame.reset_index(drop=True) - print(data_frame) return data_frame diff --git a/project/pipeline/etl/transform/tran_r_vehst_transformer.py b/project/pipeline/etl/transform/tran_r_vehst_transformer.py deleted file mode 100644 index a7a9901741..0000000000 --- a/project/pipeline/etl/transform/tran_r_vehst_transformer.py +++ /dev/null @@ -1,45 +0,0 @@ -import pandas as pd - - -def tran_r_vehst_data_transformer(data_frame: pd.DataFrame) -> pd.DataFrame: - """ - Clean data frame of stock of vehicles by category and NUTS 2 regions data. - - Parameters: - - dataFrame (pd.DataFrame): The raw data for stock of vehicles by category and NUTS 2 regions - Returns: - - pd.DataFrame: Cleaned data - """ - # Dropping some columns we do not need - to_drop = ["DATAFLOW", "LAST UPDATE", "OBS_FLAG"] - to_drop_filter = data_frame.filter(to_drop) - data_frame = data_frame.drop(to_drop_filter, axis=1) - # Filter and drop rows that its frequency(freq) is not A|a. - # This means we only consider annual frequencies! 
- if "freq" in data_frame.columns: - frame_filter = data_frame["freq"].str.contains(r"[A|a]") == False - data_frame = data_frame[~frame_filter] - # Now that rows are filtered, we drop the column - data_frame = data_frame.drop(["freq"], axis=1) - # Drop [vehicles] other than [CAR] - if "vehicle" in data_frame.columns: - frame_filter = data_frame["vehicle"].str.contains("CAR") == False - data_frame = data_frame[~frame_filter] - # Drop vehicle column - data_frame = data_frame.drop(["vehicle"], axis=1) - data_frame = data_frame.dropna() - # Convert [OBS_VALUE] to contains [int] values - if "OBS_VALUE" in data_frame.columns: - data_frame["OBS_VALUE"] = data_frame["OBS_VALUE"].astype(int) - data_frame = data_frame.rename({"OBS_VALUE": "n_vehicles"}, axis=1) - if "unit" in data_frame.columns: - data_frame = data_frame.rename({"unit": "vehicles_unit"}, axis=1) - # Reset indexes after changing and dropping rows - data_frame = data_frame.reset_index(drop=True) - if "TIME_PERIOD" in data_frame.columns: - # Convert [TIME_PERIOD] to [datetime] values - data_frame["TIME_PERIOD"] = data_frame["TIME_PERIOD"].astype(str) - data_frame["TIME_PERIOD"] = pd.to_datetime(data_frame["TIME_PERIOD"], format='%Y') - # Reset indexes after changing and dropping rows - data_frame = data_frame.reset_index(drop=True) - return data_frame diff --git a/project/pipeline/pipeline.py b/project/pipeline/pipeline.py index fbbe2a1c01..a2cc7c9425 100644 --- a/project/pipeline/pipeline.py +++ b/project/pipeline/pipeline.py @@ -5,10 +5,8 @@ import etl.extract.geo_extractor as geo_e import etl.extract.motor_energy_extractor as motor_e import etl.extract.unit_extractor as unit_e -import etl.extract.tran_r_vehst_extractor as tran_e import etl.extract.road_eqr_carpda_extractor as road_e import etl.transform.road_eqr_carpda_transformer as road_t -import etl.transform.tran_r_vehst_transformer as tran_t import etl.transform.sdg_transformer as sdg_t from etl.load import loader @@ -34,10 +32,6 @@ def __extract(self): self.unit = unit_e.unit_data_extractor() print(colored("Extracting unit data source Finished!", "green")) - print(colored("Extracting Tran_r_vhest data source...", "green")) - self.tran = tran_e.tran_r_vehst_data_extractor() - print(colored("Extracting Tran_r_vhest data source Finished!", "green")) - print(colored("Extracting Road_eqr_catpda data source...", "green")) self.road = road_e.road_eqr_carpda_data_extractor() print(colored("Extracting Road_eqr_catpda data source Finished!", "green")) @@ -50,10 +44,6 @@ def __transform(self): self.cleaned_road = road_t.road_eqr_carpda_data_transformer(self.road) print(colored("Transforming Road_eqr_catpda data source Finished!", "green")) - print(colored("Transforming tran_r_vehst data source...", "green")) - self.cleaned_tran = tran_t.tran_r_vehst_data_transformer(self.tran) - print(colored("Transforming tran_r_vehst data source Finished!", "green")) - print(colored("Transforming SDG data source...", "green")) self.cleaned_sdg = sdg_t.sdg_data_transformer(self.sdg) print(colored("Transforming SDG data source Finished!", "green")) @@ -67,7 +57,6 @@ def __merge(self): self.merged_data = loader.merge_data_to_sql( sdg_data=self.cleaned_sdg, road_data=self.cleaned_road, - tran_data=self.cleaned_tran, ) # Convert column names to lowercase self.merged_data.columns = self.merged_data.columns.str.lower() @@ -104,8 +93,6 @@ def run_pipeline(self): ) return - # Transform - # TODO:: rafoolin/made-template#14 try: self.__transform() except Exception as e: diff --git 
diff --git a/project/tests/etl/extract/tran_r_vehst_extractor_test.py b/project/tests/etl/extract/tran_r_vehst_extractor_test.py
deleted file mode 100644
index a0d08ac497..0000000000
--- a/project/tests/etl/extract/tran_r_vehst_extractor_test.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import unittest
-from unittest.mock import patch
-import pandas as pd
-
-from pipeline_utils.zip_helper import GZipFileHelper
-from etl.extract.tran_r_vehst_extractor import tran_r_vehst_data_extractor
-
-
-class TestTranDataExtractor(unittest.TestCase):
-    @patch.object(GZipFileHelper, "download_and_extract_url_file")
-    @patch("pandas.read_csv")
-    def test_sdg_extractor(self, mock_read_csv, mock_zip_helper):
-        mock_zip_helper.return_value = "tran_file.csv"
-        mock_csv_data = pd.DataFrame({"C1": [1, 2], "C2": [3, 4]})
-        mock_read_csv.return_value = mock_csv_data
-        # Call the tran_extractor with compressed=True
-        result = tran_r_vehst_data_extractor(compressed=False)
-        path = "https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/data/tran_r_vehst/?format=SDMX-CSV&compressed=False"
-        mock_zip_helper.assert_called_once_with(path)
-        mock_read_csv.assert_called_once_with("tran_file.csv")
-        self.assertTrue(result.equals(mock_csv_data))
-
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/project/tests/etl/load/loader_test.py b/project/tests/etl/load/loader_test.py
index 8a9bc0d3fc..ceae27c003 100644
--- a/project/tests/etl/load/loader_test.py
+++ b/project/tests/etl/load/loader_test.py
@@ -29,13 +29,6 @@ def test_merge_data_to_sql(self):
                 "D2": [3, 4],
             }
         )
-        mock_data_3 = pd.DataFrame(
-            {
-                "geo": ["A", "B"],
-                "TIME_PERIOD": [2023, 2029],
-                "D3": [5, 6],
-            }
-        )
 
         expected_result = pd.DataFrame(
             {
@@ -42,15 +35,10 @@ def test_merge_data_to_sql(self):
                 "geo": ["A"],
                 "TIME_PERIOD": [2023],
                 "D1": [1],
                 "D2": [3],
-                "D3": [5],
             },
         )
 
-        result = loader.merge_data_to_sql(
-            sdg_data=mock_data_1,
-            road_data=mock_data_2,
-            tran_data=mock_data_3,
-        )
+        result = loader.merge_data_to_sql(sdg_data=mock_data_1, road_data=mock_data_2)
         # The extra columns are deleted and are DFs are merged
         pd.testing.assert_frame_equal(result, expected_result, check_like=True)
diff --git a/project/tests/etl/transform/tran_r_vehst_transformer_test.py b/project/tests/etl/transform/tran_r_vehst_transformer_test.py
deleted file mode 100644
index e3bb0636ad..0000000000
--- a/project/tests/etl/transform/tran_r_vehst_transformer_test.py
+++ /dev/null
@@ -1,48 +0,0 @@
-import unittest
-import pandas as pd
-
-from etl.transform.tran_r_vehst_transformer import tran_r_vehst_data_transformer
-
-
-class TestTranTransformer(unittest.TestCase):
-    def test_tran_r_vehst_data_transformer(self) -> None:
-        mock_csv_data = pd.DataFrame(
-            {
-                "DATAFLOW": [1, 2, 3],
-                "LAST UPDATE": [1, 2, 3],
-                "OBS_VALUE": ["1", "2", "3"],
-                "freq": ["a", "a", "b"],
-                "unit": ["A", "B", "C"],
-            }
-        )
-        expected_result = pd.DataFrame(
-            {
-                "n_vehicles": [1, 2],
-                "vehicles_unit": ["A", "B"],
-            }
-        )
-        result = tran_r_vehst_data_transformer(mock_csv_data)
-        # The extra columns are deleted
-        pd.testing.assert_frame_equal(result, expected_result, check_like=True)
-
-    def test_tran_r_vehst_data_transformer_empty(self) -> None:
-        mock_csv_data = pd.DataFrame(
-            {
-                "DATAFLOW": [1, 2, 3],
-                "LAST UPDATE": [1, 2, 3],
-                "OBS_VALUE": ["1", "2", "3"],
-                "freq": ["c", "b", "b"],
-                "unit": ["A", "B", "C"],
-            }
-        )
-        result = tran_r_vehst_data_transformer(mock_csv_data)
-        # The extra columns are deleted
-        # We don't have any annual row in the data-frame
-        self.assertTrue(
-            result.empty,
-            msg='Only annual data is valid on the "freq" column',
-        )
-
-
-if __name__ == "__main__":
-    unittest.main()
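
The tests above, like the pipeline test below, pass check_like=True, which makes pd.testing.assert_frame_equal ignore column and row ordering. A minimal standalone illustration (not from the repo):

    import pandas as pd

    a = pd.DataFrame({"x": [1], "y": [2]})
    b = pd.DataFrame({"y": [2], "x": [1]})  # same data, columns reordered

    pd.testing.assert_frame_equal(a, b, check_like=True)  # passes
    # pd.testing.assert_frame_equal(a, b)  # would raise: column order differs
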
on the "freq" column', - ) - - -if __name__ == "__main__": - unittest.main() diff --git a/project/tests/pipeline_test.py b/project/tests/pipeline_test.py index 7021a9533b..e76acd179d 100644 --- a/project/tests/pipeline_test.py +++ b/project/tests/pipeline_test.py @@ -47,36 +47,6 @@ def setUp(self) -> None: } ) - self.mock_tran_data = pd.DataFrame( - { - "DATAFLOW": [ - "ESTAT:SDG_12_30(1.0)", - "ESTAT:SDG_12_30(1.0)", - "ESTAT:SDG_12_30(1.0)", - "ESTAT:SDG_12_30(1.0)", - "ESTAT:SDG_12_30(1.0)", - "ESTAT:SDG_12_30(1.0)", - "ESTAT:SDG_12_30(1.0)", - ], - "LAST UPDATE": [ - "13/03/23 23:00:00", - "13/03/23 23:00:00", - "13/03/23 23:00:00", - "13/03/23 23:00:00", - "13/03/23 23:00:00", - "13/03/23 23:00:00", - "13/03/23 23:00:00", - ], - "freq": ["A", "A", "A", "A", "A", "A", "A"], - "vehicle": ["BUS_TOT", "CAR", "CAR", "CAR", "CAR", "CAR", "CAR"], - "unit": ["NR", "NR", "NR", "NR", "NR", "NR", "NR"], - "geo": ["AT", "AT", "AT", "AT", "AT", "AT", "AT"], - "TIME_PERIOD": [2000, 2001, 2002, 2003, 2004, 2005, 2006], - "OBS_VALUE": [6583, 7146, 7535, 7627, 7867, 6583, 7146], - "OBS_FLAG": ["", "", "", "", "", "", ""], - } - ) - self.mock_road_data = pd.DataFrame( { "DATAFLOW": [ @@ -109,12 +79,10 @@ def setUp(self) -> None: self.expected_result = pd.DataFrame( { - "geo": ["AT", "AT", "AT", "AT", "AT", "AT"], - "vehicles_unit": ["NR", "NR", "NR", "NR", "NR", "NR"], - "n_vehicles": [7146, 7535, 7627, 7867, 6583, 7146], - "n_passenger_cars": [4935, 5703, 4114, 1285, 2074, 2389], - "emitted_co2": [165, 164, 163, 161, 162, 163], - "mot_nrg": ["ALT", "ALT", "ALT", "ALT", "ALT", "ALT"], + "geo": ["AT", "AT", "AT", "AT", "AT", "AT", "AT"], + "n_passenger_cars": [3757, 4935, 5703, 4114, 1285, 2074, 2389], + "emitted_co2": [168, 165, 164, 163, 161, 162, 163], + "mot_nrg": ["ALT", "ALT", "ALT", "ALT", "ALT", "ALT", "ALT"], } ) self.db_name = "pipeline" @@ -127,28 +95,29 @@ def test_run_pipeline(self) -> None: self.assertTrue(os.path.exists(self.db_path)) @patch("etl.extract.sdg_extractor.sdg_data_extractor") - @patch("etl.extract.tran_r_vehst_extractor.tran_r_vehst_data_extractor") @patch("etl.extract.road_eqr_carpda_extractor.road_eqr_carpda_data_extractor") def test_data_after_run_pipeline( self, mock_road_extractor, - mock_tran_extractor, mock_sdg_extractor, ) -> None: # Set mocks mock_sdg_extractor.return_value = self.mock_sdg_data - mock_tran_extractor.return_value = self.mock_tran_data mock_road_extractor.return_value = self.mock_road_data Pipeline().run_pipeline() # Check calls mock_sdg_extractor.assert_called_once() - mock_tran_extractor.assert_called_once() mock_road_extractor.assert_called_once() # Connection conn = sqlite3.connect(self.db_path) query = f"SELECT * FROM {self.table_name}" result = pd.read_sql_query(query, conn) - pd.testing.assert_frame_equal(result.drop('time_period', axis=1), self.expected_result, check_like=True, check_dtype=False) + pd.testing.assert_frame_equal( + result.drop("time_period", axis=1), + self.expected_result, + check_like=True, + check_dtype=False, + ) conn.commit() conn.close()