Propeller dynamics identification#

This tutorial outlines how to perform system identification of the propeller/fan dynamics using Telega.

Zubax Robotics maintains a publicly accessible Airscrew and fan parameters spreadsheet with the basic characteristics of some common propellers.

Prerequisites#

  • Telega-based motor controller.

  • Motor, unless it is part of an integrated drive.

  • DC power supply.

  • Propeller/fan test stand.

  • GNU/Linux or macOS.

  • Network interface adapter, such as Zubax Babel.

  • Yakut CLI tool installed on your PC.

Configure the motor controller#

The motor controller should be able to drive the motor in the ratiometric torque mode. No velocity control mode is needed; indeed, it would be difficult to tune it adequately without knowing the dynamic parameters of the load, which this tutorial is focused on.

This tutorial does not cover the initial configuration of a Telega-based motor controller; for that, refer to other tutorials, such as Yakut CLI tool.

Collect data using the driver script#

The following script will exercise the drive to make the dynamics observable and emit sampled data via stdout as TSV. Run it as specified in the instructions, piping the output into a file.

  1#!/usr/bin/env python
  2# Author: Pavel Kirienko <pavel.kirienko@zubax.com>
  3
  4"""
  5This script drives the motor in the ratiometric torque mode while varying the setpoint
  6to make the dynamic parameters of the load observable.
  7It needs PyCyphal to run.
  8
  9The motor controller shall be configured as follows:
 10
 11- The feedback topic publication rate should be high, at least 100 Hz.
 12  mns.pub_interval_min: 0.005
 13
 14- The sensor feed from the motor controller shall be timestamped.
 15  mns.local_timestamp: true
 16
 17The measurement feed is printed into stdout (should be redirected to a file).
 18Use Dyshlo offline dynamics identification tool to process the data.
 19
 20The script provides the following topics:
 21
 22- Readiness command publisher "readiness" of type zubax.service.Readiness.1.
 23
 24- Ratiometric setpoint publisher "sp_rat" of type zubax.primitive.real16.Vector31.1.
 25  This should normally be the ratiometric torque setpoint, although other ratiometric modes can be used as well.
 26  All elements are set to the same value.
 27
 28- Subscriber "fb_dyn" of type zubax.physics.dynamics.DoF3rdTs.1.
 29  This is the feedback from the motor controller.
 30
 31The setpoint range can be adjusted via register "setpoint_rat_lo_hi" of type real32[2].
 32
 33You can run this script either directly or via Yakut Orchestrator;
 34in the latter case, write up roughly the following configuration and execute it as a script:
 35
 36    #!/usr/bin/env -S yakut -v orc
 37    uavcan:
 38      pub.readiness.id: 10  # Remove if the readiness subject is not used in your application.
 39      pub.sp_rat.id:    24
 40      sub.fb_dyn.id:    110
 41    $=:
 42    - ./dynamics_id_driver.py
 43
 44Or simply pass the register values via environment variables;
 45omit the readiness publisher if the readiness subject is not used in your application:
 46
 47    UAVCAN__PUB__READINESS__ID=10 UAVCAN__PUB__SP_RAT__ID=24 UAVCAN__SUB__FB_DYN__ID=110 ./dynamics_id_driver.py
 48"""
 49
 50from __future__ import annotations
 51import os
 52import sys
 53import math
 54import asyncio
 55import logging
 56import numpy as np
 57import pycyphal.application  # pip install pycyphal;  export CYPHAL_PATH=~/public_regulated_data_types
 58from uavcan.node import GetInfo_1 as GetNodeInfo
 59from zubax.physics.dynamics import DoF3rdTs_1 as FeedbackDyn
 60from zubax.primitive.real16 import Vector31_1 as Real16_31
 61from zubax.service import Readiness_1 as Readiness
 62
 63_logger = logging.getLogger(__name__)
 64
 65_OUTPUT_COLUMNS = ["ts_local", "t", "force", "velocity", "acceleration"]
 66_COLUMN_SEP = "\t"
 67
 68
 69async def on_feedback(fb: FeedbackDyn, meta: pycyphal.transport.TransferFrom) -> None:
 70    print(
 71        meta.timestamp.monotonic,
 72        fb.timestamp.microsecond * 1e-6,
 73        fb.value.force,
 74        fb.value.kinematics.base.velocity,
 75        fb.value.kinematics.acceleration,
 76        sep=_COLUMN_SEP,
 77    )
 78
 79
 80async def publish_until(
 81    publisher_message_pairs: list[tuple[pycyphal.presentation.Publisher | None, object]],
 82    deadline: float,
 83    publication_period: float,
 84) -> None:
 85    get_time = asyncio.get_event_loop().time
 86    next_pub_at = get_time()
 87    while (now := get_time()) < deadline:
 88        for pub, msg in publisher_message_pairs:
 89            if pub is not None:
 90                pub.publish_soon(msg(now) if callable(msg) else msg)
 91        next_pub_at += publication_period
 92        if (sleep_duration := next_pub_at - get_time()) > 0:
 93            await asyncio.sleep(sleep_duration)
 94
 95
 96async def main() -> None:
 97    node = pycyphal.application.make_node(GetNodeInfo.Response(name="com.zubax.telega.dynamics_id_driver"))
 98    try:
 99        pub_readiness = node.make_publisher(Readiness, "readiness")
