getOutputsSearch for all output parameters. def getOutputs():
outputs = RA.ParameterAPI.select(lambda p: "OUTPUT" in p["role"])
return outputs
Usage: getOutputs() getParamByNameSearch parameters based on name and return the object or objects matching that name. Returns an error if no parameter matching that name is found. def getParamByName(name):
params = RA.ParameterAPI.select(lambda p: p["displayName"] == name)
if(len(params) == 1):
return params[0]
elif(len(params) > 1):
return params
else:
raise ValueError("Could not find parameter named '" + name +"'")
Usage: Get the parameter named
yParam = getParamByName("y")
getParamIdByNameGets the ID of a parameter for a specific parameter name. This returns an error if there is not exactly one parameter with the specified name. Required snippets: def getParamByName(name):
params = RA.ParameterAPI.select(lambda p: p["displayName"] == name)
if(len(params) == 1):
return params[0]
elif(len(params) > 1):
return params
else:
raise ValueError("Could not find parameter named '" + name +"'")
Usage: Get the parameter id for a parameter named xParamId = getParamByName("x")
createOutputScatterGridCreate a scatter grid with all the output parameters. def createOutputScatterGrid():
outputs = RA.ParameterAPI.select(lambda p: "OUTPUT" in p["role"])
outputIds = list(map(lambda o: o["ID"], outputs))
RA.VisualsAPI.addScatterMatrix(outputIds);
Usage: createOutputScatterGrid() setParameterRolesSet all input, output, and unknown roles based on parameter names. def setParameterRoles(inputParamNames=None, outputParamNames=None, unknownParamNames=None):
#Note: all arguments for this function are optional. To only set e.g. output
#parameters, use the function as setParameterRoles(outputParamNames=["x","y"])
rolesToSet = len(inputParamNames) + len(outputParamNames)
if(unknownParamNames is not None):
rolesToSet += len(unknownParamNames)
rolesSet = 0
paramIter = RA.ParameterAPI.getIterator()
while(RA.ParameterAPI.hasNext(paramIter) and (rolesSet < rolesToSet)):
nextParam = RA.ParameterAPI.next(paramIter)
if(inputParamNames is not None and nextParam["displayName"] in inputParamNames):
RA.ParameterAPI.modifyParameterRole(nextParam["ID"], "INPUT")
rolesSet += 1
if(outputParamNames is not None and nextParam["displayName"] in outputParamNames):
RA.ParameterAPI.modifyParameterRole(nextParam["ID"], "OUTPUT")
rolesSet += 1
if(unknownParamNames is not None and nextParam["displayName"] in unknownParamNames):
RA.ParameterAPI.modifyParameterRole(nextParam["ID"], "UNKNOWN")
rolesSet += 1
Usage: Set
setParameterRoles(["x","y"],["z"]) removeAllNaNRemove all Not-a-Number (NaN) values in a specified column from the data set. def removeAllNaN(columnId):
#Note: the column specified by "columnId" should contain numeric data.
setValuesDict = {}
setValuesDict[columnId] = {}
dataIter = RA.DataAPI.getRowIterator()
while(RA.DataAPI.hasNext(dataIter)):
nextRow = RA.DataAPI.next(dataIter)
value = nextRow["values"][columnId]
if(value == "NaN"):
setValuesDict[columnId][nextRow["rowID"]] = ""
RA.DataAPI.setValues(setValuesDict)
Usage: Remove all NaN values for the parameter named xParamId = getParamIdByName("x")
excludeIncompleteRows(xParamId)
excludeIncompleteRowsExclude all rows with missing values. def excludeIncompleteRows():
allParams = RA.ParameterAPI.select()
allParamIds = list(map(lambda p: p["ID"], allParams))
setValuesDict = {}
for paramId in allParamIds:
setValuesDict[paramId] = {}
rowsToExclude = []
dataIter = RA.DataAPI.getRowIterator()
while(RA.DataAPI.hasNext(dataIter)):
nextRow = RA.DataAPI.next(dataIter)
for paramId in allParamIds:
paramVal = nextRow["values"][paramId]
if paramVal == "" or paramVal == None:
rowsToExclude.append(nextRow["rowID"])
break
RA.DataAPI.excludeRows(rowsToExclude)
Usage: excludeIncompleteRows() addDerivedColumnAdd an equation-based derived column to the data set. def addDerivedColumn(columnName, executable, paramIds):
#Note: paramIds must be listed in the same order as the inputs to the
#executable provided for columnFormula.
newParam = RA.ParameterAPI.createParameter(name=columnName)
newParamId = newParam["ID"]
setValuesDict = {}
setValuesDict[newParamId] = {}
dataIter = RA.DataAPI.getRowIterator()
while(RA.DataAPI.hasNext(dataIter)):
nextRow = RA.DataAPI.next(dataIter)
values = list(map(lambda p: nextRow["values"][p], paramIds))
setValuesDict[newParamId][nextRow["rowID"]] = executable(*values)
RA.DataAPI.setValues(setValuesDict)
return newParam
Usage: Create a column named executable = lambda x, y: x+y
paramIds = [getParamByName("x")["ID"], getParamByName("y")["ID"]]
newParam = addDerivedColumn("SUM(x,y)", executable, paramIds)
addTunedRSMCreate an RSM approximation with a def addTunedRSM(rsmName, outputId):
import time
approximations = []
unfinishedApproxIds = []
totalApproximations = 8
#First create a set of RSMs with a variety of polyOrder settings
for i in range(totalApproximations):
order = i+1
newApprox = RA.ApproximationAPI.create("Temp_RSM_"+str(order), "rsm", {"polyOrder": order})
#approximations.append(newApprox)
unfinishedApproxIds.append(newApprox["id"])
#This helper function checks if an approximation is done
def isFinished(approxId):
approx = RA.ApproximationAPI.describeApproximation(approxId)
status = approx["status"]
if(status == "READY" or status == "NOT_AVAILABLE"):
approximations.append(approx)
return True
return False
iters = 0
#This loop will sleep for a second at a time until all of the approximations
#are finished
while(len(unfinishedApproxIds) > 0 and iters < 20):
unfinishedApproxIds[:] = [approxId for approxId in unfinishedApproxIds if not isFinished(approxId)]
iters = iters + 1;
time.sleep(1)
#Now find the approximation with the highest value for cross validation R^2.
#This is used to measure goodness of fit.
#If multiple approximations have the same R^2 value, we choose the first since
#it has the lowest order polynomial.
approxIds = list(map(lambda a: a["id"], approximations))
measures = RA.ApproximationAPI.errorMeasures(approxIds)
cv_r2_vals = list(map(lambda id: measures[id][outputId]["cv_R2"], approxIds))
cvMax = 0
index = -1
for i in range(len(cv_r2_vals)):
if(cv_r2_vals[i] > cvMax):
cvMax = cv_r2_vals[i]
index = i
finalApprox=RA.ApproximationAPI.describeApproximation(approxIds[index])
for i in range(len(approxIds)):
if(i == index):
#Rename the best approximation
RA.ApproximationAPI.train(approxIds[i], displayName=rsmName)
else:
#Delete the unused approximation
RA.ApproximationAPI.deleteApproximations([approxIds[i]])
return finalApprox
Usage: setParameterRoles(["x","y"],["z"])
addTunedRSM("Tuned RSM", getParamIdByName("z"))
addPredictedColumnCreate a new column based on values generated from an approximation. def addPredictedColumn(approxId, outputId):
approx = RA.ApproximationAPI.describeApproximation(approxId)
outputParam = RA.ParameterAPI.select(lambda p: p["ID"] == outputId)[0]
inputParams = RA.ParameterAPI.select(lambda p: p["role"] == "INPUT")
newParamName = outputParam["displayName"] + " (" + approx["displayName"] + ")"
#Create a parameter for the approximation data
newParam = RA.ParameterAPI.createParameter(name=newParamName)
newParamId = newParam["ID"]
setValuesDict = {}
setValuesDict[newParamId] = {}
#Generate approximation data for every row in the data set
dataIter = RA.DataAPI.getRowIterator()
while(RA.DataAPI.hasNext(dataIter)):
nextRow = RA.DataAPI.next(dataIter)
values = {}
for input in inputParams:
values[input["ID"]] = nextRow["values"][input["ID"]]
predValueObject = RA.ApproximationAPI.evaluate({
"approxIds":[approxId],
"approxPoint": values
})
setValuesDict[newParamId][nextRow["rowID"]] = predValueObject[approxId][outputId]
#Enter all of the generated data into the data set
RA.DataAPI.setValues(setValuesDict)
return newParam
Usage: setParameterRoles(["x","y"],["z"])
tunedApprox = addTunedRSM("Tuned RSM", getParamIdByName("z"))
tunedApproxId = tunedApprox["id"]
addPredictedColumn(tunedApproxId, getParamIdByName("z"))
addPredictionPlotCreate a scatter plot showing actual data and predicted data. def addPredictionPlot(approxId, outputId, inputId):
import plotly.graph_objects as go
inputParams = RA.ParameterAPI.select(lambda p: p["role"] == "INPUT")
inputVals = []
outputVals = []
predVals = []
#Similar to the addPredictedColumn snippet, we need to generate an array
#of predicted data to plit
dataIter = RA.DataAPI.getRowIterator()
while(RA.DataAPI.hasNext(dataIter)):
nextRow = RA.DataAPI.next(dataIter)
values = {}
for input in inputParams:
values[input["ID"]] = nextRow["values"][input["ID"]]
inputVals.append(nextRow["values"][inputId])
outputVals.append(nextRow["values"][outputId])
predObject = RA.ApproximationAPI.evaluate({
"approxIds":[approxId],
"approxPoint": values
})
predVals.append(predObject[approxId][outputId])
#Instead of creating a new column, we will use plotly to create a scatter
#plot showing both the actual and predicted values.
pFig = go.Figure()
pFig.add_trace(go.Scatter(x=inputVals, y=outputVals, mode='markers', name='Actual'))
pFig.add_trace(go.Scatter(x=inputVals, y=predVals, mode='markers', name='Predicted'))
pVis = RA.VisualsAPI.addPlotlyVisual(pFig, title="Actual and Predicted")
return pVis
Usage: tunedApprox = addTunedRSM()
tunedApproxId = tunedApprox["id"]
inputId = getParamByName("x")["ID"]
outputId = getParamByName("z2")["ID"]
addPredictionPlot(tunedApproxId, outputId, inputId)
plotCSVImport and visualize external data using Python in a new Visual Analytics tab. def plotCSV(fileName):
import csv
#This assumes that the csv file given is a CSV with numeric data.
with open(fileName, newline='') as csvfile:
reader = csv.reader(csvfile, skipinitialspace=True)
firstRow = True
columnNames = []
columns = []
#Read the CSV data into local variables.
#The first row will be the parameter names
for row in reader:
if firstRow:
firstRow = False
columnNames[:] = row[:]
columns = list(map(lambda x: [], columnNames))
else:
for i in range(len(columnNames)):
columns[i].append(row[i])
#Get all of the current param names. If we have param names already in
#the dataset in the new file we'll overwrite the existing data instead of
#creating a new column.
currentParams = set(map(lambda p: p["displayName"], RA.ParameterAPI.select()))
#Create new parameters. Set up the data structrue for sending the
#new data to the server
paramIds = []
setValuesDict = {}
for name in columnNames:
if name in currentParams:
paramId = getParamIdByName("name")
paramIds.append(paramId)
setValuesDict[paramId] = {}
else:
newParam = RA.ParameterAPI.createParameter(name=name)
paramIds.append(newParam["ID"])
setValuesDict[newParam["ID"]] = {}
j=0
#Add data for all of the rows in the data set
dataIter = RA.DataAPI.getRowIterator()
while(RA.DataAPI.hasNext(dataIter)):
nextRow = RA.DataAPI.next(dataIter)
for i in range(len(paramIds)):
setValuesDict[paramIds[i]][nextRow["rowID"]] = columns[i][j]
j += 1
#Add additional rows as needed
while j < len(columns[0]):
newRowId = RA.DataAPI.addRow()
for i in range(len(paramIds)):
setValuesDict[paramIds[i]][newRowId] = columns[i][j]
j += 1
#Set the new values
RA.DataAPI.setValues(setValuesDict)
RA.VisualsAPI.addTab("Imported Data")
#Note: plotly could be used here for a multi-series scatter chart
RA.VisualsAPI.addScatter2D(paramIds[0], paramIds[1])
Usage: plotCSV("more_data.csv")
addObjectivesByNameAdd objectives based on parameter name. Required snippets: def addObjectivesByName(namesToMaximize=None, namesToMinimize=None):
for name in namesToMaximize:
param = getParamByName(name)
RA.FormulationAPI.addObjective(param["ID"], "MAXIMIZE")
for name in namesToMinimize:
param = getParamByName(name)
RA.FormulationAPI.addObjective(param["ID"], "MINIMIZE")
Usage: addObjectivesByName(["x","z"],["y"]) addBestPerObjectiveToBasketAdd the best row for each objective to the basket. def addBestPerObjectiveToBasket():
import math
objectives = RA.FormulationAPI.selectObjectives()
bestPerObjective = {}
for objective in objectives:
if(objective["type"] == "MAXIMIZE"):
bestPerObjective[objective["parameterID"]] = {"value": -math.inf}
elif(objective["type"] == "MINIMIZE"):
bestPerObjective[objective["parameterID"]] = {"value": math.inf}
#Figure out the best designs for each objective.
#Multiple designs can be optimal for the same param.
dataIter = RA.DataAPI.getRowIterator()
while(RA.DataAPI.hasNext(dataIter)):
nextRow = RA.DataAPI.next(dataIter)
for objective in objectives:
objId = objective["parameterID"]
objVal = nextRow["values"][objId]
if(objective["type"] == "MAXIMIZE" and objVal > bestPerObjective[objId]["value"]):
bestPerObjective[objId] = {"value": objVal, "ids": [nextRow["rowID"]]}
elif(objective["type"] == "MINIMIZE" and objVal < bestPerObjective[objId]["value"]):
bestPerObjective[objId] = {"value": objVal, "ids": [nextRow["rowID"]]}
elif objVal == bestPerObjective[objId]["value"]:
bestPerObjective[objId]["ids"].append(nextRow["rowID"])
allBestRows = set([])
for key in bestPerObjective.keys():
objective = bestPerObjective[key]
newRowIds = set(objective["ids"]) - allBestRows
allBestRows |= newRowIds
RA.DataAPI.addRowsToBasket(list(allBestRows))
Usage: addObjectivesByName(["x","z2"],["y"]) addBestPerObjectiveToBasket() addParetoToBasketAdd all the rows on the Pareto frontier to the basket. def addParetoToBasket():
import math
paretoDesigns = []
bestDesigns = [{"totalScore": -math.inf}]
dataIter = RA.DataAPI.getRowIterator()
while(RA.DataAPI.hasNext(dataIter)):
nextRow = RA.DataAPI.next(dataIter)
if nextRow["grade"] == "GRADE_PARETO":
paretoDesigns.append(nextRow["rowID"])
#Note: best design points are always pareto.
if nextRow["totalScore"] > bestDesigns[0]["totalScore"]:
bestDesigns = [nextRow]
elif nextRow["totalScore"] == bestDesigns[0]["totalScore"]:
bestDesigns.append(nextRow)
RA.DataAPI.addRowsToBasket(paretoDesigns)
bestDesignIds = list(map(lambda r: r["rowID"], bestDesigns))
for rowId in bestDesignIds:
RA.DataAPI.toggleRecommendRow(int(rowId))
Usage: addObjectivesByName(["x","z"],["y"]) addParetoToBasket() calculateParamStatsCalculate statistics on a numerical parameter. def calculateParamStats(paramName):
param = getParamByName(paramName)
rows = RA.DataAPI.getValues([param["ID"]])
values = list(map(lambda k: rows[param["ID"]][k], rows[param["ID"]].keys()))
stats = {"SUM":sum(values)}
n = len(values)
stats["MEAN"] = stats["SUM"]/n
stats["STDDEV"] = (sum(list(map(lambda v: (v-stats["MEAN"])**2, values)))/n)**.5
return stats
Usage: calculateParamStats("x")
removeConstantParamsRemove parameters with constant values. def removeConstantParams():
dataIter = RA.DataAPI.getRowIterator()
values = RA.DataAPI.next(dataIter)["values"]
while(RA.DataAPI.hasNext(dataIter)):
nextRow = RA.DataAPI.next(dataIter)
keyset = []
keyset [:] = list(values.keys())[:]
for key in keyset:
if nextRow["values"][key] != values[key]:
del values[key]
for key in values.keys():
RA.ParameterAPI.deleteParameter(key)
Usage: removeConstantParams() listParamsWithFormulationList all parameters with objectives or constraints. def listParamsWithFormulation():
paramIds = set([])
objectives = RA.FormulationAPI.selectObjectives()
for objective in objectives:
paramIds |= {objective["parameterID"]}
constraints = RA.FormulationAPI.selectConstraints()
for constraint in constraints:
paramIds |= {constraint["parameterID"]}
params = RA.ParameterAPI.select(lambda p: p["ID"] in paramIds)
paramNames = list(map(lambda p: p["displayName"], params))
print(paramNames)
return params
Usage: listParamsWithFormulation() filterToParetoFilter out infeasible and dominated points. def filterToPareto():
RA.FilterAPI.addFilter({
"@class":"LogicalCondition",
"type":"ROW_GRADE_EQUALS",
"rowGrade":"GRADE_INFEASIBLE"
})
RA.FilterAPI.addFilter({
"@class":"LogicalCondition",
"type":"ROW_GRADE_EQUALS",
"rowGrade":"GRADE_DOMINATED"
})
RA.FilterAPI.addFilter({
"@class":"LogicalCondition",
"type":"ROW_GRADE_EQUALS",
"rowGrade":"GRADE_EXCLUDED"
})
Usage: addObjectivesByName(["x","z"],["y"]) filterToPareto() filterBottom10ForParamFilter out the bottom 10% of points based on a parameter. def filterBottom10ForParam(paramId):
import math
#Figure out what the 10th percentile is
rows = RA.DataAPI.getValues([paramId])
vals = list(map(lambda k: rows[paramId][k], rows[paramId].keys()))
vals = list(filter(lambda v: v != None, vals))
vals.sort()
threshold = vals[math.ceil(len(vals)/10)]
RA.FilterAPI.addFilter({
"@class":"ParametricCondition",
"type":"LESS_THAN",
"boundValue": threshold,
"parameterID": paramId
})
Usage: filterBottom10ForParam(getParamIdByName("x"))
getFilteredDataRetrieve filtered data. def getFilteredData():
return RA.DataAPI.select(lambda r: r["filteringFlag"] == 1)
Usage: filterBottom10ForParam(getParamIdByName("x"))
getFilteredData()
| |||||||