Overview

  • This notebook gives an overview of how to read data into Spark and apply basic RDD operations such as parsing, filtering, grouping, sorting, and map-reduce operations using keys and values.
  • When Spark starts, the SparkContext is created and made available as the variable sc.
  • The data is taken from cricinfo.com. There are two datasets available: Captains ODIs and Captains Tests.
In [1]:
# Spark Context Created.. Let's verify it
sc
Out[1]:
<pyspark.context.SparkContext at 0x7f2118725400>

Useful information about the SparkContext

In [2]:
## Application Identifier
sc.applicationId
Out[2]:
'local-1469148966251'
In [3]:
sc.environment.keys()
Out[3]:
dict_keys(['PYTHONHASHSEED'])
In [4]:
## Python version in use
sc.pythonVer
Out[4]:
'3.4'

Get all Spark Configurations

In [5]:
## Use sc.getConf.getAll.foreach(println) for SCALA
sc._conf.getAll()
Out[5]:
[('spark.driver.extraClassPath',
'/home/hadoop/lab/software/apache-hive-1.2.1-bin/lib/guava-14.0.1.jar:/home/hadoop/lab/software/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.30-bin.jar:/home/hadoop/lab/lib/com.databricks_spark-csv_2.10-1.3.0.jar:/home/hadoop/lab/lib/org.apache.commons_commons-csv-1.1.jar:/home/hadoop/lab/lib/org.apache.commons_commons-csv-1.1.jar'),
('spark.rdd.compress', 'True'),
('spark.sql.hive.metastore.jars',
'/home/hadoop/lab/software/apache-hive-1.2.1-bin/lib/*:/home/hadoop/lab/software/hadoop-2.7.1/share/hadoop/common/*:/home/hadoop/lab/software/hadoop-2.7.1/share/hadoop/hdfs/*:/home/hadoop/lab/software/hadoop-2.7.1/share/hadoop/hdfs/lib/*:/home/hadoop/lab/software/hadoop-2.7.1/share/hadoop/yarn/*:/home/hadoop/lab/software/hadoop-2.7.1/share/hadoop/yarn/lib/*:/home/hadoop/lab/software/hadoop-2.7.1/share/hadoop/mapreduce/*:/home/hadoop/lab/software/hadoop-2.7.1/share/hadoop/mapreduce/lib/*'),
('spark.executor.extraClassPath',
'/home/hadoop/lab/lib/com.databricks_spark-csv_2.10-1.3.0.jar:/home/hadoop/lab/lib/org.apache.commons_commons-csv-1.1.jar:/home/hadoop/lab/lib/org.apache.commons_commons-csv-1.1.jar:/home/hadoop/lab/software/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.30-bin.jar'),
('spark.serializer.objectStreamReset', '100'),
('spark.sql.hive.metastore.version', '1.2.1'),
('spark.submit.deployMode', 'client'),
('spark.app.name', 'pyspark-shell'),
('spark.sql.hive.metastore.sharedPrefixes', 'com.mysql.jdbc'),
('spark.master', 'local[2]')]

Get specific property values

In [6]:
sc._conf.get("spark.master")
Out[6]:
'local[2]'

Get the Web URL
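
As a minimal sketch (PySpark 2.1+ exposes the address as sc.uiWebUrl; on the older local setup used here, the Web UI is typically served at http://localhost:4040):

## uiWebUrl exists only in PySpark 2.1+; fall back to the usual local address otherwise
getattr( sc, "uiWebUrl", "http://localhost:4040" )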

ODI Performance Analysis

Check Spark Web UI

  • Check the Executors page. It lists the running executor processes. As this application is running in local mode, the driver and executor run in the same JVM process.

Read the captains_ODI.csv file from the local file system

In [7]:
captains_odis = sc.textFile( "file:///home/hadoop/lab/data/captains_ODI.csv" )

Check Spark UI

Notice that no jobs have been created yet, because no action has been called.
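
A minimal sketch of this lazy behaviour (the uppercase map is purely illustrative):

# Transformations such as map() only record lineage; no job is launched yet
upper_lines = captains_odis.map( lambda line: line.upper() )
# Only an action, e.g. count(), triggers a job that executes the lineage
# upper_lines.count()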

Check the first record

In [8]:
captains_odis.first()
Out[8]:
'Ponting  R T,Australia,1995-2012,230,165,51,14,124'

Check Spark UI

A job is created as shown below.

Click on runJob and it will take you to its stages. The DAG visualization shows that the data was read from the text file.

Display the first 10 records

In [9]:
captains_odis.take( 10 )
Out[9]:
['Ponting  R T,Australia,1995-2012,230,165,51,14,124',
'Fleming  S P,New Zealand,1994-2007,218,98,106,14,105',
'Ranatunga  A,Sri Lanka,1982-1999,193,89,95,9,102',
'Dhoni  M S*,India,2004-,186,103,68,15,88',
'Border  A R,Australia,1979-1994,178,107,67,4,86',
'Azharuddin  M,India,1985-2000,174,89,77,8,96',
'Smith  G C,South Africa,2002-2013,149,91,51,7,74',
'Ganguly  S C,India,1992-2007,147,76,66,5,74',
'Cronje  W J,South Africa,1992-2000,140,99,37,4,74',
'Imran Khan,Pakistan,1974-1992,139,75,59,5,70']

Checking the RDD type

In [10]:
# captains_odis should be of RDD type
type( captains_odis )
Out[10]:
pyspark.rdd.RDD

Cache the RDD into memory

In [11]:
from pyspark import StorageLevel
captains_odis.persist( StorageLevel.MEMORY_ONLY_SER )
Out[11]:
MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

Counting the number of records

In [12]:
# Count the number of records
captains_odis.count()
Out[12]:
98

Check Web UI

Check Storage

The Storage tab shows that the RDD is now persisted. It shows the number of partitions created and the total memory allocated for the RDD.

Click on the RDD and it will show all partitions of the RDD.

It shows the following information

  • The executor location of the partitions
  • Size of each partition
  • Whether the partitions are replicated (by default they are not)

Check Stages

Click on the Stages tab and then on the count stage.

This opens the DAG visualization. It shows summary statistics, aggregated metrics for all tasks, and individual task metrics.

In this case,

  • The number of tasks equals the number of partitions.
  • The duration of execution for each partition.
  • The input size and number of records processed by each partition.
  • The locality level. If PROCESS_LOCAL, the task runs on the node where the partition resides.

Event Timeline

The event timeline diagram shows the following details:

  • Scheduler Delay
  • Shuffle Read Time
  • Executor Computing Time
  • Shuffle Write Time

The above timeline may provide clues for further optimizing the Spark processing, such as serialization or computation time.
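
For example, if serialization time dominates, one common knob (not used in this notebook) is switching to the Kryo serializer when the context is created; a minimal sketch, with a hypothetical application name:

from pyspark import SparkConf, SparkContext

conf = ( SparkConf()
           .setAppName( "captains-analysis" )          # hypothetical name
           .set( "spark.serializer",
                 "org.apache.spark.serializer.KryoSerializer" ) )
# Only one SparkContext can be active at a time, so this would replace the existing sc
# sc = SparkContext( conf = conf )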

Obtaining this information from the APIs

How many partitions in an RDD?

In [13]:
## use someRDD.partitions.size in SCALA
captains_odis.getNumPartitions()
Out[13]:
2

Is the RDD cached?

In [15]:
captains_odis.is_cached
Out[15]:
True

Where is the cache stored? What is the storage level?

In [16]:
captains_odis.getStorageLevel()
Out[16]:
StorageLevel(False, True, False, False, 1)
In [17]:
## Use StorageLevel.description in SCALA
print( captains_odis.getStorageLevel() )
Memory Serialized 1x Replicated

Parsing the lines and creating records

  • The RDD does not have a header. Let's define the field names and use them to turn each line into a record.
In [20]:
fields = ("name", "country", "career", "matches", "won", "lost", "ties", "toss" )
In [21]:
from collections import namedtuple

Check the RDD lineage
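
The lineage of the cached RDD can be printed with toDebugString, which returns bytes in PySpark, hence the decoding used throughout this notebook:

print( str( captains_odis.toDebugString(), 'utf-8' ) )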

Define named tuples to represent records

  • Let's define a tuple (a record) for each line. We need to iterate through each line and convert it into a record. The record type is defined as a namedtuple called Captain, linked to the field names defined above.
In [22]:
Captain = namedtuple( 'Captain', fields )
In [23]:
# Define a function to parse each line and convert them into records
def parseRecs( line ):
  fields = line.split(",")
  return Captain( fields[0], fields[1], fields[2], int( fields[3] ),
                 int( fields[4] ), int(fields[5]), int(fields[6]), int(fields[7] ) )
In [24]:
captains = captains_odis.map( lambda rec: parseRecs( rec) )
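
As an aside, the parsed records also make filtering (mentioned in the overview) straightforward; a minimal sketch:

# Keep only the Indian captains from the parsed records
indian_captains = captains.filter( lambda rec: rec.country == "India" )
# indian_captains.count()   # an action would trigger the job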
In [29]:
# Which country has played how many matches..
countries = captains.map( lambda rec: ( rec.country , rec.matches) )
In [59]:
# use countries.toDebugString in SCALA 
print( str(countries.toDebugString(),'utf-8') )
(2) PythonRDD[34] at RDD at PythonRDD.scala:43 []
|  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
|      CachedPartitions: 2; MemorySize: 3.4 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
|  file:///home/hadoop/lab/data/captains_ODI.csv HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []

There is ONE STAGE of operations for reading the records, parsing them, and creating keys and values.

Aggregating values by keys

In [60]:
# Aggregate by countries
matches_countries = countries.reduceByKey( lambda a, b: a + b )
In [61]:
print( str(matches_countries.toDebugString(),'utf-8') )
(2) PythonRDD[39] at RDD at PythonRDD.scala:43 []
|  MapPartitionsRDD[38] at mapPartitions at PythonRDD.scala:374 []
|  ShuffledRDD[37] at partitionBy at NativeMethodAccessorImpl.java:-2 []
+-(2) PairwiseRDD[36] at reduceByKey at <ipython-input-60-30c85538e831>:2 []
  |  PythonRDD[35] at reduceByKey at <ipython-input-60-30c85538e831>:2 []
  |  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
  |      CachedPartitions: 2; MemorySize: 3.4 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
  |  file:///home/hadoop/lab/data/captains_ODI.csv HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []

There are TWO STAGES of operations: one for creating keys and values, and another for shuffling and reducing (aggregating) them.

Sorting records

  • Sort the countries by the number of matches they played (the value), in descending order.
In [62]:
# Sort by value by descending
match_sorted = matches_countries.sortBy( lambda rec: rec[1], ascending = False )
In [63]:
print( str(match_sorted.toDebugString(),'utf-8') )
(2) PythonRDD[46] at RDD at PythonRDD.scala:43 []
|  MapPartitionsRDD[45] at mapPartitions at PythonRDD.scala:374 []
|  ShuffledRDD[44] at partitionBy at NativeMethodAccessorImpl.java:-2 []
+-(2) PairwiseRDD[43] at sortBy at <ipython-input-62-361045a5af76>:2 []
  |  PythonRDD[42] at sortBy at <ipython-input-62-361045a5af76>:2 []
  |  MapPartitionsRDD[38] at mapPartitions at PythonRDD.scala:374 []
  |  ShuffledRDD[37] at partitionBy at NativeMethodAccessorImpl.java:-2 []
  +-(2) PairwiseRDD[36] at reduceByKey at <ipython-input-60-30c85538e831>:2 []
     |  PythonRDD[35] at reduceByKey at <ipython-input-60-30c85538e831>:2 []
     |  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
     |      CachedPartitions: 2; MemorySize: 3.4 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
     |  file:///home/hadoop/lab/data/captains_ODI.csv HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []

There are THREE STAGES of operations: one for reading and mapping, one for reducing, and another for sorting the records by value.

Note:

The number (2) in brackets shows the degree of parallelism, i.e. the number of partitions.

Let's change the number of partitions during the reduce stage

In [74]:
matches_countries = countries.reduceByKey( lambda a, b: a + b, numPartitions = 4 )
match_sorted = matches_countries.sortBy( lambda rec: rec[1], ascending = False )
print( str(match_sorted.toDebugString(),'utf-8') )
(4) PythonRDD[123] at RDD at PythonRDD.scala:43 []
|  MapPartitionsRDD[122] at mapPartitions at PythonRDD.scala:374 []
|  ShuffledRDD[121] at partitionBy at null:-1 []
+-(4) PairwiseRDD[120] at sortBy at <ipython-input-74-dbeee14cc119>:2 []
  |  PythonRDD[119] at sortBy at <ipython-input-74-dbeee14cc119>:2 []
  |  MapPartitionsRDD[116] at mapPartitions at PythonRDD.scala:374 []
  |  ShuffledRDD[115] at partitionBy at NativeMethodAccessorImpl.java:-2 []
  +-(2) PairwiseRDD[114] at reduceByKey at <ipython-input-74-dbeee14cc119>:1 []
     |  PythonRDD[113] at reduceByKey at <ipython-input-74-dbeee14cc119>:1 []
     |  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
     |      CachedPartitions: 2; MemorySize: 3.4 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
     |  file:///home/hadoop/lab/data/captains_ODI.csv HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []

Now the number of partitions has increased to 4 after the first reduce stage.
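
This can also be confirmed from the API; the reduced RDD should now report 4 partitions:

## reduceByKey was called with numPartitions = 4
matches_countries.getNumPartitions()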

In [81]:
# Sort the captains by percentage of wins
captain_top_odi = captains.map(
  lambda rec: ( rec.name,
               round( rec.won/rec.matches,
                     2 ) ) ).sortBy( lambda rec: rec[1], ascending = False )
In [82]:
# Read the Captains_Test.csv file
captains_tests = sc.textFile( "file:///home/hadoop/lab/data/captains_Test.csv" )
In [83]:
# Parse the records
captains_tests_recs = captains_tests.map( lambda rec: parseRecs( rec ) )
In [84]:
# Sort the captains by percentage of wins
captain_top_test = captains_tests_recs.map(
  lambda rec: ( rec.name,
               round( rec.won/rec.matches,
                     2 ) ) ).sortBy( lambda rec: rec[1], ascending = False )

Joining multiple data sets

  • Now let's join the ODI and Test performances and compare the captains' win efficiency across both formats.
In [85]:
# Lets join both ODI and Test captaincy details. 
# Default is inner join...
all_time_best_captains = captain_top_test.join( captain_top_odi )
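
Because join() performs an inner join, captains who appear in only one of the two datasets are dropped. To keep them as well, a pair-RDD outer join could be used instead; a minimal sketch:

# Captains missing from one format appear with None on that side of the value pair
all_captains_any_format = captain_top_test.fullOuterJoin( captain_top_odi )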
In [86]:
print( str(all_time_best_captains.toDebugString(),'utf-8') )
(4) PythonRDD[154] at RDD at PythonRDD.scala:43 []
|  MapPartitionsRDD[153] at mapPartitions at PythonRDD.scala:374 []
|  ShuffledRDD[152] at partitionBy at null:-1 []
+-(4) PairwiseRDD[151] at join at <ipython-input-85-64bc265f7afc>:3 []
  |  PythonRDD[150] at join at <ipython-input-85-64bc265f7afc>:3 []
  |  UnionRDD[149] at union at NativeMethodAccessorImpl.java:-2 []
  |  PythonRDD[147] at RDD at PythonRDD.scala:43 []
  |  MapPartitionsRDD[146] at mapPartitions at PythonRDD.scala:374 []
  |  ShuffledRDD[145] at partitionBy at null:-1 []
  +-(2) PairwiseRDD[144] at sortBy at <ipython-input-84-0fc037ee6758>:5 []
     |  PythonRDD[143] at sortBy at <ipython-input-84-0fc037ee6758>:5 []
     |  MapPartitionsRDD[140] at textFile at NativeMethodAccessorImpl.java:-2 []
     |  file:///home/hadoop/lab/data/captains_Test.csv HadoopRDD[139] at textFile at NativeMethodAccessorImpl.java:-2 []
  |  PythonRDD[148] at RDD at PythonRDD.scala:43 []
  |  MapPartitionsRDD[138] at mapPartitions at PythonRDD.scala:374 []
  |  ShuffledRDD[137] at partitionBy at null:-1 []
  +-(2) PairwiseRDD[136] at sortBy at <ipython-input-81-92859234f101>:5 []
     |  PythonRDD[135] at sortBy at <ipython-input-81-92859234f101>:5 []
     |  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
     |      CachedPartitions: 2; MemorySize: 3.4 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
     |  file:///home/hadoop/lab/data/captains_ODI.csv HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []

Coalesce to a smaller number of partitions

In [90]:
all_time_best_captains_2 = all_time_best_captains.coalesce( 1 )
In [91]:
print( str(all_time_best_captains_2.toDebugString(),'utf-8') )
(1) CoalescedRDD[157] at coalesce at NativeMethodAccessorImpl.java:-2 []
|  PythonRDD[154] at RDD at PythonRDD.scala:43 []
|  MapPartitionsRDD[153] at mapPartitions at PythonRDD.scala:374 []
|  ShuffledRDD[152] at partitionBy at null:-1 []
+-(4) PairwiseRDD[151] at join at <ipython-input-85-64bc265f7afc>:3 []
  |  PythonRDD[150] at join at <ipython-input-85-64bc265f7afc>:3 []
  |  UnionRDD[149] at union at NativeMethodAccessorImpl.java:-2 []
  |  PythonRDD[147] at RDD at PythonRDD.scala:43 []
  |  MapPartitionsRDD[146] at mapPartitions at PythonRDD.scala:374 []
  |  ShuffledRDD[145] at partitionBy at null:-1 []
  +-(2) PairwiseRDD[144] at sortBy at <ipython-input-84-0fc037ee6758>:5 []
     |  PythonRDD[143] at sortBy at <ipython-input-84-0fc037ee6758>:5 []
     |  MapPartitionsRDD[140] at textFile at NativeMethodAccessorImpl.java:-2 []
     |  file:///home/hadoop/lab/data/captains_Test.csv HadoopRDD[139] at textFile at NativeMethodAccessorImpl.java:-2 []
  |  PythonRDD[148] at RDD at PythonRDD.scala:43 []
  |  MapPartitionsRDD[138] at mapPartitions at PythonRDD.scala:374 []
  |  ShuffledRDD[137] at partitionBy at null:-1 []
  +-(2) PairwiseRDD[136] at sortBy at <ipython-input-81-92859234f101>:5 []
     |  PythonRDD[135] at sortBy at <ipython-input-81-92859234f101>:5 []
     |  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
     |      CachedPartitions: 2; MemorySize: 3.4 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
     |  file:///home/hadoop/lab/data/captains_ODI.csv HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []
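
Coalescing to a single partition is typically done when a small result has to be written out as one file; a minimal sketch, with a hypothetical output path:

## Hypothetical output path; with one partition a single part-00000 file is written
all_time_best_captains_2.saveAsTextFile( "file:///home/hadoop/lab/output/all_time_best_captains" )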