100    except pycyphal.application.PortNotConfiguredError:
101        pub_readiness = None
102    pub_sp_rat = node.make_publisher(Real16_31, "sp_rat")
103    sub_fb_dyn = node.make_subscriber(FeedbackDyn, "fb_dyn")
104    sp_rat_low, sp_rat_high = node.registry.setdefault("setpoint_rat_lo_hi", [0.1, 0.9]).floats
105    if not (sp_rat_low < sp_rat_high):
106        raise RuntimeError("Invalid torque range")
107    period = float(node.registry.setdefault("period", 2.0))
108    pattern_duration = float(node.registry.setdefault("pattern_duration", 200.0))
109    init_duration = float(node.registry.setdefault("init_duration", 5.0))
110    setpoint_period = float(node.registry.setdefault("setpoint_period", 0.005))
111    get_time = asyncio.get_event_loop().time
112    try:
113        node.start()
114        _logger.info("Initialization: let the motor run at high power briefly before the pattern starts")
115        await publish_until(
116            [
117                (pub_readiness, Readiness(Readiness.ENGAGED)),
118                (pub_sp_rat, Real16_31([sp_rat_high] * 31)),
119            ],
120            get_time() + init_duration,
121            setpoint_period,
122        )
123        _logger.info("Pattern commenced")
124        while (await sub_fb_dyn.receive_for(0)) is not None:
125            pass  # Flush the queue
126        print(*_OUTPUT_COLUMNS, sep=_COLUMN_SEP)
127        sub_fb_dyn.receive_in_background(on_feedback)
128
129        def make_sp_msg(now: float):
130            sp_val = np.interp(math.sin(now / (period / (math.pi * 2))), (-1, 1), (sp_rat_low, sp_rat_high))
131            return Real16_31([sp_val] * 31)
132
133        await publish_until(
134            [
135                (pub_readiness, Readiness(Readiness.ENGAGED)),
136                (pub_sp_rat, make_sp_msg),
137            ],
138            get_time() + pattern_duration,
139            setpoint_period,
140        )
141    finally:
142        if pub_readiness is not None:
143            pub_readiness.close()
144        pub_sp_rat.close()
145        sub_fb_dyn.close()
146        node.close()
147
148
149if __name__ == "__main__":
150    logging.basicConfig(
151        stream=sys.stderr,
152        format="%(asctime)s %(levelname)-5.5s %(name)s: %(message)s\n",
153        level=logging.DEBUG if os.environ.get("VERBOSE", "0").strip() != "0" else logging.INFO,
154    )
155    logging.getLogger("pycyphal").setLevel(logging.INFO)  # Do not print debug stuff, it is too verbose.
156    try:
157        asyncio.run(main())
158    except KeyboardInterrupt:
159        pass
160    except Exception as ex:
161        _logger.debug("Unhandled exception", exc_info=True)
162        print(f"Error: {str(ex) or repr(ex)}", file=sys.stderr)
163        sys.exit(1)

Perform offline regression#

