Apache Arrow / ARROW-4890

[Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: Python
    • Labels: None
    • Environment: Cloudera cdh5.13.3, Cloudera Spark 2.3.0.cloudera3

Description

      Creating this in the Arrow project, as the traceback suggests this is an issue in Arrow.
      This is a continuation of the conversation on the dev mailing list: https://mail-archives.apache.org/mod_mbox/arrow-dev/201903.mbox/%3CCAK7Z5T_mChuqhFDAF2U68dO=P_1Nst5AjjCRg0MExO5Kby9i-g@mail.gmail.com%3E

      When I run a GROUPED_MAP UDF in Spark using PySpark, I run into the error:

        File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 279, in load_stream
          for batch in reader:
        File "pyarrow/ipc.pxi", line 265, in __iter__
        File "pyarrow/ipc.pxi", line 281, in pyarrow.lib._RecordBatchReader.read_next_batch
        File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
      pyarrow.lib.ArrowIOError: read length must be positive or -1
      

      The error appears as the size of the dataset I am grouping on increases. Here is a code snippet that reproduces it.
      Note: my actual dataset is much larger, has many more unique IDs, and is a valid use case where I cannot simplify the groupby in any way. I have stripped out all the logic to make this example as simple as possible.

      import os
      
      os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 9G pyspark-shell'
      import findspark
      findspark.init()
      import pyspark
      from pyspark.sql import functions as F, types as T
      import pandas as pd
      
      spark = pyspark.sql.SparkSession.builder.getOrCreate()
      
      pdf1 = pd.DataFrame(
      	[[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
      	columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
      )
      df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')
      
      pdf2 = pd.DataFrame(
      	[[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
      	columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
      )
      df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
      df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')
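      # Every row shares the same key (1234567), so the inner join produces
      # 429 * 48993 ≈ 21 million rows, all in a single group. The GROUPED_MAP
      # UDF below receives that entire group as one pandas DataFrame.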
      
      def myudf(df):
          return df
      
      df4 = df3
      udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
      
      df5 = df4.groupBy('df1_c1').apply(udf)
      print('df5.count()', df5.count())
      
      # df5.write.parquet('/tmp/temp.parquet', mode='overwrite')
      

      I have also tried running this on Amazon EMR with Spark 2.3.1 and 20 GB of RAM per executor, and the error still occurs.
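
      For reference, one possible workaround while this is open is to salt the group key so that no single group handed to the UDF is this large. This is only a sketch under the assumption that the failure scales with per-group size; it is not a fix for the underlying Arrow error, and it only applies when the per-group logic can be run on sub-groups independently (true for the identity UDF above). The 'salt' column name and bucket count are arbitrary:

      # Spread the single huge key over N_BUCKETS smaller (df1_c1, salt) groups.
      N_BUCKETS = 64
      df4_salted = df3.withColumn('salt', (F.rand() * N_BUCKETS).cast('int'))
      
      udf_salted = F.pandas_udf(df4_salted.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
      
      # Each (df1_c1, salt) group is roughly N_BUCKETS times smaller than before.
      df5_salted = df4_salted.groupBy('df1_c1', 'salt').apply(udf_salted).drop('salt')
      print('df5_salted.count()', df5_salted.count())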

Attachments

    1. Task retry fails.png (60 kB, Arvind Ravish)
    2. image-2019-07-04-12-03-57-002.png (60 kB, Arvind Ravish)

People

    • Assignee: Unassigned
    • Reporter: Abdeali Kothari
    • Votes: 7
    • Watchers: 12
