Skip to content

Insert perf metrics into operator #315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions common-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
Comment on lines +75 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have junit, use it from parent pom.

</dependencies>

<build>
Expand Down
51 changes: 51 additions & 0 deletions common-utils/src/main/scala/org/grapheco/metrics/Domain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.grapheco.metrics

import util.control.Breaks

/**
 * A named, ordered collection of metric [[Record]]s.
 *
 * @param dID identifier of this domain; it is passed to `Record.print` when records are printed
 */
class Domain(dID: String) {
  var records: Vector[Record] = Vector()
  val id: String = dID

  /** Appends `r` to the end of the record list. */
  def addRecord(r: Record): Unit = {
    records :+= r
  }

  /**
   * Records one end point of a timed operation.
   *
   * The most recently added record whose label set matches that of `r` is treated as the start
   * point: it is removed, the latency between the two records is stored in `r`'s value, and `r`
   * is appended. When no record matches, `r` is appended unchanged and becomes a start point.
   *
   * NOTE(review): the record appended after a successful pairing keeps the same label, so the
   * next call with that label pairs with it rather than starting a fresh measurement.
   */
  def recordLatency(r: Record): Unit = {
    // Walk from newest to oldest so that the latest matching record is the one paired.
    records.reverseIterator.find(_.matchLabel(r)).foreach { earlier =>
      val elapsed = r.computeLatency(earlier)
      records = records.filterNot(_ == earlier)
      r.value.setValue(elapsed)
    }
    records :+= r
  }

  /** Prints, in insertion order, every record whose label contains all labels of `l`. */
  def printRecordByLabel(l: Label): Unit = {
    for (rec <- records if rec.containLabel(l)) rec.print(id)
  }

  /** Returns the set of records whose label contains all labels of `l`. */
  def filterRecords(l: Label): Set[Record] =
    records.filter(_.containLabel(l)).toSet

  /** Number of records currently held by this domain. */
  def getRecordsSize(): Int = records.size
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.grapheco.metrics

/** Process-wide holder of the single metrics [[Domain]] whose id is "query". */
object DomainObject {
  val domainID = "query"

  var domain: Domain = new Domain(domainID)

  /** Forwards `r` to the shared domain. */
  def addRecord(r: Record): Unit = domain.addRecord(r)

  /**
   * Builds a fresh record labelled with `labels` (initial value 0) and hands it to the shared
   * domain's `recordLatency`.
   */
  def recordLatency(labels: Set[String]): Unit =
    domain.recordLatency(new Record(new Label(labels), new Value(0)))
}
29 changes: 29 additions & 0 deletions common-utils/src/main/scala/org/grapheco/metrics/Label.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.grapheco.metrics

/**
 * A mutable set of string labels attached to a metric record.
 *
 * @param ls the initial labels
 */
class Label(val ls: Set[String]) {
  var labels: Set[String] = ls

  /** True when both labels hold the same number of entries and this one contains all of `label`'s. */
  def matches(label: Label): Boolean =
    labels.size == label.labels.size && contains(label)

  /** True when every entry of `label` is also present in this label. */
  def contains(label: Label): Boolean =
    label.labels.forall(labels.contains)

  /** Adds a single entry to this label. */
  def addLabel(label: String): Unit = {
    labels = labels + label
  }

  /** The entries joined with ';'. */
  override def toString(): String = labels.mkString(";")
}
32 changes: 32 additions & 0 deletions common-utils/src/main/scala/org/grapheco/metrics/Record.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.grapheco.metrics

/**
 * A single metric sample: a label set, a value, and the time at which the record was created.
 *
 * @param l label set identifying what was measured
 * @param v the measured value
 */
class Record(l: Label, v: Value) {
  var timestamp: Timestamp = new Timestamp()
  var label: Label = l
  var value: Value = v

  /** True when this record's label matches the label of `r`. */
  def matchLabel(r: Record): Boolean = label.matches(r.label)

  /** True when this record's label contains every entry of `l`. */
  def containLabel(l: Label): Boolean = label.contains(l)

  /**
   * Subtracts `r`'s value from this record's value in place.
   *
   * @return this record with its value replaced by the difference, or null when the value
   *         subtraction itself yields null
   */
  def -(r: Record): Record = {
    val difference = value - r.value
    if (difference != null) {
      value = difference
      this
    } else {
      null
    }
  }

  /** Difference between this record's timestamp and `r`'s, as computed by `Timestamp.-`. */
  def computeLatency(r: Record): Long = timestamp - r.timestamp

