设为首页 加入收藏

TOP

spark的一些小项目
2019-04-04 01:06:32 】 浏览:45
Tags:spark 些小 项目
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/metooman/article/details/78775918

sparkwordcount

object SparkWC {
  def main(args: Array[String]): Unit = {
    //创建配置信息类,并设置应用的程序名称
    //local [2] 本地启用2个线程模拟集群运行任务
    //local [*] 本地有多少空线程就启用多少线程来运行任务
    //提交到集群运行时setmaster注释掉
    val conf =new SparkConf().setAppName("SparkWC").setMaster("local[*]")

    //创建spark的上下文对象  也是提交任务的入口
    val sc =new SparkContext(conf)

    //读取数据   传入数据地址 也可以通过arg传入
    val lines=sc.textFile(args(0))
    //处理数据
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val res: RDD[(String, Int)] = words.map((_,1)).reduceByKey(_+_).sortBy(_._2,false)
    //保存数据
    res.saveAsTextFile(args(1))
    println(res.collect.toBuffer)
    sc.stop()

  }
}

sparkwordcount java

public class JavaWC {
    public static void main(String[] args) {
        final SparkConf jconf = new SparkConf().setAppName("JavaWC").setMaster("local[*]");
        final JavaSparkContext jsc = new JavaSparkContext(jconf);
        final JavaRDD<String> lines = jsc.textFile(args[0]);

        //切分数据
        final JavaRDD<String> word = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });
        //封装成元祖
        final JavaPairRDD<String, Integer> pairRDD = word.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        //聚合
        final JavaPairRDD<String, Integer> reduced = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //java语言没有提供sortby方法,因此需要把pairRDD里的两个值互换,然后再调用sortbykey来进行排序,排序后再把两值互换回来
        final JavaPairRDD<Integer, String> swaped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.swap();
            }
        });
        // false 降序排序,
        final JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
        //再反转
        final JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                return integerStringTuple2.swap();
            }
        });
        System.out.println(res.collect().toArray());
        jsc.stop();
    }
}

基站定位 家庭与工作地址

需求是在一定时间内,手机在哪个基站附近停留的时间最长
a.log 中是  手机号,进入基站的时间,基站id,事件类型(进出)
loc_info.txt 中是经纬度与,基站id
object MobileLocation {
  def main(args: Array[String]): Unit = {
    val conf =new SparkConf().setAppName("MobileLocation").setMaster("local[2]")
    val sc=new SparkContext(conf)
    val file: RDD[String] = sc.textFile("C://a.log")
    val phoneAndLocAndTime: RDD[((String, String), Long)] = file.map(line => {
      val fields = line.split(",")
      val phone = fields(0)
      val time = fields(1).toLong
      val loc = fields(2)
      val eventType = fields(3).toInt
      val time_lone = if (eventType == 1) -time else time
      ((phone, loc), time_lone)
    })
    //用户在基站停留时间的总和
    val sum: RDD[((String, String), Long)] = phoneAndLocAndTime.reduceByKey(_+_)
    //用户在某一个基站停留时间,需要与经纬度对接
    val loc_phone_timesum: RDD[(String, (String, Long))] = sum.map(x => {
      val phone = x._1._1
      val loc = x._1._2
      val timesum = x._2
      (loc, (phone, timesum))
    })
    //读取基站经纬度
    val locInfo=sc.textFile("C://loc_info.txt")
    val loc_xy: RDD[(String, (String, String))] = locInfo.map(line => {
      val fields = line.split(",")
      val loc = fields(0)
      val x = fields(1)
      val y = fields(2)
      (loc, (x, y))
    })
    //用户在基站停留时间 加上经纬度
    val join: RDD[(String, ((String, Long), (String, String)))] = loc_phone_timesum.join(loc_xy)
    //按手机号进行分组并按停留的时间,排序 取top2
    val res: RDD[(String, List[(String, Long, (String, String))])] = join.map(x => {
      val phone = x._2._1._1
      val loc = x._1
      val time = x._2._1._2
      val locxy = x._2._2
      (phone, time, locxy)
    }).groupBy(_._1).mapValues(_.toList.sortBy(_._2).reverse.take(2))
    sc.stop()
  }
}

