I need to run tasks based on the value of a variable in the input JSON. If the value of the variable 'insurance' is "true", then task1, task2 and task3 need to run; otherwise task4, task5 and task6 need to run. Since I am a newbie to this, I don't have much idea about the usage of PythonOperator and BranchPythonOperator. The code is given below:

from airflow import DAG
from _operator import BashOperator
from airflow.operators import PythonOperator

dag = DAG('DAG_NAME', default_args=default_args, schedule_interval=None, max_active_runs=5, start_date=datetime(2020, 8, 4))

The dag-definition file is continuously parsed by Airflow in the background, and the generated DAGs and tasks are picked up by the scheduler. The way your file wires tasks together creates several problems:

1. All 6 tasks (task1 .. task6) are ALWAYS created (and hence they will always run, irrespective of insurance_flag); only their inter-task dependencies are set in accordance with insurance_flag. The correct way instead is to put both the task instantiation (creation of each PythonOperator taskN object) and the task wiring inside that if. That way, the unnecessary tasks won't be created (and hence won't run).
2. The mandatory dag argument is missing from the task instantiations (task1 = BashOperator(...) and the others should be given dag=dag).
3. A dangling PythonOperator, with a callable that json.dumps the Variable, serves no purpose (unless I misunderstood your code / intent here, remove it completely).

The above alone should be enough to fix your code, but may I offer a suggestion for improvement: reading a Variable in the dag-definition file means a SQL query is fired by Airflow's SQLAlchemy ORM very frequently in the background (on every cycle of continuously parsing the dag-definition file). This not only unnecessarily overloads your SQLAlchemy backend meta-db, but also slows down the parser (in an extreme case it can lead to a DagBag timeout if parsing starts taking too long). Instead, you can use BranchPythonOperator the right way and move that Variable read to runtime (when the DAG / tasks are actually run) rather than DAG-generation time (when the dag file is parsed by Airflow and the DAG is generated on the webserver). Here is the code for that (and you should do away with that if-else block completely):

from airflow.models import Variable
from airflow.operators.python_operator import BranchPythonOperator

def decide_branch():
    # runs only when branch_task executes, so the Variable is read at runtime, not at parse time
    my_var_dict = Variable.get('my_var_name', deserialize_json=True)
    # decide which branch to take based on insurance flag (assumes a top-level "insurance" key)
    return 'task1' if str(my_var_dict.get('insurance')).lower() == 'true' else 'task4'

branch_task = BranchPythonOperator(task_id='branch_task', python_callable=decide_branch, dag=dag)
branch_task >> [task1, task4]  # assumes task1 >> task2 >> task3 and task4 >> task5 >> task6
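The branch decision itself needs no Airflow machinery, so one option is to keep it as a plain function that can be checked without a scheduler. This is a minimal sketch, assuming the deserialized Variable has a top-level "insurance" key holding either a JSON boolean or the string "true"/"false", and that task1 and task4 are the first tasks of the two chains; the names `is_insured`, `choose_branch` and the two constants are hypothetical, and the key path should be adjusted if your JSON nests the flag deeper.

```python
import json
from typing import Any, Mapping

# Hypothetical task ids: assumes task1 -> task2 -> task3 is the "insured" chain
# and task4 -> task5 -> task6 is the other one, as described in the question.
INSURED_BRANCH_START = "task1"
UNINSURED_BRANCH_START = "task4"


def is_insured(config: Mapping[str, Any]) -> bool:
    """Interpret the 'insurance' flag, accepting a JSON boolean or the string "true"."""
    flag = config.get("insurance", False)  # assumption: top-level key, missing means False
    if isinstance(flag, str):
        return flag.strip().lower() == "true"
    return bool(flag)


def choose_branch(config: Mapping[str, Any]) -> str:
    """Return the task_id a BranchPythonOperator callable would hand back."""
    return INSURED_BRANCH_START if is_insured(config) else UNINSURED_BRANCH_START


if __name__ == "__main__":
    # Stand-in for the JSON stored under the Variable (hypothetical content)
    raw = '{"insurance": "true"}'
    print(choose_branch(json.loads(raw)))  # task1
```

Inside the DAG file, the function passed to BranchPythonOperator as python_callable would call Variable.get('my_var_name', deserialize_json=True) and return choose_branch(...) on the result, so the Variable is still read only when the branch task runs, and the decision logic can be tested on its own.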
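Why does returning a single task_id stop the other chain from running? The sketch below is a toy model, not Airflow's scheduler code. It assumes the two chains from the question hang off one branch task with no join afterwards, and mimics the behavior described for the branch operator: the direct children it did not pick are marked skipped, and with the default all_success trigger rule that skip cascades down each unpicked chain. `DOWNSTREAM` and `skipped_tasks` are hypothetical names used only for illustration.

```python
from typing import Dict, List, Set

# Toy model of the DAG from the question (not Airflow code): each task maps
# to its direct downstream tasks; branch_task fans out to the two chains.
DOWNSTREAM: Dict[str, List[str]] = {
    "branch_task": ["task1", "task4"],
    "task1": ["task2"],
    "task2": ["task3"],
    "task3": [],
    "task4": ["task5"],
    "task5": ["task6"],
    "task6": [],
}


def skipped_tasks(followed: str, branch: str = "branch_task") -> Set[str]:
    """Tasks that end up skipped when `branch` returns the task_id `followed`.

    Direct children other than `followed` are skipped, and the skip spreads
    to everything downstream of them (valid here because the chains never join).
    """
    skipped: Set[str] = set()
    stack = [t for t in DOWNSTREAM[branch] if t != followed]
    while stack:
        task = stack.pop()
        if task not in skipped:
            skipped.add(task)
            stack.extend(DOWNSTREAM[task])
    return skipped


print(sorted(skipped_tasks("task1")))  # ['task4', 'task5', 'task6']
```

If the two chains later merged into a common task, that task would need a non-default trigger rule to avoid being skipped as well; the toy model deliberately leaves joins out.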