3 Ways for Multiple Time Series Forecasting Using Prophet in Python

Welcome to GrabNGoInfo! Multiple time series forecasting refers to training many time series models and making predictions with each of them. For example, if we would like to predict the sales quantity of 10 products in 5 stores, there will be 50 store-product combinations, and each combination is a time series. Using multiple time series forecasting, we can train and predict all 50 time series models in one run. Another example is predicting multiple stock prices at the same time.

In this tutorial, we will predict the stock prices of five tech companies using Prophet. Three ways of running multiple time series forecasting will be demonstrated. You will learn:

  • How to run multiple time series forecasting using a for-loop
  • How to set up multi-processing and utilize all the cores on a computer to run multiple time series models
  • How to set up PySpark to run multiple time series forecasting in parallel

If you are not familiar with Prophet, please check out my previous tutorial Time Series Forecasting Of Bitcoin Prices Using Prophet and Multivariate Time Series Forecasting with Seasonality and Holiday Effect Using Prophet in Python.

Resources for this post:

  • Python code is at the end of the post. Click here for the Colab notebook
  • More video tutorials on time series
  • More blog posts on time series

Let’s get started!

Step 1: Install and Import Libraries

In the first step, we will install and import libraries.

Three packages are installed:

  • yfinance is the Python package for pulling stock data from Yahoo Finance.
  • prophet is the package for the time series model.
  • pyspark is for setting up the Spark environment.
# Install libraries
!pip install yfinance prophet pyspark

After installing the three Python packages, we import the libraries needed for this tutorial.

  • pandas and numpy are for data processing.
  • yfinance is for pulling stock price data from Yahoo Finance.
  • prophet is for building the time series model.
  • seaborn and matplotlib are for visualization.
  • Pool and cpu_count are for multi-processing.
  • pyspark.sql.types, pandas_udf, and PandasUDFType are for Spark parallel processing.
  • tqdm is for generating a progress bar to show the completed percentage.
  • time is for tracking the time used for modeling and prediction.
# Data processing
import pandas as pd
import numpy as np

# Get time series data
import yfinance as yf

# Prophet model for time series forecast
from prophet import Prophet

# Visualization
import seaborn as sns
import matplotlib.pyplot as plt

# Multi-processing
from multiprocessing import Pool, cpu_count

# Spark
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Progress bar
from tqdm import tqdm

# Tracking time
from time import time

Step 2: Pull Data

The second step pulls stock data from the Yahoo Finance API. We will pull 2 years of daily data, from the beginning of 2020 to the end of 2021.

  • start_date = '2020-01-02' means the earliest date for the stock data is January 2nd of 2020. It did not start with January 1st because January 1st is a holiday, and there is no stock data on holidays and weekends.
  • end_date = '2022-01-01' means that the last date for the stock data is December 31st of 2021. yfinance excludes the end date, so we need to add one day to the last day of the data end date.
# Data start date
start_date = '2020-01-02'

# Data end date
end_date = '2022-01-01' # yfinance excludes the end date, so we need to add one day to the last day of data
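
As a small illustration of the exclusive end date, the sketch below derives the end value from the last day we actually want in the data. The helper name exclusive_end is made up for this example and is not part of yfinance; it only does date arithmetic with pandas.

```python
import pandas as pd


def exclusive_end(last_day: str) -> str:
    """Return the day after last_day as a 'YYYY-MM-DD' string.

    Hypothetical helper: useful when an API treats the end date as exclusive.
    """
    next_day = pd.Timestamp(last_day) + pd.Timedelta(days=1)
    return next_day.strftime('%Y-%m-%d')


# The last day of data we want is December 31st of 2021
end_date_example = exclusive_end('2021-12-31')
print(end_date_example)
```

The returned string can be passed as the end argument in place of a hand-written date.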

We will download the closing prices for five tickers. FB is for Facebook (Meta), GOOG is for Google, ORCL is for Oracle, MSFT is for Microsoft, and AMZN is for Amazon.

The goal of the time series model is to predict the closing price of all five stocks.

# Download data
ticker_list = ['FB', 'GOOG', 'ORCL', 'MSFT', 'AMZN']
data = yf.download(ticker_list, start=start_date, end=end_date)[['Close']]

# Drop the top level column name
data.columns = data.columns.droplevel()

# Take a look at the data
data.head()
Download stock data from yfinance – GrabNGoInfo.com

From the visualization of the stock prices, we can see that all five stock prices increased over the two years, and that Amazon and Google have the highest prices.

# Visualize data using seaborn
sns.set(rc={'figure.figsize':(12,8)})
sns.lineplot(x=data.index, y=data['FB'])
sns.lineplot(x=data.index, y=data['AMZN'])
sns.lineplot(x=data.index, y=data['GOOG'])
sns.lineplot(x=data.index, y=data['MSFT'])
sns.lineplot(x=data.index, y=data['ORCL'])
plt.legend(labels = ['Facebook', 'Amazon', 'Google', 'Microsoft', 'Oracle'])
Visualize stock prices downloaded from yfinance – GrabNGoInfo.com

There are 505 data points for each ticker, and there are no missing values.

# Data information
data.info()

Output

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 505 entries, 2020-01-02 to 2021-12-31
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   AMZN    505 non-null    float64
 1   FB      505 non-null    float64
 2   GOOG    505 non-null    float64
 3   MSFT    505 non-null    float64
 4   ORCL    505 non-null    float64
dtypes: float64(5)
memory usage: 23.7 KB

Step 3: Data Processing

Step 3 transforms the dataset into a multiple time series model dataset.

Firstly, the dataset is transformed from the wide format to the long format using the pandas .melt function.

Prophet requires at least two columns as inputs: a ds column and a y column.

  • The ds column has the time information. The column Date is renamed to ds.
  • The y column has the time series values. In this example, because we are predicting the closing stock price, y represents the stock close price.
  • There is no pre-defined name for the individual time series in Prophet, so we can keep the name ticker as is.
# Release Date from the index
data = data.reset_index()

# Change data from the wide format to the long format
df = pd.melt(data, id_vars='Date', value_vars=['AMZN', 'FB', 'GOOG', 'MSFT', 'ORCL'])
df.columns = ['ds', 'ticker', 'y']
df.head()
Multiple time series model dataset – GrabNGoInfo.com

After transforming the dataset from the wide format to the long format, we have 2,525 records.

# Check the dataset information
df.info()

Output

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2525 entries, 0 to 2524
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   ds      2525 non-null   datetime64[ns]
 1   ticker  2525 non-null   object        
 2   y       2525 non-null   float64       
dtypes: datetime64[ns](1), float64(1), object(1)
memory usage: 59.3+ KB
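
The wide-to-long reshaping in this step can be tried on a toy table first. The sketch below uses two made-up tickers (AAA and BBB) and two dates; the values are illustrative and are not taken from the downloaded data.

```python
import pandas as pd

# Toy wide-format data: one row per date, one column per (made-up) ticker
wide = pd.DataFrame({
    'Date': pd.to_datetime(['2020-01-02', '2020-01-03']),
    'AAA': [10.0, 11.0],
    'BBB': [20.0, 21.0],
})

# Long format: one row per date-ticker pair, using the column names Prophet expects
long_df = (
    wide.melt(id_vars='Date', var_name='ticker', value_name='y')
        .rename(columns={'Date': 'ds'})
)

print(long_df)
```

Two dates times two tickers gives four rows, in the same way that 505 dates times five tickers gives the 2,525 records above.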

Next, we group the pandas dataframe by the column ticker and save the result in a GroupBy object called groups_by_ticker. Using .groups.keys(), we can confirm that there are five groups, one group for each ticker.

# Group the data by ticker
groups_by_ticker = df.groupby('ticker')

# Check the groups in the dataframe
groups_by_ticker.groups.keys()

Output

dict_keys(['AMZN', 'FB', 'GOOG', 'MSFT', 'ORCL'])
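
To see what the grouped object holds, here is a minimal sketch on toy data. The tickers AAA and BBB and their values are made up for illustration.

```python
import pandas as pd

# Toy long-format data with two made-up tickers
toy = pd.DataFrame({
    'ticker': ['AAA', 'AAA', 'BBB'],
    'y': [1.0, 2.0, 3.0],
})

groups = toy.groupby('ticker')

# Number of rows in each group
group_sizes = groups.size().to_dict()

# Pull out the rows of a single group as a regular dataframe
bbb_rows = groups.get_group('BBB')

print(group_sizes)
print(bbb_rows)
```

Each call to get_group returns an ordinary dataframe, which is what gets passed to the forecasting function in the following steps.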

Step 4: Define Function

In step 4, the function for training and forecasting each group is defined.

  • The input data is an individual time series data for a group.
  • Prophet() initiates the time series model with the default hyperparameters, and we give the model the name m. I will create another tutorial for prophet time series model hyperparameter tuning. Please subscribe to the YouTube channel or Medium email so you will be notified when the video is published.
  • m.fit(group) fits the prophet model on the individual time series data, which is the stock price data for a ticker.
  • make_future_dataframe creates a new dataframe called future for the forecasting. periods=15 means that we will forecast for 15 days of data. To use a different frequency, we can specify the freq option. For example, periods=15, freq='MS' means that we are forecasting for the next 15 months.
  • After predicting on the future dataframe, Prophet produces a long list of outputs. We keep only ds, yhat, yhat_lower, and yhat_upper. yhat is the predicted value. yhat_lower and yhat_upper are the lower and upper bounds of the uncertainty interval.
  • A new column called ticker is created in the forecast dataframe to indicate the ticker name for the predictions.

The output of the function has 5 columns: ds, ticker, yhat, yhat_upper, and yhat_lower.

def train_and_forecast(group):

  # Initiate the model
  m = Prophet()
  
  # Fit the model
  m.fit(group)

  # Make predictions
  future = m.make_future_dataframe(periods=15)
  forecast = m.predict(future)[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]
  forecast['ticker'] = group['ticker'].iloc[0]
  
  # Return the forecasted results
  return forecast[['ds', 'ticker', 'yhat', 'yhat_upper', 'yhat_lower']]  
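
To make the periods and freq options more concrete, the sketch below approximates with plain pandas how the future dates are laid out after the last observed date. It is an illustration under that assumption, not Prophet's own code, and the helper name future_dates is made up. It covers only the new dates; by default make_future_dataframe also keeps the historical dates.

```python
import pandas as pd


def future_dates(last_date, periods, freq='D'):
    """Hypothetical helper: the next `periods` dates after last_date at `freq`."""
    candidates = pd.date_range(start=last_date, periods=periods + 1, freq=freq)
    # Keep only dates strictly after the last observed date
    return candidates[candidates > last_date][:periods]


last_observed = pd.Timestamp('2021-12-31')

# 15 daily steps: calendar days, so weekends are included
daily = future_dates(last_observed, periods=15, freq='D')

# 15 month-start steps: the first day of each of the next 15 months
monthly = future_dates(last_observed, periods=15, freq='MS')

print(daily[[0, -1]])
print(monthly[[0, -1]])
```

Note that a daily frequency counts calendar days, so the future dates include weekends even though the stock data has no weekend observations.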

Step 5: Multiple Time Series Forecast Using For-Loop

In step 5, we will run the multiple time series forecasts one after another using a for-loop.

  • The time used for the forecast is calculated by recording the time in the beginning and at the end, then getting the difference between the two.
  • An empty dataframe is created to save the forecast results.
  • For each ticker, we first get the time series data, then apply the training and forecasting function to that individual time series, and finally concatenate the forecast results together.
# Start time
start_time = time()

# Create an empty dataframe
for_loop_forecast = pd.DataFrame()

# Loop through each ticker
for ticker in ticker_list:
  # Get the data for the ticker
  group = groups_by_ticker.get_group(ticker)  
  # Make forecast
  forecast = train_and_forecast(group)
  # Add the forecast results to the dataframe
  for_loop_forecast = pd.concat((for_loop_forecast, forecast))

print('The time used for the for-loop forecast is ', time()-start_time)

# Take a look at the data
for_loop_forecast.head()

It took 28 seconds to run the five time series models using a for-loop, and the output has five columns.

The time used for the for-loop forecast is  27.879573822021484
Multiple Time Series Forecast Using For-Loop – GrabNGoInfo.com
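
A common variation of the loop is to collect the per-ticker results in a list and concatenate once at the end, which avoids rebuilding the dataframe on every iteration. The sketch below shows the pattern on toy data. The function toy_forecast is a made-up stand-in for train_and_forecast that returns the group mean, so the example runs without Prophet.

```python
import pandas as pd


def toy_forecast(group):
    """Hypothetical stand-in for train_and_forecast: one row with the group mean."""
    return pd.DataFrame({
        'ticker': [group['ticker'].iloc[0]],
        'yhat': [group['y'].mean()],
    })


# Toy long-format data with two made-up tickers
toy = pd.DataFrame({
    'ticker': ['AAA', 'AAA', 'BBB', 'BBB'],
    'y': [1.0, 3.0, 10.0, 20.0],
})

# Collect one result per group, then concatenate a single time
results = [toy_forecast(group) for _, group in toy.groupby('ticker')]
combined = pd.concat(results, ignore_index=True)

print(combined)
```

With five tickers the difference is negligible, but the single concatenation scales better when there are many series.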

Step 6: Multiple Time Series Forecast Using Multi-Processing

In step 6, we will use the Python multiprocessing package to run the time series forecasts in parallel.

  • Firstly, the time series data for each ticker is saved in a list.
  • Secondly, a pool of worker processes is created, with the number of workers equal to the number of CPUs. The Pool object from the multiprocessing Python package executes a function across input data in parallel. cpu_count() returns the number of CPUs in the system.
  • imap is a lazy variant of the pool's map method. It returns an iterator that yields the results in input order, so we wrap it in list() to collect them. We use imap to apply the train_and_forecast function to each element in the list called series, where each element is a dataframe for an individual ticker.
  • tqdm shows the progress bar of the training.
  • The prediction output is a list of forecast results, one dataframe for each ticker.
  • .close() stops the pool from accepting new tasks, and the worker processes exit once the submitted tasks are done. Failure to do this can lead to the process hanging on finalization.
  • .join() waits for the worker processes to exit. It must be called after .close().
  • Finally, the results for all the tickers are concatenated into one single dataframe.
# Start time
start_time = time()

# Get time series data for each ticker and save in a list
series = [groups_by_ticker.get_group(ticker) for ticker in ticker_list]

# Create a pool process with the number of worker processes being the number of CPUs
p = Pool(cpu_count())

# Make predictions for each ticker and save the results to a list
predictions = list(tqdm(p.imap(train_and_forecast, series), total=len(series)))

# Stop accepting new tasks; workers exit once the submitted tasks are done
p.close()

# Tell the pool to wait till all the jobs are finished before exit
p.join()

# Concatenate results
multiprocess_forecast = pd.concat(predictions)

# Get the time used for the forecast
print('\nThe time used for the multi-processing forecast is ', time()-start_time)

This process takes around 10 seconds.

  0%|          | 0/5 [00:00<?, ?it/s]INFO:prophet:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
 20%|██        | 1/5 [00:03<00:15,  3.80s/it]INFO:prophet:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
INFO:prophet:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this.
INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
100%|██████████| 5/5 [00:09<00:00,  1.91s/it]
The time used for the multi-processing forecast is  9.68030071258545
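
The pool can also be managed with a with block instead of explicit .close() and .join() calls. The sketch below is one possible arrangement, not the approach used in this tutorial. The function summarize is a made-up stand-in for train_and_forecast so the example runs without Prophet, and the worker count of 2 is arbitrary.

```python
from multiprocessing import Pool

import pandas as pd


def summarize(group):
    """Hypothetical stand-in for train_and_forecast: one row with the group mean."""
    return pd.DataFrame({
        'ticker': [group['ticker'].iloc[0]],
        'yhat': [group['y'].mean()],
    })


def run_parallel(groups, workers=2):
    """Apply summarize to each group in a pool of worker processes."""
    # map blocks until every result is back, so it is safe that leaving
    # the with block terminates the pool
    with Pool(processes=workers) as pool:
        results = pool.map(summarize, groups)
    return pd.concat(results, ignore_index=True)


if __name__ == '__main__':
    # Toy long-format data with two made-up tickers
    toy = pd.DataFrame({
        'ticker': ['AAA', 'AAA', 'BBB', 'BBB'],
        'y': [1.0, 3.0, 10.0, 20.0],
    })
    groups = [group for _, group in toy.groupby('ticker')]
    print(run_parallel(groups))
```

The worker function is defined at module level so that it can be sent to the worker processes, and the main guard keeps the pool from being created again when a worker imports the module.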

Step 7: Multiple Time Series Forecast Using Spark

In step 7, we will use Spark to forecast multiple time series in parallel. The workers in a Spark cluster can train and forecast a subset of models in parallel.

Firstly, a Spark session called spark is created. We can type the Spark session name to check the information such as the Spark version. Version 3.2.1 is used in this example.

# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
spark
Spark session information – GrabNGoInfo.com

Next, the pandas dataframe is converted to a Spark dataframe and grouped by ticker.

  • spark.createDataFrame takes a pandas dataframe and converts it into a Spark dataframe.
  • applyInPandas maps each group using a pandas UDF (User Defined Function) and returns a dataframe.
  • schema is a StructType describing the schema of the returned dataframe.
# Convert the pandas dataframe into a spark dataframe
sdf = spark.createDataFrame(df)

# Define the result schema
result_schema = StructType([
  StructField('ds',DateType()),
  StructField('ticker',StringType()),
  StructField('yhat',FloatType()),
  StructField('yhat_upper',FloatType()),
  StructField('yhat_lower',FloatType())
  ])

# Start time
start_time = time()

# Train and forecast by ticker 
spark_forecast = sdf.groupBy('ticker').applyInPandas(train_and_forecast, schema=result_schema)

# Take a look at the results
spark_forecast.show(5)

# Processing time
print('The time used for the Spark forecast is ', time()-start_time)

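Spark used about 12 seconds for the forecast. Because Spark evaluates lazily, the models are only trained when show(5) is called, which is why the timer is stopped after that call.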

+----------+------+---------+----------+----------+
|        ds|ticker|     yhat|yhat_upper|yhat_lower|
+----------+------+---------+----------+----------+
|2020-01-02|  AMZN|1874.2633| 2038.9186| 1731.7717|
|2020-01-03|  AMZN|1867.2312| 2000.8362| 1707.6724|
|2020-01-06|  AMZN|1868.9219| 2024.8553| 1721.1112|
|2020-01-07|  AMZN|1881.8456| 2028.5168| 1736.3002|
|2020-01-08|  AMZN|1886.3406| 2047.3295| 1743.9379|
+----------+------+---------+----------+----------+
only showing top 5 rows

The time used for the Spark forecast is  11.66998553276062

Another way of doing multiple time series forecasting is to use pandas_udf as a decorator and apply the decorated function to the grouped Spark dataframe. However, applyInPandas is preferred, because the grouped map pandas_udf API will be deprecated in future releases.

Nevertheless, I provide the code for using the pandas_udf decorator for your reference.

@pandas_udf( result_schema, PandasUDFType.GROUPED_MAP )
def train_and_forecast(group):

  # Initiate the model
  m = Prophet()
  
  # Fit the model
  m.fit(group)

  # Make predictions
  future = m.make_future_dataframe(periods=15)
  forecast = m.predict(future)[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]
  forecast['ticker'] = group['ticker'].iloc[0]
  
  # Return the forecasted results
  return forecast[['ds', 'ticker', 'yhat', 'yhat_upper', 'yhat_lower']]  

# Start time
start_time = time()

# Train and forecast by ticker 
spark_forecast = sdf.groupBy('ticker').apply(train_and_forecast)

# Take a look at the results
spark_forecast.show(5)

# Processing time
print('The time used for the Spark forecast is ', time()-start_time)

Output

/usr/local/lib/python3.7/dist-packages/pyspark/sql/pandas/group_ops.py:84: UserWarning: It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
  "more details.", UserWarning)
+----------+------+---------+----------+----------+
|        ds|ticker|     yhat|yhat_upper|yhat_lower|
+----------+------+---------+----------+----------+
|2020-01-02|  AMZN|1874.2633| 2038.9186| 1731.7717|
|2020-01-03|  AMZN|1867.2312| 2000.8362| 1707.6724|
|2020-01-06|  AMZN|1868.9219| 2024.8553| 1721.1112|
|2020-01-07|  AMZN|1881.8456| 2028.5168| 1736.3002|
|2020-01-08|  AMZN|1886.3406| 2047.3295| 1743.9379|
+----------+------+---------+----------+----------+
only showing top 5 rows

The time used for the Spark forecast is  11.952160358428955

Step 8: Which Method to Use?

Now that you know how to run multiple time series forecasts using a for-loop, multi-processing, and Spark, which method should you use for your project?

The general guideline is:

  • When the number of models is small, there is not a big difference in processing time, so any one of the three methods is good to use.
  • When the number of models is medium, use multi-processing or Spark to utilize multiple CPUs in parallel.
  • When the number of models is large, Spark is preferred.
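
To check which method is fastest on your own data, the start-and-stop timing used in the steps above can be wrapped in a small helper. The sketch below is one way to do it. The names timed and timings are made up for this example, and the summation is a placeholder for a forecasting run.

```python
from contextlib import contextmanager
from time import perf_counter

# Elapsed seconds for each labeled block
timings = {}


@contextmanager
def timed(label):
    """Hypothetical helper: record how long the enclosed block takes."""
    start = perf_counter()
    try:
        yield
    finally:
        timings[label] = perf_counter() - start


# Placeholder workload; replace with the for-loop, pool, or Spark forecast
with timed('toy work'):
    total = sum(range(100_000))

for label, seconds in timings.items():
    print(f'{label}: {seconds:.4f} seconds')
```

Wrapping each of the three approaches in its own timed block gives one dictionary of run times that can be compared directly.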

For more information about data science and machine learning, please check out my YouTube channel and Medium Page or follow me on LinkedIn.

Put All Code Together

#-------------------------------------------------------------#
# Step 1: Install and Import Libraries
#-------------------------------------------------------------#

# Install libraries
!pip install yfinance prophet pyspark

# Data processing
import pandas as pd
import numpy as np

# Get time series data
import yfinance as yf

# Prophet model for time series forecast
from prophet import Prophet

# Visualization
import seaborn as sns
import matplotlib.pyplot as plt

# Multi-processing
from multiprocessing import Pool, cpu_count

# Spark
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Progress bar
from tqdm import tqdm

# Tracking time
from time import time

#-------------------------------------------------------------#
# Step 2: Pull Data
#-------------------------------------------------------------#

# Data start date
start_date = '2020-01-02'

# Data end date
end_date = '2022-01-01' # yfinance excludes the end date, so we need to add one day to the last day of data

# Download data
ticker_list = ['FB', 'GOOG', 'ORCL', 'MSFT', 'AMZN']
data = yf.download(ticker_list, start=start_date, end=end_date)[['Close']]

# Drop the top level column name
data.columns = data.columns.droplevel()

# Take a look at the data
data.head()

# Visualize data using seaborn
sns.set(rc={'figure.figsize':(12,8)})
sns.lineplot(x=data.index, y=data['FB'])
sns.lineplot(x=data.index, y=data['AMZN'])
sns.lineplot(x=data.index, y=data['GOOG'])
sns.lineplot(x=data.index, y=data['MSFT'])
sns.lineplot(x=data.index, y=data['ORCL'])
plt.legend(labels = ['Facebook', 'Amazon', 'Google', 'Microsoft', 'Oracle'])

# Data information
data.info()

#-------------------------------------------------------------#
# Step 3: Data Processing
#-------------------------------------------------------------#

# Release Date from the index
data = data.reset_index()

# Change data from the wide format to the long format
df = pd.melt(data, id_vars='Date', value_vars=['AMZN', 'FB', 'GOOG', 'MSFT', 'ORCL'])
df.columns = ['ds', 'ticker', 'y']
df.head()

# Check the dataset information
df.info()

# Group the data by ticker
groups_by_ticker = df.groupby('ticker')

# Check the groups in the dataframe
groups_by_ticker.groups.keys()

#-------------------------------------------------------------#
# Step 4: Define Function
#-------------------------------------------------------------#

def train_and_forecast(group):

  # Initiate the model
  m = Prophet()
  
  # Fit the model
  m.fit(group)

  # Make predictions
  future = m.make_future_dataframe(periods=15)
  forecast = m.predict(future)[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]
  forecast['ticker'] = group['ticker'].iloc[0]
  
  # Return the forecasted results
  return forecast[['ds', 'ticker', 'yhat', 'yhat_upper', 'yhat_lower']]  

#-------------------------------------------------------------#
# Step 5: Multiple Time Series Forecast Using For-Loop
#-------------------------------------------------------------#

# Start time
start_time = time()

# Create an empty dataframe
for_loop_forecast = pd.DataFrame()

# Loop through each ticker
for ticker in ticker_list:
  # Get the data for the ticker
  group = groups_by_ticker.get_group(ticker)  
  # Make forecast
  forecast = train_and_forecast(group)
  # Add the forecast results to the dataframe
  for_loop_forecast = pd.concat((for_loop_forecast, forecast))

# Print processing time
print('The time used for the for-loop forecast is ', time()-start_time)

# Take a look at the data
for_loop_forecast.head()

#-------------------------------------------------------------#
# Step 6: Multiple Time Series Forecast Using Multi-Processing
#-------------------------------------------------------------#

# Start time
start_time = time()

# Get time series data for each ticker and save in a list
series = [groups_by_ticker.get_group(ticker) for ticker in ticker_list]

# Create a pool process with the number of worker processes being the number of CPUs
p = Pool(cpu_count())

# Make predictions for each ticker and save the results to a list
predictions = list(tqdm(p.imap(train_and_forecast, series), total=len(series)))

# Stop accepting new tasks; workers exit once the submitted tasks are done
p.close()

# Tell the pool to wait till all the jobs are finished before exit
p.join()

# Concatenate results
multiprocess_forecast = pd.concat(predictions)

# Get the time used for the forecast
print('\nThe time used for the multi-processing forecast is ', time()-start_time)

#-------------------------------------------------------------#
# Step 7: Multiple Time Series Forecast Using Spark
#-------------------------------------------------------------#

# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
spark

# Convert the pandas dataframe into a spark dataframe
sdf = spark.createDataFrame(df)

# Define the result schema
result_schema = StructType([
  StructField('ds',DateType()),
  StructField('ticker',StringType()),
  StructField('yhat',FloatType()),
  StructField('yhat_upper',FloatType()),
  StructField('yhat_lower',FloatType())
  ])

# Start time
start_time = time()

# Train and forecast by ticker 
spark_forecast = sdf.groupBy('ticker').applyInPandas(train_and_forecast, schema=result_schema)

# Take a look at the results
spark_forecast.show(5)

# Processing time
print('The time used for the Spark forecast is ', time()-start_time)


###### Using Pandas UDF - It is preferred to use applyInPandas over this API because it will be deprecated in the future releases
@pandas_udf( result_schema, PandasUDFType.GROUPED_MAP )
def train_and_forecast(group):

  # Initiate the model
  m = Prophet()
  
  # Fit the model
  m.fit(group)

  # Make predictions
  future = m.make_future_dataframe(periods=15)
  forecast = m.predict(future)[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]
  forecast['ticker'] = group['ticker'].iloc[0]
  
  # Return the forecasted results
  return forecast[['ds', 'ticker', 'yhat', 'yhat_upper', 'yhat_lower']]  

# Start time
start_time = time()

# Train and forecast by ticker 
spark_forecast = sdf.groupBy('ticker').apply(train_and_forecast)

# Take a look at the results
spark_forecast.show(5)

# Processing time
print('The time used for the Spark forecast is ', time()-start_time)
