Tuesday, June 11, 2013

A Theta Join with Talend Open Studio


Ok.  The concept is actually a non-equi join, but I figure that a Greek mathematical symbol will get a few more hits.

If you're an imperative programmer (e.g., Java), you often reach for a loop and a few variables when you need to process a record while keeping track of preceding records.  In the relational database world, a set-based paradigm is used instead: sets of data are manipulated to produce the desired output.

"Theta Join" refers to a join between two tables that can be based on an operator.  Usually, the operator is = which produces a special case of the Theta Join called Equi Join.  This is the typical join in RDBMS applications that links one to another via a primary key / foreign key relationship.  A Non-equi join is also a Theta Join, but with a "Theta" that isn't =.

The job featured in this blog post will demonstrate an application of the Non-equi join using Talend Open Studio.  The concept is not Talend-specific and can be applied to a SQL-only solution.

Input
The input consists of EmpEvent records, each with an emp_id, event_dt, and job_code.  The job_code is null in some cases.  When it is, the job swaps in a job_code taken from a preceding record.  Here is the input:

Input - Some job_code Data Missing
The job is expected to fill in the nulls based on preceding records.  The rule selects the closest preceding event that has a job_code.  For example, the record with event_dt 1/5/2012 uses job_code B rather than the earlier A, which is used for 1/1/2012, 1/2/2012, and 1/3/2012.
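
For contrast, here is roughly the loop-and-variables version of this rule that an imperative programmer might write first.  It's a sketch under assumptions: the EmpEvent record, emp_id 1, and all the dates are hypothetical, and in particular B's date (1/4/2012) is assumed rather than read from the input above.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FillForwardLoop {
    // Hypothetical stand-in for the EmpEvent input record; jobCode may be null.
    record EmpEvent(int empId, LocalDate eventDt, String jobCode) {}

    static List<EmpEvent> fill(List<EmpEvent> input) {
        // Walk each employee's events in date order.
        List<EmpEvent> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparingInt(EmpEvent::empId).thenComparing(EmpEvent::eventDt));

        // The "few variables": the most recent job_code seen for each employee.
        Map<Integer, String> lastCode = new HashMap<>();
        List<EmpEvent> out = new ArrayList<>();
        for (EmpEvent e : sorted) {
            if (e.jobCode() != null) {
                lastCode.put(e.empId(), e.jobCode());
                out.add(e);
            } else {
                // Swap in the closest preceding job_code (stays null if none has been seen yet).
                out.add(new EmpEvent(e.empId(), e.eventDt(), lastCode.get(e.empId())));
            }
        }
        return out;
    }

    // Hypothetical sample data; B's 1/4/2012 date is an assumption.
    static List<EmpEvent> sample() {
        return List.of(
            new EmpEvent(1, LocalDate.of(2012, 1, 1), "A"),
            new EmpEvent(1, LocalDate.of(2012, 1, 2), null),
            new EmpEvent(1, LocalDate.of(2012, 1, 3), null),
            new EmpEvent(1, LocalDate.of(2012, 1, 4), "B"),
            new EmpEvent(1, LocalDate.of(2012, 1, 5), null));
    }

    public static void main(String[] args) {
        fill(sample()).forEach(System.out::println);
    }
}
```

This works, but it depends on processing the rows in order while mutating state; the rest of the post gets the same result by pruning a data set instead.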

The output will be the following:

Output - job_code is Filled In For Each Record
Job Design

The strategy I'm using for this job is to pare the record set down, step by step, until only the desired output remains.

The input is processed in two passes.  The first pass loads event records with job_codes into a lookup table.  There are two records (A and B) that have job_codes.  They are inserted into the EmpEventLookup table.  This is a trivial operation; there are lots of examples of loading text files and applying tFilterRow on this blog.
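
In plain Java, the first pass amounts to a filter on job_code, which is the role tFilterRow plays here.  This sketch reuses the same hypothetical EmpEvent record and sample data (B's 1/4/2012 date is assumed).

```java
import java.time.LocalDate;
import java.util.List;

class BuildLookup {
    // Hypothetical stand-in for the EmpEvent input record; jobCode may be null.
    record EmpEvent(int empId, LocalDate eventDt, String jobCode) {}

    // First pass: keep only the events that already carry a job_code (the EmpEventLookup rows).
    static List<EmpEvent> buildLookup(List<EmpEvent> input) {
        return input.stream()
                .filter(e -> e.jobCode() != null)
                .toList();
    }

    // Hypothetical sample data; B's 1/4/2012 date is an assumption.
    static List<EmpEvent> sample() {
        return List.of(
            new EmpEvent(1, LocalDate.of(2012, 1, 1), "A"),
            new EmpEvent(1, LocalDate.of(2012, 1, 2), null),
            new EmpEvent(1, LocalDate.of(2012, 1, 3), null),
            new EmpEvent(1, LocalDate.of(2012, 1, 4), "B"),
            new EmpEvent(1, LocalDate.of(2012, 1, 5), null));
    }

    public static void main(String[] args) {
        // Prints the two lookup rows, A and B.
        buildLookup(sample()).forEach(System.out::println);
    }
}
```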

The second pass processes the records, consulting the lookup table.  Here is a screenshot of the job.

Job Processing Input in 2 Stages
The tMap applies a Non-equi join to the main flow (EmpEvent) and the lookup (EmpEventLookup).  This non-equi join does use an equality operator to link up emp_ids, but it also uses a greater-than-or-equal-to operator on event_dt to keep only lookup records dated on or before the main record.  The result set is a list of EmpEvent records paired with candidate job_code matches.  (Records that already have a job_code go through the same processing; each one also matches its own lookup entry.)
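
Outside of Talend, the same inner join with a match model of "All" can be sketched as a nested loop that keeps every lookup row for the same emp_id dated on or before the main row.  The names and data are hypothetical (B's 1/4/2012 date is assumed); this shows the logic of the join, not how tMap actually executes it.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

class NonEquiJoin {
    // Hypothetical stand-in for EmpEvent / EmpEventLookup rows.
    record EmpEvent(int empId, LocalDate eventDt, String jobCode) {}

    // One result row: a main-flow event paired with a lookup row that could supply its job_code.
    record Candidate(EmpEvent main, EmpEvent lookup) {}

    // Inner join with "All" matches: every qualifying lookup row produces its own output row.
    static List<Candidate> candidates(List<EmpEvent> events, List<EmpEvent> lookup) {
        List<Candidate> out = new ArrayList<>();
        for (EmpEvent m : events) {
            for (EmpEvent l : lookup) {
                boolean sameEmp = m.empId() == l.empId();              // equality part (emp_id)
                boolean notLater = !m.eventDt().isBefore(l.eventDt()); // non-equi part: main >= lookup
                if (sameEmp && notLater) {
                    out.add(new Candidate(m, l));
                }
            }
        }
        return out;
    }

    // First pass: lookup rows are the events that already have a job_code.
    static List<EmpEvent> lookup(List<EmpEvent> input) {
        return input.stream().filter(e -> e.jobCode() != null).toList();
    }

    // Hypothetical sample data; B's 1/4/2012 date is an assumption.
    static List<EmpEvent> sample() {
        return List.of(
            new EmpEvent(1, LocalDate.of(2012, 1, 1), "A"),
            new EmpEvent(1, LocalDate.of(2012, 1, 2), null),
            new EmpEvent(1, LocalDate.of(2012, 1, 3), null),
            new EmpEvent(1, LocalDate.of(2012, 1, 4), "B"),
            new EmpEvent(1, LocalDate.of(2012, 1, 5), null));
    }

    public static void main(String[] args) {
        List<EmpEvent> events = sample();
        // The 1/4 and 1/5 events each come out twice: once paired with A, once with B.
        candidates(events, lookup(events)).forEach(c ->
            System.out.println(c.main().eventDt() + " -> " + c.lookup().jobCode()));
    }
}
```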

tMap Showing a Non-Equi Join
The tMap is configured to use an inner join and a match model of "All", both of which are available in the tMap settings.  The emp_ids are related using the usual drag-and-drop technique.  Drag-and-drop only creates equality joins, though, so it won't work for the non-equi part of the join.

Press the "activate expression filter" button on the lookup table to set up the non-equi part.  This displays a box for entering an expression.  The expression could be a simple WHERE-style condition against a constant.  In this job's case, it relates the two event_dt columns using the >= operator.  (For the comparison, I convert the dates to milliseconds using getTime().)
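
Since tMap expressions are Java, the filter boils down to a long comparison on getTime() values.  Here is a small sketch of that test; row1 and row2 in the comment stand for the main and lookup rows and are hypothetical names (use whatever your tMap shows), and the date helper exists only to build example values.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Date;

class ExpressionFilter {
    // Same test as a lookup expression filter such as
    //   row1.event_dt.getTime() >= row2.event_dt.getTime()
    // where row1 (main) and row2 (lookup) are hypothetical row names.
    static boolean keep(Date mainEventDt, Date lookupEventDt) {
        // getTime() returns epoch milliseconds, so >= is a plain long comparison.
        return mainEventDt.getTime() >= lookupEventDt.getTime();
    }

    // Example-only helper: a java.util.Date at midnight UTC on the given day.
    static Date date(int year, int month, int day) {
        return Date.from(LocalDate.of(year, month, day).atStartOfDay(ZoneOffset.UTC).toInstant());
    }

    public static void main(String[] args) {
        System.out.println(keep(date(2012, 1, 5), date(2012, 1, 4))); // true: lookup row is earlier
        System.out.println(keep(date(2012, 1, 4), date(2012, 1, 4))); // true: same day
        System.out.println(keep(date(2012, 1, 3), date(2012, 1, 4))); // false: lookup row is later
    }
}
```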

On the output schema, there is a new column called "event_dt_delta" for later processing.  It holds the date difference between the main record's event_dt and the lookup record's event_dt.  At this point, every value in the data set is non-negative (zero or positive), because the non-equi join has already removed all the later records that could otherwise have been a job_code match.
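
Assuming event_dt_delta is the plain millisecond difference of the two getTime() values (the unit isn't shown in the screenshot), the computation looks something like this sketch.  The method name is hypothetical.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.concurrent.TimeUnit;

class EventDtDelta {
    // event_dt_delta: how far the lookup event precedes the main event, in milliseconds.
    // After the >= filter this is never negative; 0 means the lookup row has the same date.
    static long eventDtDelta(Date mainEventDt, Date lookupEventDt) {
        return mainEventDt.getTime() - lookupEventDt.getTime();
    }

    // Example-only helper: a java.util.Date at midnight UTC on the given day.
    static Date date(int year, int month, int day) {
        return Date.from(LocalDate.of(year, month, day).atStartOfDay(ZoneOffset.UTC).toInstant());
    }

    public static void main(String[] args) {
        long ms = eventDtDelta(date(2012, 1, 5), date(2012, 1, 1));
        // Prints "345600000 ms = 4 days"
        System.out.println(ms + " ms = " + TimeUnit.MILLISECONDS.toDays(ms) + " days");
    }
}
```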

Sort and Aggregate Components

The data set is refined, but there are still too many matches.  For example, the 1/5/2012 entry is still linked to both A and B because both appear earlier in the input.  We need to select the "closest" record.  There's no risk of going over (picking a later record) because those records were already removed by the non-equi join.  The selection starts with a tSortRow component that sorts on the computed event_dt_delta field in ascending order.
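
The tSortRow step corresponds to an ascending sort on the delta.  In this sketch the Candidate record and its delta values are hypothetical, expressed in days for readability; any consistent unit sorts the same way.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SortByDelta {
    // Hypothetical candidate row: the event being filled, a candidate job_code, and its delta.
    record Candidate(int empId, String eventDt, String jobCode, long eventDtDelta) {}

    // Ascending on event_dt_delta, so the closest earlier job_code comes first.
    static List<Candidate> sortByDelta(List<Candidate> rows) {
        List<Candidate> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingLong(Candidate::eventDtDelta));
        return sorted;
    }

    public static void main(String[] args) {
        // Hypothetical deltas for the 1/5/2012 event, which matches both A and B.
        List<Candidate> rows = List.of(
            new Candidate(1, "2012-01-05", "A", 4),
            new Candidate(1, "2012-01-05", "B", 1));
        // B (delta 1) prints before A (delta 4).
        sortByDelta(rows).forEach(System.out::println);
    }
}
```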

Sorting by event_dt_delta
Now that the input is sorted, I use a tAggregateRow component to select, for each event, the "first" record, which is now the "closest" job_code (without going over).
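
Taking the "first" row per group after the sort can be sketched with a LinkedHashMap and putIfAbsent.  Two assumptions here: the grouping columns are emp_id and event_dt (one output row per event), and the sketch mirrors the effect of tAggregateRow's "first" function, not its implementation.  The rows and deltas are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FirstPerGroup {
    // Hypothetical candidate row, as produced by the join and the delta column.
    record Candidate(int empId, String eventDt, String jobCode, long eventDtDelta) {}

    // Assumed group key: one output row per event (emp_id + event_dt).
    record Key(int empId, String eventDt) {}

    // Keep the first row seen in each group; the input must already be sorted by delta ascending.
    static List<Candidate> firstPerEvent(List<Candidate> sortedByDelta) {
        Map<Key, Candidate> first = new LinkedHashMap<>();
        for (Candidate c : sortedByDelta) {
            first.putIfAbsent(new Key(c.empId(), c.eventDt()), c);
        }
        return new ArrayList<>(first.values());
    }

    public static void main(String[] args) {
        // Already sorted by eventDtDelta: 0, 1, 4.
        List<Candidate> sorted = List.of(
            new Candidate(1, "2012-01-01", "A", 0),
            new Candidate(1, "2012-01-05", "B", 1),
            new Candidate(1, "2012-01-05", "A", 4));
        // One row per event: 2012-01-01 keeps A, 2012-01-05 gets B.
        firstPerEvent(sorted).forEach(System.out::println);
    }
}
```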

Selecting the First Record of Sorted Input
Lastly, a tLogRow and a second tSortRow are used for presentation.  The tAggregateRow seemed to scramble the order of the rows.  (There's probably a Java HashMap involved in the implementation.)
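
The HashMap explanation is only a guess, but it illustrates the point: a HashMap's iteration order isn't defined, so a final sort is the reliable way to get display order back.  Here is a sketch with a hypothetical Result record and ISO-formatted date strings (which sort chronologically as text).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PresentationSort {
    // Hypothetical final row: one filled-in event per emp_id and event_dt.
    record Result(int empId, String eventDt, String jobCode) {}

    // Grouping through a plain HashMap: values() comes back in no guaranteed order.
    static List<Result> viaHashMap(List<Result> rows) {
        Map<String, Result> byKey = new HashMap<>();
        for (Result r : rows) {
            byKey.putIfAbsent(r.empId() + "|" + r.eventDt(), r);
        }
        return new ArrayList<>(byKey.values());
    }

    // The second sort: restore emp_id, event_dt order for display.
    static List<Result> forPresentation(List<Result> rows) {
        List<Result> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingInt(Result::empId).thenComparing(Result::eventDt));
        return sorted;
    }

    public static void main(String[] args) {
        List<Result> rows = List.of(
            new Result(1, "2012-01-05", "B"),
            new Result(1, "2012-01-01", "A"),
            new Result(1, "2012-01-03", "A"));
        forPresentation(viaHashMap(rows)).forEach(System.out::println);
    }
}
```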

A Second Sort (For Presentation Purposes)
Most programmers, including DBAs who write stored procedures, think in terms of loops and variables when confronted with logic that involves relating records as they are being processed.  An alternative approach is to think in terms of data sets, paring a set down to the desired output.  This data set paradigm can be implemented in Talend Open Studio using components like tMap and tAggregateRow.  It can also be implemented in SQL or a hybrid Talend/SQL approach.
