1 package com.kodexa.client.remote;
2
3 import com.kodexa.client.Document;
4 import com.kodexa.client.KodexaException;
5 import com.kodexa.client.pipeline.Options;
6 import com.kodexa.client.pipeline.PipelineContext;
7 import com.kodexa.client.registry.SourceRegistry;
8 import com.kodexa.client.store.TableStore;
9 import lombok.Getter;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.commons.io.IOUtils;
12 import org.apache.http.HttpResponse;
13 import org.apache.http.client.methods.HttpGet;
14 import org.apache.http.client.methods.HttpPost;
15 import org.apache.http.entity.ContentType;
16 import org.apache.http.entity.mime.HttpMultipartMode;
17 import org.apache.http.entity.mime.MultipartEntityBuilder;
18 import org.apache.http.entity.mime.content.ByteArrayBody;
19 import org.apache.http.entity.mime.content.StringBody;
20 import org.apache.http.impl.client.CloseableHttpClient;
21 import org.apache.http.impl.client.HttpClientBuilder;
22
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.nio.charset.StandardCharsets;
26 import java.util.Map;
27
28
29
30
31 @Slf4j
32 public abstract class AbstractKodexaSession extends AbstractKodexaConnection {
33
34 @Getter
35 private final String ref;
36
37 public AbstractKodexaSession(String ref) {
38 this.ref = ref;
39 }
40
41 public TableStore getTableStore(CloudSession session, CloudExecution execution, CloudStore cloudStore) {
42 try (CloseableHttpClient client = HttpClientBuilder.create()
43 .setDefaultRequestConfig(getRequestConfig())
44 .build()) {
45 Thread.sleep(1000);
46 String url = KodexaPlatform.getUrl() + "/api/sessions/" + session.getId() + "/executions/" + execution.getId() + "/stores/" + cloudStore.getId();
47 HttpGet post = new HttpGet(url);
48 post.addHeader("x-access-token", KodexaPlatform.getAccessToken());
49 HttpResponse response = client.execute(post);
50 if (response.getStatusLine().getStatusCode() != 200) {
51 throw new KodexaException("Unable to get store from Kodexa, check your access token and URL [" + response.getStatusLine().getStatusCode() + "]");
52 }
53 return jsonOm.readValue(response.getEntity().getContent(), CloudStore.class).getData();
54 } catch (IOException | InterruptedException e) {
55 throw new KodexaException("Unable to connect to Kodexa", e);
56 }
57 }
58
59 public Document getOutputDocument(CloudSession session, CloudExecution execution) {
60 ContentObject outputDocument = execution.getOutputDocument();
61 if (outputDocument == null) {
62 return null;
63 }
64 try (CloseableHttpClient client = HttpClientBuilder.create()
65 .setDefaultRequestConfig(getRequestConfig())
66 .build()) {
67 Thread.sleep(1000);
68 String url = KodexaPlatform.getUrl() + "/api/sessions/" + session.getId() + "/executions/" + execution.getId() + "/objects/" + outputDocument.getId();
69 HttpGet post = new HttpGet(url);
70 post.addHeader("x-access-token", KodexaPlatform.getAccessToken());
71 HttpResponse response = client.execute(post);
72 if (response.getStatusLine().getStatusCode() != 200) {
73 throw new KodexaException("Unable to get store, check your access token and URL [" + response.getStatusLine().getStatusCode() + "]");
74 }
75 return Document.fromMsgPack(response.getEntity().getContent());
76 } catch (IOException | InterruptedException e) {
77 throw new KodexaException("Unable to connect to Kodexa", e);
78 }
79 }
80
81 public void mergeStores(CloudSession session, CloudExecution execution, PipelineContext pipelineContext) {
82 for (CloudStore store : execution.getStores()) {
83 TableStore tableStore = getTableStore(session, execution, store);
84 pipelineContext.addStore(store.getName(), tableStore);
85 log.info("Store " + store.getName() + " with " + tableStore.getRows().size() + " rows");
86 }
87 }
88
89
90 protected CloudExecutionCloudExecution.html#CloudExecution">CloudExecution waitForExecution(CloudSession session, CloudExecution execution) {
91 String status = execution.getStatus();
92 while (execution.getStatus().equals("PENDING") || execution.getStatus().equals("RUNNING")) {
93 try (CloseableHttpClient client = HttpClientBuilder.create()
94 .setDefaultRequestConfig(getRequestConfig())
95 .build()) {
96 Thread.sleep(1000);
97 String url = KodexaPlatform.getUrl() + "/api/sessions/" + session.getId() + "/executions/" + execution.getId();
98 HttpGet post = new HttpGet(url);
99 post.addHeader("x-access-token", KodexaPlatform.getAccessToken());
100 HttpResponse response = client.execute(post);
101 if (response.getStatusLine().getStatusCode() != 200) {
102 throw new KodexaException("Unable to create a session on Kodexa, check your access token and URL [" + response.getStatusLine().getStatusCode() + "]");
103 }
104
105 String responseJson = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
106 execution = jsonOm.readValue(responseJson, CloudExecution.class);
107 if (!execution.getStatus().equals(status)) {
108 log.info("Execution Status changed from " + status + " => " + execution.getStatus());
109 status = execution.getStatus();
110 }
111 } catch (IOException | InterruptedException e) {
112 throw new KodexaException("Unable to connect to Kodexa", e);
113 }
114 }
115
116
117 if (execution.getStatus().equals("FAILED")) {
118 CloudExceptionDetail exceptionDetail = execution.getExceptionDetail();
119 log.error("Failed: " + exceptionDetail.getMessage());
120 if (exceptionDetail.getErrorType() != null)
121 log.error("Exception Type: " + exceptionDetail.getErrorType());
122 log.error("More information is available:\n\n" + exceptionDetail.getHelp() + "\n");
123 throw new KodexaException("Failed:" + exceptionDetail.getMessage() + "]");
124 } else {
125 log.info("Execution completed with " + execution.getStores().size() + " stores and " + execution.getContentObjects().size() + " content objects");
126 }
127
128 return execution;
129 }
130
131 protected CloudSession createSession(CloudSessionType sessionType) {
132 log.info("Creating session in Kodexa");
133 try (CloseableHttpClient client = HttpClientBuilder.create()
134 .setDefaultRequestConfig(getRequestConfig())
135 .build()) {
136 String url = KodexaPlatform.getUrl() + "/api/sessions?" + sessionType + "=" + ref;
137
138 log.info("Connecting to [" + url + "]");
139
140 HttpPost post = new HttpPost(url);
141 post.addHeader("x-access-token", KodexaPlatform.getAccessToken());
142 HttpResponse response = client.execute(post);
143 if (response.getStatusLine().getStatusCode() != 200) {
144 throw new KodexaException("Unable to create a session, check your access token and URL [" + response.getStatusLine().getStatusCode() + "]");
145 }
146 CloudSession session = jsonOm.readValue(response.getEntity().getContent(), CloudSession.class);
147 log.info("Session created [" + session + "]");
148 return session;
149 } catch (IOException e) {
150 throw new KodexaException("Unable to create session on Kodexa", e);
151 }
152 }
153
154
155
156
157
158
159
160
161
162
163
164 public CloudExecution executeService(CloudSession session, Document document, PipelineContext context, Options options, Map<String, Object> parameters) {
165 log.info("Executing service in Kodexa");
166 try (CloseableHttpClient client = HttpClientBuilder.create()
167 .setDefaultRequestConfig(getRequestConfig())
168 .build()) {
169 String url = KodexaPlatform.getUrl() + "/api/sessions/" + session.getId() + "/execute";
170 log.info("Connecting to [" + url + "]");
171 HttpPost post = new HttpPost(url);
172 post.addHeader("x-access-token", KodexaPlatform.getAccessToken());
173
174 MultipartEntityBuilder builder = MultipartEntityBuilder.create();
175
176 ByteArrayBody documentBody = new ByteArrayBody(document.toMsgPack(), "document.kdxa");
177
178 if (options.isAttachSource()) {
179 log.info("Attaching source file");
180 InputStream inputStream = SourceRegistry.getInstance().getSource(document);
181 ByteArrayBody fileBody = new ByteArrayBody(IOUtils.toByteArray(inputStream), "test.doc");
182 builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
183 builder.addPart("file", fileBody);
184 builder.addPart("file_document", documentBody);
185 } else {
186 builder.addPart("document", documentBody);
187 }
188
189 StringBody optionsBody = new StringBody(jsonOm.writeValueAsString(options.get()), ContentType.APPLICATION_JSON);
190 builder.addPart("options", optionsBody);
191
192 if (parameters != null) {
193 StringBody parameterBody = new StringBody(jsonOm.writeValueAsString(parameters), ContentType.APPLICATION_JSON);
194 builder.addPart("parameters", parameterBody);
195 }
196
197 post.setEntity(builder.build());
198 HttpResponse response = client.execute(post);
199 if (response.getStatusLine().getStatusCode() != 200) {
200 throw new KodexaException("Unable to create a session, check your access token and URL [" + response.getStatusLine().getStatusCode() + "]");
201 }
202 String responseJson = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
203 CloudExecution cloudExecution = jsonOm.readValue(responseJson, CloudExecution.class);
204 log.info("Execution started [" + cloudExecution.getId() + "]");
205 return cloudExecution;
206 } catch (IOException e) {
207 throw new KodexaException("Unable to create a session on Kodexa", e);
208 }
209 }
210 }