• RFM is a method for analyzing customer value. It is commonly used in marketing and has widespread application in the retail industry.
  • RFM stands for

    • Recency – How recently did the customer purchase?
    • Frequency – How often do they purchase?
    • Monetary Value – How much do they spend?
  • This tutorial will analyze the retail dataset available in the UCI repository to

    • Calculate the RFM values for each customer (by customer id).
    • For the recency calculation, use the most recent transaction date as the current date. Recency is then the number of days between that date and the customer's last purchase.
    • Find the top 10 customers based on frequency and monetary value. Sort them first by frequency and then by monetary value.
    • Based on the RFM values of each customer, create customer segments. Develop a k-means algorithm using a map-reduce approach, with Euclidean distance for allocations.
    • Find the optimal number of segments using dendrograms and the elbow method.
    • Create the final customer segments and label each customer with the segment they belong to. Explain each segment intuitively.
    • Use the Spark MLlib clustering algorithm and compare its results with your own algorithm. How many allocations differ for each segment?
In [61]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Load the dataset

In [62]:
retail_df = sqlContext.read.format("com.databricks.spark.csv")        \
        .options(delimiter=',', header = True, inferSchema = True)  \
        .load('file:///home/hadoop/lab/data/Online Retail.csv')
In [63]:
retail_df.cache()
Out[63]:
DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]
In [64]:
retail_df.show( 10 )
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United Kingdom|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/10 8:28|     1.85|     17850|United Kingdom|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/10 8:28|     1.85|     17850|United Kingdom|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/10 8:34|     1.69|     13047|United Kingdom|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
only showing top 10 rows

How many records?

In [65]:
num_recs = retail_df.count()
In [66]:
num_recs
Out[66]:
541909
In [67]:
retail_df.printSchema()
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: string (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: integer (nullable = true)
|-- Country: string (nullable = true)

Find out how many null values are in each column

In [68]:
from pyspark.sql.functions import count

sc.broadcast( num_recs )

retail_df.agg( *[ count(c).alias(c) for c in retail_df.columns ] ).show()
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|   541909|   541909|     541909|  541909|     541909|   541909|    406829| 541909|
+---------+---------+-----------+--------+-----------+---------+----------+-------+

Note:

  • Quite a few CustomerID values are missing (null). We can drop those records from the analysis.
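
The aggregation above uses count(c), which counts the non-null values in each column, so the number of missing CustomerIDs is 541909 - 406829 = 135080. A minimal sketch that counts the nulls directly (a hypothetical follow-up cell, not part of the original notebook):

# Count null values per column directly
from pyspark.sql.functions import col, sum as sum_

retail_df.agg( *[ sum_( col(c).isNull().cast('int') ).alias(c)
                  for c in retail_df.columns ] ).show()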

Drop NA values from dataset

In [69]:
retail_df = retail_df.dropna(how='any')
In [70]:
retail_df.count()
Out[70]:
406829

Note:

  • The final dataset now contains 406829 records.

Adding total price column

  • Each transaction record contains the unit price and the quantity bought. We introduce a new TotalPrice column (quantity × unit price), which is required for further analysis.
In [71]:
from pyspark.sql.functions import round

retail_df = retail_df.withColumn('TotalPrice', round( retail_df.Quantity * retail_df.UnitPrice, 2 ) )

Parse the InvoiceDate strings (dd/MM/yy HH:mm) into timestamps

In [72]:
from pyspark.sql.functions import to_utc_timestamp, unix_timestamp, lit, datediff, col

retail_df = retail_df.withColumn('NewInvoiceDate',
                               to_utc_timestamp(unix_timestamp(col('InvoiceDate'),
                                          "dd/MM/yy HH:mm").cast('timestamp'), 'UTC' ) )
In [73]:
retail_df.cache()
Out[73]:
DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string, TotalPrice: double, NewInvoiceDate: timestamp]
In [74]:
retail_df.show(5)
+---------+---------+--------------------+--------+------------+---------+----------+--------------+----------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|TotalPrice|      NewInvoiceDate|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+----------+--------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|      15.3|2010-01-12 08:26:...|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|     20.34|2010-01-12 08:26:...|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|      22.0|2010-01-12 08:26:...|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|     20.34|2010-01-12 08:26:...|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|     20.34|2010-01-12 08:26:...|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+----------+--------------------+
only showing top 5 rows

Calculate RFM attributes for each customer

  • RFM Represents

    • R (Recency)
    • F (Frequency)
    • M (Monetary Value)
  • A detailed description of these attributes is given here

Most recent transaction date

In [302]:
from pyspark.sql.functions import mean, min, max, sum

retail_df.select( max( 'NewInvoiceDate' ) ).show()
+--------------------+
| max(NewInvoiceDate)|
+--------------------+
|2013-07-10 17:13:...|
+--------------------+

For calculating recency, we will take 30/07/2013 as the current date. The recency value is then the number of days since the customer's most recent purchase, measured from 30/07/2013.

Calculate Recency

In [303]:
current_ts = to_utc_timestamp( unix_timestamp(lit('30/07/2013 00:00'), \
                              "dd/MM/yy HH:mm").cast('timestamp'), 'UTC' )

# Calculate the Recency value
retail_df = retail_df.withColumn('DaysBefore', datediff( lit(current_ts), 'NewInvoiceDate' ) )

recency_df = retail_df.groupBy('CustomerID').agg(min('DaysBefore').alias('Recency'))
In [304]:
recency_df.show( 5 )
+----------+-------+
|CustomerID|Recency|
+----------+-------+
|     12431|     25|
|     14031|     27|
|     13831|    235|
|     17231|    141|
|     15831|    394|
+----------+-------+
only showing top 5 rows
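
As a hedged cross-check (hypothetical cell, not part of the original run), recency can equivalently be computed from each customer's most recent invoice date; it should match the min('DaysBefore') aggregation above.

# Equivalent recency computation via each customer's latest invoice date
recency_check_df = retail_df.groupBy('CustomerID') \
                          .agg( max('NewInvoiceDate').alias('LastPurchase') ) \
                          .withColumn( 'Recency', datediff( lit(current_ts), 'LastPurchase' ) )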

Calculate Frequency

  • First group the line items by customer and invoice, then count the number of invoices for each customer (an equivalent one-step aggregation is sketched below).
In [305]:
frequency_df = retail_df.groupBy('CustomerID', 'InvoiceNo').count()  \
                      .groupBy('CustomerID')                       \
                      .agg(count("*").alias("Frequency"))
In [306]:
frequency_df.show( 5 )
+----------+---------+
|CustomerID|Frequency|
+----------+---------+
|     14031|       23|
|     18231|        6|
|     14431|        2|
|     13631|       12|
|     12431|       18|
+----------+---------+
only showing top 5 rows
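
The same frequency can also be obtained in a single aggregation with countDistinct; a hedged equivalent sketch (hypothetical cell, assuming one invoice corresponds to one purchase occasion):

# Frequency as the number of distinct invoices per customer
from pyspark.sql.functions import countDistinct

frequency_alt_df = retail_df.groupBy('CustomerID') \
                          .agg( countDistinct('InvoiceNo').alias('Frequency') )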

Calculate Monetary Value

In [307]:
monetary_df = retail_df.groupBy('CustomerID').agg( round( sum('TotalPrice'), 2 ).alias('Monetary'))
In [81]:
monetary_df.show( 5 )
+----------+--------+
|CustomerID|Monetary|
+----------+--------+
|     12431| 6416.39|
|     14031|14634.64|
|     13831| 2202.99|
|     17231| 3127.82|
|     15831|  747.76|
+----------+--------+
only showing top 5 rows

Joining RFM values by customer ID

In [82]:
mf_df = monetary_df.join(frequency_df,                              \
      monetary_df.CustomerID == frequency_df.CustomerID, 'inner') \
      .drop(frequency_df.CustomerID)

rfm_df =  mf_df.join(recency_df,                              \
        recency_df.CustomerID == mf_df.CustomerID, 'inner') \
        .drop(recency_df.CustomerID)

rfm_df = rfm_df.select( 'CustomerID', 'Recency', 'Frequency', 'Monetary' )
rfm_df.cache()
Out[82]:
DataFrame[CustomerID: int, Recency: int, Frequency: bigint, Monetary: double]
In [83]:
rfm_df.show()
+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
|     12431|     25|       18| 6416.39|
|     12631|    567|        1|   640.5|
|     12831|    300|        1|  215.05|
|     13431|    599|        2|  1269.1|
|     13631|    114|       12| 3021.77|
|     13831|    235|        6| 2202.99|
|     14031|     27|       23|14634.64|
|     14231|    240|        5|  822.06|
|     14431|    544|        2|  397.76|
|     14631|    354|        3| 1006.98|
|     15031|    325|        4|  668.14|
|     15831|    394|        4|  747.76|
|     16031|    721|        2|  548.94|
|     16431|    515|        3|  642.81|
|     17031|    110|        2|  380.35|
|     17231|    141|       14| 3127.82|
|     17431|     59|        1|   403.3|
|     17631|    204|        3|  528.33|
|     17831|    780|        1|    35.4|
|     18231|     25|        6| 2066.17|
+----------+-------+---------+--------+
only showing top 20 rows
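
As an aside, the two joins above can be written more compactly by joining on the column name, which avoids dropping the duplicated CustomerID columns; a hedged equivalent (hypothetical cell):

# Compact equivalent of the recency/frequency/monetary joins
rfm_alt_df = recency_df.join( frequency_df, on = 'CustomerID' ) \
                     .join( monetary_df, on = 'CustomerID' ) \
                     .select( 'CustomerID', 'Recency', 'Frequency', 'Monetary' )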

Top 10 customers based on frequency

In [84]:
from pyspark.sql.functions import desc

top_frequencies = rfm_df.orderBy(desc('Frequency'))
In [85]:
top_frequencies.show( 10 )
+----------+-------+---------+---------+
|CustomerID|Recency|Frequency| Monetary|
+----------+-------+---------+---------+
|     14911|     20|      248|132572.62|
|     12748|     23|      224|  29072.1|
|     17841|     20|      169| 40340.78|
|     14606|     49|      128| 11713.85|
|     15311|     80|      118| 59419.34|
|     13089|     22|      118| 57385.88|
|     12971|     50|       89| 10930.26|
|     14527|     20|       86|  7711.38|
|     13408|     22|       81| 27487.41|
|     14646|     22|       77|279489.02|
+----------+-------+---------+---------+
only showing top 10 rows
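
The task asks for the top 10 sorted first by frequency and then by monetary value, whereas the cell above orders by frequency alone. A minimal sketch of the combined sort (hypothetical cell):

# Order by Frequency first, then by Monetary, both descending
rfm_df.orderBy( desc('Frequency'), desc('Monetary') ).show( 10 )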

Top 10 customers by monetary value

In [86]:
top_monetary = rfm_df.orderBy(desc('Monetary'))
In [87]:
top_monetary.show( 10 )
+----------+-------+---------+---------+
|CustomerID|Recency|Frequency| Monetary|
+----------+-------+---------+---------+
|     14646|     22|       77|279489.02|
|     18102|    110|       62|256438.49|
|     17450|     20|       55|187482.17|
|     14911|     20|      248|132572.62|
|     12415|    117|       26|123725.45|
|     14156|     23|       66|113384.14|
|     17511|     88|       46| 88125.38|
|     16684|     84|       31| 65892.08|
|     13694|     22|       60|  62653.1|
|     15311|     80|      118| 59419.34|
+----------+-------+---------+---------+
only showing top 10 rows

Creating Vectors

In [88]:
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vector, DenseVector, Vectors
In [89]:
customer_labels = rfm_df.rdd.map( lambda rec: rec['CustomerID'] )

customer_vecs = rfm_df.rdd.map( lambda rec: Vectors.dense(rec['Recency'], \
                                                rec['Monetary'],        \
                                                rec['Frequency']) )

Standardizing Vectors

In [90]:
standardizer = StandardScaler( True, True )
standardized_model = standardizer.fit( customer_vecs )
customer_data = customer_labels.zip( standardized_model.transform( customer_vecs ) )
customer_data.cache()
Out[90]:
org.apache.spark.api.java.JavaPairRDD@25010869
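
StandardScaler(True, True) z-scores each column: it subtracts the column mean and divides by the column standard deviation, so z = (x - mean) / std. A hedged sanity-check sketch (hypothetical cell):

# After scaling, each column should have roughly zero mean and unit variance
from pyspark.mllib.stat import Statistics

scaled_summary = Statistics.colStats( customer_data.map( lambda rec: rec[1] ) )
print( scaled_summary.mean() )      # expected to be approximately [0, 0, 0]
print( scaled_summary.variance() )  # expected to be approximately [1, 1, 1]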

Clustering using Map Reduce Approach

Randomly selecting cluster centers

  • We will assume there are 3 clusters, take 3 random points from the sample as the initial cluster centers, and then iteratively refine them.
In [91]:
customer_rfm_vals = customer_data.map(lambda rec: rec[1])
customer_rfm_vals.cache()
Out[91]:
PythonRDD[492] at RDD at PythonRDD.scala:43
In [92]:
customer_rfm_vals.take(5)
Out[92]:
[DenseVector([-1.1234, 0.5497, 1.384]),
DenseVector([0.8347, -0.153, -0.4364]),
DenseVector([-0.1299, -0.2048, -0.4364]),
DenseVector([0.9503, -0.0766, -0.3293]),
DenseVector([-0.8019, 0.1367, 0.7415])]

Randomly select some points

In [93]:
num_clusters = 3

# Randomly select as many points as there are clusters to use as the initial cluster centers
cluster_centroids = customer_rfm_vals.sample( False,
                                      0.001,
                                      seed=12345 ).take( num_clusters )

Take these three sampled points as the initial cluster centers

In [94]:
cluster_centroids[0:3]
Out[94]:
[DenseVector([-0.596, -0.1053, -0.4364]),
DenseVector([2.1786, -0.2148, -0.4364]),
DenseVector([2.9193, -0.2319, -0.4364])]

Define a function to allocate a point to a cluster center

  • It calculates the Euclidean distance between the point and each cluster center and returns the label of the cluster with the minimum distance.
In [95]:
from sklearn.metrics.pairwise import euclidean_distances
import numpy as np

def assign_to_cluster( point, cluster_centers ):
  return np.argmin( euclidean_distances( np.array(point).reshape(1, -1),
                                        cluster_centers ) )

Testing the function

  • Check that the third cluster center is allocated to the third cluster
In [96]:
assert( assign_to_cluster( cluster_centroids[2], cluster_centroids ) == 2 )
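
The same assignment can be done with plain NumPy, which avoids shipping the sklearn dependency to the workers; a hedged equivalent sketch (hypothetical function, not used below):

# Nearest center by Euclidean distance, NumPy only
def assign_to_cluster_np( point, cluster_centers ):
  dists = np.linalg.norm( np.array([c.toArray() for c in cluster_centers]) - np.array(point), axis = 1 )
  return int( np.argmin( dists ) )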

Define a function to calculate shift in cluster centers

  • Function which returns the distance from a point to its nearest cluster center
In [97]:
def get_nearest_cluster_dist( point, cluster_centers ):
  # Euclidean distance to the closest center (np.min, not np.argmin, gives the distance itself)
  return np.min( [ np.sqrt( np.sum( (np.array(point) - np.array(center)) ** 2 ) )
                   for center in cluster_centers ] )
  • Checks convergence: returns True when every new center is within epsilon of its nearest old center
In [98]:
def clusters_converged( new_cluster_centers, old_cluster_centers, epsilon = 0.01 ):
  dist_list = [ get_nearest_cluster_dist( new_cluster_centre, old_cluster_centers )
               for new_cluster_centre in new_cluster_centers ]
  return np.max( dist_list ) <= epsilon

One iteration - step by step

Broadcast the initial cluster centers

In [99]:
broadcasted_cluster_centers = sc.broadcast(cluster_centroids)
print('cluster centroid: ', broadcasted_cluster_centers.value)
cluster centroid:  [DenseVector([-0.596, -0.1053, -0.4364]), DenseVector([2.1786, -0.2148, -0.4364]), DenseVector([2.9193, -0.2319, -0.4364])]

Assign the points to the cluster centers

In [120]:
mark_nearest_cluster = customer_rfm_vals.map( lambda point:
                  ( assign_to_cluster( point, broadcasted_cluster_centers.value ), point ) )
In [121]:
mark_nearest_cluster.take( 10 )
Out[121]:
[(0, DenseVector([-1.1234, 0.5497, 1.384])),
(1, DenseVector([0.8347, -0.153, -0.4364])),
(0, DenseVector([-0.1299, -0.2048, -0.4364])),
(1, DenseVector([0.9503, -0.0766, -0.3293])),
(0, DenseVector([-0.8019, 0.1367, 0.7415])),
(0, DenseVector([-0.3647, 0.0371, 0.099])),
(0, DenseVector([-1.1162, 1.5495, 1.9194])),
(0, DenseVector([-0.3467, -0.131, -0.0081])),
(0, DenseVector([0.7516, -0.1826, -0.3293])),
(0, DenseVector([0.0652, -0.1085, -0.2222]))]

Group the points by cluster and calculate the mean of each group: these are the new cluster centers

In [122]:
cluster_groups = mark_nearest_cluster.groupByKey()
cluster_groups.cache()
cluster_groups.take( 3 )
Out[122]:
[(0, <pyspark.resultiterable.ResultIterable at 0x7f47c0a70b00>),
(1, <pyspark.resultiterable.ResultIterable at 0x7f47c0cb8208>),
(2, <pyspark.resultiterable.ResultIterable at 0x7f47c0cb8e48>)]

Define a function to calculate the mean

In [123]:
def calculateMean( vecs ):
  return Vectors.dense( np.array( [vec.toArray()
                                   for vec in vecs] ).mean( axis = 0) )
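
Note that groupByKey (used above) materializes every vector of a cluster on one worker before calculateMean runs. A more scalable hedged alternative keeps only a running (sum, count) per cluster with aggregateByKey (hypothetical cell, not used in the rest of this notebook):

# Per-cluster mean without collecting all points of a cluster in memory
sums_counts = mark_nearest_cluster.aggregateByKey(
    ( np.zeros(3), 0 ),                                    # zero value: (vector sum, point count)
    lambda acc, v: ( acc[0] + v.toArray(), acc[1] + 1 ),   # fold one point into the accumulator
    lambda a, b: ( a[0] + b[0], a[1] + b[1] ) )            # merge two partition accumulators
alt_cluster_centers = sums_counts.mapValues( lambda s: Vectors.dense( s[0] / s[1] ) )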
In [124]:
from pyspark.mllib.stat import MultivariateStatisticalSummary, Statistics

Calculate the new cluster centers

In [125]:
new_cluster_centers = cluster_groups.mapValues( lambda rec: calculateMean( rec ) )
In [126]:
new_cluster_centers
Out[126]:
PythonRDD[776] at RDD at PythonRDD.scala:43
In [127]:
new_cluster_centers = new_cluster_centers.sortBy( lambda rec: rec[0], ascending = True )
In [128]:
new_cluster_centers.take(3)
Out[128]:
[(0, DenseVector([-0.4396, 0.0476, 0.1016])),
(1, DenseVector([1.4605, -0.1692, -0.3628])),
(2, DenseVector([3.0746, -0.2045, -0.4223]))]
In [129]:
new_cluster_centers = [i[1] for i in new_cluster_centers.take(3)]
In [130]:
new_cluster_centers
Out[130]:
[DenseVector([-0.4396, 0.0476, 0.1016]),
DenseVector([1.4605, -0.1692, -0.3628]),
DenseVector([3.0746, -0.2045, -0.4223])]
In [131]:
[(a, b) for a, b in zip( cluster_centroids, new_cluster_centers ) ]
Out[131]:
[(DenseVector([-0.9904, 3.097, 4.1061]),
DenseVector([-0.4396, 0.0476, 0.1016])),
(DenseVector([-0.5347, -0.0463, -0.0018]),
DenseVector([1.4605, -0.1692, -0.3628])),
(DenseVector([1.3221, -0.1674, -0.3594]),
DenseVector([3.0746, -0.2045, -0.4223]))]

Calculating the distance between the old and new cluster centers

In [132]:
dist_list = [ euclidean_distances(a, b )
               for a, b in zip( cluster_centroids, new_cluster_centers ) ]
In [133]:
dist_list
Out[133]:
[array([[ 5.06338354]]), array([[ 2.0312989]]), array([[ 1.75399413]])]
In [134]:
[a[0][0] for a in dist_list]
Out[134]:
[5.06338354159909, 2.0312988959930403, 1.7539941283993663]

Running multiple iterations to converge to the final clusters

  • The above steps are repeated over multiple iterations to find the final clusters.
  • We run up to 100 iterations, stopping early once the distance between the new and old centers falls below 0.01.
In [135]:
num_iterations = 100
epsilon = 0.01
num_clusters = 3

for i in range(num_iterations):
  # broadcasting centroids
  # if you want to destroy, use destroy method on broadcast variable
  broadcasted_cluster_centroids = sc.broadcast(cluster_centroids)

  # Assigning point to nearest cluster
  assign_to_nearest_cluster = customer_rfm_vals.map( lambda point:
                                        ( assign_to_cluster( point,
                                          broadcasted_cluster_centroids.value), point ) )

  # Re-calculating centroid by aggregating (Taking column mean)
  new_centroids = assign_to_nearest_cluster.groupByKey( numPartitions
                                                       = num_clusters ).mapValues(
                                                  lambda rec: calculateMean( rec ) )


  new_centroids = new_centroids.sortBy( lambda rec: rec[0],
                                       ascending = True ).take( num_clusters )

  previous_cluster_centroid = [i for i in cluster_centroids]
  cluster_centroids = [i[1] for i in new_centroids]


  # Check distance between previous and new centroid. Need this distance to calculate Convergence
  dist_list = [ euclidean_distances(a, b )
               for a, b in zip( cluster_centroids, previous_cluster_centroid ) ]

  dist_list = [a[0][0] for a in dist_list]
  if np.max(dist_list) <= epsilon:
      print("The Clusters Converged\n")
      break
  else:
      print('Not Converged yet.. going for next iteration: ', dist_list)
Not Converged yet.. going for next iteration:  [1.2857358964872079, 0.030741145950249748, 0.0072124401975714588]
Not Converged yet.. going for next iteration:  [1.1302031087315294, 0.017274863723053074, 0.0044047809692650819]
Not Converged yet.. going for next iteration:  [1.7411155003956216, 0.021597626369677082, 0.0044374840521173237]
Not Converged yet.. going for next iteration:  [1.2127585187830106, 0.014208636646654237, 0.011103794974810755]
Not Converged yet.. going for next iteration:  [1.6766891693566788, 0.014450228282898357, 0.005143991127110027]
Not Converged yet.. going for next iteration:  [0.25713306265251407, 0.006223045200706389, 0.010715011047713789]
Not Converged yet.. going for next iteration:  [0.0, 0.005333706864219202, 0.011592632314832074]
The Clusters Converged


Final cluster centers

In [136]:
cluster_centroids
Out[136]:
[DenseVector([-1.0528, 9.0114, 8.3813]),
DenseVector([-0.572, -0.0016, 0.0915]),
DenseVector([1.2612, -0.1675, -0.3574])]
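
The centers above are in standardized units. To read them in the original RFM scale, they can be un-standardized using the column means and standard deviations of the unscaled vectors; a hedged sketch (hypothetical cell, using the Statistics import from earlier):

# Express the final centers in the original (Recency, Monetary, Frequency) units
rfm_summary = Statistics.colStats( customer_vecs )
rfm_means = rfm_summary.mean()
rfm_stds = np.sqrt( rfm_summary.variance() )
[ Vectors.dense( np.array(c) * rfm_stds + rfm_means ) for c in cluster_centroids ]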

Assigning customers to clusters

In [186]:
customer_cluster_labels = customer_data.map( lambda rec:
                                          ( int(rec[0]),
                                           int(assign_to_cluster( rec[1], cluster_centroids ) ) ) )
In [187]:
customer_cluster_labels.cache()
customer_cluster_labels.take( 5 )
Out[187]:
[(12431, 1), (12631, 2), (12831, 1), (13431, 2), (13631, 1)]
In [188]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

customer_schema = StructType( [ StructField('CustomerID', IntegerType(), True),
                    StructField('ClusterLabel', IntegerType(), True) ] )
In [189]:
customer_clusters_df = sqlContext.createDataFrame( customer_cluster_labels, customer_schema )
In [190]:
customer_clusters_df.show( 5 )
+----------+------------+
|CustomerID|ClusterLabel|
+----------+------------+
|     12431|           1|
|     12631|           2|
|     12831|           1|
|     13431|           2|
|     13631|           1|
+----------+------------+
only showing top 5 rows

In [195]:
customer_rfm_cluster_df = rfm_df.join( customer_clusters_df,
                                    on = 'CustomerID',
                                    how = 'inner' )

Customers with their RFM values and the cluster labels

In [196]:
customer_rfm_cluster_df.show(5)
+----------+-------+---------+--------+------------+
|CustomerID|Recency|Frequency|Monetary|ClusterLabel|
+----------+-------+---------+--------+------------+
|     12431|     25|       18| 6416.39|           1|
|     12631|    567|        1|   640.5|           2|
|     12831|    300|        1|  215.05|           1|
|     13431|    599|        2|  1269.1|           2|
|     13631|    114|       12| 3021.77|           1|
+----------+-------+---------+--------+------------+
only showing top 5 rows

Find Cluster Characteristics

In [199]:
customer_cluster_centers = customer_rfm_cluster_df.groupBy( 'ClusterLabel'
                                                        ).agg( { 'Recency': 'mean',
                                                      'Frequency': 'mean',
                                                      'Monetary': 'mean'} )
In [200]:
customer_cluster_centers.show()
+------------+------------------+------------------+-----------------+
|ClusterLabel|     avg(Monetary)|    avg(Frequency)|     avg(Recency)|
+------------+------------------+------------------+-----------------+
|           0|  75966.3873076923| 83.34615384615384|44.53846153846154|
|           1|1886.7595827725454|5.9320323014804845|177.3741588156124|
|           2| 522.1908733624449|1.7416302765647744|684.4985443959243|
+------------+------------------+------------------+-----------------+

Note:

  • Customers in cluster 0 appear to be high-value customers: very high monetary value, very frequent purchases, and recent activity.
  • Customers in cluster 1 last bought about six months ago on average and are likely to churn. These customers may need intervention to be retained.
  • Customers in cluster 2 have not bought for around two years and have most likely churned. A different strategy is needed to bring them back.

Finding optimal number of clusters

  • To find the optimal number of clusters, we can use dendrograms or elbow analysis.
  • For creating dendrograms, we cannot use all the data, as the dataset is very large. Instead, we draw several random samples and create a dendrogram for each; this gives an approximate estimate of the number of clusters. The number of clusters also depends on business convenience: a large number of clusters may not be feasible to manage, since a separate strategy has to be devised for each cluster or segment. Dendrograms only give an initial indication of how many clusters may exist in the dataset.
In [201]:
only_rfm_vals_df = customer_rfm_cluster_df.select( ['Recency', 'Frequency', 'Monetary'] )
In [202]:
only_rfm_vals_df.show(5)
+-------+---------+--------+
|Recency|Frequency|Monetary|
+-------+---------+--------+
|     25|       18| 6416.39|
|    567|        1|   640.5|
|    300|        1|  215.05|
|    599|        2|  1269.1|
|    114|       12| 3021.77|
+-------+---------+--------+
only showing top 5 rows

In [206]:
customer_data.take(5)
Out[206]:
[(12431, DenseVector([-1.1234, 0.5497, 1.384])),
(12631, DenseVector([0.8347, -0.153, -0.4364])),
(12831, DenseVector([-0.1299, -0.2048, -0.4364])),
(13431, DenseVector([0.9503, -0.0766, -0.3293])),
(13631, DenseVector([-0.8019, 0.1367, 0.7415]))]
In [215]:
customer_df = sqlContext.createDataFrame(
  customer_data.map( lambda rec:
                    ( float(rec[1][0]), float(rec[1][1]), float(rec[1][2]) ) ),
  ['Recency', 'Monetary', 'Frequency'] )
In [216]:
customer_df.show(5)
+--------------------+--------------------+-------------------+
|             Recency|            Monetary|          Frequency|
+--------------------+--------------------+-------------------+
| -1.1234212007794546|  0.5496703474612289| 1.3839661526847584|
|  0.8346931557931577|-0.15304865275331805|-0.4364051411980503|
|-0.12991299403135792| -0.2048106839972093|-0.4364051411980503|
|  0.9503013834874816|-0.07657054054292899|-0.3293244768520027|
|  -0.801885817504616| 0.13666664197024736|  0.741482166608473|
+--------------------+--------------------+-------------------+
only showing top 5 rows

In [219]:
import matplotlib.pyplot as plt
import seaborn as sn
%matplotlib inline

One sample

In [223]:
cmap = sn.cubehelix_palette(as_cmap=True, rot=-.3, light=1)
sn.clustermap( customer_df.sample( False, fraction = 0.01 ).toPandas(), cmap=cmap, linewidths=.5)
Out[223]:
<seaborn.matrix.ClusterGrid at 0x7f47ba9c4908>

One more sample

In [292]:
cmap = sn.cubehelix_palette(as_cmap=True, rot=-.3, light=1)
sn.clustermap( customer_df.sample( False, fraction = 0.01 ).toPandas(), cmap=cmap, linewidths=.5)
Out[292]:
<seaborn.matrix.ClusterGrid at 0x7f47b637f438>

Note:

From the dendrograms, we can infer that there are roughly 3 to 4 clusters.

Elbow Analysis

For the elbow analysis, we will also take multiple samples, fit clusters for a range of k, and compute the sum of squared errors to find the optimal number of clusters. For details about the technique, refer to this blog.

In [295]:
from sklearn.cluster import KMeans
import pandas as pd

Creating elbow analysis

In [299]:
## Takes two parameters: a list of k values and the fraction of data to sample for this analysis
def elbow_analysis( num_clusters_list, sampleFraction ):
  X_scaled = customer_df.sample( False, fraction = sampleFraction ).toPandas()
  cluster_errors = []

  for num_clusters in num_clusters_list:
    clusters = KMeans( num_clusters )
    clusters.fit( X_scaled )
    cluster_errors.append( clusters.inertia_ )
  clusters_df = pd.DataFrame( { "num_clusters": list( num_clusters_list ), "cluster_errors": cluster_errors } )
  plt.figure(figsize=(10,6))
  plt.plot( clusters_df.num_clusters, clusters_df.cluster_errors, marker = "o" )
  plt.show()

Cluster range from 1 to 10 and sample fraction 0.01

In [300]:
elbow_analysis( range(1, 11), 0.01 )

Cluster range 1 to 10 and sample fraction 0.1

In [301]:
elbow_analysis( range(1, 11), 0.1 )

Note:

It can be observed that the marginal gain in explained variance drops significantly after 4 clusters, so 4 clusters could be optimal. But we still need to make sense of those clusters from a business point of view: creating additional clusters without an effective strategy to deal with them adds overhead to the business without any additional benefit.

Using the Spark MLlib KMeans Algorithm

In [262]:
from pyspark.mllib.clustering import KMeans, KMeansModel

For Num Clusters = 3

In [263]:
customer_rfm_vals.take( 5 )
Out[263]:
[DenseVector([-1.1234, 0.5497, 1.384]),
DenseVector([0.8347, -0.153, -0.4364]),
DenseVector([-0.1299, -0.2048, -0.4364]),
DenseVector([0.9503, -0.0766, -0.3293]),
DenseVector([-0.8019, 0.1367, 0.7415])]
In [279]:
numClusters = 3
numIterations = 50

clusters = KMeans.train(customer_rfm_vals,
                      numClusters,
                      maxIterations = numIterations,
                      initializationMode="random",
                      epsilon=1e-4)

Assign customers to clusters

In [280]:
cust_cluser_labels = clusters.predict( customer_rfm_vals )

Map customer IDs to clusters

In [281]:
customer_ml_labels = customer_labels.zip( cust_cluser_labels )
In [282]:
customer_ml_labels.take( 5 )
Out[282]:
[(12431, 0), (12631, 1), (12831, 0), (13431, 1), (13631, 0)]
In [283]:
customer_ml_clusters_df = sqlContext.createDataFrame(
  customer_ml_labels.map( lambda rec: ( int(rec[0]), int(rec[1]) ) ), customer_schema )
In [284]:
customer_ml_clusters_df.show(5)
+----------+------------+
|CustomerID|ClusterLabel|
+----------+------------+
|     12431|           0|
|     12631|           1|
|     12831|           0|
|     13431|           1|
|     13631|           0|
+----------+------------+
only showing top 5 rows

In [285]:
customer_rfm_cluster_ml_df = rfm_df.join( customer_ml_clusters_df,
                                    on = 'CustomerID',
                                    how = 'inner' )
In [286]:
customer_rfm_cluster_ml_df.show(5)
+----------+-------+---------+--------+------------+
|CustomerID|Recency|Frequency|Monetary|ClusterLabel|
+----------+-------+---------+--------+------------+
|     12431|     25|       18| 6416.39|           0|
|     12631|    567|        1|   640.5|           1|
|     12831|    300|        1|  215.05|           0|
|     13431|    599|        2|  1269.1|           1|
|     13631|    114|       12| 3021.77|           0|
+----------+-------+---------+--------+------------+
only showing top 5 rows

Calculate the cluster centers

In [287]:
customer_ml_cluster_centers = customer_rfm_cluster_ml_df.groupBy( 'ClusterLabel'
                                                        ).agg( { 'Recency': 'mean',
                                                      'Frequency': 'mean',
                                                      'Monetary': 'mean'} )
In [288]:
customer_ml_cluster_centers.show()
+------------+------------------+------------------+------------------+
|ClusterLabel|     avg(Monetary)|    avg(Frequency)|      avg(Recency)|
+------------+------------------+------------------+------------------+
|           0|1887.3142140693385| 5.933019185459441|177.29182093571188|
|           1| 521.9848799999995|1.7425454545454546| 684.3076363636363|
|           2|  75966.3873076923| 83.34615384615384| 44.53846153846154|
+------------+------------------+------------------+------------------+

What we got from our clustering algorithm

In [289]:
customer_cluster_centers.show()
+------------+------------------+------------------+-----------------+
|ClusterLabel|     avg(Monetary)|    avg(Frequency)|     avg(Recency)|
+------------+------------------+------------------+-----------------+
|           0|  75966.3873076923| 83.34615384615384|44.53846153846154|
|           1|1886.7595827725454|5.9320323014804845|177.3741588156124|
|           2| 522.1908733624449|1.7416302765647744|684.4985443959243|
+------------+------------------+------------------+-----------------+

Note:

It can be observed that our algorithm and Spark's MLlib KMeans have found the same clusters. The labels are merely permuted; the properties of each cluster remain the same.
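
The task also asks how many allocations differ between the two approaches. The cluster centers above suggest the correspondence our 0 ↔ MLlib 2, our 1 ↔ MLlib 0, and our 2 ↔ MLlib 1; a hedged sketch (hypothetical cell) that remaps the MLlib labels and counts disagreements per segment:

# Count per-segment disagreements between our labels and the (remapped) MLlib labels
from pyspark.sql.functions import udf

label_map = {2: 0, 0: 1, 1: 2}                 # MLlib label -> our label, taken from the centers above
remap_label = udf( lambda l: label_map[l], IntegerType() )

compare_df = customer_clusters_df.withColumnRenamed( 'ClusterLabel', 'OurLabel' ) \
    .join( customer_ml_clusters_df
             .withColumn( 'MLLabel', remap_label( 'ClusterLabel' ) )
             .select( 'CustomerID', 'MLLabel' ),
           on = 'CustomerID' )

compare_df.filter( compare_df.OurLabel != compare_df.MLLabel ) \
    .groupBy( 'OurLabel' ).count().show()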

For Num Clusters = 4

In [290]:
numClusters = 4
numIterations = 50

clusters = KMeans.train(customer_rfm_vals,
                      numClusters,
                      maxIterations = numIterations,
                      initializationMode="random",
                      epsilon=1e-4)

cust_cluser_labels = clusters.predict( customer_rfm_vals )
customer_ml_labels = customer_labels.zip( cust_cluser_labels )

customer_ml_clusters_df = sqlContext.createDataFrame(
  customer_ml_labels.map( lambda rec: ( int(rec[0]), int(rec[1]) ) ), customer_schema )

customer_rfm_cluster_ml_df = rfm_df.join( customer_ml_clusters_df,
                                        on = 'CustomerID',
                                        how = 'inner' )

customer_ml_cluster_centers = customer_rfm_cluster_ml_df.groupBy(
  'ClusterLabel').agg( { 'Recency': 'mean',
                        'Frequency': 'mean',
                        'Monetary': 'mean'} )

customer_ml_cluster_centers.show()
+------------+------------------+------------------+------------------+
|ClusterLabel|     avg(Monetary)|    avg(Frequency)|      avg(Recency)|
+------------+------------------+------------------+------------------+
|           0| 524.2829606784885|1.7316885119506553| 699.9622205088666|
|           1|1358.2928222996527| 4.599303135888502|190.35087108013937|
|           2| 12135.74762886598|28.530927835051546| 72.74226804123711|
|           3|124312.30636363638| 109.9090909090909| 49.54545454545455|
+------------+------------------+------------------+------------------+

Note:

It can be observed that clusters 0 and 1 have characteristics very similar to the segments we created earlier, but the high-value segment has now been split into two. Cluster 3 contains the really high-value customers, while cluster 2 contains promising customers with good frequency who last bought two to three months ago. A strategy could be devised to convert these customers into high-value customers.