package movie_rating
import org.apache.spark.rdd.RDD
/**
* Utils.usersRdd:对应字段中文解释:用户 id,性别,年龄,职业,邮政编码
* Utils.movieRdd:对应字段中文解释:电影 ID,电影名字,电影类型
* Utils.ratingsRdd:对应字段中文解释:用户 ID,电影 ID,评分,评分时间戳
*/
object Demand09 {

  /** A movie needs at least this many ratings before its average rating is ranked. */
  private val MinRatings = 50

  /** Number of movies reported for each genre. */
  private val TopN = 5

  /**
   * Requirement 9: for every movie genre in the rating data set, print the 5 movies with
   * the highest average rating, one `(genre, movie name, average rating)` tuple per line,
   * with a blank line after each genre. Genres are printed in ascending order; inside a
   * genre movies are ordered by average rating (descending), ties broken by name.
   *
   * Only movies with at least [[MinRatings]] ratings are considered, so a handful of
   * perfect scores cannot dominate a genre.
   */
  def main(args: Array[String]): Unit = {
    // (movieID, (name, types)); types is a '|'-separated genre list, e.g. Action|Adventure|Sci-Fi
    val movieID_name_types: RDD[(String, (String, String))] =
      Utils.movieRdd.map(x => (x._1, (x._2, x._3)))

    // (movieID, (ratingSum, ratingCount)): summed with reduceByKey (map-side combine)
    // instead of shipping every single rating through groupByKey.
    val movieID_sumCount: RDD[(String, (Long, Long))] = Utils.ratingsRdd
      .map(x => (x._2, (x._3.toInt.toLong, 1L)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

    // (movieID, average) for movies that have enough ratings. Keyed by movieID (not by name)
    // so two different movies sharing a title are never merged.
    val movieID_avg: RDD[(String, Double)] = movieID_sumCount
      .filter { case (_, (_, count)) => count >= MinRatings }
      .mapValues { case (sum, count) => sum.toDouble / count }

    // (genre, (name, average)): one record per genre listed for the movie.
    val genre_name_avg: RDD[(String, (String, Double))] = movieID_name_types
      .join(movieID_avg)
      .flatMap { case (_, ((name, types), avg)) =>
        types.split("\\|").map(genre => (genre, (name, avg)))
      }

    // Rank inside each genre *after* grouping: a shuffle (groupBy/groupByKey) does not keep
    // any ordering established before it, so sorting first and grouping later is unreliable.
    val topPerGenre: Array[(String, List[(String, Double)])] = genre_name_avg
      .groupByKey()
      .mapValues(_.toList.sortBy { case (name, avg) => (-avg, name) }.take(TopN))
      .collect()
      .sortBy(_._1)

    // Print on the driver; RDD.foreach would print on executors in no guaranteed order.
    topPerGenre.foreach { case (genre, movies) =>
      movies.foreach { case (name, avg) => println((genre, name, avg)) }
      println()
    }
  }
}
package movie_rating
import org.apache.spark.rdd.RDD
/**
* Utils.usersRdd:对应字段中文解释:用户 id,性别,年龄,职业,邮政编码
* Utils.movieRdd:对应字段中文解释:电影 ID,电影名字,电影类型
* Utils.ratingsRdd:对应字段中文解释:用户 ID,电影 ID,评分,评分时间戳
*/
object Demand10 {
/**
* 10、各年评分最高的电影类型(年份,类型,影评分)
*/
def main(args: Array[String]): Unit = {
//(movieID, year)
val movieID_year: RDD[(String, String)] = Utils.movieRdd.map(x => (x._1, (x._2.substring(x._2.length - 5, x._2.length - 1))))
//(movieID, rating) ---> (movieID, Iterable(rating)) ---> (movie