add the additional modification for the integration with fink
FusRoman committed Feb 9, 2024
1 parent d6c391a commit 778995f
Showing 2 changed files with 28 additions and 36 deletions.
fink_mm/conf/fink_mm.conf (1 addition, 1 deletion)
@@ -13,7 +13,7 @@ hdfs_gcn_storage=/user/roman.le-montagner/gcn_storage/raw
 # They can be in local FS (/path/ or files:///path/) or
 # in distributed FS (e.g. hdfs:///path/).
 # Be careful though to have enough disk space!
-online_ztf_data_prefix=fink_mm/test/test_data/ztf_test
+online_ztf_data_prefix=fink_mm/test/test_data/ztf_test/online/science

 # Prefix path on disk to save GRB join ZTF data (work for both online and offline).
 online_grb_data_prefix=fink_mm/test/test_output
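For context, this conf change mirrors the code change below: the "/online/science" segment moves out of the hard-coded path construction in ztf_join_gcn.py and into the configured prefix, so the resolved daily directory stays the same. A minimal sketch, with an illustrative night value:

night = "20240209"  # illustrative night, not from the commit

# before: generic prefix in the conf, segment hard-coded in ztf_join_gcn.py
old_prefix = "fink_mm/test/test_data/ztf_test"
old_path = old_prefix + "/online/science/year={}/month={}/day={}".format(
    night[0:4], night[4:6], night[6:8]
)

# after: the conf prefix carries the segment, the code only appends the partition
new_prefix = "fink_mm/test/test_data/ztf_test/online/science"
new_path = new_prefix + "/year={}/month={}/day={}".format(night[0:4], night[4:6], night[6:8])

assert old_path == new_path  # .../ztf_test/online/science/year=2024/month=02/day=09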
fink_mm/ztf_join_gcn.py (27 additions, 35 deletions)
@@ -185,30 +185,19 @@ def load_dataframe(
         # connection to the ztf science stream
         ztf_alert = connect_to_raw_database(
             ztf_path
-            + "/online/science/year={}/month={}/day={}".format(
-                night[0:4], night[4:6], night[6:8]
-            ),
+            + "/year={}/month={}/day={}".format(night[0:4], night[4:6], night[6:8]),
             ztf_path
-            + "/online/science/year={}/month={}/day={}".format(
-                night[0:4], night[4:6], night[6:8]
-            ),
+            + "/year={}/month={}/day={}".format(night[0:4], night[4:6], night[6:8]),
             latestfirst=False,
         )

-        userschema = spark.read.option("mergeSchema", True).parquet(gcn_path).schema
-        gcn_alert = (
-            spark.readStream.format("parquet")
-            .schema(userschema)
-            .option("basePath", gcn_path)
-            .option(
-                "path",
-                gcn_path + "/year={}/month={}/day=*?*".format(night[0:4], night[4:6]),
-            )
-            .option("latestFirst", True)
-            .option("mergeSchema", True)
-            .load()
+        # load all GCN since the beginning
+        gcn_alert = connect_to_raw_database(
+            gcn_path,
+            gcn_path,
+            latestfirst=False,
         )
-        # keep gcn emitted during the day time until the end of the stream (17:00 Paris Time)
+        # keep gcn emitted between the last day time and the end of the current stream (17:00 Paris Time)
         cur_time = Time(f"{night[0:4]}-{night[4:6]}-{night[6:8]}")
         last_time = cur_time - timedelta(hours=7)  # 17:00 Paris time yesterday
         end_time = cur_time + timedelta(hours=17)  # 17:00 Paris time today
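The hand-rolled readStream block is replaced by a call to connect_to_raw_database, whose implementation is not shown in this diff. Based on the removed lines and on how it is called above, it plausibly wraps the same pattern (schema inferred from the static parquet files, then a parquet structured stream). A hedged sketch of such a helper, not the actual fink implementation:

from pyspark.sql import DataFrame, SparkSession


def connect_to_raw_database_sketch(
    spark: SparkSession, basepath: str, path: str, latestfirst: bool
) -> DataFrame:
    # infer the schema from the static parquet files under basepath
    userschema = spark.read.option("mergeSchema", True).parquet(basepath).schema
    # then open the same location as a structured stream
    return (
        spark.readStream.format("parquet")
        .schema(userschema)
        .option("basePath", basepath)
        .option("path", path)
        .option("latestFirst", latestfirst)
        .option("mergeSchema", True)
        .load()
    )

With the whole GCN history loaded this way, the narrowing is done by the time window computed just above: for night = "20240209", last_time evaluates to 2024-02-08 17:00:00 and end_time to 2024-02-09 17:00:00.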
@@ -414,11 +403,11 @@ def gcn_pre_join(

 def ztf_join_gcn_stream(
     mm_mode: DataMode,
-    ztf_datapath_prefix: str,
-    gcn_datapath_prefix: str,
+    ztf_dataframe: DataFrame,
+    gcn_dataframe: DataFrame,
     night: str,
     NSIDE: int,
-    time_window: int,
     hdfs_adress: str,
     ast_dist: float,
     pansstar_dist: float,
@@ -433,10 +422,10 @@
     ----------
     mm_mode : DataMode
         run this function in streaming or offline mode.
-    ztf_datapath_prefix : string
-        the prefix path where are stored the ztf alerts.
-    gcn_datapath_prefix : string
-        the prefix path where are stored the gcn alerts.
+    ztf_dataframe : DataFrame
+        streaming dataframe containing the ztf alerts.
+    gcn_dataframe : DataFrame
+        streaming dataframe containing the gcn alerts.
     night : string
         the processing night
     NSIDE: String
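The docstring now expects already-built streaming DataFrames rather than path prefixes. A minimal, hedged sketch of what such an input looks like, using the test data path from the conf file above; the session setup and schema handling here are illustrative, not fink_mm code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("sketch").getOrCreate()

ztf_test_path = "fink_mm/test/test_data/ztf_test/online/science"
static_schema = spark.read.option("mergeSchema", True).parquet(ztf_test_path).schema

# a streaming dataframe, as the new ztf_dataframe / gcn_dataframe arguments expect
ztf_dataframe = (
    spark.readStream.format("parquet").schema(static_schema).load(ztf_test_path)
)
assert ztf_dataframe.isStreaming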
@@ -472,14 +461,6 @@
         "science2mm_{}_{}{}{}".format(job_name, night[0:4], night[4:6], night[6:8])
     )

-    ztf_dataframe, gcn_dataframe = load_dataframe(
-        spark,
-        ztf_datapath_prefix,
-        gcn_datapath_prefix,
-        night,
-        int(time_window),
-        mm_mode,
-    )
     ztf_dataframe = ztf_pre_join(
         ztf_dataframe, ast_dist, pansstar_dist, pansstar_star_score, gaia_dist, NSIDE
     )
@@ -564,6 +545,8 @@ def ztf_join_gcn(
         the maximum active time in second of the streaming process
     tinterval : int
         the processing interval time in second between the data batch
+    time_window : int
+        number of days in the past to load the gcn
     hdfs_adress: string
         HDFS adress used to instanciate the hdfs client from the hdfs package
     ast_dist: float
@@ -625,13 +608,22 @@
     """
     logger = init_logging()

+    ztf_dataframe, gcn_dataframe = load_dataframe(
+        spark,
+        ztf_datapath_prefix,
+        gcn_datapath_prefix,
+        night,
+        int(time_window),
+        mm_mode,
+    )
+
     df_join_mm, spark = ztf_join_gcn_stream(
         mm_mode,
-        ztf_datapath_prefix,
-        gcn_datapath_prefix,
+        ztf_dataframe,
+        gcn_dataframe,
         night,
         NSIDE,
-        time_window,
         hdfs_adress,
         ast_dist,
         pansstar_dist,
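Taken together, the refactor moves the I/O up to ztf_join_gcn while ztf_join_gcn_stream now only sees DataFrames, which is what makes the fink integration named in the commit title possible. A hedged sketch of that control flow; the wrapper name and the use of keyword arguments for the remaining parameters are illustrative, only load_dataframe, ztf_join_gcn_stream and their leading parameters come from the diff:

from fink_mm.ztf_join_gcn import load_dataframe, ztf_join_gcn_stream


def run_night_sketch(spark, mm_mode, ztf_datapath_prefix, gcn_datapath_prefix,
                     night, time_window, **stream_params):
    # 1. I/O happens once, at the top level, as in the new ztf_join_gcn body
    ztf_dataframe, gcn_dataframe = load_dataframe(
        spark, ztf_datapath_prefix, gcn_datapath_prefix, night, int(time_window), mm_mode
    )
    # 2. the join only receives DataFrames, per the new ztf_join_gcn_stream signature;
    #    NSIDE, hdfs_adress, ast_dist and the other parameters travel in stream_params
    return ztf_join_gcn_stream(mm_mode, ztf_dataframe, gcn_dataframe, night, **stream_params)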
