python – Customization UDF in pyspark raised error

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.catalog import *
from pyspark import SparkConf
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
from dateutil.relativedelta import *
from ecies.utils import generate_eth_key, generate_key
from ecies import encrypt, decrypt
from hashlib import blake2b
import boto3
import json
import re
import secrets
import base64
import random
import sys
import datetime
import os 

# Process the arguments
## Define the function to parse the arguments
def parseOptions(argsLst):
    argsDct = {}
    for i in range(0, len(argsLst) - 1):
        k = argsLst[i].strip("--")
        argsDct[k] = argsLst[i + 1]
        i = i + 2
    return argsDct

## Collect the arguments
args = parseOptions(sys.argv[1:])

## Parse the job arguments
hub_name = args['HUB_NAME']
db = args['DATABASE_NAME']
table = args['TABLE_NAME']
masking_set = json.loads(args['MASKING_SET'])
bucket = args['RESULT_BUCKET']
email = args['EMAIL']
region_name = args['REGION_NAME']

# Get the environment variable
os.environ['AWS_DEFAULT_REGION'] = region_name
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"

# spark session initialization 
conf = SparkConf()
conf.setExecutorEnv('PYSPARK_PYTHON', './environment/bin/python')
conf.setExecutorEnv('AWS_DEFAULT_REGION', region_name)
conf.set('spark.eventLog.enabled', 'true')
conf.set('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem', '2')
conf.set('spark.sql.parquet.output.committer.class', 'com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter')
conf.set('spark.sql.emr.internal.extensions', 'com.amazonaws.emr.spark.EmrSparkSessionExtensions')
conf.set('spark.eventLog.dir', 'hdfs:///var/log/spark/apps')
conf.set('spark.sql.hive.metastore.sharedPrefixes', 'com.amazonaws.services.dynamodbv2')
conf.set('spark.sql.warehouse.dir', 'hdfs:///user/spark/warehouse')
#conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set('spark.serializer.objectStreamReset', '-1')
conf.set('spark.history.fs.logDirectory', 'hdfs:///var/log/spark/apps')
conf.set('spark.sql.parquet.fs.optimized.committer.optimization-enabled', 'true')
conf.set('spark.shuffle.service.enabled', 'true')
conf.set('spark.hadoop.yarn.timeline-service.enabled', 'true')
conf.set('spark.resourceManager.cleanupExpiredHost', 'true')
conf.set('spark.files.fetchFailure.unRegisterOutputOnHost', 'true')
conf.set('spark.hadoop.mapreduce.output.fs.optimized.committer.enabled', 'true')
conf.set('spark.sql.catalogImplementation', 'hive')
conf.set('spark.stage.attempt.ignoreOnDecommissionFetchFailure', 'true')
conf.set('spark.rdd.compress', 'True')
conf.set('spark.decommissioning.timeout.threshold', '20')
conf.set('spark.dynamicAllocation.enabled', 'true')
conf.set('spark.hadoop.fs.s3.getObject.initialSocketTimeoutMilliseconds', '2000')
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Load the rule in detail
rule_dict = {}
for i in masking_set:
    # Load the rule
    rule = spark.sql(f"select * from datacustoms_{hub_name}.{hub_name}_dmrl where rule_id='{i['mask_rule_id']}'").collect()
    rule_dict[i['mask_rule_id']] = {
            "rsdata_type": rule[0].data_type,
            "action_type": rule[0].action_type
    }

    if rule[0].action_type == "hash" :
        client = boto3.client('secretsmanager')
        rsp = client.get_secret_value(SecretId=rule[0].mask_rule)
        salt_dict = eval(rsp['SecretString'])
        salt = salt_dict['salt']
        rule_dict[i['mask_rule_id']]['mask_rule'] = salt

    if rule[0].action_type == "encrypt_aes":
        client = boto3.client('secretsmanager')
        rsp = client.get_secret_value(SecretId=rule[0].mask_rule)
        aes_info = eval(rsp['SecretString'])
        key = aes_info['aes_key']
        iv = aes_info['aes_iv']
        rule_dict[i['mask_rule_id']]['mask_rule'] = {
            "key":key,
            "iv":iv
        }
    
    if rule[0].action_type == "encrypt_ecc":
        client = boto3.client('secretsmanager')
        rsp = client.get_secret_value(SecretId=rule[0].mask_rule)
        ecc_info = eval(rsp['SecretString'])
        pub_key = ecc_info['pub_key']
        rule_dict[i['mask_rule_id']]['mask_rule'] = {
            "pub_key":pub_key
        }
    
    if rule[0].action_type == "mask" or rule[0].action_type == "list" or rule[0].action_type == 'range':
        rule_dict[i['mask_rule_id']]['mask_rule'] = rule[0].action_type

