001// Generated by delombok at Fri Sep 03 18:26:10 UTC 2021 002package com.kodexa.client.pipeline; 003 004import com.kodexa.client.Document; 005import com.kodexa.client.connectors.Connector; 006import com.kodexa.client.connectors.FolderConnector; 007import com.kodexa.client.connectors.InputStreamConnector; 008import com.kodexa.client.remote.RemoteAction; 009import com.kodexa.client.sink.Sink; 010import com.kodexa.client.steps.PipelineStep; 011import java.io.InputStream; 012import java.util.ArrayList; 013import java.util.List; 014 015/** 016 * A Pipeline allows you to put together steps from a connector to a sink 017 * to enable re-use and encapsulation of steps. 018 */ 019public class Pipeline { 020 @java.lang.SuppressWarnings("all") 021 private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(Pipeline.class); 022 protected final Connector connector; 023 private final PipelineContext context; 024 private Sink sink; 025 private List<PipelineStepWrapper> steps = new ArrayList<>(); 026 protected List<PipelineParameter> parameters = new ArrayList<>(); 027 028 public Pipeline(Connector connector) { 029 this.connector = connector; 030 this.context = new PipelineContext(); 031 } 032 033 public Pipeline(Document document) { 034 this.connector = new DocumentConnector(document); 035 this.context = new PipelineContext(); 036 } 037 038 public Pipeline addStep(Class stepClass, Options options) { 039 steps.add(new PipelineStepWrapper(new ClassBasedStep(stepClass), options)); 040 return this; 041 } 042 043 public Pipeline addStep(String actionSlug, Options options) { 044 steps.add(new PipelineStepWrapper(new RemoteAction(actionSlug), options)); 045 return this; 046 } 047 048 public Pipeline addStep(PipelineStep step) { 049 steps.add(new PipelineStepWrapper(step, new Options())); 050 return this; 051 } 052 053 public Pipeline setSink(Sink sink) { 054 this.sink = sink; 055 return this; 056 } 057 058 public Pipeline parameters(List<PipelineParameter> parameters) { 059 this.parameters = parameters; 060 return this; 061 } 062 063 public PipelineContext run() { 064 log.info("Starting pipeline"); 065 this.context.setParameters(this.parameters); 066 connector.forEachRemaining(document -> { 067 for (PipelineStepWrapper step : steps) { 068 long startTime = System.currentTimeMillis(); 069 document = step.process(document, context); 070 long endTime = System.currentTimeMillis(); 071 log.info("Step processed in " + (endTime - startTime) + " ms"); 072 } 073 if (sink != null) { 074 log.info("Writing to sink " + sink.getName()); 075 sink.sink(document); 076 } 077 context.setOutputDocument(document); 078 }); 079 log.info("Pipeline completed"); 080 return context; 081 } 082 083 /** 084 * Create a pipeline with a single document as input containing the content from this text 085 * 086 * @param text The text to use as the content of the document 087 * @return an instance of the pipeline 088 */ 089 public static Pipeline fromText(String text) { 090 return new Pipeline(Document.fromText(text)); 091 } 092 093 public static Pipeline fromUrl(String url) { 094 return new Pipeline(Document.fromUrl(url)); 095 } 096 097 public static Pipeline fromInputStream(InputStream inputStream) { 098 return new Pipeline(new InputStreamConnector(inputStream)); 099 } 100 101 public static Pipeline fromFolder(String folderPath, String filenameFilter, boolean recursive) { 102 return new Pipeline(new FolderConnector(folderPath, filenameFilter, recursive)); 103 } 104 105 public Pipeline addParameter(String name, String value) { 106 parameters.add(new PipelineParameter(name, value)); 107 return this; 108 } 109}