Skip to content

Commit be0b886

Browse files
committed
Resolve issue #29 handle pe relocation
1 parent 42d9c18 commit be0b886

File tree

2 files changed

+105
-18
lines changed

2 files changed

+105
-18
lines changed

streams-metric-exporter/src/main/java/streams/metric/exporter/streamstracker/instance/StreamsInstanceTracker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1282,7 +1282,8 @@ private double getInstanceStatusAsMetric() {
12821282
case PARTIALLY_RUNNING:
12831283
case PARTIALLY_FAILED:
12841284
case STOPPING:
1285-
value = 0.5;
1285+
value = 0.5;
1286+
break;
12861287
default:
12871288
value = 0;
12881289
}

streams-metric-exporter/src/main/java/streams/metric/exporter/streamstracker/job/JobDetails.java

Lines changed: 103 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public class JobDetails implements NotificationListener {
9898
// Control over complete refresh of job required before next refresh
9999
private boolean jobTopologyRefreshRequired = false;
100100

101-
private final Map<BigInteger, String> peResourceMap = new HashMap<BigInteger, String>();
101+
//private final Map<BigInteger, String> peResourceMap = new HashMap<BigInteger, String>();
102102
private final Map<String, String> operatorKindMap = new HashMap<String, String>();
103103
private final Map<String, Map<Integer, String>> operatorInputPortNames = new HashMap<String, Map<Integer, String>>();
104104
private final Map<String, Map<Integer, String>> operatorOutputPortNames = new HashMap<String, Map<Integer, String>>();
@@ -171,7 +171,7 @@ public JobDetails(StreamsInstanceTracker monitor, BigInteger jobid, JobMXBean jo
171171
}
172172

173173
try {
174-
mapResources(beanSource);
174+
//mapResources(beanSource);
175175
mapPortNames(beanSource);
176176
} catch (Exception e) {
177177
String message = "Unable to create resource or operator port names";
@@ -202,7 +202,7 @@ private void handleTopologyRefresh() {
202202
// Reset the port mappings as this will change when a topology change occurs
203203
try {
204204
MXBeanSource beanSource = monitor.getContext().getBeanSourceProvider().getBeanSource();
205-
this.mapResources(beanSource);
205+
//this.mapResources(beanSource);
206206

207207
operatorInputPortNames.clear();
208208
operatorOutputPortNames.clear();
@@ -250,7 +250,10 @@ public void setJobMetrics(String jobMetrics) {
250250
this.jobMetrics = resolveMappings(jobMetrics);
251251
updateExportedMetrics();
252252
}
253+
254+
253255

256+
254257
public String getJobSnapshot() {
255258
return jobSnapshot;
256259
}
@@ -381,7 +384,7 @@ public JobMXBean.Health getHealth() {
381384

382385
public void setHealth(JobMXBean.Health health) {
383386
this.health = health;
384-
metricsExporter.getStreamsMetric("healthy", StreamsObjectType.JOB, this.domain, this.streamsInstanceName, this.name).set((this.getHealth() == JobMXBean.Health.HEALTHY?1:0));
387+
//metricsExporter.getStreamsMetric("healthy", StreamsObjectType.JOB, this.domain, this.streamsInstanceName, this.name).set((this.getHealth() == JobMXBean.Health.HEALTHY?1:0));
385388

386389
}
387390

@@ -612,10 +615,15 @@ public String getSnapshot(int maximumDepth, boolean includeStaticAttributes) thr
612615
public void handleNotification(Notification notification, Object handback) {
613616
try {
614617
String notificationType = notification.getType();
615-
LOGGER.trace("* Job Notification: " + notification);
618+
LOGGER.trace("* Job Notification Type: " + notificationType + ", notification: " + notification);
616619

617620
switch (notificationType) {
618-
621+
case Notifications.JOB_CHANGED:
622+
// JOB Changed notification can include PE relocated, but no user data to get more specific. Asuume topology changed
623+
LOGGER.debug("Instance({}) Job ({}) JOB_CHANGED Notification: Assume topology changed", this.instance, this.getJobid());
624+
this.jobTopologyRefreshRequired = true;
625+
break;
626+
619627
case AttributeChangeNotification.ATTRIBUTE_CHANGE:
620628
AttributeChangeNotification acn = (AttributeChangeNotification) notification;
621629
LOGGER.debug("* INSTANCE ({}) Job ({}) Notification: attribute ({}) changed from: {} to: {}", this.instance, this.getJobid(), acn.getAttributeName(), acn.getOldValue(), acn.getNewValue());
@@ -648,6 +656,7 @@ public void handleNotification(Notification notification, Object handback) {
648656
}
649657
}
650658

659+
/*
651660
// Create mapping of peid to resource name it is running on
652661
private void mapResources(MXBeanSource beanSource) {
653662
Set<BigInteger> pes = getJobBean().getPes();
@@ -660,6 +669,7 @@ private void mapResources(MXBeanSource beanSource) {
660669
peResourceMap.put(peid,peBean.getResource());
661670
}
662671
}
672+
*/
663673

664674
private void mapPortNames(MXBeanSource beanSource) {
665675
Set<String> operators = getJobBean().getOperators();
@@ -719,8 +729,9 @@ private String resolveMappings(String metricsSnapshot) {
719729
for (int i = 0; i < peArray.size(); i++) {
720730
JSONObject pe = (JSONObject) peArray.get(i);
721731

722-
// Add resource name
723-
resolveResource(pe);
732+
// Add resource name, status, and health
733+
//resolveResource(pe);
734+
enrichPEMetrics(pe);
724735

725736
JSONArray operatorArray = (JSONArray) pe.get("operators");
726737

@@ -741,14 +752,7 @@ private String resolveMappings(String metricsSnapshot) {
741752
return metricsSnapshot;
742753
}
743754

744-
// For the given pe JsonObject, lookup its id in the resource map and set a new resource attribute to be used in export metric labels
745-
@SuppressWarnings("unchecked")
746-
private void resolveResource(JSONObject pe) {
747-
BigInteger peid = new BigInteger(pe.get("id").toString());
748-
String resource = peResourceMap.get(peid);
749-
LOGGER.trace("Resolving PE ({}) to Resource ({}) mapping in job ({}) metrics",peid.toString(),resource,getJobid().toString());
750-
pe.put("resource",resource);
751-
}
755+
752756

753757
@SuppressWarnings("unchecked")
754758
private void resolveOperatorInputPortNames(JSONObject operator) {
@@ -802,6 +806,44 @@ private void resolveOperatorOutputPortNames(JSONObject operator) {
802806
}
803807
}
804808

809+
/*
810+
// For the given pe JsonObject, lookup its id in the resource map and set a new resource attribute to be used in export metric labels
811+
@SuppressWarnings("unchecked")
812+
private void resolveResource(JSONObject pe) {
813+
BigInteger peid = new BigInteger(pe.get("id").toString());
814+
String resource = peResourceMap.get(peid);
815+
LOGGER.trace("Resolving PE ({}) to Resource ({}) mapping in job ({}) metrics",peid.toString(),resource,getJobid().toString());
816+
pe.put("resource",resource);
817+
}
818+
*/
819+
820+
/* This is a temporary approach to getting PE status and health into the metrics JSON so we can
821+
supress certain metrics when the PE is restarting */
822+
@SuppressWarnings("unchecked")
823+
private void enrichPEMetrics(JSONObject metrics_pe) {
824+
String metrics_peid = metrics_pe.get("id").toString();
825+
if (this.jobSnapshot != null) {
826+
JSONParser parser = new JSONParser();
827+
try {
828+
JSONObject snapshotObject = (JSONObject) parser.parse(this.jobSnapshot);
829+
JSONArray peArray = (JSONArray) snapshotObject.get("pes");
830+
for (int i = 0; i < peArray.size(); i++) {
831+
JSONObject pe = (JSONObject) peArray.get(i);
832+
String peid = (String)pe.get("id");
833+
if (metrics_peid.equals(peid)) {
834+
LOGGER.debug("enrichPEMetrics: enriching metrics pe: " + peid + " from snapshot");
835+
LOGGER.debug("enrichPEMetrics: snapshot pe: " + peArray.get(i));
836+
metrics_pe.put("status",pe.get("status"));
837+
metrics_pe.put("health",pe.get("health"));
838+
metrics_pe.put("resource",pe.get("resource"));
839+
}
840+
}
841+
} catch (ParseException e) {
842+
throw new IllegalStateException(e);
843+
}
844+
}
845+
}
846+
805847
private String getOperatorName(JSONObject operator) {
806848
return operator.get("name").toString();
807849
}
@@ -851,19 +893,32 @@ private void updateExportedMetrics() {
851893
JSONParser parser = new JSONParser();
852894
try {
853895
JSONObject metricsObject = (JSONObject) parser.parse(this.jobMetrics);
854-
855896
JSONArray peArray = (JSONArray) metricsObject.get("pes");
856897

857898
/* Job Metrics */
858899
long ncpu = 0, nrmc = 0, nmc = 0;
859900
long numconnections = 0, totalcongestion = 0, curcongestion = 0;
860901
long maxcongestion = 0 , avgcongestion = 0, mincongestion = 999;
902+
LOGGER.debug("Metrics, job status: " + this.getStatus());
903+
LOGGER.debug("Metrics, job helath: " + this.getHealth());
861904
/* PE Loop */
862905
for (int i = 0; i < peArray.size(); i++) {
863906
JSONObject pe = (JSONObject) peArray.get(i);
864907
String peid = (String)pe.get("id");
908+
String status = (String)pe.get("status");
909+
String health = (String)pe.get("health");
865910
String resource = (String)pe.get("resource");
866911

912+
LOGGER.debug("Metrics, pe: " + peid + " resource: " + resource);
913+
LOGGER.debug("Metrics, pe: " + peid + " status: " + status);
914+
LOGGER.debug("Metrics, pe: " + peid + " health: " + health);
915+
916+
// If the PE is not healthy, then its resource may not be correct while it is being
917+
// relocated, and we cannot create / update those metrics
918+
if (!health.equalsIgnoreCase("healthy")) {
919+
LOGGER.info("Metrics, pe: " + peid + " is NOT healthy, NOT setting metrics");
920+
continue; // skip to next pe in loop
921+
}
867922

868923
JSONArray peMetricsArray = (JSONArray) pe.get("metrics");
869924
/* PE Metrics Loop */
@@ -872,6 +927,7 @@ private void updateExportedMetrics() {
872927
String metricName = (String)metric.get("name");
873928
switch (metricName) {
874929
case "nCpuMilliseconds":
930+
LOGGER.debug("Metrics, pe: " + peid + " resource: " + resource + " nCpuMilliseconds: " + metric.get("value"));
875931
ncpu += (long)metric.get("value");
876932
break;
877933
case "nResidentMemoryConsumption":
@@ -1084,6 +1140,12 @@ private void updateExportedSnapshotMetrics() {
10841140
try {
10851141
JSONObject snapshotObject = (JSONObject) parser.parse(this.jobSnapshot);
10861142

1143+
String health = (String)snapshotObject.get("health");
1144+
LOGGER.debug("snapshot Metrics job health: " + health);
1145+
metricsExporter.getStreamsMetric("healthy", StreamsObjectType.JOB, this.domain, this.streamsInstanceName, this.name).set(getHealthAsMetric(health));
1146+
1147+
1148+
10871149
JSONArray peArray = (JSONArray) snapshotObject.get("pes");
10881150

10891151
// Metrics to create
@@ -1094,6 +1156,12 @@ private void updateExportedSnapshotMetrics() {
10941156
JSONObject pe = (JSONObject) peArray.get(i);
10951157
String peid = (String)pe.get("id");
10961158
String resource = (String)pe.get("resource");
1159+
1160+
1161+
LOGGER.debug("Snapshots, pe: " + peid + " status: " + (String)pe.get("status"));
1162+
LOGGER.debug("Snapshots, pe: " + peid + " helath: " + (String)pe.get("health"));
1163+
LOGGER.debug("Snapshots, pe: " + peid + " resource: " + resource);
1164+
10971165

10981166
launchCount = (long)pe.get("launchCount");
10991167

@@ -1110,4 +1178,22 @@ private void updateExportedSnapshotMetrics() {
11101178
}
11111179
} // end if snapshot != null
11121180
}
1181+
1182+
private double getHealthAsMetric(String health) {
1183+
double value = 0;
1184+
switch (JobMXBean.Health.fromString(health)) {
1185+
case HEALTHY :
1186+
value = 1;
1187+
break;
1188+
case PARTIALLY_HEALTHY:
1189+
case PARTIALLY_UNHEALTHY:
1190+
value = 0.5;
1191+
break;
1192+
default:
1193+
value = 0;
1194+
}
1195+
LOGGER.debug("getHealthAsMetric(" + health + ") = " + value);
1196+
return value;
1197+
}
1198+
11131199
}

0 commit comments

Comments
 (0)