Code Samples

This section provides code samples to show use cases and examples using scripting APIs. Each sample shows a portion of the Results Analytics scripting capability encapsulated as a reusable function.

This page discusses:

getOutputs

Search for all output parameters.

def getOutputs():
    """Return every parameter whose role includes "OUTPUT"."""
    def isOutput(param):
        return "OUTPUT" in param["role"]

    return RA.ParameterAPI.select(isOutput)

Usage:

# Returns all parameters whose role includes "OUTPUT".
getOutputs()

getParamByName

Search parameters by name. Returns the matching parameter object when exactly one parameter has that name, or all matching objects when several parameters share the name. Raises an error if no parameter matching that name is found.

def getParamByName(name):
    """Look up parameters by display name.

    Returns the single matching parameter when exactly one matches, or the
    whole collection of matches when several parameters share the name.
    Raises ValueError when no parameter has the given display name.
    """
    matches = RA.ParameterAPI.select(lambda param: param["displayName"] == name)
    matchCount = len(matches)

    if matchCount == 0:
        raise ValueError("Could not find parameter named '" + name +"'")
    if matchCount == 1:
        return matches[0]
    return matches

Usage: Get the parameter named y

# yParam is the parameter named "y" (or all matches if several share that name).
yParam = getParamByName("y")

getParamIdByName

Gets the ID of the parameter with the specified name. Raises an error if there is not exactly one parameter with that name.

Required snippets: getParamByName

def getParamIdByName(name):
    """Return the ID of the one parameter whose display name is ``name``.

    Raises ValueError when no parameter has that display name, and also when
    more than one does, because the ID would then be ambiguous.
    """
    matches = RA.ParameterAPI.select(lambda p: p["displayName"] == name)
    matchCount = len(matches)

    if matchCount == 0:
        raise ValueError("Could not find parameter named '" + name + "'")
    if matchCount > 1:
        raise ValueError("Found " + str(matchCount) + " parameters named '"
                         + name + "'; expected exactly one")

    return matches[0]["ID"]

Usage: Get the parameter id for a parameter named x.

# xParamId is the ID of the single parameter named "x".
xParamId = getParamIdByName("x")

createOutputScatterGrid

Create a scatter grid with all the output parameters.

def createOutputScatterGrid():
    """Add a scatter matrix visual covering all output parameters."""
    outputParams = RA.ParameterAPI.select(lambda param: "OUTPUT" in param["role"])

    #The visual is built from parameter IDs rather than parameter objects.
    RA.VisualsAPI.addScatterMatrix([param["ID"] for param in outputParams])

Usage:

# Adds a scatter matrix built from every output parameter.
createOutputScatterGrid()

setParameterRoles

Set all input, output, and unknown roles based on parameter names.

def setParameterRoles(inputParamNames=None, outputParamNames=None, unknownParamNames=None):
    """Assign INPUT, OUTPUT and UNKNOWN roles to parameters by display name.

    All arguments are optional lists of display names. To only set e.g.
    output parameters, use setParameterRoles(outputParamNames=["x","y"]).
    If a name appears in more than one list, the role applied last
    (INPUT, then OUTPUT, then UNKNOWN) is the one that sticks.
    """
    #An omitted argument is treated as an empty list so that every
    #combination of arguments works.
    roleAssignments = [
        ("INPUT", inputParamNames or []),
        ("OUTPUT", outputParamNames or []),
        ("UNKNOWN", unknownParamNames or []),
    ]
    rolesToSet = sum(len(names) for _, names in roleAssignments)
    rolesSet = 0

    #Stop iterating as soon as every requested role has been applied.
    paramIter = RA.ParameterAPI.getIterator()
    while(RA.ParameterAPI.hasNext(paramIter) and (rolesSet < rolesToSet)):
        nextParam = RA.ParameterAPI.next(paramIter)

        for role, names in roleAssignments:
            if nextParam["displayName"] in names:
                RA.ParameterAPI.modifyParameterRole(nextParam["ID"], role)
                rolesSet += 1

Usage: Set x and y as inputs, set z as an output.

# First list: names to mark as INPUT; second list: names to mark as OUTPUT.
setParameterRoles(["x","y"],["z"])

removeAllNaN

Remove all Not-a-Number (NaN) values in a specified column from the data set.

