1 package com.kodexa.client.remote;
2
3 import com.kodexa.client.Document;
4 import com.kodexa.client.connectors.Connector;
5 import com.kodexa.client.connectors.FolderConnector;
6 import com.kodexa.client.connectors.InputStreamConnector;
7 import com.kodexa.client.pipeline.Options;
8 import com.kodexa.client.pipeline.Pipeline;
9 import com.kodexa.client.pipeline.PipelineContext;
10 import lombok.extern.slf4j.Slf4j;
11
12 import java.io.InputStream;
13 import java.util.Map;
14
15
16
17
18 @Slf4j
19 public class RemotePipeline extends Pipeline {
20
21 private final RemotePipelineSession session;
22
23 private Options options = Options.start();
24
25 private Map<String, Object> parameters;
26
27 public RemotePipeline(String ref, Connector connector) {
28 super(connector);
29 session = new RemotePipelineSession(ref);
30 }
31
32 public RemotePipeline(String ref, Document document) {
33 super(document);
34 session = new RemotePipelineSession(ref);
35 }
36
37 public RemotePipeline parameters(Map<String, Object> parameters) {
38 this.parameters = parameters;
39 return this;
40 }
41
42 public RemotePipeline options(Options options) {
43 this.options = options;
44 return this;
45 }
46
47 @Override
48 public PipelineContext run() {
49
50 log.info("Starting pipeline");
51 PipelineContextl#PipelineContext">PipelineContext pipelineContext = new PipelineContext();
52
53 connector.forEachRemaining(document -> {
54 long startTime = System.currentTimeMillis();
55
56 CloudSession session = this.session.createSession(CloudSessionType.pipeline);
57 CloudExecution execution = this.session.executeService(session, document, pipelineContext, options, parameters);
58 execution = this.session.waitForExecution(session, execution);
59 this.session.mergeStores(session, execution, pipelineContext);
60 pipelineContext.setOutputDocument(this.session.getOutputDocument(session, execution));
61 long endTime = System.currentTimeMillis();
62 log.info("Pipeline processed in " + (endTime - startTime) + " ms");
63
64 });
65
66 log.info("Pipeline completed");
67 return pipelineContext;
68
69 }
70
71
72
73
74
75
76
77
78 public static RemotePipeline fromText(String ref, String text) {
79 return new RemotePipeline(ref, Document.fromText(text));
80 }
81
82 public static RemotePipeline fromUrl(String ref, String url) {
83 return new RemotePipeline(ref, Document.fromUrl(url));
84 }
85
86 public static RemotePipeline fromInputStream(String ref, InputStream inputStream) {
87 RemotePipelinee.html#RemotePipeline">RemotePipeline pipeline = new RemotePipeline(ref, new InputStreamConnector(inputStream));
88 pipeline.options(Options.start()
89 .attachSource());
90 return pipeline;
91 }
92
93 public static RemotePipeline fromFolder(String ref, String folderPath, String filenameFilter, boolean recursive) {
94 RemotePipelinee.html#RemotePipeline">RemotePipeline pipeline = new RemotePipeline(ref, new FolderConnector(folderPath, filenameFilter, recursive));
95 pipeline.options(Options.start()
96 .attachSource());
97 return pipeline;
98 }
99 }