
A Worked Example of a Data Filtering Framework Based on MySQL Binlog + Maxwell + Kafka + Spark Streaming (Part 2)
2018-04-14 06:06:21 · Views: 189
Tags: MySQL binlog, Maxwell, Kafka, Spark Streaming, data filtering, framework, example
// Split each Maxwell JSON message into the fields we need and process it
JavaPairDStream<Integer, Integer> line = lines.mapToPair(new PairFunction<String, Integer, Integer>() {
    @Override
    public Tuple2<Integer, Integer> call(String t) throws Exception {
        System.out.println(t);
        JSONParser jsonParser = new JSONParser(JSONParser.DEFAULT_PERMISSIVE_MODE);
        JSONObject map = (JSONObject) jsonParser.parse(t);
        if (map.containsKey("commit")) {
            boolean commit = Boolean.valueOf(map.get("commit").toString());
            // Only handle committed INSERT events on the stang_bid table
            if (map.get("table").toString().equals("stang_bid")
                    && map.get("type").toString().equals("insert") && commit) {
                String data = map.get("data").toString().trim();
                JSONObject mapdata = (JSONObject) jsonParser.parse(data);
                if (mapdata.get("author").toString().equals("中国采购与招标网")) {
                    // Rows from this source are forwarded back to Kafka unchanged
                    ProducerRecord<String, String> msg = new ProducerRecord<String, String>(producerTopic, t);
                    procuder.send(msg);
                } else if (mapdata.get("title") != null && mapdata.get("info") != null) {
                    int id = Integer.parseInt(mapdata.get("id").toString());
                    String title = mapdata.get("title").toString();
                    title = ScreenOfTitle(title); // title cleaner defined in Part 1
                    String info = mapdata.get("info").toString();
                    Document d = Jsoup.parse(info);
                    info = d.text(); // strip HTML tags, keep plain text only
                    int result = 0;
                    if (title != null) {
                        for (String key : keys) {
                            if (title.contains(key) || info.contains(key)) {
                                // Classify by the first matching pattern, write the category
                                // back to MySQL, then stop scanning keywords. Patterns 1-4
                                // are checked against title and body, patterns 5-7 against
                                // the title only.
                                if (pattern1.matcher(title).find() || pattern1.matcher(info).find()) {
                                    pstmt = conn.prepareStatement(sql);
                                    pstmt.setInt(1, 1);
                                    pstmt.setInt(2, 1);
                                    pstmt.setInt(3, id);
                                    result = pstmt.executeUpdate();
                                    break;
                                } else if (pattern2.matcher(title).find() || pattern2.matcher(info).find()) {
                                    pstmt = conn.prepareStatement(sql);
                                    pstmt.setInt(1, 2);
                                    pstmt.setInt(2, 1);
                                    pstmt.setInt(3, id);
                                    result = pstmt.executeUpdate();
                                    break;
                                } else if (pattern3.matcher(title).find() || pattern3.matcher(info).find()) {
                                    pstmt = conn.prepareStatement(sql);
                                    pstmt.setInt(1, 3);
                                    pstmt.setInt(2, 1);
                                    pstmt.setInt(3, id);
                                    result = pstmt.executeUpdate();
                                    break;
                                } else if (pattern4.matcher(title).find() || pattern4.matcher(info).find()) {
                                    pstmt = conn.prepareStatement(sql);
                                    pstmt.setInt(1, 4);
                                    pstmt.setInt(2, 1);
                                    pstmt.setInt(3, id);
                                    result = pstmt.executeUpdate();
                                    break;
                                } else if (pattern5.matcher(title).find()) {
                                    pstmt = conn.prepareStatement(sql);
                                    pstmt.setInt(1, 5);
                                    pstmt.setInt(2, 1);
                                    pstmt.setInt(3, id);
                                    result = pstmt.executeUpdate();
                                    break;
                                } else if (pattern6.matcher(title).find()) {
                                    pstmt = conn.prepareStatement(sql);
                                    pstmt.setInt(1, 6);
                                    pstmt.setInt(2, 1);
                                    pstmt.setInt(3, id);
                                    result = pstmt.executeUpdate();
                                    break;
                                } else if (pattern7.matcher(title).find()) {
                                    pstmt = conn.prepareStatement(sql);
                                    pstmt.setInt(1, 7);
                                    pstmt.setInt(2, 1);
                                    pstmt.setInt(3, id);
                                    result = pstmt.executeUpdate();
                                    break;
                                }
                            }
                            i++; // keywords examined so far (counter from Part 1)
                        }
                    }
                    total++; // records inspected so far (counter from Part 1)
                }
            }
        }
        return new Tuple2<Integer, Integer>(total, i);
    }
});
line.print();
jscc.start();
jscc.awaitTermination();
}
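To make the guards at the top of the function easier to follow, here is a minimal, self-contained sketch of the kind of JSON event Maxwell publishes to Kafka for a row insert. The database name and row values are invented for illustration; it shows why the job checks commit, table, and type before drilling into data:

import net.minidev.json.JSONObject;
import net.minidev.json.parser.JSONParser;

public class MaxwellEventDemo {
    public static void main(String[] args) throws Exception {
        // Shape of a Maxwell insert event; field values here are made up
        String event = "{\"database\":\"bid_db\",\"table\":\"stang_bid\","
                + "\"type\":\"insert\",\"ts\":1523664000,\"xid\":8201,\"commit\":true,"
                + "\"data\":{\"id\":42,\"title\":\"some title\","
                + "\"info\":\"<p>some body</p>\",\"author\":\"some source\"}}";

        JSONParser parser = new JSONParser(JSONParser.DEFAULT_PERMISSIVE_MODE);
        JSONObject map = (JSONObject) parser.parse(event);

        // The same guards the streaming job applies before doing any work
        boolean commit = Boolean.valueOf(map.get("commit").toString());
        boolean wanted = map.get("table").toString().equals("stang_bid")
                && map.get("type").toString().equals("insert") && commit;
        System.out.println("process this event? " + wanted);

        // The row itself sits under "data", just as the job re-parses it
        JSONObject data = (JSONObject) parser.parse(map.get("data").toString());
        System.out.println("row id = " + data.get("id"));
    }
}

One note on the JDBC part: the sql string is prepared in Part 1 and is not shown here. From the three setInt calls it is presumably an UPDATE of the form "UPDATE stang_bid SET category = ?, flag = ? WHERE id = ?" (column names hypothetical), binding the matched pattern number, a processed marker, and the row id in that order.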

The road to learning Spark is still a long one; I hope we can all keep at it and improve together.

