Commit 2c1a30b4 authored by enrgarc

TFM - Edificio LUCIA

parent 24b9ee73
#SERVER
SSL_KEYFILE=
SSL_CERTFILE=
PORT=
HOST=
WORKERS=
#BBDD (database)
BBDD_HOST=
BBDD_USER=
BBDD_PASS=
BBDD_DATABASE=
#PUSHER
PUSHER_APP_ID=
PUSHER_KEY=
PUSHER_SECRET=
PUSHER_CLUSTER=eu
# controllers/pipelineController.py
from fastapi.routing import APIRouter
from fastapi import BackgroundTasks
from starlette.requests import Request

from globals import *
import BD as bd
import json
import services.pipelineService as pipelineService

router = APIRouter(prefix="/pipeline")
@router.post("/getPipelines")
def getPipelines():
if(POSTGRESQL == True):
bd.execSQL("create table if not exists pipelines (idPipeline SERIAL primary key, name varchar(255) default null, description varchar(2000) default null, active boolean, execution int default null, dataset int default null, pipeline JSON DEFAULT NULL, createDate timestamp DEFAULT null, lastExecution timestamp default null)")
else:
bd.execSQL("create table if not exists pipelines (idPipeline int primary key AUTO_INCREMENT, name varchar(255) default null, description varchar(2000) default null, active boolean, execution int default null, dataset int default null, pipeline JSON DEFAULT NULL, createDate datetime DEFAULT null, lastExecution datetime default null) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci")
result = bd.selectFromSQL(
'select idpipeline as "idPipeline", name, description, active, execution, pipeline, createdate as "createDate", lastexecution as "lastExecution" from pipelines order by active desc')
return {
"status": True,
"data": result
}
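# For reference, a successful response from getPipelines looks like this
# (column names come from the SELECT above; the concrete values are illustrative):
# {
#     "status": True,
#     "data": [
#         {"idPipeline": 1, "name": "...", "description": "...", "active": True,
#          "execution": 1, "pipeline": "[...]",
#          "createDate": "2022-05-01 10:00:00", "lastExecution": "2022-05-02 03:00:00"}
#     ]
# }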
@router.post("/deletePipeline")
async def deletePipeline(request: Request):
data = await request.json()
idPipeline = data['idPipeline']
bd.execSQL("delete from pipelines where idPipeline="+str(idPipeline))
return {
"status": True,
}
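# A minimal sketch of the same delete with a bound parameter, assuming access
# to a DB-API connection (the `conn` handle here is hypothetical; the BD module
# in this commit does not expose one):
#
# def deletePipelineSafe(conn, idPipeline: int):
#     with conn.cursor() as cur:
#         # The driver escapes the bound value, preventing SQL injection.
#         cur.execute("delete from pipelines where idPipeline = %s", (idPipeline,))
#     conn.commit()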
@router.post("/getCountActivePipelines")
def getCountActivePipelines():
if(POSTGRESQL == True):
bd.execSQL("create table if not exists pipelines (idPipeline SERIAL primary key, name varchar(255) default null, description varchar(2000) default null, active boolean, execution int default null, dataset int default null, pipeline JSON DEFAULT NULL, createDate timestamp DEFAULT null, lastExecution timestamp default null)")
else:
bd.execSQL("create table if not exists pipelines (idPipeline int primary key AUTO_INCREMENT, name varchar(255) default null, description varchar(2000) default null, active boolean, execution int default null, dataset int default null, pipeline JSON DEFAULT NULL, createDate datetime DEFAULT null, lastExecution datetime default null) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci")
result = bd.selectFromSQL(
"select count(*) as count from pipelines where active=TRUE")[0]
return {
"status": True,
"data": result
}
@router.post("/setActive")
async def setActive(request: Request):
data = await request.json()
idPipeline = data['idPipeline']
active = data['active']
bd.execSQL("update pipelines set active="+str(active) +
" where idPipeline="+str(idPipeline))
return {
"status": True
}
@router.post("/newPipeline")
async def newPipeline(request: Request, background_tasks: BackgroundTasks):
data = await request.json()
name = data['name']
description = data['description']
pipeline = json.dumps(data['pipeline'], ensure_ascii=False)
if(POSTGRESQL == True):
bd.execSQL("create table if not exists pipelines (idPipeline SERIAL primary key, name varchar(255) default null, description varchar(2000) default null, active boolean, execution int default null, dataset int default null, pipeline JSON DEFAULT NULL, createDate timestamp DEFAULT null, lastExecution timestamp default null)")
else:
bd.execSQL("create table if not exists pipelines (idPipeline int primary key AUTO_INCREMENT, name varchar(255) default null, description varchar(2000) default null, active boolean, execution int default null, dataset int default null, pipeline JSON DEFAULT NULL, createDate datetime DEFAULT null, lastExecution datetime default null) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci")
bd.execSQL("insert into pipelines (name, description, active, execution, pipeline, createDate, lastExecution) values('" +
name+"','"+description+"', true, 2, '"+pipeline+"',now() , now())")
pipeline = data['pipeline']
idPipeline = bd.selectFromSQL(
'select idPipeline as "idPipeline" from pipelines order by idPipeline desc limit 1')[0]['idPipeline']
background_tasks.add_task(
pipelineService.execPipeline, pipeline, idPipeline, description)
return {
"status": True,
}
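# The JSON body accepted here mirrors what execPipeline (services/pipelineService.py)
# parses. An illustrative request body follows; the attribute values are made up,
# and the 'impute' attr is a placeholder since missingDataTreatment's options are
# not shown in this commit:
# {
#     "name": "Nightly cleanup",
#     "description": "Filter, integrate and impute the LUCIA sensor data",
#     "pipeline": [
#         {"action": "filter", "type": "types", "attr": ["Temperatura"]},
#         {"action": "filter", "type": "zscore"},
#         {"action": "filter", "type": "dateRange",
#          "attr": {"start": "01/01/2021 00:00:00", "end": "31/12/2021 23:59:59"}},
#         {"action": "integrate",
#          "attr": {"type": "integrationHour", "sumativeTypes": ["Consumo"]}},
#         {"action": "impute", "attr": "interpolate"}
#     ]
# }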
@router.post("/execPipelines")
async def execPipelines(background_tasks: BackgroundTasks):
result = bd.selectFromSQL(
'select idpipeline as "idPipeline", name, description, active, execution, pipeline, createdate as "createDate", lastexecution as "lastExecution" from pipelines where active=TRUE')
for pipe in result:
background_tasks.add_task(pipelineService.execPipeline, json.loads(
pipe['pipeline']), pipe['idPipeline'], pipe['description'])
return {
"status": True,
}
# globals.py
import pusher
import os
from dotenv import load_dotenv

load_dotenv()

# Shared Pusher client, used to notify the frontend when background work finishes.
pusher_client = pusher.Pusher(
    app_id=os.getenv('PUSHER_APP_ID'),
    key=os.getenv('PUSHER_KEY'),
    secret=os.getenv('PUSHER_SECRET'),
    cluster=os.getenv('PUSHER_CLUSTER'),
    ssl=True
)

# Whether to target PostgreSQL (True) or MySQL (False).
POSTGRESQL = False
main.py
@@ -7,7 +7,6 @@ from fastapi.staticfiles import StaticFiles
import controllers.mlController as mlController
import controllers.datasetController as datasetController
import controllers.dataController as dataController
import controllers.pipelineController as pipelineController
import os
import BD as bd
# Global variables
@@ -19,7 +18,6 @@ app = FastAPI()
app.include_router(mlController.router)
app.include_router(datasetController.router)
app.include_router(dataController.router)
#app.include_router(pipelineController.router)
# CORS policy
origins = ["*"]
@@ -51,7 +49,6 @@ def main():
def resetAppTables():
    # Drop every application table so it is recreated lazily on next use.
    bd.execSQL("drop table if exists allData")
    bd.execSQL("drop table if exists data")
    bd.execSQL("drop table if exists pipelines")
    bd.execSQL("drop table if exists datasets")
    bd.execSQL("drop table if exists executions")
    return {"status": True}
import uvicorn
import os
from dotenv import load_dotenv

load_dotenv()

if __name__ == "__main__":
    # Server settings (host, port, TLS files, worker count) come from the .env file.
    uvicorn.run("main:app",
                host=os.getenv('HOST'),
                port=int(os.getenv('PORT')),
                ssl_keyfile=os.getenv('SSL_KEYFILE'),
                ssl_certfile=os.getenv('SSL_CERTFILE'),
                workers=int(os.getenv('WORKERS'))
                )
# services/pipelineService.py
from globals import *
import BD as bd
import json
import services.processData as pc
import pandas as pd
from scipy import stats
import numpy as np
import datetime
def execPipeline(pipeline, idPipeline, description):
    """Run a pipeline definition over the raw series in allData, store the
    result as a dataset, and notify the frontend through Pusher when done."""
    STATUS = ""
    try:
        result = bd.selectFromSQL(
            'select id, name, firstdate as "firstDate", lastdate as "lastDate", info, data from allData')
        # Index every series by name; the JSON columns may arrive either as
        # strings (MySQL) or as already-decoded objects (PostgreSQL JSON).
        allData = {}
        for d in result:
            try:
                info = json.loads(d['info'])
            except (TypeError, json.JSONDecodeError):
                info = d['info']
            allData[d['name']] = {}
            allData[d['name']]['firstDate'] = d['firstDate']
            allData[d['name']]['lastDate'] = d['lastDate']
            allData[d['name']]['nroRegistros'] = info['nroRegistros']
            allData[d['name']]['max'] = info['max']
            allData[d['name']]['min'] = info['min']
            allData[d['name']]['type'] = info['type']
            try:
                allData[d['name']]['values'] = json.loads(d['data'])['values']
            except (TypeError, json.JSONDecodeError):
                allData[d['name']]['values'] = d['data']['values']
        df = pd.DataFrame()
        integrationType = ""
        integrateData = None
        keystoDelete = []
        for item in pipeline:
            if item['action'] == 'filter':
                # 'system' filter: mark every series whose name contains one of
                # the given system substrings for deletion.
                if item['type'] == 'system':
                    for key in allData:
                        includes = False
                        for system in item['attr']:
                            if system in key:
                                includes = True
                        if includes:
                            keystoDelete.append(key)
                # 'types' filter: keep only series whose type is listed in attr.
                if item['type'] == 'types':
                    for key in allData:
                        if allData[key]['type'] not in item['attr']:
                            keystoDelete.append(key)
                # 'zscore' filter: drop readings more than 3 standard deviations
                # from the mean of their series.
                if item['type'] == 'zscore':
                    for key in allData:
                        objDict = {x['date']: float(x['value']) if x['value'] is not None else None
                                   for x in allData[key]['values']}
                        df = pd.DataFrame.from_dict(
                            objDict, orient='index', columns=[key])
                        df = df[(np.abs(stats.zscore(df)) < 3).all(axis=1)]
                        allData[key]['values'] = []
                        for i in range(len(df.index)):
                            obj = {}
                            obj["date"] = df.index[i]
                            obj['value'] = round(df[key].iloc[i])
                            allData[key]['values'].append(obj)
                # 'negativeValuesByType' filter: for series whose type is not
                # listed, drop negative readings and recompute the minimum.
                if item['type'] == 'negativeValuesByType':
                    for key in allData:
                        if allData[key]['type'] not in item['attr']:
                            allData[key]['min'] = allData[key]['max']
                            # Iterate backwards so pop() does not shift pending indices.
                            for i in range(len(allData[key]['values']) - 1, -1, -1):
                                if allData[key]['values'][i]['value'] < 0:
                                    allData[key]['values'].pop(i)
                                elif allData[key]['min'] > allData[key]['values'][i]['value']:
                                    allData[key]['min'] = allData[key]['values'][i]['value']
                # 'dateRange' filter: discard readings outside [start, end].
                if item['type'] == 'dateRange':
                    if item['attr']['start'] is not None and item['attr']['end'] is not None:
                        fmt = '%d/%m/%Y %H:%M:%S'
                        start = datetime.datetime.strptime(item['attr']['start'], fmt)
                        end = datetime.datetime.strptime(item['attr']['end'], fmt)
                        for key in allData:
                            allData[key]['firstDate'] = start
                            allData[key]['lastDate'] = end
                            for i in range(len(allData[key]['values']) - 1, -1, -1):
                                date = datetime.datetime.strptime(
                                    allData[key]['values'][i]['date'], '%Y-%m-%d %H:%M:%S')
                                if end < date or start > date:
                                    allData[key]['values'].pop(i)
                                    allData[key]['nroRegistros'] -= 1
                # 'constants' filter: mark series that never change value for deletion.
                if item['type'] == 'constants':
                    for key in allData:
                        if allData[key]['min'] == allData[key]['max']:
                            keystoDelete.append(key)
            if item['action'] == 'integrate':
                if len(keystoDelete) != 0:
                    # Apply the deletions deferred by the filter steps above.
                    for key in keystoDelete:
                        if key in allData:
                            del allData[key]
                integrationType = item['attr']['type']
                # Resample every remaining series to a common hour/day/week
                # resolution (see pc.integrateData).
                integrateData = pc.integrateData(
                    integrationType, allData, item['attr']['sumativeTypes'])
            if item['action'] == 'impute':
                # Fill the remaining gaps, then flatten to a JSON-serialisable dict.
                df = pc.missingDataTreatment(item['attr'], integrateData)
                df['date'] = df.index.astype(str)
                df = df.to_dict('list')
                jsonData = json.dumps(df, ensure_ascii=False)
                # Upsert the result as the dataset named "Pipeline <idPipeline>".
                result = bd.selectFromSQL(
                    "select iddataset as \"idDataset\" from datasets where name='Pipeline " + str(idPipeline) + "'")
                if len(result) == 0:
                    # Integration labels shown in the UI: hours / days / weeks.
                    if integrationType == 'integrationHour':
                        integ = "Horas"
                    elif integrationType == 'integrationDay':
                        integ = "Dias"
                    else:
                        integ = "Semanas"
                    # 'TODOS LOS DATOS' ("all data") is the date-range label stored for the frontend.
                    bd.execSQL("insert into datasets (name, description, integrationType, datesRange, data) values('" +
                               "Pipeline " + str(idPipeline) + "','" + description + "','" + integ + "', 'TODOS LOS DATOS','" + jsonData + "')")
                else:
                    bd.execSQL("update datasets set data='" + jsonData +
                               "' where idDataset=" + str(result[0]['idDataset']))
        # Mark the run as successful once every pipeline step has been applied.
        bd.execSQL(
            "update pipelines set execution=1, lastExecution=now() where idPipeline = " + str(idPipeline))
        STATUS = "OK"
    except Exception:
        # Any failure flags the pipeline's last execution as failed.
        bd.execSQL(
            "update pipelines set execution=0, lastExecution=now() where idPipeline = " + str(idPipeline))
        STATUS = "KO"
    finally:
        # Tell frontends subscribed to the LUCIAAPP channel that this run finished.
        pusher_client.trigger('LUCIAAPP', 'PIPELINE-FINISHED', {'result': {
            "pipeline": str(idPipeline),
            "status": STATUS
        }})
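# Minimal direct invocation for local testing, bypassing the HTTP layer.
# Assumes the database and .env/Pusher credentials are configured and that a
# pipeline row with id 1 exists; the 'impute' attr value is illustrative.
if __name__ == "__main__":
    test_pipeline = [
        {"action": "filter", "type": "constants", "attr": None},
        {"action": "integrate", "attr": {"type": "integrationDay", "sumativeTypes": []}},
        {"action": "impute", "attr": "interpolate"},
    ]
    execPipeline(test_pipeline, 1, "smoke test")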