def getOutputs():
    """Search for all output parameters.

    Returns:
        The result of RA.ParameterAPI.select for every parameter whose
        "role" entry contains "OUTPUT".

    Usage:
        getOutputs()
    """
    def hasOutputRole(param):
        return "OUTPUT" in param["role"]

    return RA.ParameterAPI.select(hasOutputRole)


def getParamByName(name):
    """Search parameters based on name and return the matching object(s).

    Args:
        name: display name to look for (compared against "displayName").

    Returns:
        The single matching parameter when exactly one parameter has that
        name, or the whole collection of matches when several do.

    Raises:
        ValueError: if no parameter matching that name is found.

    Usage:
        Get the parameter named "y":
            yParam = getParamByName("y")
    """
    matches = RA.ParameterAPI.select(lambda param: param["displayName"] == name)
    matchCount = len(matches)
    if matchCount > 1:
        # Ambiguous name: hand back every match and let the caller decide.
        return matches
    if matchCount == 1:
        return matches[0]
    raise ValueError("Could not find parameter named '" + name +"'")
# Usage of getParamByName (continued from the preceding snippet):
#     yParam = getParamByName("y")


def getParamByName(name):
    """Return the parameter (or parameters) whose display name is `name`.

    Listed on the page as a required snippet for getParamIdByName.

    Returns:
        The single matching parameter when exactly one matches, or the whole
        collection of matches when several do.

    Raises:
        ValueError: if no parameter matching that name is found.
    """
    params = RA.ParameterAPI.select(lambda p: p["displayName"] == name)
    if len(params) == 1:
        return params[0]
    if len(params) > 1:
        return params
    raise ValueError("Could not find parameter named '" + name + "'")


def getParamIdByName(name):
    """Get the ID of a parameter for a specific parameter name.

    Required snippets: getParamByName (as listed on the original page; this
    implementation queries RA.ParameterAPI.select directly so it can tell
    "no match" and "several matches" apart).

    Args:
        name: display name of the parameter.

    Returns:
        The "ID" entry of the one parameter whose "displayName" equals `name`.

    Raises:
        ValueError: if there is not exactly one parameter with that name.

    Usage:
        Get the parameter id for a parameter named "x":
            xParamId = getParamIdByName("x")
    """
    matches = RA.ParameterAPI.select(lambda p: p["displayName"] == name)
    if len(matches) != 1:
        raise ValueError(
            "Expected exactly one parameter named '" + str(name)
            + "' but found " + str(len(matches))
        )
    return matches[0]["ID"]


def createOutputScatterGrid():
    """Create a scatter grid with all the output parameters.

    Usage:
        createOutputScatterGrid()
    """
    outputs = RA.ParameterAPI.select(lambda p: "OUTPUT" in p["role"])
    outputIds = [output["ID"] for output in outputs]
    RA.VisualsAPI.addScatterMatrix(outputIds)


def setParameterRoles(inputParamNames=None, outputParamNames=None, unknownParamNames=None):
    """Set all input, output, and unknown roles based on parameter names.

    Note: all arguments for this function are optional. To only set e.g.
    output parameters, use setParameterRoles(outputParamNames=["x","y"]).

    Args:
        inputParamNames: display names to mark as "INPUT" (or None).
        outputParamNames: display names to mark as "OUTPUT" (or None).
        unknownParamNames: display names to mark as "UNKNOWN" (or None).

    A name that appears in more than one list ends up with the last role
    applied (INPUT, then OUTPUT, then UNKNOWN).

    Usage:
        Set "x" and "y" as inputs and "z" as an output:
            setParameterRoles(["x","y"],["z"])
    """
    # Treat an omitted (None) list as empty so len() and `in` are always safe.
    namesByRole = (
        ("INPUT", inputParamNames or ()),
        ("OUTPUT", outputParamNames or ()),
        ("UNKNOWN", unknownParamNames or ()),
    )
    rolesToSet = sum(len(names) for _, names in namesByRole)

    rolesSet = 0
    paramIter = RA.ParameterAPI.getIterator()
    # Stop early once as many roles have been assigned as names were given.
    while RA.ParameterAPI.hasNext(paramIter) and rolesSet < rolesToSet:
        nextParam = RA.ParameterAPI.next(paramIter)
        for role, names in namesByRole:
            if nextParam["displayName"] in names:
                RA.ParameterAPI.modifyParameterRole(nextParam["ID"], role)
                rolesSet += 1
