; // 将数据拆分成所需数据,进行处理 JavaPairDStream
line = lines.mapToPair(new PairFunction
() { @Override public Tuple2
call(String t) throws Exception { System.out.println(t); JSONParser jsonParser = new JSONParser(JSONParser.DEFAULT_PERMISSIVE_MODE); JSONObject map=(JSONObject)jsonParser.parse(t); if (map.containsKey("commit")) { boolean commit = Boolean.valueOf(map.get("commit").toString()); if (map.get("table").toString().equals("stang_bid") && map.get("type").toString().equals("insert") && commit) { String data = map.get("data").toString().trim(); JSONObject mapdata = (JSONObject) jsonParser.parse(data); if (mapdata.get("author").toString().equals("中国采购与招标网")) { ProducerRecord
msg = new ProducerRecord
(producerTopic, t); procuder.send(msg); } else if (mapdata.get("title") != null && mapdata.get("info") != null) { int id = Integer.parseInt(mapdata.get("id").toString()); String title = mapdata.get("title").toString(); title = ScreenOfTitle(title); String info = mapdata.get("info").toString(); Document d = Jsoup.parse(info); info = d.text(); ResultSet rs = null; int result = 0; if (title != null) { for (String key : keys) { if (title.contains(key) | info.contains(key)) { if (pattern1.matcher(title).find() | pattern1.matcher(info).find()) { pstmt = conn.prepareStatement(sql); pstmt.setInt(1, 1); pstmt.setInt(2, 1); pstmt.setInt(3, id); result = pstmt.executeUpdate(); break; } else if (pattern2.matcher(title).find() | pattern2.matcher(info).find()) { pstmt = conn.prepareStatement(sql); pstmt.setInt(1, 2); pstmt.setInt(2, 1); pstmt.setInt(3, id); result = pstmt.executeUpdate(); break; } else if (pattern3.matcher(title).find() | pattern3.matcher(info).find()) { pstmt = conn.prepareStatement(sql); pstmt.setInt(1, 3); pstmt.setInt(2, 1); pstmt.setInt(3, id); result = pstmt.executeUpdate(); break; } else if (pattern4.matcher(title).find() | pattern4.matcher(info).find()) { pstmt = conn.prepareStatement(sql); pstmt.setInt(1, 4); pstmt.setInt(2, 1); pstmt.setInt(3, id); result = pstmt.executeUpdate(); break; } else if (pattern5.matcher(title).find()) { pstmt = conn.prepareStatement(sql); pstmt.setInt(1, 5); pstmt.setInt(2, 1); pstmt.setInt(3, id); result = pstmt.executeUpdate(); break; } else if (pattern6.matcher(title).find()) { pstmt = conn.prepareStatement(sql); pstmt.setInt(1, 6); pstmt.setInt(2, 1); pstmt.setInt(3, id); result = pstmt.executeUpdate(); break; } else if (pattern7.matcher(title).find()) { pstmt = conn.prepareStatement(sql); pstmt.setInt(1, 7); pstmt.setInt(2, 1); pstmt.setInt(3, id); result = pstmt.executeUpdate(); break; } } i++; } } total++; } } } return new Tuple2
(total, i); } }); line.print(); jscc.start(); jscc.awaitTermination(); }
Spark 的学习之路还很长，希望大家一起努力，共同进步。
|