def removeAllNaN(columnId):
    """Replace every "NaN" entry in the given column with an empty value."""
    #Note: the column specified by "columnId" should contain numeric data.
    blankedCells = {}

    rowIter = RA.DataAPI.getRowIterator()
    while RA.DataAPI.hasNext(rowIter):
        row = RA.DataAPI.next(rowIter)
        if row["values"][columnId] == "NaN":
            blankedCells[row["rowID"]] = ""

    #Send all replacements for the column in a single call.
    RA.DataAPI.setValues({columnId: blankedCells})

Usage: Remove all NaN values for the parameter named x.

# Look up the ID of "x", then blank out its NaN entries.
xParamId = getParamIdByName("x")
removeAllNaN(xParamId)

excludeIncompleteRows

Exclude all rows with missing values.

def excludeIncompleteRows():
    """Exclude every row that has an empty or None value for any parameter."""
    allParamIds = [param["ID"] for param in RA.ParameterAPI.select()]

    rowsToExclude = []

    dataIter = RA.DataAPI.getRowIterator()
    while(RA.DataAPI.hasNext(dataIter)):
        nextRow = RA.DataAPI.next(dataIter)
        rowValues = nextRow["values"]

        #One missing value is enough to mark the whole row as incomplete.
        if any(rowValues[paramId] is None or rowValues[paramId] == ""
               for paramId in allParamIds):
            rowsToExclude.append(nextRow["rowID"])

    RA.DataAPI.excludeRows(rowsToExclude)

Usage:

# Excludes every row that has an empty or None value in any parameter.
excludeIncompleteRows()

addDerivedColumn

Add an equation-based derived column to the data set.

def addDerivedColumn(columnName, executable, paramIds):
    """Create a parameter named columnName and fill it row by row.

    For each row, executable is called with that row's values for paramIds
    and its result becomes the new column's value. Returns the new parameter.
    """
    #Note: paramIds must be listed in the same order as the positional
    #inputs that executable expects.
    derivedParam = RA.ParameterAPI.createParameter(name=columnName)
    derivedId = derivedParam["ID"]

    derivedValues = {}

    rowIter = RA.DataAPI.getRowIterator()
    while RA.DataAPI.hasNext(rowIter):
        row = RA.DataAPI.next(rowIter)
        args = [row["values"][paramId] for paramId in paramIds]
        derivedValues[row["rowID"]] = executable(*args)

    #Write the whole column in one call.
    RA.DataAPI.setValues({derivedId: derivedValues})

    return derivedParam

Usage: Create a column named SUM(x,y) containing the sum of those two columns.

# The callable receives the row's values in the same order as paramIds.
executable = lambda x, y: x+y
paramIds = [getParamByName("x")["ID"], getParamByName("y")["ID"]]
# newParam is the parameter object created for the derived column.
newParam = addDerivedColumn("SUM(x,y)", executable, paramIds)

addTunedRSM

Create an RSM approximation with a polyOrder chosen to optimize error.

def addTunedRSM(rsmName, outputId, totalApproximations=8, maxWaitSeconds=20):
    """Create RSM approximations of several orders and keep the best one.

    One temporary RSM is created for each polyOrder from 1 to
    totalApproximations. Once they finish (or maxWaitSeconds elapses), the
    one with the highest cross validation R^2 for outputId is renamed to
    rsmName and all the others are deleted.

    Returns the description of the kept approximation, taken before the
    rename. Raises RuntimeError if no approximation finished in time.
    """
    import time

    #First create a set of RSMs with a variety of polyOrder settings.
    #createdIds stays in creation order, i.e. ascending polynomial order.
    createdIds = []
    for order in range(1, totalApproximations + 1):
        newApprox = RA.ApproximationAPI.create("Temp_RSM_"+str(order), "rsm", {"polyOrder": order})
        createdIds.append(newApprox["id"])

    pendingIds = list(createdIds)
    finishedIds = set()
    waited = 0

    #Poll once per second until all approximations are finished or the
    #time budget is used up.
    while True:
        stillPending = []
        for approxId in pendingIds:
            status = RA.ApproximationAPI.describeApproximation(approxId)["status"]
            if(status == "READY" or status == "NOT_AVAILABLE"):
                finishedIds.add(approxId)
            else:
                stillPending.append(approxId)
        pendingIds = stillPending

        if len(pendingIds) == 0 or waited >= maxWaitSeconds:
            break

        time.sleep(1)
        waited += 1

    #Temporary approximations that did not finish are not candidates;
    #delete them so they are not left behind.
    if len(pendingIds) > 0:
        RA.ApproximationAPI.deleteApproximations(pendingIds)

    #Keep candidates in ascending polynomial order regardless of the order
    #in which they finished.
    approxIds = [approxId for approxId in createdIds if approxId in finishedIds]
    if len(approxIds) == 0:
        raise RuntimeError("No RSM approximation finished within "
                           + str(maxWaitSeconds) + " seconds")

    #Now find the approximation with the highest value for cross validation R^2.
    #This is used to measure goodness of fit.
    #If multiple approximations have the same R^2 value, we choose the first since
    #it has the lowest order polynomial. Starting from the first candidate
    #(instead of a fixed 0) keeps the choice valid when every R^2 is <= 0.
    measures = RA.ApproximationAPI.errorMeasures(approxIds)
    bestIndex = 0
    bestValue = measures[approxIds[0]][outputId]["cv_R2"]
    for i in range(1, len(approxIds)):
        value = measures[approxIds[i]][outputId]["cv_R2"]
        if(value > bestValue):
            bestValue = value
            bestIndex = i

    bestId = approxIds[bestIndex]
    finalApprox = RA.ApproximationAPI.describeApproximation(bestId)

    #Rename the best approximation
    RA.ApproximationAPI.train(bestId, displayName=rsmName)

    #Delete the unused approximations
    unusedIds = [approxId for approxId in approxIds if approxId != bestId]
    if len(unusedIds) > 0:
        RA.ApproximationAPI.deleteApproximations(unusedIds)

    return finalApprox