请求访问日志统计 每个模块的访问次数

/**  文件 格式是  日期  请求url
  *  20161123101523  http://java.learn.com/java/javaee.shtml
  * 统计用户对每个学科各个模块的访问次数,取top3
  */
object ObjectCount01 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("Object").setAppName("")
    val sc =new SparkContext(conf)
    val file: RDD[String] = sc.textFile("")

    //切分数据,取出url生成元祖
    val url_map: RDD[(String, Int)] = file.map(x => {
      val lines = x.split("\t")
      val time = lines(0)
      val url = lines(1)
      (url, 1)
    })
    //相同的url聚合
    val sum_url: RDD[(String, Int)] = url_map.reduceByKey(_+_)
    //获取学科信息
    val project_url_sum: RDD[(String, String, Int)] = sum_url.map(x => {
      val url = x._1
      val count = x._2
      val project = new URL(url).getHost
      (project, url, count)
    })
    //以学科信息分组,整合,取前3
    val res=project_url_sum.groupBy(_._1).mapValues(_.toList.sortBy(_._3).reverse.take(3))
    println(res.collect.toBuffer)
    sc.stop()
  }
}

对上个项目的改进 ,加缓存

/**
  * 统计用户对每个学科各个模块的访问次数,取top3
  */
object ObjectCount02 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("Object").setAppName("")
    val sc = new SparkContext(conf)
    val file: RDD[String] = sc.textFile("")
    val project = Array("...android...", "http://java.learn.com", "...ui...") 省略写了
    //切分数据,取出url生成元祖
    val url_map: RDD[(String, Int)] = file.map(x => {
      val lines = x.split("\t")
      val time = lines(0)
      val url = lines(1)
      (url, 1)
    })
    //相同的url聚合
    val sum_url: RDD[(String, Int)] = url_map.reduceByKey(_ + _)

    //cache   将经常用到的结果数据或者shuffle以后的数据 往往先缓存起来,
    //1、便于以后快捷访问
    //2、增加数据的安全性
    val cached: RDD[(String, Int)] = sum_url.cache()
    //过滤掉非 project中的学科
    for (p <- project) {
      val fileter_project: RDD[(String, Int)] = cached.filter(_._1.startsWith(p))
      val res: Array[(String, Int)] = fileter_project.sortBy(_._2,false).take(3)
      println(res.toBuffer)
    }
    sc.stop()
  }
}

为了解决数据倾斜,自定义了分区,每个学科一个分区

/** 注意pom中的包,数据库的字符编码集
  * 统计用户对每个学科各个模块的访问次数,取top3
  * 自定义分区器  按照每种学科的数据放到不同分区里
  */
object ObjectCount03 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("Object").setAppName("")
    val sc = new SparkContext(conf)
    val file: RDD[String] = sc.textFile("")
    val project = Array("android", "javaee", "ui")
    //切分数据,取出url生成元祖
    val url_map: RDD[(String, Int)] = file.map(x => {
      val lines = x.split("\t")
      val time = lines(0)
      val url = lines(1)
      (url, 1)
    })
    //相同的url聚合
    val sum_url: RDD[(String, Int)] = url_map.reduceByKey(_ + _)
    //获取学科信息并缓存
    val cached_project = sum_url.map(x => {
      val url = x._1
      val project = new URL(url).getHost
      val count = x._2
      (project, (url, count))
    }).cache()


    //调用spark默认分区器  保存结果
    // 调用默认分区器时,会发生哈希碰撞
