Skip to content

Commit 397b673

Browse files
authored
[spark] Add v1 function support to SparkCatalog (#6075)
1 parent 619a206 commit 397b673

File tree

42 files changed

+1316
-75
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1316
-75
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ target
1212
*.iml
1313
*.swp
1414
*.jar
15+
!**/resources/**/*.jar
1516
*.log
1617
*.pyc
1718
*.ipr

docs/content/concepts/functions.md

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -84,37 +84,6 @@ DROP FUNCTION mydb.parse_str;
8484

8585
This statement deletes the existing `parse_str` function from the `mydb` database, removing its functionality.
8686

87-
## Lambda Function Usage in Spark
87+
## Functions in Spark
8888

89-
### Create Function
90-
91-
```sql
92-
-- Spark SQL
93-
CALL sys.create_function(`function` => 'my_db.area_func',
94-
`inputParams` => '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, "name":"width", "type":"INT"}]',
95-
`returnParams` => '[{"id": 0, "name":"area", "type":"BIGINT"}]',
96-
`deterministic` => true,
97-
`comment` => 'comment',
98-
`options` => 'k1=v1,k2=v2'
99-
);
100-
```
101-
102-
### Alter Function
103-
104-
```sql
105-
-- Spark SQL
106-
CALL sys.alter_function(`function` => 'my_db.area_func',
107-
`change` => '{"action" : "addDefinition", "name" : "spark", "definition" : {"type" : "lambda", "definition" : "(Integer length, Integer width) -> { return (long) length * width; }", "language": "JAVA" } }'
108-
);
109-
```
110-
```sql
111-
-- Spark SQL
112-
select paimon.my_db.area_func(1, 2);
113-
```
114-
115-
### Drop Function
116-
117-
```sql
118-
-- Spark SQL
119-
CALL sys.drop_function(`function` => 'my_db.area_func');
120-
```
89+
see [SQL Functions]({{< ref "spark/sql-functions#user-defined-function" >}})

docs/content/spark/sql-functions.md

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ under the License.
2929
This section introduces all available Paimon Spark functions.
3030

3131

32-
## max_pt
32+
## Built-in Function
33+
34+
### max_pt
3335

3436
`max_pt($table_name)`
3537

@@ -44,12 +46,69 @@ It would throw exception when:
4446

4547
**Example**
4648

47-
```shell
48-
> SELECT max_pt('t');
49-
20250101
49+
```sql
50+
SELECT max_pt('t');
51+
-- 20250101
5052

51-
> SELECT * FROM t where pt = max_pt('t');
52-
a, 20250101
53+
SELECT * FROM t where pt = max_pt('t');
54+
-- a, 20250101
5355
```
5456

5557
**Since: 1.1.0**
58+
59+
## User-defined Function
60+
61+
Paimon Spark supports two types of user-defined functions: lambda functions and file-based functions.
62+
63+
This feature currently only supports the REST catalog.
64+
65+
### Lambda Function
66+
67+
Lambda functions allow users to define functions using Java lambda expressions, enabling inline, concise, and functional-style operations.
68+
69+
**Example**
70+
71+
```sql
72+
-- Create Function
73+
CALL sys.create_function(`function` => 'my_db.area_func',
74+
`inputParams` => '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, "name":"width", "type":"INT"}]',
75+
`returnParams` => '[{"id": 0, "name":"area", "type":"BIGINT"}]',
76+
`deterministic` => true,
77+
`comment` => 'comment',
78+
`options` => 'k1=v1,k2=v2'
79+
);
80+
81+
-- Alter Function
82+
CALL sys.alter_function(`function` => 'my_db.area_func',
83+
`change` => '{"action" : "addDefinition", "name" : "spark", "definition" : {"type" : "lambda", "definition" : "(Integer length, Integer width) -> { return (long) length * width; }", "language": "JAVA" } }'
84+
);
85+
86+
-- Drop Function
87+
CALL sys.drop_function(`function` => 'my_db.area_func');
88+
```
89+
90+
### File Function
91+
92+
Users can define functions within a file, providing flexibility and modular support for function definition. Only JAR files are supported at the moment.
93+
94+
This feature requires Spark 3.4 or higher.
95+
96+
**Example**
97+
98+
```sql
99+
-- Create Function
100+
CREATE FUNCTION mydb.simple_udf
101+
AS 'com.example.SimpleUdf'
102+
USING JAR '/tmp/SimpleUdf.jar' [, JAR '/tmp/SimpleUdfR.jar'];
103+
104+
-- Create or Replace Function
105+
CREATE OR REPLACE FUNCTION mydb.simple_udf
106+
AS 'com.example.SimpleUdf'
107+
USING JAR '/tmp/SimpleUdf.jar';
108+
109+
-- Describe Function
110+
DESCRIBE FUNCTION [EXTENDED] mydb.simple_udf;
111+
112+
-- Drop Function
113+
DROP FUNCTION mydb.simple_udf;
114+
```

docs/layouts/shortcodes/generated/spark_catalog_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,11 @@
3838
<td>String</td>
3939
<td>The default database name.</td>
4040
</tr>
41+
<tr>
42+
<td><h5>v1Function.enabled</h5></td>
43+
<td style="word-wrap: break-word;">true</td>
44+
<td>Boolean</td>
45+
<td>Whether to enable v1 functions.</td>
46+
</tr>
4147
</tbody>
4248
</table>

paimon-api/src/main/java/org/apache/paimon/function/Function.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.function;
2020

21+
import org.apache.paimon.catalog.Identifier;
2122
import org.apache.paimon.types.DataField;
2223

2324
import java.util.List;
@@ -31,6 +32,8 @@ public interface Function {
3132

3233
String fullName();
3334

35+
Identifier identifier();
36+
3437
Optional<List<DataField>> inputParams();
3538

3639
Optional<List<DataField>> returnParams();

paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,34 +21,38 @@
2121
import org.apache.paimon.catalog.Identifier;
2222
import org.apache.paimon.types.DataField;
2323

24+
import javax.annotation.Nullable;
25+
26+
import java.util.Collections;
2427
import java.util.List;
2528
import java.util.Map;
29+
import java.util.Objects;
2630
import java.util.Optional;
2731

2832
/** Function implementation. */
2933
public class FunctionImpl implements Function {
3034

3135
private final Identifier identifier;
3236

33-
private final List<DataField> inputParams;
37+
@Nullable private final List<DataField> inputParams;
3438

35-
private final List<DataField> returnParams;
39+
@Nullable private final List<DataField> returnParams;
3640

3741
private final boolean deterministic;
3842

3943
private final Map<String, FunctionDefinition> definitions;
4044

41-
private final String comment;
45+
@Nullable private final String comment;
4246

4347
private final Map<String, String> options;
4448

4549
public FunctionImpl(
4650
Identifier identifier,
47-
List<DataField> inputParams,
48-
List<DataField> returnParams,
51+
@Nullable List<DataField> inputParams,
52+
@Nullable List<DataField> returnParams,
4953
boolean deterministic,
5054
Map<String, FunctionDefinition> definitions,
51-
String comment,
55+
@Nullable String comment,
5256
Map<String, String> options) {
5357
this.identifier = identifier;
5458
this.inputParams = inputParams;
@@ -66,7 +70,7 @@ public FunctionImpl(Identifier identifier, Map<String, FunctionDefinition> defin
6670
this.deterministic = true;
6771
this.definitions = definitions;
6872
this.comment = null;
69-
this.options = null;
73+
this.options = Collections.emptyMap();
7074
}
7175

7276
@Override
@@ -79,6 +83,10 @@ public String fullName() {
7983
return identifier.getFullName();
8084
}
8185

86+
public Identifier identifier() {
87+
return identifier;
88+
}
89+
8290
@Override
8391
public Optional<List<DataField>> inputParams() {
8492
return Optional.ofNullable(inputParams);
@@ -113,4 +121,31 @@ public String comment() {
113121
public Map<String, String> options() {
114122
return options;
115123
}
124+
125+
@Override
126+
public boolean equals(Object o) {
127+
if (o == null || getClass() != o.getClass()) {
128+
return false;
129+
}
130+
FunctionImpl function = (FunctionImpl) o;
131+
return deterministic == function.deterministic
132+
&& Objects.equals(identifier, function.identifier)
133+
&& Objects.equals(inputParams, function.inputParams)
134+
&& Objects.equals(returnParams, function.returnParams)
135+
&& Objects.equals(definitions, function.definitions)
136+
&& Objects.equals(comment, function.comment)
137+
&& Objects.equals(options, function.options);
138+
}
139+
140+
@Override
141+
public int hashCode() {
142+
return Objects.hash(
143+
identifier,
144+
inputParams,
145+
returnParams,
146+
deterministic,
147+
definitions,
148+
comment,
149+
options);
150+
}
116151
}

paimon-spark/paimon-spark-3.2/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,18 @@ under the License.
6161
<version>${spark.version}</version>
6262
</dependency>
6363

64+
<dependency>
65+
<groupId>org.apache.spark</groupId>
66+
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
67+
<version>${spark.version}</version>
68+
</dependency>
69+
70+
<dependency>
71+
<groupId>org.apache.spark</groupId>
72+
<artifactId>spark-hive_${scala.binary.version}</artifactId>
73+
<version>${spark.version}</version>
74+
</dependency>
75+
6476
<dependency>
6577
<groupId>org.apache.paimon</groupId>
6678
<artifactId>paimon-bundle</artifactId>
@@ -114,6 +126,12 @@ under the License.
114126
<scope>test</scope>
115127
</dependency>
116128

129+
<dependency>
130+
<groupId>com.squareup.okhttp3</groupId>
131+
<artifactId>mockwebserver</artifactId>
132+
<version>${okhttp.version}</version>
133+
<scope>test</scope>
134+
</dependency>
117135
</dependencies>
118136

119137
<build>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.spark.sql.catalyst.parser.extensions
20+
21+
import org.apache.spark.sql.SparkSession
22+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
23+
import org.apache.spark.sql.catalyst.rules.Rule
24+
25+
case class RewritePaimonFunctionCommands(spark: SparkSession) extends Rule[LogicalPlan] {
26+
27+
// do nothing
28+
override def apply(plan: LogicalPlan): LogicalPlan = plan
29+
}

paimon-spark/paimon-spark-3.3/pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,18 @@ under the License.
6161
<version>${spark.version}</version>
6262
</dependency>
6363

64+
<dependency>
65+
<groupId>org.apache.spark</groupId>
66+
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
67+
<version>${spark.version}</version>
68+
</dependency>
69+
70+
<dependency>
71+
<groupId>org.apache.spark</groupId>
72+
<artifactId>spark-hive_${scala.binary.version}</artifactId>
73+
<version>${spark.version}</version>
74+
</dependency>
75+
6476
<dependency>
6577
<groupId>org.apache.paimon</groupId>
6678
<artifactId>paimon-bundle</artifactId>
@@ -113,6 +125,13 @@ under the License.
113125
<version>${spark.version}</version>
114126
<scope>test</scope>
115127
</dependency>
128+
129+
<dependency>
130+
<groupId>com.squareup.okhttp3</groupId>
131+
<artifactId>mockwebserver</artifactId>
132+
<version>${okhttp.version}</version>
133+
<scope>test</scope>
134+
</dependency>
116135
</dependencies>
117136

118137
<build>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.spark.sql.catalyst.parser.extensions
20+
21+
import org.apache.spark.sql.SparkSession
22+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
23+
import org.apache.spark.sql.catalyst.rules.Rule
24+
25+
case class RewritePaimonFunctionCommands(spark: SparkSession) extends Rule[LogicalPlan] {
26+
27+
// do nothing
28+
override def apply(plan: LogicalPlan): LogicalPlan = plan
29+
}

0 commit comments

Comments
 (0)