From ccc253b13a86aa452bff8b7f3592a58ec808b127 Mon Sep 17 00:00:00 2001
From: Jonas Jaszkowic
Date: Mon, 16 Oct 2023 15:20:28 +0200
Subject: [PATCH] fix: age calculation respecting pflanzjahr=0 and preprocess trees data (#105)

fix: age calculation respecting pflanzjahr=0 and preprocess trees data with tippecanoe (#105)
---
 .github/workflows/test-harvest.yml |   3 +-
 harvester/Dockerfile               |   5 +-
 harvester/harvester.py             | 121 ----------------
 harvester/mapbox-tree-update.py    | 224 +++++++++++++++++++++++++++++
 harvester/requirements.txt         |   3 +-
 5 files changed, 232 insertions(+), 124 deletions(-)
 create mode 100644 harvester/mapbox-tree-update.py

diff --git a/.github/workflows/test-harvest.yml b/.github/workflows/test-harvest.yml
index 5fc592d2..a47f84c8 100644
--- a/.github/workflows/test-harvest.yml
+++ b/.github/workflows/test-harvest.yml
@@ -25,6 +25,7 @@ env:
   MAPBOXTILESET: "xyz"
   MAPBOXLAYERNAME: "abc"
   LOGGING: INFO
+  SKIP_MAPBOX: "True"
   # DATABASE_URL: postgresql://fangorn:ent@localhost:5432/trees?schema=public
 
 jobs:
@@ -60,7 +61,7 @@ jobs:
         id: api-start
         run: cd api && supabase start | grep -w "service_role key" | cut -d ":" -f 2 | xargs | tr -d '\n' | awk '{print "service_role_key="$1}' >> "$GITHUB_OUTPUT" && cd ..
       - name: run the harvester
-        run: docker run --env PG_SERVER='0.0.0.0' --env PG_DB --env PG_PORT --env PG_USER --env PG_PASS --env SUPABASE_URL --env SUPABASE_SERVICE_ROLE_KEY='${{ steps.api-start.outputs.service_role_key }}' --env SUPABASE_BUCKET_NAME --env MAPBOXTOKEN --env MAPBOXUSERNAME --env LOGGING --env OUTPUT --network host technologiestiftung/giessdenkiez-de-dwd-harvester:test
+        run: docker run --env PG_SERVER='0.0.0.0' --env SKIP_MAPBOX --env PG_DB --env PG_PORT --env PG_USER --env PG_PASS --env SUPABASE_URL --env SUPABASE_SERVICE_ROLE_KEY='${{ steps.api-start.outputs.service_role_key }}' --env SUPABASE_BUCKET_NAME --env MAPBOXTOKEN --env MAPBOXUSERNAME --env LOGGING --env OUTPUT --network host technologiestiftung/giessdenkiez-de-dwd-harvester:test
       - name: stop the api
         run: cd api && supabase stop && cd ..
   release:
diff --git a/harvester/Dockerfile b/harvester/Dockerfile
index f7b0938c..3ee211be 100644
--- a/harvester/Dockerfile
+++ b/harvester/Dockerfile
@@ -24,6 +24,9 @@ RUN cd /app/ && python -m pip install --no-warn-script-location --prefix=/instal
 
 FROM base as app
 COPY --from=builder /install /usr/local
+RUN apt-get update && apt-get -y install git && apt-get -y install make
+RUN git clone https://github.com/mapbox/tippecanoe.git && cd tippecanoe && make -j && make install
+
 # COPY harvester.py /app/
 # COPY prepare.py /app/
 # COPY grid/ grid/
@@ -31,4 +34,4 @@ COPY --from=builder /install /usr/local
 COPY . /app/
 
 
-CMD ["python", "/app/harvester.py"]
+CMD python /app/harvester.py && python /app/mapbox-tree-update.py
diff --git a/harvester/harvester.py b/harvester/harvester.py
index 2c7f3ed3..ef9601ba 100644
--- a/harvester/harvester.py
+++ b/harvester/harvester.py
@@ -12,12 +12,6 @@
 from dotenv import load_dotenv
 import logging
 import os
-import sys
-import math
-import boto3
-import requests
-import json
-
 
 # setting up logging
 logging.basicConfig()
@@ -305,37 +299,6 @@
 features_light.append(feature_template.format(
     cell[1], cell[0], sum(clean[cellindex])))
 
-def check_file_exists_in_supabase_storage(file_name):
-    url = f'{SUPABASE_URL}/storage/v1/object/info/public/{SUPABASE_BUCKET_NAME}/{file_name}'
-    response = requests.get(url)
-    return response.status_code == 200
-
-def upload_file_to_supabase_storage(file_path, file_name):
-    try:
-        file = open(file_path, 'rb')
-        file_url = f'{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET_NAME}/{file_name}'
-        r = requests.put if check_file_exists_in_supabase_storage(file_name) else requests.post
-        response = r(
-            file_url,
-            files={'file': file},
-            headers={
-                'Authorization': f'Bearer {SUPABASE_SERVICE_ROLE_KEY}',
-                'ContentType': 'application/geo+json',
-                'AcceptEncoding': 'gzip, deflate, br'
-            },
-        )
-
-        if response.status_code == 200:
-            logging.info("✅ Uploaded {} to supabase storage".format(file_name))
-        else:
-            logging.warning(response.status_code)
-            logging.warning(response.content)
-            logging.warning("❌ Could not upload {} to supabase storage".format(file_name))
-
-    except Exception as error:
-        logging.warning(error)
-        logging.warning("❌ Could not upload {} supabase storage".format(file_name))
-
 def finishGeojson(feature_list, file_name):
     geojson = '{{"type":"FeatureCollection","properties":{{"start":"{}","end":"{}"}},"features":[{}]}}'.format(
         startdate, enddate, ",".join(feature_list))
@@ -345,93 +308,9 @@ def finishGeojson(feature_list, file_name):
     text_file.close()
     n = None
 
-    upload_file_to_supabase_storage(path + file_name, file_name)
-
 
 finishGeojson(features, "weather.geojson")
 finishGeojson(features_light, "weather_light.geojson")
 
-# create a CSV with all trees (id, lat, lng, radolan_sum)
-with conn.cursor() as cur:
-    # WARNING: The db is still mislabeled lat <> lng
-    cur.execute("SELECT trees.id, trees.lat, trees.lng, trees.radolan_sum, (date_part('year', CURRENT_DATE) - trees.pflanzjahr) as age FROM trees WHERE ST_CONTAINS(ST_SetSRID (( SELECT ST_EXTENT (geometry) FROM radolan_geometry), 4326), trees.geom)")
-    trees = cur.fetchall()
-    trees_head = "id,lng,lat,radolan_sum,age"
-    trees_csv = trees_head
-    pLimit = math.ceil(len(trees) / 4)
-    pCount = 0
-    pfCount = 1
-    singleCSV = trees_head
-    singleCSVs = []
-    for tree in trees:
-        newLine = "\n"
-        newLine += "{},{},{},{}".format(tree[0], tree[1], tree[2], tree[3])
-        if tree[4] is None:
-            newLine += ","
-        else:
-            newLine += ",{}".format(int(tree[4]))
-        singleCSV += newLine
-        trees_csv += newLine
-        pCount += 1
-        if pCount >= pLimit:
-            text_file = open(path + "trees-p{}.csv".format(pfCount), "w")
-            singleCSVs.append(singleCSV)
-            n = text_file.write(singleCSV)
-            text_file.close()
-            n = None
-            pfCount += 1
-            pCount = 0
-            singleCSV = trees_head
-
-    text_file = open(path + "trees-p{}.csv".format(pfCount), "w")
-    singleCSVs.append(singleCSV)
-    n = text_file.write(singleCSV)
-    text_file.close()
-    n = None
-
-    text_file = open(path + "trees.csv", "w")
-    n = text_file.write(trees_csv)
-    text_file.close()
-    n = None
-
-    upload_file_to_supabase_storage(path + "trees.csv", "trees.csv")
-
-    for i in range(4):
-        upload_file_to_supabase_storage(path + "trees-p{}.csv".format(i + 1), "trees-p{}.csv".format(i + 1))
-
-    # send the updated csv to mapbox
-
-    # get upload credentials
-    try:
-        url = "https://api.mapbox.com/uploads/v1/{}/credentials?access_token={}".format(
-            os.getenv("MAPBOXUSERNAME"), os.getenv("MAPBOXTOKEN"))
-        response = requests.post(url)
-        s3_credentials = json.loads(response.content)
-
-        # upload latest data
-
-        s3mapbox = boto3.client('s3', aws_access_key_id=s3_credentials["accessKeyId"],
-                                aws_secret_access_key=s3_credentials["secretAccessKey"], aws_session_token=s3_credentials["sessionToken"])
-        s3mapbox.upload_file(path + "trees.csv",
-                             s3_credentials["bucket"], s3_credentials["key"])
-
-        # tell mapbox that new data has arrived
-
-        url = "https://api.mapbox.com/uploads/v1/{}?access_token={}".format(
-            os.getenv("MAPBOXUSERNAME"), os.getenv("MAPBOXTOKEN"))
-        payload = '{{"url":"http://{}.s3.amazonaws.com/{}","tileset":"{}.{}","name":"{}"}}'.format(
-            s3_credentials["bucket"], s3_credentials["key"], os.getenv("MAPBOXUSERNAME"), os.getenv("MAPBOXTILESET"), os.getenv("MAPBOXLAYERNAME"))
-        headers = {'content-type': 'application/json',
-                   'Accept-Charset': 'UTF-8', 'Cache-Control': 'no-cache'}
-        response = requests.post(url, data=payload, headers=headers)
-        # wohooo!
-        logging.info("✅ Map updated to timespan: {} to {}".format(
-            startdate, enddate))
-    except:
-        logging.warning(
-            "could not upload tree data to mapbox for vector tiles")
-    trees_csv = None
-    csv_data = None
-
 # remove all temporary files
 shutil.rmtree(path)
diff --git a/harvester/mapbox-tree-update.py b/harvester/mapbox-tree-update.py
new file mode 100644
index 00000000..a838dec6
--- /dev/null
+++ b/harvester/mapbox-tree-update.py
@@ -0,0 +1,224 @@
+import os
+import requests
+import json
+import tempfile
+import time
+import shutil
+import subprocess
+import boto3
+from datetime import datetime
+import psycopg2
+import logging
+from tqdm import tqdm
+
+# Set the log level
+logging.root.setLevel(logging.INFO)
+
+SKIP_MAPBOX = os.getenv("SKIP_MAPBOX")
+
+# Database connection parameters
+pg_server = os.getenv("PG_SERVER")
+pg_port = os.getenv("PG_PORT")
+pg_username = os.getenv("PG_USER")
+pg_password = os.getenv("PG_PASS")
+pg_database = os.getenv("PG_DB")
+dsn = f"host='{pg_server}' port={pg_port} user='{pg_username}' password='{pg_password}' dbname='{pg_database}'"
+
+# Supabase configuration
+SUPABASE_URL = os.getenv("SUPABASE_URL")
+SUPABASE_BUCKET_NAME = os.getenv("SUPABASE_BUCKET_NAME")
+SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
+
+
+# Function to check if a file exists in Supabase storage
+def check_file_exists_in_supabase_storage(file_name):
+    url = f"{SUPABASE_URL}/storage/v1/object/info/public/{SUPABASE_BUCKET_NAME}/{file_name}"
+    response = requests.get(url)
+    return response.status_code == 200
+
+
+# Function to upload a file to Supabase storage
+def upload_file_to_supabase_storage(file_path, file_name):
+    try:
+        with open(file_path, "rb") as file:
+            file_url = (
+                f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET_NAME}/{file_name}"
+            )
+            http_method = (
+                requests.put
+                if check_file_exists_in_supabase_storage(file_name)
+                else requests.post
+            )
+            response = http_method(
+                file_url,
+                files={"file": file},
+                headers={
+                    "Authorization": f"Bearer {SUPABASE_SERVICE_ROLE_KEY}",
+                    "ContentType": "application/geo+json",
+                    "AcceptEncoding": "gzip, deflate, br",
+                },
+            )
+
+            if response.status_code == 200:
+                logging.info(f"✅ Uploaded {file_name} to Supabase storage")
+            else:
+                logging.error(response.status_code)
+                logging.error(response.content)
+                logging.error(f"❌ Could not upload {file_name} to Supabase storage")
+
+    except Exception as error:
+        logging.error(error)
+        logging.error(f"❌ Could not upload {file_name} to Supabase storage")
+
+
+# Create a temporary directory
+path = tempfile.mkdtemp()
+
+# Get the current year
+current_year = datetime.now().year
+
+# Initialize the database connection
+try:
+    conn = psycopg2.connect(dsn)
+    logging.info("🗄 Database connection established")
+except Exception as e:
+    logging.error("❌ Could not establish a database connection")
+    conn = None
+
+if conn is not None:
+    with conn.cursor() as cur:
+        logging.info("Fetching trees from the database...")
+        # WARNING: The db is still mislabeled lat <> lng
+        cur.execute(
+            "SELECT trees.id, trees.lat, trees.lng, trees.radolan_sum, trees.pflanzjahr FROM trees WHERE ST_CONTAINS(ST_SetSRID (( SELECT ST_EXTENT (geometry) FROM radolan_geometry), 4326), trees.geom)"
+        )
+        trees = cur.fetchall()
+
+        header = "id,lng,lat,radolan_sum,age"
+        logging.info(f"Creating trees.csv file for {len(trees)} trees")
+
+        lines = []
+        for tree in tqdm(trees):
+            age = int(current_year) - int(tree[4]) if tree[4] != 0 else ""
+            line = "{},{},{},{},{}".format(tree[0], tree[1], tree[2], tree[3], age)
+            lines.append(line)
+
+        trees_csv = "\n".join([header] + lines)
+
+        trees_csv_full_path = os.path.join(path, "trees.csv")
+        trees_preprocessed_full_path = os.path.join(path, "trees-preprocessed.mbtiles")
+
+        with open(trees_csv_full_path, "w") as out:
+            out.write(trees_csv)
+
+        # Pre-process trees.csv with tippecanoe
+        logging.info("Preprocess trees.csv with tippecanoe...")
+        subprocess.call(
+            [
+                "tippecanoe",
+                "-zg",
+                "-o",
+                trees_preprocessed_full_path,
+                "--force",
+                "--drop-fraction-as-needed",
+                trees_csv_full_path,
+            ]
+        )
+        logging.info("Preprocess trees.csv with tippecanoe... Done.")
+
+        # Upload preprocessed data to Supabase storage
+        upload_file_to_supabase_storage(
+            trees_preprocessed_full_path, "trees-preprocessed.mbtiles"
+        )
+
+        # Send the updated CSV to Mapbox
+        if SKIP_MAPBOX != "True":
+            try:
+                url = "https://api.mapbox.com/uploads/v1/{}/credentials?access_token={}".format(
+                    os.getenv("MAPBOXUSERNAME"), os.getenv("MAPBOXTOKEN")
+                )
+                response = requests.post(url)
+                s3_credentials = json.loads(response.content)
+
+                # Upload the latest data to S3
+                s3mapbox = boto3.client(
+                    "s3",
+                    aws_access_key_id=s3_credentials["accessKeyId"],
+                    aws_secret_access_key=s3_credentials["secretAccessKey"],
+                    aws_session_token=s3_credentials["sessionToken"],
+                )
+                s3mapbox.upload_file(
+                    trees_preprocessed_full_path,
+                    s3_credentials["bucket"],
+                    s3_credentials["key"],
+                )
+
+                # Tell Mapbox that new data has arrived
+                url = "https://api.mapbox.com/uploads/v1/{}?access_token={}".format(
+                    os.getenv("MAPBOXUSERNAME"), os.getenv("MAPBOXTOKEN")
+                )
+                payload = '{{"url":"http://{}.s3.amazonaws.com/{}","tileset":"{}.{}","name":"{}"}}'.format(
+                    s3_credentials["bucket"],
+                    s3_credentials["key"],
+                    os.getenv("MAPBOXUSERNAME"),
+                    os.getenv("MAPBOXTILESET"),
+                    os.getenv("MAPBOXLAYERNAME"),
+                )
+                headers = {
+                    "content-type": "application/json",
+                    "Accept-Charset": "UTF-8",
+                    "Cache-Control": "no-cache",
+                }
+                response = requests.post(url, data=payload, headers=headers)
+                if response.status_code != 201:
+                    logging.error("Could not generate Mapbox upload")
+                    logging.error(response.content)
+
+                upload_id = json.loads(response.content)["id"]
+                logging.info(
+                    f"Initialized generation of Mapbox tilesets for upload={upload_id}..."
+                )
+
+                # Check for the status of Mapbox upload until completed or error
+                complete = False
+                error = None
+                while not complete and error is None:
+                    url = "https://api.mapbox.com/uploads/v1/{}/{}?access_token={}".format(
+                        os.getenv("MAPBOXUSERNAME"), upload_id, os.getenv("MAPBOXTOKEN")
+                    )
+                    headers = {
+                        "content-type": "application/json",
+                        "Accept-Charset": "UTF-8",
+                        "Cache-Control": "no-cache",
+                    }
+                    response = requests.get(url, headers=headers)
+                    responseJson = json.loads(response.content)
+                    complete = responseJson["complete"]
+                    error = responseJson["error"]
+                    progress = responseJson["progress"]
+                    logging.info(
+                        f"Waiting for tileset generation for upload={upload_id} progress={progress} complete={complete} error={error}"
+                    )
+                    time.sleep(2)
+
+                if error is not None:
+                    logging.error(error)
+                    exit(1)
+
+            except Exception as error:
+                logging.error("Could not upload tree data to Mapbox for vector tiles")
+                logging.error(error)
+                exit(1)
+        else:
+            logging.info("Skipping Mapbox Tileset generation.")
+
+        # Clean up
+        trees_csv = None
+        csv_data = None
+
+# Remove all temporary files
+shutil.rmtree(path)
+
+# Close the database connection
+if conn is not None:
+    conn.close()
diff --git a/harvester/requirements.txt b/harvester/requirements.txt
index 3cb7b8c3..d8987f5c 100644
--- a/harvester/requirements.txt
+++ b/harvester/requirements.txt
@@ -22,4 +22,5 @@ Shapely==1.7.0
 six==1.14.0
 urllib3==1.25.8
 zope.interface==5.0.1
-requests==2.25.1
\ No newline at end of file
+requests==2.25.1
+tqdm==4.66.1
\ No newline at end of file