Things to learn

  • Reading from HDFS
  • Reading from MySQL (from an RDBMS using JDBC)
  • Working with JSON Data - Reading and Parsing
  • Working with Spark SQL
  • Applying data transformations using Spark SQL statements
  • Data used in this exercise is randomly generated
In [2]:
sc
Out[2]:
<pyspark.context.SparkContext at 0x7f7e59ec9400>

This Spark application is running on YARN, so open the YARN Resource Manager UI and verify that the application is running.
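
A quick cross-check is possible from the notebook itself by printing the SparkContext's master and application ID (a minimal sketch; the applicationId property is available in Spark 1.5 and later):
In [ ]:
# Confirm the application is running on YARN and print the application
# ID to look up in the Resource Manager UI (values depend on your setup)
print( sc.master )         # e.g. 'yarn-client'
print( sc.applicationId )  # the YARN application ID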

Initialize SQLContext from SparkContext

In [3]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Reading JSON file from HDFS

  • Give the HDFS URI of the input data as the source for the read.json() function
In [4]:
## Read the json file from HDFS 
txns = sqlContext.read.json(
  "hdfs://hadooplab.bigdataleap.com/sparklab/txnjsonsmall")

Display the first 10 records

In [5]:
txns.show( 10 )
+------------+-------------------+----------+--------------------+-------------+----------+----------+--------+
|CashOrCredit|       creditCardNo|customerNo|           lineItems| merchantCity|     state|     tDate|   txnNo|
+------------+-------------------+----------+--------------------+-------------+----------+----------+--------+
|      credit|4971-xxxx-xxxx-5769|   4004819|[[015.82,Team Spo...|  Brownsville|     Texas|06-27-2011|00000000|
|      credit|3787-xxxx-xxxx-6017|   4003459|[[089.28,Water Sp...|      Houston|     Texas|02-07-2011|00000001|
|      credit|5951-xxxx-xxxx-4036|   4009112|[[067.51,Exercise...|       Eugene|    Oregon|03-02-2011|00000002|
|      credit|3793-xxxx-xxxx-3180|   4009376|[[043.38,Water Sp...|     Paterson|New Jersey|01-23-2011|00000003|
|      credit|3913-xxxx-xxxx-4556|   4006758|[[193.65,Outdoor ...|      Gresham|    Oregon|05-07-2011|00000004|
|      credit|4629-xxxx-xxxx-3692|   4000951|[[104.47,Exercise...|   Des Moines|      Iowa|12-07-2011|00000005|
|      credit|4032-xxxx-xxxx-1996|   4002494|[[093.97,Jumping,...|  St. Louis  |  Missouri|05-02-2011|00000006|
|      credit|3551-xxxx-xxxx-0696|   4000599|[[197.33,Exercise...|      Phoenix|   Arizona|06-02-2011|00000007|
|      credit|3282-xxxx-xxxx-5190|   4007057|[[128.98,Winter S...|Overland Park|    Kansas|03-06-2011|00000008|
|      credit|4621-xxxx-xxxx-9258|   4005366|[[074.57,Water Sp...|      Fremont|California|06-22-2011|00000009|
+------------+-------------------+----------+--------------------+-------------+----------+----------+--------+
only showing top 10 rows

Flatten the nested structure

  • The line items form a nested structure within each transaction.
  • Display the line items of the first 5 transactions
In [6]:
txns.select( "lineItems" ).take( 5 )
Out[6]:
[Row(lineItems=[Row(amount='015.82', category='Team Sports', product='Cheerleading'), Row(amount='086.47', category='Water Sports', product='Whitewater Rafting'), Row(amount='063.08', category='Exercise & Fitness', product='Gym Mats'), Row(amount='068.80', category='Exercise & Fitness', product='Weightlifting Machine Accessories'), Row(amount='092.49', category='Team Sports', product='Lacrosse'), Row(amount='083.92', category='Outdoor Recreation', product='Lawn Games')]),
Row(lineItems=[Row(amount='089.28', category='Water Sports', product='Water Tubing'), Row(amount='042.38', category='Water Sports', product='Surfing'), Row(amount='062.80', category='Team Sports', product='Cheerleading')]),
Row(lineItems=[Row(amount='067.51', category='Exercise & Fitness', product='Exercise Bands'), Row(amount='154.57', category='Team Sports', product='Rugby'), Row(amount='100.18', category='Outdoor Recreation', product='Skateboarding'), Row(amount='190.52', category='Exercise & Fitness', product='Foam Rollers'), Row(amount='054.35', category='Water Sports', product='Kitesurfing')]),
Row(lineItems=[Row(amount='043.38', category='Water Sports', product='Boating'), Row(amount='106.27', category='Team Sports', product='Rugby'), Row(amount='164.86', category='Combat Sports', product='Fencing'), Row(amount='164.94', category='Racquet Sports', product='Tennis'), Row(amount='007.36', category='Exercise & Fitness', product='Gym Mats'), Row(amount='110.56', category='Outdoor Recreation', product='Skateboarding')]),
Row(lineItems=[Row(amount='193.65', category='Outdoor Recreation', product='Deck Shuffleboard'), Row(amount='135.98', category='Winter Sports', product='Snowshoeing'), Row(amount='063.27', category='Racquet Sports', product='Racquetball'), Row(amount='151.53', category='Dancing', product='Ballet Bars'), Row(amount='088.69', category='Gymnastics', product='Balance Beams'), Row(amount='070.75', category='Outdoor Play Equipment', product='Swing Sets')])]
  • The explode function in Spark SQL can be used to flatten the records
In [7]:
from pyspark.sql.functions import *
In [8]:
## Explode and flatten the nested structure into a set of columns.
## The exploded 'item' struct can only be referenced in a second
## select, after explode() has produced it.
txns_new = txns.select( 'txnNo', 'tDate', 'customerNo', 'merchantCity',
                        'state', explode( txns.lineItems ).alias( 'item' ) ) \
               .select( 'txnNo', 'tDate', 'customerNo', 'merchantCity',
                        'state', 'item.category', 'item.product', 'item.amount' )
In [9]:
# Show 10 records
txns_new.show( 10 )
+--------+----------+----------+------------+------+------------------+--------------------+------+
|   txnNo|     tDate|customerNo|merchantCity| state|          category|             product|amount|
+--------+----------+----------+------------+------+------------------+--------------------+------+
|00000000|06-27-2011|   4004819| Brownsville| Texas|       Team Sports|        Cheerleading|015.82|
|00000000|06-27-2011|   4004819| Brownsville| Texas|      Water Sports|  Whitewater Rafting|086.47|
|00000000|06-27-2011|   4004819| Brownsville| Texas|Exercise & Fitness|            Gym Mats|063.08|
|00000000|06-27-2011|   4004819| Brownsville| Texas|Exercise & Fitness|Weightlifting Mac...|068.80|
|00000000|06-27-2011|   4004819| Brownsville| Texas|       Team Sports|            Lacrosse|092.49|
|00000000|06-27-2011|   4004819| Brownsville| Texas|Outdoor Recreation|          Lawn Games|083.92|
|00000001|02-07-2011|   4003459|     Houston| Texas|      Water Sports|        Water Tubing|089.28|
|00000001|02-07-2011|   4003459|     Houston| Texas|      Water Sports|             Surfing|042.38|
|00000001|02-07-2011|   4003459|     Houston| Texas|       Team Sports|        Cheerleading|062.80|
|00000002|03-02-2011|   4009112|      Eugene|Oregon|Exercise & Fitness|      Exercise Bands|067.51|
+--------+----------+----------+------------+------+------------------+--------------------+------+
only showing top 10 rows

Register the new table as a temporary (in-memory) table so that we can run SQL queries on it

  • Register the DataFrame as a temporary SQL table in memory; the table is scoped to this SQLContext and lets us run SQL queries against the data
In [10]:
txns_new.registerTempTable("txnrecords")
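
As an optional sanity check, the temporary table can now be queried by name (it is visible only within this SQLContext):
In [ ]:
# Smoke test: the temp table is queryable from SQL
sqlContext.sql( "select count(*) as txn_count from txnrecords" ).show()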

Find revenue generated by state and product

In [11]:
revenue_by_state = sqlContext.sql( '''select state, product, sum( amount ) as 
                                total from txnrecords group by state, product''' )
In [12]:
# show the first 10 records
revenue_by_state.show( 10 )
+-------------+--------------------+------------------+
|        state|             product|             total|
+-------------+--------------------+------------------+
|       Oregon|               Rugby|             261.4|
|        Texas|          Parachutes|            706.58|
|       Oregon|Scuba Diving & Sn...|            264.55|
|         Utah|           Wrestling|207.79000000000002|
|     Kentucky|         Bobsledding|232.84999999999997|
|      Florida|        Foam Rollers|            387.27|
|Massachusetts|          Air Hockey|            120.66|
|      Alabama|         Windsurfing|296.17999999999995|
|      Arizona|      Jumping Stilts| 96.00999999999999|
| Pennsylvania|           Disc Golf|             28.08|
+-------------+--------------------+------------------+
only showing top 10 rows

Register the result set as a temporary table

In [13]:
revenue_by_state.registerTempTable('state_revenue')
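
The registered result set can now be queried like any other table, for instance to list the top products in a single state (an illustrative query; 'Texas' is just a sample value):
In [ ]:
# Query the registered result set: top 5 products by revenue in Texas
sqlContext.sql( '''select product, round( total, 2 ) as total 
                from state_revenue where state = 'Texas' 
                order by total desc limit 5''' ).show()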

Write a UDF (user-defined function) to extract the weekday name from the date field

  • Define a user-defined function that can be invoked from a SQL query, for example to derive the weekday name from the date field
In [14]:
import datetime

# Parse an MM-DD-YYYY date string and return the weekday name, e.g. 'Monday'
def getDayOfWeek( date ):
  return datetime.datetime.strptime( date, "%m-%d-%Y" ).strftime( '%A' )
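
A quick local check of the helper before wiring it into Spark (06-27-2011, the first transaction date above, falls on a Monday):
In [ ]:
getDayOfWeek( "06-27-2011" )   # returns 'Monday'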

Register the function with the SQLContext as a new UDF

In [15]:
# Register the Python function as a SQL UDF that returns a string
from pyspark.sql.types import StringType
sqlContext.udf.register( "getDayOfWeek", getDayOfWeek, StringType() )
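
To verify that the registration resolved correctly, the UDF can be invoked on a few sample rows before running the full aggregation:
In [ ]:
# Smoke test: apply the UDF to a handful of rows
sqlContext.sql( "select tDate, getDayOfWeek( tDate ) as weekday from txnrecords limit 3" ).show()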

Invoke the UDF from SQL

  • Write a query that invokes the user-defined function to calculate the total revenue for each weekday
In [16]:
revenue_by_weekday = sqlContext.sql( '''select weekday, 
                                  round( sum( amount ), 2 ) as total 
                                  from ( select getDayOfWeek( tDate ) as weekday, 
                                  amount from txnrecords ) txns 
                                  group by weekday order by total desc''' )
In [17]:
revenue_by_weekday.show( 10 )
+---------+--------+
|  weekday|   total|
+---------+--------+
| Thursday| 94549.2|
|Wednesday|85091.56|
|   Monday|81712.77|
|   Sunday|79634.08|
|  Tuesday|79594.51|
| Saturday|78114.84|
|   Friday| 71809.1|
+---------+--------+

Reading data from MySQL

  • Before we can demonstrate reading from a MySQL table, the data needs to be loaded into that table. The data is available in a CSV file; the following steps show how to load it into MySQL.

Check the MySQL table

  • Go to the Linux prompt of your VM
  • Enter "mysql -u root -p"
  • Enter the password
  • Select the retail database
    • use retail;
  • Show the tables and inspect the customers schema
    • show tables;
    • describe customers;
  • Load data into the customers table
    • LOAD DATA LOCAL INFILE '/home/hadoop/lab/data/custs' INTO TABLE customers FIELDS TERMINATED BY ',' LINES TERMINATED BY '\r\n';
  • List some of the records
    • select * from customers limit 100;

Using JDBC to read from a MySQL table

In [18]:
## Read customer information from the MySQL table
cust_df = sqlContext.read.format('jdbc')                                      \
  .options(url='jdbc:mysql://localhost/retail?user=root&password=hadoop123',
  dbtable='customers').load()
In [19]:
cust_df.show( 10 )
+-------+---------+----------+---+--------------------+
| CustID|FirstName|  LastName|Age|          Profession|
+-------+---------+----------+---+--------------------+
|4000001| Kristina|     Chung| 55|               Pilot|
|4000002|    Paige|      Chen| 74|             Teacher|
|4000003|   Sherri|    Melton| 34|         Firefighter|
|4000004| Gretchen|      Hill| 66|Computer hardware...|
|4000005|    Karen|   Puckett| 74|              Lawyer|
|4000006|  Patrick|      Song| 42|        Veterinarian|
|4000007|    Elsie|  Hamilton| 43|               Pilot|
|4000008|    Hazel|    Bender| 63|           Carpenter|
|4000009|  Malcolm|    Wagner| 39|              Artist|
|4000010|  Dolores|McLaughlin| 60|              Writer|
+-------+---------+----------+---+--------------------+
only showing top 10 rows
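
Instead of loading the whole table, the JDBC source also accepts a parenthesized subquery with an alias as the dbtable option, which pushes the work down to MySQL. A sketch (the Age filter is only an illustration):
In [ ]:
# MySQL evaluates the subquery, so only matching rows cross the wire
seniors_df = sqlContext.read.format( 'jdbc' )                                 \
  .options( url='jdbc:mysql://localhost/retail?user=root&password=hadoop123',
  dbtable='(select CustID, FirstName, Age from customers where Age >= 60) t' ).load()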

Finding the total money spent by each customer

In [20]:
top_10_custs = sqlContext.sql( '''select customerNo, round( sum( amount ), 2 ) 
                              as total from txnrecords group by customerNo 
                              order by total desc limit 10''')

top_10_custs.take( 10 )
Out[20]:
[Row(customerNo='4007510', total=2204.79),
Row(customerNo='4003293', total=2024.67),
Row(customerNo='4003971', total=1869.43),
Row(customerNo='4004260', total=1869.12),
Row(customerNo='4001058', total=1791.99),
Row(customerNo='4008914', total=1652.06),
Row(customerNo='4004491', total=1649.74),
Row(customerNo='4007168', total=1610.76),
Row(customerNo='4001253', total=1516.77),
Row(customerNo='4009672', total=1485.23)]
In [21]:
top_10_custs.registerTempTable( "top_10_custs" )
cust_df.registerTempTable( "custs_rec" )

Joining with the customer table to find the top 10 customers based on total money spent

In [22]:
top_10_cust_names = sqlContext.sql( '''select a.CustID, a.FirstName, a.LastName, 
                            a.Age, b.total from custs_rec a join top_10_custs  b 
                            on a.CustID = b.customerNo order by b.total desc''' )
In [23]:
top_10_cust_names.show( 10 )
+-------+---------+---------+---+-------+
| CustID|FirstName| LastName|Age|  total|
+-------+---------+---------+---+-------+
|4007510|  Kristin|    Levin| 73|2204.79|
|4003293|   Martha|   Warner| 45|2024.67|
|4003971|   Donald|     Lamm| 34|1869.43|
|4004260| Courtney|    Rubin| 54|1869.12|
|4001058|   Gloria| Matthews| 53|1791.99|
|4008914| Samantha|Batchelor| 41|1652.06|
|4004491|     Rita|    Parks| 44|1649.74|
|4007168|  Carolyn|      Han| 52|1610.76|
|4001253|    Peter| McNamara| 74|1516.77|
|4009672|   Samuel|     Kidd| 61|1485.23|
+-------+---------+---------+---+-------+

Exercises

  • Find the top 5 selling products in each category
  • Find the top 10 customers for every month