# Usage of setParameterRoles (continued from the preceding snippet):
#     setParameterRoles(["x","y"],["z"])


def removeAllNaN(columnId):
    """Remove all Not-a-Number (NaN) values in a specified column.

    Note: the column specified by columnId should contain numeric data.

    Every NaN cell in the column is overwritten with the empty string "".
    A cell counts as NaN when it is the string "NaN" or a float NaN.

    Args:
        columnId: ID of the parameter (column) to clean.

    Usage:
        Remove all NaN values for the parameter named "x":
            xParamId = getParamIdByName("x")
            removeAllNaN(xParamId)
    """
    import math

    clearedValues = {}
    dataIter = RA.DataAPI.getRowIterator()
    while RA.DataAPI.hasNext(dataIter):
        nextRow = RA.DataAPI.next(dataIter)
        value = nextRow["values"][columnId]
        # NOTE(review): the original only matched the string "NaN"; float NaN
        # is also handled in case the API returns numeric cells - confirm.
        if value == "NaN" or (isinstance(value, float) and math.isnan(value)):
            clearedValues[nextRow["rowID"]] = ""
    RA.DataAPI.setValues({columnId: clearedValues})


def excludeIncompleteRows():
    """Exclude all rows with missing values.

    A row is excluded when any parameter value in it is "" or None.

    Usage:
        excludeIncompleteRows()
    """
    allParamIds = [param["ID"] for param in RA.ParameterAPI.select()]

    rowsToExclude = []
    dataIter = RA.DataAPI.getRowIterator()
    while RA.DataAPI.hasNext(dataIter):
        nextRow = RA.DataAPI.next(dataIter)
        rowValues = nextRow["values"]
        for paramId in allParamIds:
            paramVal = rowValues[paramId]
            if paramVal is None or paramVal == "":
                rowsToExclude.append(nextRow["rowID"])
                break  # one missing value is enough to exclude the row
    RA.DataAPI.excludeRows(rowsToExclude)


def addDerivedColumn(columnName, executable, paramIds):
    """Add an equation-based derived column to the data set.

    Note: paramIds must be listed in the same order as the inputs to the
    callable provided as `executable`.

    Args:
        columnName: name of the parameter to create.
        executable: callable invoked once per row with the row's values for
            `paramIds` as positional arguments; its result is the new cell.
        paramIds: IDs of the parameters fed to `executable`.

    Returns:
        The parameter object returned by RA.ParameterAPI.createParameter.

    Usage:
        Create a column named "SUM(x,y)":
            executable = lambda x, y: x+y
            paramIds = [getParamByName("x")["ID"], getParamByName("y")["ID"]]
            newParam = addDerivedColumn("SUM(x,y)", executable, paramIds)
    """
    newParam = RA.ParameterAPI.createParameter(name=columnName)
    newParamId = newParam["ID"]

    derivedValues = {}
    dataIter = RA.DataAPI.getRowIterator()
    while RA.DataAPI.hasNext(dataIter):
        nextRow = RA.DataAPI.next(dataIter)
        rowValues = nextRow["values"]
        args = [rowValues[paramId] for paramId in paramIds]
        derivedValues[nextRow["rowID"]] = executable(*args)
    RA.DataAPI.setValues({newParamId: derivedValues})
    return newParam


