NiFi — Data Pipelines

Ankit Agarwal
4 min read · Nov 30, 2022


While I was working as a backend developer during my internship, I got to work on multiple microservices, one of which was the inventory service. This service had a particular use case where I had to take a CSV/Excel file from an AWS S3 bucket, perform validations, and push the JSON of individual rows to SQS (Simple Queue Service). This was my first time working with NiFi, so the approach might be a bit naive.

Before going forward, I want to mention that this blog is not aimed at people who don't know what NiFi is or its basics. I have focused more on the flow and the processors used rather than on how each one is configured. I will drop some links that I found useful and that might help you get started with NiFi.

Let's Begin!

1. The first part was to fetch files from S3. For this, I used a ListS3 processor. Since we had a specific folder structure to access files, I used RouteOnAttribute to filter out the unwanted object keys. From the object keys, I extracted a particular value from the path using UpdateAttribute.
Flow to get files from the S3 bucket

Then I used FetchS3Object to get the actual files. The architecture required deleting the file from its original location and adding it to an archive folder. This entire flow was bundled into a process group whose output was sent to the next process group, i.e. getting files in the correct format.
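If it helps to see the idea outside of NiFi, here is a rough boto3 sketch of what this first process group amounts to. It is only a conceptual stand-in (NiFi does all of this with processors, not Python), and the bucket name, prefixes, and folder layout are made up for illustration; in the actual flow the routing was just a NiFi Expression Language check on the object key attribute.

```python
# Rough boto3 sketch of the ListS3 -> RouteOnAttribute -> UpdateAttribute ->
# FetchS3Object -> archive chain. Bucket, prefixes, and layout are assumptions.
import boto3

s3 = boto3.client("s3")
BUCKET = "inventory-bucket"             # assumed bucket name
WANTED_PREFIX = "inventory/incoming/"   # assumed folder we care about
ARCHIVE_PREFIX = "inventory/archive/"   # assumed archive folder

def list_and_fetch():
    resp = s3.list_objects_v2(Bucket=BUCKET, Prefix=WANTED_PREFIX)  # pagination omitted
    for obj in resp.get("Contents", []):
        key = obj["Key"]
        # RouteOnAttribute equivalent: skip folder placeholders and unexpected keys
        if key.endswith("/") or not key.startswith(WANTED_PREFIX):
            continue
        # UpdateAttribute equivalent: extract a value from the path (here, the parent folder)
        path_value = key.split("/")[-2]
        # FetchS3Object equivalent: download the actual file content
        body = s3.get_object(Bucket=BUCKET, Key=key)["Body"].read()
        # Archive requirement: copy to the archive folder, then delete the original
        s3.copy_object(Bucket=BUCKET, Key=key.replace(WANTED_PREFIX, ARCHIVE_PREFIX, 1),
                       CopySource={"Bucket": BUCKET, "Key": key})
        s3.delete_object(Bucket=BUCKET, Key=key)
        yield path_value, key, body
```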

2. In the second step, I used IdentifyMimeType to get the file type. If it was a type other than CSV or Excel, we rejected it. If it was CSV, it was moved for further processing; if it was Excel, it was first converted to CSV and then moved. RouteOnAttribute was used to diverge on the basis of file type.

Flow to get CSV files
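As a conceptual sketch of the same step, the type handling boils down to something like this. The extension check is a simplification (IdentifyMimeType actually inspects the content), and pandas with openpyxl is just a stand-in for the Excel-to-CSV conversion used in the flow.

```python
# Conceptual sketch of step 2: identify the file type and normalise everything to CSV.
import io

import pandas as pd  # needs openpyxl installed for .xlsx files

def to_csv_bytes(key, body):
    """Return CSV bytes for CSV/Excel inputs, or None for rejected file types."""
    lower = key.lower()
    if lower.endswith(".csv"):
        return body                                    # already CSV, pass through
    if lower.endswith((".xlsx", ".xls")):
        df = pd.read_excel(io.BytesIO(body))           # Excel -> DataFrame
        return df.to_csv(index=False).encode("utf-8")  # DataFrame -> CSV bytes
    return None                                        # anything else is rejected
```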

3. Now we had the most important task: validating the rows, handling the failures, and pushing the JSON out of the group. First I used an Add Schema Name Attribute step, and then the ValidateCsv processor. I first checked that there are no null values; rows containing them get rejected. Then we split the file into individual rows and use ValidateCsv again for case-specific checks like enums, uniqueness, etc. If these checks fail, we add a failure message as an attribute and use it to save the row in the failure folder of S3 after converting it to JSON. To convert to JSON, I used ConvertRecord processors, which use CSVReader and JsonRecordSetWriter controller services.
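To make the validation concrete, here is a plain-Python sketch of what these checks amount to. The column names ("sku", "status") and the allowed values are invented for illustration; in the flow itself, the rules live inside the ValidateCsv schema.

```python
# Plain-Python sketch of the validation: split the CSV into rows, reject rows with
# null/blank values, then apply case-specific checks such as enums and uniqueness.
import csv
import io

ALLOWED_STATUS = {"IN_STOCK", "OUT_OF_STOCK"}        # assumed enum column values

def validate_rows(csv_bytes):
    """Yield (row, failure_message) pairs; failure_message is None for valid rows."""
    reader = csv.DictReader(io.StringIO(csv_bytes.decode("utf-8")))
    seen_skus = set()                                 # assumed unique column: "sku"
    for row in reader:
        if any(v is None or str(v).strip() == "" for v in row.values()):
            yield row, "rejected: null value in row"
        elif row.get("status") not in ALLOWED_STATUS:
            yield row, "rejected: invalid status value"
        elif row.get("sku") in seen_skus:
            yield row, "rejected: duplicate sku"
        else:
            seen_skus.add(row["sku"])
            yield row, None
```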

If all the validations pass, we convert the record to JSON and add an extra field that should be unique (I used the flow file UUID for it). On success, the final JSON gets pushed outside the group using an output port.

Extract rows: failure
Extract rows: success
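The success/failure handling can be sketched the same way. The failure folder, bucket name, and field names below are assumptions, and the fresh UUID stands in for the flow file UUID mentioned above.

```python
# Sketch of the success/failure handling: failed rows go to an S3 "failure" folder as
# JSON with the failure message attached; successful rows get an extra unique field.
import json
import uuid

import boto3

s3 = boto3.client("s3")
BUCKET = "inventory-bucket"                           # assumed bucket name

def handle_row(row, failure, source_key):
    if failure is not None:
        payload = dict(row, failure_reason=failure)   # failure message travels with the row
        failure_key = f"inventory/failures/{source_key.rsplit('/', 1)[-1]}.{uuid.uuid4()}.json"
        s3.put_object(Bucket=BUCKET, Key=failure_key,
                      Body=json.dumps(payload).encode("utf-8"))
        return None
    # Success: add a unique field (the flow used the flow file UUID) and pass it on
    return json.dumps(dict(row, record_id=str(uuid.uuid4())))
```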

4. In the final step, I push the JSON to AWS SQS using the PutSQS processor.
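In boto3 terms, this last hop is a single send_message call per record, which is what PutSQS does for each flow file; the queue URL below is just a placeholder.

```python
# Final step, sketched with boto3: push each JSON record to the SQS queue.
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/inventory-rows"  # placeholder

def push_to_sqs(json_record):
    sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=json_record)
```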

Throughout the pipeline, I have used LogAttribute processors in several places: to terminate relationships that were mandatory to handle but that I did not actually need, and to look into flow files for debugging purposes and a better understanding of the data.

People who are completely new can check out this link. It was the first playlist I watched to get started, and it's just enough to get going. After that, you can jump straight to your problem statement and Google things as and when required.

Some important pointers:

  1. Keep the NiFi Expression Language handy. You will need it a lot.
  2. Test small chunks as you build.
  3. Group logical entities into process groups to maintain your sanity.
  4. Read through flow files and check the attributes going in and out.
  5. Use funnels if required.
  6. Use and create templates.

That's it, guys! You can reach out to me on LinkedIn or by email. I will be happy to help.

Thank you!
