PySpark FileAlreadyExistsException: Unable to overwrite output directory during saveAsTextFile


I am working on a PySpark script to perform a simple word count. My script runs fine, but I encounter an error when trying to save the results using saveAsTextFile (I'm on Ubuntu). Here's the error I get:

py4j.protocol.Py4JJavaError: An error occurred while calling o48.saveAsTextFile. 
org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/home/pyspark_python/wordcount/output_new already exists

Here are the steps I have taken so far:

  • Verified that the output directory does not contain any data (ls shows it is empty).
  • Deleted and recreated the directory using rm -r and mkdir -p.
  • Ensured no other Spark jobs are running (ps aux | grep spark).

Despite this, the error persists when I re-run the script.

Here is the code I am using:

from pyspark import SparkConf, SparkContext
import os

def main(input_file, output_dir):
    # Spark configuration
    conf = SparkConf().setAppName("WordCountTask").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Read the input file
    text_file = sc.textFile(input_file)

    # Word count
    counts = (
        text_file.flatMap(lambda line: line.split(" "))
                 .map(lambda word: (word, 1))
                 .reduceByKey(lambda a, b: a + b)
    )

    # Save the results
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    counts.saveAsTextFile(output_dir)

    print(f"Résultats sauvegardés dans le répertoire : {output_dir}")

if __name__ == "__main__":
    # Define the input and output paths
    input_file = r"/home/othniel/pyspark_python/wordcount/input/loremipsum.txt"
    output_dir = "/home/othniel/pyspark_python/wordcount/output_new"

    # Run the WordCount task
    main(input_file, output_dir)

How can I resolve this error and ensure PySpark successfully writes to the output directory? Is there something specific I need to configure in my script or environment?

Thank you for your help!

asked Jan 2 at 22:30 by Fractal · edited Jan 3 at 19:01
  • No, not me. Now I'll give you +1. – Fractal, Jan 3 at 6:07
  • Unrelated to the question but useful to know: os.makedirs(output_dir, exist_ok=True) will skip creating and won't throw an error if the directory already exists, meaning you could get rid of the os.path.exists() check above it: docs.python.org/3/library/os.html#os.makedirs – dijksterhuis, Jan 3 at 21:42
  • @Fractal, do you want to write one file per partition in counts to your output_dir? Or do you want to write a single file with some random name inside output_dir? Or is output_dir actually supposed to be the name of a file, although the name says it's an output "dir"? – Kashyap, Jan 4 at 0:50

2 Answers


Your main issue is that Spark's RDD saveAsTextFile method attempts to create a new directory at the location denoted by its first filepath argument [0].

You've already created the output_dir directory, so Spark is throwing an error because it cannot create the directory for you.

To solve this, you can add an additional path element to the argument you pass to the saveAsTextFile method.

counts.saveAsTextFile(os.path.join(output_dir, "counts.txt"))

This will create a 'container' directory called counts.txt which contains one text file for each partition of the RDD (you can check the number of partitions of an RDD with the getNumPartitions method [1] before writing).
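
For example, a quick sanity check before writing (just a sketch, using the counts RDD from the question):

num_partitions = counts.getNumPartitions()
print(f"Expecting {num_partitions} part-* files in the output directory")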

For an RDD with two partitions, you will end up with two data files in this container directory. The important ones are the part-* files, which contain the actual text data for each partition:

./output_new/counts.txt/part-00000
./output_new/counts.txt/part-00001

If you want to read all of the data back into a single RDD [2] later on, you can use this:

rdd = sc.textFile(os.path.join(output_dir, "counts.txt"))
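
Since the data comes back as plain strings (see [2]), here is a minimal sketch of parsing the lines back into (word, count) tuples, assuming the default str() representation that saveAsTextFile produces for tuples:

import ast

# each saved line looks like "('word', 3)", so literal_eval rebuilds the tuple
parsed = rdd.map(ast.literal_eval)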

While a counts.txt directory name may look odd at first glance, it is good practice for two reasons:

  1. It indicates the format of the partition data files stored within that directory. For example, a *.txt container directory likely contains files in a different format than a container directory suffixed with *.parquet.

  2. When saving multiple RDDs from the same script, you would want to ensure each RDD is written into its own container directory; otherwise data from one RDD's write operation can overwrite data from another (see the sketch below).
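
For instance, a minimal sketch of keeping each write in its own sub-directory (the word_counts and line_lengths RDD names here are hypothetical):

word_counts.saveAsTextFile(os.path.join(output_dir, "word_counts.txt"))
line_lengths.saveAsTextFile(os.path.join(output_dir, "line_lengths.txt"))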

Across the Spark APIs, container directories are usually treated as a single "file" abstraction, so you don't need to worry about reading individual partition files, especially with the DataFrame API.


[0]: saveAsTextFile docs: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.saveAsTextFile.html

[1]: getNumPartitions docs: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.getNumPartitions.html

[2]: be aware that the data will be read in as string types, and you'll need to parse out whatever you've stored into their proper types.

As the error message states, Output directory file:/home/pyspark_python/wordcount/output_new already exists. Spark expects only the parent directory (os.path.dirname(output_dir)) to exist, not the actual output dir.

Don't create it!

from pyspark import SparkConf, SparkContext
import os

def main(input_file, output_dir):
    # Spark configuration
    conf = SparkConf().setAppName("WordCountTask").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Read the input file
    text_file = sc.textFile(input_file)

    # Word count
    counts = (
        text_file.flatMap(lambda line: line.split(" "))
                 .map(lambda word: (word, 1))
                 .reduceByKey(lambda a, b: a + b)
    )

    # ---------- DON'T DO THIS ---------------
    # if not os.path.exists(output_dir):              <--------
    #     os.makedirs(output_dir)                     <--------
    # ---------- DON'T DO THIS ---------------

    counts.saveAsTextFile(output_dir)

    print(f"Résultats sauvegardés dans le répertoire : {output_dir}")

if __name__ == "__main__":
    # Define the input and output paths
    input_file = r"/home/othniel/pyspark_python/wordcount/input/loremipsum.txt"
    output_dir = "/home/othniel/pyspark_python/wordcount/output_new"

    # Run the WordCount task
    main(input_file, output_dir)

PS: You should not be using the SparkContext and RDD interface. Use SparkSession/DataFrame instead, e.g.:

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, explode, split

def main(input_file, output_dir):
    spark: SparkSession = SparkSession.builder.getOrCreate()
    df_text: DataFrame = spark.read.text(input_file)

    # one way to express the same word count with the DataFrame API
    df_counts: DataFrame = (
        df_text
        .select(explode(split(col("value"), " ")).alias("word"))
        .groupBy("word")
        .count()
    )

    df_counts.write.csv(output_dir, mode="overwrite")
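
With mode="overwrite", Spark replaces whatever already exists at output_dir on each run, so re-running the job no longer raises FileAlreadyExistsException and you don't have to delete the directory by hand.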