python – How to configure parameters for Google Cloud Dataflow flex templates?

I have a parameter called --file_delimiter in my Dataflow flex template job. This parameter takes ',' or '|' as input.

In my beam pipeline, I am passing this as the argument for the read_csv transform.

df = p | read_csv(input_file,sep=known_args.file_delimiter)

argument parser code:

parser.add_argument(
    "--file_delimiter",
    default=",",
)

When I run my Dataflow job using the following command, it works fine:

python test.py --output_table $PROJECT:$Dataset.$table --input_file $file --runner=DataflowRunner --project=$PROJECT --job_name=titles-df --temp_location=gs://ingest-test1/temp --region=us-central1 --delimiter ,

But when I create a flex template and run the command below, the job fails:

gcloud dataflow flex-template run "titles-template-`date +%Y%m%d-%H%M%S`" \
  --template-file-gcs-location "$TEMPLATE_PATH" \
  --parameters input_file="gs://ingest-test1/titles.csv" \
  --parameters output_table="$PROJECT:templateOutput.titles" \
  --parameters file_delimiter="," \
  --region "$REGION"

job logs:

Error occurred in the launcher container: Template launch failed. See console logs.

console logs:

"message":"ValueError: only single character unicode strings can be converted to Py_UCS4, got length 0"}

I don't understand why it works for a normal Dataflow job but not for the flex template job. Am I supposed to pass "," to the --file_delimiter parameter? Why does it report length 0 when I did pass the string ","?

I also want to mention that even when I don't pass anything for --file_delimiter, the flex template job throws the same error. When I don't pass anything for the normal Dataflow job, however, it uses the parameter's default value, ",", and runs successfully.
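To make the difference concrete, here is a minimal sketch of the behaviour described above. It assumes the pipeline uses argparse with parse_known_args (as known_args suggests) and that the launcher passes the flag with an empty value, which is what the "--file_delimiter=" entry in the launch args of the console logs below shows. The helper name parse_delimiter and the constant DEFAULT_DELIMITER are hypothetical, introduced only for illustration. The empty-value fallback is one possible guard, not necessarily the right fix for the template itself.

```python
import argparse

# Hypothetical constant for this sketch; the original code uses default=",".
DEFAULT_DELIMITER = ","


def parse_delimiter(argv):
    """Parse --file_delimiter, treating an empty value like a missing one."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--file_delimiter", default=DEFAULT_DELIMITER)
    # parse_known_args leaves unrecognised flags for the Beam pipeline options.
    known_args, _ = parser.parse_known_args(argv)
    # argparse applies `default` only when the flag is absent, so
    # `--file_delimiter=` arrives as "" and has to be handled explicitly.
    delimiter = known_args.file_delimiter or DEFAULT_DELIMITER
    if len(delimiter) != 1:
        raise ValueError(
            f"file_delimiter must be a single character, got {delimiter!r}"
        )
    return delimiter


if __name__ == "__main__":
    print(repr(parse_delimiter([])))                       # flag absent
    print(repr(parse_delimiter(["--file_delimiter="])))    # flag present but empty
    print(repr(parse_delimiter(["--file_delimiter=|"])))   # explicit value
```

With a guard like this, an empty value would no longer reach read_csv as sep="", although it would not explain why the template receives an empty string in the first place.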

Complete Console logs:

{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.386919","line":"python_template_launcher.go:40","message":"Started template launcher."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.387097","line":"python_template_launcher.go:44","message":"Initialize Python template."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.387111","line":"python_template.go:93","message":"Falling back to using template-container args from metadata: template-container-args"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.388666","line":"python_template.go:102","message":"Validating metadata template-container-args: {"consoleLogsLocation":"gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/console_logs","environment":{"region":"us-central1","serviceAccountEmail":"1075620756053-compute@developer.gserviceaccount.com","stagingLocation":"gs://dataflow-staging-us-central1-1075620756053/staging","tempLocation":"gs://dataflow-staging-us-central1-1075620756053/tmp"},"jobId":"2022-06-11_23_41_36-12248159446928913945","jobName":"titles-template-default-20220612-064135","jobObjectLocation":"gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/job_object","operationResultLocation":"gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/operation_result","parameters":{"file_delimiter":"","input_file":"gs://ingest-test1/titles.csv","output_table":"hidden-mapper-351214:templateOutput.titles-default","staging_location":"gs://dataflow-staging-us-central1-1075620756053/staging","temp_location":"gs://dataflow-staging-us-central1-1075620756053/tmp"},"projectId":"hidden-mapper-351214"}"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389043","line":"python_template.go:111","message":"Extracting operation result location."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389065","line":"python_template.go:119","message":"Operation result location: gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/operation_result"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389081","line":"python_template.go:122","message":"Extracting console log location."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389091","line":"python_template.go:130","message":"Console logs location: gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/console_logs"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389106","line":"python_template.go:133","message":"Extracting Python command specs."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389640","line":"python_template.go:142","message":"Generating launch args."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389767","line":"python_args.go:236","message":"Overriding staging_location with value: gs://dataflow-staging-us-central1-1075620756053/staging (previous value: gs://dataflow-staging-us-central1-1075620756053/staging)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389823","line":"python_args.go:236","message":"Overriding temp_location with value: gs://dataflow-staging-us-central1-1075620756053/tmp (previous value: gs://dataflow-staging-us-central1-1075620756053/tmp)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389879","line":"launch.go:47","message":"Validating ExpectedFeatures."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389896","line":"launch.go:72","message":"Launching Python template."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389914","line":"python_template.go:64","message":"Using launch args: [/template/ingest-file-bq.py --requirements_file=/template/requirements.txt --runner=DataflowRunner --project=hidden-mapper-351214 --template_location=gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/job_object --temp_location=gs://dataflow-staging-us-central1-1075620756053/tmp --staging_location=gs://dataflow-staging-us-central1-1075620756053/staging --input_file=gs://ingest-test1/titles.csv --job_name=titles-template-default-20220612-064135 --region=us-central1 --service_account_email=1075620756053-compute@developer.gserviceaccount.com --file_delimiter= --output_table=hidden-mapper-351214:templateOutput.titles-default]"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389964","line":"exec.go:38","message":"Executing: python /template/ingest-file-bq.py --requirements_file=/template/requirements.txt --runner=DataflowRunner --project=hidden-mapper-351214 --template_location=gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/job_object --temp_location=gs://dataflow-staging-us-central1-1075620756053/tmp --staging_location=gs://dataflow-staging-us-central1-1075620756053/staging --input_file=gs://ingest-test1/titles.csv --job_name=titles-template-default-20220612-064135 --region=us-central1 --service_account_email=1075620756053-compute@developer.gserviceaccount.com --file_delimiter= --output_table=hidden-mapper-351214:templateOutput.titles-default"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.308089","line":"exec.go:66","message":"INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.308476","line":"exec.go:66","message":"INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.312666","line":"exec.go:66","message":"INFO:oauth2client.transport:Attempting refresh to obtain initial access_token"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644630","line":"exec.go:66","message":"Traceback (most recent call last):"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644687","line":"exec.go:66","message":"  File "/template/ingest-file-bq.py", line 96, in <module>"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644715","line":"exec.go:66","message":"    run()"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644725","line":"exec.go:66","message":"  File "/template/ingest-file-bq.py", line 83, in run"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644737","line":"exec.go:66","message":"    df = p | read_csv(input_file,sep=known_args.file_delimiter,dtype=object,header=0,names=headers)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644760","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 614, in __ror__"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644773","line":"exec.go:66","message":"    result = p.apply(self, pvalueish, label)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644796","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 708, in apply"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644813","line":"exec.go:66","message":"    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644859","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 141, in apply"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644873","line":"exec.go:66","message":"    return super().apply(transform, input, options)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644884","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644896","line":"exec.go:66","message":"    return m(transform, input, options)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644905","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644924","line":"exec.go:66","message":"    return transform.expand(input)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644934","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/apache_beam/dataframe/io.py", line 250, in expand"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644946","line":"exec.go:66","message":"    self.reader(handle, *self.args, **dict(self.kwargs, chunksize=100)))"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644975","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/pandas/util/_decorators.py", line 311, in wrapper"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644987","line":"exec.go:66","message":"    return func(*args, **kwargs)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644996","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/pandas/io/parsers/readers.py", line 586, in read_csv"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645007","line":"exec.go:66","message":"    return _read(filepath_or_buffer, kwds)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645021","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/pandas/io/parsers/readers.py", line 482, in _read"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645033","line":"exec.go:66","message":"    parser = TextFileReader(filepath_or_buffer, **kwds)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645043","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/pandas/io/parsers/readers.py", line 811, in __init__"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645054","line":"exec.go:66","message":"    self._engine = self._make_engine(self.engine)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645064","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/pandas/io/parsers/readers.py", line 1040, in _make_engine"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645075","line":"exec.go:66","message":"    return mapping[engine](self.f, **self.options)  # type: ignore[call-arg]"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645086","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/pandas/io/parsers/c_parser_wrapper.py", line 69, in __init__"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645098","line":"exec.go:66","message":"    self._reader = parsers.TextReader(self.handles.handle, **kwds)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645108","line":"exec.go:66","message":"  File "pandas/_libs/parsers.pyx", line 401, in pandas._libs.parsers.TextReader.__cinit__"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645119","line":"exec.go:66","message":"ValueError: only single character unicode strings can be converted to Py_UCS4, got length 0"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.881335","line":"exec.go:52","message":"python failed with exit status 1"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.881396","line":"launch.go:77","message":"Template launch failed: exit status 1"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.881414","line":"launch.go:99","message":"Uploading console logs to gcs location: gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/console_logs"}

metadata.json

{
  "name": "CSV-BQ beam Python flex template",
  "description": "flex template to ingest files into BQ",
  "parameters": [
    {
      "name": "input_file",
      "label": "Input csv file gcs path",
      "helpText": "GCS path of the file"
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table.",
      "isOptional": true,
      "regexes": [
        "([^:]+:)?[^.]+[.].+"
      ]
    },
    {
      "name": "file_delimiter",
      "label": "delimiter used in the file",
      "helpText": "Character used as the delimiter, e.g. , or |",
      "isOptional": true
    }
  ]
}
