Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 0.8.0
- Fix Version/s: None
- Component/s: None
- Environment: Cloudera cdh5.13.3, Cloudera Spark 2.3.0.cloudera3
Description
Creating this in the Arrow project, as the traceback suggests the issue is in Arrow.
This is a continuation of the conversation in the arrow-dev mailing list thread: https://mail-archives.apache.org/mod_mbox/arrow-dev/201903.mbox/%3CCAK7Z5T_mChuqhFDAF2U68dO=P_1Nst5AjjCRg0MExO5Kby9i-g@mail.gmail.com%3E
When I run a GROUPED_MAP pandas UDF in Spark using PySpark, I run into this error:
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 279, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 265, in __iter__
  File "pyarrow/ipc.pxi", line 281, in pyarrow.lib._RecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: read length must be positive or -1
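My reading of the message, which is an assumption on my part (see the linked SPARK-32294 below): the IPC reader appears to treat the next message length as a signed 32-bit integer, so a serialized batch larger than 2**31 - 1 bytes wraps around to a negative length and fails the "positive or -1" check. A minimal sketch of the wraparound itself:

import struct

# A byte length just past INT32_MAX (2**31 - 1 = 2147483647) becomes negative
# when reinterpreted as a signed 32-bit value.
payload_len = 2500000000  # bytes; larger than 2**31 - 1
wrapped, = struct.unpack('<i', struct.pack('<I', payload_len & 0xFFFFFFFF))
print(wrapped)  # -1794967296, i.e. "read length must be positive or -1"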
The error appears as the size of the dataset I want to group on increases. Here is a minimal code snippet that reproduces it.
Note: my actual dataset is much larger, has many more unique IDs, and is a valid use case in which I cannot simplify the groupby in any way. I have stripped out all the logic to make this example as simple as possible.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 9G pyspark-shell'

import findspark
findspark.init()

import pyspark
from pyspark.sql import functions as F, types as T
import pandas as pd

spark = pyspark.sql.SparkSession.builder.getOrCreate()

# 429 identical rows, all with the same key df1_c1 == 1234567
pdf1 = pd.DataFrame(
    [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
    columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
)
df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')

# 48993 identical rows, all with the same key df2_c1 == 1234567
pdf2 = pd.DataFrame(
    [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
    columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
)
df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')

# The inner join on the shared key puts every row into a single group
df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')

def myudf(df):
    return df

df4 = df3
udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)

df5 = df4.groupBy('df1_c1').apply(udf)
print('df5.count()', df5.count())
# df5.write.parquet('/tmp/temp.parquet', mode='overwrite')
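For scale, a back-of-envelope estimate (the per-row byte counts below are my assumption, not a measurement): the inner join produces a single group of 429 * 48993 ≈ 21 million rows, all with df1_c1 == 1234567, so the one Arrow batch for that group comes out to roughly 2.5 GB, past the 2 GiB signed 32-bit boundary:

rows = 429 * 48993  # 21017997 rows, all in one group
# assumed widths: df1 (int64 + double + 10-char + 24-char strings)
#               + df2 (int64 + double + 15/10/15/15-char strings)
bytes_per_row = (8 + 8 + 10 + 24) + (8 + 8 + 15 + 10 + 15 + 15)
print(rows * bytes_per_row)  # ~2.54e9 bytes > 2**31 - 1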
I have also tried running this on Amazon EMR with Spark 2.3.1 and 20 GB of RAM per executor.
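A workaround sketch, not a fix: when the per-group logic can run on sub-groups independently (true for the identity myudf above), salting the grouping key splits each oversized group into smaller Arrow batches. num_buckets here is an illustrative value, not something tuned:

num_buckets = 64  # illustrative; pick so each sub-group stays well under 2 GiB
df4_salted = df4.withColumn('salt', (F.rand() * num_buckets).cast('int'))

def myudf_salted(df):
    # Drop the salt column so the output matches the declared df4.schema.
    return df.drop(columns=['salt'])

udf_salted = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf_salted)
df5 = df4_salted.groupBy('df1_c1', 'salt').apply(udf_salted)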
Attachments
Issue Links
- relates to: SPARK-32294 GroupedData Pandas UDF 2Gb limit (Resolved)