查看原文
其他

@9.23 GOSIM Workshop 上海 | 可能是史上最强 Rust 机器人框架 Dora

张汉东 觉学社 2024-01-01

今年(2023)九月份将在上海举办首届的 GOSIM Workshop (9.23-24)[1] 和  GOSIM Conference (9.26)[2]  盛会。GOSIM 会场将支持「同声传译」,小伙伴们可以放心购票参与。还不了解 GOSIM 大会的朋友,可以参考文章结尾的详细介绍。

本次 GOSIM Workshop 机器人与自动驾驶分论坛请来了 Rust 开源下一代机器人框架的项目核心维护者 Xavier Tao 和 Philipp  来给大家做精彩分享。在参加 Workshop 之前,你应该对 Dora-rs 有一定了解。在本文中,你也将学到 Rust 如何安全高效和 Python 交互。

点击观看:机器人与自动驾驶的开源之旅 | GOSIM 数字纪事 ,了解关于汽车与自动驾驶,以及 Dora 的更多故事。


Dora 介绍

Dora-rs[3] 是一个全新的现代化机器人框架,它基于 Rust 实现,具有极高的性能。dora-rs 也可以用于深度学习应用,旨在方便地集成最先进的深度学习模型,并希望能够在框架内进行训练。

2022年DORA-RS开源项目被发布,它全称为Dataflow Oriented Robotics Architecture。它借鉴了ROS,CyberRT和ERDOS这些项目的优点,试图解决机器人和自动驾驶领域长久以来的一大难题:如何能把繁荣的基于Python的AI算法生态集成到机器人和自动驾驶的软件开发中,同时能够高效地处理数据流。
它使用Rust语言做数据流的传输和调度管理,大大减少了数据的重复拷贝和传输。它提供了Rust语言和Python语言之间的无缝集成,减少了跨语言的性能代价。借助Python丰富的算法模块,开发者可以通过YAML蓝图脚本轻松设计出适用于各类机器人和自动驾驶场景的数据流。DORA为您提供与,实现数据流的零拷贝和高效IPC传输,极大提高性能。使得开发者可以专注于应用开发,而无需过多担心性能问题。
同时与Carla自动驾驶仿真系统完美结合,开发者可以使用DORA-RS提供的基线算法,开发先进的自动驾驶应用,并进行仿真测试。不仅仅是仿真,DORA-RS同样支持真实的自动驾驶与控制器系统,无需更改代码,即可在仿真和现实环境中验证您的算法。


它的性能是ROS2 Python API 的 17 倍!是 ROS2 Rust API 的 10 倍!与 ROS2 C/Cpp API 共享内存快 0.06 ms。

Dora 的特点是:

  • 零成本开销:在共享内存上进行零拷贝传输消息!使用Arrow和自己的共享内存守护程序,在单台机器上实现光速通信。
  • 可扩展:专为机器和机器人扩展而设计!使用YAML描述来使软件具有声明性,以便在多台机器上进行分发。
  • 快速原型制作:可复用YAML数据流中的现有节点和运算符,这样用户就不需要费心复制粘贴样板代码。用户还可以使用Python进行实时调试!支持热重载。
  • 可观测:提供命令行界面和OpenTelemetry获取日志、跟踪和指标!
  • 跨平台支持:Dora在大多数平台和架构上都提供Python、Rust、C和C++版本!
  • 社区驱动:Dora 团队希望打造成一个由社区驱动的项目,并帮助其他人了解机器人技术。

dora 框架被划分为不同的组件:

  • 节点(Node):Dora节点是通过Dora库与其他节点进行通信的独立进程。节点可以是自定义的、用户指定的程序,也可以是Dora运行时节点,允许运行Dora操作符。节点实现了自己的 main 函数,因此对其执行具有完全控制权。
  • 算子(Operators:):算子是轻量级、协作的、基于库的组件,由dora运行时节点执行。它们必须根据所使用的语言实现特定的接口。算子可以使用dora运行时提供的各种高级功能,例如优先级调度或本地截止日期支持。
  • 协调器(Coordinator):协调器负责从YAML文件中读取数据流,验证数据流,并将节点和运算符部署到指定或自动确定的机器上。它监控运算符的健康状况,并实现高级集群管理功能。例如,我们可以为云节点实现自动扩展或运算符的复制和重启。协调器可以通过命令行程序(CLI)进行控制。
  • 通信层采用可插拔组件设计,目前支持 zenoh[4]