The file recorded in the previous step will contain a large time series dataset that will need to be fed into the regressor script provided below. If the identification is performed correctly, the \(c_1\) value found by the regressor will be very small; a large \(c_1\) may indicate either a problem with the drive itself (high friction) or a controller tuning problem.

  1#!/usr/bin/env python3
  2# Pavel Kirienko <pavel.kirienko@zubax.com>
  3
  4"""
  5This script performs offline identification of the dynamic parameters of the mechanical load
  6attached to the motor based on the load's response to force/torque variation.
  7It needs to be fed a timestamped TSV stream via stdin containing columns named as follows
  8(extra columns are allowed and will be ignored):
  9
 10- t             [second]
 11- force         [newton] or [newton*meter]
 12- velocity      [meter/second] or [radian/second]
 13- acceleration  [meter/second^2] or [radian/second^2]
 14
 15The means of collecting such data lie outside the scope of this script.
 16Normally it would be performed with the help of some external utility that would exercise the drive
 17with a specified force/torque profile and record the response.
 18The profile is normally sinusoidal, but ultimately it is not critical.
 19
 20The identified dynamic parameters are printed as YAML, which are:
 21
 22- Load (rotational) mass m.
 23
 24- Linear friction coefficient c1, defined as, translational and rotational:
 25
 26    F = c1 * v
 27    Q = c1 * omega
 28
 29- Quadratic drag coefficient c2, defined as, translational and rotational:
 30
 31    F = c2 * v^2
 32    Q = c2 * omega^2
 33
 34The aerodynamic drag can be trivially deduced from c2 if the fluid density and the
 35cross-sectional area of the load are known.
 36The standard aerodynamic drag equation is:
 37
 38    F = v^2 * (0.5 * rho * A * C_drag)
 39
 40Where rho is the fluid density, A is the cross-sectional area, and C_drag is the drag coefficient.
 41Substitute:
 42
 43    c2 = 0.5 * rho * A * C_drag
 44    C_drag = c2 / (0.5 * rho * A)
 45
 46The equivalent cross-sectional area may not be known,
 47in which case it may be convenient to use the product of A and the drag coefficient,
 48or assume A = 1 (which is common):
 49
 50    A * C_drag = c2 / (0.5 * rho)
 51
 52The above equations are valid for translational motion.
 53In the case of a fan or propeller, the equation of interest is that of the torque Q:
 54
 55    Q = C_torque * rho * D^5 * n^2
 56
 57Where C_torque is the rotational counterpart of C_drag called the propeller's torque coefficient,
 58n is the angular velocity in hertz, and D is the propeller/fan diameter.
 59The choice of hertz for the angular velocity in the torque equation is in line with
 60the commonly accepted conventions in the propeller theory,
 61but this is inconsistent with the normal use of radian per second for angular velocity elsewhere.
 62Hence, here is the same torque formula with the angular velocity omega expressed in radian per second:
 63
 64    Q = C_torque * rho * D^5 * (omega / (2 * pi))^2
 65
 66    Q = C_torque * rho * D^5 * (1 / (4 * pi^2)) * omega^2
 67       |_______________________________________|
 68                         c2
 69
 70Express C_torque in terms of c2:
 71
 72    c2 = C_torque * rho * D^5 * (1 / (4 * pi^2))
 73    C_torque = 4 * pi^2 * c2 / (rho * D^5)
 74
 75References:
 76
 77- 16.Unified: Thermodynamics and Propulsion, Prof. Z. S. Spakovszky
 78  https://web.mit.edu/16.unified/www/FALL/thermodynamics/notes/node86.html
 79
 80- 2.016 Hydrodynamics, Prof. A.H. Techet
 81  http://web.mit.edu/2.016/www/handouts/2005Reading10.pdf
 82"""
 83
 84import os
 85import sys
 86import math
 87import logging
 88import argparse
 89import numpy as np
 90import pandas
 91import pandas as pd
 92import scipy.optimize
 93
 94
 95_logger = logging.getLogger(__name__.replace("__", ""))
 96
 97
 98def _predict_acceleration(velocity, force, mass, *, c1, c2):
 99    drag = -(np.copysign(velocity**2, velocity) * c2 + velocity * c1)
