Python operator is not being called in dynamic subdag in Airflow

I have created a subdag dynamically. Everything seems to work: main_dag runs fine and its PythonOperator function is called. But the Python callables in the subdag are never called. Could you help me? I am new to Airflow, so I assembled and merged this code from different sources.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from copy import deepcopy
import airflow

# Default arguments shared by the main DAG and (via deepcopy) each subdag,
# so no DAG mutates another's dict.
main_default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 12, 16),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

def sub_dag_method_a():
    """
    Subdag task callable: log a marker line.

    Returns:
        str: the literal 'a' (Airflow logs a callable's return value).
    """
    # FIX: removed the leftover `import pdb; pdb.set_trace()` breakpoint --
    # it blocks waiting for interactive input, so any scheduler/worker run
    # of this task would hang forever.
    print('Subdag method a')
    return 'a'

def sub_dag_method_b(**kwargs):
    """
    Subdag task callable: log a marker line.

    Accepts **kwargs because the PythonOperator that runs it is created with
    provide_context=True, which passes the Airflow context as keyword
    arguments; the original zero-argument signature raised TypeError at
    execution time. (Docstring also fixed: it was copy-pasted from method a.)

    Returns:
        str: the literal 'b'.
    """
    print('Subdag method b')
    return 'b'

# sub dag arguments
def create_subdag(dag_parent, dag_id_child_prefix, db_name, dag_child_id, start_date, schedule_interval):
    # dag params

    # import pdb;pdb.set_trace()
    dag_id_child = '%s.%s_%s' % (dag_parent,dag_child_id,dag_id_child_prefix)
    # main default
    default_args_copy = deepcopy(main_default_args)
    subdag = DAG(dag_id=dag_id_child, schedule_interval=schedule_interval,
    start_date=start_date, default_args=default_args_copy)
    # operators
    tid_check = 'dummy_task_start_%s' % dag_id_child_prefix
    print(tid_check)
    method_start = DummyOperator(task_id=tid_check, dag=subdag, default_args=default_args_copy)

    tid_check = 'get_from_facebook_and_save_to_db_%s' % dag_id_child_prefix
    print(tid_check)

    method_a = PythonOperator(task_id=tid_check, dag=subdag, default_args=default_args_copy,
                                 python_callable=sub_dag_method_a)

    tid_check = 'save_to_es_fetch_from_db_%s' % dag_id_child_prefix
    print(tid_check)
    method_b = PythonOperator(task_id=tid_check, dag=subdag, default_args=default_args_copy,
                              provide_context=True,
                                 python_callable=sub_dag_method_b)

    tid_check = 'dummy_task_end_%s' % dag_id_child_prefix
    print(tid_check)
    method_end = DummyOperator(task_id=tid_check, dag=subdag, default_args=default_args_copy)

    method_start >> method_a
    method_a >> method_b
    method_b >> method_end

    return subdag

# main default arguments
# main dag
# Parent DAG, scheduled hourly. deepcopy keeps its default_args independent
# of the copies handed to the subdags.
main_dag = DAG('main_dag', default_args=deepcopy(main_default_args), schedule_interval=timedelta(hours=1),
start_date=datetime(2019, 12, 16))

# hello_world
def hello_world():
    """
    Python callable of main_task.

    NOTE(review): this is why the subdag's tasks never run. This function
    executes at TASK RUN time, so the DAG and SubDagOperator it builds exist
    only inside the worker process; the scheduler/DagBag discovers DAGs and
    tasks at module PARSE time and never sees them. The test log confirms it:
    the task just returns '<DAG: main_dag.task_dag_0>' and no subdag task
    executes. Build the subdag and its SubDagOperator at module top level
    instead of inside a PythonOperator callable.
    """
    i=0
    subdag = create_subdag('main_dag', str(i), 'db_name'+str(i), 'task_dag',
    main_dag.start_date, main_dag.schedule_interval)
        # import pdb;pdb.set_trace()
    sd_op = SubDagOperator(task_id='task_dag_'+str(i), subdag=subdag, dag=main_dag)
    return subdag


# main task
# The only task the scheduler knows about; its callable builds a subdag at
# run time, which Airflow never schedules (see hello_world).
main_task = PythonOperator(task_id='main_task', python_callable=hello_world, dag=main_dag)
# hello_world()

The output of running the command

airflow test 'main_dag' 'main_task' 2019/12/16

is

