View Javadoc
1   package com.kodexa.client.remote;
2   
3   import com.kodexa.client.Document;
4   import com.kodexa.client.KodexaException;
5   import com.kodexa.client.pipeline.Options;
6   import com.kodexa.client.pipeline.PipelineContext;
7   import com.kodexa.client.registry.SourceRegistry;
8   import com.kodexa.client.store.TableStore;
9   import lombok.Getter;
10  import lombok.extern.slf4j.Slf4j;
11  import org.apache.commons.io.IOUtils;
12  import org.apache.http.HttpResponse;
13  import org.apache.http.client.methods.HttpGet;
14  import org.apache.http.client.methods.HttpPost;
15  import org.apache.http.entity.ContentType;
16  import org.apache.http.entity.mime.HttpMultipartMode;
17  import org.apache.http.entity.mime.MultipartEntityBuilder;
18  import org.apache.http.entity.mime.content.ByteArrayBody;
19  import org.apache.http.entity.mime.content.StringBody;
20  import org.apache.http.impl.client.CloseableHttpClient;
21  import org.apache.http.impl.client.HttpClientBuilder;
22  
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.nio.charset.StandardCharsets;
26  import java.util.Map;
27  
28  /**
29   * Abstract base for both Cloud Service and Cloud Pipelines in Kodexa
30   */
31  @Slf4j
32  public abstract class AbstractKodexaSession extends AbstractKodexaConnection {
33  
34      @Getter
35      private final String ref;
36  
37      public AbstractKodexaSession(String ref) {
38          this.ref = ref;
39      }
40  
41      public TableStore getTableStore(CloudSession session, CloudExecution execution, CloudStore cloudStore) {
42          try (CloseableHttpClient client = HttpClientBuilder.create()
43                  .setDefaultRequestConfig(getRequestConfig())
44                  .build()) {
45              Thread.sleep(1000);
46              String url = KodexaPlatform.getUrl() + "/api/sessions/" + session.getId() + "/executions/" + execution.getId() + "/stores/" + cloudStore.getId();
47              HttpGet post = new HttpGet(url);
48              post.addHeader("x-access-token", KodexaPlatform.getAccessToken());
49              HttpResponse response = client.execute(post);
50              if (response.getStatusLine().getStatusCode() != 200) {
51                  throw new KodexaException("Unable to get store from Kodexa, check your access token and URL [" + response.getStatusLine().getStatusCode() + "]");
52              }
53              return jsonOm.readValue(response.getEntity().getContent(), CloudStore.class).getData();
54          } catch (IOException | InterruptedException e) {
55              throw new KodexaException("Unable to connect to Kodexa", e);
56          }
57      }
58  
59      public Document getOutputDocument(CloudSession session, CloudExecution execution) {
60          ContentObject outputDocument = execution.getOutputDocument();
61          if (outputDocument == null) {
62              return null;
63          }
64          try (CloseableHttpClient client = HttpClientBuilder.create()
65                  .setDefaultRequestConfig(getRequestConfig())
66                  .build()) {
67              Thread.sleep(1000);
68              String url = KodexaPlatform.getUrl() + "/api/sessions/" + session.getId() + "/executions/" + execution.getId() + "/objects/" + outputDocument.getId();
69              HttpGet post = new HttpGet(url);
70              post.addHeader("x-access-token", KodexaPlatform.getAccessToken());
71              HttpResponse response = client.execute(post);
72              if (response.getStatusLine().getStatusCode() != 200) {
73                  throw new KodexaException("Unable to get store, check your access token and URL [" + response.getStatusLine().getStatusCode() + "]");
74              }
75              return Document.fromMsgPack(response.getEntity().getContent());
76          } catch (IOException | InterruptedException e) {
77              throw new KodexaException("Unable to connect to Kodexa", e);
78          }
79      }
80  
81      public void mergeStores(CloudSession session, CloudExecution execution, PipelineContext pipelineContext) {
82          for (CloudStore store : execution.getStores()) {
83              TableStore tableStore = getTableStore(session, execution, store);
84              pipelineContext.addStore(store.getName(), tableStore);
85              log.info("Store " + store.getName() + " with " + tableStore.getRows().size() + " rows");
86          }
87      }
88  
89  
90      protected CloudExecutionCloudExecution.html#CloudExecution">CloudExecution waitForExecution(CloudSession session, CloudExecution execution) {
91          String status = execution.getStatus();
92          while (execution.getStatus().equals("PENDING") || execution.getStatus().equals("RUNNING")) {
93              try (CloseableHttpClient client = HttpClientBuilder.create()
94                      .setDefaultRequestConfig(getRequestConfig())
95                      .build()) {
96                  Thread.sleep(1000);
97                  String url = KodexaPlatform.getUrl() + "/api/sessions/" + session.getId() + "/executions/" + execution.getId();
98                  HttpGet post = new HttpGet(url);
99                  post.addHeader("x-access-token", KodexaPlatform.getAccessToken());
100                 HttpResponse response = client.execute(post);
101                 if (response.getStatusLine().getStatusCode() != 200) {
102                     throw new KodexaException("Unable to create a session on Kodexa, check your access token and URL [" + response.getStatusLine().getStatusCode() + "]");
103                 }
104 
105                 String responseJson = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
106                 execution = jsonOm.readValue(responseJson, CloudExecution.class);
107                 if (!execution.getStatus().equals(status)) {
108                     log.info("Execution Status changed from " + status + " => " + execution.getStatus());
109                     status = execution.getStatus();
110                 }
111             } catch (IOException | InterruptedException e) {
112                 throw new KodexaException("Unable to connect to Kodexa", e);
113             }
114         }
115 
116 
117         if (execution.getStatus().equals("FAILED")) {
118             CloudExceptionDetail exceptionDetail = execution.getExceptionDetail();
119             log.error("Failed: " + exceptionDetail.getMessage());
120             if (exceptionDetail.getErrorType() != null)
121                 log.error("Exception Type: " + exceptionDetail.getErrorType());
122             log.error("More information is available:\n\n" + exceptionDetail.getHelp() + "\n");
123             throw new KodexaException("Failed:" + exceptionDetail.getMessage() + "]");
124         } else {
125             log.info("Execution completed with " + execution.getStores().size() + " stores and " + execution.getContentObjects().size() + " content objects");
126         }
127 
128         return execution;
129     }
130 
131     protected CloudSession createSession(CloudSessionType sessionType) {
132         log.info("Creating session in Kodexa");
133         try (CloseableHttpClient client = HttpClientBuilder.create()
134                 .setDefaultRequestConfig(getRequestConfig())
135                 .build()) {
136             String url = KodexaPlatform.getUrl() + "/api/sessions?" + sessionType + "=" + ref;
137 
138             log.info("Connecting to [" + url + "]");
139 
140             HttpPost post = new HttpPost(url);
141             post.addHeader("x-access-token", KodexaPlatform.getAccessToken());
142             HttpResponse response = client.execute(post);
143             if (response.getStatusLine().getStatusCode() != 200) {
144                 throw new KodexaException("Unable to create a session, check your access token and URL [" + response.getStatusLine().getStatusCode() + "]");
145             }
146             CloudSession session = jsonOm.readValue(response.getEntity().getContent(), CloudSession.class);
147             log.info("Session created [" + session + "]");
148             return session;
149         } catch (IOException e) {
150             throw new KodexaException("Unable to create session on Kodexa", e);
151         }
152     }
153 
154     /**
155      * Execute the service in Kodexa
156      *
157      * @param session    The session to use
158      * @param document   The document to send
159      * @param context    The context for the pipeline
160      * @param options    The options for the execution
161      * @param parameters the parameters to pass to this pipeline
162      * @return An instance of a cloud execution
163      */
164     public CloudExecution executeService(CloudSession session, Document document, PipelineContext context, Options options, Map<String, Object> parameters) {
165         log.info("Executing service in Kodexa");
166         try (CloseableHttpClient client = HttpClientBuilder.create()
167                 .setDefaultRequestConfig(getRequestConfig())
168                 .build()) {
169             String url = KodexaPlatform.getUrl() + "/api/sessions/" + session.getId() + "/execute";
170             log.info("Connecting to [" + url + "]");
171             HttpPost post = new HttpPost(url);
172             post.addHeader("x-access-token", KodexaPlatform.getAccessToken());
173 
174             MultipartEntityBuilder builder = MultipartEntityBuilder.create();
175 
176             ByteArrayBody documentBody = new ByteArrayBody(document.toMsgPack(), "document.kdxa");
177 
178             if (options.isAttachSource()) {
179                 log.info("Attaching source file");
180                 InputStream inputStream = SourceRegistry.getInstance().getSource(document);
181                 ByteArrayBody fileBody = new ByteArrayBody(IOUtils.toByteArray(inputStream), "test.doc");
182                 builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
183                 builder.addPart("file", fileBody);
184                 builder.addPart("file_document", documentBody);
185             } else {
186                 builder.addPart("document", documentBody);
187             }
188 
189             StringBody optionsBody = new StringBody(jsonOm.writeValueAsString(options.get()), ContentType.APPLICATION_JSON);
190             builder.addPart("options", optionsBody);
191 
192             if (parameters != null) {
193                 StringBody parameterBody = new StringBody(jsonOm.writeValueAsString(parameters), ContentType.APPLICATION_JSON);
194                 builder.addPart("parameters", parameterBody);
195             }
196 
197             post.setEntity(builder.build());
198             HttpResponse response = client.execute(post);
199             if (response.getStatusLine().getStatusCode() != 200) {
200                 throw new KodexaException("Unable to create a session, check your access token and URL [" + response.getStatusLine().getStatusCode() + "]");
201             }
202             String responseJson = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
203             CloudExecution cloudExecution = jsonOm.readValue(responseJson, CloudExecution.class);
204             log.info("Execution started [" + cloudExecution.getId() + "]");
205             return cloudExecution;
206         } catch (IOException e) {
207             throw new KodexaException("Unable to create a session on Kodexa", e);
208         }
209     }
210 }