diff --git a/common-utils/pom.xml b/common-utils/pom.xml index fc7839ae..95029c1b 100644 --- a/common-utils/pom.xml +++ b/common-utils/pom.xml @@ -72,6 +72,18 @@ org.apache.commons commons-lang3 + + junit + junit + 4.13.2 + test + + + org.junit.vintage + junit-vintage-engine + + + diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala b/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala new file mode 100644 index 00000000..2b31cbc1 --- /dev/null +++ b/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala @@ -0,0 +1,51 @@ +package org.grapheco.metrics + +import util.control.Breaks + +class Domain(dID: String) { + var records: Vector[Record] = Vector() + val id: String = dID + + def addRecord(r: Record): Unit = { + records = records :+ r + } + + // record the latency for an operation + def recordLatency(r: Record): Unit = { + // if there is an existing record with the same label, we assume this record is the start point + // of the operation and compute the operation latency according to the timestamp + for (or <- records.reverse) { + if (or.matchLabel(r)) { + val latency = r.computeLatency(or) + records = records.filterNot(_ == or) + r.value.setValue(latency) + records = records :+ r + return + } + } + + records = records :+ r + } + + def printRecordByLabel(l: Label): Unit = { + for (r <- records) { + if (r.containLabel(l)) { + r.print(id) + } + } + } + + def filterRecords(l: Label): Set[Record] = { + var filterRecords: Set[Record] = Set() + for (r <- records) { + if (r.containLabel(l)) { + filterRecords += r + } + } + filterRecords + } + + def getRecordsSize(): Int = { + records.length + } +} diff --git a/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala b/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala new file mode 100644 index 00000000..894cc64b --- /dev/null +++ b/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala @@ -0,0 +1,16 @@ +package org.grapheco.metrics + +object DomainObject { + val domainID = "query" + + var domain: Domain = new Domain(domainID) + + def addRecord(r: Record): Unit = { + domain.addRecord(r) + } + + def recordLatency(labels: Set[String]): Unit = { + val r = new Record(new Label(labels), new Value(0)) + domain.recordLatency(r) + } +} diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Label.scala b/common-utils/src/main/scala/org/grapheco/metrics/Label.scala new file mode 100644 index 00000000..f56eefbc --- /dev/null +++ b/common-utils/src/main/scala/org/grapheco/metrics/Label.scala @@ -0,0 +1,29 @@ +package org.grapheco.metrics + +class Label(val ls: Set[String]) { + var labels: Set[String] = ls + + def matches(label: Label): Boolean = { + if (labels.size != label.labels.size) { + return false + } + contains(label) + } + + def contains(label: Label): Boolean = { + for (l <- label.labels) { + if (!labels.contains(l)) { + return false + } + } + true + } + + def addLabel(label: String): Unit = { + labels += label + } + + override def toString(): String = { + labels.mkString(";") + } +} diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Record.scala b/common-utils/src/main/scala/org/grapheco/metrics/Record.scala new file mode 100644 index 00000000..afaf8cff --- /dev/null +++ b/common-utils/src/main/scala/org/grapheco/metrics/Record.scala @@ -0,0 +1,32 @@ +package org.grapheco.metrics + +class Record(l: Label, v: Value) { + var timestamp: Timestamp = new Timestamp() + var label: Label = l + var value: Value = v + + def matchLabel(r: Record): Boolean = { + label.matches(r.label) + } + + def containLabel(l: Label): Boolean = { + label.contains(l) + } + + def -(r: Record): Record = { + val v = value - r.value + if (v == null) { + return null + } + value = v + this + } + + def computeLatency(r: Record): Long = { + timestamp - r.timestamp + } + + def print(dID: String): Unit = { + printf("[%s][%s][%s]%s\n", dID, label.toString(), timestamp.toString(), value.toString()) + } +} diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Timestamp.scala b/common-utils/src/main/scala/org/grapheco/metrics/Timestamp.scala new file mode 100644 index 00000000..56042834 --- /dev/null +++ b/common-utils/src/main/scala/org/grapheco/metrics/Timestamp.scala @@ -0,0 +1,16 @@ +package org.grapheco.metrics + +import java.time.LocalDateTime +import java.time.temporal.ChronoUnit._ + +class Timestamp { + val timestamp = LocalDateTime.now() + + override def toString(): String = { + timestamp.toString + } + + def -(t: Timestamp): Long = { + MILLIS.between(timestamp, t.timestamp) + } +} diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Value.scala b/common-utils/src/main/scala/org/grapheco/metrics/Value.scala new file mode 100644 index 00000000..f69d6a66 --- /dev/null +++ b/common-utils/src/main/scala/org/grapheco/metrics/Value.scala @@ -0,0 +1,30 @@ +package org.grapheco.metrics + +class Value(v: Any) { + var value: Any = v + + def -(other: Value): Value = { + var succ: Boolean = false + var newVal: Value = new Value() + if (value.isInstanceOf[Int] && other.isInstanceOf[Int]) { + newVal.setValue(value.asInstanceOf[Int] - other.asInstanceOf[Int]) + succ = true + } else if (value.isInstanceOf[Float] && other.isInstanceOf[Float]) { + newVal.setValue(value.asInstanceOf[Float] - other.asInstanceOf[Float]) + succ = true + } + + if (succ) { + return newVal + } + null + } + + def setValue(v: Any): Unit = { + value = v + } + + override def toString(): String = { + value.toString + } +} diff --git a/common-utils/src/test/scala/org/grapheco/metrics/DomainTest.scala b/common-utils/src/test/scala/org/grapheco/metrics/DomainTest.scala new file mode 100644 index 00000000..9a7155c1 --- /dev/null +++ b/common-utils/src/test/scala/org/grapheco/metrics/DomainTest.scala @@ -0,0 +1,74 @@ +package org.grapheco.metrics + +import org.junit.{Before, Test} + +class DomainTest { + val domainID = "did" + + val str1 = "a" + val str2 = "b" + val str3 = "c" + + val set1: Set[String] = Set(str1) + val set2: Set[String] = Set(str1, str2) + val set3: Set[String] = Set(str1, str2) + val set4: Set[String] = Set(str2, str3) + val set5: Set[String] = Set(str3) + + val l1 = new Label(set1) + val l2 = new Label(set2) + val l3 = new Label(set3) + val l4 = new Label(set4) + val l5 = new Label(set5) + + val v1 = new Value(1) + val v2 = new Value(2) + val v3 = new Value(3) + val v4 = new Value(4) + val v5 = new Value(5) + + val r1 = new Record(l1, v1) + val r2 = new Record(l2, v2) + val r3 = new Record(l3, v3) + val r4 = new Record(l4, v4) + val r5 = new Record(l5, v5) + + var domain = new Domain(domainID) + + @Before + def init(): Unit = {} + + @Test + def testLabels(): Unit = { + domain.addRecord(r1) + domain.addRecord(r2) + domain.addRecord(r3) + domain.addRecord(r4) + domain.addRecord(r5) + + val filterl1 = new Label(Set(str1)) + val filterl2 = new Label(Set(str2)) + val filterl3 = new Label(Set(str3)) + + val records1 = domain.filterRecords(filterl1) + assert(records1.size == 3) + + val records2 = domain.filterRecords(filterl2) + assert(records2.size == 3) + + val records3 = domain.filterRecords(filterl3) + assert(records3.size == 2) + + assert(r2.containLabel(l1)) + assert(r3.matchLabel(r2)) + } + + @Test + def testRecordLatency(): Unit = { + domain.recordLatency(r2) + assert(domain.getRecordsSize() == 1) + + domain.recordLatency(r3) + assert(domain.getRecordsSize() == 1) + } +} diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala index ea4ac061..3efdcd49 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala @@ -3,6 +3,7 @@ package org.grapheco.lynx.operator import org.grapheco.lynx.operator.utils.OperatorUtils import org.grapheco.lynx.types.LynxValue import org.grapheco.lynx.{ExecutionOperator, ExpressionContext, ExpressionEvaluator, LynxType, RowBatch} +import org.grapheco.metrics.{Label, Record, Value, DomainObject} import org.opencypher.v9_0.ast.ReturnItem /** @@ -27,6 +28,8 @@ case class AggregationOperator( var allGroupedData: Iterator[Array[Seq[LynxValue]]] = Iterator.empty var hasPulledData: Boolean = false + val recordLabel = new Label(Set("Aggregation")) + override def openImpl(): Unit = { in.open() schema = (aggregationExprs ++ groupingExprs).map(col => @@ -71,11 +74,18 @@ case class AggregationOperator( hasPulledData = true } - if (allGroupedData.nonEmpty) RowBatch(allGroupedData.next()) - else RowBatch(Seq.empty) + var rb: RowBatch = null + if (allGroupedData.nonEmpty) { + rb = RowBatch(allGroupedData.next()) + } else { + rb = RowBatch(Seq.empty) + } + rb } override def closeImpl(): Unit = {} override def outputSchema(): Seq[(String, LynxType)] = schema + + override def getOperatorName(): String = "Aggregation" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/CreateOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/CreateOperator.scala index bbbdf6df..2f3ef0b8 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/CreateOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/CreateOperator.scala @@ -117,6 +117,8 @@ case class CreateOperator( override def outputSchema(): Seq[(String, LynxType)] = schema + override def getOperatorName(): String = "Create" + private def nodeInputRef(varname: String, ctxMap: Map[String, LynxValue]): NodeInputRef = { ctxMap .get(varname) diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala index baad38ff..1541f461 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala @@ -1,6 +1,7 @@ package org.grapheco.lynx import org.grapheco.lynx.types.LynxValue +import org.grapheco.metrics.DomainObject import org.opencypher.v9_0.expressions.Expression /** @@ -27,11 +28,17 @@ trait ExecutionOperator extends TreeNode { // empty RowBatch means the end of output def getNext(): RowBatch = { - getNextImpl() + val opName = getOperatorName() + DomainObject.recordLatency(Set[String](opName)) + val rb = getNextImpl() + DomainObject.recordLatency(Set[String](opName)) + rb } // to be implemented by concrete operators def getNextImpl(): RowBatch + def getOperatorName(): String + def close(): Unit = { closeImpl() } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/FilterOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/FilterOperator.scala index 7246014a..3e35854a 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/FilterOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/FilterOperator.scala @@ -54,5 +54,7 @@ case class FilterOperator( override def closeImpl(): Unit = {} + override def getOperatorName(): String = "Filter" + override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema() } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/LimitOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/LimitOperator.scala index 4506f3a5..207edb9c 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/LimitOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/LimitOperator.scala @@ -51,4 +51,6 @@ case class LimitOperator( override def closeImpl(): Unit = {} override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema() + + override def getOperatorName(): String = "Limit" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/NodeScanOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/NodeScanOperator.scala index 281efdb7..15991c61 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/NodeScanOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/NodeScanOperator.scala @@ -67,4 +67,5 @@ case class NodeScanOperator( override def outputSchema(): Seq[(String, LynxType)] = schema + override def getOperatorName(): String = "NodeScan" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/OrderByOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/OrderByOperator.scala index 78283c89..7f7f0ba0 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/OrderByOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/OrderByOperator.scala @@ -51,6 +51,8 @@ case class OrderByOperator( override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema() + override def getOperatorName(): String = "OrderBy" + private def sortByItem( a: Seq[LynxValue], b: Seq[LynxValue], diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/PathScanOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/PathScanOperator.scala index 880c5d38..c7a64c98 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/PathScanOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/PathScanOperator.scala @@ -159,4 +159,6 @@ case class PathScanOperator( override def closeImpl(): Unit = {} override def outputSchema(): Seq[(String, LynxType)] = schema + + override def getOperatorName(): String = "PathScan" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/ProjectOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/ProjectOperator.scala index a4981b3a..43f741fd 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/ProjectOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/ProjectOperator.scala @@ -42,4 +42,6 @@ case class ProjectOperator( override def closeImpl(): Unit = {} override def outputSchema(): Seq[(String, LynxType)] = projectSchema + + override def getOperatorName(): String = "Project" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/SelectOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/SelectOperator.scala index 849906a7..cfe42158 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/SelectOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/SelectOperator.scala @@ -42,4 +42,6 @@ case class SelectOperator( override def closeImpl(): Unit = {} override def outputSchema(): Seq[(String, LynxType)] = outPutSchema + + override def getOperatorName(): String = "Select" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/SkipOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/SkipOperator.scala index 6619fc59..e9ae3e9e 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/SkipOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/SkipOperator.scala @@ -45,4 +45,6 @@ case class SkipOperator( override def closeImpl(): Unit = {} override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema() + + override def getOperatorName(): String = "Skip" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/runner.scala b/lynx/src/main/scala/org/grapheco/lynx/runner.scala index d0944897..d4e87cfc 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/runner.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/runner.scala @@ -10,6 +10,7 @@ import org.opencypher.v9_0.ast.Statement import org.opencypher.v9_0.ast.semantics.SemanticState import org.opencypher.v9_0.expressions.{LabelName, PropertyKeyName, Range, SemanticDirection} import org.opencypher.v9_0.expressions.SemanticDirection.{BOTH, INCOMING, OUTGOING} +import org.grapheco.metrics.DomainObject import scala.annotation.tailrec import scala.collection.mutable @@ -54,24 +55,44 @@ class CypherRunner(graphModel: GraphModel) extends LazyLogging { queryParser.parse(query) def run(query: String, param: Map[String, Any]): LynxResult = { + val queryLabel = query + + DomainObject.recordLatency(Set(queryLabel)) + + val parserLabel = "parser" + DomainObject.recordLatency(Set(queryLabel, parserLabel)) val (statement, param2, state) = queryParser.parse(query) logger.debug(s"AST tree: ${statement}") + DomainObject.recordLatency(Set(queryLabel, parserLabel)) + val logicalPlanLabel = "logical-plan" val logicalPlannerContext = LogicalPlannerContext(param ++ param2, runnerContext) + DomainObject.recordLatency(Set(queryLabel, logicalPlanLabel)) val logicalPlan = logicalPlanner.plan(statement, logicalPlannerContext) logger.debug(s"logical plan: \r\n${logicalPlan.pretty}") + DomainObject.recordLatency(Set(queryLabel, logicalPlanLabel)) + val physicalPlanLabel = "physical-plan" val physicalPlannerContext = PhysicalPlannerContext(param ++ param2, runnerContext) + DomainObject.recordLatency(Set(queryLabel, physicalPlanLabel)) val physicalPlan = physicalPlanner.plan(logicalPlan)(physicalPlannerContext) logger.debug(s"physical plan: \r\n${physicalPlan.pretty}") + DomainObject.recordLatency(Set(queryLabel, physicalPlanLabel)) + val optimizerLabel = "optimizer" + DomainObject.recordLatency(Set(queryLabel, optimizerLabel)) val optimizedPhysicalPlan = physicalPlanOptimizer.optimize(physicalPlan, physicalPlannerContext) logger.debug(s"optimized physical plan: \r\n${optimizedPhysicalPlan.pretty}") + DomainObject.recordLatency(Set(queryLabel, optimizerLabel)) + val executeLabel = "execute" val ctx = ExecutionContext(physicalPlannerContext, statement, param ++ param2) + DomainObject.recordLatency(Set(queryLabel, executeLabel)) val df = optimizedPhysicalPlan.execute(ctx) graphModel.write.commit + DomainObject.recordLatency(Set(queryLabel, executeLabel)) + DomainObject.recordLatency(Set(queryLabel)) new LynxResult() with PlanAware { val schema = df.schema val columnNames = schema.map(_._1)