Things to learn¶
- Reading a file into spark dataframes
- Applying schema while reading records into a dataframe
- Displaying records
- Applying operations like grouping, sorting, aggregating and filtering
- Joining multiple dataframes
- Utility functions like describing the schema, showing records, renaming columns and listing columns
- Spark DataFrame Docs available here
- The dataset used in this exercise is available here
In [1]:
sc
Out[1]:
Use SQLContext to load and read structured data¶
In [2]:
## Create an SQL context
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
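Note: this notebook uses the Spark 1.x entry point. On Spark 2.x and later, SparkSession subsumes SQLContext; a minimal sketch of the equivalent setup, assuming Spark 2.x (the app name is arbitrary):
## Spark 2.x+ entry point; SQLContext methods are available on the session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("movie-ratings") \
    .getOrCreate()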
Read the tab-separated file, which contains userid, movieid, rating and timestamp¶
In [3]:
ratings = sqlContext.read.format("com.databricks.spark.csv") \
.options(delimiter='\t') \
.load('file:///home/hadoop/lab/data/movies/ratings.dat')
- The details of the CSV reader are documented at https://github.com/databricks/spark-csv
In [4]:
# What is the data type of the ratings variable? It should be a DataFrame.
ratings
Out[4]:
Displaying Records¶
In [5]:
# Display the first few rows of the dataframe (show() prints 20 rows by default)
ratings.show()
Describe the schema of the records¶
In [6]:
ratings.printSchema()
In [7]:
ratings.schema
Out[7]:
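Note that every column is read as a string: without an explicit schema (and with inferSchema off), spark-csv treats all fields as strings. One alternative to hand-writing a schema is to let the reader infer the types with an extra pass over the data; a sketch, using the same file:
## Let spark-csv infer column types (requires an extra scan of the file)
ratings_inferred = sqlContext.read.format("com.databricks.spark.csv") \
    .options(delimiter='\t', inferSchema=True) \
    .load('file:///home/hadoop/lab/data/movies/ratings.dat')
ratings_inferred.printSchema()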
Apply a custom defined schema to the dataframe¶
In [8]:
from pyspark.sql.types import *
fields = [StructField("userid", IntegerType(), True),
StructField("movieid", IntegerType(), True),
StructField("rating", IntegerType(), True),
StructField("timestamp", LongType(), True) ]
Applying the schema while reading the records¶
In [9]:
## Read the tab-separated file, which contains userid, movieid, rating and timestamp
ratings_df = sqlContext.read.format("com.databricks.spark.csv") \
.options(delimiter='\t') \
.load('file:///home/hadoop/lab/data/movies/ratings.dat',
schema = StructType(fields) )
In [10]:
ratings_df.show()
An RDD or DataFrame can be persisted in memory¶
In [11]:
ratings_df.persist()
Out[11]:
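The cache is kept until unpersist() is called; a one-line sketch, left commented out so the cached data stays available for the cells below:
## Release the cached data once it is no longer needed
# ratings_df.unpersist()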
In [12]:
ratings_df.printSchema()
Show the list of columns in the dataframe¶
In [13]:
# Return a list of columns
ratings_df.columns
Out[13]:
In [14]:
## How many records are there in the dataframe?
ratings_df.count()
Out[14]:
In [15]:
ratings_df_mem_ser = ratings_df
In [16]:
from pyspark import StorageLevel   # the StorageLevel constants live in the pyspark package

ratings_df_mem_ser.persist(StorageLevel.MEMORY_ONLY_SER)
Out[16]:
In [17]:
ratings_df_mem_ser.count()
Out[17]:
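MEMORY_ONLY_SER stores the cached partitions in serialized form, trading CPU time for a smaller memory footprint. Other levels make different trade-offs; a sketch of switching to MEMORY_AND_DISK, which spills partitions that do not fit in memory to disk (a storage level can only be changed after unpersisting first):
## Switch to a level that spills to disk when memory runs out.
## unpersist() first -- Spark will not change the level of already-cached data.
from pyspark import StorageLevel

ratings_df_mem_ser.unpersist()
ratings_df_mem_ser.persist(StorageLevel.MEMORY_AND_DISK)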
How to drop a column?¶
- We will drop the timestamp column from the dataframe, as we will not need it for further analysis.
In [18]:
## We do not need the timestamp column, so let's drop it
ratings_df = ratings_df.drop( 'timestamp')
In [19]:
ratings_df.show()
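An equivalent way to achieve the same result is to select only the columns to keep; a sketch:
## Equivalent to drop(): keep only the columns we need
ratings_df.select("userid", "movieid", "rating").show(5)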
Applying operations like groupBy() and sort()¶
In [20]:
movie_counts = ratings_df.groupBy("movieid").count()
In [21]:
from pyspark.sql.functions import *
movie_counts = movie_counts.sort(desc("count"))
In [22]:
movie_counts.show( 10 )
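The same group-by can also be expressed in plain SQL after registering the dataframe as a temporary table; a sketch (the table name ratings is arbitrary):
## The same aggregation written as SQL
ratings_df.registerTempTable("ratings")
sqlContext.sql("""SELECT movieid, COUNT(*) AS cnt
                  FROM ratings
                  GROUP BY movieid
                  ORDER BY cnt DESC""").show(10)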
Applying an aggregation function to the group by¶
In [23]:
avg_ratings = ratings_df.groupBy("movieid").agg( {"rating":"avg"} )
In [24]:
avg_ratings.printSchema()
In [25]:
avg_ratings = avg_ratings.sort( desc( "avg(rating)" ) )
In [26]:
avg_ratings.show( 10 )
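The dictionary form of agg() fixes the output column name to avg(rating), which is why the sort above has to reference that generated name. Using avg() with an alias names the column up front and would make the rename done later in this notebook unnecessary; a sketch:
## Same aggregation, naming the output column directly
from pyspark.sql.functions import avg, desc

ratings_df.groupBy("movieid") \
    .agg(avg("rating").alias("mean_rating")) \
    .sort(desc("mean_rating")) \
    .show(10)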
Joining multiple dataframes¶
In [27]:
avg_ratings_count = avg_ratings.join( movie_counts,
avg_ratings.movieid == movie_counts.movieid ,
'inner' ).drop(movie_counts.movieid)
In [28]:
avg_ratings_count.printSchema()
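Without the drop(), the joined schema would contain two movieid columns. Passing the join key as a column-name string instead of an equality condition keeps a single movieid column automatically; a sketch:
## Joining on the column name deduplicates the join key automatically
avg_ratings_count_alt = avg_ratings.join(movie_counts, "movieid")  # inner join by default
avg_ratings_count_alt.printSchema()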
Renaming a column in a dataframe¶
In [29]:
avg_ratings_count = avg_ratings_count.withColumnRenamed( "avg(rating)",
"mean_rating" )
In [30]:
avg_ratings_count.printSchema()
In [31]:
avg_ratings_count = avg_ratings_count.withColumn(
    "mean_rating", round(avg_ratings_count["mean_rating"], 2))
In [32]:
avg_ratings_count = avg_ratings_count.sort( desc( "mean_rating" ) )
In [33]:
avg_ratings_count.show( 10 )
Filtering records in a dataframe based on a criteria¶
In [34]:
avg_ratings_count = avg_ratings_count.filter( avg_ratings_count["count"] > 20 )
In [35]:
avg_ratings_count = avg_ratings_count.sort( desc( "mean_rating" ) , desc( "count") )
In [36]:
avg_ratings_count.show( 10 )
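filter() also accepts a SQL expression string, and where() is an alias for filter(); a sketch producing the same result as the filter above:
## Same filter, expressed as a SQL string
avg_ratings_count.where("count > 20").show(10)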
Loading movies data¶
- The movies file contains the movieid, the movie name and the genres each movie belongs to, expressed as a set of tags
In [37]:
movies_df = sqlContext.read.format("com.databricks.spark.csv") \
.options(delimiter='\t', header = True, inferSchema = True) \
.load('file:///home/hadoop/lab/data/movies/movies.dat')
In [38]:
movies_df.show( 10 )
In [39]:
movies_df.printSchema()
Joining ratings and movies data to find the top 20 best-rated movies¶
In [40]:
top_movies = avg_ratings_count.limit(20) \
.join( movies_df,
avg_ratings_count.movieid == movies_df.movieid,
"inner" ).drop(movies_df.movieid)
In [41]:
top_movies_20 = top_movies.select( "movieid", "mean_rating", "count", "name" )
In [42]:
top_movies_20.collect()
Out[42]:
Saving the results into a csv file¶
In [43]:
top_movies_20.write \
.format("com.databricks.spark.csv") \
.option("header", "true") \
.save("file:///home/hadoop/lab/results/topmovies")
Exercises¶
- Find the 20 worst-rated movies, considering only movies rated by at least 100 users.
- Find the best 10 and worst 10 movies in each category. Categories are the tags given in the movies.dat file (a starting sketch is given below).
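As a starting point for the second exercise, each movie's tag set can be expanded into one row per (movie, tag) pair; a sketch, assuming the genre column is named genre and the tags are separated by | (check movies_df.columns and a few rows to confirm both):
## One row per (movie, genre tag) pair -- the "genre" column name and the
## "|" separator are assumptions; verify them against movies_df.show()
from pyspark.sql.functions import explode, split

movies_by_genre = movies_df.withColumn("genre_tag",
                                       explode(split(movies_df["genre"], "[|]")))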