From 6a142c9283320aa017cb69b2744e5e83163d76ab Mon Sep 17 00:00:00 2001 From: jeanetteclark Date: Mon, 25 Mar 2024 06:13:43 +0000 Subject: [PATCH] =?UTF-8?q?Deploying=20to=20gh-pages=20from=20@=20NCEAS/re?= =?UTF-8?q?pro-research-course@7fa05b265512682644fd4ed7d17fac23a49ccafa=20?= =?UTF-8?q?=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 2024-03-arctic/search.json | 34 +- .../sections/data-structures-netcdf.html | 84 ++-- 2024-03-arctic/sections/geopandas.html | 60 +-- .../sections/parallel-programming.html | 370 +++++++++++------- .../sections/parallel-with-dask.html | 50 +-- 2024-03-arctic/sections/python-intro.html | 44 +-- .../sections/software-design-1.html | 32 +- .../sections/software-design-2.html | 16 +- 2024-03-arctic/sections/zarr.html | 46 +-- 9 files changed, 425 insertions(+), 311 deletions(-) diff --git a/2024-03-arctic/search.json b/2024-03-arctic/search.json index bd1e8335..fbf38e93 100644 --- a/2024-03-arctic/search.json +++ b/2024-03-arctic/search.json @@ -344,7 +344,7 @@ "href": "sections/parallel-programming.html#task-parallelization-in-python", "title": "4  Pleasingly Parallel Programming", "section": "4.7 Task parallelization in Python", - "text": "4.7 Task parallelization in Python\nPython also provides a number of easy to use packages for concurrent processing. We will review two of these, concurrent.futures and parsl, to show just how easy it can be to parallelize your programs. concurrent.futures is built right into the python3 release, and is a great starting point for learning concurrency.\nWe’re going to start with a task that is a little expensive to compute, and define it in a function. All this task(x) function does is to use numpy to create a fairly large range of numbers, and then sum them.\n\ndef task(x):\n import numpy as np\n result = np.arange(x*10**8).sum()\n return result\n\nWe can start by executing this task function serially ten times with varying inputs. In this case, we create a function run_serial that takes a list of inputs to be run, and it calls the task function for each of those inputs. The @timethis decorator is a simple way to wrap the function with timing code so that we can see how long it takes to execute.\n\nimport numpy as np\n\n@timethis\ndef run_serial(task_list):\n return [task(x) for x in task_list]\n\nrun_serial(np.arange(10))\n\nrun_serial: 7156.259536743164 ms\n\n\n[0,\n 4999999950000000,\n 19999999900000000,\n 44999999850000000,\n 79999999800000000,\n 124999999750000000,\n 179999999700000000,\n 244999999650000000,\n 319999999600000000,\n 404999999550000000]\n\n\nIn this case, it takes around 25 seconds to execute 10 tasks, depending on what else is happening on the machine and network.\nSo, can we make this faster using a multi-threaded parallel process? Let’s try with concurrent.futures. The main concept in this package is one of a future, which is a structure which represents the value that will be created in a computation in the future when the function completes execution. With concurrent.futures, tasks are scheduled and do not block while they await their turn to be executed. Instead, threads are created and executed asynchronously, meaning that the function returns it’s future potentially before the thread has actually been executed. Using this approach, the user schedules a series of tasks to be executed asynchronously, and keeps track of the futures for each task. 
When the future indicates that the execution has been completed, we can then retrieve the result of the computation.\nIn practice this is a simple change from our serial implementation. We will use the ThreadPoolExecutor to create a pool of workers that are available to process tasks. Each worker is set up in its own thread, so it can execute in parallel with other workers. After setting up the pool of workers, we use concurrent.futures map() to schedule each task from our task_list (in this case, an input value from 1 to 10) to run on one of the workers. As for all map() implementations, we are asking for each value in task_list to be executed in the task function we defined above, but in this case it will be executed using one of the workers from the executor that we created.\n\nfrom concurrent.futures import ThreadPoolExecutor\n\n@timethis\ndef run_threaded(task_list):\n with ThreadPoolExecutor(max_workers=20) as executor:\n return executor.map(task, task_list)\n\nresults = run_threaded(np.arange(10))\n[x for x in results]\n\nrun_threaded: 4440.109491348267 ms\n[0,\n 4999999950000000,\n 19999999900000000,\n 44999999850000000,\n 79999999800000000,\n 124999999750000000,\n 179999999700000000,\n 244999999650000000,\n 319999999600000000,\n 404999999550000000]\nThis execution took about 🎉 4 seconds 🎉, which is about 6.25x faster than serial. Congratulations, you wrote your a multi-threaded python program!", + "text": "4.7 Task parallelization in Python\nPython also provides a number of easy to use packages for concurrent processing. We will review two of these, concurrent.futures and parsl, to show just how easy it can be to parallelize your programs. concurrent.futures is built right into the python3 release, and is a great starting point for learning concurrency.\nWe’re going to start with a task that is a little expensive to compute, and define it in a function. All this task(x) function does is to use numpy to create a fairly large range of numbers, and then sum them.\n\ndef task(x):\n import numpy as np\n result = np.arange(x*10**8).sum()\n return result\n\nWe can start by executing this task function serially ten times with varying inputs. In this case, we create a function run_serial that takes a list of inputs to be run, and it calls the task function for each of those inputs. The @timethis decorator is a simple way to wrap the function with timing code so that we can see how long it takes to execute.\n\nimport numpy as np\n\n@timethis\ndef run_serial(task_list):\n return [task(x) for x in task_list]\n\nrun_serial(np.arange(10))\n\nrun_serial: 7197.09587097168 ms\n\n\n[0,\n 4999999950000000,\n 19999999900000000,\n 44999999850000000,\n 79999999800000000,\n 124999999750000000,\n 179999999700000000,\n 244999999650000000,\n 319999999600000000,\n 404999999550000000]\n\n\nIn this case, it takes around 25 seconds to execute 10 tasks, depending on what else is happening on the machine and network.\nSo, can we make this faster using a multi-threaded parallel process? Let’s try with concurrent.futures. The main concept in this package is one of a future, which is a structure which represents the value that will be created in a computation in the future when the function completes execution. With concurrent.futures, tasks are scheduled and do not block while they await their turn to be executed. Instead, threads are created and executed asynchronously, meaning that the function returns it’s future potentially before the thread has actually been executed. 
Using this approach, the user schedules a series of tasks to be executed asynchronously, and keeps track of the futures for each task. When the future indicates that the execution has been completed, we can then retrieve the result of the computation.\nIn practice this is a simple change from our serial implementation. We will use the ThreadPoolExecutor to create a pool of workers that are available to process tasks. Each worker is set up in its own thread, so it can execute in parallel with other workers. After setting up the pool of workers, we use concurrent.futures map() to schedule each task from our task_list (in this case, an input value from 1 to 10) to run on one of the workers. As for all map() implementations, we are asking for each value in task_list to be executed in the task function we defined above, but in this case it will be executed using one of the workers from the executor that we created.\n\nfrom concurrent.futures import ThreadPoolExecutor\n\n@timethis\ndef run_threaded(task_list):\n with ThreadPoolExecutor(max_workers=20) as executor:\n return executor.map(task, task_list)\n\nresults = run_threaded(np.arange(10))\n[x for x in results]\n\nrun_threaded: 4440.109491348267 ms\n[0,\n 4999999950000000,\n 19999999900000000,\n 44999999850000000,\n 79999999800000000,\n 124999999750000000,\n 179999999700000000,\n 244999999650000000,\n 319999999600000000,\n 404999999550000000]\nThis execution took about 🎉 4 seconds 🎉, which is about 6.25x faster than serial. Congratulations, you wrote your a multi-threaded python program!\nBut you may have also seen the thread pool wasn’t much faster at all. In that case, you may be encountering the python Global Interpreter Lock (GIL), and would be better off with using parallel processes. Let’s do that with ProcessPoolExecutor.\n\nfrom concurrent.futures import ProcessPoolExecutor\n\n@timethis\ndef run_processes(task_list):\n with ProcessPoolExecutor(max_workers=10) as executor:\n return executor.map(task, task_list)\n\nresults = run_processes(np.arange(10))\n[x for x in results]\n\nFor me, that took about 8 seconds, about half the time of the serial case, but results can vary tremendously depending on what others are doing on the machine.", "crumbs": [ "4  Pleasingly Parallel Programming" ] @@ -354,7 +354,7 @@ "href": "sections/parallel-programming.html#exercise-parallel-downloads", "title": "4  Pleasingly Parallel Programming", "section": "4.8 Exercise: Parallel downloads", - "text": "4.8 Exercise: Parallel downloads\nIn this exercise, we’re going to parallelize a simple task that is often very time consuming – downloading data. And we’ll compare performance of simple downloads using first a serial loop, and then using two parallel execution libraries: concurrent.futures and parsl. We’re going to see an example here where parallel execution won’t always speed up this task, as this is likely an I/O bound task if you’re downloading a lot of data. But we still should be able to speed things up a lot until we hit the limits of our disk arrays.\nThe data we are downloading is a pan-Arctic time series of TIF images containing rasterized Arctic surface water indices from:\n\nElizabeth Webb. 2022. Pan-Arctic surface water (yearly and trend over time) 2000-2022. Arctic Data Center doi:10.18739/A2NK3665N.\n\n\n\n\nWebb surface water index data\n\n\nFirst, let’s download the data serially to set a benchmark. 
The data files are listed in a table with their filename and identifier, and can be downloaded directly from the Arctic Data Center using their identifier. To make things easier, we’ve already provided a data frame with the names and identifiers of each file that could be downloaded.\n\n\n\n\n\n\n\n\n\n\nfilename\nidentifier\n\n\n\n\n0\nSWI_2007.tif\nurn:uuid:5ee72c9c-789d-4a1c-95d8-cb2b24a20662\n\n\n1\nSWI_2019.tif\nurn:uuid:9cd1cdc3-0792-4e61-afff-c11f86d3a9be\n\n\n2\nSWI_2021.tif\nurn:uuid:14e1e509-77c0-4646-9cc3-d05f8d84977c\n\n\n3\nSWI_2020.tif\nurn:uuid:1ba473ff-8f03-470b-90d1-7be667995ea1\n\n\n4\nSWI_2001.tif\nurn:uuid:85150557-05fd-4f52-8bbd-ec5a2c27e23d\n\n\n\n\n\n\n\n\n\n4.8.1 Serial\nWhen you have a list of repetitive tasks, you may be able to speed it up by adding more computing power. If each task is completely independent of the others, then it is pleasingly parallel and a prime candidate for executing those tasks in parallel, each on its own core. For example, let’s build a simple loop that downloads the data files that we need for an analysis. First, we start with the serial implementation.\n\nimport urllib\n\ndef download_file(row):\n service = \"https://arcticdata.io/metacat/d1/mn/v2/object/\"\n pid = row[1]['identifier']\n filename = row[1]['filename']\n url = service + pid\n print(\"Downloading: \" + filename)\n msg = urllib.request.urlretrieve(url, filename)\n return filename\n\n@timethis\ndef download_serial(table):\n results = [download_file(row) for row in table.iterrows()]\n return results\n \nresults = download_serial(file_table[0:5])\nprint(results)\n\nDownloading: SWI_2007.tif\nDownloading: SWI_2019.tif\nDownloading: SWI_2021.tif\nDownloading: SWI_2020.tif\nDownloading: SWI_2001.tif\ndownload_serial: 102506.97588920593 ms\n['SWI_2007.tif', 'SWI_2019.tif', 'SWI_2021.tif', 'SWI_2020.tif', 'SWI_2001.tif']\n\n\nIn this code, we have one function (download_file) that downloads a single data file and saves it to disk. It is called iteratively from the function download_serial. The serial execution takes about 20-25 seconds, but can vary considerably based on network traffic and other factors.\nThe issue with this loop is that we execute each download task sequentially, which means that only one of our processors on this machine is in use. In order to exploit parallelism, we need to be able to dispatch our tasks and allow each to run at the same time, with one task going to each core. To do that, we can use one of the many parallelization libraries in python to help us out.\n\n\n4.8.2 Multi-threaded with concurrent.futures\nIn this case, we’ll use the same download_file function from before, but let’s switch up and create a download_threaded() function to use concurrent.futures with a ThreadPoolExecutor just as we did earlier.\n\nfrom concurrent.futures import ThreadPoolExecutor\n\n@timethis\ndef download_threaded(table):\n with ThreadPoolExecutor(max_workers=15) as executor:\n results = executor.map(download_file, table.iterrows(), timeout=60)\n return results\n\nresults = download_threaded(file_table[0:5])\nfor result in results:\n print(result)\n\nDownloading: SWI_2007.tif\nDownloading: SWI_2019.tif\nDownloading: SWI_2021.tif\nDownloading: SWI_2020.tif\nDownloading: SWI_2001.tif\ndownload_threaded: 18966.630458831787 ms\nSWI_2007.tif\nSWI_2019.tif\nSWI_2021.tif\nSWI_2020.tif\nSWI_2001.tif\n\n\nNote how the “Downloading…” messages were printed immediately, but then it still took over 20 seconds to download the 5 files. 
This could be for several reasons, including that one of the files alone took that long (e.g., due to network congestion), or that there was a bottleneck in writing the files to disk (i.e., we could have been disk I/O limited). Or maybe the multithreaded executor pool didn’t do a good job parallelizing the tasks. The trick is figuring out why you did or didn’t get a speedup when parallelizing. So, let’s try this another way, using a multi-processing approach, rather than multi-threading.\n\n\n4.8.3 Multi-process with concurrent.futures\nYou’ll remember from earlier that you can execute tasks concurrently by creating multiple threads within one process (multi-threaded), or by creating and executing muliple processes. The latter creates more independence, as each of the executing tasks has their own memory and process space, but it also takes longer to set up. With concurrent.futures, we can switch to a multi-process pool by using a ProcessPoolExecutor, analogously to how we used ThreadPoolExecutor previously. So, some simple changes, and we’re running multiple processes.\n\nfrom concurrent.futures import ProcessPoolExecutor\n\n@timethis\ndef download_process(table):\n with ProcessPoolExecutor(max_workers=15) as executor:\n results = executor.map(download_file, table.iterrows(), timeout=60)\n return results\n\nresults = download_process(file_table[0:5])\nfor result in results:\n print(result)\n\nDownloading: SWI_2007.tifDownloading: SWI_2019.tifDownloading: SWI_2020.tifDownloading: SWI_2021.tif\n\n\n\nDownloading: SWI_2001.tif\ndownload_process: 17364.819288253784 ms\nSWI_2007.tif\nSWI_2019.tif\nSWI_2021.tif\nSWI_2020.tif\nSWI_2001.tif\n\n\nAgain, the output messages print almost immediately, but then later the processes finish and report that it took between 10 to 15 seconds to run. Your mileage may vary. When I increase the number of files being downloaded to 10 or even to 20, I notice it is actually about the same, around 10-15 seconds. So, part of our time now is the overhead of setting up multiple processes. But once we have that infrastructure in place, we can make effective euse of the pool of processes to handle many downloads.", + "text": "4.8 Exercise: Parallel downloads\nIn this exercise, we’re going to parallelize a simple task that is often very time consuming – downloading data. And we’ll compare performance of simple downloads using first a serial loop, and then using two parallel execution libraries: concurrent.futures and parsl. We’re going to see an example here where parallel execution won’t always speed up this task, as this is likely an I/O bound task if you’re downloading a lot of data. But we still should be able to speed things up a lot until we hit the limits of our disk arrays.\nThe data we are downloading is a pan-Arctic time series of TIF images containing rasterized Arctic surface water indices from:\n\nElizabeth Webb. 2022. Pan-Arctic surface water (yearly and trend over time) 2000-2022. Arctic Data Center doi:10.18739/A2NK3665N.\n\n\n\n\nWebb surface water index data\n\n\nFirst, let’s download the data serially to set a benchmark. The data files are listed in a table with their filename and identifier, and can be downloaded directly from the Arctic Data Center using their identifier. 
To make things easier, we’ve already provided a data frame with the names and identifiers of each file that could be downloaded.\n\n\n\n\n\n\n\n\n\n\nfilename\nidentifier\n\n\n\n\n0\nSWI_2007.tif\nurn:uuid:5ee72c9c-789d-4a1c-95d8-cb2b24a20662\n\n\n1\nSWI_2019.tif\nurn:uuid:9cd1cdc3-0792-4e61-afff-c11f86d3a9be\n\n\n2\nSWI_2021.tif\nurn:uuid:14e1e509-77c0-4646-9cc3-d05f8d84977c\n\n\n3\nSWI_2020.tif\nurn:uuid:1ba473ff-8f03-470b-90d1-7be667995ea1\n\n\n4\nSWI_2001.tif\nurn:uuid:85150557-05fd-4f52-8bbd-ec5a2c27e23d\n\n\n\n\n\n\n\n\n\n4.8.1 Serial\nWhen you have a list of repetitive tasks, you may be able to speed it up by adding more computing power. If each task is completely independent of the others, then it is pleasingly parallel and a prime candidate for executing those tasks in parallel, each on its own core. For example, let’s build a simple loop that downloads the data files that we need for an analysis. First, we start with the serial implementation.\n\nimport urllib\n\ndef download_file(row):\n service = \"https://arcticdata.io/metacat/d1/mn/v2/object/\"\n pid = row[1]['identifier']\n filename = row[1]['filename']\n url = service + pid\n print(\"Downloading: \" + filename)\n msg = urllib.request.urlretrieve(url, filename)\n return filename\n\n@timethis\ndef download_serial(table):\n results = [download_file(row) for row in table.iterrows()]\n return results\n \nresults = download_serial(file_table[0:5])\nprint(results)\n\nDownloading: SWI_2007.tif\nDownloading: SWI_2019.tif\nDownloading: SWI_2021.tif\nDownloading: SWI_2020.tif\nDownloading: SWI_2001.tif\ndownload_serial: 118949.22089576721 ms\n['SWI_2007.tif', 'SWI_2019.tif', 'SWI_2021.tif', 'SWI_2020.tif', 'SWI_2001.tif']\n\n\nIn this code, we have one function (download_file) that downloads a single data file and saves it to disk. It is called iteratively from the function download_serial. The serial execution takes about 20-25 seconds, but can vary considerably based on network traffic and other factors.\nThe issue with this loop is that we execute each download task sequentially, which means that only one of our processors on this machine is in use. In order to exploit parallelism, we need to be able to dispatch our tasks and allow each to run at the same time, with one task going to each core. To do that, we can use one of the many parallelization libraries in python to help us out.\n\n\n4.8.2 Multi-threaded with concurrent.futures\nIn this case, we’ll use the same download_file function from before, but let’s switch up and create a download_threaded() function to use concurrent.futures with a ThreadPoolExecutor just as we did earlier.\n\nfrom concurrent.futures import ThreadPoolExecutor\n\n@timethis\ndef download_threaded(table):\n with ThreadPoolExecutor(max_workers=15) as executor:\n results = executor.map(download_file, table.iterrows(), timeout=60)\n return results\n\nresults = download_threaded(file_table[0:5])\nfor result in results:\n print(result)\n\nDownloading: SWI_2007.tif\nDownloading: SWI_2019.tif\nDownloading: SWI_2021.tif\nDownloading: SWI_2020.tif\nDownloading: SWI_2001.tif\ndownload_threaded: 40392.88139343262 ms\nSWI_2007.tif\nSWI_2019.tif\nSWI_2021.tif\nSWI_2020.tif\nSWI_2001.tif\n\n\nNote how the “Downloading…” messages were printed immediately, but then it still took over 20 seconds to download the 5 files. 
This could be for several reasons, including that one of the files alone took that long (e.g., due to network congestion), or that there was a bottleneck in writing the files to disk (i.e., we could have been disk I/O limited). Or maybe the multithreaded executor pool didn’t do a good job parallelizing the tasks. The trick is figuring out why you did or didn’t get a speedup when parallelizing. So, let’s try this another way, using a multi-processing approach, rather than multi-threading.\n\n\n4.8.3 Multi-process with concurrent.futures\nYou’ll remember from earlier that you can execute tasks concurrently by creating multiple threads within one process (multi-threaded), or by creating and executing muliple processes. The latter creates more independence, as each of the executing tasks has their own memory and process space, but it also takes longer to set up. With concurrent.futures, we can switch to a multi-process pool by using a ProcessPoolExecutor, analogously to how we used ThreadPoolExecutor previously. So, some simple changes, and we’re running multiple processes.\n\nfrom concurrent.futures import ProcessPoolExecutor\n\n@timethis\ndef download_process(table):\n with ProcessPoolExecutor(max_workers=15) as executor:\n results = executor.map(download_file, table.iterrows(), timeout=60)\n return results\n\nresults = download_process(file_table[0:5])\nfor result in results:\n print(result)\n\nDownloading: SWI_2007.tifDownloading: SWI_2019.tifDownloading: SWI_2021.tif\nDownloading: SWI_2001.tif\nDownloading: SWI_2020.tif\n\n\ndownload_process: 11409.16132926941 ms\nSWI_2007.tif\nSWI_2019.tif\nSWI_2021.tif\nSWI_2020.tif\nSWI_2001.tif\n\n\nAgain, the output messages print almost immediately, but then later the processes finish and report that it took between 10 to 15 seconds to run. Your mileage may vary. When I increase the number of files being downloaded to 10 or even to 20, I notice it is actually about the same, around 10-15 seconds. So, part of our time now is the overhead of setting up multiple processes. But once we have that infrastructure in place, we can make effective euse of the pool of processes to handle many downloads.", "crumbs": [ "4  Pleasingly Parallel Programming" ] @@ -364,7 +364,17 @@ "href": "sections/parallel-programming.html#parallel-processing-with-parsl", "title": "4  Pleasingly Parallel Programming", "section": "4.9 Parallel processing with parsl", - "text": "4.9 Parallel processing with parsl\nconcurrent.futures is great and powerful, but it has its limits. Particularly as you try to scale up into the thousands of concurrent tasks, other libraries like Parsl (docs), Dask, Ray, and others come into play. They all have their strengths, but Parsl makes it particularly easy to build parallel workflows out of existing python code through it’s use of decorators on existing python functions.\nIn addition, Parsl supports a lot of different kinds of providers, allowing the same python code to be easily run multi-threaded using a ThreadPoolExecutor and via multi-processing on many different cluster computing platforms using the HighThroughputExecutor. For example, Parsl includes providers supporting local execution, and on Slurm, Condor, Kubernetes, AWS, and other platforms. And Parsl handles data staging as well across these varied environments, making sure the data is in the right place when it’s needed for computations.\nSimilarly to before, we start by configuring an executor in parsl, and loading it. 
We’ll use multiprocessing by configuring the HighThroughputExecutor to use our local resources as a cluster, and we’ll activate our virtual environment to be sure we’re executing in a consistent environment.\n\n# Required packages\nimport parsl\nfrom parsl import python_app\nfrom parsl.config import Config\nfrom parsl.executors import HighThroughputExecutor\nfrom parsl.providers import LocalProvider\n\n# Configure the parsl executor\nactivate_env = 'workon scomp'\nhtex_local = Config(\n executors=[\n HighThroughputExecutor(\n max_workers=15,\n provider=LocalProvider(\n worker_init=activate_env\n )\n )\n ],\n)\nparsl.clear()\nparsl.load(htex_local)\n\n<parsl.dataflow.dflow.DataFlowKernel at 0x7fa749ae0460>\n\n\nWe now have a live parsl executor (htex_local) that is waiting to execute processes. We tell it to execute processes by annotating functions with decorators that indicate which tasks should be parallelized. Parsl then handles the scheduling and execution of those tasks based on the dependencies between them. In the simplest case, we’ll decorate our previous function for downloading a file with the @python_app decorator, which tells parsl that any function calls with this function should be run on the default executor (in this case, htex_local).\n\n# Decorators seem to be ignored as the first line of a cell, so print something first\nprint(\"Create decorated function\")\n\n@python_app\ndef download_file_parsl(row):\n import urllib\n service = \"https://arcticdata.io/metacat/d1/mn/v2/object/\"\n pid = row[1]['identifier']\n filename = row[1]['filename']\n url = service + pid\n print(\"Downloading: \" + filename)\n msg = urllib.request.urlretrieve(url, filename)\n return filename\n\nCreate decorated function\n\n\nNow we just write regular python code that calls that function, and parsl handles the scheduling. Parsl app executors return an AppFuture, and we can call the AppFuture.done() function to determine when the future result is ready without blocking. Or, we can just block on AppFuture.result() which waits for each of the executions to complete and then returns the result.\n\n#! eval: true\nprint(\"Define our download_futures function\")\n\n@timethis\ndef download_futures(table):\n results = []\n for row in table.iterrows():\n result = download_file_parsl(row)\n print(result)\n results.append(result)\n return(results)\n\n@timethis\ndef wait_for_futures(table):\n results = download_futures(table)\n done = [app_future.result() for app_future in results]\n print(done)\n\nwait_for_futures(file_table[0:5])\n\nDefine our download_futures function\n<AppFuture at 0x7fa70479cd90 state=pending>\n<AppFuture at 0x7fa70479ceb0 state=pending>\n<AppFuture at 0x7fa70479f910 state=pending>\n<AppFuture at 0x7fa70479ea10 state=pending>\n<AppFuture at 0x7fa70475ec80 state=pending>\ndownload_futures: 10.331869125366211 ms\n['SWI_2007.tif', 'SWI_2019.tif', 'SWI_2021.tif', 'SWI_2020.tif', 'SWI_2001.tif']\nwait_for_futures: 30859.310388565063 ms\n\n\nWhen we’re done, be sure to clean up and shutdown the htex_local executor, or it will continue to persist in your environment and utilize resources. Generally, an executor should be created when setting up your environment, and then it can be used repeatedly for many different tasks.\n\nhtex_local.executors[0].shutdown()\nparsl.clear()", + "text": "4.9 Parallel processing with parsl\nconcurrent.futures is great and powerful, but it has its limits. 
Particularly as you try to scale up into the thousands of concurrent tasks, other libraries like Parsl (docs), Dask, Ray, and others come into play. They all have their strengths, but Parsl makes it particularly easy to build parallel workflows out of existing python code through it’s use of decorators on existing python functions.\nIn addition, Parsl supports a lot of different kinds of providers, allowing the same python code to be easily run multi-threaded using a ThreadPoolExecutor and via multi-processing on many different cluster computing platforms using the HighThroughputExecutor. For example, Parsl includes providers supporting local execution, and on Slurm, Condor, Kubernetes, AWS, and other platforms. And Parsl handles data staging as well across these varied environments, making sure the data is in the right place when it’s needed for computations.\nSimilarly to before, we start by configuring an executor in parsl, and loading it. We’ll use multiprocessing by configuring the HighThroughputExecutor to use our local resources as a cluster, and we’ll activate our virtual environment to be sure we’re executing in a consistent environment.\n\n# Required packages\nimport parsl\nfrom parsl import python_app\nfrom parsl.config import Config\nfrom parsl.executors import HighThroughputExecutor\nfrom parsl.providers import LocalProvider\n\n# Configure the parsl executor\nactivate_env = 'workon scomp'\nhtex_local = Config(\n executors=[\n HighThroughputExecutor(\n max_workers=15,\n provider=LocalProvider(\n worker_init=activate_env\n )\n )\n ],\n)\nparsl.clear()\nparsl.load(htex_local)\n\n<parsl.dataflow.dflow.DataFlowKernel at 0x7f0917e50700>\n\n\nWe now have a live parsl executor (htex_local) that is waiting to execute processes. We tell it to execute processes by annotating functions with decorators that indicate which tasks should be parallelized. Parsl then handles the scheduling and execution of those tasks based on the dependencies between them. In the simplest case, we’ll decorate our previous function for downloading a file with the @python_app decorator, which tells parsl that any function calls with this function should be run on the default executor (in this case, htex_local).\n\n# Decorators seem to be ignored as the first line of a cell, so print something first\nprint(\"Create decorated function\")\n\n@python_app\ndef download_file_parsl(row):\n import urllib\n service = \"https://arcticdata.io/metacat/d1/mn/v2/object/\"\n pid = row[1]['identifier']\n filename = row[1]['filename']\n url = service + pid\n print(\"Downloading: \" + filename)\n msg = urllib.request.urlretrieve(url, filename)\n return filename\n\nCreate decorated function\n\n\nNow we just write regular python code that calls that function, and parsl handles the scheduling. Parsl app executors return an AppFuture, and we can call the AppFuture.done() function to determine when the future result is ready without blocking. Or, we can just block on AppFuture.result() which waits for each of the executions to complete and then returns the result.\n\n#! 
eval: true\nprint(\"Define our download_futures function\")\n\n@timethis\ndef download_futures(table):\n results = []\n for row in table.iterrows():\n result = download_file_parsl(row)\n print(result)\n results.append(result)\n return(results)\n\n@timethis\ndef wait_for_futures(table):\n results = download_futures(table)\n done = [app_future.result() for app_future in results]\n print(done)\n\nwait_for_futures(file_table[0:5])\n\nDefine our download_futures function\n<AppFuture at 0x7f09148aae30 state=pending>\n<AppFuture at 0x7f09148a98d0 state=pending>\n<AppFuture at 0x7f0917e51120 state=pending>\n<AppFuture at 0x7f0917e511e0 state=pending>\n<AppFuture at 0x7f09145288e0 state=pending>\ndownload_futures: 10.675907135009766 ms\n['SWI_2007.tif', 'SWI_2019.tif', 'SWI_2021.tif', 'SWI_2020.tif', 'SWI_2001.tif']\nwait_for_futures: 43896.12054824829 ms\n\n\nWhen we’re done, be sure to clean up and shutdown the htex_local executor, or it will continue to persist in your environment and utilize resources. Generally, an executor should be created when setting up your environment, and then it can be used repeatedly for many different tasks.\n\nhtex_local.executors[0].shutdown()\nparsl.clear()", + "crumbs": [ + "4  Pleasingly Parallel Programming" + ] + }, + { + "objectID": "sections/parallel-programming.html#are-we-done-yet", + "href": "sections/parallel-programming.html#are-we-done-yet", + "title": "4  Pleasingly Parallel Programming", + "section": "4.10 Are we done yet?", + "text": "4.10 Are we done yet?\nParsl and other concurrency libraries generally provide both blocking and non-blocking methods for accessing the results of asycnchronous method calls. By blocking, we are referring to methods that wait for the asynchronous opearation to complete before returning results, which blocks execution of the rest of a program. By non-blocking, we are referring to methods that return the result if it is available, but not if the async method hasn’t completed yet. In that case, it lets the rest of the program to continue execution.\nIn practice this means that we can either 1) wait for all async calls to complete, and then process them using the blocking methods, or 2) query with a non-blocking method to see if each async call is complete, and only then retrieve the results for that method. 
We illustrate this approach below with parsl.\n\n# Required packages\nimport parsl\nfrom parsl import python_app\nfrom parsl.config import Config\nfrom parsl.executors import HighThroughputExecutor\nfrom parsl.providers import LocalProvider\n\n# Configure the parsl executor\nactivate_env = 'workon scomp'\nhtex_local = Config(\n executors=[\n HighThroughputExecutor(\n max_workers=5,\n provider=LocalProvider(\n worker_init=activate_env\n )\n )\n ],\n)\nparsl.clear()\nparsl.load(htex_local)\n\n<parsl.dataflow.dflow.DataFlowKernel at 0x7f0959b47c10>\n\n\nDefine the task we want to run.\n\n@python_app\ndef do_stuff(x):\n import time\n time.sleep(1)\n return x**2\n\nAnd now execute the tasks with some sleeps to see which are blocking and which are completed.\n\nimport time\nall_futures = []\nfor x in range(0,10):\n future = do_stuff(x)\n all_futures.append(future)\n print(future)\n\ntime.sleep(3)\n\nfor future in all_futures:\n print(\"Checking: \", future)\n if (future.done()):\n print(\"Do more with result: \", future.result())\n else:\n print(\"Sorry, come back later.\")\n\n<AppFuture at 0x7f0914528fd0 state=pending>\n<AppFuture at 0x7f0917e51510 state=pending>\n<AppFuture at 0x7f091452abc0 state=pending>\n<AppFuture at 0x7f0914529450 state=pending>\n<AppFuture at 0x7f0914528e80 state=pending>\n<AppFuture at 0x7f091452a7a0 state=pending>\n<AppFuture at 0x7f091452ac20 state=pending>\n<AppFuture at 0x7f0914528c70 state=pending>\n<AppFuture at 0x7f0917e53e80 state=pending>\n<AppFuture at 0x7f09148aaa10 state=pending>\nChecking: <AppFuture at 0x7f0914528fd0 state=pending>\nSorry, come back later.\nChecking: <AppFuture at 0x7f0917e51510 state=pending>\nSorry, come back later.\nChecking: <AppFuture at 0x7f091452abc0 state=pending>\nSorry, come back later.\nChecking: <AppFuture at 0x7f0914529450 state=pending>\nSorry, come back later.\nChecking: <AppFuture at 0x7f0914528e80 state=pending>\nSorry, come back later.\nChecking: <AppFuture at 0x7f091452a7a0 state=pending>\nSorry, come back later.\nChecking: <AppFuture at 0x7f091452ac20 state=pending>\nSorry, come back later.\nChecking: <AppFuture at 0x7f0914528c70 state=pending>\nSorry, come back later.\nChecking: <AppFuture at 0x7f0917e53e80 state=pending>\nSorry, come back later.\nChecking: <AppFuture at 0x7f09148aaa10 state=pending>\nSorry, come back later.\n\n\nNotice in particular that about half of the jobs are not done yet.", "crumbs": [ "4  Pleasingly Parallel Programming" ] @@ -373,8 +383,8 @@ "objectID": "sections/parallel-programming.html#when-to-parallelize", "href": "sections/parallel-programming.html#when-to-parallelize", "title": "4  Pleasingly Parallel Programming", - "section": "4.10 When to parallelize", - "text": "4.10 When to parallelize\nIt’s not as simple as it may seem. While in theory each added processor would linearly increase the throughput of a computation, there is overhead that reduces that efficiency. For example, the code and, importantly, the data need to be copied to each additional CPU, and this takes time and bandwidth. Plus, new processes and/or threads need to be created by the operating system, which also takes time. This overhead reduces the efficiency enough that realistic performance gains are much less than theoretical, and usually do not scale linearly as a function of processing power. 
For example, if the time that a computation takes is short, then the overhead of setting up these additional resources may actually overwhelm any advantages of the additional processing power, and the computation could potentially take longer!\nIn addition, not all of a task can be parallelized. Depending on the proportion, the expected speedup can be significantly reduced. Some propose that this may follow Amdahl’s Law, where the speedup of the computation (y-axis) is a function of both the number of cores (x-axis) and the proportion of the computation that can be parallelized (see colored lines):\n\n\n\nAmdahl’s Law\n\n\nSo, it is important to evaluate the computational efficiency of requests, and work to ensure that additional compute resources brought to bear will pay off in terms of increased work being done.", + "section": "4.11 When to parallelize", + "text": "4.11 When to parallelize\nIt’s not as simple as it may seem. While in theory each added processor would linearly increase the throughput of a computation, there is overhead that reduces that efficiency. For example, the code and, importantly, the data need to be copied to each additional CPU, and this takes time and bandwidth. Plus, new processes and/or threads need to be created by the operating system, which also takes time. This overhead reduces the efficiency enough that realistic performance gains are much less than theoretical, and usually do not scale linearly as a function of processing power. For example, if the time that a computation takes is short, then the overhead of setting up these additional resources may actually overwhelm any advantages of the additional processing power, and the computation could potentially take longer!\nIn addition, not all of a task can be parallelized. Depending on the proportion, the expected speedup can be significantly reduced. Some propose that this may follow Amdahl’s Law, where the speedup of the computation (y-axis) is a function of both the number of cores (x-axis) and the proportion of the computation that can be parallelized (see colored lines):\n\n\n\nAmdahl’s Law\n\n\nSo, it is important to evaluate the computational efficiency of requests, and work to ensure that additional compute resources brought to bear will pay off in terms of increased work being done.", "crumbs": [ "4  Pleasingly Parallel Programming" ] @@ -383,8 +393,8 @@ "objectID": "sections/parallel-programming.html#parallel-pitfalls", "href": "sections/parallel-programming.html#parallel-pitfalls", "title": "4  Pleasingly Parallel Programming", - "section": "4.11 Parallel Pitfalls", - "text": "4.11 Parallel Pitfalls\nA set of tasks is considered ‘pleasingly parallel’ when large portions of the code can be executed indpendently of the other portions and have few or no dependencies on other parts of the execution. This situation is common, and we can frequently execute parallel tasks on independent subsets of our data. Nevertheless, dependencies among different parts of your computation can definitely create bottlenecks and slow down computations. Some of the challenges you may need to work around include:\n\nTask dependencies: occur when one task in the code depends on the results of another task or computation in the code.\nRace conditions: occur when two tasks execute in parallel, but produce different results based on which task finishes first. 
Ensuring that results are correct under different timing situations requires careful testing.\nDeadlocks: occur when two concurrent tasks block on the output of the other. Deadlocks cause parallel programs to lock up indefinitely, and can be difficult to track down.\n\nEven when tasks exhibit strong dependencies, it is frequently possible to still optimize that code by parallelizing explicit code sections, sometimes bringing other concurrency tools into the mix, such as the Message Passing Interface (MPI). But simply improving the efficiency of pleasingly parallel tasks can be liberating.", + "section": "4.12 Parallel Pitfalls", + "text": "4.12 Parallel Pitfalls\nA set of tasks is considered ‘pleasingly parallel’ when large portions of the code can be executed indpendently of the other portions and have few or no dependencies on other parts of the execution. This situation is common, and we can frequently execute parallel tasks on independent subsets of our data. Nevertheless, dependencies among different parts of your computation can definitely create bottlenecks and slow down computations. Some of the challenges you may need to work around include:\n\nTask dependencies: occur when one task in the code depends on the results of another task or computation in the code.\nRace conditions: occur when two tasks execute in parallel, but produce different results based on which task finishes first. Ensuring that results are correct under different timing situations requires careful testing.\nDeadlocks: occur when two concurrent tasks block on the output of the other. Deadlocks cause parallel programs to lock up indefinitely, and can be difficult to track down.\n\nEven when tasks exhibit strong dependencies, it is frequently possible to still optimize that code by parallelizing explicit code sections, sometimes bringing other concurrency tools into the mix, such as the Message Passing Interface (MPI). But simply improving the efficiency of pleasingly parallel tasks can be liberating.", "crumbs": [ "4  Pleasingly Parallel Programming" ] @@ -393,8 +403,8 @@ "objectID": "sections/parallel-programming.html#summary", "href": "sections/parallel-programming.html#summary", "title": "4  Pleasingly Parallel Programming", - "section": "4.12 Summary", - "text": "4.12 Summary\nIn this lesson, we showed examples of computing tasks that are likely limited by the number of CPU cores that can be applied, and we reviewed the architecture of computers to understand the relationship between CPU processors and cores. Next, we reviewed the way in which traditional sequential loops can be rewritten as functions that are applied to a list of inputs both serially and in parallel to utilize multiple cores to speed up computations. We reviewed the challenges of optimizing code, where one must constantly examine the bottlenecks that arise as we improve cpu-bound, I/O bound,and memory bound computations.", + "section": "4.13 Summary", + "text": "4.13 Summary\nIn this lesson, we showed examples of computing tasks that are likely limited by the number of CPU cores that can be applied, and we reviewed the architecture of computers to understand the relationship between CPU processors and cores. Next, we reviewed the way in which traditional sequential loops can be rewritten as functions that are applied to a list of inputs both serially and in parallel to utilize multiple cores to speed up computations. 
We reviewed the challenges of optimizing code, where one must constantly examine the bottlenecks that arise as we improve cpu-bound, I/O bound,and memory bound computations.", "crumbs": [ "4  Pleasingly Parallel Programming" ] @@ -403,8 +413,8 @@ "objectID": "sections/parallel-programming.html#further-reading", "href": "sections/parallel-programming.html#further-reading", "title": "4  Pleasingly Parallel Programming", - "section": "4.13 Further Reading", - "text": "4.13 Further Reading\nRyan Abernathey & Joe Hamman. 2020. Closed Platforms vs. Open Architectures for Cloud-Native Earth System Analytics. Medium.", + "section": "4.14 Further Reading", + "text": "4.14 Further Reading\nRyan Abernathey & Joe Hamman. 2020. Closed Platforms vs. Open Architectures for Cloud-Native Earth System Analytics. Medium.", "crumbs": [ "4  Pleasingly Parallel Programming" ] @@ -884,7 +894,7 @@ "href": "sections/software-design-1.html#locks", "title": "12  Software Design I: Functions and Concurrency", "section": "12.7 Locks", - "text": "12.7 Locks\nLocks are a mechanism to manage access to a resource so that multiple threads can access the resource. By adding locks to an otherwise parallel process, we introduce a degree of serial execution to the locked portion of the process. Basically, each thread can only access the resource when it has the lock, and only one lock is given out at a time. Take this example of what happens without locking:\n\nfrom concurrent.futures import ProcessPoolExecutor\nimport time\n\ndef hello(i):\n print(i, 'Hello')\n print(i, 'world')\n\nexecutor = ProcessPoolExecutor()\nfutures = [executor.submit(hello, i) for i in range(3)]\nfor future in futures:\n future.result()\n\n102 HelloHelloHello\n\n\n02 1 world world\nworld\n\n\n\nYou can see that the results come back in a semi-random order, and the call to sleep creates a delay between printing the two words, which means that the three messages get jumbled when printed. To fix this, we can introduce a lock from the multiprocessing package.\n\nfrom concurrent.futures import ProcessPoolExecutor\nimport time\nimport multiprocessing\n\ndef hello(i, lock):\n with lock:\n print(i, 'Hello')\n print(i, 'world')\n\nlock = multiprocessing.Manager().Lock()\nexecutor = ProcessPoolExecutor()\nfutures = [executor.submit(hello, i, lock) for i in range(3)]\nfor future in futures:\n future.result()\n\n0 Hello\n0 world\n1 Hello\n1 world\n2 Hello\n2 world\n\n\nThe lock is generated and then passed to each invocation of the hello function. Using with triggers the use of the context manager for the lock, which allows the manager to synchronize use of the lock. This ensures that only one process can be printing at the same time, ensuring that the outputs are properly ordered.\n\n\n\n\n\n\nWarning\n\n\n\nSynchronizing with a Lock turns a parallel process back into a serial process, at least while the lock is in use. So use with care lest you lose all benefits of concurrency.", + "text": "12.7 Locks\nLocks are a mechanism to manage access to a resource so that multiple threads can access the resource. By adding locks to an otherwise parallel process, we introduce a degree of serial execution to the locked portion of the process. Basically, each thread can only access the resource when it has the lock, and only one lock is given out at a time. 
Take this example of what happens without locking:\n\nfrom concurrent.futures import ProcessPoolExecutor\nimport time\n\ndef hello(i):\n print(i, 'Hello')\n print(i, 'world')\n\nexecutor = ProcessPoolExecutor()\nfutures = [executor.submit(hello, i) for i in range(3)]\nfor future in futures:\n future.result()\n\n012 Hello Hello\nHello\n1\n02 world world\nworld\n\n\n\nYou can see that the results come back in a semi-random order, and the call to sleep creates a delay between printing the two words, which means that the three messages get jumbled when printed. To fix this, we can introduce a lock from the multiprocessing package.\n\nfrom concurrent.futures import ProcessPoolExecutor\nimport time\nimport multiprocessing\n\ndef hello(i, lock):\n with lock:\n print(i, 'Hello')\n print(i, 'world')\n\nlock = multiprocessing.Manager().Lock()\nexecutor = ProcessPoolExecutor()\nfutures = [executor.submit(hello, i, lock) for i in range(3)]\nfor future in futures:\n future.result()\n\n1 Hello\n1 world\n0 Hello\n0 world\n2 Hello\n2 world\n\n\nThe lock is generated and then passed to each invocation of the hello function. Using with triggers the use of the context manager for the lock, which allows the manager to synchronize use of the lock. This ensures that only one process can be printing at the same time, ensuring that the outputs are properly ordered.\n\n\n\n\n\n\nWarning\n\n\n\nSynchronizing with a Lock turns a parallel process back into a serial process, at least while the lock is in use. So use with care lest you lose all benefits of concurrency.", "crumbs": [ "12  Software Design I: Functions and Concurrency" ] diff --git a/2024-03-arctic/sections/data-structures-netcdf.html b/2024-03-arctic/sections/data-structures-netcdf.html index 924b67bc..59252095 100644 --- a/2024-03-arctic/sections/data-structures-netcdf.html +++ b/2024-03-arctic/sections/data-structures-netcdf.html @@ -527,7 +527,7 @@

5.4.1.1 Create an xarray.DataArray

Let’s suppose we want to make an xarray.DataArray that includes the information from our previous exercise about measuring temperature across three days. First, we import all the necessary libraries.

-
+
import os              
 import urllib 
 import pandas as pd
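 # sketch of the imports the hunk cuts off; numpy and xarray are both used below
 import numpy as np
 import xarray as xr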
@@ -537,7 +537,7 @@ 

+
# values of a single variable at each point of the coords 
 temp_data = np.array([np.zeros((5,5)), 
                       np.ones((5,5)), 
@@ -580,7 +580,7 @@ 

+
# names of the dimensions in the required order
 dims = ('time', 'lat', 'lon')
 
@@ -591,7 +591,7 @@ 

+
# attributes (metadata) of the data array 
 attrs = { 'title' : 'temperature across weather stations',
           'standard_name' : 'air_temperature',
@@ -599,7 +599,7 @@ 

+
# initialize xarray.DataArray
 temp = xr.DataArray(data = temp_data, 
                     dims = dims,
@@ -995,7 +995,7 @@ 
[xarray.DataArray repr: coordinates time (datetime64[ns]: 2022-09-01, 2022-09-02, 2022-09-03), lat (int64: 70 60 50 40 30), lon (int64: 60 70 80 90 100), each backed by a PandasIndex; attributes title: temperature across weather stations, standard_name: air_temperature, units: degree_c]

We can also update the variable’s attributes after creating the object. Notice that each of the coordinates is also an xarray.DataArray, so we can add attributes to them.

    -
    +
    # update attributes
     temp.attrs['description'] = 'simple example of an xarray.DataArray'
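 # a sketch of the coordinate-attribute updates the hunk elides;
 # the values are recovered from the repr summarized below
 temp.time.attrs = {'description': 'date of measurement'}
 temp.lat.attrs = {'standard_name': 'grid_latitude', 'units': 'degree_N'}
 temp.lon.attrs = {'standard_name': 'grid_longitude', 'units': 'degree_E'}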
     
    @@ -1419,7 +1419,7 @@ 
[updated xarray.DataArray repr: the coordinates now carry their own attributes (time description: date of measurement; lat standard_name: grid_latitude, units: degree_N; lon standard_name: grid_longitude, units: degree_E), and the array attributes gain description: simple example of an xarray.DataArray]

At this point, since we have a single variable, the dataset attributes and the variable attributes are the same.

    @@ -1445,7 +1445,7 @@

    5.4.1.2 Indexing

    An xarray.DataArray allows both positional indexing (like numpy) and label-based indexing (like pandas). Positional indexing is the most basic, and it’s done using Python’s [] syntax, as in array[i,j] with i and j both integers. Label-based indexing takes advantage of dimensions in the array having names and coordinate values that we can use to access data instead of remembering the positional order of each dimension.

As an example, suppose we want to know the temperature recorded by the weather station located at 40°0′N 80°0′E on September 1st, 2022. By recalling how the array is set up with respect to the dimensions and coordinates, we can access this data positionally:

    -
    +
    temp[0,1,2]
    @@ -1821,11 +1821,11 @@

    + description: simple example of an xarray.DataArray

Or, we can use the dimension names and their coordinates to access the same value:

    -
    +
    temp.sel(time='2022-09-01', lat=40, lon=80)
    @@ -2201,7 +2201,7 @@

    + description: simple example of an xarray.DataArray

    Notice that the result of this indexing is a 1x1 xarray.DataArray. This is because operations on an xarray.DataArray (resp. xarray.DataSet) always return another xarray.DataArray (resp. xarray.DataSet). In particular, operations returning scalar values will also produce xarray objects, so we need to cast them as numbers manually. See xarray.DataArray.item.
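To make that explicit, here is a minimal sketch, assuming the temp array from above, of casting a one-element selection to a plain number:

value = temp.sel(time='2022-09-01', lat=40, lon=80)  # 0-d xarray.DataArray
value.item()  # extracts the underlying scalar as a native Python number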

    @@ -2210,7 +2210,7 @@

    5.4.1.3 Reduction

    xarray has implemented several methods to reduce an xarray.DataArray along any number of dimensions. One of the advantages of xarray.DataArray is that, if we choose to, it can carry over attributes when doing calculations. For example, we can calculate the average temperature at each weather station over time and obtain a new xarray.DataArray.

    -
    +
    avg_temp = temp.mean(dim = 'time') 
     # to keep attributes add keep_attrs = True
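 # a sketch of that variant: the reduced array then keeps title, units, etc.
 avg_temp_keep = temp.mean(dim = 'time', keep_attrs = True)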
     
    @@ -2590,11 +2590,11 @@ 

[xarray.DataArray repr of avg_temp: coordinates lat (70 60 50 40 30, degree_N) and lon (60 70 80 90 100, degree_E); attribute title: average temperature over three days]

More about xarray computations.

    @@ -2606,7 +2606,7 @@

    5.4.2.1 Create an xarray.DataSet

    Following our previous example, we can create an xarray.DataSet by combining the temperature data with the average temperature data. We also add some attributes that now describe the whole dataset, not only each variable.

    -
    +
    # make dictionaries with variables and attributes
     data_vars = {'avg_temp': avg_temp,
                 'temp': temp}
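 # a sketch of the continuation the hunk elides: dataset-level attributes
 # (title adapted from the repr further below)
 attrs = {'title': 'temperature data at weather stations: daily and average',
          'description': 'simple example of an xarray.Dataset'}
 
 # initialize xarray.DataSet from the variable and attribute dictionaries
 temp_dataset = xr.Dataset(data_vars = data_vars, attrs = attrs)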
    @@ -2624,7 +2624,7 @@ 

    +
    temp_dataset
    @@ -3001,12 +3001,12 @@

[xarray.Dataset repr of temp_dataset: indexes lat, lon, time; attributes title: temperature data at weather stations: daily and average, description: simple example of an xarray.Dataset]

5.4.2.2 Save and Reopen

    Finally, we want to save our dataset as a NetCDF file. To do this, specify the file path and use the .nc extension for the file name. Then save the dataset using the to_netcdf method with your file path. Opening NetCDF is similarly straightforward using xarray.open_dataset().

    -
    +
    # specify file path: don't forget the .nc extension!
     fp = os.path.join(os.getcwd(),'temp_dataset.nc') 
     # save file
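 temp_dataset.to_netcdf(fp)
 
 # reopen to check the round trip (a sketch using the methods named above)
 check = xr.open_dataset(fp)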
    @@ -3413,8 +3413,8 @@ 

    + description: simple example of an xarray.Dataset
    @@ -3422,12 +3422,12 @@

    5.4.3 Exercise

For this exercise, we will use a dataset including time series of annual Arctic freshwater fluxes and storage terms. The data was produced for the publication Jahn and Laiho (2020) about changes in the Arctic freshwater budget and is archived at the Arctic Data Center under doi:10.18739/A2280504J.

    -
    +
    url = 'https://arcticdata.io/metacat/d1/mn/v2/object/urn%3Auuid%3A792bfc37-416e-409e-80b1-fdef8ab60033'
     
     msg = urllib.request.urlretrieve(url, "FW_data_CESM_LW_2006_2100.nc")
    -
    +
    fp = os.path.join(os.getcwd(),'FW_data_CESM_LW_2006_2100.nc')
     fw_data = xr.open_dataset(fp)
     fw_data
    @@ -3819,7 +3819,7 @@

[fw_data attributes: creation_date: 02-Jun-2020 15:38:31; author: Alexandra Jahn, CU Boulder, alexandra.jahn@colorado.edu; title: Annual timeseries of freshwater data from the CESM Low Warming Ensemble; description: annual mean freshwater fluxes and storage relative to 34.8, shown in Jahn and Laiho, GRL, 2020, calculated from the 11-member CESM Low Warming Ensemble, with positive fluxes a source of freshwater to the Arctic and negative fluxes a sink/export; data_structure: |Ensemble member | Time (in years)|, annual means covering 2006-2100 for 11 ensemble members]
@@ -3858,11 +3858,11 @@

      We can see in the object viewer that the runoff_annual variable has two dimensions: time and member, in that order. We can also access the dimensions by calling:

      -
      +
      fw_data.runoff_annual.dims

The second dimension is member. Near the top of the object viewer, under coordinates, we can see that member’s coordinates are an array from 1 to 11. We can directly see this array by calling:

      -
      +
      fw_data.member
      @@ -3883,7 +3883,7 @@

      -
      +
      member2 = fw_data.netPrec_annual.sel(member=2)
      @@ -3905,7 +3905,7 @@

      Based on our previous answer, this maximum is:

      -
      +
      x_max = member2.loc[2022:2100].max()
       x_max.item()
      @@ -3934,10 +3934,10 @@

      5.5.2 pandas to xarray

      What does the previous example look like when working with pandas and xarray?

Let’s work with a csv file containing the previous temperature measurements. Essentially, we need to read this file as a pandas.DataFrame and then use the pandas.DataFrame.to_xarray() method, taking into account that the dimensions of the resulting xarray.DataSet will be formed using the index column(s) of the pandas.DataFrame. In this case, we know the first three columns will be our dimension columns, so we need to group them as a multi-index for the pandas.DataFrame. We can do this by using the index_col argument directly when we read in the csv file.

      -
      +
      fp = os.path.join(os.getcwd(),'netcdf_temp_data.csv') 
      -
      +
      # specify columns representing dimensions
       dimension_columns = [0,1,2]
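# (continuation sketch: read the csv with those columns as a multi-index,
#  so that to_xarray() can turn them into dimensions)
temp = pd.read_csv(fp, index_col=dimension_columns)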
       
      @@ -4011,7 +4011,7 @@ 

      And this is our resulting xarray.DataSet:

      -
      +
      temp.to_xarray()
      @@ -4384,11 +4384,11 @@

      • time
        PandasIndex
        PandasIndex(Index(['2022-09-01', '2022-09-02'], dtype='object', name='time'))
      • longitude
        PandasIndex
        PandasIndex(Index([30, 40], dtype='int64', name='longitude'))
      • latitude
        PandasIndex
        PandasIndex(Index([60, 70], dtype='int64', name='latitude'))
    2. For further reading and examples about switching between pandas and xarray you can visit the following:

      diff --git a/2024-03-arctic/sections/geopandas.html b/2024-03-arctic/sections/geopandas.html index b85da685..d34e39a0 100644 --- a/2024-03-arctic/sections/geopandas.html +++ b/2024-03-arctic/sections/geopandas.html @@ -336,7 +336,7 @@

      9.3 Pre-processing raster data

First we need to load in our libraries. We’ll use geopandas for vector manipulation and rasterio for raster manipulation.

First, we’ll use urllib to download the ship traffic raster from Kapsar et al. We grab a one-month slice from August 2020 of a coastal subset of the data at 1 km resolution. To get the URL in the code chunk below, you can right click the download button for the file of interest and select “copy link address.”

      -
      +
      import urllib
       
       url = 'https://arcticdata.io/metacat/d1/mn/v2/object/urn%3Auuid%3A6b847ab0-9a3d-4534-bf28-3a96c5fa8d72'
      @@ -344,7 +344,7 @@ 

      msg = urllib.request.urlretrieve(url, "Coastal_2020_08.tif")

Using rasterio, open the raster file, plot it, and look at the metadata. We use the with statement here as a context manager. This ensures that the connection to the raster file is closed and cleaned up when we are done with it.

      -
      +
      import rasterio
       import matplotlib.pyplot as plt
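# (sketch of the steps described above; the original cell is cut off by the
#  diff context. The filename matches the download step earlier.)
with rasterio.open("Coastal_2020_08.tif") as ship_con:
    ships = ship_con.read(1)           # the raster band as a numpy array
    ships_meta = ship_con.profile      # metadata: CRS, transform, dtype, ...

plt.imshow(ships)                      # quick look at the data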
       
      @@ -368,13 +368,13 @@ 

      +
      type(ships)
      numpy.ndarray
      -
      +
      type(ships_meta)
      rasterio.profiles.Profile
      @@ -402,20 +402,20 @@

      9.4 Pre-processing vector data

      Now download a vector dataset of commercial fishing districts in Alaska.

      -
      +
      url = 'https://knb.ecoinformatics.org/knb/d1/mn/v2/object/urn%3Auuid%3A7c942c45-1539-4d47-b429-205499f0f3e4'
       
       msg = urllib.request.urlretrieve(url, "Alaska_Commercial_Salmon_Boundaries.gpkg")

      Read in the data using geopandas.

      -
      +
      import geopandas as gpd
       
       comm = gpd.read_file("Alaska_Commercial_Salmon_Boundaries.gpkg")

      Note the “pandas” in the library name “geopandas.” Our comm object is really just a special type of pandas data frame called a geodataframe. This means that in addition to any geospatial stuff we need to do, we can also just do regular pandas things on this data frame.

      For example, we can get a list of column names (there are a lot!)

      -
      +
      comm.columns.values
      array(['OBJECTID', 'GEOMETRY_START_DATE', 'GEOMETRY_END_DATE',
      @@ -433,7 +433,7 @@ 

      +
      comm.head()
      @@ -600,7 +600,7 @@

Source: Leah A. Wasser, Megan A. Jones, Zack Brym, Kristina Riemer, Jason Williams, Jeff Hollister, Mike Smorul. Raster 00: Intro to Raster Data in R. National Ecological Observatory Network (NEON).

      The diagram above shows the three different types of geometries that geospatial vector data can take, points, lines or polygons. Whatever the geometry type, the geometry information (the x,y points) is stored in the column named geometry in the geopandas data frame. In this example, we have a dataset containing polygons of fishing districts. Each row in the dataset corresponds to a district, with unique attributes (the other columns in the dataset), and its own set of points defining the boundaries of the district, contained in the geometry column.

      -
      +
      comm['geometry'][:5]
      0    MULTIPOLYGON (((-151.32805 64.96913, -151.3150...
      @@ -612,7 +612,7 @@ 

      +
      comm.crs
      <Geographic 2D CRS: EPSG:4326>
      @@ -629,7 +629,7 @@ 

      +
      comm['geometry'][8]
      @@ -640,7 +640,7 @@

      +
      comm.plot(figsize=(9,9))
      @@ -651,7 +651,7 @@

      +
      comm_3338 = comm.to_crs("EPSG:3338")
       
       comm_3338.plot()
      @@ -669,7 +669,7 @@

      9.5 Crop data to area of interest

      For this example, we are only interested in south central Alaska, encompassing Prince William Sound, Cook Inlet, and Kodiak. Our raster data is significantly larger than that, and the vector data is statewide. So, as a first step we might want to crop our data to the area of interest.

      First, we’ll need to create a bounding box. We use the box function from shapely to create the bounding box, then create a geoDataFrame from the points, and finally convert the WGS 84 coordinates to the Alaska Albers projection.

      -
      +
      from shapely.geometry import box
       
       coord_box = box(-159.5, 55, -144.5, 62)
      @@ -679,7 +679,7 @@ 

      geometry = [coord_box]).to_crs("EPSG:3338")

Now, we can read in the raster again, cropped to the bounding box. We use the mask function from rasterio.mask. Note that we apply this to the connection to the raster file (with rasterio.open(...)), then update the metadata associated with the raster, because the mask function requires as its first argument a dataset object opened in “r” mode.

      -
      +
      import rasterio.mask
       import numpy as np
       
      @@ -693,7 +693,7 @@ 

# turn the no-data values into NaNs
shipc_arr[shipc_arr == ship_con.nodata] = np.nan

      -
      +
      shipc_meta.update({"driver": "GTiff",
                        "height": shipc_arr.shape[0],
                        "width": shipc_arr.shape[1],
      @@ -701,7 +701,7 @@ 

      "compress": "lzw"})

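Pieced together from the fragments above, a minimal sketch of this crop step might look like the following (a sketch only, assuming coord_box_df from the previous step; the course cell may differ in its details):

import numpy as np
import rasterio.mask

with rasterio.open("Coastal_2020_08.tif") as ship_con:
    # crop the raster to the bounding box geometry
    shipc_arr, shipc_transform = rasterio.mask.mask(
        ship_con, coord_box_df.geometry, crop=True)
    shipc_meta = ship_con.profile
    nodata = ship_con.nodata

shipc_arr = shipc_arr[0].astype('float64')    # drop the band axis
# turn the no-data values into NaNs
shipc_arr[shipc_arr == nodata] = np.nan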
Now we’ll do a similar task with the vector data. In this case, we use a spatial join. The join will be an inner join, and it will select only rows from the left side (our fishing districts) that are within the right side (our bounding box). I chose this method as opposed to a clipping type operation because it ensures that we don’t end up with any open polygons at the boundaries of our box, which could cause problems for us down the road.

      -
      +
      comm_clip = gpd.sjoin(comm_3338,
                             coord_box_df,
                             how='inner',
      @@ -710,7 +710,7 @@ 

      9.5.1 Check extents

      Now let’s look at the two cropped datasets overlaid on each other to ensure that the extents look right.

      -
      +
      import rasterio.plot
       
       # set up plot
      @@ -741,7 +741,7 @@ 

      9.6.0.1 Zonal statistics over one polygon

      Let’s look at how this works over just one fishing area first. We use the rasterize method from the features module in rasterio. This takes as arguments the data to rasterize (in this case the 40th row of our dataset), and the shape and transform the output raster will take. We also set the all_touched argument to true, which means any pixel that touches a boundary of our vector will be burned into the mask.

      -
      +
      from rasterio import features
       
       r40 = features.rasterize(comm_clip['geometry'][40].geoms,
      @@ -750,7 +750,7 @@ 

      all_touched=True)

      If we have a look at a plot of our rasterized version of the single fishing district, we can see that instead of a vector, we now have a raster showing the rasterized district (with pixel values of 1) and any area not in the district has a pixel value of 0.

      -
      +
      # set up plot
       fig, ax = plt.subplots(figsize=(7, 7))
       # plot the raster
      @@ -770,14 +770,14 @@ 

      +
      np.unique(r40)
      array([0, 1], dtype=uint8)

Finally, we need to know the indices where the fishing district is. We can use np.where to extract this information.

      -
      +
      r40_index = np.where(r40 == 1)
       print(r40_index)
      @@ -797,7 +797,7 @@

      +
      np.nansum(shipc_arr[r40_index])
      14369028.0
      @@ -807,11 +807,11 @@

      9.6.0.2 Zonal statistics over all polygons

      -
      +
      comm_clip['id'] = range(0,len(comm_clip))

      For each district (with geometry and id), we run the features.rasterize function. Then we calculate the sum of the values of the shipping raster based on the indices in the raster where the district is located.

      -
      +
      distance_dict = {}
       for geom, idx in zip(comm_clip['geometry'], comm_clip['id']):
           rasterized = features.rasterize(geom.geoms,
      @@ -823,7 +823,7 @@ 

      distance_dict[idx] = np.nansum(shipc_arr[r_index])
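Pieced together, the loop might look like this (a sketch based on the fragments above, reusing shipc_arr and shipc_meta from the cropping step):

distance_dict = {}
for geom, idx in zip(comm_clip['geometry'], comm_clip['id']):
    # burn the district into a raster the same shape as the ship raster
    rasterized = features.rasterize(geom.geoms,
                                    out_shape=shipc_arr.shape,
                                    transform=shipc_meta['transform'],
                                    all_touched=True)
    # indices of the pixels inside the district
    r_index = np.where(rasterized == 1)
    # sum the ship traffic over those pixels
    distance_dict[idx] = np.nansum(shipc_arr[r_index])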

      Now we just create a data frame from that dictionary, and join it to the vector data using pandas operations.

      -
      +
      import pandas as pd
       
       # create a data frame from the result
      @@ -836,14 +836,14 @@ 

      distance_df['distance'] = distance_df['distance']/1000

      Now we join the result to the original geodataframe.

      -
      +
      # join the sums to the original data frame
       res_full = comm_clip.merge(distance_df,
                                  on = "id",
                                  how = 'inner')

      Finally, we can plot our result!

      -
      +
      import matplotlib.ticker
       fig, ax = plt.subplots(figsize=(7, 7))
       
      @@ -866,13 +866,13 @@ 

      +
      reg_area = res_full.dissolve(by = "REGISTRATION_AREA_NAME", 
                                    aggfunc = 'sum',
                                    numeric_only = True)

      Let’s have a look at the same plot as before, but this time over our aggregated data.

      -
      +
      fig, ax = plt.subplots(figsize=(7, 7))
       
       ax = reg_area.plot(column = "distance", legend = True, ax = ax)
      diff --git a/2024-03-arctic/sections/parallel-programming.html b/2024-03-arctic/sections/parallel-programming.html
      index da14178c..645c08e1 100644
      --- a/2024-03-arctic/sections/parallel-programming.html
      +++ b/2024-03-arctic/sections/parallel-programming.html
      @@ -279,10 +279,11 @@ 

      Table of contents

    3. 4.8.3 Multi-process with concurrent.futures
    4. 4.9 Parallel processing with parsl
    5. -
    6. 4.10 When to parallelize
    7. -
    8. 4.11 Parallel Pitfalls
    9. -
    10. 4.12 Summary
    11. -
    12. 4.13 Further Reading
    13. +
    14. 4.10 Are we done yet?
    15. +
    16. 4.11 When to parallelize
    17. +
    18. 4.12 Parallel Pitfalls
    19. +
    20. 4.13 Summary
    21. +
    22. 4.14 Further Reading
    23. @@ -449,14 +450,14 @@

      4.7 Task parallelization in Python

      Python also provides a number of easy to use packages for concurrent processing. We will review two of these, concurrent.futures and parsl, to show just how easy it can be to parallelize your programs. concurrent.futures is built right into the python3 release, and is a great starting point for learning concurrency.

      We’re going to start with a task that is a little expensive to compute, and define it in a function. All this task(x) function does is to use numpy to create a fairly large range of numbers, and then sum them.

      -
      +
      def task(x):
           import numpy as np
           result = np.arange(x*10**8).sum()
           return result

      We can start by executing this task function serially ten times with varying inputs. In this case, we create a function run_serial that takes a list of inputs to be run, and it calls the task function for each of those inputs. The @timethis decorator is a simple way to wrap the function with timing code so that we can see how long it takes to execute.
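The decorator itself isn’t defined in this excerpt; a minimal sketch of a timethis implementation consistent with the timing output shown below might be:

import time
from functools import wraps

def timethis(func):
    """Print the wall-clock time of each call, in milliseconds."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        print(f"{func.__name__}: {(time.time() - start) * 1000} ms")
        return result
    return wrapper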

      -
      +
      import numpy as np
       
       @timethis
      @@ -465,7 +466,7 @@ 

      run_serial(np.arange(10))

      -
      run_serial: 7156.259536743164 ms
      +
      run_serial: 7197.09587097168 ms
      [0,
      @@ -483,7 +484,7 @@ 

      +
      from concurrent.futures import ThreadPoolExecutor
       
       @timethis
      @@ -506,6 +507,19 @@ 

🎉 4 seconds 🎉, which is about 6.25x faster than serial. Congratulations, you wrote a multi-threaded Python program!

      +

But you may have also seen the thread pool wasn’t much faster at all. In that case, you may be encountering the Python Global Interpreter Lock (GIL), and would be better off using parallel processes. Let’s do that with ProcessPoolExecutor.

      +
      +
      from concurrent.futures import ProcessPoolExecutor
      +
      +@timethis
      +def run_processes(task_list):
      +    with ProcessPoolExecutor(max_workers=10) as executor:
      +        return executor.map(task, task_list)
      +
      +results = run_processes(np.arange(10))
      +[x for x in results]
      +
      +

      For me, that took about 8 seconds, about half the time of the serial case, but results can vary tremendously depending on what others are doing on the machine.

      4.8 Exercise: Parallel downloads

      @@ -521,7 +535,7 @@

      +
      @@ -571,32 +585,32 @@

      4.8.1 Serial

      When you have a list of repetitive tasks, you may be able to speed it up by adding more computing power. If each task is completely independent of the others, then it is pleasingly parallel and a prime candidate for executing those tasks in parallel, each on its own core. For example, let’s build a simple loop that downloads the data files that we need for an analysis. First, we start with the serial implementation.

      -
      -
      import urllib
      -
      -def download_file(row):
      -    service = "https://arcticdata.io/metacat/d1/mn/v2/object/"
      -    pid = row[1]['identifier']
      -    filename = row[1]['filename']
      -    url = service + pid
      -    print("Downloading: " + filename)
      -    msg = urllib.request.urlretrieve(url, filename)
      -    return filename
      -
      -@timethis
      -def download_serial(table):
      -    results = [download_file(row) for row in table.iterrows()]
      -    return results
      -    
      -results = download_serial(file_table[0:5])
      -print(results)
      +
      +
      import urllib
      +
      +def download_file(row):
      +    service = "https://arcticdata.io/metacat/d1/mn/v2/object/"
      +    pid = row[1]['identifier']
      +    filename = row[1]['filename']
      +    url = service + pid
      +    print("Downloading: " + filename)
      +    msg = urllib.request.urlretrieve(url, filename)
      +    return filename
      +
      +@timethis
      +def download_serial(table):
      +    results = [download_file(row) for row in table.iterrows()]
      +    return results
      +    
      +results = download_serial(file_table[0:5])
      +print(results)
      Downloading: SWI_2007.tif
       Downloading: SWI_2019.tif
       Downloading: SWI_2021.tif
       Downloading: SWI_2020.tif
       Downloading: SWI_2001.tif
      -download_serial: 102506.97588920593 ms
      +download_serial: 118949.22089576721 ms
       ['SWI_2007.tif', 'SWI_2019.tif', 'SWI_2021.tif', 'SWI_2020.tif', 'SWI_2001.tif']
      @@ -606,25 +620,25 @@

      4.8.2 Multi-threaded with concurrent.futures

      In this case, we’ll use the same download_file function from before, but let’s switch up and create a download_threaded() function to use concurrent.futures with a ThreadPoolExecutor just as we did earlier.

      -
      -
      from concurrent.futures import ThreadPoolExecutor
      -
      -@timethis
      -def download_threaded(table):
      -    with ThreadPoolExecutor(max_workers=15) as executor:
      -        results = executor.map(download_file, table.iterrows(), timeout=60)
      -        return results
      -
      -results = download_threaded(file_table[0:5])
      -for result in results:
      -    print(result)
      +
      +
      from concurrent.futures import ThreadPoolExecutor
      +
      +@timethis
      +def download_threaded(table):
      +    with ThreadPoolExecutor(max_workers=15) as executor:
      +        results = executor.map(download_file, table.iterrows(), timeout=60)
      +        return results
      +
      +results = download_threaded(file_table[0:5])
      +for result in results:
      +    print(result)
      Downloading: SWI_2007.tif
       Downloading: SWI_2019.tif
       Downloading: SWI_2021.tif
       Downloading: SWI_2020.tif
       Downloading: SWI_2001.tif
      -download_threaded: 18966.630458831787 ms
      +download_threaded: 40392.88139343262 ms
       SWI_2007.tif
       SWI_2019.tif
       SWI_2021.tif
      @@ -637,25 +651,25 @@ 

      4.8.3 Multi-process with concurrent.futures

You’ll remember from earlier that you can execute tasks concurrently by creating multiple threads within one process (multi-threaded), or by creating and executing multiple processes. The latter creates more independence, as each of the executing tasks has its own memory and process space, but it also takes longer to set up. With concurrent.futures, we can switch to a multi-process pool by using a ProcessPoolExecutor, analogously to how we used ThreadPoolExecutor previously. So, some simple changes, and we’re running multiple processes.

      -
      -
      from concurrent.futures import ProcessPoolExecutor
      -
      -@timethis
      -def download_process(table):
      -    with ProcessPoolExecutor(max_workers=15) as executor:
      -        results = executor.map(download_file, table.iterrows(), timeout=60)
      -        return results
      -
      -results = download_process(file_table[0:5])
      -for result in results:
      -    print(result)
      +
      +
      from concurrent.futures import ProcessPoolExecutor
      +
      +@timethis
      +def download_process(table):
      +    with ProcessPoolExecutor(max_workers=15) as executor:
      +        results = executor.map(download_file, table.iterrows(), timeout=60)
      +        return results
      +
      +results = download_process(file_table[0:5])
      +for result in results:
      +    print(result)
      -
      Downloading: SWI_2007.tifDownloading: SWI_2019.tifDownloading: SWI_2020.tifDownloading: SWI_2021.tif
      -
      +
      Downloading: SWI_2007.tifDownloading: SWI_2019.tifDownloading: SWI_2021.tif
      +Downloading: SWI_2001.tif
      +Downloading: SWI_2020.tif
       
       
      -Downloading: SWI_2001.tif
      -download_process: 17364.819288253784 ms
      +download_process: 11409.16132926941 ms
       SWI_2007.tif
       SWI_2019.tif
       SWI_2021.tif
      @@ -671,92 +685,182 @@ 

Parsl (docs), Dask, Ray, and others come into play. They all have their strengths, but Parsl makes it particularly easy to build parallel workflows out of existing python code through its use of decorators on existing python functions.

In addition, Parsl supports a lot of different kinds of providers, allowing the same python code to be easily run multi-threaded using a ThreadPoolExecutor and via multi-processing on many different cluster computing platforms using the HighThroughputExecutor. For example, Parsl includes providers supporting local execution, as well as execution on Slurm, Condor, Kubernetes, AWS, and other platforms. And Parsl handles data staging across these varied environments as well, making sure the data is in the right place when it’s needed for computations.

      Similarly to before, we start by configuring an executor in parsl, and loading it. We’ll use multiprocessing by configuring the HighThroughputExecutor to use our local resources as a cluster, and we’ll activate our virtual environment to be sure we’re executing in a consistent environment.

      -
      -
      # Required packages
      -import parsl
      -from parsl import python_app
      -from parsl.config import Config
      -from parsl.executors import HighThroughputExecutor
      -from parsl.providers import LocalProvider
      -
      -# Configure the parsl executor
      -activate_env = 'workon scomp'
      -htex_local = Config(
      -    executors=[
      -        HighThroughputExecutor(
      -            max_workers=15,
      -            provider=LocalProvider(
      -                worker_init=activate_env
      -            )
      -        )
      -    ],
      -)
      -parsl.clear()
      -parsl.load(htex_local)
      +
      +
      # Required packages
      +import parsl
      +from parsl import python_app
      +from parsl.config import Config
      +from parsl.executors import HighThroughputExecutor
      +from parsl.providers import LocalProvider
      +
      +# Configure the parsl executor
      +activate_env = 'workon scomp'
      +htex_local = Config(
      +    executors=[
      +        HighThroughputExecutor(
      +            max_workers=15,
      +            provider=LocalProvider(
      +                worker_init=activate_env
      +            )
      +        )
      +    ],
      +)
      +parsl.clear()
      +parsl.load(htex_local)
      -
      <parsl.dataflow.dflow.DataFlowKernel at 0x7fa749ae0460>
      +
      <parsl.dataflow.dflow.DataFlowKernel at 0x7f0917e50700>

We now have a live parsl executor (htex_local) that is waiting to execute processes. We tell it to execute processes by annotating functions with decorators that indicate which tasks should be parallelized. Parsl then handles the scheduling and execution of those tasks based on the dependencies between them. In the simplest case, we’ll decorate our previous function for downloading a file with the @python_app decorator, which tells parsl that any calls to this function should be run on the default executor (in this case, htex_local).

      -
      -
      # Decorators seem to be ignored as the first line of a cell, so print something first
      -print("Create decorated function")
      -
      -@python_app
      -def download_file_parsl(row):
      -    import urllib
      -    service = "https://arcticdata.io/metacat/d1/mn/v2/object/"
      -    pid = row[1]['identifier']
      -    filename = row[1]['filename']
      -    url = service + pid
      -    print("Downloading: " + filename)
      -    msg = urllib.request.urlretrieve(url, filename)
      -    return filename
      +
      +
      # Decorators seem to be ignored as the first line of a cell, so print something first
      +print("Create decorated function")
      +
      +@python_app
      +def download_file_parsl(row):
      +    import urllib
      +    service = "https://arcticdata.io/metacat/d1/mn/v2/object/"
      +    pid = row[1]['identifier']
      +    filename = row[1]['filename']
      +    url = service + pid
      +    print("Downloading: " + filename)
      +    msg = urllib.request.urlretrieve(url, filename)
      +    return filename
      Create decorated function

      Now we just write regular python code that calls that function, and parsl handles the scheduling. Parsl app executors return an AppFuture, and we can call the AppFuture.done() function to determine when the future result is ready without blocking. Or, we can just block on AppFuture.result() which waits for each of the executions to complete and then returns the result.

      -
      -
      #! eval: true
      -print("Define our download_futures function")
      -
      -@timethis
      -def download_futures(table):
      -    results = []
      -    for row in table.iterrows():
      -        result = download_file_parsl(row)
      -        print(result)
      -        results.append(result)
      -    return(results)
      -
      -@timethis
      -def wait_for_futures(table):
      -    results = download_futures(table)
      -    done = [app_future.result() for app_future in results]
      -    print(done)
      -
      -wait_for_futures(file_table[0:5])
      +
      +
      #! eval: true
      +print("Define our download_futures function")
      +
      +@timethis
      +def download_futures(table):
      +    results = []
      +    for row in table.iterrows():
      +        result = download_file_parsl(row)
      +        print(result)
      +        results.append(result)
      +    return(results)
      +
      +@timethis
      +def wait_for_futures(table):
      +    results = download_futures(table)
      +    done = [app_future.result() for app_future in results]
      +    print(done)
      +
      +wait_for_futures(file_table[0:5])
      Define our download_futures function
      -<AppFuture at 0x7fa70479cd90 state=pending>
      -<AppFuture at 0x7fa70479ceb0 state=pending>
      -<AppFuture at 0x7fa70479f910 state=pending>
      -<AppFuture at 0x7fa70479ea10 state=pending>
      -<AppFuture at 0x7fa70475ec80 state=pending>
      -download_futures: 10.331869125366211 ms
      +<AppFuture at 0x7f09148aae30 state=pending>
      +<AppFuture at 0x7f09148a98d0 state=pending>
      +<AppFuture at 0x7f0917e51120 state=pending>
      +<AppFuture at 0x7f0917e511e0 state=pending>
      +<AppFuture at 0x7f09145288e0 state=pending>
      +download_futures: 10.675907135009766 ms
       ['SWI_2007.tif', 'SWI_2019.tif', 'SWI_2021.tif', 'SWI_2020.tif', 'SWI_2001.tif']
      -wait_for_futures: 30859.310388565063 ms
      +wait_for_futures: 43896.12054824829 ms

When we’re done, be sure to clean up and shut down the htex_local executor, or it will continue to persist in your environment and utilize resources. Generally, an executor should be created when setting up your environment, and then it can be used repeatedly for many different tasks.

      -
      -
      htex_local.executors[0].shutdown()
      -parsl.clear()
      +
      +
      htex_local.executors[0].shutdown()
      +parsl.clear()
      +
      +

      +
      +

      4.10 Are we done yet?

      +

Parsl and other concurrency libraries generally provide both blocking and non-blocking methods for accessing the results of asynchronous method calls. By blocking, we are referring to methods that wait for the asynchronous operation to complete before returning results, which blocks execution of the rest of a program. By non-blocking, we are referring to methods that return the result if it is available, but not if the async method hasn’t completed yet. In that case, they let the rest of the program continue execution.

      +

      In practice this means that we can either 1) wait for all async calls to complete, and then process them using the blocking methods, or 2) query with a non-blocking method to see if each async call is complete, and only then retrieve the results for that method. We illustrate this approach below with parsl.

      +
      +
      # Required packages
      +import parsl
      +from parsl import python_app
      +from parsl.config import Config
      +from parsl.executors import HighThroughputExecutor
      +from parsl.providers import LocalProvider
      +
      +# Configure the parsl executor
      +activate_env = 'workon scomp'
      +htex_local = Config(
      +    executors=[
      +        HighThroughputExecutor(
      +            max_workers=5,
      +            provider=LocalProvider(
      +                worker_init=activate_env
      +            )
      +        )
      +    ],
      +)
      +parsl.clear()
      +parsl.load(htex_local)
      +
      +
      <parsl.dataflow.dflow.DataFlowKernel at 0x7f0959b47c10>
      +
      +
      +

      Define the task we want to run.

      +
      +
      @python_app
      +def do_stuff(x):
      +    import time
      +    time.sleep(1)
      +    return x**2
      +
      +

And now execute the tasks, sleeping briefly before checking which futures have completed and which are still pending.

      +
      +
      import time
      +all_futures = []
      +for x in range(0,10):
      +    future = do_stuff(x)
      +    all_futures.append(future)
      +    print(future)
      +
      +time.sleep(3)
      +
      +for future in all_futures:
      +    print("Checking: ", future)
      +    if (future.done()):
      +        print("Do more with result: ", future.result())
      +    else:
      +        print("Sorry, come back later.")
      +
      +
      <AppFuture at 0x7f0914528fd0 state=pending>
      +<AppFuture at 0x7f0917e51510 state=pending>
      +<AppFuture at 0x7f091452abc0 state=pending>
      +<AppFuture at 0x7f0914529450 state=pending>
      +<AppFuture at 0x7f0914528e80 state=pending>
      +<AppFuture at 0x7f091452a7a0 state=pending>
      +<AppFuture at 0x7f091452ac20 state=pending>
      +<AppFuture at 0x7f0914528c70 state=pending>
      +<AppFuture at 0x7f0917e53e80 state=pending>
      +<AppFuture at 0x7f09148aaa10 state=pending>
      +Checking:  <AppFuture at 0x7f0914528fd0 state=pending>
      +Sorry, come back later.
      +Checking:  <AppFuture at 0x7f0917e51510 state=pending>
      +Sorry, come back later.
      +Checking:  <AppFuture at 0x7f091452abc0 state=pending>
      +Sorry, come back later.
      +Checking:  <AppFuture at 0x7f0914529450 state=pending>
      +Sorry, come back later.
      +Checking:  <AppFuture at 0x7f0914528e80 state=pending>
      +Sorry, come back later.
      +Checking:  <AppFuture at 0x7f091452a7a0 state=pending>
      +Sorry, come back later.
      +Checking:  <AppFuture at 0x7f091452ac20 state=pending>
      +Sorry, come back later.
      +Checking:  <AppFuture at 0x7f0914528c70 state=pending>
      +Sorry, come back later.
      +Checking:  <AppFuture at 0x7f0917e53e80 state=pending>
      +Sorry, come back later.
      +Checking:  <AppFuture at 0x7f09148aaa10 state=pending>
      +Sorry, come back later.
      +
      +

Notice in particular that the jobs are not all done yet; exactly how many have completed when you check depends on timing.

      -
      -

      4.10 When to parallelize

      +
      +

      4.11 When to parallelize

      It’s not as simple as it may seem. While in theory each added processor would linearly increase the throughput of a computation, there is overhead that reduces that efficiency. For example, the code and, importantly, the data need to be copied to each additional CPU, and this takes time and bandwidth. Plus, new processes and/or threads need to be created by the operating system, which also takes time. This overhead reduces the efficiency enough that realistic performance gains are much less than theoretical, and usually do not scale linearly as a function of processing power. For example, if the time that a computation takes is short, then the overhead of setting up these additional resources may actually overwhelm any advantages of the additional processing power, and the computation could potentially take longer!

      In addition, not all of a task can be parallelized. Depending on the proportion, the expected speedup can be significantly reduced. Some propose that this may follow Amdahl’s Law, where the speedup of the computation (y-axis) is a function of both the number of cores (x-axis) and the proportion of the computation that can be parallelized (see colored lines):

      @@ -767,8 +871,8 @@

      So, it is important to evaluate the computational efficiency of requests, and work to ensure that additional compute resources brought to bear will pay off in terms of increased work being done.

      -
      -

      4.11 Parallel Pitfalls

      +
      +

      4.12 Parallel Pitfalls

A set of tasks is considered ‘pleasingly parallel’ when large portions of the code can be executed independently of the other portions and have few or no dependencies on other parts of the execution. This situation is common, and we can frequently execute parallel tasks on independent subsets of our data. Nevertheless, dependencies among different parts of your computation can definitely create bottlenecks and slow down computations. Some of the challenges you may need to work around include:

      • Task dependencies: occur when one task in the code depends on the results of another task or computation in the code.

      • @@ -777,12 +881,12 @@

        Even when tasks exhibit strong dependencies, it is frequently possible to still optimize that code by parallelizing explicit code sections, sometimes bringing other concurrency tools into the mix, such as the Message Passing Interface (MPI). But simply improving the efficiency of pleasingly parallel tasks can be liberating.

      -
      -

      4.12 Summary

      +
      +

      4.13 Summary

In this lesson, we showed examples of computing tasks that are likely limited by the number of CPU cores that can be applied, and we reviewed the architecture of computers to understand the relationship between CPU processors and cores. Next, we reviewed the way in which traditional sequential loops can be rewritten as functions that are applied to a list of inputs both serially and in parallel to utilize multiple cores to speed up computations. We reviewed the challenges of optimizing code, where one must constantly examine the bottlenecks that arise as we improve CPU-bound, I/O-bound, and memory-bound computations.

      -
      -

      4.13 Further Reading

      +
      +

      4.14 Further Reading

      Ryan Abernathey & Joe Hamman. 2020. Closed Platforms vs. Open Architectures for Cloud-Native Earth System Analytics. Medium.

      diff --git a/2024-03-arctic/sections/parallel-with-dask.html b/2024-03-arctic/sections/parallel-with-dask.html index 715857c9..35276d66 100644 --- a/2024-03-arctic/sections/parallel-with-dask.html +++ b/2024-03-arctic/sections/parallel-with-dask.html @@ -382,16 +382,16 @@

      6.4 Connect to an existing cluster

      The instructor is going to start a local cluster on the server that everyone can connect to. This scenario is a realistic simulation of what it would look like for you to connect to a shared Dask resource. You can create your own local cluster running on your laptop using code at the end of this lesson.

      First, the instructor (and only the instructor!) will run:

      -
      +
      from dask.distributed import LocalCluster
       cluster = LocalCluster(n_workers=70, memory_limit='auto', processes=True)

To connect to it, we will use the address that the scheduler is listening on. The port is generated randomly, so first the instructor needs to get the address for you to use in your code. In a ‘real world’ scenario, this address would be given to you by the administrator of the Dask cluster.

      -
      +
      cluster.scheduler_address

      Now, you can pass the address to the Client function, which sets up your session as a client of the Dask cluster,

      -
      +
      from dask.distributed import LocalCluster, Client
       
       # if you are copy pasting this from the book, make sure 
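# ...to replace the example address below with the one your instructor shares
client = Client("tcp://127.0.0.1:8786")    # hypothetical scheduler address
client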
      @@ -433,19 +433,19 @@ 

      6.5.1 Reading a csv

To get familiar with dask.dataframes, we will use tabular data of soil moisture measurements at six forest stands in northeastern Siberia. The data has been collected since 2014 and is archived at the Arctic Data Center (Loranty & Alexander, doi:10.18739/A24B2X59C). Just as we did in the previous lesson, we will download the data using the urllib package and the data’s URL obtained from the Arctic Data Center.

      -
      +
      import os              
       import urllib 
       
       import dask.dataframe as dd
      -
      +
      url = 'https://arcticdata.io/metacat/d1/mn/v2/object/urn%3Auuid%3A27e4043d-75eb-4c4f-9427-0d442526c154'
       
       msg = urllib.request.urlretrieve(url, "dg_soil_moisture.csv")

      In the Arctic Data Center metadata we can see this file is 115 MB. To import this file as a dask.dataframe with more than one partition, we need to specify the size of each partition with the blocksize parameter. In this example, we will split the data frame into six partitions, meaning a block size of approximately 20 MB.

      -
      +
      fp = os.path.join(os.getcwd(),'dg_soil_moisture.csv')
       df = dd.read_csv(fp, blocksize = '20MB' , encoding='ISO-8859-1')
       df
      @@ -463,7 +463,7 @@

About the encoding parameter: If we try to import the file directly, we will receive a UnicodeDecodeError. We can run the following code to find the file’s encoding and add the appropriate encoding to dask.dataframe.read_csv.

      -
      +
      import chardet
       fp = os.path.join(os.getcwd(),'dg_soil_moisture.csv')
       with open(fp, 'rb') as rawdata:
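    # (continuation sketch: detect the encoding from a sample of the raw bytes)
    result = chardet.detect(rawdata.read(100000))
print(result)    # e.g. {'encoding': 'ISO-8859-1', 'confidence': ...}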
      @@ -475,20 +475,20 @@ 

      Notice that we cannot see any values in the data frame. This is because Dask has not really loaded the data. It will wait until we explicitly ask it to print or compute something to do so.

      However, we can still do df.head(). It’s not costly for memory to access a few data frame rows.

      -
      +
      df.head(3)

      6.5.2 Lazy Computations

      The application programming interface (API) of a dask.dataframe is a subset of the pandas.DataFrame API. So if you are familiar with pandas, many of the core pandas.DataFrame methods directly translate to dask.dataframes. For example:

      -
      +
      averages = df.groupby('year').mean(numeric_only=True)
       averages

Notice that we cannot see any values in the resulting data frame. A major difference between pandas.DataFrames and dask.dataframes is that dask.dataframes are “lazy”. This means an object will queue transformations and calculations without executing them until we explicitly ask for the result of that chain of computations using the compute method. Once we run compute, the scheduler can allocate memory and workers to execute the computations in parallel. This kind of lazy evaluation (or delayed computation) is how most Dask workloads work. This differs from eager evaluation methods and functions, which start computing results right when they are executed.

      Before calling compute on an object, open the Dask dashboard to see how the parallel computation is happening.

      -
      +
      averages.compute()
      @@ -503,20 +503,20 @@

      In this short example we will create a 200x500 dask.array by specifying chunk sizes of 100x100.

      -
      +
      import numpy as np
       
       import dask.array as da
      -
      +
      data = np.arange(100_000).reshape(200, 500)
       a = da.from_array(data, chunks=(100, 100))

      Computations for dask.arrays also work lazily. We need to call compute to trigger computations and bring the result to memory.

      -
      +
      a.mean()
      -
      +
      a.mean().compute()
      @@ -530,7 +530,7 @@

      The NDVI is calculated using the near-infrared and red bands of the satellite image. The formula is

      \[NDVI = \frac{NIR - Red}{NIR + Red}.\]

      First, we download the data for the near-infrared (NIR) and red bands from the Arctic Data Center:

      -
      +
      # download red band
       url = 'https://arcticdata.io/metacat/d1/mn/v2/object/urn%3Auuid%3Aac25a399-b174-41c1-b6d3-09974b161e5a'
       msg = urllib.request.urlretrieve(url, "RU_ANS_TR2_FL005M_red.tif")
      @@ -543,10 +543,10 @@ 

      Because these are .tif files and have geospatial metadata, we will use rioxarray to read them. You can find more information about rioxarray here.

      To indicate we will open these .tif files with dask.arrays as the underlying object to the xarray.DataArray (instead of a numpy.array), we need to specify either a shape or the size in bytes for each chunk. Both files are 76 MB, so let’s have chunks of 15 MB to have roughly six chunks.

      -
      +
      import rioxarray as rioxr
      -
      +
      # read in the file
       fp_red = os.path.join(os.getcwd(),"RU_ANS_TR2_FL005M_red.tif")
       red = rioxr.open_rasterio(fp_red, chunks = '15MB')
      @@ -557,12 +557,12 @@

      There is geospatial information (transformation, CRS, resolution) and no-data values.
    24. There is an unnecessary dimension: a constant value for the band. So our next step is to squeeze the array to flatten it.
    25. -
      +
      # getting rid of unnecessary dimension
       red = red.squeeze()

      Next, we read in the NIR band and do the same pre-processing:

      -
      +
      # open data
       fp_nir = os.path.join(os.getcwd(),"RU_ANS_TR2_FL005M_nir.tif")
       nir = rioxr.open_rasterio(fp_nir, chunks = '15MB')
      @@ -574,15 +574,15 @@ 

      6.7.2 Calculating NDVI

      Now we set up the NDVI calculation. This step is easy because we can handle xarrays and Dask arrays as NumPy arrays for arithmetic operations. Also, both bands have values of type float32, so we won’t have trouble with the division.

      -
      +
      ndvi = (nir - red) / (nir + red)

      When we look at the NDVI we can see the result is another dask.array, nothing has been computed yet. Remember, Dask computations are lazy, so we need to call compute() to bring the results to memory.

      -
      +
      ndvi_values = ndvi.compute()

      And finally, we can see what these look like. Notice that xarray uses the value of the dimensions as labels along the x and y axes. We use robust=True to ignore the no-data values when plotting.

      -
      +
      ndvi_values.plot(robust=True)
      @@ -592,16 +592,16 @@

      6.8.1 Setting up a Local Cluster

      We can create a local cluster as follows:

      -
      +
      from dask.distributed import LocalCluster, Client
      -
      +
      cluster = LocalCluster(n_workers=4, memory_limit=0.1, processes=True)
       cluster

      And then we create a client to connect to our cluster, passing the Client function the cluster object.

      -
      +
      client = Client(cluster)
       client
      diff --git a/2024-03-arctic/sections/python-intro.html b/2024-03-arctic/sections/python-intro.html index d170e347..8b2fa480 100644 --- a/2024-03-arctic/sections/python-intro.html +++ b/2024-03-arctic/sections/python-intro.html @@ -379,7 +379,7 @@

      3.5 Brief overview of python syntax

      We’ll very briefly go over some basic python syntax and the base variable types. First, open a python script. From the File menu, select New File, type “python”, then save it as ‘python-intro.py’ in the top level of your directory.

      In your file, assign a value to a variable using = and print the result.

      -
      +
      x = 4
       print(x)
      @@ -402,7 +402,7 @@

      +
      str = 'Hello World!'
       print(str)
      @@ -410,7 +410,7 @@

      +
      list = [100, 50, -20, 'text']
       print(list)
      @@ -418,7 +418,7 @@

      +
      list[0] # print first element
       list[1:3] # print 2nd until 4th elements
       list[:2] # print first until the 3rd
      @@ -437,7 +437,7 @@ 

      +
      list2 = ['more', 'things']
       
       list + list2
      @@ -450,7 +450,7 @@ 

      +
      tuple = ('a', 'b', 'c', 'd')
       tuple[0]
       tuple * 3
      @@ -466,7 +466,7 @@ 

      +
      list[0] = 'new value'
       list
      @@ -474,12 +474,12 @@

      +
      tuple[0] = 'new value'
      TypeError: 'tuple' object does not support item assignment

      Dictionaries consist of key-value pairs, and are created using the syntax {key: value}. Keys are usually numbers or strings, and values can be any data type.

      -
      +
      dict = {'name': ['Jeanette', 'Matt'],
               'location': ['Tucson', 'Juneau']}
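Values are then retrieved by indexing with a key (a small illustration, not part of the original page):

dict['name']          # -> ['Jeanette', 'Matt']
list(dict.keys())     # -> ['name', 'location']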
       
      @@ -493,7 +493,7 @@ 

      +
      type(list)
       type(tuple)
       type(dict)
      @@ -522,7 +522,7 @@

      3.6.1 Load libraries

In your first code chunk, let’s load in some modules. We’ll use pandas, numpy, matplotlib.pyplot, requests, skimpy, and exists from os.path.

      -
      +
      import pandas as pd
       import numpy as np
       import matplotlib.pyplot as plt
      @@ -547,11 +547,11 @@ 

      Click ‘copy link address’

Create a variable called url and assign it the link copied to your clipboard. Then use urllib.request.urlretrieve to download the file and write it to disk in a directory called data/. We’ll wrap this in an if statement so that we only download the file if it doesn’t yet exist. First, we create the directory if it doesn’t exist:

      -
      +
      if not os.path.exists ('data/'):
               os.mkdir('data/')
      -
      +
      if not os.path.exists('data/discharge_timeseries.csv'):
       
               url = 'https://arcticdata.io/metacat/d1/mn/v2/object/urn%3Auuid%3Ae248467d-e1f9-4a32-9e38-a9b4fb17cefb'
      @@ -559,7 +559,7 @@ 

      msg = urllib.request.urlretrieve(url, 'data/discharge_timeseries.csv')

      Now we can read in the data from the file.

      -
      +
      df = pd.read_csv('data/discharge_timeseries.csv')
       df.head()
      @@ -634,7 +634,7 @@

The column names are a bit messy, so we can use clean_columns from skimpy to quickly make them cleaner for programming. We can also use the skim function to get a quick summary of the data.

      -
      +
      clean_df = skimpy.clean_columns(df)
       skimpy.skim(clean_df)
      @@ -669,7 +669,7 @@

      We can see that the date column is classed as a string, and not a date, so let’s fix that.

      -
      +
      clean_df['date'] = pd.to_datetime(clean_df['date'])
       skimpy.skim(clean_df)
      @@ -709,21 +709,21 @@

      summarize over it by taking the mean

First we should probably rename our existing date/time column so that we don’t get confused.

      -
      +
      clean_df = clean_df.rename(columns = {'date': 'datetime'})

      Now create the new date column

      -
      +
      clean_df['date'] = clean_df['datetime'].dt.date

Finally, we use groupby to split the data into groups according to the date. We can then apply the mean method to calculate the mean value across all of the columns. Note that there are other methods you can use to calculate different statistics across different columns (e.g., clean_df.groupby('date').agg({'discharge_m_3_s': 'max'})).

      -
      +
      daily_flow = clean_df.groupby('date', as_index = False).mean(numeric_only = True)
      • create a simple plot
      -
      +
      var = 'discharge_m_3_s'
       var_labs = {'discharge_m_3_s': 'Total Discharge'}
       
      @@ -763,7 +763,7 @@ 

      +
      import matplotlib.pyplot as plt
       
       def myplot(df, var):
      @@ -784,7 +784,7 @@ 

      +
      myplot(daily_flow, 'temperature_degrees_c')
      diff --git a/2024-03-arctic/sections/software-design-1.html b/2024-03-arctic/sections/software-design-1.html index 5a713015..949c94a0 100644 --- a/2024-03-arctic/sections/software-design-1.html +++ b/2024-03-arctic/sections/software-design-1.html @@ -337,7 +337,7 @@

      12.3 Functions as objects

Functions are first-class objects in Python (and many other languages). This has some real benefits and implications for parallel programming. Because a function is an object, it means that it can be 1) stored in a variable, and 2) passed as an argument to another function. We saw that in the module on pleasingly parallel codes when we used ThreadPoolExecutor.map(), which takes a function and an iterable object as arguments. Let’s check out how you can use and manipulate functions as objects. First, let’s define a simple function, assign it to another variable, and then use both:

      -
      +
      def double(x):
           return 2*x
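# (continuation implied by the text: bind the same function object,
#  without parentheses, to a second name)
twotimes = double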
       
      @@ -349,7 +349,7 @@ 

Note that when we print it to screen, we see that twotimes is of type function, and when we use the two instances, we get identical results:

      -
      +
      print(double(7))
       print(twotimes(7))
       print(double(5) == twotimes(5))
      @@ -360,14 +360,14 @@

      This representation of a function as an object comes in handy when we want to invoke a function in multiple different contexts, such as in a parallel execution environment via a map() function.

      -
      +
      list(map(twotimes, [2,3,4]))
      [4, 6, 8]

      This works because the function twotimes can be passed to map and executed from within map. When you execute a function that is passed in via an argument, it is called function composition. We can easily illustrate this by creating some function and passing it to a wrapper function to be executed:

      -
      +
      def some_function():
           print("Ran some_function")
       
      @@ -413,7 +413,7 @@ 

      12.4 Global variables

      When executing a function, the variables that are in scope to that function are local, meaning that the presence of another variable with the same name in another scope will not affect a calculation. For example:

      -
      +
      def do_task():
           x = 10
       
      @@ -425,7 +425,7 @@ 

      However, if that same variable is declared as global inside the function, then the assignment will have global impact on the value of the parent scope:

      -
      +
      def do_task():
           global x
           x = 10
      @@ -439,7 +439,7 @@ 
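The cell is truncated by the diff context; a complete version of the example, under the assumption that the surrounding cell assigns and prints x, might be:

x = 5

def do_task():
    global x
    x = 10           # rebinds x in the global scope

do_task()
print(x)             # prints 10, not 5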

      So, you can see that writing a function that uses global variables can have effects outside of the scope of the function call. This can have drastic consequences on concurrent code, as the order in which function calls are made when operating concurrently are not deterministic, and so the impact of global variables will also not be deterministic.

      A related issue arises when code in a function depends on its enclosing namespace, such as when a function is defined inside of another function. When resolving a variable, python first looks in the Local namespace, and then in the Enclosing namespace, Global namespace, and Built-in namespace. So, even if a variable is not defined locally, it might still be resolved by one of the other namespaces in surprising ways.

      -
      +
      a = 3
       def do_stuff(b):
           return a*b
      @@ -476,7 +476,7 @@ 

      12.7 Locks

Locks are a mechanism to manage access to a resource so that multiple threads can safely share that resource. By adding locks to an otherwise parallel process, we introduce a degree of serial execution to the locked portion of the process. Basically, each thread can only access the resource when it has the lock, and only one lock is given out at a time. Take this example of what happens without locking:

      -
      +
      from concurrent.futures import ProcessPoolExecutor
       import time
       
      @@ -489,16 +489,16 @@ 

      for future in futures: future.result()

      -
      102   HelloHelloHello
      -
      -
      -02 1 world world
      +
      012  Hello Hello
      +Hello
      +1
      +02  world world
       world
       

      You can see that the results come back in a semi-random order, and the call to sleep creates a delay between printing the two words, which means that the three messages get jumbled when printed. To fix this, we can introduce a lock from the multiprocessing package.

      -
      +
      from concurrent.futures import ProcessPoolExecutor
       import time
       import multiprocessing
      @@ -514,10 +514,10 @@ 

      for future in futures: future.result()

      -
      0 Hello
      -0 world
      -1 Hello
      +
      1 Hello
       1 world
      +0 Hello
      +0 world
       2 Hello
       2 world
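As a self-contained sketch (assuming a manager-backed lock, which can be pickled and passed to worker processes), the locked version might look like:

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time

def do_print(i, lock):
    with lock:                   # hold the lock while printing both lines
        print(i, "Hello")
        time.sleep(0.1)
        print(i, "world")

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    lock = manager.Lock()        # a picklable lock proxy
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(do_print, i, lock) for i in range(3)]
        for future in futures:
            future.result()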
      diff --git a/2024-03-arctic/sections/software-design-2.html b/2024-03-arctic/sections/software-design-2.html index ab5e6c47..1db88612 100644 --- a/2024-03-arctic/sections/software-design-2.html +++ b/2024-03-arctic/sections/software-design-2.html @@ -320,7 +320,7 @@

      16.2 Python modules and packages

The Python Package Index currently houses over 400,000 software projects that you can install, reuse, and extend to accelerate your work. If you’ve worked in python for long at all, you have certainly used the import statement to load modules that you need in your code into a namespace for use.

      -
      +
      import numpy as np
       np.pi*np.power(3, 2)
      @@ -350,7 +350,7 @@

      16.3 Anatomy of modules and packages

      Modules in python are loaded from the definitions stored in files of the same name with a .py file extension. Within each file are definitions of objects which can be accessed when the module is imported. For example, imagine you have a file named hello.py in your current directory that contains a function called helloworld with the following contents:

      -
      +
      def helloworld():
         """
         Print a hello message
      @@ -358,7 +358,7 @@ 

      print("Hello world!")

      With that in place, you can then import and use that function in other code you write:

      -
      +
from hello import helloworld
 helloworld()
      @@ -541,7 +541,7 @@

      16.6.2 Adding code to modules

      Let’s write some module code! In this case, we want to implement the adcmodel.data.load_adc_csv(identifier) function that we’ll use for data loading and caching. We do this by creating a new file called data.py in the adcmodel directory, and then implementing the function. Create a new python file called data.py and save it to the adcmodel module directory, with the following function implementation:

      -
      +
      import pandas as pd
       import urllib.request
       import hashlib
      @@ -564,7 +564,7 @@ 

      return df

      Once you save the file, we can use poetry install to install the package and try running the function in an interactive session. Check it out by opening a new Jupyter notebook and running the following in the first cell (after you install the needed jupyter packages):

      -
      +
      from adcmodel.data import load_adc_csv
       
       df = load_adc_csv("urn:uuid:e248467d-e1f9-4a32-9e38-a9b4fb17cefb")
      @@ -586,7 +586,7 @@ 

      +
      from adcmodel.data import load_adc_csv
       
       def test_load_adc_csv():
      @@ -595,7 +595,7 @@ 

      +
      def test_load_adc_csv_shape():
           df = load_adc_csv("urn:uuid:e248467d-e1f9-4a32-9e38-a9b4fb17cefb")
           assert df is not None
      @@ -603,7 +603,7 @@ 

      Finally, what happens when tests are run that fail? Try adding the following test, and determine if the failure is due to the code, the test, or the data? What would you change to make this test reliably pass, or fail more gracefully?

      -
      +
      def test_load_adc_csv_inputs():
           df = load_adc_csv()
           assert df is not None
      diff --git a/2024-03-arctic/sections/zarr.html b/2024-03-arctic/sections/zarr.html index 0835866c..66d31cd9 100644 --- a/2024-03-arctic/sections/zarr.html +++ b/2024-03-arctic/sections/zarr.html @@ -364,7 +364,7 @@

      14.4 Retrieving CMIP6 Data from Google Cloud

Here, we will show how to use Zarr and xarray to read in the CMIP6 climate model data from the World Climate Research Program, hosted by Google Cloud. CMIP6 is a huge set of climate models, with around 100 models produced across 49 different groups. There is an enormous amount of data produced by these models, and in this demonstration we will load a tiny fraction of it. This demonstration is based on an example put together by Pangeo.

      First we’ll load some libraries. pandas, numpy, and xarray should all be familiar by now. We’ll also load in zarr, of course, and gcsfs, the Google Cloud Storage File System library, which enables access to Google Cloud datasets, including our CMIP6 example.

import pandas as pd
import numpy as np
import zarr
import xarray as xr
import gcsfs

Next, we’ll read a csv file that contains information about all of the CMIP6 stores available on Google Cloud. You can find the URL to this csv file by navigating to the Google Cloud CMIP6 home page, clicking on “View Dataset,” and pressing the copy link button for the file in the table.

      df = pd.read_csv('https://storage.googleapis.com/cmip6/cmip6-zarr-consolidated-stores.csv')
       df.head()
Next, we’ll query this table for the subset of stores we’re interested in: daily sea surface temperature (tos) from the NOAA-GFDL historical experiment in the CMIP activity.

      df_ta = df.query("activity_id=='CMIP' & table_id == 'Oday' & variable_id == 'tos' & experiment_id == 'historical' & institution_id == 'NOAA-GFDL'")
       df_ta
The stores themselves live on Google Cloud Storage, so we first create a connection to the storage system. Since the CMIP6 data are public, we can connect anonymously.

      gcs = gcsfs.GCSFileSystem(token='anon')

      Now, we’ll create a variable with the path to the most recent store from the table above (note the table is sorted by version, so we grab the store URL from the last row), and create a mapping interface to the store using the connection to the Google Cloud Storage system. This interface is what allows us to access the Zarr store with all of the data we are interested in.

      zstore = df_ta.zstore.values[-1]
       mapper = gcs.get_mapper(zstore)

Next, we open the store using the xarray method open_zarr and examine its metadata. Note that the first argument to open_zarr can be a “MutableMapping where a Zarr Group has been stored” (what we are using here), or a path to a directory in the file system (if a local Zarr store is being used).

      ds = xr.open_zarr(mapper)
       ds
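For comparison, if we had written a dataset to a local Zarr store with xarray (say, a hypothetical ds.to_zarr("data/mystore.zarr")), we could open it again by passing the directory path directly:

ds_local = xr.open_zarr("data/mystore.zarr")  # path to a local Zarr store, rather than a cloud mapper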

      This piece of data is around 30 GB total, and we made it accessible to our environment nearly instantly. Consider also that the total volume of CMIP6 data available on Google Cloud is in the neighborhood of a few petabytes, and it is all available at your fingertips using Zarr.

      From here, using the xarray indexing, we can easily grab a time slice and make a plot of the data.

      ds.tos.sel(time='1999-01-01').squeeze().plot()

      We can also get a timeseries slice, here on the equator in the Eastern Pacific.

      ts = ds.tos.sel(lat = 0, lon = 272, method = "nearest")

      xarray allows us to do some pretty cool things, like calculate a rolling annual mean on our daily data. Again, because none of the data are loaded into memory, it is nearly instant.

      ts_ann = ts.rolling(time = 365).mean()

Note that if we explicitly want to load the data into memory, we can do so using the load method.

ts.load()

ts_ann.load()

      Finally, we can make a simple plot of the daily temperatures, along with the rolling annual mean.

      ts.plot(label = "daily")
       ts_ann.plot(label = "rolling annual mean")

      14.5 Additional Reference: Creating a Zarr Dataset

      This is a simplified version of the official Zarr tutorial.

      First, create an array of 10,000 rows and 10,000 columns, filled with zeros, divided into chunks where each chunk is 1,000 x 1,000.

      import zarr
       import numpy as np
       z = zarr.zeros((10000, 10000), chunks=(1000, 1000), dtype='i4')
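To confirm the chunking, we can inspect the array’s info property, which reports the type, shape, chunk shape, compressor, and storage sizes:

z.info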
We can read and write to the array just as we would a NumPy array. Here, we fill the first row with a range of values, then read the entire array back into memory:

      z[0, :] = np.arange(10000)
       z[:]
Arrays can be persisted to disk with the save function.

      zarr.save('data/example.zarr', z)

And opened again with the open function:

      arr = zarr.open('data/example.zarr')
       arr
Zarr also supports hierarchical organization of arrays using groups, much like directories in a file system. Create a root group, then sub-groups within it:

      root = zarr.group()
       temp = root.create_group('temp')
       precip = root.create_group('precip')

      You can then create a dataset (array) within the group, again specifying the shape and chunks.

      t100 = temp.create_dataset('t100', shape=(10000, 10000), chunks=(1000, 1000), dtype='i4')
       t100
Group members can be accessed by key or by path, and indexed like any other array:

      root['temp']
       root['temp/t100'][:, 3]
The tree method and info property summarize the group hierarchy:

      root.tree()
      root.info