  /** Writes this record to standard output as `[domain][labels][timestamp]value`. */
  def print(dID: String): Unit = {
    val labelText = label.toString()
    val timeText = timestamp.toString()
    val valueText = value.toString()
    printf("[%s][%s][%s]%s\n", dID, labelText, timeText, valueText)
  }
}
16 changes: 16 additions & 0 deletions common-utils/src/main/scala/org/grapheco/metrics/Timestamp.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.grapheco.metrics

import java.time.LocalDateTime
import java.time.temporal.ChronoUnit._

/**
 * The wall-clock instant at which the object was constructed.
 *
 * NOTE(review): `LocalDateTime.now()` follows the system clock and default time zone, so a clock
 * adjustment between two timestamps is reflected in their difference.
 */
class Timestamp {
  val timestamp: LocalDateTime = LocalDateTime.now()

  override def toString(): String = timestamp.toString

  /**
   * Milliseconds elapsed from `t` to this timestamp (`this - t`).
   *
   * The result is positive when this timestamp is later than `t`. `ChronoUnit.between` takes
   * the start first and the end second, so `t` must be the first argument; the previous argument
   * order produced `t - this`, i.e. a negated latency.
   *
   * @param t the earlier point in time to subtract
   * @return whole milliseconds between `t` and this timestamp
   */
  def -(t: Timestamp): Long = MILLIS.between(t.timestamp, timestamp)
}
30 changes: 30 additions & 0 deletions common-utils/src/main/scala/org/grapheco/metrics/Value.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.grapheco.metrics

/**
 * A mutable, untyped metric value.
 *
 * @param v the initial value
 */
class Value(v: Any) {
  var value: Any = v

  /**
   * Subtracts `other` from this value.
   *
   * Both wrapped values must be of the same numeric type (Int, Long, Float or Double). The
   * comparison is made on the wrapped values; testing the `Value` wrapper itself, as the code
   * did before, can never succeed and made this method always return null.
   *
   * @param other the value to subtract
   * @return a new Value holding the difference, or null when the two wrapped values are not
   *         of the same supported numeric type
   */
  def -(other: Value): Value = (value, other.value) match {
    case (a: Int, b: Int)       => new Value(a - b)
    case (a: Long, b: Long)     => new Value(a - b)
    case (a: Float, b: Float)   => new Value(a - b)
    case (a: Double, b: Double) => new Value(a - b)
    case _                      => null // callers test for null to detect unsupported operands
  }

  /** Replaces the wrapped value. */
  def setValue(v: Any): Unit = {
    value = v
  }

  override def toString(): String = {
    value.toString
  }
}
74 changes: 74 additions & 0 deletions common-utils/src/test/scala/org/grapheco/metrics/DomainTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.grapheco.metrics

import org.junit.{Before, Test}

/** Unit tests for label filtering and latency recording on [[Domain]]. */
class DomainTest {
  val domainID = "did"

  val str1 = "a"
  val str2 = "b"
  val str3 = "c"

  // set2 and set3 hold the same entries on purpose: their records must match each other.
  val set1: Set[String] = Set(str1)
  val set2: Set[String] = Set(str1, str2)
  val set3: Set[String] = Set(str1, str2)
  val set4: Set[String] = Set(str2, str3)
  val set5: Set[String] = Set(str3)

  val l1 = new Label(set1)
  val l2 = new Label(set2)
  val l3 = new Label(set3)
  val l4 = new Label(set4)
  val l5 = new Label(set5)

  val v1 = new Value(1)
  val v2 = new Value(2)
  val v3 = new Value(3)
  val v4 = new Value(4)
  val v5 = new Value(5)

  val r1 = new Record(l1, v1)
  val r2 = new Record(l2, v2)
  val r3 = new Record(l3, v3)
  val r4 = new Record(l4, v4)
  val r5 = new Record(l5, v5)

  var domain = new Domain(domainID)

  @Before
  def init(): Unit = {}

  /** Filtering by a single label returns every record whose label set includes it. */
  @Test
  def testLabels(): Unit = {
    Seq(r1, r2, r3, r4, r5).foreach(rec => domain.addRecord(rec))

    val filterl1 = new Label(Set(str1))
    val filterl2 = new Label(Set(str2))
    val filterl3 = new Label(Set(str3))

    // "a" is carried by r1, r2, r3; "b" by r2, r3, r4; "c" by r4, r5.
    assert(domain.filterRecords(filterl1).size == 3)
    assert(domain.filterRecords(filterl2).size == 3)
    assert(domain.filterRecords(filterl3).size == 2)

    assert(r2.containLabel(l1))
    assert(r3.matchLabel(r2))
  }

