首页  >  文章  >  数据库  >  kettle api 执行转换

kettle api 执行转换

WBOY
WBOY原创
2016-06-07 15:10:511762浏览

导入java.io.DataOutputStream;导入java.io.File;导入 java.io.FileOutputStream;导入 java.util.Date;导入 be.ibridge.kettle.core.Const;导入 be.ibridge.kettle.core.LogWriter;导入 be.ibridge.kettle.core.NotePadMeta;导入b


导入 java.io.DataOutputStream;
导入 java.io.File;
导入 java.io.FileOutputStream;
导入 java.util.Date;

导入 be.ibridge.kettle.core.Const;
导入 be.ibridge.kettle.core.LogWriter;
导入 be.ibridge.kettle.core.NotePadMeta;
导入 be.ibridge。 Kettle.core.database.Database;
导入 be.ibridge.kettle.core.database.DatabaseMeta;
导入 be.ibridge.kettle.core.exception.KettleException;
导入 be.ibridge.kettle。 trans.StepLoader;
导入 be.ibridge.kettle.trans.Trans;
导入 be.ibridge.kettle.trans.TransHopMeta;
导入 be.ibridge.kettle.trans.TransMeta;
导入be.ibridge.kettle.trans.step.StepMeta;
导入 be.ibridge.kettle.trans.step.StepMetaInterface;
导入 be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
导入be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
导入 be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;

/**
 *
 *

Title:
 * 本文描述了以下操作:

1)           建立一个新的转换(transformation)

2)           把转换(transformation)存储为XML文件

3)           生成需要在目标表运行的SQL语句

4)           执行转换(transformation)

5)           删除目标表,可以使测试程序可以反复执行(这一点可根据需要修改)。


 *

Description: TODO 类的功能描述


 *

Copyright: Copyright (c) 2003



 * @author 洪亮
 * @version 1.0
 *

------------------------------------------------------------


 *

修改历史


 *

  序号    日期       时间        修 改 人    修 改 原 因


 *

   1    2006-9-20   下午05:59:06     洪亮       创建 


 *
 */

public class TransBuilderME
{
    public static Final String[] datasetXML = {
        ""
        “”
            "目标名称>"
            "192.168.169.220服务器>"
            "<类型>ORACLE"
            "<访问>本机"
            "NMSDB数据库>"
            "1521端口>"
            "UCP用户名>"
            "UCP密码>"
          "连接>",
         
          ""
          “”
              "来源名称>"
              "192.168.169.220服务器>"
              "<类型>ORACLE"
              "<访问>本机"
              "NMSDB数据库>"
              "1521端口>"
              "UCP用户名>"
              "UCP密码>"
            "
    };

