

Understanding the role of Py4J in Databricks

I mentioned that my attempt to implement TDD with Databricks was not totally successful. Setting up the local environment was not a problem, and getting a service ID for the CI/CD component was more of an administrative than a technical hurdle. The real issue was using mocks to test Python objects that get serialized to Spark. This is a feature, not a bug. Let’s talk about how Python and Spark work together.

PySpark Under the Hood

Apache Spark is written in Scala and Java. Both programming languages run in the Java Virtual Machine (JVM). Python runs in an interpreter, not a JVM.  PySpark allows developers to use Python to run Spark jobs by leveraging Py4J.

Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a Java Virtual Machine. Methods are called as if the Java objects resided in the Python interpreter and Java collections can be accessed through standard Python collection methods. Py4J also enables Java programs to call back Python objects.

PySpark can use Spark as an engine to submit and compute jobs. A JVM is launched when you create and initialize a Spark session with PySpark (pyspark.SparkContext or pyspark.sql.SparkSession). PySpark uses Py4J on the driver side to communicate with that JVM instance. Python workers are lazily launched when Python functions need to be mapped to their Java counterparts. This works for all the standard Python objects, and it works so well that most developers never need to think about how or why it works. I didn’t either, until my adventure with TDD in a local environment.
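To make the bridge concrete, here is a minimal sketch of what Py4J gives you once a session exists, assuming a local PySpark install. The underscore attributes (_gateway, _jvm) are PySpark internals rather than public API, so treat this as illustration only:

# A minimal sketch of the Py4J bridge PySpark sets up for you.
# The underscore attributes (_gateway, _jvm) are internals, not public API.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("py4j-demo").getOrCreate()

# The SparkContext holds a Py4J JavaGateway to the driver-side JVM.
gateway = spark.sparkContext._gateway
jvm = spark.sparkContext._jvm

# Call a Java static method as if it lived in the Python interpreter.
millis = jvm.java.lang.System.currentTimeMillis()

# Java collections come back as Py4J proxies that behave like Python collections.
jlist = jvm.java.util.ArrayList()
jlist.append("hello from Python")
print(millis, list(jlist))

spark.stop()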

OOP and PySpark

In my prior post, I explained that I needed to handle logging and observability in Databricks rather than in Azure, since Databricks does not have native functionality for this. No problem. First, I created some custom exceptions so the client could easily identify responsibility in case of an issue (the API owner, the Databricks team, or us). This is literally all the code:

class APIError(Exception):
    """Exception raised for errors in the API call."""

class ValidationError(Exception):
    """Exception raised for validation errors."""

class DataError(Exception):
    """Exception raised for errors in data processing."""

Next was a Logging class that could take either a string or an exception, create an info- or error-level log entry, and store a record in a Delta table:

from datetime import datetime
from typing import Optional

from pyspark.sql import SparkSession

def log_message(message: Optional[str] = None, exception: Optional[Exception] = None) -> None:

    ... some simple code to handle either a string or an exception ...

    spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

    df = spark.createDataFrame(
        [(datetime.now(), logger.name, log_level.upper(), log_content)],
        schema="timestamp timestamp, calling_method string, log_level string, message string"
    )

    df.write.format("delta").mode("append").saveAsTable("default.logging")

Logging errors from other Python scripts was pretty straightforward:

from ..utils.custom_exceptions import ValidationError, APIError
from ..utils.custom_logging import log_message

def some_validation_method(start_date, end_date):
    try:
        if start_date >= end_date:
            raise ValidationError("Start date must be before end date")
    except ValidationError as e:
        log_message(exception=e)

I created unit tests using mocks. It was very time consuming because mocking Spark is not easy, but eventually all the tests passed and I had complete code coverage. Honestly, I was feeling pretty good about the code. It was simple and straightforward enough. I went into the Databricks UI to run the notebook I built for monitoring, add some dummy records to the logging table, and check out the UX.
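For context, the mocks looked roughly like the sketch below. The module path (main.utils.custom_logging) and the exact patch targets are assumptions for illustration, not the original test code:

# A rough sketch of the mock-based unit test, not the original test code.
# The module path main.utils.custom_logging is assumed for illustration.
from unittest.mock import MagicMock, patch

from main.utils.custom_logging import log_message

@patch("main.utils.custom_logging.SparkSession")
def test_log_message_appends_to_delta(mock_session_cls):
    # builder.appName(...).getOrCreate() resolves to this mock session
    mock_spark = mock_session_cls.builder.appName.return_value.getOrCreate.return_value
    mock_df = MagicMock()
    mock_spark.createDataFrame.return_value = mock_df

    log_message(message="unit test entry")

    mock_spark.createDataFrame.assert_called_once()
    mock_df.write.format.assert_called_once_with("delta")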

Pickle Problems

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 38.0 failed 4 times, most recent failure: Lost task 3.3 in stage 38.0 (TID 55) (10.139.64.6 executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for main.utils.custom_exceptions.APIError). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:759)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:199)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:109)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:122)
    at org.apache.spark.api.python.SerDeUtil$.$anonfun$pythonToJava$2(SerDeUtil.scala:126)

It actually took me a second to realize I was looking at a stacktrace and not a dataframe.

My technical knowledge of Python pickling lacked depth. However, I’ve spent a lot of time in Hadoop and I have a pretty good handle on SerDe errors. Full disclosure, I usually create objects in Scala, so I hadn’t seen this error before in Databricks. But I am a longtime Java developer and I know how to read a stacktrace. Step one, go check out the code if you can. You can. Looking at the code, I could see that it treats any unsupported/unregistered class as a dict. This is why I showed the code for my class; it’s not a dict. If there are unsupported/unregistered classes, there must be supported/registered classes. I checked out the README and found the list. Obviously, no custom class was ever going to be on that list, and I’m definitely not registering a custom IObjectConstructor. And that’s how I started the journey that ended with reading about Py4J.

Conclusion

What did I learn? Long story short, only pass simple objects like strings or ints to DataFrames in PySpark. In my case, for example, passing the string representation of the object was trivial, since the object was essentially just a string. Not a big issue. However, I did have to come to terms with the fact that I was comfortable with non-functioning code. The unit tests were more complicated than my code, but that’s actually pretty common. Also, I would have caught it in an integration test. In another blog, I’ll show how I handled the Spark mocking. Probably. The fact of the matter is that I had a very different outcome when running a piece of code from the Databricks UI than I did from my local instance. I’ve learned enough to not make the same mistake again. I refactored my code so it’s even more resilient now. I have better exception handling techniques. And I will continue to use VS Code as my primary coding tool for Databricks. But I’ll always have a Databricks notebook open in a browser, because you never know.
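For completeness, the shape of that fix looks roughly like the helper below. This is my reconstruction of the elided handling code, not the original; the function name and return values are illustrative:

# Sketch of the fix: flatten the exception to plain strings before anything reaches
# createDataFrame, so only picklable primitives cross the Py4J bridge.
# This helper reconstructs the elided handling code and is illustrative only.
from typing import Optional, Tuple

def to_log_fields(message: Optional[str] = None,
                  exception: Optional[Exception] = None) -> Tuple[str, str]:
    """Return (log_level, log_content) as plain strings."""
    if exception is not None:
        return "error", f"{type(exception).__name__}: {exception}"
    return "info", message or ""

# Usage inside log_message:
#   log_level, log_content = to_log_fields(message, exception)
#   ... then build the DataFrame row from these strings only ...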

 

 


David Callaghan, Solutions Architect

As a solutions architect with Perficient, I bring twenty years of development experience and I'm currently hands-on with Hadoop/Spark, blockchain and cloud, coding in Java, Scala and Go. I'm certified in and work extensively with Hadoop, Cassandra, Spark, AWS, MongoDB and Pentaho. Most recently, I've been bringing integrated blockchain (particularly Hyperledger and Ethereum) and big data solutions to the cloud, with an emphasis on integrating modern data products such as HBase, Cassandra and Neo4J as the off-blockchain repository.
