Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Parallelise batching of writes for similarity algorithms #814

Open
wants to merge 11 commits into
base: 3.4
Choose a base branch
from
Prev Previous commit
Next Next commit
push more code into the base SimilarityExporter class
  • Loading branch information
mneedham committed Feb 12, 2019
commit c75886cf4fb77680404be4b0604076a5b61ba700
Original file line number Diff line number Diff line change
Expand Up @@ -154,60 +154,7 @@ private DisjointSetStruct computePartitions(HeavyGraph graph) {
return struct;
}

/**
 * Hands a single similarity result to {@code createRelationship} from within
 * one {@code applyInTransaction} call.
 *
 * A {@link KernelException} raised while creating the relationship is passed
 * to {@code ExceptionUtil.throwKernelException}.
 *
 * @param similarityResult the result to write
 */
private void export(SimilarityResult similarityResult) {
    applyInTransaction(transaction -> {
        try {
            createRelationship(similarityResult, transaction);
        } catch (KernelException kernelException) {
            ExceptionUtil.throwKernelException(kernelException);
        }
        // The callback has to yield a value; nothing meaningful to return here.
        return null;
    });
}

/**
 * Hands every similarity result of the given batch to {@code createRelationship},
 * all from within a single {@code applyInTransaction} call.
 *
 * A {@link KernelException} raised for an element is passed to
 * {@code ExceptionUtil.throwKernelException}.
 *
 * @param similarityResults the batch of results to write, processed in iteration order
 */
private void export(List<SimilarityResult> similarityResults) {
    applyInTransaction(transaction -> {
        Iterator<SimilarityResult> remaining = similarityResults.iterator();
        while (remaining.hasNext()) {
            SimilarityResult current = remaining.next();
            try {
                createRelationship(current, transaction);
            } catch (KernelException kernelException) {
                ExceptionUtil.throwKernelException(kernelException);
            }
        }
        // The callback has to yield a value; nothing meaningful to return here.
        return null;
    });
}

/**
 * Writes the similarity results on the calling thread, grouping them into batches.
 *
 * With a batch size of 1 every result is exported individually; otherwise the
 * stream is consumed in chunks of at most {@code batchSize} elements and each
 * chunk is exported with one call to {@code export(List)}.
 *
 * @param similarityPairs the results to write; consumed by this call
 * @param batchSize       maximum number of results per batch, must be at least 1
 * @return the number of export calls performed (one per result when
 *         {@code batchSize == 1}, otherwise one per non-empty batch)
 * @throws IllegalArgumentException if {@code batchSize} is smaller than 1
 * @throws ArithmeticException      if {@code batchSize} does not fit into an {@code int}
 */
private int writeSequential(Stream<SimilarityResult> similarityPairs, long batchSize) {
    // A batch size of 0 used to spin forever: every batch came back empty while
    // the iterator still reported elements. Negative sizes already failed with
    // an IllegalArgumentException, so reject both up front with a clear message.
    if (batchSize < 1) {
        throw new IllegalArgumentException("batchSize must be at least 1, but was " + batchSize);
    }

    if (batchSize == 1) {
        // Array cell because the lambda needs an effectively final reference.
        int[] counter = {0};
        similarityPairs.forEach(similarityResult -> {
            export(similarityResult);
            counter[0]++;
        });
        return counter[0];
    }

    // Loop-invariant, so convert (and range-check) only once.
    int maxBatchLength = Math.toIntExact(batchSize);
    Iterator<SimilarityResult> iterator = similarityPairs.iterator();
    int batches = 0;
    // Checking hasNext() first means an empty stream no longer triggers an
    // export of an empty batch.
    while (iterator.hasNext()) {
        export(take(iterator, maxBatchLength));
        batches++;
    }
    return batches;
}

/**
 * Pulls up to {@code batchSize} elements from the iterator into a new list.
 *
 * @param iterator  source of results; advanced by the number of elements returned
 * @param batchSize maximum number of elements to take
 * @return a new list holding the taken elements in iteration order; shorter than
 *         {@code batchSize} (possibly empty) when the iterator runs out first
 * @throws IllegalArgumentException if {@code batchSize} is negative
 */
