Spark WordCount(Scala 版)
object SparkWC {
  /**
   * Word count: counts the words of the text file at args(0), writes the (word, count)
   * pairs sorted by count in descending order to args(1), and also prints them.
   *
   * @param args args(0) = input path, args(1) = output directory
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "usage: SparkWC <input path> <output dir>")
    val conf = new SparkConf().setAppName("SparkWC").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile(args(0))
      // Leading or repeated spaces make split(" ") yield empty strings; they are not words.
      val words: RDD[String] = lines.flatMap(_.split(" ")).filter(_.nonEmpty)
      val res: RDD[(String, Int)] =
        words.map((_, 1)).reduceByKey(_ + _).sortBy(_._2, ascending = false)
      // res is consumed by two actions below; cache it so the job is not computed twice.
      res.cache()
      res.saveAsTextFile(args(1))
      println(res.collect.toBuffer)
    } finally {
      sc.stop()
    }
  }
}
Spark WordCount(Java 版)
/**
 * Java version of the Spark word count: counts the words of the file at args[0] and prints
 * the (word, count) pairs sorted by count in descending order.
 */
public class JavaWC {
    public static void main(String[] args) {
        final SparkConf jconf = new SparkConf().setAppName("JavaWC").setMaster("local[*]");
        final JavaSparkContext jsc = new JavaSparkContext(jconf);
        try {
            final JavaRDD<String> lines = jsc.textFile(args[0]);
            final JavaRDD<String> word = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String s) throws Exception {
                    // Leading or repeated spaces make split(" ") yield empty strings; skip them.
                    final java.util.List<String> tokens = new java.util.ArrayList<String>();
                    for (String token : s.split(" ")) {
                        if (!token.isEmpty()) {
                            tokens.add(token);
                        }
                    }
                    return tokens;
                }
            });
            final JavaPairRDD<String, Integer> pairRDD = word.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
            final JavaPairRDD<String, Integer> reduced = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
            // sortByKey can only sort by key: swap to (count, word), sort descending, swap back.
            final JavaPairRDD<Integer, String> swaped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                    return stringIntegerTuple2.swap();
                }
            });
            final JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
            final JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                    return integerStringTuple2.swap();
                }
            });
            // collect() returns a java.util.List whose toString() shows the elements; printing
            // the result of toArray() would only show the array's type and identity hash.
            System.out.println(res.collect());
        } finally {
            jsc.stop();
        }
    }
}
基站定位:推算用户的家庭地址与工作地址
需求:在一定时间段内,统计每部手机在哪些基站附近停留的时间最长(取停留时间最长的两个基站)。
a.log 每行字段(逗号分隔):手机号,事件时间戳,基站id,事件类型(1 = 进入基站,其他值 = 离开基站)
loc_info.txt 每行字段(逗号分隔):基站id,经纬度坐标(x, y 两列)
object MobileLocation {
  /**
   * Estimates where each phone spends most of its time (e.g. home and work).
   *
   * Steps: sum each phone's dwell time at every cell station, join in the station
   * coordinates, then keep the two stations with the longest total dwell time per phone.
   * The result is printed.
   *
   * Input files (comma-separated):
   *  - C://a.log         phone, timestamp, station id, event type (1 = entering, otherwise leaving)
   *  - C://loc_info.txt  station id, x, y
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MobileLocation").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val file: RDD[String] = sc.textFile("C://a.log")
      // An "enter" event contributes -time and a "leave" event +time. The per-key sum below
      // is therefore (total leave time - total enter time), i.e. the total dwell time.
      val phoneAndLocAndTime: RDD[((String, String), Long)] = file.map { line =>
        val fields = line.split(",")
        val phone = fields(0)
        val time = fields(1).toLong
        val loc = fields(2)
        val eventType = fields(3).toInt
        val signedTime = if (eventType == 1) -time else time
        ((phone, loc), signedTime)
      }
      val sum: RDD[((String, String), Long)] = phoneAndLocAndTime.reduceByKey(_ + _)
      // Re-key by station id so the dwell times can be joined with the station coordinates.
      val loc_phone_timesum: RDD[(String, (String, Long))] = sum.map {
        case ((phone, loc), timesum) => (loc, (phone, timesum))
      }
      val locInfo = sc.textFile("C://loc_info.txt")
      val loc_xy: RDD[(String, (String, String))] = locInfo.map { line =>
        val fields = line.split(",")
        (fields(0), (fields(1), fields(2)))
      }
      val join: RDD[(String, ((String, Long), (String, String)))] = loc_phone_timesum.join(loc_xy)
      val res: RDD[(String, List[(String, Long, (String, String))])] = join
        .map { case (_, ((phone, time), locxy)) => (phone, time, locxy) }
        .groupBy(_._1)
        .mapValues(_.toList.sortBy(-_._2).take(2))
      // RDD transformations are lazy; this action is what actually runs the pipeline above.
      println(res.collect.toBuffer)
    } finally {
      sc.stop()
    }
  }
}
请求访问日志统计:统计每个学科下各模块的访问次数,并取 Top 3
/**
 * Counts visits to each module (full request URL) of every subject (URL host) and prints
 * the top 3 modules per subject.
 *
 * Each input line holds a timestamp and a request URL separated by a tab, e.g.
 * 20161123101523<TAB>http://java.learn.com/java/javaee.shtml
 */
object ObjectCount01 {
  /** @param args args(0) = path of the access log */
  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: ObjectCount01 <access log path>")
    // Run locally on all cores; "Object" is not a valid Spark master URL.
    val conf = new SparkConf().setAppName("ObjectCount01").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val file: RDD[String] = sc.textFile(args(0))
      // Only the URL column (index 1) is needed for counting.
      val url_map: RDD[(String, Int)] = file.map(line => (line.split("\t")(1), 1))
      val sum_url: RDD[(String, Int)] = url_map.reduceByKey(_ + _)
      // Subject = host part of the URL, e.g. java.learn.com.
      val project_url_sum: RDD[(String, String, Int)] = sum_url.map { case (url, count) =>
        (new URL(url).getHost, url, count)
      }
      val res = project_url_sum.groupBy(_._1).mapValues(_.toList.sortBy(-_._3).take(3))
      println(res.collect.toBuffer)
    } finally {
      sc.stop()
    }
  }
}
对上一个程序的改进:缓存按 URL 聚合后的结果,再按学科分别过滤并取 Top 3
/**
 * Counts visits to each module (URL) per subject and prints the top 3 modules of every subject.
 *
 * The per-URL counts are cached once, then each subject is selected by filtering on its URL
 * prefix.
 */
