Skip to content
Merged
6 changes: 6 additions & 0 deletions prover/crates/bin/prover_autoscaler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ agent_config:
- `apply_min_to_namespace` specifies current primary namespace to run min number of provers in it.
- `long_pending_duration` is the time after which a pending pod is considered long-pending and will be relocated to a different
cluster. Default: 10m.
- `scale_errors_duration` is the duration during which Autoscaler takes observed scale errors into account in its calculations.
Default: 1h.
- `need_to_move_duration` is the duration of scale errors during which Autoscaler force-moves pods to a different cluster.
Should be at least 2x of `scaler_run_interval`. Default: 4m.
- `scaler_targets` subsection is a list of non-GPU targets:
- `scaler_target_type` specifies the type, possible options: `Simple` (default) and `Gpu`.
- `queue_report_field` is the name of the corresponding queue report section. See the example for possible options.
Expand Down Expand Up @@ -185,6 +189,8 @@ scaler_config:
cluster3: 200
apply_min_to_namespace: prover-new
long_pending_duration: 10m
scale_errors_duration: 1h
need_to_move_duration: 4m
scaler_targets:
- queue_report_field: prover_jobs
scaler_target_type: Gpu
Expand Down
22 changes: 22 additions & 0 deletions prover/crates/bin/prover_autoscaler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ pub struct ProverAutoscalerScalerConfig {
default = "ProverAutoscalerScalerConfig::default_long_pending_duration"
)]
pub long_pending_duration: Duration,
/// Duration of scale errors during which Autoscaler includes scale errors into calculation.
#[serde(
with = "humantime_serde",
default = "ProverAutoscalerScalerConfig::default_scale_errors_duration"
)]
pub scale_errors_duration: Duration,
/// Duration of scale errors during which Autoscaler force moves pods to a different cluster.
#[serde(
with = "humantime_serde",
default = "ProverAutoscalerScalerConfig::default_need_to_move_duration"
)]
pub need_to_move_duration: Duration,
/// List of simple autoscaler targets.
pub scaler_targets: Vec<ScalerTarget>,
/// If dry-run enabled don't send any scale requests.
Expand Down Expand Up @@ -177,6 +189,16 @@ impl ProverAutoscalerScalerConfig {
pub fn default_long_pending_duration() -> Duration {
Duration::from_secs(600)
}

/// Default scale_errors_duration -- 1h.
pub fn default_scale_errors_duration() -> Duration {
    Duration::from_secs(3600)
}

/// Default need_to_move_duration -- 4m.
pub fn default_need_to_move_duration() -> Duration {
    Duration::from_secs(4 * 60)
}
}

impl ScalerTarget {
Expand Down
49 changes: 21 additions & 28 deletions prover/crates/bin/prover_autoscaler/src/global/manager.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use super::{
queuer,
scaler::{Scaler, ScalerTrait},
scaler::{Scaler, ScalerConfig, ScalerTrait},
watcher,
};
use crate::{
agent::ScaleRequest,
cluster_types::{ClusterName, Clusters, NamespaceName},
cluster_types::{ClusterName, NamespaceName},
config::{ProverAutoscalerScalerConfig, QueueReportFields, ScalerTargetType},
key::{GpuKey, NoKey},
metrics::AUTOSCALER_METRICS,
Expand Down Expand Up @@ -40,6 +40,21 @@ impl Manager {

let mut scalers: Vec<Box<dyn ScalerTrait + Sync + Send>> = Vec::default();
let mut jobs = Vec::default();

let scaler_config = Arc::new(ScalerConfig {
cluster_priorities: config.cluster_priorities,
apply_min_to_namespace: config.apply_min_to_namespace,
long_pending_duration: chrono::Duration::seconds(
config.long_pending_duration.as_secs() as i64,
),
scale_errors_duration: chrono::Duration::seconds(
config.scale_errors_duration.as_secs() as i64,
),
need_to_move_duration: chrono::Duration::seconds(
config.need_to_move_duration.as_secs() as i64,
),
});

for c in &config.scaler_targets {
jobs.push(c.queue_report_field);
match c.scaler_target_type {
Expand All @@ -52,9 +67,7 @@ impl Manager {
.map(|(k, v)| (k.clone(), v.into_map_gpukey()))
.collect(),
c.speed.into_map_gpukey(),
config.cluster_priorities.clone(),
config.apply_min_to_namespace.clone(),
chrono::Duration::seconds(config.long_pending_duration.as_secs() as i64),
scaler_config.clone(),
))),
ScalerTargetType::Simple => scalers.push(Box::new(Scaler::<NoKey>::new(
c.queue_report_field,
Expand All @@ -65,9 +78,7 @@ impl Manager {
.map(|(k, v)| (k.clone(), v.into_map_nokey()))
.collect(),
c.speed.into_map_nokey(),
config.cluster_priorities.clone(),
config.apply_min_to_namespace.clone(),
chrono::Duration::seconds(config.long_pending_duration.as_secs() as i64),
scaler_config.clone(),
))),
};
}
Expand All @@ -81,22 +92,6 @@ impl Manager {
}
}

/// is_namespace_running returns true if there are some pods running in it.
fn is_namespace_running(namespace: &NamespaceName, clusters: &Clusters) -> bool {
    // A namespace counts as running when any of its deployments, in any
    // cluster, has pods running or expected to run -- in that case the
    // namespace should be re-evaluated.
    clusters
        .clusters
        .values()
        .flat_map(|cluster| cluster.namespaces.iter())
        .filter(|(name, _)| *name == namespace)
        .any(|(_, ns)| ns.deployments.values().any(|d| d.running + d.desired > 0))
}

#[async_trait::async_trait]
impl Task for Manager {
async fn invoke(&self) -> anyhow::Result<()> {
Expand All @@ -123,9 +118,7 @@ impl Task for Manager {
"Running eval for namespace {ns}, PPV {ppv}, scaler {} found queue {q}",
scaler.deployment()
);
if q > 0 || is_namespace_running(ns, &guard.clusters) {
scaler.run(ns, q, &guard.clusters, &mut scale_requests);
}
scaler.run(ns, q, &guard.clusters, &mut scale_requests);
}
}
} // Unlock self.watcher.data.
Expand Down
Loading
Loading