diff --git a/vipra-cmd/runcfg/CMD.launch b/vipra-cmd/runcfg/CMD.launch index b36db2e1f99f6ae77ceb1a8501b802a2a37c5e24..f8fc9b3106ebddee2057ef9c153f50811ae59d6d 100644 --- a/vipra-cmd/runcfg/CMD.launch +++ b/vipra-cmd/runcfg/CMD.launch @@ -11,7 +11,7 @@ </listAttribute> <stringAttribute key="org.eclipse.jdt.launching.CLASSPATH_PROVIDER" value="org.eclipse.m2e.launchconfig.classpathProvider"/> <stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="de.vipra.cmd.Main"/> -<stringAttribute key="org.eclipse.jdt.launching.PROGRAM_ARGUMENTS" value="-t"/> +<stringAttribute key="org.eclipse.jdt.launching.PROGRAM_ARGUMENTS" value="-AMd"/> <stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="vipra-cmd"/> <stringAttribute key="org.eclipse.jdt.launching.SOURCE_PATH_PROVIDER" value="org.eclipse.m2e.launchconfig.sourcepathProvider"/> <stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-ea"/> diff --git a/vipra-cmd/src/main/java/de/vipra/cmd/lda/Analyzer.java b/vipra-cmd/src/main/java/de/vipra/cmd/lda/Analyzer.java index 50e92ca4180e1de2f8d39a21e58bc7d0a0539263..d97c6728fd72847a9ad765744ca67b8546b06cc5 100644 --- a/vipra-cmd/src/main/java/de/vipra/cmd/lda/Analyzer.java +++ b/vipra-cmd/src/main/java/de/vipra/cmd/lda/Analyzer.java @@ -12,6 +12,7 @@ import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.ListIterator; +import java.util.function.Consumer; import org.bson.types.ObjectId; @@ -70,7 +71,7 @@ public class Analyzer { throw new AnalyzerException("dtm binary not found at path: " + config.getDtmPath() + ", check config key 'tm.dtmpath'"); } - public void analyze(final TopicModelConfig modelConfig, final boolean reread) + public void analyze(final TopicModelConfig modelConfig, final boolean reread, final Consumer<Double> progressCallback) throws AnalyzerException, DatabaseException, ParseException, IOException, InterruptedException { final File modelDir = modelConfig.getModelDir(dataDir); @@ -122,6 +123,7 @@ public class Analyzer { double avgDuration = 0; final double smoothingFactor = 0.1; int lastLength = printProgress(0, 0, iteration, maxIterationsLength, 0, modelConfig, 0); + int nextUpdate = 10; while ((line = in.readLine()) != null) { if (line.contains("EM iter")) { @@ -140,9 +142,15 @@ public class Analyzer { avgDuration = smoothingFactor * lastDuration + (1 - smoothingFactor) * avgDuration; final long remainingDuration = (long) avgDuration * (modelConfig.getDynamicMaxIterations() - iteration); - if (progress <= 100) + if (progress <= 100) { lastLength = printProgress(tenthPercent, progress, iteration, maxIterationsLength, remainingDuration, modelConfig, lastLength); + + if (progress >= nextUpdate) { + progressCallback.accept(progress); + nextUpdate += 10; + } + } } } diff --git a/vipra-cmd/src/main/java/de/vipra/cmd/option/ModelingCommand.java b/vipra-cmd/src/main/java/de/vipra/cmd/option/ModelingCommand.java index 5e1b095bd017b241a3265caad255b1c80c1cdb49..6958cb21af03c226656dafbb16ec801cb6cb7e05 100644 --- a/vipra-cmd/src/main/java/de/vipra/cmd/option/ModelingCommand.java +++ b/vipra-cmd/src/main/java/de/vipra/cmd/option/ModelingCommand.java @@ -3,6 +3,7 @@ package de.vipra.cmd.option; import java.io.File; import java.io.IOException; import java.text.ParseException; +import java.util.function.Consumer; import de.vipra.cmd.lda.Analyzer; import de.vipra.cmd.lda.AnalyzerException; @@ -27,7 +28,7 @@ public class ModelingCommand implements Command { this.reread = reread; } - private void modelForModel(final TopicModelConfig modelConfig) + private void modelForModel(final TopicModelConfig modelConfig, final Consumer<Double> progressCallback) throws AnalyzerException, ConfigException, DatabaseException, ParseException, IOException, InterruptedException { if (reread) ConsoleUtils.info("rereading model: " + modelConfig.getName()); @@ -43,7 +44,7 @@ public class ModelingCommand implements Command { /* * do topic modeling */ - analyzer.analyze(modelConfig, reread); + analyzer.analyze(modelConfig, reread, progressCallback); timer.lap("topic modeling"); /* @@ -63,7 +64,14 @@ public class ModelingCommand implements Command { final IPCClient ipcClient = new IPCClient(); for (final TopicModelConfig modelConfig : config.getTopicModelConfigs(models)) { ipcClient.send(new IPCMessage(IPCMessageCode.MODELING).message("Started generating model '" + modelConfig.getName() + "'")); - modelForModel(modelConfig); + modelForModel(modelConfig, (progress) -> { + try { + ipcClient.send(new IPCMessage(IPCMessageCode.MODELING) + .message("Modeling progress: " + StringUtils.pad(Integer.toString((int) Math.floor(progress)), 3, true) + "% for model '" + + modelConfig.getName() + "'")); + } catch (Exception e) {} + }); + ipcClient.send(new IPCMessage(IPCMessageCode.MODELING).message("Finished generating model '" + modelConfig.getName() + "'")); } ipcClient.close(); } diff --git a/vipra-util/src/main/java/de/vipra/util/IPCClient.java b/vipra-util/src/main/java/de/vipra/util/IPCClient.java index 2d6c50e4368c31b5b72526a4c7b7d26ff4c67980..6468e1938a0169684e366d1028d3cb9365ba38d5 100644 --- a/vipra-util/src/main/java/de/vipra/util/IPCClient.java +++ b/vipra-util/src/main/java/de/vipra/util/IPCClient.java @@ -29,6 +29,7 @@ public class IPCClient implements Closeable { @Override public void close() throws IOException { + out.flush(); socket.close(); } diff --git a/vipra-util/src/main/java/de/vipra/util/IPCServer.java b/vipra-util/src/main/java/de/vipra/util/IPCServer.java index ba440f5dc163b04a5fb164c6e13e872929824386..84ea17763a1fc8558da78d4a4c4f727359aa5b8e 100644 --- a/vipra-util/src/main/java/de/vipra/util/IPCServer.java +++ b/vipra-util/src/main/java/de/vipra/util/IPCServer.java @@ -1,6 +1,5 @@ package de.vipra.util; -import java.io.DataInputStream; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; @@ -34,28 +33,14 @@ public class IPCServer extends Thread { while (true) { try { - final Socket connectionSocket = welcomeSocket.accept(); - final DataInputStream in = new DataInputStream(connectionSocket.getInputStream()); - final String input = in.readUTF(); - final IPCMessage message = Config.mapper.readValue(input, IPCMessage.class); - if (message.getCode() != IPCMessageCode.TEST) - handleMessage(message); + Socket socket = welcomeSocket.accept(); + IPCThread.start(socket, (message) -> handleMessage(message)); } catch (final IOException e) { log.error(e.getMessage()); } } } - @Override - public void start() { - if (!isAlive()) - super.start(); - } - - public void register(final String id, final IPCChain chain) { - chains.put(id, chain); - } - private void handleMessage(final IPCMessage message) { log.info("message received: " + message.getCode()); for (final Entry<String, IPCChain> chain : chains.entrySet()) { @@ -67,6 +52,16 @@ public class IPCServer extends Thread { } } + @Override + public void start() { + if (!isAlive()) + super.start(); + } + + public void register(final String id, final IPCChain chain) { + chains.put(id, chain); + } + public static IPCServer getInstance() { if (instance == null) { instance = new IPCServer(); diff --git a/vipra-util/src/main/java/de/vipra/util/IPCThread.java b/vipra-util/src/main/java/de/vipra/util/IPCThread.java new file mode 100644 index 0000000000000000000000000000000000000000..d0056489f00c8c817fca25e138bbb25756888fe3 --- /dev/null +++ b/vipra-util/src/main/java/de/vipra/util/IPCThread.java @@ -0,0 +1,55 @@ +package de.vipra.util; + +import java.io.DataInputStream; +import java.io.IOException; +import java.net.Socket; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IPCThread extends Thread { + + public static final Logger log = LoggerFactory.getLogger(IPCThread.class); + + private final Socket socket; + private final DataInputStream in; + private final Consumer<IPCMessage> callback; + + private IPCThread(final Socket socket, final Consumer<IPCMessage> callback) throws IOException { + this.socket = socket; + this.callback = callback; + in = new DataInputStream(socket.getInputStream()); + } + + @Override + public void run() { + while (true) { + final String input; + try { + input = in.readUTF(); + } catch (IOException e) { + return; + } + if (input == null) + return; + IPCMessage message; + try { + message = Config.mapper.readValue(input, IPCMessage.class); + } catch (IOException e) { + continue; + } + if (message.getCode() != IPCMessageCode.TEST) + callback.accept(message); + if (socket.isClosed()) + return; + } + } + + public static IPCThread start(final Socket socket, final Consumer<IPCMessage> callback) throws IOException { + final IPCThread t = new IPCThread(socket, callback); + t.start(); + return t; + } + +}