Introduction

Rossmann operates over 3,000 drug stores in 7 European countries. Currently, Rossmann store managers are tasked with predicting their daily sales for up to six weeks in advance. Store sales are influenced by many factors, including promotions, competition, school and state holidays, seasonality, and locality. With thousands of individual managers predicting sales based on their unique circumstances, the accuracy of results can be quite varied.

This dataset is taken from a Kaggle competition and is available here

The dataset contains historical sales data for 1,115 Rossmann stores.

  • train.csv - historical data including Sales
  • store.csv - supplemental information about the stores

This tutorial explores the data to gain insights into sales and how various parameters influence sales across stores, using the Spark framework.

  • Run this tutorial on a Hadoop deployment, so that HiveContext is initialized and all the steps run correctly.
In [1]:
sc
Out[1]:
<pyspark.context.SparkContext at 0x7f1cf81ed278>
In [2]:
sqlContext
Out[2]:
<pyspark.sql.context.HiveContext at 0x7f1cddc97d30>

Loading the dataset

Load and cache the dataset

In [3]:
retail_df = sqlContext.read.format("com.databricks.spark.csv")            \
        .options(delimiter=',')                                         \
        .options( header=True)                                          \
        .load('file:///home/hadoop/lab/data/rossmann/train.csv')
In [4]:
retail_df.cache()
Out[4]:
DataFrame[Store: string, DayOfWeek: string, Date: string, Sales: string, Customers: string, Open: string, Promo: string, StateHoliday: string, SchoolHoliday: string]
In [5]:
retail_df.show( 5 )
+-----+---------+----------+-----+---------+----+-----+------------+-------------+
|Store|DayOfWeek|      Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+
|    1|        5|2015-07-31| 5263|      555|   1|    1|           0|            1|
|    2|        5|2015-07-31| 6064|      625|   1|    1|           0|            1|
|    3|        5|2015-07-31| 8314|      821|   1|    1|           0|            1|
|    4|        5|2015-07-31|13995|     1498|   1|    1|           0|            1|
|    5|        5|2015-07-31| 4822|      559|   1|    1|           0|            1|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+
only showing top 5 rows
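If a Spark cluster isn't handy, the same shape of data can be sanity-checked with plain pandas. This is only a sketch: the rows below are sample values copied from the output above, not the real `train.csv`.

```python
import io
import pandas as pd

# A few rows in the same layout as train.csv (values copied from the
# sample output above; not the real file)
csv_text = """Store,DayOfWeek,Date,Sales,Customers,Open,Promo,StateHoliday,SchoolHoliday
1,5,2015-07-31,5263,555,1,1,0,1
2,5,2015-07-31,6064,625,1,1,0,1
3,5,2015-07-31,8314,821,1,1,0,1
"""

# Note: spark-csv without a schema reads every column as string,
# whereas pandas infers numeric dtypes automatically
retail = pd.read_csv(io.StringIO(csv_text))
print(retail.head())
```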

How many records?

In [18]:
retail_df.count()
Out[18]:
1017209
In [19]:
retail_df.printSchema()
root
|-- Store: integer (nullable = true)
|-- DayOfWeek: integer (nullable = true)
|-- Date: date (nullable = true)
|-- Sales: integer (nullable = true)
|-- Customers: integer (nullable = true)
|-- Open: integer (nullable = true)
|-- Promo: integer (nullable = true)
|-- StateHoliday: string (nullable = true)
|-- SchoolHoliday: string (nullable = true)

From the Kaggle site, the column definitions are as follows:

  • Store - a unique Id for each store
  • Sales - the turnover for any given day (this is what you are predicting)
  • Customers - the number of customers on a given day
  • Open - an indicator for whether the store was open: 0 = closed, 1 = open
  • StateHoliday - indicates a state holiday. Normally all stores, with few exceptions, are closed on state holidays. Note that all schools are closed on public holidays and weekends. a = public holiday, b = Easter holiday, c = Christmas, 0 = None
  • SchoolHoliday - indicates if the (Store, Date) was affected by the closure of public schools
  • Promo - indicates whether a store is running a promo on that day

Read records using custom schema

The list of all data types supported by Spark DataFrames is available here

In [20]:
from pyspark.sql.types import *
fields = [StructField("Store", IntegerType(), True),
       StructField("DayOfWeek", IntegerType(), True),
       StructField("Date", DateType(), True),
       StructField("Sales", IntegerType(), True),
       StructField("Customers", IntegerType(), True),
       StructField("Open", IntegerType(), True),
       StructField("Promo", IntegerType(), True),
       StructField("StateHoliday", StringType(), True),
       StructField("SchoolHoliday", StringType(), True)]
In [21]:
retail_df = sqlContext.read.format("com.databricks.spark.csv")            \
        .options(delimiter=',')                                         \
        .options( header=True)                                          \
        .load('file:///home/hadoop/lab/data/rossmann/train.csv',
        schema = StructType(fields) )

retail_df.cache()
Out[21]:
DataFrame[Store: int, DayOfWeek: int, Date: date, Sales: int, Customers: int, Open: int, Promo: int, StateHoliday: string, SchoolHoliday: string]
In [22]:
retail_df.show( 5 )
+-----+---------+----------+-----+---------+----+-----+------------+-------------+
|Store|DayOfWeek|      Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+
|    1|        5|2015-07-31| 5263|      555|   1|    1|           0|            1|
|    2|        5|2015-07-31| 6064|      625|   1|    1|           0|            1|
|    3|        5|2015-07-31| 8314|      821|   1|    1|           0|            1|
|    4|        5|2015-07-31|13995|     1498|   1|    1|           0|            1|
|    5|        5|2015-07-31| 4822|      559|   1|    1|           0|            1|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+
only showing top 5 rows
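The pandas analogue of passing an explicit `StructType` is the `dtype` and `parse_dates` arguments to `read_csv`. A sketch using hypothetical sample rows:

```python
import io
import pandas as pd

csv_text = """Store,DayOfWeek,Date,Sales,Customers,Open,Promo,StateHoliday,SchoolHoliday
1,5,2015-07-31,5263,555,1,1,0,1
2,5,2015-07-31,6064,625,1,1,0,1
"""

# Explicit column types, analogous to passing StructType(fields) to spark-csv
dtypes = {"Store": "int32", "DayOfWeek": "int32", "Sales": "int32",
          "Customers": "int32", "Open": "int32", "Promo": "int32",
          "StateHoliday": str, "SchoolHoliday": str}
retail = pd.read_csv(io.StringIO(csv_text), dtype=dtypes, parse_dates=["Date"])
print(retail.dtypes)
```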

In [23]:
retail_df.columns
Out[23]:
['Store',
'DayOfWeek',
'Date',
'Sales',
'Customers',
'Open',
'Promo',
'StateHoliday',
'SchoolHoliday']

Selection and Filter

Selecting a few columns from a DataFrame

In [24]:
sales_custs_df = retail_df.select( "Store", "Sales", "Customers" )
In [25]:
sales_custs_df.show( 5 )
+-----+-----+---------+
|Store|Sales|Customers|
+-----+-----+---------+
|    1| 5263|      555|
|    2| 6064|      625|
|    3| 8314|      821|
|    4|13995|     1498|
|    5| 4822|      559|
+-----+-----+---------+
only showing top 5 rows

Filtering records based on a condition

In [26]:
retails_open_df = retail_df.where( retail_df.Open > 0  )
In [27]:
retails_open_df.show( 5 )
+-----+---------+----------+-----+---------+----+-----+------------+-------------+
|Store|DayOfWeek|      Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+
|    1|        5|2015-07-31| 5263|      555|   1|    1|           0|            1|
|    2|        5|2015-07-31| 6064|      625|   1|    1|           0|            1|
|    3|        5|2015-07-31| 8314|      821|   1|    1|           0|            1|
|    4|        5|2015-07-31|13995|     1498|   1|    1|           0|            1|
|    5|        5|2015-07-31| 4822|      559|   1|    1|           0|            1|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+
only showing top 5 rows
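The same `select`/`where` pattern maps onto column indexing and boolean masks in pandas. A sketch on made-up rows:

```python
import pandas as pd

# Hypothetical sample rows in the train.csv layout
retail = pd.DataFrame({
    "Store": [1, 2, 3], "Sales": [5263, 6064, 0],
    "Customers": [555, 625, 0], "Open": [1, 1, 0],
})

# select("Store", "Sales", "Customers") ~ column indexing
sales_custs = retail[["Store", "Sales", "Customers"]]

# where(retail_df.Open > 0) ~ boolean-mask filtering
retails_open = retail[retail["Open"] > 0]
```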

How many stores remained "Open" on days that were both school and state holidays?

In [28]:
holidays_df = retail_df.where( ( retail_df.StateHoliday == 1 ) & ( retail_df.SchoolHoliday == 1 ) )
In [29]:
holidays_df.where( holidays_df.Open == 0 ).show( 1 )
+-----+---------+----+-----+---------+----+-----+------------+-------------+
|Store|DayOfWeek|Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|
+-----+---------+----+-----+---------+----+-----+------------+-------------+
+-----+---------+----+-----+---------+----+-----+------------+-------------+

  • No store was open on any of those days? Before concluding that, note that StateHoliday is a string column with values 'a', 'b', 'c' and '0', so the comparison StateHoliday == 1 likely matches no rows at all; filtering with StateHoliday != '0' is safer.
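A small pandas illustration (with hypothetical sample values) of why this result can come out empty for the wrong reason: StateHoliday holds strings, and comparing strings to the integer 1 matches nothing.

```python
import pandas as pd

# Hypothetical sample: StateHoliday is stored as strings, as in train.csv
df = pd.DataFrame({"StateHoliday": ["0", "a", "b", "c"],
                   "SchoolHoliday": ["1", "1", "1", "1"]})

wrong = df[df["StateHoliday"] == 1]      # int vs. string: matches nothing
right = df[df["StateHoliday"] != "0"]    # the actual holiday rows
```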

How many stores?

In [30]:
store_ids = retail_df.select( retail_df.Store ).distinct()
In [31]:
store_ids.count()
Out[31]:
1115

There are 1,115 Rossmann stores in the dataset.

Which weekday has the maximum number of promos running?

In [32]:
weekday_promos = retail_df.stat.crosstab( "DayOfWeek" , "Promo" )
In [33]:
weekday_promos.show( 10 )
+---------------+------+-----+
|DayOfWeek_Promo|     0|    1|
+---------------+------+-----+
|              5| 68265|77580|
|              1| 66970|77760|
|              6|144730|    0|
|              2| 68084|77580|
|              7|144730|    0|
|              3| 68085|77580|
|              4| 68265|77580|
+---------------+------+-----+
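pandas offers an analogous `pd.crosstab`; a sketch on made-up rows, counting (DayOfWeek, Promo) combinations the same way:

```python
import pandas as pd

# Hypothetical sample rows
retail = pd.DataFrame({
    "DayOfWeek": [1, 1, 6, 6, 2],
    "Promo":     [1, 0, 0, 0, 1],
})

# pandas equivalent of DataFrame.stat.crosstab("DayOfWeek", "Promo")
weekday_promos = pd.crosstab(retail["DayOfWeek"], retail["Promo"])
print(weekday_promos)
```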

Rename columns

In [34]:
weekday_promos = weekday_promos.withColumnRenamed( "DayOfWeek_Promo", "DayOfWeek" )  \
                             .withColumnRenamed( "0", "NoPromo" )                  \
                             .withColumnRenamed( "1","Promo" )
In [35]:
weekday_promos.show(5)
+---------+-------+-----+
|DayOfWeek|NoPromo|Promo|
+---------+-------+-----+
|        5|  68265|77580|
|        1|  66970|77760|
|        6| 144730|    0|
|        2|  68084|77580|
|        7| 144730|    0|
+---------+-------+-----+
only showing top 5 rows

Sorting dataframe by column in ascending and descending order

In [36]:
weekday_promos.sort( "Promo" ).show()
+---------+-------+-----+
|DayOfWeek|NoPromo|Promo|
+---------+-------+-----+
|        7| 144730|    0|
|        6| 144730|    0|
|        2|  68084|77580|
|        3|  68085|77580|
|        4|  68265|77580|
|        5|  68265|77580|
|        1|  66970|77760|
+---------+-------+-----+

In [37]:
weekday_promos.sort( "Promo", ascending = False ).show()
+---------+-------+-----+
|DayOfWeek|NoPromo|Promo|
+---------+-------+-----+
|        1|  66970|77760|
|        5|  68265|77580|
|        2|  68084|77580|
|        4|  68265|77580|
|        3|  68085|77580|
|        6| 144730|    0|
|        7| 144730|    0|
+---------+-------+-----+

Note:

No promos running on weekends. This may be because sales are typically high on the weekends.

Assignment: Find sales revenue for each weekday.
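As a starting point for the assignment, the aggregation needed is a group-by-weekday sum of Sales. A pandas sketch with toy numbers:

```python
import pandas as pd

# Toy data standing in for train.csv
retail = pd.DataFrame({
    "DayOfWeek": [1, 1, 2, 6, 6],
    "Sales":     [100, 200, 150, 300, 50],
})

# Spark equivalent: retail_df.groupBy("DayOfWeek").sum("Sales")
revenue_by_weekday = retail.groupby("DayOfWeek")["Sales"].sum()
```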

Add a month column

In [38]:
from pyspark.sql.functions import month, year
In [39]:
retail_df = retail_df.withColumn( 'month', month( retail_df.Date ) )
In [40]:
retail_df = retail_df.withColumn( 'year', year( retail_df.Date ) )
In [41]:
retail_df.show( 5 )
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+
|Store|DayOfWeek|      Date|Sales|Customers|Open|Promo|StateHoliday|SchoolHoliday|month|year|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+
|    1|        5|2015-07-31| 5263|      555|   1|    1|           0|            1|    7|2015|
|    2|        5|2015-07-31| 6064|      625|   1|    1|           0|            1|    7|2015|
|    3|        5|2015-07-31| 8314|      821|   1|    1|           0|            1|    7|2015|
|    4|        5|2015-07-31|13995|     1498|   1|    1|           0|            1|    7|2015|
|    5|        5|2015-07-31| 4822|      559|   1|    1|           0|            1|    7|2015|
+-----+---------+----------+-----+---------+----+-----+------------+-------------+-----+----+
only showing top 5 rows
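The equivalent extraction in pandas uses the `.dt` accessor on a datetime column (toy dates below):

```python
import pandas as pd

retail = pd.DataFrame({"Date": pd.to_datetime(["2015-07-31", "2014-12-24"])})

# pandas analogue of pyspark.sql.functions.month / year
retail["month"] = retail["Date"].dt.month
retail["year"] = retail["Date"].dt.year
```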

Data Aggregations

Sales Contribution by stores

In [42]:
sales_by_stores = retail_df.groupBy( "Store" ).sum( "Sales" )
In [43]:
sales_by_stores.show( 5 )
+-----+----------+
|Store|sum(Sales)|
+-----+----------+
|   31|   4596143|
|  231|   2967114|
|  431|   7419918|
|  631|   4233841|
|  831|   9884733|
+-----+----------+
only showing top 5 rows

Multiple Aggregation

In [44]:
sales_by_stores_custs = retail_df.groupBy( "Store" ).agg( { "Sales" :"sum", "Customers": "sum" } )
In [45]:
sales_by_stores_custs.show( 5 )
+-----+----------+--------------+
|Store|sum(Sales)|sum(Customers)|
+-----+----------+--------------+
|   31|   4596143|        459464|
|  231|   2967114|        245380|
|  431|   7419918|        780912|
|  631|   4233841|        500538|
|  831|   9884733|       1228453|
+-----+----------+--------------+
only showing top 5 rows
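The same dictionary-of-aggregations API exists in pandas; a sketch with toy values:

```python
import pandas as pd

# Toy data: two stores, a few days each
retail = pd.DataFrame({
    "Store": [31, 31, 231],
    "Sales": [100, 200, 50],
    "Customers": [10, 20, 5],
})

# pandas analogue of groupBy("Store").agg({"Sales": "sum", "Customers": "sum"})
agg = retail.groupby("Store").agg({"Sales": "sum", "Customers": "sum"})
```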

Top 10 stores by sales contribution

In [46]:
sales_by_stores_custs.sort( "sum(Sales)", ascending = False ).show( 10 )
+-----+----------+--------------+
|Store|sum(Sales)|sum(Customers)|
+-----+----------+--------------+
|  262|  19516842|       3204694|
|  817|  17057867|       2454370|
|  562|  16927322|       2924960|
| 1114|  16202585|       2509542|
|  251|  14896870|       1908934|
|  513|  14252406|       1643527|
|  788|  14082141|       1346835|
|  733|  14067158|       3206058|
|  383|  13489879|       1720249|
|  756|  12911782|       1827980|
+-----+----------+--------------+
only showing top 10 rows

Average purchase value by customers in each store

In [47]:
from pyspark.sql.functions import col, round
In [48]:
sales_by_stores_custs = sales_by_stores_custs.withColumn(
  "avg_purchase_val",
  round( col( "sum(Sales)" ) / col( "sum(Customers)" ), 2) )
In [49]:
sales_by_stores_custs.show( 10 )
+-----+----------+--------------+----------------+
|Store|sum(Sales)|sum(Customers)|avg_purchase_val|
+-----+----------+--------------+----------------+
|   31|   4596143|        459464|            10.0|
|  231|   2967114|        245380|           12.09|
|  431|   7419918|        780912|             9.5|
|  631|   4233841|        500538|            8.46|
|  831|   9884733|       1228453|            8.05|
| 1031|   3773364|        325598|           11.59|
|   32|   2526117|        296333|            8.52|
|  232|   2583221|        232738|            11.1|
|  432|   8610571|       1218907|            7.06|
|  632|   5510559|        708433|            7.78|
+-----+----------+--------------+----------------+
only showing top 10 rows

In [50]:
avg_purchse_val = sales_by_stores_custs.sort( "avg_purchase_val", ascending = False )
In [51]:
import matplotlib.pyplot as plt
import seaborn as sn
%matplotlib inline

Collecting the data as a pandas DataFrame

In [52]:
top_10_avg_purchse_val = avg_purchse_val.toPandas()
In [53]:
top_10_avg_purchse_val.head( 10 )
Out[53]:
Store sum(Sales) sum(Customers) avg_purchase_val
0 842 11553523 714829 16.16
1 612 4246423 264263 16.07
2 455 5727571 359933 15.91
3 158 6354198 405068 15.69
4 52 4607742 313176 14.71
5 868 7480652 509296 14.69
6 1115 4922229 337884 14.57
7 56 5718290 395919 14.44
8 540 3513672 243396 14.44
9 903 7300878 508657 14.35

Visualizing top 10 stores by avg_purchase_val

In [54]:
sn.set(rc={"figure.figsize": (10, 6)});

ax = sn.barplot( x = 'Store',
         y = 'avg_purchase_val' ,
         data = top_10_avg_purchse_val[0:10],
         order = top_10_avg_purchse_val.Store[0:10] )

ax.set( xlabel='Store Number',
     ylabel='Avg. Purchase Value')
plt.show()

Loading Stores Data

In [55]:
stores_df = sqlContext.read.format("com.databricks.spark.csv")            \
        .options(delimiter=',')                                         \
        .options( header=True)                                          \
        .load('file:///home/hadoop/lab/data/rossmann/store.csv')
In [56]:
stores_df.show( 5 )
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|  PromoInterval|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+
|    1|        c|         a|               1270|                        9|                    2008|     0|               |               |               |
|    2|        a|         a|                570|                       11|                    2007|     1|             13|           2010|Jan,Apr,Jul,Oct|
|    3|        a|         a|              14130|                       12|                    2006|     1|             14|           2011|Jan,Apr,Jul,Oct|
|    4|        c|         c|                620|                        9|                    2009|     0|               |               |               |
|    5|        a|         a|              29910|                        4|                    2015|     0|               |               |               |
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+
only showing top 5 rows

Again, from the Kaggle site, the column definitions are as follows:

  • StoreType - differentiates between 4 different store models: a, b, c, d
  • Assortment - describes an assortment level: a = basic, b = extra, c = extended
  • CompetitionDistance - distance in meters to the nearest competitor store
  • CompetitionOpenSince[Month/Year] - gives the approximate year and month of the time the nearest competitor was opened
  • Promo - indicates whether a store is running a promo on that day
  • Promo2 - Promo2 is a continuing and consecutive promotion for some stores: 0 = store is not participating, 1 = store is participating
  • Promo2Since[Year/Week] - describes the year and calendar week when the store started participating in Promo2
  • PromoInterval - describes the consecutive intervals Promo2 is started, naming the months the promotion is started anew. E.g. "Feb,May,Aug,Nov" means each round starts in February, May, August, November of any given year for that store
In [57]:
stores_df.printSchema()
root
|-- Store: string (nullable = true)
|-- StoreType: string (nullable = true)
|-- Assortment: string (nullable = true)
|-- CompetitionDistance: string (nullable = true)
|-- CompetitionOpenSinceMonth: string (nullable = true)
|-- CompetitionOpenSinceYear: string (nullable = true)
|-- Promo2: string (nullable = true)
|-- Promo2SinceWeek: string (nullable = true)
|-- Promo2SinceYear: string (nullable = true)
|-- PromoInterval: string (nullable = true)

In [58]:
stores_df.columns
Out[58]:
['Store',
'StoreType',
'Assortment',
'CompetitionDistance',
'CompetitionOpenSinceMonth',
'CompetitionOpenSinceYear',
'Promo2',
'Promo2SinceWeek',
'Promo2SinceYear',
'PromoInterval']

How many different types of stores are there?

In [59]:
store_by_types = stores_df.groupBy( "StoreType" ).count()
In [60]:
store_by_types.show()
+---------+-----+
|StoreType|count|
+---------+-----+
|        a|  602|
|        b|   17|
|        c|  148|
|        d|  348|
+---------+-----+

Different store types in percentages

Total Number of stores

In [61]:
total = stores_df.count()
In [62]:
total
Out[62]:
1115
In [63]:
from pyspark.sql.functions import lit
In [64]:
store_by_types.withColumn( "percentage", round( col( "count" ) / lit( total ), 2) ).show()
+---------+-----+----------+
|StoreType|count|percentage|
+---------+-----+----------+
|        a|  602|      0.54|
|        b|   17|      0.02|
|        c|  148|      0.13|
|        d|  348|      0.31|
+---------+-----+----------+
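The percentage column is easy to cross-check with plain Python, using the StoreType counts printed above:

```python
# Cross-checking the percentage column: count / total, rounded to 2 places
counts = {"a": 602, "b": 17, "c": 148, "d": 348}
total = sum(counts.values())

pct = {k: round(v / total, 2) for k, v in counts.items()}
print(total, pct)
```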

How long has the competition existed for each store?

Write a function to calculate the total months passed since the competition opened.

In [65]:
stores_df = stores_df.withColumn(
  "CompetitionOpenSinceMonth",
  stores_df["CompetitionOpenSinceMonth"].cast( 'float' ) )

stores_df = stores_df.withColumn(
  "CompetitionOpenSinceYear",
  stores_df["CompetitionOpenSinceYear"].cast( 'float' ) )
In [66]:
stores_df = stores_df.fillna( 0.0 )
In [67]:
stores_df.where( col( "CompetitionOpenSinceYear" ).isNull() ).show()
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+-------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|PromoInterval|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+-------------+
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+-------------+

In [68]:
stores_df.where( col( "CompetitionOpenSinceMonth" ).isNull() ).show()
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+-------------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|PromoInterval|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+-------------+
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+-------------+

In [69]:
def diff_in_months( fromYear, fromMonth ):
    # Months between (fromYear, fromMonth) and December 2015, the end of
    # the training period; 0.0 marks a missing competition-opening date
    if (fromYear == 0.0) or (fromMonth == 0.0):
        return 0.0
    else:
        return ( 2015.0 - fromYear ) * 12.0 + ( 12.0 - fromMonth )
In [70]:
diff_in_months( 2012, 3 )
Out[70]:
45.0
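As a sanity check, here is a more general months-between helper (a sketch, not part of the tutorial's pipeline); `diff_in_months` above is this with the end point hard-coded to December 2015.

```python
# General months-between helper to cross-check diff_in_months, which
# hard-codes December 2015 as its reference point
def months_between(from_year, from_month, to_year, to_month):
    return (to_year - from_year) * 12 + (to_month - from_month)

# Matches diff_in_months(2012, 3) == 45.0 from the cell above
print(months_between(2012, 3, 2015, 12))  # -> 45
```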
In [71]:
from pyspark.sql.functions import udf, array
In [72]:
comp_months_udf = udf( diff_in_months, FloatType() )
In [73]:
stores_df.printSchema()
root
|-- Store: string (nullable = true)
|-- StoreType: string (nullable = true)
|-- Assortment: string (nullable = true)
|-- CompetitionDistance: string (nullable = true)
|-- CompetitionOpenSinceMonth: double (nullable = false)
|-- CompetitionOpenSinceYear: double (nullable = false)
|-- Promo2: string (nullable = true)
|-- Promo2SinceWeek: string (nullable = true)
|-- Promo2SinceYear: string (nullable = true)
|-- PromoInterval: string (nullable = true)

In [75]:
stores_df = stores_df.withColumn( "comp_months",
                   comp_months_udf( stores_df.CompetitionOpenSinceYear,
                                stores_df.CompetitionOpenSinceMonth ) )

stores_df.cache()
Out[75]:
DataFrame[Store: string, StoreType: string, Assortment: string, CompetitionDistance: string, CompetitionOpenSinceMonth: double, CompetitionOpenSinceYear: double, Promo2: string, Promo2SinceWeek: string, Promo2SinceYear: string, PromoInterval: string, comp_months: float]
In [76]:
stores_df.show(2)
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+-----------+
|Store|StoreType|Assortment|CompetitionDistance|CompetitionOpenSinceMonth|CompetitionOpenSinceYear|Promo2|Promo2SinceWeek|Promo2SinceYear|  PromoInterval|comp_months|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+-----------+
|    1|        c|         a|               1270|                      9.0|                  2008.0|     0|               |               |               |       87.0|
|    2|        a|         a|                570|                     11.0|                  2007.0|     1|             13|           2010|Jan,Apr,Jul,Oct|       97.0|
+-----+---------+----------+-------------------+-------------------------+------------------------+------+---------------+---------------+---------------+-----------+
only showing top 2 rows

Advanced OLAP functions

Creating multi-dimensional cubes and rollups for sales and customers on the year, month, and weekday dimensions

In [97]:
retail_df.printSchema()
root
|-- Store: integer (nullable = true)
|-- DayOfWeek: integer (nullable = true)
|-- Date: date (nullable = true)
|-- Sales: integer (nullable = true)
|-- Customers: integer (nullable = true)
|-- Open: integer (nullable = true)
|-- Promo: integer (nullable = true)
|-- StateHoliday: string (nullable = true)
|-- SchoolHoliday: string (nullable = true)
|-- month: integer (nullable = true)
|-- year: integer (nullable = true)

In [98]:
sales_cubes = retail_df.cube( "year", "month", "DayOfWeek" ).avg("Sales")
In [99]:
sales_cubes = sales_cubes.sort( "year", "month", "DayOfWeek",
                             ascending = [False, False, False] )
In [100]:
sales_cubes.cache()
Out[100]:
DataFrame[year: int, month: int, DayOfWeek: int, avg(Sales): double]
In [101]:
sales_cubes.show(65)
+----+-----+---------+------------------+
|year|month|DayOfWeek|        avg(Sales)|
+----+-----+---------+------------------+
|2015|    7|        7|253.49282511210762|
|2015|    7|        6| 5557.611210762332|
|2015|    7|        5| 7220.753542600897|
|2015|    7|        4|7039.3309417040355|
|2015|    7|        3| 7044.266188340807|
|2015|    7|        2| 6996.633183856502|
|2015|    7|        1| 8167.792152466368|
|2015|    7|     null| 6142.705511355417|
|2015|    6|        7| 234.2035874439462|
|2015|    6|        6|5811.1556053811655|
|2015|    6|        5| 7045.798878923767|
|2015|    6|        4| 5491.128699551569|
|2015|    6|        3| 6980.366367713004|
|2015|    6|        2| 7929.010582959641|
|2015|    6|        1| 8816.090762331838|
|2015|    6|     null| 6199.203976083707|
|2015|    5|        7| 232.5886995515695|
|2015|    5|        6| 6590.219910313901|
|2015|    5|        5|6143.7431390134525|
|2015|    5|        4| 5390.822197309417|
|2015|    5|        3| 7177.902242152466|
|2015|    5|        2| 7212.469058295964|
|2015|    5|        1| 6419.562331838565|
|2015|    5|     null|  5472.12200202517|
|2015|    4|        7|209.32780269058296|
|2015|    4|        6| 6431.994170403587|
|2015|    4|        5|4936.6813901345295|
|2015|    4|        4| 7863.491121076233|
|2015|    4|        3|7443.0651121076235|
|2015|    4|        2|7265.6174887892375|
|2015|    4|        1| 6399.615695067265|
|2015|    4|     null| 5916.857578475337|
|2015|    3|        7|203.92502242152466|
|2015|    3|        6| 5923.364125560538|
|2015|    3|        5| 6909.351793721973|
|2015|    3|        4| 6519.332735426009|
|2015|    3|        3| 6448.600448430493|
|2015|    3|        2| 7477.686098654708|
|2015|    3|        1| 8562.476412556054|
|2015|    3|     null| 5949.130131636048|
|2015|    2|        7|190.68183856502242|
|2015|    2|        6| 6055.761659192825|
|2015|    2|        5| 7068.506726457399|
|2015|    2|        4| 6418.807847533632|
|2015|    2|        3| 6416.215919282511|
|2015|    2|        2| 6630.320403587444|
|2015|    2|        1| 7191.781390134529|
|2015|    2|     null| 5710.296540679052|
|2015|    1|        7| 169.3645739910314|
|2015|    1|        6| 5761.383139013453|
|2015|    1|        5|7128.9840358744395|
|2015|    1|        4| 5293.223497757847|
|2015|    1|        3|6797.2262331838565|
|2015|    1|        2|  6606.85067264574|
|2015|    1|        1| 8280.866143497758|
|2015|    1|     null| 5752.747866338782|
|2015| null|        7|213.69503736920777|
|2015| null|        6| 6029.252077727952|
|2015| null|        5| 6655.121249819182|
|2015| null|        4| 6330.986894257197|
|2015| null|        3|6923.9300448430495|
|2015| null|        2| 7196.034887892377|
|2015| null|        1| 7757.710224215247|
|2015| null|     null| 5878.245380319824|
|2014|   12|        7|238.62513368983957|
+----+-----+---------+------------------+
only showing top 65 rows

In [102]:
custs_rollups = retail_df.rollup( "year", "month", "DayOfWeek" ).sum("Customers")
In [103]:
custs_rollups = custs_rollups.sort( "year", "month", "DayOfWeek",
                             ascending = [False, False, False] )
In [104]:
custs_rollups.cache()
Out[104]:
DataFrame[year: int, month: int, DayOfWeek: int, sum(Customers): bigint]
In [105]:
custs_rollups.show(60)
+----+-----+---------+--------------+
|year|month|DayOfWeek|sum(Customers)|
+----+-----+---------+--------------+
|2015|    7|        7|        188144|
|2015|    7|        6|       2699949|
|2015|    7|        5|       4229917|
|2015|    7|        4|       4135350|
|2015|    7|        3|       4084639|
|2015|    7|        2|       3270527|
|2015|    7|        1|       3645362|
|2015|    7|     null|      22253888|
|2015|    6|        7|        177292|
|2015|    6|        6|       2817690|
|2015|    6|        5|       3381020|
|2015|    6|        4|       2737228|
|2015|    6|        3|       3325603|
|2015|    6|        2|       4438681|
|2015|    6|        1|       4767615|
|2015|    6|     null|      21645129|
|2015|    5|        7|        217516|
|2015|    5|        6|       3921955|
|2015|    5|        5|       3663158|
|2015|    5|        4|       2576555|
|2015|    5|        3|       3432759|
|2015|    5|        2|       3441435|
|2015|    5|        1|       2940470|
|2015|    5|     null|      20193848|
|2015|    4|        7|        161366|
|2015|    4|        6|       3120909|
|2015|    4|        5|       2473489|
|2015|    4|        4|       4618265|
|2015|    4|        3|       4321340|
|2015|    4|        2|       3477838|
|2015|    4|        1|       2880784|
|2015|    4|     null|      21053991|
|2015|    3|        7|        193657|
|2015|    3|        6|       2874050|
|2015|    3|        5|       3343678|
|2015|    3|        4|       3215772|
|2015|    3|        3|       3161923|
|2015|    3|        2|       4276840|
|2015|    3|        1|       4743721|
|2015|    3|     null|      21809641|
|2015|    2|        7|        146914|
|2015|    2|        6|       2954720|
|2015|    2|        5|       3405752|
|2015|    2|        4|       3185048|
|2015|    2|        3|       3169238|
|2015|    2|        2|       3235330|
|2015|    2|        1|       3384379|
|2015|    2|     null|      19481381|
|2015|    1|        7|        131477|
|2015|    1|        6|       3489789|
|2015|    1|        5|       4222693|
|2015|    1|        4|       3163920|
|2015|    1|        3|       3211087|
|2015|    1|        2|       3148225|
|2015|    1|        1|       3743290|
|2015|    1|     null|      21110481|
|2015| null|     null|     147548359|
|2014|   12|        7|        146729|
|2014|   12|        6|       2926291|
|2014|   12|        5|       2639423|
+----+-----+---------+--------------+
only showing top 60 rows
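Conceptually, `rollup(year, month, DayOfWeek)` aggregates at every prefix of the key list, plus a grand total (the all-null row in the Spark output). A pandas sketch of that idea on a hypothetical mini-dataset:

```python
import pandas as pd

# Hypothetical mini-dataset with the rollup's three grouping columns
retail = pd.DataFrame({
    "year":      [2015, 2015, 2015, 2014],
    "month":     [7, 7, 6, 12],
    "DayOfWeek": [5, 6, 5, 7],
    "Customers": [100, 50, 80, 30],
})

# One grouped sum per prefix of the key list, mirroring rollup's levels
levels = [["year", "month", "DayOfWeek"], ["year", "month"], ["year"]]
parts = [retail.groupby(keys, as_index=False)["Customers"].sum()
         for keys in levels]

# The all-null row of the Spark rollup is simply the grand total
grand_total = retail["Customers"].sum()
```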

Joining two dataframes

In [106]:
stores_limited_df = stores_df.select( 'Store',
                                    'StoreType',
                                    'Assortment',
                                    'CompetitionDistance',
                                    'comp_months')

stores_limited_df.show(10)
+-----+---------+----------+-------------------+-----------+
|Store|StoreType|Assortment|CompetitionDistance|comp_months|
+-----+---------+----------+-------------------+-----------+
|    1|        c|         a|               1270|       87.0|
|    2|        a|         a|                570|       97.0|
|    3|        a|         a|              14130|      108.0|
|    4|        c|         c|                620|       75.0|
|    5|        a|         a|              29910|        8.0|
|    6|        a|         a|                310|       24.0|
|    7|        a|         c|              24000|       32.0|
|    8|        a|         a|               7520|       14.0|
|    9|        a|         c|               2030|      184.0|
|   10|        a|         a|               3160|       75.0|
+-----+---------+----------+-------------------+-----------+
only showing top 10 rows

In [75]:
sales_by_stores_custs.show(10)
+-----+----------+--------------+----------------+
|Store|sum(Sales)|sum(Customers)|avg_purchase_val|
+-----+----------+--------------+----------------+
|   31|   4596143|        459464|            10.0|
|  231|   2967114|        245380|           12.09|
|  431|   7419918|        780912|             9.5|
|  631|   4233841|        500538|            8.46|
|  831|   9884733|       1228453|            8.05|
| 1031|   3773364|        325598|           11.59|
|   32|   2526117|        296333|            8.52|
|  232|   2583221|        232738|            11.1|
|  432|   8610571|       1218907|            7.06|
|  632|   5510559|        708433|            7.78|
+-----+----------+--------------+----------------+
only showing top 10 rows

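The avg_purchase_val column above appears to be total sales divided by total customers, rounded to two decimals. A plain-Python sketch of that derivation, using two per-store totals copied from the output (the dictionary keys are assumptions, not the DataFrame's column names):

```python
# Hypothetical per-store totals, taken from the sales_by_stores_custs output.
rows = [
    {"Store": 31,  "sum_sales": 4596143, "sum_customers": 459464},
    {"Store": 231, "sum_sales": 2967114, "sum_customers": 245380},
]

# avg_purchase_val = total revenue / total footfall, rounded to 2 decimals.
for r in rows:
    r["avg_purchase_val"] = round(r["sum_sales"] / r["sum_customers"], 2)

print([r["avg_purchase_val"] for r in rows])  # [10.0, 12.09], matching the table
```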
In [76]:
all_stores_df = stores_limited_df.join( sales_by_stores_custs,
                                     on = ["Store"], how = 'inner' )
In [78]:
all_stores_df.show( 10 )
+-----+---------+----------+-------------------+-----------+----------+--------------+----------------+
|Store|StoreType|Assortment|CompetitionDistance|comp_months|sum(Sales)|sum(Customers)|avg_purchase_val|
+-----+---------+----------+-------------------+-----------+----------+--------------+----------------+
|   31|        d|         c|               9800|       null|   4596143|        459464|            10.0|
|  231|        d|         c|               3840|       null|   2967114|        245380|           12.09|
|  431|        d|         c|               4520|          0|   7419918|        780912|             9.5|
|  631|        d|         c|               2870|          0|   4233841|        500538|            8.46|
|  831|        a|         a|                800|       null|   9884733|       1228453|            8.05|
| 1031|        d|         a|                590|       null|   3773364|        325598|           11.59|
|   32|        a|         a|               2910|          0|   2526117|        296333|            8.52|
|  232|        c|         c|              13570|       null|   2583221|        232738|            11.1|
|  432|        a|         a|                810|       null|   8610571|       1218907|            7.06|
|  632|        a|         a|               3350|       null|   5510559|        708433|            7.78|
+-----+---------+----------+-------------------+-----------+----------+--------------+----------------+
only showing top 10 rows
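An inner join keeps only the keys present on both sides and merges the matching rows' columns. A minimal sketch of that semantics in plain Python (the toy dictionaries are made up, not real store data):

```python
# Hypothetical rows keyed by Store, one dict per side of the join.
stores = {1: {"StoreType": "c"}, 2: {"StoreType": "a"}}   # like stores_limited_df
sales  = {1: {"sum_sales": 100}, 3: {"sum_sales": 50}}    # like sales_by_stores_custs

# Inner join on Store: intersect the key sets, then merge the column dicts.
joined = {k: {**stores[k], **sales[k]} for k in stores.keys() & sales.keys()}
print(joined)  # only Store 1 appears on both sides
```

With how='inner', stores missing from either side are silently dropped; a 'left' join would instead keep every store and fill the missing side with nulls.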

Statistical Functions

In [79]:
all_stores_df.printSchema()
root
|-- Store: string (nullable = true)
|-- StoreType: string (nullable = true)
|-- Assortment: string (nullable = true)
|-- CompetitionDistance: string (nullable = true)
|-- comp_months: integer (nullable = true)
|-- sum(Sales): long (nullable = true)
|-- sum(Customers): long (nullable = true)
|-- avg_purchase_val: double (nullable = true)

In [80]:
all_stores_stats = all_stores_df.describe()
In [81]:
all_stores_stats.show()
+-------+-----------+------------------+------------------+------------------+
|summary|comp_months|        sum(Sales)|    sum(Customers)|  avg_purchase_val|
+-------+-----------+------------------+------------------+------------------+
|  count|        354|              1115|              1115|              1115|
|   mean|        0.0| 5267426.567713005| 577615.9237668162|  9.64365919282511|
| stddev|        0.0|1951304.4839653843|304654.53314051765|1.9869552301495719|
|    min|          0|           2114322|            187583|              3.51|
|    max|          0|          19516842|           3206058|             16.16|
+-------+-----------+------------------+------------------+------------------+

Convert the data types of the columns

In [82]:
all_stores_df = all_stores_df.withColumn(
  "CompetitionDistance",
  all_stores_df["CompetitionDistance"].cast( 'int' ) )
In [83]:
all_stores_df.printSchema()
root
|-- Store: string (nullable = true)
|-- StoreType: string (nullable = true)
|-- Assortment: string (nullable = true)
|-- CompetitionDistance: integer (nullable = true)
|-- comp_months: integer (nullable = true)
|-- sum(Sales): long (nullable = true)
|-- sum(Customers): long (nullable = true)
|-- avg_purchase_val: double (nullable = true)

In [84]:
all_stores_stats = all_stores_df.describe()
all_stores_stats.show()
+-------+-------------------+-----------+-----------------+-----------------+-----------------+
|summary|CompetitionDistance|comp_months|       sum(Sales)|   sum(Customers)| avg_purchase_val|
+-------+-------------------+-----------+-----------------+-----------------+-----------------+
|  count|               1112|        354|             1115|             1115|             1115|
|   mean|  5404.901079136691|        0.0|5267426.567713005|577615.9237668162|9.643659192825108|
| stddev|  7663.174720367947|        0.0|1951304.483965384|304654.5331405178|1.986955230149572|
|    min|                 20|          0|          2114322|           187583|             3.51|
|    max|              75860|          0|         19516842|          3206058|            16.16|
+-------+-------------------+-----------+-----------------+-----------------+-----------------+

Note:

  • Some data points are missing in the CompetitionDistance column: only 1112 of the 1115 stores have a value.

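describe() computes its statistics only over non-null values, which is why the count for CompetitionDistance drops to 1112 while the other columns show 1115. A sketch of that behavior with Python's statistics module (the column values are invented for illustration):

```python
import statistics

# Hypothetical column with one missing value (None). After the cast to int,
# unparsable strings become null, and describe() skips nulls entirely.
competition_distance = [1270, 570, None, 14130]

present = [v for v in competition_distance if v is not None]
count = len(present)
mean = statistics.mean(present)
stdev = statistics.stdev(present)   # sample std dev, like Spark's stddev

print(count)  # 3, not 4 -- the null is excluded from every statistic
```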
Describe on a subset of columns

In [85]:
all_stores_df.describe( "CompetitionDistance", "avg_purchase_val" ).show()
+-------+-------------------+------------------+
|summary|CompetitionDistance|  avg_purchase_val|
+-------+-------------------+------------------+
|  count|               1112|              1115|
|   mean|  5404.901079136691| 9.643659192825114|
| stddev|  7663.174720367955|1.9869552301495716|
|    min|                 20|              3.51|
|    max|              75860|             16.16|
+-------+-------------------+------------------+

Finding correlation between variables

In [86]:
all_stores_df.stat.corr( 'CompetitionDistance', 'avg_purchase_val' )
Out[86]:
0.23007011949360373

Note:

  • Surprisingly, the correlation is weak, and positive at that, which sounds unintuitive. A likely explanation is that stores with a low CompetitionDistance sit in crowded areas, while stores with a high CompetitionDistance sit in sparsely populated areas. That would explain why sales can be low even when there is no nearby competition.
In [87]:
all_stores_df.stat.corr( 'sum(Sales)', 'sum(Customers)' )
Out[87]:
0.8542650646315247
  • As expected, these are highly correlated: sales are higher when footfall is higher.
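stat.corr computes the Pearson correlation coefficient by default: the covariance of the two columns divided by the product of their standard deviations. The formula can be sketched in plain Python on toy data:

```python
import math

def pearson(xs, ys):
    """Pearson correlation: covariance over the product of std deviations."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

# Perfectly linear toy data correlates at exactly 1.0.
print(pearson([1, 2, 3, 4], [10, 20, 30, 40]))  # 1.0
```

A value near 0 (like the 0.23 above) means little linear relationship; values near +1 or -1 (like the 0.85 between sales and customers) mean a strong one.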

Note:

Here is a nice article from Databricks that gives a good introduction to applying statistical and mathematical functions in Spark.

Running SQL Queries

Registering sales data as a table

In [88]:
retail_df.registerTempTable( "retail" )

Running SQL Queries

In [89]:
avg_sales_custs = sqlContext.sql( """select Store, avg(Sales) as avg_sales,                 
                                 avg(Customers) as avg_custs from retail group by Store""" )
avg_sales_custs.cache()
Out[89]:
DataFrame[Store: int, avg_sales: double, avg_custs: double]
In [90]:
avg_sales_custs.show(10)
+-----+------------------+------------------+
|Store|         avg_sales|         avg_custs|
+-----+------------------+------------------+
|   31| 4879.132696390659|487.75371549893845|
|  231|3914.3984168865436| 323.7203166226913|
|  431| 7876.770700636943| 828.9936305732484|
|  631| 4494.523354564756| 531.3566878980891|
|  831|10493.347133757961|1304.0902335456476|
| 1031|4005.6942675159235| 345.6454352441614|
|   32| 3332.608179419525|390.94063324538257|
|  232| 2742.272823779193|247.06794055201698|
|  432| 9140.733545647558|1293.9564755838642|
|  632| 5849.850318471337|  752.052016985138|
+-----+------------------+------------------+
only showing top 10 rows
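Under the hood, a GROUP BY average is just a sum and a count accumulated per key. A plain-Python sketch of the query above, using invented daily rows of (Store, Sales, Customers):

```python
from collections import defaultdict

# Hypothetical daily rows: (Store, Sales, Customers).
rows = [(31, 5000, 500), (31, 4800, 480), (231, 4000, 330)]

totals = defaultdict(lambda: [0, 0, 0])   # store -> [sum_sales, sum_custs, n_days]
for store, sales, custs in rows:
    t = totals[store]
    t[0] += sales
    t[1] += custs
    t[2] += 1

# avg(Sales) and avg(Customers) per store, as in the SQL query.
avg = {s: (t[0] / t[2], t[1] / t[2]) for s, t in totals.items()}
print(avg[31])  # (4900.0, 490.0)
```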

Creating Cubes and Rollups

In [91]:
sqlContext.sql( "describe retail").show()
+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|        Store|      int|       |
|    DayOfWeek|      int|       |
|         Date|     date|       |
|        Sales|      int|       |
|    Customers|      int|       |
|         Open|      int|       |
|        Promo|      int|       |
| StateHoliday|   string|       |
|SchoolHoliday|   string|       |
|        month|      int|       |
|         year|      int|       |
+-------------+---------+-------+

In [92]:
sqlContext
Out[92]:
<pyspark.sql.context.HiveContext at 0x7fc054440d30>
In [93]:
sales_rollups = sqlContext.sql( '''select year, month, Store, sum(Sales) as total_sales
            from retail group by year, month, Store with rollup''')
In [94]:
sales_rollups.cache()
Out[94]:
DataFrame[year: int, month: int, Store: int, total_sales: bigint]
In [95]:
sales_rollups.show(10)
+----+-----+-----+-----------+
|year|month|Store|total_sales|
+----+-----+-----+-----------+
|2015|    7|   34|     228254|
|2015|    7|  234|     272817|
|2015|    7|  434|     273987|
|2015|    7|  634|     184245|
|2015|    7|  834|     163256|
|2015|    7| 1034|     169809|
|2015|    6|   71|     237000|
|2015|    6|  271|     219222|
|2015|    6|  471|     177985|
|2015|    6|  671|     178178|
+----+-----+-----+-----------+
only showing top 10 rows

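WITH ROLLUP aggregates over every left-to-right prefix of the grouping keys, padding the dropped keys with null; that is where the null-bearing subtotal rows in the earlier output come from (and why they are removed with dropna() below). A sketch with invented (year, month, store, sales) rows:

```python
from collections import defaultdict

# Hypothetical daily rows: (year, month, store, sales).
rows = [(2015, 7, 34, 100), (2015, 7, 35, 50), (2015, 6, 34, 70)]

# WITH ROLLUP on 3 keys produces 4 grouping levels per row:
# (year, month, store), (year, month, None), (year, None, None), (None,)*3.
totals = defaultdict(int)
for year, month, store, sales in rows:
    for depth in range(4):
        key = (year, month, store)[:depth] + (None,) * (3 - depth)
        totals[key] += sales

print(totals[(2015, 7, None)])     # month subtotal: 150
print(totals[(2015, None, None)])  # year subtotal: 220
print(totals[(None, None, None)])  # grand total: 220
```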
In [96]:
sales_rollups.dropna().registerTempTable( "sales_rollups" )

Top 3 Stores for each month by sales revenue contribution

In [97]:
top3_query = """SELECT
  year,
  month,
  Store,
  total_sales,
  rank
FROM (
  SELECT
    Store,
    year,
    month,
    total_sales,
    rank() OVER (PARTITION BY year, month ORDER BY total_sales DESC) as rank
  FROM sales_rollups) tmp
WHERE
  rank <= 3 ORDER BY year, month, rank"""
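The window function partitions the rows by (year, month), sorts each partition by total_sales descending, and numbers the rows; the outer query then keeps ranks 1-3. A simplified plain-Python sketch on invented monthly totals (this numbers rows like row_number(); SQL's rank() additionally assigns equal ranks to ties):

```python
from collections import defaultdict

# Hypothetical monthly totals: (year, month, store, total_sales).
rows = [(2013, 1, 817, 616461), (2013, 1, 262, 566482),
        (2013, 1, 562, 527869), (2013, 1, 999, 100000)]

# PARTITION BY year, month: bucket the rows by partition key.
parts = defaultdict(list)
for r in rows:
    parts[(r[0], r[1])].append(r)

# ORDER BY total_sales DESC within each partition, keep the top 3.
top3 = []
for key, group in sorted(parts.items()):
    group.sort(key=lambda r: r[3], reverse=True)
    for rank, r in enumerate(group, start=1):
        if rank <= 3:
            top3.append(r + (rank,))

print(top3[0])  # (2013, 1, 817, 616461, 1)
```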
In [98]:
top3_store_by_month = sqlContext.sql( top3_query )
top3_store_by_month.cache()
Out[98]:
DataFrame[year: int, month: int, Store: int, total_sales: bigint, rank: int]
In [99]:
top3_store_by_month.show( 30 )
+----+-----+-----+-----------+----+
|year|month|Store|total_sales|rank|
+----+-----+-----+-----------+----+
|2013|    1|  817|     616461|   1|
|2013|    1|  262|     566482|   2|
|2013|    1|  562|     527869|   3|
|2013|    2|  817|     561454|   1|
|2013|    2|  262|     549174|   2|
|2013|    2|  562|     488061|   3|
|2013|    3|  262|     673085|   1|
|2013|    3|  817|     615420|   2|
|2013|    3|  562|     572658|   3|
|2013|    4|  262|     610222|   1|
|2013|    4|  817|     606697|   2|
|2013|    4|  562|     528838|   3|
|2013|    5|  262|     711428|   1|
|2013|    5|  817|     617836|   2|
|2013|    5|  562|     564743|   3|
|2013|    6|  262|     630094|   1|
|2013|    6|  817|     609556|   2|
|2013|    6|  562|     541954|   3|
|2013|    7|  262|     615851|   1|
|2013|    7|  817|     609614|   2|
|2013|    7| 1114|     548603|   3|
|2013|    8|  262|     628556|   1|
|2013|    8|  817|     587789|   2|
|2013|    8| 1114|     559259|   3|
|2013|    9|  262|     600554|   1|
|2013|    9|  817|     507966|   2|
|2013|    9|  562|     505259|   3|
|2013|   10|  262|     624051|   1|
|2013|   10|  817|     556556|   2|
|2013|   10|  562|     552199|   3|
+----+-----+-----+-----------+----+
only showing top 30 rows

Writing dataframes using Parquet Format

In [108]:
retail_df.write.parquet( "retail.parquet" )
In [110]:
stores_limited_df.show( 5 )
+-----+---------+----------+-------------------+-----------+
|Store|StoreType|Assortment|CompetitionDistance|comp_months|
+-----+---------+----------+-------------------+-----------+
|    1|        c|         a|               1270|       87.0|
|    2|        a|         a|                570|       97.0|
|    3|        a|         a|              14130|      108.0|
|    4|        c|         c|                620|       75.0|
|    5|        a|         a|              29910|        8.0|
+-----+---------+----------+-------------------+-----------+
only showing top 5 rows

In [111]:
stores_limited_df.write.parquet( "stores_limited.parquet" )