package com.bayakala.funda.fdapipes.examples
import fs2._
import com.bayakala.funda.fdapipes._
import FDANodes._
import FDAValves._
import Helpers._
/** Runnable example: user-defined row tasks plugged into fs2 streams through
  * `fda_execUserTask`.
  *
  * Four pipelines are executed, in order, over the same six sample employees:
  *   1. an age-based pay raise applied to each row,
  *   2. a filter that passes along only employees older than 40,
  *   3. a scan that passes rows along until the first employee older than 30,
  *   4. the raise followed by the over-40 filter in one pipeline.
  *
  * Every stage sits between `log(label)` pipes (presumably they print each
  * passing row prefixed with the label -- confirm against `Helpers`).
  */
object Example1 extends App {

  /** Row type used by all pipelines in this example. */
  case class Employee(id: Int, name: String, age: Int, salary: BigDecimal) extends FDAROW

  // Test data set.
  val r1 = Employee(id = 1, name = "John", age = 23, salary = 100.00)
  val r2 = Employee(id = 2, name = "Peter", age = 25, salary = 100.00)
  val r3 = Employee(id = 3, name = "Kay", age = 35, salary = 100.00)
  val r4 = Employee(id = 4, name = "Cain", age = 45, salary = 100.00)
  val r5 = Employee(id = 5, name = "Catty", age = 35, salary = 100.00)
  val r6 = Employee(id = 6, name = "Little", age = 19, salary = 80.00)

  // Builds a new stream over the six sample rows; each pipeline below starts from one.
  private def sampleRows = Stream(r1, r2, r3, r4, r5, r6)

  /** Pay-raise task: ages 20 to 29 get +10%, ages 30 and above get +20%,
    * everyone else (under 20) gets +5%.
    *
    * An `Employee` row is replaced by a copy carrying the new salary and handed
    * to `fda_next`; any other row type yields `fda_skip`.
    */
  def raisePay: FDATask[FDAROW] = {
    case emp: Employee =>
      // Pick the multiplier from the youngest bracket upwards.
      val factor: Double =
        if (emp.age < 20) 1.05
        else if (emp.age < 30) 1.10
        else 1.20
      val raised = emp.copy(salary = emp.salary * factor)
      fda_next(raised)
    case _ =>
      fda_skip
  }

  /** Filter task: select employees older than 40.
    *
    * An `Employee` over 40 is emitted as a one-element list, a younger one
    * yields `fda_skip[Employee]`, and a row of any other type yields `fda_break`.
    */
  def filter40: FDATask[FDAROW] = {
    case emp: Employee if emp.age > 40 => Some(List(emp))
    case _: Employee                   => fda_skip[Employee]
    case _                             => fda_break
  }

  /** Scan task: walk the rows up to the first employee older than 30, then break out.
    *
    * Employees aged 30 or less are emitted as a one-element list; the first
    * one above 30 yields `fda_break`.
    */
  def stopOn30: FDATask[Employee] = emp =>
    if (emp.age <= 30) Some(List(emp)) else fda_break

  // Pipeline 1: pay raise. Labels: "before raise>" / "after raise>".
  sampleRows
    .through(log("加薪前>"))
    .through(fda_execUserTask[FDAROW](raisePay))
    .through(log("加薪后>"))
    .run.unsafeRun

  // Pipeline 2: over-40 filter. Labels: "age>" / "qualified>".
  println("---------")
  sampleRows
    .through(log("年龄>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun

  // Pipeline 3: stop at the first employee over 30. Labels: "current employee>" / "selected>".
  println("---------")
  sampleRows
    .through(log("当前员工>"))
    .through(fda_execUserTask[Employee](stopOn30))
    .through(log("选入名单>"))
    .run.unsafeRun

  // Pipeline 4: raise, then over-40 filter, each stage keeping its own pair of log pipes.
  println("---------")
  sampleRows
    .through(log("加薪前>"))
    .through(fda_execUserTask[FDAROW](raisePay))
    .through(log("加薪后>"))
    .through(log("年龄>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun
}