最近要对一个系统的数据同步到另一个系统中,要求新系统的数据结果完成之后,实时同步到另一个系统数据表中。
Java调用Kettle执行任务或转换,需要使用Kettle中的jar,可以先导入lib目录中的几个基本的jar,如:kettle-core.jar、kettle-db.jar、kettle-engine.jar ,其它jar根据情况进行添加,所需的jar在<kettle-home>\lib、<kettle-home>\libext下面都可以找到,本示例引用的jar如下图:
也就是动态的传一个关联的ID。由于旧系统是vb做的,无法提供webservice接口,并且同步的表涉及到十几张表,并且两个系统表结构完全不一样,所以想到了kettle。
虽然网上文章有说Java可以传递参数给kettle,不过只找到了传递参数给转换的文章,没有讲参数传递给job,kettle中如何使用java传递的参数。今天就以上问题,一并共享。
import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.util.EnvUtil; import org.pentaho.di.job.Job; import org.pentaho.di.job.JobMeta; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; public class tests { /** * 本测试类慎用!!!!!!! * * @param args */ public static void main(String[] args) { String datetime = "2014-12-19 23:20:45"; String[] params = {"707", datetime}; // 传递参数 String path = "E:\\job.kjb"; for (int i = 0; i < 3; i++) { params[0] += i; runJob(params, path); } } /** * 运行转换文件方法 * @param params 多个参数变量值 * @param ktrPath 转换文件的路径,后缀ktr */ public static void runTransfer(String[] params, String ktrPath) { Trans trans = null; try { // // 初始化 // 转换元对象 //KettleEnvironment.init();// 初始化 EnvUtil.environmentInit(); TransMeta transMeta = new TransMeta(ktrPath); // 转换 trans = new Trans(transMeta); // 执行转换 trans.execute(params); // 等待转换执行结束 trans.waitUntilFinished(); // 抛出异常 if (trans.getErrors() > 0) { throw new Exception( "There are errors during transformation exception!(传输过程中发生异常)"); } } catch (Exception e) { e.printStackTrace(); } } /** * java 调用 kettle 的job * * @param jobname * 如: String fName= "D:\\kettle\\informix_to_am_4.ktr"; */ public static void runJob(String[] params, String jobPath) { try { KettleEnvironment.init(); // jobname 是Job脚本的路径及名称 JobMeta jobMeta = new JobMeta(jobPath, null); Job job = new Job(null, jobMeta); // 向Job 脚本传递参数,脚本中获取参数值:${参数名} // job.setVariable(paraname, paravalue); job.setVariable("id", params[0]); System.err.println(params[0]+"=========="+params[1]); job.setVariable("dt", params[1]); job.start(); job.waitUntilFinished(); if (job.getErrors() > 0) { throw new Exception( "There are errors during job exception!(执行job发生异常)"); } } catch (Exception e) { e.printStackTrace(); } } }
下面就是如何使用java传递的参数了。
转换的网上有例子,转换工作台,打开输入,找到“获取系统信息”
进行变量定义,选择命令参数1。。。n即可
就可以引用我示例中的1,2参数了。
但是我们的迁移工作是要按顺序执行的,调用转换不够用,需要job来定义执行顺序,
比如上面的转换作为第一步,操作完进行其他步骤,那么在这个基础上,可以画一个job
这时候问题来了,我们要活得java的数据,同时这个job需要把参数传递给test2转换使用。
其实很简单,点开test2,切换到参数选型,将java定义的参数写进去,记住带{}
这里的参数指的是“位置参数”
这样就大功搞成了。java已经能够顺利的将值传递给job,job可以顺利的将值传递给转换。
点击run this job 在variable中定义参数名称跟java传递的参数一致,可以写值进行测试。
注意事项:任务和转换要存成文件格式,任务中引用的转换也要是文件格式,
否则就都需要数据库支撑,数据库方法调用了。