Preface

I work for a company that is a customer of the Databricks platform. However, this blog post represents only my personal view and opinion; it does not reflect any official position of my employer, nor any dissatisfaction of my employer with the service provided by Databricks.

Follow-up

I had a very productive discussion with Martin Grund and Matthew Powers from Databricks. I can summarize their points in the following list:

  1. The main reason for pushing Spark Connect into DBR 14.x is security. Also, calls to the underlying JVM were already restricted in DBR 13.x, so it should not change anything dramatically. Long story short, the libraries listed below would most likely not work on shared clusters even in DBR 13.x;
  2. Spark developers are not going to reimplement all the py4j interfaces. Instead of providing a generic way to work with the underlying JVM, they would prefer to support the migration of existing libraries to gRPC;
  3. We will work together to fill the existing gap in the documentation on how to extend the Spark Connect protocol.

If some of these points are not clear to you, I recommend reading the rest of the blog post for the details.

Introduction: Spark Connect

When I saw the announcement of Spark Connect for the first time, I had mixed feelings. On the one hand, developers of PySpark applications would finally get the ability to use the latest minor version of Python and not depend on the long list of PySpark dependencies and on the JVM. On the other hand, I kept wondering: how are third-party PySpark libraries supposed to work with it?

A common pattern of third-party PySpark packages

We all know that Python is not the fastest programming language, but it works very well as glue for compiled languages like those of the JVM family. And this is exactly how PySpark worked before Spark Connect: there is a JVM where all the Spark routines are running, and py4j acts as a bridge. If you take a look at the SparkSession object, you can see the following private attributes:

self._sc = sparkContext
self._jsc = self._sc._jsc
self._jvm = self._sc._jvm

So we have access to the underlying JVM and to the underlying JavaSparkContext object. Another important piece is the __init__ method of the pyspark.sql.DataFrame object:

def __init__(
    self,
    jdf: JavaObject,
    sql_ctx: Union["SQLContext", "SparkSession"],
):

It allows developers to take a py4j.java_gateway.JavaObject that represents an org.apache.spark.sql.DataFrame JVM object and create a PySpark DataFrame from it. Converting back is even easier: pyspark.sql.DataFrame has an attribute _jdf that is a reference to the underlying org.apache.spark.sql.DataFrame object:

self._jdf: JavaObject = jdf
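
For example, on classic (pre-Spark-Connect) PySpark the round trip is trivial. A minimal illustration, not tied to any particular library:

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)

jdf = df._jdf                     # py4j JavaObject wrapping the JVM-side Dataset[Row]
df_again = DataFrame(jdf, spark)  # wrap the JVM object back into a PySpark DataFrame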

One can already see the obvious pattern for creating a PySpark package:

  1. Write the core part of the package in Java/Scala, with access to all the Apache Spark APIs from a fast, compiled language;
  2. Create a Python library that simply provides bindings to the underlying JAR via py4j.

For example, I recently had the pleasure of creating such a PySpark wrapper for a new storage format for huge graphs: my PR with PySpark bindings to the GraphAr Scala package.
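
In a condensed form, the Python side of such a package usually looks like the sketch below. Note that com.example.mylib.Transformations and its deduplicate method are hypothetical placeholders for illustration, not the API of any of the libraries mentioned in this post:

from pyspark.sql import DataFrame


def deduplicate(df: DataFrame) -> DataFrame:
    """Python binding that delegates all the heavy lifting to a Scala helper."""
    spark = df.sparkSession
    jvm = spark._jvm  # py4j handle to the driver JVM
    # Call the (hypothetical) Scala entry point shipped in the package's JAR
    jdf = jvm.com.example.mylib.Transformations.deduplicate(df._jdf)
    # Wrap the returned JVM object back into a PySpark DataFrame
    return DataFrame(jdf, spark)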

What changed with Spark Connect?

The answer is quite obvious: because Spark Connect decouples the driver from the user's code, the user no longer has access to the underlying JVM. Of course, the importance of py4j for third-party libraries is a rather niche topic, and many PySpark users did not even notice this change. But for me it was the first question that came to mind.

To be honest, I tried to find an alternative way for third parties to interact with the underlying JVM under Spark Connect. But there is almost nothing about it in all the promotional material from Databricks or in the Apache Spark documentation. The only theoretically possible workaround I could find is a tiny Markdown page hidden very deep in the source code of Apache Spark. Based on it, I can imagine that the Spark Connect protocol was designed to be extensible, but again: there is no documentation on how to do it!

What changed in the Databricks 14.x release and why it is an absolutely breaking change

Databricks 14.x introduced the following changes:

  • There is no longer a dependency on the JVM when querying Apache Spark and as a consequence, internal APIs related to the JVM, such as _jsc, _jconf, _jvm, _jsparkSession, _jreader, _jc, _jseq, _jdf, _jmap, and _jcols are no longer supported.

Of course, almost no one (including me) checks the release notes of non-LTS Databricks releases. And then, at the beginning of February, it happened: Databricks Runtime 14.3 LTS was released. Everyone, including me, went to check the changes and saw the same thing as in 14.0: _jvm, _jsc, _jsparkSession and _jdf are no longer available in Databricks notebooks.
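
To illustrate what this means in practice, here is a hedged sketch; the exact exception type and message differ between a plain Spark Connect client and a Databricks shared cluster:

from pyspark.sql import SparkSession

# With Spark Connect, `spark` is a pyspark.sql.connect.session.SparkSession,
# which carries no py4j handles at all (the URL below is just an example endpoint).
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

spark._jvm           # raises an error: the Connect session exposes no JVM attribute
spark.range(1)._jdf  # same story: the Connect DataFrame has no underlying JavaObject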

Such a change completely destroys the pattern of creating third-party PySpark packages described above…

Why is it important?

Of course, one may say: "OK, they broke something, but no one except you cares, because everything you need is inside Databricks and Apache Spark itself." OK, let's see which libraries will be broken.

Microsoft Synapse ML (formerly MMLSpark)

Synapse ML is a well-known (4.9k stars) Spark extension focused on applying ML/DL on Apache Spark clusters. One may also know it under its former name, MMLSpark. The core part of the library is written in Scala, but APIs for R, Python, .NET and Java are provided. If one takes a look at how the Python API is organized under the hood, they will see the "py4j pattern" described above:

class DiscreteHyperParam(object):
    """
    Specifies a discrete list of values.
    """

    def __init__(self, values, seed=0):
        ctx = SparkContext.getOrCreate()
        self.jvm = ctx.getOrCreate()._jvm
        self.hyperParam = self.jvm.com.microsoft.azure.synapse.ml.automl.HyperParamUtils.getDiscreteHyperParam(
            values,
            seed,
        )

link to the code above

Due to the popularity of the library, its maintainers have already received issues from Databricks users: [BUG] Databricks 14.3 LTS usage of internal _jvm variable is no longer supported #2167. And I have no idea how they are going to fix it, because to make the library work with Spark Connect they would need to rewrite all the logic in pure Python/PySpark.

Amazon Deequ/PyDeequ

PyDeequ is a popular (625 stars) data quality library that is native to Apache Spark, because its core is written in Scala. Again, if one takes a look at how it is implemented under the hood, they will see the "py4j pattern" once more:

class _AnalyzerObject:
    """
    Analyzer base object to pass and accumulate the analyzers of the run with respect to the JVM
    """

    def _set_jvm(self, jvm):
        self._jvm = jvm
        return self

    @property
    def _deequAnalyzers(self):
        if self._jvm:
            return self._jvm.com.amazon.deequ.analyzers
        raise AttributeError(
            "JVM not set, please run _set_jvm() method first."
        )  # TODO: Test that this exception gets raised

link to the code above

Spark-NLP

Spark-NLP is one of the most popular (3.6k stars) ways to run LLMs on Apache Spark clusters. Let's again go to the source code and see how it works. Oops, looks like we have found usage of _jdf / _jvm again:

class RecursiveEstimator(JavaEstimator, ABC):

    def _fit_java(self, dataset, pipeline=None):
        self._transfer_params_to_java()
        if pipeline:
            return self._java_obj.recursiveFit(dataset._jdf, pipeline._to_java())
        else:
            return self._java_obj.fit(dataset._jdf)

link to the code above

Spark-extensions

spark-extensions is a relatively popular (155 stars) and actively maintained library that contains a lot of small helpers and extensions for Apache Spark/PySpark. Under the hood, its PySpark part is based on the "py4j pattern" (yes, again):

func = sc._jvm.uk.co.gresearch.spark.__getattr__("package$").__getattr__("MODULE$").dotNetTicksToTimestamp

link to the code above

H2O Sparkling Water

Sparkling Water is the official way to run H2O models on Apache Spark clusters. The repository has 955 stars and is actively maintained. Under the hood, one may again find the "py4j pattern" based on _jvm / _jdf:

class H2OTargetEncoderModel(H2OTargetEncoderMOJOParams, JavaModel, JavaMLWritable):

    def transform(self, dataset):
        callerFrame = inspect.stack()[1]
        inTrainingMode = (callerFrame[3] == '_fit') & callerFrame[1].endswith('pyspark/ml/pipeline.py')
        if inTrainingMode:
            return self.transformTrainingDataset(dataset)
        else:
            return super(H2OTargetEncoderModel, self).transform(dataset)

    def transformTrainingDataset(self, dataset):
        self._transfer_params_to_java()
        return DataFrame(self._java_obj.transformTrainingDataset(dataset._jdf), dataset.sql_ctx)

link to the code above

PayPal gimel

gimel is a quite popular (239 stars) framework built on top of Apache Spark. In the documentation, the authors directly recommend using the "py4j pattern":

# import DataFrame and SparkSession
from pyspark.sql import DataFrame, SparkSession, SQLContext

# fetch reference to the class in JVM
ScalaDataSet = sc._jvm.com.paypal.gimel.DataSet

# fetch reference to java SparkSession
jspark = spark._jsparkSession

# initiate dataset
dataset = ScalaDataSet.apply(jspark)

# Read Data | kafka semantics abstracted for user
df = dataset.read("kafka_dataset")

# Apply transformations (business logic | abstracted for Gimel)
transformed_df = df(...transformations...)

# Write Data | Elastic semantics abstracted for user
dataset.write("elastic_dataset",df)

link to the code above

HNSWlib-spark

HNSWlib is a quite popular (240 stars) and modern JVM library for approximate nearest neighbors search. hnswlib-spark is an Apache Spark/PySpark wrapper on top of the main library. Under the hood, its PySpark part is partially based on the "py4j pattern", launching its own py4j gateway and using the SparkContext constructor:

def __init__(self):
    spark_conf = SparkConf()
    spark_conf.setAppName(spark_nlp_config.app_name)
    spark_conf.setMaster(spark_nlp_config.master)
    spark_conf.set("spark.driver.memory", memory)
    spark_conf.set("spark.serializer", spark_nlp_config.serializer)
    spark_conf.set("spark.kryo.registrator", spark_nlp_config.registrator)
    spark_conf.set("spark.jars.packages", spark_nlp_config.maven_spark)
    spark_conf.set("spark.hnswlib.settings.index.cache_folder", cache_folder)

    # Make the py4j JVM stdout and stderr available without buffering
    popen_kwargs = {
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE,
        'bufsize': 0
    }

    # Launch the gateway with our custom settings
    self.gateway = launch_gateway(conf=spark_conf, popen_kwargs=popen_kwargs)
    self.process = self.gateway.proc
    # Use the gateway we launched
    spark_context = SparkContext(gateway=self.gateway)
    self.spark_session = SparkSession(spark_context)

    self.out_thread = threading.Thread(target=self.output_reader)
    self.error_thread = threading.Thread(target=self.error_reader)
    self.std_background_listeners()

link to the code above

The Archives Unleashed Toolkit

AUT is a tool and a library for analyzing web archives on Apache Spark clusters. Its PySpark part uses the same "py4j pattern":

class WebArchive:
    def __init__(self, sc, sqlContext, path):
        self.sc = sc
        self.sqlContext = sqlContext
        self.loader = sc._jvm.io.archivesunleashed.df.DataFrameLoader(sc._jsc.sc())
        self.path = path

link to the code above

Apache Linkis

Linkis is a top-level Apache project (3.2k stars). Its PySpark part is heavily based on the same "py4j pattern":

jsc = intp.getJavaSparkContext()
jconf = intp.getSparkConf()
conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
sqlc = HiveContext(sc, intp.sqlContext())
sqlContext = sqlc
spark = SparkSession(sc, intp.getSparkSession())

link to the code above

Spark-dgraph-connector

spark-dgraph-connector is another project from G-Research. Its PySpark part uses the same "py4j pattern":

class DgraphReader:
    def __init__(self, reader: DataFrameReader):
        super().__init__()
        self._jvm = reader._spark._jvm
        self._spark = reader._spark
        self._reader = self._jvm.uk.co.gresearch.spark.dgraph.connector.DgraphReader(reader._jreader)

link to the code above

GraphAr

And finally, a project where I'm a contributor and the maintainer of the PySpark part: GraphAr. GraphAr is a novel way to store huge graph data in data lake or lakehouse solutions. The whole PySpark part is based on the "py4j pattern", mostly because Synapse ML and PyDeequ were my main sources of inspiration when I worked on the implementation…

An endless number of in-house solutions and libraries

I'm more than sure that many companies using Databricks have their own in-house helpers, libraries, and so on. And I'm more than sure that a lot of these in-house projects rely on the same "py4j pattern".

Discussion

The main question for me here is: why is Databricks pushing Spark Connect so hard? I have always seen Databricks as a company founded by computer science rock stars and open source enthusiasts. I hope that the new policy of breaking third-party libraries in the Databricks runtime and notebooks is just an accident and that there will be an explanation soon. And I really hope that with such an action Databricks is not trying to force people to use only the built-in proprietary tools of the platform (like the recently announced data quality solution instead of PyDeequ). We all love Databricks because it is based on open source tools and because the company is so open to collaboration and integration.

I love the whole idea of Spark Connect. Many benefits of using it are obvious:

  1. Relaxed dependencies and requirements on user code;
  2. The ability to expose the Spark API to more programming languages (Golang, Rust, etc.);
  3. The ability to simplify integration with IDEs (JetBrains, VSCode, Vim, Emacs, etc.);
  4. A lot of other benefits…

The only problem is the speed with which Spark Connect is being pushed by Databricks. In my opinion, Databricks should not just tell all the third-party project developers something like: "Guys, you used private stuff, there was no guarantee that it would work, so it is your problem and only yours."