设为首页 加入收藏

TOP

Kafka+Fink 实战+工具类(一)
2023-08-26 21:10:57 】 浏览:70
Tags:Kafka Fink 实战
  • LogServiceImpl
@Service
@Slf4j
public class LogServiceImpl implements LogService {

    private static final String TOPIC_NAME = "ods_link_visit_topic";

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 记录日志
     *
     * @param request
     * @param shortLinkCode
     * @param accountNo
     * @return
     */
    @Override
    public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
        // ip、 浏览器信息
        String ip = CommonUtil.getIpAddr(request);
        // 全部请求头
        Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);

        Map<String,String> availableMap = new HashMap<>();
        availableMap.put("user-agent",headerMap.get("user-agent"));
        availableMap.put("referer",headerMap.get("referer"));
        availableMap.put("accountNo",accountNo.toString());

        LogRecord logRecord = LogRecord.builder()
                //日志类型
                .event(LogTypeEnum.SHORT_LINK_TYPE.name())
                //日志内容
                .data(availableMap)
                //客户端ip
                .ip(ip)
                // 时间
                .ts(CommonUtil.getCurrentTimestamp())
                //业务唯一标识(短链码)
                .bizId(shortLinkCode).build();

        String jsonLog = JsonUtil.obj2Json(logRecord);

        //打印日志 in 控制台
        log.info(jsonLog);

        // 发送kafka
        kafkaTemplate.send(TOPIC_NAME,jsonLog);


    }
}

  • DwdShortLinkLogApp
@Slf4j
public class DwdShortLinkLogApp {
    //定义 topic
    public static final String SOURCE_TOPIC = "ods_link_visit_topic";

    //定义 消费组
    public static final String SINK_TOPIC = "dwd_link_visit_topic";

    //定义 消费组
    public static final String GROUP_ID = "dwd_short_link_group";


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

//        DataStream<String> ds = env.socketTextStream("192.168.75.146", 8888);

        FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(SOURCE_TOPIC, GROUP_ID);

        DataStreamSource<String> ds = env.addSource(kafkaConsumer);

        ds.print();

        SingleOutputStreamOperator<JSONObject> jsonDs = ds.flatMap(new FlatMapFunction<String, JSONObject>() {

            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                // 生成web端设备唯一标识
                String udid = getDeviceId(jsonObject);
                jsonObject.put("udid",udid);

                String referer = getReferer(jsonObject);
                jsonObject.put("referer",referer);

                out.collect(jsonObject);

            }
        });

        // 分组
        KeyedStream<JSONObject, String> keyedStream = jsonDs.keyBy(new KeySelector<JSONObject, String>() {

            @Override
            public String getKey(JSONObject value) throws Exception {
                return value.getString("udid");

            }
        });


        // 识别新老访客    richMap open函数,对状态以及日期格式进行初始化

        SingleOutputStreamOperator<String> jsonDSWithVisitorState = keyedStream.map(new VisitorMapFunction());

        jsonDSWithVisitorState.print("ods新老访客");

        // 存储到dwd
        FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(SINK_TOPIC);

        jsonDSWithVisitorState.addSink(kafkaProducer);


        env.execute();
    }

    /**
     * 获取referer
     * @param jsonObject
     * @return
     */
    public static String getReferer(JSONObject jsonObject){
        JSONObject da
首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇配置Redis哨兵集群所遇到的问题 下一篇SpringBoot3.x原生镜像-Native Im..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目