Dynamic execution of Fargate tasks triggered via AWS Event Bridge

AWS Event Bridge and AWS Fargate: A match made in heaven

Running a command in container on demand is a powerful feature of AWS Fargate. This opens a whole world of possibilities when combined with aws event bridge. Using an EventRule you can match existing aws events or custom events you intend to send to event bridge and set an AWS Fargate task as a target.

A practical Example

To see this in practice we’ll use an example in aws CDK:

First we create a base class for our triggered constructs that is responsible for:

  • Creating the event target fargate task given taskDefinition, securityGroups and containerOverrides . Task definition will let you set memory & cpu to be used, the image, the command to be run and logging. Security Groups define access . Container overrides give you the option to override the configuration and the environment of container, we ‘ll come back to this hidden gem later. Once the ecs task is created it’s assigned as a target to the provided rule.
  • Configuring aws log driver. That’s useful for viewing execution output in AWS cloudwath.
  • Discovering/ maintaining a default ecs cluster of the vpc(for the task to be run in) if we don’t provide a specific cluster to use

Then we create a subclass that simplifies construct creation responsible for constructing task definitions.
We could also have a scheduled task subclass that would create a cron like event rule internally.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
export abstract class TriggeredTaskBase extends Construct {
public readonly cluster: ICluster;

public readonly desiredTaskCount: number;

public readonly eventRule: Rule;

constructor(scope: Construct, id: string, props: TriggeredTaskBaseProps) {
super(scope, id);

this.cluster = props.cluster || this.getDefaultCluster(this, props.vpc);
if (props.desiredTaskCount !== undefined && props.desiredTaskCount < 1) {
throw new Error('You must specify a desiredTaskCount greater than 0');
}
this.desiredTaskCount = props.desiredTaskCount || 1;

// An EventRule that describes the event trigger
this.eventRule = props.rule;
Tags.of(this).add('Module', getModuleName(__filename));
}

/**
* Create an ECS task using the task definition provided
* and adds it as a target to the event rule.
*
* @param ecsTaskProps properties of the ecs task
*/
protected addTaskDefinitionToEventTarget({
taskDefinition,
securityGroups,
containerOverrides,
}: {
taskDefinition: TaskDefinition;
securityGroups?: ISecurityGroup[];
containerOverrides?: ContainerOverride[];
}): EcsTask {
// Use the EcsTask as the target of the EventRule
const eventRuleTarget = new EcsTask({
cluster: this.cluster,
taskDefinition,
taskCount: this.desiredTaskCount,
containerOverrides,
securityGroups,
});

this.eventRule.addTarget(eventRuleTarget);

return eventRuleTarget;
}

/**
* Returns the default cluster.
*/
protected getDefaultCluster(scope: Construct, vpc?: IVpc): Cluster {
// magic string to avoid collision with user-defined constructs
const DEFAULT_CLUSTER_ID = `EcsDefaultCluster${vpc ? vpc.node.id : ''}`;
const stack = Stack.of(scope);
return (
(stack.node.tryFindChild(DEFAULT_CLUSTER_ID) as Cluster) ||
new Cluster(stack, DEFAULT_CLUSTER_ID, { vpc })
);
}
protected createAWSLogDriver(prefix: string): AwsLogDriver {
return new AwsLogDriver({ streamPrefix: prefix });
}
}

export class TriggeredFargateTask extends TriggeredTaskBase {
// Create a Task Definition for the container to start
public readonly taskDefinition: TaskDefinition;
public readonly eventTarget: EcsTask;
constructor(scope: Construct, id: string, props: TriggeredFargateTaskProps) {
super(scope, id, props);
if (props.triggeredFargateTaskDefinitionOptions && props.triggeredFargateTaskImageOptions) {
throw new Error(
'You must specify either a triggeredFargateTaskDefinitionOptions or triggeredFargateTaskOptions, not both.'
);
} else if (props.triggeredFargateTaskDefinitionOptions) {
this.taskDefinition = props.triggeredFargateTaskDefinitionOptions.taskDefinition;
} else if (props.triggeredFargateTaskImageOptions) {
const taskImageOptions = props.triggeredFargateTaskImageOptions;
this.taskDefinition = new FargateTaskDefinition(this, 'TriggeredTaskDef', {
taskRole: props.taskRole,
memoryLimitMiB: taskImageOptions.memoryLimitMiB || 512,
cpu: taskImageOptions.cpu || 256,
});
const { logDriver, ...imageOptions } = taskImageOptions;
this.taskDefinition.addContainer(props.containerName, {
...imageOptions,
logging: logDriver !== undefined ? logDriver : this.createAWSLogDriver(this.node.id),
});
} else {
throw new Error('You must specify one of: taskDefinition or image');
}

this.eventTarget = this.addTaskDefinitionToEventTarget({
taskDefinition: this.taskDefinition,
securityGroups: props.securityGroups,
containerOverrides: props.containerOverrides,
});
}
}

