需求背景
需要在前端页面展示当前表字段的所有上下游血缘关系,以进一步做数据诊断治理。大致效果图如下:
首先这里解释什么是表字段血缘关系,SQL 示例:
CREATE TABLE IF NOT EXISTS table_b
AS SELECT order_id, order_status FROM table_a;
如上 DDL 语句中,创建的 table_b 的 order_id 和 order_status 字段来源于 table_a,代表table_a 就是 table_b 的来源表,也叫上游表,table_b 就是 table_a 下游表,另外 table_a.order_id 就是 table_b.order_id 的上游字段,它们之间就存在血缘关系。
INSERT INTO table_c
SELECT a.order_id, b.order_status
FROM table_a a JOIN table_b b ON a.order_id = b.order_id;
如上 DML 语句中,table_c 的 order_id 字段来源于 table_a,而 order_status 来源于 table_b,表示 table_c 和 table_a、table_b 之间也存在血缘关系。
由上也可看出想要存储血缘关系,还需要先解析 sql,这块儿主要使用了开源项目 calcite 的解析器,这篇文章不再展开,本篇主要讲如何存储和如何展示
环境配置
参考另一篇:springboot 配置内嵌式 neo4j
Node 数据结构定义
因为要展示表的字段之间的血缘关系,所以直接将表字段作为图节点存储,表字段之间的血缘关系就用图节点之间的关系表示,具体 node 定义如下:
public class ColumnVertex {
// 唯一键
private String name;
public ColumnVertex(String catalogName, String databaseName, String tableName, String columnName) {
this.name = catalogName + "." + databaseName + "." + tableName + "." + columnName;
}
public String getCatalogName() {
return Long.parseLong(name.split("\\.")[0]);
}
public String getDatabaseName() {
return name.split("\\.")[1];
}
public String getTableName() {
return name.split("\\.")[2];
}
public String getColumnName() {
return name.split("\\.")[3];
}
}
通用 Service 定义
public interface EmbeddedGraphService {
// 添加图节点以及与上游节点之间的关系
void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex);
// 寻找上游节点
List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex);
// 寻找下游节点
List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex);
}
Service 实现
import javax.annotation.Resource;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.springframework.stereotype.Service;
@Service
public class EmbeddedGraphServiceImpl implements EmbeddedGraphService {
@Resource private GraphDatabaseService graphDb;
@Override
public void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex) {
try (Transaction tx = graphDb.beginTx()) {
tx.execute(
"MERGE (c:ColumnVertex {name: $currentName}) MERGE (u:ColumnVertex {name: $upstreamName})"
+ " MERGE (u)-[:UPSTREAM]->(c)",
Map.of("currentName", currentVertex.getName(), "upstreamName", upstreamVertex.getName()));
tx.commit();
}
}
@Override
public List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex) {
List<ColumnVertex> result = new ArrayList<>();
try (Transaction tx = graphDb.beginTx()) {
Result queryResult =
tx.execute(
"MATCH (u:ColumnVertex)-[:UPSTREAM]->(c:ColumnVertex) WHERE c.name = $name RETURN"
+ " u.name AS name",
Map.of("name", currentVertex.getName()));
while (queryResult.hasNext()) {
Map<String, Object> row = queryResult.next();
result.add(new ColumnVertex().setName((String) row.get("name")));
}
tx.commit();
}
return result;
}
@Override
public List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex) {
List<ColumnVertex> result = new ArrayList<&g