[SEDONA-721] Add Sedona vectorized udf for Python #1859
Need to adjust it before I reopen it.

Need to add docs for this one.
```scala
val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)

val columnarBatchIter = new ArrowPythonRunner(
```
Not a battle for this particular PR, but do we get to choose what the Python function is evaluating on, or are we leaning on built-in Spark machinery such that we are forced to make this a function of a pandas Series? (If it could be a function of, for example, two NumPy arrays for points, or Arrow buffers more generally, it would open up some options in terms of speed.)
I'm not fully happy with this solution either. I just wanted to unlock Arrow-based UDFs in Sedona, and I totally agree that we can do better. Right now, based on my internal tests, it's 2 times faster than a normal UDF.
Awesome! At some point my Spark/Scala will be good enough to see if there's any room to improve on that 🙂
I would like to help with that 🙇
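The `batchIter` line quoted at the top of this thread either splits a partition's rows into groups of `batchSize` or, when `batchSize` is not positive, passes the whole partition on as a single batch. A rough Python analogue of that branch, as a sketch assuming plain iterables stand in for Spark's row iterators (`batch_rows` is a hypothetical name, not part of Sedona or Spark):

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batch_rows(rows: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Split rows into lists of at most batch_size items.

    A positive batch_size yields fixed-size batches (last one may be shorter);
    otherwise the whole input is yielded as one batch, like Iterator(iter).
    """
    it = iter(rows)
    if batch_size <= 0:
        # Treat the whole partition as a single batch.
        yield list(it)
        return
    while True:
        # Pull the next batch_size rows without materializing the rest.
        batch = list(islice(it, batch_size))
        if not batch:
            break
        yield batch
```

Bounding the batch size keeps the memory used per Arrow batch predictable, while the non-positive case avoids any splitting at all.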
186feae to 131622a
@jiayuasu, please let me know what you think. Maybe we should turn this off by default and make it experimental?
```python
for i in range(5):
    start = time()
    area1 = self.get_area(df, vectorized_buffer)
    assert area1 > 478
    vectorized_times.append(time() - start)
    area2 = self.get_area(df, buffer_distanced_udf)
    assert area2 > 478
    non_vectorized_times.append(time() - start)
```
We are not resetting start before calling buffer_distanced_udf, so non_vectorized_times will record the total time of calling both vectorized_buffer and buffer_distanced_udf.
I also wonder if this test could be flaky because the dataset is not large enough to show the performance advantage of the Arrow UDF.
Yeah, that's a good point; I missed resetting start here. For larger datasets there is definitely an improvement, but maybe we don't need a unit test for performance at all. Maybe we should run benchmark tests only when releasing?
Agreed. Let's remove it from the unit test.
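A minimal sketch of the timing fix pointed out above, assuming each measured call can be wrapped in a zero-argument function: the timer is reset inside the helper before every call, so the second measurement no longer includes the first. `time_runs` is a hypothetical helper, the lambdas are placeholders for the two `get_area` calls, and `time.perf_counter` is used instead of `time()` because it is a monotonic clock meant for measuring intervals.

```python
import time
from typing import Callable, List


def time_runs(fn: Callable[[], object], runs: int = 5) -> List[float]:
    """Return the duration of each call to fn, in seconds."""
    durations = []
    for _ in range(runs):
        start = time.perf_counter()  # reset before every measured call
        fn()
        durations.append(time.perf_counter() - start)
    return durations


# Hypothetical stand-ins for self.get_area(df, vectorized_buffer) and
# self.get_area(df, buffer_distanced_udf) from the test above.
vectorized_times = time_runs(lambda: sum(range(10_000)))
non_vectorized_times = time_runs(lambda: sum(range(10_000)))
```

As agreed in the thread, a measurement like this fits a release-time benchmark better than a unit test, since a small dataset may not show a stable difference.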
```python
def non_vectorized_buffer_udf(geom: b.BaseGeometry) -> b.BaseGeometry:
    return geom.buffer(0.1)


@sedona_vectorized_udf()
def vectorized_buffer(geom: b.BaseGeometry) -> b.BaseGeometry:
    return geom.buffer(0.1)
```
Rename vectorized_buffer to vectorized_buffer_udf to be consistent with non_vectorized_buffer_udf.
```scala
import scala.collection.mutable

class ExtractSedonaUDFRule extends Rule[LogicalPlan] {
```
Can you add comments to this class explaining how this rule differs from org.apache.spark.sql.execution.python.ExtractPythonUDFs?
```scala
import scala.collection.JavaConverters.asScalaIteratorConverter

//import scala.jdk.CollectionConverters.asScalaIteratorConverter
```

```scala
package org.apache.sedona.sql.UDF

object PythonEvalType {
  val SQL_SCALAR_SEDONA_UDF = 5200
```
Let's add comment to clarify that 5200 is SEDONA_UDF_TYPE_CONSTANT + SQL_SCALAR_PANDAS_UDF, where SQL_SCALAR_PANDAS_UDF is 200.
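The arithmetic behind the suggested comment, written out as a short Python sketch. The value `SEDONA_UDF_TYPE_CONSTANT = 5000` is inferred from 5200 - 200 rather than taken from the PR, and `to_spark_eval_type` is a hypothetical helper showing how the offset could be undone to recover the underlying Spark eval type:

```python
# PySpark's eval type for scalar pandas UDFs (PythonEvalType.SQL_SCALAR_PANDAS_UDF).
SQL_SCALAR_PANDAS_UDF = 200
# Assumed offset, inferred from 5200 - 200.
SEDONA_UDF_TYPE_CONSTANT = 5000

# The Sedona eval type is the Spark eval type shifted by the Sedona offset.
SQL_SCALAR_SEDONA_UDF = SEDONA_UDF_TYPE_CONSTANT + SQL_SCALAR_PANDAS_UDF


def to_spark_eval_type(sedona_eval_type: int) -> int:
    """Strip the assumed Sedona offset to get back the Spark eval type."""
    return sedona_eval_type - SEDONA_UDF_TYPE_CONSTANT
```

Keeping the Spark value recoverable by a fixed offset makes it easy to see which Spark UDF kind a Sedona eval type is modeled on.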
```scala
case class SedonaArrowEvalPythonExec(
    udfs: Seq[PythonUDF],
    resultAttrs: Seq[Attribute],
    child: SparkPlan,
    evalType: Int)
  extends EvalPythonExec
  with PythonSQLMetrics {
```
Can you add comments explaining how it differs from ArrowEvalPythonExec? I can see that we omitted the checks on the output types.
````markdown
### Shapely scalar UDF

```python
import shapely.geometry.base as b
from sedona.sql.functions import sedona_vectorized_udf, SedonaUDFType


@sedona_vectorized_udf()
def vectorized_buffer(geom: b.BaseGeometry) -> b.BaseGeometry:
    return geom.buffer(0.1)
```

### GeoSeries UDF

```python
import geopandas as gpd
from sedona.sql.functions import sedona_vectorized_udf, SedonaUDFType


@sedona_vectorized_udf(udf_type=SedonaUDFType.GEO_SERIES)
def vectorized_geo_series_buffer(series: gpd.GeoSeries) -> gpd.GeoSeries:
    buffered = series.buffer(0.1)

    return buffered
```
````
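The two documented modes differ in what the Python function receives: the Shapely scalar UDF is written for a single `b.BaseGeometry`, while `SedonaUDFType.GEO_SERIES` hands the function a whole `gpd.GeoSeries`. The stdlib-only sketch below illustrates that calling convention with hypothetical wrappers (`as_scalar_batch_udf`, `as_series_batch_udf`); it is an assumption about the general idea, not Sedona's implementation, and plain floats stand in for geometries:

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def as_scalar_batch_udf(fn: Callable[[T], R]) -> Callable[[Sequence[T]], List[R]]:
    """Scalar style: fn sees one value at a time; the wrapper loops over the batch."""
    def run_batch(batch: Sequence[T]) -> List[R]:
        return [fn(value) for value in batch]
    return run_batch


def as_series_batch_udf(
    fn: Callable[[Sequence[T]], List[R]]
) -> Callable[[Sequence[T]], List[R]]:
    """Series style: fn receives the whole batch at once."""
    def run_batch(batch: Sequence[T]) -> List[R]:
        result = fn(batch)
        # A scalar-shaped UDF must still return exactly one value per row.
        if len(result) != len(batch):
            raise ValueError("a series UDF must return one value per input row")
        return result
    return run_batch


# "+ 0.1" stands in for geom.buffer(0.1) / series.buffer(0.1).
scalar_udf = as_scalar_batch_udf(lambda x: x + 0.1)
series_udf = as_series_batch_udf(lambda xs: [x + 0.1 for x in xs])
```

The series style is the one that can use whole-batch operations such as GeoSeries.buffer; the scalar style trades that for a simpler per-geometry function.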
Do we support other variants of UDFs involving geometries?
- UDFs taking geometry as input and returning a numeric value
- UDFs taking a numeric value and returning a geometry object
- UDFs that have more than one parameter, with geometry as a parameter type or return type
Let me verify those; my plan is to add new functionality later, such as table functions or aggregate functions.
Looks like I messed up a little; I've now fixed and tested it.
I am not sure this is going to work, since a UDF over more than one column is not scalar, and I didn't intend to add this in this MR; I am planning to add it later.

I think it is OK to have this PR as a workaround for supporting UDFs involving geometry types. The final solution is to push SPARK-34771 and apache/spark#31735 forward so that Spark supports UDFs involving UDTs in general.

Yes, I agree. This MR aims to unlock Arrow UDFs for the geometry type. I was thinking of moving this forward with custom GeoPandas or DuckDB runners based on GeoArrow. That's why I added new Python functions instead of reusing the existing pandas_udf from Spark.
```scala
import spark.implicits._

test("Chained Scalar Pandas UDFs should be combined to a single physical node") {
```
What is the purpose of this test? Are we testing the physical plan? If it is supposed to test the physical plan tree, I don't see any assertion on its nodes.
Sorry, I changed the name of the test.
```scala
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.spark.sql.udf
```
Any reason why this is put under the spark-3.5 folder? Is it version-specific code?
Yes, this one only works with Spark 3.5.
Co-authored-by: Kristin Cowalcijk <kontinuation@apache.org>
e2bccb7 to 6277cbd
```markdown
Let's analyze the two examples below, that creates buffers from
a given geometry.

Make sure
```
Can you fix the sentence here?
```
@@ -0,0 +1,126 @@
import inspect
```
I didn't realize that it is not added automatically
```
@@ -0,0 +1,213 @@
from sedona.sql.types import GeometryType
```
* SEDONA-721 Add Sedona vectorized udf.
* SEDONA-721 Add documentation
* SEDONA-721 Add documentation
* SEDONA-721 Add documentation
* Update .github/workflows/java.yml
  Co-authored-by: Kristin Cowalcijk <kontinuation@apache.org>
* SEDONA-721 Apply requested changes.
* SEDONA-721 Apply requested changes.
* SEDONA-721 Apply requested changes.
* SEDONA-721 Apply requested changes.
* SEDONA-721 Apply requested changes.
* SEDONA-721 Apply requested changes.
* SEDONA-721 Apply requested changes.
* SEDONA-721 Apply requested changes.

---------

Co-authored-by: Kristin Cowalcijk <kontinuation@apache.org>
Did you read the Contributor Guide?
Is this PR related to a ticket?
SEDONA-721.
What changes were proposed in this PR?
Sedona vectorized geometry UDFs (scalar only for now).
How was this patch tested?
Unit tests.
Did this PR include necessary documentation updates?
Yes.