An introduction to Cloudflare Workflows
How to get started with a great new feature in Cloudflare's developer platform.
Workflows are a new feature in Cloudflare’s developer platform. You can use workflows to safely execute a series of steps as defined by code.
Let’s start by creating a new workflow.
$ npm create cloudflare@latest workflows-starter -- --template "cloudflare/workflows-starter"
Inside the workflows-starter
directory, src/index.ts
defines two top-level exports: the workflow entrypoint, as well as a default module that handles HTTP requests:
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
type Env = { // Add your bindings here, e.g. Workers KV, D1, Workers AI, etc. MY_WORKFLOW: Workflow;};
// User-defined params passed to your workflowtype Params = { email: string; metadata: Record<string, string>;};
// Workflow entrypointexport class MyWorkflow extends WorkflowEntrypoint<Env, Params> { async run(event: WorkflowEvent<Params>, step: WorkflowStep) { // Can access bindings on `this.env` // Can access params on `event.payload`
const files = await step.do('my first step', async () => { // Fetch a list of files from $SOME_SERVICE return { inputParams: event, files: [ 'doc_7392_rev3.pdf', 'report_x29_final.pdf', 'memo_2024_05_12.pdf', 'file_089_update.pdf', 'proj_alpha_v2.pdf', 'data_analysis_q2.pdf', 'notes_meeting_52.pdf', 'summary_fy24_draft.pdf', ], }; });
const apiResponse = await step.do('some other step', async () => { let resp = await fetch('https://api.cloudflare.com/client/v4/ips'); return await resp.json<any>(); });
await step.sleep('wait on something', '1 minute');
await step.do( 'make a call to write that could maybe, just might, fail', // Define a retry strategy { retries: { limit: 5, delay: '5 second', backoff: 'exponential', }, timeout: '15 minutes', }, async () => { // Do stuff here, with access to the state from our previous steps if (Math.random() > 0.5) { throw new Error('API call to $STORAGE_SYSTEM failed'); } }, ); }}
export default {23 collapsed lines
async fetch(req: Request, env: Env): Promise<Response> { let url = new URL(req.url);
if (url.pathname.startsWith('/favicon')) { return Response.json({}, { status: 404 }); }
// Get the status of an existing instance, if provided let id = url.searchParams.get('instanceId'); if (id) { let instance = await env.MY_WORKFLOW.get(id); return Response.json({ status: await instance.status(), }); }
// Spawn a new instance and return the ID and status let instance = await env.MY_WORKFLOW.create(); return Response.json({ id: instance.id, details: await instance.status(), }); },};
What is a workflow?
A workflow is a class that extends WorkflowEntrypoint
. It has access to env
, which contains the bindings for the Workers application. It can also accept (typed) parameters used to instantiate the workflow.
A workflow is comprised of steps. You can call step.do
to execute a step:
await step.do('do something', async () => { return 'OK, done!';});
Steps should be awaited, as they are asynchronous. You can return a value from the step, and capture it as a variable:
const result = await step.do('do something', async () => { return 'OK, done!';});
You can use step.sleep
to pause the workflow for a period of time:
await step.sleep('wait on something', '1 minute');
The second parameter is a duration-style string, such as '1 minute'
, '5 seconds'
, or '1 year'
.
All steps are retried by default - see the retry steps section in the docs for more details. You can override the default behavior by passing a retries
option to step.do
:
await step.do( 'make a call to write that could maybe, just might, fail', // Define a retry strategy { retries: { limit: 5, delay: '5 second', backoff: 'exponential', }, timeout: '15 minutes', }, async () => { // Do stuff here, with access to the state from our previous steps if (Math.random() > 0.5) { throw new Error('API call to $STORAGE_SYSTEM failed'); } },);
Building a custom workflow
So far, we’ve looked at the default code used in Cloudflare’s workflow template. Now, we’ll build our own workflow.
Imagine that we want to build a workflow that fetches analytics for a SaaS product we’re building. It will then take those analytics and report them to a private Slack channel. We can break this down into three steps:
- Fetch analytics from our SaaS product
- Transform/format those analytics
- Report those analytics to a private Slack channel
That workflow can be defined like this:
type Env = { REPORT_ANALYTICS_WORKFLOW: Workflow; ANALYTICS_ENDPOINT: string;};
type Params = { channelId: string;};
export class ReportAnalyticsWorkflow extends WorkflowEntrypoint<Env, Params> { async run(event: WorkflowEvent<Params>, step: WorkflowStep) { const analytics = await step.do('fetch analytics', async () => { const resp = await fetch(env.ANALYTICS_ENDPOINT); return await resp.json<any>(); });
const formatted = await step.do('format analytics', async () => { return formatAnalytics(analytics); });
await step.do('report analytics', async () => { const resp = await fetch('https://slack.com/api/chat.postMessage', { method: 'POST', body: JSON.stringify({ channel: event.payload.channelId, text: JSON.stringify(formatted), }), });
if (!resp.ok) { throw new Error('Failed to report analytics'); } }); }}
Although it’s deceptively simple, this workflow as defined is quite powerful. If any step fails, it will retry the workflow using exponential backoff, starting from that step. There’s minimal try/catch
style code in this example, yet it still handles errors gracefully.
Calling workflows
Now that we’ve built a workflow, let’s explore how to execute it. In src/index.ts
, we can define and export a default module that handles HTTP requests. Inside of that handler function, we can call the workflow by using the create
function defined on our workflow:
export default { async fetch(req: Request, env: Env): Promise<Response> { const channelId = req.url.searchParams.get('channelId') || 'C123456789'; await env.REPORT_ANALYTICS_WORKFLOW.create({ channelId }); }}
Where does env.REPORT_ANALYTICS_WORKFLOW
come from? It’s defined in wrangler.toml
:
#:schema node_modules/wrangler/config-schema.jsonname = "workflows-starter"main = "src/index.ts"compatibility_date = "2024-10-22"
[observability]enabled = truehead_sampling_rate = 1 # optional. default = 1.
[[workflows]]name = "report-analytics"binding = "REPORT_ANALYTICS_WORKFLOW"class_name = "ReportAnalyticsWorkflow"
By defining the workflow in wrangler.toml
, we can access it from our code.
It’s useful to call workflows manually, but we can also call them using a scheduled
trigger. For an analytics workflow, we may want to run it every day at a certain time. We can do this by defining a triggers
block in wrangler.toml
:
8 collapsed lines
#:schema node_modules/wrangler/config-schema.jsonname = "workflows-starter"main = "src/index.ts"compatibility_date = "2024-10-22"
[observability]enabled = truehead_sampling_rate = 1 # optional. default = 1.
[[workflows]]name = "report-analytics"binding = "REPORT_ANALYTICS_WORKFLOW"class_name = "ReportAnalyticsWorkflow"
[triggers]# Run the workflow every day at 12:00 AMcrons = ["0 * * * *"]
With the cron trigger enabled, we can add a new function scheduled
to our src/index.ts
file:
export default {5 collapsed lines
async fetch(req: Request, env: Env): Promise<Response> { const channelId = req.url.searchParams.get('channelId') || 'C123456789'; await env.REPORT_ANALYTICS_WORKFLOW.create({ channelId }); }
async scheduled(event, env, ctx) { ctx.waitUntil( env.REPORT_ANALYTICS_WORKFLOW.create({ channelId: 'C123456789', }), ); },};
With the scheduled
function defined, the analytics workflow will run every day at 12:00 AM.
Conclusion
I’m quite enjoying playing with Workflows. They’re a great way to reliably execute multi-step code without having to worry about retries, timeouts, or error handling. You can check out the documentation for Workflows to learn more. I also wrote about my indexer project a few days ago, which is built on top of Cloudflare Workflows and is fully open-source.