def addTunedRSM(rsmName, outputId):
    """Create an RSM approximation with a tuned polynomial order.

    Creates eight temporary RSM approximations ("Temp_RSM_1" .. "Temp_RSM_8",
    polyOrder 1..8), polls until they finish (at most 20 polls, one second
    apart), keeps the one with the highest cross-validation R^2 ("cv_R2") for
    `outputId`, renames it to `rsmName` and deletes the other finished ones.

    Args:
        rsmName: display name given to the approximation that is kept.
        outputId: ID of the output parameter whose cv_R2 is compared.

    Returns:
        The description of the chosen approximation, as returned by
        RA.ApproximationAPI.describeApproximation before the rename.

    Raises:
        RuntimeError: if no approximation finished within the polling limit.

    Usage:
        setParameterRoles(["x","y"],["z"])
        addTunedRSM("Tuned RSM", getParamIdByName("z"))
    """
    import time

    totalApproximations = 8
    maxPolls = 20

    # First create a set of RSMs with a variety of polyOrder settings.
    createdApproxIds = []
    for order in range(1, totalApproximations + 1):
        newApprox = RA.ApproximationAPI.create(
            "Temp_RSM_" + str(order), "rsm", {"polyOrder": order}
        )
        createdApproxIds.append(newApprox["id"])

    # This helper function checks if an approximation is done.
    def isFinished(approxId):
        status = RA.ApproximationAPI.describeApproximation(approxId)["status"]
        return status == "READY" or status == "NOT_AVAILABLE"

    # Poll once per second until every approximation is finished or the
    # polling limit is reached.
    finishedApproxIds = set()
    unfinishedApproxIds = list(createdApproxIds)
    polls = 0
    while unfinishedApproxIds and polls < maxPolls:
        stillRunning = []
        for approxId in unfinishedApproxIds:
            if isFinished(approxId):
                finishedApproxIds.add(approxId)
            else:
                stillRunning.append(approxId)
        unfinishedApproxIds = stillRunning
        polls += 1
        if unfinishedApproxIds and polls < maxPolls:
            time.sleep(1)

    # Keep creation order (ascending polyOrder) so that ties on cv_R2 resolve
    # to the lowest-order polynomial.
    approxIds = [a for a in createdApproxIds if a in finishedApproxIds]
    if not approxIds:
        raise RuntimeError(
            "No RSM approximation finished training after "
            + str(maxPolls) + " polls"
        )

    # Find the approximation with the highest cross-validation R^2, which is
    # used to measure goodness of fit.
    # NOTE(review): approximations that ended as "NOT_AVAILABLE" are still
    # looked up in the error measures, as in the original - confirm they
    # carry a "cv_R2" entry.
    measures = RA.ApproximationAPI.errorMeasures(approxIds)
    cvR2Values = [measures[a][outputId]["cv_R2"] for a in approxIds]
    # max() returns the first maximal index, i.e. the lowest order on ties,
    # and always selects a candidate even when every cv_R2 is <= 0.
    bestIndex = max(range(len(cvR2Values)), key=cvR2Values.__getitem__)
    bestApproxId = approxIds[bestIndex]

    finalApprox = RA.ApproximationAPI.describeApproximation(bestApproxId)
    for approxId in approxIds:
        if approxId == bestApproxId:
            # Rename the best approximation.
            RA.ApproximationAPI.train(approxId, displayName=rsmName)
        else:
            # Delete the unused approximation.
            RA.ApproximationAPI.deleteApproximations([approxId])
    return finalApprox