Dora 与 多语言接口

Dora 目前提供 Rust/Cpp/Python Api,但编写一个多语言库并不容易。dora 的核心贡献者 Xavier 近日分享了他在构建 wonnx[5] 和 dora-rs 过程中遇到的 FFI 相关的问题和解决方案,将以Rust-Python FFI通过 pyo3 为例进行说明,demo源码见 blogpost_ffi[6]。该主题也会在 GOSIM Workshop 上面讲。

wonnx 是纯 Rust 实现的 GPU 加速的 ONNX 推理运行时,适用于Web.

`pyo3`[7]  是最常用的Rust-Python绑定之一,可以为开发者创建FFI。我们所要做的就是用 #[pyfunction] 包装我们的函数,这样它就可以在Python中使用了。

use pyo3::prelude::*;

/// 两个数字之和必须被转为字符串
/// `#[pyfunction]` 指定这是 Python 中用的函数
#[pyfunction]
fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
    Ok((a + b).to_string())
}

/// `#[pymodule]` 指定用 Rust 实现的 Python 函数
/// 该函数名应该与 `Cargo.toml`的 `lib.name` 名字设定相同
#[pymodule]
fn blogpost_ffi(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
    Ok(())
}

然后在 Python 中就可以这样调用:

maturin develop
python -c "import blogpost_ffi; print(blogpost_ffi.sum_as_string(1,1))"
# Return: "2" 

这里有个问题就是,自动解释的类型不是最优化实现。

实现1: Default

假如,我们想要使用数组进行操作,我们希望在Rust和Python之间接收一个数组输入并返回一个数组输出。默认的实现如下所示:

#[pyfunction]
fn create_list(a: Vec<&PyAny>) -> PyResult<Vec<&PyAny>> {
    Ok(a)
}

#[pymodule]
fn blogpost_ffi(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
    m.add_function(wrap_pyfunction!(create_list, m)?)?;
    Ok(())
}

对于 create_list  创建 value = [1] * 100_000_000这样的非常大的列表进行调用,返回时间将为2.27秒。参考 test_script.py[8]

这个速度有点慢...原因是这个列表将会在循环中逐个解释元素。我们可以尝试同时使用所有元素来提高效率。

实现2: PyBytes

假设我们的数组是一个可以表示为 PyBytes 的C连续数组。代码可以通过将输入和输出转换为 PyBytes 来进行优化:

#[pyfunction]
fn create_list_bytes<'a>(py: Python<'a>, a: &'a PyBytes) -> PyResult<&'a PyBytes> {
    let s = a.as_bytes();

    let output = PyBytes::new_with(py, s.len(), |bytes| {
        bytes.copy_from_slice(s);
        Ok(())
    })?;
    Ok(output)
}

对于相同的列表输入, create_list_bytes 返回结果需要78毫秒。这是比之前快30倍 🐎

加速是因为可以复制内存范围而不是遍历每个元素,并且可以在不复制的情况下进行读取。

现在的问题是:

  • PyBytes 只在Python中可用,这意味着如果我们计划使用其他语言,我们将不得不为每种语言复制这个。
  • PyBytes 可能还需要重新转换为其他有用的类型。
  • 需要创建一个副本。

解决这个问题的方法是使用  Apache Arrow[9]

实现3: Arrow

Apache Arrow是一种通用的内存格式,可在许多语言中使用。使用 Arrow ,跟上面相同功能的代码是这样:

