Migrate Data Between Databases With One Job Using the Dynamic Schema
In this article, see a tutorial on how to migrate data between databases with one job using the dynamic schema.
"How can I migrate data from all tables from one database to another with one generic Talend Job... with transformations as well?" is a question I get again and again on Talend Community. As an integration developer who has spent over 15 years honing my skills, I used to find this question had me banging my head against my desk. I now sit at a desk with a subtle but definitely present dent, which is slightly discolored from classic pine to pine with a rosé tinge.
My attitude was always that Talend is a tool that helps developers build complex integration jobs and should be used by experts who realize that there is no universal, one-size-fits-all job for everything. However, I was being somewhat harsh, and maybe a little elitist. Looking back at my frustrations, I can see that they came from the fact that I had spent a lot of time and energy building my expertise, and I was a little resentful of the expectation that what we integration experts do should be considered trivial and easy.
I recently received another example of that sort of question, and it got me reconsidering my attitude.
The question wasn't quite the same as those I have received in the past. The asker didn't want to migrate the data, create dynamic transformations of the data, and filter ad hoc rows all by simply joining three components together and pressing "Go". This individual wanted to take a database and migrate the data from source tables to target tables, with a change in table name and possibly a change in column order as well.
I had a bit of time, so I thought I would give it a go. It sounded like it should be possible, and like the sort of thing I could use to dust off my skills a little, having spent the last few months looking at some of Talend's newer tools. In this blog, I will demonstrate a method of achieving this requirement.
First of all, I should point out that I am not going to demonstrate a complete multi-table to multi-table migration. What I will demonstrate is an example of a job that could be easily extended to do that; I will talk about how to extend it at the end. In this blog, I have focused on creating a job that will move data from one table to another using a Dynamic schema, a column mapping table, and a bit of Java.
Below is a screenshot of the job. You will see that the components are numbered from 1 to 13. I will use this numbering when talking through what each component does and how you can recreate this.
I have created 3 tables in a MySQL database to demonstrate this: a source table, a target table, and a column mapping table. I've used a single database, but in reality, you will likely be using different databases. It doesn't make much difference, but you will need to make sure that the database column types are the same if you are following this. It would be possible to add some code to dynamically change the column types, but this would require extra data in the column mapping table and some extra Java code. This is not covered here.
The Source Table
The Source table is a simple table holding a few personal details. You can see the schema below as a MySQL create statement.
CREATE TABLE `source` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`first_name` varchar(45) DEFAULT NULL,
`last_name` varchar(45) DEFAULT NULL,
`house_number` int(11) DEFAULT NULL,
`street` varchar(45) DEFAULT NULL,
`city` varchar(45) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=201 DEFAULT CHARSET=latin1;
The Target Table
The Target table is also a simple table holding a few personal details, but slightly different. You can see the schema below as a MySQL create statement.
CREATE TABLE `target` (
`id` int(11) NOT NULL,
`firstName` varchar(45) DEFAULT NULL,
`lastName` varchar(45) DEFAULT NULL,
`cityName` varchar(45) DEFAULT NULL,
`road` varchar(45) DEFAULT NULL,
`addressNumber` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
The column order has changed slightly, as have some of the names. You will also notice I have not set the "id" as an auto-increment field. This was just done to save faffing around with that in this simple example. However, there are plenty of ways of dealing with this if you have to.
The Column_Mapping Table
The Column_Mapping table is used to translate the column names and change the column order in this example. You can see the schema below as a MySQL create statement.
CREATE TABLE `column_mapping` (
`old_column_name` varchar(45) DEFAULT NULL,
`new_column_name` varchar(45) DEFAULT NULL,
`order` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
I will also show how this could be populated for this tutorial. It is a basic example with only a few changes, but it should give you the idea.
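For illustration, the table might be populated along these lines; the exact rows are an assumption on my part, with the "order" values following the target column order shown in the target table's create statement.
-- Hypothetical mapping data matching the source/target tables above
INSERT INTO `column_mapping` (`old_column_name`, `new_column_name`, `order`) VALUES
('id', 'id', 1),
('first_name', 'firstName', 2),
('last_name', 'lastName', 3),
('city', 'cityName', 4),
('street', 'road', 5),
('house_number', 'addressNumber', 6);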
Component Configuration
Below I will describe the configuration of each of the components.
1. "Source" (tDBInput)
This component is used to read the data from the source table and output it in a Dynamic schema column. You can see the configuration of this component below...
Other than connection credentials, the only thing that needs to be done is to configure a single column (in this case called "dynamicColumns") of type "Dynamic". The query is simply...
"Select * From source"
Once this is configured, this component is finished.
2. "Identify column names" (tJavaFlex)
This component is used to identify the column names and the order of the columns retrieved in the Dynamic schema column. I will not show a screenshot of this component, as it is simply code. The code is broken down into 3 sections: Start Code, Main Code, and End Code. These are shown below so that you can copy and paste. Ensure that your columns and row connections are named the same as mine; otherwise, you will get errors. Alternatively, change those variable names in the code.
Start Code
//Create an ArrayList to contain the column names of the input source
java.util.ArrayList<String> columns = new java.util.ArrayList<String>();
//Row count variable to count rows processed
int rowCount = 0;
Main Code
//Only carry out this code for the first row
if(rowCount==0){
    //Set the dynamicColumnsTmp variable
    routines.system.Dynamic dynamicColumnsTmp = row3.dynamicColumns;
    //Cycle through the columns stored in the Dynamic schema column and add the column names
    //to the ArrayList
    for (int i = 0; i < dynamicColumnsTmp.getColumnCount(); i++) {
        routines.system.DynamicMetadata columnMetadata = dynamicColumnsTmp.getColumnMetadata(i);
        columns.add(columnMetadata.getName());
    }
    //Increment the rowCount so that this block only runs for the first row
    rowCount++;
}
End Code
//Set the columns ArrayList to the globalMap for later
globalMap.put("columns", columns);
The above code is described in-line.
3. "Initial Dataset" (tHashOutput)
This component is used to store the data coming from the source component. The data passes through the tJavaFlex while that component works out the column order. The configuration of this component can be seen below.
You simply need to ensure that your column "dynamicColumns" is added.
4. "Column name" (tJavaFlex)
This component takes the globalMap variable created in the End Code section of the first tJavaFlex and returns each column name found, in order, one at a time to the following flow.
Start Code
//Retrieve the columns ArrayList stored in the globalMap earlier
java.util.ArrayList<String> columns = (java.util.ArrayList<String>)globalMap.get("columns");
java.util.Iterator<String> it = columns.iterator();
//Set a loop to produce a row for each column name
while(it.hasNext()){
Main Code
    //Set each column name to a new row
    row2.columnName = it.next();
End Code
}
This essentially works as a while loop iterating over the ArrayList: the Start Code opens the loop, the Main Code runs once per iteration (emitting one row each time), and the End Code closes the brace.
5. "Column mapping" (tDBInput)
This component is used to retrieve the data from the Column_Mapping table. This holds the old and new column names, as well as the order of the columns, which is used to dictate the order of the output data. This component is used as a lookup for the tMap, which will be described next. The configuration of this component can be seen below.
Like the first tDBInput component, this is pretty simple to set up. Just set the schema and the query. The query can be seen below...
"SELECT
`column_mapping`.`old_column_name`,
`column_mapping`.`new_column_name`,
`column_mapping`.`order`
FROM `column_mapping`"
6. "Replace Column Names" (tMap)
This component is used to take the column names found in the first subjob and compare them against the lookup data from the previous component. The configuration of this component can be seen below...
This is a pretty simple component in terms of its configuration. We simply bring in our data row from the tJavaFlex and look it up against the dataset from the tDBInput. There is a join on the column name against the old column name from the lookup. The "columnName", "ColumnNameNew" and "Order" columns are returned. The data will hold the translation of the column names and the required order of those columns in the output data.
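If the screenshot does not come through, here is a rough sketch of the mapping; the flow names (row2 and the lookup) are assumptions based on my Job layout.
//Join (inner join): row2.columnName == lookup.old_column_name
//Output expressions (output column <- expression):
//  columnName    <- row2.columnName
//  ColumnNameNew <- lookup.new_column_name
//  Order         <- lookup.order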
7. "Set Column Order" (tSortOrder)
This component is used to order the output from the tMap into the required column order for the final output. The configuration of this component can be seen below....
First, ensure that all input columns are sent to the output. Next, set the "Criteria" for the ordering to be on "Order", "num" and "asc". This will return the columns ordered from smallest "Order" value to largest.
8. "Create Ordered Column List String" (tMap)
This component is used to merge all of the column data into a single String. What it actually does is append the rows together, returning as many rows as it receives, with the final row holding a concatenation of all of the data. First, the old column name is concatenated with the new column name, separated by a comma. Then the rows are concatenated using a semicolon. So the first row output might look like this....
oldColumnOne,newColumnOne
The last row output will look like this....
oldColumnOne,newColumnOne;oldColumnTwo,newColumnTwo;oldColumnThree,newColumnThree;oldColumnFour,newColumnFour;oldColumnFive,newColumnFive;oldColumnSix,newColumnSix
We need to keep the last row, but this will be handled by the next component.
The configuration of this tMap can be seen below....
This is relatively straightforward, but the tMap variables need explaining. As I regularly mention in my tutorials, tMap variables are processed per row from top to bottom, and they retain their values between rows. This makes this process possible. The variables and expressions are shown below...
The top variable (mergedColumns) is used to concatenate the old column name with the new column name, separated by a comma. The second variable (mergedRecords) is used to concatenate the mergedColumns values of every row. This is made possible by the fact that it references itself (using Var.mergedRecords) in the concatenation. Since this is the case, its appended value is retained between rows. So, as explained previously, by the last row of data all of the records will have been concatenated.
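If the screenshot is not visible, the two variable expressions would look something like this; the incoming flow name (row4) is an assumption based on my Job.
//mergedColumns: the old column name and the new column name, separated by a comma
row4.columnName + "," + row4.ColumnNameNew

//mergedRecords: append this row's mergedColumns to the running value, separated by a
//semicolon; referencing Var.mergedRecords returns the value retained from the previous row
Var.mergedRecords == null ? Var.mergedColumns : Var.mergedRecords + ";" + Var.mergedColumns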
9. "Return Last Row" (tAggregateRow)
This component is used to return only the last row of data from the previous tMap. First of all, we ensure that the single input column is set as our output column. After this, we do not set a "Group by" field; this essentially puts all rows into one group. Then, for the "Operations", we set "mergedRecords" to have a "Function" of "last". This will return only the last record.
The configuration of this component can be seen below...
10. "Save Ordered List String" (tJavaFlex)
Here we use another tJavaFlex component. I didn't really need to use a tJavaFlex here; I could have used a tSetGlobalVar. I'll be totally honest and say that I produced all of the screenshots for this before realizing that using the tJavaFlex was a bit of overkill. However, it doesn't hurt. All I am doing here is setting the value returned by the previous component to a globalMap variable. The code for this lives in the Main Code section of the tJavaFlex; no other code is used. This can be seen below...
//Set the column translation record to the globalMap
globalMap.put("records", row7.mergedRecords);
11. "Initial Dataset" (tHashInput)
This component is used to read in the data stored in our tHashOutput from the first subjob. It is simply linked to the first tHashOutput and set with the same schema. The configuration of this component can be seen below...
12. "Reorder Dynamic Schema" (tJavaFlex)
This is our last tJavaFlex component and arguably the most complicated. The code is described in-line, but it essentially takes the data passed from our source in the first subjob, retrieves the mergedRecords String stored in the globalMap under the key of "records", splits that String up, and then uses the result to match against the column names in the data set passed from the tHashOutput. For each record, it checks the order and the new name required, then creates a new Dynamic schema record with the columns reordered and renamed.
Please see the code below...
Start Code
//Retrieve the "records" globalMap String which holds the record order
String records = (String)globalMap.get("records");
//Split the columns up using the semi-colon
String[] columns = records.split(";");
Main Code
//Create a Dynamic schema variable to hold the incoming Dynamic column
routines.system.Dynamic dynamicColumnsTmp = row8.dynamicColumns;
//Create a brand new Dynamic column variable to be used for the newly formatted record
routines.system.Dynamic newDynamicColumns = new routines.system.Dynamic();
//Cycle through the column data supplied by the globalMap
for(int x = 0; x < columns.length; x++){
    //Cycle through the columns inside the Dynamic column holding the data
    for (int i = 0; i < dynamicColumnsTmp.getColumnCount(); i++) {
        //Retrieve the value of the current column inside the Dynamic column
        Object obj = dynamicColumnsTmp.getColumnValue(i);
        //Retrieve a DynamicMetadata object from the column inside the Dynamic column
        routines.system.DynamicMetadata columnMetadata = dynamicColumnsTmp.getColumnMetadata(i);
        //If the current column record starts with the same name as this Dynamic column
        if(columns[x].startsWith(columnMetadata.getName()+",")){
            //Identify the old and new column names from the column record
            String newColumnName = columns[x].substring(columns[x].indexOf(',')+1);
            String oldColumnName = columns[x].substring(0,columns[x].indexOf(','));
            //Create a new DynamicMetadata object with the new column name
            routines.system.DynamicMetadata tmpColumnMetadata = new routines.system.DynamicMetadata();
            tmpColumnMetadata.setName(newColumnName);
            //Copy the remaining metadata from the original column's metadata
            tmpColumnMetadata.setDbName(columnMetadata.getDbName());
            tmpColumnMetadata.setType(columnMetadata.getType());
            tmpColumnMetadata.setDbType(columnMetadata.getDbType());
            tmpColumnMetadata.setLength(columnMetadata.getLength());
            tmpColumnMetadata.setPrecision(columnMetadata.getPrecision());
            tmpColumnMetadata.setFormat(columnMetadata.getFormat());
            tmpColumnMetadata.setDescription(columnMetadata.getDescription());
            tmpColumnMetadata.setKey(columnMetadata.isKey());
            tmpColumnMetadata.setNullable(columnMetadata.isNullable());
            tmpColumnMetadata.setSourceType(columnMetadata.getSourceType());
            //Add the new metadata for the new column inside the new Dynamic schema column
            newDynamicColumns.metadatas.add(tmpColumnMetadata);
            //Set the value for the new column inside the new Dynamic schema column
            newDynamicColumns.addColumnValue(obj);
        }
    }
}
//Set the output Dynamic schema column
row9.dynamicColumns = newDynamicColumns;
There is no End Code section for this component.
13. "Target" (tDBOutput)
This component is used to send the data to the target table. As with the first component, most of the configuration depends upon your environment. You will need to make sure you set it up with a Dynamic schema column; this should be set automatically simply by connecting your component. The way this works is that, so long as the new column names you selected match those in the database table, the DB component will identify them and create the appropriate insert statement. The configuration of this component can be seen below...
Once all of the above has been completed, you should be able to run your job, check your database and see that the data has been copied from source to target correctly.
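As a quick sanity check (a minimal example, assuming the tables used in this tutorial), you can compare row counts and spot-check a few migrated rows:
-- Row counts should match after the migration
SELECT COUNT(*) FROM `source`;
SELECT COUNT(*) FROM `target`;
-- Spot-check that values landed in the renamed, reordered columns
SELECT `id`, `firstName`, `lastName`, `cityName`, `road`, `addressNumber`
FROM `target` LIMIT 5;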
What If I Want to Use This to Support Multiple Tables in One Job?
In my introduction, I mentioned that this method can be used to migrate multiple tables. This example only supports one table to one table. All that is needed to extend it is the following.
1. Use Context Variables in Your DB Components
First of all, you will need to make your DB components more dynamic. The tDBInput component has a hardcoded SELECT statement. This needs changing, but not by much. If you add a Context variable for the source table name, you can change your query to...
"Select * From "+context.source
No further changes to this component are needed.
You need to do something similar with the tDBOutput component. But instead of setting a query, you will need a Context variable for the "Table" parameter. So you'd set....
context.target
...for your "Table" parameter.
2. Add Columns to Your Column_Mapping Table to Hold the Table Name
You will need to add a bit more supporting data to your Column_Mapping table. If you add an "Old_Table_Name" column and a "New_Table_Name" column, you can query the Column_Mapping table using the "Old_Table_Name" field and the context.source Context variable. That will return the mapping configuration for your source table, along with the new table name, which you can then set as your context.target Context variable value. A sketch of this is shown below.
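As a rough sketch of those changes (the column names here are my assumptions), the mapping table could be extended like this...
ALTER TABLE `column_mapping`
ADD COLUMN `old_table_name` varchar(45) DEFAULT NULL,
ADD COLUMN `new_table_name` varchar(45) DEFAULT NULL;
...and the "Column mapping" tDBInput query could then filter on the source table using the Context variable...
"SELECT
`column_mapping`.`old_column_name`,
`column_mapping`.`new_column_name`,
`column_mapping`.`order`,
`column_mapping`.`new_table_name`
FROM `column_mapping`
WHERE `column_mapping`.`old_table_name` = '" + context.source + "'"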
3. Create a Wrapper Job to Call This Job and Supply the Table Data as Context Variables
The final step is to create a wrapper Job. This is a Job that will query a data set (maybe your Column_Mapping table) to return a list of source tables to be migrated. Each source table identified by the wrapper Job is then passed to this Job via a tRunJob, so this Job runs once per table. Start the wrapper Job, and it will return each of the source tables and dynamically run this Job for each of them. A sketch of the driving query is shown below.
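A minimal sketch of the wrapper Job's driving query, assuming the extended mapping table described above; each returned row would then be iterated (for example, with a tFlowToIterate) and passed to the tRunJob as the child Job's context.source and context.target values:
"SELECT DISTINCT
`column_mapping`.`old_table_name`,
`column_mapping`.`new_table_name`
FROM `column_mapping`"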
Further Considerations
What I have described above is the most basic version of what you would need to do to meet this requirement. I have not included any logging or any error catching; you should add both if you want to use this approach successfully. You will also likely have to consider any referential integrity issues that might crop up. Assuming that all of your primary and foreign keys will be the same, this may just mean having to switch off any DB constraints on your target DB before running this. However, it is important to think this through before jumping in, as there will be further considerations to take into account.
Finally, good luck!