Rdd:对应字段中文解释:电影 ID,电影名字,电影类型
* Utils.ratingsRdd:对应字段中文解释:用户 ID,电影 ID,评分,评分时间戳
*/
object Demand02 {
  /**
   * Task 2: for each sex separately, print the 10 movies with the highest
   * average rating as (sex, movieName, averageRating).
   *
   * Only movies that received strictly more than 50 ratings from the given sex
   * are ranked; movies at or below that threshold are dropped instead of being
   * carried along with a placeholder average of 0.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Kept as local vals so the Spark closures capture plain serializable values.
    val minRatings = 50L
    val topN = 10

    // (userID, sex)
    val userID_sex: RDD[(String, String)] = Utils.usersRdd.map(x => (x._1, x._2))
    // (userID, (movieID, rating))
    val userID_movieID_rating: RDD[(String, (String, String))] =
      Utils.ratingsRdd.map(x => (x._1, (x._2, x._3)))

    // (userID, (sex, (movieID, rating))) ---> ((sex, movieID), (ratingSum, ratingCount))
    // reduceByKey combines partial sums map-side, so no key ever needs all of
    // its ratings materialised in memory at once (unlike groupBy + toList).
    val ratingTotals: RDD[((String, String), (Double, Long))] =
      userID_sex.join(userID_movieID_rating)
        // toDouble accepts both "4" and "4.5"; toInt would throw on the latter.
        .map { case (_, (sex, (movieID, rating))) => ((sex, movieID), (rating.toDouble, 1L)) }
        .reduceByKey { case ((sumA, countA), (sumB, countB)) => (sumA + sumB, countA + countB) }

    // ((sex, movieID), (ratingSum, ratingCount)) ---> (movieID, (sex, avg))
    val movieID_sex_avg: RDD[(String, (String, Double))] =
      ratingTotals
        .filter { case (_, (_, count)) => count > minRatings }
        .map { case ((sex, movieID), (sum, count)) => (movieID, (sex, sum / count)) }

    // (movieID, movieName)
    val movieID_movieName: RDD[(String, String)] = Utils.movieRdd.map(x => (x._1, x._2))

    // (movieID, ((sex, avg), movieName)) ---> (sex, movieName, avg),
    // sorted by sex and then by average, both descending.
    // Cached because it is scanned once per sex below.
    val sex_movieName_avg: RDD[(String, String, Double)] =
      movieID_sex_avg.join(movieID_movieName)
        .map { case (_, ((sex, avg), movieName)) => (sex, movieName, avg) }
        .sortBy(x => (x._1, x._3), ascending = false)
        .cache()

    // Filter per sex explicitly rather than trusting that the first topN rows
    // of the sorted RDD all belong to one sex. Labels are visited in descending
    // order to match the sort direction (presumably "M" then "F" - the only
    // label the original code named was "F").
    val sexes: Array[String] = sex_movieName_avg.map(_._1).distinct().collect().sorted.reverse
    sexes.foreach { sex =>
      sex_movieName_avg.filter(_._1 == sex).take(topN).foreach(println(_))
    }
  }
}
第三问:
package movie_rating
import org.apache.spark.rdd.RDD
/**
* Utils.usersRdd:对应字段中文解释:用户 id,性别,年龄,职业,邮政编码
* Utils.movieRdd:对应字段中文解释:电影 ID,电影名字,电影类型
* Utils.ratingsRdd:对应字段中文解释:用户 ID,电影 ID,评分,评分时间戳
*/
object Demand03 {
  /**
   * Task 3: for each sex separately, print the 10 most-watched movies as
   * (sex, movieName, times).
   *
   * Every rating record counts as one viewing of the rated movie by the
   * rating user. Counts are keyed by (sex, movieName).
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val topN = 10

    // (userID, sex)
    val userID_sex: RDD[(String, String)] = Utils.usersRdd.map(x => (x._1, x._2))
    // (userID, movieID)
    val userID_movieID: RDD[(String, String)] = Utils.ratingsRdd.map(x => (x._1, x._2))
    // (movieID, name)
    val movieID_name: RDD[(String, String)] = Utils.movieRdd.map(x => (x._1, x._2))

    // (userID, (sex, movieID)) ---> (movieID, sex)
    val movieID_sex: RDD[(String, String)] =
      userID_sex.join(userID_movieID).map { case (_, (sex, movieID)) => (movieID, sex) }

    // (movieID, (sex, name)) ---> ((sex, name), 1) ---> (sex, name, times),
    // sorted by sex and then by count, both descending.
    // reduceByKey sums the ones map-side instead of building a full list per
    // key only to take its size. Cached because it is scanned once per sex below.
    val sex_name_times: RDD[(String, String, Int)] =
      movieID_sex.join(movieID_name)
        .map { case (_, (sex, name)) => ((sex, name), 1) }
        .reduceByKey(_ + _)
        .map { case ((sex, name), times) => (sex, name, times) }
        .sortBy(x => (x._1, x._3), ascending = false)
        .cache()

    // Print the result. Filter per sex explicitly rather than trusting that the
    // first topN rows of the sorted RDD all belong to one sex. Labels are
    // visited in descending order to match the sort direction (presumably "M"
    // then "F" - the only label the original code named was "F").
    val sexes: Array[String] = sex_name_times.map(_._1).distinct().collect().sorted.reverse
    sexes.foreach { sex =>
      sex_name_times.filter(_._1 == sex).take(topN).foreach(println(_))
    }
  }
}
第四问:
package movie_rating
import org.apache.spark.rdd.RDD
/**
* Utils.usersRdd:对应字段中文解释:用户 id,性别,年龄,职业,邮政编码
* Utils.movieRdd:对应字段中文解释:电影 ID,电影名字,电影类型
* Utils.ratingsRdd:对应字段中文解释:用户 ID,电影 ID,评分,评分时间戳
*/
object Demand04 {
/**
* 4、年龄段在“18-24”的男人,最喜欢看 1 |