Skip to content

[FLINK-37155] [Runtime/Coordination] Implementing FLIP-505 for Flink History Server scalability improvements to decouple local and remote storage #26878

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>historyserver.archive.cached-retained-jobs</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The maximum number of n latest jobs to retain in the local directory defined by `historyserver.web.tmpdir`. If this configuration is provided, the remote and local storage of job archives will be decoupled.If set to `0` or less than `-1` HistoryServer will throw an <code class="highlighter-rouge">IllegalConfigurationException</code>. </td>
</tr>
<tr>
<td><h5>historyserver.archive.clean-expired-jobs</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand All @@ -26,6 +32,12 @@
<td>Duration</td>
<td>Interval for refreshing the archived job directories.</td>
</tr>
<tr>
<td><h5>historyserver.archive.num-cached-most-recently-viewed-jobs</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>The maximum number of jobs to retain in the local cache defined by `historyserver.web.tmpdir` which stores the job archives that are fetched from the remote storage. This limit is distinct from the number of most recent jobs which will in the cache.The total cache size is a combination of the number of remote cache jobs and the number of remote fetch cached jobs and retained cache jobs.If set to less than `0` HistoryServer will throw an <code class="highlighter-rouge">IllegalConfigurationException</code>. </td>
</tr>
<tr>
<td><h5>historyserver.archive.retained-jobs</h5></td>
<td style="word-wrap: break-word;">-1</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,23 @@ public class HistoryServerOptions {
"Enable HTTPs access to the HistoryServer web frontend. This is applicable only when the"
+ " global SSL flag security.ssl.enabled is set to true.");

public static final ConfigOption<Integer> HISTORY_SERVER_CACHED_JOBS =
key("historyserver.archive.cached-retained-jobs")
.intType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
String.format(
"The maximum number of n latest jobs to retain in the local directory defined by `%s`. ",
HISTORY_SERVER_WEB_DIR.key()))
.text(
"If this configuration is provided, the remote and local storage of job archives will be decoupled.")
.text(
"If set to `0` or less than `-1` HistoryServer will throw an %s. ",
code("IllegalConfigurationException"))
.build());

public static final ConfigOption<Integer> HISTORY_SERVER_RETAINED_JOBS =
key("historyserver.archive.retained-jobs")
.intType()
Expand All @@ -143,5 +160,24 @@ public class HistoryServerOptions {
code("IllegalConfigurationException"))
.build());

public static final ConfigOption<Integer> HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS =
key("historyserver.archive.num-cached-most-recently-viewed-jobs")
.intType()
.defaultValue(1)
.withDescription(
Description.builder()
.text(
String.format(
"The maximum number of jobs to retain in the local cache defined by `%s` which "
+ "stores the job archives that are fetched from the remote storage. This "
+ "limit is distinct from the number of most recent jobs which will in the cache."
+ "The total cache size is a combination of the number of remote cache jobs and "
+ "the number of remote fetch cached jobs and retained cache jobs.",
HISTORY_SERVER_WEB_DIR.key()))
.text(
"If set to less than `0` HistoryServer will throw an %s. ",
code("IllegalConfigurationException"))
.build());

private HistoryServerOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public class HistoryServer {

private final HistoryServerArchiveFetcher archiveFetcher;

private final HistoryServerStaticFileServerHandler staticFileServerHandler;

@Nullable private final SSLHandlerFactory serverSSLFactory;
private WebFrontendBootstrap netty;

Expand Down Expand Up @@ -215,12 +217,12 @@ public HistoryServer(
throw new FlinkException(
HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS + " was not configured.");
}
List<RefreshLocation> refreshDirs = new ArrayList<>();
List<HistoryServer.RefreshLocation> refreshDirs = new ArrayList<>();
for (String refreshDirectory : refreshDirectories.split(",")) {
try {
Path refreshPath = new Path(refreshDirectory);
FileSystem refreshFS = refreshPath.getFileSystem();
refreshDirs.add(new RefreshLocation(refreshPath, refreshFS));
refreshDirs.add(new HistoryServer.RefreshLocation(refreshPath, refreshFS));
} catch (Exception e) {
// there's most likely something wrong with the path itself, so we ignore it from
// here on
Expand All @@ -244,13 +246,41 @@ public HistoryServer(
"Cannot set %s to 0 or less than -1",
HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key());
}
boolean remoteFetchEnabled =
config.contains(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS);

int generalCachedJobSize = -1;
if (remoteFetchEnabled) {
generalCachedJobSize = config.get(HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS);
if (generalCachedJobSize == 0 || generalCachedJobSize < -1) {
throw new IllegalConfigurationException(
"Cannot set %s to 0 or less than -1",
HistoryServerOptions.HISTORY_SERVER_CACHED_JOBS.key());
}
}
int numCachedMostRecentlyViewedJobs =
config.get(
HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS);
if (numCachedMostRecentlyViewedJobs <= 0) {
throw new IllegalConfigurationException(
"Cannot set %s to less than 0",
HistoryServerOptions.HISTORY_SERVER_NUM_CACHED_MOST_RECENTLY_VIEWED_JOBS.key());
}

archiveFetcher =
new HistoryServerArchiveFetcher(
refreshDirs,
webDir,
jobArchiveEventListener,
cleanupExpiredArchives,
maxHistorySize);
maxHistorySize,
generalCachedJobSize,
remoteFetchEnabled,
numCachedMostRecentlyViewedJobs);

staticFileServerHandler =
new HistoryServerStaticFileServerHandler(
webDir, remoteFetchEnabled, archiveFetcher);

this.shutdownHook =
ShutdownHookUtil.addShutdownHook(
Expand Down Expand Up @@ -310,7 +340,7 @@ void start() throws IOException, InterruptedException {
new GeneratedLogUrlHandler(
CompletableFuture.completedFuture(pattern))));

router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));
router.addGet("/:*", staticFileServerHandler);

createDashboardConfigFile();

Expand Down Expand Up @@ -393,11 +423,11 @@ private static String createConfigJson(DashboardConfiguration dashboardConfigura
}

/** Container for the {@link Path} and {@link FileSystem} of a refresh directory. */
static class RefreshLocation {
public static class RefreshLocation {
private final Path path;
private final FileSystem fs;

private RefreshLocation(Path path, FileSystem fs) {
public RefreshLocation(Path path, FileSystem fs) {
this.path = path;
this.fs = fs;
}
Expand Down
Loading