Skip to content

Quantum Optimization Training - part 2

View on GitHub

Dealing with constraint using portfolio optimization

In this workshop, we will solve the Portfolio Optimization problem using the Quantum Approximate Optimization Algorithm (QAOA), by introducing how to add various types of constraints of the problem to the QAOA algorithm.

Guidance for the workshop:

The # TODO or # Your code is there for you to do yourself.

The # Solution start and # Solution end are only for helping you. Please delete the Solution and try doing it yourself...

Portfolio Optimization with the Quantum Approximate Optimization Algorithm (QAOA)

Introduction

Portfolio optimization is the process of allocating a portfolio of financial assets optimally, according to some predetermined goal. Usually, the goal is to maximize the potential return while minimizing the financial risk of the portfolio. One can express this problem as a combinatorial optimization problem like many other real-world problems. In this demo, we'll show how the Quantum Approximate Optimization Algorithm (QAOA) can be employed on the Classiq platform to solve the problem of portfolio optimization.

Modeling the Portfolio Optimization Problem

As a first step, we have to model the problem mathematically. We will use a simple yet powerful model, which captures the essence of portfolio optimization:

  • A portfolio is built from a pool of \(n\) financial assets, each asset labeled \(i \in \{1,\ldots,n\}\).

  • Every asset's return is a random variable, with expected value \(\mu_i\) and variance \(\Sigma_i\) (modeling the financial risk involved in the asset).

  • Every two assets \(i \neq j\) have covariance \(\Sigma_{ij}\) (modeling market correlation between assets).

  • Every asset \(i\) has a weight \(w_i \in D_i = \{0,\ldots,b_i\}\) in the portfolio, with \(b_i\) defined as the budget for asset \(i\) (modeling the maximum allowed weight of the asset).

  • The return vector \(\mu\), the covariance matrix \(\Sigma\) and the weight vector \(w\) are defined naturally from the above (with the domain \(D = D_1 \times D_2 \times \ldots \times D_n\) for \(w\)).

With the above definitions, the total expected return of the portfolio is \(\mu^T w\) and the total risk is \(w^T \Sigma w\). We'll use a simple difference of the two as our cost function, with the additional constraint that the total sum of assets does not exceed a predefined budget \(B\). We note that there are many other possibilities for defining a cost function (e.g. add a scaling factor to the risk/return or even some non-linear relation). For reasons of simplicity we select the model below, and we assume all constants and variables are dimensionless. Thus, the problem is, given the constant inputs \(\mu, \Sigma, D, B\), to find optimal variable \(w\) as follows: \(\begin{equation*} \min_{w \in D} w^T \Sigma w - \mu^T w, \end{equation*}\) subject to \(\Sigma_{i} w_i \leq B\). The case presented above is called integer portfolio optimization, since the domains \(D_i\) are over the (positive) integers. Another variation of this problem defines weights over binary domains, and will not be discussed here.

from classiq import *
import math
# import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import minimize
from classiq.execution import ExecutionSession, ExecutionPreferences

First we will solve the problem without adding constraint.

Just:

\[\begin{equation*} \min_{w \in D} w^T \Sigma w - \mu^T w \end{equation*}\]

Then, we will add equality constraint:

\[\begin{equation*} \min_{w \in D} w^T \Sigma w - \mu^T w \end{equation*}\]

subject to:

\[\Sigma_{i} w_i == B\]

Finaly, we will add inequality constraints:

\[\begin{equation*} \min_{w \in D} w^T \Sigma w - \mu^T w, \end{equation*}\]

subject to:

\[\Sigma_{i} w_i \leq B\]

The Portfolio Optimization Problem Parameters

First we define the parameters of the optimization problem, which include the expected return vector, the covariance matrix, the total budget and the asset-specific budgets.

returns = np.array([3, 4, -1])
# fmt: off
covariances = np.array(
    [
        [ 0.9,  0.5, -0.7],
        [ 0.5,  0.9, -0.2],
        [-0.7, -0.2,  0.9],
    ]
)
# fmt: on
total_budget = 6

