Customer Churn Overview

  • Companies invest significantly to acquire new customers, so after acquisition they want these customers to stay and remain loyal. Companies also invest in engaging these customers continuously to ensure they are happy and satisfied with their offerings. Losing customers means a loss of that investment and of possible future revenue.
  • It is therefore important for companies to detect early signs that a customer is about to churn, and to engage them or offer incentives to retain them. Since targeting every customer individually is not feasible, companies can use analytics to predict which customers have a high probability of churning, so that a retention intervention can be made.

Things to learn

  • Building ensemble models such as random forests and gradient-boosted trees
In [1]:
sc
Out[1]:
<pyspark.context.SparkContext at 0x7f1dcc2ba400>
In [2]:
from pyspark.sql import SQLContext
sql_ctx = SQLContext(sc)

Read the data from the CSV file

In [3]:
churn_df = sql_ctx.read.format("com.databricks.spark.csv")      \
 .options( delimiter=',',
          header  = True ,
          inferSchema = True)                                    \
  .load('file:///home/hadoop/lab/data/churn.csv')
In [4]:
churn_df.show( 2 )
+-----+--------------+---------+--------+----------+----------+-------------+--------+---------+----------+--------+---------+----------+----------+-----------+------------+---------+----------+-----------+--------------+------+
|State|Account Length|Area Code|   Phone|Int'l Plan|VMail Plan|VMail Message|Day Mins|Day Calls|Day Charge|Eve Mins|Eve Calls|Eve Charge|Night Mins|Night Calls|Night Charge|Intl Mins|Intl Calls|Intl Charge|CustServ Calls|Churn?|
+-----+--------------+---------+--------+----------+----------+-------------+--------+---------+----------+--------+---------+----------+----------+-----------+------------+---------+----------+-----------+--------------+------+
|   KS|           128|      415|382-4657|        no|       yes|           25|   265.1|      110|     45.07|   197.4|       99|     16.78|     244.7|         91|       11.01|     10.0|         3|        2.7|             1|False.|
|   OH|           107|      415|371-7191|        no|       yes|           26|   161.6|      123|     27.47|   195.5|      103|     16.62|     254.4|        103|       11.45|     13.7|         3|        3.7|             1|False.|
+-----+--------------+---------+--------+----------+----------+-------------+--------+---------+----------+--------+---------+----------+----------+-----------+------------+---------+----------+-----------+--------------+------+
only showing top 2 rows

In [5]:
import matplotlib.pyplot as plt
import seaborn as sn
%matplotlib inline
In [6]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

Encoding the categorical variables

  • The following utility function indexes a categorical column and one-hot encodes it.
In [7]:
def create_category_vars( dataset, field_name ):
  # Index the string column, then one-hot encode the resulting index
  idx_col = field_name + "Index"
  col_vec = field_name + "Vec"

  string_indexer = StringIndexer( inputCol=field_name,
                                  outputCol=idx_col )

  indexer_model = string_indexer.fit( dataset )
  indexed = indexer_model.transform( dataset )

  encoder = OneHotEncoder( dropLast=True,
                           inputCol=idx_col,
                           outputCol=col_vec )

  return encoder.transform( indexed )

Select only a subset of columns

  • Some columns are highly correlated, e.g. call minutes and call charges, so only one of them needs to be used. This applies across the day, evening, night, and international call columns.
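The redundancy is easy to see because a charge column is just the minutes column times a per-minute rate. The sketch below illustrates this with a plain-Python Pearson correlation on a few `Day Mins` values from the data; the 0.17 per-minute rate is a made-up value for illustration. (On the Spark side, `churn_df.stat.corr('Day Mins', 'Day Charge')` would compute the same thing directly.)

```python
# Illustrative only: since charge = rate * minutes, the two columns are
# (near-)perfectly linearly correlated, so one of them is redundant.
def pearson(xs, ys):
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sum((x - mx) ** 2 for x in xs) ** 0.5
    sy = sum((y - my) ** 2 for y in ys) ** 0.5
    return cov / (sx * sy)

day_mins = [265.1, 161.6, 243.4, 299.4, 166.7]
day_charge = [round(m * 0.17, 2) for m in day_mins]  # hypothetical rate

print(round(pearson(day_mins, day_charge), 4))  # ~1.0
```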
In [8]:
churn_df.columns
Out[8]:
['State',
'Account Length',
'Area Code',
'Phone',
"Int'l Plan",
'VMail Plan',
'VMail Message',
'Day Mins',
'Day Calls',
'Day Charge',
'Eve Mins',
'Eve Calls',
'Eve Charge',
'Night Mins',
'Night Calls',
'Night Charge',
'Intl Mins',
'Intl Calls',
'Intl Charge',
'CustServ Calls',
'Churn?']
In [9]:
churn_df_subset = churn_df.select('Account Length',
                                "Int'l Plan",
                                'VMail Plan',
                                'VMail Message',
                                'Day Mins',
                                'Eve Mins',
                                'Night Mins',
                                'Intl Mins',
                                'CustServ Calls',
                                'Churn?')
In [10]:
churn_df_subset.columns
Out[10]:
['Account Length',
"Int'l Plan",
'VMail Plan',
'VMail Message',
'Day Mins',
'Eve Mins',
'Night Mins',
'Intl Mins',
'CustServ Calls',
'Churn?']

Transform Target Variable

  • The 'Churn?' variable is converted into an integer
  • 'True.' becomes 1 and 'False.' becomes 0
In [11]:
churn_df_subset = churn_df_subset                       \
  .withColumn( "Churn",
              ( churn_df_subset['Churn?']
               == 'True.').cast('integer') )

churn_df_subset = churn_df_subset.drop( 'Churn?' )
In [12]:
type( churn_df_subset )
Out[12]:
pyspark.sql.dataframe.DataFrame
In [13]:
churn_df_subset.show( 3 )
+--------------+----------+----------+-------------+--------+--------+----------+---------+--------------+-----+
|Account Length|Int'l Plan|VMail Plan|VMail Message|Day Mins|Eve Mins|Night Mins|Intl Mins|CustServ Calls|Churn|
+--------------+----------+----------+-------------+--------+--------+----------+---------+--------------+-----+
|           128|        no|       yes|           25|   265.1|   197.4|     244.7|     10.0|             1|    0|
|           107|        no|       yes|           26|   161.6|   195.5|     254.4|     13.7|             1|    0|
|           137|        no|        no|            0|   243.4|   121.2|     162.6|     12.2|             0|    0|
+--------------+----------+----------+-------------+--------+--------+----------+---------+--------------+-----+
only showing top 3 rows

Create Categorical Variables

In [14]:
churn_vec = create_category_vars( churn_df_subset,
                               "Int'l Plan" )
In [15]:
churn_vec.show( 2 )
+--------------+----------+----------+-------------+--------+--------+----------+---------+--------------+-----+---------------+-------------+
|Account Length|Int'l Plan|VMail Plan|VMail Message|Day Mins|Eve Mins|Night Mins|Intl Mins|CustServ Calls|Churn|Int'l PlanIndex|Int'l PlanVec|
+--------------+----------+----------+-------------+--------+--------+----------+---------+--------------+-----+---------------+-------------+
|           128|        no|       yes|           25|   265.1|   197.4|     244.7|     10.0|             1|    0|            0.0|(1,[0],[1.0])|
|           107|        no|       yes|           26|   161.6|   195.5|     254.4|     13.7|             1|    0|            0.0|(1,[0],[1.0])|
+--------------+----------+----------+-------------+--------+--------+----------+---------+--------------+-----+---------------+-------------+
only showing top 2 rows

In [16]:
churn_vec = create_category_vars( churn_vec,
                               "VMail Plan" )
In [17]:
churn_vec.select( "Int'l PlanVec",
               "VMail PlanVec" ).show( 5 )
+-------------+-------------+
|Int'l PlanVec|VMail PlanVec|
+-------------+-------------+
|(1,[0],[1.0])|    (1,[],[])|
|(1,[0],[1.0])|    (1,[],[])|
|(1,[0],[1.0])|(1,[0],[1.0])|
|    (1,[],[])|(1,[0],[1.0])|
|    (1,[],[])|(1,[0],[1.0])|
+-------------+-------------+
only showing top 5 rows
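The `(1,[0],[1.0])` notation is Spark's sparse-vector format: (size, indices, values). With `dropLast=True`, a two-level column becomes a single 0/1 dimension, so the more frequent label `no` (index 0) maps to `(1,[0],[1.0])` and `yes` maps to the empty vector `(1,[],[])`. A plain-Python sketch of what `StringIndexer` plus `OneHotEncoder(dropLast=True)` do (the helper names are made up for illustration):

```python
# StringIndexer assigns indices by descending frequency; dropLast=True removes
# the last category, so a 2-level column becomes one 0/1 dimension.
def index_labels(values):
    freq = {}
    for v in values:
        freq[v] = freq.get(v, 0) + 1
    order = sorted(freq, key=lambda k: (-freq[k], k))  # most frequent first
    mapping = {label: i for i, label in enumerate(order)}
    return [mapping[v] for v in values]

def one_hot_drop_last(indices, num_labels):
    size = num_labels - 1  # dropLast=True drops the final category
    return [[1.0 if i == idx else 0.0 for i in range(size)] for idx in indices]

plans = ['no', 'no', 'no', 'yes', 'yes', 'no']
idx = index_labels(plans)            # 'no' is more frequent, so it gets index 0
vecs = one_hot_drop_last(idx, 2)
print(idx)   # [0, 0, 0, 1, 1, 0]
print(vecs)  # [[1.0], [1.0], [1.0], [0.0], [0.0], [1.0]]
```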

In [18]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

Creating the vectors

In [19]:
def parseNewPoint( rec ):
  # rec[9] is the integer Churn label; rec[0] and rec[3]..rec[8] are the
  # numeric feature columns; rec[11] and rec[13] are the one-hot encoded
  # Int'l PlanVec and VMail PlanVec sparse vectors
  return LabeledPoint( float( rec[9] ),
          Vectors.dense(tuple( [rec[0],
                                rec[3],
                                rec[4],
                                rec[5],
                                rec[6],
                                rec[7],
                                rec[8] ] +
                                rec[11].toArray().tolist() +
                                rec[13].toArray().tolist() ) ) )
In [20]:
churn_vec_new = churn_vec.map( lambda rec:
                            parseNewPoint( rec ) )
In [21]:
churn_vec_new.take( 5 )
Out[21]:
[LabeledPoint(0.0, [128.0,25.0,265.1,197.4,244.7,10.0,1.0,1.0,0.0]),
LabeledPoint(0.0, [107.0,26.0,161.6,195.5,254.4,13.7,1.0,1.0,0.0]),
LabeledPoint(0.0, [137.0,0.0,243.4,121.2,162.6,12.2,0.0,1.0,1.0]),
LabeledPoint(0.0, [84.0,0.0,299.4,61.9,196.9,6.6,2.0,0.0,1.0]),
LabeledPoint(0.0, [75.0,0.0,166.7,148.3,186.9,10.1,3.0,0.0,1.0])]
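The first `LabeledPoint` above can be reproduced by hand: seven numeric columns followed by the two one-hot vectors. This plain-Python sketch (the `row` dict is just the first record written out) shows the assembly that `parseNewPoint` performs:

```python
# Plain-Python view of the feature vector built for the first row:
# seven numeric columns followed by the two one-hot vectors.
row = {
    'Account Length': 128, 'VMail Message': 25, 'Day Mins': 265.1,
    'Eve Mins': 197.4, 'Night Mins': 244.7, 'Intl Mins': 10.0,
    'CustServ Calls': 1,
    "Int'l PlanVec": [1.0],   # 'no'  -> index 0 -> [1.0]
    'VMail PlanVec': [0.0],   # 'yes' -> index 1 -> [0.0] (dropLast=True)
}
numeric_cols = ['Account Length', 'VMail Message', 'Day Mins', 'Eve Mins',
                'Night Mins', 'Intl Mins', 'CustServ Calls']
features = [float(row[c]) for c in numeric_cols]
features += row["Int'l PlanVec"] + row['VMail PlanVec']

print(features)  # [128.0, 25.0, 265.1, 197.4, 244.7, 10.0, 1.0, 1.0, 0.0]
```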

Split dataset into train and test

In [22]:
trainingData, testData = churn_vec_new.randomSplit([0.7, 0.3])

Building the model and making predictions

Logistic Regression Model

In [23]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS,    \
                                       LogisticRegressionModel
from pyspark.mllib.evaluation import BinaryClassificationMetrics
In [24]:
## Training the model
lr_model = LogisticRegressionWithLBFGS.train( trainingData )

Performance Metrics of the Model

In [25]:
labelsAndPreds_lr = testData.map(lambda lp:
                               ( float(lr_model.predict(lp.features) ), lp.label ) )

success_count_lr = labelsAndPreds_lr.filter(lambda rec:
                                            rec[0] == rec[1]).count()
print("Successful prediction percentage: " +
    str( round( success_count_lr / labelsAndPreds_lr.count(), 2 ) ) )
Successful prediction percentage: 0.85
In [26]:
from pyspark.mllib.evaluation import MulticlassMetrics

metrics = MulticlassMetrics( labelsAndPreds_lr )
In [27]:
def printMetric( metrics ):
  print( 'Precision of True ', metrics.precision(1) )

  print( 'Precision of False', metrics.precision(0) )
  print( 'Recall of True    ', metrics.recall(1) )
  print( 'Recall of False   ', metrics.recall(0) )
  print( 'F-1 Score         ', metrics.fMeasure() )
  print( 'Confusion Matrix\n', metrics.confusionMatrix().toArray() )
In [28]:
printMetric( metrics )
Precision of True  0.4444444444444444
Precision of False 0.8710691823899371
Recall of True     0.16326530612244897
Recall of False    0.9651567944250871
F-1 Score          0.8482142857142857
Confusion Matrix
[[ 831.   30.]
[ 123.   24.]]
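The reported numbers can be recomputed directly from the confusion matrix (rows are actual labels, columns are predicted labels). Note that the micro-averaged F-1 returned by the no-argument `fMeasure()` equals overall accuracy, which is why it matches the 0.85 success percentage above:

```python
# Recompute the logistic-regression metrics from the confusion matrix above.
cm = [[831.0, 30.0],   # actual 0: predicted 0, predicted 1
      [123.0, 24.0]]   # actual 1: predicted 0, predicted 1

precision_true = cm[1][1] / (cm[0][1] + cm[1][1])   # TP / (TP + FP)
recall_true = cm[1][1] / (cm[1][0] + cm[1][1])      # TP / (TP + FN)
accuracy = (cm[0][0] + cm[1][1]) / sum(sum(r) for r in cm)

print(round(precision_true, 4))  # 0.4444
print(round(recall_true, 4))     # 0.1633
print(round(accuracy, 4))        # 0.8482
```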

Plot the confusion matrix

In [29]:
labelsAndPreds_lr_df = labelsAndPreds_lr.toDF().toPandas()
In [30]:
labelsAndPreds_lr_df.head( 5 )
Out[30]:
_1 _2
0 1 0
1 0 0
2 0 0
3 0 0
4 0 0
In [31]:
from sklearn import metrics as smetrics
import matplotlib.pyplot as plt
import seaborn as sn
%matplotlib inline
In [32]:
cm = smetrics.confusion_matrix( labelsAndPreds_lr_df._2, labelsAndPreds_lr_df._1 )
sn.heatmap(cm, annot=True, fmt='.2f' )
Out[32]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f1d9a363eb8>

Decision Tree Model

In [33]:
from pyspark.mllib.tree import DecisionTree

Build the model, make predictions, and check accuracy

In [34]:
tree_model = DecisionTree.trainClassifier(trainingData,
                                        numClasses=2,
                                        categoricalFeaturesInfo={7:2, 8:2},
                                        impurity='gini',
                                        maxDepth=4,
                                        maxBins=32)
In [35]:
print( tree_model.toDebugString() )
DecisionTreeModel classifier of depth 4 with 29 nodes
If (feature 2 <= 265.2)
 If (feature 6 <= 3.0)
  If (feature 7 in {0.0})
   If (feature 5 <= 12.9)
    Predict: 0.0
   Else (feature 5 > 12.9)
    Predict: 1.0
  Else (feature 7 not in {0.0})
   If (feature 2 <= 221.1)
    Predict: 0.0
   Else (feature 2 > 221.1)
    Predict: 0.0
 Else (feature 6 > 3.0)
  If (feature 2 <= 160.7)
   If (feature 3 <= 266.9)
    Predict: 1.0
   Else (feature 3 > 266.9)
    Predict: 0.0
  Else (feature 2 > 160.7)
   If (feature 3 <= 142.3)
    Predict: 1.0
   Else (feature 3 > 142.3)
    Predict: 0.0
Else (feature 2 > 265.2)
 If (feature 1 <= 0.0)
  If (feature 3 <= 142.3)
   Predict: 0.0
  Else (feature 3 > 142.3)
   If (feature 4 <= 156.7)
    Predict: 1.0
   Else (feature 4 > 156.7)
    Predict: 1.0
 Else (feature 1 > 0.0)
  If (feature 7 in {1.0})
   If (feature 3 <= 259.4)
    Predict: 0.0
   Else (feature 3 > 259.4)
    Predict: 0.0
  Else (feature 7 not in {1.0})
   If (feature 5 <= 8.1)
    Predict: 0.0
   Else (feature 5 > 8.1)
    Predict: 1.0

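The tree rules refer to features by position in the dense vector built by `parseNewPoint`. A small helper (introduced here only to make the rules readable, not part of MLlib) maps those indices back to column names, so e.g. `feature 2 <= 265.2` reads as `Day Mins <= 265.2`:

```python
import re

# Column order used by parseNewPoint: seven numeric columns, then the two
# one-hot vectors (which the tree treats as categorical features 7 and 8).
feature_names = ['Account Length', 'VMail Message', 'Day Mins', 'Eve Mins',
                 'Night Mins', 'Intl Mins', 'CustServ Calls',
                 "Int'l PlanVec", 'VMail PlanVec']

def decode(rule):
    # Replace "feature N" with the corresponding column name
    return re.sub(r'feature (\d+)',
                  lambda m: feature_names[int(m.group(1))], rule)

print(decode('If (feature 2 <= 265.2)'))   # If (Day Mins <= 265.2)
print(decode('If (feature 6 <= 3.0)'))     # If (CustServ Calls <= 3.0)
```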
In [36]:
tree_predictions = tree_model.predict( testData.map(lambda lp: lp.features ) )
labelsAndPreds_tree = tree_predictions.zip( testData.map( lambda lp: lp.label ) )
In [37]:
success_count_tree = labelsAndPreds_tree.filter(lambda rec:
                                            rec[0] == rec[1]).count()

print("Successful prediction percentage: " +
    str( round( success_count_tree / labelsAndPreds_tree.count(), 2 ) ) )
Successful prediction percentage: 0.91
In [38]:
tmetrics = MulticlassMetrics( labelsAndPreds_tree )
In [39]:
printMetric( tmetrics )
Precision of True  0.8426966292134831
Precision of False 0.9216539717083787
Recall of True     0.5102040816326531
Recall of False    0.983739837398374
F-1 Score          0.9146825396825397
Confusion Matrix
[[ 847.   14.]
[  72.   75.]]
In [40]:
labelsAndPreds_tree_df = labelsAndPreds_tree.toDF().toPandas()
cm_tree = smetrics.confusion_matrix( labelsAndPreds_tree_df._2, labelsAndPreds_tree_df._1 )
sn.heatmap(cm_tree, annot=True, fmt='.2f' )
Out[40]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f1d9a28ff98>

Undersampling to overcome unbalanced representation of classes

  • There are roughly 5 to 6 times as many no-churn samples as churn samples. The two classes can be made to have roughly equal representation by undersampling (downsampling) the majority class.
  • The DataFrame's sampleBy() method will be used, with a fraction for each class to be selected.
  • All instances of the Churn=True class will be kept in the new dataset, while the Churn=False class will be undersampled to a fraction of 500/2850.
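`sampleBy` does per-class Bernoulli sampling, so the resulting counts are only approximately the target sizes (hence 510 rather than exactly 500 in the output below). A plain-Python sketch of the same idea, with the class sizes from this dataset:

```python
import random

# Each class-0 row is kept with probability 500/2850, each class-1 row with
# probability 1.0, so the sampled class counts end up roughly equal.
random.seed(42)
fractions = {0: 500.0 / 2850.0, 1: 1.0}
labels = [0] * 2850 + [1] * 483

sampled = [y for y in labels if random.random() < fractions[y]]
counts = {0: sampled.count(0), 1: sampled.count(1)}
print(counts)  # class 1 keeps all 483 rows; class 0 keeps roughly 500
```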
In [41]:
churn_vec.groupby('Churn').count().toPandas()
Out[41]:
Churn count
0 0 2850
1 1 483
In [42]:
undersample_churn_vec = churn_vec.sampleBy('Churn', fractions={0: 500.0/2850.0, 1: 1.0}).cache()

undersample_churn_vec.groupby('Churn').count().toPandas()
Out[42]:
Churn count
0 0 510
1 1 483

Logistic Regression with undersampled dataset

In [43]:
churn_vec_new = undersample_churn_vec.map( lambda rec:
                            parseNewPoint( rec ) )

trainingData, testData = churn_vec_new.randomSplit([0.7, 0.3])

lr_model = LogisticRegressionWithLBFGS.train( trainingData )

labelsAndPreds_lr = testData.map(lambda lp:
                               ( float(lr_model.predict(lp.features) ), lp.label ) )

success_count_lr = labelsAndPreds_lr.filter(lambda rec:
                                            rec[0] == rec[1]).count()
print("Successful prediction percentage: " +
    str( round( success_count_lr / labelsAndPreds_lr.count(), 2 ) ) )

metrics = MulticlassMetrics( labelsAndPreds_lr )
Successful prediction percentage: 0.75
In [44]:
printMetric( metrics )
Precision of True  0.7886178861788617
Precision of False 0.7247191011235955
Recall of True     0.6643835616438356
Recall of False    0.832258064516129
F-1 Score          0.7508305647840532
Confusion Matrix
[[ 129.   26.]
[  49.   97.]]
In [45]:
labelsAndPreds_lr_df = labelsAndPreds_lr.toDF().toPandas()
cm = smetrics.confusion_matrix( labelsAndPreds_lr_df._2, labelsAndPreds_lr_df._1 )
sn.heatmap(cm, annot=True, fmt='.2f' )
Out[45]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f1d9a21feb8>

Decision Tree with undersampled dataset

In [46]:
tree_model = DecisionTree.trainClassifier(trainingData,
                                        numClasses=2,
                                        categoricalFeaturesInfo={7:2, 8:2},
                                        impurity='gini',
                                        maxDepth=4,
                                        maxBins=32)

tree_predictions = tree_model.predict( testData.map(lambda lp: lp.features ) )
labelsAndPreds_tree = tree_predictions.zip( testData.map( lambda lp: lp.label ) )

success_count_tree = labelsAndPreds_tree.filter(lambda rec:
                                            rec[0] == rec[1]).count()

print("Successful prediction percentage: " +
    str( round( success_count_tree / labelsAndPreds_tree.count(), 2 ) ) )

tmetrics = MulticlassMetrics( labelsAndPreds_tree )
Successful prediction percentage: 0.84
In [47]:
printMetric( tmetrics )
Precision of True  0.9375
Precision of False 0.783068783068783
Recall of True     0.7191780821917808
Recall of False    0.9548387096774194
F-1 Score          0.840531561461794
Confusion Matrix
[[ 148.    7.]
[  41.  105.]]
In [48]:
labelsAndPreds_tree_df = labelsAndPreds_tree.toDF().toPandas()
cm_tree = smetrics.confusion_matrix( labelsAndPreds_tree_df._2, labelsAndPreds_tree_df._1 )
sn.heatmap(cm_tree, annot=True, fmt='.2f' )
Out[48]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f1d9a2003c8>

Random Forest Model

In [49]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
In [50]:
rf_model = RandomForest.trainClassifier(trainingData,
                                      numClasses=2,
                                      categoricalFeaturesInfo={7:2, 8:2},
                                      numTrees=50,
                                      impurity='entropy',
                                      maxDepth=10,
                                      maxBins=32)
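At prediction time, each of the 50 trained trees votes for a class and the majority wins (that is the classification rule behind `numTrees=50`). A toy sketch of that voting step, with hypothetical per-tree predictions:

```python
from collections import Counter

# Majority vote across an ensemble of classifiers, as a random forest does.
def majority_vote(tree_votes):
    return Counter(tree_votes).most_common(1)[0][0]

votes = [1, 0, 1, 1, 0, 1, 1]  # hypothetical predictions from 7 trees for one row
print(majority_vote(votes))    # 1
```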
In [51]:
rf_predictions = rf_model.predict( testData.map(lambda lp: lp.features ) )
labelsAndPreds_rf = rf_predictions.zip( testData.map( lambda lp: lp.label ) )
success_count_rf = labelsAndPreds_rf.filter(lambda rec:
                                            rec[0] == rec[1]).count()

print("Successful prediction percentage: " +
    str( round( success_count_rf / labelsAndPreds_rf.count(), 2 ) ) )
Successful prediction percentage: 0.87
In [52]:
rf_metrics = MulticlassMetrics( labelsAndPreds_rf )
In [53]:
printMetric( rf_metrics )
Precision of True  0.8970588235294118
Precision of False 0.8545454545454545
Recall of True     0.8356164383561644
Recall of False    0.9096774193548387
F-1 Score          0.8737541528239202
Confusion Matrix
[[ 141.   14.]
[  24.  122.]]
In [54]:
labelsAndPreds_rf_df = labelsAndPreds_rf.toDF().toPandas()
cm_rf = smetrics.confusion_matrix( labelsAndPreds_rf_df._2, labelsAndPreds_rf_df._1 )
sn.heatmap(cm_rf, annot=True, fmt='.2f' )
Out[54]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f1d9a093160>

Gradient Boosted Trees

In [55]:
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
In [56]:
gbt_model = GradientBoostedTrees.trainClassifier(trainingData,
                                               categoricalFeaturesInfo={7:2, 8:2},
                                               numIterations=10)
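Unlike a random forest, gradient boosting builds its trees sequentially: each stage fits a new tree to the residuals of the running prediction, and the ensemble is the sum of the scaled stage outputs. A toy sketch of that idea on a tiny made-up 1-D dataset, with a depth-1 "stump" as the base learner (all names and data here are illustrative, not MLlib internals):

```python
# Toy sketch of the boosting loop behind GradientBoostedTrees: fit a stump to
# the current residuals, add a scaled copy to the running prediction, repeat.
xs = [1.0, 2.0, 3.0, 4.0]
ys = [1.0, 1.5, 3.5, 4.0]

def fit_stump(xs, residuals):
    # Try every split point; predict the mean residual on each side.
    best = None
    for split in xs:
        left = [r for x, r in zip(xs, residuals) if x <= split]
        right = [r for x, r in zip(xs, residuals) if x > split]
        if not left or not right:
            continue
        lm, rm = sum(left) / len(left), sum(right) / len(right)
        err = sum((r - (lm if x <= split else rm)) ** 2
                  for x, r in zip(xs, residuals))
        if best is None or err < best[0]:
            best = (err, split, lm, rm)
    _, split, lm, rm = best
    return lambda x: lm if x <= split else rm

lr = 0.5                                  # learning rate
pred = [0.0] * len(xs)
for _ in range(10):                       # numIterations=10, as in the cell above
    residuals = [y - p for y, p in zip(ys, pred)]
    stump = fit_stump(xs, residuals)
    pred = [p + lr * stump(x) for x, p in zip(xs, pred)]

print([round(p, 2) for p in pred])        # close to ys after 10 stages
```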
In [57]:
gbt_predictions = gbt_model.predict( testData.map(lambda lp: lp.features ) )
labelsAndPreds_gbt = gbt_predictions.zip( testData.map( lambda lp: lp.label ) )
success_count_gbt = labelsAndPreds_gbt.filter(lambda rec:
                                            rec[0] == rec[1]).count()

print("Successful prediction percentage: " +
    str( round( success_count_gbt / labelsAndPreds_gbt.count(), 2 ) ) )
Successful prediction percentage: 0.87
In [58]:
gbt_metrics = MulticlassMetrics( labelsAndPreds_gbt )
In [59]:
printMetric( gbt_metrics )
Precision of True  0.9140625
Precision of False 0.8323699421965318
Recall of True     0.8013698630136986
Recall of False    0.9290322580645162
F-1 Score          0.867109634551495
Confusion Matrix
[[ 144.   11.]
[  29.  117.]]
In [60]:
labelsAndPreds_gbt_df = labelsAndPreds_gbt.toDF().toPandas()
cm_gbt = smetrics.confusion_matrix( labelsAndPreds_gbt_df._2, labelsAndPreds_gbt_df._1 )
sn.heatmap(cm_gbt, annot=True, fmt='.2f' )
Out[60]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f1d9a0982e8>