compute_dark_and_cloud_metrics.py
"""Script for analyzing CGF snow cover values during periods of winter darkness and cloud cover. This script is required for understanding how snow conditions are impacted by these uncertainty sources. Running this script in a main thread will produce a series of tagged GeoTIFFs that can be used to visualize the results in a GIS application. Otherwise, functionality can be imported and used in the actual snow metric computation to identify and filter CGF snow cover values as necessary."""
import logging
import argparse
import os

import numpy as np
import xarray as xr
import dask
from dask.distributed import Client

from config import preprocessed_dir, mask_dir, uncertainty_dir, SNOW_YEAR
from luts import inv_cgf_codes
from shared_utils import (
    open_preprocessed_dataset,
    apply_threshold,
    fetch_raster_profile,
    apply_mask,
    write_tagged_geotiff,
)


def is_obscured(chunked_cgf_snow_cover, dark_source):
    """Determine if a grid cell is obscured by a 'dark' condition.

    The 'dark' condition is either cloud cover or winter darkness; both uncertainty
    sources are handled in an identical fashion. There may be a better word than
    "dark" (penumbra, perhaps) for this condition.

    Args:
        chunked_cgf_snow_cover (xr.DataArray): chunked CGF snow cover data.
        dark_source (str): one of 'Cloud' or 'Night'.

    Returns:
        xr.DataArray: boolean array of obscured grid cells.
    """
    dark_on = chunked_cgf_snow_cover == inv_cgf_codes[dark_source]
    return dark_on
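
# Note (editor's addition): inv_cgf_codes (imported from luts) maps category names
# such as "Night" or "Cloud" to the integer codes used in the CGF snow cover product,
# so the comparison above yields a boolean DataArray with the same shape as the input.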


def count_darkness(dark_on):
    """Count the per-pixel occurrence of polar/winter darkness or cloud cover (in the
    initialization period) in the snow year.

    Args:
        dark_on (xr.DataArray): boolean array of obscured grid cells.

    Returns:
        xr.DataArray: count of 'Cloud' or 'Night' values.
    """
    logging.info("Counting occurrence of dark values...")
    darkness_count = dark_on.sum(dim="time")
    return darkness_count


def get_first_obscured_occurence_index(dark_on):
    """Get the first occurrence of the obscured condition.

    Returns the time index of the first chronological occurrence of the obscured
    condition. Values range from 0 to the length of the time dimension minus one;
    note that `argmax` also returns 0 when the condition never occurs.

    Args:
        dark_on (xr.DataArray): boolean array of obscured grid cells.

    Returns:
        xr.DataArray: time index of the first occurrence of the obscured condition.
    """
    first_dark_index = dark_on.argmax(dim="time")
    return first_dark_index


def get_last_obscured_occurence_index(dark_on):
    """Get the last occurrence of the obscured condition.

    Returns the time index of the last chronological occurrence of the obscured
    condition by taking `argmax` over the time-reversed array and converting that
    position back to an index into the forward-time array. Values range from 0 to the
    length of the time dimension minus one.

    Args:
        dark_on (xr.DataArray): boolean array of obscured grid cells.

    Returns:
        xr.DataArray: time index of the last occurrence of the obscured condition.
    """
    dark_on_reverse_time = dark_on.isel(time=slice(None, None, -1))
    last_occurrence_reverse = dark_on_reverse_time.argmax(dim="time")
    last_dark_index = dark_on.time.size - last_occurrence_reverse - 1
    return last_dark_index
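
# Worked example (editor's note): for a single pixel with dark_on values
# [False, True, True, False, False] over time, the reversed series is
# [False, False, True, True, False], argmax of the reversed series is 2, and the last
# obscured index is 5 - 2 - 1 = 2, i.e. the position of the last True in the original.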


def get_dusk_index(first_dark_index):
    """Get the time index of the last observation just before the onset of the
    obscured condition.

    Values range between 0 and 364 (or 365 in a leap year). A value of 0 likely
    indicates that a grid cell did not experience the obscured condition at any point
    in the snow year.

    Args:
        first_dark_index (xr.DataArray): time index of the first occurrence of the
            obscured condition.

    Returns:
        xr.DataArray: index of the last observation before the onset of the obscured
            condition.
    """
    dusk_index = first_dark_index - 1
    # handle the edge case where the obscured condition did not occur at all;
    # without this clamp we would later try to slice a value at time index -1
    dusk_index = dusk_index.where(dusk_index >= 0, 0)
    return dusk_index


def get_dawn_index(last_dark_index, len_time_index):
    """Get the index of the first observation immediately after the obscured period.

    Values range between 1 and 364 (or 365 in a leap year).

    Args:
        last_dark_index (xr.DataArray): time index of the final occurrence of the
            obscured condition.
        len_time_index (int): length of the time index for the snow year, used to cap
            the dawn index value.

    Returns:
        xr.DataArray: index of the first observation following the obscured period.
    """
    dawn_index = last_dark_index + 1
    last_time_index = len_time_index - 1
    # handle the edge cases where the obscured condition did not occur at all or the
    # final timestep is obscured; without this clamp we would later try to slice a
    # value one past the end of the time index
    dawn_index = dawn_index.where(dawn_index <= last_time_index, last_time_index)
    return dawn_index
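
# Edge-case sketch (editor's note): for a pixel that is never obscured, argmax returns
# 0 in both directions, so first_dark_index = 0 (dusk_index would be -1, clamped to 0
# in get_dusk_index) and last_dark_index = time.size - 1 (dawn_index would run one
# past the end, clamped back here). The same dawn clamp also covers pixels whose final
# timestep is obscured.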


def get_median_obscured_index(dusk_index, dawn_index):
    """Get the median index of the obscured period.

    In other words, what is the midpoint of the dark or cloud period? This is used as
    a basis for filling values within the obscured period and for determining the
    first/last days of the full snow season.

    Args:
        dusk_index (xr.DataArray): index of the last observation before the onset of
            the obscured condition.
        dawn_index (xr.DataArray): index of the first observation following the
            obscured period.

    Returns:
        xr.DataArray: the median (midpoint) index of the obscured period.
    """
    return (dusk_index + dawn_index) // 2
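
# Worked example (editor's note): with dusk_index = 100 and dawn_index = 151, the
# midpoint is (100 + 151) // 2 = 125; floor division keeps the result an integer so it
# remains usable as a time index.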


@dask.delayed
def get_dusk_observation(dusk_index, chunked_cgf_snow_cover):
    """Get the value of the snow cover product at the onset of the obscured condition.

    This computation is delayed because slicing the snow cover value at the time index
    requires concrete integer indices, which are only available once the index arrays
    have been computed.

    Args:
        dusk_index (xr.DataArray): index of the last observation before the onset of
            the obscured condition.
        chunked_cgf_snow_cover (xr.DataArray): chunked CGF snow cover data.

    Returns:
        xr.DataArray: value of the observation prior to the obscured period.
    """
    dusk_observation = chunked_cgf_snow_cover.isel(time=dusk_index)
    return dusk_observation


@dask.delayed
def get_dawn_observation(dawn_index, chunked_cgf_snow_cover):
    """Get the value of the snow cover product immediately after the obscured condition.

    This computation is delayed because slicing the snow cover value at the time index
    requires concrete integer indices, which are only available once the index arrays
    have been computed.

    Args:
        dawn_index (xr.DataArray): index of the first observation following the
            obscured period.
        chunked_cgf_snow_cover (xr.DataArray): chunked CGF snow cover data.

    Returns:
        xr.DataArray: value of the first observation following the obscured period.
    """
    dawn_observation = chunked_cgf_snow_cover.isel(time=dawn_index)
    return dawn_observation
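
# Note (editor's addition, hedged): because dusk_index/dawn_index are 2-D (y, x)
# arrays, the isel() calls above use xarray's vectorized (pointwise) indexing, so each
# pixel selects the observation at its own dusk/dawn time index. Wrapping the functions
# in @dask.delayed defers that selection until .compute() is called downstream.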


def is_snow_on_at_dusk(dusk_observation):
    """Determine if snow is "on" at dusk.

    Args:
        dusk_observation (xr.DataArray): value of the last observation before the
            onset of the obscured condition.

    Returns:
        xr.DataArray: boolean array indicating if snow is on at dusk.
    """
    return apply_threshold(dusk_observation)


def is_snow_on_at_dawn(dawn_observation):
    """Determine if snow is "on" at dawn.

    Args:
        dawn_observation (xr.DataArray): value of the first observation following the
            obscured period.

    Returns:
        xr.DataArray: boolean array indicating if snow is on at dawn.
    """
    return apply_threshold(dawn_observation)
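
# Note (editor's addition, assumption): apply_threshold is defined in shared_utils and
# is assumed to convert a CGF snow cover value to a boolean snow on/off state using the
# project's snow threshold; the exact threshold value lives in that module.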


@dask.delayed
def get_snow_transition_cases(snow_is_on_at_dusk, snow_is_on_at_dawn):
    """Determine the snow transition case during the obscured period.

    This computation is delayed because `np.select` requires concrete boolean arrays.
    The function determines what, if anything, happened to the binary (on or off) snow
    state between the dusk observation and the dawn observation. This information is
    used to map different forward and backward data filling strategies for cloud or
    winter darkness conditions.

    Args:
        snow_is_on_at_dusk (xr.DataArray): boolean indicating if snow is on at dusk.
        snow_is_on_at_dawn (xr.DataArray): boolean indicating if snow is on at dawn.

    Returns:
        np.ndarray: integer array indicating the snow transition case during the
            obscured period. 1 = no change, 2 = snow flipped on, 3 = snow flipped off.
    """
    # no change in snow condition during the obscured period:
    # true when conditions are identical before and after the obscured period
    snow_did_not_flip = ~(snow_is_on_at_dusk ^ snow_is_on_at_dawn)
    # first snow happened during the obscured period
    snow_flipped_off_to_on = ~snow_is_on_at_dusk & snow_is_on_at_dawn
    # snow went away during the obscured period
    snow_flipped_on_to_off = snow_is_on_at_dusk & ~snow_is_on_at_dawn
    # map each case to an integer: 1 if snow did not flip,
    # 2 if snow flipped off to on, 3 if snow flipped on to off
    transition_cases = [
        snow_did_not_flip,
        snow_flipped_off_to_on,
        snow_flipped_on_to_off,
    ]
    numeric_transition_cases = [1, 2, 3]
    snow_transition_cases = np.select(
        transition_cases, numeric_transition_cases, default=0
    )
    return snow_transition_cases
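
# Truth table for the transition cases above (editor's sketch):
#   dusk=True,  dawn=True  -> 1 (no change, snow stayed on)
#   dusk=False, dawn=False -> 1 (no change, snow stayed off)
#   dusk=False, dawn=True  -> 2 (snow appeared during the obscured period)
#   dusk=True,  dawn=False -> 3 (snow disappeared during the obscured period)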


def create_dark_metric_computation(dark_tag, dark_is_on, chunky_ds, tag_prefix=None):
    """Create a dictionary of delayed computations for darkness metrics.

    This is a look-up table describing how each metric gets computed.

    Args:
        dark_tag (str): one of 'cloud' or 'night'.
        dark_is_on (xr.DataArray): boolean array of obscured grid cells.
        chunky_ds (xr.DataArray): chunked CGF snow cover data.
        tag_prefix (str, optional): prefix (e.g. the output tag) prepended to
            `dark_tag` in the metric names.

    Returns:
        dict: a dictionary of delayed computations for darkness metrics.
    """
    dark_metrics = dict()
    if tag_prefix is not None:
        dark_tag = f"{tag_prefix}_{dark_tag}"
    dark_metrics.update({f"{dark_tag}_obscured_day_count": count_darkness(dark_is_on)})
    dark_metrics.update(
        {
            f"dusk_index_of_last_obs_prior_to_{dark_tag}": get_dusk_index(
                get_first_obscured_occurence_index(dark_is_on)
            )
        }
    )
    dark_metrics.update(
        {
            f"dawn_index_of_first_obs_after_{dark_tag}": get_dawn_index(
                get_last_obscured_occurence_index(dark_is_on),
                chunky_ds.time.size,
            )
        }
    )
    dark_metrics.update(
        {
            f"median_index_of_{dark_tag}_period": get_median_obscured_index(
                dark_metrics[f"dusk_index_of_last_obs_prior_to_{dark_tag}"],
                dark_metrics[f"dawn_index_of_first_obs_after_{dark_tag}"],
            )
        }
    )
    dark_metrics.update(
        {
            f"value_at_{dark_tag}_dusk": get_dusk_observation(
                dark_metrics[f"dusk_index_of_last_obs_prior_to_{dark_tag}"],
                chunky_ds,
            ),
        }
    )
    dark_metrics.update(
        {
            f"value_at_{dark_tag}_dawn": get_dawn_observation(
                dark_metrics[f"dawn_index_of_first_obs_after_{dark_tag}"],
                chunky_ds,
            ),
        }
    )
    dark_metrics.update(
        {
            f"snow_is_on_at_{dark_tag}_dusk": is_snow_on_at_dusk(
                dark_metrics[f"value_at_{dark_tag}_dusk"]
            )
        }
    )
    dark_metrics.update(
        {
            f"snow_is_on_at_{dark_tag}_dawn": is_snow_on_at_dawn(
                dark_metrics[f"value_at_{dark_tag}_dawn"]
            )
        }
    )
    dark_metrics.update(
        {
            f"snow_transition_cases_{dark_tag}": get_snow_transition_cases(
                dark_metrics[f"snow_is_on_at_{dark_tag}_dusk"],
                dark_metrics[f"snow_is_on_at_{dark_tag}_dawn"],
            )
        }
    )
    return dark_metrics
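
# Usage sketch (editor's addition, assuming a chunked CGF DataArray `cgf` is already
# open):
#   night_is_on = is_obscured(cgf, "Night")
#   metrics = create_dark_metric_computation("night", night_is_on, cgf, "raw")
#   metrics["raw_night_obscured_day_count"].compute()
# Nothing is evaluated until .compute() is called on an entry (write_dark_metric does
# this before writing the GeoTIFF).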


def write_dark_metric(dark_metric_name, computation_di):
    """Trigger the dark metric computation and write it to disk with `write_tagged_geotiff`.

    Relies on the module-level `tile_id`, `out_profile`, and `combined_mask` set in the
    `__main__` block.

    Args:
        dark_metric_name (str): name of the dark metric to compute; must be a key of
            `computation_di`.
        computation_di (dict): dict of computations generated with
            `create_dark_metric_computation`.

    Returns:
        None: writes data to a GeoTIFF file.
    """
    dark_metric_arr = computation_di[dark_metric_name].compute()
    write_tagged_geotiff(
        uncertainty_dir,
        tile_id,
        "",
        dark_metric_name,
        out_profile,
        apply_mask(combined_mask, dark_metric_arr).astype("int16"),
    )
if __name__ == "__main__":
log_file_path = os.path.join(os.path.expanduser("~"), "dark_and_cloud_metrics.log")
logging.basicConfig(filename=log_file_path, level=logging.INFO)
parser = argparse.ArgumentParser(
description="Compute metrics for cloud and polar/winter darkness periods."
)
parser.add_argument("tile_id", type=str, help="VIIRS Tile ID (ex. h11v02)")
# optional argument to compute metrics for a smoothed dataset
parser.add_argument(
"--smoothed_input", type=str, help="Suffix of smoothed input file."
)
args = parser.parse_args()
tile_id = args.tile_id
smoothed_input = args.smoothed_input
logging.info(
f"Computing winter darkness and cloud cover metrics for tile {tile_id}."
)
client = Client(memory_limit="64GiB", timeout="60s") # mem per Dask worker
if smoothed_input is not None:
logging.info(f"Using smoothed input file: {smoothed_input}")
fp = preprocessed_dir / f"snow_year_{SNOW_YEAR}_{tile_id}_{smoothed_input}.nc"
ds = open_preprocessed_dataset(fp, {"x": "auto", "y": "auto"}).to_dataarray()[0]
output_tag = smoothed_input
else:
fp = preprocessed_dir / f"snow_year_{SNOW_YEAR}_{tile_id}.nc"
ds = open_preprocessed_dataset(
fp, {"x": "auto", "y": "auto"}, "CGF_NDSI_Snow_Cover"
)
output_tag = "raw"
# intialize input and output parameters
out_profile = fetch_raster_profile(tile_id, {"dtype": "int16", "nodata": 0})
combined_mask = mask_dir / f"{tile_id}__mask_combined_{SNOW_YEAR}.tif"
for darkness_source in ["Night"]: # , "Cloud"]:
tag = darkness_source.lower()
di_tag = f"{output_tag}_{darkness_source.lower()}"
darkness_is_on = is_obscured(ds, darkness_source)
dark_computation_di = create_dark_metric_computation(
tag, darkness_is_on, ds, output_tag
)
# these are relatively rapid computes
write_dark_metric(f"{di_tag}_obscured_day_count", dark_computation_di)
write_dark_metric(
f"dusk_index_of_last_obs_prior_to_{di_tag}", dark_computation_di
)
write_dark_metric(
f"dawn_index_of_first_obs_after_{di_tag}", dark_computation_di
)
write_dark_metric(f"median_index_of_{di_tag}_period", dark_computation_di)
# more memory intensive because they rely on computing the above indices
write_dark_metric(f"value_at_{di_tag}_dusk", dark_computation_di)
write_dark_metric(f"value_at_{di_tag}_dawn", dark_computation_di)
write_dark_metric(f"snow_is_on_at_{di_tag}_dusk", dark_computation_di)
write_dark_metric(f"snow_is_on_at_{di_tag}_dawn", dark_computation_di)
write_dark_metric(f"snow_transition_cases_{di_tag}", dark_computation_di)
client.close()
ds.close()
print("Computation of cloud and darkness metrics complete.")