100    acc = (force + drag) / mass
101    return acc
102
103
104def _compute_loss(df: pd.DataFrame, mass, *, c1, c2) -> float:
105    prediction = _predict_acceleration(
106        df.velocity,
107        df.force,
108        mass,
109        c1=c1,
110        c2=c2,
111    )
112    divergence = df.acceleration - prediction
113    return float(np.sum(divergence**2))
114
115
116def main() -> None:
117    arp = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter)
118    arp.add_argument("--no-plot", action="store_true", help="Do not plot the results; removes the dependency on plotly")
119    arp.add_argument(
120        "--mass-min", type=float, default=1e-9, help="Min (rotational) mass in [kg] or [kg*m^2]; default=%(default)s"
121    )
122    arp.add_argument(
123        "--mass-max", type=float, default=1e3, help="Max (rotational) mass in [kg] or [kg*m^2]; default=%(default)s"
124    )
125    arp.add_argument(
126        "--c1-max", type=float, default=0, help="Max c1 coeff in [N*s/m] or [N*m*s/rad]; default=%(default)s"
127    )
128    arp.add_argument(
129        "--c2-max", type=float, default=1e2, help="Max c2 coeff in [N*s^2/m^2] or [N*m*s^2/rad^2]; default=%(default)s"
130    )
131    args = arp.parse_args()
132
133    df = pd.read_table(sys.stdin)
134    _logger.info("Loaded %d samples", len(df))
135    max_valid_index = min(df.apply(lambda series: series.last_valid_index()))
136    min_valid_index = 0
137    for idx in range(max_valid_index, -1, -1):
138        if not np.alltrue(np.isfinite(df.iloc[idx])):
139            break
140        min_valid_index = idx
141    # Cut off the beginning of the data to minimize boundary effects.
142    min_valid_index += int(0.1 * (max_valid_index - min_valid_index))
143    df = df.iloc[min_valid_index : max_valid_index + 1]
144    _logger.info(
145        "%d samples in %d:%d after normalization:\n%s\n%s\n",
146        len(df),
147        min_valid_index,
148        max_valid_index,
149        df,
150        df.describe(),
151    )
152
153    def fun(x):
154        loss = _compute_loss(df, mass=x[0], c1=x[1], c2=x[2])
155        if not math.isfinite(loss):
156            raise RuntimeError(f"Numerical failure at {x=}")
157        return loss
158
159    mass_bounds = (float(args.mass_min), float(args.mass_max))
160    c1_bounds = (0.0, float(args.c1_max))
161    c2_bounds = (0.0, float(args.c2_max))
162    arg_initial = np.array([1.0, 0.0, 0.0])
163    # https://stackoverflow.com/questions/34663539
164    res = scipy.optimize.minimize(
165        fun,
166        arg_initial,
167        bounds=(mass_bounds, c1_bounds, c2_bounds),
168        options=dict(  # https://stackoverflow.com/questions/34663539
169            eps=1e-9,
170            ftol=1e-15,  # Tolerance override is quite necessary.
171            gtol=1e-15,
172        ),
173    )
174    _logger.info("Optimization result:\n%s\n", res)
175    est_mass, est_c1, est_c2 = res.x
176    print(f"mass:{est_mass:12.6}  # [kg]        | [kg*m^2]")
177    print(f"c1:  {est_c1  :12.6}  # [N*s/m]     | [N*m*s/rad]")
178    print(f"c2:  {est_c2  :12.6}  # [N*s^2/m^2] | [N*m*s^2/rad^2]")
179    _logger.info(
180        "FYI, estimated propeller torque coefficient for D=1[meter] in standard Earth atmosphere: %s",
181        4 * math.pi**2 * est_c2 / 1.225,
182    )
183
184    if not args.no_plot:
185        _plot(df, est_mass=est_mass, est_c1=est_c1, est_c2=est_c2)
186
187
188def _plot(df: pandas.DataFrame, *, est_mass: float, est_c1: float, est_c2: float) -> None:
189    try:
190        import plotly.subplots
191        import plotly.graph_objects
192    except ImportError:
193        _logger.warning("Skipping plot due to missing plotly dependency")
194        return
195    specs = [
196        [{"secondary_y": True}],
197    ]
198    fig = plotly.subplots.make_subplots(rows=len(specs), cols=len(specs[0]), shared_xaxes=True, specs=specs)
199    fig.add_trace(
200        plotly.graph_objects.Scatter(
201            x=df.t,
202            y=df.force,
203            mode="lines",
204            name="F",
205        ),
206        secondary_y=True,
207    )
208    fig.add_trace(
209        plotly.graph_objects.Scatter(
210            x=df.t,
211            y=df.acceleration,
212            mode="markers",
213            name="a",
214        ),
215    )
216    fig.add_trace(
217        plotly.graph_objects.Scatter(
218            x=df.t,
219            y=_predict_acceleration(
220                df.velocity,
221                df.force,
222                est_mass,
223                c2=est_c2,
224                c1=est_c1,
225            ),
226            mode="lines",
227            name="a'",
228        ),
229    )
230    fig.update_layout(height=800, hovermode="x")
231    try:
232        fig.show()
233    except Exception as e:
234        _logger.exception("Failed to show plot: %s", e)
235
236
237if __name__ == "__main__":
238    logging.basicConfig(
239        stream=sys.stderr,
240        format="%(asctime)s %(levelname)-5.5s %(name)s: %(message)s",
241        level=logging.DEBUG if os.environ.get("VERBOSE", "0").strip() != "0" else logging.INFO,
242    )
243    try:
244        main()
245    except KeyboardInterrupt:
246        pass
247    except Exception as ex:
248        _logger.exception("Error: %s", ex, exc_info=True)
249        sys.exit(1)

The following plot visualizes the collected data points and the fitted parameters:

../_images/dynamics_id_regression.png

A real-time regressor is available for applications where runtime dynamics identification is required. Please reach out to Zubax Robotics for additional information.

Application notes#

The parameters found by this procedure can be used with model-based controllers, such as the incremental nonlinear dynamics inversion velocity controller.

The parameters can also be used for sizing the drivetrain for a given application. As the propeller/fan torque is a function of its torque coefficient and its angular velocity, one can predict the torque at the shaft and the angular velocity for the given lift; next, knowing the motor parameters, the motor controller, DC link, and battery requirements can be deduced. For more information on this, refer to the Drive power transfer equation.

The torque and thrust coefficients are strongly dependent on the airspeed. For the dynamics identification to be valid for forward flight, the test should be performed in a wind tunnel.