-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into trajectory_correction
- Loading branch information
Showing
4 changed files
with
275 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
import at | ||
import numpy as np | ||
import multiprocessing | ||
from pySC.lattice_properties.response_model import SCgetModelRM, SCgetModelDispersion | ||
from pySC.core.constants import SETTING_ADD, TRACK_ORB | ||
from pySC.core.beam import bpm_reading | ||
from pySC.utils.sc_tools import SCgetPinv | ||
import matplotlib.pyplot as plt | ||
from scipy.optimize import least_squares | ||
from pySC.utils import logging_tools | ||
LOGGER = logging_tools.get_logger(__name__) | ||
|
||
|
||
def calculate_jacobian(SC, C_model, dkick, used_cor_ind, bpm_indexes, quads_ind, dk, trackMode=TRACK_ORB, | ||
useIdealRing=True, skewness=False, order=1, method=SETTING_ADD, includeDispersion=False, rf_step=1E3, | ||
cav_ords=None, full_jacobian=True): | ||
pool = multiprocessing.Pool() | ||
quad_args = [(quad_index, SC, C_model, dkick, used_cor_ind, bpm_indexes, dk, trackMode, useIdealRing, | ||
skewness, order, method, includeDispersion, rf_step, cav_ords) for quad_index in quads_ind] | ||
# results = [] | ||
# for quad_arg in quad_args: | ||
# results.append(generating_quads_response_matrices(quad_arg)) | ||
results = pool.map(generating_quads_response_matrices, quad_args) | ||
pool.close() | ||
pool.join() | ||
|
||
results = [result / dk for result in results] | ||
if full_jacobian: # Construct the complete Jacobian matrix for the LOCO | ||
# assuming only linear scaling errors of BPMs and corrector magnets | ||
n_correctors = len(np.concatenate(used_cor_ind)) | ||
n_bpms = len(bpm_indexes) * 2 # in both planes | ||
j_cor = np.zeros((n_correctors,) + C_model.shape) | ||
for i in range(n_correctors): | ||
j_cor[i, :, i] = C_model[:, i] # a single column of response matrix corresponding to a corrector | ||
j_bpm = np.zeros((n_bpms,) + C_model.shape) | ||
for i in range(n_bpms): | ||
j_bpm[i, i, :] = C_model[i, :] # a single row of response matrix corresponding to a given plane of BPM | ||
return np.concatenate((results, j_cor, j_bpm), axis=0) | ||
return results | ||
|
||
|
||
def generating_quads_response_matrices(args): | ||
(quad_index, SC, C_model, correctors_kick, used_cor_indexes, used_bpm_indexes, dk, trackMode, useIdealRing, | ||
skewness, order, method, includeDispersion, rf_step, cav_ords) = args | ||
LOGGER.debug('generating response to quad of index', quad_index) | ||
if not includeDispersion: | ||
SC.set_magnet_setpoints(quad_index, dk, skewness, order, method) | ||
C_measured = SCgetModelRM(SC, used_bpm_indexes, used_cor_indexes, dkick=correctors_kick, | ||
useIdealRing=useIdealRing, | ||
trackMode=trackMode) | ||
SC.set_magnet_setpoints(quad_index, -dk, skewness, order, method) | ||
return C_measured - C_model | ||
|
||
dispersion_model = SCgetModelDispersion(SC, used_bpm_indexes, CAVords=cav_ords) | ||
SC.set_magnet_setpoints(quad_index, dk, skewness, order, method) | ||
C_measured = SCgetModelRM(SC, used_bpm_indexes, used_cor_indexes, dkick=correctors_kick, useIdealRing=useIdealRing, | ||
trackMode=trackMode) | ||
dispersion_meas = SCgetModelDispersion(SC, used_bpm_indexes, CAVords=cav_ords, rfStep=rf_step) | ||
SC.set_magnet_setpoints(quad_index, -dk, skewness, order, method) | ||
return np.hstack((C_measured - C_model, (dispersion_meas - dispersion_model).reshape(-1, 1))) | ||
|
||
|
||
def measure_closed_orbit_response_matrix(SC, bpm_ords, cm_ords, dkick=1e-5): | ||
LOGGER.info('Calculating Measure response matrix') | ||
n_turns = 1 | ||
n_bpms = len(bpm_ords) | ||
n_cms = len(cm_ords[0]) + len(cm_ords[1]) | ||
response_matrix = np.full((2 * n_bpms * n_turns, n_cms), np.nan) | ||
SC.INJ.trackMode = TRACK_ORB # TODO may modify SC (not a pure function)! | ||
|
||
closed_orbits0 = bpm_reading(SC, bpm_ords=bpm_ords)[0] | ||
cnt = 0 | ||
for n_dim in range(2): | ||
for cm_ord in cm_ords[n_dim]: | ||
SC.set_cm_setpoints(cm_ord, dkick, skewness=bool(n_dim), method=SETTING_ADD) | ||
closed_orbits1 = bpm_reading(SC, bpm_ords=bpm_ords)[0] | ||
SC.set_cm_setpoints(cm_ord, -dkick, skewness=bool(n_dim), method=SETTING_ADD) | ||
response_matrix[:, cnt] = np.ravel((closed_orbits1 - closed_orbits0) / dkick) | ||
cnt = cnt + 1 | ||
return response_matrix | ||
|
||
|
||
def loco_correction_lm(initial_guess0, orm_model, orm_measured, Jn, lengths, including_fit_parameters, bounds=(-np.inf, np.inf), weights=1, | ||
verbose=2): | ||
mask = _get_parameters_mask(including_fit_parameters, lengths) | ||
result = least_squares(lambda delta_params: objective(delta_params, orm_measured - orm_model, Jn[mask, :, :], weights), | ||
initial_guess0[mask], #bounds=bounds, | ||
method="lm", | ||
verbose=verbose) | ||
return result.x | ||
|
||
|
||
def loco_correction_ng(initial_guess0, orm_model, orm_measured, J, lengths, including_fit_parameters, s_cut, weights=1): | ||
initial_guess = initial_guess0.copy() | ||
mask = _get_parameters_mask(including_fit_parameters, lengths) | ||
residuals = objective(initial_guess[mask], orm_measured - orm_model, J[mask, :, :], weights) | ||
r = residuals.reshape(np.transpose(orm_model).shape) | ||
t2 = np.zeros([len(initial_guess[mask]), 1]) | ||
for i in range(len(initial_guess[mask])): | ||
t2[i] = np.sum(np.dot(np.dot(J[i].T, weights), r.T)) | ||
return get_inverse(J[mask, :, :], t2, s_cut, weights) | ||
|
||
|
||
def objective(masked_params, orm_residuals, masked_jacobian, weights): | ||
return np.dot(np.transpose(orm_residuals - np.einsum("ijk,i->jk", masked_jacobian, masked_params)), | ||
np.sqrt(weights)).ravel() | ||
|
||
|
||
def _get_parameters_mask(including_fit_parameters, lengths): | ||
len_quads, len_corr, len_bpm = lengths | ||
mask = np.zeros(len_quads + len_corr + len_bpm, dtype=bool) | ||
mask[:len_quads] = 'quads' in including_fit_parameters | ||
mask[len_quads:len_quads + len_corr] = 'cor' in including_fit_parameters | ||
mask[len_quads + len_corr:] = 'bpm' in including_fit_parameters | ||
return mask | ||
|
||
|
||
def set_correction(SC, r, elem_ind, individuals=True, skewness=False, order=1, method=SETTING_ADD, dipole_compensation=True): | ||
if individuals: | ||
SC.set_magnet_setpoints(elem_ind, -r, skewness, order, method, dipole_compensation=dipole_compensation) | ||
return SC | ||
|
||
for fam_num, quad_fam in enumerate(elem_ind): | ||
SC.set_magnet_setpoints(quad_fam, -r[fam_num], skewness, order, method, dipole_compensation=dipole_compensation) | ||
return SC | ||
|
||
|
||
def model_beta_beat(ring, twiss, elements_indexes, plot=False): | ||
_, _, twiss_error = at.get_optics(ring, elements_indexes) | ||
s_pos = twiss_error.s_pos | ||
bx = np.array(twiss_error.beta[:, 0] / twiss.beta[:, 0] - 1) | ||
by = np.array(twiss_error.beta[:, 1] / twiss.beta[:, 1] - 1) | ||
bx_rms = np.sqrt(np.mean(bx ** 2)) | ||
by_rms = np.sqrt(np.mean(by ** 2)) | ||
|
||
if plot: | ||
init_font = plt.rcParams["font.size"] | ||
plt.rcParams.update({'font.size': 14}) | ||
|
||
fig, ax = plt.subplots(nrows=2, sharex="all") | ||
betas = [bx, by] | ||
letters = ("x", "y") | ||
for i in range(2): | ||
ax[i].plot(s_pos, betas[i]) | ||
ax[i].set_xlabel("s_pos [m]") | ||
ax[i].set_ylabel(rf'$\Delta\beta_{letters[i]}$ / $\beta_{letters[i]}$') | ||
ax[i].ticklabel_format(axis='y', style='sci', scilimits=(0, 0)) | ||
ax[i].grid(True, which='both', linestyle=':', color='gray') | ||
|
||
fig.show() | ||
plt.rcParams.update({'font.size': init_font}) | ||
|
||
return bx_rms, by_rms | ||
|
||
|
||
def select_equally_spaced_elements(total_elements, num_elements): | ||
step = len(total_elements) // (num_elements - 1) | ||
return total_elements[::step] | ||
|
||
|
||
def get_inverse(jacobian, B, s_cut, weights, plot=False): | ||
n_resp_mats = len(jacobian) | ||
sum_corr = np.sum(jacobian, axis=2) # Sum over i and j for all planes | ||
matrix = np.dot(np.dot(sum_corr, weights), sum_corr.T) | ||
inv_matrix = SCgetPinv(matrix, num_removed=n_resp_mats - min(n_resp_mats, s_cut), plot=plot) | ||
results = np.ravel(np.dot(inv_matrix, B)) | ||
# e = np.ravel(np.dot(matrix, results)) - np.ravel(B) | ||
return results |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
import at | ||
import numpy as np | ||
import pytest | ||
from pathlib import Path | ||
from pySC.core.simulated_commissioning import SimulatedCommissioning | ||
from pySC.utils.sc_tools import SCgetOrds | ||
from pySC.utils import logging_tools | ||
from pySC.correction import loco | ||
from pySC.lattice_properties.response_model import SCgetModelRM, SCgetModelDispersion | ||
|
||
LOGGER = logging_tools.get_logger(__name__) | ||
INPUTS = Path(__file__).parent / "inputs" | ||
|
||
|
||
def test_loco_hmba(at_ring): | ||
np.random.seed(12345678) | ||
sc = SimulatedCommissioning(at_ring) | ||
sc.register_bpms(SCgetOrds(sc.RING, 'BPM'), Roll=0.0, CalError=1E-2 * np.ones(2), NoiseCO=np.array([1e-7, 1E-7])) | ||
sc.register_magnets(SCgetOrds(sc.RING, 'QF|QD'), CalErrorB=np.array([0, 1E-2])) # relative | ||
sc.register_magnets(SCgetOrds(sc.RING, 'CXY'), CalErrorA=np.array([1E-4, 0]), CalErrorB=np.array([1E-4, 0])) | ||
sc.register_magnets(SCgetOrds(sc.RING, 'BEND')) | ||
sc.register_magnets(SCgetOrds(sc.RING, 'SF|SD')) # [1/m] | ||
sc.register_cavities(SCgetOrds(sc.RING, 'RFC')) | ||
sc.apply_errors() | ||
cor_ords = SCgetOrds(sc.RING, 'CXY') | ||
|
||
used_correctors1 = loco.select_equally_spaced_elements(cor_ords, 10) | ||
used_correctors2 = loco.select_equally_spaced_elements(cor_ords, 10) | ||
used_cor_ords = [used_correctors1, used_correctors2] | ||
used_bpms_ords = loco.select_equally_spaced_elements(sc.ORD.BPM, len(sc.ORD.BPM)) | ||
cav_ords = SCgetOrds(sc.RING, 'RFC') | ||
quads_ords = [SCgetOrds(sc.RING, 'QF'), SCgetOrds(sc.RING, 'QD')] | ||
|
||
CMstep = np.array([1.e-4]) # correctors change [rad] | ||
dk = 1.e-4 # quads change | ||
RFstep = 1e3 | ||
|
||
_, _, twiss = at.get_optics(sc.IDEALRING, sc.ORD.BPM) | ||
orbit_response_matrix_model = SCgetModelRM(sc, used_bpms_ords, used_cor_ords, trackMode='ORB', useIdealRing=True, dkick=CMstep) | ||
model_dispersion = SCgetModelDispersion(sc, used_bpms_ords, cav_ords, trackMode='ORB', Z0=np.zeros(6), nTurns=1, | ||
rfStep=RFstep, useIdealRing=True) | ||
Jn = loco.calculate_jacobian(sc, orbit_response_matrix_model, CMstep, used_cor_ords, used_bpms_ords, np.concatenate(quads_ords), dk, | ||
trackMode='ORB', useIdealRing=False, skewness=False, order=1, method='add', | ||
includeDispersion=False, rf_step=RFstep, cav_ords=cav_ords) | ||
weights = np.eye(len(used_bpms_ords) * 2) | ||
n_singular_values = 20 | ||
|
||
_, _, twiss_err = at.get_optics(sc.RING, sc.ORD.BPM) | ||
bx_rms_err, by_rms_err = loco.model_beta_beat(sc.RING, twiss, sc.ORD.BPM, plot=False) | ||
info_tab = 14 * " " | ||
LOGGER.info("RMS Beta-beating before LOCO:\n" | ||
f"{info_tab}{bx_rms_err * 100:04.2f}% horizontal\n{info_tab}{by_rms_err * 100:04.2f}% vertical ") | ||
n_iter = 3 | ||
|
||
for x in range(n_iter): # optics correction using QF and QD | ||
LOGGER.info(f'LOCO iteration {x}') | ||
orbit_response_matrix_measured = loco.measure_closed_orbit_response_matrix(sc, used_bpms_ords, used_cor_ords, CMstep) | ||
n_quads, n_corrs, n_bpms = len(np.concatenate(quads_ords)), len(np.concatenate(used_cor_ords)), len(used_bpms_ords) * 2 | ||
bx_rms_err, by_rms_err = loco.model_beta_beat(sc.RING, twiss, sc.ORD.BPM, plot=False) | ||
total_length = n_bpms + n_corrs + n_quads | ||
lengths = [n_quads, n_corrs, n_bpms] | ||
including_fit_parameters = ['quads', 'cor', 'bpm'] | ||
initial_guess = np.zeros(total_length) | ||
|
||
initial_guess[:lengths[0]] = 1e-6 | ||
initial_guess[lengths[0]:lengths[0] + lengths[1]] = 1e-6 | ||
initial_guess[lengths[0] + lengths[1]:] = 1e-6 | ||
|
||
# method lm (least squares) | ||
fit_parameters = loco.loco_correction_lm(initial_guess, orbit_response_matrix_model, | ||
orbit_response_matrix_measured, Jn, lengths, | ||
including_fit_parameters, bounds=(-0.03, 0.03), weights=weights, verbose=2) | ||
|
||
# method ng | ||
# fit_parameters = loco.loco_correction_ng(initial_guess, orbit_response_matrix_model, | ||
# orbit_response_matrix_measured, Jn, lengths, | ||
# including_fit_parameters, n_singular_values, weights=weights) | ||
|
||
dg = fit_parameters[:lengths[0]] if len(fit_parameters) > n_quads else fit_parameters | ||
sc = loco.set_correction(sc, dg, np.concatenate(quads_ords)) | ||
bx_rms_cor, by_rms_cor = loco.model_beta_beat(sc.RING, twiss, sc.ORD.BPM, plot=False) | ||
LOGGER.info(f"RMS Beta-beating after {x + 1} LOCO iterations:\n" | ||
f"{info_tab}{bx_rms_cor * 100:04.2f}% horizontal\n{info_tab}{by_rms_cor * 100:04.2f}% vertical ") | ||
LOGGER.info(f"Correction reduction: \n" | ||
f" beta_x: {(1 - bx_rms_cor / bx_rms_err) * 100:.2f}%\n" | ||
f" beta_y: {(1 - by_rms_cor / by_rms_err) * 100:.2f}%\n") | ||
assert bx_rms_cor < 0.002 | ||
assert by_rms_cor < 0.002 | ||
|
||
|
||
@pytest.fixture | ||
def at_ring(): | ||
ring = at.load_mat(f'{INPUTS}/hmba.mat') | ||
bpm_indexes = at.get_refpts(ring, at.elements.Monitor) | ||
for bpm_index in reversed(bpm_indexes): | ||
corrector = at.elements.Corrector(f'CXY{bpm_index}', length=0, kick_angle=[0, 0], PolynomA=[0, 0], | ||
PolynomB=[0, 0]) | ||
ring.insert(bpm_index + 1, corrector) | ||
ring.enable_6d() | ||
at.set_cavity_phase(ring) | ||
at.set_rf_frequency(ring) | ||
|
||
ring.tapering(niter=3, quadrupole=True, sextupole=True) | ||
ring = at.Lattice(ring) | ||
return ring |