print(rule_dict)

# Prepare the UDF
## Simple masking
def mask_data(item,pattern,replacement):
    m = re.search(pattern,item)
    if m:
        return re.sub(m.group(1),replacement,item,count=1)
    else:
        return "Unmasked_"+item

## Fetch the random list
def fetch_item(mask_list,item):
    return mask_list[random.randint(0,len(mask_list)-1)]

## Generate the digest
def gen_digest(data,mask_rule):
    ## Get the secret value
    salt = mask_rule
    h = blake2b(digest_size = 32)
    h.update(salt.encode('utf-8'))
    h.update(data.encode('utf-8'))
    return h.hexdigest()

## AES
def aes_encrypt(data,mask_rule):
    ## Get the secret value
    key = mask_rule['aes_key']
    iv = mask_rule['aes_iv']
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
    encryptor = cipher.encryptor()
    padder = padding.PKCS7(128).padder()
    b_data = data.encode('utf-8')
    c_data = padder.update(b_data) + padder.finalize()
    en_data = encryptor.update(c_data)
    return base64.b64encode(en_data)

## ECC        
def ecc_encrypt(data,mask_rule):
    ## Get the secret value
    pub_key = mask_rule['pub_key']
    b_data = data.encode('utf-8')
    en_data = encrypt(pub_key, b_data)
    return base64.b64encode(en_data)


## Parse the rule             
def mystrip(el1):
    return el1.strip("'")

def str2list(str1):
    if len(re.findall("'",str1))>0:
        l1 = str1.split(" '")
        return list(map(mystrip,l1))    
    else:
        return str1.split(" ")
            
# Define masking udf
@udf
def masking(data,rule_id):
    if data is None:
        return "unmasked_none"
    
    action_type  = rule_dict[rule_id]['action_type']
    
    ## Process mask action
    if action_type == 'mask':
        mask_rule = rule_dict[rule_id]['mask_rule']
        pattern,replacement = mask_rule.split(";;")
        masked_data = mask_data(data,pattern,replacement)
        return masked_data

    ## Process list action
    if action_type == 'list':
        mask_rule = rule_dict[rule_id]['mask_rule']
        mask_list = mask_rule.split(",")
        masked_data = fetch_item(mask_list,data)
        return masked_data
                  
    ## Process hash(BLAKE2) action
    if action_type == 'hash':
        mask_rule = rule_dict[rule_id]['mask_rule']
        masked_data = gen_digest(data,mask_rule)
        return masked_data
        
    ## Process encrypt_aes action
    if action_type == 'encrypt_aes':
        mask_rule = rule_dict[rule_id]['mask_rule']
        masked_data = aes_encrypt(data,mask_rule)
        return masked_data

    ## Process encrypt_ecc action
    if action_type == 'encrypt_ecc':
        mask_rule = rule_dict[rule_id]['mask_rule']
        masked_data = ecc_encrypt(data,mask_rule)
        return masked_data

#masking = udf(lambda data,rule_id:masking(data,rule_id))
spark.udf.register("masking_data", masking)

# full cols list
full_cols_list = []
cols_list = []
for i in masking_set:
    cols_list = cols_list + i['columns']

# Load the pending masking dataset
df_tgt_meta = spark.sql(f"describe {db}.{table}")
df_tgt_meta.createTempView('tgtMeta')
df_tgt_meta.show()

# If the column name is *, then all column will be masked, but un-supported data type columns should be excluded first.
# Unsupported data type:
# - binary
# - boolean
# - interval
# Otherwise, check the column name whether is correct
def get_col_name(r):
    return r.col_name

# Get the full cols list
full_cols = spark.sql("select * from tgtMeta").collect()
full_cols_list = list(map(get_col_name,full_cols))

print(f"{db}.{table} full cols list: {full_cols_list}")
print(f"Column arguments is: {cols_list}")

