Skip to content

Commit

Permalink
Merge pull request #222 from boalang/RobertHSchmidt
Browse files Browse the repository at this point in the history
fix data race
optimize code
  • Loading branch information
nguyenhoan authored Aug 2, 2018
2 parents 0982765 + 9e5150a commit 1095167
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/java/boa/datagen/BoaGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static void main(final String[] args) throws IOException {

if (jsonAvailable) {
try {
SeqRepoImporterJson.main(new String[0]);
SeqRepoImporter.main(new String[0]);
} catch (InterruptedException e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import boa.types.Code.Revision;
import boa.types.Toplevel.Project;

public class SeqRepoImporterJson {
public class SeqRepoImporter {
private final static boolean debug = Properties.getBoolean("debug", DefaultProperties.DEBUG);
private final static boolean cache = Properties.getBoolean("cache", DefaultProperties.CACHE);

Expand All @@ -73,9 +73,11 @@ public static void main(String[] args) throws IOException, InterruptedException
getProcessedProjects();

ImportTask[] workers = new ImportTask[poolSize];
Thread[] threads = new Thread[poolSize];
for (int i = 0; i < poolSize; i++) {
workers[i] = new ImportTask(i);
new Thread(workers[i]).start();
threads[i] = new Thread(workers[i]);
threads[i].start();
Thread.sleep(10);
}

Expand All @@ -96,15 +98,16 @@ public static void main(String[] args) throws IOException, InterruptedException
for (int i = 0; i < repoArray.size(); i++) {
JsonObject rp = repoArray.get(i).getAsJsonObject();
RepoMetadata repo = new RepoMetadata(rp);
if (repo.id != null && repo.name != null) {
if (repo.id != null && repo.name != null && !processedProjectIds.contains(repo.id)) {
try {
Project protobufRepo = repo.toBoaMetaDataProtobuf();

// System.out.println(jRepo.toString());
boolean assigned = false;
while (!assigned) {
for (int j = 0; j < poolSize; j++) {
if (workers[j].isReady()) {
workers[j].setProject(protobufRepo.toByteArray());
workers[j].setProject(protobufRepo);
workers[j].ready = false;
assigned = true;
break;
Expand All @@ -121,14 +124,16 @@ public static void main(String[] args) throws IOException, InterruptedException
}
}
}
done = true;
Thread.sleep(100);
// wait for workers to close writers
for (ImportTask worker : workers) {
while (!worker.isReady()){
for (int j = 0; j < poolSize; j++) {
while (!workers[j].isReady())
Thread.sleep(100);
}
}
done = true;

// wait for workers to close writers and finish
for (Thread thread : threads)
while (thread.isAlive())
Thread.sleep(1000);
}

private static void getProcessedProjects() throws IOException {
Expand Down Expand Up @@ -164,14 +169,14 @@ public static class ImportTask implements Runnable {
private SequenceFile.Writer projectWriter, astWriter, commitWriter, contentWriter;
private long astWriterLen = 1, commitWriterLen = 1, contentWriterLen = 1;
private boolean ready = true;
byte[] bs;
Project project;

public ImportTask(int id) {
this.id = id;
}

public void setProject(byte[] project) {
this.bs = project;
public void setProject(Project protobufRepo) {
this.project = protobufRepo;
};

public boolean isReady() {
Expand Down Expand Up @@ -240,28 +245,12 @@ public void run() {
if (done)
break;
try {
Project cachedProject = null;
try {
cachedProject = Project.parseFrom(bs);
if (processedProjectIds.contains(cachedProject.getId())) {
this.ready = true;
return;
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
this.ready = true;
return;
}
bs = null;

final String name = cachedProject.getName();
final String name = project.getName();

if (debug)
System.out.println(
Thread.currentThread().getId() + " Processing " + cachedProject.getId() + " " + name);

Project project = storeRepository(cachedProject, 0);

Thread.currentThread().getId() + " Processing " + project.getId() + " " + name);
project = storeRepository(project, 0);
if (debug)
System.out.println(
Thread.currentThread().getId() + " Putting in sequence file: " + project.getId());
Expand Down Expand Up @@ -301,12 +290,8 @@ public void run() {
e.printStackTrace();
}
this.ready = true;
if (done)
break;
}
this.ready = false;
closeWriters();
this.ready = true;
}

private Project storeRepository(final Project project, final int i) {
Expand Down

0 comments on commit 1095167

Please sign in to comment.