
Quantum Optimization Training - part 3


Dealing with constraints using portfolio optimization

Guidance for the workshop:

The # TODO or # Your code comments mark the parts that are for you to do yourself.

The # Solution start and # Solution end blocks are only there to help you. Please delete the solution and try doing it yourself first.

Portfolio Optimization with the Quantum Approximate Optimization Algorithm (QAOA)

Introduction

Portfolio optimization is the process of allocating a portfolio of financial assets optimally, according to some predetermined goal. Usually, the goal is to maximize the potential return while minimizing the financial risk of the portfolio. One can express this problem as a combinatorial optimization problem like many other real-world problems. In this demo, we'll show how the Quantum Approximate Optimization Algorithm (QAOA) can be employed on the Classiq platform to solve the problem of portfolio optimization.

Modeling the Portfolio Optimization Problem

As a first step, we have to model the problem mathematically. We will use a simple yet powerful model, which captures the essence of portfolio optimization:

  • A portfolio is built from a pool of \(n\) financial assets, each asset labeled \(i \in \{1,\ldots,n\}\).

  • Every asset's return is a random variable, with expected value \(\mu_i\) and variance \(\Sigma_i\) (modeling the financial risk involved in the asset).

  • Every two assets \(i \neq j\) have covariance \(\Sigma_{ij}\) (modeling market correlation between assets).

  • Every asset \(i\) has a weight \(w_i \in D_i = \{0,\ldots,b_i\}\) in the portfolio, with \(b_i\) defined as the budget for asset \(i\) (modeling the maximum allowed weight of the asset).

  • The return vector \(\mu\), the covariance matrix \(\Sigma\) and the weight vector \(w\) are defined naturally from the above (with the domain \(D = D_1 \times D_2 \times \ldots \times D_n\) for \(w\)).

With the above definitions, the total expected return of the portfolio is \(\mu^T w\) and the total risk is \(w^T \Sigma w\). We'll use a simple difference of the two as our cost function, with the additional constraint that the total sum of assets does not exceed a predefined budget \(B\). We note that there are many other possibilities for defining a cost function (e.g. add a scaling factor to the risk/return or even some non-linear relation). For reasons of simplicity we select the model below, and we assume all constants and variables are dimensionless. Thus, the problem is, given the constant inputs \(\mu, \Sigma, D, B\), to find optimal variable \(w\) as follows:

\[\begin{equation*} \min_{w \in D} w^T \Sigma w - \mu^T w, \end{equation*}\]

subject to \(\sum_{i} w_i \leq B\).

The case presented above is called integer portfolio optimization, since the domains \(D_i\) are over the (positive) integers. Another variation of this problem defines weights over binary domains, and will not be discussed here.
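
To make the model concrete, here is a minimal classical sketch (plain NumPy, using the same toy numbers that appear later in this notebook) that evaluates the unconstrained cost \(w^T \Sigma w - \mu^T w\) for one candidate weight vector:

import numpy as np

# Toy instance for illustration (same numbers as the instance defined below)
mu = np.array([3.0, 4.0, -1.0])        # expected returns
sigma = np.array([[ 0.9,  0.5, -0.7],
                  [ 0.5,  0.9, -0.2],
                  [-0.7, -0.2,  0.9]])  # covariance matrix
w = np.array([2, 1, 0])                 # one candidate integer allocation

cost = w @ sigma @ w - mu @ w           # risk term minus expected return
print(cost)                             # lower is better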

from typing import List

import networkx as nx
from classiq import *
import math
import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import minimize
from classiq.execution import ExecutionSession, ExecutionPreferences

Finally, we will add an inequality constraint:

\[\begin{equation*} \min_{w \in D} w^T \Sigma w - \mu^T w, \end{equation*}\]

subject to:

\[\sum_{i} w_i \leq B\]

We will do this similarly to an equality constraint, but we add a slack variable that can take multiple values, so that \(\sum_{i} w_i \leq B\) can be satisfied.

In this case, we will change the objective function as follows:

\[\begin{equation*} \min_{w \in D} w^T \Sigma w - \mu^T w + P \cdot \left(\sum_{i} w_i + \mathrm{slack} - B\right)^2 \end{equation*}\]

where \(P\) is a penalty coefficient that you need to define.
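
As a quick sanity check on the penalty term, the following sketch (plain NumPy, with illustrative values for \(P\) and \(B\)) compares a feasible allocation, where a suitable slack value cancels the penalty, with an over-budget one:

import numpy as np

B = 6    # total budget
P = 1.3  # penalty coefficient (an illustrative choice)

def penalty(w, slack):
    return P * (np.sum(w) + slack - B) ** 2

print(penalty(np.array([2, 1, 0]), slack=3))  # feasible: 3 + 3 - 6 = 0, no penalty
print(penalty(np.array([3, 3, 2]), slack=0))  # over budget: 8 - 6 = 2, penalized by P * 4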

The Portfolio Optimization Problem Parameters

First we define the parameters of the optimization problem: the expected return vector, the covariance matrix, and the total budget. The asset-specific budgets are set implicitly below by the number of qubits allocated per asset.

returns = np.array([3, 4, -1])
# fmt: off
covariances = np.array(
    [
        [ 0.9,  0.5, -0.7],
        [ 0.5,  0.9, -0.2],
        [-0.7, -0.2,  0.9],
    ]
)
# fmt: on
total_budget = 6

Defining the variables

The slack register needs enough qubits for the slack variable to reach the budget \(B\).
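
For example, one way to size the register (a sketch, assuming the slack only needs to cover the values \(0,\ldots,B\)) is:

import math

total_budget = 6

# Enough qubits to represent any value from 0 up to the budget B
num_slack = math.ceil(math.log2(total_budget + 1))
print(num_slack)  # 3 qubits cover 0..7, which includes every value up to B = 6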

num_assets = 3

num_qubits_per_asset = 2 # Each asset weight can take the values 0,...,3 (2 qubits).

num_slack = 3

class PortfolioOptimizationVars(QStruct):
    a: QArray[QNum[num_qubits_per_asset], num_assets]
    slack: QNum[num_slack]

Define the expected return

Define a function that describes \(\mu^T w\) where \(\mu\) is the return vector.

def expected_return_cost(returns: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    return sum(returns[i] * w_array.a[i] for i in range(len(returns)))

Define the risk term

Define a function that describes the risk term in the objective function, \(w^T \Sigma w\), where \(\Sigma\) is the covariance matrix.

def risk_cost(covariances: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    risk_term =  sum(
        w_array.a[i] * sum(w_array.a[j] * covariances[i][j] for j in range(covariances.shape[0])) for i in range(covariances.shape[0])
    )
    return risk_term

Define the entire portfolio optimization objective function

Combine the risk term and the expected return functions. There is a term called the return coefficient, return_coeff, that sets how much weight the expected return gets relative to the risk. Higher values are riskier but can be more profitable.

Later, try changing it to see how the result changes.

return_coeff = 1.4
Penalty = 1.3

def objective_portfolio_inequality(
    w_array: PortfolioOptimizationVars,
    returns: np.ndarray, covariances: np.ndarray,
    return_coeff: float) -> float:
    # Your code

    # Solution start
    return risk_cost(covariances, w_array) - \
        return_coeff * expected_return_cost(returns, w_array) + \
        Penalty * (sum(w_array.a[i] for i in range(len(returns))) + w_array.slack - total_budget)**2
    # Solution end

Build the QAOA circuit

@qfunc
def mixer_layer(beta: CReal, qba: QArray[QBit]):
    # Your code here

    # Solution start
    apply_to_all(lambda q: RX(beta, q), qba)
    # Solution end

NUM_LAYERS = 4

@qfunc
def main(
    params: CArray[CReal, 2 * NUM_LAYERS],
    w_array: Output[PortfolioOptimizationVars]) -> None:

    # Allocating the qubits
    allocate(num_qubits_per_asset * num_assets + num_slack, w_array)

    # Your code

    # Solution start

    hadamard_transform(w_array)

    repeat(
        count=params.len / 2,
        iteration=lambda i: (
            phase(
                expr= objective_portfolio_inequality(w_array, returns, covariances, return_coeff),
                theta = params[2 * i]
            ),
            mixer_layer(params[2 * i + 1], w_array)
        )
    )

    # Solution end

Synthesizing and visualizing

qmod = create_model(main)
qprog = synthesize(qmod)
show(qprog)

Execution and post processing

For the hybrid execution, we use ExecutionSession, which can evaluate the circuit in several ways, such as sampling the circuit, assigning specific values to the parameters, and estimating the expectation value of a specific Hamiltonian, which is very common in chemistry applications.

In QAOA, we will use the estimate_cost method, which samples the circuit and returns the average cost over all measurements. That makes the classical optimization loop straightforward.
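
Conceptually (this is only a sketch of the idea, not the actual Classiq implementation), estimate_cost evaluates the user-supplied cost function on every sampled state and averages the results, weighted by the measurement counts:

# Sketch only: a shot-weighted average of the cost function over the sampled states.
def average_cost(parsed_counts, cost_func, num_shots):
    return sum(pc.shots * cost_func(pc.state) for pc in parsed_counts) / num_shots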

NUM_SHOTS = 1000

es = ExecutionSession(
    qprog, execution_preferences=ExecutionPreferences(num_shots=NUM_SHOTS)
)

# Build `initial_params` as a single np.ndarray.
# The gamma values should start at 0 and grow linearly toward pi across the layers.
# The beta values should start at pi and shrink linearly toward 0 across the layers.
# Then interleave gammas and betas into one array so scipy.optimize.minimize can digest it.
# Your code here

# Solution start
def initial_qaoa_params(NUM_LAYERS) -> np.ndarray:
    initial_gammas = math.pi * np.linspace(0, 1, NUM_LAYERS)
    initial_betas = math.pi * np.linspace(1, 0, NUM_LAYERS)

    initial_params = []

    for i in range(NUM_LAYERS):
        initial_params.append(initial_gammas[i])
        initial_params.append(initial_betas[i])

    return np.array(initial_params)

# Solution end

initial_params = initial_qaoa_params(NUM_LAYERS)

Define a callback function to track the optimization

# Record the steps of the optimization
intermediate_params = []
objective_values = []

# Define the callback function to store the intermediate steps
def callback(xk):
    intermediate_params.append(xk)

Define the objective function

# Your code
# You can use the hints in the comments

# cost_func = lambda state: objective_portfolio_inequality(
#     w_array = ...,
#     returns = ...,
#     covariances = ...,
#     return_coeff= ...
# )
# def estimate_cost_func(params: np.ndarray) -> float:
#     objective_value = es.estimate_cost(
#         cost_func = ...,
#         parameters = {"params": params.tolist()}
#     )
#     # Your code here
#     # Save the result for convergence graph

#     return objective_value

# Solution start

cost_func = lambda state: objective_portfolio_inequality(
    w_array = state["w_array"],
    returns = returns,
    covariances = covariances,
    return_coeff= return_coeff
)

def estimate_cost_func(params: np.ndarray) -> float:
    objective_value = es.estimate_cost(
        cost_func = cost_func,
        parameters = {"params": params.tolist()}
    )
    objective_values.append(objective_value)
    return objective_value

# Solution end

Optimize

# Your code
# You can use the hints in the comments

# optimization_res = minimize(
#     fun = ...,
#     x0=...,
#     method="COBYLA",
#     callback=...,
#     options={"maxiter": 10},
# )

# Solution start

optimization_res = minimize(
    estimate_cost_func,
    x0=initial_params,
    method="COBYLA",
    callback=callback,
    options={"maxiter": 20},
)
# Solution end

Look at the results

res = es.sample({"params": optimization_res.x.tolist()})

print(f"Optimized parameters: {optimization_res.x.tolist()}")

sorted_counts = sorted(
    res.parsed_counts,
    key=lambda pc: objective_portfolio_inequality(
        pc.state["w_array"], returns=returns, covariances=covariances, return_coeff=return_coeff
    ),
)

for sampled in sorted_counts:
    w_sample = sampled.state["w_array"]
    print(f"solution={w_sample} probability={sampled.shots/NUM_SHOTS} "
          f"cost={objective_portfolio_inequality(w_array=w_sample, returns=returns, covariances=covariances, return_coeff=return_coeff)}")

Convergence graph

plt.plot(objective_values)
plt.xlabel("Iteration")
plt.ylabel("Objective Value")
plt.title("Optimization Progress")

Solution

from typing import List

import networkx as nx
from classiq import *
import math
import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import minimize
from classiq.execution import ExecutionSession, ExecutionPreferences

NUM_LAYERS = 3

num_slack = 3

returns = np.array([3, 4, -1])
# fmt: off
covariances = np.array(
    [
        [ 0.9,  0.5, -0.7],
        [ 0.5,  0.9, -0.2],
        [-0.7, -0.2,  0.9],
    ]
)
# fmt: on
total_budget = 6
specific_budgets = 3

return_coeff = 10.0

num_assets = 3

num_qubits_per_asset = 2

Penalty = 30.5

# start with integer variables

class PortfolioOptimizationVars(QStruct):
    a: QArray[QNum[num_qubits_per_asset], num_assets]
    slack: QNum[num_slack]

def expected_return_cost(returns: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    return sum(returns[i] * w_array.a[i] for i in range(len(returns)))

def risk_cost(covariances: np.ndarray, w_array: PortfolioOptimizationVars) -> float:
    risk_term =  sum(
        w_array.a[i] * sum(w_array.a[j] * covariances[i][j] for j in range(covariances.shape[0])) for i in range(covariances.shape[0])
    )
    return risk_term


def objective_portfolio_inequality(w_array: PortfolioOptimizationVars, returns: np.ndarray, covariances: np.ndarray, return_coeff: float) -> float:
    return risk_cost(covariances, w_array) - \
        return_coeff * expected_return_cost(returns, w_array) + \
        Penalty * (sum(w_array.a[i] for i in range(len(returns))) + w_array.slack - total_budget)**2


@qfunc
def mixer_layer(beta: CReal, qba: QArray[QBit]):
    apply_to_all(lambda q: RX(beta, q), qba)

@qfunc
def main(params: CArray[CReal, 2 * NUM_LAYERS], w_array: Output[PortfolioOptimizationVars]) -> None:
    # allocate(len(returns), w_array)
    allocate(num_qubits_per_asset * num_assets + num_slack, w_array)

    hadamard_transform(w_array)

    repeat(
        count=params.len / 2,
        iteration=lambda i: (
            phase(
                expr= objective_portfolio_inequality(w_array, returns, covariances, return_coeff),
                theta = params[2 * i]
            ),
            mixer_layer(params[2 * i + 1], w_array)
        )
    )

qmod = create_model(main)
qprog = synthesize(qmod)
show(qprog)


NUM_SHOTS = 1000

es = ExecutionSession(
    qprog, execution_preferences=ExecutionPreferences(num_shots=NUM_SHOTS)
)

def initial_qaoa_params(NUM_LAYERS) -> np.ndarray:
    initial_gammas = math.pi * np.linspace(0, 1, NUM_LAYERS)
    initial_betas = math.pi * np.linspace(1, 0, NUM_LAYERS)

    initial_params = []

    for i in range(NUM_LAYERS):
        initial_params.append(initial_gammas[i])
        initial_params.append(initial_betas[i])

    return np.array(initial_params)

initial_params = initial_qaoa_params(NUM_LAYERS)

# Record the steps of the optimization
intermediate_params = []
objective_values = []

# Define the callback function to store the intermediate steps
def callback(xk):
    intermediate_params.append(xk)

cost_func = lambda state: objective_portfolio_inequality(
    w_array = state["w_array"],
    returns = returns,
    covariances = covariances,
    return_coeff= return_coeff
)

def estimate_cost_func(params: np.ndarray) -> float:
    objective_value = es.estimate_cost(
        cost_func = cost_func,
        parameters = {"params": params.tolist()}
    )
    objective_values.append(objective_value)
    return objective_value


optimization_res = minimize(
    estimate_cost_func,
    x0=initial_params,
    method="COBYLA",
    callback=callback,
    options={"maxiter": 20},
)

res = es.sample({"params": optimization_res.x.tolist()})

print(f"Optimized parameters: {optimization_res.x.tolist()}")

sorted_counts = sorted(
    res.parsed_counts,
    key=lambda pc: objective_portfolio_inequality(
        pc.state["w_array"], returns=returns, covariances=covariances, return_coeff=return_coeff
    ),
)

for sampled in sorted_counts:
    w = sampled.state["w_array"]
    print(f"solution={w} probability={sampled.shots/NUM_SHOTS} "
          f"cost={objective_portfolio_inequality(w_array=w, returns=returns, covariances=covariances, return_coeff=return_coeff)}")

plt.plot(objective_values)
plt.xlabel("Iteration")
plt.ylabel("Objective Value")
plt.title("Optimization Progress")