    /**
     * 使用输入参数(例如要读取的表名)创建新的转换。
     * @param conversionName 转换的名称
     * @param sourceDatabaseName 要读取的数据库的名称
     * @param sourceTableName 要读取的表的名称
     * @param sourceFields 我们要从源表读取的字段名称
     * @param targetDatabaseName 目标数据库的名称
     * @param targetTableName我们要写入的目标表的名称
     * @param targetFields 目标表中字段的名称(字段数量与sourceFields相同)
     * @return 一个新的转换
     * @throws KettleException在极少数情况下会出现问题
    */
    public static final TransMeta buildCopyTable(String conversionName, String sourceDatabaseName, String sourceTableName, String[] sourceFields, String targetDatabaseName, String targetTableName, String[] targetFields) throws KettleException
{
        LogWriter log = LogWriter.getInstance();
       
        try
        {
            //
            // 创建一个新转换...
            // 传输元信息
            TransMeta transMeta = new TransMeta();
            transMeta.setName(transformationName);//传输名称
           
            // 添加数据库连接
            for (int i=0 ;i            {
                DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);//数据库元信息
                transMeta.addDatabase(databaseMeta);//传输元  中加入数据库元信息
            }
           
            DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);//查找源数据库元信息
            DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);//查找目标数据库元信息


//
//添加note
//
string note =“数据库[“ sourcedbinfo”]上的表[“ sourcetablename”]的信息读取信息。 🎜>            note = "之后,它将信息写入数据库 [" targetDBInfo "] 上的表 [" targetTableName "]";
            NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1); // 注释信息
            transMeta.addNote(ni);

            //
            // create the source step...
            //
            String fromstepname = "read from [" sourceTableName "]";//from步骤名称
            TableInputMeta tii = new TableInputMeta();//表输入元数据信息
            tii.setDatabaseMeta(sourceDBInfo);//为表输入 指定 数据库
            String selectSQL = "SELECT " Const.CR;//拼接查询sql语句
            for (int i=0;i            {
                if (i>0) selectSQL =", "; else selectSQL ="  ";
                selectSQL =sourceFields[i] Const.CR;
            }
            selectSQL ="FROM " sourceTableName;
            tii.setSQL(selectSQL);//设置查询sql语句

            StepLoader steploader = StepLoader.getInstance();//???

            String fromstepid = steploader.getStepPluginID(tii);
            //步骤元数据信息
            StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);
            fromstep.setLocation(150, 100);
            fromstep.setDraw(true);
            fromstep.setDescription("Reads information from table [" sourceTableName "] on database [" sourceDBInfo "]");
            //传输中 添加步骤
            transMeta.addStep(fromstep);
            //
            // add logic to rename fields
            // Use metadata logic in SelectValues, use SelectValueInfo...
            //选择字段(重命名)
            SelectValuesMeta svi = new SelectValuesMeta();
            svi.allocate(0, 0, sourceFields.length);
            for (int i = 0; i < sourceFields.length; i )
            {
             //设置源字段和目标字段
                svi.getMetaName()[i] = sourceFields[i];
                svi.getMetaRename()[i] = targetFields[i];
            }

            String selstepname = "Rename field names";
            //获取步骤插件ID
            String selstepid = steploader.getStepPluginID(svi);
            //创建步骤元数据信息
            StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);
            selstep.setLocation(350, 100);
            selstep.setDraw(true);
            selstep.setDescription("Rename field names");
            //添加步骤
            transMeta.addStep(selstep);

            //传输连接元数据信息(连接from和select)
            TransHopMeta shi = new TransHopMeta(fromstep, selstep);
            transMeta.addTransHop(shi);//添加到传输元对象
            fromstep = selstep;//然后设置from步骤为select步骤

            //
            // Create the target step...
            //
            //
            // Add the TableOutputMeta step...
            //设置目标步骤名称
            String tostepname = "write to [" targetTableName "]";
            //表输出元对象
            TableOutputMeta toi = new TableOutputMeta();
            toi.setDatabase(targetDBInfo);//设置数据库
            toi.setTablename(targetTableName);//设置表名
            toi.setCommitSize(3000);//设置批量提交数
            toi.setTruncateTable(true);//是否清除原有数据

            //获取步骤ID
            String tostepid = steploader.getStepPluginID(toi);
            //创建to步骤
            StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);
            tostep.setLocation(550, 100);
            tostep.setDraw(true);
            tostep.setDescription("Write information to table [" targetTableName "] on database [" targetDBInfo "]");
            transMeta.addStep(tostep);//添加步骤

            //
            // Add a hop between the two steps...
            //
            //创建连接 from--to
            TransHopMeta hi = new TransHopMeta(fromstep, tostep);
            transMeta.addTransHop(hi);

            // 好的,如果我们还在这里:覆盖当前转换...
            return transMeta;
        }
        catch (Exception e)
        {
            throw new KettleException ( “创建新转换时发生意外错误”,e);
        }
    }

    /**
     * 1) 创建新转换
     * 2) 将转换另存为 XML 文件
     * 3) 为目标表生成 SQL
     * 4) 执行转换
     * 5)删除目标表以使该程序可重复
     *
     * @param args
    */
    public static void main(String[] args) throws Exception
    {
        long start = new Date().getTime();
        //初始化日志记录...
        LogWriter log = LogWriter.getInstance("TransBuilder.log", true, LogWriter.LOG_LEVEL_DETAILED);
       
        // 加载 Kettle 步骤和插件
        StepLoader stloader = StepLoader .getInstance();
        if (!stloader.read())
        {
            log.logError("TransBuilder",  "加载 Kettle 步骤和插件时出错...立即停止!");
            return;
        }
       
        // 我们想要的参数,可选的可以是
        String fileName = "./NewTrans.xml";
        String conversionName = "Test Transformation";
        String sourceDatabaseName = "source";
        String sourceTableName = "emp_collect";
        String sourceFields[] = {
          "empno",      
          "ename",      
          “工作”,        
          "mgr",        
          "comm",       
          "sal",        
          "deptno",     
          “生日”   
 
            };

        String targetDatabaseName = "target";
        String targetTableName = "emp_kettle01";
        String targetFields[] = {
          "empno01",      
          "ename01 ",      
          “job01”,
          "mgr01",        
          "comm",       
          "sal",        
          "deptno",     
          “生日”
            };

       
        // 生成转换。
        //创建转换元对象
        TransMeta transMeta = TransBuilderME.buildCopyTable(
                conversionName,
                源数据库名称,
sourceTableName,
                sourceFields,
               targetDatabaseName,
               targetTableName,
                targetFields
                );
//        transMeta = new  TransMeta();
        // 将其保存为文件:
        // 传输元对象中获取XML,输出并
        String xml = transMeta.getXML();
        DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
        dos. write(xml.getBytes("UTF-8"));
        dos.close();
        System.out.println("已保存转换到文件:" fileName);

        // OK,生成目标表需要执行什么SQL?
        //获得sql语句,创建表语句
        String sql = transMeta.getSQLStatementsString();

        // Execute the SQL on the target table:
        //创建表
        Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
        targetDatabase.connect();//连接数据库
        targetDatabase.execStatements(sql);//执行sql
       
        // Now execute the transformation...
        //执行传输任务
        Trans trans = new Trans(log, transMeta);
        trans.execute(null);
        trans.waitUntilFinished();//等待执行完毕
       
        // For testing/repeatability, we drop the target table again
//        targetDatabase.execStatement("drop table " targetTableName);
        targetDatabase.disconnect();//断开数据库连接
       
        long end = new Date().getTime();
        System.out.println("运行时间:" (end - start) / 1000 "秒");
        long min = (end - start) / 1000 / 60;
        long second = (end - start) / 1000 % 60;
        System.out.println("运行时间:" min "分钟" second "秒");
    }


}
 

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn