From e464c19b8aadbd1d164ea1f8334abc69ee32a6db Mon Sep 17 00:00:00 2001 From: YING XUAN Date: Tue, 30 Aug 2022 21:29:26 -0700 Subject: [PATCH 1/8] add metrics class --- .../scala/org/grapheco/metrics/Domain.scala | 40 +++++++++++++++++++ .../scala/org/grapheco/metrics/Label.scala | 29 ++++++++++++++ .../scala/org/grapheco/metrics/Record.scala | 32 +++++++++++++++ .../org/grapheco/metrics/Timestamp.scala | 16 ++++++++ .../scala/org/grapheco/metrics/Value.scala | 30 ++++++++++++++ 5 files changed, 147 insertions(+) create mode 100644 common-utils/src/main/scala/org/grapheco/metrics/Domain.scala create mode 100644 common-utils/src/main/scala/org/grapheco/metrics/Label.scala create mode 100644 common-utils/src/main/scala/org/grapheco/metrics/Record.scala create mode 100644 common-utils/src/main/scala/org/grapheco/metrics/Timestamp.scala create mode 100644 common-utils/src/main/scala/org/grapheco/metrics/Value.scala diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala b/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala new file mode 100644 index 00000000..510ad98d --- /dev/null +++ b/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala @@ -0,0 +1,40 @@ +package org.grapheco.metrics + +import util.control.Breaks + +class Domain(dID: String) { + var records: Vector[Record] = Vector() + val id: String = dID + + def addRecord(r: Record): Unit = { + records = records :+ r + } + + // record the latency for an operation + def recordLatency(r: Record): Unit = { + val breakloop = new Breaks + + // if there is an existing record with the same label, we assume this record is the start point + // of the operation and compute the operation latency according to the timestamp + breakloop.breakable { + for (or <- records.reverse) { + if (or.matchLabel(r)) { + val latency = r.computeLatency(or) + records = records.filterNot(_ == or) + r.value.setValue(latency) + records = records :+ r + breakloop.break + } + } + } + records = records :+ r + } + + def printRecordByLabel(l: Label): Unit = { + for (r <- records) { + if (r.containLabel(l)) { + r.print(id) + } + } + } +} diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Label.scala b/common-utils/src/main/scala/org/grapheco/metrics/Label.scala new file mode 100644 index 00000000..868d2c86 --- /dev/null +++ b/common-utils/src/main/scala/org/grapheco/metrics/Label.scala @@ -0,0 +1,29 @@ +package org.grapheco.metrics + +class Label(val ls: Set[String]) { + var labels: Set[String] = ls + + def contains(label: Label): Boolean = { + for (l <- label.labels) { + if (!labels.contains(l)) { + return false + } + } + false + } + + def matches(label: Label): Boolean = { + if (labels.size != label.labels.size) { + return false + } + contains(label) + } + + def addLabel(label: String): Unit = { + labels += label + } + + override def toString(): String = { + labels.mkString(";") + } +} \ No newline at end of file diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Record.scala b/common-utils/src/main/scala/org/grapheco/metrics/Record.scala new file mode 100644 index 00000000..afaf8cff --- /dev/null +++ b/common-utils/src/main/scala/org/grapheco/metrics/Record.scala @@ -0,0 +1,32 @@ +package org.grapheco.metrics + +class Record(l: Label, v: Value) { + var timestamp: Timestamp = new Timestamp() + var label: Label = l + var value: Value = v + + def matchLabel(r: Record): Boolean = { + label.matches(r.label) + } + + def containLabel(l: Label): Boolean = { + label.contains(l) + } + + def -(r: Record): Record = { + val v = value - r.value + if (v == null) { + return null + } + value = v + this + } + + def computeLatency(r: Record): Long = { + timestamp - r.timestamp + } + + def print(dID: String): Unit = { + printf("[%s][%s][%s]%s\n", dID, label.toString(), timestamp.toString(), value.toString()) + } +} diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Timestamp.scala b/common-utils/src/main/scala/org/grapheco/metrics/Timestamp.scala new file mode 100644 index 00000000..56042834 --- /dev/null +++ b/common-utils/src/main/scala/org/grapheco/metrics/Timestamp.scala @@ -0,0 +1,16 @@ +package org.grapheco.metrics + +import java.time.LocalDateTime +import java.time.temporal.ChronoUnit._ + +class Timestamp { + val timestamp = LocalDateTime.now() + + override def toString(): String = { + timestamp.toString + } + + def -(t: Timestamp): Long = { + MILLIS.between(timestamp, t.timestamp) + } +} diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Value.scala b/common-utils/src/main/scala/org/grapheco/metrics/Value.scala new file mode 100644 index 00000000..f69d6a66 --- /dev/null +++ b/common-utils/src/main/scala/org/grapheco/metrics/Value.scala @@ -0,0 +1,30 @@ +package org.grapheco.metrics + +class Value(v: Any) { + var value: Any = v + + def -(other: Value): Value = { + var succ: Boolean = false + var newVal: Value = new Value() + if (value.isInstanceOf[Int] && other.isInstanceOf[Int]) { + newVal.setValue(value.asInstanceOf[Int] - other.asInstanceOf[Int]) + succ = true + } else if (value.isInstanceOf[Float] && other.isInstanceOf[Float]) { + newVal.setValue(value.asInstanceOf[Float] - other.asInstanceOf[Float]) + succ = true + } + + if (succ) { + return newVal + } + null + } + + def setValue(v: Any): Unit = { + value = v + } + + override def toString(): String = { + value.toString + } +} From 29b105d2f4f48a7231917ec8c63f60933b64755d Mon Sep 17 00:00:00 2001 From: YING XUAN Date: Thu, 1 Sep 2022 10:06:05 -0700 Subject: [PATCH 2/8] fix label class --- common-utils/src/main/scala/org/grapheco/metrics/Label.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Label.scala b/common-utils/src/main/scala/org/grapheco/metrics/Label.scala index 868d2c86..6066e17f 100644 --- a/common-utils/src/main/scala/org/grapheco/metrics/Label.scala +++ b/common-utils/src/main/scala/org/grapheco/metrics/Label.scala @@ -9,7 +9,7 @@ class Label(val ls: Set[String]) { return false } } - false + true } def matches(label: Label): Boolean = { From 94e7874ed0a4eea6568282d8b87283f41fdb416e Mon Sep 17 00:00:00 2001 From: YING XUAN Date: Mon, 5 Sep 2022 15:20:46 -0700 Subject: [PATCH 3/8] format code --- .../main/scala/org/grapheco/metrics/Label.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Label.scala b/common-utils/src/main/scala/org/grapheco/metrics/Label.scala index 6066e17f..f56eefbc 100644 --- a/common-utils/src/main/scala/org/grapheco/metrics/Label.scala +++ b/common-utils/src/main/scala/org/grapheco/metrics/Label.scala @@ -3,6 +3,13 @@ package org.grapheco.metrics class Label(val ls: Set[String]) { var labels: Set[String] = ls + def matches(label: Label): Boolean = { + if (labels.size != label.labels.size) { + return false + } + contains(label) + } + def contains(label: Label): Boolean = { for (l <- label.labels) { if (!labels.contains(l)) { @@ -12,13 +19,6 @@ class Label(val ls: Set[String]) { true } - def matches(label: Label): Boolean = { - if (labels.size != label.labels.size) { - return false - } - contains(label) - } - def addLabel(label: String): Unit = { labels += label } @@ -26,4 +26,4 @@ class Label(val ls: Set[String]) { override def toString(): String = { labels.mkString(";") } -} \ No newline at end of file +} From 37af34f35c74129370589ab3c963183ba44feee3 Mon Sep 17 00:00:00 2001 From: YING XUAN Date: Sun, 11 Sep 2022 18:31:26 -0700 Subject: [PATCH 4/8] add test --- common-utils/pom.xml | 12 +++ .../scala/org/grapheco/metrics/Domain.scala | 33 ++++++--- .../org/grapheco/metrics/DomainTest.scala | 74 +++++++++++++++++++ 3 files changed, 108 insertions(+), 11 deletions(-) create mode 100644 common-utils/src/test/scala/org/grapheco/metrics/DomainTest.scala diff --git a/common-utils/pom.xml b/common-utils/pom.xml index fc7839ae..95029c1b 100644 --- a/common-utils/pom.xml +++ b/common-utils/pom.xml @@ -72,6 +72,18 @@ org.apache.commons commons-lang3 + + junit + junit + 4.13.2 + test + + + org.junit.vintage + junit-vintage-engine + + + diff --git a/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala b/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala index 510ad98d..2b31cbc1 100644 --- a/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala +++ b/common-utils/src/main/scala/org/grapheco/metrics/Domain.scala @@ -12,21 +12,18 @@ class Domain(dID: String) { // record the latency for an operation def recordLatency(r: Record): Unit = { - val breakloop = new Breaks - // if there is an existing record with the same label, we assume this record is the start point // of the operation and compute the operation latency according to the timestamp - breakloop.breakable { - for (or <- records.reverse) { - if (or.matchLabel(r)) { - val latency = r.computeLatency(or) - records = records.filterNot(_ == or) - r.value.setValue(latency) - records = records :+ r - breakloop.break - } + for (or <- records.reverse) { + if (or.matchLabel(r)) { + val latency = r.computeLatency(or) + records = records.filterNot(_ == or) + r.value.setValue(latency) + records = records :+ r + return } } + records = records :+ r } @@ -37,4 +34,18 @@ class Domain(dID: String) { } } } + + def filterRecords(l: Label): Set[Record] = { + var filterRecords: Set[Record] = Set() + for (r <- records) { + if (r.containLabel(l)) { + filterRecords += r + } + } + filterRecords + } + + def getRecordsSize(): Int = { + records.length + } } diff --git a/common-utils/src/test/scala/org/grapheco/metrics/DomainTest.scala b/common-utils/src/test/scala/org/grapheco/metrics/DomainTest.scala new file mode 100644 index 00000000..9a7155c1 --- /dev/null +++ b/common-utils/src/test/scala/org/grapheco/metrics/DomainTest.scala @@ -0,0 +1,74 @@ +package org.grapheco.metrics + +import org.junit.{Before, Test} + +class DomainTest { + val domainID = "did" + + val str1 = "a" + val str2 = "b" + val str3 = "c" + + val set1: Set[String] = Set(str1) + val set2: Set[String] = Set(str1, str2) + val set3: Set[String] = Set(str1, str2) + val set4: Set[String] = Set(str2, str3) + val set5: Set[String] = Set(str3) + + val l1 = new Label(set1) + val l2 = new Label(set2) + val l3 = new Label(set3) + val l4 = new Label(set4) + val l5 = new Label(set5) + + val v1 = new Value(1) + val v2 = new Value(2) + val v3 = new Value(3) + val v4 = new Value(4) + val v5 = new Value(5) + + val r1 = new Record(l1, v1) + val r2 = new Record(l2, v2) + val r3 = new Record(l3, v3) + val r4 = new Record(l4, v4) + val r5 = new Record(l5, v5) + + var domain = new Domain(domainID) + + @Before + def init(): Unit = {} + + @Test + def testLabels(): Unit = { + domain.addRecord(r1) + domain.addRecord(r2) + domain.addRecord(r3) + domain.addRecord(r4) + domain.addRecord(r5) + + val filterl1 = new Label(Set(str1)) + val filterl2 = new Label(Set(str2)) + val filterl3 = new Label(Set(str3)) + + val records1 = domain.filterRecords(filterl1) + assert(records1.size == 3) + + val records2 = domain.filterRecords(filterl2) + assert(records2.size == 3) + + val records3 = domain.filterRecords(filterl3) + assert(records3.size == 2) + + assert(r2.containLabel(l1)) + assert(r3.matchLabel(r2)) + } + + @Test + def testRecordLatency(): Unit = { + domain.recordLatency(r2) + assert(domain.getRecordsSize() == 1) + + domain.recordLatency(r3) + assert(domain.getRecordsSize() == 1) + } +} From 712674f181cf6b79862030b81a42cb8b6179c737 Mon Sep 17 00:00:00 2001 From: YING XUAN Date: Mon, 12 Sep 2022 10:06:49 -0700 Subject: [PATCH 5/8] add first perf metrics --- .../scala/org/grapheco/metrics/DomainObject.scala | 15 +++++++++++++++ .../lynx/operator/AggregationOperator.scala | 14 ++++++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) create mode 100644 common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala diff --git a/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala b/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala new file mode 100644 index 00000000..310a1940 --- /dev/null +++ b/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala @@ -0,0 +1,15 @@ +package org.grapheco.metrics + +object DomainObject { + val domainID = "operator" + + var domain: Domain = new Domain(domainID) + + def addRecord(r: Record): Unit = { + domain.addRecord(r) + } + + def recordLatency(r: Record): Unit = { + domain.recordLatency(r) + } +} diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala index ea4ac061..5fa17770 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala @@ -3,6 +3,7 @@ package org.grapheco.lynx.operator import org.grapheco.lynx.operator.utils.OperatorUtils import org.grapheco.lynx.types.LynxValue import org.grapheco.lynx.{ExecutionOperator, ExpressionContext, ExpressionEvaluator, LynxType, RowBatch} +import org.grapheco.metrics.{Label, Record, Value, DomainObject} import org.opencypher.v9_0.ast.ReturnItem /** @@ -27,6 +28,8 @@ case class AggregationOperator( var allGroupedData: Iterator[Array[Seq[LynxValue]]] = Iterator.empty var hasPulledData: Boolean = false + val recordLabel = new Label(Set("Aggregation")) + override def openImpl(): Unit = { in.open() schema = (aggregationExprs ++ groupingExprs).map(col => @@ -35,6 +38,7 @@ case class AggregationOperator( } override def getNextImpl(): RowBatch = { + DomainObject.recordLatency(new Record(recordLabel, new Value(0))) if (!hasPulledData) { val columnNames = in.outputSchema().map(f => f._1) val allData = OperatorUtils.getOperatorAllOutputs(in).flatMap(rowData => rowData.batchData) @@ -71,8 +75,14 @@ case class AggregationOperator( hasPulledData = true } - if (allGroupedData.nonEmpty) RowBatch(allGroupedData.next()) - else RowBatch(Seq.empty) + var rb: RowBatch = null + if (allGroupedData.nonEmpty) { + rb = RowBatch(allGroupedData.next()) + } else { + rb = RowBatch(Seq.empty) + } + DomainObject.recordLatency(new Record(recordLabel, new Value(0))) + rb } override def closeImpl(): Unit = {} From 8c888ca71e0e7cca82dd74d3a567a223e3fd4fb4 Mon Sep 17 00:00:00 2001 From: YING XUAN Date: Thu, 15 Sep 2022 10:36:05 -0700 Subject: [PATCH 6/8] insert perf metrics into query run --- .../org/grapheco/metrics/DomainObject.scala | 5 +++-- .../lynx/operator/AggregationOperator.scala | 2 -- .../main/scala/org/grapheco/lynx/runner.scala | 21 +++++++++++++++++++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala b/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala index 310a1940..894cc64b 100644 --- a/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala +++ b/common-utils/src/main/scala/org/grapheco/metrics/DomainObject.scala @@ -1,7 +1,7 @@ package org.grapheco.metrics object DomainObject { - val domainID = "operator" + val domainID = "query" var domain: Domain = new Domain(domainID) @@ -9,7 +9,8 @@ object DomainObject { domain.addRecord(r) } - def recordLatency(r: Record): Unit = { + def recordLatency(labels: Set[String]): Unit = { + val r = new Record(new Label(labels), new Value(0)) domain.recordLatency(r) } } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala index 5fa17770..f278f3a8 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala @@ -38,7 +38,6 @@ case class AggregationOperator( } override def getNextImpl(): RowBatch = { - DomainObject.recordLatency(new Record(recordLabel, new Value(0))) if (!hasPulledData) { val columnNames = in.outputSchema().map(f => f._1) val allData = OperatorUtils.getOperatorAllOutputs(in).flatMap(rowData => rowData.batchData) @@ -81,7 +80,6 @@ case class AggregationOperator( } else { rb = RowBatch(Seq.empty) } - DomainObject.recordLatency(new Record(recordLabel, new Value(0))) rb } diff --git a/lynx/src/main/scala/org/grapheco/lynx/runner.scala b/lynx/src/main/scala/org/grapheco/lynx/runner.scala index d0944897..d4e87cfc 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/runner.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/runner.scala @@ -10,6 +10,7 @@ import org.opencypher.v9_0.ast.Statement import org.opencypher.v9_0.ast.semantics.SemanticState import org.opencypher.v9_0.expressions.{LabelName, PropertyKeyName, Range, SemanticDirection} import org.opencypher.v9_0.expressions.SemanticDirection.{BOTH, INCOMING, OUTGOING} +import org.grapheco.metrics.DomainObject import scala.annotation.tailrec import scala.collection.mutable @@ -54,24 +55,44 @@ class CypherRunner(graphModel: GraphModel) extends LazyLogging { queryParser.parse(query) def run(query: String, param: Map[String, Any]): LynxResult = { + val queryLabel = query + + DomainObject.recordLatency(Set(queryLabel)) + + val parserLabel = "parser" + DomainObject.recordLatency(Set(queryLabel, parserLabel)) val (statement, param2, state) = queryParser.parse(query) logger.debug(s"AST tree: ${statement}") + DomainObject.recordLatency(Set(queryLabel, parserLabel)) + val logicalPlanLabel = "logical-plan" val logicalPlannerContext = LogicalPlannerContext(param ++ param2, runnerContext) + DomainObject.recordLatency(Set(queryLabel, logicalPlanLabel)) val logicalPlan = logicalPlanner.plan(statement, logicalPlannerContext) logger.debug(s"logical plan: \r\n${logicalPlan.pretty}") + DomainObject.recordLatency(Set(queryLabel, logicalPlanLabel)) + val physicalPlanLabel = "physical-plan" val physicalPlannerContext = PhysicalPlannerContext(param ++ param2, runnerContext) + DomainObject.recordLatency(Set(queryLabel, physicalPlanLabel)) val physicalPlan = physicalPlanner.plan(logicalPlan)(physicalPlannerContext) logger.debug(s"physical plan: \r\n${physicalPlan.pretty}") + DomainObject.recordLatency(Set(queryLabel, physicalPlanLabel)) + val optimizerLabel = "optimizer" + DomainObject.recordLatency(Set(queryLabel, optimizerLabel)) val optimizedPhysicalPlan = physicalPlanOptimizer.optimize(physicalPlan, physicalPlannerContext) logger.debug(s"optimized physical plan: \r\n${optimizedPhysicalPlan.pretty}") + DomainObject.recordLatency(Set(queryLabel, optimizerLabel)) + val executeLabel = "execute" val ctx = ExecutionContext(physicalPlannerContext, statement, param ++ param2) + DomainObject.recordLatency(Set(queryLabel, executeLabel)) val df = optimizedPhysicalPlan.execute(ctx) graphModel.write.commit + DomainObject.recordLatency(Set(queryLabel, executeLabel)) + DomainObject.recordLatency(Set(queryLabel)) new LynxResult() with PlanAware { val schema = df.schema val columnNames = schema.map(_._1) From 82be1266ba8030e8277afa0b5b007d4908c4e999 Mon Sep 17 00:00:00 2001 From: YING XUAN Date: Wed, 21 Sep 2022 09:31:03 -0700 Subject: [PATCH 7/8] insert perf metrics to operator --- .../scala/org/grapheco/lynx/operator/AggregationOperator.scala | 2 ++ .../main/scala/org/grapheco/lynx/operator/CreateOperator.scala | 2 ++ .../scala/org/grapheco/lynx/operator/ExecutionOperator.scala | 3 +++ .../main/scala/org/grapheco/lynx/operator/FilterOperator.scala | 2 ++ .../main/scala/org/grapheco/lynx/operator/LimitOperator.scala | 2 ++ .../scala/org/grapheco/lynx/operator/NodeScanOperator.scala | 1 + .../scala/org/grapheco/lynx/operator/OrderByOperator.scala | 2 ++ .../scala/org/grapheco/lynx/operator/PathScanOperator.scala | 2 ++ .../scala/org/grapheco/lynx/operator/ProjectOperator.scala | 2 ++ .../main/scala/org/grapheco/lynx/operator/SelectOperator.scala | 2 ++ .../main/scala/org/grapheco/lynx/operator/SkipOperator.scala | 2 ++ 11 files changed, 22 insertions(+) diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala index f278f3a8..3efdcd49 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/AggregationOperator.scala @@ -86,4 +86,6 @@ case class AggregationOperator( override def closeImpl(): Unit = {} override def outputSchema(): Seq[(String, LynxType)] = schema + + override def getOperatorName(): String = "Aggregation" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/CreateOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/CreateOperator.scala index bbbdf6df..2f3ef0b8 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/CreateOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/CreateOperator.scala @@ -117,6 +117,8 @@ case class CreateOperator( override def outputSchema(): Seq[(String, LynxType)] = schema + override def getOperatorName(): String = "Create" + private def nodeInputRef(varname: String, ctxMap: Map[String, LynxValue]): NodeInputRef = { ctxMap .get(varname) diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala index baad38ff..b7009a8e 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala @@ -1,6 +1,7 @@ package org.grapheco.lynx import org.grapheco.lynx.types.LynxValue +import org.grapheco.metrics.DomainObject import org.opencypher.v9_0.expressions.Expression /** @@ -32,6 +33,8 @@ trait ExecutionOperator extends TreeNode { // to be implemented by concrete operators def getNextImpl(): RowBatch + def getOperatorName(): String + def close(): Unit = { closeImpl() } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/FilterOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/FilterOperator.scala index 7246014a..3e35854a 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/FilterOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/FilterOperator.scala @@ -54,5 +54,7 @@ case class FilterOperator( override def closeImpl(): Unit = {} + override def getOperatorName(): String = "Filter" + override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema() } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/LimitOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/LimitOperator.scala index 4506f3a5..207edb9c 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/LimitOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/LimitOperator.scala @@ -51,4 +51,6 @@ case class LimitOperator( override def closeImpl(): Unit = {} override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema() + + override def getOperatorName(): String = "Limit" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/NodeScanOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/NodeScanOperator.scala index 281efdb7..15991c61 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/NodeScanOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/NodeScanOperator.scala @@ -67,4 +67,5 @@ case class NodeScanOperator( override def outputSchema(): Seq[(String, LynxType)] = schema + override def getOperatorName(): String = "NodeScan" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/OrderByOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/OrderByOperator.scala index 78283c89..7f7f0ba0 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/OrderByOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/OrderByOperator.scala @@ -51,6 +51,8 @@ case class OrderByOperator( override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema() + override def getOperatorName(): String = "OrderBy" + private def sortByItem( a: Seq[LynxValue], b: Seq[LynxValue], diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/PathScanOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/PathScanOperator.scala index 880c5d38..c7a64c98 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/PathScanOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/PathScanOperator.scala @@ -159,4 +159,6 @@ case class PathScanOperator( override def closeImpl(): Unit = {} override def outputSchema(): Seq[(String, LynxType)] = schema + + override def getOperatorName(): String = "PathScan" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/ProjectOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/ProjectOperator.scala index a4981b3a..43f741fd 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/ProjectOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/ProjectOperator.scala @@ -42,4 +42,6 @@ case class ProjectOperator( override def closeImpl(): Unit = {} override def outputSchema(): Seq[(String, LynxType)] = projectSchema + + override def getOperatorName(): String = "Project" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/SelectOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/SelectOperator.scala index 849906a7..cfe42158 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/SelectOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/SelectOperator.scala @@ -42,4 +42,6 @@ case class SelectOperator( override def closeImpl(): Unit = {} override def outputSchema(): Seq[(String, LynxType)] = outPutSchema + + override def getOperatorName(): String = "Select" } diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/SkipOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/SkipOperator.scala index 6619fc59..e9ae3e9e 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/SkipOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/SkipOperator.scala @@ -45,4 +45,6 @@ case class SkipOperator( override def closeImpl(): Unit = {} override def outputSchema(): Seq[(String, LynxType)] = in.outputSchema() + + override def getOperatorName(): String = "Skip" } From c80ab6484801a096e81472fe07ee341b31c3eaba Mon Sep 17 00:00:00 2001 From: YING XUAN Date: Wed, 21 Sep 2022 09:34:54 -0700 Subject: [PATCH 8/8] insert perf metrics to operator --- .../org/grapheco/lynx/operator/ExecutionOperator.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala b/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala index b7009a8e..1541f461 100644 --- a/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala +++ b/lynx/src/main/scala/org/grapheco/lynx/operator/ExecutionOperator.scala @@ -28,7 +28,11 @@ trait ExecutionOperator extends TreeNode { // empty RowBatch means the end of output def getNext(): RowBatch = { - getNextImpl() + val opName = getOperatorName() + DomainObject.recordLatency(Set[String](opName)) + val rb = getNextImpl() + DomainObject.recordLatency(Set[String](opName)) + rb } // to be implemented by concrete operators def getNextImpl(): RowBatch