001/*
002 * Licensed under the Apache License, Version 2.0 (the "License");
003 * you may not use this file except in compliance with the License.
004 * You may obtain a copy of the License at
005 *
006 *     http://www.apache.org/licenses/LICENSE-2.0
007 *
008 * Unless required by applicable law or agreed to in writing, software
009 * distributed under the License is distributed on an "AS IS" BASIS,
010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011 * See the License for the specific language governing permissions and
012 * limitations under the License.
013 */
014package org.gbif.api.model.pipelines;
015
016import java.util.Collection;
017import java.util.Collections;
018import java.util.HashMap;
019import java.util.HashSet;
020import java.util.LinkedHashSet;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Map;
024import java.util.Queue;
025import java.util.Set;
026import java.util.function.Function;
027import java.util.function.ToIntFunction;
028import java.util.stream.Collectors;
029
030import static org.gbif.api.model.pipelines.StepType.*;
031
032public class PipelinesWorkflow {
033
034  private PipelinesWorkflow() {
035    // NOP
036  }
037
038  private static final Graph<StepType> OCCURRENCE_WF_GRAPH = new Graph<>();
039  private static final Graph<StepType> EVENT_OCCURRENCE_WF_GRAPH = new Graph<>();
040  private static final Graph<StepType> EVENT_WF_GRAPH = new Graph<>();
041  private static final Graph<StepType> VALIDATON_WF_GRAPH = new Graph<>();
042
043  static {
044    // Pipelines occurrence workflow
045    // 0?
046    OCCURRENCE_WF_GRAPH.addNode(NFS_TO_HDFS, DWCDP_TO_VERBATIM);
047    // 1
048    OCCURRENCE_WF_GRAPH.addNode(DWCDP_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
049    OCCURRENCE_WF_GRAPH.addNode(DWCA_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
050    OCCURRENCE_WF_GRAPH.addNode(XML_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
051    OCCURRENCE_WF_GRAPH.addNode(ABCD_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
052    // 2
053    OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_IDENTIFIER, VERBATIM_TO_INTERPRETED);
054    // 3
055    OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, INTERPRETED_TO_INDEX);
056    OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, HDFS_VIEW);
057    OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, FRAGMENTER);
058
059    // Pipelines event-occurrence workflow
060    // 0?
061    EVENT_OCCURRENCE_WF_GRAPH.addNode(NFS_TO_HDFS, DWCDP_TO_VERBATIM);
062    // 1
063    EVENT_OCCURRENCE_WF_GRAPH.addNode(DWCDP_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
064    EVENT_OCCURRENCE_WF_GRAPH.addNode(DWCA_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
065    // 2
066    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_IDENTIFIER, VERBATIM_TO_INTERPRETED);
067    // 3
068    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, INTERPRETED_TO_INDEX);
069    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, HDFS_VIEW);
070    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, FRAGMENTER);
071    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, EVENTS_VERBATIM_TO_INTERPRETED);
072    // 4
073    EVENT_OCCURRENCE_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_INTERPRETED_TO_INDEX);
074    EVENT_OCCURRENCE_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_HDFS_VIEW);
075
076    // Pipelines event only workflow
077    // 0?
078    EVENT_WF_GRAPH.addNode(NFS_TO_HDFS, DWCDP_TO_VERBATIM);
079    // 1
080    EVENT_WF_GRAPH.addNode(DWCDP_TO_VERBATIM, EVENTS_VERBATIM_TO_INTERPRETED);
081    EVENT_WF_GRAPH.addNode(DWCA_TO_VERBATIM, EVENTS_VERBATIM_TO_INTERPRETED);
082    // 2
083    EVENT_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_INTERPRETED_TO_INDEX);
084    EVENT_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_HDFS_VIEW);
085
086    // Pipelines validator workflow
087    // 1
088    VALIDATON_WF_GRAPH.addNode(VALIDATOR_UPLOAD_ARCHIVE, VALIDATOR_VALIDATE_ARCHIVE);
089    // 2
090    VALIDATON_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_DWCA_TO_VERBATIM);
091    VALIDATON_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_XML_TO_VERBATIM);
092    VALIDATON_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_ABCD_TO_VERBATIM);
093    VALIDATON_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_TABULAR_TO_VERBATIM);
094    // 3
095    VALIDATON_WF_GRAPH.addNode(VALIDATOR_DWCA_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED);
096    VALIDATON_WF_GRAPH.addNode(VALIDATOR_XML_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED);
097    VALIDATON_WF_GRAPH.addNode(VALIDATOR_ABCD_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED);
098    VALIDATON_WF_GRAPH.addNode(VALIDATOR_TABULAR_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED);
099    // 4
100    VALIDATON_WF_GRAPH.addNode(VALIDATOR_VERBATIM_TO_INTERPRETED, VALIDATOR_INTERPRETED_TO_INDEX);
101    // 5
102    VALIDATON_WF_GRAPH.addNode(VALIDATOR_INTERPRETED_TO_INDEX, VALIDATOR_COLLECT_METRICS);
103  }
104
105  public static Graph<StepType> getOccurrenceWorkflow() {
106    return OCCURRENCE_WF_GRAPH;
107  }
108
109  public static Graph<StepType> getEventOccurrenceWorkflow() {
110    return EVENT_OCCURRENCE_WF_GRAPH;
111  }
112
113  public static Graph<StepType> getEventWorkflow() {
114    return EVENT_WF_GRAPH;
115  }
116
117  public static Graph<StepType> getValidatorWorkflow() {
118    return VALIDATON_WF_GRAPH;
119  }
120
121  public static Graph<StepType> getWorkflow(boolean containsOccurrences, boolean containsEvents) {
122    if(containsOccurrences && containsEvents){
123      return getEventOccurrenceWorkflow();
124    } else if(containsOccurrences){
125      return getOccurrenceWorkflow();
126    } else if (containsEvents){
127      return getEventWorkflow();
128    }
129    return new Graph<>();
130  }
131
132  public static class Graph<T> {
133
134    public class Edge {
135      private final T node;
136
137      public Edge(T node) {
138        this.node = node;
139      }
140
141      public T getNode() {
142        return node;
143      }
144    }
145
146    private final Map<T, LinkedList<Edge>> nodes = new HashMap<>();
147
148    private final Map<T, Integer> levels = new HashMap<>();
149
150    private final ToIntFunction<T> calculateLevelFn = t -> levels.get(t) != null ? levels.get(t) + 1 : 1;
151
152    private final ToIntFunction<T> findLevelFn = t -> nodes.values()
153      .stream()
154      .flatMap(Collection::stream)
155      .filter(x->x.getNode().equals(t))
156      .findAny()
157      .map(x->levels.get(x.getNode()))
158      .orElse(1);
159
160    public int getNodesQuantity() {
161      return nodes.size();
162    }
163
164    public List<Edge> getNodeEdges(T node) {
165      return nodes.get(node);
166    }
167
168    public Set<T> getAllNodes() {
169      return nodes.keySet();
170    }
171
172    public Set<T> getAllNodesFor(Set<T> fromTypesSet) {
173      return fromTypesSet.stream()
174        .map(ft -> bfs(this, ft))
175        .flatMap(Collection::stream)
176        .collect(Collectors.toSet());
177    }
178
179    /**
180     * Returns the depth level of a node in the workflow graph, calculated relative to the insertion
181     * order of edges in the static graph initializer.
182     *
183     * <p><b>Deprecated:</b> This method is fundamentally unreliable for graphs with multiple entry
184     * points, as there is no globally meaningful notion of depth in a directed acyclic graph with
185     * more than one root. The level of a node is only well-defined relative to a specific starting
186     * node, and will produce incorrect or inconsistent results when new entry points are added to
187     * the graph.
188     *
189     * <p>Use {@link Graph#getRootNodesFor(Set)} to determine which steps should be triggered first
190     * from a given set of requested steps. For execution history and step ordering, prefer deriving
191     * state from the persisted execution record in the registry rather than from static level
192     * arithmetic.
193     *
194     * @param t the node to look up
195     * @return the computed depth level of the node, unreliable in multi-root graphs
196     */
197    @Deprecated
198    public int getLevel(T t) {
199      return this.levels.get(t);
200    }
201
202    public Set<T> getRootNodesFor(Set<T> fromTypesSet) {
203      Map<T, Set<T>> map = fromTypesSet.stream()
204        .collect(Collectors.toMap(Function.identity(), ft -> bfs(this, ft)));
205
206      Set<T> result = new LinkedHashSet<>();
207
208      fromTypesSet.forEach(ts -> {
209        result.add(ts);
210        map.forEach((key, value) -> {
211          if (key != ts && value.contains(ts)) {
212            result.remove(ts);
213          }
214        });
215      });
216
217      return result;
218    }
219
220    private void addNode(T fromNode, T toNode) {
221      if (nodes.containsKey(fromNode)) {
222        LinkedList<Edge> edges = nodes.get(fromNode);
223
224        int order;
225        if (edges.isEmpty()) {
226          order = calculateLevelFn.applyAsInt(fromNode);
227        } else {
228          order = levels.get(edges.getLast().getNode());
229        }
230
231        edges.add(new Edge(toNode));
232        levels.put(toNode, order);
233      } else {
234        int order = calculateLevelFn.applyAsInt(fromNode);
235        nodes.put(fromNode, new LinkedList<>(Collections.singletonList(new Edge(toNode))));
236        levels.put(fromNode, order);
237        levels.put(toNode, order + 1);
238      }
239
240      nodes.computeIfAbsent(toNode, n -> new LinkedList<>(Collections.emptyList()));
241      levels.put(toNode, findLevelFn.applyAsInt(toNode));
242    }
243
244    private static <T> Set<T> bfs(Graph<T> graph, T startNode) {
245
246      Set<T> resultSet = new HashSet<>();
247      Set<T> visitedNodes = new HashSet<>(graph.getNodesQuantity());
248      Queue<T> queue = new LinkedList<>();
249      T currentNode;
250
251      visitedNodes.add(startNode);
252      queue.add(startNode);
253
254      while (!queue.isEmpty()) {
255        currentNode = queue.poll();
256        resultSet.add(currentNode);
257
258        graph.getNodeEdges(currentNode).forEach(edge -> {
259          T node = edge.getNode();
260          if (visitedNodes.contains(node)) {
261            return;
262          }
263          visitedNodes.add(node);
264          queue.add(node);
265        });
266      }
267
268      return resultSet;
269    }
270
271  }
272}