Working with File System from PySpark
Motivation
All of us work with a file system in one way or another. Almost every pipeline or application has some kind of file-based configuration, typically json or yaml files. For data pipelines it is also sometimes important to write results or state in a human-readable format, or to serialize some artifacts, like a matplotlib plot, into bytes and write them to disk.
But PySpark applications, especially in a production environment, run in cluster mode, and all we have is some distributed file system or object storage like HDFS, S3, Azure Blob Storage, etc.
Regular approach
An obvious solution is, of course, to use some third-party library. For example, we can use boto3 for working with S3, pyarrow for working with HDFS, or the built-in pathlib for the local file system. But there are some problems:
- Sometimes it is a bad idea to take a huge library and add it to the project as a dependency, especially if all we need is to read or write some bytes from/to storage;
- Each of these libraries has its own abstractions and interfaces, so every user has to learn one more API;
- Sometimes we need to write to the local file system when running unit tests but to some cloud storage in production. Of course, one can use unittest.mock.patch (or pytest fixtures), but it can make writing tests a non-trivial task.
At the same time, we know that PySpark can read and write data quite efficiently to any file system. Moreover, Spark understands which file system it is dealing with from the path prefix. For example, in the following code we don't specify the file system at all; all we need is the right prefix:
spark_data_frame.write.parquet("s3a://my-prefix/table") # this will write to S3 bucket
spark_data_frame.write.parquet("file://my-home-dir/table") # and this one will save data locally
So why not use such built-in `PySpark` features?
Java way
Spark is written in Scala, a language from the JVM family. Under the hood Spark still heavily uses org.apache.hadoop, so this jar is accessible out-of-the-box in almost every Spark setup. We can take a look at the documentation of org.apache.hadoop.fs.FileSystem: the main class for doing I/O operations. There are implementations for S3, HDFS, Local and Azure file storage. So we can use a single interface with all the advantages of the Java class hierarchy and not care about which implementation to use where. Imagine we have a SparkSession in some Java program. In this case, we can write code like this:
import org.apache.spark.sql.SparkSession;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Main {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().getOrCreate();
        FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
        Path oldFile = new Path("hdfs://some-place/old-file");
        Path newFile = new Path("hdfs://some-place/new-file");
        fs.rename(oldFile, newFile);
    }
}
Here we used, for example, the rename method of FileSystem. Generally speaking, we can do any file operation (move, read, write, rename, glob, etc.) with such a class. For Scala users there is also a nice Scala library with a simple, functional interface that hides FileSystem under the hood and provides clean public interfaces.
But how can we use this solution from PySpark?
PySpark solution
Working with py4j and JVM
Interestingly, all of PySpark is built on the shoulders of py4j: a library with 1,100 stars on GitHub. Just for comparison, Spark has 35,300 stars on GitHub.
Under the hood, PySpark just wraps Scala calls with py4j. In the Spark runtime you have access to the JVM:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark_jvm = spark.sparkContext._jvm
Let's create some Java objects and try to interact with them from Python.
java_int = spark_jvm.java.lang.Integer(1)
java_another_int = spark_jvm.java.lang.Integer(2)
print(java_int + java_another_int)
3
Under the hood, py4j implicitly converts simple Python types into simple Java types. In the example above we pass a Python int into java.lang.Integer as is. We can do the same with strings, numbers, and sometimes with lists. But often we have to explicitly convert types from Python to Java and back.
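For example, here is a small sketch of moving a few values across the boundary by hand, reusing the spark_jvm handle from above (java.util.ArrayList is just a convenient container for the demo):
# strings and numbers are converted implicitly by py4j
java_list = spark_jvm.java.util.ArrayList()
java_list.add("a")
java_list.add("b")
print(java_list.size())  # 2
# a py4j JavaList is iterable, so we can pull it back into a regular Python list
print(list(java_list))   # ['a', 'b']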
Create a FileSystem instance
Let's create a FileSystem instance. From the documentation, we can see that there is a constructor (a constructor in Java is like __init__(self, **kwargs) in Python), but it is protected, which means it is accessible only from the FileSystem class and its subclasses, not from outside. However, there are a few static methods that allow us to get an instance of FileSystem:
| Method | Description |
| --- | --- |
| get(Configuration conf) | Returns the configured FileSystem implementation. |
| get(URI uri, Configuration conf) | Get a FileSystem for this URI's scheme and authority. |
| get(URI uri, Configuration conf, String user) | Get a FileSystem instance based on the uri, the passed in configuration and the user. |
First, we need a Configuration instance, which contains all the information about the FileSystem. The good news is that we can get it from our SparkSession object directly from Python:
hadoop_conf = spark._jsc.hadoopConfiguration()
Here we are using another object, _jsc, which is the same SparkContext but accessible not via the PySpark wrapper but as a JavaObject.
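Since hadoop_conf is itself just a JavaObject, we can call Configuration methods on it directly from Python. A small sketch (fs.defaultFS is a standard Hadoop configuration key):
# org.apache.hadoop.conf.Configuration.get(String name)
print(hadoop_conf.get("fs.defaultFS"))  # e.g. file:/// or hdfs://namenode:8020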
To allow Spark to choose the right implementation of FileSystem (for example, NativeS3FileSystem for S3 or RawLocalFileSystem for local files), we should also pass a URI into the get method. To get a URI from a simple path we can use the org.apache.hadoop.fs.Path.toUri method:
from typing import Tuple

from py4j.java_gateway import JavaObject
from pyspark.sql import SparkSession


def _get_hdfs(
    spark: SparkSession, pattern: str
) -> Tuple[JavaObject, JavaObject]:
    # Java is accessible in runtime only and it is impossible to infer types here
    hadoop = spark.sparkContext._jvm.org.apache.hadoop  # type: ignore
    hadoop_conf = spark._jsc.hadoopConfiguration()  # type: ignore
    uri = hadoop.fs.Path(pattern).toUri()  # type: ignore
    hdfs = hadoop.fs.FileSystem.get(uri, hadoop_conf)  # type: ignore
    return (hadoop, hdfs)  # type: ignore
This function takes a Spark session and a pattern (or path) and returns the hadoop package handle and a FileSystem instance based on the given SparkSession. So if you have, for example, already configured your Spark session to work with S3, this function will use that configuration.
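For example, for a local path the call might look like this (a quick sketch, assuming a local SparkSession):
spark = SparkSession.builder.getOrCreate()
hadoop, hdfs = _get_hdfs(spark, "file:///tmp/*")
# FileSystem.getUri() tells us which implementation was picked
print(hdfs.getUri().toString())  # file:///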
List files
The simplest operation we can do with such a FileSystem instance is to list files in a distributed or local file system. This is sometimes very useful, for example, to check whether some path exists or to find directories matching a pattern. There is a method public FileStatus[] globStatus(Path pathPattern) which takes a pattern and returns a Java array of FileStatus objects. Let's see how it works:
hadoop = spark.sparkContext._jvm.org.apache.hadoop  # syntax sugar to simplify the code
path = hadoop.fs.Path("file:///home/sem/*")
hdfs = hadoop.fs.FileSystem.get(path.toUri(), spark._jsc.hadoopConfiguration())
statuses = hdfs.globStatus(path)
print(len(statuses))
105
What happens if we pass a wrong path?
hadoop = spark.sparkContext._jvm.org.apache.hadoop  # syntax sugar to simplify the code
path = hadoop.fs.Path("file://home/sem/*")  # note: only two slashes, so "home" is parsed as an authority
hdfs = hadoop.fs.FileSystem.get(path.toUri(), spark._jsc.hadoopConfiguration())
statuses = hdfs.globStatus(path)
print(len(statuses))
pyspark.sql.utils.IllegalArgumentException: Wrong FS: file://home/sem, expected: file:///
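By the way, for a simple existence check we don't need glob at all: FileSystem also exposes an exists(Path) method. A quick sketch with an assumed local path:
# org.apache.hadoop.fs.FileSystem.exists(Path f) returns a plain boolean
print(hdfs.exists(hadoop.fs.Path("file:///home/sem/.bashrc")))  # True or False depending on the path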
Working with FileStatus
To provide a top-level Python API, we should convert the results of globStatus from a Java FileStatus[] into a Python list. To do that, let's create a data container for storing information about files:
from dataclasses import dataclass


@dataclass
class HDFSFile:
    name: str
    path: str
    mod_time: int
    is_dir: bool
After that we can loop through the statuses and extract the information from the Java objects into dataclasses:
res = []
for file_status in statuses:
    res.append(
        HDFSFile(
            name=file_status.getPath().getName(),
            path=file_status.getPath().toString(),
            mod_time=file_status.getModificationTime(),
            is_dir=file_status.isDirectory(),
        )
    )
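Now the results are plain Python objects, so working with them is trivial; for example, picking the most recently modified entry (assuming res is not empty):
# FileStatus.getModificationTime returns milliseconds since the epoch
latest = max(res, key=lambda f: f.mod_time)
print(latest.name, latest.is_dir)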
Working with strings
The next thing we want is the ability to write and read strings. With plain strings we can serialize a lot of objects into, for example, json or yaml format. But here we face some problems. If we take a look at the documentation of FileSystem, we find that the main way to write information is an FSDataOutputStream (link to the documentation). It implements the DataOutputStream abstraction, which provides two methods that look interesting at first glance:
public final void writeUTF(String str)
public final void writeChars(String s)
Unfortunately, both of them play badly with Python UTF-8 strings. The first one uses modified UTF-8, which is useful if you need C compatibility, but such strings are unreadable from the Python side (you can read them only as bytes and manually decode them afterwards). The second one uses UTF-16BE encoding, which is a de facto standard in Java, but it also cannot simply be read as a string from Python.
path = hadoop.fs.Path("file:///home/sem/test_file.txt")
output_stream = hdfs.create(path)
output_stream.writeChars("some testing data with utf-8 symbols: абвгдеж😊")
output_stream.flush()
output_stream.close()

with open("/home/sem/test_file.txt", "r") as test_file:
    print(test_file.read())
(result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd8 in position 90: invalid continuation byte
Of course, you are still able to read the data as bytes and decode it manually:
with open("/home/sem/test_file.txt", "br") as byte_file:
print(byte_file.read().decode("utf-16be"))
some testing data with utf-8 symbols: абвгдеж😊
But that is not the best option. A better way is to write the data as UTF-8 bytes on the Java side and then read it as a regular string on the Python side:
from typing import Literal


def write_utf8(
    hdfs, hadoop, path: str, data: str, mode: Literal["a", "w"]
) -> None:
    """Write a given string as UTF-8 to the given path.

    Do not use this method to write the data!
    It is fantastically slow compared to `spark.write`.

    :param path: Path of file
    :param data: String to write
    :param mode: Mode. `w` means overwrite but `a` means append.
    """
    if mode == "w":
        # org.apache.hadoop.fs.FileSystem.create(Path f, boolean overwrite)
        output_stream = hdfs.create(hadoop.fs.Path(path), True)  # type: ignore
    elif mode == "a":
        # org.apache.hadoop.fs.FileSystem.append(Path f)
        output_stream = hdfs.append(hadoop.fs.Path(path))  # type: ignore
    # org.apache.hadoop.fs.FSDataOutputStream
    try:
        for b in data.encode("utf-8"):
            output_stream.write(b)
        output_stream.flush()
        output_stream.close()
    except Exception as e:
        output_stream.close()
        raise e
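A quick check that the result is now readable as plain UTF-8 from the Python side (reusing _get_hdfs from above and the same assumed local home directory):
hadoop, hdfs = _get_hdfs(spark, "file:///home/sem/*")
write_utf8(hdfs, hadoop, "file:///home/sem/test_file.txt", "some testing data with utf-8 symbols: абвгдеж😊", mode="w")

with open("/home/sem/test_file.txt", "r") as test_file:
    print(test_file.read())  # some testing data with utf-8 symbols: абвгдеж😊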
Combining it all together
Finally, we are ready to combine everything and create a class for working with file systems where all these py4j details are hidden under the hood.
import enum
import re
from dataclasses import dataclass
from typing import List, Literal, Tuple
from py4j.java_gateway import JavaObject
from pyspark.sql import SparkSession
_FS_PATTERN = r"(s3\w*://|hdfs://|dbfs://|file://|file:/).(.*)"
class FS_TYPES(enum.Enum):
    DBFS = "DBFS"
    HDFS = "HDFS"
    S3 = "S3"
    LOCAL = "LOCAL"
    UNKNOWN = "UNKNOWN"

    @classmethod
    def _from_pattern(cls, pattern: str):
        return {
            "s3://": cls.S3,
            "s3a://": cls.S3,
            "dbfs://": cls.DBFS,
            "hdfs://": cls.HDFS,
            "file://": cls.LOCAL,
        }.get(pattern, cls.UNKNOWN)


@dataclass
class HDFSFile:
    name: str
    path: str
    mod_time: int
    is_dir: bool
    fs_type: FS_TYPES


def _get_hdfs(
    spark: SparkSession, pattern: str
) -> Tuple[JavaObject, JavaObject, FS_TYPES]:
    match = re.match(_FS_PATTERN, pattern)
    if match is None:
        raise ValueError(
            f"Bad pattern or path. Got {pattern} but should be"
            " one of `s3://`, `s3a://`, `dbfs://`, `hdfs://`, `file://`"
        )
    fs_type = FS_TYPES._from_pattern(match.groups()[0])
    # Java is accessible in runtime only and it is impossible to infer types here
    hadoop = spark.sparkContext._jvm.org.apache.hadoop  # type: ignore
    hadoop_conf = spark._jsc.hadoopConfiguration()  # type: ignore
    uri = hadoop.fs.Path(pattern).toUri()  # type: ignore
    hdfs = hadoop.fs.FileSystem.get(uri, hadoop_conf)  # type: ignore
    return (hadoop, hdfs, fs_type)  # type: ignore
class HadoopFileSystem(object):
    def __init__(self: "HadoopFileSystem", spark: SparkSession, pattern: str) -> None:
        """Helper class for working with FileSystem.

        :param spark: SparkSession object
        :param pattern: Any pattern related to the FileSystem.
            We should provide it to choose the right implementation of
            org.apache.hadoop.fs.FileSystem under the hood.
            The pattern should be a URI-like string such as
            `s3a://my-bucket/my-prefix` or `file:///home/user/`.
        """
        hadoop, hdfs, fs_type = _get_hdfs(spark, pattern)
        self._hdfs = hdfs
        self._fs_type = fs_type
        self._hadoop = hadoop
        self._jvm = spark.sparkContext._jvm
    def write_utf8(
        self: "HadoopFileSystem", path: str, data: str, mode: Literal["a", "w"]
    ) -> None:
        """Write a given string as UTF-8 to the given path.

        Do not use this method to write the data!
        It is fantastically slow compared to `spark.write`.

        :param path: Path of file
        :param data: String to write
        :param mode: Mode. `w` means overwrite but `a` means append.
        """
        if mode == "w":
            # org.apache.hadoop.fs.FileSystem.create(Path f, boolean overwrite)
            output_stream = self._hdfs.create(self._hadoop.fs.Path(path), True)  # type: ignore
        elif mode == "a":
            # org.apache.hadoop.fs.FileSystem.append(Path f)
            output_stream = self._hdfs.append(self._hadoop.fs.Path(path))  # type: ignore
        # org.apache.hadoop.fs.FSDataOutputStream
        try:
            for b in data.encode("utf-8"):
                output_stream.write(b)
            output_stream.flush()
            output_stream.close()
        except Exception as e:
            output_stream.close()
            raise e
    def read_utf8(self: "HadoopFileSystem", path: str) -> str:
        """Read a string from the given path.

        Do not use this method to read the data!
        It is fantastically slow compared to `spark.read`.

        :param path: Path of file
        :return: String decoded from UTF-8
        :rtype: str
        """
        res = []
        # org.apache.hadoop.fs.FileSystem.open
        in_stream = self._hdfs.open(self._hadoop.fs.Path(path))  # type: ignore
        # open returns us org.apache.hadoop.fs.FSDataInputStream
        try:
            while True:
                if in_stream.available() > 0:
                    # readByte returns a signed Java byte; mask it to 0..255 for bytes()
                    res.append(in_stream.readByte() & 0xFF)
                else:
                    in_stream.close()
                    break
        except Exception as e:
            in_stream.close()
            raise e
        return bytes(res).decode("utf-8")
    def glob(self, pattern: str) -> List[HDFSFile]:
        statuses = self._hdfs.globStatus(self._hadoop.fs.Path(pattern))
        res = []
        for file_status in statuses:
            # org.apache.hadoop.fs.FileStatus
            res.append(
                HDFSFile(
                    name=file_status.getPath().getName(),
                    path=file_status.getPath().toString(),
                    mod_time=file_status.getModificationTime(),
                    is_dir=file_status.isDirectory(),
                    fs_type=self._fs_type,
                )
            )
        return res
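Before moving on, here is a quick end-to-end sketch of the class against the local file system (assuming a writable /tmp directory):
spark = SparkSession.builder.getOrCreate()
local_fs = HadoopFileSystem(spark, "file:///tmp/*")

local_fs.write_utf8("file:///tmp/state.json", '{"status": "ok"}', mode="w")
print(local_fs.read_utf8("file:///tmp/state.json"))           # {"status": "ok"}
print([f.name for f in local_fs.glob("file:///tmp/*.json")])  # includes 'state.json'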
Conclusion
There is a nice lightweight Python library with zero additional dependencies: Eren. This library contains a lot of useful routines for working with Hive and Hadoop. I pushed the code above into this library, so you are free to use it. All you need is to write:
from eren import fs
hdfs = fs.HadoopFileSystem(spark_session, "hdfs://some-place")
s3fs = fs.HadoopFileSystem(spark_session, "s3a://prefix/bucket")
local_fs = fs.HadoopFileSystem(spark_session, "file:///my-home-folder")