> 加20%,其它加 5%
2 def raisePay: FDATask[FDAROW] = row => { 3 row match { 4 case emp: Employee => { 5 val cur = emp.age match { 6 case a if ((a >= 20) && (a < 30)) => emp.copy(salary = emp.salary * 1.10) 7 case a if ((a >= 30)) => emp.copy(salary = emp.salary * 1.20) 8 case _ => emp.copy(salary = emp.salary * 1.05) 9 } 10 fda_next(cur) 11 } 12 case _ => fda_skip 13 } 14 }
用户提供的功能函数类型必须是FDATask[FDAROW]。类型参数FDAROW代表数据行通用类型。如果用户指定了FDATask[Employee]函数类型,那么必须保证管道中流动的数据行只有Employee一种类型。完成对当前行数据的处理后用fda_next(emp)把它发送到下一节连接管道。我们用下面的组合函数来进行运算:
Stream(r1,r2,r3,r4,r5,r6) .through(log("加薪前>")) .through(fda_execUserTask[FDAROW](raisePay)) .through(log("加薪后>")) .run.unsafeRun ----- 运算结果: 加薪前>> Employee(1,John,23,100.0) 加薪后>> Employee(1,John,23,110.00) 加薪前>> Employee(2,Peter,25,100.0) 加薪后>> Employee(2,Peter,25,110.00) 加薪前>> Employee(3,Kay,35,100.0) 加薪后>> Employee(3,Kay,35,120.00) 加薪前>> Employee(4,Cain,45,100.0) 加薪后>> Employee(4,Cain,45,120.00) 加薪前>> Employee(5,Catty,35,100.0) 加薪后>> Employee(5,Catty,35,120.00) 加薪前>> Employee(6,Little,19,80.0) 加薪后>> Employee(6,Little,19,84.000)
2、在一组数据行内根据每条数据状态进行筛选:
// 筛选40岁以上员工
def filter40: FDATask[FDAROW] = row => { row match { case emp: Employee => { if (emp.age > 40) Some(List(emp)) else fda_skip[Employee] } case _ => fda_break } } println("---------") Stream(r1,r2,r3,r4,r5,r6) .through(log("年龄>")) .through(fda_execUserTask[FDAROW](filter40)) .through(log("合格>")) .run.unsafeRun --- 运算结果: 年龄>> Employee(1,John,23,100.0) 年龄>> Employee(2,Peter,25,100.0) 年龄>> Employee(3,Kay,35,100.0) 年龄>> Employee(4,Cain,45,100.0) 合格>> Employee(4,Cain,45,100.0) 年龄>> Employee(5,Catty,35,100.0) 年龄>> Employee(6,Little,19,80.0) -
3、根据当前数据行状态终止作业:
1 // 浏览至第一个30岁以上员工,跳出
2 def stopOn30: FDATask[Employee] = emp => { 3 if (emp.age > 30) 4 fda_break 5 else
6 Some(List(emp)) 7 } 8 println("---------") 9 Stream(r1,r2,r3,r4,r5,r6) 10 .through(log("当前员工>")) 11 .through(fda_execUserTask[Employee](stopOn30)) 12 .through(log("选入名单>")) 13 .run.unsafeRun 14 ---
15 运算结果: 16 当前员工>> Employee(1,John,23,100.0) 17 选入名单>> Employee(1,John,23,100.0) 18 当前员工>> Employee(2,Peter,25,100.0) 19 选入名单>> Employee(2,Peter,25,100.0) 20 当前员工>> Employee(3,Kay,35,100.0)
在这个例子里用户指定了行类型统一为Employee。
我们还可以把多个功能串接起来。像下面这样把1和2两个功能连起来:
Stream(r1,r2,r3,r4,r5,r6) .through(log("加薪前>")) .through(fda_execUserTask[FDAROW](raisePay)) .through(log("加薪后>")) .through(log("年龄>")) .through(fda_execUserTask[FDAROW](filter40)) .through(log("合格>")) .run.unsafeRun --- 运算结果: 加薪前>> Employee(1,John,23,100.0) 加薪后>> Employee(1,John,23,110.00) 年龄>> Employee(1,John,23,110.00) 加薪前>> Employee(2,Peter,25,100.0) 加薪后>> Employee(2,Peter,25,110.00) 年龄>> Employee(2,Peter,25,110.00) 加薪前>> Employee(3,Kay,35,100.0) 加薪后>> Employee(3,Kay,35,120.00) 年龄>> Employee(3,Kay,35,120.00) 加薪前>> Employee(4,Cain,45,100.0) 加薪后>> Employee(4,Cain,45,120.00) 年龄>> Empl |