Hi team,
Apologies, the code I shared earlier was from an updated version of the DAG. I kindly request that this issue be reopened for further review.
To clarify, the previous version of the DAG uses the `days_ago` function from `airflow.utils.dates` to set the `start_date`, which is relevant for scheduling logic and historical backfill behavior.
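For context, here is a minimal sketch of why that matters. This is my own illustration (not code from either version of the DAG), assuming Airflow 2.x with `pendulum` available; the pinned date below is hypothetical:

```python
# Illustration only (hypothetical values); assumes Airflow 2.x with pendulum installed.
import pendulum
from airflow.utils.dates import days_ago  # helper used by the older DAG (deprecated in newer Airflow)

# Moving start_date: midnight UTC of the previous day, recomputed every time the
# DAG file is parsed, so the effective scheduling window shifts over time.
dynamic_start = days_ago(1)

# Pinned start_date: a fixed, timezone-aware datetime keeps data intervals stable,
# which makes catchup/backfill behavior predictable.
static_start = pendulum.datetime(2024, 1, 1, tz="UTC")  # hypothetical date

print(dynamic_start, static_start)
```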
Here is a snippet from the older version of the DAG:
```python
from multiprocessing import Event
import os
from datetime import timedelta, date
import resource
import time
import requests
from uuid import uuid4 as v4
from itertools import groupby
from operator import itemgetter
from airflow import DAG
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python_operator import PythonOperator
# from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.utils.dates import days_ago
import boto3
import pandas as pd
from boto3.dynamodb.conditions import Key, Attr
import json
from datetime import datetime, date
import time
import pandas as pd
import pyarrow.parquet as pq
import datetime
import numpy as np
import json
import boto3
from io import BytesIO
import os
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.dummy_operator import DummyOperator
import concurrent.futures

SLACK_CONN_ID = "SLACK_ALERTS"
AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")

# today = datetime.datetime.today().strftime('%Y-%m-%d')
today = date.today()

s3_bucket = "razor-prod-raw-datalake"

sqs_client = boto3.client(
    'sqs',
    region_name='eu-central-1',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

os.environ['AWS_ACCESS_KEY_ID'] = AWS_ACCESS_KEY_ID
os.environ['AWS_SECRET_ACCESS_KEY'] = AWS_SECRET_ACCESS_KEY
os.environ['AWS_REGION'] = 'eu-central-1'

resource = boto3.resource(
    "dynamodb",
    region_name="eu-central-1",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

lambda_client = boto3.client(
    "lambda",
    region_name="eu-central-1",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)


class S3Client:
    def __init__(self):
        self.s3_client = boto3.resource(
            's3',
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        )

    def put(self, local_file_path, s3_bucket, s3_file_path_and_name):
        return self.s3_client.meta.client.upload_file(f'{local_file_path}', s3_bucket, f'{s3_file_path_and_name}')

    def get(self, s3_bucket, s3_file_path_and_name):
        s3_object = self.s3_client.meta.client.get_object(Bucket=s3_bucket, Key=s3_file_path_and_name)
        return s3_object["Body"].read().decode("utf-8")

    def put_object(self, body, s3_bucket, s3_file_path_and_name):
        return self.s3_client.meta.client.put_object(Body=body.encode('UTF-8'), Bucket=s3_bucket, Key=s3_file_path_and_name)

    def put_object_without_encode(self, body, s3_bucket, s3_file_path_and_name):
        return self.s3_client.meta.client.put_object(Body=body, Bucket=s3_bucket, Key=s3_file_path_and_name)

    def put_object_public(self, body, s3_bucket, s3_file_path_and_name):
        return self.s3_client.meta.client.put_object(Body=body.encode('UTF-8'), Bucket=s3_bucket, Key=s3_file_path_and_name, ACL='public-read')

    def list_files_in_folder(self, s3_bucket, folder_path):
        try:
            file_list = []
            paginator = self.s3_client.meta.client.get_paginator('list_objects_v2')
            page_iterator = paginator.paginate(Bucket=s3_bucket, Prefix=folder_path)
            for page in page_iterator:
                if 'Contents' in page:
                    for obj in page['Contents']:
                        file_info = {
                            'key': obj['Key'],
                            'size': obj['Size']  # Size in bytes
                        }
                        file_list.append(file_info)
            return file_list
        except Exception as e:
            print(f"Error listing files in folder: {e}")
            return []

    def delete_file(self, s3_bucket, s3_file_path_and_name):
        try:
            response = self.s3_client.meta.client.delete_object(Bucket=s3_bucket, Key=s3_file_path_and_name)
            return response
        except Exception as e:
            print(f"Error deleting file: {e}")
            return None

    def delete_folder(self, s3_bucket, folder_path):
        try:
            # List all files in the folder
            file_list = self.list_files_in_folder(s3_bucket, folder_path)
            if not file_list:
                print(f"No files found in folder: {folder_path}")
                return []
            responses = []
            for file_info in file_list:
                response = self.delete_file(s3_bucket, file_info['key'])
                responses.append(response)
            return responses
        except Exception as e:
            print(f"Error deleting folder: {e}")
            return None


s3_client = S3Client()


def check_queue_status():
    sqs = boto3.client(
        'sqs',
        region_name='eu-central-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    queue_url = 'https://sqs.eu-central-1.amazonaws.com/815361800176/keywordPresence-prod-recieveCompleteRun'
    max_time_seconds = 3600 * 20  # 5 hours
    start_time = time.time()
    while True:
        # Check if the time limit has been reached
        elapsed_time = time.time() - start_time
        if elapsed_time >= max_time_seconds:
            print("Time limit reached. Exiting loop.")
            break
        # Get the approximate number of messages in the queue
        response = sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=['ApproximateNumberOfMessages']
        )
        # Check if the queue is empty
        num_messages = int(response['Attributes']['ApproximateNumberOfMessages'])
        if num_messages == 0:
            print("Queue is empty.")
            break
        else:
            print(f"Queue has {num_messages} messages.")
        # Sleep for a while before checking again
        time.sleep(15)  # Adjust sleep duration as needed
    print("Finished checking queue status.")
    print("Entering 15 min sleep for final Lambdas to be computed.")
    time.sleep(900)


def read_s3_parquet(bucket, table_path):
    # Create a ParquetDataset object to access the Parquet files in the S3 bucket
    dataset = pq.ParquetDataset(f"s3://{bucket}/{table_path}")
    # Read the Parquet files and convert them into a Table object
    table = dataset.read()
    # Convert the Table object into a Pandas DataFrame
    df = table.to_pandas()
    # Return the DataFrame containing the Parquet data
    return df


def get_random_filename(s3, bucket_name, folder_path):
    # List objects in the specified folder
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_path)
    # Assuming there's only one file in the folder
    if 'Contents' in response:
        return response['Contents'][0]['Key']
    return None


def rename_s3_file(s3, bucket_name, folder_path, new_filename):
    # Get the random filename from the specified folder
    old_key = get_random_filename(s3, bucket_name, folder_path)
    print("Old Key", old_key)
    if old_key:
        # Construct the full path for the new file
        new_key = f"{folder_path}/{new_filename}"
        print("New Key", new_key)
        # Copy the object to the new key
        s3.copy_object(Bucket=bucket_name, CopySource={'Bucket': bucket_name, 'Key': old_key}, Key=new_key)
        # Delete the old object
        s3.delete_object(Bucket=bucket_name, Key=old_key)
        print(f"File renamed from {old_key} to {new_key}")
    else:
        raise ValueError(f"No file found in folder {folder_path}")


def rename_op():
    bucket_name = s3_bucket
    new_filename = 'compiled_op.parquet'
    s3_client = boto3.client(
        "s3",
        region_name='eu-central-1',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    folder_path = f"keyword_presence/{today}/compiled"
    print("Folder Path", folder_path)
    rename_s3_file(s3_client, bucket_name, folder_path, new_filename)


def parellel_send_messages(country):
    keyword_folder = s3_client.list_files_in_folder("razor-prod-raw-datalake", f"content_workflows/unloads/keywords_presence_keywords/country_code={country}")
    part = 0
    for file in keyword_folder:
        output_list = []
        try:
            country_keywords = read_s3_parquet("razor-prod-raw-datalake", f"{file['key']}")
        except Exception as e:
            print(e)
            print(f'Could not find keywords for country : {country} with error' + e)
            # raise e
        try:
            country_listing = read_s3_parquet("razor-prod-raw-datalake", f"content_workflows/unloads/keywords_presence_listing/country_code={country}/000.parquet")
        except Exception as e:
            print(e)
            print(f'Could not find listing for country : {country} with error' + e)
            # raise e
        merged_df = pd.merge(country_listing, country_keywords, on='asin', how='inner')
        print("keyword count")
        print(country)
        x = 0
        while x < merged_df.shape[0]:
            # print(f"{country} - {x} / {merged_df.shape[0]}")
            to_push_df = merged_df.iloc[x:x + 1000]
            pandas_dataframe_to_s3(to_push_df, s3_bucket, f'keyword_presence/{today}/inputs', f"{country}_{part}")
            message_body = {
                'bucket': 'razor-prod-raw-datalake',
                'output_key': f'keyword_presence/{today}/outputs/{country}_{part}',
                'input_key': f'keyword_presence/{today}/inputs/{country}_{part}.parquet',
                'country_code': country,
            }
            resp = sqs_client.send_message(
                QueueUrl='https://sqs.eu-central-1.amazonaws.com/815361800176/keywordPresence-prod-recieveCompleteRun',
                MessageBody=json.dumps(message_body)
            )
            message_body['resp'] = resp
            output_list.append(message_body)
            x += 1000
            part += 1000
        print(part)
    return output_list, f"{country} has completed with {x/1000} calls"


country_list_3 = [
    'CA', 'ES', 'FR', 'MX', 'NL', 'PL', 'SE', 'TR', 'DE', 'IT', 'UK', 'US'
]


def main(country):
    output_list, result = parellel_send_messages(country)
    print(result)


def pandas_dataframe_to_s3(
    pandas_df, s3_bucket, s3_path, tablename, access_key=None, secret_key=None
):
    """
    Uploads a pandas DataFrame to S3 as a parquet file.
    - pandas_df: The DataFrame to upload.
    - s3_bucket: The name of the S3 bucket to upload the file to.
    - s3_path: The path in the S3 bucket where the file will be saved.
    - tablename: The name of the table or identifier for the parquet file.
    - access_key: AWS access key (optional, defaults to class attribute if not provided).
    - secret_key: AWS secret key (optional, defaults to class attribute if not provided).
    """
    # access_key = access_key
    # secret_key = secret_key
    df = pandas_df
    s3_client = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    out_buffer = BytesIO()
    df.to_parquet(out_buffer, index=False)
    filename = f"{s3_path}/{tablename}.parquet"
    s3_client.put_object(Bucket=s3_bucket, Key=filename, Body=out_buffer.getvalue())
    # print('pandas dataframe written to s3 as parquet')


DEFAULT_ARGS = {
    "owner": "tech_analytics",
    "depends_on_past": False,
    "retries": 0,
    "email_on_failure": False,
    "email_on_retry": False,
    "catchup": False,
    "redshift_conn_id": "REDSHIFT_FRANKFURT",
    "postgres_conn_id": "REDSHIFT_FRANKFURT",
    "conn_id": "REDSHIFT_FRANKFURT",
    # "on_failure_callback": task_fail_slack_alert,
}

with DAG(
    dag_id="rgbit_integrated_kwdb_static",
    description="keyword",
    default_args=DEFAULT_ARGS,
    # dagrun_timeout=timedelta(hours=1),
    start_date=days_ago(1),
    schedule_interval="0 17 * * 7",
    tags=["keyword", "static"],
    template_searchpath="/usr/local/airflow/include/redshift_queries",
    catchup=False,
    max_active_tasks=2
) as dag:

    country_tasks_3 = []
    for country in country_list_3:
        main_fn = PythonOperator(
            task_id=f'process_{country}_static',
            python_callable=main,
            op_kwargs={'country': country},
            dag=dag,
            retries=2,  # Add retries for each task
            retry_delay=timedelta(minutes=5)  # Optionally set a different retry delay
        )
        country_tasks_3.append(main_fn)

    # main_fn = PythonOperator(
    #     task_id="main_function",
    #     python_callable=main,
    #     trigger_rule="all_done",
    # )

    wait_for_lambda_1 = PythonOperator(
        task_id="wait_for_lambda_1",
        # python_callable=sleeper,
        python_callable=check_queue_status,
        op_kwargs={},
        trigger_rule="all_done",
        dag=dag
    )

    dummy_dag_3 = DummyOperator(task_id='dummy_dag_3', trigger_rule="all_done", dag=dag)
    dummy_dag_4 = DummyOperator(task_id='dummy_dag_4', trigger_rule="all_done", dag=dag)

    trigger_glue_static = GlueJobOperator(
        task_id="trigger_glue_static",
        job_name="keyword_presence",
        region_name='eu-central-1',
        dag=dag,
    )

    rename_file_2 = PythonOperator(
        task_id="rename_file_2",
        python_callable=rename_op,
        dag=dag
    )

    folder_delete = PythonOperator(
        task_id="folder_delete",
        python_callable=s3_client.delete_folder,
        op_kwargs={'s3_bucket': s3_bucket, 'folder_path': f"content_workflows/unloads/keywords_presence_keywords/"},
    )

    sql_3 = '''
    unload($$
    select distinct asin, country_code , keyword
    FROM temp_rgbit_keyword_static_list_q2 ;
    $$)
    to 's3://razor-prod-raw-datalake/content_workflows/unloads/keywords_presence_keywords/'
    FORMAT PARQUET
    partition by (country_code)
    ALLOWOVERWRITE
    REGION 'eu-central-1'
    MAXFILESIZE AS 50 MB
    PARALLEL ON
    iam_role 'arn:aws:iam::815361800176:role/redshift-unload';
    '''

    unload_items_3 = SQLExecuteQueryOperator(
        task_id=f"unload_items_to_s3_3",
        sql=sql_3
    )

    data_push_static = '''
    truncate table rgbit_keyword_presence_intermediate_temp_static;
    COPY rgbit_keyword_presence_intermediate_temp_static
    FROM 's3://razor-prod-raw-datalake/keyword_presence/{today}/compiled/compiled_op.parquet'
    iam_role 'arn:aws:iam::815361800176:role/redshift-unload'
    FORMAT AS parquet
    '''.format(today=today)

    data_push_static_1 = '''
    truncate table public.rgbit_keyword_presence_intermediate_main_static;
    insert into rgbit_keyword_presence_intermediate_main_static
    select asin, country_code, keyword, max_column, max_value, duplication_score, field_score,
        backend_keywords__presence_sum, bp1__presence_sum, bp2__presence_sum, bp3__presence_sum,
        bp4__presence_sum, bp5__presence_sum, description__presence_sum, title__presence_sum,
        forbidden_new_keyword, forbidden_keyword_removed_kw, trademark_new_keyword, trademark_keyword_removed_kw
    from (select *, row_number() over(partition by asin, country_code, keyword order by asin, country_code, keyword) as rn
          from rgbit_keyword_presence_intermediate_temp_static)
    where rn = 1
    '''

    trunctate_static = SQLExecuteQueryOperator(
        task_id=f"trunctate_static",
        sql=data_push_static
    )

    push_data_static = SQLExecuteQueryOperator(
        task_id=f"data_push_static_1",
        sql=data_push_static_1
    )

    folder_delete >> unload_items_3 >> dummy_dag_3
    for task in country_tasks_3:
        dummy_dag_3 >> task >> wait_for_lambda_1
    wait_for_lambda_1 >> trigger_glue_static >> rename_file_2 >> trunctate_static >> push_data_static
```
Originally posted by @Srichandra-razor in #51651