Skip to content

Commit

Permalink
Fixes incorrect work of DataFrame with VBufferColumn when number of e… (
Browse files Browse the repository at this point in the history
dotnet#6851)

* Fix incorrect behavior of DataFrame with VBufferDataFrameColumn when the number of elements is greater than int.MaxValue

* Fix calculation of the maximum capacity and the number of required buffers

* Fix unit test

* Run tests that allocate more than 2 GB of memory only in 64-bit environments

* Fix StringDataFrameColumn same way as VBufferDataFrameColumn

* Fix wrong number of buffers created in the constructor of StringDataFrameColumn

* Fix code review findings
  • Loading branch information
asmirnov82 authored Oct 4, 2023
1 parent 5cf6051 commit 64d7ebd
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 53 deletions.
18 changes: 18 additions & 0 deletions src/Microsoft.Data.Analysis/ArrayUtility.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Text;

namespace Microsoft.Data.Analysis
{
/// <summary>
/// Array-size limits shared by the DataFrame buffers and columns.
/// </summary>
internal static class ArrayUtility
{
    /// <summary>
    /// Maximum size of a one-dimensional array (0x7FEFFFFF = 2,146,435,071).
    /// Polyfill for the Array.MaxLength API when targeting netstandard2.0.
    /// See: https://msdn.microsoft.com/en-us/library/hh285054(v=vs.110).aspx
    /// </summary>
    public const int ArrayMaxSize = 0x7FEFFFFF;
}
}
2 changes: 1 addition & 1 deletion src/Microsoft.Data.Analysis/DataFrameBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void EnsureCapacity(int numberOfValues)
if (newLength > Capacity)
{
//Double the buffer size, but do not exceed the maximum array size (ArrayUtility.ArrayMaxSize)
var doubledSize = (int)Math.Min((long)ReadOnlyBuffer.Length * 2, MaxCapacityInBytes);
var doubledSize = (int)Math.Min((long)ReadOnlyBuffer.Length * 2, ArrayUtility.ArrayMaxSize);
var newCapacity = Math.Max(newLength * Size, doubledSize);

var memory = new Memory<byte>(new byte[newCapacity]);
Expand Down
6 changes: 1 addition & 5 deletions src/Microsoft.Data.Analysis/ReadOnlyDataFrameBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ public ReadOnlyMemory<T> RawReadOnlyMemory

protected int Capacity => ReadOnlyBuffer.Length / Size;

//The maximum size in any single dimension for byte array is 0x7FFFFFc7 - 2147483591
//See https://learn.microsoft.com/en-us/dotnet/framework/configure-apps/file-schema/runtime/gcallowverylargeobjects-element
public const int MaxCapacityInBytes = 2147483591;

public static int MaxCapacity => MaxCapacityInBytes / Size;
public static int MaxCapacity => ArrayUtility.ArrayMaxSize / Size;

public ReadOnlySpan<T> ReadOnlySpan
{
Expand Down
46 changes: 26 additions & 20 deletions src/Microsoft.Data.Analysis/StringDataFrameColumn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Microsoft.ML;
using Microsoft.ML.Data;

Expand All @@ -17,15 +18,17 @@ namespace Microsoft.Data.Analysis
/// <remarks> Is NOT Arrow compatible </remarks>
public partial class StringDataFrameColumn : DataFrameColumn, IEnumerable<string>
{
public static int MaxCapacity = ArrayUtility.ArrayMaxSize / Unsafe.SizeOf<IntPtr>(); // Max Size in bytes / size of pointer (8 bytes on x64)

private readonly List<List<string>> _stringBuffers = new List<List<string>>(); // To store more than intMax number of strings

public StringDataFrameColumn(string name, long length = 0) : base(name, length, typeof(string))
{
int numberOfBuffersRequired = Math.Max((int)(length / int.MaxValue), 1);
int numberOfBuffersRequired = (int)(length / MaxCapacity + 1);
for (int i = 0; i < numberOfBuffersRequired; i++)
{
long bufferLen = length - _stringBuffers.Count * int.MaxValue;
List<string> buffer = new List<string>((int)Math.Min(int.MaxValue, bufferLen));
long bufferLen = length - _stringBuffers.Count * MaxCapacity;
List<string> buffer = new List<string>((int)Math.Min(MaxCapacity, bufferLen));
_stringBuffers.Add(buffer);
for (int j = 0; j < bufferLen; j++)
{
Expand Down Expand Up @@ -64,7 +67,7 @@ protected internal override void Resize(long length)
public void Append(string value)
{
List<string> lastBuffer = _stringBuffers[_stringBuffers.Count - 1];
if (lastBuffer.Count == int.MaxValue)
if (lastBuffer.Count == MaxCapacity)
{
lastBuffer = new List<string>();
_stringBuffers.Add(lastBuffer);
Expand All @@ -75,33 +78,34 @@ public void Append(string value)
Length++;
}

/// <summary>
/// Returns the index of the inner string buffer that holds <paramref name="rowIndex"/>.
/// </summary>
/// <param name="rowIndex">Zero-based row index within the column. Passed by value; it is not modified.</param>
/// <returns><paramref name="rowIndex"/> divided by <see cref="MaxCapacity"/>.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="rowIndex"/> is negative, or is greater than or equal to Length.
/// </exception>
private int GetBufferIndexContainingRowIndex(long rowIndex)
{
    if (rowIndex < 0 || rowIndex >= Length)
    {
        // The (string, string) constructor takes the parameter name first and the message second.
        throw new ArgumentOutOfRangeException(nameof(rowIndex), Strings.ColumnIndexOutOfRange);
    }

    return (int)(rowIndex / MaxCapacity);
}

/// <summary>
/// Returns the string stored at <paramref name="rowIndex"/>.
/// </summary>
/// <param name="rowIndex">Zero-based row index within the column.</param>
/// <returns>The stored value, returned as <see cref="object"/>.</returns>
protected override object GetValue(long rowIndex)
{
    // The helper range-checks rowIndex and selects the inner buffer.
    List<string> buffer = _stringBuffers[GetBufferIndexContainingRowIndex(rowIndex)];

    // The remainder is the position inside that buffer; using rowIndex itself
    // would point past the buffer for any row beyond the first one.
    int offsetInBuffer = (int)(rowIndex % MaxCapacity);
    return buffer[offsetInBuffer];
}

/// <summary>
/// Collects up to <paramref name="length"/> consecutive values, starting at <paramref name="startIndex"/>.
/// </summary>
/// <param name="startIndex">Row index of the first value to read.</param>
/// <param name="length">Maximum number of values to read.</param>
/// <returns>
/// The values read, in row order. Contains fewer than <paramref name="length"/> items
/// when the inner buffers run out first.
/// </returns>
protected override IReadOnlyList<object> GetValues(long startIndex, int length)
{
    var ret = new List<object>();
    int bufferIndex = GetBufferIndexContainingRowIndex(startIndex);

    // Position inside the first buffer; every subsequent buffer is read from its start.
    int bufferOffset = (int)(startIndex % MaxCapacity);

    while (ret.Count < length && bufferIndex < _stringBuffers.Count)
    {
        List<string> buffer = _stringBuffers[bufferIndex];
        for (int i = bufferOffset; ret.Count < length && i < buffer.Count; i++)
        {
            ret.Add(buffer[i]);
        }

        bufferIndex++;
        bufferOffset = 0;
    }

    return ret;
}
Expand All @@ -110,9 +114,10 @@ protected override void SetValue(long rowIndex, object value)
{
if (value == null || value is string)
{
int bufferIndex = GetBufferIndexContainingRowIndex(ref rowIndex);
int bufferIndex = GetBufferIndexContainingRowIndex(rowIndex);
int bufferOffset = (int)(rowIndex % MaxCapacity);
var oldValue = this[rowIndex];
_stringBuffers[bufferIndex][(int)rowIndex] = (string)value;
_stringBuffers[bufferIndex][bufferOffset] = (string)value;
if (oldValue != (string)value)
{
if (value == null)
Expand All @@ -138,15 +143,16 @@ protected override void SetValue(long rowIndex, object value)
get
{
var ret = new List<string>();
int bufferIndex = GetBufferIndexContainingRowIndex(ref startIndex);
int bufferIndex = GetBufferIndexContainingRowIndex(startIndex);
int bufferOffset = (int)(startIndex % MaxCapacity);
while (ret.Count < length && bufferIndex < _stringBuffers.Count)
{
for (int i = (int)startIndex; ret.Count < length && i < _stringBuffers[bufferIndex].Count; i++)
for (int i = bufferOffset; ret.Count < length && i < _stringBuffers[bufferIndex].Count; i++)
{
ret.Add(_stringBuffers[bufferIndex][i]);
}
bufferIndex++;
startIndex = 0;
bufferOffset = 0;
}
return ret;
}
Expand Down Expand Up @@ -194,7 +200,7 @@ private PrimitiveDataFrameColumn<long> GetSortIndices(Comparer<string> comparer,
sortIndices[i] = i;
if (buffer[i] == null)
{
columnNullIndices[nullIndicesSlot] = i + bufferSortIndices.Count * int.MaxValue;
columnNullIndices[nullIndicesSlot] = i + bufferSortIndices.Count * MaxCapacity;
nullIndicesSlot++;
}
}
Expand Down Expand Up @@ -295,11 +301,11 @@ private StringDataFrameColumn CloneImplementation<U>(PrimitiveDataFrameColumn<U>

List<string> setBuffer = ret._stringBuffers[0];
long setBufferMinRange = 0;
long setBufferMaxRange = int.MaxValue;
long setBufferMaxRange = MaxCapacity;
List<string> getBuffer = _stringBuffers[0];
long getBufferMinRange = 0;
long getBufferMaxRange = int.MaxValue;
long maxCapacity = int.MaxValue;
long getBufferMaxRange = MaxCapacity;
long maxCapacity = MaxCapacity;
if (mapIndices.DataType == typeof(long))
{
PrimitiveDataFrameColumn<long> longMapIndices = mapIndices as PrimitiveDataFrameColumn<long>;
Expand Down
46 changes: 24 additions & 22 deletions src/Microsoft.Data.Analysis/VBufferDataFrameColumn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using Apache.Arrow;
using Apache.Arrow.Types;
using System.Runtime.CompilerServices;
using Microsoft.ML;
using Microsoft.ML.Data;

Expand All @@ -21,6 +18,9 @@ namespace Microsoft.Data.Analysis
/// </summary>
public partial class VBufferDataFrameColumn<T> : DataFrameColumn, IEnumerable<VBuffer<T>>
{

public static int MaxCapacity = ArrayUtility.ArrayMaxSize / Unsafe.SizeOf<VBuffer<T>>();

private readonly List<List<VBuffer<T>>> _vBuffers = new List<List<VBuffer<T>>>(); // To store more than intMax number of vbuffers

/// <summary>
Expand All @@ -30,11 +30,11 @@ public partial class VBufferDataFrameColumn<T> : DataFrameColumn, IEnumerable<VB
/// <param name="length">Length of values</param>
public VBufferDataFrameColumn(string name, long length = 0) : base(name, length, typeof(VBuffer<T>))
{
int numberOfBuffersRequired = Math.Max((int)(length / int.MaxValue), 1);
int numberOfBuffersRequired = (int)(length / MaxCapacity + 1);
for (int i = 0; i < numberOfBuffersRequired; i++)
{
long bufferLen = length - _vBuffers.Count * int.MaxValue;
List<VBuffer<T>> buffer = new List<VBuffer<T>>((int)Math.Min(int.MaxValue, bufferLen));
int bufferLen = (int)Math.Min(MaxCapacity, length - _vBuffers.Count * MaxCapacity);
List<VBuffer<T>> buffer = new List<VBuffer<T>>(bufferLen);
_vBuffers.Add(buffer);
for (int j = 0; j < bufferLen; j++)
{
Expand Down Expand Up @@ -74,7 +74,7 @@ protected internal override void Resize(long length)
public void Append(VBuffer<T> value)
{
List<VBuffer<T>> lastBuffer = _vBuffers[_vBuffers.Count - 1];
if (lastBuffer.Count == int.MaxValue)
if (lastBuffer.Count == MaxCapacity)
{
lastBuffer = new List<VBuffer<T>>();
_vBuffers.Add(lastBuffer);
Expand All @@ -83,34 +83,35 @@ public void Append(VBuffer<T> value)
Length++;
}

/// <summary>
/// Returns the index of the inner VBuffer list that holds <paramref name="rowIndex"/>.
/// </summary>
/// <param name="rowIndex">Zero-based row index within the column. Passed by value; it is not modified.</param>
/// <returns><paramref name="rowIndex"/> divided by <see cref="MaxCapacity"/>.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="rowIndex"/> is negative, or is greater than or equal to Length.
/// </exception>
private int GetBufferIndexContainingRowIndex(long rowIndex)
{
    if (rowIndex < 0 || rowIndex >= Length)
    {
        // The (string, string) constructor takes the parameter name first and the message second.
        throw new ArgumentOutOfRangeException(nameof(rowIndex), Strings.ColumnIndexOutOfRange);
    }

    return (int)(rowIndex / MaxCapacity);
}

/// <summary>
/// Returns the VBuffer stored at <paramref name="rowIndex"/>.
/// </summary>
/// <param name="rowIndex">Zero-based row index within the column.</param>
/// <returns>The stored value, returned as <see cref="object"/>.</returns>
protected override object GetValue(long rowIndex)
{
    // The helper range-checks rowIndex and selects the inner buffer.
    List<VBuffer<T>> buffer = _vBuffers[GetBufferIndexContainingRowIndex(rowIndex)];

    // The remainder is the position inside that buffer; using rowIndex itself
    // would point past the buffer for any row beyond the first one.
    int offsetInBuffer = (int)(rowIndex % MaxCapacity);
    return buffer[offsetInBuffer];
}

/// <summary>
/// Collects up to <paramref name="length"/> consecutive values, starting at <paramref name="startIndex"/>.
/// </summary>
/// <param name="startIndex">Row index of the first value to read.</param>
/// <param name="length">Maximum number of values to read.</param>
/// <returns>
/// The values read, in row order. Contains fewer than <paramref name="length"/> items
/// when the inner buffers run out first.
/// </returns>
protected override IReadOnlyList<object> GetValues(long startIndex, int length)
{
    var ret = new List<object>();
    int bufferIndex = GetBufferIndexContainingRowIndex(startIndex);

    // Position inside the first buffer; every subsequent buffer is read from its start.
    int bufferOffset = (int)(startIndex % MaxCapacity);

    while (ret.Count < length && bufferIndex < _vBuffers.Count)
    {
        List<VBuffer<T>> buffer = _vBuffers[bufferIndex];
        for (int i = bufferOffset; ret.Count < length && i < buffer.Count; i++)
        {
            ret.Add(buffer[i]);
        }

        bufferIndex++;
        bufferOffset = 0;
    }

    return ret;
}
Expand All @@ -119,9 +120,10 @@ protected override void SetValue(long rowIndex, object value)
{
if (value == null || value is VBuffer<T>)
{
int bufferIndex = GetBufferIndexContainingRowIndex(ref rowIndex);
var oldValue = this[rowIndex];
_vBuffers[bufferIndex][(int)rowIndex] = (VBuffer<T>)value;
int bufferIndex = GetBufferIndexContainingRowIndex(rowIndex);
int bufferOffset = (int)(rowIndex % MaxCapacity);
var oldValue = _vBuffers[bufferIndex][bufferOffset];
_vBuffers[bufferIndex][bufferOffset] = (VBuffer<T>)value;
if (!oldValue.Equals((VBuffer<T>)value))
{
if (value == null)
Expand Down Expand Up @@ -250,11 +252,11 @@ private VBufferDataFrameColumn<T> CloneImplementation<U>(PrimitiveDataFrameColum

List<VBuffer<T>> setBuffer = ret._vBuffers[0];
long setBufferMinRange = 0;
long setBufferMaxRange = int.MaxValue;
long setBufferMaxRange = MaxCapacity;
List<VBuffer<T>> getBuffer = _vBuffers[0];
long getBufferMinRange = 0;
long getBufferMaxRange = int.MaxValue;
long maxCapacity = int.MaxValue;
long getBufferMaxRange = MaxCapacity;
long maxCapacity = MaxCapacity;
if (mapIndices.DataType == typeof(long))
{
PrimitiveDataFrameColumn<long> longMapIndices = mapIndices as PrimitiveDataFrameColumn<long>;
Expand Down
2 changes: 1 addition & 1 deletion test/Microsoft.Data.Analysis.Tests/BufferTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ public void TestAppend_SizeMoreThanMaxBufferCapacity()
[X64Fact("32-bit doesn't allow to allocate more than 2 Gb")]
public void TestAppendMany_SizeMoreThanMaxBufferCapacity()
{
const int MaxCapacityInBytes = 2147483591;
const int MaxCapacityInBytes = 0X7FEFFFFF;

//Check appending values with extending column size over MaxCapacity of ReadOnlyDataFrameBuffer
PrimitiveDataFrameColumn<byte> intColumn = new PrimitiveDataFrameColumn<byte>("Byte1", MaxCapacityInBytes - 5);
Expand Down
45 changes: 41 additions & 4 deletions test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Apache.Arrow;
using Microsoft.ML;
using Microsoft.ML.Data;
using Microsoft.ML.TestFramework.Attributes;
using Xunit;

namespace Microsoft.Data.Analysis.Tests
Expand Down Expand Up @@ -75,7 +76,7 @@ public static ArrowStringDataFrameColumn CreateArrowStringColumn(int length, boo
return new ArrowStringDataFrameColumn("ArrowString", dataMemory, offsetMemory, nullMemory, length, nullCount);
}

public static VBufferDataFrameColumn<int> CreateVBufferDataFrame(int length)
public static VBufferDataFrameColumn<int> CreateVBufferDataFrameColumn(int length)
{
var buffers = Enumerable.Repeat(new VBuffer<int>(5, new[] { 0, 1, 2, 3, 4 }), length).ToArray();
return new VBufferDataFrameColumn<int>("VBuffer", buffers);
Expand All @@ -85,7 +86,7 @@ public static DataFrame MakeDataFrameWithAllColumnTypes(int length, bool withNul
{
DataFrame df = MakeDataFrameWithAllMutableAndArrowColumnTypes(length, withNulls);

var vBufferColumn = CreateVBufferDataFrame(length);
var vBufferColumn = CreateVBufferDataFrameColumn(length);
df.Columns.Insert(df.Columns.Count, vBufferColumn);

return df;
Expand Down Expand Up @@ -230,15 +231,51 @@ public DataFrame SplitTrainTest(DataFrame input, float testRatio, out DataFrame
}

// Checks that a column built by the CreateVBufferDataFrameColumn helper reports the
// requested length and exposes the helper's five-element buffers, starting with 0.
[Fact]
public void TestVBufferColumn_Creation()
{
    var vBufferColumn = CreateVBufferDataFrameColumn(10);

    Assert.Equal(10, vBufferColumn.Length);
    Assert.Equal(5, vBufferColumn[0].GetValues().Length);
    Assert.Equal(0, vBufferColumn[0].GetValues()[0]);
}

// Writes one VBuffer through the column indexer and reads it back.
[Fact]
public void TestVBufferColumn_Indexer()
{
    // Arrange
    var expected = new VBuffer<int>(5, new[] { 4, 3, 2, 1, 0 });
    var column = new VBufferDataFrameColumn<int>("VBuffer", 1);

    // Act
    column[0] = expected;

    // Assert: the length is unchanged and the last stored element is the trailing 0.
    Assert.Equal(1, column.Length);
    Assert.Equal(5, column[0].GetValues().Length);
    Assert.Equal(0, column[0].GetValues()[4]);
}

// Round-trips a value through the indexer at a row index greater than MaxCapacity.
[X64Fact("32-bit doesn't allow to allocate more than 2 Gb")]
public void TestVBufferColumn_Indexer_MoreThanMaxInt()
{
    int[] expected = new[] { 4, 3, 2, 1, 0 };

    // Three rows more than MaxCapacity; the row under test is the second-to-last one.
    int rowCount = VBufferDataFrameColumn<int>.MaxCapacity + 3;
    long targetRow = rowCount - 2;

    var column = new VBufferDataFrameColumn<int>("VBuffer", rowCount);
    column[targetRow] = new VBuffer<int>(5, expected);

    var actual = column[targetRow].GetValues();

    Assert.Equal(rowCount, column.Length);
    Assert.Equal(5, actual.Length);

    for (int position = 0; position < actual.Length; position++)
    {
        Assert.Equal(expected[position], actual[position]);
    }
}

[Fact]
public void TestIndexer()
{
Expand Down

0 comments on commit 64d7ebd

Please sign in to comment.