Databricks Linear Regression With Spark ML

Apache Spark ships with a library that supports many types of machine learning models. In this tutorial, we will walk through how to implement a Spark ML linear regression model on Databricks. We will cover:

  • What’s the difference between Spark MLlib and Spark ML?
  • How to process the data into the right format?
  • How to fit a Spark ML linear regression model?
  • How to evaluate model performance?
  • How to save the model?
  • How to make predictions for new data?

Resources for this post:

Databricks Linear Regression With Spark ML – GrabNGoInfo.com

Step 0: Spark MLlib Vs. Spark ML

Firstly, let’s talk about the difference between Spark MLlib and Spark ML.

spark.mllib is the legacy machine learning API built on RDDs, while spark.ml is the newer machine learning API built on DataFrames.

The name MLlib covers both the RDD-based API and the DataFrame-based API. The RDD-based API is now in maintenance mode, meaning no new features will be added to it.

In this tutorial, we will use the DataFrame-based API, and I suggest you use it as well.
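To make the distinction concrete, here is a minimal illustrative sketch of how each API represents a single training example. Neither line is needed for the rest of this tutorial, and the variable names are hypothetical:

# spark.mllib (RDD-based, maintenance mode): an RDD of LabeledPoint objects
from pyspark.mllib.regression import LabeledPoint
rdd_data = sc.parallelize([LabeledPoint(1.0, [0.5, 0.3])])

# spark.ml (DataFrame-based, recommended): a DataFrame with a vector column
from pyspark.ml.linalg import Vectors
df_data = spark.createDataFrame([(1.0, Vectors.dense([0.5, 0.3]))], ['label', 'features'])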

Step 1: Import Libraries

In step 1, we will import the libraries. pandas is for data processing. make_regression is for creating synthetic modeling datasets.

From the pyspark.ml library, we import VectorAssembler for feature formatting, LinearRegression for model training, RegressionEvaluator for model evaluation, and Pipeline and PipelineModel for pipeline creation and loading.

# Data processing
import pandas as pd

# Create synthetic dataset
from sklearn.datasets import make_regression

# Modeling
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel

Step 2: Create Dataset For Linear Regression

In step 2, we will create a synthetic dataset for the linear regression model.

Using make_regression, we create a dataset with one million records. The dataset has two features, a bias (intercept) of 2, one numeric dependent variable, and Gaussian noise with a standard deviation of 0.3. random_state ensures the randomly created dataset is reproducible. The random state does not have to be 42; it can be any number.

The output of make_regression is a pair of NumPy arrays. We convert them into a pandas DataFrame, then into a Spark DataFrame.

summary() gives us the summary statistics of the dataset.

# Create a synthetic dataset
X, y = make_regression(n_samples=1000000, n_features=2, noise=0.3, bias=2, random_state=42)

# Convert the data from numpy array to a pandas dataframe
pdf = pd.DataFrame({'feature1': X[:, 0], 'feature2': X[:, 1], 'dependent_variable': y})

# Convert pandas dataframe to spark dataframe
sdf = spark.createDataFrame(pdf)

# Check data summary statistics
display(sdf.summary())
Summary statistics for the synthetic data — Image from GrabNGoInfo.com

Step 3: Train Test Split

After creating the modeling dataset, in step 3, we will make the train test split.

Using randomSplit, we split the dataset into 80% training and 20% testing. seed=42 makes the random split reproducible. However, because randomSplit samples within each partition, the same cluster configuration and partition count must be used to reproduce the split exactly (see the sketch after the output below).

# Train test split
trainDF, testDF = sdf.randomSplit([.8, .2], seed=42)

# Print the number of records
print(f'There are {trainDF.cache().count()} records in the training dataset.')
print(f'There are {testDF.cache().count()} records in the testing dataset.')

We got 800,299 records in the training dataset and 199,701 records in the testing dataset after the split.

There are 800299 records in the training dataset.
There are 199701 records in the testing dataset.
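
If the split must be reproduced exactly across runs, one option (an assumption on my part, not a guarantee made by the randomSplit API) is to pin the partition count before splitting:

# Hypothetical sketch: fix the partition count so the per-partition
# sampling in randomSplit sees the same data layout each run
sdf_fixed = sdf.repartition(8)  # 8 is an arbitrary example value
trainDF_fixed, testDF_fixed = sdf_fixed.randomSplit([.8, .2], seed=42)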

Step 4: Vector Assembler

Linear regression expects a vector input as features, so in step 4, we will use VectorAssembler to transform the features into a vector format.

# Linear regression expects a vector input
vecAssembler = VectorAssembler(inputCols=['feature1', 'feature2'], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)

# Take a look at the data
display(vecTrainDF)

We can see that in the newly created column called “features”, the two features are listed in vector format.

Databricks Spark ML feature format — Image from GrabNGoInfo.com
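
To confirm the vector column on a single row without the full display, we can also peek with show:

# Show one row with the original feature columns and the assembled vector
vecTrainDF.select('feature1', 'feature2', 'features').show(1, truncate=False)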

Step 5: Fit Spark ML Linear Regression Model

In step 5, we will fit the linear regression model.

Firstly, the feature column and the label column are specified for the linear regression.

Then, the linear regression model is fit on the vectorized training dataset.

After that, the model intercept and coefficients are printed.

# Create linear regression
lr = LinearRegression(featuresCol="features", labelCol="dependent_variable")

# Fit the linear regression model
lrModel = lr.fit(vecTrainDF)

# Print model intercept and coefficients
print(f'The intercept of the model is {lrModel.intercept:.2f} and the coefficients of the model are {lrModel.coefficients[0]:.2f} and {lrModel.coefficients[1]:.2f}')

We can see that the model recovers an intercept of 2.00, matching the bias used to generate the data, and coefficients of 82.09 and 35.03 for the two features.

The intercept of the model is 2.00 and the coefficients of the model are 82.09 and 35.03
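
The fitted model also carries a training summary with metrics computed on the training data. A short optional check, using attribute names from the pyspark.ml LinearRegression API:

# Optional: training-data metrics from the model's built-in summary
trainingSummary = lrModel.summary
print(f'Training RMSE: {trainingSummary.rootMeanSquaredError:.2f}')
print(f'Training R2: {trainingSummary.r2:.2f}')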

Alternatively, we can create a pipeline and fit the model through it. A pipeline usually includes both the data processing steps and the model fitting step.

# Create pipeline
stages = [vecAssembler, lr]
pipeline = Pipeline(stages=stages)

# Fit the pipeline model
pipelineModel = pipeline.fit(trainDF)

Step 6: Model Performance Evaluation

In step 6, we will evaluate the model performance using the testing dataset.

In order to do model performance evaluation, we need to first make predictions on the testing dataset. Using the pipelineModel created in the previous step, we can transform the testing dataset and make predictions with a single line of code.

# Make predictions on testing dataset
predDF = pipelineModel.transform(testDF)

# Take a look at the output
display(predDF.select("features", "dependent_variable", "prediction"))
Databricks Spark ML model performance evaluation — Image from GrabNGoInfo.com

Once we have the predicted values, we pass the prediction column name and the actual value column name into RegressionEvaluator.

metricName can be one of the following values:

  • rmse: Root mean squared error (the default)
  • mse: Mean squared error
  • r2: R squared
  • mae: Mean absolute error

# Create regression evaluator
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="dependent_variable", metricName="rmse")

# RMSE
rmse = regressionEvaluator.evaluate(predDF)
print(f"The RMSE for the linear regression model is {rmse:0.2f}")

# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(predDF)
print(f"The MSE for the linear regression model is {mse:0.2f}")

# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"The R2 for the linear regression model is {r2:0.2f}")

# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(predDF)
print(f"The MAE for the linear regression model is {mae:0.2f}")
The RMSE for the linear regression model is 0.30
The MSE for the linear regression model is 0.09
The R2 for the linear regression model is 1.00
The MAE for the linear regression model is 0.24

We can also create a scatter plot to visually check the relationship between the actual values and the predicted values. To learn how to use the built-in chart function in Databricks notebooks, please check my previous tutorial on Databricks Dashboard For Big Data.

Databricks Spark ML Linear Regression actual vs. prediction — Image from GrabNGoInfo.com
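
If you prefer building the chart in code rather than with the notebook chart UI, a sketch like the one below works. The 1% sampling fraction is an arbitrary choice to keep the pandas conversion small:

# Plot a sample of actual vs. predicted values with matplotlib
import matplotlib.pyplot as plt

# Sample the predictions before converting to pandas (fraction is arbitrary)
sample_pdf = predDF.select('dependent_variable', 'prediction').sample(fraction=0.01, seed=42).toPandas()

# Scatter plot of actual vs. predicted values
plt.scatter(sample_pdf['dependent_variable'], sample_pdf['prediction'], s=1)
plt.xlabel('Actual value')
plt.ylabel('Predicted value')
plt.show()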

Step 7: Save Model

In step 7, we will save the pipeline model to an AWS S3 bucket. To learn how to mount an S3 bucket to Databricks, please check my previous tutorial Databricks Mount To AWS S3 And Import Data.

# Path to save the model
pipelinePath = '/mnt/demo4tutorial/model/linear_regression_pipeline_model'

# Save the model to the path
pipelineModel.write().overwrite().save(pipelinePath)

After saving the model to S3, we can confirm the model is in the bucket using the %fs ls command.

# Confirm the model is saved
%fs ls '/mnt/demo4tutorial/model/linear_regression_pipeline_model'

We can see that both the pipeline stages and the metadata are saved for the model.

Databricks Spark ML save model — Image from GrabNGoInfo.com
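
The same check can be done in Python with dbutils instead of the %fs magic:

# Equivalent file listing using dbutils
display(dbutils.fs.ls(pipelinePath))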

Step 8: Make Predictions For New Data

In step 8, we will go over how to make predictions for new data using the saved model.

We first create a new dataset with 1,000 records. The number of features, the bias, and the noise are the same as in the training dataset to ensure the new dataset follows the same distribution.

# Create a new synthetic dataset
X_new, y_new = make_regression(n_samples=1000, n_features=2, bias=2, noise=0.3, random_state=0)

# Convert the data from numpy array to a pandas dataframe
pdf_new = pd.DataFrame({'feature1': X_new[:, 0], 'feature2': X_new[:, 1], 'dependent_variable': y_new})

# Convert pandas dataframe to spark dataframe
sdf_new = spark.createDataFrame(pdf_new)

# Check data summary statistics
display(sdf_new.summary())

Then the pipeline model is loaded from the S3 bucket, and predictions are made on the new dataset.

# Load the saved model
loadedPipelineModel = PipelineModel.load(pipelinePath)

# Make prediction for the new dataset
predDF_new = loadedPipelineModel.transform(sdf_new)

# Take a look at the data
display(predDF_new.select("features", "dependent_variable", "prediction"))
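
Because the new dataset includes the true dependent variable, we can optionally score the loaded model on it as a sanity check, reusing the evaluator from step 6 (this check is an addition, not part of the original workflow):

# Optional: evaluate the loaded model's predictions on the new dataset
rmse_new = regressionEvaluator.setMetricName('rmse').evaluate(predDF_new)
print(f'The RMSE for the new dataset is {rmse_new:0.2f}')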

Step 9: Put All Code Together

###### Step 1: Import Libraries

# Data processing
import pandas as pd

# Create synthetic dataset
from sklearn.datasets import make_regression

# Modeling
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel


###### Step 2: Create Dataset For Linear Regression

# Create a synthetic dataset
X, y = make_regression(n_samples=1000000, n_features=2, noise=0.3, bias=2, random_state=42)

# Convert the data from numpy array to a pandas dataframe
pdf = pd.DataFrame({'feature1': X[:, 0], 'feature2': X[:, 1], 'dependent_variable': y})

# Convert pandas dataframe to spark dataframe
sdf = spark.createDataFrame(pdf)

# Check data summary statistics
display(sdf.summary())


###### Step 3: Train Test Split

# Train test split
trainDF, testDF = sdf.randomSplit([.8, .2], seed=42)

# Print the number of records
print(f'There are {trainDF.cache().count()} records in the training dataset.')
print(f'There are {testDF.cache().count()} records in the testing dataset.')


###### Step 4: Vector Assembler

# Linear regression expects a vector input
vecAssembler = VectorAssembler(inputCols=['feature1', 'feature2'], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)

# Take a look at the data
display(vecTrainDF)


###### Step 5: Fit Spark ML Linear Regression Model

# Create linear regression
lr = LinearRegression(featuresCol="features", labelCol="dependent_variable")

# Fit the linear regression model
lrModel = lr.fit(vecTrainDF)

# Print model intercept and coefficients
print(f'The intercept of the model is {lrModel.intercept:.2f} and the coefficients of the model are {lrModel.coefficients[0]:.2f} and {lrModel.coefficients[1]:.2f}')

# Create pipeline
stages = [vecAssembler, lr]
pipeline = Pipeline(stages=stages)

# Fit the pipeline model
pipelineModel = pipeline.fit(trainDF)


###### Step 6: Model Performance Evaluation

# Make predictions on testing dataset
predDF = pipelineModel.transform(testDF)

# Take a look at the output
display(predDF.select("features", "dependent_variable", "prediction"))

# Create regression evaluator
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="dependent_variable", metricName="rmse")

# RMSE
rmse = regressionEvaluator.evaluate(predDF)
print(f"The RMSE for the linear regression model is {rmse:0.2f}")

# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(predDF)
print(f"The MSE for the linear regression model is {mse:0.2f}")

# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"The R2 for the linear regression model is {r2:0.2f}")

# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(predDF)
print(f"The MAE for the linear regression model is {mae:0.2f}")

# Visualize the data
display(predDF.select("dependent_variable", "prediction"))


###### Step 7: Save Model

# Path to save the model
pipelinePath = '/mnt/demo4tutorial/model/linear_regression_pipeline_model'

# Save the model to the path
pipelineModel.write().overwrite().save(pipelinePath)

# Confirm the model is saved
%fs ls '/mnt/demo4tutorial/model/linear_regression_pipeline_model'


###### Step 8: Make Predictions For New Data

# Create a new synthetic dataset
X_new, y_new = make_regression(n_samples=1000, n_features=2, bias=2, noise=0.3, random_state=0)

# Convert the data from numpy array to a pandas dataframe
pdf_new = pd.DataFrame({'feature1': X_new[:, 0], 'feature2': X_new[:, 1], 'dependent_variable': y_new})

# Convert pandas dataframe to spark dataframe
sdf_new = spark.createDataFrame(pdf_new)

# Check data summary statistics
display(sdf_new.summary())

# Load the saved model
loadedPipelineModel = PipelineModel.load(pipelinePath)

# Make prediction for the new dataset
predDF_new = loadedPipelineModel.transform(sdf_new)

# Take a look at the data
display(predDF_new.select("features", "dependent_variable", "prediction"))

# Actual vs. predicted
display(predDF_new.select("dependent_variable", "prediction"))

Summary

In this tutorial, we covered how to implement a Spark ML linear regression model on Databricks. You have learned:

  • What’s the difference between Spark MLlib and Spark ML?
  • How to process the data into the right format?
  • How to fit a Spark ML linear regression model?
  • How to evaluate model performance?
  • How to save the model?
  • How to make predictions for new data?

For more information about data science and machine learning, please check out my YouTube channel and Medium Page or follow me on LinkedIn.
