diff --git a/storage/src/main/scala/org/grapheco/tudb/engine/GraphEngine.scala b/storage/src/main/scala/org/grapheco/tudb/engine/GraphEngine.scala
new file mode 100644
index 00000000..c4877291
--- /dev/null
+++ b/storage/src/main/scala/org/grapheco/tudb/engine/GraphEngine.scala
@@ -0,0 +1,8 @@
+package org.grapheco.tudb.engine
+
+import org.grapheco.tudb.store.storage.KeyValueDB
+
+// Functions of a GraphEngine should be similar to NodeStoreAPI/RelationshipStoreAPI currently
+/**
+ * Placeholder for the graph engine; it currently has no members.
+ *
+ * @param storageInstance key-value store handed to the engine (not used yet, the body is empty)
+ */
+class GraphEngine(storageInstance: KeyValueDB) {
+
+}
diff --git a/storage/src/main/scala/org/grapheco/tudb/engine/GraphEngineOnRockDB.scala b/storage/src/main/scala/org/grapheco/tudb/engine/GraphEngineOnRockDB.scala
deleted file mode 100644
index 92b96d89..00000000
--- a/storage/src/main/scala/org/grapheco/tudb/engine/GraphEngineOnRockDB.scala
+++ /dev/null
@@ -1,3 +0,0 @@
-package org.grapheco.tudb.engine
-
-class GraphEngineOnRockDB {}
diff --git a/storage/src/main/scala/org/grapheco/tudb/engine/NodeOpOnRocksDB.scala b/storage/src/main/scala/org/grapheco/tudb/engine/NodeOpOnRocksDB.scala
new file mode 100644
index 00000000..c95d3508
--- /dev/null
+++ b/storage/src/main/scala/org/grapheco/tudb/engine/NodeOpOnRocksDB.scala
@@ -0,0 +1,362 @@
+package org.grapheco.tudb.engine
+
+import org.grapheco.tudb.serializer.{BaseSerializer, NodeSerializer}
+import org.grapheco.tudb.store.index.IndexFactory
+import org.grapheco.tudb.store.meta.TypeManager.NodeId
+import org.grapheco.tudb.store.meta.{ConfigNameMap, DBNameMap, IdGenerator, NodeLabelNameStore, PropertyNameStore}
+import org.grapheco.tudb.store.node.{NodeLabelStore, NodeStore, NodeStoreSPI, StoredNodeWithProperty}
+import org.grapheco.tudb.store.storage.{KeyValueDB, RocksDBStorage}
+import org.rocksdb.{WriteBatch, WriteOptions}
+
+import scala.collection.mutable
+
+/**
+ * [[NodeOperations]] implementation that keeps nodes and node labels in two
+ * stores obtained from [[RocksDBStorage]].
+ *
+ * NOTE(review): the primary constructor only declares `operationContext`, yet the
+ * body reads `nodeDBPath` and `nodeDBConfigPath`, which are not declared in this
+ * class -- confirm where these are meant to come from.
+ *
+ * @param operationContext context object carrying the database handles
+ */
+class NodeOpOnRocksDB(operationContext: OperationsContext) extends NodeOperations {
+  // Store holding the serialized nodes.
+  private val nodeDB =
+    RocksDBStorage.getDB(nodeDBPath, rocksdbConfigPath = nodeDBConfigPath)
+  // Store holding the node-id -> label-id entries.
+  private val nodeLabelDB = RocksDBStorage.getDB(
+    nodeLabelDBPath,
+    rocksdbConfigPath = nodeLabelConfigPath
+  )
+
+  // Fixme: Don't disable the WAL by hard code.
+  // Write options shared by the batched writes issued from this class:
+  // WAL disabled, missing column families ignored, no sync on write.
+  private val writeOptions = new WriteOptions()
+  writeOptions.setDisableWAL(true)
+  writeOptions.setIgnoreMissingColumnFamilies(true)
+  writeOptions.setSync(false)
+  private val nodeStore = new NodeStore(nodeDB)
+  private val nodeLabelStore = new NodeLabelStore(nodeLabelDB)
+  // Name <-> id dictionaries for labels and property keys, both backed by metaDB.
+  private val nodeLabelName = new NodeLabelNameStore(metaDB)
+  private val propertyName = new PropertyNameStore(metaDB)
+
+  // NOTE(review): the meaning of the literal 200 is not visible here -- confirm against IdGenerator.
+  private val idGenerator = new IdGenerator(nodeLabelDB, 200)
+  // this is the index engine instance
+  private val indexImpl = IndexFactory.newIndex(indexUri + "&path=" + dbPath)
+
+  // Label id under which nodes without any label are stored.
+  val NONE_LABEL_ID: Int = 0
+
+  /**
+   * Builds the node and node-label database paths from `dbPath` and forwards
+   * them, together with the shared RocksDB config path, to `this(...)`.
+   *
+   * NOTE(review): this passes seven arguments, while the primary constructor
+   * declared on this class takes a single `OperationsContext` -- confirm the
+   * intended primary constructor signature.
+   */
+  def this(
+      dbPath: String,
+      rocksdbCfgPath: String = "default",
+      metaDB: KeyValueDB,
+      indexUri: String
+  ) {
+    this(
+      s"${dbPath}/${DBNameMap.nodeDB}",
+      rocksdbCfgPath,
+      s"${dbPath}/${DBNameMap.nodeLabelDB}",
+      rocksdbCfgPath,
+      metaDB,
+      indexUri,
+      dbPath
+    )
+  }
+  //add all index
+  // Constructor-time index rebuild: runs only when the index engine reports an
+  // index and needRebuildIndex() asks for a rebuild.
+  logger.info("start add index")
+  // Counts every (node, property) pair visited during the rebuild.
+  var addCount = 0
+  if (indexImpl.hasIndex() && needRebuildIndex()) {
+    //generate index for all node use memory to speedup
+    // Encoded property key -> node ids collected so far for that key.
+    val cacheHashMap = new mutable.HashMap[String, mutable.HashSet[Long]]()
+    allNodes().foreach { node =>
+      node.properties.foreach { property =>
+        val key = indexImpl.encodeKey(property._1, property._2)
+        if (cacheHashMap.contains(key)) {
+          cacheHashMap(key).add(node.id)
+          // Flush and reset the bucket once it reaches 100000 ids.
+          if (cacheHashMap(key).size >= 100000) { //batch add index TODO size can be config
+            indexImpl.batchAddIndex(key, cacheHashMap(key).toSet)
+            cacheHashMap(key).clear()
+          }
+        } else {
+          cacheHashMap(key) = new mutable.HashSet[Long]()
+          cacheHashMap(key).add(node.id)
+        }
+        addCount += 1
+      }
+    }
+    // Flush whatever is left in every bucket.
+    cacheHashMap.foreach { //batch add index
+      case (key, value) =>
+        indexImpl.batchAddIndex(key, value.toSet)
+    }
+  }
+  // Record the name of the index engine in use; needRebuildIndex() reads it back.
+  metaDB.put(ConfigNameMap.indexNameStorageKey, indexImpl.indexName.getBytes)
+  logger.info(f"load index ok,size:${addCount}")
+
+  /**
+   * Decides whether the property index has to be rebuilt, based on the index
+   * engine name recorded in metaDB under `ConfigNameMap.indexNameStorageKey`.
+   *
+   * @return true if need rebuild index (always true when no name is recorded)
+   */
+  def needRebuildIndex(): Boolean = {
+    // check last time use index engine
+    val indexType = metaDB.get(ConfigNameMap.indexNameStorageKey)
+    if (indexType == null || indexType.length == 0) {
+      true
+    } else {
+      // The name is stored with `indexName.getBytes`, so it has to be decoded;
+      // calling toString on the raw byte array would yield "[B@<hash>" instead.
+      indexImpl.needRebuildIndex(new String(indexType))
+    }
+  }
+
+  /**
+   * Removes every property of the given node from the index.
+   * Does nothing when there is no index or the node cannot be found.
+   *
+   * @param nodeId id of the node whose index entries are dropped
+   */
+  def removePropertyIndexByNodeId(nodeId: Long): Unit = {
+    if (indexImpl.hasIndex()) {
+      getNodeById(nodeId).foreach { node =>
+        node.properties.foreach { property =>
+          indexImpl.removeIndex(indexImpl.encodeKey(property._1, property._2), node.id)
+        }
+      }
+    }
+  }
+
+  /**
+   * Looks up node ids through the index.
+   *
+   * @see [[NodeStoreSPI.getNodeIdByProperty()]]
+   * @param propertyKey   property key id
+   * @param propertyValue property value to match
+   * @return ids the index holds for the encoded (key, value) pair
+   */
+  def getNodeIdByProperty(propertyKey: Int, propertyValue: Any): Set[Long] = {
+    indexImpl.getIndexByKey(indexImpl.encodeKey(propertyKey, propertyValue))
+  }
+
+  /**
+   * @see [[NodeStoreSPI.hasIndex()]]
+   * @return whatever the index engine reports from `hasIndex()`
+   */
+  def hasIndex(): Boolean = {
+    indexImpl.hasIndex()
+  }
+
+  // ---- label / property-key dictionaries: thin delegates to the name stores ----
+
+  override def allLabels(): Array[String] =
+    nodeLabelName.mapString2Int.keys.toArray
+
+  override def allLabelIds(): Array[Int] =
+    nodeLabelName.mapInt2String.keys.toArray
+
+  override def getLabelName(labelId: Int): Option[String] =
+    nodeLabelName.key(labelId)
+
+  override def getLabelId(labelName: String): Option[Int] =
+    nodeLabelName.id(labelName)
+
+  override def getLabelIds(labelNames: Set[String]): Set[Int] =
+    nodeLabelName.ids(labelNames)
+
+  override def addLabel(labelName: String): Int =
+    nodeLabelName.getOrAddId(labelName)
+
+  override def allPropertyKeys(): Array[String] =
+    propertyName.mapString2Int.keys.toArray
+
+  override def allPropertyKeyIds(): Array[Int] =
+    propertyName.mapInt2String.keys.toArray
+
+  override def getPropertyKeyName(keyId: Int): Option[String] =
+    propertyName.key(keyId)
+
+  override def getPropertyKeyId(keyName: String): Option[Int] =
+    propertyName.id(keyName)
+
+  override def addPropertyKey(keyName: String): Int =
+    propertyName.getOrAddId(keyName)
+
+  /**
+   * Fetches a node by id, using the label the label store returns for it.
+   *
+   * @return the node, or None when either store has no entry for it
+   */
+  override def getNodeById(nodeId: Long): Option[StoredNodeWithProperty] = {
+    // flatMap instead of `.get`: a label entry without a matching node entry
+    // yields None rather than a NoSuchElementException.
+    nodeLabelStore.get(nodeId).flatMap(nodeStore.get(nodeId, _))
+  }
+
+  override def getNodeById(nodeId: Long, label: Int): Option[StoredNodeWithProperty] =
+    nodeStore.get(nodeId, label)
+
+  override def getNodeLabelsById(nodeId: Long): Array[Int] =
+    nodeLabelStore.getAll(nodeId)
+
+  override def hasLabel(nodeId: Long, label: Int): Boolean =
+    nodeLabelStore.exist(nodeId, label)
+
+  /**
+   * Adds a label to an existing node; no-op when the node is missing or
+   * already carries the label.
+   */
+  override def nodeAddLabel(nodeId: Long, labelId: Int): Unit =
+    getNodeById(nodeId)
+      .foreach { node =>
+        if (!node.labelIds.contains(labelId)) {
+          val labels = node.labelIds ++ Array(labelId)
+          nodeLabelStore.set(nodeId, labelId)
+          val nodeInBytes: Array[Byte] = NodeSerializer
+            .encodeNodeWithProperties(node.id, labels, node.properties)
+          nodeStore.set(
+            new StoredNodeWithProperty(node.id, labels, nodeInBytes)
+          )
+          // if node is nonLabel node, delete it
+          if (node.labelIds.isEmpty) {
+            nodeLabelStore.delete(nodeId, NONE_LABEL_ID)
+            nodeStore.delete(nodeId, NONE_LABEL_ID)
+          }
+        }
+      }
+
+  /**
+   * Removes a label from a node; no-op when the node is not stored under
+   * that label.
+   */
+  override def nodeRemoveLabel(nodeId: Long, labelId: Int): Unit =
+    nodeStore
+      .get(nodeId, labelId)
+      .foreach { node =>
+        if (node.labelIds.contains(labelId)) {
+          val labels = node.labelIds.filter(_ != labelId)
+          val nodeInBytes: Array[Byte] = NodeSerializer
+            .encodeNodeWithProperties(node.id, labels, node.properties)
+          val newNode = new StoredNodeWithProperty(node.id, labels, nodeInBytes)
+          // if a node has only one label, add NONE_LABEL_ID after delete it
+          if (node.labelIds.length == 1) {
+            nodeLabelStore.set(nodeId, NONE_LABEL_ID)
+            nodeStore.set(NONE_LABEL_ID, newNode)
+          }
+          nodeLabelStore.delete(node.id, labelId)
+          nodeStore.set(newNode)
+          nodeStore.delete(nodeId, labelId)
+        }
+      }
+
+  /**
+   * Sets (or overwrites) one property of a node and keeps the index in sync.
+   * No-op when the node does not exist.
+   */
+  override def nodeSetProperty(nodeId: Long, propertyKeyId: Int, propertyValue: Any): Unit = {
+    getNodeById(nodeId)
+      .foreach { node =>
+        val nodeInBytes: Array[Byte] =
+          NodeSerializer.encodeNodeWithProperties(
+            node.id,
+            node.labelIds,
+            node.properties ++ Map(propertyKeyId -> propertyValue)
+          )
+        nodeStore.set(
+          new StoredNodeWithProperty(node.id, node.labelIds, nodeInBytes)
+        )
+        // Drop the index entry of the value being overwritten; otherwise
+        // getNodeIdByProperty keeps returning this node for the old value.
+        node.properties
+          .get(propertyKeyId)
+          .filter(_ != propertyValue)
+          .foreach(oldValue =>
+            indexImpl.removeIndex(indexImpl.encodeKey(propertyKeyId, oldValue), nodeId)
+          )
+        //add node id to index
+        indexImpl.addIndex(indexImpl.encodeKey(propertyKeyId, propertyValue), nodeId)
+      }
+  }
+
+  /**
+   * Removes one property from a node and its entry from the index.
+   * No-op when the node does not exist.
+   */
+  override def nodeRemoveProperty(nodeId: Long, propertyKeyId: Int): Any = {
+    getNodeById(nodeId)
+      .foreach { node =>
+        val nodeInBytes: Array[Byte] =
+          NodeSerializer.encodeNodeWithProperties(
+            node.id,
+            node.labelIds,
+            node.properties - propertyKeyId
+          )
+        nodeStore.set(
+          new StoredNodeWithProperty(node.id, node.labelIds, nodeInBytes)
+        )
+        //remove node id from index
+        node.properties
+          .get(propertyKeyId)
+          .foreach(propertyValue =>
+            indexImpl.removeIndex(indexImpl.encodeKey(propertyKeyId, propertyValue), nodeId)
+          )
+      }
+  }
+
+  /**
+   * Stores a node and indexes all of its properties. Nodes without labels
+   * are stored under NONE_LABEL_ID.
+   */
+  override def addNode(node: StoredNodeWithProperty): Unit = {
+    if (node.labelIds != null && node.labelIds.length > 0) {
+      nodeStore.set(node)
+      nodeLabelStore.set(node.id, node.labelIds)
+    } else {
+      nodeStore.set(NONE_LABEL_ID, node)
+      nodeLabelStore.set(node.id, NONE_LABEL_ID)
+    }
+    //add node id to index
+    node.properties.foreach { property =>
+      indexImpl.addIndex(indexImpl.encodeKey(property._1, property._2), node.id)
+    }
+  }
+
+  override def addNode(nodeId: NodeId, labelIds: Array[Int], props: Map[Int, Any]): Unit = {
+    addNode(new StoredNodeWithProperty(nodeId, labelIds, props))
+  }
+
+  override def allNodes(): Iterator[StoredNodeWithProperty] = nodeStore.all()
+
+  override def nodesCount: Long = nodeLabelStore.getNodesCount
+
+  override def getNodesByLabel(labelId: Int): Iterator[StoredNodeWithProperty] =
+    nodeStore.getNodesByLabel(labelId)
+
+  // Fixme: This func is slow, because it needs to finish all the search before return the final iter.
+  override def getNodesByLabels(labelIds: Seq[Int]): Iterator[StoredNodeWithProperty] =
+    labelIds.flatMap(labelId => nodeStore.getNodesByLabel(labelId)).toIterator
+
+  /** Fetches by (id, label) when a label is given, otherwise by id alone. */
+  override def getNodeById(nodeId: Long, label: Option[Int]): Option[StoredNodeWithProperty] =
+    label.map(getNodeById(nodeId, _)).getOrElse(getNodeById(nodeId))
+
+  override def getNodeIdsByLabel(labelId: Int): Iterator[Long] =
+    nodeStore.getNodeIdsByLabel(labelId)
+
+  /** Deletes one node: its index entries first, then its store entries. */
+  override def deleteNode(nodeId: Long): Unit = {
+    removePropertyIndexByNodeId(nodeId)
+    nodeLabelStore
+      .getAll(nodeId)
+      .foreach(labelId => nodeStore.delete(nodeId, labelId))
+    nodeLabelStore.delete(nodeId)
+  }
+
+  /**
+   * Deletes many nodes using one write batch per database.
+   *
+   * @param nodeIDs ids of the nodes to delete
+   */
+  override def deleteNodes(nodeIDs: Iterator[NodeId]): Unit = {
+    val nodesWB = new WriteBatch()
+    val labelWB = new WriteBatch()
+    try {
+      nodeIDs.foreach(nid => {
+        nodeLabelStore
+          .getAll(nid)
+          .foreach(lid => {
+            nodesWB.delete(NodeSerializer.encodeNodeKey(nid, lid))
+          })
+        labelWB.deleteRange(
+          NodeSerializer.encodeNodeKey(nid, 0),
+          NodeSerializer.encodeNodeKey(nid, -1)
+        )
+        // Runs before the batches are written, while the node is still readable.
+        removePropertyIndexByNodeId(nid)
+      })
+      nodeDB.write(writeOptions, nodesWB) //TODO Important! to guarantee atomic
+      nodeLabelDB.write(
+        writeOptions,
+        labelWB
+      ) //TODO Important! to guarantee atomic
+    } finally {
+      // WriteBatch wraps a native handle; release it even when a write fails.
+      nodesWB.close()
+      labelWB.close()
+    }
+  }
+
+  // big cost
+  /** Deletes every node stored under the given label, including index entries. */
+  override def deleteNodesByLabel(labelId: Int): Unit = {
+    nodeStore
+      .getNodeIdsByLabel(labelId)
+      .foreach { nodeid =>
+        // removePropertyIndexByNodeId resolves the node through getNodeById, so
+        // it has to run before the store entries are removed; afterwards the
+        // lookup finds nothing and the index entries would be left behind.
+        removePropertyIndexByNodeId(nodeid)
+        nodeLabelStore
+          .getAll(nodeid)
+          .foreach {
+            nodeStore.delete(nodeid, _)
+          }
+        nodeLabelStore.delete(nodeid)
+      }
+    nodeStore.deleteByLabel(labelId)
+  }
+
+  /** Closes the index engine and the node, node-label and meta databases. */
+  override def close(): Unit = {
+    indexImpl.close()
+    nodeDB.close()
+    nodeLabelDB.close()
+    metaDB.close()
+  }
+
+  override def newNodeId(): Long = {
+    idGenerator.nextId()
+  }
+
+  // Intentionally empty in this implementation.
+  override def refreshMeta(): Unit = {}
+
+  // Intentionally empty in this implementation.
+  override def cleanData(): Unit = {}
+
+  override def serializeLabelIdsToBytes(labelIds: Array[Int]): Array[Byte] = {
+    BaseSerializer.encodeArray(labelIds)
+  }
+
+  override def deserializeBytesToLabelIds(bytes: Array[Byte]): Array[Int] = {
+    NodeSerializer.decodeNodeLabelIds(bytes)
+  }
+
+  override def serializePropertiesToBytes(properties: Map[Int, Any]): Array[Byte] = {
+    NodeSerializer.encodeNodeProperties(properties)
+  }
+
+  override def deserializeBytesToProperties(bytes: Array[Byte]): Map[Int, Any] = {
+    BaseSerializer.decodePropMap(bytes)
+  }
+}
diff --git a/storage/src/main/scala/org/grapheco/tudb/engine/NodeOperations.scala b/storage/src/main/scala/org/grapheco/tudb/engine/NodeOperations.scala
new file mode 100644
index 00000000..1ee692dc
--- /dev/null
+++ b/storage/src/main/scala/org/grapheco/tudb/engine/NodeOperations.scala
@@ -0,0 +1,86 @@
+package org.grapheco.tudb.engine
+import org.grapheco.tudb.graph.Node
+
+import com.typesafe.scalalogging.LazyLogging
+
+/**
+ * Contract for node-level operations: node CRUD, label and property-key
+ * dictionaries, index lookup, and (de)serialization helpers.
+ */
+trait NodeOperations extends LazyLogging {
+
+  def getNodeIdByProperty(propertyKey: Int, propertyValue: Any): Set[Long]
+
+  def hasIndex(): Boolean
+
+  def refreshMeta(): Unit
+
+  def newNodeId(): Long
+
+  def cleanData(): Unit
+
+  def hasLabel(nodeId: Long, label: Int): Boolean
+
+  def addNode(node: Node): Unit
+
+  def addNode(nodeId: Long, labelIds: Array[Int], props: Map[Int, Any]): Unit
+
+  def addLabel(labelName: String): Int
+
+  def addPropertyKey(keyName: String): Int
+
+  def nodeAddLabel(nodeId: Long, labelId: Int): Unit
+
+  def nodeRemoveLabel(nodeId: Long, labelId: Int): Unit
+
+  def nodeSetProperty(nodeId: Long, propertyKeyId: Int, propertyValue: Any): Unit
+
+  def nodeRemoveProperty(nodeId: Long, propertyKeyId: Int): Any
+
+  def deleteNode(nodeId: Long): Unit
+
+  def deleteNodes(nodeIDs: Iterator[Long]): Unit
+
+  def deleteNodesByLabel(labelId: Int): Unit
+
+  def getLabelName(labelId: Int): Option[String]
+
+  def getLabelId(labelName: String): Option[Int]
+
+  def getLabelIds(labelNames: Set[String]): Set[Int]
+
+  def getPropertyKeyName(keyId: Int): Option[String]
+
+  def getPropertyKeyId(keyName: String): Option[Int]
+
+  def getNodeById(nodeId: Long): Option[Node]
+
+  def getNodeById(nodeId: Long, label: Int): Option[Node]
+
+  def getNodeById(nodeId: Long, label: Option[Int]): Option[Node]
+
+  def getNodesByLabel(labelId: Int): Iterator[Node]
+  def getNodesByLabels(labelIds: Seq[Int]): Iterator[Node]
+
+  def getNodeIdsByLabel(labelId: Int): Iterator[Long]
+
+  def getNodeLabelsById(nodeId: Long): Array[Int]
+
+  def serializeLabelIdsToBytes(labelIds: Array[Int]): Array[Byte]
+
+  def deserializeBytesToLabelIds(bytes: Array[Byte]): Array[Int]
+
+  def serializePropertiesToBytes(properties: Map[Int, Any]): Array[Byte]
+
+  def deserializeBytesToProperties(bytes: Array[Byte]): Map[Int, Any]
+
+  def allLabels(): Array[String]
+
+  def allLabelIds(): Array[Int]
+
+  def allPropertyKeys(): Array[String]
+
+  def allPropertyKeyIds(): Array[Int]
+
+  def allNodes(): Iterator[Node]
+
+  def nodesCount: Long
+
+  def close(): Unit
+}
diff --git a/storage/src/main/scala/org/grapheco/tudb/engine/OperationsContext.scala b/storage/src/main/scala/org/grapheco/tudb/engine/OperationsContext.scala
new file mode 100644
index 00000000..d141900b
--- /dev/null
+++ b/storage/src/main/scala/org/grapheco/tudb/engine/OperationsContext.scala
@@ -0,0 +1,20 @@
+package org.grapheco.tudb.engine
+
+import org.grapheco.tudb.ContextMap
+import org.grapheco.tudb.store.storage.KeyValueDB
+
+/**
+ * A [[ContextMap]] with named accessors for the four database handles
+ * (node, relationship, index, meta), each kept under a fixed string key.
+ *
+ * NOTE(review): the getters return whatever `ContextMap.get` returns; the
+ * static type is not visible here -- confirm callers get a `KeyValueDB`.
+ */
+class OperationsContext extends ContextMap {
+
+  def setNodeDB(db: KeyValueDB) = super.put("__node_db__", db)
+  def getNodeDB() = super.get("__node_db__")
+
+  def setRelationshipDB(db: KeyValueDB) = super.put("__relationship_db__", db)
+  def getRelationshipDB() = super.get("__relationship_db__")
+
+  def setIndexDB(db: KeyValueDB) = super.put("__index_db__", db)
+  def getIndexDB() = super.get("__index_db__")
+
+  def setMetaDB(db: KeyValueDB) = super.put("__meta_db__", db)
+  def getMetaDB() = super.get("__meta_db__")
+
+}
diff --git a/storage/src/main/scala/org/grapheco/tudb/engine/RelationshipOperations.scala b/storage/src/main/scala/org/grapheco/tudb/engine/RelationshipOperations.scala
new file mode 100644
index 00000000..05249b79
--- /dev/null
+++ b/storage/src/main/scala/org/grapheco/tudb/engine/RelationshipOperations.scala
@@ -0,0 +1,84 @@
+package org.grapheco.tudb.engine
+
+import org.grapheco.tudb.graph.Relationship
+
+/**
+ * Contract for relationship-level operations: relationship CRUD, type and
+ * property-key dictionaries, and traversal in both directions.
+ */
+trait RelationshipOperations {
+  def allRelationshipTypes(): Array[String]
+
+  def allRelationshipTypeIds(): Array[Int]
+
+  def relationCount: Long
+
+  def getRelationshipTypeName(relationTypeId: Int): Option[String]
+
+  def getRelationshipTypeId(relationTypeName: String): Option[Int]
+
+  def addRelationshipType(relationTypeName: String): Int
+
+  def allPropertyKeys(): Array[String]
+
+  def allPropertyKeyIds(): Array[Int]
+
+  def getPropertyKeyName(keyId: Int): Option[String]
+
+  def getPropertyKeyId(keyName: String): Option[Int]
+
+  def addPropertyKey(keyName: String): Int
+
+  def getRelationshipById(relId: Long): Option[Relationship]
+
+  def getRelationshipIdsByRelationshipType(relationTypeId: Int): Iterator[Long]
+
+  def relationshipSetProperty(relationId: Long, propertyKeyId: Int, propertyValue: Any): Unit
+
+  def relationshipRemoveProperty(relationId: Long, propertyKeyId: Int): Any
+
+  def deleteRelationship(relationId: Long): Unit
+
+  def findToNodeIds(fromNodeId: Long): Iterator[Long]
+
+  def findToNodeIds(fromNodeId: Long, relationType: Int): Iterator[Long]
+
+  def findFromNodeIds(toNodeId: Long): Iterator[Long]
+
+  def findFromNodeIds(toNodeId: Long, relationType: Int): Iterator[Long]
+
+  def newRelationshipId(): Long
+
+  def addRelationship(relation: Relationship): Unit
+
+  // Explicit `: Unit` replaces the deprecated procedure syntax (no result type).
+  def addRelationship(
+      relationshipId: Long,
+      fromId: Long,
+      toId: Long,
+      typeId: Int,
+      props: Map[Int, Any]
+  ): Unit
+
+  def allRelationships(): Iterator[Relationship]
+  def allRelationshipsWithProperty(): Iterator[Relationship]
+
+  /** Outgoing relationships of any type; delegates with `edgeType = None`. */
+  def findOutRelationships(fromNodeId: Long): Iterator[Relationship] =
+    findOutRelationships(fromNodeId, None)
+
+  def findOutRelationships(fromNodeId: Long, edgeType: Option[Int] = None): Iterator[Relationship]
+
+  /** Incoming relationships of any type; delegates with `edgeType = None`. */
+  def findInRelationships(toNodeId: Long): Iterator[Relationship] =
+    findInRelationships(toNodeId, None)
+
+  def findInRelationships(toNodeId: Long, edgeType: Option[Int] = None): Iterator[Relationship]
+
+  def findInRelationshipsBetween(
+      toNodeId: Long,
+      fromNodeId: Long,
+      edgeType: Option[Int] = None
+  ): Iterator[Relationship]
+
+  def findOutRelationshipsBetween(
+      fromNodeId: Long,
+      toNodeId: Long,
+      edgeType: Option[Int] = None
+  ): Iterator[Relationship]
+
+  def close(): Unit
+}