Workflows are a new feature in Cloudflare’s developer platform. You can use workflows to safely execute a series of steps defined in code.
Let’s start by creating a new workflow.
$ npm create cloudflare@latest workflows-starter -- --template "cloudflare/workflows-starter"
Inside the workflows-starter directory, src/index.ts defines two top-level exports: the workflow entrypoint, as well as a default module that handles HTTP requests:
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
type Env = {
// Add your bindings here, e.g. Workers KV, D1, Workers AI, etc.
MY_WORKFLOW: Workflow;
};
// User-defined params passed to your workflow
type Params = {
email: string;
metadata: Record<string, string>;
};
// Workflow entrypoint
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// Can access bindings on `this.env`
// Can access params on `event.payload`
const files = await step.do('my first step', async () => {
// Fetch a list of files from $SOME_SERVICE
return {
inputParams: event,
files: [
'doc_7392_rev3.pdf',
'report_x29_final.pdf',
'memo_2024_05_12.pdf',
'file_089_update.pdf',
'proj_alpha_v2.pdf',
'data_analysis_q2.pdf',
'notes_meeting_52.pdf',
'summary_fy24_draft.pdf',
],
};
});
const apiResponse = await step.do('some other step', async () => {
let resp = await fetch('https://api.cloudflare.com/client/v4/ips');
return await resp.json<any>();
});
await step.sleep('wait on something', '1 minute');
await step.do(
'make a call to write that could maybe, just might, fail',
// Define a retry strategy
{
retries: {
limit: 5,
delay: '5 second',
backoff: 'exponential',
},
timeout: '15 minutes',
},
async () => {
// Do stuff here, with access to the state from our previous steps
if (Math.random() > 0.5) {
throw new Error('API call to $STORAGE_SYSTEM failed');
}
},
);
}
}
export default {
async fetch(req: Request, env: Env): Promise<Response> {
let url = new URL(req.url);
if (url.pathname.startsWith('/favicon')) {
return Response.json({}, { status: 404 });
}
// Get the status of an existing instance, if provided
let id = url.searchParams.get('instanceId');
if (id) {
let instance = await env.MY_WORKFLOW.get(id);
return Response.json({
status: await instance.status(),
});
}
// Spawn a new instance and return the ID and status
let instance = await env.MY_WORKFLOW.create();
return Response.json({
id: instance.id,
details: await instance.status(),
});
},
};
What is a workflow?
A workflow is a class that extends WorkflowEntrypoint. It has access to env, which contains the bindings for the Workers application, and it can accept (typed) parameters that are passed in when the workflow is instantiated.
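Putting those pieces together, a minimal workflow class looks something like this; it’s a pared-down sketch of the template above (reusing its MY_WORKFLOW binding and email param), not a new API:

import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';

type Env = {
  MY_WORKFLOW: Workflow;
};

type Params = {
  email: string;
};

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // Bindings are available on `this.env`, typed params on `event.payload`
    await step.do('greet the user', async () => {
      return `Hello, ${event.payload.email}!`;
    });
  }
}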
A workflow is composed of steps. You can call step.do to execute a step:
await step.do('do something', async () => {
return 'OK, done!';
});
Steps should be awaited, as they are asynchronous. You can return a value from the step, and capture it as a variable:
const result = await step.do('do something', async () => {
return 'OK, done!';
});
You can use step.sleep to pause the workflow for a period of time:
await step.sleep('wait on something', '1 minute');
The second parameter is a duration-style string, such as '1 minute', '5 seconds', or '1 year'.
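Any of the duration strings mentioned above works the same way; the step names here are just labels:

await step.sleep('short pause', '5 seconds');
await step.sleep('medium pause', '1 minute');
await step.sleep('very long pause', '1 year');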
All steps are retried by default - see the retry steps section in the docs for more details. You can override the default behavior by passing a retries option to step.do:
await step.do(
'make a call to write that could maybe, just might, fail',
// Define a retry strategy
{
retries: {
limit: 5,
delay: '5 second',
backoff: 'exponential',
},
timeout: '15 minutes',
},
async () => {
// Do stuff here, with access to the state from our previous steps
if (Math.random() > 0.5) {
throw new Error('API call to $STORAGE_SYSTEM failed');
}
},
);
Building a custom workflow
So far, we’ve looked at the default code used in Cloudflare’s workflow template. Now, we’ll build our own workflow.
Imagine that we want to build a workflow that fetches analytics for a SaaS product we’re building. It will then take those analytics and report them to a private Slack channel. We can break this down into three steps:
- Fetch analytics from our SaaS product
- Transform/format those analytics
- Report those analytics to a private Slack channel
That workflow can be defined like this:
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';

type Env = {
REPORT_ANALYTICS_WORKFLOW: Workflow;
ANALYTICS_ENDPOINT: string;
};
type Params = {
channelId: string;
};
export class ReportAnalyticsWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    const analytics = await step.do('fetch analytics', async () => {
      // Bindings are available on `this.env`, not a bare `env`
      const resp = await fetch(this.env.ANALYTICS_ENDPOINT);
      return await resp.json<any>();
    });
    const formatted = await step.do('format analytics', async () => {
      // formatAnalytics is a user-defined helper (not shown here)
      return formatAnalytics(analytics);
    });
await step.do('report analytics', async () => {
const resp = await fetch('https://slack.com/api/chat.postMessage', {
method: 'POST',
body: JSON.stringify({
channel: event.payload.channelId,
text: JSON.stringify(formatted),
}),
});
if (!resp.ok) {
throw new Error('Failed to report analytics');
}
});
}
}
Although it looks simple, this workflow as defined is quite powerful. If any step fails, it is retried with exponential backoff, and the workflow resumes from that step rather than starting over. There’s minimal try/catch-style code in this example, yet it still handles errors gracefully.
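If we wanted tighter control over how that final Slack call is retried, we could attach a per-step retry configuration to it, exactly as the template did earlier. Here’s a sketch; the limit, delay, and timeout values are arbitrary placeholders rather than recommendations:

await step.do(
  'report analytics',
  {
    // Retry the Slack call up to 3 times with exponential backoff,
    // and fail any single attempt that runs longer than 30 seconds
    retries: {
      limit: 3,
      delay: '10 seconds',
      backoff: 'exponential',
    },
    timeout: '30 seconds',
  },
  async () => {
    // ...same Slack call as in the step above
  },
);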
Calling workflows
Now that we’ve built a workflow, let’s explore how to execute it. In src/index.ts, we can define and export a default module that handles HTTP requests. Inside that handler function, we can call the workflow by using the create method defined on our workflow binding:
export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    const url = new URL(req.url);
    const channelId = url.searchParams.get('channelId') || 'C123456789';
    // Typed params for the workflow go under the `params` key
    const instance = await env.REPORT_ANALYTICS_WORKFLOW.create({ params: { channelId } });
    return Response.json({ id: instance.id });
  },
};
Where does env.REPORT_ANALYTICS_WORKFLOW come from? It’s defined in wrangler.toml:
#:schema node_modules/wrangler/config-schema.json
name = "workflows-starter"
main = "src/index.ts"
compatibility_date = "2024-10-22"
[observability]
enabled = true
head_sampling_rate = 1 # optional. default = 1.
[[workflows]]
name = "report-analytics"
binding = "REPORT_ANALYTICS_WORKFLOW"
class_name = "ReportAnalyticsWorkflow"
By defining the workflow in wrangler.toml, we can access it from our code. It’s useful to call workflows manually, but we can also call them from a scheduled trigger. For an analytics workflow, we may want to run it every day at a certain time. We can do this by defining a triggers block in wrangler.toml:
#:schema node_modules/wrangler/config-schema.json
name = "workflows-starter"
main = "src/index.ts"
compatibility_date = "2024-10-22"
[observability]
enabled = true
head_sampling_rate = 1 # optional. default = 1.
[[workflows]]
name = "report-analytics"
binding = "REPORT_ANALYTICS_WORKFLOW"
class_name = "ReportAnalyticsWorkflow"
[triggers]
# Run the workflow every day at 12:00 AM
crons = ["0 0 * * *"]
With the cron trigger enabled, we can add a new scheduled function to our src/index.ts file:
export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    const url = new URL(req.url);
    const channelId = url.searchParams.get('channelId') || 'C123456789';
    const instance = await env.REPORT_ANALYTICS_WORKFLOW.create({ params: { channelId } });
    return Response.json({ id: instance.id });
  },
  async scheduled(event: ScheduledController, env: Env, ctx: ExecutionContext) {
    ctx.waitUntil(
      env.REPORT_ANALYTICS_WORKFLOW.create({
        params: { channelId: 'C123456789' },
      }),
    );
  },
};
With the scheduled function defined, the analytics workflow will run every day at 12:00 AM.
Conclusion
I’m quite enjoying playing with Workflows. They’re a great way to reliably execute multi-step code without having to worry about retries, timeouts, or error handling. You can check out the documentation for Workflows to learn more. I also wrote about my indexer project a few days ago, which is built on top of Cloudflare Workflows and is fully open-source.