Usage:

# Assign roles first, then build the tuned RSM for the output named "z".
setParameterRoles(["x","y"],["z"])
addTunedRSM("Tuned RSM", getParamIdByName("z"))

addPredictedColumn

Create a new column based on values generated from an approximation.

def addPredictedColumn(approxId, outputId):
    """Add a column holding the approximation's prediction for each row.

    The new parameter is named "<output name> (<approximation name>)".
    Each row's INPUT parameter values are passed to the approximation and
    its value for outputId is stored. Returns the new parameter.
    """
    approxInfo = RA.ApproximationAPI.describeApproximation(approxId)
    targetParam = RA.ParameterAPI.select(lambda param: param["ID"] == outputId)[0]
    inputIds = [param["ID"] for param in
                RA.ParameterAPI.select(lambda param: param["role"] == "INPUT")]

    #Create a parameter for the approximation data
    columnName = targetParam["displayName"] + " (" + approxInfo["displayName"] + ")"
    predictedParam = RA.ParameterAPI.createParameter(name=columnName)
    predictedId = predictedParam["ID"]

    predictedValues = {}

    #Generate approximation data for every row in the data set
    rowIter = RA.DataAPI.getRowIterator()
    while RA.DataAPI.hasNext(rowIter):
        row = RA.DataAPI.next(rowIter)
        point = {inputId: row["values"][inputId] for inputId in inputIds}

        prediction = RA.ApproximationAPI.evaluate({
            "approxIds":[approxId],
            "approxPoint": point
        })

        predictedValues[row["rowID"]] = prediction[approxId][outputId]

    #Enter all of the generated data into the data set
    RA.DataAPI.setValues({predictedId: predictedValues})

    return predictedParam

Usage:

# Assign roles, build the tuned RSM for "z", then add its predictions as a column.
setParameterRoles(["x","y"],["z"])
tunedApprox = addTunedRSM("Tuned RSM", getParamIdByName("z"))
# addTunedRSM returns an approximation description; its "id" identifies it.
tunedApproxId = tunedApprox["id"]
addPredictedColumn(tunedApproxId, getParamIdByName("z"))

addPredictionPlot

Create a scatter plot showing actual data and predicted data.

def addPredictionPlot(approxId, outputId, inputId):
    """Add a plotly scatter visual comparing actual and predicted output.

    Both series are plotted against the values of inputId. Predictions come
    from evaluating the approximation on each row's INPUT parameter values.
    Returns the object produced by RA.VisualsAPI.addPlotlyVisual.
    """
    import plotly.graph_objects as go

    inputIds = [param["ID"] for param in
                RA.ParameterAPI.select(lambda param: param["role"] == "INPUT")]

    xVals = []
    actualVals = []
    predictedVals = []

    #Similar to the addPredictedColumn snippet, we need to generate an array
    #of predicted data to plot
    rowIter = RA.DataAPI.getRowIterator()
    while RA.DataAPI.hasNext(rowIter):
        rowValues = RA.DataAPI.next(rowIter)["values"]
        point = {paramId: rowValues[paramId] for paramId in inputIds}

        xVals.append(rowValues[inputId])
        actualVals.append(rowValues[outputId])

        prediction = RA.ApproximationAPI.evaluate({
            "approxIds":[approxId],
            "approxPoint": point
        })
        predictedVals.append(prediction[approxId][outputId])

    #Instead of creating a new column, we will use plotly to create a scatter
    #plot showing both the actual and predicted values.
    figure = go.Figure()
    for seriesName, yVals in (('Actual', actualVals), ('Predicted', predictedVals)):
        figure.add_trace(go.Scatter(x=xVals, y=yVals, mode='markers', name=seriesName))

    return RA.VisualsAPI.addPlotlyVisual(figure, title="Actual and Predicted")

