大数据上的流式SQL引擎—StreamCQL:http://www.csdn.net/article/2015-11-13/2826204
http://www.csdn.net/article/2015-12-22/2826541
Introduce
CQL(Continuous Query Language),持续查询语言,用于数据流上的查询语言。相对于传统的SQL,CQL加入了窗口的概念,使得数据可以一直保存在内存中,由此可以快速进行大量内存计算,CQL的输出结果为数据流在某一时刻的计算结果。
CQL是建立在Storm基础上的类SQL查询语言,它解决了Storm原生API使用复杂,上手难度高,很多基本功能缺失的问题,提升了流处理产品的易用性。
在CQL设计语法之初,通过参考市面上现有的CEP产品的语法,发现这些产品都不算是全部的SQL语句,即仅仅使用SQL语句还不能运行,还必须依靠一些客户端的代码。 这样就给使用带来了一些不便, 用户必须学习客户端API,比较繁琐,上手难度也比较大。
所以,CQL设计目标就是,用纯粹的SQL语句再加上一些命令,就可以完成所有的任务发布以及执行,这样,就可以通过SQL接口,直接进行任务的下发,统一了客户端接口。对于有一定SQL基础的用户,只需要掌握一些CQL比较特殊的语法,比如窗口或者流定义的语法,就可以写出可运行的CQL程序,大大降低了上手难度。
关键概念
Stream(流):流是一组(无穷)元素的集合,流上的每个元素都属于同一个schema;每个元素都和逻辑时间有关;即流包含了元组和时间的双重属性。留上的任何一个元素,都可以用Element的方式来表示,tuple是元组,包含了数据结构和数据内容,Time就是该数据的逻辑时间。
Window(窗口):窗口是流处理中解决事件的无边界(unbounded)及流动性的一种重要手段,把事件流在某一时刻变成静态的视图,以便进行类似数据库表的各种查询操作。在stream上可以定义window,窗口有两种类型,时间窗口(time-based)和记录窗口(row-based)。两种窗口都支持两种模式,滑动(slide)和跳动(tumble)。
Expression(表达式):符号和运算符的一种组合,CQL解析引擎处理该组合以获取单个值。简单表达式可以是常量、变量或者函数,可以用运算符将两个或者多个简单表达式联合起来构成更复杂的表达式。
QuickStart
1.startup zk and storm local
1
2
3
4
5
|
zkServer.sh start
nohup bin/storm nimbus &
nohup bin/storm ui &
nohup bin/storm supervisor &
|
2.build and run cql client
1
2
3
4
5
6
|
cd StreamCQL
mvn clean install
cd cql-binary/target
tar xvf stream-cql-bianry-1.0.tar.gz
cd stream-cql-bianry-1.0
bin/cql
|
3.create first topology:
1
2
3
4
5
6
7
8
9
10
11
|
CREATE INPUT STREAM s(id INT, name STRING, type INT)
SOURCE randomgen PROPERTIES ( timeUnit = "SECONDS", period = "1", eventNumPerperiod = "1", isSchedule = "true" );
CREATE OUTPUT STREAM rs(type INT, cc INT)
SINK consoleOutput;
INSERT INTO STREAM rs SELECT type, COUNT(id) as cc
FROM s[RANGE 20 SECONDS BATCH]
WHERE id > 5 GROUP BY type;
SUBMIT APPLICATION example;
|
输入流: 随机数,每秒生成一个事件
输出流: 控制台, 每隔20秒输出一次, 只统计id>5,根据type分组,求和
提交应用程序, 相当于创建了一个Storm的Topology.
4.A complicate topology:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
CREATE INPUT STREAM s1(name STRING)
SOURCE RANDOMGEN PROPERTIES ( timeUnit = "SECONDS", period = "1", eventNumPerperiod = "1", isSchedule = "true" );
CREATE OUTPUT STREAM s2(c1 STRING)
SINK kafakOutput PROPERTIES ( topic = "cqlOut", zookeepers = "127.0.0.1:2181", brokers = "127.0.0.1:9092" );
CREATE INPUT STREAM s3( c1 STRING)
SOURCE KafkaInput PROPERTIES (groupid = "cqlClient", topic = "cqlInput", zookeepers = "127.0.0.1:2181", brokers = "127.0.0.1:9092" )
CREATE OUTPUT STREAM s4(c1 STRING)
SINK consoleOutput;
INSERT INTO STREAM s2 SELECT * FROM s1;
INSERT INTO STREAM s4 SELECT * FROM s3;
SUBMIT APPLICATION cql_kafka_example;
|
输入流s1 发射数据 到kafka输出流s2
kafka输入流 发射数据 到控制台输出流s4
5.查看拓扑:http://localhost:8080
Architecture
StreamCQL的代码由三部分组成: cql,streaming,adapter分别对应下面的三个组件.
客户端提交的CQL语句
会由执行计划生成器ExecutorPlanGenerator
生成可运行的任务
,最终由Storm适配器
组装Topology提交执行.
StreamCQL对应的Storm拓扑:
至少有一个输入和输出. Component之间可以组合比如Select,Join等.
Window example:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
SELECT * FROM transformEvent[ROWS 10 SLIDE PARTITION BY TYPE];
SELECT * FROM transformEvent[RANGE 1000 MILLISECONDS SORT BY dte];
INSERT INTO STREAM rs sum(OrderPrice),avg(OrderPrice),count(OrderPrice)
FROM transformEvent[RANGE 10 SECONDS SLIDE TRIGGER by TS EXCLUDE now];
INSERT INTO STREAM rs select id,name,count(id)
FROM transformEvent[RANGE TODAY ts PARTITION BY TYPE]
WHERE id > 5 GROUP BY TYPE HAVING id > 10;
|
Split example:
1
2
3
4
5
|
FROM teststream
INSERT INTO STREAM s1 SELECT *
INSERT INTO STREAM s2 SELECT a
INSERT INTO STREAM s3 SELECT id, name WHERE id > 10
PRARLLEL 4;
|
原始的spout输入流会分成三个输出流. 所以中间用一个SplitBolt来作为中间介质.
From log to see StreamCQL
熟悉一个开源框架的流程, 可以先跑一个测试例子, 查看打印的日志信息, 通过日志的顺序, 可以大致熟悉整体的流程.
当然要求框架本身的日志信息足够明了, StreamCQL做的不错. 这种方式的优点是不至于不知道要从哪里看起来.
CQLClient to Driver
bin/cql
会开启一个CQLClient客户端,
当输入;
表示一个语句的终结时,就会触发一次CQL语句的编译执行等.
Driver.run是CQL的运行起点
-
1、编译
-
2、语义分析
-
3、命令执行
-
4、返回结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public void run(String cql) {
ParseContext parseContext = parser.parse(cql);
saveAllChangeableCQLs(cql, parseContext);
preDriverRun(context, parseContext);
executeTask(parseContext);
postDriverRun(context, parseContext);
}
private void executeTask(ParseContext parseContext) {
mergeConfs();
Task task = TaskFactory.createTask(context, parseContext, config, analyzeHooks);
task.execute(parseContext);
context.setQueryResult(task.getResult());
}
|
初始的语法解析类是ApplicationParser. parse方法采用visitor访问者模式遍历CQL语句. 感兴趣的可以进入CQLParser查看具体的解析过程.
1
2
3
4
5
6
|
IParser (com.huawei.streaming.cql.semanticanalyzer.parser)
|-- OrderbyClauseParser
|-- GroupbyClauseParser
|-- ApplicationParser
|-- SelectClauseParser
|-- DataSourceArgumentsParser
|
ParseContext的实现类很多,基本上CQL语法的每一部分都会对应一个语法解析器.
1
2
3
4
5
6
7
8
|
ParseContext (com.huawei.streaming.cql.semanticanalyzer.parser.context)
|
|
|
|
|
|
|
|
ApplicationParser.parse返回的ParseContext具体针对特定的CQL语句返回的是什么类型 这个类型对于创建什么类型的任务非常重要.
因为这个是创建一个新的Stream, 所以ParseContext是CreateInputStatementContext
.
1
2
3
4
5
6
7
|
2015-11-25 02:32:19 | INFO | [main] | start to parse cql : CREATE INPUT STREAM s
(id INT, name STRING, type INT)
SOURCE randomgen
PROPERTIES ( timeUnit = "SECONDS", period = "1",
eventNumPerperiod = "1", isSchedule = "true" ) | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:44)
2015-11-25 02:32:19 | INFO | [main] | Parse Completed | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:69)
2015-11-25 02:32:19 | INFO | [main] | start to execute CREATE INPUT STREAM s (id INT, name STRING, type INT) SOURCE randomgen PROPERTIES ( 'timeUnit' = 'SECONDS', 'period' = '1', 'eventNumPerperiod' = '1', 'isSchedule' = 'true' ) | com.huawei.streaming.cql.tasks.LazyTask (LazyTask.java:62)
|
CreateStreamStatementContext.createTask创建的是LazyTask. 它的execute方法只是把当前ParseContext加入到DriverContext的parseContexts中.
1
2
3
|
public void execute(ParseContext parseContext) {
context.addParseContext(parseContext);
}
|
同样输出流经过ApplicationParser.parse返回的是CreateOutputStatementContext
,它也继承了CreateStreamStatementContext.
1
2
3
4
5
|
2015-11-25 02:32:20 | INFO | [main] | start to parse cql : CREATE OUTPUT STREAM rs
(type INT, cc INT)
SINK consoleOutput | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:44)
2015-11-25 02:32:20 | INFO | [main] | Parse Completed | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:69)
2015-11-25 02:32:20 | INFO | [main] | start to execute CREATE OUTPUT STREAM rs (type INT, cc INT) SINK consoleOutput | com.huawei.streaming.cql.tasks.LazyTask (LazyTask.java:62)
|
insert语句是InsertStatementContext
,
输入输出插入这些都是延迟执行的任务,并不需要立即执行,因为需要根据上下文构造一个完整的DAG拓扑图.
1
2
3
4
5
|
2015-11-25 02:32:20 | INFO | [main] | start to parse cql : INSERT INTO STREAM rs SELECT type, COUNT(id) as cc
FROM s[RANGE 20 SECONDS BATCH]
WHERE id > 5 GROUP BY type | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:44)
2015-11-25 02:32:20 | INFO | [main] | Parse Completed | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:69)
2015-11-25 02:32:20 | INFO | [main] | start to execute INSERT INTO STREAM rs SELECT type, count(id) AS cc FROM s[RANGE 20 SECONDS BATCH] WHERE id > 5 GROUP BY type | com.huawei.streaming.cql.tasks.LazyTask (LazyTask.java:62)
|
Submit
提交应用程序,经过parse返回的是SubmitApplicationContext
,创建的Task是SubmitTask.
1
2
3
|
2015-11-25 02:32:23 | INFO | [main] | start to parse cql : SUBMIT APPLICATION example | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:44)
2015-11-25 02:32:23 | INFO | [main] | Parse Completed | com.huawei.streaming.cql.semanticanalyzer.parser.ApplicationParser (ApplicationParser.java:69)
2015-11-25 02:32:24 | INFO | [main] | combine all split contexts | com.huawei.streaming.cql.builder.operatorsplitter.OperatorSplitter (OperatorCombiner.java:101)
|
Task & SemanticAnalyzer
可以看到对于前面的输入流,输出流,insert语句,并没有对应的Task实现类,所以它们都使用LazyTask.
Driver.executeTask会根据ParseContext具体的实现类由TaskFactory创建对应的Task. ParseContext抽象类除了创建Task,还会创建SemanticAnalyzer
1
2
3
4
5
|
public abstract Task createTask(DriverContext driverContext, List<SemanticAnalyzeHook> analyzeHooks) throws CQLException;
public abstract SemanticAnalyzer createAnalyzer() throws SemanticAnalyzerException;
|
比如SubmitApplicationContext创建的分析器是SubmitApplicationAnalyzer. CreateStreamStatementContext也是个抽象类,
有三个子类CreateInputStatementContext,CreateOutputStatementContext,CreatePipeStatementContext,它们创建的分析器分别是:
CreateInputStreamAnalyzer, CreateOutputStreamAnalyzer, CreatePipeStreamAnalyzer 它们都继承了CreateStreamAnalyzer.
1
2
3
4
5
6
7
8
9
10
11
|
--------
----
---
---
---
----
-- ---------
---
---
---
|
SemanticAnalyzer的创建方式和创建Task一样都是使用工厂类SemanticAnalyzerFactory
.
在创建完之后都调用了init初始化.
1
2
3
4
5
6
7
8
9
10
|
public static Task createTask(DriverContext driverContext, ParseContext parseContext, StreamingConfig config, List<SemanticAnalyzeHook> analyzeHooks) {
Task task = parseContext.createTask(driverContext, analyzeHooks);
task.init(driverContext, config, analyzeHooks);
return task;
}
public static SemanticAnalyzer createAnalyzer(ParseContext parseContext, List<Schema> schemas) {
SemanticAnalyzer analyzer = parseContext.createAnalyzer();
analyzer.init(schemas);
return analyzer;
}
|
SubmitTask
parseSubmit
SubmitTask执行应用程序提交的execute方法和前面的LazyTask有点复杂, 因为它要把前面创建的LazyTask都组合起来,组成一个完整的应用程序.
1
2
3
4
5
6
7
8
9
10
11
|
public void execute(ParseContext parseContext) {
parseSubmit(parseContext);
createApplication();
dropApplicationIfAllow();
submitApplication();
}
private void parseSubmit(ParseContext parseContext) {
SemanticAnalyzer analyzer = SemanticAnalyzerFactory.createAnalyzer(parseContext, EMPTY_SCHEMAS);
submitContext = (SubmitApplicationAnalyzeContext)analyzer.analyze();
}
|
这里面几个对象的创建关系是(Parser语法解析器->Context->Analyzer语义分析器->AnalyzerContext):
1
2
3
4
|
ApplicationParser.parse()
|
|
|
|
createApplication
创建Application,如果有路径的话,直接加载物理执行计划,否则创建一个API用的Application并设置到DriverContext中.
1
2
3
4
5
6
7
8
9
10
11
|
private Application createAPIApplication(String appName) {
Application app = null;
if (context.getApp() == null) {
semanticAnalyzerLazyContexts();
app = new ApplicationBuilder().build(appName, analyzeContexts, context);
} else {
app = context.getApp();
}
app.setApplicationId(appName);
return app;
}
|
创建APIApplication, 记得前面的那些LazyTask吗, 都要用语义分析分析一遍,对应的AnalyzeContext会被ApplicationBuilder用到.
因为LazyTask的execute方法只是简单地把当前的ParseContext实现类加入到DriverContext中,所以下面的for循环能从DriverContext获取出所有ParseContext.
1
2
3
4
5
6
7
8
9
|
private void semanticAnalyzerLazyContexts() {
for (ParseContext parseContext : context.getParseContexts()) {
preAnalyze(context, parseContext);
SemanticAnalyzer analyzer = SemanticAnalyzerFactory.createAnalyzer(parseContext, context.getSchemas());
AnalyzeContext analyzeContext = analyzer.analyze();
postAnalyze(context, analyzeContext, parseContext);
analyzeContexts.add(analyzeContext);
}
}
|
像SubmitTask一样都要先创建SemanticAnalyzer,然后调用analyze方法, 这些没有调用的方法都要调用. 验证了那句话:人在江湖漂,哪有不挨刀.该来的总是会来的
.
1
2
3
4
|
ApplicationParser.parse() --> CreateInputStatementContext|CreateOutputStatementContext : ParseContext
|--createTask: LazyTask
|--createAnalyzer: CreateInputStreamAnalyzer|CreateOutputStreamAnalyzer
|--createAnalyzeContext: CreateStreamAnalyzeContext
|
注意Input,Output的StatementContext,SemanticAnalyzer都有各自的实现类,并都继承了Stream的相关父类,但是AnalyzeContext没有各自的实现类,都是一样的了.
ApplicationBuilder
ApplicationBuilder的构建需要每个分析器分析的结果AnalyzeContext(parseContexts): 专门用来完成从多个解析内容到应用程序的转换
buildApplication()将整个应用程序的构建分成: 1、各个算子的构建; 2、将完成拆分的应用程序解析成为Application
1
2
3
4
5
6
7
8
9
10
|
public Application build(String appName, List<AnalyzeContext> parContexts, DriverContext driverContext) {
this.applicationName = appName;
this.parseContexts = parContexts;
executeLogicOptimizer();
buildApplication();
executePhysicOptimizer();
parseDriverContext(driverContext);
return app;
}
|
逻辑计划包含的功能:
1、SQL语句的重写,比如将where中的聚合filter调整到having中等等
2、count(a+b),count(*),count(a) 的优化,全部改成count(1)
3、Join的调整,将不等值Join改为Innerjoin
4、将where条件中的等值表达式提升到On上面去。
物理优化器的优化内容:
1、OrderBy优化,实现sorted-merge排序。
2、limit优化,上一个算子中加入limit。
3、算子替换,将功能比较简单的算子,替换为Filter算子或者functor算子
4、移除无意义的filter算子
逻辑计划和物理计划中间的步骤是构建Application. 在这里才开始new一个Application.
1
2
3
4
5
6
7
8
9
10
|
private void buildApplication() {
app = new Application(applicationName);
parseSchemas();
List<SplitContext> splitContexts = splitOperators();
SplitContext splitContext = combineOperators(splitContexts);
changeUnionOperators(splitContext);
changeSchemaAfterAggregate(splitContext);
app.setOperators(splitContext.getOperators());
app.setOpTransition(splitContext.getTransitions());
}
|
预告: SemanticAnalyzer.analyze()和具体的CQL语法相关,下一篇我们就来看看CQL的语法和语义解析是怎么工作的.