-
Notifications
You must be signed in to change notification settings - Fork 93
/
Copy pathload_sas7bdat_from_zip.py
80 lines (65 loc) · 2.24 KB
/
load_sas7bdat_from_zip.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""
Data Recipe to load sas7bdat datasets from a zip file.
Just include this script inside the zip and upload it as a data recipe.
"""
from typing import Union, List
from h2oaicore.data import CustomData
from h2oaicore.systemutils import user_dir, config
import datatable as dt
import numpy as np
import pandas as pd
import os
import glob
import uuid
from zipfile import ZipFile
# Third-party packages this recipe depends on, listed by package name.
# NOTE(review): presumably read by the recipe loader so the package is
# available before the import on the next line runs -- confirm against the
# platform's recipe documentation. Keep the import below this assignment.
_global_modules_needed_by_name = ["sas7bdat"]
from sas7bdat import SAS7BDAT
# Suffix used to select which members of the zip archive are loaded.
FILE_EXTENSION = ".sas7bdat"
class SAS7BDATLoadFromZip(CustomData):
    """Data recipe that loads every SAS7BDAT file found in an uploaded zip archive."""

    @staticmethod
    def create_data(
        X: dt.Frame = None,
    ) -> Union[
        str,
        List[str],
        dt.Frame,
        List[dt.Frame],
        np.ndarray,
        List[np.ndarray],
        pd.DataFrame,
        List[pd.DataFrame],
    ]:
        """Extract and load all ``FILE_EXTENSION`` files from the newest uploaded zip.

        The zip archives are looked up in ``<config.data_directory>/uploads``;
        the one with the largest ``os.path.getctime`` value is used. Matching
        members are extracted into a freshly created, uniquely named directory
        below ``user_dir()/config.contrib_relative_directory`` and each one is
        read with ``SAS7BDAT(...).to_data_frame()``.

        Args:
            X: Unused; accepted only to match the recipe interface.

        Returns:
            A dict mapping each archive member name to the frame loaded from it.
            NOTE(review): the declared return annotation does not list a dict;
            it is left unchanged to keep the interface as it was.

        Raises:
            ValueError: If no zip archive is found, if the archive contains no
                member ending in ``FILE_EXTENSION``, or if an extracted file is
                missing on disk.
        """
        zip_location = os.path.join(config.data_directory, "uploads")
        zip_files = glob.glob(os.path.join(zip_location, "*.zip"))
        if not zip_files:
            raise ValueError(
                "No zip files found, please create a zip archive including "
                f"all {FILE_EXTENSION} data files you want to load and this .py script."
            )

        # Pick the archive with the most recent ctime.
        latest_zip = max(zip_files, key=os.path.getctime)

        # A random suffix keeps separate runs from sharing an extraction directory.
        temp_path = os.path.join(
            user_dir(),
            config.contrib_relative_directory,
            "extract_data_%s" % str(uuid.uuid4()),
        )

        # The context manager guarantees the archive handle is closed, even on error.
        with ZipFile(latest_zip) as zip_fl:
            data_files = [
                member
                for member in zip_fl.namelist()
                # Skip the resource-fork copies macOS places under __MACOSX/.
                if member.endswith(FILE_EXTENSION) and not member.startswith("__MACOSX")
            ]
            # An empty list (not None) is what signals "nothing to load".
            if not data_files:
                raise ValueError(f"No file with {FILE_EXTENSION} extension found!")

            os.makedirs(temp_path, exist_ok=True)
            for member in data_files:
                zip_fl.extract(member, path=temp_path)

        data_sets = {}
        for member in data_files:
            full_data_path = os.path.join(temp_path, member)
            if not os.path.exists(full_data_path):
                raise ValueError("File <<" + full_data_path + ">> does not exists!")
            with SAS7BDAT(full_data_path, skip_header=False) as reader:
                frame = reader.to_data_frame()
            # Preview of the loaded data, written to stdout.
            print(frame.head())
            data_sets[member] = frame
        return data_sets