Defining the variables

num_assets = 3

num_qubits_per_asset = 2 # Defines the possible values of choosing each asset.

class PortfolioOptimizationVars(QStruct):
    a: QArray[QNum[num_qubits_per_asset], num_assets]

Define the expected return

Define a function that describes \(\mu^T w\) where \(\mu\) is the return vector.

def expected_return_cost(returns: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    # Your code here

    # Solution start
    return sum(returns[i] * w_array.a[i] for i in range(len(returns)))
    # Solution end

Define the risk term

Define a function that describes the risk term in the objective function \(w^T \Sigma w\) where \(\Sigma\) is the covariances matrix.

def risk_cost(covariances: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    # Your code here

    # hint:
    # risk_term =  sum(
    #     ... * sum(... for j in range(covariances.shape[0])) for i in range(covariances.shape[0])
    # )

    # Solution start
    risk_term =  sum(
        w_array.a[i] * sum(w_array.a[j] * covariances[i][j] for j in range(covariances.shape[0])) for i in range(covariances.shape[0])
    )
    # Solution end
    return risk_term

Define the entire portfolio optimization objective function

Combine the risk term and the expected return functions. There a a term called return coefficient return_coeff that defines how much you prefer certainly over return. Higher values is more risky but can be more profitable. Later try changing it to see how the result changes.

return_coeff = 1.5

def objective_portfolio(
    w_array: PortfolioOptimizationVars,
    returns: np.ndarray, covariances: np.ndarray,
    return_coeff: float) -> float:
    # Your code here

    # Solution start

    return risk_cost(covariances, w_array) - return_coeff * expected_return_cost(returns, w_array)
    # Solution end

Build the QAOA circuit

@qfunc
def mixer_layer(beta: CReal, qba: QArray[QBit]):
    # Your code here

    # Solution start
    apply_to_all(lambda q: RX(beta, q), qba)
    # Solution end
NUM_LAYERS = 4

@qfunc
def main(
    params: CArray[CReal, 2 * NUM_LAYERS],
    w_array: Output[PortfolioOptimizationVars]) -> None:
    # Your code here

    # Allocating the qubits
    allocate(num_qubits_per_asset * num_assets, w_array)

    # Build the QAOA circuit similarly to the maxcut

    # Solution start
    hadamard_transform(w_array)

    repeat(
        count=params.len / 2,
        iteration=lambda i: (
            phase(
                expr= objective_portfolio(w_array, returns, covariances, return_coeff),
                theta = params[2 * i]
            ),
            mixer_layer(params[2 * i + 1], w_array)
        )
    )
    # Solution end

Synthesizing and visualizing

qmod = create_model(main)
qprog = synthesize(qmod)
show(qprog)

Execution and post processing

For the hybrid execution, we use ExecutionSession, which can evaluate the circuit in multiple methods, such as sampling the circuit, giving specific values for the parameters, and evaluating to a specific Hamiltonian, which is very common in chemical applications.

In QAOA, we will use the estimate_cost method, which samples the cost function and returns their average cost from all measurements. That helps to optimize easily.

NUM_SHOTS = 1000

es = ExecutionSession(
    qprog, execution_preferences=ExecutionPreferences(num_shots=NUM_SHOTS)
)


# Build `initial_params` list of np.array type.
# The gamma values should start from 0 and, in each layer, should approach closer to 1 linearly
# The beta values should start from 1 and in each layer, should approach closer to 0 linearly
# Then unify it to one list so scipy minimize can digest it.
# Your code here

# Solution start
def initial_qaoa_params(NUM_LAYERS) -> np.ndarray:
    initial_gammas = math.pi * np.linspace(0, 1, NUM_LAYERS)
    initial_betas = math.pi * np.linspace(1, 0, NUM_LAYERS)

    initial_params = []

    for i in range(NUM_LAYERS):
        initial_params.append(initial_gammas[i])
        initial_params.append(initial_betas[i])

    return np.array(initial_params)

# Solution end

initial_params = initial_qaoa_params(NUM_LAYERS)

Define a callback function to track the optimization

# Record the steps of the optimization
intermediate_params = []
objective_values = []

# Define the callback function to store the intermediate steps
def callback(xk):
    intermediate_params.append(xk)

Define the objective function

# Your code with hints in the comments:

# cost_func = lambda state: objective_portfolio(
#     w_array = ...,
#     returns = ...,
#     covariances = ...,
#     return_coeff= ...
# )
# def estimate_cost_func(params: np.ndarray) -> float:
#     objective_value = es.estimate_cost(
#         cost_func = ...,
#         parameters = {"params": params.tolist()}
#     )
#     # Your code here
#     # Save the result for convergence graph
#
#     return objective_value

# Solution start
cost_func = lambda state: objective_portfolio(
    w_array = state["w_array"],
    returns = returns,
    covariances = covariances,
    return_coeff= return_coeff
)
def estimate_cost_func(params: np.ndarray) -> float:
    objective_value = es.estimate_cost(
        cost_func = cost_func,
        parameters = {"params": params.tolist()}
    )
    objective_values.append(objective_value)
    return objective_value

# Solution end

Optimize

# Your code with hints in the comments:

# optimization_res = minimize(
#     fun = ...,
#     x0=...,
#     method="COBYLA",
#     callback=...,
#     options={"maxiter": 10},
# )

# Solution start
optimization_res = minimize(
    fun = estimate_cost_func,
    x0=initial_params,
    method="COBYLA",
    callback=callback,
    options={"maxiter": 30},
)
# Solution end

Look at the results

res = es.sample({"params": optimization_res.x.tolist()})

print(f"Optimized parameters: {optimization_res.x.tolist()}")

sorted_counts = sorted(res.parsed_counts, key=lambda pc: objective_portfolio(pc.state["w_array"],returns = returns, covariances = covariances, return_coeff= return_coeff))

for sampled in sorted_counts:
    w_sample = sampled.state["w_array"]
    print(f"solution={w_sample} probability={sampled.shots/NUM_SHOTS} "
          f"cost={objective_portfolio(w_array=w_sample,returns = returns, covariances = covariances, return_coeff= return_coeff)}")

Convergence graph

# plt.plot(objective_values)
# plt.xlabel("Iteration")
# plt.ylabel("Objective Value")
# plt.title("Optimization Progress")

The Entire Solution

from typing import List

import networkx as nx
from classiq import *
import math
import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import minimize
from classiq.execution import ExecutionSession, ExecutionPreferences

NUM_LAYERS = 3

returns = np.array([3, 4, -1])
# fmt: off
covariances = np.array(
    [
        [ 0.9,  0.5, -0.7],
        [ 0.5,  0.9, -0.2],
        [-0.7, -0.2,  0.9],
    ]
)
# fmt: on
total_budget = 6
specific_budgets = 3

return_coeff = 1.7

num_assets = 3

num_qubits_per_asset = 2

# start with integer variables

class PortfolioOptimizationVars(QStruct):
    a: QArray[QNum[num_qubits_per_asset], num_assets]


def expected_return_cost(returns: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    return sum(returns[i] * w_array.a[i] for i in range(len(returns)))


def risk_cost(covariances: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    risk_term =  sum(
        w_array.a[i] * sum(w_array.a[j] * covariances[i][j] for j in range(covariances.shape[0])) for i in range(covariances.shape[0])
    )
    return risk_term


def objective_portfolio(w_array: PortfolioOptimizationVars, returns: np.ndarray, covariances: np.ndarray, return_coeff: float) -> float:
    return risk_cost(covariances, w_array) - return_coeff * expected_return_cost(returns, w_array)


@qfunc
def mixer_layer(beta: CReal, qba: QArray[QBit]):
    apply_to_all(lambda q: RX(beta, q), qba)

@qfunc
def main(params: CArray[CReal, 2 * NUM_LAYERS], w_array: Output[PortfolioOptimizationVars]) -> None:

    allocate(num_qubits_per_asset * num_assets, w_array)

    hadamard_transform(w_array)

    repeat(
        count=params.len / 2,
        iteration=lambda i: (
            phase(
                expr= objective_portfolio(w_array, returns, covariances, return_coeff),
                theta = params[2 * i]
            ),
            mixer_layer(params[2 * i + 1], w_array)
        )
    )

qmod = create_model(main)
qprog = synthesize(qmod)
show(qprog)


NUM_SHOTS = 1000

es = ExecutionSession(
    qprog, execution_preferences=ExecutionPreferences(num_shots=NUM_SHOTS)
)

def initial_qaoa_params(NUM_LAYERS) -> np.ndarray:
    initial_gammas = math.pi * np.linspace(0, 1, NUM_LAYERS)
    initial_betas = math.pi * np.linspace(1, 0, NUM_LAYERS)

    initial_params = []

    for i in range(NUM_LAYERS):
        initial_params.append(initial_gammas[i])
        initial_params.append(initial_betas[i])

    return np.array(initial_params)

initial_params = initial_qaoa_params(NUM_LAYERS)

# Record the steps of the optimization
intermediate_params = []
objective_values = []

# Define the callback function to store the intermediate steps
def callback(xk):
    intermediate_params.append(xk)

cost_func = lambda state: objective_portfolio(
    w_array = state["w_array"],
    returns = returns,
    covariances = covariances,
    return_coeff= return_coeff
)
def estimate_cost_func(params: np.ndarray) -> float:
    objective_value = es.estimate_cost(
        cost_func = cost_func,
        parameters = {"params": params.tolist()}
    )
    objective_values.append(objective_value)
    return objective_value



optimization_res = minimize(
    fun = estimate_cost_func,
    x0=initial_params,
    method="COBYLA",
    callback=callback,
    options={"maxiter": 10},
)

res = es.sample({"params": optimization_res.x.tolist()})

print(f"Optimized parameters: {optimization_res.x.tolist()}")

sorted_counts = sorted(res.parsed_counts, key=lambda pc: objective_portfolio(pc.state["w_array"],returns = returns, covariances = covariances, return_coeff= return_coeff))

for sampled in sorted_counts:
    w_sample = sampled.state["w_array"]
    print(f"solution={w_sample} probability={sampled.shots/NUM_SHOTS} "
          f"cost={objective_portfolio(w_array=w_sample,returns = returns, covariances = covariances, return_coeff= return_coeff)}")

plt.plot(objective_values)
plt.xlabel("Iteration")
plt.ylabel("Objective Value")
plt.title("Optimization Progress")

Adding equality constraint

The method to deal with equality constraint, namely:

\[\begin{equation*} \min_{w \in D} w^T \Sigma w - \mu^T w \end{equation*}\]

subject to:

\[\Sigma_{i} w_i == B\]

is to add a penalty term to lead us to the set of valid solutions. To do so, we will change the objective function as follows:

\[\begin{equation*} \min_{w \in D} w^T \Sigma w - \mu^T w + P * (\Sigma_{i} w_i - B)^2 \end{equation*}\]

Where \(P\) is the penalty value you need to define.

Define the objective value with a penalty term

PENALTY = 2.0

def objective_portfolio_equality(w_array: PortfolioOptimizationVars, returns: np.ndarray, covariances: np.ndarray, return_coeff: float) -> float:
    # Your code here

    # Solution start

    return risk_cost(covariances, w_array) - \
        return_coeff * expected_return_cost(returns, w_array) + \
        Penalty * (sum(w_array.a[i] for i in range(len(returns))) - total_budget)**2
    # Solution end

Repeat the whole process all over again with objective_portfolio_equality

Solution

from typing import List

import networkx as nx
from classiq import *
import math
import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import minimize
from classiq.execution import ExecutionSession, ExecutionPreferences

NUM_LAYERS = 3

returns = np.array([3, 4, -1])
# fmt: off
covariances = np.array(
    [
        [ 0.9,  0.5, -0.7],
        [ 0.5,  0.9, -0.2],
        [-0.7, -0.2,  0.9],
    ]
)
# fmt: on
total_budget = 6
specific_budgets = 3

return_coeff = 10.0

num_assets = 3

num_qubits_per_asset = 2

Penalty = 2.0

# start with integer variables

class PortfolioOptimizationVars(QStruct):
    a: QArray[QNum[num_qubits_per_asset], num_assets]

def expected_return_cost(returns: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    return sum(returns[i] * w_array.a[i] for i in range(len(returns)))

def risk_cost(covariances: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    risk_term =  sum(
        w_array.a[i] * sum(w_array.a[j] * covariances[i][j] for j in range(covariances.shape[0])) for i in range(covariances.shape[0])
    )
    return risk_term


def objective_portfolio_equality(w_array: PortfolioOptimizationVars, returns: np.ndarray, covariances: np.ndarray, return_coeff: float) -> float:
    return risk_cost(covariances, w_array) - \
        return_coeff * expected_return_cost(returns, w_array) + \
        Penalty * (sum(w_array.a[i] for i in range(len(returns))) - total_budget)**2


@qfunc
def mixer_layer(beta: CReal, qba: QArray[QBit]):
    apply_to_all(lambda q: RX(beta, q), qba)

@qfunc
def main(params: CArray[CReal, 2 * NUM_LAYERS], w_array: Output[PortfolioOptimizationVars]) -> None:
    # allocate(len(returns), w_array)
    allocate(num_qubits_per_asset * num_assets, w_array)

    hadamard_transform(w_array)

    repeat(
        count=params.len / 2,
        iteration=lambda i: (
            phase(
                expr= objective_portfolio_equality(w_array, returns, covariances, return_coeff),
                theta = params[2 * i]
            ),
            mixer_layer(params[2 * i + 1], w_array)
        )
    )

qmod = create_model(main)
qprog = synthesize(qmod)
show(qprog)


NUM_SHOTS = 1000

es = ExecutionSession(
    qprog, execution_preferences=ExecutionPreferences(num_shots=NUM_SHOTS)
)

def initial_qaoa_params(NUM_LAYERS) -> np.ndarray:
    initial_gammas = math.pi * np.linspace(0, 1, NUM_LAYERS)
    initial_betas = math.pi * np.linspace(1, 0, NUM_LAYERS)

    initial_params = []

    for i in range(NUM_LAYERS):
        initial_params.append(initial_gammas[i])
        initial_params.append(initial_betas[i])

    return np.array(initial_params)

initial_params = initial_qaoa_params(NUM_LAYERS)

# Record the steps of the optimization
intermediate_params = []
objective_values = []

# Define the callback function to store the intermediate steps
def callback(xk):
    intermediate_params.append(xk)

cost_func = lambda state: objective_portfolio_equality(state["w_array"],returns = returns, covariances = covariances, return_coeff= return_coeff)
def estimate_cost_func(params: np.ndarray) -> float:
    objective_value = es.estimate_cost(
        cost_func = cost_func,
        parameters = {"params": params.tolist()}
    )
    objective_values.append(objective_value)
    return objective_value


optimization_res = minimize(
    estimate_cost_func,
    x0=initial_params,
    method="COBYLA",
    callback=callback,
    options={"maxiter": 40},
)

res = es.sample({"params": optimization_res.x.tolist()})

print(f"Optimized parameters: {optimization_res.x.tolist()}")

sorted_counts = sorted(res.parsed_counts, key=lambda pc: objective_portfolio_equality(pc.state["w_array"],returns = returns, covariances = covariances, return_coeff= return_coeff))

for sampled in sorted_counts:
    w = sampled.state["w_array"]
    print(f"solution={w} probability={sampled.shots/NUM_SHOTS} "
          f"cost={objective_portfolio_equality(w_array=w,returns = returns, covariances = covariances, return_coeff= return_coeff)}")

plt.plot(objective_values)
plt.xlabel("Iteration")
plt.ylabel("Objective Value")
plt.title("Optimization Progress")