#[pyfunction]
fn create_list_arrow(py: Python, a: &PyAny) -> PyResult<Py<PyAny>> {
    let arraydata = arrow::array::ArrayData::from_pyarrow(a).unwrap();

    let buffer = arraydata.buffers()[0].as_slice();
    let len = buffer.len();

    // Zero Copy Buffer reference counted
    let arc_s = Arc::new(buffer.to_vec());
    let ptr = NonNull::new(arc_s.as_ptr() as *mut _).unwrap();
    let raw_buffer = unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, arc_s) };
    let output = arrow::array::ArrayData::try_new(
        arrow::datatypes::DataType::UInt8,
        len,
        None,
        0,
        vec![raw_buffer],
        vec![],
    )
    .unwrap();

    output.to_pyarrow(py)
}

相同的列表在33毫秒内返回。这比🐎🐎好2倍。

这是由于在返回结果时进行了零拷贝。零拷贝是安全的,因为我们正在对数组进行引用计数。只有当所有引用都被移除后,数组才会被释放。

arrow 的好处是:

  • 为了实现零拷贝,在处理更大的数据时能够更好地扩展。
  • 可以在其他语言中重复使用。我们只需要将函数的最后一行替换为导出到其他语言的代码。
  • 有许多类型的描述,包括 List , Mapping 和 Struct 。
  • 可以直接在 numpy 、 pandas 和 pytorch 中进行零拷贝转换。

调试

处理高效的接口并不是连接多种语言的唯一挑战。我们还必须处理跨语言调试。

我们目前的实现使用 .unwrap() 。然而,如果出现错误,这将导致整个Python进程崩溃。

thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: PyErr { type: <class 'TypeError'>, value: TypeError('Expected instance of pyarrow.lib.Array, got builtins.int'), traceback: None }', src/lib.rs:45:62
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
  File "/home/peter/Documents/work/blogpost_ffi/test_script.py", line 79in <module>
    array = blogpost_ffi.create_list_arrow(1)
pyo3_runtime.PanicException: called `Result::unwrap()` on an `Err` value: PyErr { type: <class 'TypeError'>, value: TypeError('Expected instance of pyarrow.lib.Array, got builtins.int'), traceback: None }

解决方案:eyre[10]

Eyre是一个用于Rust应用程序的简单惯用错误处理库。我们可以通过在我们的 pyo3 项目中添加 pyo3/eyre 特性标志来使用eyre,将所有的 .unwrap() 替换为 .context("our context")? 。这将把不可恢复的错误转换为可恢复的Python错误,并提供有关我们错误的详细信息。

与上述相同的错误,但使用 eyre ,这将提供一个更好看的错误消息:

Could not convert arrow data

Caused by:
    TypeError: Expected instance of pyarrow.lib.Array, got builtins.int

Location:
    src/lib.rs:75:50

相关 Rust 实现代码:

#[pyfunction]
fn create_list_arrow_eyre(py: Python, a: &PyAny) -> Result<Py<PyAny>> {
    let arraydata =
        arrow::array::ArrayData::from_pyarrow(a).context("Could not convert arrow data")?;

    let buffer = arraydata.buffers()[0].as_slice();
    let len = buffer.len();

    // Zero Copy Buffer reference counted
    let arc_s = Arc::new(buffer.to_vec());
    let ptr = NonNull::new(arc_s.as_ptr() as *mut _).context("Could not create pointer")?;
    let raw_buffer = unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, arc_s) };
    let output = arrow::array::ArrayData::try_new(
        arrow::datatypes::DataType::UInt8,
        len,
        None,
        0,
        vec![raw_buffer],
        vec![],
    )
    .context("could not create arrow arraydata")?;

    output
        .to_pyarrow(py)
        .context("Could not convert to pyarrow")
}

我要提一下,当从Rust代码调用Python代码时,你可能会丢失Python的回溯错误。

我建议使用以下自定义的回溯方法来获得一个描述性的错误:

#[pyfunction]
fn call_func_eyre(py: Python, func: Py<PyAny>) -> Result<()> {    
    let _call_python = func.call0(py).context("function called failed")?;
    Ok(())
}

fn traceback(err: pyo3::PyErr) -> eyre::Report {
    let traceback = Python::with_gil(|py| err.traceback(py).and_then(|t| t.format().ok()));
    if let Some(traceback) = traceback {
        eyre::eyre!("{traceback}\n{err}")
    } else {
        eyre::eyre!("{err}")
    }
}

