详解User Defined Java Class步骤(三)
kettle中的“user defined java class”步骤,也称UDJC步骤,从4.0版本就有,功能非常强大,无所不能;可以在其中写任意代码,却不影响效率。本文将详细介绍在不同场景中用示例展示如果使用该步骤,由于内容非常多,便于阅读方便,把内容分成三部分,请完整看完全部内容,示例代码在这里下载.
如果没有看第二部分,请先访问第二部分。
错误处理
udjc步骤支持kettle的错误处理特性,从udjc步骤拖动一个连接到空步骤,接收错误数据行,右击udjc步骤,选择”Defined Error Handing”(定义错误处理)。弹出界面可以配置错误步骤接收错误数据,其他一些选项和字段名称可以配置扩展错误信息,在udjc步骤中,通过调用putError()方法把错误数据转发的错误处理步骤。

public boolean processRow(StepMetaInterfacesmi, StepDataInterface sdi) throws KettleException
{
Object[]r = getRow();
?
if(r == null) {
setOutputDone();
returnfalse;
}
?
if (first){
first = false;
}
?
r= createOutputRow(r, data.outputRowMeta.size());
?
// Get the value from an input field
Long numerator = get(Fields.In, "numerator").getInteger(r);
Long denominator = get(Fields.In,"denominator").getInteger(r);
?
//avoid dividing by 0
if(denominator == 0){
//putErro is declared as follows:
//public void putError(RowMetaInterface rowMeta, Object[] row, long nrErrors,String errorDescriptions, String fieldNames, String errorCodes)
putError(data.outputRowMeta,r, 1, "Denominator must be different from 0","denominator", "DIV_0");
//get on with the next line
returntrue;
}
?
longinteger_division = numerator / denominator;
longremainder = numerator % denominator;
?
//write output fields
get(Fields.Out, "integer_division").setValue(r,Long.valueOf(integer_division));
get(Fields.Out, "remainder").setValue(r,Long.valueOf(remainder));
?
//Send the row on to the next step.
putRow(data.outputRowMeta, r);
?
returntrue;
}
访问数据库连接
如果udjc步骤需要实现一些和数据库相关的功能,那么可以使用kettle功能获取其数据库连接。下面示例中使用了kettle中定义的“TestDB”数据库连接。输入行有一个“table_name”字段,该步骤检查输入的表是否存在,并把结果写入的输出结果中。
如果需要在udjc步骤中实现一些和数据库相关的重要工作,最好对源码中的org.pentaho.di.core.database包内容比较熟悉,也可以查看和DB相关的步骤和示例代码,了解如何使用database包相关类的使用。

importorg.pentaho.di.core.database.Database;
importjava.util.List;
importjava.util.Arrays;
?
privateDatabase db = null;
privateFieldHelper outputField = null;
private FieldHelpertableField = null;
privateList existingTables = null;
?
publicboolean processRow(StepMetaInterface smi, StepDataInterface sdi) throwsKettleException
{
Object[] r = getRow();
?
if (r == null) {
setOutputDone();
return false;
}
?
if (first){
first = false;
existingTables =Arrays.asList(db.getTablenames());
tableField = get(Fields.In,"table_name");
outputField = get(Fields.Out,"table_exists");
}
?
r = createOutputRow(r,data.outputRowMeta.size());
?
if (existingTables.contains(tableField.getString(r))){
outputField.setValue(r, Long.valueOf(1));
}
else{
outputField.setValue(r,Long.valueOf(0));
}
?
// Send the row on to the next step.
putRow(data.outputRowMeta, r);
?
return true;
}?
public booleaninit(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface)
{
?
if (parent.initImpl(stepMetaInterface,stepDataInterface)){
?
try{
db = newDatabase(this.parent, getTransMeta().findDatabase("TestDB"));
db.shareVariablesWith(this.parent);
db.connect();
return true;
}
catch(KettleDatabaseException e){
logError("Errorconnecting to TestDB: "+ e.getMessage());
setErrors(1);
stopAll();
}
?
}
return false;
}
publicvoid dispose(StepMetaInterface smi, StepDataInterface sdi)
{
if (db != null) {
db.disconnect();
}
?