diff --git a/common-utils/pom.xml b/common-utils/pom.xml
index fc7839ae..95029c1b 100644
--- a/common-utils/pom.xml
+++ b/common-utils/pom.xml
@@ -72,6 +72,18 @@
org.apache.commons
commons-lang3
+
+ junit
+ junit
+ 4.13.2
+ test
+
+
+ org.junit.vintage
+ junit-vintage-engine
+
+
+
diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala b/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala
new file mode 100644
index 00000000..2b31cbc1
--- /dev/null
+++ b/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala
@@ -0,0 +1,51 @@
+package org.grapheco.metrics
+
+import util.control.Breaks
+
+class Domain(dID: String) {
+ var records: Vector[Record] = Vector()
+ val id: String = dID
+
+ def addRecord(r: Record): Unit = {
+ records = records :+ r
+ }
+
+ // record the latency for an operation
+ def recordLatency(r: Record): Unit = {
+ // if there is an existing record with the same label, we assume this record is the start point
+ // of the operation and compute the operation latency according to the timestamp
+ for (or <- records.reverse) {
+ if (or.matchLabel(r)) {
+ val latency = r.computeLatency(or)
+ records = records.filterNot(_ == or)
+ r.value.setValue(latency)
+ records = records :+ r
+ return
+ }
+ }
+
+ records = records :+ r
+ }
+
+ def printRecordByLabel(l: Label): Unit = {
+ for (r <- records) {
+ if (r.containLabel(l)) {
+ r.print(id)
+ }
+ }
+ }
+
+ def filterRecords(l: Label): Set[Record] = {
+ var filterRecords: Set[Record] = Set()
+ for (r <- records) {
+ if (r.containLabel(l)) {
+ filterRecords += r
+ }
+ }
+ filterRecords
+ }
+
+ def getRecordsSize(): Int = {
+ records.length
+ }
+}
diff --git a/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala b/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala
new file mode 100644
index 00000000..894cc64b
--- /dev/null
+++ b/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala
@@ -0,0 +1,16 @@
+package org.grapheco.metrics
+
+object DomainObject {
+ val domainID = "query"
+
+ var domain: Domain = new Domain(domainID)
+
+ def addRecord(r: Record): Unit = {
+ domain.addRecord(r)
+ }
+
+ def recordLatency(labels: Set[String]): Unit = {
+ val r = new Record(new Label(labels), new Value(0))
+ domain.recordLatency(r)
+ }
+}
diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Label.scala b/common-utils/src/main/scala/org/grapheco/metrics/Label.scala
new file mode 100644
index 00000000..f56eefbc
--- /dev/null
+++ b/common-utils/src/main/scala/org/grapheco/metrics/Label.scala
@@ -0,0 +1,29 @@
+package org.grapheco.metrics
+
+class Label(val ls: Set[String]) {
+ var labels: Set[String] = ls
+
+ def matches(label: Label): Boolean = {
+ if (labels.size != label.labels.size) {
+ return false
+ }
+ contains(label)
+ }
+
+ def contains(label: Label): Boolean = {
+ for (l <- label.labels) {
+ if (!labels.contains(l)) {
+ return false
+ }
+ }
+ true
+ }
+
+ def addLabel(label: String): Unit = {
+ labels += label
+ }
+
+ override def toString(): String = {
+ labels.mkString(";")
+ }
+}
diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Record.scala b/common-utils/src/main/scala/org/grapheco/metrics/Record.scala
new file mode 100644
index 00000000..afaf8cff
--- /dev/null
+++ b/common-utils/src/main/scala/org/grapheco/metrics/Record.scala
@@ -0,0 +1,32 @@
+package org.grapheco.metrics
+
+class Record(l: Label, v: Value) {
+ var timestamp: Timestamp = new Timestamp()
+ var label: Label = l
+ var value: Value = v
+
+ def matchLabel(r: Record): Boolean = {
+ label.matches(r.label)
+ }
+
+ def containLabel(l: Label): Boolean = {
+ label.contains(l)
+ }
+
+ def -(r: Record): Record = {
+ val v = value - r.value
+ if (v == null) {
+ return null
+ }
+ value = v
+ this
+ }
+
+ def computeLatency(r: Record): Long = {
+ timestamp - r.timestamp
+ }
+
+ def print(dID: String): Unit = {
+ printf("[%s][%s][%s]%s\n", dID, label.toString(), timestamp.toString(), value.toString())
+ }
+}
diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Timestamp.scala b/common-utils/src/main/scala/org/grapheco/metrics/Timestamp.scala
new file mode 100644
index 00000000..56042834
--- /dev/null
+++ b/common-utils/src/main/scala/org/grapheco/metrics/Timestamp.scala
@@ -0,0 +1,16 @@
+package org.grapheco.metrics
+
+import java.time.LocalDateTime
+import java.time.temporal.ChronoUnit._
+
+class Timestamp {
+ val timestamp = LocalDateTime.now()
+
+ override def toString(): String = {
+ timestamp.toString
+ }
+
+ def -(t: Timestamp): Long = {
+ MILLIS.between(timestamp, t.timestamp)
+ }
+}
diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Value.scala b/common-utils/src/main/scala/org/grapheco/metrics/Value.scala
new file mode 100644
index 00000000..f69d6a66
--- /dev/null
+++ b/common-utils/src/main/scala/org/grapheco/metrics/Value.scala
@@ -0,0 +1,30 @@
+package org.grapheco.metrics
+
+class Value(v: Any) {
+ var value: Any = v
+
+ def -(other: Value): Value = {
+ var succ: Boolean = false
+ var newVal: Value = new Value()
+ if (value.isInstanceOf[Int] && other.isInstanceOf[Int]) {
+ newVal.setValue(value.asInstanceOf[Int] - other.asInstanceOf[Int])
+ succ = true
+ } else if (value.isInstanceOf[Float] && other.isInstanceOf[Float]) {
+ newVal.setValue(value.asInstanceOf[Float] - other.asInstanceOf[Float])
+ succ = true
+ }
+
+ if (succ) {
+ return newVal
+ }
+ null
+ }
+
+ def setValue(v: Any): Unit = {
+ value = v
+ }
+
+ override def toString(): String = {
+ value.toString
+ }
+}
diff --git a/common-utils/src/test/scala/org/grapheco/metrics/DomainTest.scala b/common-utils/src/test/scala/org/grapheco/metrics/DomainTest.scala
new file mode 100644
index 00000000..9a7155c1
--- /dev/null
+++ b/common-utils/src/test/scala/org/grapheco/metrics/DomainTest.scala
@@ -0,0 +1,74 @@
+package org.grapheco.metrics
+
+import org.junit.{Before, Test}
+
+class DomainTest {
+ val domainID = "did"
+
+ val str1 = "a"
+ val str2 = "b"
+ val str3 = "c"
+
+ val set1: Set[String] = Set(str1)
+ val set2: Set[String] = Set(str1, str2)
+ val set3: Set[String] = Set(str1, str2)
+ val set4: Set[String] = Set(str2, str3)
+ val set5: Set[String] = Set(str3)
+
+ val l1 = new Label(set1)
+ val l2 = new Label(set2)
+ val l3 = new Label(set3)
+ val l4 = new Label(set4)
+ val l5 = new Label(set5)
+
+ val v1 = new Value(1)
+ val v2 = new Value(2)
+ val v3 = new Value(3)
+ val v4 = new Value(4)
+ val v5 = new Value(5)
+
+ val r1 = new Record(l1, v1)
+ val r2 = new Record(l2, v2)
+ val r3 = new Record(l3, v3)
+ val r4 = new Record(l4, v4)
+ val r5 = new Record(l5, v5)
+
+ var domain = new Domain(domainID)
+
+ @Before
+ def init(): Unit = {}
+
+ @Test
+ def testLabels(): Unit = {
+ domain.addRecord(r1)
+ domain.addRecord(r2)
+ domain.addRecord(r3)
+ domain.addRecord(r4)
+ domain.addRecord(r5)
+
+ val filterl1 = new Label(Set(str1))
+ val filterl2 = new Label(Set(str2))
+ val filterl3 = new Label(Set(str3))
+
+ val records1 = domain.filterRecords(filterl1)
+ assert(records1.size == 3)
+
+ val records2 = domain.filterRecords(filterl2)
+ assert(records2.size == 3)
+
+ val records3 = domain.filterRecords(filterl3)
+ assert(records3.size == 2)
+
+ assert(r2.containLabel(l1))
+ assert(r3.matchLabel(r2))
+ }
+
+ @Test
+ def testRecordLatency(): Unit = {
+ domain.recordLatency(r2)
+ assert(domain.getRecordsSize() == 1)
+
+ domain.recordLatency(r3)
+ assert(domain.getRecordsSize() == 1)
+ }
+}
diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala
index ea4ac061..3efdcd49 100644
--- a/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala
+++ b/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala
@@ -3,6 +3,7 @@ package org.grapheco.lynx.operator
import org.grapheco.lynx.operator.utils.OperatorUtils
import org.grapheco.lynx.types.LynxValue
import org.grapheco.lynx.{ExecutionOperator, ExpressionContext, ExpressionEvaluator, LynxType, RowBatch}
+import org.grapheco.metrics.{Label, Record, Value, DomainObject}
import org.opencypher.v9_0.ast.ReturnItem
/**
@@ -27,6 +28,8 @@ case class AggregationOperator(
var allGroupedData: Iterator[Array[Seq[LynxValue]]] = Iterator.empty
var hasPulledData: Boolean = false
+ val recordLabel = new Label(Set("Aggregation"))
+
override def openImpl(): Unit = {
in.open()
schema = (aggregationExprs ++ groupingExprs).map(col =>
@@ -71,11 +74,18 @@ case class AggregationOperator(
hasPulledData = true
}
- if (allGroupedData.nonEmpty) RowBatch(allGroupedData.next())
- else RowBatch(Seq.empty)
+ var rb: RowBatch = null
+ if (allGroupedData.nonEmpty) {
+ rb = RowBatch(allGroupedData.next())
+ } else {
+ rb = RowBatch(Seq.empty)
+ }
+ rb
}
override def closeImpl(): Unit = {}
override def outputSchema(): Seq[(String, LynxType)] = schema
+
+ override def getOperatorName(): String = "Aggregation"
}
diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/CreateOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/CreateOperator.scala
index bbbdf6df..2f3ef0b8 100644
--- a/lynx/src/main/scala/org/grapheco/lynx/operator/CreateOperator.scala
+++ b/lynx/src/main/scala/org/grapheco/lynx/operator/CreateOperator.scala
@@ -117,6 +117,8 @@ case class CreateOperator(
override def outputSchema(): Seq[(String, LynxType)] = schema
+ override def getOperatorName(): String = "Create"
+
private def nodeInputRef(varname: String, ctxMap: Map[String, LynxValue]): NodeInputRef = {
ctxMap
.get(varname)
diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala
index baad38ff..1541f461 100644
--- a/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala
+++ b/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala
@@ -1,6 +1,7 @@
package org.grapheco.lynx
import org.grapheco.lynx.types.LynxValue
+import org.grapheco.metrics.DomainObject
import org.opencypher.v9_0.expressions.Expression
/**
@@ -27,11 +28,17 @@ trait ExecutionOperator extends TreeNode {
// empty RowBatch means the end of output
def getNext(): RowBatch = {
- getNextImpl()
+ val opName = getOperatorName()
+ DomainObject.recordLatency(Set[String](opName))
+ val rb = getNextImpl()
+ DomainObject.recordLatency(Set[String](opName))
+ rb
}
// to be implemented by concrete operators
def getNextImpl(): RowBatch
+ def getOperatorName(): String
+
def close(): Unit = {
closeImpl()
}
diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/FilterOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/FilterOperator.scala
index 7246014a..3e35854a 100644
--- a/lynx/src/main/scala/org/grapheco/lynx/operator/FilterOperator.scala
+++ b/lynx/src/main/scala/org/grapheco/lynx/operator/FilterOperator.scala
@@ -54,5 +54,7 @@ case class FilterOperator(
override def closeImpl(): Unit = {}
+ override def getOperatorName(): String = "Filter"
+
override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema()
}
diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/LimitOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/LimitOperator.scala
index 4506f3a5..207edb9c 100644
--- a/lynx/src/main/scala/org/grapheco/lynx/operator/LimitOperator.scala
+++ b/lynx/src/main/scala/org/grapheco/lynx/operator/LimitOperator.scala
@@ -51,4 +51,6 @@ case class LimitOperator(
override def closeImpl(): Unit = {}
override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema()
+
+ override def getOperatorName(): String = "Limit"
}
diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/NodeScanOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/NodeScanOperator.scala
index 281efdb7..15991c61 100644
--- a/lynx/src/main/scala/org/grapheco/lynx/operator/NodeScanOperator.scala
+++ b/lynx/src/main/scala/org/grapheco/lynx/operator/NodeScanOperator.scala
@@ -67,4 +67,5 @@ case class NodeScanOperator(
override def outputSchema(): Seq[(String, LynxType)] = schema
+ override def getOperatorName(): String = "NodeScan"
}
diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/OrderByOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/OrderByOperator.scala
index 78283c89..7f7f0ba0 100644
--- a/lynx/src/main/scala/org/grapheco/lynx/operator/OrderByOperator.scala
+++ b/lynx/src/main/scala/org/grapheco/lynx/operator/OrderByOperator.scala
@@ -51,6 +51,8 @@ case class OrderByOperator(
override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema()
+ override def getOperatorName(): String = "OrderBy"
+
private def sortByItem(
a: Seq[LynxValue],
b: Seq[LynxValue],
diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/PathScanOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/PathScanOperator.scala
index 880c5d38..c7a64c98 100644
--- a/lynx/src/main/scala/org/grapheco/lynx/operator/PathScanOperator.scala
+++ b/lynx/src/main/scala/org/grapheco/lynx/operator/PathScanOperator.scala
@@ -159,4 +159,6 @@ case class PathScanOperator(
override def closeImpl(): Unit = {}
override def outputSchema(): Seq[(String, LynxType)] = schema
+
+ override def getOperatorName(): String = "PathScan"
}
diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/ProjectOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/ProjectOperator.scala
index a4981b3a..43f741fd 100644
--- a/lynx/src/main/scala/org/grapheco/lynx/operator/ProjectOperator.scala
+++ b/lynx/src/main/scala/org/grapheco/lynx/operator/ProjectOperator.scala
@@ -42,4 +42,6 @@ case class ProjectOperator(
override def closeImpl(): Unit = {}
override def outputSchema(): Seq[(String, LynxType)] = projectSchema
+
+ override def getOperatorName(): String = "Project"
}
diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/SelectOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/SelectOperator.scala
index 849906a7..cfe42158 100644
--- a/lynx/src/main/scala/org/grapheco/lynx/operator/SelectOperator.scala
+++ b/lynx/src/main/scala/org/grapheco/lynx/operator/SelectOperator.scala
@@ -42,4 +42,6 @@ case class SelectOperator(
override def closeImpl(): Unit = {}
override def outputSchema(): Seq[(String, LynxType)] = outPutSchema
+
+ override def getOperatorName(): String = "Select"
}
diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/SkipOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/SkipOperator.scala
index 6619fc59..e9ae3e9e 100644
--- a/lynx/src/main/scala/org/grapheco/lynx/operator/SkipOperator.scala
+++ b/lynx/src/main/scala/org/grapheco/lynx/operator/SkipOperator.scala
@@ -45,4 +45,6 @@ case class SkipOperator(
override def closeImpl(): Unit = {}
override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema()
+
+ override def getOperatorName(): String = "Skip"
}
diff --git a/lynx/src/main/scala/org/grapheco/lynx/runner.scala b/lynx/src/main/scala/org/grapheco/lynx/runner.scala
index d0944897..d4e87cfc 100644
--- a/lynx/src/main/scala/org/grapheco/lynx/runner.scala
+++ b/lynx/src/main/scala/org/grapheco/lynx/runner.scala
@@ -10,6 +10,7 @@ import org.opencypher.v9_0.ast.Statement
import org.opencypher.v9_0.ast.semantics.SemanticState
import org.opencypher.v9_0.expressions.{LabelName, PropertyKeyName, Range, SemanticDirection}
import org.opencypher.v9_0.expressions.SemanticDirection.{BOTH, INCOMING, OUTGOING}
+import org.grapheco.metrics.DomainObject
import scala.annotation.tailrec
import scala.collection.mutable
@@ -54,24 +55,44 @@ class CypherRunner(graphModel: GraphModel) extends LazyLogging {
queryParser.parse(query)
def run(query: String, param: Map[String, Any]): LynxResult = {
+ val queryLabel = query
+
+ DomainObject.recordLatency(Set(queryLabel))
+
+ val parserLabel = "parser"
+ DomainObject.recordLatency(Set(queryLabel, parserLabel))
val (statement, param2, state) = queryParser.parse(query)
logger.debug(s"AST tree: ${statement}")
+ DomainObject.recordLatency(Set(queryLabel, parserLabel))
+ val logicalPlanLabel = "logical-plan"
val logicalPlannerContext = LogicalPlannerContext(param ++ param2, runnerContext)
+ DomainObject.recordLatency(Set(queryLabel, logicalPlanLabel))
val logicalPlan = logicalPlanner.plan(statement, logicalPlannerContext)
logger.debug(s"logical plan: \r\n${logicalPlan.pretty}")
+ DomainObject.recordLatency(Set(queryLabel, logicalPlanLabel))
+ val physicalPlanLabel = "physical-plan"
val physicalPlannerContext = PhysicalPlannerContext(param ++ param2, runnerContext)
+ DomainObject.recordLatency(Set(queryLabel, physicalPlanLabel))
val physicalPlan = physicalPlanner.plan(logicalPlan)(physicalPlannerContext)
logger.debug(s"physical plan: \r\n${physicalPlan.pretty}")
+ DomainObject.recordLatency(Set(queryLabel, physicalPlanLabel))
+ val optimizerLabel = "optimizer"
+ DomainObject.recordLatency(Set(queryLabel, optimizerLabel))
val optimizedPhysicalPlan = physicalPlanOptimizer.optimize(physicalPlan, physicalPlannerContext)
logger.debug(s"optimized physical plan: \r\n${optimizedPhysicalPlan.pretty}")
+ DomainObject.recordLatency(Set(queryLabel, optimizerLabel))
+ val executeLabel = "execute"
val ctx = ExecutionContext(physicalPlannerContext, statement, param ++ param2)
+ DomainObject.recordLatency(Set(queryLabel, executeLabel))
val df = optimizedPhysicalPlan.execute(ctx)
graphModel.write.commit
+ DomainObject.recordLatency(Set(queryLabel, executeLabel))
+ DomainObject.recordLatency(Set(queryLabel))
new LynxResult() with PlanAware {
val schema = df.schema
val columnNames = schema.map(_._1)