Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
4ee3856
feat: Receive OTLP/HTTP logs and transform to unified entity
bigcyy Jul 2, 2025
abce871
improvement: Add license
bigcyy Jul 2, 2025
ad5baf2
Merge branch 'master' into OSPP#25bef0067
bigcyy Jul 2, 2025
05a410b
improvement: modify source to protocol
bigcyy Jul 2, 2025
f5f3221
Merge branch 'OSPP#25bef0067' of github.com:bigcyy/hertzbeat into OSP…
bigcyy Jul 2, 2025
420c9f0
feat: add otlp log integration ui
bigcyy Jul 3, 2025
5e50d5f
bugfix: fix error en doc
bigcyy Jul 3, 2025
a14a279
improvement: code style
bigcyy Jul 4, 2025
04cf287
Merge branch 'master' into OSPP#25bef0067
bigcyy Jul 4, 2025
dd405a2
refactor: realtime to metrics_realtime and log_realtime, periodic to …
bigcyy Jul 5, 2025
600e505
feat: support send logEntry to commonDataQueue
bigcyy Jul 5, 2025
1cd5f85
feat: send log to commonDataQueue
bigcyy Jul 5, 2025
d12e0d6
feat: Add log realtime calculator and some ui
bigcyy Jul 13, 2025
0f5cb35
improvement: Modify xxx_realtime and xxx_periodic to realtime_xxx and…
bigcyy Jul 13, 2025
9a94f25
improvement: Place the logs section above the alerts section
bigcyy Jul 13, 2025
d06ea81
improvement: move data type selector
bigcyy Jul 15, 2025
98ca234
feat: Save log to db
bigcyy Jul 17, 2025
b7811cc
feat: log stream show
bigcyy Jul 20, 2025
b432940
improvement: To make the log display more compact and unify the code …
bigcyy Jul 20, 2025
9ecdf66
merge main
bigcyy Jul 21, 2025
2e049dd
Merge branch 'master' into OSPP#25bef0067
bigcyy Jul 28, 2025
116b74f
temp
bigcyy Jul 30, 2025
5979645
feat: Modify the alert_define table definition to support log queryin…
bigcyy Jul 31, 2025
ec2513c
improvement: Remove real-time log alert recovery and delete unused im…
bigcyy Aug 2, 2025
611ea49
improvement: Split PeriodicAlertCalculator to logxx and metricsxxx
bigcyy Aug 3, 2025
7c17778
improvament: keep the firing fingerprint but add the log entity to al…
bigcyy Aug 5, 2025
1fa8e0c
improvement: refactor alert calculate, add window aggregator to log r…
bigcyy Aug 11, 2025
74d98bc
Merge branch 'master' into OSPP#25bef0067
bigcyy Aug 11, 2025
5c0b77c
improvement: remove chinese comments
bigcyy Aug 11, 2025
e00a109
improvement: fix code style
bigcyy Aug 11, 2025
888d4dd
bugfix: fix common data queue test and kafka data queue topic
bigcyy Aug 11, 2025
9fb1731
bugfix: fix metrics real time alert not add firing status to cache
bigcyy Aug 11, 2025
734ef15
bugfix: fix mock jexl expr calculator default return false
bigcyy Aug 11, 2025
5ca536e
feat: Add log manage module
bigcyy Aug 16, 2025
d7a11d8
merge main
bigcyy Aug 17, 2025
54aa761
bugfix: fix flyway mysql sql script error
bigcyy Aug 17, 2025
2ccadd9
Merge branch 'master' into OSPP#25bef0067
bigcyy Aug 17, 2025
50344ff
bugfix: add license header
bigcyy Aug 17, 2025
3acdc4d
Merge branch 'OSPP#25bef0067' of github.com:bigcyy/hertzbeat into OSP…
bigcyy Aug 17, 2025
c19b404
Merge branch 'master' into OSPP#25bef0067
Calvin979 Aug 19, 2025
4075e11
Merge branch 'master' into OSPP#25bef0067
tomsun28 Aug 19, 2025
363040b
Merge branch 'master' into OSPP#25bef0067
zhangshenghang Aug 20, 2025
2607f83
improvement: Correct thread pool scaling in AlerterWorkerPool
bigcyy Aug 21, 2025
5d738a4
Update web-app/src/assets/i18n/zh-TW.json
bigcyy Aug 21, 2025
72d266f
improvement: Remove console.log
bigcyy Aug 21, 2025
649319c
improvement: Use substring() instead of deprecated substr() method
bigcyy Aug 21, 2025
821fa9f
improvement: Replace deprecated document.execCommand('copy')
bigcyy Aug 21, 2025
dbda9fa
improvement: Use substring() instead of deprecated substr() method
bigcyy Aug 21, 2025
1c8317d
Merge branch 'master' into OSPP#25bef0067
bigcyy Aug 21, 2025
3c203d0
test: add unit test for log ingestion controller and add json produces
bigcyy Aug 22, 2025
1b485ba
Merge branch 'master' into OSPP#25bef0067
bigcyy Aug 25, 2025
9c48868
feat: add otlp adapter unit test
bigcyy Aug 25, 2025
8eb1149
improvement: replace import *
bigcyy Aug 25, 2025
4ee3ea9
Merge branch 'master' into OSPP#25bef0067
bigcyy Aug 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ public class AlerterWorkerPool {

private ThreadPoolExecutor workerExecutor;
private ThreadPoolExecutor notifyExecutor;
private ThreadPoolExecutor logWorkerExecutor;

public AlerterWorkerPool() {
initWorkExecutor();
initNotifyExecutor();
initLogWorkerExecutor();
}

private void initWorkExecutor() {
Expand Down Expand Up @@ -77,6 +79,21 @@ private void initNotifyExecutor() {
new ThreadPoolExecutor.AbortPolicy());
}

private void initLogWorkerExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("Alerter logWorkerExecutor has uncaughtException.");
log.error(throwable.getMessage(), throwable);
})
.setDaemon(true)
.setNameFormat("log-worker-%d")
.build();
logWorkerExecutor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}

