pyspark – java.lang.ClassFormatError: Unknown constant tag 83 in class file org/apache/spark/ml/param/DoubleArrayParam Exception when uploading SparkModel to S3

We are running a Spark Job in our kubernetes cluster, in which we are trying to save a Spark PipelineModel to S3. When submitting the job with only 1 executor, everything works fine, but when using 2 or more executors the job fails with the following Error:

22/07/01 07:07:48 ERROR Instrumentation: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6) (10.0.2.190 executor 2): java.lang.ClassFormatError: Unknown constant tag 83 in class file org/apache/spark/ml/param/DoubleArrayParam
    at java.base/java.lang.ClassLoader.defineClass1(Native Method)
    at java.base/java.lang.ClassLoader.defineClass(Unknown Source)
    at java.base/java.security.SecureClassLoader.defineClass(Unknown Source)
    at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(Unknown Source)
    at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(Unknown Source)
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(Unknown Source)
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.Class.getDeclaredFields0(Native Method)
    at java.base/java.lang.Class.privateGetDeclaredFields(Unknown Source)
    at java.base/java.lang.Class.getDeclaredField(Unknown Source)
    at java.base/java.io.ObjectStreamClass.getDeclaredSUID(Unknown Source)
    at java.base/java.io.ObjectStreamClass$2.run(Unknown Source)
    at java.base/java.io.ObjectStreamClass$2.run(Unknown Source)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/java.io.ObjectStreamClass.<init>(Unknown Source)
    at java.base/java.io.ObjectStreamClass.lookup(Unknown Source)
    at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClass(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readArray(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

The Error also appears, when using the following code snippet from the spark examples.

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

conf = SparkConf()

spark = SparkSession 
    .builder 
    .config(conf=conf) 
    .getOrCreate()

# $example on$
# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)


model.write().session(spark).overwrite().save('s3a://path/to/save')

Removing the Estimator from the Pipeline, and just saving the Preprocessing steps works as expected.

Additional Information:

  • Spark version: 3.2.1
  • Additional jars: hadoop-aws in version 3.2.2, aws-java-sdk-bundle in version 1.11.375

Leave a Comment