Friday, June 5, 2015

Pentaho Kettle 5.3 CE and copying table data from Postgres to a MySQL database

My experience with Kettle is really short. I tried to handle a use case where data in Postgres database tables should be transferred to a MySQL database. In a previous article I described how to set up VirtualBox/Ubuntu 14.04 with MySQL and Postgres databases, so that work is useful for my current task.

First step is to install Kettle
Installation is straightforward - unpack the zip into a new folder and this minimal setup is all that you need. Kettle has an Eclipse-based visual editor (Spoon), so a Java runtime is a prerequisite (Java 8 64-bit was OK on my Win8.1 laptop). To access different databases, JDBC libraries should be added to the KETTLE_HOME\lib directory (the Postgres driver is the only one already in place).
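In practice this means copying the MySQL Connector/J jar into that folder; the exact jar name depends on the driver version you download (the one below is only an illustration):

KETTLE_HOME\lib\mysql-connector-java-5.1.35-bin.jar
# Restart Spoon after adding the jar so that the new driver is picked up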

Some important points from my experience
  • There are different options for how to install Kettle - keep everything as simple as possible and avoid the PDI database repository. In my experience, for development purposes a folder/XML based repository is good enough and much easier to manage.
  • There is a huge collection of documentation on the internet (even YouTube videos), but the problem is that it covers different Kettle versions, and not all options of the commercial Kettle version are available in the CE version. My suggestion is to read the old Kettle wiki documentation and avoid the Pentaho EE documentation, because changes between Kettle versions are not so big. Another option is to download an older Kettle version, but I do not find any reason for that (version 4.4 is quite popular in forums).
  • If you have a previous installation of Kettle on the same computer, then the files in the .kettle directory in your user home will be used by both installations (better to remove the older one).

Kettle task implementation

My task was simple - I have 2 tables with data in a Postgres database and I want to transfer the data in these tables to a MySQL database. I have previously installed VirtualBox/Ubuntu 14.04 with MySQL and Postgres databases, with 2 tables of similar structure in both databases.

Because a simple Transformation with 2 Table Input/Output steps is too simple a solution, the idea was to solve the problem in a dynamic way. In the future any number of tables can be transferred using the same pattern (identical table structures in the 2 databases is a prerequisite) and only configuration changes are needed.

Get names and columns of tables from source DB

The first step is to have a transformation that reads the table names and columns from the DB metadata; the output is a stream of rows consisting of table_name and the column names (the column names being a single comma-separated list).

The solution is this Transformation:

Table Input -> Add Constants -> Row Denormalizer -> Copy Rows to result

Table Input.Connection = Postgres Db connection (previously saved in shared.xml)
Table Input.SQL = select table_name, column_name from information_schema.columns where table_schema = 'test' order by table_name, ordinal_position
# Get information from Postgres metadata
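For the sample table used as default values later in this post (test_data with columns i, d and t), this query would return one row per column, something like:

table_name | column_name
test_data  | i
test_data  | d
test_data  | t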


Add Constants (the "Add constants" step; required for the Denormalizer - it adds a constant 1 to every row in the stream)
Add Constants.Name = grouper
Add Constants.Type = Integer
Add Constants.Value = 1


Row Denormalizer.The key field = grouper
Row Denormalizer.Group field = table_name
Row Denormalizer.Target fieldname = table_columns
Row Denormalizer.Value fieldname = column_name
Row Denormalizer.Key value = 1
Row Denormalizer.Type = String
Row Denormalizer.Aggregation = Concatenate strings separated by ,
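With these settings the per-column rows shown above would collapse into one row per table, for example:

table_name | table_columns
test_data  | i,d,t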

Row Denormalizer will concatenate the columns (this is achievable also with plain SQL, but we are trying to learn Kettle now :-)) and Copy Rows to result will make the current transformation's output available to the next step (Transformation/Job) as a stream. The idea is to pass this stream as parameters to the next step without persisting the information in any file or database.
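For comparison, the same one-row-per-table result could be produced directly in Postgres (9.x or later) with string_agg, in case you ever prefer to push this work to the database - a sketch only:

select table_name,
       string_agg(column_name::text, ',' order by ordinal_position) as table_columns
from information_schema.columns
where table_schema = 'test'
group by table_name
order by table_name;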

The next step is to have a transformation that saves the data in the target DB.

Get dynamically data from source and save in target db

In this step we will read data from the source database table and save it in the target database. This would be really simple if it were implemented in a static way:

Table Input -> Table Output

We will have the same components in our transformation, but the table name and column names should come in as named parameters to this transformation. So we will have these Transformation properties:

Transformation properties.parameters[0].Parameter = table_columns
Transformation properties.parameters[0].Default Value = i,d,t

Transformation properties.parameters[1].Parameter = table_name
Transformation properties.parameters[1].Default Value = test_data

# Default value is optional, but it gives the option to test the transformation without an input feed

Table.Input.Connection = Postgres Db connection (previously saved in shared.xml)
Table.Input.SQL = SELECT ${table_columns} FROM ${table_name}
Table.Input.Replace variables in script = Check
# This trick will read columns and table name from named parameters
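With the default parameter values from above, the SQL that actually runs would resolve to:

SELECT i,d,t FROM test_data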

Table.Output.Connection = Mysql connection (previously saved in shared.xml)
Table.Output.Target schema = test
Table.Output.Target table = ${table_name}
Table.Output.Truncate table = Check
# We will always truncate the table before loading; we assume that source/target have the same field names, so field mapping is not needed.
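Conceptually, with the same default parameters the output step behaves roughly like this on the MySQL side (Kettle generates the inserts itself - this is only to illustrate the truncate-then-load pattern):

TRUNCATE TABLE test.test_data;
INSERT INTO test.test_data (i, d, t) VALUES (?, ?, ?); -- repeated for every row coming from Table Input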

Main job

Now we have 2 separate transformations, and we need to feed every row of the first transformation's output to the second transformation to get results. So a loop over the first transformation's output should be implemented, and for every row the second transformation should be started.

There is a really simple and elegant solution in Kettle for this problem (even without a loop on the screen). We need a new job with these elements:

Start -> Transformation 1 -> Transformation 2

No changes in the job properties are needed.

Transformation 1.job entry details.Transformation Spec.Transformation name = <first transformation filename>

Transformation 2.job entry details.Transformation Spec.Transformation name = <second transformation filename>
Transformation 2.job entry details.Advanced.Copy previous results to parameters = Check
Transformation 2.job entry details.Advanced.Execute for every input row = Check
Transformation 2.job entry details.Parameters[0].Parameter = table_columns
Transformation 2.job entry details.Parameters[0].Stream column name = table_columns
Transformation 2.job entry details.Parameters[1].Parameter = table_name
Transformation 2.job entry details.Parameters[1].Stream column name = table_name

Now you have a setup where the first transformation's output stream drives the second transformation: the job calls the second transformation once for every row in the stream and passes the row's values to it as named parameters. If you have data in the source database tables, then running the main job should transfer the data to the target tables.
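Once the job works in Spoon, it can also be run from the command line with Kitchen (the path and file name below are just placeholders for a file-based setup):

kitchen.bat /file:C:\kettle\jobs\main_job.kjb /level:Basic
# on Linux: ./kitchen.sh -file=/path/to/main_job.kjb -level=Basic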

Conclusion

We have a very powerful solution to transfer data from a source database to a target if the table structures in both databases are identical. The main problem during development was to figure out which fields must be filled in to pass one transformation's output stream as input to the next transformation. Actually, it took many hours to figure out the second transformation's Parameters part, because with debugging it is not easy to see where the problem is.

Honestly, I was surprised that such a simple and elegant dynamic solution was possible to develop in Kettle, and hopefully it will be my favourite ETL tool in the future.

