设为首页 加入收藏

TOP

flume修改sqlsource以针对时间戳增量数据传输
2019-05-03 14:07:53 】 浏览:73
Tags:flume 修改 sqlsource 针对 时间 增量 数据传输

flume修改sqlsource以针对时间戳增量数据传输

flume github关于增量数据传输的原理,是通过唯一id,递增,每次记录传输的数据量+current_index=last_index,不符合我们此次项目没有增量id的情况。
由于数据存在时间戳标志,因此改写flume sqlsource以应对实际需求:

  1. 每次增量传输先查询数据库中当前最大的时间戳,记录为maxtime
  2. 查询数据库:select * from table where time>=current_index and time<maxtime,此时不能取到time=maxtime的数据,不排除在数据查询之后会继续生成maxtime的新数据,则会出现数据遗漏
  3. 增量数据操作完成,将current_index=maxtime,写入状态表

SQLSourceHelper增加以下两段代码:

//增加取数据库最大值的代码
public String maxQuery() {
    return "SELECT max(" + time + ") FROM " + table;
  }

//增量查询oracle语句
  public String buildQuery(String maxTime) {

    if (customQuery == null) {
      return "SELECT " + columnsToSelect + " FROM " + table + " " +
             "WHERE "+ time + ">=to_date('" + currentIndex + "','yyyy-mm-dd hh24:mi:ss') AND " + time + "<to_date('" + maxTime + "','yyyy-mm-dd hh24:mi:ss') " +
              "order by "+time+" asc";
    } else {
      if (customQuery.contains("$@$")) {
        return customQuery.replace("$@$", currentIndex) ;
      } else {
        return customQuery ;
      }
    }
  }

HibernateHelper修改executeQuery方法:

public List<List<Object>> executeQuery() throws InterruptedException, ParseException {
		
		List<List<Object>> rowsList = new ArrayList<List<Object>>() ;
		Query query;
		if (!session.isConnected()){
			resetConnection();
		}
				

		String sql = sqlSourceHelper.maxQuery();
		LOG.info("sql "+sql);
		List<List<Object>> max = session.createSQLQuery(sql).setResultTransformer(Transformers.TO_LIST).list();
		String maxtime = max.get(0).get(0).toString().substring(0,19);
		query = session.createSQLQuery(sqlSourceHelper.buildQuery(maxtime));		
		try {
			rowsList = query.setResultTransformer(Transformers.TO_LIST).list();
			LOG.info("Current time is "+sqlSourceHelper.getCurrentIndex()+",and lasttime is "+maxtime);
			LOG.info("Records count: "+rowsList.size());
		}catch (Exception e){
			LOG.error("Exception thrown, resetting connection.",e);
			resetConnection();
		}
		sqlSourceHelper.setCurrentIndex(maxtime);
		return rowsList;
	}
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Flume (五) Channel Selectors 下一篇大数据收集层常用技术-Sqoop、Flu..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目