private static List<SimilarityResult> take(Iterator<SimilarityResult> iterator, int batchSize) {
    // ArrayList allocates its backing array eagerly, so pre-sizing with a huge
    // batchSize could exhaust the heap even when only a few elements exist.
    // Cap the initial capacity and let the list grow on demand. A negative
    // batchSize still reaches the constructor and is rejected there.
    int initialCapacity = Math.min(batchSize, 1024);
    List<SimilarityResult> result = new ArrayList<>(initialCapacity);
    int remaining = batchSize;
    while (remaining > 0 && iterator.hasNext()) {
        result.add(iterator.next());
        remaining--;
    }
    return result;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -49,62 +49,5 @@ public int export(Stream<SimilarityResult> similarityPairs, long batchSize) {
return batches;
}

/**
 * Writes one similarity result: calls {@code createRelationship} for it from
 * inside a single {@code applyInTransaction} callback.
 *
 * Any {@link KernelException} from the write is forwarded to
 * {@code ExceptionUtil.throwKernelException}.
 *
 * @param similarityResult the result to write
 */
private void export(SimilarityResult similarityResult) {
    applyInTransaction(tx -> {
        try {
            createRelationship(similarityResult, tx);
        } catch (KernelException failure) {
            ExceptionUtil.throwKernelException(failure);
        }
        // No result to report back from the callback.
        return null;
    });
}

/**
 * Writes a batch of similarity results: calls {@code createRelationship} for
 * each element, in iteration order, from inside a single
 * {@code applyInTransaction} callback.
 *
 * Any {@link KernelException} from a write is forwarded to
 * {@code ExceptionUtil.throwKernelException}.
 *
 * @param similarityResults the batch to write
 */
private void export(List<SimilarityResult> similarityResults) {
    applyInTransaction(tx -> {
        Iterator<SimilarityResult> cursor = similarityResults.iterator();
        while (cursor.hasNext()) {
            SimilarityResult next = cursor.next();
            try {
                createRelationship(next, tx);
            } catch (KernelException failure) {
                ExceptionUtil.throwKernelException(failure);
            }
        }
        // No result to report back from the callback.
        return null;
    });
}

/**
 * Writes the similarity results on the calling thread, grouping them into batches.
 *
 * With a batch size of 1 every result is exported individually; otherwise the
 * stream is consumed in chunks of at most {@code batchSize} elements and each
 * chunk is exported with one call to {@code export(List)}.
 *
 * @param similarityPairs the results to write; consumed by this call
 * @param batchSize       maximum number of results per batch, must be at least 1
 * @return the number of export calls performed (one per result when
 *         {@code batchSize == 1}, otherwise one per non-empty batch)
 * @throws IllegalArgumentException if {@code batchSize} is smaller than 1
 * @throws ArithmeticException      if {@code batchSize} does not fit into an {@code int}
 */
private int writeSequential(Stream<SimilarityResult> similarityPairs, long batchSize) {
    log.info("SequentialSimilarityExporter: Writing relationships...");

    // A batch size of 0 used to spin forever: every batch came back empty while
    // the iterator still reported elements. Negative sizes already failed with
    // an IllegalArgumentException, so reject both up front with a clear message.
    if (batchSize < 1) {
        throw new IllegalArgumentException("batchSize must be at least 1, but was " + batchSize);
    }

    if (batchSize == 1) {
        // Array cell because the lambda needs an effectively final reference.
        int[] counter = {0};
        similarityPairs.forEach(similarityResult -> {
            export(similarityResult);
            counter[0]++;
        });
        return counter[0];
    }

    // Loop-invariant, so convert (and range-check) only once.
    int maxBatchLength = Math.toIntExact(batchSize);
    Iterator<SimilarityResult> iterator = similarityPairs.iterator();
    int batches = 0;
    // Checking hasNext() first means an empty stream no longer triggers an
    // export of an empty batch.
    while (iterator.hasNext()) {
        export(take(iterator, maxBatchLength));
        batches++;
    }
    return batches;
}


/**
 * Pulls up to {@code batchSize} elements from the iterator into a new list.
 *
 * @param iterator  source of results; advanced by the number of elements returned
 * @param batchSize maximum number of elements to take
 * @return a new list holding the taken elements in iteration order; shorter than
 *         {@code batchSize} (possibly empty) when the iterator runs out first
 * @throws IllegalArgumentException if {@code batchSize} is negative
 */
private static List<SimilarityResult> take(Iterator<SimilarityResult> iterator, int batchSize) {
    // ArrayList allocates its backing array eagerly, so pre-sizing with a huge
    // batchSize could exhaust the heap even when only a few elements exist.
    // Cap the initial capacity and let the list grow on demand. A negative
    // batchSize still reaches the constructor and is rejected there.
    int initialCapacity = Math.min(batchSize, 1024);
    List<SimilarityResult> result = new ArrayList<>(initialCapacity);
    int remaining = batchSize;
    while (remaining > 0 && iterator.hasNext()) {
        result.add(iterator.next());
        remaining--;
    }
    return result;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import org.neo4j.graphalgo.core.utils.ExceptionUtil;
import org.neo4j.graphalgo.core.utils.StatementApi;
import org.neo4j.internal.kernel.api.exceptions.EntityNotFoundException;
import org.neo4j.internal.kernel.api.exceptions.InvalidTransactionTypeKernelException;
import org.neo4j.internal.kernel.api.exceptions.KernelException;
import org.neo4j.internal.kernel.api.exceptions.explicitindex.AutoIndexingKernelException;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.Log;
import org.neo4j.values.storable.Values;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public abstract class SimilarityExporter extends StatementApi {
Expand Down Expand Up @@ -62,5 +67,60 @@ protected void createRelationship(SimilarityResult similarityResult, KernelTrans
relationshipId, propertyId, Values.doubleValue(similarityResult.similarity));
}

/**
 * Exports one similarity result by invoking {@code createRelationship} inside
 * an {@code applyInTransaction} callback.
 *
 * A {@link KernelException} thrown by the write is handed to
 * {@code ExceptionUtil.throwKernelException}.
 *
 * @param similarityResult the result to write
 */
private void export(SimilarityResult similarityResult) {
    applyInTransaction(kernelTransaction -> {
        try {
            createRelationship(similarityResult, kernelTransaction);
        } catch (KernelException cause) {
            ExceptionUtil.throwKernelException(cause);
        }
        // The callback must return something; there is no value to report.
        return null;
    });
}

/**
 * Exports a batch of similarity results by invoking {@code createRelationship}
 * for each element, in iteration order, inside one {@code applyInTransaction}
 * callback.
 *
 * A {@link KernelException} thrown by a write is handed to
 * {@code ExceptionUtil.throwKernelException}.
 *
 * @param similarityResults the batch to write
 */
private void export(List<SimilarityResult> similarityResults) {
    applyInTransaction(kernelTransaction -> {
        Iterator<SimilarityResult> pending = similarityResults.iterator();
        while (pending.hasNext()) {
            SimilarityResult entry = pending.next();
            try {
                createRelationship(entry, kernelTransaction);
            } catch (KernelException cause) {
                ExceptionUtil.throwKernelException(cause);
            }
        }
        // The callback must return something; there is no value to report.
        return null;
    });
}

/**
 * Writes the similarity results on the calling thread, grouping them into batches.
 *
 * With a batch size of 1 every result is exported individually; otherwise the
 * stream is consumed in chunks of at most {@code batchSize} elements and each
 * chunk is exported with one call to {@code export(List)}.
 *
 * @param similarityPairs the results to write; consumed by this call
 * @param batchSize       maximum number of results per batch, must be at least 1
 * @return the number of export calls performed (one per result when
 *         {@code batchSize == 1}, otherwise one per non-empty batch)
 * @throws IllegalArgumentException if {@code batchSize} is smaller than 1
 * @throws ArithmeticException      if {@code batchSize} does not fit into an {@code int}
 */
int writeSequential(Stream<SimilarityResult> similarityPairs, long batchSize) {
    // A batch size of 0 used to spin forever: every batch came back empty while
    // the iterator still reported elements. Negative sizes already failed with
    // an IllegalArgumentException, so reject both up front with a clear message.
    if (batchSize < 1) {
        throw new IllegalArgumentException("batchSize must be at least 1, but was " + batchSize);
    }

    if (batchSize == 1) {
        // Array cell because the lambda needs an effectively final reference.
        int[] counter = {0};
        similarityPairs.forEach(similarityResult -> {
            export(similarityResult);
            counter[0]++;
        });
        return counter[0];
    }

    // Loop-invariant, so convert (and range-check) only once.
    int maxBatchLength = Math.toIntExact(batchSize);
    Iterator<SimilarityResult> iterator = similarityPairs.iterator();
    int batches = 0;
    // Checking hasNext() first means an empty stream no longer triggers an
    // export of an empty batch.
    while (iterator.hasNext()) {
        export(take(iterator, maxBatchLength));
        batches++;
    }
    return batches;
}

/**
 * Pulls up to {@code batchSize} elements from the iterator into a new list.
 *
 * @param iterator  source of results; advanced by the number of elements returned
 * @param batchSize maximum number of elements to take
 * @return a new list holding the taken elements in iteration order; shorter than
 *         {@code batchSize} (possibly empty) when the iterator runs out first
 * @throws IllegalArgumentException if {@code batchSize} is negative
 */
private static List<SimilarityResult> take(Iterator<SimilarityResult> iterator, int batchSize) {
    // ArrayList allocates its backing array eagerly, so pre-sizing with a huge
    // batchSize could exhaust the heap even when only a few elements exist.
    // Cap the initial capacity and let the list grow on demand. A negative
    // batchSize still reaches the constructor and is rejected there.
    int initialCapacity = Math.min(batchSize, 1024);
    List<SimilarityResult> result = new ArrayList<>(initialCapacity);
    int remaining = batchSize;
    while (remaining > 0 && iterator.hasNext()) {
        result.add(iterator.next());
        remaining--;
    }
    return result;
}

/**
 * Exports the given similarity results; the strategy is supplied by subclasses.
 *
 * NOTE(review): the returned int is presumably the number of batches written,
 * matching what {@code writeSequential} counts — confirm against the implementations.
 *
 * @param similarityPairs the results to write
 * @param batchSize       requested number of results per batch
 * @return an implementation-defined count (see note above)
 */
abstract int export(Stream<SimilarityResult> similarityPairs, long batchSize);
}