1 package com.kodexa.client.pipeline;
2
3 import com.kodexa.client.Document;
4 import com.kodexa.client.connectors.Connector;
5 import com.kodexa.client.connectors.FolderConnector;
6 import com.kodexa.client.connectors.InputStreamConnector;
7 import com.kodexa.client.remote.RemoteAction;
8 import com.kodexa.client.steps.PipelineStep;
9 import lombok.extern.slf4j.Slf4j;
10
11 import java.io.InputStream;
12 import java.util.ArrayList;
13 import java.util.List;
14
15
16
17
18
19 @Slf4j
20 public class Pipeline {
21
22 protected final Connector connector;
23
24 private final PipelineContext context;
25
26 private List<PipelineStepWrapper> steps = new ArrayList<>();
27
28 protected List<PipelineParameter> parameters = new ArrayList<>();
29
30 public Pipeline(Connector connector) {
31 this.connector = connector;
32 this.context = new PipelineContext();
33 }
34
35 public Pipeline(Document document) {
36 this.connector = new DocumentConnector(document);
37 this.context = new PipelineContext();
38 }
39
40 public Pipeline addStep(Class stepClass, Options options) {
41 steps.add(new PipelineStepWrapper(new ClassBasedStep(stepClass), options));
42 return this;
43 }
44
45 public Pipeline addStep(String actionSlug, Options options) {
46 steps.add(new PipelineStepWrapper(new RemoteAction(actionSlug), options));
47 return this;
48 }
49
50 public Pipeline addStep(PipelineStep step) {
51 steps.add(new PipelineStepWrapper(step, new Options()));
52 return this;
53 }
54
55 public Pipeline parameters(List<PipelineParameter> parameters) {
56 this.parameters = parameters;
57 return this;
58 }
59
60 public PipelineContext run() {
61
62 log.info("Starting pipeline");
63
64 this.context.setParameters(this.parameters);
65
66 connector.forEachRemaining(document -> {
67 for (PipelineStepWrapper step : steps) {
68 long startTime = System.currentTimeMillis();
69 document = step.process(document, context);
70 long endTime = System.currentTimeMillis();
71 log.info("Step processed in " + (endTime - startTime) + " ms");
72 }
73 context.setOutputDocument(document);
74 });
75
76 log.info("Pipeline completed");
77 return context;
78
79 }
80
81
82
83
84
85
86
87 public static Pipeline fromText(String text) {
88 return new Pipeline(Document.fromText(text));
89 }
90
91 public static Pipeline fromUrl(String url) {
92 return new Pipeline(Document.fromUrl(url));
93 }
94
95 public static Pipeline fromInputStream(InputStream inputStream) {
96 return new Pipeline(new InputStreamConnector(inputStream));
97 }
98
99 public static Pipeline fromFolder(String folderPath, String filenameFilter, boolean recursive) {
100 return new Pipeline(new FolderConnector(folderPath, filenameFilter, recursive));
101 }
102
103 public Pipeline addParameter(String name, String value) {
104 parameters.add(new PipelineParameter(name, value));
105 return this;
106 }
107 }