Overview

  • This notebook gives an overview of how to read data into Spark and apply basic RDD operations: parsing, filtering, grouping, sorting, and map/reduce operations on keys and values.
  • When Spark starts, a SparkContext is created and made available as the variable sc.
  • The data is taken from cricinfo.com. Two datasets are available: Captains ODIs and Captains Tests.
  • Using the RDD API, we will explore the following steps:

    • Reading text files
    • Parsing records and associating them with a schema
    • Filtering of records
    • Grouping of records
    • Sorting by key and values
    • Applying map and reduce functions
    • Caching RDDs
    • Applying groupByKey() and mapValues() functions
    • Collecting data into the driver
    • Joining RDDs
    • Saving RDDs to files
    • Using Broadcast variables
    • Using Accumulators

Check Spark Context

Checking if SparkContext is Created

In [1]:
# The SparkContext is created automatically; let's verify it
sc
Out[1]:
<pyspark.context.SparkContext at 0x7f2a4c425278>

Checking the Spark version

In [184]:
sc.version
Out[184]:
'1.6.0'

Check Spark Master Deployment Mode

In [185]:
sc.master
Out[185]:
'local[2]'

Spark Application Name and ID

In [186]:
sc.appName
Out[186]:
'pyspark-shell'
In [187]:
sc.applicationId
Out[187]:
'local-1469738969083'

Which Python version Spark is using

In [188]:
sc.pythonVer
Out[188]:
'3.4'

ODI Performance Analysis

Loading the dataset

Read the captains_ODI.csv file from the local file system

In [2]:
captains_odis = sc.textFile( "file:///home/hadoop/lab/data/captains_ODI.csv" )

Check the first record

In [3]:
captains_odis.first()
Out[3]:
'Ponting  R T,Australia,1995-2012,230,165,51,14,124'

Display the first 10 records

In [130]:
captains_odis.take( 10 )
Out[130]:
['Ponting  R T,Australia,1995-2012,230,165,51,14,124',
'Fleming  S P,New Zealand,1994-2007,218,98,106,14,105',
'Ranatunga  A,Sri Lanka,1982-1999,193,89,95,9,102',
'Dhoni  M S*,India,2004-,186,103,68,15,88',
'Border  A R,Australia,1979-1994,178,107,67,4,86',
'Azharuddin  M,India,1985-2000,174,89,77,8,96',
'Smith  G C,South Africa,2002-2013,149,91,51,7,74',
'Ganguly  S C,India,1992-2007,147,76,66,5,74',
'Cronje  W J,South Africa,1992-2000,140,99,37,4,74',
'Imran Khan,Pakistan,1974-1992,139,75,59,5,70']

Checking the RDD type

In [5]:
# captains_odis should be of RDD type
type( captains_odis )
Out[5]:
pyspark.rdd.RDD
In [6]:
from pyspark import StorageLevel

captains_odis.persist( StorageLevel.MEMORY_ONLY_SER )
Out[6]:
MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

Counting the number of records

In [7]:
# Count the number of records
captains_odis.count()
Out[7]:
98

Parsing the lines and creating records

  • Each line from the file is taken as a record, but the records are not yet associated with a schema. Let's define a schema and associate the fields in each line with it. One way is to define a namedtuple type and convert each line into a named tuple.
In [8]:
fields = ("name", "country", "career", "matches", "won", "lost", "ties", "toss" )
In [9]:
from collections import namedtuple

Define named tuples to represent records

  • Let's define a tuple (a record) for each line. We need to iterate through the lines and convert each into a record. The record type can be defined as a namedtuple called Captain, which also names the fields.
In [10]:
Captain = namedtuple( 'Captain', fields )

Parsing lines into named tuples

  • Create a function to parse a line and build a namedtuple, then iterate through the data, converting each line into a record, i.e. a named tuple.
In [11]:
# Define a function to parse each line and convert them into records
def parseRecs( line ):
  fields = line.split(",")
  return Captain( fields[0], fields[1], fields[2], int( fields[3] ),
                 int( fields[4] ), int(fields[5]), int(fields[6]), int(fields[7] ) )
In [12]:
captains = captains_odis.map( lambda rec: parseRecs( rec) )

Cache the RDD

  • We will be using this RDD for many analyses, so let's cache it.
In [40]:
captains.cache()
Out[40]:
PythonRDD[6] at RDD at PythonRDD.scala:43
In [41]:
# Now captains refers to all the parsed records. Let's display the first 10 records.
captains.take( 10 )
Out[41]:
[Captain(name='Ponting  R T', country='Australia', career='1995-2012', matches=230, won=165, lost=51, ties=14, toss=124),
Captain(name='Fleming  S P', country='New Zealand', career='1994-2007', matches=218, won=98, lost=106, ties=14, toss=105),
Captain(name='Ranatunga  A', country='Sri Lanka', career='1982-1999', matches=193, won=89, lost=95, ties=9, toss=102),
Captain(name='Dhoni  M S*', country='India', career='2004-', matches=186, won=103, lost=68, ties=15, toss=88),
Captain(name='Border  A R', country='Australia', career='1979-1994', matches=178, won=107, lost=67, ties=4, toss=86),
Captain(name='Azharuddin  M', country='India', career='1985-2000', matches=174, won=89, lost=77, ties=8, toss=96),
Captain(name='Smith  G C', country='South Africa', career='2002-2013', matches=149, won=91, lost=51, ties=7, toss=74),
Captain(name='Ganguly  S C', country='India', career='1992-2007', matches=147, won=76, lost=66, ties=5, toss=74),
Captain(name='Cronje  W J', country='South Africa', career='1992-2000', matches=140, won=99, lost=37, ties=4, toss=74),
Captain(name='Imran Khan', country='Pakistan', career='1974-1992', matches=139, won=75, lost=59, ties=5, toss=70)]
In [42]:
# What is the type of the captains RDD
type( captains )
Out[42]:
pyspark.rdd.PipelinedRDD

Finding ODI Insights

Filtering records: Captains who captained in more than 100 ODIs

  • Filter only those captains who have captained in more than 100 ODI matches, so we can compare their statistics.
In [43]:
captains_100 = captains.filter( lambda rec: rec.matches > 100 )

How many captains have captained their country for more than 100 ODIs?

In [44]:
captains_100.count()
Out[44]:
16
In [45]:
# Who are these captains
captains_100.take( 10 )
Out[45]:
[Captain(name='Ponting  R T', country='Australia', career='1995-2012', matches=230, won=165, lost=51, ties=14, toss=124),
Captain(name='Fleming  S P', country='New Zealand', career='1994-2007', matches=218, won=98, lost=106, ties=14, toss=105),
Captain(name='Ranatunga  A', country='Sri Lanka', career='1982-1999', matches=193, won=89, lost=95, ties=9, toss=102),
Captain(name='Dhoni  M S*', country='India', career='2004-', matches=186, won=103, lost=68, ties=15, toss=88),
Captain(name='Border  A R', country='Australia', career='1979-1994', matches=178, won=107, lost=67, ties=4, toss=86),
Captain(name='Azharuddin  M', country='India', career='1985-2000', matches=174, won=89, lost=77, ties=8, toss=96),
Captain(name='Smith  G C', country='South Africa', career='2002-2013', matches=149, won=91, lost=51, ties=7, toss=74),
Captain(name='Ganguly  S C', country='India', career='1992-2007', matches=147, won=76, lost=66, ties=5, toss=74),
Captain(name='Cronje  W J', country='South Africa', career='1992-2000', matches=140, won=99, lost=37, ties=4, toss=74),
Captain(name='Imran Khan', country='Pakistan', career='1974-1992', matches=139, won=75, lost=59, ties=5, toss=70)]

Writing a utility / data function to return the number of captains

In [72]:
def getNumCaptainsByMinMatches( anRDD, num_matches ):
  return anRDD.map( lambda rec: parseRecs( rec) ).filter( lambda rec: rec.matches > num_matches ).count()

Writing a script to test the function

In [73]:
test_input_data = ['Ponting  R T,Australia,1995-2012,230,165,51,14,124',
'Fleming  S P,New Zealand,1994-2007,218,98,106,14,105',
'Ranatunga  A,Sri Lanka,1982-1999,193,89,95,9,102',
'Dhoni  M S*,India,2004-,186,103,68,15,88',
'Crowe  M D,New Zealand,1982-1995,44,21,22,1,28',
'Atherton  M A,England,1990-1998,43,20,21,2,20',
'Walsh  C A,West Indies,1985-2000,43,22,20,1,20']
In [74]:
test_input_rdd = sc.parallelize( test_input_data, 2 )

Testing a successful condition

In [75]:
assert getNumCaptainsByMinMatches( test_input_rdd, 100 ) == 4,    \
          "getNumCaptainsByMinMatches() returning incorrect matches"

Testing a failure condition

In [77]:
assert getNumCaptainsByMinMatches( test_input_rdd, 100 ) == 5,    \
          "getNumCaptainsByMinMatches() returning incorrect matches"
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-77-412e927eebe4> in <module>()
----> 1 assert getNumCaptainsByMinMatches( test_input_rdd, 100 ) == 5,                "getNumCaptainsByMinMatches() returning incorrect matches"

AssertionError: getNumCaptainsByMinMatches() returning incorrect matches
  • The AssertionError shows that the assertion fires when the expected value is wrong, confirming the test catches incorrect results.
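The same pattern can be exercised without a cluster. A plain-Python sketch of the test (the helper names here are illustrative, not from the notebook):

```python
from collections import namedtuple

# Schema and parser mirroring the notebook's Captain records
Captain = namedtuple("Captain", "name country career matches won lost ties toss")

def parse(line):
    f = line.split(",")
    return Captain(f[0], f[1], f[2], *map(int, f[3:]))

def num_captains_by_min_matches(lines, num_matches):
    # count records whose matches exceed the threshold
    return sum(1 for rec in map(parse, lines) if rec.matches > num_matches)

test_lines = [
    "Ponting  R T,Australia,1995-2012,230,165,51,14,124",
    "Crowe  M D,New Zealand,1982-1995,44,21,22,1,28",
]
assert num_captains_by_min_matches(test_lines, 100) == 1
```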

Invoking the function on original data

In [83]:
getNumCaptainsByMinMatches( captains_odis, 100 )
Out[83]:
16

Captains with more wins than losses

In [46]:
# Filtering: captains who have more wins than losses
captains_more_wins = captains_100.filter( lambda rec: rec.won > rec.lost )

Collecting the captain names in the driver

In [47]:
# Captains with more wins than losses
captains_more_wins.map( lambda rec: rec.name ).collect()
Out[47]:
['Ponting  R T',
'Dhoni  M S*',
'Border  A R',
'Azharuddin  M',
'Smith  G C',
'Ganguly  S C',
'Cronje  W J',
'Imran Khan',
'Jayawardene  D P M',
'Jayasuriya  S T',
'Wasim Akram',
'Waugh  S R',
'Richards  I V A']
In [48]:
# Captains with no more wins than losses
captains_more_losts = captains_100.filter( lambda rec: rec.won <= rec.lost )
captains_more_losts.map( lambda rec: rec.name ).collect()
Out[48]:
['Fleming  S P', 'Ranatunga  A', 'Lara  B C']

Creating a subset of data by filtering columns

In [49]:
# Which country has played how many matches..
countries = captains.map( lambda rec: ( rec.country , rec.matches) )
In [50]:
countries.take( 10 )
Out[50]:
[('Australia', 230),
('New Zealand', 218),
('Sri Lanka', 193),
('India', 186),
('Australia', 178),
('India', 174),
('South Africa', 149),
('India', 147),
('South Africa', 140),
('Pakistan', 139)]

Aggregating values by keys

In [51]:
# Aggregate by countries
matches_countries = countries.reduceByKey( lambda a, b: a + b )
In [52]:
matches_countries.take( 20 )
Out[52]:
[('Pakistan', 781),
('South Africa', 463),
('Ireland', 93),
('Australia', 832),
('West Indies', 658),
('Kenya', 114),
('India', 770),
('Bermuda', 31),
('Netherlands', 31),
('Afghanistan', 50),
('England', 554),
('Canada', 27),
('Zimbabwe', 394),
('Sri Lanka', 710),
('Bangladesh', 251),
('New Zealand', 608)]
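Conceptually, reduceByKey merges the values for each key with the given function. A local plain-Python sketch of the same aggregation (ignoring the partitioning, which Spark handles for us; the sample pairs are illustrative):

```python
# Fold (country, matches) pairs into per-country totals, the way
# reduceByKey(lambda a, b: a + b) does across partitions.
pairs = [("Australia", 230), ("India", 186), ("Australia", 178), ("India", 174)]

totals = {}
for country, matches in pairs:
    totals[country] = totals.get(country, 0) + matches

print(totals)  # {'Australia': 408, 'India': 360}
```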
In [53]:
# Sort the countries by the number of matches they played. 
# Sort by names...(sort by key)
matches_countries.sortByKey().collect()
Out[53]:
[('Afghanistan', 50),
('Australia', 832),
('Bangladesh', 251),
('Bermuda', 31),
('Canada', 27),
('England', 554),
('India', 770),
('Ireland', 93),
('Kenya', 114),
('Netherlands', 31),
('New Zealand', 608),
('Pakistan', 781),
('South Africa', 463),
('Sri Lanka', 710),
('West Indies', 658),
('Zimbabwe', 394)]

Sorting records

  • Sort the countries by name (the key) in descending order
In [54]:
matches_countries.sortByKey( ascending = False ).collect()
Out[54]:
[('Zimbabwe', 394),
('West Indies', 658),
('Sri Lanka', 710),
('South Africa', 463),
('Pakistan', 781),
('New Zealand', 608),
('Netherlands', 31),
('Kenya', 114),
('Ireland', 93),
('India', 770),
('England', 554),
('Canada', 27),
('Bermuda', 31),
('Bangladesh', 251),
('Australia', 832),
('Afghanistan', 50)]
In [55]:
# Sort by values.. default is ascending...
matches_countries.sortBy( lambda rec: rec[1] ).collect()
Out[55]:
[('Canada', 27),
('Bermuda', 31),
('Netherlands', 31),
('Afghanistan', 50),
('Ireland', 93),
('Kenya', 114),
('Bangladesh', 251),
('Zimbabwe', 394),
('South Africa', 463),
('England', 554),
('New Zealand', 608),
('West Indies', 658),
('Sri Lanka', 710),
('India', 770),
('Pakistan', 781),
('Australia', 832)]
In [56]:
# Sort by value by descending
matches_countries.sortBy( lambda rec: rec[1], ascending = False ).collect()
Out[56]:
[('Australia', 832),
('Pakistan', 781),
('India', 770),
('Sri Lanka', 710),
('West Indies', 658),
('New Zealand', 608),
('England', 554),
('South Africa', 463),
('Zimbabwe', 394),
('Bangladesh', 251),
('Kenya', 114),
('Ireland', 93),
('Afghanistan', 50),
('Bermuda', 31),
('Netherlands', 31),
('Canada', 27)]

Writing a data function to return countries and total matches played in sorted order

In [67]:
def getNumMatchesPerCountry( anRDD ):
  return anRDD.map( lambda rec: parseRecs( rec) )          \
  .map( lambda rec: ( rec.country , rec.matches) )         \
  .reduceByKey( lambda a, b: a + b )                       \
  .sortBy( lambda rec: rec[1], ascending = False )

Testing the Function: getNumMatchesPerCountry

In [78]:
assert getNumMatchesPerCountry( test_input_rdd ).collect()[0] == ('New Zealand', 262 )

Invoking the Function

In [85]:
getNumMatchesPerCountry( captains_odis ).take( 10 )
Out[85]:
[('Australia', 832),
('Pakistan', 781),
('India', 770),
('Sri Lanka', 710),
('West Indies', 658),
('New Zealand', 608),
('England', 554),
('South Africa', 463),
('Zimbabwe', 394),
('Bangladesh', 251)]

Top Captains by Percentage of Wins

In [57]:
# Captains by percentage of wins
captains_100_percent_wins = captains_100.map(
  lambda rec: ( rec.name, round( rec.won/rec.matches, 2 ) ) )

# Sort by percentage wins
captains_100_percent_wins.sortBy(
  lambda rec: rec[1], ascending = False ).collect()
Out[57]:
[('Ponting  R T', 0.72),
('Cronje  W J', 0.71),
('Richards  I V A', 0.64),
('Waugh  S R', 0.63),
('Smith  G C', 0.61),
('Wasim Akram', 0.61),
('Border  A R', 0.6),
('Jayasuriya  S T', 0.56),
('Dhoni  M S*', 0.55),
('Jayawardene  D P M', 0.55),
('Imran Khan', 0.54),
('Ganguly  S C', 0.52),
('Azharuddin  M', 0.51),
('Lara  B C', 0.47),
('Ranatunga  A', 0.46),
('Fleming  S P', 0.45)]

Lucky Captains (highest percentage of tosses won)

In [65]:
# Captains by percentage of wins
lucky_captains = captains_100.map(
  lambda rec: ( rec.name, round( rec.toss / rec.matches, 2 ) ) )
In [66]:
lucky_captains.sortBy( lambda rec: rec[1], ascending = False ).take( 5 )
Out[66]:
[('Azharuddin  M', 0.55),
('Ponting  R T', 0.54),
('Ranatunga  A', 0.53),
('Cronje  W J', 0.53),
('Wasim Akram', 0.53)]

Further explorations for the reader to try out

  • Filter countries that have played more than a hundred matches
  • Sort countries by the percentage of matches they have won

Captains' Test Match Performances

Loading the dataset

In [39]:
# Read the Captains_Test.csv file
captains_tests = sc.textFile( "file:///home/hadoop/lab/data/captains_Test.csv" )
In [40]:
# Parse the records
captains_tests_recs = captains_tests.map( lambda rec: parseRecs( rec ) )
In [41]:
# Display the first 10 records
captains_tests_recs.take( 10 )
Out[41]:
[Captain(name='Smith  G C', country='South Africa', career='2002-2014', matches=109, won=53, lost=29, ties=27, toss=58),
Captain(name='Border  A R', country='Australia', career='1978-1994', matches=93, won=32, lost=22, ties=38, toss=47),
Captain(name='Fleming  S P', country='New Zealand', career='1994-2008', matches=80, won=28, lost=27, ties=25, toss=38),
Captain(name='Ponting  R T', country='Australia', career='1995-2012', matches=77, won=48, lost=16, ties=13, toss=37),
Captain(name='Lloyd  C H', country='West Indies', career='1966-1984', matches=74, won=36, lost=12, ties=26, toss=35),
Captain(name='Dhoni  M S*', country='India', career='2005-', matches=60, won=27, lost=18, ties=15, toss=27),
Captain(name='Waugh  S R', country='Australia', career='1985-2004', matches=57, won=41, lost=9, ties=7, toss=31),
Captain(name='Ranatunga  A', country='Sri Lanka', career='1982-2000', matches=56, won=12, lost=19, ties=25, toss=29),
Captain(name='Atherton  M A', country='England', career='1989-2001', matches=54, won=13, lost=21, ties=20, toss=23),
Captain(name='Cronje  W J', country='South Africa', career='1992-2000', matches=53, won=27, lost=11, ties=15, toss=22)]
In [42]:
# Filter the captains who have captained for more than 100 tests
captains_tests_100 = captains_tests_recs.filter( lambda rec:
                                              rec.matches > 100 )
In [63]:
## How many captains?
captains_tests_100.take( 10 )
Out[63]:
[Captain(name='Smith  G C', country='South Africa', career='2002-2014', matches=109, won=53, lost=29, ties=27, toss=58)]

Note:

  • There is only one captain who has captained in more than 100 Tests.
  • So we must lower the threshold to look at more captains' performances. We will keep all captains who have captained in more than 50 matches.
In [64]:
# Filter the captains who have captained for more than 50 tests
captains_tests_50 = captains_tests_recs.filter( lambda rec:
                                             rec.matches > 50 )
In [35]:
captains_tests_50.take( 10 )
Out[35]:
[Captain(name='Smith  G C', country='South Africa', career='2002-2014', matches=109, won=53, lost=29, ties=27, toss=58),
Captain(name='Border  A R', country='Australia', career='1978-1994', matches=93, won=32, lost=22, ties=38, toss=47),
Captain(name='Fleming  S P', country='New Zealand', career='1994-2008', matches=80, won=28, lost=27, ties=25, toss=38),
Captain(name='Ponting  R T', country='Australia', career='1995-2012', matches=77, won=48, lost=16, ties=13, toss=37),
Captain(name='Lloyd  C H', country='West Indies', career='1966-1984', matches=74, won=36, lost=12, ties=26, toss=35),
Captain(name='Dhoni  M S*', country='India', career='2005-', matches=60, won=27, lost=18, ties=15, toss=27),
Captain(name='Waugh  S R', country='Australia', career='1985-2004', matches=57, won=41, lost=9, ties=7, toss=31),
Captain(name='Ranatunga  A', country='Sri Lanka', career='1982-2000', matches=56, won=12, lost=19, ties=25, toss=29),
Captain(name='Atherton  M A', country='England', career='1989-2001', matches=54, won=13, lost=21, ties=20, toss=23),
Captain(name='Cronje  W J', country='South Africa', career='1992-2000', matches=53, won=27, lost=11, ties=15, toss=22)]

Calculating the Test win percentages

In [36]:
# Sort the captains by percentage of wins
captain_top = captains_tests_50.map(
  lambda rec: ( rec.name,
               round( rec.won/rec.matches,
                     2 ) ) ).sortBy( lambda rec: rec[1], ascending = False )
In [37]:
captain_top.collect()
Out[37]:
[('Waugh  S R', 0.72),
('Ponting  R T', 0.62),
('Cronje  W J', 0.51),
('Vaughan  M P', 0.51),
('Smith  G C', 0.49),
('Lloyd  C H', 0.49),
('Dhoni  M S*', 0.45),
('Fleming  S P', 0.35),
('Border  A R', 0.34),
('Atherton  M A', 0.24),
('Ranatunga  A', 0.21)]

Joining multiple data sets

  • Now let's join the ODI and Test performances and compare each captain's win percentage across formats.
In [38]:
# Let's join both ODI and Test captaincy details.
# Default is inner join...
all_time_best_captains = captains_100_percent_wins.join( captain_top )
In [39]:
all_time_best_captains.collect()
Out[39]:
[('Smith  G C', (0.61, 0.49)),
('Border  A R', (0.6, 0.34)),
('Fleming  S P', (0.45, 0.35)),
('Cronje  W J', (0.71, 0.51)),
('Ponting  R T', (0.72, 0.62)),
('Ranatunga  A', (0.46, 0.21)),
('Dhoni  M S*', (0.55, 0.45)),
('Waugh  S R', (0.63, 0.72))]
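The inner join keeps only keys present in both RDDs; a local plain-Python sketch of the same pairing (sample numbers taken from the outputs above):

```python
# Inner-join semantics on (name, win-percentage) pairs: only names
# appearing in BOTH datasets survive; values are combined into a tuple.
odi = {"Ponting  R T": 0.72, "Lara  B C": 0.47}
test = {"Ponting  R T": 0.62, "Lloyd  C H": 0.49}

joined = {name: (odi[name], test[name]) for name in odi.keys() & test.keys()}
print(joined)  # {'Ponting  R T': (0.72, 0.62)}
```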
In [40]:
## Best by test match wins
all_time_best_captains.sortBy( lambda rec: rec[1][1],
                            ascending = False ).collect()
Out[40]:
[('Waugh  S R', (0.63, 0.72)),
('Ponting  R T', (0.72, 0.62)),
('Cronje  W J', (0.71, 0.51)),
('Smith  G C', (0.61, 0.49)),
('Dhoni  M S*', (0.55, 0.45)),
('Fleming  S P', (0.45, 0.35)),
('Border  A R', (0.6, 0.34)),
('Ranatunga  A', (0.46, 0.21))]
In [41]:
## Best by ODI match wins
all_time_best_captains.sortBy( lambda rec: rec[1][0],
                            ascending = False ).collect()
Out[41]:
[('Ponting  R T', (0.72, 0.62)),
('Cronje  W J', (0.71, 0.51)),
('Waugh  S R', (0.63, 0.72)),
('Smith  G C', (0.61, 0.49)),
('Border  A R', (0.6, 0.34)),
('Dhoni  M S*', (0.55, 0.45)),
('Ranatunga  A', (0.46, 0.21)),
('Fleming  S P', (0.45, 0.35))]

Flattening the tuples

In [42]:
## Now let's flatten the structure and store the results into a file...
best_captains = all_time_best_captains.map( lambda rec:
                                         ( rec[0],
                                          rec[1][0],
                                          rec[1][1] ) )
best_captains.take( 10 )
Out[42]:
[('Smith  G C', 0.61, 0.49),
('Border  A R', 0.6, 0.34),
('Fleming  S P', 0.45, 0.35),
('Cronje  W J', 0.71, 0.51),
('Ponting  R T', 0.72, 0.62),
('Ranatunga  A', 0.46, 0.21),
('Dhoni  M S*', 0.55, 0.45),
('Waugh  S R', 0.63, 0.72)]

Saving results into filesystem

In [43]:
best_captains.saveAsTextFile( "file:///home/hadoop/lab/results/captains")

Check results

  • Go to your VM's PuTTY terminal and change to the results directory
    • cd /home/hadoop/lab/results/captains/
    • cat part-00000

Saving results into one file by forcing all results into a single partition

In [44]:
## Consolidate into one partition
best_captains.repartition( 1 ).                                     \
  saveAsTextFile( "file:///home/hadoop/lab/results/bestcaptains")

Check results

  • Go to your VM's PuTTY terminal and change to the results directory
    • cd /home/hadoop/lab/results/bestcaptains/
    • cat part-00000

Exercises

Sorting countries by percentage wins

  • We have multiple elements in the value part of each (key, value) tuple, so we cannot apply reduceByKey with a simple scalar sum. Instead, we call groupByKey() and then map the grouped values to per-position sums. Finally, we divide total wins by total matches to get the win percentage.

Generate the key and values

In [107]:
country_matches_list = captains.map( lambda rec: ( rec.country , ( rec.won, rec.matches ) ) )
In [108]:
country_matches_list.take( 5 )
Out[108]:
[('Australia', (165, 230)),
('New Zealand', (98, 218)),
('Sri Lanka', (89, 193)),
('India', (103, 186)),
('Australia', (107, 178))]

Group the values

In [109]:
country_matches_list.groupByKey().mapValues( lambda rec: list( rec ) ).take( 2 )
Out[109]:
[('Pakistan',
[(75, 139),
 (66, 109),
 (52, 89),
 (45, 87),
 (26, 62),
 (37, 62),
 (19, 38),
 (24, 36),
 (20, 34),
 (21, 34),
 (13, 25),
 (9, 22),
 (7, 22),
 (8, 22)]),
('South Africa', [(91, 149), (99, 140), (60, 98), (44, 76)])]
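What groupByKey().mapValues(...) computes can be sketched locally in plain Python (the sample pairs are made up for illustration):

```python
from collections import defaultdict

# Group (won, matches) tuples per country, then sum each position,
# mirroring groupByKey() followed by mapValues() over the groups.
pairs = [("India", (103, 186)), ("India", (89, 174)), ("Australia", (165, 230))]

grouped = defaultdict(list)
for country, stats in pairs:
    grouped[country].append(stats)

totals = {c: (sum(w for w, _ in v), sum(m for _, m in v)) for c, v in grouped.items()}
print(totals)  # {'India': (192, 360), 'Australia': (165, 230)}
```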

Apply sum() to each element of values tuple

In [100]:
# Win percentage: total wins divided by total matches over (won, matches) tuples
def getWinMatchPercentage( allrecs ):
  return sum( [rec[0] for rec in allrecs] ) / sum( [rec[1] for rec in allrecs] )
In [111]:
country_matches_rdd = country_matches_list.groupByKey().mapValues( lambda recs:
                                          ( sum( [rec[0] for rec in recs] ),
                                            sum( [rec[1] for rec in recs] ) ) )
In [112]:
country_matches_rdd.take( 10 )
Out[112]:
[('Pakistan', (422, 781)),
('South Africa', (294, 463)),
('Ireland', (42, 93)),
('Australia', (503, 832)),
('West Indies', (342, 658)),
('Kenya', (34, 114)),
('India', (387, 770)),
('Bermuda', (6, 31)),
('Netherlands', (10, 31)),
('Afghanistan', (25, 50))]

Get percentage wins

In [119]:
most_win_countries = country_matches_rdd.map( lambda rec:
                                           ( rec[0],
                                            round( rec[1][0] / rec[1][1], 2  ) ) )     \
                                      .sortBy( lambda rec: rec[1], ascending = False )
In [120]:
most_win_countries.take( 10 )
Out[120]:
[('South Africa', 0.63),
('Australia', 0.6),
('Pakistan', 0.54),
('West Indies', 0.52),
('India', 0.5),
('Afghanistan', 0.5),
('England', 0.5),
('Sri Lanka', 0.49),
('Ireland', 0.45),
('New Zealand', 0.45)]

Broadcasting Variables

In [124]:
def getNumCaptainsByMinimumMatches( anRDD, num_matches ):
  min_matches = sc.broadcast( num_matches )
  return anRDD.map( lambda rec: parseRecs( rec) ).filter( lambda rec: rec.matches > min_matches.value ).count()
In [126]:
getNumCaptainsByMinimumMatches( captains_odis, 100 )
Out[126]:
16
In [128]:
getNumCaptainsByMinimumMatches( captains_odis, 50 )
Out[128]:
48
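Conceptually, a broadcast variable is a read-only value that every task reads via .value, shipped once per executor rather than once per task. A plain-Python sketch of that access pattern (the FakeBroadcast class is purely illustrative, not part of PySpark):

```python
# Minimal stand-in for pyspark's Broadcast: tasks only read .value.
class FakeBroadcast:
    def __init__(self, value):
        self._value = value

    @property
    def value(self):
        return self._value

min_matches = FakeBroadcast(100)
match_counts = [230, 218, 44, 43]

# Each "task" reads the shared threshold through .value
result = [m for m in match_counts if m > min_matches.value]
print(result)  # [230, 218]
```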

Accumulators

In [179]:
def parRecsNew( line ):
  global playing_caps
  if "*" in line:
     playing_caps.add( 1 )
  return parseRecs( line )
In [180]:
playing_caps = sc.accumulator(0)
In [181]:
captains_odis_new = captains_odis.map( lambda line: parRecsNew( line ) )
In [182]:
captains_odis_new.count()
Out[182]:
98
In [183]:
playing_caps.value
Out[183]:
27
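The accumulator pattern, counting as a side effect of a transformation, can be sketched locally as well (Spark additionally merges the per-task counts on the driver; the sample lines below are taken from the dataset shown earlier):

```python
# Count records for currently playing captains (marked with "*")
# while parsing, the way the playing_caps accumulator does above.
playing_caps = 0

def parse_and_count(line):
    global playing_caps
    if "*" in line:
        playing_caps += 1
    return line.split(",")

lines = [
    "Dhoni  M S*,India,2004-,186,103,68,15,88",
    "Border  A R,Australia,1979-1994,178,107,67,4,86",
]
parsed = [parse_and_count(l) for l in lines]
print(playing_caps)  # 1
```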