  /** Two records with matching labels collapse into a single latency record. */
  @Test
  def testRecordLatency(): Unit = {
    for (rec <- Seq(r2, r3)) {
      domain.recordLatency(rec)
      assert(domain.getRecordsSize() == 1)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.grapheco.lynx.operator
import org.grapheco.lynx.operator.utils.OperatorUtils
import org.grapheco.lynx.types.LynxValue
import org.grapheco.lynx.{ExecutionOperator, ExpressionContext, ExpressionEvaluator, LynxType, RowBatch}
import org.grapheco.metrics.{Label, Record, Value, DomainObject}
import org.opencypher.v9_0.ast.ReturnItem

/**
Expand All @@ -27,6 +28,8 @@ case class AggregationOperator(
var allGroupedData: Iterator[Array[Seq[LynxValue]]] = Iterator.empty
var hasPulledData: Boolean = false

val recordLabel = new Label(Set("Aggregation"))

override def openImpl(): Unit = {
in.open()
schema = (aggregationExprs ++ groupingExprs).map(col =>
Expand Down Expand Up @@ -71,11 +74,18 @@ case class AggregationOperator(
hasPulledData = true
}

if (allGroupedData.nonEmpty) RowBatch(allGroupedData.next())
else RowBatch(Seq.empty)
var rb: RowBatch = null
if (allGroupedData.nonEmpty) {
rb = RowBatch(allGroupedData.next())
} else {
rb = RowBatch(Seq.empty)
}
rb
Comment on lines +77 to +83
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we fixed it in the previous pr. Why is it here again?

}

override def closeImpl(): Unit = {}

override def outputSchema(): Seq[(String, LynxType)] = schema

// Label under which ExecutionOperator.getNext() records this operator's latency.
override def getOperatorName(): String = "Aggregation"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is not needed. we can get operator name in ExecutionOperator

}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ case class CreateOperator(

override def outputSchema(): Seq[(String, LynxType)] = schema

// Label under which ExecutionOperator.getNext() records this operator's latency.
override def getOperatorName(): String = "Create"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above


private def nodeInputRef(varname: String, ctxMap: Map[String, LynxValue]): NodeInputRef = {
ctxMap
.get(varname)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.grapheco.lynx

import org.grapheco.lynx.types.LynxValue
import org.grapheco.metrics.DomainObject
import org.opencypher.v9_0.expressions.Expression

/**
Expand All @@ -27,11 +28,17 @@ trait ExecutionOperator extends TreeNode {

// empty RowBatch means the end of output
// Pulls the next batch from the concrete operator and records how long the pull took.
// A latency record labelled with the operator name is written before and after getNextImpl();
// the second write sits in `finally` so a failing getNextImpl() does not leave an unpaired
// start record behind. getNextImpl() is invoked exactly once per call.
def getNext(): RowBatch = {
  val latencyLabel = Set[String](getOperatorName())
  DomainObject.recordLatency(latencyLabel)
  try getNextImpl()
  finally DomainObject.recordLatency(latencyLabel)
}
// to be implemented by concrete operators
def getNextImpl(): RowBatch

// Name of the concrete operator; getNext() passes it to DomainObject.recordLatency as the
// label of the latency record.
def getOperatorName(): String
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get different operator names by using: getClass.getName. It will automatically determine at runtime.


// Closes the operator by delegating to the concrete operator's closeImpl().
def close(): Unit = closeImpl()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,7 @@ case class FilterOperator(

override def closeImpl(): Unit = {}

// Label under which ExecutionOperator.getNext() records this operator's latency.
override def getOperatorName(): String = "Filter"

override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema()
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ case class LimitOperator(
override def closeImpl(): Unit = {}

override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema()

// Label under which ExecutionOperator.getNext() records this operator's latency.
override def getOperatorName(): String = "Limit"
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ case class NodeScanOperator(

override def outputSchema(): Seq[(String, LynxType)] = schema

// Label under which ExecutionOperator.getNext() records this operator's latency.
override def getOperatorName(): String = "NodeScan"
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ case class OrderByOperator(

override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema()

// Label under which ExecutionOperator.getNext() records this operator's latency.
override def getOperatorName(): String = "OrderBy"

private def sortByItem(
a: Seq[LynxValue],
b: Seq[LynxValue],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,6 @@ case class PathScanOperator(
override def closeImpl(): Unit = {}

override def outputSchema(): Seq[(String, LynxType)] = schema

// Label under which ExecutionOperator.getNext() records this operator's latency.
override def getOperatorName(): String = "PathScan"
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ case class ProjectOperator(
override def closeImpl(): Unit = {}

override def outputSchema(): Seq[(String, LynxType)] = projectSchema

// Label under which ExecutionOperator.getNext() records this operator's latency.
override def getOperatorName(): String = "Project"
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ case class SelectOperator(
override def closeImpl(): Unit = {}

override def outputSchema(): Seq[(String, LynxType)] = outPutSchema

// Label under which ExecutionOperator.getNext() records this operator's latency.
override def getOperatorName(): String = "Select"
}
Loading