Commit ad90a197 authored by maryagu
Delete TFG_Marina_Yague.py

parent 86a413ae
import datetime
import os
import scipy
import numpy as np
import tensorflow as tf
tf.config.run_functions_eagerly(True)
tf.data.experimental.enable_debug_mode()
import antropy as ant
import time
import h5py
import random
from random import randrange
import csv
import matplotlib.pyplot as plt
from SignalSimulator import eeg_gen
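
# This script appears to implement a deep Q-learning loop for closed-loop
# stimulation of a simulated EEG signal: an Environment wraps the signal
# generator, a sinusoidal stimulator and a flag-based reward, and an Agent
# (small dense Q-network) learns whether to retune stimulation frequency or
# intensity so that ictal flags in the following window are suppressed.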
class Environment:
    def __init__(self):
        self.sampling_rate = int(1 / 0.0005)
        self.frequency = 120
        self.intensity = 20
        self.corrector_reward = 0

    def get_general_path(self):
        # dated folder holding today's disconnected-slice MEA recordings
        fecha_hoy = datetime.date.today().strftime('%Y%m%d')
        general_path = os.path.join(".", "air_bio_1_1", "BRAIN SLICE MEA RECORDING",
                                    "DISCONNECTED SLICES", fecha_hoy)
        return general_path

    def get_files_path(self, general_path):
        # map each sub-folder to the first file it contains, then pop the last
        # two entries as the label file and the .mat recording
        carpetas_y_archivos = {}
        print(general_path)
        for raiz, directorios, archivos in os.walk(general_path):
            carpeta_actual = os.path.basename(raiz)
            primer_archivo = archivos[0] if archivos else None
            if primer_archivo:
                ruta_primer_archivo = os.path.join(raiz, primer_archivo)
                carpetas_y_archivos[carpeta_actual] = ruta_primer_archivo
            else:
                continue
        path_mat_file = carpetas_y_archivos.popitem()[1]
        path_label = carpetas_y_archivos.popitem()[1]
        return path_label, path_mat_file
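
    # get_signal: draws a synthetic trace from SignalSimulator.eeg_gen at the
    # current stimulation frequency and collects, per yielded item, the sample
    # (i[0]) and its target flag (i[3]); the B and G components are gathered
    # but not used further.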
    def get_signal(self):
        eeg_generator = eeg_gen(1, self.frequency)
        eeg = []
        B = []
        G = []
        target = []
        for i in eeg_generator:
            eeg.append(i[0])
            B.append(i[1])
            G.append(i[2])
            target.append(i[3])
        signal = eeg
        divided_flags = target
        return signal, divided_flags
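
    # step: action 0 perturbs the stimulation frequency by a random integer in
    # [-10, 10] Hz, clamped to 0-200 Hz; any other action perturbs the
    # intensity by a random amount in [-10, 10].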
    def step(self, action):
        if action == 0:
            current_freq = self.frequency
            noise = int(np.random.uniform(-10, 10))
            new_freq = current_freq + noise
            if new_freq > 200:
                new_freq = 200
            elif new_freq < 0:
                new_freq = 0
            self.frequency = new_freq
        else:
            current_intensity = self.intensity
            noise = np.random.uniform(-10, 10)
            new_intensity = current_intensity + noise
            self.intensity = new_intensity
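
    # stimulator: builds a sinusoidal stimulation waveform at the current
    # intensity, with the frequency clipped to the 75-200 Hz band, and
    # truncates it to the length of the raw signal.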
    def stimulator(self, raw_signal):
        bottom_freq = 75
        top_freq = 200
        t = np.arange(0, len(raw_signal), 1 / self.sampling_rate)
        if bottom_freq <= self.frequency <= top_freq:
            stim_signal = self.intensity * np.sin(2 * np.pi * self.frequency * t)
        elif self.frequency > top_freq:
            stim_signal = self.intensity * np.sin(2 * np.pi * top_freq * t)
        else:
            stim_signal = self.intensity * np.sin(2 * np.pi * bottom_freq * t)
        stim_signal = stim_signal[:len(raw_signal)]
        return stim_signal

    def interference(self, observation, stim_signal):
        # superimpose the stimulation waveform on the observed signal
        result = observation + stim_signal
        return result
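
    # get_reward: maps the fraction of non-ictal (zero) flags in the window to
    # [-1, 1]: at least 50% zeros yields a positive reward, fewer yields a
    # negative one; corrector_reward is an externally settable bonus that is
    # consumed once.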
    def get_reward(self, flags):
        num_zeros = sum(1 for elem in flags if elem == 0)
        total_elements = len(flags)
        zero_percentage = num_zeros / total_elements
        if zero_percentage >= 0.5:
            reward = min(1, (zero_percentage - 0.5) / 0.5)
        else:
            reward = -min(1, (0.5 - zero_percentage) / 0.5)
        reward = reward + self.corrector_reward
        self.corrector_reward = 0
        return reward
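
# The Agent holds the Q-network hyperparameters, builds a dense softmax model,
# selects actions epsilon-greedily and applies one-step Q-learning updates.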
class Agent:
    def __init__(self, num_actions, learning_rate, discount_factor, epsilon,
                 num_features, num_layers, num_neurons, loss_fn, optimizer):
        self.num_actions = num_actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.num_features = num_features
        self.num_layers = num_layers
        self.num_neurons = num_neurons
        self.loss_fn = loss_fn
        self.optimizer = optimizer

    def model(self):
        model = tf.keras.models.Sequential()
        for i in range(self.num_layers):
            model.add(tf.keras.layers.Dense(self.num_neurons, activation='relu'))
        model.add(tf.keras.layers.Dense(self.num_actions, activation='softmax'))
        model.compile(loss=self.loss_fn, optimizer=self.optimizer, run_eagerly=True)
        return model
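
    # select_action: with probability epsilon pick a random action, otherwise
    # query the model on the current state window and act greedily on the
    # window-averaged Q-values.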
    def select_action(self, states, model):
        explore = np.random.uniform(0, 1)
        if explore < self.epsilon:
            action = np.random.randint(self.num_actions)
        else:
            q_values = model.predict(np.expand_dims(states, axis=0))
            q_values = np.squeeze(q_values, axis=0)  # Q-values per window position
            # greedy action over the Q-values averaged across the window
            action = int(np.argmax(np.mean(q_values, axis=0)))
        return action
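
    # update_model: single-step TD update. The bootstrapped target
    # reward + discount * max_a Q(next_state, a) replaces the chosen action's
    # entry in a copy of the current Q-values, and the configured loss
    # (categorical cross-entropy here) is minimised between the two.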
    @tf.function
    def update_model(self, states, action, next_states, reward, model):
        with tf.GradientTape() as tape:
            q_values = model(states)
            predict_q_values = model.predict(np.expand_dims(next_states, axis=0))
            predict_q_values = np.squeeze(predict_q_values)
            target_q_values = q_values.numpy()
            target_q_values[0, action] = reward + self.discount_factor * np.max(predict_q_values)
            # loss_fn expects (y_true, y_pred): targets first, current predictions second
            loss = self.loss_fn(target_q_values, q_values)
        grads = tape.gradient(loss, model.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, model.trainable_weights))
        return model
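
    # create_window: slides a Hanning-tapered window of length t over the
    # signal and computes the spectral entropy (antropy, FFT method) of each
    # window, yielding one feature value per window position.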
    def create_window(self, signal, t, fs):
        n = len(signal)
        hanning = np.hanning(t)
        se_completa = []
        for i in range(n - t):
            window = np.multiply(signal[i:i + t], hanning)
            se_window = ant.spectral_entropy(window, fs, method='fft')
            se_completa = np.append(se_completa, se_window)
        return se_completa
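
# Training setup: one Environment and a 4-layer, 32-unit Q-network with two
# actions (retune frequency vs. intensity), epsilon-greedy exploration, plus
# running counters for cumulative reward, seizure flags and "seizure-free"
# episodes.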
env = Environment()
num_actions = 2
learning_rate = 0.003
discount_factor = 0.7
epsilon = 0.07
num_features = 1
num_layers = 4
num_neurons = 32
loss_fn = tf.keras.losses.CategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
agent = Agent(num_actions, learning_rate, discount_factor, epsilon, num_features, num_layers, num_neurons, loss_fn, optimizer)
state_size = num_features + 1
states = np.zeros(state_size)
model = agent.model()
cumulative_reward = []
sucess_rate = 0
sum_reward = 0
sum_flags = 0
cummulative_sucess_rate = []
cummulative_seizures = []
tuned_freq = []
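
# Main loop: 1500 episodes. Each episode draws a fresh simulated signal and
# extracts spectral-entropy features; in the first episode only, the network
# is pre-fitted on all-zero targets, apparently as a weight-initialisation
# pass, before the closed-loop control phase below.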
for k in range(1500):
    mat, label = env.get_signal()
    t = 50
    se_data = agent.create_window(mat, t, env.sampling_rate)
    # align the flags with the windowed features (same offset as in the control
    # loop below), then use a 70/30 train/validation split
    se_labels = label[t:len(se_data) + t]
    training_data_size = int(len(se_data) * 0.7)
    training_data = se_data[:training_data_size]
    training_labels = se_labels[:training_data_size]
    validation_data = se_data[training_data_size:]
    validation_labels = se_labels[training_data_size:]
    if k == 0:
        input_data = np.column_stack((training_data, training_labels))
        X_test = np.column_stack((validation_data, validation_labels))
        y_test = np.zeros((len(X_test), num_actions))
        target_values = np.zeros((len(input_data), num_actions))
        size = 64
        model.fit(input_data, target_values, batch_size=size, epochs=10, verbose=2)
        model.evaluate(X_test, y_test, verbose=2)
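
    # Closed-loop phase: the raw signal is processed in consecutive batches.
    # For each batch the agent observes spectral-entropy features plus flags,
    # picks an action, the environment retunes and applies the stimulation,
    # and the reward is computed from the flags of the following batch.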
    epochs = 10
    raw_data, raw_labels = env.get_signal()
    batch_size = int(len(raw_data) / epochs)
    total_ictal_flags = 0
    # stop one batch early so a full "next state" batch is always available
    for epoch in range(epochs - 1):
        observation = raw_data[epoch * batch_size:(epoch + 1) * batch_size]
        flags = raw_labels[epoch * batch_size:(epoch + 1) * batch_size]
        sp = agent.create_window(observation, t, env.sampling_rate)
        flags = flags[t:len(sp) + t]
        states = np.column_stack((sp, flags))
        action = agent.select_action(states, model)
        env.step(action)
        stim_signal = env.stimulator(observation)
        result_signal = env.interference(observation, stim_signal)
        next_flags = raw_labels[(epoch + 1) * batch_size:(epoch + 2) * batch_size]
        next_sp = agent.create_window(result_signal, t, env.sampling_rate)
        next_flags = next_flags[t:len(next_sp) + t]
        next_states = np.column_stack((next_sp, next_flags))
        reward = env.get_reward(next_flags)
        sum_reward += reward
        cumulative_reward.append(sum_reward)
        model = agent.update_model(states, action, next_states, reward, model)
        total_ictal_flags += np.sum(np.asarray(flags) == 1)
    sum_flags += total_ictal_flags
    cummulative_seizures.append(sum_flags)
    if total_ictal_flags == 0:
        sucess_rate += 1
    cummulative_sucess_rate.append(sucess_rate)
    # assumed: record the frequency the agent has settled on this episode,
    # since tuned_freq is written to results.csv below
    tuned_freq.append(env.frequency)
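
# Persist the final model (SavedModel directory) and the learning curves to a
# CSV file in the same folder.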
header = ['cumulative_reward', 'cummulative_sucess_rate', 'cummulative_seizures', 'tuned_freq']
data_to_save = [cumulative_reward, cummulative_sucess_rate, cummulative_seizures, tuned_freq]
# assumed run identifier: the index of the last training episode
checkpoint_path = "training_{}/".format(k)
checkpoint_dir = os.path.dirname(checkpoint_path)
model.save(checkpoint_dir)
with open('training_{}/results.csv'.format(k), 'w', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(header)
    writer.writerow(data_to_save)