def addPredictedColumn(approxId, outputId):
    """Create a new column based on values generated from an approximation.

    The new parameter is named "<output displayName> (<approx displayName>)"
    and holds, for every row, the approximation's prediction for `outputId`
    evaluated at that row's values of all parameters whose role is "INPUT".

    Args:
        approxId: ID of the approximation to evaluate.
        outputId: ID of the output parameter being predicted.

    Returns:
        The parameter object returned by RA.ParameterAPI.createParameter.

    Usage:
        setParameterRoles(["x","y"],["z"])
        tunedApprox = addTunedRSM("Tuned RSM", getParamIdByName("z"))
        tunedApproxId = tunedApprox["id"]
        addPredictedColumn(tunedApproxId, getParamIdByName("z"))
    """
    approx = RA.ApproximationAPI.describeApproximation(approxId)
    outputParam = RA.ParameterAPI.select(lambda p: p["ID"] == outputId)[0]
    inputParams = RA.ParameterAPI.select(lambda p: p["role"] == "INPUT")
    inputIds = [inputParam["ID"] for inputParam in inputParams]
    newParamName = outputParam["displayName"] + " (" + approx["displayName"] + ")"

    # Create a parameter for the approximation data.
    newParam = RA.ParameterAPI.createParameter(name=newParamName)
    newParamId = newParam["ID"]

    # Generate approximation data for every row in the data set.
    predictedValues = {}
    dataIter = RA.DataAPI.getRowIterator()
    while RA.DataAPI.hasNext(dataIter):
        nextRow = RA.DataAPI.next(dataIter)
        rowValues = nextRow["values"]
        point = {inputId: rowValues[inputId] for inputId in inputIds}
        predValueObject = RA.ApproximationAPI.evaluate({
            "approxIds": [approxId],
            "approxPoint": point,
        })
        predictedValues[nextRow["rowID"]] = predValueObject[approxId][outputId]

    # Enter all of the generated data into the data set.
    RA.DataAPI.setValues({newParamId: predictedValues})
    return newParam
# Usage of addPredictedColumn (continued from the preceding snippet):
#     tunedApproxId = tunedApprox["id"]
#     addPredictedColumn(tunedApproxId, getParamIdByName("z"))


def addPredictionPlot(approxId, outputId, inputId):
    """Create a scatter plot showing actual data and predicted data.

    Both series are plotted against the values of `inputId`: the "Actual"
    series uses the stored values of `outputId`, the "Predicted" series uses
    the approximation evaluated at each row's "INPUT"-role parameter values.

    Args:
        approxId: ID of the approximation to evaluate.
        outputId: ID of the output parameter (y axis).
        inputId: ID of the parameter used for the x axis.

    Returns:
        The value returned by RA.VisualsAPI.addPlotlyVisual.

    Usage:
        inputId = getParamByName("x")["ID"]
        outputId = getParamByName("z2")["ID"]
        tunedApprox = addTunedRSM("Tuned RSM", outputId)
        tunedApproxId = tunedApprox["id"]
        addPredictionPlot(tunedApproxId, outputId, inputId)
    """
    import plotly.graph_objects as go

    inputParams = RA.ParameterAPI.select(lambda p: p["role"] == "INPUT")
    modelInputIds = [inputParam["ID"] for inputParam in inputParams]
    inputVals = []
    outputVals = []
    predVals = []

    # Similar to the addPredictedColumn snippet, we need to generate an array
    # of predicted data to plot.
    dataIter = RA.DataAPI.getRowIterator()
    while RA.DataAPI.hasNext(dataIter):
        nextRow = RA.DataAPI.next(dataIter)
        rowValues = nextRow["values"]
        point = {paramId: rowValues[paramId] for paramId in modelInputIds}
        inputVals.append(rowValues[inputId])
        outputVals.append(rowValues[outputId])
        predObject = RA.ApproximationAPI.evaluate({
            "approxIds": [approxId],
            "approxPoint": point,
        })
        predVals.append(predObject[approxId][outputId])

    # Instead of creating a new column, we will use plotly to create a scatter
    # plot showing both the actual and predicted values.
    pFig = go.Figure()
    pFig.add_trace(go.Scatter(x=inputVals, y=outputVals, mode='markers', name='Actual'))
    pFig.add_trace(go.Scatter(x=inputVals, y=predVals, mode='markers', name='Predicted'))
    pVis = RA.VisualsAPI.addPlotlyVisual(pFig, title="Actual and Predicted")
    return pVis


