Skip to content

Commit f50716f

Browse files
committed
Add application logic for Schedule and Chain Policy within the autoscaler
1 parent 840fa8b commit f50716f

File tree

3 files changed

+95
-8
lines changed

3 files changed

+95
-8
lines changed

pkg/fleetautoscalers/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ func (c *Controller) syncFleetAutoscaler(ctx context.Context, key string) error
313313
}
314314

315315
currentReplicas := fleet.Status.Replicas
316-
desiredReplicas, scalingLimited, err := computeDesiredFleetSize(fas, fleet, c.gameServerLister, c.counter.Counts())
316+
desiredReplicas, scalingLimited, err := computeDesiredFleetSize(fas.Spec.Policy, fleet, c.gameServerLister, c.counter.Counts())
317317
if err != nil {
318318
c.recorder.Eventf(fas, corev1.EventTypeWarning, "FleetAutoscaler",
319319
"Error calculating desired fleet size on FleetAutoscaler %s. Error: %s", fas.ObjectMeta.Name, err.Error())

pkg/fleetautoscalers/fleetautoscalers.go

Lines changed: 93 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"time"
3030

3131
"github.com/pkg/errors"
32+
"github.com/robfig/cron/v3"
3233
"k8s.io/apimachinery/pkg/util/intstr"
3334
"k8s.io/apimachinery/pkg/util/uuid"
3435

@@ -50,17 +51,21 @@ var client = http.Client{
5051
}
5152

5253
// computeDesiredFleetSize computes the new desired size of the given fleet
53-
func computeDesiredFleetSize(fas *autoscalingv1.FleetAutoscaler, f *agonesv1.Fleet,
54+
func computeDesiredFleetSize(pol autoscalingv1.FleetAutoscalerPolicy, f *agonesv1.Fleet,
5455
gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount) (int32, bool, error) {
55-
switch fas.Spec.Policy.Type {
56+
switch pol.Type {
5657
case autoscalingv1.BufferPolicyType:
57-
return applyBufferPolicy(fas.Spec.Policy.Buffer, f)
58+
return applyBufferPolicy(pol.Buffer, f)
5859
case autoscalingv1.WebhookPolicyType:
59-
return applyWebhookPolicy(fas.Spec.Policy.Webhook, f)
60+
return applyWebhookPolicy(pol.Webhook, f)
6061
case autoscalingv1.CounterPolicyType:
61-
return applyCounterOrListPolicy(fas.Spec.Policy.Counter, nil, f, gameServerLister, nodeCounts)
62+
return applyCounterOrListPolicy(pol.Counter, nil, f, gameServerLister, nodeCounts)
6263
case autoscalingv1.ListPolicyType:
63-
return applyCounterOrListPolicy(nil, fas.Spec.Policy.List, f, gameServerLister, nodeCounts)
64+
return applyCounterOrListPolicy(nil, pol.List, f, gameServerLister, nodeCounts)
65+
case autoscalingv1.SchedulePolicyType:
66+
return applySchedulePolicy(pol.Schedule, f, gameServerLister, nodeCounts)
67+
case autoscalingv1.ChainPolicyType:
68+
return applyChainPolicy(pol.Chain, f, gameServerLister, nodeCounts)
6469
}
6570

6671
return 0, false, errors.New("wrong policy type, should be one of: Buffer, Webhook, Counter, List")
@@ -362,6 +367,88 @@ func applyCounterOrListPolicy(c *autoscalingv1.CounterPolicy, l *autoscalingv1.L
362367
return 0, false, errors.Errorf("unable to apply ListPolicy %v", l)
363368
}
364369

370+
func applySchedulePolicy(s *autoscalingv1.SchedulePolicy, f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount) (int32, bool, error) {
371+
// Ensure the scheduled autoscaler feature gate is enabled
372+
if !runtime.FeatureEnabled(runtime.FeatureScheduledAutoscaler) {
373+
return 0, false, errors.Errorf("cannot apply SchedulePolicy unless feature flag %s is enabled", runtime.FeatureScheduledAutoscaler)
374+
}
375+
376+
if isScheduleActive(s) {
377+
return computeDesiredFleetSize(s.Policy, f, gameServerLister, nodeCounts)
378+
}
379+
380+
return f.Status.Replicas, false, nil
381+
}
382+
383+
func applyChainPolicy(c autoscalingv1.ChainPolicy, f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount) (int32, bool, error) {
384+
// Ensure the scheduled autoscaler feature gate is enabled
385+
if !runtime.FeatureEnabled(runtime.FeatureScheduledAutoscaler) {
386+
return 0, false, errors.Errorf("cannot apply ChainPolicy unless feature flag %s is enabled", runtime.FeatureScheduledAutoscaler)
387+
}
388+
389+
// Loop over all entries in the chain
390+
for _, entry := range c {
391+
switch entry.Type {
392+
case autoscalingv1.SchedulePolicyType:
393+
schedRep, schedLim, schedErr := applySchedulePolicy(entry.Schedule, f, gameServerLister, nodeCounts)
394+
// If the schedule is active and no error was returned from the policy, then return the replicas, limited and error
395+
if isScheduleActive(entry.Schedule) && schedErr == nil {
396+
return schedRep, schedLim, schedErr
397+
}
398+
case autoscalingv1.WebhookPolicyType:
399+
webhookRep, webhookLim, webhookErr := applyWebhookPolicy(entry.Webhook, f)
400+
if webhookErr == nil {
401+
return webhookRep, webhookLim, webhookErr
402+
}
403+
default:
404+
return computeDesiredFleetSize(entry.FleetAutoscalerPolicy, f, gameServerLister, nodeCounts)
405+
}
406+
}
407+
408+
return f.Status.Replicas, false, nil
409+
}
410+
411+
// isScheduleActive checks if a chain entry's is active and returns a boolean, true if active, false otherwise
412+
func isScheduleActive(s *autoscalingv1.SchedulePolicy) bool {
413+
now := time.Now()
414+
scheduleDelta := time.Minute * -1
415+
416+
// If a start time is present and the current time is before the start time, the schedule is inactive so return false
417+
startTime := s.Between.Start.Time
418+
if !startTime.IsZero() && now.Before(startTime) {
419+
return false
420+
}
421+
422+
// If an end time is present and the current time is after the end time, the schedule is inactive so return false
423+
endTime := s.Between.End.Time
424+
if !endTime.IsZero() && now.After(endTime) {
425+
return false
426+
}
427+
428+
// If no startCron field is specified, then it's automatically true (duration is no longer relevant since we're always running)
429+
if s.ActivePeriod.StartCron == "" {
430+
return true
431+
}
432+
433+
location, _ := time.LoadLocation(s.ActivePeriod.Timezone)
434+
startCron, _ := cron.ParseStandard(s.ActivePeriod.StartCron)
435+
nextStart := startCron.Next(now.In(location)).Add(scheduleDelta)
436+
duration, err := time.ParseDuration(s.ActivePeriod.Duration)
437+
438+
// If there's an err, then the duration field is empty, meaning duration is indefinite
439+
if err != nil {
440+
duration = 0 // Indefinite duration if not set
441+
}
442+
443+
// If the current time is after the next start time, and the duration is indefinite or the current time is before the next start time + duration,
444+
// then return true
445+
if now.After(nextStart) && (duration == 0 || now.Before(nextStart.Add(duration))) {
446+
return true
447+
}
448+
449+
return false
450+
}
451+
365452
// getSortedGameServers returns the list of Game Servers for the Fleet in the order in which the
366453
// Game Servers would be deleted.
367454
func getSortedGameServers(f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister,

pkg/fleetautoscalers/fleetautoscalers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func TestComputeDesiredFleetSize(t *testing.T) {
187187
_, cancel := agtesting.StartInformers(m, gameServers.Informer().HasSynced)
188188
defer cancel()
189189

190-
replicas, limited, err := computeDesiredFleetSize(fas, f, gameServers.Lister(), nc)
190+
replicas, limited, err := computeDesiredFleetSize(fas.Spec.Policy, f, gameServers.Lister(), nc)
191191

192192
if tc.expected.err != "" && assert.NotNil(t, err) {
193193
assert.Equal(t, tc.expected.err, err.Error())

0 commit comments

Comments
 (0)