How to orchestrate Dataflow jobs with Cloud Functions

We all know why you’re here, so let’s cut all the BS and get right into the topic. If you need to know more about Dataflow and templates, you can read the documentation or my previous article.


What is the problem we’re trying to solve?

When we need to run multiple Google Dataflow jobs in sequence, we need an orchestration framework to trigger them and pass the relevant parameters to them.

Why not Cloud Composer?

Google Cloud Composer is built on top of Apache Airflow, which can orchestrate different GCP services, including Dataflow. However, it needs a constantly running instance to do the orchestration. If you don’t have many workflows to orchestrate, this can be overkill. The same goes if you create a small Compute Engine VM or a Cloud Run instance for this.

So, it’s better to have a serverless solution.

How can we use Cloud Functions?

High-level architecture

First of all, this is only a simple solution for our use case and NOT a fault-tolerant one. With that in mind, let me explain the concept behind it.

The Log Sink

Whenever a Dataflow job completes, it pushes a particular log event to Cloud Logging. You can find these events by running the query below in Cloud Logging.

resource.type=dataflow_step AND textPayload="Worker pool stopped."

A sample event looks like this.

{
  "textPayload": "Worker pool stopped.",
  "insertId": "hjgykjh7",
  "resource": {
    "type": "dataflow_step",
    "labels": {
      "job_id": "2020-12-03_19_00_51-13252210714040849818",
      "step_id": "",
      "job_name": "my-job-1--1607050850",
      "project_id": "project_id",
      "region": "europe-west4"
    }
  },
  "timestamp": "2020-12-04T03:08:00.912895609Z",
  "severity": "INFO",
  "labels": {
    "dataflow.googleapis.com/job_id": "2020-12-03_19_00_51-13252210714040849818",
    "dataflow.googleapis.com/region": "europe-west4",
    "dataflow.googleapis.com/job_name": "my-job-1--1607050850"
  },
  "logName": "projects/my-project/logs/dataflow.googleapis.com%2Fjob-message",
  "receiveTimestamp": "2020-12-04T03:08:01.635756398Z"
}

We need to implement a mechanism to know which Dataflow job to trigger depending on the current (finished) job.

First, we need to create a Cloud Log Sink to route these logs into a Pub/Sub topic. Then we create a Cloud Function triggered by that topic.

resource "google_pubsub_topic" "orchestrator_dataflow_events" {
  name = join("-", concat(["orchestrator-dataflow-events", var.environment, terraform.workspace]))
}

resource "google_logging_project_sink" "dataflow_job_completion_sink" {
  name = join("-", concat(["dataflow-job-completion-sink", var.environment, terraform.workspace]))
  destination = "pubsub.googleapis.com/projects/${var.project}/topics/${google_pubsub_topic.orchestrator_dataflow_events.name}"
  filter = "resource.type=dataflow_step AND textPayload=\"Worker pool stopped.\""
}

resource "google_cloudfunctions_function" "orchestrator_function" {
  ...
  name                  = join("-", concat(["orchestrator", var.environment, terraform.workspace]))
  event_trigger {
    event_type         = "google.pubsub.topic.publish"
    resource           = google_pubsub_topic.orchestrator_dataflow_events.name
  }
  ...
}

IAM issues

If you create the Log Sink using Terraform, there is a high chance it won’t push messages to the Pub/Sub topic. The reason is that your project doesn’t have the Service Account required to push the logs.

Here is the Stack Overflow question (and the answer) I posted explaining the problem and how to solve it.

The DAG/Workflow

We thought of creating a workflow definition inspired by AWS Step Functions JSON definitions.

dag = {
    "start": "Start",
    "steps": {
        "Start": {
            "job_name": "start",
            "function": "start",
            "next": "Step1"
        },
        "Step1": {
            "job_name": "my-job-1",
            "template": "gs://my-bucket/my-job-1/python_command_spec.json",
            "function": "trigger_my_job_1",
            "next": "Step2"
        },
        "Step2": {
            "job_name": "my-job-2",
            "template": "gs://my-bucket/my-job-2/python_command_spec.json",
            "function": "trigger_my_job_2",
            "next": "Step3"
        },
        "Step3": {
            "job_name": "my-job-3",
            "template": "gs://my-bucket/my-job-3/python_command_spec.json",
            "function": "trigger_my_job_3",
            "next": "End"
        },
        "End": {
            "job_name": "end",
            "function": "stop"
        },
    }
}

Each step except Start and End has the template location for the corresponding Flex template. Please read my previous article to know more about how to create one.

The high-level logic

  • We receive a log event of a Dataflow job ending (through the Pub/Sub topic).
    • Pub/Sub encodes message data in Base64, so we have to decode it.
  • Decode the message and extract the job name (e.g. my-job-1); see the sketch just after this list.
  • Look up the next step in the DAG (e.g. Step2 is the next step for my-job-1).
  • Trigger the defined function, which populates the required parameters for the new Dataflow job.
  • Trigger the new Dataflow job with the above parameters.
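
Here is a minimal sketch of the first two bullets, assuming a background-triggered (Pub/Sub) Python Cloud Function; handle_dataflow_event is just an illustrative name, not something defined elsewhere in this article.

import base64
import json

def handle_dataflow_event(event, context):
    # Background Cloud Function triggered by the log sink's Pub/Sub topic.
    # Pub/Sub Base64-encodes the Cloud Logging entry into event['data'].
    log_entry = json.loads(base64.b64decode(event['data']).decode('utf-8'))

    # The finished job's name sits in the resource labels of the log entry
    # (see the sample event above), e.g. "my-job-1--1607050850".
    current_job_name = log_entry['resource']['labels']['job_name']

    # ...hand current_job_name over to the DAG logic defined in the rest of
    # this article (get_next_job() and the Flex Template launch call).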

To achieve the above steps, we need a few more auxiliary data structures derived from the dag.

import sys
import inspect

# Map each job name in the DAG to its step name (e.g. 'my-job-1' -> 'Step1')
_job_names_to_steps = {step['job_name']: step_name for step_name, step in dag['steps'].items() if 'job_name' in step}

# All the function names referenced by the DAG steps
function_names = {step['function'] for step in dag['steps'].values() if 'function' in step}

# Map each of those function names to the actual callable
functions = {
    name: obj for name, obj in inspect.getmembers(sys.modules[__name__])
    # If the obj is a function, inside the same module/file (to ignore imported ones)
    # and inside the steps dag
    if inspect.isfunction(obj) and obj.__module__ == __name__ and name in function_names
}
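
For the sample DAG above, these derived structures would look roughly like this (values abbreviated):

# _job_names_to_steps == {'start': 'Start', 'my-job-1': 'Step1',
#                         'my-job-2': 'Step2', 'my-job-3': 'Step3', 'end': 'End'}
# function_names == {'start', 'trigger_my_job_1', 'trigger_my_job_2',
#                    'trigger_my_job_3', 'stop'}
# functions maps each of those names to the matching function object,
# as long as it is defined in this module.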

Trigger the next Dataflow job

We can define a function that takes the current (finished) Dataflow job name and extracts the next job along with its parameters.

import time

SUFFIX_SEPARATOR = "--"

def get_next_job(current_job_name):

    # job name will be suffixed with SUFFIX_SEPARATOR followed by a unique text
    # this is to avoid any warnings from Dataflow API 
    # and also to uniquely identify the jobs.

    current_job_name = current_job_name.split(SUFFIX_SEPARATOR)[0]
    step_name = _job_names_to_steps.get(current_job_name)

    if not step_name:
        print(f"something is off. {current_job_name} was not a recognized job")
        return None

    steps = dag['steps']
    current_step = steps[step_name]
    next_step_name = current_step.get('next')

    if not next_step_name:
        print(f"No next step is defined for {current_step}")
        return None

    next_step = steps[next_step_name]

    template_location = next_step.get('template')
    job_name = f"{next_step['job_name']}{SUFFIX_SEPARATOR}{int(time.time())}"
    next_function = functions[next_step['function']]

    return next_function(job_name=job_name, template_location=template_location)

Now all we need to do is define the functions that return the parameters for each job. We can also have a separate function to return the parameters common to all the jobs.

def trigger_my_job_1(**kwargs):
    parameters = {
        "my_job_1_param_1": "value_1",
        "my_job_1_param_2": "value_2",
    }

    return add_common_params(parameters=parameters, **kwargs)

You need to define this type of function for every step (with the same names used in the DAG). You can optionally use the function below to populate the common parameters for all the jobs.

def add_common_params(**kwargs):
    parameters = kwargs['parameters']
    # Add all the common params to `parameters`
    # Here is an example
    parameters['setup_file'] = '/dataflow/template/setup.py'
    kwargs['parameters'] = parameters
    return kwargs
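
Putting it together: once trigger_my_job_2() and friends are defined in the same style as trigger_my_job_1(), a call to get_next_job() for a finished my-job-1 run resolves Step2 from the DAG and returns something roughly like the sketch below (the exact parameters depend on what your trigger function sets).

# get_next_job("my-job-1--1607050850") returns, roughly:
# {
#     'job_name': 'my-job-2--<current epoch>',
#     'template_location': 'gs://my-bucket/my-job-2/python_command_spec.json',
#     'parameters': {
#         # ...whatever trigger_my_job_2() sets...
#         'setup_file': '/dataflow/template/setup.py',  # added by add_common_params()
#     },
# }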

Then whoever calls the get_next_job() function will get all the information it needs to trigger the next Dataflow job. Obviously, the caller MUST take care of the edge cases where either the given job name doesn’t exist in the DAG or the End step has been reached.

Then, we can easily trigger the Dataflow pipeline inside our code as explained in the Running the pipeline section in my previous article.

dataflow_region = 'europe-west4'

# get_next_job() returns None for unknown jobs or when the End step is reached,
# so the caller should check that before launching.
next_job_params = get_next_job(current_job_name)

request = dataflow.projects().locations().flexTemplates().launch(
    projectId = project,
    location = dataflow_region,
    body = {
        'launchParameter': {
            'jobName': next_job_params['job_name'],
            'parameters': next_job_params['parameters'],
            'containerSpecGcsPath': next_job_params['template_location'],
        }
    }
)
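
For completeness, the dataflow object used above is the discovery-based API client (the same style used in my previous article), and the request still has to be executed for the job to actually start. A minimal sketch:

from googleapiclient.discovery import build

# The `dataflow` object used above is the discovery-based Dataflow API client.
dataflow = build('dataflow', 'v1b3')

# After building the launch request as shown above, execute it to actually
# submit the Flex Template job.
response = request.execute()
print(f"Launched: {response}")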

Tadaaaa! Now you have a Dataflow job orchestration mechanism using Cloud Functions.

Drawbacks of this approach

This is NOT a well-standardized approach, so it definitely has some drawbacks.

Will be triggered for all the Dataflow jobs

When you create the log sink with the simple expression I mentioned above, this will trigger our Cloud Function every time that condition is met. Meaning, it will be triggered for ALL the Dataflow jobs in your project. If you have multiple Dataflow jobs which you don’t need to be in this orchestration, it will still trigger your Cloud Function.

However, we only act when the finished Dataflow job is in our DAG, so nothing bad will happen, other than a small charge for the Cloud Function execution (especially because we have an explicit sleep).

But that’s a drawback we have to live with, unless we add the Dataflow job names (or a pattern) to the log sink filter expression.
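
For example, assuming all the orchestrated jobs share a common prefix such as my-job- (a naming convention on our side, not something Dataflow requires), the sink filter could be narrowed with a regular expression on the job name label:

resource.type=dataflow_step AND textPayload="Worker pool stopped." AND resource.labels.job_name=~"^my-job-.*"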

Relying on Cloud Logging logs

We’re solely dependent on the logs pushed to Cloud Logging. If the syntax or wording of this log message changes even slightly, this solution would break.

Also, this log line is pushed not only when a Dataflow job succeeds, but also when it fails or is cancelled. We have to stop triggering the next steps if the job failed or was cancelled (if that’s what you want).
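
As a rough sketch (using the same discovery-based client, and assuming the job ID is taken from the log entry’s labels), the job’s final state could be checked before triggering the next step:

def job_succeeded(dataflow, project, region, job_id):
    # Look up the finished job and inspect its terminal state.
    job = dataflow.projects().locations().jobs().get(
        projectId=project,
        location=region,
        jobId=job_id,
    ).execute()

    # Continue the workflow only for successful runs; "JOB_STATE_FAILED" and
    # "JOB_STATE_CANCELLED" indicate the chain should stop here.
    return job.get('currentState') == 'JOB_STATE_DONE'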

Dataflow API delays

To know whether a given Dataflow job actually succeeded, we need to call the Dataflow API and check its final state.

But when we call the API immediately after the event reaches the Cloud Function, it complains that there is no such job. When we talked to the GCP consultants, they also agreed that there can be a small delay until the API reflects the latest state.

So, what’s the solution, you may ask? Well, nothing fancy: we just wait.

We added a time.sleep(30) at the beginning of our Cloud Function code to give some time for the API to get the latest updates. But how did we come up with 30 seconds? Nothing scientific! We just thought, 30 seconds are enough even for GCP to get their s**t done. 😀

Or, we can implement a retry mechanism.
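
A simple retry sketch, instead of a fixed sleep, could look like this (our own helper; the attempt count and delay are arbitrary):

import time
from googleapiclient.errors import HttpError

def wait_for_job(dataflow, project, region, job_id, attempts=6, delay=10):
    # Poll the Dataflow API until the job becomes visible (or we give up),
    # instead of relying on a single fixed sleep.
    for _ in range(attempts):
        try:
            return dataflow.projects().locations().jobs().get(
                projectId=project, location=region, jobId=job_id).execute()
        except HttpError:
            # The job may not be visible through the API yet; back off and retry.
            time.sleep(delay)
    return None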

Extensibility

Well, this is a double-edged sword. Because we write our own orchestrator, we can customize it however we want. If you want to add parallel execution for your Dataflow jobs, it’s still possible by adding more code.

On the other hand, because this is not a native solution, we lack smooth integration with other services. The best example is the API delay I explained above.

Summary

If we need to orchestrate Dataflow jobs, we can use Google Cloud Composer, which can also orchestrate many other GCP services. If you have a lot of other pipelines to orchestrate, that approach makes more sense.

However, if you don’t have a lot of services to orchestrate, having a constantly running instance can be expensive.

Despite all the limitations we’ll face, this is how we used Cloud Functions to orchestrate a set of Dataflow jobs. It gives us immense flexibility to write custom workflows, while also introducing added complexity.

This article explained only one approach we tried; there can be many other ways to do the same thing, maybe even more efficiently. If you already know such a way, feel free to drop a comment below.

As always, here is a cookie!

Source: http://objectshowfanonpedia.wikia.com/wiki/File:Bitten_cookie.png