#[pyfunction]
fn call_func_eyre_traceback(py: Python, func: Py<PyAny>) -> Result<()> {
    let _call_python = func
        .call0(py)
        .map_err(traceback) // this will gives python traceback.
        .context("function called failed")?;
    Ok(())
}

没有自定义回溯的示例错误:

---Eyre no traceback---
eyre no traceback says: function called failed

Caused by:
    AssertionError: I have no idea what is wrong

Location:
    src/lib.rs:89:39
------

使用自定义的回溯信息来改进错误提示:

---Eyre traceback---
eyre traceback says: function called failed

Caused by:
    Traceback (most recent call last):
      File "/home/peter/Documents/work/blogpost_ffi/test_script.py", line 96in abc
        assert False, "I have no idea what is wrong"

    AssertionError: I have no idea what is wrong

Location:
    src/lib.rs:96:9
------

通过追踪,我们可以快速确定错误源。

内存管理

假设我们需要在循环中创建数组:

/// 内存无限增长
#[pyfunction]
fn unbounded_memory_growth(py: Python) -> Result<()> {
    for _ in 0..10 {
        let a: Vec<u8> = vec![040_000_000];
        let _ = PyBytes::new(py, &a);`
        
        std::thread::sleep(Duration::from_secs(1));
    }

    Ok(())

调用此函数将消耗440MB的内存。

发生的情况是, pyo3 内存模型会将所有的Python变量保存在内存中,直到GIL被释放。因此,如果我们在循环中创建变量,所有临时变量将会被保留,直到GIL被释放。

这是由于默认情况下 pyfunction 锁定了GIL。

通过理解基于GIL的内存模型,我们可以使用作用域GIL来实现预期的行为:

#[pyfunction]
fn bounded_memory_growth(py: Python) -> Result<()> {
    py.allow_threads(|| {
        for _ in 0..10 {
            Python::with_gil(|py| {
                let a: Vec<u8> = vec![040_000_000];
                let _bytes = PyBytes::new(py, &a);
            
                std::thread::sleep(Duration::from_secs(1));
            });
        }
    });

    // or
    
    for _ in 0..10 {
        let pool = unsafe { py.new_pool() };
        let py = pool.python();

        let a: Vec<u8> = vec![040_000_000];
        let _bytes = PyBytes::new(py, &a);

        std::thread::sleep(Duration::from_secs(1));
    }

    Ok(())
}

调用此函数将消耗80MB的内存。

更多信息可以在这里找到[11]Pyo3 0.21中可能的修复![12]

竞态条件

假设我们需要在不同的线程中处理数据:

/// Function GIL Lock
#[pyfunction]
fn gil_lock() {
    let start_time = Instant::now();
    std::thread::spawn(move || {
        Python::with_gil(|py| println!("This threaded print was printed after {:#?}", &start_time.elapsed()));
    });

    std::thread::sleep(Duration::from_secs(10));
}

线程是在10.0秒后打印的。

在使用Python与 pyo3 时,我们必须确保准确知道GIL何时被锁定或解锁,以避免竞态条件(Race condition)。

在上面的例子中,问题在于默认情况下 pyo3 会在主函数线程中锁定GIL,从而阻塞等待GIL的派生线程。

如果我们在主函数线程中使用GIL或者释放主函数线程中的GIL,都没有问题。

/// No gil lock
#[pyfunction]
fn gil_unlock() {
    let start_time = Instant::now();
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(10));
    });

    Python::with_gil(|py| println!("1. This was printed after {:#?}", &start_time.elapsed()));

    // or

    let start_time = Instant::now();
    std::thread::spawn(move || {
        Python::with_gil(|py| println!("2. This was printed after {:#?}", &start_time.elapsed()));
    });
    Python::with_gil(|py| {
        py.allow_threads(|| {
            std::thread::sleep(Duration::from_secs(10));
        })
    });
}

“1” 在32微秒后被打印出来,而“2”在80微秒后被打印出来,因此没有竞争条件。

观测

正如我们所看到的,能够测量在接口过程中所花费的时间对于识别瓶颈非常有价值。但是像我们之前那样手动测量所花费的时间可能会很繁琐。

我们可以使用一个可观测库来为我们完成这个任务。Opentelemetry[13] 可以帮助我们构建一个分布式可观测系统,能够跨多种语言进行桥接。Opentelemetry可以用于追踪、度量和日志记录。

/// No gil lock
#[pyfunction]
fn global_tracing(py: Python, func: Py<PyAny>) {
    // global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
    global::set_text_map_propagator(TraceContextPropagator::new());

    // Connect to Jaeger Opentelemetry endpoint
    // Start a new endpoint with:
    // docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest
    let _tracer = opentelemetry_jaeger::new_agent_pipeline()
        .with_endpoint("172.17.0.1:6831")
        .with_service_name("rust_ffi")
        .install_simple()
        .unwrap();

    let tracer = global::tracer("test");

    // Parent Trace, first trace
    let _ = tracer.in_span("parent_python_work", |cx| -> Result<()> { 
        std::thread::sleep(Duration::from_secs(1));
        
        let mut map = HashMap::new();
        global::get_text_map_propagator(|propagator| propagator.inject_context(&cx, &mut map));

        let output = func
            .call1(py, (map,))
            .map_err(traceback)
            .context("function called failed")?;
        let out_map: HashMap<StringString> = output.extract(py).unwrap();
        let out_context = global::get_text_map_propagator(|prop| prop.extract(&out_map));

        std::thread::sleep(Duration::from_secs(1));

        let _span = tracer.start_with_context("after_python_work", &out_context); // third trace

        Ok(())
    });
}

然后,在Python代码中:

def abc(cx):
    propagator = TraceContextTextMapPropagator()
    context = propagator.extract(carrier=cx)

    with tracing.tracer.start_as_current_span(
        name="Python_span", context=context
    ) as child_span:
        child_span.add_event("in Python!")
        output = {}
        tracing.propagator.inject(output)
        time.sleep(2)
    return output

我们将获得以下的追踪记录:

使用这个工具,我们可以测量在语言之间进行接口时所花费的时间,识别锁问题,并通过日志和指标的结合,降低多语言库的复杂性。

小结

所有上述优化已经在 dora-rs 中实现,它让您可以使用Rust、Python、C和C++构建快速简单的数据流。Dora 最近刚刚开设了一个Discord,您可以在那里随时提出任何问题:https://discord.gg/ucY3AMeu[14]

汽车与机器人 Rust Workshop  日程介绍

9.23  Dora 和 Theseus OS
  • 《ROS2与DORA之间的类型消息桥梁》,由 PHILIPP 教授分享,他是 Rust 社区知名博客《用 Rust 写操作系统》的作者。Dora[15] 是他参与开发的一个全新机器人框架,也是 Rust 实现的。Dora-ROS2 桥接器可以创建能够发送和接收 ROS2 类型信息的 Dora 节点。该桥接器不依赖于 ROS2 库或构建系统。相反,它直接通过 DDS 进行通信,并解析 ROS2 msg 文件中的类型信息。本讲座将重点介绍一些实现细节,包括 - 在编译时为 ROS2 类型自动生成 Rust 结构体 - 根据 ROS2 类型信息对消息进行动态序列化和反序列化。
  • 《DORA 中的透明零拷贝 IPC》,同样也是由 PHILIPP 教授分享。
  • 《Rust-Python FFI&多语言系统的调试》,由 Dora 工程师 Xavier 带来的分享。介绍如何克服 Python FFI 挑战,处理 GIL、python 版本链接、数据传递和并行;以及处理多语言系统中的数据、跟踪、度量、日志和错误。
  • 《Dora-drives:自动驾驶变得简单》,这是一个循序渐进的教程,初学者可以使用一个简单的入门套件,从零开始编写自己的自动驾驶汽车程序。Dora-drives 让自动驾驶学习更快更简单。是不是很有趣?
  • 《Thesus:安全可靠的 Rust 本机操作系统》,Thesus 是一个完全用 Rust 从零开始编写的新型操作系统,其目标是安全可靠。由 FutureWEI 的 Kevin 带来的分享,他将介绍 Theseus 由众多微小组件组成的独特操作系统结构,并探讨我们的关键内部语言设计原则如何让 Theseus 将资源管理等典型的操作系统职责转移到编译器中。

GOSIM 介绍

GOSIM(Gobal OpenSource Innovation Meetup) 全球开源创新汇[16] 是由 WasmEdge Runtime 创始人 Michael Yuan 和 CSDN 创始人兼董事长蒋涛共同发起的开源社区平台。

在这个数字时代,障碍不断被打破,视野持续拓展,GOSIM 为所有开源爱好者照亮了前行的道路。这不仅仅是一个聚会,更是一场变革。通过促进全球合作,多样化技术生态,并分享无与伦比的知识,GOSIM 不仅正在塑造开源的现在,更是锻造其未来。对于那些坚信开源力量,认为它有潜力重新定义技术界限的人,GOSIM 正在召唤您。加入这场运动,成为开源变革的一部分。

GOSIM 由以下三大支柱组成:

  • GOSIM 大会(Conference):每年举办一次的这场会议是开源领域各个领域思想的汇聚之地。无论您是策略师、架构师、研究者,还是仅仅是一名开源爱好者,GOSIM 会议都为您提供了一个深入探索开源技术趋势、策略、治理和最佳实践的机会。
  • GOSIM 工作坊(Workshop):这是理论与实践相结合的地方。作为一个年度盛会,GOSIM 工作坊全面致力于实践行动 —— 开源项目设计、代码开发、黑客马拉松、竞赛以及深入讨论。这是专为那些希望积极塑造开源未来的项目领导者、开发者和维护者量身定制的活动。
  • GOSIM 开源驿站(Fellowship):除了每年的聚会,GOSIM 的核心是其全年无休的奖学金项目。这是一个持续的计划,支持开源项目的开发,得到了赞助者和资助的大力支持。‍

小结

在这两天的 COSIM Workshop 中,你能通过亲自参与以及亲手开发来了解 Rust 语言在这些科技领域中如何发挥着它的作用。作为 Rust 爱好者,你还犹豫什么呢?需要购票的小伙伴可以去官网[17]购票,票价低至 99 元(学生票)。

除了 Workshop 之外,也别忘记关注  GOSIM Conference (9.26)[18]  盛会,也会有 Rust 相关的精彩议题。

参考资料

[1]

GOSIM Workshop (9.23-24): http://workshop2023.gosim.org/

[2]

GOSIM Conference (9.26): https://conference2023.gosim.org/

[3]

Dora-rs: https://dora.carsmos.ai/

[4]

zenoh: https://github.com/eclipse-zenoh/zenoh

[5]

wonnx: https://github.com/webonnx/wonnx

[6]

blogpost_ffi: https://github.com/haixuanTao/blogpost_ffi/tree/main

[7]

pyo3: https://github.com/PyO3/pyo3

[8]

1] * 100_000_000`这样的非常大的列表进行调用,返回时间将为2.27秒。参考 [test_script.py: https://github.com/haixuanTao/blogpost_ffi/blob/main/test_script.py

[9]

Apache Arrow: https://arrow.apache.org/

[10]

eyre: https://github.com/eyre-rs/eyre

[11]

更多信息可以在这里找到: https://pyo3.rs/main/memory.html#gil-bound-memory

[12]

Pyo3 0.21中可能的修复!: https://github.com/PyO3/pyo3/issues/3382

[13]

Opentelemetry: https://opentelemetry.io/

[14]

https://discord.gg/ucY3AMeu: https://discord.gg/ucY3AMeu

[15]

Dora: https://github.com/dora-rs/dora

[16]

GOSIM(Gobal OpenSource Innovation Meetup) 全球开源创新汇: https://www.gosim.org/

[17]

官网: https://workshop2023.gosim.org/

[18]

GOSIM Conference (9.26): https://conference2023.gosim.org/


继续滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存