Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lambdas/functions/control-plane/src/aws/runners.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
DefaultTargetCapacityType,
FleetOnDemandAllocationStrategy,
InstanceRequirementsRequest,
SpotAllocationStrategy,
_InstanceType,
Expand Down Expand Up @@ -59,9 +60,10 @@ export interface RunnerInputParameters {
launchTemplateName: string;
ec2instanceCriteria: {
instanceTypes: string[];
instanceTypePriorities?: Record<string, number>;
targetCapacityType: DefaultTargetCapacityType;
maxSpotPrice?: string;
instanceAllocationStrategy: SpotAllocationStrategy;
instanceAllocationStrategy: SpotAllocationStrategy | FleetOnDemandAllocationStrategy;
};
ec2OverrideConfig?: Ec2OverrideConfig;
numberOfRunners: number;
Expand Down
107 changes: 98 additions & 9 deletions lambdas/functions/control-plane/src/aws/runners.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
DescribeInstancesCommand,
type DescribeInstancesResult,
EC2Client,
FleetOnDemandAllocationStrategy,
RunInstancesCommand,
SpotAllocationStrategy,
TerminateInstancesCommand,
Expand Down Expand Up @@ -390,11 +391,71 @@ describe('create runner', () => {
});

it('calls create fleet of 1 instance with the on-demand capacity', async () => {
await createRunner(createRunnerConfig({ ...defaultRunnerConfig, capacityType: 'on-demand' }));
await createRunner(
createRunnerConfig({ ...defaultRunnerConfig, capacityType: 'on-demand', allocationStrategy: 'lowest-price' }),
);
expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
...expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
capacityType: 'on-demand',
allocationStrategy: 'lowest-price',
}),
});
});

it('calls create fleet with on-demand capacity and prioritized allocation strategy', async () => {
await createRunner(
createRunnerConfig({
...defaultRunnerConfig,
capacityType: 'on-demand',
allocationStrategy: FleetOnDemandAllocationStrategy.PRIORITIZED,
}),
);
expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
...expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
capacityType: 'on-demand',
allocationStrategy: FleetOnDemandAllocationStrategy.PRIORITIZED,
}),
});
});

it('calls create fleet with custom instance type priorities', async () => {
const priorities = { 'm5.large': 10, 'c5.large': 5 };
await createRunner(
createRunnerConfig({
...defaultRunnerConfig,
capacityType: 'on-demand',
allocationStrategy: FleetOnDemandAllocationStrategy.PRIORITIZED,
instanceTypePriorities: priorities,
}),
);
expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
...expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
capacityType: 'on-demand',
allocationStrategy: FleetOnDemandAllocationStrategy.PRIORITIZED,
instanceTypePriorities: priorities,
}),
});
});

it('calls create fleet with spot capacity-optimized-prioritized and instance type priorities', async () => {
const priorities = { 'm5.large': 10, 'c5.large': 5 };
await createRunner(
createRunnerConfig({
...defaultRunnerConfig,
capacityType: 'spot',
allocationStrategy: SpotAllocationStrategy.CAPACITY_OPTIMIZED_PRIORITIZED,
instanceTypePriorities: priorities,
}),
);
expect(mockEC2Client).toHaveReceivedCommandWith(CreateFleetCommand, {
...expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
capacityType: 'spot',
allocationStrategy: SpotAllocationStrategy.CAPACITY_OPTIMIZED_PRIORITIZED,
instanceTypePriorities: priorities,
}),
});
});
Expand Down Expand Up @@ -841,12 +902,13 @@ describe('create runner with errors fail over to OnDemand', () => {
}),
});

// second call with with OnDemand fallback
// second call with OnDemand fallback, allocation strategy defaults to lowest-price
expect(mockEC2Client).toHaveReceivedNthCommandWith(2, CreateFleetCommand, {
...expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
totalTargetCapacity: 1,
capacityType: 'on-demand',
allocationStrategy: 'lowest-price',
}),
});
});
Expand Down Expand Up @@ -883,12 +945,13 @@ describe('create runner with errors fail over to OnDemand', () => {
}),
});

