001// Generated by delombok at Fri Sep 03 18:26:10 UTC 2021
002package com.kodexa.client.pipeline;
003
004import com.kodexa.client.Document;
005import com.kodexa.client.connectors.Connector;
006import com.kodexa.client.connectors.FolderConnector;
007import com.kodexa.client.connectors.InputStreamConnector;
008import com.kodexa.client.remote.RemoteAction;
009import com.kodexa.client.sink.Sink;
010import com.kodexa.client.steps.PipelineStep;
011import java.io.InputStream;
012import java.util.ArrayList;
013import java.util.List;
014
015/**
016 * A Pipeline allows you to put together steps from a connector to a sink
017 * to enable re-use and encapsulation of steps.
018 */
019public class Pipeline {
020    @java.lang.SuppressWarnings("all")
021    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(Pipeline.class);
022    protected final Connector connector;
023    private final PipelineContext context;
024    private Sink sink;
025    private List<PipelineStepWrapper> steps = new ArrayList<>();
026    protected List<PipelineParameter> parameters = new ArrayList<>();
027
028    public Pipeline(Connector connector) {
029        this.connector = connector;
030        this.context = new PipelineContext();
031    }
032
033    public Pipeline(Document document) {
034        this.connector = new DocumentConnector(document);
035        this.context = new PipelineContext();
036    }
037
038    public Pipeline addStep(Class stepClass, Options options) {
039        steps.add(new PipelineStepWrapper(new ClassBasedStep(stepClass), options));
040        return this;
041    }
042
043    public Pipeline addStep(String actionSlug, Options options) {
044        steps.add(new PipelineStepWrapper(new RemoteAction(actionSlug), options));
045        return this;
046    }
047
048    public Pipeline addStep(PipelineStep step) {
049        steps.add(new PipelineStepWrapper(step, new Options()));
050        return this;
051    }
052
053    public Pipeline setSink(Sink sink) {
054        this.sink = sink;
055        return this;
056    }
057
058    public Pipeline parameters(List<PipelineParameter> parameters) {
059        this.parameters = parameters;
060        return this;
061    }
062
063    public PipelineContext run() {
064        log.info("Starting pipeline");
065        this.context.setParameters(this.parameters);
066        connector.forEachRemaining(document -> {
067            for (PipelineStepWrapper step : steps) {
068                long startTime = System.currentTimeMillis();
069                document = step.process(document, context);
070                long endTime = System.currentTimeMillis();
071                log.info("Step processed in " + (endTime - startTime) + " ms");
072            }
073            if (sink != null) {
074                log.info("Writing to sink " + sink.getName());
075                sink.sink(document);
076            }
077            context.setOutputDocument(document);
078        });
079        log.info("Pipeline completed");
080        return context;
081    }
082
083    /**
084     * Create a pipeline with a single document as input containing the content from this text
085     *
086     * @param text The text to use as the content of the document
087     * @return an instance of the pipeline
088     */
089    public static Pipeline fromText(String text) {
090        return new Pipeline(Document.fromText(text));
091    }
092
093    public static Pipeline fromUrl(String url) {
094        return new Pipeline(Document.fromUrl(url));
095    }
096
097    public static Pipeline fromInputStream(InputStream inputStream) {
098        return new Pipeline(new InputStreamConnector(inputStream));
099    }
100
101    public static Pipeline fromFolder(String folderPath, String filenameFilter, boolean recursive) {
102        return new Pipeline(new FolderConnector(folderPath, filenameFilter, recursive));
103    }
104
105    public Pipeline addParameter(String name, String value) {
106        parameters.add(new PipelineParameter(name, value));
107        return this;
108    }
109}