Today I am going to demonstrate the Talend component tMemorizeRows. tMemorizeRows temporarily memorizes an array of incoming data, row by row, and indexes the memorized rows from 0. The maximum number of rows to memorize at any given time is set in the component properties.
tMemorizeRows is generally used in scenarios where data from previous rows needs to be compared with the current row to perform a transformation.
To demonstrate, I am going to create a Talend Job that reads a log file containing the end time of every job module and calculates each module's processing time in seconds.
Let's look at the input delimited log file, which contains the Job Id, the Module Name and the module's processing End Date Time.
Input File
job_id;job_module;job_end_date_time
1;Start;2013-04-12 13:00:00
1;Authentication;2013-04-12 13:00:56
1;Downloading;2013-04-12 13:02:23
1;Merging;2013-04-12 13:03:18
1;Transformation;2013-04-12 13:03:58
1;FTPing;2013-04-12 13:04:48
2;Start;2013-04-12 14:00:00
2;Authentication;2013-04-12 14:00:32
2;Downloading;2013-04-12 14:01:57
2;Merging;2013-04-12 14:02:16
2;Transformation;2013-04-12 14:03:11
2;FTPing;2013-04-12 14:04:48
For readability, I have shown below the same Input file in the Run console using tLogRow component.
Now, from this input file, we need to derive a new column, Module_Processing_Time_in_Seconds, which will hold each module's processing time. We will derive this field by subtracting the previous row's End Date Time from the current row's End Date Time. To find the difference in seconds we will use the following Talend built-in function, which returns the first date minus the second:
TalendDate.diffDate(current_date_time, previous_date_time, "ss")
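Outside Talend Studio the TalendDate routine is not on the classpath, so here is a minimal plain-Java sketch of the same arithmetic. The class SecondsBetween and its methods are hypothetical names used only for illustration, and the sketch assumes the "first date minus second date" behaviour of diffDate.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical stand-in for TalendDate.diffDate(date1, date2, "ss"); not part of Talend.
final class SecondsBetween {

    // Returns (date1 - date2) in whole seconds, so pass the later date first
    // to get a positive duration.
    static long diffSeconds(Date date1, Date date2) {
        return (date1.getTime() - date2.getTime()) / 1000L;
    }

    // Parses the date format used in the input log file.
    static Date parse(String text) {
        try {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(text);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Bad date: " + text, e);
        }
    }

    public static void main(String[] args) {
        Date start = parse("2013-04-12 13:00:00");
        Date authentication = parse("2013-04-12 13:00:56");
        System.out.println(diffSeconds(authentication, start)); // prints 56
    }
}
```

Swapping the two arguments gives the same magnitude with a negative sign, which is an easy mistake to make when wiring up the previous and current rows.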
Finally, our output file should look like below:
Let's start designing the Talend job:
Job Design:
1. First we will read the Input delimited file using tFileInputDelimited component. Provide the values of the component properties as mentioned below:
2. Define the metadata of the Input file. Metadata should look like following:
For more details on defining and configuring the metadata of the Delimited file visit “Generating Metadata for CSV (Delimited) file”.
3. The next step is to sort the input data. Drag a tSortRow component onto the Job designer.
We will sort on the JobId and Module_End_Date_Time columns.
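For readers who think in code, the sort step can be pictured as a two-key comparator. This is only a sketch: SortLogRows and LogRow are hypothetical names, and it assumes both columns are sorted in ascending order, which the job needs so that each row follows the previous module of the same job.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of the sort step; not Talend-generated code.
final class SortLogRows {

    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static final class LogRow {
        final int jobId;
        final String module;
        final LocalDateTime endTime;

        LogRow(int jobId, String module, String endTime) {
            this.jobId = jobId;
            this.module = module;
            this.endTime = LocalDateTime.parse(endTime, FMT);
        }
    }

    // Returns a sorted copy: job id ascending, then end date time ascending.
    static List<LogRow> sortRows(List<LogRow> rows) {
        List<LogRow> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingInt((LogRow r) -> r.jobId)
                .thenComparing((LogRow r) -> r.endTime));
        return sorted;
    }

    public static void main(String[] args) {
        List<LogRow> rows = Arrays.asList(
                new LogRow(2, "Start", "2013-04-12 14:00:00"),
                new LogRow(1, "Authentication", "2013-04-12 13:00:56"),
                new LogRow(1, "Start", "2013-04-12 13:00:00"));
        for (LogRow r : sortRows(rows)) {
            System.out.println(r.jobId + ";" + r.module + ";" + r.endTime.format(FMT));
        }
    }
}
```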
4. Now it's time to use the tMemorizeRows component.
As mentioned earlier, this component remembers the last n values in the form of an array. In "Row count to memorize", enter the number of rows that the flow should remember. In our scenario we only compare the current row's date time with the previous one, so we set the count to 2.
In "Columns to memorize", check all the fields that you want the job to remember.
In our scenario, we also compare JobId so that the processing time resets when the job changes. Hence we also check the JobId field, so that its previous value is remembered in the flow.
This component lets the flow keep the current and previous values in an array so that the following components can use them. We can access the current and previous values using the syntax below.
To retrieve the memorized values of the JobId field, read the array JobId_tMemorizeRows_1 (the column name followed by the component name) by index.
Similarly, if you are memorizing more records, you can access them by giving the corresponding array index in the field array.
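To make the array behaviour concrete, here is a simplified model of one memorized column. It is a sketch, not the code Talend generates: MemorizeRowsSketch is a hypothetical class, and it assumes that each incoming row is stored at index 0 while older rows shift to higher indexes, which is consistent with the null check on index 1 in the job's main code.

```java
// Hypothetical model of one memorized column; not Talend-generated code.
final class MemorizeRowsSketch {

    // Assumption: index 0 = row being processed, index 1 = the row before it, and so on.
    private final Integer[] jobId;

    MemorizeRowsSketch(int rowCountToMemorize) {
        this.jobId = new Integer[rowCountToMemorize];
    }

    // Called once per incoming row: shift older values up, store the new one at 0.
    void memorize(Integer value) {
        for (int i = jobId.length - 1; i > 0; i--) {
            jobId[i] = jobId[i - 1];
        }
        jobId[0] = value;
    }

    Integer get(int index) {
        return jobId[index];
    }

    public static void main(String[] args) {
        MemorizeRowsSketch memory = new MemorizeRowsSketch(2);
        memory.memorize(1);
        System.out.println(memory.get(0) + " / " + memory.get(1)); // prints 1 / null
        memory.memorize(2);
        System.out.println(memory.get(0) + " / " + memory.get(1)); // prints 2 / 1
    }
}
```

The null at index 1 on the very first row is why the job code needs a fallback before comparing the two values.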
5. We have now memorized the previous and current values of the JobId and Module_End_Date_Time fields. It's time to use these values to calculate the total time taken by each module in seconds.
Drag a tJavaFlex component onto the Job designer. Right-click the tMemorizeRows component, select Row>Main and connect it to tJavaFlex.
In the schema editor of the tJavaFlex component, add a new column Module_Processing_Time_in_Seconds.
6. Open the component properties of the tJavaFlex component.
Declare all the variables in the start code.
Start Code
long var_processing_time;
int previous_job_id;
int current_job_id;
Date previous_date_time;
Date current_date_time;
Enter the following code in the main code section of the component.
Main Code
// tMemorizeRows stores the row being processed at index 0 and the row before it at index 1.
current_job_id = JobId_tMemorizeRows_1[0];
// On the first row there is no previous row yet, so fall back to the current values.
previous_job_id = Relational.ISNULL(JobId_tMemorizeRows_1[1])?JobId_tMemorizeRows_1[0]:JobId_tMemorizeRows_1[1];
current_date_time = Module_End_Date_Time_tMemorizeRows_1[0];
previous_date_time = Relational.ISNULL(Module_End_Date_Time_tMemorizeRows_1[1])?Module_End_Date_Time_tMemorizeRows_1[0]:Module_End_Date_Time_tMemorizeRows_1[1];
// diffDate returns (first date - second date), so the later date goes first.
if((previous_job_id==current_job_id) && !previous_date_time.equals(current_date_time))
var_processing_time = TalendDate.diffDate(current_date_time, previous_date_time, "ss");
else var_processing_time = 0L;
row3.Module_Processing_Time_in_Seconds = var_processing_time;
The logic of the code is explained in the steps below:
1. Populate the variables previous_job_id and current_job_id from the tMemorizeRows array for the JobId column.
2. Similarly, populate the variables previous_date_time and current_date_time from the tMemorizeRows array for the Module_End_Date_Time column.
3. Derive var_processing_time: when previous_job_id equals current_job_id, subtract previous_date_time from current_date_time; otherwise set it to 0.
4. Assign the value of var_processing_time to the output field.
7. Finally, drag a tLogRow component onto the Job designer to display the output in the Run console. Right-click the tJavaFlex component, select Row>Main and connect it to tLogRow.
Our job is complete; it's time to execute it and check the output.
In the output, the module processing time is calculated for every module in a Job. Also, when the Job Id changes, the module processing time is reset.
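If you want to check the numbers without opening Talend Studio, the same logic can be reproduced in plain Java. This is a sketch under assumptions, not the job itself: ModuleTimes and processingSeconds are hypothetical names, and the input lines are assumed to be already sorted by job id and end date time.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical re-implementation of the job logic for checking results by hand.
final class ModuleTimes {

    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Each line is "job_id;job_module;job_end_date_time", sorted by job id and end time.
    static List<Long> processingSeconds(List<String> lines) {
        List<Long> result = new ArrayList<>();
        Integer previousJobId = null;
        LocalDateTime previousEnd = null;
        for (String line : lines) {
            String[] fields = line.split(";");
            int jobId = Integer.parseInt(fields[0]);
            LocalDateTime end = LocalDateTime.parse(fields[2], FMT);
            // Reset to 0 on the first row and whenever the job id changes.
            boolean sameJob = previousJobId != null && previousJobId == jobId;
            result.add(sameJob ? Duration.between(previousEnd, end).getSeconds() : 0L);
            previousJobId = jobId;
            previousEnd = end;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1;Start;2013-04-12 13:00:00",
                "1;Authentication;2013-04-12 13:00:56",
                "1;Downloading;2013-04-12 13:02:23",
                "1;Merging;2013-04-12 13:03:18",
                "1;Transformation;2013-04-12 13:03:58",
                "1;FTPing;2013-04-12 13:04:48",
                "2;Start;2013-04-12 14:00:00",
                "2;Authentication;2013-04-12 14:00:32",
                "2;Downloading;2013-04-12 14:01:57",
                "2;Merging;2013-04-12 14:02:16",
                "2;Transformation;2013-04-12 14:03:11",
                "2;FTPing;2013-04-12 14:04:48");
        // prints [0, 56, 87, 55, 40, 50, 0, 32, 85, 19, 55, 97]
        System.out.println(processingSeconds(lines));
    }
}
```

Each number is the gap in seconds between a module's end time and the end time of the module before it, and the two zeros mark the Start rows where the Job Id changes.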
You can use the tMemorizeRows component in similar scenarios that require a transformation based on the values in previous records.
Let me know if you have any concerns regarding the usage of the tMemorizeRows component.