Monday, October 28, 2024
An introduction to Cloudflare Workflows
Workflows are a new feature in Cloudflare’s developer platform. You can use workflows to safely execute a series of steps as defined by code.
Let’s start by creating a new workflow.
$ npm create cloudflare@latest workflows-starter -- --template "cloudflare/workflows-starter"
Inside the workflows-starter directory, src/index.ts defines two top-level exports: the workflow entrypoint, as well as a default module that handles HTTP requests:
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
type Env = {
  // Add your bindings here, e.g. Workers KV, D1, Workers AI, etc.
  MY_WORKFLOW: Workflow;
};
// User-defined params passed to your workflow
type Params = {
  email: string;
  metadata: Record<string, string>;
};
// Workflow entrypoint
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // Can access bindings on `this.env`
    // Can access params on `event.payload`
    const files = await step.do('my first step', async () => {
      // Fetch a list of files from $SOME_SERVICE
      return {
        inputParams: event,
        files: [
          'doc_7392_rev3.pdf',
          'report_x29_final.pdf',
          'memo_2024_05_12.pdf',
          'file_089_update.pdf',
          'proj_alpha_v2.pdf',
          'data_analysis_q2.pdf',
          'notes_meeting_52.pdf',
          'summary_fy24_draft.pdf',
        ],
      };
    });
    const apiResponse = await step.do('some other step', async () => {
      let resp = await fetch('https://api.cloudflare.com/client/v4/ips');
      return await resp.json<any>();
    });
    await step.sleep('wait on something', '1 minute');
    await step.do(
      'make a call to write that could maybe, just might, fail',
      // Define a retry strategy
      {
        retries: {
          limit: 5,
          delay: '5 second',
          backoff: 'exponential',
        },
        timeout: '15 minutes',
      },
      async () => {
        // Do stuff here, with access to the state from our previous steps
        if (Math.random() > 0.5) {
          throw new Error('API call to $STORAGE_SYSTEM failed');
        }
      },
    );
  }
}
export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    let url = new URL(req.url);
    if (url.pathname.startsWith('/favicon')) {
      return Response.json({}, { status: 404 });
    }
    // Get the status of an existing instance, if provided
    let id = url.searchParams.get('instanceId');
    if (id) {
      let instance = await env.MY_WORKFLOW.get(id);
      return Response.json({
        status: await instance.status(),
      });
    }
    // Spawn a new instance and return the ID and status
    let instance = await env.MY_WORKFLOW.create();
    return Response.json({
      id: instance.id,
      details: await instance.status(),
    });
  },
};
What is a workflow?
A workflow is a class that extends WorkflowEntrypoint. It has access to env, which contains the bindings for the Workers application. It can also accept (typed) parameters used to instantiate the workflow.
A workflow is composed of steps. You can call step.do to execute a step:
await step.do('do something', async () => {
  return 'OK, done!';
});
Steps should be awaited, as they are asynchronous. You can return a value from the step, and capture it as a variable:
const result = await step.do('do something', async () => {
  return 'OK, done!';
});
You can use step.sleep to pause the workflow for a period of time:
await step.sleep('wait on something', '1 minute');
The second parameter is a duration-style string, such as '1 minute', '5 seconds', or '1 year'.
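To get a feel for what these duration strings mean, here is a minimal sketch that converts one to milliseconds. The durationToMs helper and the UNIT_MS table are hypothetical and written only for illustration; they are not the Workflows runtime's own parser, and the month and year lengths are simplifying assumptions.

```typescript
// Illustrative only: a hypothetical parser, not the Workflows runtime's code.
const UNIT_MS: Record<string, number> = {
  second: 1000,
  minute: 60 * 1000,
  hour: 60 * 60 * 1000,
  day: 24 * 60 * 60 * 1000,
  week: 7 * 24 * 60 * 60 * 1000,
  month: 30 * 24 * 60 * 60 * 1000, // assumes a 30-day month
  year: 365 * 24 * 60 * 60 * 1000, // assumes a 365-day year
};

function durationToMs(duration: string): number {
  // Accepts "<number> <unit>", with an optional plural "s" on the unit
  const match = /^(\d+)\s+([a-z]+?)s?$/.exec(duration.trim().toLowerCase());
  if (!match) {
    throw new Error(`Unrecognized duration: ${duration}`);
  }
  const amount = Number(match[1]);
  const unit = match[2] ?? '';
  const unitMs = UNIT_MS[unit];
  if (unitMs === undefined) {
    throw new Error(`Unknown unit: ${unit}`);
  }
  return amount * unitMs;
}
```

Under these assumptions, durationToMs('1 minute') yields 60000, and both '5 second' and '5 seconds' yield 5000.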
All steps are retried by default; see the retry steps section in the docs for more details. You can override the default behavior by passing a configuration object with a retries option as the second argument to step.do:
await step.do(
  'make a call to write that could maybe, just might, fail',
  // Define a retry strategy
  {
    retries: {
      limit: 5,
      delay: '5 second',
      backoff: 'exponential',
    },
    timeout: '15 minutes',
  },
  async () => {
    // Do stuff here, with access to the state from our previous steps
    if (Math.random() > 0.5) {
      throw new Error('API call to $STORAGE_SYSTEM failed');
    }
  },
);
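To see roughly what a retry strategy like this implies, the sketch below computes the wait before each retry. The retryDelays helper is hypothetical, and it assumes that exponential backoff doubles the base delay on every attempt; the platform's exact formula may differ, so treat the numbers as an approximation.

```typescript
type Backoff = 'constant' | 'linear' | 'exponential';

// Hypothetical helper: returns the wait (in ms) before each retry.
// Assumption: "exponential" doubles the base delay on each attempt.
function retryDelays(limit: number, baseDelayMs: number, backoff: Backoff): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < limit; attempt++) {
    if (backoff === 'constant') {
      delays.push(baseDelayMs);
    } else if (backoff === 'linear') {
      delays.push(baseDelayMs * (attempt + 1));
    } else {
      delays.push(baseDelayMs * 2 ** attempt);
    }
  }
  return delays;
}
```

With a limit of 5 and a 5 second base delay, this model gives waits of 5, 10, 20, 40, and 80 seconds, which fits comfortably inside the 15 minute timeout used above.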
Building a custom workflow
So far, we’ve looked at the default code used in Cloudflare’s workflow template. Now, we’ll build our own workflow.
Imagine that we want to build a workflow that fetches analytics for a SaaS product we’re building. It will then take those analytics and report them to a private Slack channel. We can break this down into three steps:
- Fetch analytics from our SaaS product
- Transform/format those analytics
- Report those analytics to a private Slack channel
That workflow can be defined like this:
type Env = {
  REPORT_ANALYTICS_WORKFLOW: Workflow;
  ANALYTICS_ENDPOINT: string;
};
type Params = {
  channelId: string;
};
export class ReportAnalyticsWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    const analytics = await step.do('fetch analytics', async () => {
      // Bindings and variables live on `this.env` inside a workflow class
      const resp = await fetch(this.env.ANALYTICS_ENDPOINT);
      return await resp.json<any>();
    });
    const formatted = await step.do('format analytics', async () => {
      return formatAnalytics(analytics);
    });
    await step.do('report analytics', async () => {
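      // Slack also expects an Authorization: Bearer <token> header (omitted here)
      const resp = await fetch('https://slack.com/api/chat.postMessage', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json; charset=utf-8' },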
        body: JSON.stringify({
          channel: event.payload.channelId,
          text: JSON.stringify(formatted),
        }),
      });
      if (!resp.ok) {
        throw new Error('Failed to report analytics');
      }
    });
  }
}
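The formatAnalytics helper used in the 'format analytics' step isn't shown in this post. Here is a minimal sketch of what it might look like, assuming a hypothetical payload with pageViews, signups, and revenueCents fields. The Analytics type and its field names are made up for illustration; a real endpoint will return its own shape.

```typescript
// Hypothetical shape for the analytics payload; the real endpoint may differ.
type Analytics = {
  pageViews: number;
  signups: number;
  revenueCents: number;
};

// Hypothetical helper: turns raw numbers into a short, readable summary.
function formatAnalytics(analytics: Analytics): { summary: string } {
  const revenue = (analytics.revenueCents / 100).toFixed(2);
  return {
    summary: `Page views: ${analytics.pageViews}, signups: ${analytics.signups}, revenue: $${revenue}`,
  };
}
```

Because the helper is a pure function, it is safe to run again if the step is retried.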
Although it’s deceptively simple, this workflow as defined is quite powerful. If a step fails, that step is retried with exponential backoff, and the workflow picks up from there: steps that already completed are not run again, because their results have been captured. There’s minimal try/catch style code in this example, yet it still handles errors gracefully.
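To make the idea of resuming from the failed step more concrete, here is a toy in-memory model. It is only a sketch of the concept, not how Cloudflare implements Workflows: ToySteps and runWithRetries are hypothetical names, results are cached by step name in a Map rather than persisted, and there is no backoff between attempts.

```typescript
// Toy model only: an in-memory stand-in for durable step state.
class ToySteps {
  private results = new Map<string, unknown>();

  async do<T>(name: string, fn: () => Promise<T>): Promise<T> {
    if (this.results.has(name)) {
      // This step already completed on an earlier attempt; reuse its result
      return this.results.get(name) as T;
    }
    const value = await fn();
    this.results.set(name, value);
    return value;
  }
}

// Re-runs the workflow body after a failure, reusing the same step state,
// so only the failed step (and anything after it) actually executes again.
async function runWithRetries(
  steps: ToySteps,
  body: (steps: ToySteps) => Promise<void>,
  maxAttempts: number,
): Promise<void> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await body(steps);
    } catch (err) {
      if (attempt >= maxAttempts) throw err;
    }
  }
}
```

In this model, if a 'fetch analytics' step succeeds and a later 'report analytics' step fails twice before succeeding, the fetch callback runs once and the report callback runs three times.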
Calling workflows
Now that we’ve built a workflow, let’s explore how to execute it. In src/index.ts, we can define and export a default module that handles HTTP requests. Inside that handler, we can start a new workflow instance by calling create on the workflow binding:
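export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    // req.url is a string, so parse it before reading the query string
    const url = new URL(req.url);
    const channelId = url.searchParams.get('channelId') || 'C123456789';
    // User-defined params go under the `params` key of the create options
    const instance = await env.REPORT_ANALYTICS_WORKFLOW.create({ params: { channelId } });
    // A fetch handler must return a Response
    return Response.json({ id: instance.id });
  },
};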
Where does env.REPORT_ANALYTICS_WORKFLOW come from? It’s defined in wrangler.toml:
#:schema node_modules/wrangler/config-schema.json
name = "workflows-starter"
main = "src/index.ts"
compatibility_date = "2024-10-22"
[observability]
enabled = true
head_sampling_rate = 1 # optional. default = 1.
[[workflows]]
name = "report-analytics"
binding = "REPORT_ANALYTICS_WORKFLOW"
class_name = "ReportAnalyticsWorkflow"
By defining the workflow in wrangler.toml, we can access it from our code.
It’s useful to call workflows manually, but we can also call them using a scheduled trigger. For an analytics workflow, we may want to run it every day at a certain time. We can do this by defining a triggers block in wrangler.toml:
#:schema node_modules/wrangler/config-schema.json
name = "workflows-starter"
main = "src/index.ts"
compatibility_date = "2024-10-22"
[observability]
enabled = true
head_sampling_rate = 1 # optional. default = 1.
[[workflows]]
name = "report-analytics"
binding = "REPORT_ANALYTICS_WORKFLOW"
class_name = "ReportAnalyticsWorkflow"
[triggers]
# Run the workflow every day at 12:00 AM UTC
crons = ["0 0 * * *"]
With the cron trigger enabled, we can add a scheduled handler alongside fetch in our src/index.ts file:
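export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    // req.url is a string, so parse it before reading the query string
    const url = new URL(req.url);
    const channelId = url.searchParams.get('channelId') || 'C123456789';
    const instance = await env.REPORT_ANALYTICS_WORKFLOW.create({ params: { channelId } });
    return Response.json({ id: instance.id });
  },
  // Invoked by the cron trigger defined in wrangler.toml
  async scheduled(controller: ScheduledController, env: Env, ctx: ExecutionContext) {
    ctx.waitUntil(
      env.REPORT_ANALYTICS_WORKFLOW.create({
        params: { channelId: 'C123456789' },
      }),
    );
  },
};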
With the scheduled function defined, the analytics workflow will run every day at 12:00 AM UTC (cron triggers are evaluated in UTC).
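Cron expressions are easy to get subtly wrong. The five fields are minute, hour, day of month, month, and day of week, so "0 0 * * *" matches once a day at midnight UTC, while "0 * * * *" matches at the top of every hour. The sketch below checks a date against an expression. The matchesCron helper is hypothetical and deliberately minimal: it only understands "*" or a single number per field, and it is not the scheduler Cloudflare uses.

```typescript
// Minimal sketch: supports only "*" or a single number in each of the five fields.
function matchesCron(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error('Expected five cron fields');
  }
  const actual = [
    date.getUTCMinutes(),
    date.getUTCHours(),
    date.getUTCDate(),
    date.getUTCMonth() + 1, // cron months are 1-12
    date.getUTCDay(), // JavaScript numbering (0 = Sunday); cron dialects differ here
  ];
  return fields.every((field, i) => field === '*' || Number(field) === actual[i]);
}
```

For example, a date of 2024-10-28T13:00:00Z matches "0 * * * *" but not "0 0 * * *", which is the difference between an hourly and a daily report.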
Conclusion
I’m quite enjoying playing with Workflows. They’re a great way to reliably execute multi-step code without having to worry about retries, timeouts, or error handling. You can check out the documentation for Workflows to learn more. I also wrote about my indexer project a few days ago, which is built on top of Cloudflare Workflows and is fully open-source.