
Parse Data With Ab Initio Batch Graph and Write to Database

This article shows how to parse data with an Ab Initio batch graph and write it to a database.
Ab Initio Batch Graph

Within the scope of this article, the batch Ab Initio graph is triggered by a scheduler, the files in the target folder are retrieved, and the data in them is parsed according to certain criteria.

The parsed data is then written to the database.

The article covers the architecture and coding of archiving the files, validating them, retrieving a file, filtering its contents according to the required criteria, and writing the filtered data to the database.

First of all, we need a scheduler that will trigger the Ab Initio graph. The choice here is entirely up to you; an Oracle Scheduler job can serve as an example. The job triggered by the scheduler runs the Ab Initio graph.
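As a minimal sketch, an Oracle Scheduler job of type EXECUTABLE could call a wrapper script that launches the graph. The job name, script path, and schedule below are placeholders, not part of the original setup:

BEGIN
    DBMS_SCHEDULER.CREATE_JOB(
        job_name        => 'RUN_SAMPLE_GRAPH',               -- placeholder name
        job_type        => 'EXECUTABLE',
        job_action      => '/SAMPLE/scripts/run_graph.ksh',  -- assumed wrapper that runs the graph
        start_date      => SYSTIMESTAMP,
        repeat_interval => 'FREQ=HOURLY;INTERVAL=1',         -- assumed schedule
        enabled         => TRUE);
END;
/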


Folder Creation

We must create our folder structure in a directory that Ab Initio can access.

For input files --> SAMPLE/INPUT_FILE

For archive:

Output files --> SAMPLE/OUTPUT_FILE

After the directories are created, we place our 'sample_xxxx.txt' file into the SAMPLE/INPUT_FILE folder.

For example, let's have a single column in our file and call it Msisdn. The column data can be '+90530XXXXXX'.

The sample is kept simple to make the structure easy to understand; you can design it however you want.
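As a sketch, an input file might look like this (the file name suffix carries the external ID that the transform later splits out of the name; all values here are made up):

SAMPLE/INPUT_FILE/sample_1234.txt:

+905301111111
+905302222222
+905303333333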

Ab Initio

New Ab Initio Graph is Created

In the Ab Initio tool:

* Create a new graph via ‘File>New’.

* Then ‘File>Save As’ (e.g., Graph1) to save it in the appropriate ‘sandbox’, so that the new graph picks up the proper environment.

The newly created graph is opened. By clicking the organizer button on the main page, the following components are added.

Run Program Component

The Run program component is added to the graph.

With this component, we can read our files from the destination folder that we set ourselves.

Run Program --> Description --> Get Input File List

Run Program --> Parameters --> value --> the SAMPLE/INPUT_FILE directory is given.

Run Program --> Parameters --> source --> embed is selected.

Run Program --> Ports --> Output --> out --> record format

record
string("\n") filename;
end
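The embedded command itself is not shown in the article; a minimal ksh sketch that emits one file name per line on the out port (the path is a placeholder) would be:

# list the input files, one name per line
ls SAMPLE/INPUT_FILE/sample_*.txt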


Replicate Component

The first stream is used to parse the data in the file.

The second stream is created with a Replicate component to archive the files while they are being processed. (This will be clearer in the full graph view later.) This stream is set to run after the files have been processed: it builds an sh file that archives the files, using the distinct file names in the flow.

Reformat and Output File components are used to create this "sh file". The file is then run in a later phase (for example, phase 3) by a Run Program component.

Replicate --> Ports --> Output --> out --> record format

record
string("\n") filename;
end

Read Multiple Files Component

The Read Multiple Files component is added to transform the file contents.

type input_type = record
    string("\n") msisdn;
end;

/* Extract the file name from the input record. */
filename :: get_filename(in) =
begin
    filename :: in.filename;
end;

/* This function is optional. */
/* Create output record */
out :: reformat(read, filename, in) =
begin
    let string(int) l_filename = string_substring(filename, string_rindex(filename, "/") + 1, 255);
    let string('') [] l_filename_parts = string_split(l_filename, '_');
    out.* :: read.*;
    out.msisdn :: string_suffix(decimal_strip(read.msisdn), 10);  /* keep the last 10 digits */
    out.filename_base :: l_filename;
    out.file_external_id :: l_filename_parts[1];
    out.record_no :: 1;
end;


Read Multiple Files --> Parameters --> compressed --> False

Read Multiple Files --> Data --> Associate component with EME dataset : $AI_SERIAL/dummy.dat

Read Multiple Files --> Layout --> Url : $AI_SERIAL

Read Multiple Files --> Ports --> Output --> filereject -->


record
string("\n") filename;
end

Read Multiple Files --> Ports --> Output --> fileerror --> string('\n')

Read Multiple Files --> Ports --> Output --> out --> record format -->


record
    string("¨") msisdn;
    string("¨") filename_base;
    decimal("¨") file_external_id;
    decimal("¨") record_no;
    string(1) newline = "\n";
end


Sort Component


The sort component is added to sort the data. The filename_base and msisdn fields are sorted ascending.

Sort --> Parameters --> Key --> value --> {filename_base; msisdn} --> Ascending sorting added.

Sort --> Parameters --> Max_core --> value --> 1 (the max-core value is entered)

Sort --> Ports --> Output --> out --> record format -->

record
    string("¨") msisdn;
    string("¨") filename_base;
    decimal("¨") file_external_id;
    decimal("¨") record_no;
    string(1) newline = "\n";
end


Sort --> Ports --> Output --> out --> Record Format Source --> Propagate from neighbors is marked.

Dedup Sorted Component

Dedup Sorted Component is added to eliminate duplicate records.

Dedup Sorted --> Parameters --> Key --> Value --> {filename_base; msisdn} --> Ascending sorted.

Dedup Sorted --> Parameters --> Keep --> first

Dedup Sorted --> Parameters --> Reject-threshold --> Abort on first reject

Dedup Sorted --> Parameters --> Check-sort --> False (not needed, since a Sort component is used immediately before)

Dedup Sorted --> Parameters --> Logging --> False

Dedup Sorted --> Ports --> Output --> out --> Record Format Source --> Propagate from neighbors is marked.

Dedup Sorted --> Ports --> Output --> out --> Record format -->

record
    string("¨") msisdn;
    string("¨") filename_base;
    decimal("¨") file_external_id;
    decimal("¨") record_no;
    string(1) newline = "\n";
end


Reformat Component

The Reformat component is added to assign each record a sequence number within its file; the counter resets whenever the file name changes.

Reformat --> Parameters --> Transform --> value

let string(int) g_filename = '';
let integer(8) g_count = 0;

/* This function is optional. */
/* Create output record */
out :: reformat(in) =
begin
    /* Reset the counter when the file name changes. */
    if (g_filename != in.filename_base)
    begin
        g_filename = in.filename_base;
        g_count = 1;
    end
    else
    begin
        g_count = g_count + 1;
    end;
    out.* :: in.*;
    out.record_no :: g_count;
end;
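With made-up sample rows, the counter behaves as follows, restarting at 1 whenever filename_base changes:

filename_base     msisdn      record_no
sample_1234.txt   5301111111  1
sample_1234.txt   5302222222  2
sample_5678.txt   5309999999  1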


Reformat --> Ports --> Output --> out --> Record format -->

record
    string("¨") msisdn;
    string("¨") filename_base;
    decimal("¨") file_external_id;
    decimal("¨") record_no;
    string(1) newline = "\n";
end;


Reformat --> Ports --> Output --> out --> Record Format Source --> Propagate from neighbors is marked.

Join With DB Component

The Join with DB component is added to insert the data into the database.

NOTE: The Join with DB component is used here because the inserts target more than one table and a database procedure has to be executed to perform some checks.

For example, for each file a row is inserted into the master table, and each record in the file is inserted into the detail table.

If the insert targets a single table, the Output Table component should be preferred, as it is more efficient.

Join with DB --> Parameters --> DBConfigFile --> Configuration File --> the configuration file definition is added.

For example: $AI_DB/oracle_XXX.dbc

Join with DB --> Parameters --> dbms --> value --> the DBMS type, for example: ORACLE

Join with DB --> Parameters --> select_sql --> execute schema.p_insert( :p_filename, :p_msisdn, :p_file_external_id, :p_location_type_id, :p_recordno );

Join with DB --> Parameters --> transform -->

type query_result_type =
record
end;
/* Generated type from select statement */

/* Compute fields for the procedure's bind variables */
type key_type =
record
    string(int) p_filename;
    string(int) p_msisdn;
    integer(8)  p_file_external_id;
    integer(8)  p_location_type_id;
    integer(8)  p_recordno;
end;

/* Optional calculation of data for query where clause */
out :: compute_key(in) =
begin
    out.p_filename         :: in.filename_base;
    out.p_msisdn           :: in.msisdn;
    out.p_file_external_id :: in.file_external_id;
    out.p_location_type_id :: 1;  /* constant here; matches T_MASTER's DEFAULT 1 */
    out.p_recordno         :: in.record_no;
end;

/* Database lookup transform */
out :: join_with_db(in, query_result) =
begin
    out.* :: in.*;
end;


Join with DB --> Parameters --> Continuous --> false

Join with DB --> Parameters --> match_Required --> true

Join with DB --> Parameters --> cache_query_results --> false

Database

The database objects below support the writing step: the master, detail, and error tables, their sequences, and the P_INSERT procedure.

CREATE TABLE T_MASTER
(
  MASTER_ID         NUMBER,
  FILE_EXTERNAL_ID  NUMBER,
  LOCATION_TYPE_ID  NUMBER        DEFAULT 1,
  FILE_NAME         VARCHAR2(255 BYTE),
  FILE_COUNT        NUMBER,
  STATUS            NUMBER        DEFAULT 0,
  INSERT_DATE       DATE,
  ERROR_DESC        VARCHAR2(4000 BYTE)
);

CREATE UNIQUE INDEX MASTER_ID_PK ON T_MASTER(MASTER_ID);

ALTER TABLE T_MASTER ADD (
  CONSTRAINT MASTER_ID_PK
  PRIMARY KEY
  (MASTER_ID)
  USING INDEX MASTER_ID_PK
  ENABLE VALIDATE);

CREATE SEQUENCE SEQ_MASTER
  START WITH 1
  MAXVALUE 999999999999
  MINVALUE 0
  NOCYCLE
  NOCACHE
  NOORDER;

CREATE SEQUENCE SEQ_FILE_DETAIL
  START WITH 1
  MAXVALUE 999999999999
  MINVALUE 0
  NOCYCLE
  NOCACHE
  NOORDER;

CREATE SEQUENCE SEQ_ERR_REC
  START WITH 1
  MAXVALUE 999999999999
  MINVALUE 0
  NOCYCLE
  NOCACHE
  NOORDER;

CREATE TABLE T_FILE_DETAIL
(
  FILE_DETAIL_ID  NUMBER,
  MSISDN          VARCHAR2(20 BYTE),
  MASTER_ID       NUMBER,
  INSERT_DATE     DATE,
  ERROR_DESC      VARCHAR2(4000 BYTE),
  STATUS          NUMBER        DEFAULT 0
);

CREATE TABLE T_ERROR_RECORDS
(
  ERROR_REC_ID  NUMBER,
  MSISDN        VARCHAR2(20 BYTE),
  MASTER_ID     NUMBER,
  INSERT_DATE   DATE
);

CREATE UNIQUE INDEX ERROR_REC_ID_PK ON T_ERROR_RECORDS (ERROR_REC_ID);

ALTER TABLE T_ERROR_RECORDS ADD (
  CONSTRAINT ERROR_REC_ID_PK
  PRIMARY KEY
  (ERROR_REC_ID)
  USING INDEX ERROR_REC_ID_PK
  ENABLE VALIDATE);

ALTER TABLE T_ERROR_RECORDS ADD (
  CONSTRAINT ERR_MASTER_ID_FK 
  FOREIGN KEY (MASTER_ID) 
  REFERENCES T_MASTER (MASTER_ID)
  DISABLE NOVALIDATE);
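Once a file has been loaded, a quick check such as the following (a hypothetical verification query, not part of the graph itself) shows each master row together with its detail count:

SELECT m.file_name, m.file_count, COUNT(d.file_detail_id) AS detail_rows
  FROM t_master m
  LEFT JOIN t_file_detail d ON d.master_id = m.master_id
 GROUP BY m.file_name, m.file_count;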

CREATE OR REPLACE PROCEDURE p_insert (
    p_filename         IN VARCHAR2,
    p_msisdn           IN VARCHAR2,
    p_file_external_id IN NUMBER,
    p_location_type_id IN NUMBER,
    p_recordno         IN NUMBER
) AS
    l_file_detail_id NUMBER;
    l_master_id      NUMBER;
    l_error_rec_id   NUMBER;
BEGIN
    IF p_recordno = 1 THEN
        -- First record of a file: create the master row.
        l_master_id := seq_master.NEXTVAL;
        INSERT INTO t_master
                    (master_id, file_external_id, location_type_id, file_name,
                     file_count, status, insert_date)
             VALUES (l_master_id, p_file_external_id, p_location_type_id, p_filename,
                     1, 0,  -- initial status
                     SYSDATE);
    ELSE
        -- Subsequent records: look up the master row and increment the count.
        SELECT master_id
          INTO l_master_id
          FROM t_master
         WHERE file_name = p_filename;

        UPDATE t_master
           SET file_count = file_count + 1
         WHERE master_id = l_master_id;
    END IF;

    l_file_detail_id := seq_file_detail.NEXTVAL;

    IF p_msisdn IS NULL OR LENGTH(TO_CHAR(p_msisdn)) != 10 THEN
        -- Invalid MSISDN: log the record and take it back out of the file count.
        l_error_rec_id := seq_err_rec.NEXTVAL;
        INSERT INTO t_error_records
                    (error_rec_id, msisdn, master_id, insert_date)
             VALUES (l_error_rec_id, p_msisdn, l_master_id, SYSDATE);

        UPDATE t_master
           SET file_count = file_count - 1
         WHERE master_id = l_master_id;
    ELSE
        INSERT INTO t_file_detail
                    (file_detail_id, msisdn, master_id, insert_date, status)
             VALUES (l_file_detail_id, p_msisdn, l_master_id, SYSDATE, 0);  -- initial status
    END IF;
END p_insert;
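A minimal smoke test of the procedure (file name, MSISDN, and IDs below are made-up sample values):

BEGIN
    -- First record of the file: creates the master row with file_count = 1.
    p_insert(p_filename         => 'sample_1234.txt',
             p_msisdn           => '5301111111',
             p_file_external_id => 1234,
             p_location_type_id => 1,
             p_recordno         => 1);
    -- Second record of the same file: increments file_count and adds a detail row.
    p_insert('sample_1234.txt', '5302222222', 1234, 1, 2);
    COMMIT;
END;
/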


Ab Initio Continued

For Archive Files

These two components are used to archive the files after the data has been processed into the database.

Reformat Component

Reformat --> Ports --> Output --> out --> Record Format Source --> embed is marked.

Reformat --> Parameters --> Transform --> value

out :: reformat(in) =
begin
    let string("") l_start = "";
    let integer(4) l_nis = next_in_sequence();
    l_start = "#!/bin/ksh\n";
    /* Prepend the shebang line to the first mv command only;
       each command moves a processed file into the archive folder created earlier. */
    out.cmd :1: if (l_nis == 1)
        string_concat(l_start, "mv ", in.filename, " ", "SAMPLE/OUTPUT_FILE");
    out.cmd :: string_concat("mv ", in.filename, " ", "SAMPLE/OUTPUT_FILE");
end;
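The generated script would then look roughly like this (file names and paths are sample values):

#!/bin/ksh
mv SAMPLE/INPUT_FILE/sample_1234.txt SAMPLE/OUTPUT_FILE
mv SAMPLE/INPUT_FILE/sample_5678.txt SAMPLE/OUTPUT_FILE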


Reformat --> Ports --> Output --> out --> Record format -->

record
    string("\n") cmd;
end;



Output File Component

OutputFile --> Data --> URL --> the path of the ksh file to be run is given here.

OutputFile --> Access -->

OutputFile --> Ports --> Input --> write --> Record Format -->

record
string("\n") cmd;
end;


OutputFile --> Ports --> Record Format Source --> Propagate from neighbors is marked.

Further Reading

Tips for Fast Batch Updates of Graph Structures With Neo4j and Cypher

Graph Processing With Apache Flink
