-
Notifications
You must be signed in to change notification settings - Fork 1
Insert perf metrics into operator #315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e464c19
29b105d
94e7874
37af34f
712674f
8c888ca
82be126
c80ab64
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package org.grapheco.metrics | ||
|
||
import util.control.Breaks | ||
|
||
class Domain(dID: String) { | ||
var records: Vector[Record] = Vector() | ||
val id: String = dID | ||
|
||
def addRecord(r: Record): Unit = { | ||
records = records :+ r | ||
} | ||
|
||
// record the latency for an operation | ||
def recordLatency(r: Record): Unit = { | ||
// if there is an existing record with the same label, we assume this record is the start point | ||
// of the operation and compute the operation latency according to the timestamp | ||
for (or <- records.reverse) { | ||
if (or.matchLabel(r)) { | ||
val latency = r.computeLatency(or) | ||
records = records.filterNot(_ == or) | ||
r.value.setValue(latency) | ||
records = records :+ r | ||
return | ||
} | ||
} | ||
|
||
records = records :+ r | ||
} | ||
|
||
def printRecordByLabel(l: Label): Unit = { | ||
for (r <- records) { | ||
if (r.containLabel(l)) { | ||
r.print(id) | ||
} | ||
} | ||
} | ||
|
||
def filterRecords(l: Label): Set[Record] = { | ||
var filterRecords: Set[Record] = Set() | ||
for (r <- records) { | ||
if (r.containLabel(l)) { | ||
filterRecords += r | ||
} | ||
} | ||
filterRecords | ||
} | ||
|
||
def getRecordsSize(): Int = { | ||
records.length | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package org.grapheco.metrics | ||
|
||
object DomainObject { | ||
val domainID = "query" | ||
|
||
var domain: Domain = new Domain(domainID) | ||
|
||
def addRecord(r: Record): Unit = { | ||
domain.addRecord(r) | ||
} | ||
|
||
def recordLatency(labels: Set[String]): Unit = { | ||
val r = new Record(new Label(labels), new Value(0)) | ||
domain.recordLatency(r) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package org.grapheco.metrics | ||
|
||
class Label(val ls: Set[String]) { | ||
var labels: Set[String] = ls | ||
|
||
def matches(label: Label): Boolean = { | ||
if (labels.size != label.labels.size) { | ||
return false | ||
} | ||
contains(label) | ||
} | ||
|
||
def contains(label: Label): Boolean = { | ||
for (l <- label.labels) { | ||
if (!labels.contains(l)) { | ||
return false | ||
} | ||
} | ||
true | ||
} | ||
|
||
def addLabel(label: String): Unit = { | ||
labels += label | ||
} | ||
|
||
override def toString(): String = { | ||
labels.mkString(";") | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package org.grapheco.metrics | ||
|
||
class Record(l: Label, v: Value) { | ||
var timestamp: Timestamp = new Timestamp() | ||
var label: Label = l | ||
var value: Value = v | ||
|
||
def matchLabel(r: Record): Boolean = { | ||
label.matches(r.label) | ||
} | ||
|
||
def containLabel(l: Label): Boolean = { | ||
label.contains(l) | ||
} | ||
|
||
def -(r: Record): Record = { | ||
val v = value - r.value | ||
if (v == null) { | ||
return null | ||
} | ||
value = v | ||
this | ||
} | ||
|
||
def computeLatency(r: Record): Long = { | ||
timestamp - r.timestamp | ||
} | ||
|
||
def print(dID: String): Unit = { | ||
printf("[%s][%s][%s]%s\n", dID, label.toString(), timestamp.toString(), value.toString()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package org.grapheco.metrics | ||
|
||
import java.time.LocalDateTime | ||
import java.time.temporal.ChronoUnit._ | ||
|
||
class Timestamp { | ||
val timestamp = LocalDateTime.now() | ||
|
||
override def toString(): String = { | ||
timestamp.toString | ||
} | ||
|
||
def -(t: Timestamp): Long = { | ||
MILLIS.between(timestamp, t.timestamp) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package org.grapheco.metrics | ||
|
||
class Value(v: Any) { | ||
var value: Any = v | ||
|
||
def -(other: Value): Value = { | ||
var succ: Boolean = false | ||
var newVal: Value = new Value() | ||
if (value.isInstanceOf[Int] && other.isInstanceOf[Int]) { | ||
newVal.setValue(value.asInstanceOf[Int] - other.asInstanceOf[Int]) | ||
succ = true | ||
} else if (value.isInstanceOf[Float] && other.isInstanceOf[Float]) { | ||
newVal.setValue(value.asInstanceOf[Float] - other.asInstanceOf[Float]) | ||
succ = true | ||
} | ||
|
||
if (succ) { | ||
return newVal | ||
} | ||
null | ||
} | ||
|
||
def setValue(v: Any): Unit = { | ||
value = v | ||
} | ||
|
||
override def toString(): String = { | ||
value.toString | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package org.grapheco.metrics | ||
|
||
import org.junit.{Before, Test} | ||
|
||
class DomainTest { | ||
val domainID = "did" | ||
|
||
val str1 = "a" | ||
val str2 = "b" | ||
val str3 = "c" | ||
|
||
val set1: Set[String] = Set(str1) | ||
val set2: Set[String] = Set(str1, str2) | ||
val set3: Set[String] = Set(str1, str2) | ||
val set4: Set[String] = Set(str2, str3) | ||
val set5: Set[String] = Set(str3) | ||
|
||
val l1 = new Label(set1) | ||
val l2 = new Label(set2) | ||
val l3 = new Label(set3) | ||
val l4 = new Label(set4) | ||
val l5 = new Label(set5) | ||
|
||
val v1 = new Value(1) | ||
val v2 = new Value(2) | ||
val v3 = new Value(3) | ||
val v4 = new Value(4) | ||
val v5 = new Value(5) | ||
|
||
val r1 = new Record(l1, v1) | ||
val r2 = new Record(l2, v2) | ||
val r3 = new Record(l3, v3) | ||
val r4 = new Record(l4, v4) | ||
val r5 = new Record(l5, v5) | ||
|
||
var domain = new Domain(domainID) | ||
|
||
@Before | ||
def init(): Unit = {} | ||
|
||
@Test | ||
def testLabels(): Unit = { | ||
domain.addRecord(r1) | ||
domain.addRecord(r2) | ||
domain.addRecord(r3) | ||
domain.addRecord(r4) | ||
domain.addRecord(r5) | ||
|
||
val filterl1 = new Label(Set(str1)) | ||
val filterl2 = new Label(Set(str2)) | ||
val filterl3 = new Label(Set(str3)) | ||
|
||
val records1 = domain.filterRecords(filterl1) | ||
assert(records1.size == 3) | ||
|
||
val records2 = domain.filterRecords(filterl2) | ||
assert(records2.size == 3) | ||
|
||
val records3 = domain.filterRecords(filterl3) | ||
assert(records3.size == 2) | ||
|
||
assert(r2.containLabel(l1)) | ||
assert(r3.matchLabel(r2)) | ||
} | ||
|
||
@Test | ||
def testRecordLatency(): Unit = { | ||
domain.recordLatency(r2) | ||
assert(domain.getRecordsSize() == 1) | ||
|
||
domain.recordLatency(r3) | ||
assert(domain.getRecordsSize() == 1) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ package org.grapheco.lynx.operator | |
import org.grapheco.lynx.operator.utils.OperatorUtils | ||
import org.grapheco.lynx.types.LynxValue | ||
import org.grapheco.lynx.{ExecutionOperator, ExpressionContext, ExpressionEvaluator, LynxType, RowBatch} | ||
import org.grapheco.metrics.{Label, Record, Value, DomainObject} | ||
import org.opencypher.v9_0.ast.ReturnItem | ||
|
||
/** | ||
|
@@ -27,6 +28,8 @@ case class AggregationOperator( | |
var allGroupedData: Iterator[Array[Seq[LynxValue]]] = Iterator.empty | ||
var hasPulledData: Boolean = false | ||
|
||
val recordLabel = new Label(Set("Aggregation")) | ||
|
||
override def openImpl(): Unit = { | ||
in.open() | ||
schema = (aggregationExprs ++ groupingExprs).map(col => | ||
|
@@ -71,11 +74,18 @@ case class AggregationOperator( | |
hasPulledData = true | ||
} | ||
|
||
if (allGroupedData.nonEmpty) RowBatch(allGroupedData.next()) | ||
else RowBatch(Seq.empty) | ||
var rb: RowBatch = null | ||
if (allGroupedData.nonEmpty) { | ||
rb = RowBatch(allGroupedData.next()) | ||
} else { | ||
rb = RowBatch(Seq.empty) | ||
} | ||
rb | ||
Comment on lines
+77
to
+83
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we fixed it in the previous pr. Why is it here again? |
||
} | ||
|
||
override def closeImpl(): Unit = {} | ||
|
||
override def outputSchema(): Seq[(String, LynxType)] = schema | ||
|
||
override def getOperatorName(): String = "Aggregation" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method is not needed. we can get operator name in |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -117,6 +117,8 @@ case class CreateOperator( | |
|
||
override def outputSchema(): Seq[(String, LynxType)] = schema | ||
|
||
override def getOperatorName(): String = "Create" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question as above |
||
|
||
private def nodeInputRef(varname: String, ctxMap: Map[String, LynxValue]): NodeInputRef = { | ||
ctxMap | ||
.get(varname) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package org.grapheco.lynx | ||
|
||
import org.grapheco.lynx.types.LynxValue | ||
import org.grapheco.metrics.DomainObject | ||
import org.opencypher.v9_0.expressions.Expression | ||
|
||
/** | ||
|
@@ -27,11 +28,17 @@ trait ExecutionOperator extends TreeNode { | |
|
||
// empty RowBatch means the end of output | ||
def getNext(): RowBatch = { | ||
getNextImpl() | ||
val opName = getOperatorName() | ||
DomainObject.recordLatency(Set[String](opName)) | ||
val rb = getNextImpl() | ||
DomainObject.recordLatency(Set[String](opName)) | ||
rb | ||
} | ||
// to be implemented by concrete operators | ||
def getNextImpl(): RowBatch | ||
|
||
def getOperatorName(): String | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can get different operator names by using: |
||
|
||
def close(): Unit = { | ||
closeImpl() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have junit, use it from parent pom.