Working with File System from PySpark

Motivation

Most of us work with a file system in our day-to-day work. Almost every pipeline or application has some kind of file-based configuration, typically JSON or YAML. For data pipelines it is also sometimes important to write results or state in a human-readable format, or to serialize some artifacts, like a matplotlib plot, into bytes and write them to disk.

But PySpark applications usually run in cluster mode, especially in a production environment. All we have there is a distributed file system or object storage like HDFS, S3, Azure Blob Storage, etc.

Regular approach

An obvious solution is, of course, to use a third-party library. For example, we can use boto3 for working with S3, pyarrow for working with HDFS, or the built-in pathlib for the local file system. But there are some problems:

  1. Sometimes it is a bad idea to add a huge library to the project as a dependency, especially if all we need is to read or write a few bytes from/to storage;
  2. Each of these libraries has its own abstractions and interfaces, so every user has to learn one more API;
  3. Sometimes we need to write to the local file system when running unit tests but to some cloud storage in production. Of course, one can use unittest.mock.patch (or pytest fixtures), but that can make writing tests a non-trivial task.

At the same time, we know that PySpark can read and write data quite efficiently to any file system. Moreover, Spark understands which file system it is dealing with from the path prefix. For example, in this code we don't have to specify the file system; all we need is to write the right prefix:

  spark_data_frame.write.parquet("s3a://my-prefix/table") # this will write to S3 bucket
  spark_data_frame.write.parquet("file://my-home-dir/table") # and this one will save data locally

So why not use such built-in `PySpark` features?

Java way

Spark is written in Scala, a JVM language, and under the hood Spark still heavily uses org.apache.hadoop, so this jar is accessible out-of-the-box in almost every Spark setup. We can take a look at the documentation of org.apache.hadoop.fs.FileSystem, the main class for performing I/O operations. There are implementations for S3, HDFS, local and Azure file storage. So we can use a single interface, take advantage of the Java class hierarchy, and not care about which implementation to use where. Imagine we have a SparkSession in some Java program. In this case, we can write code like this:

  import org.apache.spark.sql.SparkSession;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class Main {
      public static void main(String[] args) throws Exception {
          SparkSession spark = SparkSession.builder().getOrCreate();
          FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
          Path oldFile = new Path("hdfs://some-place/old-file");
          Path newFile = new Path("hdfs://some-place/new-file");
          fs.rename(oldFile, newFile);
      }
  }

Here we used, as an example, the rename method of FileSystem. Generally speaking, we can do any file operation (move, read, write, rename, glob, etc.) with such a class. For Scala users there is also a nice Scala library with a simple, functional interface that hides FileSystem under the hood and provides clean public interfaces.

But how can we use this solution from PySpark?

PySpark solution

Working with py4j and JVM

Interestingly, all of PySpark is built on the shoulders of py4j, a library with about 1,100 stars on GitHub. Just for comparison, Spark has about 35,300 stars on GitHub.

Under the hood, PySpark just wraps Scala calls with py4j. At Spark runtime you have access to the JVM:

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()
  spark_jvm = spark.sparkContext._jvm

Let's create some Java objects and try to interact with them from Python.

  java_int = spark_jvm.java.lang.Integer(1)
  java_another_int = spark_jvm.java.lang.Integer(2)
  print(java_int + java_another_int)
3

Under the hood, py4j implicitly converts simple Python types into simple Java types. In the example above we pass a Python int into java.lang.Integer as is. We can do the same with strings, numbers, and sometimes with lists. But often we have to explicitly convert types from Python to Java and back.
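
For example, Java collections come back to us as Java objects rather than Python lists. Here is a minimal sketch (assuming an active SparkSession named spark) showing where the implicit conversion stops and the explicit one starts:

  jvm = spark.sparkContext._jvm
  java_list = jvm.java.util.ArrayList()
  java_list.add("a")       # Python str is converted to java.lang.String implicitly
  java_list.add("b")
  print(java_list.size())  # 2
  # the ArrayList itself still lives on the JVM side, so we build
  # a plain Python list explicitly if we need one
  python_list = [java_list.get(i) for i in range(java_list.size())]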

Create a FileSystem instance

Let's create a FileSystem instance. From the documentation, we can see that there is a constructor (a constructor in Java is like __init__(self, **kwargs) in Python), but it is protected, which means it is accessible only from the FileSystem class and its subclasses, not from outside. However, there are a few static methods that allow us to obtain an instance of FileSystem:

  1. get(Configuration conf): Returns the configured FileSystem implementation.
  2. get(URI uri, Configuration conf): Get a FileSystem for this URI's scheme and authority.
  3. get(URI uri, Configuration conf, String user): Get a FileSystem instance based on the uri, the passed in configuration and the user.

First, we need to get a Configuration instance, which contains all the information about the file system. The good news is that we can get it from our SparkSession object directly from Python:

  hadoop_conf = spark._jsc.hadoopConfiguration()

Here we are using another object, _jsc, which is the same SparkContext but accessed not via the PySpark wrapper but as a JavaObject.
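
With just this configuration we can already call the first get overload and obtain the default file system. A minimal sketch (assuming an active SparkSession named spark):

  jvm = spark.sparkContext._jvm
  hadoop_conf = spark._jsc.hadoopConfiguration()
  # get(Configuration conf) returns the FileSystem configured as fs.defaultFS
  default_fs = jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
  print(default_fs.getUri())  # e.g. hdfs://namenode:8020 or file:///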

To let Spark choose the right implementation of FileSystem (for example, NativeS3FileSystem for S3 or RawLocalFileSystem for local files), we should also pass a URI to the get method. To get a URI from a plain path we can use the org.apache.hadoop.fs.Path.toUri method:

from typing import Tuple

from py4j.java_gateway import JavaObject
from pyspark.sql import SparkSession


def _get_hdfs(
  spark: SparkSession, pattern: str
) -> Tuple[JavaObject, JavaObject]:
  # Java is accessible in runtime only and it is impossible to infer types here
  hadoop = spark.sparkContext._jvm.org.apache.hadoop  # type: ignore
  hadoop_conf = spark._jsc.hadoopConfiguration()  # type: ignore
  uri = hadoop.fs.Path(pattern).toUri()  # type: ignore
  hdfs = hadoop.fs.FileSystem.get(uri, hadoop_conf)  # type: ignore

  return (hadoop, hdfs)  # type: ignore

This function takes a Spark session and a pattern (or path) and returns the hadoop package object and a FileSystem instance based on the given SparkSession. So if, for example, you have already configured your Spark session to work with S3, this function will reuse that configuration.
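
A hypothetical usage of this helper could look like this (the paths are illustrative):

  spark = SparkSession.builder.getOrCreate()
  hadoop, hdfs = _get_hdfs(spark, "file:///tmp/")
  print(hdfs.getUri())                                # file:///
  print(hdfs.exists(hadoop.fs.Path("file:///tmp/")))  # True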

List files

The simplest operation we can do with such a FileSystem instance is to list files in a distributed or local file system. This is sometimes very useful, for example to check whether some path exists or to find directories matching a pattern. There is a method public FileStatus[] globStatus(Path pathPattern) which takes a pattern and returns a Java array of FileStatus objects. Let's see how it works:

  hadoop = spark.sparkContext._jvm.org.apache.hadoop # syntax sugar for simplifying the code
  path = hadoop.fs.Path("file:///home/sem/*")
  file_system = hadoop.fs.FileSystem.get(path.toUri(), spark._jsc.hadoopConfiguration())
  statuses = file_system.globStatus(path)
  print(len(statuses))
105
What happens if we pass a wrong path?

  hadoop = spark.sparkContext._jvm.org.apache.hadoop # syntax sugar for simplifying the code
  path = hadoop.fs.Path("file://home/sem/*")
  file_system = hadoop.fs.FileSystem.get(path.toUri(), spark._jsc.hadoopConfiguration())
  statuses = file_system.globStatus(path)
  print(len(statuses))
  pyspark.sql.utils.IllegalArgumentException: Wrong FS: file://home/sem, expected: file:///

Working with FileStatus

To provide a top-level Python API we should convert the result of globStatus from a Java FileStatus[] into a Python list. To do that, let's create a data container for storing information about files:

from dataclasses import dataclass


@dataclass
class HDFSFile:
  name: str
  path: str
  mod_time: int
  is_dir: bool

After that, we can loop over the statuses and extract the information from the Java objects into these dataclasses:

  res = []
  for file_status in statuses:
    res.append(
      HDFSFile(
        name=file_status.getPath().getName(),
        path=file_status.getPath().toString(),
        mod_time=file_status.getModificationTime(),
        is_dir=file_status.isDirectory(),
      )
    )
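
Once everything is in plain dataclasses, the rest is ordinary Python. For example, a hypothetical snippet to pick the most recently modified entries:

  latest = sorted(res, key=lambda f: f.mod_time, reverse=True)[:5]
  for f in latest:
    print(f.path, f.is_dir)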

Working with strings

The next thing we want is the ability to write and read strings. With plain strings we can serialize a lot of objects into, for example, JSON or YAML format. But here we face some problems. If we take a look at the documentation of FileSystem, we find that the main way to write information is an FSDataOutputStream (link to the documentation). It extends DataOutputStream, which provides two methods that look interesting at first glance:

  1. public final void writeUTF(String str)
  2. public final void writeChars(String s)

Unfortunately, both of them play badly with Python UTF-8 strings. The first one uses modified UTF-8, which is useful if you need C compatibility, but such strings are unreadable from the Python side (you can read them only as bytes and then decode them manually). The second one uses the UTF-16BE encoding, which is a de facto standard in Java, but it also cannot simply be read as a string from Python.

  path = hadoop.fs.Path("file:///home/sem/test_file.txt")
  output_stream = file_system.create(path)
  output_stream.writeChars("some testing data with utf-8 symbols: абвгдеж😊")
  output_stream.flush()
  output_stream.close()
  with open("/home/sem/test_file.txt", "r") as test_file:
   print(test_file.read())
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd8 in position 90: invalid continuation byte

Of course, you are still able to read the data as bytes and decode it manually:

  with open("/home/sem/test_file.txt", "br") as byte_file:
   print(byte_file.read().decode("utf-16be"))
some testing data with utf-8 symbols: абвгдеж😊

But that is not the best option. A better way is to write the data as bytes on the Java side and read it back as a regular string on the Python side:

  def write_utf8(
    hdfs, hadoop, path: str, data: str, mode: Literal["a", "w"]
  ) -> None:
    """Write a given string in UTF-16BE to the given path.
    Do not use this method to write the data!
    It is fantastically slow compared to `spark.write`.
    :param path: Path of file
    :param data: String to write
    :param mode: Mode. `w` means overwrite but `a` means append.
    """
    if mode == "w":
      # org.apache.hadoop.fs.FileSystem.create(Path f, boolean overwrite)
      output_stream = hdfs.create(hadoop.fs.Path(path), True)  # type: ignore
    elif mode == "a":
      # org.apache.hadoop.fs.FileSystem.append(Path f)
      output_stream = hdfs.append(hadoop.fs.Path(path))  # type: ignore

    # org.apache.hadoop.fs.FSDataOutputStream
    try:
      for b in data.encode("utf-8"):
          output_stream.write(b)
      output_stream.flush()
      output_stream.close()
    except Exception as e:
      output_stream.close()
      raise e
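
A hypothetical usage of this helper, reusing the hadoop and hdfs objects from _get_hdfs above (paths are illustrative):

  write_utf8(hdfs, hadoop, "file:///tmp/state.json", '{"step": 1}', mode="w")
  with open("/tmp/state.json", "r") as f:
    print(f.read())  # {"step": 1} -- plain UTF-8, readable with regular Python tooling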

Combining it all together

Finally, we are ready to combine everything and create a class for working with file systems where all these py4j details are hidden under the hood.

  import enum
  import re
  from dataclasses import dataclass
  from typing import List, Literal, Tuple

  from py4j.java_gateway import JavaObject
  from pyspark.sql import SparkSession

  _FS_PATTERN = r"(s3\w*://|hdfs://|dbfs://|file://|file:/).(.*)"


  class FS_TYPES(enum.Enum):
    DBFS = "DBFS"
    HDFS = "HDFS"
    S3 = "S3"
    LOCAL = "LOCAL"
    UNKNOWN = "UNKNOWN"

    @classmethod
    def _from_pattern(cls, pattern: str):
      return {
          "s3://": cls.S3,
          "s3a://": cls.S3,
          "dbfs://": cls.DBFS,
          "hdfs://": cls.HDFS,
          "file://": cls.LOCAL,
      }.get(pattern, cls.UNKNOWN)


  @dataclass
  class HDFSFile:
    name: str
    path: str
    mod_time: int
    is_dir: bool
    fs_type: FS_TYPES


  def _get_hdfs(
    spark: SparkSession, pattern: str
  ) -> Tuple[JavaObject, JavaObject, FS_TYPES]:
    match = re.match(_FS_PATTERN, pattern)
    if match is None:
        raise ValueError(
            f"Bad pattern or path. Got {pattern} but should be"
            " one of `s3://`, `s3a://`, `dbfs://`, `hdfs://`, `file://`"
        )

    fs_type = FS_TYPES._from_pattern(match.groups()[0])

    # Java is accessible in runtime only and it is impossible to infer types here
    hadoop = spark.sparkContext._jvm.org.apache.hadoop  # type: ignore
    hadoop_conf = spark._jsc.hadoopConfiguration()  # type: ignore
    uri = hadoop.fs.Path(pattern).toUri()  # type: ignore
    hdfs = hadoop.fs.FileSystem.get(uri, hadoop_conf)  # type: ignore

    return (hadoop, hdfs, fs_type)  # type: ignore

  class HadoopFileSystem(object):
    def __init__(self: "HadoopFileSystem", spark: SparkSession, pattern: str) -> None:
      """Helper class for working with FileSystem.
      :param spark: SparkSession object
      :param pattern: Any pattern related to FileSystem.
                      We should provide it to choose the right implementation of org.apache.hadoop.fs.FileSystem under the hood.
                      Pattern here should be a URI-like string such as `s3a://my-bucket/my-prefix` or `file:///home/user/`.
      """
      hadoop, hdfs, fs_type = _get_hdfs(spark, pattern)
      self._hdfs = hdfs
      self._fs_type = fs_type
      self._hadoop = hadoop
      self._jvm = spark.sparkContext._jvm

    def write_utf8(
      self: "HadoopFileSystem", path: str, data: str, mode: Literal["a", "w"]
    ) -> None:
      """Write a given string in UTF-16BE to the given path.
      Do not use this method to write the data!
      It is fantastically slow compared to `spark.write`.
      :param path: Path of file
      :param data: String to write
      :param mode: Mode. `w` means overwrite but `a` means append.
      """
      if mode == "w":
        # org.apache.hadoop.fs.FileSystem.create(Path f, boolean overwrite)
        output_stream = self._hdfs.create(self._hadoop.fs.Path(path), True)  # type: ignore
      elif mode == "a":
        # org.apache.hadoop.fs.FileSystem.append(Path f)
        output_stream = self._hdfs.append(self._hadoop.fs.Path(path))  # type: ignore

      # org.apache.hadoop.fs.FSDataOutputStream
      try:
        for b in data.encode("utf-8"):
          output_stream.write(b)
        output_stream.flush()
        output_stream.close()
      except Exception as e:
        output_stream.close()
        raise e

    def read_utf8(self: "HadoopFileSystem", path: str) -> str:
      """Read string from given path.
      Do not use this method to read the data!
      It is fantastically slow compared to `spark.read`.
      :param path: Path of file
      :return: Decoded from UTF-8 string
      :rtype: str
      """
      res = []
      # org.apache.hadoop.fs.FileSystem.open
      in_stream = self._hdfs.open(self._hadoop.fs.Path(path))  # type: ignore

      # open returns us org.apache.hadoop.fs.FSDataInputStream
      try:
        while True:
          if in_stream.available() > 0:
            # readByte returns a signed Java byte; mask it to get the unsigned value
            res.append(in_stream.readByte() & 0xFF)
          else:
            in_stream.close()
            break
      except Exception as e:
        in_stream.close()
        raise e

      return bytes(res).decode("utf-8")

    def glob(self, pattern: str) -> List[HDFSFile]:
      statuses = self._hdfs.globStatus(self._hadoop.fs.Path(pattern))

      res = []
      for file_status in statuses:
        # org.apache.hadoop.fs.FileStatus
        res.append(
          HDFSFile(
            name=file_status.getPath().getName(),
            path=file_status.getPath().toString(),
            mod_time=file_status.getModificationTime(),
            is_dir=file_status.isDirectory(),
            fs_type=self._fs_type,
          )
        )

      return res
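
This also addresses the testing problem from the motivation section: because the same class works with `file://` paths, a unit test can exercise exactly the same code path against the local file system. A minimal pytest-style sketch (names and paths are illustrative, and it assumes the imports and class definitions from the listing above):

  def test_write_and_read_roundtrip(tmp_path):
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    fs = HadoopFileSystem(spark, f"file://{tmp_path}/")
    target = f"file://{tmp_path}/state.json"
    fs.write_utf8(target, '{"step": 1}', mode="w")
    assert fs.read_utf8(target) == '{"step": 1}'
    assert len(fs.glob(f"file://{tmp_path}/*.json")) == 1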

Conclusion

There is a nice, lightweight Python library with zero additional dependencies: Eren. This library contains a lot of useful routines for working with Hive and Hadoop. I pushed the code above into this library, so you are free to use it. All you need to do is write:

  from eren import fs

  hdfs = fs.HadoopFileSystem(spark_session, "hdfs://some-place")
  s3fs = fs.HadoopFileSystem(spark_session, "s3a://my-bucket/my-prefix")
  local_fs = fs.HadoopFileSystem(spark_session, "file:///my-home-folder")