Usage:

# Resolve the parameter IDs first: addTunedRSM needs the output ID.
inputId = getParamByName("x")["ID"]
outputId = getParamByName("z2")["ID"]
tunedApprox = addTunedRSM("Tuned RSM", outputId)
tunedApproxId = tunedApprox["id"]
addPredictionPlot(tunedApproxId, outputId, inputId)

plotCSV

Import and visualize external data using Python in a new Visual Analytics tab.

def plotCSV(fileName):
    """Load a CSV file into the data set and plot its first two columns.

    The first CSV row holds the parameter names. A column whose name matches
    an existing parameter overwrites that parameter's data; any other column
    creates a new parameter. Existing rows are filled first and extra rows
    are added as needed. Values are sent as the strings read from the file.

    Raises ValueError if the file is empty, has fewer than two columns, or
    contains a data row with fewer fields than the header.
    """
    import csv
    #This assumes that the csv file given is a CSV with numeric data.

    #Read everything up front so the file is closed before the data set is
    #touched. Blank lines are skipped.
    with open(fileName, newline='') as csvfile:
        rows = [row for row in csv.reader(csvfile, skipinitialspace=True) if row]

    if len(rows) == 0:
        raise ValueError("CSV file '" + fileName + "' is empty")

    #The first row will be the parameter names
    columnNames = rows[0]
    dataRows = rows[1:]

    #Validate before changing anything so a bad file leaves the data set as is.
    if len(columnNames) < 2:
        raise ValueError("CSV file '" + fileName + "' needs at least two columns to plot")
    for lineIndex, row in enumerate(dataRows):
        if len(row) < len(columnNames):
            raise ValueError("Data row " + str(lineIndex + 1) + " of '" + fileName
                             + "' has fewer fields than the header")

    #Map the current param names to their IDs. If we have param names already
    #in the dataset in the new file we'll overwrite the existing data instead
    #of creating a new column.
    currentParamIds = {}
    for param in RA.ParameterAPI.select():
        currentParamIds[param["displayName"]] = param["ID"]

    #Create new parameters. Set up the data structure for sending the
    #new data to the server
    paramIds = []
    setValuesDict = {}
    for name in columnNames:
        if name in currentParamIds:
            paramId = currentParamIds[name]
        else:
            paramId = RA.ParameterAPI.createParameter(name=name)["ID"]
        paramIds.append(paramId)
        setValuesDict[paramId] = {}

    j = 0
    #Add data for the rows already in the data set. Stop once the CSV data
    #runs out so a data set with more rows than the file is handled.
    dataIter = RA.DataAPI.getRowIterator()
    while(j < len(dataRows) and RA.DataAPI.hasNext(dataIter)):
        nextRow = RA.DataAPI.next(dataIter)
        for paramId, value in zip(paramIds, dataRows[j]):
            setValuesDict[paramId][nextRow["rowID"]] = value
        j += 1

    #Add additional rows as needed
    while j < len(dataRows):
        newRowId = RA.DataAPI.addRow()
        for paramId, value in zip(paramIds, dataRows[j]):
            setValuesDict[paramId][newRowId] = value
        j += 1

    #Set the new values
    RA.DataAPI.setValues(setValuesDict)

    RA.VisualsAPI.addTab("Imported Data")
    #Note: plotly could be used here for a multi-series scatter chart
    RA.VisualsAPI.addScatter2D(paramIds[0], paramIds[1])

Usage:

# Loads more_data.csv into the data set and plots its first two columns in a new tab.
plotCSV("more_data.csv")

addObjectivesByName

Add objectives based on parameter name.

Required snippets: getParamByName.

