Visualize Airflow Workflows Without Airflow
During development, it's hard to visualize the connections mentioned in Python code. We take a look at how to validate the DAG without deploying it on Airflow.
Apache Airflow has gained a lot of traction in the data processing world. It is a Python-based orchestration tool. When I say "Python-based", I do not just mean that the application was developed using Python. The directed acyclic graphs (DAGs), Airflow's term for workflows, are also written in Python. In other words, workflows are code. Many popular workflow tools, like Informatica and Talend, have visual editors that allow developers to lay out the workflow graphically. Because Airflow workflows are Python code, we can visualize a workflow only after uploading it to Airflow. This is usually acceptable, but it becomes a problem when Airflow refuses to load the workflow because of errors. Additionally, during development, it is difficult to visualize all the connections mentioned in the Python code.
While looking for a way to visualize the workflow, I came across a Sankey diagram. Not just that, I also came across a gist where Python code has been conveniently packaged into a function. All I had to do was download the gist and include it in my program.
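For context, a Sankey diagram is typically drawn from a list of node labels plus links that refer to those nodes by position. The sketch below is not the gist's code. It is a minimal illustration, with a hypothetical helper name (edges_to_sankey) and made-up task names, of how source/target rows can be turned into that index-based form.

```python
def edges_to_sankey(edges):
    """Convert [source, target, count] rows into index-based Sankey inputs."""
    labels = []   # unique node names, in order of first appearance
    index = {}    # node name -> position in labels
    for source, target, _ in edges:
        for name in (source, target):
            if name not in index:
                index[name] = len(labels)
                labels.append(name)
    return {
        "label": labels,
        "source": [index[s] for s, _, _ in edges],
        "target": [index[t] for _, t, _ in edges],
        "value": [c for _, _, c in edges],
    }


# Hypothetical task names, used only for illustration.
edges = [["extract", "transform", 1], ["transform", "load", 1]]
sankey = edges_to_sankey(edges)
print(sankey)
```

If Plotly is the drawing library, these lists would map onto the node labels and the link source, target, and value fields of its Sankey trace.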
Once I had the drawing code handy, I was left with the task of parsing the Airflow DAG and populating a data structure as needed to draw the chart. Below is the function I wrote. The function looks for calls to the set_upstream function in the code. Because set_upstream connects a child task to its parent task, I had to store each pair in a list with the parent first (the source of the link) and the child second (the target).
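To make the parsing step concrete, here is a small sketch that works on a string instead of a file. The helper name parse_edges and the task names are hypothetical, and the sketch assumes each dependency is written on its own line as child.set_upstream(parent).

```python
def parse_edges(source_text):
    """Return [parent, child, 1] rows for every child.set_upstream(parent) line."""
    edges = []
    for line in source_text.splitlines():
        line = line.strip()
        # Skip comments and lines that do not contain the call.
        if line.startswith("#") or ".set_upstream(" not in line:
            continue
        child, _, rest = line.partition(".set_upstream(")
        parent = rest.split(")")[0].strip()
        edges.append([parent, child.strip(), 1])
    return edges


# Hypothetical DAG fragment, used only for illustration.
dag_source = """
transform.set_upstream(extract)
load.set_upstream(transform)
"""
print(parse_edges(dag_source))
```

Dependencies declared in other ways, such as with the bit-shift operators or with a list of parents, are not handled by this sketch.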
I hope you also enjoy validating the DAG without having to deploy it on Airflow.
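import os

import pandas as pd
import plotly.graph_objects as go

# genSankey comes from the downloaded gist; it must be defined or imported before this runs.

def process_file(input_directory, filename, output_directory):
    input_path = os.path.join(input_directory, filename)
    output_file = os.path.join(output_directory, filename).replace(".py", ".html")
    data = []
    # Collect one [parent, child, 1] row per set_upstream call.
    with open(input_path, "r") as input_file:
        for line in input_file:
            line = line.strip()
            if "set_upstream" in line:
                # child.set_upstream(parent): the child name precedes the dot.
                names = line.split(".")
                child = names[0]
                parent = names[1].replace("set_upstream(", "").replace(")", "")
                data.append([parent, child, 1])
    dataframe = pd.DataFrame(data, columns=["source", "target", "count"])
    # Use the file name as the chart title, not the open file object.
    fig = genSankey(dataframe, cat_cols=["source", "target"], value_cols="count", title=filename)
    # Assumption: genSankey returns a figure dict or a Figure; go.Figure accepts either.
    go.Figure(fig).write_html(output_file)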
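One way to use the function is to run it over every Python file in a folder of DAGs. The sketch below is an assumption about how that might look rather than part of the original program. The helper name process_directory is hypothetical, and the per-file function is passed in as an argument so the loop can be tried without Airflow, pandas, or Plotly installed.

```python
import os


def process_directory(input_directory, output_directory, process):
    """Call process(input_directory, filename, output_directory) for each .py file."""
    os.makedirs(output_directory, exist_ok=True)
    processed = []
    for filename in sorted(os.listdir(input_directory)):
        if filename.endswith(".py"):
            process(input_directory, filename, output_directory)
            processed.append(filename)
    return processed


# Example call (hypothetical folder names):
# process_directory("dags", "charts", process_file)
```

Each DAG file then gets an HTML chart with the same base name in the output folder.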
Published at DZone with permission of Bipin Patwardhan. See the original article here.