(alphavu3711_1) Noamans-MacBook-Pro-2:python3 noamanfaisalbinbadar$ airflow test 'main_dag' 'main_task' 2019/12/16
[2019-12-16 21:56:10,312] {settings.py:252} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=4100
[2019-12-16 21:56:11,119] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-12-16 21:56:11,119] {dagbag.py:92} INFO - Filling up the DagBag from /Users/noamanfaisalbinbadar/code/alphavu/production/python3/fb_messenger_airflow/dags
[2019-12-16 21:56:11,415] {taskinstance.py:630} INFO - Dependencies all met for <TaskInstance: main_dag.main_task 2019-12-16T00:00:00+00:00 [success]>
[2019-12-16 21:56:11,433] {taskinstance.py:630} INFO - Dependencies all met for <TaskInstance: main_dag.main_task 2019-12-16T00:00:00+00:00 [success]>
[2019-12-16 21:56:11,433] {taskinstance.py:841} INFO - 
--------------------------------------------------------------------------------
[2019-12-16 21:56:11,433] {taskinstance.py:842} INFO - Starting attempt 2 of 1
[2019-12-16 21:56:11,433] {taskinstance.py:843} INFO - 
--------------------------------------------------------------------------------
[2019-12-16 21:56:11,433] {taskinstance.py:862} INFO - Executing <Task(PythonOperator): main_task> on 2019-12-16T00:00:00+00:00
[2019-12-16 21:56:11,455] {python_operator.py:105} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=main_dag
AIRFLOW_CTX_TASK_ID=main_task
AIRFLOW_CTX_EXECUTION_DATE=2019-12-16T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2019-12-16T00:00:00+00:00
dummy_task_start_0
get_from_facebook_and_save_to_db_0
save_to_es_fetch_from_db_0
dummy_task_end_0
[2019-12-16 21:56:11,459] {python_operator.py:114} INFO - Done. Returned value was: <DAG: main_dag.task_dag_0>

The new approach, based on your answer, is this:

from fb_messenger.airflow_helpers.get_conversation_ids_page_wise import GetConversationIdsPageWise
from fb_messenger.airflow_helpers.get_conversation_messages_info import GetConversationMessagesInfo
from fb_messenger.airflow_helpers.save_to_es import SaveToES
from copy import deepcopy
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
import airflow


# Shared default arguments. Note there is intentionally no 'start_date' key
# here -- each generated DAG receives start_date explicitly in its constructor.
main_default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

def create_subdag(dag_name, dag_name_prefix, start_date, schedule_interval, conversation_info):
    """
    Build a standalone DAG named '<dag_name>_<dag_name_prefix>' whose tasks
    print the given conversation's id and updated_time.

    Args:
        dag_name: base name for the generated DAG.
        dag_name_prefix: per-conversation suffix, keeps dag/task ids unique.
        start_date: start date for the DAG.
        schedule_interval: schedule for the DAG.
        conversation_info: dict with at least 'id' and 'updated_time' keys.

    Returns:
        DAG: the wired DAG (dummy start -> method_a -> method_b -> dummy end).
    """
    dag_name_processed = '%s_%s' % (dag_name, dag_name_prefix)
    # FIX: default_args_copy was created but unused (the DAG deep-copied the
    # shared dict a second time); reuse the copy we already made.
    default_args_copy = deepcopy(main_default_args)
    subdag = DAG(dag_name_processed, schedule_interval=schedule_interval,
                 start_date=start_date, default_args=default_args_copy)

    # Both callables print the same context values, passed via op_kwargs.
    common_kwargs = {'conversation_id': conversation_info['id'],
                     'updated_time': conversation_info['updated_time']}

    def sub_dag_method_a(**kwargs):
        """Task callable a: print the conversation details, return 'a'."""
        print('Subdag method a')
        print(kwargs['conversation_id'])
        print(kwargs['updated_time'])
        return 'a'

    def sub_dag_method_b(**kwargs):
        """Task callable b: print the conversation details, return 'b'."""
        # (docstring fixed -- it was copy-pasted from method a)
        print('Subdag method b')
        print(kwargs['conversation_id'])
        print(kwargs['updated_time'])
        return 'b'

    # Inside `with subdag:` the dag= kwarg is implied, so it is not repeated.
    with subdag:
        method_start = DummyOperator(task_id='dummy_task_start_%s' % dag_name_prefix)
        method_a = PythonOperator(task_id='get_from_facebook_and_save_to_db_%s' % dag_name_prefix,
                                  python_callable=sub_dag_method_a,
                                  op_kwargs=common_kwargs)
        method_b = PythonOperator(task_id='save_to_es_fetch_from_db_%s' % dag_name_prefix,
                                  python_callable=sub_dag_method_b,
                                  op_kwargs=common_kwargs)
        method_end = DummyOperator(task_id='dummy_task_end_%s' % dag_name_prefix)
        # dependencies
        method_start >> method_a >> method_b >> method_end

    return subdag

# NOTE(review): a dynamic datetime.now()-based start_date is an Airflow
# anti-pattern -- the start date moves on every parse, so runs may never
# trigger. Prefer a fixed date; confirm the desired scheduling behavior.
start_date_ = datetime.now() + timedelta(minutes=-1)
# getting list of dictionaries, one DAG per conversation
conversation_infos = GetConversationIdsPageWise().get_all()
print(conversation_infos)
print(len(conversation_infos))
for conversation_info in conversation_infos:
    print(conversation_info)
    i = conversation_info['id']
    subdag_name = "main_dag"
    sub_dag = create_subdag(subdag_name, str(i), start_date_, timedelta(minutes=2), conversation_info)
    print(sub_dag)
    # FIX: Airflow's DagBag only discovers DAG objects reachable from module
    # globals. The loop rebinds the same 'sub_dag' name each iteration, so at
    # most one DAG was ever visible -- register each under its unique dag_id.
    globals()[sub_dag.dag_id] = sub_dag


But even with this approach I am unable to create multiple DAGs.

Leave a Comment