View Javadoc
1   package com.kodexa.client.pipeline;
2   
import com.kodexa.client.Document;
import com.kodexa.client.connectors.Connector;
import com.kodexa.client.connectors.FolderConnector;
import com.kodexa.client.connectors.InputStreamConnector;
import com.kodexa.client.remote.RemoteAction;
import com.kodexa.client.steps.PipelineStep;
import lombok.extern.slf4j.Slf4j;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
14  
15  /**
16   * A Pipeline allows you to put together steps from a connector to a sink
17   * to enable re-use and encapsulation of steps.
18   */
19  @Slf4j
20  public class Pipeline {
21  
22      protected final Connector connector;
23  
24      private final PipelineContext context;
25  
26      private List<PipelineStepWrapper> steps = new ArrayList<>();
27  
28      protected List<PipelineParameter> parameters = new ArrayList<>();
29  
30      public Pipeline(Connector connector) {
31          this.connector = connector;
32          this.context = new PipelineContext();
33      }
34  
35      public Pipeline(Document document) {
36          this.connector = new DocumentConnector(document);
37          this.context = new PipelineContext();
38      }
39  
40      public Pipeline addStep(Class stepClass, Options options) {
41          steps.add(new PipelineStepWrapper(new ClassBasedStep(stepClass), options));
42          return this;
43      }
44  
45      public Pipeline addStep(String actionSlug, Options options) {
46          steps.add(new PipelineStepWrapper(new RemoteAction(actionSlug), options));
47          return this;
48      }
49  
50      public Pipeline addStep(PipelineStep step) {
51          steps.add(new PipelineStepWrapper(step, new Options()));
52          return this;
53      }
54  
55      public Pipeline parameters(List<PipelineParameter> parameters) {
56          this.parameters = parameters;
57          return this;
58      }
59  
60      public PipelineContext run() {
61  
62          log.info("Starting pipeline");
63  
64          this.context.setParameters(this.parameters);
65  
66          connector.forEachRemaining(document -> {
67              for (PipelineStepWrapper step : steps) {
68                  long startTime = System.currentTimeMillis();
69                  document = step.process(document, context);
70                  long endTime = System.currentTimeMillis();
71                  log.info("Step processed in " + (endTime - startTime) + " ms");
72              }
73              context.setOutputDocument(document);
74          });
75  
76          log.info("Pipeline completed");
77          return context;
78  
79      }
80  
81      /**
82       * Create a pipeline with a single document as input containing the content from this text
83       *
84       * @param text The text to use as the content of the document
85       * @return an instance of the pipeline
86       */
87      public static Pipeline fromText(String text) {
88          return new Pipeline(Document.fromText(text));
89      }
90  
91      public static Pipeline fromUrl(String url) {
92          return new Pipeline(Document.fromUrl(url));
93      }
94  
95      public static Pipeline fromInputStream(InputStream inputStream) {
96          return new Pipeline(new InputStreamConnector(inputStream));
97      }
98  
99      public static Pipeline fromFolder(String folderPath, String filenameFilter, boolean recursive) {
100         return new Pipeline(new FolderConnector(folderPath, filenameFilter, recursive));
101     }
102 
103     public Pipeline addParameter(String name, String value) {
104         parameters.add(new PipelineParameter(name, value));
105         return this;
106     }
107 }