001/* 002 * Licensed under the Apache License, Version 2.0 (the "License"); 003 * you may not use this file except in compliance with the License. 004 * You may obtain a copy of the License at 005 * 006 * http://www.apache.org/licenses/LICENSE-2.0 007 * 008 * Unless required by applicable law or agreed to in writing, software 009 * distributed under the License is distributed on an "AS IS" BASIS, 010 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 011 * See the License for the specific language governing permissions and 012 * limitations under the License. 013 */ 014package org.gbif.api.model.pipelines; 015 016import java.util.Collection; 017import java.util.Collections; 018import java.util.HashMap; 019import java.util.HashSet; 020import java.util.LinkedHashSet; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.Map; 024import java.util.Queue; 025import java.util.Set; 026import java.util.function.Function; 027import java.util.function.ToIntFunction; 028import java.util.stream.Collectors; 029 030import static org.gbif.api.model.pipelines.StepType.*; 031 032public class PipelinesWorkflow { 033 034 private PipelinesWorkflow() { 035 // NOP 036 } 037 038 private static final Graph<StepType> OCCURRENCE_WF_GRAPH = new Graph<>(); 039 private static final Graph<StepType> EVENT_OCCURRENCE_WF_GRAPH = new Graph<>(); 040 private static final Graph<StepType> EVENT_WF_GRAPH = new Graph<>(); 041 private static final Graph<StepType> VALIDATON_WF_GRAPH = new Graph<>(); 042 043 static { 044 // Pipelines occurrence workflow 045 // 0? 046 OCCURRENCE_WF_GRAPH.addNode(NFS_TO_HDFS, DWCDP_TO_VERBATIM); 047 // 1 048 OCCURRENCE_WF_GRAPH.addNode(DWCDP_TO_VERBATIM, VERBATIM_TO_IDENTIFIER); 049 OCCURRENCE_WF_GRAPH.addNode(DWCA_TO_VERBATIM, VERBATIM_TO_IDENTIFIER); 050 OCCURRENCE_WF_GRAPH.addNode(XML_TO_VERBATIM, VERBATIM_TO_IDENTIFIER); 051 OCCURRENCE_WF_GRAPH.addNode(ABCD_TO_VERBATIM, VERBATIM_TO_IDENTIFIER); 052 // 2 053 OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_IDENTIFIER, VERBATIM_TO_INTERPRETED); 054 // 3 055 OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, INTERPRETED_TO_INDEX); 056 OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, HDFS_VIEW); 057 OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, FRAGMENTER); 058 059 // Pipelines event-occurrence workflow 060 // 0? 061 EVENT_OCCURRENCE_WF_GRAPH.addNode(NFS_TO_HDFS, DWCDP_TO_VERBATIM); 062 // 1 063 EVENT_OCCURRENCE_WF_GRAPH.addNode(DWCDP_TO_VERBATIM, VERBATIM_TO_IDENTIFIER); 064 EVENT_OCCURRENCE_WF_GRAPH.addNode(DWCA_TO_VERBATIM, VERBATIM_TO_IDENTIFIER); 065 // 2 066 EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_IDENTIFIER, VERBATIM_TO_INTERPRETED); 067 // 3 068 EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, INTERPRETED_TO_INDEX); 069 EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, HDFS_VIEW); 070 EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, FRAGMENTER); 071 EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, EVENTS_VERBATIM_TO_INTERPRETED); 072 // 4 073 EVENT_OCCURRENCE_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_INTERPRETED_TO_INDEX); 074 EVENT_OCCURRENCE_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_HDFS_VIEW); 075 076 // Pipelines event only workflow 077 // 0? 078 EVENT_WF_GRAPH.addNode(NFS_TO_HDFS, DWCDP_TO_VERBATIM); 079 // 1 080 EVENT_WF_GRAPH.addNode(DWCDP_TO_VERBATIM, EVENTS_VERBATIM_TO_INTERPRETED); 081 EVENT_WF_GRAPH.addNode(DWCA_TO_VERBATIM, EVENTS_VERBATIM_TO_INTERPRETED); 082 // 2 083 EVENT_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_INTERPRETED_TO_INDEX); 084 EVENT_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_HDFS_VIEW); 085 086 // Pipelines validator workflow 087 // 1 088 VALIDATON_WF_GRAPH.addNode(VALIDATOR_UPLOAD_ARCHIVE, VALIDATOR_VALIDATE_ARCHIVE); 089 // 2 090 VALIDATON_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_DWCA_TO_VERBATIM); 091 VALIDATON_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_XML_TO_VERBATIM); 092 VALIDATON_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_ABCD_TO_VERBATIM); 093 VALIDATON_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_TABULAR_TO_VERBATIM); 094 // 3 095 VALIDATON_WF_GRAPH.addNode(VALIDATOR_DWCA_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED); 096 VALIDATON_WF_GRAPH.addNode(VALIDATOR_XML_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED); 097 VALIDATON_WF_GRAPH.addNode(VALIDATOR_ABCD_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED); 098 VALIDATON_WF_GRAPH.addNode(VALIDATOR_TABULAR_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED); 099 // 4 100 VALIDATON_WF_GRAPH.addNode(VALIDATOR_VERBATIM_TO_INTERPRETED, VALIDATOR_INTERPRETED_TO_INDEX); 101 // 5 102 VALIDATON_WF_GRAPH.addNode(VALIDATOR_INTERPRETED_TO_INDEX, VALIDATOR_COLLECT_METRICS); 103 } 104 105 public static Graph<StepType> getOccurrenceWorkflow() { 106 return OCCURRENCE_WF_GRAPH; 107 } 108 109 public static Graph<StepType> getEventOccurrenceWorkflow() { 110 return EVENT_OCCURRENCE_WF_GRAPH; 111 } 112 113 public static Graph<StepType> getEventWorkflow() { 114 return EVENT_WF_GRAPH; 115 } 116 117 public static Graph<StepType> getValidatorWorkflow() { 118 return VALIDATON_WF_GRAPH; 119 } 120 121 public static Graph<StepType> getWorkflow(boolean containsOccurrences, boolean containsEvents) { 122 if(containsOccurrences && containsEvents){ 123 return getEventOccurrenceWorkflow(); 124 } else if(containsOccurrences){ 125 return getOccurrenceWorkflow(); 126 } else if (containsEvents){ 127 return getEventWorkflow(); 128 } 129 return new Graph<>(); 130 } 131 132 public static class Graph<T> { 133 134 public class Edge { 135 private final T node; 136 137 public Edge(T node) { 138 this.node = node; 139 } 140 141 public T getNode() { 142 return node; 143 } 144 } 145 146 private final Map<T, LinkedList<Edge>> nodes = new HashMap<>(); 147 148 private final Map<T, Integer> levels = new HashMap<>(); 149 150 private final ToIntFunction<T> calculateLevelFn = t -> levels.get(t) != null ? levels.get(t) + 1 : 1; 151 152 private final ToIntFunction<T> findLevelFn = t -> nodes.values() 153 .stream() 154 .flatMap(Collection::stream) 155 .filter(x->x.getNode().equals(t)) 156 .findAny() 157 .map(x->levels.get(x.getNode())) 158 .orElse(1); 159 160 public int getNodesQuantity() { 161 return nodes.size(); 162 } 163 164 public List<Edge> getNodeEdges(T node) { 165 return nodes.get(node); 166 } 167 168 public Set<T> getAllNodes() { 169 return nodes.keySet(); 170 } 171 172 public Set<T> getAllNodesFor(Set<T> fromTypesSet) { 173 return fromTypesSet.stream() 174 .map(ft -> bfs(this, ft)) 175 .flatMap(Collection::stream) 176 .collect(Collectors.toSet()); 177 } 178 179 /** 180 * Returns the depth level of a node in the workflow graph, calculated relative to the insertion 181 * order of edges in the static graph initializer. 182 * 183 * <p><b>Deprecated:</b> This method is fundamentally unreliable for graphs with multiple entry 184 * points, as there is no globally meaningful notion of depth in a directed acyclic graph with 185 * more than one root. The level of a node is only well-defined relative to a specific starting 186 * node, and will produce incorrect or inconsistent results when new entry points are added to 187 * the graph. 188 * 189 * <p>Use {@link Graph#getRootNodesFor(Set)} to determine which steps should be triggered first 190 * from a given set of requested steps. For execution history and step ordering, prefer deriving 191 * state from the persisted execution record in the registry rather than from static level 192 * arithmetic. 193 * 194 * @param t the node to look up 195 * @return the computed depth level of the node, unreliable in multi-root graphs 196 */ 197 @Deprecated 198 public int getLevel(T t) { 199 return this.levels.get(t); 200 } 201 202 public Set<T> getRootNodesFor(Set<T> fromTypesSet) { 203 Map<T, Set<T>> map = fromTypesSet.stream() 204 .collect(Collectors.toMap(Function.identity(), ft -> bfs(this, ft))); 205 206 Set<T> result = new LinkedHashSet<>(); 207 208 fromTypesSet.forEach(ts -> { 209 result.add(ts); 210 map.forEach((key, value) -> { 211 if (key != ts && value.contains(ts)) { 212 result.remove(ts); 213 } 214 }); 215 }); 216 217 return result; 218 } 219 220 private void addNode(T fromNode, T toNode) { 221 if (nodes.containsKey(fromNode)) { 222 LinkedList<Edge> edges = nodes.get(fromNode); 223 224 int order; 225 if (edges.isEmpty()) { 226 order = calculateLevelFn.applyAsInt(fromNode); 227 } else { 228 order = levels.get(edges.getLast().getNode()); 229 } 230 231 edges.add(new Edge(toNode)); 232 levels.put(toNode, order); 233 } else { 234 int order = calculateLevelFn.applyAsInt(fromNode); 235 nodes.put(fromNode, new LinkedList<>(Collections.singletonList(new Edge(toNode)))); 236 levels.put(fromNode, order); 237 levels.put(toNode, order + 1); 238 } 239 240 nodes.computeIfAbsent(toNode, n -> new LinkedList<>(Collections.emptyList())); 241 levels.put(toNode, findLevelFn.applyAsInt(toNode)); 242 } 243 244 private static <T> Set<T> bfs(Graph<T> graph, T startNode) { 245 246 Set<T> resultSet = new HashSet<>(); 247 Set<T> visitedNodes = new HashSet<>(graph.getNodesQuantity()); 248 Queue<T> queue = new LinkedList<>(); 249 T currentNode; 250 251 visitedNodes.add(startNode); 252 queue.add(startNode); 253 254 while (!queue.isEmpty()) { 255 currentNode = queue.poll(); 256 resultSet.add(currentNode); 257 258 graph.getNodeEdges(currentNode).forEach(edge -> { 259 T node = edge.getNode(); 260 if (visitedNodes.contains(node)) { 261 return; 262 } 263 visitedNodes.add(node); 264 queue.add(node); 265 }); 266 } 267 268 return resultSet; 269 } 270 271 } 272}