Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ public FieldVector getVector(int index) {
/**
* Add vector to the record batch, producing a new VectorSchemaRoot.
*
* <p>Buffer ownership is transferred to the returned root via {@link TransferPair}. After this
* operation, the vectors in this root and the added vector are left in a transferred (empty)
* state. This root can be reused by calling {@link #allocateNew()}.
*
* @param index field index
* @param vector vector to be added.
* @return out VectorSchemaRoot with vector added
Expand All @@ -201,23 +205,26 @@ public VectorSchemaRoot addVector(int index, FieldVector vector) {
Preconditions.checkNotNull(vector);
Preconditions.checkArgument(index >= 0 && index <= fieldVectors.size());
List<FieldVector> newVectors = new ArrayList<>();
if (index == fieldVectors.size()) {
newVectors.addAll(fieldVectors);
newVectors.add(vector);
} else {
for (int i = 0; i < fieldVectors.size(); i++) {
if (i == index) {
newVectors.add(vector);
}
newVectors.add(fieldVectors.get(i));
for (int i = 0; i < fieldVectors.size(); i++) {
if (i == index) {
newVectors.add(transferVector(vector));
}
newVectors.add(transferVector(fieldVectors.get(i)));
}
if (index == fieldVectors.size()) {
newVectors.add(transferVector(vector));
}
return new VectorSchemaRoot(newVectors);
}

/**
* Remove vector from the record batch, producing a new VectorSchemaRoot.
*
* <p>Buffer ownership is transferred to the returned root via {@link TransferPair}. After this
* operation, the vectors in this root are left in a transferred (empty) state. The removed
* vector's data is not transferred and is released. This root can be reused by calling {@link
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The removed vector's data is not transferred and is released.

is released is not 100% accurate and can be confusing. The vector remains in the original schema root until it's closed/cleared.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for spotting this. You're right that "is released" was inaccurate.

Looking more closely, it was also a code issue. For the source vectors which are not removed, transfer() creates new ArrowBufs in the corresponding target vector, each referencing the same underlying allocation as its source counterpart, while the source vector's ArrowBufs release their references. So, after the operation, those source vectors' ArrowBufs no longer own any allocated memory. But for the removed vector, we weren't doing anything -- its ArrowBufs still held references to their underlying allocations, which would only be freed when the original root was eventually closed.

I've added a clear() on the removed vector so that its buffers also release their underlying allocations, making it consistent with the other vectors in the source root after the operation.

* #allocateNew()}.
*
* @param index field index
* @return out VectorSchemaRoot with vector removed
*/
Expand All @@ -226,12 +233,19 @@ public VectorSchemaRoot removeVector(int index) {
List<FieldVector> newVectors = new ArrayList<>();
for (int i = 0; i < fieldVectors.size(); i++) {
if (i != index) {
newVectors.add(fieldVectors.get(i));
newVectors.add(transferVector(fieldVectors.get(i)));
}
}
fieldVectors.get(index).clear();
return new VectorSchemaRoot(newVectors);
}

private static FieldVector transferVector(FieldVector vector) {
TransferPair transferPair = vector.getTransferPair(vector.getAllocator());
transferPair.transfer();
return (FieldVector) transferPair.getTo();
}

public Schema getSchema() {
return schema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ private VectorSchemaRoot createBatch() {
public void testAddVector() {
try (final IntVector intVector1 = new IntVector("intVector1", allocator);
final IntVector intVector2 = new IntVector("intVector2", allocator);
final IntVector intVector3 = new IntVector("intVector3", allocator); ) {
final IntVector intVector3 = new IntVector("intVector3", allocator)) {

VectorSchemaRoot original = new VectorSchemaRoot(Arrays.asList(intVector1, intVector2));
assertEquals(2, original.getFieldVectors().size());

VectorSchemaRoot newRecordBatch = original.addVector(1, intVector3);
assertEquals(3, newRecordBatch.getFieldVectors().size());
assertEquals(intVector3, newRecordBatch.getFieldVectors().get(1));
assertEquals(intVector3.getField(), newRecordBatch.getFieldVectors().get(1).getField());

original.close();
newRecordBatch.close();
Expand All @@ -175,16 +175,16 @@ public void testAddVector() {
public void testAddVectorAtEnd() {
try (final IntVector intVector1 = new IntVector("intVector1", allocator);
final IntVector intVector2 = new IntVector("intVector2", allocator);
final IntVector intVector3 = new IntVector("intVector3", allocator); ) {
final IntVector intVector3 = new IntVector("intVector3", allocator)) {

VectorSchemaRoot original = new VectorSchemaRoot(Arrays.asList(intVector1, intVector2));
assertEquals(2, original.getFieldVectors().size());

VectorSchemaRoot newRecordBatch = original.addVector(2, intVector3);
assertEquals(3, newRecordBatch.getFieldVectors().size());
assertEquals(intVector1, newRecordBatch.getFieldVectors().get(0));
assertEquals(intVector2, newRecordBatch.getFieldVectors().get(1));
assertEquals(intVector3, newRecordBatch.getFieldVectors().get(2));
assertEquals(intVector1.getField(), newRecordBatch.getFieldVectors().get(0).getField());
assertEquals(intVector2.getField(), newRecordBatch.getFieldVectors().get(1).getField());
assertEquals(intVector3.getField(), newRecordBatch.getFieldVectors().get(2).getField());

original.close();
newRecordBatch.close();
Expand All @@ -195,16 +195,16 @@ public void testAddVectorAtEnd() {
public void testRemoveVector() {
try (final IntVector intVector1 = new IntVector("intVector1", allocator);
final IntVector intVector2 = new IntVector("intVector2", allocator);
final IntVector intVector3 = new IntVector("intVector3", allocator); ) {
final IntVector intVector3 = new IntVector("intVector3", allocator)) {

VectorSchemaRoot original =
new VectorSchemaRoot(Arrays.asList(intVector1, intVector2, intVector3));
assertEquals(3, original.getFieldVectors().size());

VectorSchemaRoot newRecordBatch = original.removeVector(0);
assertEquals(2, newRecordBatch.getFieldVectors().size());
assertEquals(intVector2, newRecordBatch.getFieldVectors().get(0));
assertEquals(intVector3, newRecordBatch.getFieldVectors().get(1));
assertEquals(intVector2.getField(), newRecordBatch.getFieldVectors().get(0).getField());
assertEquals(intVector3.getField(), newRecordBatch.getFieldVectors().get(1).getField());

original.close();
newRecordBatch.close();
Expand Down Expand Up @@ -344,4 +344,86 @@ public void testSchemaSync() {
assertFalse(schemaRoot.syncSchema());
}
}

@Test
public void testAddVectorOwnership() {
try (final IntVector intVector1 = new IntVector("intVector1", allocator);
final IntVector intVector2 = new IntVector("intVector2", allocator);
final IntVector intVector3 = new IntVector("intVector3", allocator)) {

intVector1.allocateNew();
intVector2.allocateNew();
intVector3.allocateNew();
for (int i = 0; i < 5; i++) {
intVector1.setSafe(i, i * 10);
intVector2.setSafe(i, i * 20);
intVector3.setSafe(i, i * 30);
}
intVector1.setValueCount(5);
intVector2.setValueCount(5);
intVector3.setValueCount(5);

VectorSchemaRoot original = new VectorSchemaRoot(Arrays.asList(intVector1, intVector2));
original.setRowCount(5);

VectorSchemaRoot result = original.addVector(1, intVector3);

// Close the original root and the added vector -- the result should still have valid data
original.close();
intVector3.close();

assertEquals(3, result.getFieldVectors().size());
assertEquals(5, result.getRowCount());
IntVector resultVec0 = (IntVector) result.getVector(0);
IntVector resultVec1 = (IntVector) result.getVector(1);
IntVector resultVec2 = (IntVector) result.getVector(2);
for (int i = 0; i < 5; i++) {
assertEquals(i * 10, resultVec0.get(i));
assertEquals(i * 30, resultVec1.get(i));
assertEquals(i * 20, resultVec2.get(i));
}

result.close();
}
}

@Test
public void testRemoveVectorOwnership() {
try (final IntVector intVector1 = new IntVector("intVector1", allocator);
final IntVector intVector2 = new IntVector("intVector2", allocator);
final IntVector intVector3 = new IntVector("intVector3", allocator)) {

intVector1.allocateNew();
intVector2.allocateNew();
intVector3.allocateNew();
for (int i = 0; i < 5; i++) {
intVector1.setSafe(i, i * 10);
intVector2.setSafe(i, i * 20);
intVector3.setSafe(i, i * 30);
}
intVector1.setValueCount(5);
intVector2.setValueCount(5);
intVector3.setValueCount(5);

VectorSchemaRoot original =
new VectorSchemaRoot(Arrays.asList(intVector1, intVector2, intVector3));
original.setRowCount(5);

VectorSchemaRoot result = original.removeVector(1);

// Close the original root -- the result should still have valid data
original.close();

assertEquals(2, result.getFieldVectors().size());
assertEquals(5, result.getRowCount());
IntVector resultVec0 = (IntVector) result.getVector(0);
IntVector resultVec1 = (IntVector) result.getVector(1);
for (int i = 0; i < 5; i++) {
assertEquals(i * 10, resultVec0.get(i));
assertEquals(i * 30, resultVec1.get(i));
}

result.close();
}
}
}
Loading