Automating CSV to PostgreSQL Ingestion with Airflow and Docker

Introduction

Managing a data pipeline, such as transferring data from CSV to PostgreSQL, is like orchestrating a well-timed process where every step relies on the previous one. Apache Airflow streamlines this process by automating the workflow, making it easy to manage complex data tasks.

In this article, we will build a robust data pipeline using Apache Airflow, Docker, and PostgreSQL to automate reading data from CSV files and inserting it into a database. We will cover key Airflow concepts such as Directed Acyclic Graphs (DAGs), tasks, and operators, which will help you manage workflows efficiently.

The goal of this project is to demonstrate how to create a reliable data pipeline with Apache Airflow that reads data from CSV files and writes it into a PostgreSQL database. We will explore the integration of various Airflow components to ensure effective data handling and maintain data integrity.

Learning Outcomes

  • Understand the core concepts of Apache Airflow, including DAGs, tasks, and operators.
  • Learn how to set up and configure Apache Airflow with Docker for workflow automation.
  • Gain practical knowledge of integrating PostgreSQL for data management within Airflow pipelines.
  • Master the process of reading CSV files and automating data insertion into a PostgreSQL database.
  • Build and deploy scalable, efficient data pipelines using Airflow and Docker.

Prerequisites

  • Docker Desktop, VS Code, Docker Compose
  • Basic understanding of Docker containers
  • Basic Docker commands
  • Basic Linux commands
  • Basic Python knowledge
  • Building an image from a Dockerfile, Docker Compose


What is Apache Airflow?

Apache Airflow (or simply Airflow) is a platform to programmatically author, schedule, and monitor workflows. When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.


Understanding Airflow Terminologies

Let us understand the Airflow terminologies below:

Workflow

  • Think of a workflow as a step-by-step process to achieve a goal. It can be a sequence of actions that must be executed in a specific order to accomplish something.
  • Example: If you want to bake a cake, the workflow could include steps like: gather ingredients → mix ingredients → bake cake → decorate cake.

DAG (Directed Acyclic Graph)

  • A DAG is a blueprint or map of your workflow. It defines what needs to be done and in what order, but it does not actually perform the tasks. It shows the dependencies between different steps.
  • "Directed" means that the steps follow a specific order, while "Acyclic" means that the process cannot loop back to a previous step.
  • Example: In the cake example, the DAG would be a chart that says you must gather the ingredients before mixing them and mix the ingredients before baking the cake.

In this DAG, A will run first and then split into two branches: one goes to B and then to D, and the other goes to C and then to E. Both branches can run independently after A finishes.
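As a minimal sketch of that shape (using EmptyOperator, the current name for DummyOperator in recent Airflow versions, and hypothetical task names matching the diagram), such a branching DAG could be declared like this:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG("branching_example",
         start_date=datetime(2024, 1, 1),
         schedule_interval="@once",
         catchup=False) as dag:
    # One placeholder task per node in the diagram
    a = EmptyOperator(task_id="A")
    b = EmptyOperator(task_id="B")
    c = EmptyOperator(task_id="C")
    d = EmptyOperator(task_id="D")
    e = EmptyOperator(task_id="E")

    # A runs first; then the branches B -> D and C -> E run independently
    a >> b >> d
    a >> c >> e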

Task

  • A task is a single action or step within the workflow. Each task represents a specific job that needs to be done.
  • Example: In the cake workflow, tasks would be: gather ingredients (one task), mix ingredients (another task), bake cake (yet another task), and so on.

Operators in Airflow

  • Operators are the building blocks of tasks in Airflow. They tell Airflow what action to perform for a task.
  • Each operator defines a specific action, like running a Python script, moving data, or triggering another process.

Prominent Operators

  • PythonOperator: Runs a Python function.
    • Example: Executes a Python function to clean data.
  • DummyOperator: Does nothing; used for testing or as a placeholder.
    • Example: Marks the completion of a part of a DAG without doing anything.
  • PostgresOperator: An Airflow operator designed to run SQL commands in a PostgreSQL database.

XComs (Cross-Communications)

  • XComs are a way for tasks to communicate with each other in Airflow.
  • They allow one task to send data to another task.
  • Example: Task A processes some data, stores the result using XCom, and Task B can retrieve that result and continue processing.

In simple terms: Operators define what your task will do, and XComs let tasks pass information to each other.
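A minimal sketch of the XCom idea (with hypothetical task names): the return value of a PythonOperator callable is automatically pushed to XCom, and a downstream task can pull it through the task instance (ti).

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def process_data():
    # The returned value is automatically pushed to XCom (key "return_value")
    return {"rows_processed": 42}

def report(ti):
    # Pull the result that task_a pushed and continue processing
    result = ti.xcom_pull(task_ids="task_a")
    print(f"Task A processed {result['rows_processed']} rows")

with DAG("xcom_example",
         start_date=datetime(2024, 1, 1),
         schedule_interval="@once",
         catchup=False) as dag:
    task_a = PythonOperator(task_id="task_a", python_callable=process_data)
    task_b = PythonOperator(task_id="task_b", python_callable=report)
    task_a >> task_b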

Connections

In Airflow, you use connections to manage and store the credentials and details required for connecting to external systems and services. They allow Airflow to interact with various data sources, APIs, and services securely and consistently. For example, when you create a Spark or AWS S3 connection in Airflow, you enable Airflow to interact with Spark clusters or AWS S3 buckets, respectively, through tasks defined in your DAGs.
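For illustration, a task can also look up a connection programmatically. The sketch below assumes a connection with the hypothetical ID my_postgres has already been created in the Airflow UI:

from airflow.hooks.base import BaseHook

def describe_connection():
    # Fetch the connection object Airflow stores for this ID
    conn = BaseHook.get_connection("my_postgres")
    # Credentials and host details are available as attributes
    print(conn.host, conn.port, conn.login, conn.schema)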

Now that we are clear with the basic terminologies of Airflow, let's start building our project!

Installing Apache Airflow on Docker Using a Dockerfile

Using Docker with Apache Airflow ensures an easy and reproducible environment setup.

Writing a Dockerfile

A Dockerfile is a script that contains a series of instructions to build a Docker image. Copy these instructions into a file with the name Dockerfile.

Important: Do not save the file as Dockerfile.txt or with any other extension. Simply save it as Dockerfile.

FROM apache/airflow:2.9.1-python3.9
USER root
# Install Python dependencies
COPY requirements.txt /requirements.txt
RUN pip3 install --upgrade pip
RUN pip3 install --no-cache-dir -r /requirements.txt

# Install Airflow providers
RUN pip3 install apache-airflow-providers-apache-spark apache-airflow-providers-amazon

# Install system dependencies
RUN apt-get update && \
    apt-get install -y gcc python3-dev openjdk-17-jdk && \
    apt-get clean

We start with a base image from the official Apache Airflow repository. This ensures that we have a stable and reliable foundation for our application:

  • FROM apache/airflow:2.9.1-python3.9
    • The image apache/airflow:2.9.1-python3.9 includes Airflow version 2.9.1 and Python 3.9, which provides the essential tools and libraries to run Apache Airflow.
  • USER root
    • By switching to the root user, we gain the necessary permissions to install packages and modify the file system inside the container.
  • Installing Python dependencies
    • We copy a requirements file containing the necessary Python packages into the image and install them.
  • Installing Airflow providers
    • We install the specific Airflow providers required for our workflows. Here, for instructional purposes, we have installed the provider for Apache Spark and the provider for AWS services. You can install various other providers; prominent ones include Spark, AWS, Google, and Postgres.
  • Installing system dependencies
    • Finally, we install system-level dependencies that may be required by certain libraries or functionalities.

RUN apt-get update && \
    apt-get install -y gcc python3-dev openjdk-17-jdk && \
    apt-get clean

  • apt-get update: Updates the package lists so the latest versions of packages are available.
  • apt-get install -y gcc python3-dev openjdk-17-jdk: Installs the GCC compiler, Python development headers, and OpenJDK 17, which may be required for building certain packages.
  • apt-get clean: Cleans up the package cache to reduce the image size.

Now that we are done with setting up the Dockerfile, let's move ahead!

Configuring Docker Compose for Apache Airflow

In addition to creating a custom Docker image with a Dockerfile, you can easily manage and orchestrate your Docker containers using Docker Compose. The docker-compose.yml file defines the services, networks, and volumes that make up your application. We connect the Dockerfile to the Compose file so that we can build a custom image for our application and easily manage all the services it needs to run together. Let's see how to specify our custom Dockerfile in the Docker Compose setup:

x-airflow-common

This section defines common settings for all Airflow services.

  • Sets up the environment variables needed for the Airflow application to run.
  • Specifies the connection to a PostgreSQL database to store Airflow metadata.
  • Defines paths for storing DAGs (Directed Acyclic Graphs), logs, and configurations.
x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  environment:
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/requirements.txt:/opt/airflow/requirements.txt
    - ${AIRFLOW_PROJ_DIR:-.}/sample_files:/opt/airflow/sample_files
    - ./spark_jobs:/opt/bitnami/spark_jobs
  user: ${AIRFLOW_UID:-50000}:0
  depends_on:
    postgres:
      condition: service_healthy
  networks:
    - confluent

After setting up x-airflow-common, we need to define the services that are required.

airflow-webserver

This service runs the web interface for Airflow, where users can manage and monitor workflows.

  • Exposes port 8080 to access the web UI.
  • Uses health checks to ensure that the web server is running properly.
  • Depends on the database service being healthy before starting.
services:

  airflow-webserver:
    <<: *airflow-common

    ports:
      - "8080:8080"
    depends_on:
      postgres:
        condition: service_healthy
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3

airflow-scheduler

The scheduler is responsible for triggering tasks based on the defined workflows.

  airflow-scheduler:
    <<: *airflow-common
    networks:
      - confluent
    depends_on:
      postgres:
        condition: service_healthy
      airflow-webserver:
        condition: service_healthy
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3

airflow-triggerer

This service triggers tasks that require external events or conditions to start. It runs in a similar manner to the scheduler and connects to the same PostgreSQL database.

  airflow-triggerer:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    networks:
      - confluent
    command: bash -c "airflow triggerer"
    healthcheck:
      test:
        - CMD-SHELL
        - airflow jobs check --job-type TriggererJob --hostname "${HOSTNAME}"
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always

airflow-cli

This service allows command-line interface (CLI) operations on the Airflow environment. It can run various Airflow commands for debugging or management.

  airflow-cli:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
    networks:
      - confluent
    profiles:
      - debug
    command:
      - bash
      - -c
      - airflow

airflow-init

This service initializes the database and creates the default admin user.

  airflow-init:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
    command: >
      bash -c "
      airflow db init &&
      airflow users create
      --username admin
      --firstname admin
      --lastname admin
      --role Admin
      --email [email protected]
      --password admin
      "
    networks:
      - confluent

postgres

This service hosts the PostgreSQL database used by Airflow to store its metadata. We have set both the username and the password for connecting to Postgres to airflow.

  postgres:
    image: postgres:16.0
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    logging:
      options:
        max-size: 10m
        max-file: "3"
    healthcheck:
      test:
        - CMD
        - pg_isready
        - -U
        - airflow
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    networks:
      - confluent

networks 

Defines a network for all services to communicate with each other.

All services are connected to the same confluent network, allowing them to interact seamlessly.

networks:
  confluent:

Full docker-compose.yml

x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  environment:
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/dags/sql:/opt/airflow/dags/sql
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/requirements.txt:/opt/airflow/requirements.txt
    - ${AIRFLOW_PROJ_DIR:-.}/sample_files:/opt/airflow/sample_files
    - ./spark_jobs:/opt/bitnami/spark_jobs
  user: ${AIRFLOW_UID:-50000}:0
  depends_on:
    postgres:
      condition: service_healthy
  networks:
    - confluent

services:

  airflow-webserver:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    networks:
      - confluent
    command: bash -c "airflow webserver"
    ports:
      - 8080:8080
    healthcheck:
      test:
        - CMD
        - curl
        - --fail
        - http://localhost:8080/health
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    networks:
      - confluent
    command: bash -c "airflow scheduler"
    healthcheck:
      test:
        - CMD
        - curl
        - --fail
        - http://localhost:8974/health
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always

  airflow-triggerer:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    networks:
      - confluent
    command: bash -c "airflow triggerer"
    healthcheck:
      test:
        - CMD-SHELL
        - airflow jobs check --job-type TriggererJob --hostname "${HOSTNAME}"
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always

  airflow-init:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
    command: >
      bash -c "
      airflow db init &&
      airflow users create
      --username admin
      --firstname admin
      --lastname admin
      --role Admin
      --email [email protected]
      --password admin
      "
    networks:
      - confluent

  airflow-cli:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
    networks:
      - confluent
    profiles:
      - debug
    command:
      - bash
      - -c
      - airflow

  postgres:
    image: postgres:16.0
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    logging:
      options:
        max-size: 10m
        max-file: "3"
    healthcheck:
      test:
        - CMD
        - pg_isready
        - -U
        - airflow
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    networks:
      - confluent
networks:
  confluent:

Guide to Project Setup and Execution

We will now look into the steps for setting up our project and executing it.

Step 1: Creating a Folder

The first step is to create a folder and then paste the above Dockerfile and docker-compose.yml files inside this folder.

Step 2: Creating requirements.txt

Create a requirements.txt file and list the necessary Python libraries in it, such as pandas, numpy, and so on.
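For example, a minimal requirements.txt for this project might contain only the following (extend it with whatever else your DAGs need):

pandas
numpy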


Step 3: Docker Desktop

Start your Docker Desktop. Then, open your terminal and run "docker-compose up -d".


You should see something like the above images. After the command executes successfully, you should be able to see these files:

Step 4: Verify Airflow Installation

In your browser, enter this URL: http://localhost:8080. If your installation was successful, you should see the Airflow login page.


Enter admin as both the username and password. After logging in, you should see the Airflow home page.

Step 5: Connecting Postgres to Airflow

We use postgres_conn_id to specify the connection to the PostgreSQL database within Airflow. You define this connection ID in the Airflow UI, where you configure database credentials such as the host, port, username, and password.

By using postgres_conn_id, Airflow knows which database to connect to when executing SQL commands. It abstracts away the need to hard-code connection details directly in the DAG code, improving security and flexibility.
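As an illustration of this abstraction, the same connection ID can also be reused from plain Python code through a hook. The sketch below is only an example: it assumes the write_to_psql connection created in the following steps and the sample_table created later by the DAG.

from airflow.providers.postgres.hooks.postgres import PostgresHook

def check_row_count():
    # The hook resolves host, credentials, and database from the Airflow connection
    hook = PostgresHook(postgres_conn_id="write_to_psql")
    rows = hook.get_records("SELECT COUNT(*) FROM sample_table;")
    print(f"sample_table currently has {rows[0][0]} rows")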

Step 5.1: In the Airflow UI, navigate to Admin > Connections.


Step 5.2: Click on 'Add a new record'.


Step 5.3: Add the following parameters carefully.


Here, we provide the basic connection parameters that allow Airflow to connect to the Postgres server configured on Docker: set the Connection Type to Postgres, the Host to postgres (the service name from docker-compose.yml), and the Login and Password to airflow; the Schema field (the database name) should point to the sample database we will create in a later step, and the Port is the default 5432.

NOTE: Write the Connection Id as 'write_to_psql' exactly, as it will be used later. The login and password to connect to PostgreSQL are both set to airflow.

Step 5.4: Preparing a dummy input.csv file

Prepare a dummy input.csv file for the project and store it inside the sample_files folder that was created.
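The DAG below expects the columns id, name, and age. As a convenience, a small script like this (with made-up example rows) can generate the file:

import pandas as pd

# Example rows only; adjust the values as needed
df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "age": [30, 25, 35],
})

# Write the dummy file into the sample_files folder, without the index column
df.to_csv("sample_files/input.csv", index=False)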


Understanding the DAG Setup in Airflow

First, we import the necessary components: DAG (to create the workflow), PythonOperator (to run Python functions), and PostgresOperator (to interact with a PostgreSQL database). We also define default arguments such as the owner of the workflow (airflow) and the start date of the tasks, ensuring the workflow starts on January 1, 2024. Finally, we import Pandas to handle data, enabling us to read CSV files efficiently.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
import pandas as pd

# Define default arguments
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}

Understanding the generate_insert_queries() Function

This function is responsible for reading a CSV file using Pandas and then creating SQL insert queries to load the data into a PostgreSQL table. It loops through each row of the CSV, generating an SQL statement that inserts the id, name, and age values into the table. Finally, the queries are saved to a file named insert_queries.sql inside the dags/sql folder, allowing Airflow to execute them later using a PostgresOperator.

# Function to read the CSV and generate insert queries
def generate_insert_queries():
    CSV_FILE_PATH = 'sample_files/input.csv'
    # Read the CSV file
    df = pd.read_csv(CSV_FILE_PATH)

    # Create a list of SQL insert queries
    insert_queries = []
    for index, row in df.iterrows():
        insert_query = f"INSERT INTO sample_table (id, name, age) VALUES ({row['id']}, '{row['name']}', {row['age']});"
        insert_queries.append(insert_query)

    # Save queries to a file for the PostgresOperator to execute
    with open('./dags/sql/insert_queries.sql', 'w') as f:
        for query in insert_queries:
            f.write(f"{query}\n")

DAG Definition

This block defines the DAG (Directed Acyclic Graph), which represents the entire workflow. The parameters include:

  • schedule_interval='@once': Specifies that the DAG should run only once.
  • catchup=False: Prevents backfilling of DAG runs for missed schedules.
  • default_args=default_args: Reuses default arguments like the start date for the DAG.

with DAG('csv_to_postgres_dag',
         default_args=default_args,
         schedule_interval='@once',
         catchup=False) as dag:

Task ID

Each Airflow task receives a unique task_id, which serves as its identifying name within the DAG.

For instance:

task_id='create_table'

PostgresOperator

The PostgresOperator allows you to run SQL commands in a PostgreSQL database using Airflow.

  • task_id='create_table': Sets the unique identifier for the task within the DAG.
  • postgres_conn_id='write_to_psql': Refers to the Airflow connection ID used to connect to the PostgreSQL database.
  • sql: Contains the SQL query that drops the table if it exists and then creates a new sample_table with id, name, and age columns.
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='write_to_psql',  # Replace with your connection ID
        sql="""
        DROP TABLE IF EXISTS sample_table;
        CREATE TABLE sample_table (
            id SERIAL PRIMARY KEY,
            name VARCHAR(50),
            age INT
        );
        """
    )

PythonOperator

The PythonOperator allows you to run Python functions as tasks. Here, it calls the generate_insert_queries function, which generates SQL queries from the CSV file.

generate_queries = PythonOperator(
    task_id='generate_insert_queries',
    python_callable=generate_insert_queries
)

PostgresOperator

  • task_id='run_insert_queries': A unique identifier for the task that runs the SQL insert queries.
  • postgres_conn_id='write_to_psql': The connection ID used by Airflow to connect to the PostgreSQL database, which is pre-configured in the Airflow UI.
  • sql='sql/insert_queries.sql': The path to the file containing the SQL queries that will be executed in PostgreSQL.
    run_insert_queries = PostgresOperator(
        task_id='run_insert_queries',
        postgres_conn_id='write_to_psql',  # Define this connection in the Airflow UI
        sql='sql/insert_queries.sql'
    )
create_table >> generate_queries >> run_insert_queries

The line create_table >> generate_queries >> run_insert_queries establishes the sequence of task execution in Apache Airflow. It means that:

  • The create_table task must complete successfully before the next task can begin.
  • Once create_table is done, the generate_queries task will run.
  • After generate_queries has finished executing, the run_insert_queries task will then execute.

In short, it defines a linear workflow where each task depends on the successful completion of the previous one.
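Equivalently, the same linear ordering can be expressed with Airflow's chain utility (or with set_downstream() calls); a small sketch inside the same DAG file might look like this:

from airflow.models.baseoperator import chain

# Same ordering as create_table >> generate_queries >> run_insert_queries
chain(create_table, generate_queries, run_insert_queries)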

Creating the Python File

In VS Code, create a Python file named sample.py inside the automatically created dags folder.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
import pandas as pd

# Define default arguments
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}

# Function to read the CSV and generate insert queries
def generate_insert_queries():
    CSV_FILE_PATH = 'sample_files/input.csv'
    # Read the CSV file
    df = pd.read_csv(CSV_FILE_PATH)

    # Create a list of SQL insert queries
    insert_queries = []
    for index, row in df.iterrows():
        insert_query = f"INSERT INTO sample_table (id, name, age) VALUES ({row['id']}, '{row['name']}', {row['age']});"
        insert_queries.append(insert_query)

    # Save queries to a file for the PostgresOperator to execute
    with open('./dags/sql/insert_queries.sql', 'w') as f:
        for query in insert_queries:
            f.write(f"{query}\n")


# Define the DAG
with DAG('csv_to_postgres_dag',
         default_args=default_args,
         schedule_interval='@once',
         catchup=False) as dag:

    # Task to create a PostgreSQL table
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='write_to_psql',  # Replace with your connection ID
        sql="""
        DROP TABLE IF EXISTS sample_table;
        CREATE TABLE sample_table (
            id SERIAL PRIMARY KEY,
            name VARCHAR(50),
            age INT
        );
        """
    )

    # Task to generate the SQL insert queries from the CSV file
    generate_queries = PythonOperator(
        task_id='generate_insert_queries',
        python_callable=generate_insert_queries
    )

    # Task to run the generated SQL queries using PostgresOperator
    run_insert_queries = PostgresOperator(
        task_id='run_insert_queries',
        postgres_conn_id='write_to_psql',  # Define this connection in the Airflow UI
        sql='sql/insert_queries.sql'
    )

    create_table >> generate_queries >> run_insert_queries
    # Other tasks can follow here

NOTE: Please place sample.py inside the dags folder only, because by default Airflow looks for DAG files inside the dags folder.

Configuring Postgres

Before running our code, we need to create a sample database inside our PostgreSQL container to write our CSV data to.

In Docker Desktop, navigate to the postgres container and go to the EXEC section. There, connect to Postgres (for example with psql -U airflow) and create a database called sample by running CREATE DATABASE sample;.


Run Your Code

Now that you've built the foundation of your Airflow project, it's time to see your hard work come to fruition! Running your code is where the magic happens; you'll witness your CSV data being transformed and seamlessly inserted into your PostgreSQL database.

  • In your terminal, run docker compose up -d again. This will load our sample.py code into Airflow.
  • Go to the Airflow home page and click on the DAG.

Upon clicking Graph, you can visualize your pipeline. The Code section will show the latest code written in the file.


Upon clicking the play button in the upper-right corner (next to the "Next Run ID: None" marker), you can run the DAG. After running the DAG, click on any task in the Graph section to see its details. Explore to find out more.

If there were no errors, you should see a green color bar on the left side.


However, if there are any errors, click on the task that failed and then click on Logs to understand the error:


Conclusion

This project successfully demonstrated the integration of Airflow with PostgreSQL to automate the process of reading data from a CSV file and inserting it into a database. Throughout the project, various operators were explored and implemented for efficient data handling. This project showcases the power of Airflow in automating data workflows and lays the groundwork for further exploration in data engineering.

Github Repo: Project File

Key Takeaways

  • Using Airflow to automate data workflows significantly enhances efficiency and reduces manual intervention in data processing tasks.
  • The PostgresOperator simplifies executing SQL commands, making database operations seamless within Airflow DAGs.
  • Docker helps package the Airflow setup into a container, making it easy to run the same application anywhere without worrying about different environments.

Frequently Asked Questions

Q1. What is Apache Airflow used for?

A. Apache Airflow is an open-source platform that allows you to programmatically author, schedule, and monitor workflows. It helps automate complex data pipelines by organizing tasks into directed acyclic graphs (DAGs).

Q2. Why is Docker important for Apache Airflow?

A. Docker simplifies the setup and deployment of Apache Airflow by creating isolated, reproducible environments. It ensures seamless configuration and operation of all necessary dependencies and services, such as PostgreSQL, within containers.

Q3. How does Apache Airflow integrate with PostgreSQL?

A. Airflow can connect to PostgreSQL using its built-in database operators. You can use these operators to execute SQL queries, manage database operations, and automate data pipelines that involve reading from or writing to PostgreSQL databases.

Q4. How do I read CSV files using Airflow?

A. You can use Python scripts in Airflow tasks to read CSV files. The task can extract data from the CSV and, through a database operator, insert the data into PostgreSQL, automating the entire workflow.

Q5. Can I scale Apache Airflow for larger projects?

A. Yes, Apache Airflow can scale easily. With Docker, you can run multiple worker nodes, and Airflow can distribute tasks across them. Additionally, integrating a database like PostgreSQL supports efficient handling of large-scale data.

The media shown in this article is not owned by Analytics Vidhya and is used at the Author's discretion.