Uploaded image for project: 'Apache Arrow'
  1. Apache Arrow
  2. ARROW-18317

[Go] Problem of dictionary update during a communication via an IPC stream

    XMLWordPrintableJSON

Details

    Description

      Dictionaries do not seem to be updated correctly when sending a record on an IPC stream. 

      The following example creates a first record with a single field named "field", initialized with the value "value_0". This record is then serialized with an IPC writer and deserialized with an IPC reader.

      A second record is then created with the value "value_1". After serialization and deserialization, the expected value for the field is "value_1" but I get "value_0". 

      Based on a quick analysis with the debugger, I suspect an error when the dictionary from step 1 is combined with the dictionary from step 2. The resulting dictionary contains the concatenation of the two dictionaries (i.e. value_0value_1), but the offset values used to read the field of the second record refer to "value_0". It may be that the offset arrays are not combined correctly when the second record is received.

      Below is a code snippet that reproduces the issue.

      // TestDictionary round-trips two single-row records through one shared,
      // long-lived IPC writer/reader pair and checks that each decoded record
      // marshals to the same JSON as its source record.
      //
      // The first round trip writes "value_0"; the second writes "value_1" and is
      // where the decoded field comes back as "value_0" instead of "value_1".
      //
      // NOTE: Release methods are not managed in this test for simplicity.
      func TestDictionary(t *testing.T) {
         pool := memory.NewGoAllocator()

         // A schema with a single dictionary-encoded string field.
         dictType := &arrow.DictionaryType{
            IndexType: arrow.PrimitiveTypes.Uint16,
            ValueType: arrow.BinaryTypes.String,
            Ordered:   false,
         }
         schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: dictType}}, nil)

         // The writer, reader and both buffers are reused by every round trip so
         // that stream state carries over from one record to the next.
         var (
            sink   bytes.Buffer
            source = bytes.NewReader([]byte{})
            reader *ipc.Reader
         )
         writer := ipc.NewWriter(&sink, ipc.WithSchema(schema))

         for step := 0; step < 2; step++ {
            // Record whose field holds "value_<step>".
            rec := CreateRecord(t, pool, schema, step)
            want, err := rec.MarshalJSON()
            require.NoError(t, err)

            // Serialize and deserialize through the shared IPC stream.
            var got []byte
            got, reader, err = EncodeDecodeIpcStream(t, rec, &sink, writer, source, reader)
            require.NoError(t, err)

            // On step 1 the decoded field is "value_0" but should be "value_1".
            require.JSONEq(t, string(want), string(got))
         }
      }
      
      // CreateRecord builds a one-row record for schema. Its first (and only)
      // column holds the dictionary-encoded string fmt.Sprintf("value_%d", value).
      //
      // The schema's first field must be a string dictionary, so that the record
      // builder hands back a *array.BinaryDictionaryBuilder; otherwise the test
      // fails immediately with a descriptive message. The builder is released
      // before returning; the record produced by NewRecord does not depend on it
      // (standard Arrow Go builder usage).
      func CreateRecord(t *testing.T, pool memory.Allocator, schema *arrow.Schema, value int) arrow.Record {
         t.Helper()

         rb := array.NewRecordBuilder(pool, schema)
         defer rb.Release()

         fieldB, ok := rb.Field(0).(*array.BinaryDictionaryBuilder)
         if !ok {
            t.Fatalf("field 0: got builder %T, want *array.BinaryDictionaryBuilder", rb.Field(0))
         }
         if err := fieldB.AppendString(fmt.Sprintf("value_%d", value)); err != nil {
            t.Fatal(err)
         }
         return rb.NewRecord()
      }
      
      // EncodeDecodeIpcStream writes record through ipcWriter into bufWriter. It
      // then feeds the produced bytes to bufReader, reads one record back through
      // ipcReader and returns that decoded record marshaled as JSON.
      //
      // bufWriter, ipcWriter and bufReader are meant to be reused across calls.
      // Pass ipcReader == nil on the first call; the reader is then created on
      // bufReader. On later calls, pass the reader returned by the previous call,
      // so the same IPC writer and reader are used from one call to another.
      //
      // The second return value is the reader to use on the next call. It is nil
      // only if the reader could not be created.
      func EncodeDecodeIpcStream(t *testing.T,
         record arrow.Record,
         bufWriter *bytes.Buffer, ipcWriter *ipc.Writer,
         bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) {
         t.Helper()

         // Serialize the record via the IPC writer.
         if err := ipcWriter.Write(record); err != nil {
            return nil, ipcReader, err
         }
         // Detach the bytes before resetting bufWriter: Bytes() aliases the
         // buffer's storage, which the next Write reuses after Reset.
         serialized := append([]byte(nil), bufWriter.Bytes()...)
         bufWriter.Reset()

         // Deserialize via the IPC reader. The reader is created on bufReader,
         // so resetting bufReader hands it the newly written message(s).
         bufReader.Reset(serialized)
         if ipcReader == nil {
            r, err := ipc.NewReader(bufReader)
            if err != nil {
               return nil, nil, err
            }
            ipcReader = r
         }
         // Next reports whether a record is available. Without this check,
         // Record() would be nil here and MarshalJSON would panic.
         if !ipcReader.Next() {
            if err := ipcReader.Err(); err != nil {
               return nil, ipcReader, err
            }
            return nil, ipcReader, fmt.Errorf("ipc reader: no record in stream")
         }

         // Return the decoded record as a JSON document.
         out, err := ipcReader.Record().MarshalJSON()
         if err != nil {
            return nil, ipcReader, err
         }
         return out, ipcReader, nil
      }

      Attachments

        Issue Links

          Activity

            People

              zeroshade Matthew Topol
              lquerel Laurent Querel
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 40m
                  40m