Introduction

  • Rossmann operates over 3,000 drug stores in 7 European countries. Currently, Rossmann store managers are tasked with predicting their daily sales for up to six weeks in advance. Store sales are influenced by many factors, including promotions, competition, school and state holidays, seasonality, and locality. With thousands of individual managers predicting sales based on their unique circumstances, the accuracy of results can be quite varied.
  • This dataset is taken from a Kaggle competition and is available here.
  • The dataset contains historical sales data for 1,115 Rossmann stores.

    • train.csv - historical data including Sales
    • store.csv - supplemental information about the stores
  • This tutorial builds a model to predict sales across stores, using the Spark framework to explore the data and build the predictive model. It uses the datasets created in the previous exploratory tutorial, where some variables were engineered and new ones created.

Loading dataset

In [2]:
sc
Out[2]:
<pyspark.context.SparkContext at 0x7fd208397278>
In [3]:
sqlContext
Out[3]:
<pyspark.sql.context.HiveContext at 0x7fd1ed449d30>
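  • Here, sc and sqlContext are pre-created by the notebook environment. Outside such an environment they could be constructed as follows (a minimal sketch for Spark 1.6; the app name is illustrative):

from pyspark import SparkContext
from pyspark.sql import HiveContext

# Assumptions for a standalone script; notebooks typically provide both objects.
sc = SparkContext( appName = "rossmann-sales" )
sqlContext = HiveContext( sc )
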
In [4]:
retail_df = sqlContext.read.parquet( "/user/hadoop/retail.parquet" )
In [5]:
retail_df.show( 5 )
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+
|Store|DayOfWeek|      Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|month|year|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+
|    1|        5|2015-07-31| 5263|      555|   1|    1|           0|            1|    7|2015|
|    2|        5|2015-07-31| 6064|      625|   1|    1|           0|            1|    7|2015|
|    3|        5|2015-07-31| 8314|      821|   1|    1|           0|            1|    7|2015|
|    4|        5|2015-07-31|13995|     1498|   1|    1|           0|            1|    7|2015|
|    5|        5|2015-07-31| 4822|      559|   1|    1|           0|            1|    7|2015|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+
only showing top 5 rows

In [6]:
retail_df.printSchema()
root
|-- Store: integer (nullable = true)
|-- DayOfWeek: integer (nullable = true)
|-- Date: date (nullable = true)
|-- Sales: integer (nullable = true)
|-- Customers: integer (nullable = true)
|-- Open: integer (nullable = true)
|-- Promo: integer (nullable = true)
|-- StateHoliday: string (nullable = true)
|-- SchoolHoliday: string (nullable = true)
|-- month: integer (nullable = true)
|-- year: integer (nullable = true)

In [7]:
stores_df = sqlContext.read.parquet( "/user/hadoop/stores_limited.parquet" )
In [8]:
stores_df.show( 5 )
+-----+---------+----------+-------------------+-----------+
|Store|StoreType|Assortment|CompetitionDistance|comp_months|
+-----+---------+----------+-------------------+-----------+
|    1|        c|         a|               1270|       87.0|
|    2|        a|         a|                570|       97.0|
|    3|        a|         a|              14130|      108.0|
|    4|        c|         c|                620|       75.0|
|    5|        a|         a|              29910|        8.0|
+-----+---------+----------+-------------------+-----------+
only showing top 5 rows

In [9]:
stores_df.printSchema()
root
|-- Store: string (nullable = true)
|-- StoreType: string (nullable = true)
|-- Assortment: string (nullable = true)
|-- CompetitionDistance: string (nullable = true)
|-- comp_months: float (nullable = true)

  • CompetitionDistance is a string type. It needs to be converted into a numerical variable.

Convert CompetitionDistance into float type

In [10]:
stores_df = stores_df.withColumn( "CompetitionDistance",
                   stores_df["CompetitionDistance"].cast( 'float' ) )
In [11]:
stores_df.printSchema()
root
|-- Store: string (nullable = true)
|-- StoreType: string (nullable = true)
|-- Assortment: string (nullable = true)
|-- CompetitionDistance: float (nullable = true)
|-- comp_months: float (nullable = true)

In [12]:
retail_df.columns
Out[12]:
['Store',
'DayOfWeek',
'Date',
'Sales',
'Customers',
'Open',
'Promo',
'StateHoliday',
'SchoolHoliday',
'month',
'year']

Selecting columns to predict store sales

  • To predict store sales we can use the following features; the Customers, Open and Date columns will not be used. Customers data will not be available when making predictions, Date may create overfitting problems, and Open will always be 1 on the days for which we make future predictions, since for Open = 0 we know the sales will be 0 (the store is closed), as the quick check below confirms.
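  • As a quick sanity check (a sketch, not part of the original notebook), we can confirm that no closed day carries positive sales:

# Rows where the store is closed but sales are positive; this should return 0.
retail_df.where( (retail_df.Open == 0) & (retail_df.Sales > 0) ).count()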
In [22]:
store_sales_df = retail_df.select('Store',
'DayOfWeek',
'Promo',
'StateHoliday',
'SchoolHoliday',
'month',
'year',
'Sales',
)
  • We can add more features about the stores from the stores dataframe: for example, store type, assortment type, competition distance, and how long the competition has been present.

Join store sales and stores dataframe

  • Adding store variables to the sales data.
In [23]:
store_sales_df = store_sales_df.join( stores_df, on = "Store", how = "inner" )
In [24]:
store_sales_df.cache()
Out[24]:
DataFrame[Store: int, DayOfWeek: int, Promo: int, StateHoliday: string, SchoolHoliday: string, month: int, year: int, Sales: int, StoreType: string, Assortment: string, CompetitionDistance: float, comp_months: float]
In [25]:
store_sales_df.show(5)
+-----+---------+-----+------------+-------------+-----+----+-----+---------+----------+-------------------+-----------+
|Store|DayOfWeek|Promo|StateHoliday|SchoolHoliday|month|year|Sales|StoreType|Assortment|CompetitionDistance|comp_months|
+-----+---------+-----+------------+-------------+-----+----+-----+---------+----------+-------------------+-----------+
|    1|        5|    1|           0|            1|    7|2015| 5263|        c|         a|             1270.0|       87.0|
|    2|        5|    1|           0|            1|    7|2015| 6064|        a|         a|              570.0|       97.0|
|    3|        5|    1|           0|            1|    7|2015| 8314|        a|         a|            14130.0|      108.0|
|    4|        5|    1|           0|            1|    7|2015|13995|        c|         c|              620.0|       75.0|
|    5|        5|    1|           0|            1|    7|2015| 4822|        a|         a|            29910.0|        8.0|
+-----+---------+-----+------------+-------------+-----+----+-----+---------+----------+-------------------+-----------+
only showing top 5 rows

In [26]:
store_sales_df.columns
Out[26]:
['Store',
'DayOfWeek',
'Promo',
'StateHoliday',
'SchoolHoliday',
'month',
'year',
'Sales',
'StoreType',
'Assortment',
'CompetitionDistance',
'comp_months']

Remove rows with zero sales

  • There are days with zero sales; these are the days when the stores are closed. We need not use these records for model building, so we remove all entries with zero sales.
In [29]:
store_sales_df = store_sales_df.where( store_sales_df.Sales > 0 )

Fill all null values with zero

In [31]:
# Note: fillna() returns a new DataFrame; the result is not assigned back here,
# so this call has no effect, and the remaining nulls are dropped in the next cell.
store_sales_df.fillna( 0 )
Out[31]:
DataFrame[Store: int, DayOfWeek: int, Promo: int, StateHoliday: string, SchoolHoliday: string, month: int, year: int, Sales: int, StoreType: string, Assortment: string, CompetitionDistance: double, comp_months: double]
In [32]:
store_sales_df = store_sales_df.na.drop( how = 'any' )

Which are categorical and which are continuous features?

In [36]:
cat_features = ['DayOfWeek',
              'Promo',
              'StateHoliday',
              'SchoolHoliday',
              'month',
              'year',
              'StoreType',
              'Assortment']
In [37]:
continuous_features = ['CompetitionDistance', 'comp_months']

Function to encode categorical features

In [38]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, PolynomialExpansion, VectorIndexer
In [39]:
def create_category_vars( dataset, field_name ):
  # String-index the categorical column, then one-hot encode the index
  # into a sparse vector column named <field_name>Vec.
  idx_col = field_name + "Index"
  col_vec = field_name + "Vec"

  string_indexer = StringIndexer( inputCol = field_name,
                                  outputCol = idx_col )

  indexer_model = string_indexer.fit( dataset )
  indexed_df = indexer_model.transform( dataset )

  encoder = OneHotEncoder( dropLast = True,
                           inputCol = idx_col,
                           outputCol = col_vec )

  return encoder.transform( indexed_df )
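  • To see what the function produces, here is a minimal sketch on toy data (not part of the original notebook). With dropLast=True, a column with k distinct levels becomes a (k-1)-dimensional sparse vector, and the level with the highest string index (the least frequent one, since StringIndexer orders labels by frequency) maps to the all-zeros vector:

# Hypothetical toy dataframe with three StoreType levels.
toy_df = sqlContext.createDataFrame( [ (0, "a"), (1, "b"), (2, "c"), (3, "a") ],
                                     [ "id", "StoreType" ] )
create_category_vars( toy_df, "StoreType" ) \
    .select( "StoreType", "StoreTypeIndex", "StoreTypeVec" ).show()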
In [40]:
for col in cat_features:
    store_sales_df = create_category_vars( store_sales_df, col )
store_sales_df.cache()
Out[40]:
DataFrame[Store: int, DayOfWeek: int, Promo: int, StateHoliday: string, SchoolHoliday: string, month: int, year: int, Sales: int, StoreType: string, Assortment: string, CompetitionDistance: float, comp_months: float, DayOfWeekIndex: double, DayOfWeekVec: vector, PromoIndex: double, PromoVec: vector, StateHolidayIndex: double, StateHolidayVec: vector, SchoolHolidayIndex: double, SchoolHolidayVec: vector, monthIndex: double, monthVec: vector, yearIndex: double, yearVec: vector, StoreTypeIndex: double, StoreTypeVec: vector, AssortmentIndex: double, AssortmentVec: vector]
In [41]:
store_sales_df.columns
Out[41]:
['Store',
'DayOfWeek',
'Promo',
'StateHoliday',
'SchoolHoliday',
'month',
'year',
'Sales',
'StoreType',
'Assortment',
'CompetitionDistance',
'comp_months',
'DayOfWeekIndex',
'DayOfWeekVec',
'PromoIndex',
'PromoVec',
'StateHolidayIndex',
'StateHolidayVec',
'SchoolHolidayIndex',
'SchoolHolidayVec',
'monthIndex',
'monthVec',
'yearIndex',
'yearVec',
'StoreTypeIndex',
'StoreTypeVec',
'AssortmentIndex',
'AssortmentVec']
In [42]:
store_sales_df.show(5)
+-----+---------+-----+------------+-------------+-----+----+-----+---------+----------+-------------------+-----------+--------------+-------------+----------+---------+-----------------+---------------+------------------+----------------+----------+--------------+---------+---------+--------------+-------------+---------------+-------------+
|Store|DayOfWeek|Promo|StateHoliday|SchoolHoliday|month|year|Sales|StoreType|Assortment|CompetitionDistance|comp_months|DayOfWeekIndex| DayOfWeekVec|PromoIndex| PromoVec|StateHolidayIndex|StateHolidayVec|SchoolHolidayIndex|SchoolHolidayVec|monthIndex|      monthVec|yearIndex|  yearVec|StoreTypeIndex| StoreTypeVec|AssortmentIndex|AssortmentVec|
+-----+---------+-----+------------+-------------+-----+----+-----+---------+----------+-------------------+-----------+--------------+-------------+----------+---------+-----------------+---------------+------------------+----------------+----------+--------------+---------+---------+--------------+-------------+---------------+-------------+
|    1|        5|    1|           0|            1|    7|2015| 5263|        c|         a|             1270.0|       87.0|           3.0|(6,[3],[1.0])|       1.0|(1,[],[])|              0.0|  (3,[0],[1.0])|               1.0|       (1,[],[])|       2.0|(11,[2],[1.0])|      2.0|(2,[],[])|           2.0|(3,[2],[1.0])|            0.0|(2,[0],[1.0])|
|    2|        5|    1|           0|            1|    7|2015| 6064|        a|         a|              570.0|       97.0|           3.0|(6,[3],[1.0])|       1.0|(1,[],[])|              0.0|  (3,[0],[1.0])|               1.0|       (1,[],[])|       2.0|(11,[2],[1.0])|      2.0|(2,[],[])|           0.0|(3,[0],[1.0])|            0.0|(2,[0],[1.0])|
|    3|        5|    1|           0|            1|    7|2015| 8314|        a|         a|            14130.0|      108.0|           3.0|(6,[3],[1.0])|       1.0|(1,[],[])|              0.0|  (3,[0],[1.0])|               1.0|       (1,[],[])|       2.0|(11,[2],[1.0])|      2.0|(2,[],[])|           0.0|(3,[0],[1.0])|            0.0|(2,[0],[1.0])|
|    4|        5|    1|           0|            1|    7|2015|13995|        c|         c|              620.0|       75.0|           3.0|(6,[3],[1.0])|       1.0|(1,[],[])|              0.0|  (3,[0],[1.0])|               1.0|       (1,[],[])|       2.0|(11,[2],[1.0])|      2.0|(2,[],[])|           2.0|(3,[2],[1.0])|            1.0|(2,[1],[1.0])|
|    5|        5|    1|           0|            1|    7|2015| 4822|        a|         a|            29910.0|        8.0|           3.0|(6,[3],[1.0])|       1.0|(1,[],[])|              0.0|  (3,[0],[1.0])|               1.0|       (1,[],[])|       2.0|(11,[2],[1.0])|      2.0|(2,[],[])|           0.0|(3,[0],[1.0])|            0.0|(2,[0],[1.0])|
+-----+---------+-----+------------+-------------+-----+----+-----+---------+----------+-------------------+-----------+--------------+-------------+----------+---------+-----------------+---------------+------------------+----------------+----------+--------------+---------+---------+--------------+-------------+---------------+-------------+
only showing top 5 rows

In [43]:
categorical_vecs = [ "".join( (cat, "Vec") ) for cat in cat_features ]
In [44]:
categorical_vecs
Out[44]:
['DayOfWeekVec',
'PromoVec',
'StateHolidayVec',
'SchoolHolidayVec',
'monthVec',
'yearVec',
'StoreTypeVec',
'AssortmentVec']

Including all features for model building

In [45]:
all_features = continuous_features + categorical_vecs
In [46]:
all_features
Out[46]:
['CompetitionDistance',
'comp_months',
'DayOfWeekVec',
'PromoVec',
'StateHolidayVec',
'SchoolHolidayVec',
'monthVec',
'yearVec',
'StoreTypeVec',
'AssortmentVec']

Creating the vector of all predictors

In [47]:
assembler = VectorAssembler( inputCols = all_features, outputCol = "features")
In [48]:
store_sales_df = assembler.transform( store_sales_df )

Setting the target variable - Sales

In [49]:
store_sales_df = store_sales_df.withColumn( "label", store_sales_df.Sales.cast( 'double' ) )
In [50]:
store_sales_df.select( "features", "label" ).show( 5 )
+--------------------+-------+
|            features|  label|
+--------------------+-------+
|(31,[0,1,5,9,15,2...| 5263.0|
|(31,[0,1,5,9,15,2...| 6064.0|
|(31,[0,1,5,9,15,2...| 8314.0|
|(31,[0,1,5,9,15,2...|13995.0|
|(31,[0,1,5,9,15,2...| 4822.0|
+--------------------+-------+
only showing top 5 rows

Split the dataset

In [51]:
seed = 42
In [52]:
train_df, test_df = store_sales_df.randomSplit( [0.7, 0.3], seed = seed )

Train Linear Regression Model

In [53]:
from pyspark.ml.regression import LinearRegression
In [54]:
linreg = LinearRegression(maxIter=500, regParam=0.0)
lm = linreg.fit( train_df )

Intercept and coefficients

In [55]:
lm.intercept
Out[55]:
12128.79074862793
In [56]:
lm.coefficients
Out[56]:
DenseVector([-0.0161, 0.3228, -204.559, -308.0678, -650.7845, -246.0352, 767.0461, -605.8875, -2295.5861, 293.697, 695.7533, 400.6263, -308.215, -2101.7849, -1804.6742, -1909.1852, -1659.867, -1696.8338, -2020.3086, -1578.9687, -2093.8889, -2004.1266, -2064.3072, -1488.5126, -337.2648, -160.5574, -5351.6756, -5652.3367, -5476.1958, 3268.9422, 4119.6292])
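  • To interpret the coefficients, each vector slot can be matched back to a feature name using the metadata that VectorAssembler attaches to the features column (a sketch; the "ml_attr" metadata layout is an internal Spark convention and may differ across versions):

# Collect (slot index, name) pairs from the assembled column's metadata.
attrs = train_df.schema[ "features" ].metadata[ "ml_attr" ][ "attrs" ]
slot_names = sorted( (a[ "idx" ], a[ "name" ])
                     for group in attrs.values() for a in group )
for (idx, name), coef in zip( slot_names, lm.coefficients ):
    print( idx, name, coef )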

Making predictions on test data

In [57]:
y_pred = lm.transform( test_df )
/home/hadoop/lab/software/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/regression.py:123: UserWarning: weights is deprecated. Use coefficients instead.
warnings.warn("weights is deprecated. Use coefficients instead.")
In [58]:
y_pred.select( 'features', 'label', 'prediction' ).show( 5 )
+--------------------+-------+-----------------+
|            features|  label|       prediction|
+--------------------+-------+-----------------+
|(31,[0,1,5,9,15,2...| 6064.0|8206.675620804475|
|(31,[0,1,5,9,15,2...| 8314.0|7992.187761767058|
|(31,[0,1,5,9,15,2...|13995.0|8924.937939253934|
|(31,[0,1,5,9,15,2...| 6544.0|8170.408412749184|
|(31,[0,1,5,9,15,2...| 8430.0| 8222.46026567327|
+--------------------+-------+-----------------+
only showing top 5 rows

Calculating RMSE and R-Squared

In [59]:
from pyspark.ml.evaluation import RegressionEvaluator
In [60]:
rmse_evaluator = RegressionEvaluator(labelCol="label",
                            predictionCol="prediction",
                            metricName="rmse" )
In [61]:
lm_rmse = rmse_evaluator.evaluate( y_pred )
lm_rmse
Out[61]:
2718.9541694123404
In [62]:
r2_evaluator = RegressionEvaluator(labelCol="label",
                            predictionCol="prediction",
                            metricName="r2" )
lm_r2 = r2_evaluator.evaluate( y_pred )
lm_r2
Out[62]:
0.2302915072031575

Note:

  • The accuracy of this model is very poor (R² ≈ 0.23 on the test set). This may be because we do not have the right variables, or enough of them, to predict sales well. We can do feature engineering to create more features; the mean-only baseline sketched below puts the RMSE in context.
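  • For context, here is a sketch (not part of the original notebook) of a trivial baseline that always predicts the mean sales of the training set; a useful model should beat its RMSE comfortably:

from pyspark.sql.functions import avg, lit

mean_sales = train_df.agg( avg( "label" ) ).first()[ 0 ]
baseline_pred = test_df.withColumn( "prediction", lit( mean_sales ) )
rmse_evaluator.evaluate( baseline_pred )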

Feature Engineering

  • Sales might be influenced by whether the next day is a holiday.
  • What were the average sales over the last 2 and 5 days?

Is the next day a state holiday?

In [66]:
retail_df.show(5)
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+
|Store|DayOfWeek|      Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|month|year|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+
|    1|        5|2015-07-31| 5263|      555|   1|    1|           0|            1|    7|2015|
|    2|        5|2015-07-31| 6064|      625|   1|    1|           0|            1|    7|2015|
|    3|        5|2015-07-31| 8314|      821|   1|    1|           0|            1|    7|2015|
|    4|        5|2015-07-31|13995|     1498|   1|    1|           0|            1|    7|2015|
|    5|        5|2015-07-31| 4822|      559|   1|    1|           0|            1|    7|2015|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+
only showing top 5 rows

In [67]:
from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window

w = Window().partitionBy("Store").orderBy(col("Date"))
retail_new_df = retail_df.select("*", lead("StateHoliday").over(w).alias("next_day_state_holiday"))
retail_new_df.cache()
retail_new_df.show(5)
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+----------------------+
|Store|DayOfWeek|      Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|month|year|next_day_state_holiday|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+----------------------+
|   31|        2|2013-01-01|    0|        0|   0|    0|           a|            1|    1|2013|                     0|
|   31|        3|2013-01-02| 5122|      567|   1|    0|           0|            1|    1|2013|                     0|
|   31|        4|2013-01-03| 5623|      578|   1|    0|           0|            1|    1|2013|                     0|
|   31|        5|2013-01-04| 6140|      633|   1|    0|           0|            1|    1|2013|                     0|
|   31|        6|2013-01-05| 5607|      624|   1|    0|           0|            0|    1|2013|                     0|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+----------------------+
only showing top 5 rows

Is the next day a school holiday?

In [68]:
w = Window().partitionBy("Store").orderBy(col("Date"))
retail_new_df = retail_new_df.select("*", lead("SchoolHoliday").over(w).alias("next_day_school_holiday"))
retail_new_df.cache()
retail_new_df.show(5)
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+----------------------+-----------------------+
|Store|DayOfWeek|      Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|month|year|next_day_state_holiday|next_day_school_holiday|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+----------------------+-----------------------+
|   31|        2|2013-01-01|    0|        0|   0|    0|           a|            1|    1|2013|                     0|                      1|
|   31|        3|2013-01-02| 5122|      567|   1|    0|           0|            1|    1|2013|                     0|                      1|
|   31|        4|2013-01-03| 5623|      578|   1|    0|           0|            1|    1|2013|                     0|                      1|
|   31|        5|2013-01-04| 6140|      633|   1|    0|           0|            1|    1|2013|                     0|                      0|
|   31|        6|2013-01-05| 5607|      624|   1|    0|           0|            0|    1|2013|                     0|                      0|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+----------------------+-----------------------+
only showing top 5 rows

Mean sales for the last 2 days

  • The lagged average is null for a store's first two dates (no prior days exist) and includes closed days with zero sales, as the output below shows.
In [69]:
w = Window().partitionBy("Store").orderBy("Date")

retail_new_df = retail_new_df.withColumn("avg_sales_last_2days",
                                       ( lag("Sales", 1).over(w) + lag("Sales", 2).over(w) ) / 2 )
In [70]:
retail_new_df.show(5)
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+----------------------+-----------------------+--------------------+
|Store|DayOfWeek|      Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|month|year|next_day_state_holiday|next_day_school_holiday|avg_sales_last_2days|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+----------------------+-----------------------+--------------------+
|   31|        2|2013-01-01|    0|        0|   0|    0|           a|            1|    1|2013|                     0|                      1|                null|
|   31|        3|2013-01-02| 5122|      567|   1|    0|           0|            1|    1|2013|                     0|                      1|                null|
|   31|        4|2013-01-03| 5623|      578|   1|    0|           0|            1|    1|2013|                     0|                      1|              2561.0|
|   31|        5|2013-01-04| 6140|      633|   1|    0|           0|            1|    1|2013|                     0|                      0|              5372.5|
|   31|        6|2013-01-05| 5607|      624|   1|    0|           0|            0|    1|2013|                     0|                      0|              5881.5|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+----------------------+-----------------------+--------------------+
only showing top 5 rows

Mean sales for the last 5 days

In [71]:
w = Window().partitionBy("Store").orderBy("Date")

retail_new_df = retail_new_df.withColumn("avg_sales_last_5days",
                                       ( lag("Sales", 1).over(w)
                                        + lag("Sales", 2).over(w)
                                        + lag("Sales", 3).over(w)
                                        + lag("Sales", 4).over(w)
                                        + lag("Sales", 5).over(w) ) / 5 )
In [72]:
retail_new_df.show( 10 )
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+----------------------+-----------------------+--------------------+--------------------+
|Store|DayOfWeek|      Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|month|year|next_day_state_holiday|next_day_school_holiday|avg_sales_last_2days|avg_sales_last_5days|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+----------------------+-----------------------+--------------------+--------------------+
|   31|        2|2013-01-01|    0|        0|   0|    0|           a|            1|    1|2013|                     0|                      1|                null|                null|
|   31|        3|2013-01-02| 5122|      567|   1|    0|           0|            1|    1|2013|                     0|                      1|                null|                null|
|   31|        4|2013-01-03| 5623|      578|   1|    0|           0|            1|    1|2013|                     0|                      1|              2561.0|                null|
|   31|        5|2013-01-04| 6140|      633|   1|    0|           0|            1|    1|2013|                     0|                      0|              5372.5|                null|
|   31|        6|2013-01-05| 5607|      624|   1|    0|           0|            0|    1|2013|                     0|                      0|              5881.5|                null|
|   31|        7|2013-01-06|    0|        0|   0|    0|           0|            0|    1|2013|                     0|                      0|              5873.5|              4498.4|
|   31|        1|2013-01-07| 7857|      743|   1|    1|           0|            0|    1|2013|                     0|                      0|              2803.5|              4498.4|
|   31|        2|2013-01-08| 6480|      612|   1|    1|           0|            0|    1|2013|                     0|                      0|              3928.5|              5045.4|
|   31|        3|2013-01-09| 6335|      571|   1|    1|           0|            0|    1|2013|                     0|                      0|              7168.5|              5216.8|
|   31|        4|2013-01-10| 6066|      619|   1|    1|           0|            0|    1|2013|                     0|                      0|              6407.5|              5255.8|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+----------------------+-----------------------+--------------------+--------------------+
only showing top 10 rows
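  • An equivalent, more concise formulation uses a sliding window frame (a sketch, not part of the original notebook). Note the edge behavior differs: avg() over a frame averages however many preceding rows exist instead of yielding null for the first days:

from pyspark.sql.functions import avg

# Frame covering the five rows preceding the current row, per store.
w5 = Window().partitionBy( "Store" ).orderBy( "Date" ).rowsBetween( -5, -1 )
retail_new_df.withColumn( "avg_sales_prev_5days",
                          avg( "Sales" ).over( w5 ) ).show( 5 )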

Average sales per store for each month

In [73]:
avg_sales_store_by_month = retail_new_df.groupBy( "Store", "month" ).avg( "Sales" )
In [74]:
avg_sales_store_by_month.show( 5 )
+-----+-----+-----------------+
|Store|month|       avg(Sales)|
+-----+-----+-----------------+
|   31|    1|4721.182795698925|
|   31|    2|4722.166666666667|
|   31|    3|4776.774193548387|
|   31|    4|4720.388888888889|
|   31|    5|4521.139784946236|
+-----+-----+-----------------+
only showing top 5 rows

In [75]:
retail_new_df = retail_new_df.join( avg_sales_store_by_month, on = ["Store", "month"], how = "inner" )
In [76]:
from pyspark.sql.functions import round
In [77]:
retail_new_df = retail_new_df.withColumn( "avg_sales_by_month",
                                       round( retail_new_df["avg(Sales)"], 2 ) )
In [93]:
store_sales_new_df = retail_new_df.select('Store',
                                                   'DayOfWeek',
                                                   'Promo',
                                                   'StateHoliday',
                                                   'SchoolHoliday',
                                                   'month',
                                                   'year',
                                                   'Sales',
                                                   'next_day_state_holiday',
                                                   'next_day_school_holiday',
                                                   'avg_sales_last_2days',
                                                   'avg_sales_last_5days',
                                                   'avg_sales_by_month'
                                                  )
In [94]:
store_sales_new_df = store_sales_new_df.join( stores_df, on = "Store", how = "inner" )
In [95]:
store_sales_new_df.cache()
store_sales_new_df.show(5)
+-----+---------+-----+------------+-------------+-----+----+-----+----------------------+-----------------------+--------------------+--------------------+------------------+---------+----------+-------------------+-----------+
|Store|DayOfWeek|Promo|StateHoliday|SchoolHoliday|month|year|Sales|next_day_state_holiday|next_day_school_holiday|avg_sales_last_2days|avg_sales_last_5days|avg_sales_by_month|StoreType|Assortment|CompetitionDistance|comp_months|
+-----+---------+-----+------------+-------------+-----+----+-----+----------------------+-----------------------+--------------------+--------------------+------------------+---------+----------+-------------------+-----------+
|   31|        2|    0|           a|            1|    1|2013|    0|                     0|                      1|                null|                null|           4721.18|        d|         c|             9800.0|       41.0|
|   31|        3|    0|           0|            1|    1|2013| 5122|                     0|                      1|                null|                null|           4721.18|        d|         c|             9800.0|       41.0|
|   31|        4|    0|           0|            1|    1|2013| 5623|                     0|                      1|              2561.0|                null|           4721.18|        d|         c|             9800.0|       41.0|
|   31|        5|    0|           0|            1|    1|2013| 6140|                     0|                      0|              5372.5|                null|           4721.18|        d|         c|             9800.0|       41.0|
|   31|        6|    0|           0|            0|    1|2013| 5607|                     0|                      0|              5881.5|                null|           4721.18|        d|         c|             9800.0|       41.0|
+-----+---------+-----+------------+-------------+-----+----+-----+----------------------+-----------------------+--------------------+--------------------+------------------+---------+----------+-------------------+-----------+
only showing top 5 rows

Considering the new features

In [96]:
cat_features_new = cat_features + ['next_day_state_holiday', 'next_day_school_holiday']
cat_features_new
Out[96]:
['DayOfWeek',
'Promo',
'StateHoliday',
'SchoolHoliday',
'month',
'year',
'StoreType',
'Assortment',
'next_day_state_holiday',
'next_day_school_holiday']
In [97]:
continuous_features_new = continuous_features + ['avg_sales_last_2days', 'avg_sales_last_5days', 'avg_sales_by_month']
continuous_features_new
Out[97]:
['CompetitionDistance',
'comp_months',
'avg_sales_last_2days',
'avg_sales_last_5days',
'avg_sales_by_month']

Encoding the categorical features

In [98]:
store_sales_new_df.show(5)
+-----+---------+-----+------------+-------------+-----+----+-----+----------------------+-----------------------+--------------------+--------------------+------------------+---------+----------+-------------------+-----------+
|Store|DayOfWeek|Promo|StateHoliday|SchoolHoliday|month|year|Sales|next_day_state_holiday|next_day_school_holiday|avg_sales_last_2days|avg_sales_last_5days|avg_sales_by_month|StoreType|Assortment|CompetitionDistance|comp_months|
+-----+---------+-----+------------+-------------+-----+----+-----+----------------------+-----------------------+--------------------+--------------------+------------------+---------+----------+-------------------+-----------+
|   31|        2|    0|           a|            1|    1|2013|    0|                     0|                      1|                null|                null|           4721.18|        d|         c|             9800.0|       41.0|
|   31|        3|    0|           0|            1|    1|2013| 5122|                     0|                      1|                null|                null|           4721.18|        d|         c|             9800.0|       41.0|
|   31|        4|    0|           0|            1|    1|2013| 5623|                     0|                      1|              2561.0|                null|           4721.18|        d|         c|             9800.0|       41.0|
|   31|        5|    0|           0|            1|    1|2013| 6140|                     0|                      0|              5372.5|                null|           4721.18|        d|         c|             9800.0|       41.0|
|   31|        6|    0|           0|            0|    1|2013| 5607|                     0|                      0|              5881.5|                null|           4721.18|        d|         c|             9800.0|       41.0|
+-----+---------+-----+------------+-------------+-----+----+-----+----------------------+-----------------------+--------------------+--------------------+------------------+---------+----------+-------------------+-----------+
only showing top 5 rows

Dropping all observations with null values

In [99]:
store_sales_new_df = store_sales_new_df.na.drop( how = 'any' )
In [100]:
for col in cat_features_new:
    store_sales_new_df = create_category_vars( store_sales_new_df, col )
store_sales_new_df.cache()
Out[100]:
DataFrame[Store: int, DayOfWeek: int, Promo: int, StateHoliday: string, SchoolHoliday: string, month: int, year: int, Sales: int, next_day_state_holiday: string, next_day_school_holiday: string, avg_sales_last_2days: double, avg_sales_last_5days: double, avg_sales_by_month: double, StoreType: string, Assortment: string, CompetitionDistance: float, comp_months: float, DayOfWeekIndex: double, DayOfWeekVec: vector, PromoIndex: double, PromoVec: vector, StateHolidayIndex: double, StateHolidayVec: vector, SchoolHolidayIndex: double, SchoolHolidayVec: vector, monthIndex: double, monthVec: vector, yearIndex: double, yearVec: vector, StoreTypeIndex: double, StoreTypeVec: vector, AssortmentIndex: double, AssortmentVec: vector, next_day_state_holidayIndex: double, next_day_state_holidayVec: vector, next_day_school_holidayIndex: double, next_day_school_holidayVec: vector]
In [101]:
categorical_vecs_new = [ "".join( (cat, "Vec") ) for cat in cat_features_new ]
In [102]:
categorical_vecs_new
Out[102]:
['DayOfWeekVec',
'PromoVec',
'StateHolidayVec',
'SchoolHolidayVec',
'monthVec',
'yearVec',
'StoreTypeVec',
'AssortmentVec',
'next_day_state_holidayVec',
'next_day_school_holidayVec']
In [103]:
all_features_new = continuous_features_new + categorical_vecs_new
all_features_new
Out[103]:
['CompetitionDistance',
'comp_months',
'avg_sales_last_2days',
'avg_sales_last_5days',
'avg_sales_by_month',
'DayOfWeekVec',
'PromoVec',
'StateHolidayVec',
'SchoolHolidayVec',
'monthVec',
'yearVec',
'StoreTypeVec',
'AssortmentVec',
'next_day_state_holidayVec',
'next_day_school_holidayVec']

Creating vector of all features and target variable

In [514]:
assembler = VectorAssembler( inputCols = all_features_new, outputCol = "features")
store_sales_new_df = assembler.transform( store_sales_new_df )
store_sales_new_df = store_sales_new_df.withColumn( "label", store_sales_new_df.Sales.cast( 'double' ) )
store_sales_new_df.select( "features", "label" ).show( 5 )
+--------------------+------+
|            features| label|
+--------------------+------+
|(38,[0,1,2,3,4,7,...|   0.0|
|(38,[0,1,2,3,4,6,...|7857.0|
|(38,[0,1,2,3,4,8,...|6480.0|
|(38,[0,1,2,3,4,9,...|6335.0|
|(38,[0,1,2,3,4,5,...|6066.0|
+--------------------+------+
only showing top 5 rows

Splitting dataset

In [515]:
seed = 42

train_df, test_df = store_sales_new_df.randomSplit( [0.7, 0.3], seed = seed )

Building the model

In [516]:
linreg = LinearRegression(maxIter=500, regParam=0.0)
lm = linreg.fit( train_df )

Model intercept and coefficients

In [517]:
lm.intercept
Out[517]:
-4942.362244628821
In [518]:
lm.coefficients
Out[518]:
DenseVector([-0.0001, 0.0244, 0.0674, -0.1336, 1.0653, -616.8848, 989.3013, -5765.3521, -104.0695, -686.0616, -225.6114, -2226.2976, 6152.6877, -113.3646, -1269.006, -212.0137, 391.7585, -76.7732, 41.3081, 84.8828, 23.7344, -404.927, -92.8137, 517.9586, -118.741, 69.375, 23.075, -310.9126, -167.1236, 41.258, 52.0254, 34.2045, -1.9185, -7.8461, 1588.279, 2319.0705, 2738.8773, -42.2212])

Predicting on test dataset

In [519]:
y_pred = lm.transform( test_df )
y_pred.select( 'features', 'label', 'prediction' ).show( 5 )
+--------------------+------+------------------+
|            features| label|        prediction|
+--------------------+------+------------------+
|(38,[0,1,2,3,4,6,...|7857.0| 7908.488666553771|
|(38,[0,1,2,3,4,8,...|6480.0| 6817.886851625741|
|(38,[0,1,2,3,4,9,...|6335.0| 6431.487755155953|
|(38,[0,1,2,3,4,11...|5286.0|4835.0059206956485|
|(38,[0,1,2,3,4,8,...|5728.0| 6871.911017768358|
+--------------------+------+------------------+
only showing top 5 rows

/home/hadoop/lab/software/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/regression.py:123: UserWarning: weights is deprecated. Use coefficients instead.
warnings.warn("weights is deprecated. Use coefficients instead.")

Calculating model accuracy: RMSE and R-Squared

In [520]:
r2_evaluator = RegressionEvaluator(labelCol="label",
                            predictionCol="prediction",
                            metricName="r2" )

lm_r2 = r2_evaluator.evaluate( y_pred )
lm_r2
Out[520]:
0.8213964184360055
In [521]:
rmse_evaluator = RegressionEvaluator(labelCol="label",
                            predictionCol="prediction",
                            metricName="rmse" )

lm_rmse = rmse_evaluator.evaluate( y_pred )
lm_rmse
Out[521]:
1631.2022849742

Note:

  • The new model has much better prediction accuracy (R² ≈ 0.82 versus 0.23 before; RMSE ≈ 1631 versus 2719). The new variables, particularly the lagged and monthly average sales features, appear to have substantially improved the model.