def plotCSV(fileName):
    """Import and visualize external data in a new Visual Analytics tab.

    This assumes that the file given is a CSV with numeric data whose first
    row holds the parameter names. Columns whose name matches an existing
    parameter overwrite that parameter's data; other columns become new
    parameters. CSV rows are written onto the existing data rows in iteration
    order and extra rows are appended when the CSV is longer. Existing rows
    beyond the end of the CSV are left untouched. Finally an "Imported Data"
    tab is added with a 2D scatter of the first two CSV columns (skipped when
    the CSV has fewer than two columns).

    Required snippets: getParamIdByName.

    Args:
        fileName: path of the CSV file to read.

    Raises:
        ValueError: if the file has no header row.

    Usage:
        plotCSV("more_data.csv")
    """
    import csv

    # Read the CSV data into local variables.
    # The first row will be the parameter names.
    with open(fileName, newline='') as csvfile:
        reader = csv.reader(csvfile, skipinitialspace=True)
        columnNames = next(reader, [])
        if not columnNames:
            raise ValueError("CSV file '" + str(fileName) + "' has no header row")
        columns = [[] for _ in columnNames]
        for row in reader:
            if not row:
                continue  # csv.reader yields [] for blank lines
            for i, column in enumerate(columns):
                # A short row leaves its trailing cells empty ("").
                column.append(row[i] if i < len(row) else "")

    # Get all of the current param names. If we have param names already in
    # the dataset in the new file we'll overwrite the existing data instead of
    # creating a new column.
    currentParams = set(map(lambda p: p["displayName"], RA.ParameterAPI.select()))

    # Create new parameters. Set up the data structure for sending the
    # new data to the server.
    paramIds = []
    setValuesDict = {}
    for name in columnNames:
        if name in currentParams:
            # Look up the column's own name (the original passed the literal
            # string "name" here).
            paramId = getParamIdByName(name)
        else:
            paramId = RA.ParameterAPI.createParameter(name=name)["ID"]
        paramIds.append(paramId)
        setValuesDict[paramId] = {}

    csvRowCount = len(columns[0])
    j = 0

    # Add data for the rows already in the data set, stopping when the CSV
    # runs out of rows.
    dataIter = RA.DataAPI.getRowIterator()
    while j < csvRowCount and RA.DataAPI.hasNext(dataIter):
        nextRow = RA.DataAPI.next(dataIter)
        for paramId, column in zip(paramIds, columns):
            setValuesDict[paramId][nextRow["rowID"]] = column[j]
        j += 1

    # Add additional rows as needed.
    while j < csvRowCount:
        newRowId = RA.DataAPI.addRow()
        for paramId, column in zip(paramIds, columns):
            setValuesDict[paramId][newRowId] = column[j]
        j += 1

    # Set the new values.
    RA.DataAPI.setValues(setValuesDict)

    RA.VisualsAPI.addTab("Imported Data")
    # Note: plotly could be used here for a multi-series scatter chart.
    if len(paramIds) >= 2:
        RA.VisualsAPI.addScatter2D(paramIds[0], paramIds[1])


# addObjectivesByName: Add objectives based on parameter name.
def addObjectivesByName(namesToMaximize=None, namesToMinimize=None):
    """Add objectives based on parameter name.

    Required snippets: getParamByName.

    Args:
        namesToMaximize: display names of parameters to add a "MAXIMIZE"
            objective for (None or omitted means none).
        namesToMinimize: display names of parameters to add a "MINIMIZE"
            objective for (None or omitted means none).

    Usage:
        addObjectivesByName(["x","z"],["y"])
    """
    requested = (
        ("MAXIMIZE", namesToMaximize or ()),
        ("MINIMIZE", namesToMinimize or ()),
    )
    for objectiveType, names in requested:
        for name in names:
            param = getParamByName(name)
            RA.FormulationAPI.addObjective(param["ID"], objectiveType)


