/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gbif.api.model.pipelines;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.function.Function;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

import static org.gbif.api.model.pipelines.StepType.*;

/**
 * Static definitions of the pipelines workflows. Each workflow is a directed {@link Graph} of
 * {@link StepType}s, built once in the static initializer below and shared afterwards.
 */
public final class PipelinesWorkflow {

  private PipelinesWorkflow() {
    // NOP
  }

  private static final Graph<StepType> OCCURRENCE_WF_GRAPH = new Graph<>();
  private static final Graph<StepType> EVENT_OCCURRENCE_WF_GRAPH = new Graph<>();
  private static final Graph<StepType> EVENT_WF_GRAPH = new Graph<>();
  private static final Graph<StepType> VALIDATION_WF_GRAPH = new Graph<>();

  static {
    // Pipelines occurrence workflow
    // 1
    OCCURRENCE_WF_GRAPH.addNode(DWCA_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
    OCCURRENCE_WF_GRAPH.addNode(XML_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
    OCCURRENCE_WF_GRAPH.addNode(ABCD_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
    // 2
    OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_IDENTIFIER, VERBATIM_TO_INTERPRETED);
    // 3
    OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, INTERPRETED_TO_INDEX);
    OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, HDFS_VIEW);
    OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, FRAGMENTER);

    // Pipelines event-occurrence workflow
    // 1
    EVENT_OCCURRENCE_WF_GRAPH.addNode(DWCA_TO_VERBATIM, VERBATIM_TO_IDENTIFIER);
    // 2
    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_IDENTIFIER, VERBATIM_TO_INTERPRETED);
    // 3
    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, INTERPRETED_TO_INDEX);
    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, HDFS_VIEW);
    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, FRAGMENTER);
    EVENT_OCCURRENCE_WF_GRAPH.addNode(VERBATIM_TO_INTERPRETED, EVENTS_VERBATIM_TO_INTERPRETED);
    // 4
    EVENT_OCCURRENCE_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_INTERPRETED_TO_INDEX);
    EVENT_OCCURRENCE_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_HDFS_VIEW);

    // Pipelines event only workflow
    // 1
    EVENT_WF_GRAPH.addNode(DWCA_TO_VERBATIM, EVENTS_VERBATIM_TO_INTERPRETED);
    // 2
    EVENT_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_INTERPRETED_TO_INDEX);
    EVENT_WF_GRAPH.addNode(EVENTS_VERBATIM_TO_INTERPRETED, EVENTS_HDFS_VIEW);

    // Pipelines validator workflow
    // 1
    VALIDATION_WF_GRAPH.addNode(VALIDATOR_UPLOAD_ARCHIVE, VALIDATOR_VALIDATE_ARCHIVE);
    // 2
    VALIDATION_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_DWCA_TO_VERBATIM);
    VALIDATION_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_XML_TO_VERBATIM);
    VALIDATION_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_ABCD_TO_VERBATIM);
    VALIDATION_WF_GRAPH.addNode(VALIDATOR_VALIDATE_ARCHIVE, VALIDATOR_TABULAR_TO_VERBATIM);
    // 3
    VALIDATION_WF_GRAPH.addNode(VALIDATOR_DWCA_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED);
    VALIDATION_WF_GRAPH.addNode(VALIDATOR_XML_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED);
    VALIDATION_WF_GRAPH.addNode(VALIDATOR_ABCD_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED);
    VALIDATION_WF_GRAPH.addNode(VALIDATOR_TABULAR_TO_VERBATIM, VALIDATOR_VERBATIM_TO_INTERPRETED);
    // 4
    VALIDATION_WF_GRAPH.addNode(VALIDATOR_VERBATIM_TO_INTERPRETED, VALIDATOR_INTERPRETED_TO_INDEX);
    // 5
    VALIDATION_WF_GRAPH.addNode(VALIDATOR_INTERPRETED_TO_INDEX, VALIDATOR_COLLECT_METRICS);
  }

  /** Returns the shared occurrence workflow graph. */
  public static Graph<StepType> getOccurrenceWorkflow() {
    return OCCURRENCE_WF_GRAPH;
  }

  /** Returns the shared event-occurrence workflow graph. */
  public static Graph<StepType> getEventOccurrenceWorkflow() {
    return EVENT_OCCURRENCE_WF_GRAPH;
  }

  /** Returns the shared event-only workflow graph. */
  public static Graph<StepType> getEventWorkflow() {
    return EVENT_WF_GRAPH;
  }

  /** Returns the shared validator workflow graph. */
  public static Graph<StepType> getValidatorWorkflow() {
    return VALIDATION_WF_GRAPH;
  }

  /**
   * Selects the workflow graph matching the content flags.
   *
   * @param containsOccurrences whether occurrence data is present
   * @param containsEvents whether event data is present
   * @return the event-occurrence graph when both flags are set, the occurrence graph or the event
   *     graph when only one is set, otherwise a new, empty graph
   */
  public static Graph<StepType> getWorkflow(boolean containsOccurrences, boolean containsEvents) {
    if (containsOccurrences && containsEvents) {
      return getEventOccurrenceWorkflow();
    } else if (containsOccurrences) {
      return getOccurrenceWorkflow();
    } else if (containsEvents) {
      return getEventWorkflow();
    }
    return new Graph<>();
  }

  /**
   * Small directed graph stored as an adjacency list, with an integer "level" recorded per node.
   * Nodes and edges are added only through the private {@code addNode}, which in this file is
   * called solely from the static initializer of {@link PipelinesWorkflow}.
   *
   * @param <T> node type
   */
  public static class Graph<T> {

    /** Outgoing edge of a node; {@link #getNode()} is the edge's target. */
    public class Edge {
      private final T node;

      public Edge(T node) {
        this.node = node;
      }

      public T getNode() {
        return node;
      }
    }

    // Adjacency list. Every node known to the graph is a key; a node with no outgoing edges
    // maps to an empty list.
    private final Map<T, LinkedList<Edge>> nodes = new HashMap<>();

    // Level per node: a node first seen as a source starts at 1, and every edge target is set
    // to its source's level + 1 (the latest addNode call for a given target wins).
    private final Map<T, Integer> levels = new HashMap<>();

    /** Returns the number of distinct nodes (sources and targets) in the graph. */
    public int getNodesQuantity() {
      return nodes.size();
    }

    /**
     * Returns a read-only view of the outgoing edges of {@code node}, in insertion order.
     *
     * @param node node to look up
     * @return the edges, an empty list for a leaf, or {@code null} if {@code node} is not part of
     *     this graph
     */
    public List<Edge> getNodeEdges(T node) {
      List<Edge> edges = nodes.get(node);
      // Read-only so callers cannot mutate the shared, statically-built workflow graphs.
      return edges == null ? null : Collections.unmodifiableList(edges);
    }

    /** Returns a read-only view of every node in the graph. */
    public Set<T> getAllNodes() {
      return Collections.unmodifiableSet(nodes.keySet());
    }

    /**
     * Collects every node reachable from any of the given start nodes, start nodes included.
     * A start node that is not part of the graph contributes only itself.
     *
     * @param fromTypesSet start nodes
     * @return union of the nodes reachable from each start node
     */
    public Set<T> getAllNodesFor(Set<T> fromTypesSet) {
      return fromTypesSet.stream()
          .map(ft -> bfs(this, ft))
          .flatMap(Collection::stream)
          .collect(Collectors.toSet());
    }

    /**
     * Returns the level recorded for {@code t}.
     *
     * @throws NullPointerException if {@code t} is not a node of this graph (the missing level is
     *     auto-unboxed)
     */
    public int getLevel(T t) {
      return levels.get(t);
    }

    /**
     * Filters the given nodes down to those not reachable from any <em>other</em> node in the same
     * set, preserving the iteration order of {@code fromTypesSet}.
     *
     * @param fromTypesSet candidate nodes
     * @return the candidates that no other candidate can reach
     */
    public Set<T> getRootNodesFor(Set<T> fromTypesSet) {
      Map<T, Set<T>> reachableFrom =
          fromTypesSet.stream().collect(Collectors.toMap(Function.identity(), ft -> bfs(this, ft)));

      return fromTypesSet.stream()
          .filter(candidate -> !isReachableFromOther(candidate, reachableFrom))
          .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    // True if some start node other than `candidate` reaches `candidate`. The self-comparison is
    // skipped because bfs always includes its own start node in the result.
    private boolean isReachableFromOther(T candidate, Map<T, Set<T>> reachableFrom) {
      return reachableFrom.entrySet().stream()
          .anyMatch(e -> !Objects.equals(e.getKey(), candidate) && e.getValue().contains(candidate));
    }

    /**
     * Adds the directed edge {@code fromNode -> toNode}, registers both nodes, and sets the
     * target's level to the source's level + 1 (a previously unseen source starts at level 1).
     */
    private void addNode(T fromNode, T toNode) {
      int fromLevel = levels.computeIfAbsent(fromNode, n -> 1);
      nodes.computeIfAbsent(fromNode, n -> new LinkedList<>()).add(new Edge(toNode));
      // Register the target as well so leaves show up in getAllNodes() with an empty edge list.
      nodes.computeIfAbsent(toNode, n -> new LinkedList<>());
      levels.put(toNode, fromLevel + 1);
    }

    /**
     * Breadth-first search from {@code startNode}.
     *
     * @return every node reachable from {@code startNode}, including {@code startNode} itself;
     *     a start node unknown to the graph is treated as having no outgoing edges
     */
    private static <T> Set<T> bfs(Graph<T> graph, T startNode) {
      Set<T> visited = new HashSet<>(graph.getNodesQuantity());
      Queue<T> queue = new LinkedList<>();

      visited.add(startNode);
      queue.add(startNode);

      while (!queue.isEmpty()) {
        T current = queue.poll();
        List<Graph<T>.Edge> edges = graph.nodes.get(current);
        if (edges == null) {
          // Not a node of this graph (e.g. the empty graph from getWorkflow): nothing to expand.
          continue;
        }
        for (Graph<T>.Edge edge : edges) {
          T next = edge.getNode();
          // Set.add returns false for already-visited nodes, so each node is enqueued once.
          if (visited.add(next)) {
            queue.add(next);
          }
        }
      }

      return visited;
    }
  }
}