Until now, it all seems straight forward. We can now create our task. In our example we run a database migration task:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
new TriggeredFargateTask(this, `my-proj ManagementTask`, {
cluster,
taskRole: imageOptions.taskRole,
containerName,
securityGroups,
triggeredFargateTaskImageOptions: {
...imageOptions,
cpu: 2048,
memoryLimitMiB: 4096,
command: ['sh', '-c', 'python manage.py migrate'],
},
rule: new Rule(this, `my-proj Management Trigger`, {
eventPattern: {
detail: {
subject: ['execute'],
},
detailType: ['Management'],
source: [`com.corp.env.my-proj.manage`],
},
}),
});

Now if you wanted to run another command using the same image, you would create another task and so on and so forth.

The one task to rule them all

If you are familiar with django(the above example demonstrates executing django management commands) you will know that there is a plethora of commands that you would want to run and creating a task for each command would require considerable amount of effort. The other issue is that some commands accept parameters which would be impractical to be hardcoded in task definition. Facing these issues I embarked on a journey to create the one task to rule them all.

Container Overrides to the rescue

The concept was simple. I wanted to provide parameters to the fargate task dynamically when triggering the event bridge event. Although it seemed a common use case, most of the solutions I found where suggesting setting up an sqs queue that would accept messages containing the parameters consumed at execution time, which seemed like an overkill to me. The only way I have found to pass data to the container without involving extra resources was to set environment variables from event bridge event payload via container overrides(using input transformer behavior). I actually found this by digging in some github tickets rather than finding some reference solution, neither the docs where clear on how to use the constructs to reach to a solution. Here is our revised example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
new TriggeredFargateTask(this, `my-proj ManagementTask`, {
cluster,
taskRole: imageOptions.taskRole,
containerName,
securityGroups,
triggeredFargateTaskImageOptions: {
...imageOptions,
cpu: 2048,
memoryLimitMiB: 4096,
command: ['sh', '-c', 'python manage.py $MANAGEMENT_COMMAND'],
},
rule: new Rule(this, `my-proj Management Trigger`, {
eventPattern: {
detail: {
subject: ['execute'],
},
detailType: ['Management'],
source: [`com.corp.env.my-proj.manage`],
},
}),
containerOverrides: [
{
containerName,
environment: [
{
name: 'MANAGEMENT_COMMAND',
value: EventField.fromPath('$.detail.command'),
},
],
},
],
});

Notice that the command changed to :

command: ['sh', '-c', 'python manage.py $MANAGEMENT_COMMAND'],

and we added the following override:

1
2
3
4
5
6
7
8
9
10
11
containerOverrides: [
{
containerName,
environment: [
{
name: 'MANAGEMENT_COMMAND',
value: EventField.fromPath('$.detail.command'),
},
],
},
],

The above override uses EventField.fromPath('$.detail.command') to transform the value of detail.command from the event to the value of the environment variable in the container override. Now detail.command attribute of our event can hold any arbitrary command including any parameters we want (the migration to migrate to for example) and will pass it to be executed via the $MANAGEMENT_COMMAND environment variable. This allows us to dynamically specify what we expect to run in the container.
A sample of triggering the above using event bridge event via cdk would be the following:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const eventDetails = [
{
Detail: JSON.stringify({ subject: 'execute', command:'showmigrations myapp'}),
DetailType: 'Management',
Resources: [],
Source: `com.corp.env.my-proj.manage`,
Time: new Date(),
}
];
eventBridge
.putEvents(eventDetails)
.promise()
.then((data) =>
console.log('Successfully triggered django management event details:\n', JSON.stringify(data, null, 4))
)

The above would trigger showing migrations of myapp. To see the output you can check cloudwatch . You would normally accept the command as command line argument or as an argument of a function/method depending on if you are creating a CLI tool or a Library. (typically you would build a library that is used by your cli tool as well)

The above demonstrates how you can be flexible when emitting orchestration commands as events but could also be used in a choreographic scenario(system event for example) where the consuming target gets some configuration about execution from an event passed as environment variable.

  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2012-2023 Andreas Galazis

请我喝杯咖啡吧~