for i in masking_set:
    # Load the rule
    rule = spark.sql(f"select * from datacustoms_{hub_name}.{hub_name}_dmrl where rule_id='{i['mask_rule_id']}'").collect()
    rule_id = rule[0].rule_id
    rsdata_type = rule[0].data_type

    if len(cols_list) == 1 and cols_list[0] =='*':
        if rsdata_type == 'string':
            cols=spark.sql("select * from tgtMeta where data_type in ('string','char','varchar')").collect()
        if rsdata_type =='int':
            cols=spark.sql("select * from tgtMeta where data_type in ('int','bigint','tinyint','smallint')").collect()
        if rsdata_type == 'date' or rsdata_type == 'datetime' :
            cols=spark.sql("select * from tgtMeta where data_type in ('date','timestamp')").collect()
        cols_list = list(map(get_col_name,cols))
    else:
        # Verify whether columns is in the pending inspection dataset
        for col in cols_list:
            if rsdata_type == 'string':
                result=spark.sql(f"select * from tgtMeta where data_type in ('string','char','varchar') and col_name="{col}"").collect()
                # If column name is mismatched, remove from cols list.
                if len(result) == 0:
                    cols_list.remove(col)
                    for i in masking_set:
                        if col in i['columns']:
                            i['columns'].remove(col)
            if rsdata_type == 'int':
                result=spark.sql(f"select * from tgtMeta where data_type in ('int','bigint','tinyint','smallint') and col_name="{col}"").collect()
                # If column name is mismatched, remove from cols list.
                if len(result) == 0:
                    cols_list.remove(col)
                    for i in masking_set:
                        if col in i['columns']:
                            i['columns'].remove(col)
            if rsdata_type == 'date' or rsdata_type == 'datetime' :
                result=spark.sql(f"select * from tgtMeta where data_type in ('date','timestamp') and col_name="{col}"").collect()
                # If column name is mismatched, remove from cols list.
                if len(result) == 0:
                    cols_list.remove(col)
                    for i in masking_set:
                        if col in i['columns']:
                            i['columns'].remove(col)
        # If all column names are mismatched, then quit   
        if len(cols_list) == 0:
            print('Column name or Column data type is mismatched')
            quit(-1)
    
print(f"{db}.{table} masking cols list: {cols_list}")


def list_minus(minuend: list, substracter: list):
   list(map(lambda el: minuend.remove(el), substracter))
   return minuend
   
unmasked_cols_list = list_minus(full_cols_list,cols_list)

print(f"{db}.{table} unmasked cols list: {unmasked_cols_list}")   


# Take a copy of table
query1 = f"select * from {db}.{table}"
df1 = spark.sql(query1)

# Append the columns
mrule_list = []
cnt = 0
for i in masking_set:
    mrule_list.append(f"'{i['mask_rule_id']}' as rule_id_{cnt}")
    cnt = cnt+1

query2 = "select *," + ",".join(mrule_list) + f" from {db}.{table}"
print(query2)
df2 = spark.sql(query2) 
df2.show()
df2.createTempView(f"{table}_with_rule_id")

# Generate the masked columns
masked_list = []
cnt = 0
for i in masking_set:
    for c in i['columns']:
        masked_list.append(f"masking_data({c},rule_id_{cnt}) as masked_{c}")
    cnt = cnt+1

query3 = "select *," + ",".join(masked_list) + f" from {table}_with_rule_id"
print(query3)
df3 = spark.sql(query3)
df3.show()

# Drop the cols: masked cols, rule cols
drop_list = []
cnt = 0
for i in masking_set:
    drop_list.append(f"rule_id_{cnt}")
    cnt = cnt+1

for i in masking_set:
    for c in i['columns']:
        drop_list.append(c)

df_result = df3.drop(*drop_list)

# Save the detected result
timepoint = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
masked_table_name = f"{db}_{table}_masked_{timepoint}"
df_result.write.parquet(f"s3://{bucket}/{masked_table_name}/")

# Save the result
spark.catalog.setCurrentDatabase(f"datacustoms_{hub_name}_masked")
spark.catalog.createExternalTable(f'{masked_table_name}', path = f"s3://{bucket}/{masked_table_name}/", schema = df_result.schema)

# Notify