object ObjectCount02 {
  /** @param args args(0) = path of the access log */
  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: ObjectCount02 <access log path>")
    // Run locally on all cores; "Object" is not a valid Spark master URL.
    val conf = new SparkConf().setAppName("ObjectCount02").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val file: RDD[String] = sc.textFile(args(0))
      // URL prefixes of the subjects to report. Only the java entry was spelled out in the
      // notes; the android/ui hosts are assumed to follow the same pattern. TODO: confirm.
      val project = Array("http://android.learn.com", "http://java.learn.com", "http://ui.learn.com")
      val url_map: RDD[(String, Int)] = file.map(line => (line.split("\t")(1), 1))
      val sum_url: RDD[(String, Int)] = url_map.reduceByKey(_ + _)
      // Reused by one job per subject below, so keep it in memory.
      val cached: RDD[(String, Int)] = sum_url.cache()
      for (p <- project) {
        val filteredProject: RDD[(String, Int)] = cached.filter(_._1.startsWith(p))
        val res: Array[(String, Int)] = filteredProject.sortBy(_._2, ascending = false).take(3)
        println(res.toBuffer)
      }
      cached.unpersist()
    } finally {
      sc.stop()
    }
  }
}
自定义分区器:把每个学科的数据放入单独的分区,再在各分区内排序取 Top 3(原意是借此缓解数据倾斜;但注意按学科分区时,访问量大的学科所在分区仍然会更大)。
/**
 * Counts visits per module (URL) for each subject (URL host) and writes the top 3 modules
 * per subject.
 *
 * ProjectPartitioner places each subject in its own partition, so every partition can be
 * ranked independently.
 *
 * Note: check the dependencies declared in the pom and the database character encoding.
 */
object ObjectCount03 {
  /** @param args args(0) = access log path, args(1) = output directory */
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "usage: ObjectCount03 <access log path> <output dir>")
    // Run locally on all cores; "Object" is not a valid Spark master URL.
    val conf = new SparkConf().setAppName("ObjectCount03").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val file: RDD[String] = sc.textFile(args(0))
      val url_map: RDD[(String, Int)] = file.map(line => (line.split("\t")(1), 1))
      val sum_url: RDD[(String, Int)] = url_map.reduceByKey(_ + _)
      // Key by subject (URL host). Cached because it is used twice: to collect the distinct
      // subjects and to build the repartitioned RDD.
      val cached_project: RDD[(String, (String, Int))] = sum_url.map { case (url, count) =>
        (new URL(url).getHost, (url, count))
      }.cache()
      val projects: Array[String] = cached_project.keys.distinct().collect()
      val res: RDD[(String, (String, Int))] =
        cached_project.partitionBy(new ProjectPartitioner(projects))
      // Each partition now holds exactly one subject: rank it locally and keep the top 3.
      val res2: RDD[(String, (String, Int))] =
        res.mapPartitions(it => it.toList.sortBy(-_._2._2).take(3).iterator)
      res2.saveAsTextFile(args(1))
    } finally {
      sc.stop()
    }
  }
}
/**
 * Partitioner that assigns each project (subject) its own partition.
 *
 * Partition ids follow the order of `projects`. A key that is not in `projects` is sent to
 * partition 0.
 */
