导入java.io.DataOutputStream;导入java.io.File;导入 java.io.FileOutputStream;导入 java.util.Date;导入 be.ibridge.kettle.core.Const;导入 be.ibridge.kettle.core.LogWriter;导入 be.ibridge.kettle.core.NotePadMeta;导入b
导入 java.io.DataOutputStream;
导入 java.io.File;
导入 java.io.FileOutputStream;
导入 java.util.Date;
导入 be.ibridge.kettle.core.Const;
导入 be.ibridge.kettle.core.LogWriter;
导入 be.ibridge.kettle.core.NotePadMeta;
导入 be.ibridge。 Kettle.core.database.Database;
导入 be.ibridge.kettle.core.database.DatabaseMeta;
导入 be.ibridge.kettle.core.exception.KettleException;
导入 be.ibridge.kettle。 trans.StepLoader;
导入 be.ibridge.kettle.trans.Trans;
导入 be.ibridge.kettle.trans.TransHopMeta;
导入 be.ibridge.kettle.trans.TransMeta;
导入be.ibridge.kettle.trans.step.StepMeta;
导入 be.ibridge.kettle.trans.step.StepMetaInterface;
导入 be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
导入be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
导入 be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;
/**
*
*
Title:
* 本文描述了以下操作:
1) 建立一个新的转换(transformation)
2) 把转换(transformation)存储为XML文件
3) 生成需要在目标表运行的SQL语句
4) 执行转换(transformation)
5) 删除目标表,可以使测试程序可以反复执行(这一点可根据需要修改)。
Description: TODO 类的功能描述
Copyright: Copyright (c) 2003
------------------------------------------------------------
修改历史
序号 日期 时间 修 改 人 修 改 原 因
1 2006-9-20 下午05:59:06 洪亮 创建
public class TransBuilderME
{
public static Final String[] datasetXML = {
""
“”
"目标名称>"
"192.168.169.220服务器>"
"<类型>ORACLE类型>"
"<访问>本机访问>"
"NMSDB数据库>"
"1521端口>"
"UCP用户名>"
"UCP密码>"
"连接>",
""
“”
"来源名称>"
"192.168.169.220服务器>"
"<类型>ORACLE类型>"
"<访问>本机访问>"
"NMSDB数据库>"
"1521端口>"
"UCP用户名>"
"UCP密码>"
"连接>"
};
/**
* 使用输入参数(例如要读取的表名)创建新的转换。
* @param conversionName 转换的名称
* @param sourceDatabaseName 要读取的数据库的名称
* @param sourceTableName 要读取的表的名称
* @param sourceFields 我们要从源表读取的字段名称
* @param targetDatabaseName 目标数据库的名称
* @param targetTableName我们要写入的目标表的名称
* @param targetFields 目标表中字段的名称(字段数量与sourceFields相同)
* @return 一个新的转换
* @throws KettleException在极少数情况下会出现问题
*/
public static final TransMeta buildCopyTable(String conversionName, String sourceDatabaseName, String sourceTableName, String[] sourceFields, String targetDatabaseName, String targetTableName, String[] targetFields) throws KettleException
{
LogWriter log = LogWriter.getInstance();
try
{
//
// 创建一个新转换...
// 传输元信息
TransMeta transMeta = new TransMeta();
transMeta.setName(transformationName);//传输名称
// 添加数据库连接
for (int i=0 ;i {
DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);//数据库元信息
transMeta.addDatabase(databaseMeta);//传输元 中加入数据库元信息
}
DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);//查找源数据库元信息
DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);//查找目标数据库元信息
//
//添加note
//
string note =“数据库[“ sourcedbinfo”]上的表[“ sourcetablename”]的信息读取信息。 🎜> note = "之后,它将信息写入数据库 [" targetDBInfo "] 上的表 [" targetTableName "]";
NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1); // 注释信息
transMeta.addNote(ni);
//
// create the source step...
//
String fromstepname = "read from [" sourceTableName "]";//from步骤名称
TableInputMeta tii = new TableInputMeta();//表输入元数据信息
tii.setDatabaseMeta(sourceDBInfo);//为表输入 指定 数据库
String selectSQL = "SELECT " Const.CR;//拼接查询sql语句
for (int i=0;i
if (i>0) selectSQL =", "; else selectSQL =" ";
selectSQL =sourceFields[i] Const.CR;
}
selectSQL ="FROM " sourceTableName;
tii.setSQL(selectSQL);//设置查询sql语句
StepLoader steploader = StepLoader.getInstance();//???
String fromstepid = steploader.getStepPluginID(tii);
//步骤元数据信息
StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);
fromstep.setLocation(150, 100);
fromstep.setDraw(true);
fromstep.setDescription("Reads information from table [" sourceTableName "] on database [" sourceDBInfo "]");
//传输中 添加步骤
transMeta.addStep(fromstep);
//
// add logic to rename fields
// Use metadata logic in SelectValues, use SelectValueInfo...
//选择字段(重命名)
SelectValuesMeta svi = new SelectValuesMeta();
svi.allocate(0, 0, sourceFields.length);
for (int i = 0; i < sourceFields.length; i )
{
//设置源字段和目标字段
svi.getMetaName()[i] = sourceFields[i];
svi.getMetaRename()[i] = targetFields[i];
}
String selstepname = "Rename field names";
//获取步骤插件ID
String selstepid = steploader.getStepPluginID(svi);
//创建步骤元数据信息
StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);
selstep.setLocation(350, 100);
selstep.setDraw(true);
selstep.setDescription("Rename field names");
//添加步骤
transMeta.addStep(selstep);
//传输连接元数据信息(连接from和select)
TransHopMeta shi = new TransHopMeta(fromstep, selstep);
transMeta.addTransHop(shi);//添加到传输元对象
fromstep = selstep;//然后设置from步骤为select步骤
//
// Create the target step...
//
//
// Add the TableOutputMeta step...
//设置目标步骤名称
String tostepname = "write to [" targetTableName "]";
//表输出元对象
TableOutputMeta toi = new TableOutputMeta();
toi.setDatabase(targetDBInfo);//设置数据库
toi.setTablename(targetTableName);//设置表名
toi.setCommitSize(3000);//设置批量提交数
toi.setTruncateTable(true);//是否清除原有数据
//获取步骤ID
String tostepid = steploader.getStepPluginID(toi);
//创建to步骤
StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);
tostep.setLocation(550, 100);
tostep.setDraw(true);
tostep.setDescription("Write information to table [" targetTableName "] on database [" targetDBInfo "]");
transMeta.addStep(tostep);//添加步骤
//
// Add a hop between the two steps...
//
//创建连接 from--to
TransHopMeta hi = new TransHopMeta(fromstep, tostep);
transMeta.addTransHop(hi);
// 好的,如果我们还在这里:覆盖当前转换...
return transMeta;
}
catch (Exception e)
{
throw new KettleException ( “创建新转换时发生意外错误”,e);
}
}
/**
* 1) 创建新转换
* 2) 将转换另存为 XML 文件
* 3) 为目标表生成 SQL
* 4) 执行转换
* 5)删除目标表以使该程序可重复
*
* @param args
*/
public static void main(String[] args) throws Exception
{
long start = new Date().getTime();
//初始化日志记录...
LogWriter log = LogWriter.getInstance("TransBuilder.log", true, LogWriter.LOG_LEVEL_DETAILED);
// 加载 Kettle 步骤和插件
StepLoader stloader = StepLoader .getInstance();
if (!stloader.read())
{
log.logError("TransBuilder", "加载 Kettle 步骤和插件时出错...立即停止!");
return;
}
// 我们想要的参数,可选的可以是
String fileName = "./NewTrans.xml";
String conversionName = "Test Transformation";
String sourceDatabaseName = "source";
String sourceTableName = "emp_collect";
String sourceFields[] = {
"empno",
"ename",
“工作”,
"mgr",
"comm",
"sal",
"deptno",
“生日”
};
String targetDatabaseName = "target";
String targetTableName = "emp_kettle01";
String targetFields[] = {
"empno01",
"ename01 ",
“job01”,
"mgr01",
"comm",
"sal",
"deptno",
“生日”
};
// 生成转换。
//创建转换元对象
TransMeta transMeta = TransBuilderME.buildCopyTable(
conversionName,
源数据库名称,
sourceTableName,
sourceFields,
targetDatabaseName,
targetTableName,
targetFields
);
// transMeta = new TransMeta();
// 将其保存为文件:
// 传输元对象中获取XML,输出并
String xml = transMeta.getXML();
DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
dos. write(xml.getBytes("UTF-8"));
dos.close();
System.out.println("已保存转换到文件:" fileName);
// OK,生成目标表需要执行什么SQL?
//获得sql语句,创建表语句
String sql = transMeta.getSQLStatementsString();
// Execute the SQL on the target table:
//创建表
Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
targetDatabase.connect();//连接数据库
targetDatabase.execStatements(sql);//执行sql
// Now execute the transformation...
//执行传输任务
Trans trans = new Trans(log, transMeta);
trans.execute(null);
trans.waitUntilFinished();//等待执行完毕
// For testing/repeatability, we drop the target table again
// targetDatabase.execStatement("drop table " targetTableName);
targetDatabase.disconnect();//断开数据库连接
long end = new Date().getTime();
System.out.println("运行时间:" (end - start) / 1000 "秒");
long min = (end - start) / 1000 / 60;
long second = (end - start) / 1000 % 60;
System.out.println("运行时间:" min "分钟" second "秒");
}
}