This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Parallelise batching of writes for similarity algorithms #814

Open · wants to merge 11 commits into base: 3.4
return write duration
mneedham committed Feb 12, 2019
commit 714545d338774bc9c35d04eaf7608b6e08b3cfc6
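
The point of this commit is to time only the write phase and surface that value on the procedure's summary result. The diff below does it with a fluent SimilarityResultBuilder plus an AutoCloseable timer (builder.timeWrite()) wrapped around the exporter call. As a rough, self-contained illustration of that pattern — the WriteTimer and Example names here are invented for this sketch and do not exist in the repository:

import java.util.function.LongConsumer;

// Sketch only: mimics the try-with-resources timing that builder.timeWrite() provides below.
class WriteTimer implements AutoCloseable {
    private final long startMillis = System.currentTimeMillis();
    private final LongConsumer onClose;

    WriteTimer(LongConsumer onClose) {
        this.onClose = onClose;
    }

    @Override
    public void close() {
        // Hand the elapsed milliseconds to whoever records them, e.g. a result builder.
        onClose.accept(System.currentTimeMillis() - startMillis);
    }
}

class Example {
    static long writeDuration = -1; // -1 means "nothing was written", as in the diff below

    public static void main(String[] args) throws InterruptedException {
        // try-with-resources guarantees the duration is captured even if the write throws.
        try (WriteTimer ignored = new WriteTimer(elapsed -> writeDuration = elapsed)) {
            Thread.sleep(50); // stand-in for exporter.export(stream, writeBatchSize)
        }
        System.out.println("writeDuration=" + writeDuration + "ms");
    }
}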
@@ -94,51 +94,45 @@ public void export(Stream<SimilarityResult> similarityPairs, long batchSize) {
 
         ArrayBlockingQueue<List<SimilarityResult>> outQueue = new ArrayBlockingQueue<>(queueSize);
 
-        ExecutorService executor = Executors.newFixedThreadPool(1);
-        Future<Integer> inQueueBatchCountFuture = executor.submit(() -> {
-            AtomicInteger inQueueBatchCount = new AtomicInteger(0);
-            stream.parallel().forEach(partition -> {
-                IntSet nodesInPartition = new IntHashSet();
-                for (DisjointSetStruct.InternalResult internalResult : partition) {
-                    nodesInPartition.add(internalResult.internalNodeId);
-                }
-
-                List<SimilarityResult> inPartition = new ArrayList<>();
-                List<SimilarityResult> outPartition = new ArrayList<>();
-
-                for (DisjointSetStruct.InternalResult result : partition) {
-                    int nodeId = result.internalNodeId;
-                    graph.forEachRelationship(nodeId, Direction.OUTGOING, (sourceNodeId, targetNodeId, relationId, weight) -> {
-                        SimilarityResult similarityRelationship = new SimilarityResult(idMap.toOriginalNodeId(sourceNodeId), idMap.toOriginalNodeId(targetNodeId), -1, -1, -1, weight);
-
-                        if (nodesInPartition.contains(targetNodeId)) {
-                            inPartition.add(similarityRelationship);
-                        } else {
-                            outPartition.add(similarityRelationship);
-                        }
-
-                        return false;
-                    });
-                }
-
-                if (!inPartition.isEmpty()) {
-                    int inQueueBatches = writeSequential(inPartition.stream(), batchSize);
-                    inQueueBatchCount.addAndGet(inQueueBatches);
-                }
-
-                if (!outPartition.isEmpty()) {
-                    put(outQueue, outPartition);
-                }
-            });
-            return inQueueBatchCount.get();
-        });
-
-        Integer inQueueBatches = null;
-        try {
-            inQueueBatches = inQueueBatchCountFuture.get();
-        } catch (InterruptedException | ExecutionException e) {
-            e.printStackTrace();
-        }
+        AtomicInteger inQueueBatchCount = new AtomicInteger(0);
+        stream.parallel().forEach(partition -> {
+            IntSet nodesInPartition = new IntHashSet();
+            for (DisjointSetStruct.InternalResult internalResult : partition) {
+                nodesInPartition.add(internalResult.internalNodeId);
+            }
+
+            List<SimilarityResult> inPartition = new ArrayList<>();
+            List<SimilarityResult> outPartition = new ArrayList<>();
+
+            for (DisjointSetStruct.InternalResult result : partition) {
+                int nodeId = result.internalNodeId;
+                graph.forEachRelationship(nodeId, Direction.OUTGOING, (sourceNodeId, targetNodeId, relationId, weight) -> {
+                    SimilarityResult similarityRelationship = new SimilarityResult(idMap.toOriginalNodeId(sourceNodeId),
+                            idMap.toOriginalNodeId(targetNodeId), -1, -1, -1, weight);
+
+                    if (nodesInPartition.contains(targetNodeId)) {
+                        inPartition.add(similarityRelationship);
+                    } else {
+                        outPartition.add(similarityRelationship);
+                    }
+
+                    return false;
+                });
+            }
+
+            if (!inPartition.isEmpty()) {
+                int inQueueBatches = writeSequential(inPartition.stream(), batchSize);
+                inQueueBatchCount.addAndGet(inQueueBatches);
+            }
+
+            if (!outPartition.isEmpty()) {
+                put(outQueue, outPartition);
+            }
+        });
+
+        int inQueueBatches = inQueueBatchCount.get();
 
         int outQueueBatches = writeSequential(outQueue.stream().flatMap(Collection::stream), batchSize);
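
The hunk above keeps the existing write strategy — each parallel worker writes the similarity pairs whose endpoints both sit in its partition and defers cross-partition pairs to a shared bounded queue that is flushed sequentially afterwards — but drops the single-thread executor that previously wrapped the loop, so the batch count can be read straight from the AtomicInteger. A small stand-alone sketch of that split; Pair, PartitionedWriteSketch and the final println are placeholders for the repository's SimilarityResult, exporter and writeSequential machinery:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class PartitionedWriteSketch {
    record Pair(int source, int target) {}

    public static void main(String[] args) {
        List<List<Pair>> partitions = List.of(
                List.of(new Pair(0, 1), new Pair(1, 5)),   // node 5 lives in another partition
                List.of(new Pair(5, 6), new Pair(6, 0)));  // node 0 lives in another partition

        BlockingQueue<List<Pair>> outQueue = new ArrayBlockingQueue<>(16);
        AtomicInteger inQueueBatchCount = new AtomicInteger();

        // No wrapping executor: the parallel stream runs here and the caller reads the count afterwards.
        partitions.parallelStream().forEach(partition -> {
            Set<Integer> nodesInPartition = partition.stream()
                    .map(Pair::source)
                    .collect(Collectors.toSet());

            List<Pair> inPartition = new ArrayList<>();
            List<Pair> outPartition = new ArrayList<>();
            for (Pair pair : partition) {
                (nodesInPartition.contains(pair.target()) ? inPartition : outPartition).add(pair);
            }

            if (!inPartition.isEmpty()) {
                inQueueBatchCount.incrementAndGet(); // worker writes its own batch directly
            }
            if (!outPartition.isEmpty()) {
                outQueue.add(outPartition);          // cross-partition pairs are deferred
            }
        });

        List<Pair> deferred = outQueue.stream().flatMap(Collection::stream).collect(Collectors.toList());
        System.out.println("in-partition batches: " + inQueueBatchCount.get() + ", deferred pairs: " + deferred);
    }
}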
@@ -25,10 +25,7 @@
 import org.HdrHistogram.DoubleHistogram;
 import org.neo4j.graphalgo.core.ProcedureConfiguration;
 import org.neo4j.graphalgo.core.ProcedureConstants;
-import org.neo4j.graphalgo.core.utils.ParallelUtil;
-import org.neo4j.graphalgo.core.utils.Pools;
-import org.neo4j.graphalgo.core.utils.QueueBasedSpliterator;
-import org.neo4j.graphalgo.core.utils.TerminationFlag;
+import org.neo4j.graphalgo.core.utils.*;
 import org.neo4j.graphalgo.impl.util.TopKConsumer;
 import org.neo4j.graphdb.Result;
 import org.neo4j.kernel.api.KernelTransaction;
@@ -95,7 +92,76 @@ Long getWriteBatchSize(ProcedureConfiguration configuration) {
         return configuration.get("writeBatchSize", 10000L);
     }
 
+
+    public class SimilarityResultBuilder {
+        protected long writeDuration = -1;
+        protected boolean write = false;
+        private int nodes;
+        private String writeRelationshipType;
+        private String writeProperty;
+        private AtomicLong similarityPairs;
+        private DoubleHistogram histogram;
+
+        public SimilarityResultBuilder withWriteDuration(long writeDuration) {
+            this.writeDuration = writeDuration;
+            return this;
+        }
+
+        public SimilarityResultBuilder withWrite(boolean write) {
+            this.write = write;
+            return this;
+        }
+
+        /**
+         * Returns an AutoCloseable which measures the time until it gets closed
+         * and saves the duration as writeDuration.
+         *
+         * @return a started ProgressTimer
+         */
+        public ProgressTimer timeWrite() {
+            return ProgressTimer.start(this::withWriteDuration);
+        }
+
+        public SimilaritySummaryResult build() {
+            return SimilaritySummaryResult.from(nodes, similarityPairs, writeRelationshipType, writeProperty, write, histogram, writeDuration);
+        }
+
+        public SimilarityResultBuilder nodes(int nodes) {
+            this.nodes = nodes;
+            return this;
+        }
+
+        public SimilarityResultBuilder write(boolean write) {
+            this.write = write;
+            return this;
+        }
+
+        public SimilarityResultBuilder writeRelationshipType(String writeRelationshipType) {
+            this.writeRelationshipType = writeRelationshipType;
+            return this;
+        }
+
+        public SimilarityResultBuilder writeProperty(String writeProperty) {
+            this.writeProperty = writeProperty;
+            return this;
+        }
+
+        public SimilarityResultBuilder similarityPairs(AtomicLong similarityPairs) {
+            this.similarityPairs = similarityPairs;
+            return this;
+        }
+
+        public SimilarityResultBuilder histogram(DoubleHistogram histogram) {
+            this.histogram = histogram;
+            return this;
+        }
+    }
+
     Stream<SimilaritySummaryResult> writeAndAggregateResults(Stream<SimilarityResult> stream, int length, ProcedureConfiguration configuration, boolean write, String writeRelationshipType, String writeProperty, boolean writeParallel) {
+        SimilarityResultBuilder builder = new SimilarityResultBuilder();
+        builder.nodes(length).write(write).writeRelationshipType(writeRelationshipType).writeProperty(writeProperty);
+
         long writeBatchSize = getWriteBatchSize(configuration);
         AtomicLong similarityPairs = new AtomicLong();
         DoubleHistogram histogram = new DoubleHistogram(5);
@@ -106,24 +172,31 @@ Stream<SimilaritySummaryResult> writeAndAggregateResults(Stream<SimilarityResult
 
         if (write) {
             if (writeParallel) {
-                ParallelSimilarityExporter parallelSimilarityExporter = new ParallelSimilarityExporter(api, log, writeRelationshipType, writeProperty, length);
-                parallelSimilarityExporter.export(stream.peek(recorder), writeBatchSize);
+                try (ProgressTimer timer = builder.timeWrite()) {
+                    ParallelSimilarityExporter parallelSimilarityExporter = new ParallelSimilarityExporter(api, log, writeRelationshipType, writeProperty, length);
+                    parallelSimilarityExporter.export(stream.peek(recorder), writeBatchSize);
+                }
 
             } else {
-                SimilarityExporter similarityExporter = new SimilarityExporter(api, log, writeRelationshipType, writeProperty);
-                similarityExporter.export(stream.peek(recorder), writeBatchSize);
+                try (ProgressTimer timer = builder.timeWrite()) {
+                    SimilarityExporter similarityExporter = new SimilarityExporter(api, log, writeRelationshipType, writeProperty);
+                    similarityExporter.export(stream.peek(recorder), writeBatchSize);
+                }
             }
 
         } else {
             stream.forEach(recorder);
         }
 
-        return Stream.of(SimilaritySummaryResult.from(length, similarityPairs, writeRelationshipType, writeProperty, write, histogram));
+        builder.similarityPairs(similarityPairs).histogram(histogram);
+        return Stream.of(builder.build());
+
+        // return Stream.of(SimilaritySummaryResult.from(length, similarityPairs, writeRelationshipType, writeProperty, write, histogram));
     }
 
     Stream<SimilaritySummaryResult> emptyStream(String writeRelationshipType, String writeProperty) {
         return Stream.of(SimilaritySummaryResult.from(0, new AtomicLong(0), writeRelationshipType,
-                writeProperty, false, new DoubleHistogram(5)));
+                writeProperty, false, new DoubleHistogram(5), -1));
     }
 
     Double getSimilarityCutoff(ProcedureConfiguration configuration) {
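
For context, the builder and timer added in the file above are meant to be used the way writeAndAggregateResults now uses them. The following is only a usage outline reusing names from this diff (builder, ProgressTimer, ParallelSimilarityExporter, stream, recorder, writeBatchSize); it is not a compilable stand-alone example:

// Usage outline (names taken from the diff above; not stand-alone code).
SimilarityResultBuilder builder = new SimilarityResultBuilder();
builder.nodes(length)
       .write(write)
       .writeRelationshipType(writeRelationshipType)
       .writeProperty(writeProperty);

// The timer is closed when the try block exits, which calls withWriteDuration(elapsed).
try (ProgressTimer timer = builder.timeWrite()) {
    parallelSimilarityExporter.export(stream.peek(recorder), writeBatchSize);
}

// The duration recorded by the timer ends up on the summary row via build().
builder.similarityPairs(similarityPairs).histogram(histogram);
SimilaritySummaryResult summary = builder.build(); // summary.writeDuration carries the elapsed time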
@@ -24,6 +24,7 @@
 
 public class SimilaritySummaryResult {
 
+    public final long writeDuration;
     public final long nodes;
     public final long similarityPairs;
     public final boolean write;
@@ -46,7 +47,7 @@ public SimilaritySummaryResult(long nodes, long similarityPairs,
                                    boolean write, String writeRelationshipType, String writeProperty,
                                    double min, double max, double mean, double stdDev,
                                    double p25, double p50, double p75, double p90, double p95,
-                                   double p99, double p999, double p100) {
+                                   double p99, double p999, double p100, long writeDuration) {
         this.nodes = nodes;
         this.similarityPairs = similarityPairs;
         this.write = write;
@@ -64,9 +65,10 @@ public SimilaritySummaryResult(long nodes, long similarityPairs,
         this.p99 = p99;
         this.p999 = p999;
         this.p100 = p100;
+        this.writeDuration = writeDuration;
     }
 
-    static SimilaritySummaryResult from(long length, AtomicLong similarityPairs, String writeRelationshipType, String writeProperty, boolean write, DoubleHistogram histogram) {
+    static SimilaritySummaryResult from(long length, AtomicLong similarityPairs, String writeRelationshipType, String writeProperty, boolean write, DoubleHistogram histogram, long writeDuration) {
         return new SimilaritySummaryResult(
                 length,
                 similarityPairs.get(),
@@ -84,7 +86,8 @@ static SimilaritySummaryResult from(long length, AtomicLong similarityPairs, Str
                 histogram.getValueAtPercentile(95D),
                 histogram.getValueAtPercentile(99D),
                 histogram.getValueAtPercentile(99.9D),
-                histogram.getValueAtPercentile(100D)
+                histogram.getValueAtPercentile(100D),
+                writeDuration
         );
     }
 }
}