
Airflow DAG Scheduling issue #51771

Open

Description

@Srichandra-razor

Hi team,

Apologies — the code I shared earlier was an updated version. I kindly request that this issue be reopened for further review.

To clarify, the previous version of the DAG used the days_ago function from airflow.utils.dates to set the start_date. This detail matters for the scheduling logic and for historical backfill behavior.
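For reference, days_ago(1) is re-evaluated every time the DAG file is parsed, so the start_date keeps moving forward; with a weekly schedule this is a common cause of runs not being triggered when expected, and airflow.utils.dates.days_ago has been deprecated, with the Airflow best-practice guidance being a fixed, timezone-aware start_date. Below is a minimal sketch of the same DAG header with a static start_date; the date shown is only an illustrative placeholder, not the value from our deployment.

import pendulum
from airflow import DAG

with DAG(
    dag_id="rgbit_integrated_kwdb_static",
    # A fixed, timezone-aware start_date keeps the scheduling baseline stable,
    # unlike days_ago(1), which shifts on every DAG parse.
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),  # placeholder date
    schedule_interval="0 17 * * 7",  # 17:00 on Sundays
    catchup=False,
) as dag:
    ...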

Here is a snippet from the older version of the DAG:

from multiprocessing import Event
import os
from datetime import timedelta,date
import resource
import time
import requests
from uuid import uuid4 as v4
from itertools import groupby
from operator import itemgetter
from airflow import DAG
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python_operator import PythonOperator
# from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.utils.dates import days_ago
import boto3
import pandas as pd
from boto3.dynamodb.conditions import Key, Attr
import json
from datetime import datetime,date
import time
import pandas as pd
import pyarrow.parquet as pq
import datetime
import numpy as np
import json
import boto3
from io import BytesIO
import os
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.dummy_operator import DummyOperator


import concurrent.futures


SLACK_CONN_ID = "SLACK_ALERTS"
AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")

# today = datetime.datetime.today().strftime('%Y-%m-%d')
today = date.today()

s3_bucket = "razor-prod-raw-datalake"
sqs_client = boto3.client('sqs' , region_name = 'eu-central-1' ,aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

os.environ['AWS_ACCESS_KEY_ID'] = AWS_ACCESS_KEY_ID
os.environ['AWS_SECRET_ACCESS_KEY'] = AWS_SECRET_ACCESS_KEY
os.environ['AWS_REGION'] = 'eu-central-1'

resource = boto3.resource(
    "dynamodb",
    region_name="eu-central-1",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

lambda_client = boto3.client(
    "lambda",
    region_name="eu-central-1",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

class S3Client:
    def __init__(self):
        self.s3_client = boto3.resource('s3', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

    def put(self, local_file_path, s3_bucket, s3_file_path_and_name):
        return self.s3_client.meta.client.upload_file(f'{local_file_path}', s3_bucket, f'{s3_file_path_and_name}')
    
    def get(self,s3_bucket,s3_file_path_and_name):
        s3_object = self.s3_client.meta.client.get_object(Bucket=s3_bucket,Key=s3_file_path_and_name)
        return s3_object["Body"].read().decode("utf-8")

    def put_object(self, body, s3_bucket, s3_file_path_and_name):
        return self.s3_client.meta.client.put_object(Body=body.encode('UTF-8'),Bucket=s3_bucket,Key=s3_file_path_and_name)
    
    def put_object_without_encode(self, body, s3_bucket, s3_file_path_and_name):
        return self.s3_client.meta.client.put_object(Body=body,Bucket=s3_bucket,Key=s3_file_path_and_name)
    
    def put_object_public(self, body, s3_bucket, s3_file_path_and_name):
        return self.s3_client.meta.client.put_object(Body=body.encode('UTF-8'),Bucket=s3_bucket,Key=s3_file_path_and_name,ACL='public-read')
    
    def list_files_in_folder(self, s3_bucket, folder_path):
        try:
            file_list = []
            paginator = self.s3_client.meta.client.get_paginator('list_objects_v2')
            page_iterator = paginator.paginate(Bucket=s3_bucket, Prefix=folder_path)

            for page in page_iterator:
                if 'Contents' in page:
                    for obj in page['Contents']:
                        file_info = {
                            'key': obj['Key'],
                            'size': obj['Size']  # Size in bytes
                        }
                        file_list.append(file_info)

            return file_list
        except Exception as e:
            print(f"Error listing files in folder: {e}")
            return []
        
    def delete_file(self, s3_bucket, s3_file_path_and_name):
        try:
            response = self.s3_client.meta.client.delete_object(Bucket=s3_bucket, Key=s3_file_path_and_name)
            return response
        except Exception as e:
            print(f"Error deleting file: {e}")
            return None

    def delete_folder(self, s3_bucket, folder_path):
        try:
            # List all files in the folder
            file_list = self.list_files_in_folder(s3_bucket, folder_path)
            if not file_list:
                print(f"No files found in folder: {folder_path}")
                return []

            responses = []
            for file_info in file_list:
                response = self.delete_file(s3_bucket, file_info['key'])
                responses.append(response)

            return responses
        except Exception as e:
            print(f"Error deleting folder: {e}")
            return None
        
s3_client = S3Client()

def check_queue_status():
    sqs = boto3.client('sqs',region_name='eu-central-1',aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    
    queue_url = 'https://sqs.eu-central-1.amazonaws.com/815361800176/keywordPresence-prod-recieveCompleteRun'
    max_time_seconds = 3600 * 20  # 20 hours
    
    start_time = time.time()
    while True:
        # Check if the time limit has been reached
        elapsed_time = time.time() - start_time
        if elapsed_time >= max_time_seconds:
            print("Time limit reached. Exiting loop.")
            break
        
        # Get the approximate number of messages in the queue
        response = sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=['ApproximateNumberOfMessages']
        )
        
        # Check if the queue is empty
        num_messages = int(response['Attributes']['ApproximateNumberOfMessages'])
        if num_messages == 0:
            print("Queue is empty.")
            break
        else:
            print(f"Queue has {num_messages} messages.")
        
        # Sleep for a while before checking again
        time.sleep(15)  # Adjust sleep duration as needed
    
    print("Finished checking queue status.")

    print("Entering 15 min sleep for final Lambdas to be computed.")
    time.sleep(900)

def read_s3_parquet(bucket, table_path):
        # Create a ParquetDataset object to access the Parquet files in the S3 bucket
        dataset = pq.ParquetDataset(f"s3://{bucket}/{table_path}")

        # Read the Parquet files and convert them into a Table object
        table = dataset.read()

        # Convert the Table object into a Pandas DataFrame
        df = table.to_pandas()

        # Return the DataFrame containing the Parquet data
        return df

def get_random_filename(s3, bucket_name, folder_path):

    # List objects in the specified folder
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_path)

    # Assuming there's only one file in the folder
    if 'Contents' in response:
        return response['Contents'][0]['Key']

    return None

def rename_s3_file(s3 , bucket_name, folder_path, new_filename):

    # Get the random filename from the specified folder
    old_key = get_random_filename(s3 ,bucket_name, folder_path)
    print("Old Key" , old_key)
    if old_key:
        # Construct the full path for the new file
        new_key = f"{folder_path}/{new_filename}"
        print("New Key" , new_key)
        # Copy the object to the new key
        s3.copy_object(Bucket=bucket_name, CopySource={'Bucket': bucket_name, 'Key': old_key}, Key=new_key)

        # Delete the old object
        s3.delete_object(Bucket=bucket_name, Key=old_key)
        print(f"File renamed from {old_key} to {new_key}")
    else:
        raise ValueError(f"No file found in folder {folder_path}")
    
def rename_op():

    bucket_name = s3_bucket
    new_filename = 'compiled_op.parquet'
    s3_client = boto3.client("s3",region_name='eu-central-1', aws_access_key_id=AWS_ACCESS_KEY_ID , aws_secret_access_key = AWS_SECRET_ACCESS_KEY )
    folder_path = f"keyword_presence/{today}/compiled"    

    print("Folder Path" , folder_path)
    rename_s3_file( s3_client , bucket_name, folder_path, new_filename)


def parellel_send_messages(country):

    keyword_folder = s3_client.list_files_in_folder("razor-prod-raw-datalake", f"content_workflows/unloads/keywords_presence_keywords/country_code={country}")
    part = 0
    for file in keyword_folder:
    
        output_list = []
        try:
            country_keywords = read_s3_parquet("razor-prod-raw-datalake", f"{file['key']}")
        except Exception as e:
            print(e)
            print(f'Could not find keywords for country : {country} with error: {e}')
            # raise e
        try:
            country_listing = read_s3_parquet("razor-prod-raw-datalake", f"content_workflows/unloads/keywords_presence_listing/country_code={country}/000.parquet")
        except Exception as e:
            print(e)
            print(f'Could not find listing for country : {country} with error: {e}')
            # raise e
        

        merged_df = pd.merge(country_listing, country_keywords, on='asin', how='inner')
        print("keyword count")
        print(country)
        
        x = 0 

        while x<merged_df.shape[0]:
    #         print(f"{country} - {x} / {merged_df.shape[0]}")
            to_push_df = merged_df.iloc[x:x+1000]
            pandas_dataframe_to_s3(to_push_df,s3_bucket,f'keyword_presence/{today}/inputs',f"{country}_{part}")
            
            message_body = {
                'bucket': 'razor-prod-raw-datalake',
                'output_key' : f'keyword_presence/{today}/outputs/{country}_{part}',
                'input_key' : f'keyword_presence/{today}/inputs/{country}_{part}.parquet',
                'country_code' : country,
                
            }

            resp = sqs_client.send_message(
                QueueUrl='https://sqs.eu-central-1.amazonaws.com/815361800176/keywordPresence-prod-recieveCompleteRun',
                MessageBody=json.dumps(message_body) 
            )
            
            message_body['resp'] = resp
            output_list.append(message_body)
            

            x+=1000
            part+=1000
            print(part)
            
    return output_list , f"{country} has completed with {x/1000} calls"

country_list_3 = [ 'CA' , 'ES' , 'FR' ,  'MX' , 'NL' , 'PL' , 'SE' , 'TR',  'DE' , 'IT' , 'UK' , 'US' ]

def main(country):
    output_list , result = parellel_send_messages(country)
    print(result)


def pandas_dataframe_to_s3(
    pandas_df,
    s3_bucket,
    s3_path,
    tablename,
    access_key=None,
    secret_key=None
):
    """
    Uploads a pandas DataFrame to S3 as a parquet file.

    - pandas_df: The DataFrame to upload.
    - s3_bucket: The name of the S3 bucket to upload the file to.
    - s3_path: The path in the S3 bucket where the file will be saved.
    - tablename: The name of the table or identifier for the parquet file.
    - access_key: AWS access key (optional, defaults to class attribute if not provided).
    - secret_key: AWS secret key (optional, defaults to class attribute if not provided).
    """
    # access_key = access_key
    # secret_key = secret_key

    df = pandas_df

    s3_client = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    out_buffer = BytesIO()
    df.to_parquet(out_buffer, index=False)
    filename = f"{s3_path}/{tablename}.parquet"
    s3_client.put_object(Bucket=s3_bucket, Key=filename, Body=out_buffer.getvalue())
#     print('pandas dataframe written to s3 as parquet')



DEFAULT_ARGS = {
    "owner": "tech_analytics",
    "depends_on_past": False,
    "retries": 0,
    "email_on_failure": False,
    "email_on_retry": False,
    "catchup": False,
    "redshift_conn_id": "REDSHIFT_FRANKFURT",
    "postgres_conn_id": "REDSHIFT_FRANKFURT",
    "conn_id": "REDSHIFT_FRANKFURT",
    # "on_failure_callback": task_fail_slack_alert,
}

with DAG(
    dag_id="rgbit_integrated_kwdb_static",
    description="keyword",
    default_args=DEFAULT_ARGS,
    # dagrun_timeout=timedelta(hours=1),
    start_date=days_ago(1),
    schedule_interval="0 17 * * 7",
    tags=["keyword","static"],
    template_searchpath="/usr/local/airflow/include/redshift_queries",
    catchup=False,  
    max_active_tasks=2
) as dag:


    
    country_tasks_3 = []
    for country in country_list_3:
        main_fn = PythonOperator(
            task_id=f'process_{country}_static',
            python_callable=main,
            op_kwargs={'country': country},
            dag=dag,
            retries=2,  # Add retries for each task
            retry_delay=timedelta(minutes=5)  # Optionally set a different retry delay
        
        )
        country_tasks_3.append(main_fn)
    # main_fn = PythonOperator(
    #     task_id="main_function",
    #     python_callable=main,
    #     trigger_rule="all_done",
    # )

    wait_for_lambda_1 = PythonOperator(
        task_id="wait_for_lambda_1",
        # python_callable=sleeper,
        python_callable=check_queue_status,
        op_kwargs={},
        trigger_rule="all_done",
        dag=dag
    )

    dummy_dag_3 = DummyOperator(task_id='dummy_dag_3',trigger_rule="all_done", dag=dag)

    dummy_dag_4 = DummyOperator(task_id='dummy_dag_4',trigger_rule="all_done", dag=dag)


    trigger_glue_static = GlueJobOperator(
        task_id="trigger_glue_static",
        job_name="keyword_presence",
        region_name='eu-central-1',
        dag=dag,
    )


    rename_file_2 = PythonOperator(
        task_id="rename_file_2",
        python_callable=rename_op,
        dag=dag
    )

    folder_delete  =  PythonOperator(
        task_id="folder_delete", 
        python_callable=s3_client.delete_folder,
        op_kwargs={'s3_bucket': s3_bucket, 'folder_path': f"content_workflows/unloads/keywords_presence_keywords/"},
    )

    sql_3 = '''
        unload($$	
        select distinct asin, country_code , keyword
        FROM temp_rgbit_keyword_static_list_q2
        ;
        
        $$)
        to 's3://razor-prod-raw-datalake/content_workflows/unloads/keywords_presence_keywords/'
        FORMAT PARQUET
        partition by (country_code)
        ALLOWOVERWRITE
        REGION 'eu-central-1'
        MAXFILESIZE AS 50 MB
        PARALLEL ON
        iam_role 'arn:aws:iam::815361800176:role/redshift-unload';
        '''

    unload_items_3 = SQLExecuteQueryOperator(
        task_id=f"unload_items_to_s3_3",
        sql=sql_3
    )



    data_push_static='''
            truncate table rgbit_keyword_presence_intermediate_temp_static;
            COPY rgbit_keyword_presence_intermediate_temp_static
            FROM 's3://razor-prod-raw-datalake/keyword_presence/{today}/compiled/compiled_op.parquet'
            iam_role 'arn:aws:iam::815361800176:role/redshift-unload'
            FORMAT AS parquet
    '''.format(
        today = today
    )

    data_push_static_1 = '''
        truncate table public.rgbit_keyword_presence_intermediate_main_static;
        insert into rgbit_keyword_presence_intermediate_main_static select asin, country_code, keyword, max_column, max_value, duplication_score, field_score, backend_keywords__presence_sum, bp1__presence_sum, bp2__presence_sum, bp3__presence_sum, bp4__presence_sum, bp5__presence_sum, description__presence_sum, title__presence_sum, forbidden_new_keyword, forbidden_keyword_removed_kw, trademark_new_keyword, trademark_keyword_removed_kw
        from (select * ,row_number() over(partition by asin,country_code,keyword order by asin,country_code,keyword) as rn from rgbit_keyword_presence_intermediate_temp_static)
        where rn = 1 
        '''

    trunctate_static = SQLExecuteQueryOperator(
        task_id=f"trunctate_static",
        sql=data_push_static
    )

    push_data_static = SQLExecuteQueryOperator(
        task_id=f"data_push_static_1",
        sql=data_push_static_1
    )

    folder_delete >> unload_items_3 >> dummy_dag_3

    for task in country_tasks_3:
        dummy_dag_3 >> task >> wait_for_lambda_1

    wait_for_lambda_1 >> trigger_glue_static >> rename_file_2 >> trunctate_static >> push_data_static

Originally posted by @Srichandra-razor in #51651

Labels: area:Scheduler, kind:bug
