

Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A? I have looked at subdags, but it looks like they only work with a static set of tasks that has to be determined at DAG creation.

Would DAG triggers work? If so, could you please provide an example?

I have an issue where it is impossible to know the number of Task B's that will be needed to calculate Task C until Task A has been completed. Each Task B.* will take several hours to compute, and they cannot be combined.

One idea is the two-DAG setup below. I don't like this solution because I have to create a blocking ExternalTaskSensor, and all the Task B.* will take between 2 and 24 hours to complete, so I do not consider it viable. Surely there is an easier way? Or was Airflow not designed for this?

Dag 1

    Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (dynamically created DAG through python_callable in TriggerDagRunOperator)

    Task Dummy A -|- Task B.3 -|-> Task Dummy B

As of now this question still does not have a great answer. I have been contacted by several people looking for a solution.

Yes, this is possible. I've created an example DAG that demonstrates this.

    import logging
    import os

    import airflow
    from airflow.operators.python_operator import PythonOperator
    from airflow import configuration as conf
    from airflow.models import DagBag, TaskInstance, Variable
    from airflow.operators.bash_operator import BashOperator

    ...

        value = Variable.get("DynamicWorkflow_Group1")
        logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


    def resetTasksStatus(task_id, execution_date):
        logging.info("Resetting: " + task_id + " " + execution_date)

        dag_folder = conf.get('core', 'DAGS_FOLDER')
        ...
        ti = TaskInstance(my_task, execution_date)
        ...
        logging.info("Current state of " + task_id + " is " + str(state))
        ...
        logging.info("Updated state of " + task_id + " is " + str(state))

    ...

        # You can set this value dynamically e.g., from a database or a calculation
        ...
        variableValue = Variable.get("DynamicWorkflow_Group2")
        logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

        logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
        os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))
        ...
        for i in range(dynamicValue):
            resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))

    ...

        variableValue = Variable.get("DynamicWorkflow_Group3")
        logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

        logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
        os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))
        ...
        for i in range(dynamicValue):
            resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


    def doSomeWork(name, index, *args, **kwargs):
        os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')

    ...

    # Used to connect the stream in the event that the range is zero
    ...

    DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
    logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

    for index in range(int(DynamicWorkflow_Group1)):
        ...
        starting_task.set_downstream(dynamicTask)
        ...

    DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
    logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))

    for index in range(int(DynamicWorkflow_Group2)):
        ...

    DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
    logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))

    for index in range(int(DynamicWorkflow_Group3)):

        # You can make this logic anything you'd like
        # I chose to use the PythonOperator for all tasks
        # except the last task will use the BashOperator
        if index < (int(DynamicWorkflow_Group3) - 1):
            ...
                bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            ...

    # If you do not connect these then in the event that your range is ever zero
    # you will have a disconnection between your stream
    ...
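The wiring rule behind the example DAG can be sketched without Airflow: each dynamic task fans out of an upstream task and back into a downstream one, and the two are also linked directly so that a count of zero does not break the chain. This is only an illustration under stated assumptions. `build_group_edges`, `reachable`, and the task names `bridge1` and `bridge2` are hypothetical, `count` stands in for the integer read from an Airflow Variable, and no Airflow API is called.

```python
from typing import Dict, List, Tuple

Edge = Tuple[str, str]


def build_group_edges(upstream: str, downstream: str, prefix: str,
                      count: int, link_directly: bool = True) -> List[Edge]:
    """Return (from, to) dependency edges for one dynamic group of tasks.

    All names are hypothetical; `count` plays the role of the value that the
    example DAG reads from an Airflow Variable when the file is parsed.
    """
    if count < 0:
        raise ValueError("count must be non-negative")
    edges: List[Edge] = []
    if link_directly:
        # Direct link keeps the stream connected when count is zero.
        edges.append((upstream, downstream))
    for index in range(count):
        task_id = prefix + str(index)
        edges.append((upstream, task_id))    # fan out
        edges.append((task_id, downstream))  # fan in
    return edges


def reachable(edges: List[Edge], source: str, target: str) -> bool:
    """Depth-first search: is `target` downstream of `source`?"""
    adjacency: Dict[str, List[str]] = {}
    for src, dst in edges:
        adjacency.setdefault(src, []).append(dst)
    seen, stack = set(), [source]
    while stack:
        node = stack.pop()
        if node == target:
            return True
        if node in seen:
            continue
        seen.add(node)
        stack.extend(adjacency.get(node, []))
    return False
```

With `count=0` and the direct link in place, `bridge2` is still reachable from `bridge1`; with `link_directly=False` the edge list is empty and the two tasks are disconnected, which is the situation the comment in the example DAG warns about.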