class ProjectPartitioner(projects: Array[String]) extends Partitioner {
  // project name -> partition index; a repeated name keeps the index of its last occurrence
  private val project_parnum: mutable.HashMap[String, Int] = {
    val lookup = new mutable.HashMap[String, Int]()
    projects.zipWithIndex.foreach { case (name, idx) => lookup.update(name, idx) }
    lookup
  }
  // count of project names that were registered (equals projects.length)
  var n: Int = projects.length

  override def numPartitions: Int = projects.length

  override def getPartition(key: Any): Int =
    project_parnum.getOrElse(key.toString, 0)
}
IP 归属地统计:统计每个省份的访问量。知识点:广播变量、二分查找、IP 转 Long(ip2Long),结果写入 MySQL。
object IPSearch {
  /**
   * Counts requests per province and writes the per-province counts to MySQL.
   *
   * Each client IP in D://http.log is mapped to a province by binary-searching the IP ranges
   * loaded from D://ip.txt. The range table is shipped to executors as a broadcast variable.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Master")
    val sc = new SparkContext(conf)
    try {
      // ip.txt is '|'-separated: field 2 = range start, field 3 = range end (numeric, since
      // binarySearch calls toLong on them), field 6 = province.
      // NOTE(review): binarySearch requires the ranges to be sorted by start address;
      // confirm that ip.txt is.
      val ipInfo: RDD[(String, String, String)] = sc.textFile("D://ip.txt").map { x =>
        val fields = x.split("\\|")
        (fields(2), fields(3), fields(6))
      }
      val arrIpInfo: Array[(String, String, String)] = ipInfo.collect()
      val broadcastIpInfo: Broadcast[Array[(String, String, String)]] = sc.broadcast(arrIpInfo)
      val province_1: RDD[(String, Int)] = sc.textFile("D://http.log").flatMap { line =>
        val fields = line.split("\\|")
        val ipToLong = ip2Long(fields(1))
        val resIPInfo: Array[(String, String, String)] = broadcastIpInfo.value
        val index = binarySearch(resIPInfo, ipToLong)
        // Skip IPs outside every known range; indexing with -1 would fail the whole task.
        if (index >= 0) Seq((resIPInfo(index)._3, 1)) else Seq.empty[(String, Int)]
      }
      val res: RDD[(String, Int)] = province_1.reduceByKey(_ + _)
      res.foreachPartition(data2Mysql)
    } finally {
      sc.stop()
    }
  }

  /** Converts a dotted IPv4 address such as "1.2.3.4" to its numeric value. */
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, part) => (acc << 8) | part.toLong)

  /**
   * Binary search over IP ranges (start, end, province) sorted by start address.
   *
   * @return index of the range whose [start, end] contains `ip`, or -1 if none does
   */
  def binarySearch(arr: Array[(String, String, String)], ip: Long): Int = {
    // Closed interval [low, high]: both bounds are valid indices while low <= high.
    @scala.annotation.tailrec
    def search(low: Int, high: Int): Int =
      if (low > high) -1
      else {
        val middle = (low + high) >>> 1
        val (start, end, _) = arr(middle)
        if (ip < start.toLong) search(low, middle - 1)
        else if (ip > end.toLong) search(middle + 1, high)
        else middle
      }
    search(0, arr.length - 1)
  }

  /**
   * Inserts one partition of (province, count) rows into location_info, stamped with the
   * current date.
   *
   * NOTE(review): errors are only printed, so the Spark job still succeeds when an insert
   * fails. The credentials are hard-coded.
   */
  val data2Mysql: Iterator[(String, Int)] => Unit = (it: Iterator[(String, Int)]) => {
    var conn: Connection = null
    var ps: PreparedStatement = null
    // One '?' placeholder per column, bound below for every row.
    val sql = "insert into location_info(location,counts,access_date) values(?,?,?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://192.168.216.53:3306/bigdata?useUnicode=true&characterEncoding=utf8", "root", "root")
      // Prepare once and batch all rows, instead of leaking one statement per row.
      ps = conn.prepareStatement(sql)
      val today = new Date(System.currentTimeMillis())
      it.foreach { case (location, count) =>
        ps.setString(1, location)
        ps.setInt(2, count)
        ps.setDate(3, today)
        ps.addBatch()
      }
      ps.executeBatch()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null)
        ps.close()
      if (conn != null)
        conn.close()
    }
  }
}