首頁 >資料庫 >mysql教程 >详解kettle之UserDefinedJavaClass步骤(三)

详解kettle之UserDefinedJavaClass步骤(三)

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB原創
2016-06-07 16:02:342071瀏覽

详解User Defined Java Class步骤(三) kettle中的user defined java class步骤,也称UDJC步骤,从4.0版本就有,功能非常强大,无所不能;可以在其中写任意代码,却不影响效率。本文将详细介绍在不同场景中用示例展示如果使用该步骤,由于内容非常多,便于

详解User Defined Java Class步骤(三)

kettle中的“user defined java class”步骤,也称UDJC步骤,从4.0版本就有,功能非常强大,无所不能;可以在其中写任意代码,却不影响效率。本文将详细介绍在不同场景中用示例展示如果使用该步骤,由于内容非常多,便于阅读方便,把内容分成三部分,请完整看完全部内容,示例代码在这里下载.

如果没有看第二部分,请先访问第二部分。

错误处理

udjc步骤支持kettle的错误处理特性,从udjc步骤拖动一个连接到空步骤,接收错误数据行,右击udjc步骤,选择”Defined Error Handing”(定义错误处理)。弹出界面可以配置错误步骤接收错误数据,其他一些选项和字段名称可以配置扩展错误信息,在udjc步骤中,通过调用putError()方法把错误数据转发的错误处理步骤。

\

public boolean processRow(StepMetaInterfacesmi, StepDataInterface sdi) throws KettleException

{

Object[]r = getRow();

 

if(r == null) {

setOutputDone();

returnfalse;

}

 

if (first){

first = false;

}

 

r= createOutputRow(r, data.outputRowMeta.size());

 

// Get the value from an input field

Long numerator = get(Fields.In, "numerator").getInteger(r);

Long denominator = get(Fields.In,"denominator").getInteger(r);

 

//avoid dividing by 0

if(denominator == 0){

//putErro is declared as follows:

//public void putError(RowMetaInterface rowMeta, Object[] row, long nrErrors,String errorDescriptions, String fieldNames, String errorCodes)

putError(data.outputRowMeta,r, 1, "Denominator must be different from 0","denominator", "DIV_0");

//get on with the next line

returntrue;

}

 

longinteger_division = numerator / denominator;

longremainder = numerator % denominator;

 

//write output fields

get(Fields.Out, "integer_division").setValue(r,Long.valueOf(integer_division));

get(Fields.Out, "remainder").setValue(r,Long.valueOf(remainder));

 

//Send the row on to the next step.

putRow(data.outputRowMeta, r);

 

returntrue;

}

访问数据库连接

如果udjc步骤需要实现一些和数据库相关的功能,那么可以使用kettle功能获取其数据库连接。下面示例中使用了kettle中定义的“TestDB”数据库连接。输入行有一个“table_name”字段,该步骤检查输入的表是否存在,并把结果写入的输出结果中。

如果需要在udjc步骤中实现一些和数据库相关的重要工作,最好对源码中的org.pentaho.di.core.database包内容比较熟悉,也可以查看和DB相关的步骤和示例代码,了解如何使用database包相关类的使用。

\

importorg.pentaho.di.core.database.Database;

importjava.util.List;

importjava.util.Arrays;

 

privateDatabase db = null;

privateFieldHelper outputField = null;

private FieldHelpertableField = null;

privateList existingTables = null;

 

publicboolean processRow(StepMetaInterface smi, StepDataInterface sdi) throwsKettleException

{

Object[] r = getRow();

 

if (r == null) {

setOutputDone();

return false;

}

 

if (first){

first = false;

existingTables =Arrays.asList(db.getTablenames());

tableField = get(Fields.In,"table_name");

outputField = get(Fields.Out,"table_exists");

}

 

r = createOutputRow(r,data.outputRowMeta.size());

 

if (existingTables.contains(tableField.getString(r))){

outputField.setValue(r, Long.valueOf(1));

}

else{

outputField.setValue(r,Long.valueOf(0));

}

 

// Send the row on to the next step.

putRow(data.outputRowMeta, r);

 

return true;

public booleaninit(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface)

{

 

if (parent.initImpl(stepMetaInterface,stepDataInterface)){

 

try{

db = newDatabase(this.parent, getTransMeta().findDatabase("TestDB"));

db.shareVariablesWith(this.parent);

db.connect();

return true;

}

catch(KettleDatabaseException e){

logError("Errorconnecting to TestDB: "+ e.getMessage());

setErrors(1);

stopAll();

}

 

}

return false;

}

publicvoid dispose(StepMetaInterface smi, StepDataInterface sdi)

{

if (db != null) {

db.disconnect();

}

 

parent.disposeImpl(smi, sdi);

}

示例udjc步骤中的重写了init()和dispose()方法,分别实现创建数据库连接和完成后断开连接。在转换初始化的时候,第一次执行processRow()之前调用init()方法。转换执行完成之后调用dispose()方法。如果有首先要初始化的工作以及一些清理资源代码,就考虑分别放在init和dispose方法中。示例转换的名称:db_access.ktr。

实现输入步骤

有时udjc步骤本身就是输入步骤,这时其自己生成输入行,而无需其他的输入行步骤。下面示例展示生成java的系统属性列表作为输入行。

\

代码如下:

import java.util.*;

private ArrayList keys = null;

private int idx = 0;

public boolean processRow(StepMetaInterfacesmi, StepDataInterface sdi) throws KettleException

{

if(first){

first= false;

//get the system property names, output is done one at a time later

keys= Collections.list(System.getProperties().propertyNames());

idx= 0;

}

if(idx >= keys.size()) {

setOutputDone();

returnfalse;

}

//create a row

Object[]r = RowDataUtil.allocateRowData(data.outputRowMeta.size());

// Set key and value in a new output row

get(Fields.Out, "key").setValue(r, keys.get(idx));

get(Fields.Out,"value").setValue(r,System.getProperties().get(keys.get(idx)));

idx++;

//Send the row on to the next step.

putRow(data.outputRowMeta, r);

returntrue;

}

在代码中没有调用getRow方法获取输入行,而是第一次调用processRow方法是初始化java系统属性列表。这些属性被逐个写入到输出流中。因为没有输入行,代码通过RowDataUtil.allocateRowData()方法创建,然后设置字段值并传输到下一步骤中。示例转换的名称input_step.ktr。

总结

本文详细说明了udjc步骤在不同场景的使用方式。如果你需要自定义处理功能,但是javascript步骤实现不灵活或性能不够,这时可以考虑使用udjc步骤代替。为了学习更多的内容,我们也可以查看sample目录下的关于udjc的示例。

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn