This page shows you how to write, deploy, and trigger a pipeline run using an Event-Driven Cloud Function with a Cloud Pub/Sub trigger. Follow these steps:
- Define an ML pipeline using the Kubeflow Pipelines (KFP) SDK and compile it into a YAML file. 
- Upload the compiled pipeline definition to a Cloud Storage bucket. 
- Use Cloud Run functions to create, configure, and deploy a function that's triggered by a new or existing Pub/Sub topic. 
Define and compile a pipeline
Using Kubeflow Pipelines SDK, build a scheduled pipeline and compile it into a YAML file.
Sample hello-world-scheduled-pipeline:
from kfp import compiler
from kfp import dsl
# A simple component that prints and returns a greeting string
@dsl.component
def hello_world(message: str) -> str:
    greeting_str = f'Hello, {message}'
    print(greeting_str)
    return greeting_str
# A simple pipeline that contains a single hello_world task
@dsl.pipeline(
    name='hello-world-scheduled-pipeline')
def hello_world_scheduled_pipeline(greet_name: str):
    # KFP SDK v2 requires keyword arguments when calling a component
    hello_world_task = hello_world(message=greet_name)
# Compile the pipeline and generate a YAML file
compiler.Compiler().compile(pipeline_func=hello_world_scheduled_pipeline,
                            package_path='hello_world_scheduled_pipeline.yaml')
Upload the compiled pipeline YAML to a Cloud Storage bucket
- Open the Cloud Storage browser in the Google Cloud console. 
- Click the Cloud Storage bucket you created when you configured your project. 
- Using either an existing folder or a new folder, upload your compiled pipeline YAML (in this example, hello_world_scheduled_pipeline.yaml) to the selected folder.
- Click the uploaded YAML file to access the details. Copy the gsutil URI for later use. 
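The console steps above can also be scripted. The following is a minimal sketch, assuming the google-cloud-storage package is installed and Application Default Credentials are configured. The helper names object_name, gcs_uri, and upload_pipeline_yaml are hypothetical and chosen for illustration; the bucket and folder names are placeholders.

```python
import os


def object_name(folder: str, filename: str) -> str:
    """Join an optional folder and a file name into a Cloud Storage object name."""
    folder = folder.strip('/')
    return f'{folder}/{filename}' if folder else filename


def gcs_uri(bucket_name: str, name: str) -> str:
    """Build the gs:// URI that the function later receives as pipeline_spec_uri."""
    return f'gs://{bucket_name}/{name}'


def upload_pipeline_yaml(bucket_name: str, folder: str, local_path: str) -> str:
    """Upload the compiled YAML and return its gs:// URI."""
    # Imported here so the URI helpers work without the client library installed.
    from google.cloud import storage

    name = object_name(folder, os.path.basename(local_path))
    client = storage.Client()
    client.bucket(bucket_name).blob(name).upload_from_filename(local_path)
    return gcs_uri(bucket_name, name)
```

For example, calling upload_pipeline_yaml('your-bucket', 'pipelines', 'hello_world_scheduled_pipeline.yaml') would upload the file and return the URI to copy for later use.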
Create a Cloud Run function with a Pub/Sub trigger
- Visit the Cloud Run functions page in the console. 
- Click the Create function button. 
- In the Basics section, give your function a name (for example, my-scheduled-pipeline-function).
- In the Trigger section, select Cloud Pub/Sub as the Trigger type. 
- In the Select a Cloud Pub/Sub topic list, click Create a topic. 
- In the Create a topic box, give your new topic a name (for example, my-scheduled-pipeline-topic), and select Create topic.
- Leave all other fields as default and click Save to save the Trigger section configuration. 
- Leave all other fields as default and click Next to proceed to the Code section. 
- Under Runtime, select Python 3.7. 
- In Entry point, input "subscribe" (the example code entry point function name). 
- Under Source code, select Inline Editor if it's not already selected. 
- In the main.py file, add the following code:

import base64
import json
from google.cloud import aiplatform

PROJECT_ID = 'your-project-id'                     # <---CHANGE THIS
REGION = 'your-region'                             # <---CHANGE THIS
PIPELINE_ROOT = 'your-cloud-storage-pipeline-root' # <---CHANGE THIS

def subscribe(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
        event (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    # decode the event payload string
    payload_message = base64.b64decode(event['data']).decode('utf-8')
    # parse payload string into JSON object
    payload_json = json.loads(payload_message)
    # trigger pipeline run with payload
    trigger_pipeline_run(payload_json)

def trigger_pipeline_run(payload_json):
    """Triggers a pipeline run
    Args:
        payload_json: expected in the following format:
            {
                "pipeline_spec_uri": "<path-to-your-compiled-pipeline>",
                "parameter_values": {
                    "greet_name": "<any-greet-string>"
                }
            }
    """
    pipeline_spec_uri = payload_json['pipeline_spec_uri']
    parameter_values = payload_json['parameter_values']

    # Create a PipelineJob using the compiled pipeline from pipeline_spec_uri
    aiplatform.init(
        project=PROJECT_ID,
        location=REGION,
    )
    job = aiplatform.PipelineJob(
        display_name='hello-world-pipeline-cloud-function-invocation',
        template_path=pipeline_spec_uri,
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False,
        parameter_values=parameter_values
    )

    # Submit the PipelineJob
    job.submit()

Replace the following:
- PROJECT_ID: The Google Cloud project that this pipeline runs in.
- REGION: The region that this pipeline runs in.
- PIPELINE_ROOT: Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored in the pipeline root.
 
- In the requirements.txt file, replace the contents with the following package requirements:

google-api-python-client>=1.7.8,<2
google-cloud-aiplatform
- Click Deploy to deploy the function.
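Once the function is deployed, a pipeline run starts whenever a message in the expected format arrives on the topic. The following is a minimal sketch of building that message, checking locally that it decodes the same way subscribe does, and publishing it. It assumes the google-cloud-pubsub package is installed and credentials are configured. The helper names build_payload, as_pubsub_event, decode_event, and publish_trigger are hypothetical, and the project, topic, and bucket values are placeholders.

```python
import base64
import json


def build_payload(pipeline_spec_uri: str, greet_name: str) -> dict:
    """Return the message body that trigger_pipeline_run() expects."""
    return {
        'pipeline_spec_uri': pipeline_spec_uri,
        'parameter_values': {'greet_name': greet_name},
    }


def as_pubsub_event(payload: dict) -> dict:
    """Wrap a payload as the function receives it: base64-encoded JSON under 'data'."""
    data = json.dumps(payload).encode('utf-8')
    return {'data': base64.b64encode(data).decode('utf-8')}


def decode_event(event: dict) -> dict:
    """Mirror the decoding performed in subscribe()."""
    return json.loads(base64.b64decode(event['data']).decode('utf-8'))


def publish_trigger(project_id: str, topic_id: str, payload: dict) -> str:
    """Publish the payload to the topic and return the Pub/Sub message ID."""
    # Imported here so the helpers above work without the client library installed.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    future = publisher.publish(topic_path, data=json.dumps(payload).encode('utf-8'))
    return future.result()
```

For example, publish_trigger('your-project-id', 'my-scheduled-pipeline-topic', build_payload('gs://your-bucket/your-folder/hello_world_scheduled_pipeline.yaml', 'World')) would send one trigger message. The round trip through as_pubsub_event and decode_event is a quick way to confirm the payload shape before publishing.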
What's next
- Learn more about Google Cloud Pub/Sub.
- Visualize and analyze pipeline results.
- Learn how to create triggers in Cloud Run from Pub/Sub events.
- To view code samples for using Pub/Sub, refer to the Google Cloud sample browser.