def addBestPerObjectiveToBasket():
    """Add the best row for each objective to the basket.

    For every "MAXIMIZE" / "MINIMIZE" objective the rows holding the best
    value of the objective's parameter are collected (several rows can tie)
    and the union of those rows is added to the basket. Objectives of any
    other type and cells that are None or "" are ignored.

    Usage:
        addObjectivesByName(["x","z2"],["y"])
        addBestPerObjectiveToBasket()
    """
    import math

    objectives = RA.FormulationAPI.selectObjectives()

    # One record per (parameter, direction). "ids" starts empty so a tie with
    # the initial +/-inf value or an empty data set cannot raise KeyError.
    bestPerObjective = {}
    for objective in objectives:
        objType = objective["type"]
        if objType == "MAXIMIZE":
            startValue = -math.inf
        elif objType == "MINIMIZE":
            startValue = math.inf
        else:
            continue
        key = (objective["parameterID"], objType)
        bestPerObjective[key] = {"value": startValue, "ids": []}

    # Figure out the best designs for each objective.
    # Multiple designs can be optimal for the same param.
    dataIter = RA.DataAPI.getRowIterator()
    while RA.DataAPI.hasNext(dataIter):
        nextRow = RA.DataAPI.next(dataIter)
        rowId = nextRow["rowID"]
        for (objId, objType), best in bestPerObjective.items():
            objVal = nextRow["values"][objId]
            if objVal is None or objVal == "":
                continue  # missing value cannot be compared
            if objType == "MAXIMIZE":
                improved = objVal > best["value"]
            else:
                improved = objVal < best["value"]
            if improved:
                best["value"] = objVal
                best["ids"] = [rowId]
            elif objVal == best["value"]:
                best["ids"].append(rowId)

    allBestRows = set()
    for best in bestPerObjective.values():
        allBestRows.update(best["ids"])
    RA.DataAPI.addRowsToBasket(list(allBestRows))


# addParetoToBasket: Add all the rows on the Pareto frontier to the basket.
def addParetoToBasket():
    """Add all the rows on the Pareto frontier to the basket.

    Rows whose "grade" is "GRADE_PARETO" are added to the basket. Among those
    rows, every row sharing the highest "totalScore" additionally has its
    recommend flag toggled via RA.DataAPI.toggleRecommendRow.

    Usage:
        addObjectivesByName(["x","z"],["y"])
        addParetoToBasket()
    """
    import math

    paretoDesigns = []
    bestScore = -math.inf
    bestDesignIds = []

    dataIter = RA.DataAPI.getRowIterator()
    while RA.DataAPI.hasNext(dataIter):
        nextRow = RA.DataAPI.next(dataIter)
        if nextRow["grade"] != "GRADE_PARETO":
            continue
        rowId = nextRow["rowID"]
        paretoDesigns.append(rowId)
        # Note: best design points are always pareto, so only Pareto rows are
        # considered when looking for the best total score.
        score = nextRow["totalScore"]
        if score > bestScore:
            bestScore = score
            bestDesignIds = [rowId]
        elif score == bestScore:
            bestDesignIds.append(rowId)

    RA.DataAPI.addRowsToBasket(paretoDesigns)
    # Only real row IDs are kept here, so an empty data set is a no-op.
    for rowId in bestDesignIds:
        RA.DataAPI.toggleRecommendRow(int(rowId))


def calculateParamStats(paramName):
    """Calculate statistics on a numerical parameter.

    Required snippets: getParamByName.

    Cells that are None or "" are ignored.

    Args:
        paramName: display name of the parameter.

    Returns:
        A dict with "SUM", "MEAN" and "STDDEV" (the population standard
        deviation, i.e. divided by n) of the parameter's values.

    Raises:
        ValueError: if the parameter has no values to summarize.

    Usage:
        calculateParamStats("x")
    """
    param = getParamByName(paramName)
    paramId = param["ID"]
    rows = RA.DataAPI.getValues([paramId])
    values = [v for v in rows[paramId].values() if v is not None and v != ""]
    if not values:
        raise ValueError("Parameter '" + str(paramName) + "' has no values")

    n = len(values)
    total = sum(values)
    mean = total / n
    variance = sum((v - mean) ** 2 for v in values) / n
    return {"SUM": total, "MEAN": mean, "STDDEV": variance ** .5}


