Skip to content

Commit 9e1b882

Browse files
authored
Materialized views refresher (#915)
Adds some utilities to refresh materialized views
1 parent 8d858f1 commit 9e1b882

File tree

35 files changed

+2251
-2265
lines changed

35 files changed

+2251
-2265
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.baremaps.tasks;
19+
20+
import java.sql.SQLException;
21+
import javax.sql.DataSource;
22+
import org.apache.baremaps.postgres.refresh.DatabaseMetadataRetriever;
23+
import org.apache.baremaps.postgres.refresh.DependencyGraphBuilder;
24+
import org.apache.baremaps.postgres.refresh.MaterializedViewRefresher;
25+
import org.apache.baremaps.workflow.Task;
26+
import org.apache.baremaps.workflow.WorkflowContext;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
public class RefreshMaterializedViews implements Task {
31+
32+
private static final Logger LOGGER = LoggerFactory.getLogger(RefreshMaterializedViews.class);
33+
34+
private Object database;
35+
36+
public RefreshMaterializedViews() {
37+
// Default constructor
38+
}
39+
40+
public RefreshMaterializedViews(Object database) {
41+
this.database = database;
42+
}
43+
44+
@Override
45+
public void execute(WorkflowContext context) throws Exception {
46+
DataSource dataSource = context.getDataSource(database);
47+
try (var connection = dataSource.getConnection()) {
48+
LOGGER.info("Connected to PostgreSQL database.");
49+
50+
// Get the schema of the database.
51+
var schema = connection.getSchema();
52+
53+
// Retrieve database objects (tables, views, materialized views).
54+
var objects = DatabaseMetadataRetriever.getObjects(connection, schema);
55+
56+
// Retrieve dependencies between database objects.
57+
var dependencies = DatabaseMetadataRetriever.getDependencies(connection, schema, objects);
58+
59+
// Build a directed graph of dependencies between the database objects.
60+
var graph = DependencyGraphBuilder.buildGraph(objects, dependencies);
61+
62+
// Perform a topological sort so that dependencies come before dependents.
63+
var sorted = DependencyGraphBuilder.topologicalSort(graph);
64+
65+
// Refresh materialized views, dropping and recreating indexes if present.
66+
MaterializedViewRefresher.refreshMaterializedViews(connection, sorted);
67+
68+
LOGGER.info("Done refreshing materialized views.");
69+
} catch (SQLException ex) {
70+
LOGGER.error("Database error", ex);
71+
}
72+
}
73+
}

baremaps-core/src/main/java/org/apache/baremaps/workflow/Task.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
@Type(value = ImportOsmPbf.class, name = "ImportOsmPbf"),
5353
@Type(value = ImportShapefile.class, name = "ImportShapefile"),
5454
@Type(value = LogMessage.class, name = "LogMessage"),
55+
@Type(value = RefreshMaterializedViews.class, name = "RefreshMaterializedViews"),
5556
@Type(value = UpdateOsmDatabase.class, name = "UpdateOsmDatabase"),
5657
})
5758
public interface Task {
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.baremaps.postgres.refresh;
19+
20+
import java.sql.Connection;
21+
import java.sql.SQLException;
22+
import java.util.ArrayList;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
/**
29+
* Utility class to retrieve metadata about tables, views, and materialized views.
30+
*/
31+
public class DatabaseMetadataRetriever {
32+
33+
private static final Logger LOGGER =
34+
LoggerFactory.getLogger(DatabaseMetadataRetriever.class.getName());
35+
36+
private DatabaseMetadataRetriever() {
37+
// Prevent instantiation
38+
}
39+
40+
/**
41+
* Retrieves all tables, views, and materialized views in the given schema.
42+
*
43+
* @param connection the database connection
44+
* @param schema the schema name
45+
* @return a list of DatabaseObject records
46+
* @throws SQLException if a database error occurs
47+
*/
48+
public static List<DatabaseObject> getObjects(Connection connection, String schema)
49+
throws SQLException {
50+
var result = new ArrayList<DatabaseObject>();
51+
52+
var sql = """
53+
SELECT c.oid,
54+
c.relname,
55+
c.relkind,
56+
n.nspname
57+
FROM pg_class c
58+
JOIN pg_namespace n ON n.oid = c.relnamespace
59+
WHERE n.nspname = ?
60+
AND c.relkind IN ('r', 'v', 'm')
61+
""";
62+
63+
try (var ps = connection.prepareStatement(sql)) {
64+
ps.setString(1, schema);
65+
try (var rs = ps.executeQuery()) {
66+
while (rs.next()) {
67+
var oid = rs.getLong("oid");
68+
var relName = rs.getString("relname");
69+
var relKind = rs.getString("relkind");
70+
var nspName = rs.getString("nspname");
71+
72+
var objectType = switch (relKind) {
73+
case "r" -> ObjectType.TABLE;
74+
case "v" -> ObjectType.VIEW;
75+
case "m" -> ObjectType.MATERIALIZED_VIEW;
76+
default -> null;
77+
};
78+
79+
var dbObj = new DatabaseObject(nspName, relName, objectType);
80+
result.add(dbObj);
81+
}
82+
}
83+
}
84+
85+
LOGGER.info("Found " + result.size() + " objects in schema " + schema);
86+
return result;
87+
}
88+
89+
/**
90+
* Retrieves dependencies between database objects in the given schema.
91+
*
92+
* @param connection the database connection
93+
* @param schema the schema name
94+
* @param objects a list of database objects
95+
* @return a list of DatabaseDependency records
96+
* @throws SQLException if a database error occurs
97+
*/
98+
public static List<DatabaseDependency> getDependencies(Connection connection, String schema,
99+
List<DatabaseObject> objects) throws SQLException {
100+
var sql = """
101+
SELECT dependent_ns.nspname AS dependent_schema,
102+
dependent_c.relname AS dependent_name,
103+
source_ns.nspname AS source_schema,
104+
source_c.relname AS source_name
105+
FROM pg_depend d
106+
JOIN pg_rewrite r
107+
ON r.oid = d.objid
108+
JOIN pg_class dependent_c
109+
ON r.ev_class = dependent_c.oid
110+
JOIN pg_namespace dependent_ns
111+
ON dependent_c.relnamespace = dependent_ns.oid
112+
JOIN pg_class source_c
113+
ON d.refobjid = source_c.oid
114+
JOIN pg_namespace source_ns
115+
ON source_c.relnamespace = source_ns.oid
116+
WHERE dependent_ns.nspname = ?
117+
AND source_ns.nspname = ?
118+
""";
119+
120+
// Create a fast lookup by (schema + name).
121+
var lookupMap = new HashMap<String, DatabaseObject>();
122+
for (var obj : objects) {
123+
var key = obj.schemaName() + "." + obj.objectName();
124+
lookupMap.put(key, obj);
125+
}
126+
127+
var result = new ArrayList<DatabaseDependency>();
128+
try (var ps = connection.prepareStatement(sql)) {
129+
ps.setString(1, schema);
130+
ps.setString(2, schema);
131+
try (var rs = ps.executeQuery()) {
132+
while (rs.next()) {
133+
var dependentSchema = rs.getString("dependent_schema");
134+
var dependentName = rs.getString("dependent_name");
135+
var sourceSchema = rs.getString("source_schema");
136+
var sourceName = rs.getString("source_name");
137+
138+
var dependentKey = dependentSchema + "." + dependentName;
139+
var sourceKey = sourceSchema + "." + sourceName;
140+
141+
var dependentObj = lookupMap.get(dependentKey);
142+
var sourceObj = lookupMap.get(sourceKey);
143+
144+
if (dependentObj != null && sourceObj != null) {
145+
// Skip self-loop dependencies.
146+
if (!dependentObj.equals(sourceObj)) {
147+
result.add(new DatabaseDependency(sourceObj, dependentObj));
148+
}
149+
}
150+
}
151+
}
152+
}
153+
return result;
154+
}
155+
156+
/**
157+
* Retrieves indexes for the given table or materialized view.
158+
*
159+
* @param connection the database connection
160+
* @param schema the schema name
161+
* @param tableName the table or materialized view name
162+
* @return a list of DatabaseIndex records
163+
* @throws SQLException if a database error occurs
164+
*/
165+
public static List<DatabaseIndex> getIndexes(Connection connection, String schema,
166+
String tableName) throws SQLException {
167+
var sql = """
168+
SELECT indexname, indexdef
169+
FROM pg_indexes
170+
WHERE schemaname = ?
171+
AND tablename = ?
172+
""";
173+
174+
var result = new ArrayList<DatabaseIndex>();
175+
try (var ps = connection.prepareStatement(sql)) {
176+
ps.setString(1, schema);
177+
ps.setString(2, tableName);
178+
try (var rs = ps.executeQuery()) {
179+
while (rs.next()) {
180+
var indexName = rs.getString("indexname");
181+
var indexDef = rs.getString("indexdef");
182+
result.add(new DatabaseIndex(indexName, indexDef));
183+
}
184+
}
185+
}
186+
return result;
187+
}
188+
189+
/**
190+
* Represents the type of database object.
191+
*/
192+
public enum ObjectType {
193+
TABLE,
194+
VIEW,
195+
MATERIALIZED_VIEW
196+
}
197+
198+
/**
199+
* A record representing a database object (table, view, materialized view).
200+
*/
201+
public record DatabaseObject(
202+
String schemaName,
203+
String objectName,
204+
ObjectType objectType) {
205+
}
206+
207+
/**
208+
* Record representing a dependency between two database objects.
209+
*/
210+
public record DatabaseDependency(DatabaseObject source, DatabaseObject dependent) {
211+
}
212+
213+
214+
/**
215+
* Record representing a database index.
216+
*/
217+
public record DatabaseIndex(String indexName, String indexDef) {
218+
}
219+
220+
}

0 commit comments

Comments
 (0)