spark-submit --deploy-mode client --archives s3://picomy-libs2/emr/archives/pyspark_venv.tar.gz#environment pyspark-dm.py --HUB_NAME bba --DATABASE_NAME default --TABLE_NAME primary_school --MASKING_SET "[{"mask_rule_id": "bba_010", "columns": ["cn_nid"]}, {"mask_rule_id": "bba_003", "columns": ["grade", "full_name"]}]" --RESULT_BUCKET picomy-dst --EMAIL yangshuaijun@outlook.com --REGION_NAME cn-north-1
spark-submit --deploy-mode client --archives s3://picomy-libs2/emr/archives/pyspark_venv.tar.gz#environment pyspark-dm.py --HUB_NAME bba --DATABASE_NAME default --TABLE_NAME primary_school --MASKING_SET "[{"mask_rule_id": "bba_010", "columns": ["cn_nid"]}, {"mask_rule_id": "bba_003", "columns": ["grade", "full_name"]}]" --RESULT_BUCKET picomy-dst --EMAIL yangshuaijun@outlook.com --REGION_NAME cn-north-1

{'bba_010': {'rsdata_type': 'string', 'action_type': 'encrypt_ecc', 'mask_rule': {'pub_key': '0x05232399a1e7057e37ae54bad5b4bc0a534ec6bfdc1eb78e03dc6ba32df51581ccc1f96ed8f1998cda615c37160b91625d9d210a06b7a392e9f0a27cc7b4e564'}}, 'bba_003': {'rsdata_type': 'string', 'action_type': 'hash', 'mask_rule': 'fd09370b415afe20dadf69449b8dd69bc9b88d5520f241ce19b0518268646b2af6295d29c15afdd66dfe2fbc1d1f26133f10161a6b303f44173ba25ca252cd89'}}
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 437, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 101, in dumps
    cp.dump(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 540, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib64/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
    save(element)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 722, in save_function
    *self._dynamic_function_reduce(obj), obj=obj
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5
    save(state)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
    save(element)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 722, in save_function
    *self._dynamic_function_reduce(obj), obj=obj
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5
    save(state)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
    save(element)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/lib64/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
TypeError: can't pickle _ModuleWithDeprecations objects
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 437, in dumps
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 101, in dumps
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 540, in dump
  File "/usr/lib64/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
    save(element)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 722, in save_function
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
    save(element)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 722, in save_function
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
    save(element)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/lib64/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
TypeError: can't pickle _ModuleWithDeprecations objects

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/hadoop/pyspark-dm.py", line 213, in <module>
    spark.udf.register("masking_data", masking)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 361, in register
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 161, in _judf
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 170, in _create_judf
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 34, in _wrap_function
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2814, in _prepare_for_python_RDD
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 447, in dumps
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _ModuleWithDeprecations objects
22/05/09 11:21:19 INFO SparkContext: Invoking stop() from shutdown hook
22/05/09 11:21:19 INFO AbstractConnector: Stopped Spark@4f695157{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
22/05/09 11:21:19 INFO SparkUI: Stopped Spark web UI at http://ip-172-31-28-206.cn-north-1.compute.internal:4040
22/05/09 11:21:19 INFO YarnClientSchedulerBackend: Interrupting monitor thread
22/05/09 11:21:19 INFO YarnClientSchedulerBackend: Shutting down all executors
22/05/09 11:21:19 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
22/05/09 11:21:19 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
22/05/09 11:21:19 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/05/09 11:21:19 INFO MemoryStore: MemoryStore cleared
22/05/09 11:21:19 INFO BlockManager: BlockManager stopped
22/05/09 11:21:19 INFO BlockManagerMaster: BlockManagerMaster stopped
22/05/09 11:21:19 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/05/09 11:21:19 INFO SparkContext: Successfully stopped SparkContext
22/05/09 11:21:19 INFO ShutdownHookManager: Shutdown hook called
22/05/09 11:21:19 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-9224b8d0-355a-4f91-8f0e-a22ec00722fe
22/05/09 11:21:19 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-9224b8d0-355a-4f91-8f0e-a22ec00722fe/pyspark-ad4cafe0-f8e2-4207-a0ce-df906f617866
22/05/09 11:21:19 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-031ffaac-4bbd-4166-8c88-eb676304505c

Leave a Comment