def addObjectivesByName(namesToMaximize=None, namesToMinimize=None):
    """Add a MAXIMIZE or MINIMIZE objective for each named parameter.

    Both arguments are optional lists of parameter display names; an omitted
    argument adds no objectives of that type. Requires getParamByName.
    """
    objectiveGroups = (
        ("MAXIMIZE", namesToMaximize or []),
        ("MINIMIZE", namesToMinimize or []),
    )

    for objectiveType, names in objectiveGroups:
        for name in names:
            param = getParamByName(name)
            RA.FormulationAPI.addObjective(param["ID"], objectiveType)

Usage:

# First list: names to maximize; second list: names to minimize.
addObjectivesByName(["x","z"],["y"])

addBestPerObjectiveToBasket

Add the best row for each objective to the basket.

def addBestPerObjectiveToBasket():
    """Add to the basket the best row or rows for every objective.

    Only MAXIMIZE and MINIMIZE objectives are considered. Rows whose value
    for an objective's parameter is missing (None or "") are ignored for
    that objective. Rows that tie for the best value are all added.
    """
    import math

    #One tracker per objective, so two objectives on the same parameter
    #do not overwrite each other.
    trackers = []
    for objective in RA.FormulationAPI.selectObjectives():
        if(objective["type"] == "MAXIMIZE"):
            startValue = -math.inf
        elif(objective["type"] == "MINIMIZE"):
            startValue = math.inf
        else:
            continue
        trackers.append({
            "parameterID": objective["parameterID"],
            "maximize": objective["type"] == "MAXIMIZE",
            "value": startValue,
            "ids": []
        })

    #Figure out the best designs for each objective.
    #Multiple designs can be optimal for the same param.
    dataIter = RA.DataAPI.getRowIterator()
    while(RA.DataAPI.hasNext(dataIter)):
        nextRow = RA.DataAPI.next(dataIter)
        for tracker in trackers:
            objVal = nextRow["values"][tracker["parameterID"]]
            if objVal is None or objVal == "":
                continue

            if tracker["maximize"]:
                isBetter = objVal > tracker["value"]
            else:
                isBetter = objVal < tracker["value"]

            if isBetter:
                tracker["value"] = objVal
                tracker["ids"] = [nextRow["rowID"]]
            elif objVal == tracker["value"]:
                tracker["ids"].append(nextRow["rowID"])

    #A row that is best for several objectives is only added once.
    allBestRows = set()
    for tracker in trackers:
        allBestRows.update(tracker["ids"])

    RA.DataAPI.addRowsToBasket(list(allBestRows))

Usage:

# Define the objectives first; the basket is filled from those objectives.
addObjectivesByName(["x","z2"],["y"])
addBestPerObjectiveToBasket()

addParetoToBasket

Add all the rows on the Pareto frontier to the basket.

def addParetoToBasket():
    """Add all Pareto rows to the basket and toggle recommendation on the best.

    Rows graded "GRADE_PARETO" go into the basket. Among them, the row or
    rows with the highest totalScore have their recommend flag toggled.
    Nothing is toggled when there are no Pareto rows.
    """
    import math

    paretoRowIds = []
    bestRowIds = []
    bestScore = -math.inf

    dataIter = RA.DataAPI.getRowIterator()
    while(RA.DataAPI.hasNext(dataIter)):
        nextRow = RA.DataAPI.next(dataIter)
        if nextRow["grade"] != "GRADE_PARETO":
            continue

        paretoRowIds.append(nextRow["rowID"])

        #Note: best design points are always pareto.
        score = nextRow["totalScore"]
        if score > bestScore:
            bestScore = score
            bestRowIds = [nextRow["rowID"]]
        elif score == bestScore:
            bestRowIds.append(nextRow["rowID"])

    RA.DataAPI.addRowsToBasket(paretoRowIds)

    for rowId in bestRowIds:
        RA.DataAPI.toggleRecommendRow(int(rowId))

Usage:

# Define the objectives first, then add the rows graded as Pareto to the basket.
addObjectivesByName(["x","z"],["y"])
addParetoToBasket()

calculateParamStats

Calculate statistics on a numerical parameter.

def calculateParamStats(paramName):
    """Return the SUM, MEAN and STDDEV of a numerical parameter as a dict.

    Missing values (None or "") are left out of the calculation. STDDEV is
    the population standard deviation (divides by n). Raises ValueError when
    the parameter has no values. Requires getParamByName.
    """
    paramId = getParamByName(paramName)["ID"]
    column = RA.DataAPI.getValues([paramId])[paramId]
    values = [column[rowId] for rowId in column.keys()
              if column[rowId] is not None and column[rowId] != ""]

    n = len(values)
    if n == 0:
        raise ValueError("Parameter '" + paramName + "' has no values")

    stats = {"SUM": sum(values)}
    stats["MEAN"] = stats["SUM"]/n
    stats["STDDEV"] = (sum((v - stats["MEAN"])**2 for v in values)/n)**.5

    return stats

