Prepare pyspark environment
If you have already installed PySpark, skip this section!
Pull Jupyter Notebook for Pyspark via docker!
If you do not know how to use docker, please follow Quick Guide: Docker for Jupyter Notebook.
Note: use the ‘jupyter/pyspark-notebook’ image instead of ‘jupyter/scipy-notebook’. Easy, right? :)
Dataset
Please use this dataset. If you have a dataset for a regression problem, feel free to use your own!
This dataset consists of time series, hourly electricity prices (DE/LU[€/MWh]), electricity consumption, and electricity generation.
Now we want to predict hourly electricity prices (DE/LU[€/MWh]) using the remaining variables.
Let’s predict the prices together using only pyspark!!
Note: Check the official PySpark documentation if you want to learn more.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, count, isnan, when
# Start (or reuse) a Spark session, then load the CSV file as a Spark DataFrame.
spark = (
    SparkSession.builder
    .appName('Pyspark Practice')
    .getOrCreate()
)
# header=True takes the column names from the first line of the file.
df = spark.read.csv(path='dataset.csv', header=True)
# Confirm that the result is a Spark DataFrame (not a pandas one).
type(df)
pyspark.sql.dataframe.DataFrame
# show(n) prints the first n rows, much like head() in pandas
df.show(n=3)
+-----+--------------------+----+-----+---+----+-----------+------------+-----------------+------------------+-----------------+-------+----------+
|Index| Date|Year|Month|Day|Hour|Day of week|DE/LU[€/MWh]|Comsumption [MWh]|Wind offshore[MWh]|Wind onshore[MWh]|PV[MWh]|Other[MWh]|
+-----+--------------------+----+-----+---+----+-----------+------------+-----------------+------------------+-----------------+-------+----------+
| 0|2018-10-01 00:00:...|2018| 10| 1| 0| 0| 59.53| 44217.25| 1617.25| 4242.0| 0.0| 42563.75|
| 1|2018-10-01 01:00:...|2018| 10| 1| 1| 0| 56.1| 42760.25| 1657.5| 4096.5| 0.0| 41708.0|
| 2|2018-10-01 02:00:...|2018| 10| 1| 2| 0| 51.41| 42239.5| 1750.5| 4152.0| 0.0| 40940.5|
+-----+--------------------+----+-----+---+----+-----------+------------+-----------------+------------------+-----------------+-------+----------+
only showing top 3 rows
# Remove the columns that are not used as model inputs:
# 'Index' is just a row counter and 'Date' is already split into
# Year / Month / Day / Hour.
df = df.drop('Index', 'Date')
# Report the shape of the remaining table.
n_rows = df.count()
n_cols = len(df.columns)
print('Nomber of rows:', n_rows)
print('Nomber of columns:', n_cols)
Nomber of rows: 17448
Nomber of columns: 11
# Every column was read as a string, so the values are cast to float in a later step.
df.printSchema()
root
|-- Year: string (nullable = true)
|-- Month: string (nullable = true)
|-- Day: string (nullable = true)
|-- Hour: string (nullable = true)
|-- Day of week: string (nullable = true)
|-- DE/LU[€/MWh]: string (nullable = true)
|-- Comsumption [MWh]: string (nullable = true)
|-- Wind offshore[MWh]: string (nullable = true)
|-- Wind onshore[MWh]: string (nullable = true)
|-- PV[MWh]: string (nullable = true)
|-- Other[MWh]: string (nullable = true)
# List the column names that remain after dropping 'Index' and 'Date'.
df.columns
['Year',
'Month',
'Day',
'Hour',
'Day of week',
'DE/LU[€/MWh]',
'Comsumption [MWh]',
'Wind offshore[MWh]',
'Wind onshore[MWh]',
'PV[MWh]',
'Other[MWh]']
# Column.cast() converts a column to another type ('int', 'float', 'string', ...).
# Build one float-cast expression per column and select them all at once.
float_columns = [col(name).cast('float') for name in df.columns]
df_new = df.select(*float_columns)
# The schema should now report float for every column.
df_new.printSchema()
root
|-- Year: float (nullable = true)
|-- Month: float (nullable = true)
|-- Day: float (nullable = true)
|-- Hour: float (nullable = true)
|-- Day of week: float (nullable = true)
|-- DE/LU[€/MWh]: float (nullable = true)
|-- Comsumption [MWh]: float (nullable = true)
|-- Wind offshore[MWh]: float (nullable = true)
|-- Wind onshore[MWh]: float (nullable = true)
|-- PV[MWh]: float (nullable = true)
|-- Other[MWh]: float (nullable = true)
# Count missing values in each column.
# In a float column a value can be missing either as null or as NaN, and
# VectorAssembler (used below) raises an error on both by default, so both
# are counted here. A column is clean when its count is 0.
df_new.select([
    count(when(col(name).isNull() | isnan(col(name)), name)).alias(name)
    for name in df_new.columns
]).show()
+----+-----+---+----+-----------+------------+-----------------+------------------+-----------------+-------+----------+
|Year|Month|Day|Hour|Day of week|DE/LU[€/MWh]|Comsumption [MWh]|Wind offshore[MWh]|Wind onshore[MWh]|PV[MWh]|Other[MWh]|
+----+-----+---+----+-----------+------------+-----------------+------------------+-----------------+-------+----------+
| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
+----+-----+---+----+-----------+------------+-----------------+------------------+-----------------+-------+----------+
# 'DE/LU[€/MWh]' is the target (dependent variable), so it must not be part
# of the model inputs; everything else is kept as a feature.
label_col = 'DE/LU[€/MWh]'
features = df_new.drop(label_col)
features.columns
['Year',
'Month',
'Day',
'Hour',
'Day of week',
'Comsumption [MWh]',
'Wind offshore[MWh]',
'Wind onshore[MWh]',
'PV[MWh]',
'Other[MWh]']
# From here on we use PySpark ML.
# PySpark ML estimators read their inputs from a single vector column,
# so the individual feature columns are first merged into one column.
feature_names = features.columns
assembler = VectorAssembler(inputCols=feature_names, outputCol='features')
# transform() appends the assembled vector column to the DataFrame.
output = assembler.transform(df_new)
# The new 'features' column appears at the end of the table.
output.show(3)
+------+-----+---+----+-----------+------------+-----------------+------------------+-----------------+-------+----------+--------------------+
| Year|Month|Day|Hour|Day of week|DE/LU[€/MWh]|Comsumption [MWh]|Wind offshore[MWh]|Wind onshore[MWh]|PV[MWh]|Other[MWh]| features|
+------+-----+---+----+-----------+------------+-----------------+------------------+-----------------+-------+----------+--------------------+
|2018.0| 10.0|1.0| 0.0| 0.0| 59.53| 44217.25| 1617.25| 4242.0| 0.0| 42563.75|[2018.0,10.0,1.0,...|
|2018.0| 10.0|1.0| 1.0| 0.0| 56.1| 42760.25| 1657.5| 4096.5| 0.0| 41708.0|[2018.0,10.0,1.0,...|
|2018.0| 10.0|1.0| 2.0| 0.0| 51.41| 42239.5| 1750.5| 4152.0| 0.0| 40940.5|[2018.0,10.0,1.0,...|
+------+-----+---+----+-----------+------------+-----------------+------------------+-----------------+-------+----------+--------------------+
only showing top 3 rows
# Each entry of the vector column holds the feature values of one row.
# Note: toPandas() brings the selected column to the driver as a pandas DataFrame.
features_pdf = output.select('features').toPandas()
features_pdf.values
array([[DenseVector([2018.0, 10.0, 1.0, 0.0, 0.0, 44217.25, 1617.25, 4242.0, 0.0, 42563.75])],
[DenseVector([2018.0, 10.0, 1.0, 1.0, 0.0, 42760.25, 1657.5, 4096.5, 0.0, 41708.0])],
[DenseVector([2018.0, 10.0, 1.0, 2.0, 0.0, 42239.5, 1750.5, 4152.0, 0.0, 40940.5])],
...,
[DenseVector([2020.0, 9.0, 30.0, 21.0, 2.0, 56709.0, 3545.0, 6265.0, 0.0, 48341.0])],
[DenseVector([2020.0, 9.0, 30.0, 22.0, 2.0, 53270.0, 3932.0, 7375.0, 0.0, 43090.0])],
[DenseVector([2020.0, 9.0, 30.0, 23.0, 2.0, 49239.0, 4155.0, 8184.0, 0.0, 37611.0])]],
dtype=object)
# Keep only what the model needs: the feature vector and the target column.
model_columns = ['features', 'DE/LU[€/MWh]']
data = output.select(model_columns)
data.show(3)
+--------------------+------------+
| features|DE/LU[€/MWh]|
+--------------------+------------+
|[2018.0,10.0,1.0,...| 59.53|
|[2018.0,10.0,1.0,...| 56.1|
|[2018.0,10.0,1.0,...| 51.41|
+--------------------+------------+
only showing top 3 rows
# Split the dataset into a training set (70%) and a test set (30%).
# A fixed seed keeps the split repeatable between runs; without it every run
# trains and evaluates on different rows, so the metrics cannot be compared.
# NOTE: a random split ignores the time ordering of this hourly series. For a
# real forecasting task, consider a chronological split instead.
train_df, test_df = data.randomSplit([0.7, 0.3], seed=42)
# Use a random forest regressor. If you want to use another machine learning
# algorithm, see the official PySpark documentation.
# The seed also fixes the random sampling done while growing the trees.
RFE = RandomForestRegressor(
    featuresCol='features',
    labelCol='DE/LU[€/MWh]',
    numTrees=10,
    maxDepth=10,
    seed=42,
)
model = RFE.fit(train_df)
# transform() adds a 'prediction' column to the test set.
predictions = model.transform(test_df)
predictions.show(3)
+--------------------+------------+------------------+
| features|DE/LU[€/MWh]| prediction|
+--------------------+------------+------------------+
|[2018.0,10.0,1.0,...| 56.1|49.747462429912765|
|[2018.0,10.0,1.0,...| 47.38| 49.00691417262996|
|[2018.0,10.0,1.0,...| 51.61| 49.23819893025762|
+--------------------+------------+------------------+
only showing top 3 rows
# Root mean squared error between the true prices and the predictions.
evaluator_rmse = RegressionEvaluator(
    labelCol='DE/LU[€/MWh]',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator_rmse.evaluate(predictions)
print('RMSE:', round(rmse, 3))
RMSE: 5.933
# R squared of the predictions on the test set.
evaluator_r2 = RegressionEvaluator(
    labelCol='DE/LU[€/MWh]',
    predictionCol='prediction',
    metricName='r2',
)
r2 = evaluator_r2.evaluate(predictions)
# count() is a Spark action, so run it once and reuse the result instead of
# triggering a separate job for each occurrence in the formula.
n_obs = predictions.count()
n_features = len(features.columns)
# Adjusted R squared: 1 - (1 - R^2) * (n - 1) / (n - p - 1),
# with n = number of test rows and p = number of features.
adj_r2 = 1 - (1 - r2) * (n_obs - 1) / (n_obs - n_features - 1)
print('Adjusted R Squared:', round(adj_r2, 3))
Adjusted R Squared: 0.893