//    val res: RDD[(String, (String, Int))] = cached_project.partitionBy(new HashPartitioner(3))
//    res.saveAsTextFile("C://b")

    //得到所有学科
    val projects: Array[String] = cached_project.keys.distinct().collect()
    val res: RDD[(String, (String, Int))] = cached_project.partitionBy(new ProjectPartitioner(projects))
    //对每个分区进行排序 得到top3
    val res2: RDD[(String, (String, Int))] = res.mapPartitions(it => {
      it.toList.sortBy(_._2._2).reverse.take(3).iterator
    })
    res2.saveAsTextFile("")
    sc.stop()
  }
}

//自定义分区器
class ProjectPartitioner(projects: Array[String]) extends Partitioner {
  //用来存储学科与分区号
  private val project_parnum: mutable.HashMap[String, Int] = new mutable.HashMap[String, Int]()
  //计数器,用来生成分区号
  var n = 0
  for (p <- projects) {
    project_parnum += ((p, n))
    n += 1
  }

  //获取分区数
  override def numPartitions: Int = {
    projects.size
  }

  //获取分区号
  override def getPartition(key: Any): Int = {
    project_parnum.getOrElse(key.toString,0)
  }
}

ip统计每个省的流量排名,广播变量,二分查找,ip2long,结果存入mysql

object IPSearch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Master")
    val sc = new SparkContext(conf)

    //获取ip基础数据
    val ipInfo: RDD[(String, String, String)] = sc.textFile("D://ip.txt").map(x => {
      val fields = x.split("\\|")
      val startIP = fields(2) //起始IP
      val endIP = fields(3) //结束IP
      val province = fields(6) //省份
      (startIP, endIP, province)
    })
    // 在广播前将RDD 计算出来  Action类型
    val arrIpInfo: Array[(String, String, String)] = ipInfo.collect()
    //优化,集群中将需要的基础数据广播到相应的executor中,要注意RDD是不是懒类型
    val broadcastIpInfo: Broadcast[Array[(String, String, String)]] = sc.broadcast(arrIpInfo)
    //获取用户点击流的日志
    val province_1: RDD[(String, Int)] = sc.textFile("D://http.log").map(lines => {
      val fields = lines.split("\\|")
      val ip = fields(1) //ip
      val ipToLong = ip2Long(ip)
      val resIPInfo: Array[(String, String, String)] = broadcastIpInfo.value
      //拿到广播变量的数据
      val provice: String = resIPInfo(binarySearch(resIPInfo, ipToLong))._3
      (provice, 1)
    })
    val res: RDD[(String, Int)] = province_1.reduceByKey(_ + _)
    //    println(res.collect().toBuffer)
    res.foreachPartition(data2Mysql)
    sc.stop()
  }

  //ip 转Long类型
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  //二分查找
  def binarySearch(arr: Array[(String, String, String)], ip: Long): Int = {
    var low = 0
    var high = arr.length
    while (low < high) {
      val middle = (low + high) / 2
      if ((ip >= arr(middle)._1.toLong) && (ip <= arr(middle)._2.toLong)) {
        return middle
      }
      if (ip < arr(middle)._1.toLong) {
        high = middle - 1
      } else {
        low = middle + 1
      }
    }
    -1
  }

  val data2Mysql = (it: Iterator[(String, Int)]) => {
    var conn: Connection = null
    var ps: PreparedStatement = null

    val sql = "insert into location_info(location,counts,access_date) values(,,)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://192.168.216.53:3306/bigdatauseUnicode=true&characterEncoding=utf8", "root", "root")
      it.foreach(line => {
        ps = conn.prepareStatement(sql)
        ps.setString(1, line._1)
        ps.setInt(2, line._2)
        ps.setDate(3, new Date(System.currentTimeMillis()))
        ps.executeLargeUpdate()
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null)
        ps.close()
      if (conn != null)
        conn.close()
    }
  }
}
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇spark学习及环境配置 下一篇Spark实战 | Kafka与Spark Stream..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目