Usage:

# Returns a dict with "SUM", "MEAN" and "STDDEV" for the parameter named "x".
calculateParamStats("x")

removeConstantParams

Remove parameters with constant values.

def removeConstantParams():
    """Delete every parameter whose value is identical in all rows."""
    rowIter = RA.DataAPI.getRowIterator()

    #Start with every parameter of the first row as a candidate, keyed by
    #parameter with the first row's value as the reference.
    candidates = RA.DataAPI.next(rowIter)["values"]

    while RA.DataAPI.hasNext(rowIter):
        rowValues = RA.DataAPI.next(rowIter)["values"]
        #Collect the differing keys first so the dict is not changed while
        #it is being iterated.
        changedKeys = [key for key in candidates.keys()
                       if rowValues[key] != candidates[key]]
        for key in changedKeys:
            del candidates[key]

    #Whatever is left never changed across rows.
    for key in candidates.keys():
        RA.ParameterAPI.deleteParameter(key)

Usage:

# Deletes every parameter whose value is the same in all rows.
removeConstantParams()

listParamsWithFormulation

List all parameters with objectives or constraints.

def listParamsWithFormulation():
    """Print the names of, and return, all parameters with an objective or constraint."""
    formulatedIds = set()

    for objective in RA.FormulationAPI.selectObjectives():
        formulatedIds.add(objective["parameterID"])

    for constraint in RA.FormulationAPI.selectConstraints():
        formulatedIds.add(constraint["parameterID"])

    formulatedParams = RA.ParameterAPI.select(lambda param: param["ID"] in formulatedIds)

    print([param["displayName"] for param in formulatedParams])

    return formulatedParams

Usage:

# Prints the names of, and returns, the parameters that have an objective or constraint.
listParamsWithFormulation()

filterToPareto

Filter out infeasible and dominated points.

def filterToPareto():
    """Add one row-grade filter each for infeasible, dominated and excluded rows."""
    gradesToFilter = ("GRADE_INFEASIBLE", "GRADE_DOMINATED", "GRADE_EXCLUDED")

    for grade in gradesToFilter:
        condition = {
            "@class":"LogicalCondition",
            "type":"ROW_GRADE_EQUALS",
            "rowGrade":grade
        }
        RA.FilterAPI.addFilter(condition)

Usage:

# Define the objectives first, then filter on the infeasible, dominated and excluded grades.
addObjectivesByName(["x","z"],["y"])
filterToPareto()

filterBottom10ForParam

Filter out the bottom 10% of points based on a parameter.

def filterBottom10ForParam(paramId):
    """Add a LESS_THAN filter at the 10th percentile of a parameter's values.

    Missing values (None or "") are ignored when computing the threshold.
    Raises ValueError when the parameter has no values.
    """
    import math

    #Figure out what the 10th percentile is
    column = RA.DataAPI.getValues([paramId])[paramId]
    vals = sorted(column[rowId] for rowId in column.keys()
                  if column[rowId] is not None and column[rowId] != "")
    if len(vals) == 0:
        raise ValueError("Parameter has no values to compute a threshold from")

    #ceil(n/10) equals n when there is a single value, so clamp the index
    #to the last valid position.
    thresholdIndex = min(math.ceil(len(vals)/10), len(vals) - 1)
    threshold = vals[thresholdIndex]

    RA.FilterAPI.addFilter({
        "@class":"ParametricCondition",
        "type":"LESS_THAN",
        "boundValue": threshold,
        "parameterID": paramId
    })

Usage:

# Adds a LESS_THAN filter on "x" at its 10th-percentile value.
filterBottom10ForParam(getParamIdByName("x"))

getFilteredData

Retrieve filtered data.

def getFilteredData():
    """Return the rows whose filteringFlag equals 1."""
    def isFlagged(row):
        return row["filteringFlag"] == 1

    return RA.DataAPI.select(isFlagged)

Usage:

# Add a filter on "x", then fetch the rows whose filteringFlag equals 1.
filterBottom10ForParam(getParamIdByName("x"))
getFilteredData()