-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
92 lines (74 loc) · 2.71 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from sensor.configuration.mongo_db_connection import MongoDBClient
from sensor.exception import SensorException
import os,sys
from sensor.logger import logging
from sensor.pipeline import training_pipeline
from sensor.pipeline.training_pipeline import TrainPipeline
import os
from sensor.utils.main_utils import read_yaml_file
from sensor.constant.training_pipeline import SAVED_MODEL_DIR
from fastapi import FastAPI
from sensor.constant.application import APP_HOST, APP_PORT
from starlette.responses import RedirectResponse
from uvicorn import run as app_run
from fastapi.responses import Response
from sensor.ml.model.estimator import ModelResolver,TargetValueMapping
from sensor.utils.main_utils import load_object
from fastapi.middleware.cors import CORSMiddleware
import os
env_file_path=os.path.join(os.getcwd(),"env.yaml")
def set_env_variable(env_file_path):
if os.getenv('MONGO_DB_URL',None) is None:
env_config = read_yaml_file(env_file_path)
os.environ['MONGO_DB_URL']=env_config['MONGO_DB_URL']
app = FastAPI()
origins = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/", tags=["authentication"])
async def index():
return RedirectResponse(url="/docs")
@app.get("/train")
async def train_route():
try:
train_pipeline = TrainPipeline()
if train_pipeline.is_pipeline_running:
return Response("Training pipeline is already running.")
train_pipeline.run_pipeline()
return Response("Training successful !!")
except Exception as e:
return Response(f"Error Occurred! {e}")
@app.get("/predict")
async def predict_route():
try:
#get data from user csv file
#conver csv file to dataframe
df=None
model_resolver = ModelResolver(model_dir=SAVED_MODEL_DIR)
if not model_resolver.is_model_exists():
return Response("Model is not available")
best_model_path = model_resolver.get_best_model_path()
model = load_object(file_path=best_model_path)
y_pred = model.predict(df)
df['predicted_column'] = y_pred
df['predicted_column'].replace(TargetValueMapping().reverse_mapping(),inplace=True)
#decide how to return file to user.
except Exception as e:
raise Response(f"Error Occured! {e}")
def main():
try:
set_env_variable(env_file_path)
training_pipeline = TrainPipeline()
training_pipeline.run_pipeline()
except Exception as e:
print(e)
logging.exception(e)
if __name__=="__main__":
#main()
# set_env_variable(env_file_path)
app_run(app, host=APP_HOST, port=APP_PORT)