def removeConstantParams():
    """Remove parameters with constant values.

    A parameter is deleted when its value is the same in every row. With a
    single row every parameter is therefore considered constant. An empty
    data set is left untouched.

    Usage:
        removeConstantParams()
    """
    dataIter = RA.DataAPI.getRowIterator()
    if not RA.DataAPI.hasNext(dataIter):
        return

    # Candidates start as a copy of the first row's values; a key is dropped
    # as soon as any later row disagrees with it.
    constantValues = dict(RA.DataAPI.next(dataIter)["values"])
    while constantValues and RA.DataAPI.hasNext(dataIter):
        rowValues = RA.DataAPI.next(dataIter)["values"]
        for key in list(constantValues):
            if rowValues[key] != constantValues[key]:
                del constantValues[key]

    for key in constantValues:
        RA.ParameterAPI.deleteParameter(key)


# listParamsWithFormulation: List all parameters with objectives or constraints.
def listParamsWithFormulation():
    """List all parameters with objectives or constraints.

    Prints the display names of the matching parameters.

    Returns:
        The result of RA.ParameterAPI.select for the parameters referenced by
        any objective or constraint.

    Usage:
        listParamsWithFormulation()
    """
    paramIds = {o["parameterID"] for o in RA.FormulationAPI.selectObjectives()}
    paramIds |= {c["parameterID"] for c in RA.FormulationAPI.selectConstraints()}

    params = RA.ParameterAPI.select(lambda p: p["ID"] in paramIds)
    paramNames = [p["displayName"] for p in params]
    print(paramNames)
    return params


def filterToPareto():
    """Filter out infeasible and dominated points.

    Adds one ROW_GRADE_EQUALS filter each for the grades GRADE_INFEASIBLE,
    GRADE_DOMINATED and GRADE_EXCLUDED.

    Usage:
        addObjectivesByName(["x","z"],["y"])
        filterToPareto()
    """
    for grade in ("GRADE_INFEASIBLE", "GRADE_DOMINATED", "GRADE_EXCLUDED"):
        RA.FilterAPI.addFilter({
            "@class": "LogicalCondition",
            "type": "ROW_GRADE_EQUALS",
            "rowGrade": grade,
        })


def filterBottom10ForParam(paramId):
    """Filter out the bottom 10% of points based on a parameter.

    Adds a LESS_THAN filter on `paramId` whose bound is the value at sorted
    position ceil(n / 10) of the parameter's n non-missing values.

    Args:
        paramId: ID of the parameter to filter on.

    Raises:
        ValueError: if the parameter has no non-missing values.

    Usage:
        filterBottom10ForParam(getParamIdByName("x"))
    """
    import math

    # Figure out what the 10th percentile is.
    rows = RA.DataAPI.getValues([paramId])
    vals = sorted(v for v in rows[paramId].values() if v is not None and v != "")
    if not vals:
        raise ValueError("Parameter '" + str(paramId) + "' has no values to filter on")
    # Clamp the index: with a single value ceil(1 / 10) == 1 would be out of
    # range.
    thresholdIndex = min(math.ceil(len(vals) / 10), len(vals) - 1)
    threshold = vals[thresholdIndex]

    RA.FilterAPI.addFilter({
        "@class": "ParametricCondition",
        "type": "LESS_THAN",
        "boundValue": threshold,
        "parameterID": paramId,
    })


def getFilteredData():
    """Retrieve filtered data.

    Returns:
        The result of RA.DataAPI.select for rows whose "filteringFlag" is 1.

    Usage:
        filterBottom10ForParam(getParamIdByName("x"))
        getFilteredData()
    """
    return RA.DataAPI.select(lambda r: r["filteringFlag"] == 1)