View Javadoc
1   package com.kodexa.client.remote;
2   
3   import com.kodexa.client.Document;
4   import com.kodexa.client.connectors.Connector;
5   import com.kodexa.client.connectors.FolderConnector;
6   import com.kodexa.client.connectors.InputStreamConnector;
7   import com.kodexa.client.pipeline.Options;
8   import com.kodexa.client.pipeline.Pipeline;
9   import com.kodexa.client.pipeline.PipelineContext;
10  import lombok.extern.slf4j.Slf4j;
11  
12  import java.io.InputStream;
13  import java.util.Map;
14  
15  /**
16   * A remote-hosted Pipeline
17   */
18  @Slf4j
19  public class RemotePipeline extends Pipeline {
20  
21      private final RemotePipelineSession session;
22  
23      private Options options = Options.start();
24  
25      private Map<String, Object> parameters;
26  
27      public RemotePipeline(String ref, Connector connector) {
28          super(connector);
29          session = new RemotePipelineSession(ref);
30      }
31  
32      public RemotePipeline(String ref, Document document) {
33          super(document);
34          session = new RemotePipelineSession(ref);
35      }
36  
37      public RemotePipeline parameters(Map<String, Object> parameters) {
38          this.parameters = parameters;
39          return this;
40      }
41  
42      public RemotePipeline options(Options options) {
43          this.options = options;
44          return this;
45      }
46  
47      @Override
48      public PipelineContext run() {
49  
50          log.info("Starting pipeline");
51          PipelineContextl#PipelineContext">PipelineContext pipelineContext = new PipelineContext();
52  
53          connector.forEachRemaining(document -> {
54              long startTime = System.currentTimeMillis();
55  
56              CloudSession session = this.session.createSession(CloudSessionType.pipeline);
57              CloudExecution execution = this.session.executeService(session, document, pipelineContext, options, parameters);
58              execution = this.session.waitForExecution(session, execution);
59              this.session.mergeStores(session, execution, pipelineContext);
60              pipelineContext.setOutputDocument(this.session.getOutputDocument(session, execution));
61              long endTime = System.currentTimeMillis();
62              log.info("Pipeline processed in " + (endTime - startTime) + " ms");
63  
64          });
65  
66          log.info("Pipeline completed");
67          return pipelineContext;
68  
69      }
70  
71      /**
72       * Create a new pipeline based on a remote reference and a text document
73       *
74       * @param ref  the reference (slug) to the pipeline
75       * @param text The text to use to create a new document
76       * @return A new remote pipeline
77       */
78      public static RemotePipeline fromText(String ref, String text) {
79          return new RemotePipeline(ref, Document.fromText(text));
80      }
81  
82      public static RemotePipeline fromUrl(String ref, String url) {
83          return new RemotePipeline(ref, Document.fromUrl(url));
84      }
85  
86      public static RemotePipeline fromInputStream(String ref, InputStream inputStream) {
87          RemotePipelinee.html#RemotePipeline">RemotePipeline pipeline = new RemotePipeline(ref, new InputStreamConnector(inputStream));
88          pipeline.options(Options.start()
89                                  .attachSource());
90          return pipeline;
91      }
92  
93      public static RemotePipeline fromFolder(String ref, String folderPath, String filenameFilter, boolean recursive) {
94          RemotePipelinee.html#RemotePipeline">RemotePipeline pipeline = new RemotePipeline(ref, new FolderConnector(folderPath, filenameFilter, recursive));
95          pipeline.options(Options.start()
96                                  .attachSource());
97          return pipeline;
98      }
99  }