[java] view plain copy
1. package cn.com.saidi.job; 2.
3. import org.apache.commons.io.FileUtils; 4. import org.pentaho.di.core.KettleEnvironment; 5. import org.pentaho.di.core.database.DatabaseMeta;
6. import org.pentaho.di.core.exception.KettleDatabaseException; 7. import org.pentaho.di.core.exception.KettleXMLException; 8. import org.pentaho.di.core.plugins.PluginRegistry; 9. import org.pentaho.di.core.plugins.StepPluginType; 10. import org.pentaho.di.trans.TransHopMeta; 11. import org.pentaho.di.trans.TransMeta; 12. import org.pentaho.di.trans.step.StepMeta;
13. import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta; 14. import org.pentaho.di.trans.steps.tableinput.TableInputMeta; 15.
16. import java.io.File; 17. 18. /**
19. * Created by 戴桥冰 on 2017/1/16. 20. */
21. public class TransDemo { 22.
23. public static TransDemo transDemo; 24. 25. /**
26. * 两个库中的表名 27. */
28. public static String bjdt_tablename = \"test1\"; 29. public static String kettle_tablename = \"test2\"; 30. 31. /**
32. * 数据库连接信息,适用于DatabaseMeta其中 一个构造器
DatabaseMeta(String xml) 33. */
34. public static final String[] databasesXML = { 35.
36. \"\" + 37. \" 39. \" 41. \" 42. \" 44. \"
47. \"\" + 48. \" 50. \" 53. \" 55. \"
61. public static void main(String[] args) { 62. try {
63. KettleEnvironment.init(); 64. transDemo = new TransDemo();
65. TransMeta transMeta = transDemo.generateMyOwnTrans(); 66. String transXml = transMeta.getXML();
67. String transName = \"etl/update_insert_Trans.ktr\"; 68. File file = new File(transName);
69. FileUtils.writeStringToFile(file, transXml, \"UTF-8\");
70. System.out.println(databasesXML.length+\"\\n\"+databasesXML[0]+\"\\n\"
+databasesXML[1]);
71. } catch (Exception e) { 72. e.printStackTrace(); 73. return; 74. } 75. } 76. 77. /**
78. * 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是
表输入,第二个是表插入与更新操作 79. * @return
80. * @throws KettleXMLException 81. */
82. public TransMeta generateMyOwnTrans() throws KettleXMLException, Kettl
eDatabaseException {
83. System.out.println(\"************start to generate my own transformat
ion***********\");
84. TransMeta transMeta = new TransMeta(); 85. //设置转化的名称
86. transMeta.setName(\"insert_update\"); 87. //添加转换的数据库连接
88. for (int i=0;i 92. //registry是给每个步骤生成一个标识Id用 93. PluginRegistry registry = PluginRegistry.getInstance(); 94. //第一个表输入步骤(TableInputMeta) 95. TableInputMeta tableInput = new TableInputMeta(); 96. String tableInputPluginId = registry.getPluginId(StepPluginType.cla ss, tableInput); 97. //给表输入添加一个DatabaseMeta连接数据库 98. DatabaseMeta database_bjdt = transMeta.findDatabase(\"bjdt\"); 99. tableInput.setDatabaseMeta(database_bjdt); 100. String select_sql = \"SELECT name FROM \"+bjdt_tablename; 101. tableInput.setSQL(select_sql); 102. 103. //添加TableInputMeta到转换中 104. StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId,\"tab le input\",tableInput); 105. //给步骤添加在spoon工具中的显示位置 106. tableInputMetaStep.setDraw(true); 107. tableInputMetaStep.setLocation(100, 100); 108. transMeta.addStep(tableInputMetaStep); 109. 110. //第二个步骤插入与更新 111. InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta(); 112. String insertUpdateMetaPluginId = registry.getPluginId(StepPluginTy pe.class,insertUpdateMeta); 113. //添加数据库连接 114. DatabaseMeta database_kettle = transMeta.findDatabase(\"kettle\"); 115. insertUpdateMeta.setDatabaseMeta(database_kettle); 116. //设置操作的表 117. insertUpdateMeta.setTableName(kettle_tablename); 118. //设置用来查询的关键字 119. insertUpdateMeta.setKeyLookup(new String[]{\"name\"}); 120. insertUpdateMeta.setKeyStream(new String[]{\"name\"}); 121. insertUpdateMeta.setKeyStream2(new String[]{\"\"});//一定要加上 122. insertUpdateMeta.setKeyCondition(new String[]{\"=\"}); 123. 124. //设置要更新的字段 125. String[] updatelookup = {\"name\"} ; 126. 127. String [] updateStream = {\"name\"}; 128. Boolean[] updateOrNot = {true}; 129. insertUpdateMeta.setUpdateLookup(updatelookup); 130. insertUpdateMeta.setUpdateStream(updateStream); 131. insertUpdateMeta.setUpdate(updateOrNot); 132. String[] lookup = insertUpdateMeta.getUpdateLookup(); 133. //添加步骤到转换中 134. StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId, \"insert_update\",insertUpdateMeta); 135. insertUpdateStep.setDraw(true); 136. insertUpdateStep.setLocation(250,100); 137. transMeta.addStep(insertUpdateStep); 138. //***************************************************************** * 139. 140. //***************************************************************** * 141. 142. //添加hop把两个步骤关联起来 143. transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertU pdateStep)); 144. System.out.println(\"***********the end************\"); 145. return transMeta; 146. } 147. 148. } 上述操作将会产生一个ktr文件,接下来的操作是对ctr文件进行转换: [java] view plain copy 1. public static void main(String[] args) throws KettleException { 2. //初始化ketlle 3. KettleEnvironment.init(); 4. //创建转换元数据对象 5. TransMeta meta = new TransMeta(\"etl/update_insert_Trans.ktr\"); 6. Trans trans = new Trans(meta); 7. trans.prepareExecution(null); 8. trans.startThreads(); 9. trans.waitUntilFinished(); 10. if(trans.getErrors()!=0){ 11. System.out.println(\"执行失败!\"); 12. } 13. } 因篇幅问题不能全部显示,请点此查看更多更全内容