您的当前位置:首页正文

ETL之kettle进行二次开发简单demo

2022-09-05 来源:意榕旅游网
下面的demo利用Kettle的API,将一个数据源中的数据导入到另一个数据源中:

[java] view plain copy

1. package cn.com.saidi.job; 2.

3. import org.apache.commons.io.FileUtils; 4. import org.pentaho.di.core.KettleEnvironment; 5. import org.pentaho.di.core.database.DatabaseMeta;

6. import org.pentaho.di.core.exception.KettleDatabaseException; 7. import org.pentaho.di.core.exception.KettleXMLException; 8. import org.pentaho.di.core.plugins.PluginRegistry; 9. import org.pentaho.di.core.plugins.StepPluginType; 10. import org.pentaho.di.trans.TransHopMeta; 11. import org.pentaho.di.trans.TransMeta; 12. import org.pentaho.di.trans.step.StepMeta;

13. import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta; 14. import org.pentaho.di.trans.steps.tableinput.TableInputMeta; 15.

16. import java.io.File; 17. 18. /**

19. * Created by 戴桥冰 on 2017/1/16. 20. */

21. public class TransDemo { 22.

23. public static TransDemo transDemo; 24. 25. /**

26. * 两个库中的表名 27. */

28. public static String bjdt_tablename = \"test1\"; 29. public static String kettle_tablename = \"test2\"; 30. 31. /**

32. * 数据库连接信息,适用于DatabaseMeta其中 一个构造器

DatabaseMeta(String xml) 33. */

34. public static final String[] databasesXML = { 35.

36. \"\" + 37. \"\" + 38. \"bjdt\" +

39. \"192.168.1.122\" + 40. \"Mysql\" +

41. \"Native\" +

42. \"daiqiaobing\" + 43. \"3306\" +

44. \"root\" + 45. \"root\" + 46. \"\",

47. \"\" + 48. \"\" + 49. \"kettle\" +

50. \"192.168.1.122\" + 51. \"Mysql\" + 52. \"Native\" +

53. \"daiqiaobing\" + 54. \"3306\" +

55. \"root\" + 56. \"root\" + 57. \"\" 58. 59. }; 60.

61. public static void main(String[] args) { 62. try {

63. KettleEnvironment.init(); 64. transDemo = new TransDemo();

65. TransMeta transMeta = transDemo.generateMyOwnTrans(); 66. String transXml = transMeta.getXML();

67. String transName = \"etl/update_insert_Trans.ktr\"; 68. File file = new File(transName);

69. FileUtils.writeStringToFile(file, transXml, \"UTF-8\");

70. System.out.println(databasesXML.length+\"\\n\"+databasesXML[0]+\"\\n\"

+databasesXML[1]);

71. } catch (Exception e) { 72. e.printStackTrace(); 73. return; 74. } 75. } 76. 77. /**

78. * 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是

表输入,第二个是表插入与更新操作 79. * @return

80. * @throws KettleXMLException 81. */

82. public TransMeta generateMyOwnTrans() throws KettleXMLException, Kettl

eDatabaseException {

83. System.out.println(\"************start to generate my own transformat

ion***********\");

84. TransMeta transMeta = new TransMeta(); 85. //设置转化的名称

86. transMeta.setName(\"insert_update\"); 87. //添加转换的数据库连接

88. for (int i=0;i89. DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]); 90. transMeta.addDatabase(databaseMeta); 91. }

92. //registry是给每个步骤生成一个标识Id用

93. PluginRegistry registry = PluginRegistry.getInstance(); 94. //第一个表输入步骤(TableInputMeta)

95. TableInputMeta tableInput = new TableInputMeta();

96. String tableInputPluginId = registry.getPluginId(StepPluginType.cla

ss, tableInput);

97. //给表输入添加一个DatabaseMeta连接数据库

98. DatabaseMeta database_bjdt = transMeta.findDatabase(\"bjdt\"); 99. tableInput.setDatabaseMeta(database_bjdt);

100. String select_sql = \"SELECT name FROM \"+bjdt_tablename; 101. tableInput.setSQL(select_sql); 102.

103. //添加TableInputMeta到转换中

104. StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId,\"tab

le input\",tableInput);

105. //给步骤添加在spoon工具中的显示位置 106. tableInputMetaStep.setDraw(true);

107. tableInputMetaStep.setLocation(100, 100); 108. transMeta.addStep(tableInputMetaStep); 109.

110. //第二个步骤插入与更新

111. InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta(); 112. String insertUpdateMetaPluginId = registry.getPluginId(StepPluginTy

pe.class,insertUpdateMeta); 113. //添加数据库连接

114. DatabaseMeta database_kettle = transMeta.findDatabase(\"kettle\"); 115. insertUpdateMeta.setDatabaseMeta(database_kettle); 116. //设置操作的表

117. insertUpdateMeta.setTableName(kettle_tablename); 118. //设置用来查询的关键字

119. insertUpdateMeta.setKeyLookup(new String[]{\"name\"}); 120. insertUpdateMeta.setKeyStream(new String[]{\"name\"});

121. insertUpdateMeta.setKeyStream2(new String[]{\"\"});//一定要加上 122. insertUpdateMeta.setKeyCondition(new String[]{\"=\"}); 123.

124. //设置要更新的字段

125. String[] updatelookup = {\"name\"} ; 126.

127. String [] updateStream = {\"name\"}; 128. Boolean[] updateOrNot = {true};

129. insertUpdateMeta.setUpdateLookup(updatelookup); 130. insertUpdateMeta.setUpdateStream(updateStream); 131. insertUpdateMeta.setUpdate(updateOrNot);

132. String[] lookup = insertUpdateMeta.getUpdateLookup(); 133. //添加步骤到转换中

134. StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId,

\"insert_update\",insertUpdateMeta); 135. insertUpdateStep.setDraw(true); 136. insertUpdateStep.setLocation(250,100); 137. transMeta.addStep(insertUpdateStep);

138. //*****************************************************************

* 139.

140. //*****************************************************************

* 141.

142. //添加hop把两个步骤关联起来

143. transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertU

pdateStep));

144. System.out.println(\"***********the end************\"); 145. return transMeta; 146. } 147. 148. }

上述操作会生成一个ktr文件,接下来加载并执行该ktr文件中定义的转换:

[java] view plain copy

1. public static void main(String[] args) throws KettleException { 2. //初始化ketlle

3. KettleEnvironment.init();

4. //创建转换元数据对象

5. TransMeta meta = new TransMeta(\"etl/update_insert_Trans.ktr\"); 6. Trans trans = new Trans(meta); 7. trans.prepareExecution(null); 8. trans.startThreads(); 9. trans.waitUntilFinished(); 10. if(trans.getErrors()!=0){

11. System.out.println(\"执行失败!\"); 12. } 13. }

因篇幅问题不能全部显示,请点此查看更多更全内容