Friday 12 April 2013

Understanding usage of tMemorizeRows component - Talend or Jasper ETL

Today, I am going to demonstrate usage of Talend component tMemorizeRows. tMemorizeRows temporarily memorizes an array of incoming data in a row by row sequence and instantiates this array by indexing each of the memorized rows from 0. We can set the maximum number of rows to be memorized at any given time in the component properties of the component.


tMemorizeRows is generally used in scenarios where data from previous rows needs to be compared with the current row to perform the transformations.


To demonstrate, I am going to create a Talend Job which will read the Log file having every job’s module end time and calculate the module processing time in seconds.


Lets look at the input delimited log file which contains Job Id, Module Name and its processing End Date Time.


Input File
job_id;job_module;job_end_date_time
1;Start;2013-04-12 13:00:00
1;Authentication;2013-04-12 13:00:56
1;Downloading;2013-04-12 13:02:23
1;Merging;2013-04-12 13:03:18
1;Transformation;2013-04-12 13:03:58
1;FTPing;2013-04-12 13:04:48
2;Start;2013-04-12 14:00:00
2;Authentication;2013-04-12 14:00:32
2;Downloading;2013-04-12 14:01:57
2;Merging;2013-04-12 14:02:16
2;Transformation;2013-04-12 14:03:11
2;FTPing;2013-04-12 14:04:48


For readability, I have shown below the same Input file in the Run console using tLogRow component.


Now, from this input file, we need to derive new column Module_Processing_Time_in_Seconds which will hold the modules processing time. We will derive this field by subtracting Current row’s module date time with previous row’s End Date time. For finding the difference in seconds we will use the following Talend build in function:


TalendDate.diffDate(previous_date_time,current_date_time , "ss")


Finally, our output file should look like below:



Lets start with designing a Talend job:


Jod Design:



1. First we will read the Input delimited file using tFileInputDelimited component. Provide the values of the component properties as mentioned below:




2. Define the metadata of the Input file. Metadata should look like following:


For more details on defining and configuring the metadata of the Delimited file visit “Generating Metadata for CSV (Delimited) file”.




3. Next step is to Sort the input data. Drag tSortRow component on to Job designer.
We will be sorting it on JobId and Module_End_Date_Time columns.


4. Now, its time to use tMemorizeRows component. As I told you earlier this component is used to remember last n values in the form of array. Provide the count of rows of data for particular fields that you want flow to remember in “Row count to memorize” In our scenario we will be only comparing current’s row date time with previous one we will give the count as 2.


In the “Columns to memorize” check all the fields that you want the job to remember.



In our scenario, we will compare JobId to reset the previous value to the current one. hence, we will also check JobId field to remember the previous value in flow.


Now, This component will allow the flow to save current and previous values in the array so that the next components can make use of these values. We can access the current and previous using the below syntax.


To retrieve the previous value of JobId field we can use following table:


Similarly, if you are memorizing more records then you can access the attributes by mentioning the array index int the field array.


5. Now, we have successfully memorized previous and current value of fields JobId and Module_End_Date_Time. Its time to use these values to calculate the total time taken by module in seconds.


Drag tJavaFlex component onto job designer. Right click tMemorizeRows component, Select Row>Main and connect it to tJavaFlex.


In the schema editor of tJavaFlex component, add new column Module_processing_Time_in_Seconds.
 


6. Open the component properties of the tJavaFlex component.


Declare all the variables in the start code.


Start Code
long var_processing_time;  
int previous_job_id;
int current_job_id;
Date previous_date_time;
Date current_date_time;


Enter the following code in the main code section of the component.


Main Code
previous_job_id = JobId_tMemorizeRows_1[0];
current_job_id = Relational.ISNULL(JobId_tMemorizeRows_1[1])?JobId_tMemorizeRows_1[0]:JobId_tMemorizeRows_1[1];


previous_date_time = Module_End_Date_Time_tMemorizeRows_1[0];
current_date_time = Relational.ISNULL(Module_End_Date_Time_tMemorizeRows_1[1])?Module_End_Date_Time_tMemorizeRows_1[0]:Module_End_Date_Time_tMemorizeRows_1[1];


if((previous_job_id==current_job_id) && (previous_date_time != current_date_time))
var_processing_time = TalendDate.diffDate(previous_date_time,current_date_time , "ss");
else var_processing_time=(long)0;


row3.Module_Processing_Time_in_Seconds = var_processing_time;


Logic of the code is explained in below steps:


1. Populate variables previous_job_id and current_job_id from tMemorizeRows array for column Job ID.
2. Similarly, Populate variables previous_date_time and current_date_time from tMemorizeRows array for column Module_End_Date_Time.
3. Derive variable var_processing_time by comparing previous_job_id and current_job_id and finally subtracting previous_date_time from current_date_time.
4. Assign the value of variable var_processing_time to output field.


7. Finally, drag tLogRow component onto job designer to display the output in Run console. Right click tjavaFlex component, select Row>Main and connect it to tLogRow.


Our job is complete, it's time to execute it and check the output.


Now, in the output Module processing time is calculated for every module in a Job. Also, when Job Id changes then the Module processing time is also getting reset.


You can use tMemorizeRows component is similar scenarios which required to perform transformation based on the values in the previous records.


Let me know, if you have any concerns regarding the usage of tMemorizeRows component.


You may also like to read..

3 comments: