
[ARROW-1039] Python: pyarrow.Filesystem.read_parquet causing error if nthreads>1


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.3.0
    • Fix Version/s: 0.4.1
    • Component/s: Python
    • Labels: None

      Description

      Currently I have the code:

      from pyarrow import HdfsClient

      # Connect to HDFS through the libhdfs3 driver and read single-threaded
      client = HdfsClient("hdfshost", 8020, "myuser", driver='libhdfs3')
      parquet_file = '/my/parquet/file'
      parquet = client.read_parquet(parquet_file, nthreads=1)
      df = parquet.to_pandas()
      

      This works as expected. If I set nthreads=2, however, I get:

      2017-05-16 02:59:36.414677, p116977, th140677336123136, ERROR InputStreamImpl: failed to read Block: [block pool ID: BP-1695827161-10.87.14.23-1472240973777 block ID 1080474497_6733673] file /my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet from Datanode: hdfshost1(1.2.3.4), 
      RemoteBlockReader.cpp: 304: ChecksumException: RemoteBlockReader: checksum not match for Block: [block pool ID: BP-1695827161-10.87.14.23-1472240973777 block ID 1080474497_6733673], on Datanode: hdfshost1(1.2.3.4)
              @       Hdfs::Internal::RemoteBlockReader::verifyChecksum(int)
              @       Hdfs::Internal::RemoteBlockReader::readNextPacket()
              @       Hdfs::Internal::RemoteBlockReader::read(char*, int)
              @       Hdfs::Internal::InputStreamImpl::readOneBlock(char*, int, bool)
              @       Hdfs::Internal::InputStreamImpl::readInternal(char*, int)
              @       Hdfs::Internal::InputStreamImpl::read(char*, int)
              @       hdfsRead
              @       arrow::io::HdfsReadableFile::ReadAt(long, long, std::shared_ptr<arrow::Buffer>*)
              @       parquet::ArrowInputFile::ReadAt(long, long)
              @       parquet::InMemoryInputStream::InMemoryInputStream(parquet::RandomAccessSource*, long, long)
              @       parquet::SerializedRowGroup::GetColumnPageReader(int)
              @       parquet::RowGroupReader::Column(int)
              @       parquet::arrow::AllRowGroupsIterator::Next()
              @       parquet::arrow::ColumnReader::Impl::NextRowGroup()
              @       parquet::arrow::ColumnReader::Impl::Impl(arrow::MemoryPool*, std::unique_ptr<parquet::arrow::FileColumnIterator, std::default_delete<parquet::arrow::FileColumnIterator> >)
              @       parquet::arrow::FileReader::Impl::GetColumn(int, std::unique_ptr<parquet::arrow::ColumnReader, std::default_delete<parquet::arrow::ColumnReader> >*)
              @       parquet::arrow::FileReader::Impl::ReadColumn(int, std::shared_ptr<arrow::Array>*)
              @       parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}::operator()(int) const
              @       std::thread::_Impl<std::_Bind_simple<arrow::Status parquet::arrow::ParallelFor<parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&>(int, int, parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&)::{lambda()#1} ()> >::_M_run()
              @       Unknown
              @       start_thread
              @       clone
      , retry read again from another Datanode.
      2017-05-16 02:59:36.414858, p116977, th140677336123136, INFO IntputStreamImpl: Add invalid datanode hdfshost1(1.2.3.4) to failed datanodes and try another datanode again for file /my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet.
      2017-05-16 02:59:36.424118, p116977, th140677702768384, ERROR InputStreamImpl: failed to read Block: [block pool ID: BP-1695827161-10.87.14.23-1472240973777 block ID 1080474497_6733673] file /my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet from Datanode: hdfshost2(1.2.3.5), 
      RemoteBlockReader.cpp: 205: HdfsIOException: RemoteBlockReader: failed to read block header for Block: [block pool ID: BP-1695827161-10.87.14.23-1472240973777 block ID 1080474497_6733673] from Datanode: hdfshost1(1.2.3.4) .
              @       Hdfs::Internal::RemoteBlockReader::readPacketHeader()
              @       Hdfs::Internal::RemoteBlockReader::readNextPacket()
              @       Hdfs::Internal::RemoteBlockReader::read(char*, int)
              @       Hdfs::Internal::InputStreamImpl::readOneBlock(char*, int, bool)
              @       Hdfs::Internal::InputStreamImpl::readInternal(char*, int)
              @       Hdfs::Internal::InputStreamImpl::read(char*, int)
              @       hdfsRead
              @       arrow::io::HdfsReadableFile::ReadAt(long, long, std::shared_ptr<arrow::Buffer>*)
              @       parquet::ArrowInputFile::ReadAt(long, long)
              @       parquet::InMemoryInputStream::InMemoryInputStream(parquet::RandomAccessSource*, long, long)
              @       parquet::SerializedRowGroup::GetColumnPageReader(int)
              @       parquet::RowGroupReader::Column(int)
              @       parquet::arrow::AllRowGroupsIterator::Next()
              @       parquet::arrow::ColumnReader::Impl::NextRowGroup()
              @       parquet::arrow::ColumnReader::Impl::Impl(arrow::MemoryPool*, std::unique_ptr<parquet::arrow::FileColumnIterator, std::default_delete<parquet::arrow::FileColumnIterator> >)
              @       parquet::arrow::FileReader::Impl::GetColumn(int, std::unique_ptr<parquet::arrow::ColumnReader, std::default_delete<parquet::arrow::ColumnReader> >*)
              @       parquet::arrow::FileReader::Impl::ReadColumn(int, std::shared_ptr<arrow::Array>*)
              @       parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}::operator()(int) const
              @       std::thread::_Impl<std::_Bind_simple<arrow::Status parquet::arrow::ParallelFor<parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&>(int, int, parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&)::{lambda()#1} ()> >::_M_run()
              @       Unknown
              @       start_thread
              @       clone
      Caused by
      PacketHeader.cpp: 109: HdfsIOException: Invalid PacketHeader, packetLen is 1228668939, protoLen is 1290, buf size is 31
              @       Hdfs::Internal::PacketHeader::readFields(char const*, unsigned long)
              @       Hdfs::Internal::RemoteBlockReader::readPacketHeader()
              @       Hdfs::Internal::RemoteBlockReader::readNextPacket()
              @       Hdfs::Internal::RemoteBlockReader::read(char*, int)
              @       Hdfs::Internal::InputStreamImpl::readOneBlock(char*, int, bool)
              @       Hdfs::Internal::InputStreamImpl::readInternal(char*, int)
              @       Hdfs::Internal::InputStreamImpl::read(char*, int)
              @       hdfsRead
              @       arrow::io::HdfsReadableFile::ReadAt(long, long, std::shared_ptr<arrow::Buffer>*)
              @       parquet::ArrowInputFile::ReadAt(long, long)
              @       parquet::InMemoryInputStream::InMemoryInputStream(parquet::RandomAccessSource*, long, long)
              @       parquet::SerializedRowGroup::GetColumnPageReader(int)
              @       parquet::RowGroupReader::Column(int)
              @       parquet::arrow::AllRowGroupsIterator::Next()
              @       parquet::arrow::ColumnReader::Impl::NextRowGroup()
              @       parquet::arrow::ColumnReader::Impl::Impl(arrow::MemoryPool*, std::unique_ptr<parquet::arrow::FileColumnIterator, std::default_delete<parquet::arrow::FileColumnIterator> >)
              @       parquet::arrow::FileReader::Impl::GetColumn(int, std::unique_ptr<parquet::arrow::ColumnReader, std::default_delete<parquet::arrow::ColumnReader> >*)
              @       parquet::arrow::FileReader::Impl::ReadColumn(int, std::shared_ptr<arrow::Array>*)
              @       parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}::operator()(int) const
              @       std::thread::_Impl<std::_Bind_simple<arrow::Status parquet::arrow::ParallelFor<parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&>(int, int, parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, std::allocator<int> > const&, std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&)::{lambda()#1} ()> >::_M_run()
              @       Unknown
              @       start_thread
              @       clone
      , retry read again from another Datanode.
      2017-05-16 02:59:36.424266, p116977, th140677702768384, INFO IntputStreamImpl: Add invalid datanode hdfshost2(1.2.3.5) to failed datanodes and try another datanode again for file /my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet.
      terminate called after throwing an instance of 'parquet::ParquetException'
        what():  Unable to read column chunk data
      Aborted
      

      If nthreads>2, I get:

      Segmentation fault
      

      I'm running this in a conda environment with:

      pyarrow                   0.3.0.post          np112py27_1    conda-forge
      parquet-cpp               1.1.0pre                      2    conda-forge
      arrow-cpp                 0.3.0.post          np112py27_1    conda-forge
      libhdfs3                  2.2.31                        1  
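
      For what it's worth, both failing threads in the log above are inside arrow::io::HdfsReadableFile::ReadAt via hdfsRead on the same block, which suggests the corruption appears when several threads read through the same libhdfs3 handle at once. The sketch below is one possible workaround under that assumption: fetch a part file's bytes with a single serial HDFS read and only parallelize the Parquet decode from memory. It is untested against 0.3.0; the part-file path is hypothetical, and the open()/read() calls and the nthreads argument to pyarrow.parquet.read_table may differ between versions.

      import io
      import pyarrow as pa
      import pyarrow.parquet as pq

      client = pa.HdfsClient("hdfshost", 8020, "myuser", driver='libhdfs3')

      # Hypothetical single part file; the real names are the long
      # Spark-style part-*.snappy.parquet files shown in the log.
      part_path = '/my/parquet/file/part-00000.snappy.parquet'

      # Read the raw bytes with one serial HDFS read so only a single
      # thread ever goes through libhdfs3 ...
      f = client.open(part_path)
      data = f.read()
      f.close()

      # ... then decode the Parquet columns from memory, where the
      # multi-threaded read never touches HDFS.
      table = pq.read_table(io.BytesIO(data), nthreads=4)
      df = table.to_pandas()

      Dropping nthreads back to 1 in read_parquet, as in the first snippet, also avoids the crash.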
      

            People

            • Assignee: Wes McKinney (wesm)
            • Reporter: James Porritt (jporritt)
            • Votes: 0
            • Watchers: 2
