diff --git a/fink_mm/conf/fink_mm.conf b/fink_mm/conf/fink_mm.conf
index 3455f2e1..a0a7f86e 100644
--- a/fink_mm/conf/fink_mm.conf
+++ b/fink_mm/conf/fink_mm.conf
@@ -13,7 +13,7 @@ hdfs_gcn_storage=/user/roman.le-montagner/gcn_storage/raw
 # They can be in local FS (/path/ or files:///path/) or
 # in distributed FS (e.g. hdfs:///path/).
 # Be careful though to have enough disk space!
-online_ztf_data_prefix=fink_mm/test/test_data/ztf_test
+online_ztf_data_prefix=fink_mm/test/test_data/ztf_test/online/science
 
 # Prefix path on disk to save GRB join ZTF data (work for both online and offline).
 online_grb_data_prefix=fink_mm/test/test_output
diff --git a/fink_mm/ztf_join_gcn.py b/fink_mm/ztf_join_gcn.py
index 45f68031..bd0f4b66 100644
--- a/fink_mm/ztf_join_gcn.py
+++ b/fink_mm/ztf_join_gcn.py
@@ -185,30 +185,19 @@ def load_dataframe(
     # connection to the ztf science stream
     ztf_alert = connect_to_raw_database(
         ztf_path
-        + "/online/science/year={}/month={}/day={}".format(
-            night[0:4], night[4:6], night[6:8]
-        ),
+        + "/year={}/month={}/day={}".format(night[0:4], night[4:6], night[6:8]),
         ztf_path
-        + "/online/science/year={}/month={}/day={}".format(
-            night[0:4], night[4:6], night[6:8]
-        ),
+        + "/year={}/month={}/day={}".format(night[0:4], night[4:6], night[6:8]),
         latestfirst=False,
     )
-    userschema = spark.read.option("mergeSchema", True).parquet(gcn_path).schema
-    gcn_alert = (
-        spark.readStream.format("parquet")
-        .schema(userschema)
-        .option("basePath", gcn_path)
-        .option(
-            "path",
-            gcn_path + "/year={}/month={}/day=*?*".format(night[0:4], night[4:6]),
-        )
-        .option("latestFirst", True)
-        .option("mergeSchema", True)
-        .load()
+    # load all GCN since the beginning
+    gcn_alert = connect_to_raw_database(
+        gcn_path,
+        gcn_path,
+        latestfirst=False,
     )
 
-    # keep gcn emitted during the day time until the end of the stream (17:00 Paris Time)
+    # keep gcn emitted between the last day time and the end of the current stream (17:00 Paris Time)
     cur_time = Time(f"{night[0:4]}-{night[4:6]}-{night[6:8]}")
     last_time = cur_time - timedelta(hours=7)  # 17:00 Paris time yesterday
     end_time = cur_time + timedelta(hours=17)  # 17:00 Paris time today
@@ -414,11 +403,11 @@ def gcn_pre_join(
 
 def ztf_join_gcn_stream(
     mm_mode: DataMode,
-    ztf_datapath_prefix: str,
+    ztf_dataframe: DataFrame,
+    gcn_dataframe: DataFrame,
     gcn_datapath_prefix: str,
     night: str,
     NSIDE: int,
-    time_window: int,
     hdfs_adress: str,
     ast_dist: float,
     pansstar_dist: float,
@@ -433,10 +422,10 @@ def ztf_join_gcn_stream(
     ----------
     mm_mode : DataMode
         run this function in streaming or offline mode.
-    ztf_datapath_prefix : string
-        the prefix path where are stored the ztf alerts.
-    gcn_datapath_prefix : string
-        the prefix path where are stored the gcn alerts.
+    ztf_dataframe : DataFrame
+        streaming dataframe containing the ztf alerts.
+    gcn_dataframe : DataFrame
+        streaming dataframe containing the gcn alerts.
     night : string
         the processing night
     NSIDE: String
@@ -472,14 +461,6 @@ def ztf_join_gcn_stream(
         "science2mm_{}_{}{}{}".format(job_name, night[0:4], night[4:6], night[6:8])
     )
 
-    ztf_dataframe, gcn_dataframe = load_dataframe(
-        spark,
-        ztf_datapath_prefix,
-        gcn_datapath_prefix,
-        night,
-        int(time_window),
-        mm_mode,
-    )
     ztf_dataframe = ztf_pre_join(
         ztf_dataframe, ast_dist, pansstar_dist, pansstar_star_score, gaia_dist, NSIDE
     )
@@ -564,6 +545,8 @@ def ztf_join_gcn(
         the maximum active time in second of the streaming process
     tinterval : int
         the processing interval time in second between the data batch
+    time_window : int
+        number of days in the past to load the gcn
     hdfs_adress: string
         HDFS adress used to instanciate the hdfs client from the hdfs package
     ast_dist: float
@@ -625,13 +608,22 @@ def ztf_join_gcn(
     """
     logger = init_logging()
 
+    ztf_dataframe, gcn_dataframe = load_dataframe(
+        spark,
+        ztf_datapath_prefix,
+        gcn_datapath_prefix,
+        night,
+        int(time_window),
+        mm_mode,
+    )
+
     df_join_mm, spark = ztf_join_gcn_stream(
         mm_mode,
-        ztf_datapath_prefix,
+        ztf_dataframe,
+        gcn_dataframe,
         gcn_datapath_prefix,
         night,
         NSIDE,
-        time_window,
         hdfs_adress,
         ast_dist,
         pansstar_dist,
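
Reviewer note (not part of the patch): a minimal sketch of the refactored call flow inside ztf_join_gcn after this change, assembled only from the hunks above. The dataframe loading previously done inside ztf_join_gcn_stream is hoisted into ztf_join_gcn, which now passes the two streaming dataframes down; the trailing arguments of ztf_join_gcn_stream are elided here because this diff leaves them unchanged.

    # ztf_join_gcn loads both streaming dataframes once, up front
    ztf_dataframe, gcn_dataframe = load_dataframe(
        spark,
        ztf_datapath_prefix,
        gcn_datapath_prefix,
        night,
        int(time_window),
        mm_mode,
    )

    # ztf_join_gcn_stream receives the dataframes directly and no longer
    # reads from disk nor takes a time_window argument
    df_join_mm, spark = ztf_join_gcn_stream(
        mm_mode,
        ztf_dataframe,
        gcn_dataframe,
        gcn_datapath_prefix,
        night,
        NSIDE,
        hdfs_adress,
        ast_dist,
        pansstar_dist,
        # ... remaining arguments unchanged by this diff
    )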