// second call with with OnDemand failback, capacity is reduced by 1
// second call with OnDemand fallback, capacity is reduced by 1, allocation strategy defaults to lowest-price
expect(mockEC2Client).toHaveReceivedNthCommandWith(2, CreateFleetCommand, {
...expectedCreateFleetRequest({
...defaultExpectedFleetRequestValues,
totalTargetCapacity: 1,
capacityType: 'on-demand',
allocationStrategy: 'lowest-price',
}),
});
});
Expand Down Expand Up @@ -958,7 +1021,8 @@ function createFleetMockWithWithOnDemandFallback(errors: string[], instances?: s
interface RunnerConfig {
type: RunnerType;
capacityType: DefaultTargetCapacityType;
allocationStrategy: SpotAllocationStrategy;
allocationStrategy: SpotAllocationStrategy | FleetOnDemandAllocationStrategy;
instanceTypePriorities?: Record<string, number>;
maxSpotPrice?: string;
amiIdSsmParameterName?: string;
tracingEnabled?: boolean;
Expand All @@ -977,6 +1041,7 @@ function createRunnerConfig(runnerConfig: RunnerConfig): RunnerInputParameters {
launchTemplateName: LAUNCH_TEMPLATE,
ec2instanceCriteria: {
instanceTypes: ['m5.large', 'c5.large'],
instanceTypePriorities: runnerConfig.instanceTypePriorities,
targetCapacityType: runnerConfig.capacityType,
maxSpotPrice: runnerConfig.maxSpotPrice,
instanceAllocationStrategy: runnerConfig.allocationStrategy,
Expand All @@ -994,7 +1059,8 @@ function createRunnerConfig(runnerConfig: RunnerConfig): RunnerInputParameters {
interface ExpectedFleetRequestValues {
type: 'Repo' | 'Org';
capacityType: DefaultTargetCapacityType;
allocationStrategy: SpotAllocationStrategy;
allocationStrategy: SpotAllocationStrategy | FleetOnDemandAllocationStrategy;
instanceTypePriorities?: Record<string, number>;
maxSpotPrice?: string;
totalTargetCapacity: number;
imageId?: string;
Expand All @@ -1016,6 +1082,9 @@ function expectedCreateFleetRequest(expectedValues: ExpectedFleetRequestValues):
const traceId = tracer.getRootXrayTraceId();
tags.push({ Key: 'ghr:trace_id', Value: traceId! });
}
const usesPriority =
expectedValues.allocationStrategy === 'prioritized' ||
expectedValues.allocationStrategy === 'capacity-optimized-prioritized';
const request: CreateFleetCommandInput = {
LaunchTemplateConfigs: [
{
Expand All @@ -1027,26 +1096,46 @@ function expectedCreateFleetRequest(expectedValues: ExpectedFleetRequestValues):
{
InstanceType: 'm5.large',
SubnetId: 'subnet-123',
...(usesPriority && {
Priority: expectedValues.instanceTypePriorities?.['m5.large'] ?? 0,
}),
},
{
InstanceType: 'c5.large',
SubnetId: 'subnet-123',
...(usesPriority && {
Priority: expectedValues.instanceTypePriorities?.['c5.large'] ?? 1,
}),
},
{
InstanceType: 'm5.large',
SubnetId: 'subnet-456',
...(usesPriority && {
Priority: expectedValues.instanceTypePriorities?.['m5.large'] ?? 0,
}),
},
{
InstanceType: 'c5.large',
SubnetId: 'subnet-456',
...(usesPriority && {
Priority: expectedValues.instanceTypePriorities?.['c5.large'] ?? 1,
}),
},
],
},
],
SpotOptions: {
AllocationStrategy: expectedValues.allocationStrategy,
MaxTotalPrice: expectedValues.maxSpotPrice,
},
...(expectedValues.capacityType === 'spot'
? {
SpotOptions: {
AllocationStrategy: expectedValues.allocationStrategy,
MaxTotalPrice: expectedValues.maxSpotPrice,
},
}
: {
OnDemandOptions: {
AllocationStrategy: expectedValues.allocationStrategy,
},
}),
TagSpecifications: [
{
ResourceType: 'instance',
Expand Down
72 changes: 65 additions & 7 deletions lambdas/functions/control-plane/src/aws/runners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
RunInstancesCommand,
EC2Client,
FleetLaunchTemplateOverridesRequest,
FleetOnDemandAllocationStrategy,
SpotAllocationStrategy,
Tag,
TerminateInstancesCommand,
_InstanceType,
Expand Down Expand Up @@ -122,11 +124,37 @@ export async function untag(instanceId: string, tags: Tag[]): Promise<void> {
await ec2.send(new DeleteTagsCommand({ Resources: [instanceId], Tags: tags }));
}

// Allocation strategies that CreateFleet accepts in SpotOptions.
const SPOT_ALLOCATION_STRATEGIES = [
  'lowest-price',
  'diversified',
  'capacity-optimized',
  'capacity-optimized-prioritized',
  'price-capacity-optimized',
];
// Allocation strategies that CreateFleet accepts in OnDemandOptions.
const ON_DEMAND_ALLOCATION_STRATEGIES = ['lowest-price', 'prioritized'];

/**
 * Returns an allocation strategy that is valid for the given capacity type.
 *
 * The configured strategy may be any spot or on-demand value, so a value that is
 * valid for one capacity type can be invalid for the other. Because AWS rejects
 * CreateFleet when the strategy does not match the target capacity type, an
 * unsupported value is replaced by 'lowest-price' (the AWS default).
 *
 * @param strategy - The configured allocation strategy.
 * @param targetCapacityType - 'spot' selects the spot list; any other value
 *   selects the on-demand list.
 * @returns `strategy` when it is supported for the capacity type, otherwise 'lowest-price'.
 */
function sanitizeAllocationStrategy(
  strategy: string,
  targetCapacityType: string,
): SpotAllocationStrategy | FleetOnDemandAllocationStrategy {
  let allowed = ON_DEMAND_ALLOCATION_STRATEGIES;
  if (targetCapacityType === 'spot') {
    allowed = SPOT_ALLOCATION_STRATEGIES;
  }

  const isSupported = allowed.some((candidate) => candidate === strategy);
  if (!isSupported) {
    return 'lowest-price' as SpotAllocationStrategy | FleetOnDemandAllocationStrategy;
  }
  return strategy as SpotAllocationStrategy | FleetOnDemandAllocationStrategy;
}

function generateFleetOverrides(
subnetIds: string[],
instancesTypes: string[],
amiId?: string,
ec2OverrideConfig?: Runners.Ec2OverrideConfig,
allocationStrategy?: string,
instanceTypePriorities?: Record<string, number>,
): FleetLaunchTemplateOverridesRequest[] {
const result: FleetLaunchTemplateOverridesRequest[] = [];

Expand All @@ -135,12 +163,18 @@ function generateFleetOverrides(
const instanceTypesToUse = ec2OverrideConfig?.InstanceType ? [ec2OverrideConfig.InstanceType] : instancesTypes;
const amiIdToUse = ec2OverrideConfig?.ImageId ?? amiId;

// Both the on-demand 'prioritized' and the spot 'capacity-optimized-prioritized' strategies
// honor the Priority field of the launch template overrides.
const usesPriority =
allocationStrategy === 'prioritized' || allocationStrategy === 'capacity-optimized-prioritized';

subnetsToUse.forEach((s) => {
instanceTypesToUse.forEach((i) => {
instanceTypesToUse.forEach((i, index) => {
const item: FleetLaunchTemplateOverridesRequest = {
SubnetId: s,
InstanceType: i as _InstanceType,
ImageId: amiIdToUse,
...(usesPriority && { Priority: instanceTypePriorities?.[i] ?? index }),
...ec2OverrideConfig,
};
result.push(item);
Expand Down Expand Up @@ -205,11 +239,19 @@ async function processFleetResult(
logger.warn(`Create fleet failed, initatiing fall back to on demand instances.`);
logger.debug('Create fleet failed.', { data: fleet.Errors });
const numberOfInstances = runnerParameters.numberOfRunners - instances.length;
const failoverAllocationStrategy = sanitizeAllocationStrategy(
runnerParameters.ec2instanceCriteria.instanceAllocationStrategy,
'on-demand',
);
const instancesOnDemand = await createRunner({
...runnerParameters,
numberOfRunners: numberOfInstances,
onDemandFailoverOnError: ['InsufficientInstanceCapacity'],
ec2instanceCriteria: { ...runnerParameters.ec2instanceCriteria, targetCapacityType: 'on-demand' },
ec2instanceCriteria: {
...runnerParameters.ec2instanceCriteria,
targetCapacityType: 'on-demand',
instanceAllocationStrategy: failoverAllocationStrategy,
},
});
instances.push(...instancesOnDemand);
return instances;
Expand Down Expand Up @@ -269,6 +311,12 @@ async function createInstances(
tags.push({ Key: 'ghr:trace_id', Value: traceId! });
}

const targetCapacityType = runnerParameters.ec2instanceCriteria.targetCapacityType;
const allocationStrategy = sanitizeAllocationStrategy(
runnerParameters.ec2instanceCriteria.instanceAllocationStrategy,
targetCapacityType,
);

let fleet: CreateFleetResult;
try {
// see for spec https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateFleet.html
Expand All @@ -284,16 +332,26 @@ async function createInstances(
runnerParameters.ec2instanceCriteria.instanceTypes,
amiIdOverride,
runnerParameters.ec2OverrideConfig,
allocationStrategy,
runnerParameters.ec2instanceCriteria.instanceTypePriorities,
),
},
],
SpotOptions: {
MaxTotalPrice: runnerParameters.ec2instanceCriteria.maxSpotPrice,
AllocationStrategy: runnerParameters.ec2instanceCriteria.instanceAllocationStrategy,
},
...(targetCapacityType === 'spot'
? {
SpotOptions: {
MaxTotalPrice: runnerParameters.ec2instanceCriteria.maxSpotPrice,
AllocationStrategy: allocationStrategy as SpotAllocationStrategy,
},
}
: {
OnDemandOptions: {
AllocationStrategy: allocationStrategy as FleetOnDemandAllocationStrategy,
},
}),
TargetCapacitySpecification: {
TotalTargetCapacity: runnerParameters.numberOfRunners,
DefaultTargetCapacityType: runnerParameters.ec2instanceCriteria.targetCapacityType,
DefaultTargetCapacityType: targetCapacityType,
},
TagSpecifications: [
{
Expand Down
3 changes: 2 additions & 1 deletion lambdas/functions/control-plane/src/modules.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ declare namespace NodeJS {
| 'price-capacity-optimized'
| 'diversified'
| 'capacity-optimized'
| 'capacity-optimized-prioritized';
| 'capacity-optimized-prioritized'
| 'prioritized';
}
}
4 changes: 4 additions & 0 deletions lambdas/functions/control-plane/src/pool/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ export async function adjust(event: PoolEvent): Promise<void> {
const launchTemplateName = process.env.LAUNCH_TEMPLATE_NAME;
const instanceMaxSpotPrice = process.env.INSTANCE_MAX_SPOT_PRICE;
const instanceAllocationStrategy = process.env.INSTANCE_ALLOCATION_STRATEGY || 'lowest-price'; // same as AWS default
const instanceTypePriorities = process.env.INSTANCE_TYPE_PRIORITIES
? (JSON.parse(process.env.INSTANCE_TYPE_PRIORITIES) as Record<string, number>)
: undefined;
const runnerOwner = process.env.RUNNER_OWNER;
const amiIdSsmParameterName = process.env.AMI_ID_SSM_PARAMETER_NAME;
const tracingEnabled = yn(process.env.POWERTOOLS_TRACE_ENABLED, { default: false });
Expand Down Expand Up @@ -92,6 +95,7 @@ export async function adjust(event: PoolEvent): Promise<void> {
{
ec2instanceCriteria: {
instanceTypes,
instanceTypePriorities,
targetCapacityType: instanceTargetCapacityType,
maxSpotPrice: instanceMaxSpotPrice,
instanceAllocationStrategy: instanceAllocationStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
const launchTemplateName = process.env.LAUNCH_TEMPLATE_NAME;
const instanceMaxSpotPrice = process.env.INSTANCE_MAX_SPOT_PRICE;
const instanceAllocationStrategy = process.env.INSTANCE_ALLOCATION_STRATEGY || 'lowest-price'; // same as AWS default
const instanceTypePriorities = process.env.INSTANCE_TYPE_PRIORITIES
? (JSON.parse(process.env.INSTANCE_TYPE_PRIORITIES) as Record<string, number>)
: undefined;
const enableJobQueuedCheck = yn(process.env.ENABLE_JOB_QUEUED_CHECK, { default: true });
const amiIdSsmParameterName = process.env.AMI_ID_SSM_PARAMETER_NAME;
const runnerNamePrefix = process.env.RUNNER_NAME_PREFIX || '';
Expand Down Expand Up @@ -575,6 +578,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
{
ec2instanceCriteria: {
instanceTypes,
instanceTypePriorities,
targetCapacityType: instanceTargetCapacityType,
maxSpotPrice: instanceMaxSpotPrice,
instanceAllocationStrategy: instanceAllocationStrategy,
Expand Down
Loading