FunWithAI/regression_gas_prices.py

133 lines
3.3 KiB
Python

import pymysql
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import sql_connection_handler as sql
np.set_printoptions(precision=3, suppress=True)
def get_data_from_sql() -> pd.DataFrame:
conn = sql.get_db_connection()
cur = conn.cursor()
query = 'SELECT timestamp, price FROM prices WHERE fuel_type = "E5" AND station = 1'
if not cur.execute(query):
raise pymysql.Error("Error loading data from SQL")
res = cur.fetchall()
raw_data = pd.DataFrame(res)
return raw_data
def prepare_data(dataset: pd.DataFrame):
dataset = dataset.dropna()
# Split into training and test data
train_dataset = dataset.sample(frac=0.8, random_state=0)
test_dataset = dataset.drop(train_dataset.index)
# Split into features and labels
train_features = train_dataset.copy()
test_features = test_dataset.copy()
train_labels = train_features.pop(1)
test_labels = test_features.pop(1)
return train_features, test_features, train_labels, test_labels
def normalize_data(train_features, test_features, train_labels, test_labels):
normalizer = tf.keras.layers.Normalization(axis=-1)
normalizer.adapt(np.asarray(train_features).astype('float32'))
price = np.asarray(train_features[0]).astype('float32')
price_normalizer = layers.Normalization(input_shape=[1, ], axis=None)
price_normalizer.adapt(price)
return price_normalizer
def generate_model(price_normalizer):
price_model = tf.keras.Sequential([
price_normalizer,
layers.Dense(units=1)
])
price_model.compile(
optimizer=tf.optimizers.Adam(learning_rate=0.1),
loss='mean_absolute_error'
)
return price_model
def train_model(price_model, train_features, train_labels):
history = price_model.fit(
np.asarray(train_features[0]).astype('float32'),
train_labels,
epochs=100,
verbose=0,
validation_split=0.2
)
# Show loss plot
plot_loss(history)
return price_model
def collect_results(price_model, test_features, test_labels):
test_results = {}
test_results['price_model'] = price_model.evaluate(
np.asarray(train_features[0]).astype('float32'),
test_labels,
verbose=0
)
print(test_results)
def predict_prices(price_model, train_features, train_labels):
x = tf.linspace(0.0, 250, 251)
y = price_model.predict(x)
plot_prices(x, y, train_features, train_labels)
def plot_loss(history):
plt.plot(history.history['loss'], label='loss')
plt.plot(history.history['val_loss'], label='val_loss')
plt.ylim([0, 10])
plt.xlabel('Epoch')
plt.ylabel('Error [MPG]')
plt.legend()
plt.grid(True)
plt.show()
def plot_prices(x, y, train_features, train_labels):
plt.scatter(train_features[0], train_labels, label='Data')
plt.plot(x, y, color='k', label='Predictions')
plt.xlabel('Datetime')
plt.ylabel('Price')
plt.legend()
plt.show()
if __name__ == '__main__':
dataset = get_data_from_sql().copy()
train_features, test_features, train_labels, test_labels = prepare_data(dataset)
price_normalizer = normalize_data(train_features, test_features, train_labels, test_labels)
price_model = generate_model(price_normalizer)
price_model = train_model(price_model, train_features, train_labels)
collect_results(price_model, test_features, test_labels)
predict_prices(price_model)