001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.gbif.api.model.pipelines;
015
016import java.util.Collection;
017import java.util.Collections;
018import java.util.HashMap;
019import java.util.HashSet;
020import java.util.LinkedHashSet;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Map;
024import java.util.Queue;
025import java.util.Set;
026import java.util.function.Function;
027import java.util.function.ToIntFunction;
028import java.util.stream.Collectors;
029
030import static org.gbif.api.model.pipelines.StepType.*;
031
032public class PipelinesWorkflow {
033
034  private PipelinesWorkflow() {
035    // NOP
036  }
037
038  private static final Graph<StepType> OCCURRENCE_WF_GRAPH = new Graph<>();
039  private static final Graph<StepType> EVENT_OCCURRENCE_WF_GRAPH = new Graph<>();
040  private static final Graph<StepType> EVENT_WF_GRAPH = new Graph<>();
041  private static final Graph<StepType> VALIDATON_WF_GRAPH = new Graph<>();
042
043  static {
044    // Pipelines occurrence workflow
045    // 1
046    OCCURRENCE_WF_GRAPH.addNode(DWCA_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
047    OCCURRENCE_WF_GRAPH.addNode(XML_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
048    OCCURRENCE_WF_GRAPH.addNode(ABCD_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
049    // 2
050    OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_IDENTIFIER, VERBATIM_TO_INTERPRETED);
051    // 3
052    OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, INTERPRETED_TO_INDEX);
053    OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, HDFS_VIEW);
054    OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, FRAGMENTER);
055
056    // Pipelines event-occurrence workflow
057    // 1
058    EVENT_OCCURRENCE_WF_GRAPH.addNode(DWCA_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
059    // 2
060    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_IDENTIFIER, VERBATIM_TO_INTERPRETED);
061    // 3
062    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, INTERPRETED_TO_INDEX);
063    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, HDFS_VIEW);
064    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, FRAGMENTER);
065    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, EVENTS_VERBATIM_TO_INTERPRETED);
066    // 4
067    EVENT_OCCURRENCE_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_INTERPRETED_TO_INDEX);
068    EVENT_OCCURRENCE_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_HDFS_VIEW);
069
070    // Pipelines event only workflow
071    // 1
072    EVENT_WF_GRAPH.addNode(DWCA_TO_VERBATIM, EVENTS_VERBATIM_TO_INTERPRETED);
073    // 2
074    EVENT_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_INTERPRETED_TO_INDEX);
075    EVENT_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_HDFS_VIEW);
076
077    // Pipelines validator workflow
078    // 1
079    VALIDATON_WF_GRAPH.addNode(VALIDATOR_UPLOAD_ARCHIVE, VALIDATOR_VALIDATE_ARCHIVE);
080    // 2
081    VALIDATON_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_DWCA_TO_VERBATIM);
082    VALIDATON_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_XML_TO_VERBATIM);
083    VALIDATON_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_ABCD_TO_VERBATIM);
084    VALIDATON_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_TABULAR_TO_VERBATIM);
085    // 3
086    VALIDATON_WF_GRAPH.addNode(VALIDATOR_DWCA_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED);
087    VALIDATON_WF_GRAPH.addNode(VALIDATOR_XML_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED);
088    VALIDATON_WF_GRAPH.addNode(VALIDATOR_ABCD_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED);
089    VALIDATON_WF_GRAPH.addNode(VALIDATOR_TABULAR_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED);
090    // 4
091    VALIDATON_WF_GRAPH.addNode(VALIDATOR_VERBATIM_TO_INTERPRETED, VALIDATOR_INTERPRETED_TO_INDEX);
092    // 5
093    VALIDATON_WF_GRAPH.addNode(VALIDATOR_INTERPRETED_TO_INDEX, VALIDATOR_COLLECT_METRICS);
094  }
095
096  public static Graph<StepType> getOccurrenceWorkflow() {
097    return OCCURRENCE_WF_GRAPH;
098  }
099
100  public static Graph<StepType> getEventOccurrenceWorkflow() {
101    return EVENT_OCCURRENCE_WF_GRAPH;
102  }
103
104  public static Graph<StepType> getEventWorkflow() {
105    return EVENT_WF_GRAPH;
106  }
107
108  public static Graph<StepType> getValidatorWorkflow() {
109    return VALIDATON_WF_GRAPH;
110  }
111
112  public static Graph<StepType> getWorkflow(boolean containsOccurrences, boolean containsEvents) {
113    if(containsOccurrences && containsEvents){
114      return getEventOccurrenceWorkflow();
115    } else if(containsOccurrences){
116      return getOccurrenceWorkflow();
117    } else if (containsEvents){
118      return getEventWorkflow();
119    }
120    return new Graph<>();
121  }
122
123  public static class Graph<T> {
124
125    public class Edge {
126      private final T node;
127
128      public Edge(T node) {
129        this.node = node;
130      }
131
132      public T getNode() {
133        return node;
134      }
135    }
136
137    private final Map<T, LinkedList<Edge>> nodes = new HashMap<>();
138
139    private final Map<T, Integer> levels = new HashMap<>();
140
141    private final ToIntFunction<T> calculateLevelFn = t -> levels.get(t) != null ? levels.get(t) + 1 : 1;
142
143    private final ToIntFunction<T> findLevelFn = t -> nodes.values()
144      .stream()
145      .flatMap(Collection::stream)
146      .filter(x->x.getNode().equals(t))
147      .findAny()
148      .map(x->levels.get(x.getNode()))
149      .orElse(1);
150
151    public int getNodesQuantity() {
152      return nodes.size();
153    }
154
155    public List<Edge> getNodeEdges(T node) {
156      return nodes.get(node);
157    }
158
159    public Set<T> getAllNodes() {
160      return nodes.keySet();
161    }
162
163    public Set<T> getAllNodesFor(Set<T> fromTypesSet) {
164      return fromTypesSet.stream()
165        .map(ft -> bfs(this, ft))
166        .flatMap(Collection::stream)
167        .collect(Collectors.toSet());
168    }
169
170    public int getLevel(T t){
171      return levels.get(t);
172    }
173
174    public Set<T> getRootNodesFor(Set<T> fromTypesSet) {
175      Map<T, Set<T>> map = fromTypesSet.stream()
176        .collect(Collectors.toMap(Function.identity(), ft -> bfs(this, ft)));
177
178      Set<T> result = new LinkedHashSet<>();
179
180      fromTypesSet.forEach(ts -> {
181        result.add(ts);
182        map.forEach((key, value) -> {
183          if (key != ts && value.contains(ts)) {
184            result.remove(ts);
185          }
186        });
187      });
188
189      return result;
190    }
191
192    private void addNode(T fromNode, T toNode) {
193      if (nodes.containsKey(fromNode)) {
194        LinkedList<Edge> edges = nodes.get(fromNode);
195
196        int order;
197        if (edges.isEmpty()) {
198          order = calculateLevelFn.applyAsInt(fromNode);
199        } else {
200          order = levels.get(edges.getLast().getNode());
201        }
202
203        edges.add(new Edge(toNode));
204        levels.put(toNode, order);
205      } else {
206        int order = calculateLevelFn.applyAsInt(fromNode);
207        nodes.put(fromNode, new LinkedList<>(Collections.singletonList(new Edge(toNode))));
208        levels.put(fromNode, order);
209        levels.put(toNode, order + 1);
210      }
211
212      nodes.computeIfAbsent(toNode, n -> new LinkedList<>(Collections.emptyList()));
213      levels.put(toNode, findLevelFn.applyAsInt(toNode));
214    }
215
216    private static <T> Set<T> bfs(Graph<T> graph, T startNode) {
217
218      Set<T> resultSet = new HashSet<>();
219      Set<T> visitedNodes = new HashSet<>(graph.getNodesQuantity());
220      Queue<T> queue = new LinkedList<>();
221      T currentNode;
222
223      visitedNodes.add(startNode);
224      queue.add(startNode);
225
226      while (!queue.isEmpty()) {
227        currentNode = queue.poll();
228        resultSet.add(currentNode);
229
230        graph.getNodeEdges(currentNode).forEach(edge -> {
231          T node = edge.getNode();
232          if (visitedNodes.contains(node)) {
233            return;
234          }
235          visitedNodes.add(node);
236          queue.add(node);
237        });
238      }
239
240      return resultSet;
241    }
242
243  }
244}