/**
* Run the alerter task
* @param runnable task
Expand All @@ -96,4 +113,13 @@ public void executeNotify(Runnable runnable) throws RejectedExecutionException {
notifyExecutor.execute(runnable);
}

/**
* Executes the given runnable task using the logWorkerExecutor.
*
* @param runnable the task to be executed
* @throws RejectedExecutionException if the task cannot be accepted for execution
*/
public void executeLogJob(Runnable runnable) throws RejectedExecutionException {
logWorkerExecutor.execute(runnable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hertzbeat.alert.calculate;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.jexl3.JexlException;
import org.apache.commons.jexl3.JexlExpression;
import org.apache.hertzbeat.common.util.JexlExpressionRunner;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
* JexlExprCalculator is a utility class for evaluating JEXL expressions
*/
@Slf4j
@Component
public class JexlExprCalculator {
/**
* Execute an alert expression
* @param fieldValueMap The field value map for expression evaluation
* @param expr The expression to evaluate
* @param ignoreJexlException Whether to ignore JEXL exceptions
* @return true if the expression matches, false otherwise
*/
public boolean execAlertExpression(Map<String, Object> fieldValueMap, String expr, boolean ignoreJexlException) {
Boolean match;
JexlExpression expression;
try {
expression = JexlExpressionRunner.compile(expr);
} catch (JexlException jexlException) {
log.warn("Alarm Rule: {} Compile Error: {}.", expr, jexlException.getMessage());
throw jexlException;
} catch (Exception e) {
log.error("Alarm Rule: {} Unknown Error: {}.", expr, e.getMessage());
throw e;
}

try {
match = (Boolean) JexlExpressionRunner.evaluate(expression, fieldValueMap);
} catch (JexlException jexlException) {
if (ignoreJexlException) {
log.debug("Alarm Rule: {} Run Error: {}.", expr, jexlException.getMessage());
} else {
log.error("Alarm Rule: {} Run Error: {}.", expr, jexlException.getMessage());
}
throw jexlException;
} catch (Exception e) {
log.error("Alarm Rule: {} Unknown Error: {}.", expr, e.getMessage());
throw e;
}
return match != null && match;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.alert.calculate.periodic;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hertzbeat.alert.calculate.AlarmCacheManager;
import org.apache.hertzbeat.alert.calculate.JexlExprCalculator;
import org.apache.hertzbeat.alert.reduce.AlarmCommonReduce;
import org.apache.hertzbeat.alert.service.DataSourceService;
import org.apache.hertzbeat.alert.util.AlertTemplateUtil;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.alerter.AlertDefine;
import org.apache.hertzbeat.common.entity.alerter.SingleAlert;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
* Log Periodic Alert Calculator
*/
@Slf4j
@Component
public class LogPeriodicAlertCalculator {

private static final String ROWS = "__rows__";
private static final String ALERT_MODE_LABEL = "alert_mode";
private static final String ALERT_MODE_GROUP = "group";
private static final String ALERT_MODE_INDIVIDUAL = "individual";

private final DataSourceService dataSourceService;
private final AlarmCommonReduce alarmCommonReduce;
private final AlarmCacheManager alarmCacheManager;
private final JexlExprCalculator jexlExprCalculator;


public LogPeriodicAlertCalculator(DataSourceService dataSourceService, AlarmCommonReduce alarmCommonReduce,
AlarmCacheManager alarmCacheManager, JexlExprCalculator jexlExprCalculator) {
this.alarmCommonReduce = alarmCommonReduce;
this.alarmCacheManager = alarmCacheManager;
this.jexlExprCalculator = jexlExprCalculator;
this.dataSourceService = dataSourceService;
}

public void calculate(AlertDefine define) {
if (!define.isEnable() || StringUtils.isEmpty(define.getExpr())) {
log.error("Log define {} is disabled or expression is empty", define.getName());
return;
}
try {
doCalculate(define);
} catch (Exception e) {
log.error("Calculate periodic define {} failed: {}", define.getName(), e.getMessage());
}
}

private void doCalculate(AlertDefine define) {
try {
// Log-based queries are SQL queries with log-specific expressions
List<Map<String, Object>> results = dataSourceService.query(define.getDatasource(), define.getQueryExpr());
results = this.calculateLogThreshold(results, define.getExpr());

// If no match the expr threshold, the results item map {'value': null} should be null and others field keep
// If results has multi list, should trigger multi alert
if (CollectionUtils.isEmpty(results)) {
return;
}
afterThresholdRuleMatch(results, define);
} catch (Exception ignored) {
// Ignore the query exception eg: no result, timeout, etc
}
}

/**
* Calculate log threshold evaluation
* @param results Query results from log datasource
* @param expression Alert expression for log analysis
* @return Filtered results that match the log threshold
*/
private List<Map<String, Object>> calculateLogThreshold(List<Map<String, Object>> results, String expression) {
if (CollectionUtils.isEmpty(results)) {
return List.of();
}
List<Map<String, Object>> newResults = new ArrayList<>(results.size());
for (Map<String, Object> result : results) {
HashMap<String, Object> fieldMap = new HashMap<>(result);
fieldMap.put(ROWS, results.size());
boolean match = jexlExprCalculator.execAlertExpression(fieldMap, expression, true);
if (match) {
newResults.add(result);
}
}
return newResults;
}


private String getAlertMode(AlertDefine alertDefine) {
String mode = null;
if (alertDefine.getLabels() != null) {
mode = alertDefine.getLabels().get(ALERT_MODE_LABEL);
}
if (mode == null || mode.isEmpty()) {
return ALERT_MODE_GROUP; // Default to group mode if not specified
} else {
return mode;
}
}

/**
* Handle alert after threshold rule match
*/
private void afterThresholdRuleMatch(List<Map<String, Object>> alertContext, AlertDefine define) {
// Determine alert mode from configuration
String alertMode = getAlertMode(define);

long currentTime = System.currentTimeMillis();

switch (alertMode) {
case ALERT_MODE_INDIVIDUAL:
// Generate individual alerts for each matching log
for (Map<String, Object> context : alertContext) {
generateIndividualAlert(define, context, currentTime);
}
break;

case ALERT_MODE_GROUP:
// Generate a single alert group for all matching logs
generateGroupAlert(define, alertContext, currentTime);
break;
default:
log.warn("Unknown alert mode for define {}: {}", define.getName(), alertMode);
}
}

private void generateIndividualAlert(AlertDefine define, Map<String, Object> context, long currentTime) {

Map<String, String> alertLabels = new HashMap<>(8);

Map<String, String> commonFingerPrints = createCommonFingerprints(define);
alertLabels.putAll(commonFingerPrints);
addContextToMap(context, alertLabels);

Map<String, Object> fieldValueMap = createFieldValueMap(context, define);
Map<String, String> alertAnnotations = createAlertAnnotations(define, fieldValueMap);
// Create and send group alert
SingleAlert alert = SingleAlert.builder()
.labels(alertLabels)
.annotations(alertAnnotations)
.content(AlertTemplateUtil.render(define.getTemplate(), fieldValueMap))
.status(CommonConstants.ALERT_STATUS_FIRING)
.triggerTimes(1)
.startAt(currentTime)
.activeAt(currentTime)
.build();

alarmCommonReduce.reduceAndSendAlarm(alert.clone());

log.debug("Generated individual alert for define: {}", define.getName());
}

private void addContextToMap(Map<String, Object> context, Map<String, String> alertLabels) {
for (Map.Entry<String, Object> entry : context.entrySet()) {
if (entry.getValue() != null) {
alertLabels.put(entry.getKey(), entry.getValue().toString());
}
}
}

private void generateGroupAlert(AlertDefine define, List<Map<String, Object>> alertContext, long currentTime) {

List<SingleAlert> alerts = new ArrayList<>(alertContext.size());

// Create fingerprints for group alert
Map<String, String> commonFingerPrints = createCommonFingerprints(define);

// Add context information to fingerprints
commonFingerPrints.put(ROWS, String.valueOf(alertContext.size()));
commonFingerPrints.put(ALERT_MODE_LABEL, ALERT_MODE_GROUP);

for (Map<String, Object> context : alertContext) {

Map<String, String> alertLabels = new HashMap<>(8);

alertLabels.putAll(commonFingerPrints);
// add the context to commonFingerPrints
addContextToMap(context, alertLabels);

Map<String, Object> fieldValueMap = createFieldValueMap(context, define);
Map<String, String> alertAnnotations = createAlertAnnotations(define, fieldValueMap);
// Create and send group alert
SingleAlert alert = SingleAlert.builder()
.labels(alertLabels)
.annotations(alertAnnotations)
.content(AlertTemplateUtil.render(define.getTemplate(), fieldValueMap))
.status(CommonConstants.ALERT_STATUS_FIRING)
.triggerTimes(alertContext.size())
.startAt(currentTime)
.activeAt(currentTime)
.build();
alerts.add(alert.clone());
}
alarmCommonReduce.reduceAndSendAlarmGroup(commonFingerPrints, alerts);

log.debug("Generated group alert for define: {} with {} matching data",
define.getName(), alertContext.size());
}



private Map<String, String> createCommonFingerprints(AlertDefine define) {
Map<String, String> fingerprints = new HashMap<>(8);
fingerprints.put(CommonConstants.LABEL_ALERT_NAME, define.getName());
fingerprints.put(CommonConstants.LABEL_DEFINE_ID, String.valueOf(define.getId()));

if (define.getLabels() != null) {
fingerprints.putAll(define.getLabels());
}

return fingerprints;
}

private Map<String, Object> createFieldValueMap(Map<String, Object> context, AlertDefine define) {
Map<String, Object> fieldValueMap = new HashMap<>(8);
for (Map.Entry<String, Object> entry : context.entrySet()) {
if (entry.getValue() != null) {
fieldValueMap.put(entry.getKey(), entry.getValue().toString());
}
}
if (define.getLabels() != null) {
fieldValueMap.putAll(define.getLabels());
}

return fieldValueMap;
}

private Map<String, String> createAlertAnnotations(AlertDefine define, Map<String, Object> fieldValueMap) {
Map<String, String> annotations = new HashMap<>(8);

if (define.getAnnotations() != null) {
for (Map.Entry<String, String> entry : define.getAnnotations().entrySet()) {
annotations.put(entry.getKey(),
AlertTemplateUtil.render(entry.getValue(), fieldValueMap));
}
}

return annotations;
}

}
Loading
Loading