Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Parallelise batching of writes for similarity algorithms #814

Open
wants to merge 11 commits into
base: 3.4
Choose a base branch
from
Prev Previous commit
Next Next commit
remove duplication
  • Loading branch information
mneedham committed Feb 12, 2019
commit 35cbc2b9b2e7c8566c65182e2381f1cc2cb5425f
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,15 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ParallelSimilarityExporter extends StatementApi implements SimilarityExporter {
public class ParallelSimilarityExporter extends SimilarityExporter {

private final Log log;
private final int propertyId;
private final int relationshipTypeId;
private final int nodeCount;

public ParallelSimilarityExporter(GraphDatabaseAPI api,
Log log,
String relationshipType,
String propertyName, int nodeCount) {
super(api);
this.log = log;
propertyId = getOrCreatePropertyId(propertyName);
relationshipTypeId = getOrCreateRelationshipId(relationshipType);
super(api, log, propertyName, relationshipType);
this.nodeCount = nodeCount;
}

Expand All @@ -83,24 +77,18 @@ public int export(Stream<SimilarityResult> similarityPairs, long batchSize) {

DisjointSetStruct dssResult = computePartitions(graph);

Stream<List<DisjointSetStruct.InternalResult>> stream = dssResult.internalResultStream(graph)
.collect(Collectors.groupingBy(item -> item.setId))
.values()
.stream();
Stream<List<DisjointSetStruct.InternalResult>> partitions = groupPartitions(graph, dssResult);

int queueSize = dssResult.getSetCount();

if(queueSize == 0) {
int numberOfPartitions = dssResult.getSetCount();
if(numberOfPartitions == 0) {
return 0;
}

log.info("ParallelSimilarityExporter: Relationships to be created: %d, Partitions found: %d", numberOfRelationships[0], queueSize);

ArrayBlockingQueue<List<SimilarityResult>> outQueue = new ArrayBlockingQueue<>(queueSize);

log.info("ParallelSimilarityExporter: Relationships to be created: %d, Partitions found: %d", numberOfRelationships[0], numberOfPartitions);
ArrayBlockingQueue<List<SimilarityResult>> outQueue = new ArrayBlockingQueue<>(numberOfPartitions);

AtomicInteger inQueueBatchCount = new AtomicInteger(0);
stream.parallel().forEach(partition -> {
partitions.parallel().forEach(partition -> {
IntSet nodesInPartition = new IntHashSet();
for (DisjointSetStruct.InternalResult internalResult : partition) {
nodesInPartition.add(internalResult.internalNodeId);
Expand Down Expand Up @@ -137,13 +125,20 @@ public int export(Stream<SimilarityResult> similarityPairs, long batchSize) {


int inQueueBatches = inQueueBatchCount.get();


int outQueueBatches = writeSequential(outQueue.stream().flatMap(Collection::stream), batchSize);

log.info("ParallelSimilarityExporter: Batch Size: %d, Batches written - in parallel: %d, sequentially: %d", batchSize, inQueueBatches, outQueueBatches);

return inQueueBatches + outQueueBatches;
}

private Stream<List<DisjointSetStruct.InternalResult>> groupPartitions(HeavyGraph graph, DisjointSetStruct dssResult) {
return dssResult.internalResultStream(graph)
.collect(Collectors.groupingBy(item -> item.setId))
.values()
.stream();
}

private static <T> void put(BlockingQueue<T> queue, T items) {
try {
queue.put(items);
Expand Down Expand Up @@ -185,27 +180,6 @@ private void export(List<SimilarityResult> similarityResults) {

}

private void createRelationship(SimilarityResult similarityResult, KernelTransaction statement) throws EntityNotFoundException, InvalidTransactionTypeKernelException, AutoIndexingKernelException {
long node1 = similarityResult.item1;
long node2 = similarityResult.item2;
long relationshipId = statement.dataWrite().relationshipCreate(node1, relationshipTypeId, node2);

statement.dataWrite().relationshipSetProperty(
relationshipId, propertyId, Values.doubleValue(similarityResult.similarity));
}

private int getOrCreateRelationshipId(String relationshipType) {
return applyInTransaction(stmt -> stmt
.tokenWrite()
.relationshipTypeGetOrCreateForName(relationshipType));
}

private int getOrCreatePropertyId(String propertyName) {
return applyInTransaction(stmt -> stmt
.tokenWrite()
.propertyKeyGetOrCreateForName(propertyName));
}

private int writeSequential(Stream<SimilarityResult> similarityPairs, long batchSize) {
int[] counter = {0};
if (batchSize == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,13 @@
import java.util.List;
import java.util.stream.Stream;

public class SequentialSimilarityExporter extends StatementApi implements SimilarityExporter {

private final Log log;
private final int propertyId;
private final int relationshipTypeId;
public class SequentialSimilarityExporter extends SimilarityExporter {

public SequentialSimilarityExporter(GraphDatabaseAPI api,
Log log, String relationshipType,
String propertyName, int nodeCount) {
super(api);
this.log = log;
propertyId = getOrCreatePropertyId(propertyName);
relationshipTypeId = getOrCreateRelationshipId(relationshipType);
super(api, log, propertyName, relationshipType);

}

public int export(Stream<SimilarityResult> similarityPairs, long batchSize) {
Expand Down Expand Up @@ -81,27 +75,6 @@ private void export(List<SimilarityResult> similarityResults) {

}

private void createRelationship(SimilarityResult similarityResult, KernelTransaction statement) throws EntityNotFoundException, InvalidTransactionTypeKernelException, AutoIndexingKernelException {
long node1 = similarityResult.item1;
long node2 = similarityResult.item2;
long relationshipId = statement.dataWrite().relationshipCreate(node1, relationshipTypeId, node2);

statement.dataWrite().relationshipSetProperty(
relationshipId, propertyId, Values.doubleValue(similarityResult.similarity));
}

private int getOrCreateRelationshipId(String relationshipType) {
return applyInTransaction(stmt -> stmt
.tokenWrite()
.relationshipTypeGetOrCreateForName(relationshipType));
}

private int getOrCreatePropertyId(String propertyName) {
return applyInTransaction(stmt -> stmt
.tokenWrite()
.propertyKeyGetOrCreateForName(propertyName));
}

private int writeSequential(Stream<SimilarityResult> similarityPairs, long batchSize) {
log.info("SequentialSimilarityExporter: Writing relationships...");
int[] counter = {0};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,49 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import org.neo4j.graphalgo.core.utils.StatementApi;
import org.neo4j.internal.kernel.api.exceptions.EntityNotFoundException;
import org.neo4j.internal.kernel.api.exceptions.InvalidTransactionTypeKernelException;
import org.neo4j.internal.kernel.api.exceptions.explicitindex.AutoIndexingKernelException;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.Log;
import org.neo4j.values.storable.Values;

import java.util.stream.Stream;

public interface SimilarityExporter {
int export(Stream<SimilarityResult> similarityPairs, long batchSize);
public abstract class SimilarityExporter extends StatementApi {
final Log log;
final int propertyId;
final int relationshipTypeId;

protected SimilarityExporter(GraphDatabaseAPI api, Log log, String propertyName, String relationshipType) {
super(api);
this.log = log;
propertyId = getOrCreatePropertyId(propertyName);
relationshipTypeId = getOrCreateRelationshipTypeId(relationshipType);
}

private int getOrCreateRelationshipTypeId(String relationshipType) {
return applyInTransaction(stmt -> stmt
.tokenWrite()
.relationshipTypeGetOrCreateForName(relationshipType));
}

private int getOrCreatePropertyId(String propertyName) {
return applyInTransaction(stmt -> stmt
.tokenWrite()
.propertyKeyGetOrCreateForName(propertyName));
}

protected void createRelationship(SimilarityResult similarityResult, KernelTransaction statement) throws EntityNotFoundException, InvalidTransactionTypeKernelException, AutoIndexingKernelException {
long node1 = similarityResult.item1;
long node2 = similarityResult.item2;
long relationshipId = statement.dataWrite().relationshipCreate(node1, relationshipTypeId, node2);

statement.dataWrite().relationshipSetProperty(
relationshipId, propertyId, Values.doubleValue(similarityResult.similarity));
}

abstract int export(Stream<SimilarityResult> similarityPairs, long batchSize);
}