Use Arrow extension types in UDF evaluation #14

Closed

Conversation


@jpolchlo jpolchlo commented Dec 3, 2019

Pursuant to #13, I'm trying some experiments to gauge the feasibility of providing this functionality without needing to develop inside Spark proper.

The initial commit provides a raster_udf wrapper to supplant pandas_udf.

The procedure for working with this PR is as follows.

  1. Run sbt pySparkCmd. This will package up the relevant material and print a shell command that one can run to start pyspark; however, the printed command was faulty for me (see step 3).
  2. Install the development version of pyrasterframes:
pip install --upgrade /path/to/rasterframes/pyrasterframes/target/python/dist/pyrasterframes-0.8.4.dev0-py3-none-any.whl
  3. Modify the pyspark command line to:
PYSPARK_PYTHON=ipython PYTHONSTARTUP=<as supplied by SBT> pyspark \
--jars /path/to/rasterframes/pyrasterframes/target/scala-2.11/pyrasterframes-assembly-0.8.4-SNAPSHOT.jar \
--py-files /path/to/rasterframes/pyrasterframes/target/python/dist/pyrasterframes-0.8.4.dev0-py3-none-any.whl

It's best to confine these machinations to a virtual environment.

I'll use comments to update progress as I push up new functionality.

@jpolchlo (Author) commented:

Current status: The infrastructure for custom execution of Python UDFs now exists, but testing is proving problematic. We need to register a set of custom extensions with Catalyst; these are provided by this class.

On the Scala side, these extensions are registered using either the withExtensions method on SparkSession.Builder, or using the spark.sql.extensions conf parameter. The Python side of the story is sadly complicated. There is a PR which enhances the SparkSession constructor used by PySpark to utilize the spark.sql.extensions conf argument, but that PR has not been backported to the 2.3 or 2.4 lines. We'll need to compile a custom Spark build with this patch applied before we can test in PySpark.
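
Roughly, the two registration paths on the Scala side look like this. This is only a sketch: org.example.RasterUDFExtensions is a hypothetical stand-in for the extensions class linked above, assumed to extend Function1[SparkSessionExtensions, Unit] as Spark requires.

import org.apache.spark.sql.SparkSession

// Option 1: programmatic registration on the session builder.
val spark = SparkSession.builder()
  .master("local[*]")
  .withExtensions(new org.example.RasterUDFExtensions())
  .getOrCreate()

// Option 2: configuration-based registration -- the mechanism that the
// unreleased PySpark change would honor from the Python side.
//   SparkSession.builder().config("spark.sql.extensions", "org.example.RasterUDFExtensions")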

On the positive side, I can load the required extensions in Scala using the conf parameter mechanism, but have no reasonable means to test purely in Scala.

On the evolution of Catalyst nodes

There are several node types, spread across different tree representations and implemented in both languages, that warrant some explanation (a rough wiring sketch follows below).

  1. [PYTHON] UserDefinedRasterFunction (invoked from raster_udf) contains a py4j reference to [SCALA] UserDefinedRasterFunction. (All class mentions below are on the Scala side.)
  2. This generates a PythonRasterUDF Expression instance (a Spark SQL column function). Expression nodes comprise the Logical Plan.
  3. PythonRasterUDF is an Unevaluable node, so must be transformed via a logical plan optimization step provided by the ExtractRasterUDFs Rule which ultimately converts to...
  4. RasterEvalPython nodes, which are (I think?) tree nodes in the query planner, and which the RasterUDFStrategy converts to RasterEvalPythonExec (SparkPlan) nodes in the physical plan.
  5. RasterEvalPythonExec overrides EvalPythonExec's evaluate machinery to rely on ArrowPythonRunner, which leans on our borrowed implementations of ArrowUtils and ArrowConverters (which will eventually handle the extension type stuff).

For now, this will delegate to the default worker.py to do the actual processing of the Arrow batches in Python, but we will eventually need to reimplement that worker module as well (which will probably require us to intervene at this line of BasePythonRunner in order to delegate to our custom Python worker).
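
To make the wiring above concrete, here is a rough sketch (not the actual PR code) of how the ExtractRasterUDFs rule and RasterUDFStrategy from the list would presumably be injected through SparkSessionExtensions; the rule and strategy bodies are stubbed out:

import org.apache.spark.sql.{SparkSessionExtensions, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Stand-ins for the rule and strategy described above; the real versions
// rewrite PythonRasterUDF expressions into RasterEvalPython nodes and plan
// them as RasterEvalPythonExec, respectively.
object ExtractRasterUDFs extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan // no-op stub
}

object RasterUDFStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil // no-op stub
}

// The extensions entry point registered via withExtensions or spark.sql.extensions.
class RasterUDFExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(ext: SparkSessionExtensions): Unit = {
    ext.injectOptimizerRule(_ => ExtractRasterUDFs)   // logical-plan rewrite (step 3)
    ext.injectPlannerStrategy(_ => RasterUDFStrategy) // physical planning (step 4)
  }
}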

@jpolchlo (Author) commented:

For posterity, this test snippet will fail due to a mismatch between the Arrow version used by Spark's Scala implementation of the pandas UDF code and the Python Arrow version required for extension types (>= 0.15.0):

from pyspark.sql.functions import PandasUDFType, col, pandas_udf        
from pyspark import Row                                                 

ref_fn = pandas_udf(lambda v: v+1, 'double', PandasUDFType.SCALAR)      
R = Row('i')                                                            
df = spark.createDataFrame([R(i) for i in [1,2,3,4]])                   
df_test = df.select(col("i"), ref_fn(col("i")))                         
df_test.show()                                                          

The following, using the raster_udf extension, should work:

from pyrasterframes.udf import raster_udf                               

test_fn = raster_udf(lambda v: v+1, 'double', PandasUDFType.SCALAR)     
raster_test = df.select(col("i"), test_fn(col("i")))                   
raster_test.show()                                                     

(For now, this fails because of the PySpark deficiency mentioned above.)

@jpolchlo (Author) commented:

There are some deficiencies in my understanding of the transformation process used by pandas_udf. The primary surprise is that the Catalyst tree appears to be populated by an unexpected object: a Python lambda (contained in a functools wrapper). It seems that the Python UserDefinedFunction is not being added directly. At some later point, this function wrapper is rewritten into a Python UDF, and I have not been able to discover where that happens. Without being able to intercede on this pathway, we won't be able to finish this work.

We're mostly out of time on this, but I stand by the idea that this is eminently possible. We can return to this task later, but for accounting and cleanliness purposes, this issue will be closed. With any luck, it will be reopened at a later date.

@jpolchlo jpolchlo closed this Jan 14, 2020