Apache Spark ships with a library covering many types of machine learning models. In this tutorial, we will talk about how to use Databricks to implement a Spark ML linear regression model. We will cover:

- What’s the difference between Spark MLlib and Spark ML?
- How to process the data into the right format?
- How to fit a Spark ML linear regression model?
- How to evaluate model performance?
- How to save the model?
- How to make predictions for new data?

**Resources for this post:**

- Click here for Databricks Notebook
- More video tutorials on Databricks and PySpark
- More blog posts on Databricks and PySpark
- If you are interested in the video version of the tutorial, please check out the video on YouTube

### Step 0: Spark MLlib Vs. Spark ML

Firstly, let’s talk about the difference between Spark MLlib and Spark ML.

`spark.mllib` is the legacy machine learning API built on RDDs, while `spark.ml` is the newer machine learning API built on DataFrames.

The name MLlib covers both the RDD-based API and the DataFrame-based API. The RDD-based API is now in maintenance mode, so no new features will be added to it.

In this tutorial, we will use the DataFrame-based API, and I suggest you use it as well.

### Step 1: Import Libraries

In step 1, we will import the libraries. `pandas` is for data processing, and `make_regression` is for creating synthetic modeling datasets.

From the `pyspark.ml` library, we import `VectorAssembler` for feature formatting, `LinearRegression` for model training, `RegressionEvaluator` for model evaluation, and `Pipeline` and `PipelineModel` for pipeline creation and loading.

```python
# Data processing
import pandas as pd

# Create synthetic dataset
from sklearn.datasets import make_regression

# Modeling
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel
```

### Step 2: Create Dataset For Linear Regression

In step 2, we will create a synthetic dataset for the linear regression model.

Using `make_regression`, we create a dataset with one million records. The dataset has two features, a bias of 2, one numeric dependent variable, and 30% noise. `random_state` ensures the randomly created dataset is reproducible. The random state does not have to be 42; it can be any number.

The output of `make_regression` is in array format. We convert it into a pandas DataFrame, then convert it into a Spark DataFrame.

`summary()` gives us the summary statistics of the dataset.

```python
# Create a synthetic dataset
X, y = make_regression(n_samples=1000000, n_features=2, noise=0.3, bias=2, random_state=42)

# Convert the data from numpy array to a pandas dataframe
pdf = pd.DataFrame({'feature1': X[:, 0], 'feature2': X[:, 1], 'dependent_variable': y})

# Convert pandas dataframe to spark dataframe
sdf = spark.createDataFrame(pdf)

# Check data summary statistics
display(sdf.summary())
```

### Step 3: Train Test Split

After creating the modeling dataset, in step 3, we will make the train test split.

Using `randomSplit`, we split the dataset into 80% training and 20% testing. `seed=42` makes the random split reproducible. However, we need to make sure that the same cluster and partition number are used when reproducing the split.

```python
# Train test split
trainDF, testDF = sdf.randomSplit([.8, .2], seed=42)

# Print the number of records
print(f'There are {trainDF.cache().count()} records in the training dataset.')
print(f'There are {testDF.cache().count()} records in the testing dataset.')
```

We got 800,299 records in the training dataset and 199,701 records in the testing dataset after the split.

```
There are 800299 records in the training dataset.
There are 199701 records in the testing dataset.
```

### Step 4: Vector Assembler

Linear regression expects a vector input as features, so in step 4, we will use `VectorAssembler` to transform the features into vector format.

```python
# Linear regression expects a vector input
vecAssembler = VectorAssembler(inputCols=['feature1', 'feature2'], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)

# Take a look at the data
display(vecTrainDF)
```

We can see that in the newly created column called “features”, the two features are listed in vector format.

### Step 5: Fit Spark ML Linear Regression Model

In step 5, we will fit the linear regression model.

Firstly, the feature column and the label column are specified for the linear regression.

Then, the linear regression model is fit on the vectorized training dataset.

After that, the model intercept and coefficients are printed.

```python
# Create linear regression
lr = LinearRegression(featuresCol="features", labelCol="dependent_variable")

# Fit the linear regression model
lrModel = lr.fit(vecTrainDF)

# Print model intercept and coefficients
print(f'The intercept of the model is {lrModel.intercept:.2f} and the coefficients of the model are {lrModel.coefficients[0]:.2f} and {lrModel.coefficients[1]:.2f}')
```

We can see that the model has an intercept of 2, and coefficients of 82.09 and 35.03 for the two features.

```
The intercept of the model is 2.00 and the coefficients of the model are 82.09 and 35.03
```
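As a cross-check outside Spark (not part of the original notebook), fitting scikit-learn's `LinearRegression` on the same synthetic data should recover essentially the same intercept and coefficients, since both solve ordinary least squares:

```python
from sklearn.datasets import make_regression
from sklearn.linear_model import LinearRegression as SkLinearRegression

# Recreate the same synthetic dataset used for the Spark model
X, y = make_regression(n_samples=1000000, n_features=2, noise=0.3, bias=2, random_state=42)

# Ordinary least squares on the full dataset (no train/test split here,
# so the numbers will be very close to, but not identical to, the Spark fit)
sk_model = SkLinearRegression().fit(X, y)

print(f'intercept: {sk_model.intercept_:.2f}')   # close to the bias of 2
print(f'coefficients: {sk_model.coef_.round(2)}')
```

With this much data and only 30% noise, the estimated intercept sits very close to the true bias of 2, which is a quick way to convince yourself the Spark fit is behaving sensibly.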

Alternatively, we can create a pipeline and fit the model on the pipeline. A pipeline usually includes both the data processing steps and the model fitting step.

```python
# Create pipeline
stages = [vecAssembler, lr]
pipeline = Pipeline(stages=stages)

# Fit the pipeline model
pipelineModel = pipeline.fit(trainDF)
```

### Step 6: Model Performance Evaluation

In step 6, we will evaluate the model performance using the testing dataset.

In order to do model performance evaluation, we need to first make predictions on the testing dataset. Using the `pipelineModel` created in the previous step, we can transform the testing dataset and make predictions with a single line of code.

```python
# Make predictions on testing dataset
predDF = pipelineModel.transform(testDF)

# Take a look at the output
display(predDF.select("features", "dependent_variable", "prediction"))
```

After having the predicted values, we pass the prediction column name and the actual value column name into `RegressionEvaluator`.

`metricName` can be one of the following values:

- `rmse`: root mean squared error (the default)
- `mse`: mean squared error
- `r2`: R squared
- `mae`: mean absolute error
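To see exactly what each metric measures, here is a small NumPy sketch computing all four by hand on a tiny made-up example (not the tutorial's data):

```python
import numpy as np

y_true = np.array([1.0, 2.0, 3.0, 4.0])   # actual values (made-up example)
y_pred = np.array([1.1, 1.9, 3.2, 3.9])   # predicted values

residuals = y_true - y_pred

mse = np.mean(residuals ** 2)            # mean squared error
rmse = np.sqrt(mse)                      # root mean squared error
mae = np.mean(np.abs(residuals))         # mean absolute error

# R squared: 1 minus residual sum of squares over total sum of squares
r2 = 1 - np.sum(residuals ** 2) / np.sum((y_true - y_true.mean()) ** 2)

print(f'mse={mse:.4f} rmse={rmse:.4f} mae={mae:.4f} r2={r2:.4f}')
```

The same numbers come out of `RegressionEvaluator` when it is given these two columns, just computed distributedly over the Spark DataFrame.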

```python
# Create regression evaluator
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="dependent_variable", metricName="rmse")

# RMSE
rmse = regressionEvaluator.evaluate(predDF)
print(f"The RMSE for the linear regression model is {rmse:0.2f}")

# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(predDF)
print(f"The MSE for the linear regression model is {mse:0.2f}")

# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"The R2 for the linear regression model is {r2:0.2f}")

# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(predDF)
print(f"The MAE for the linear regression model is {mae:0.2f}")
```

```
The RMSE for the linear regression model is 0.30
The MSE for the linear regression model is 0.09
The R2 for the linear regression model is 1.00
The MAE for the linear regression model is 0.24
```

We can also create a scatter plot to visually check the relationship between the actual value and the predicted value. To learn how to use the built-in charts function in Databricks notebook, please check my previous tutorial on Databricks Dashboard For Big Data.
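Outside Databricks, the same actual-vs-predicted check can be done with matplotlib. This is a sketch using small made-up arrays in place of `predDF`; in the notebook you would first bring a sample of the Spark DataFrame down to pandas (e.g. with `toPandas()`) before plotting.

```python
import matplotlib
matplotlib.use("Agg")  # non-interactive backend so this runs anywhere
import matplotlib.pyplot as plt
import numpy as np

# Made-up stand-ins for the actual and predicted columns of predDF
rng = np.random.default_rng(0)
actual = rng.normal(0, 50, size=200)
predicted = actual + rng.normal(0, 0.3, size=200)  # small noise, like the model's RMSE

fig, ax = plt.subplots()
ax.scatter(actual, predicted, s=5, alpha=0.5)
# y = x reference line: a good model's points hug this diagonal
ax.plot([actual.min(), actual.max()], [actual.min(), actual.max()], color="red")
ax.set_xlabel("dependent_variable (actual)")
ax.set_ylabel("prediction")
ax.set_title("Actual vs. predicted")
fig.savefig("actual_vs_predicted.png")
```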

### Step 7: Save Model

In step 7, we will save the pipeline model to AWS S3 bucket. To learn how to mount S3 bucket to Databricks, please check my previous tutorial Databricks Mount To AWS S3 And Import Data.

```python
# Path to save the model
pipelinePath = '/mnt/demo4tutorial/model/linear_regression_pipeline_model'

# Save the model to the path
pipelineModel.write().overwrite().save(pipelinePath)
```

After saving the model to S3, we can confirm the model is in the bucket using the `%fs ls` command.

```
# Confirm the model is saved
%fs ls '/mnt/demo4tutorial/model/linear_regression_pipeline_model'
```

We can see that both the pipeline stages and the metadata are saved for the model.

### Step 8: Make Predictions For New Data

In step 8, we will go over how to make predictions for new data using the saved model.

We first create a new dataset with 1,000 records. The number of features, the bias, and the noise are the same as in the training dataset to ensure the new dataset follows the same distribution.

```python
# Create a new synthetic dataset
X_new, y_new = make_regression(n_samples=1000, n_features=2, bias=2, noise=0.3, random_state=0)

# Convert the data from numpy array to a pandas dataframe
pdf_new = pd.DataFrame({'feature1': X_new[:, 0], 'feature2': X_new[:, 1], 'dependent_variable': y_new})

# Convert pandas dataframe to spark dataframe
sdf_new = spark.createDataFrame(pdf_new)

# Check data summary statistics
display(sdf_new.summary())
```

Then the pipeline model is loaded from the S3 bucket, and the prediction is made on the new dataset.

```python
# Load the saved model
loadedPipelineModel = PipelineModel.load(pipelinePath)

# Make prediction for the new dataset
predDF_new = loadedPipelineModel.transform(sdf_new)

# Take a look at the data
display(predDF_new.select("features", "dependent_variable", "prediction"))
```

### Step 9: Put All Code Together

```python
###### Step 1: Import Libraries

# Data processing
import pandas as pd

# Create synthetic dataset
from sklearn.datasets import make_regression

# Modeling
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel

###### Step 2: Create Dataset For Linear Regression

# Create a synthetic dataset
X, y = make_regression(n_samples=1000000, n_features=2, noise=0.3, bias=2, random_state=42)

# Convert the data from numpy array to a pandas dataframe
pdf = pd.DataFrame({'feature1': X[:, 0], 'feature2': X[:, 1], 'dependent_variable': y})

# Convert pandas dataframe to spark dataframe
sdf = spark.createDataFrame(pdf)

# Check data summary statistics
display(sdf.summary())

###### Step 3: Train Test Split

# Train test split
trainDF, testDF = sdf.randomSplit([.8, .2], seed=42)

# Print the number of records
print(f'There are {trainDF.cache().count()} records in the training dataset.')
print(f'There are {testDF.cache().count()} records in the testing dataset.')

###### Step 4: Vector Assembler

# Linear regression expects a vector input
vecAssembler = VectorAssembler(inputCols=['feature1', 'feature2'], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)

# Take a look at the data
display(vecTrainDF)

###### Step 5: Fit Spark ML Linear Regression Model

# Create linear regression
lr = LinearRegression(featuresCol="features", labelCol="dependent_variable")

# Fit the linear regression model
lrModel = lr.fit(vecTrainDF)

# Print model intercept and coefficients
print(f'The intercept of the model is {lrModel.intercept:.2f} and the coefficients of the model are {lrModel.coefficients[0]:.2f} and {lrModel.coefficients[1]:.2f}')

# Create pipeline
stages = [vecAssembler, lr]
pipeline = Pipeline(stages=stages)

# Fit the pipeline model
pipelineModel = pipeline.fit(trainDF)

###### Step 6: Model Performance Evaluation

# Make predictions on testing dataset
predDF = pipelineModel.transform(testDF)

# Take a look at the output
display(predDF.select("features", "dependent_variable", "prediction"))

# Create regression evaluator
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="dependent_variable", metricName="rmse")

# RMSE
rmse = regressionEvaluator.evaluate(predDF)
print(f"The RMSE for the linear regression model is {rmse:0.2f}")

# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(predDF)
print(f"The MSE for the linear regression model is {mse:0.2f}")

# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"The R2 for the linear regression model is {r2:0.2f}")

# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(predDF)
print(f"The MAE for the linear regression model is {mae:0.2f}")

# Visualize the data
display(predDF.select("dependent_variable", "prediction"))

###### Step 7: Save Model

# Path to save the model
pipelinePath = '/mnt/demo4tutorial/model/linear_regression_pipeline_model'

# Save the model to the path
pipelineModel.write().overwrite().save(pipelinePath)

# Confirm the model is saved
%fs ls '/mnt/demo4tutorial/model/linear_regression_pipeline_model'

###### Step 8: Make Predictions For New Data

# Create a new synthetic dataset
X_new, y_new = make_regression(n_samples=1000, n_features=2, bias=2, noise=0.3, random_state=0)

# Convert the data from numpy array to a pandas dataframe
pdf_new = pd.DataFrame({'feature1': X_new[:, 0], 'feature2': X_new[:, 1], 'dependent_variable': y_new})

# Convert pandas dataframe to spark dataframe
sdf_new = spark.createDataFrame(pdf_new)

# Check data summary statistics
display(sdf_new.summary())

# Load the saved model
loadedPipelineModel = PipelineModel.load(pipelinePath)

# Make prediction for the new dataset
predDF_new = loadedPipelineModel.transform(sdf_new)

# Take a look at the data
display(predDF_new.select("features", "dependent_variable", "prediction"))

# Actual vs. predicted
display(predDF_new.select("dependent_variable", "prediction"))
```

### Summary

In this tutorial, we covered how to use Databricks to implement a Spark ML linear regression model. You have learned:

- What’s the difference between Spark MLlib and Spark ML?
- How to process the data into the right format?
- How to fit a Spark ML linear regression model?
- How to evaluate model performance?
- How to save the model?
- How to make predictions for new data?

For more information about data science and machine learning, please check out my YouTube channel and Medium page, or follow me on LinkedIn.

### Recommended For You

- GrabNGoInfo Machine Learning Tutorials Inventory
- Databricks Mount To AWS S3 And Import Data
- Databricks Notebook Markdown Cheat Sheet
- Five Ways To Create Tables In Databricks
- Databricks Dashboard For Big Data
