打雜的我,又閑下來有時間繼續(xù)看代碼了蹭睡。
人狠話不多衍菱,上代碼。我們先看actor肩豁。
actor.rs 前面部分引用的東西
use crate::addr::ActorEvent;
use crate::runtime::spawn;
use crate::{Addr, Context};
use futures::channel::mpsc::UnboundedReceiver;
use futures::channel::oneshot;
use futures::lock::Mutex;
use futures::{FutureExt, StreamExt};
use std::sync::Arc;
use anyhow::Result;
這個時候看到第一個addr:ActorEvent.
type ExecFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
pub(crate) type ExecFn<A> =
Box<dyn FnOnce(Arc<Mutex<A>>, Arc<Context<A>>) -> ExecFuture + Send + 'static>;
pub(crate) enum ActorEvent<A> {
Exec(ExecFn<A>),
Stop(Option<Error>),
}
我們看到
ExecFuture 是定義了一個Type脊串, 把Future 放到一個Box里面 ,再放到Pin中清钥。
ExecFn 是定義了一個 執(zhí)行函數的類型
ActorEvent 是枚舉 Actor的事件有哪些琼锋。
接著還有use crate::{Addr, Context};
Actor模型定義來說,Addr對應的就是mailbox.
我們看下定義
pub struct Addr<A> {
pub(crate) actor_id: u64,
pub(crate) tx: mpsc::UnboundedSender<ActorEvent<A>>,
pub(crate) rx_exit: Option<Shared<oneshot::Receiver<()>>>,
}
核心的東西來了 祟昭,他來了缕坎。
actor_id 是定義了id,是u64
tx 是發(fā)送通道, , 用的是std的 mpsc,發(fā)送的內容是Actor的事件篡悟。
rx_exit 是定義接收通道谜叹,用的是oneshot::Receiver.
接著我們就轉到先看addr.rs
//這些代碼主要是完成Addr的Clone,PartialEq ,Hash
impl<A> Clone for Addr<A> {
fn clone(&self) -> Self {
Self {
actor_id: self.actor_id,
tx: self.tx.clone(),
rx_exit: self.rx_exit.clone(),
}
}
}
//
impl<A> PartialEq for Addr<A> {
fn eq(&self, other: &Self) -> bool {
self.actor_id == other.actor_id
}
}
impl<A> Hash for Addr<A> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.actor_id.hash(state)
}
}
完成對Addr的主要接口實現
impl<A: Actor> Addr<A> {
/// Returns the id of the actor.
//返回actor的ID
pub fn actor_id(&self) -> u64 {
self.actor_id
}
/// Stop the actor. 停止actor.通過發(fā)送事件恰力,進行停止
pub fn stop(&mut self, err: Option<Error>) -> Result<()> {
self.tx.start_send(ActorEvent::Stop(err))?;
Ok(())
}
/// Send a message `msg` to the actor and wait for the return value.
/// 發(fā)送一個消息給actor 并且等待返回值
pub async fn call<T: Message>(&mut self, msg: T) -> Result<T::Result>
where
A: Handler<T>,
{
let (tx, rx) = oneshot::channel();
self.tx
.start_send(ActorEvent::Exec(Box::new(move |actor, ctx| {
Box::pin(async move {
let mut actor = actor.lock().await;
let res = Handler::handle(&mut *actor, &ctx, msg).await;
let _ = tx.send(res);
})
})))?;
Ok(rx.await?)
}
/// Send a message `msg` to the actor without waiting for the return value.
// 發(fā)送一個消息給actor 不等待返回值
pub fn send<T: Message<Result = ()>>(&mut self, msg: T) -> Result<()>
where
A: Handler<T>,
{
self.tx
.start_send(ActorEvent::Exec(Box::new(move |actor, ctx| {
Box::pin(async move {
let mut actor = actor.lock().await;
Handler::handle(&mut *actor, &ctx, msg).await;
})
})))?;
Ok(())
}
/// Create a `Caller<T>` for a specific message type
/// 為特定的消息類型創(chuàng)建一個“Caller”
pub fn caller<T: Message>(&self) -> Caller<T>
where
A: Handler<T>,
{
let addr = self.clone();
Caller(Box::new(move |msg| {
let mut addr = addr.clone();
Box::pin(async move { addr.call(msg).await })
}))
}
/// Create a `Sender<T>` for a specific message type
/// 為特定的消息類型創(chuàng)建一個“Sender”
pub fn sender<T: Message<Result = ()>>(&self) -> Sender<T>
where
A: Handler<T>,
{
let addr = self.clone();
Sender(Box::new(move |msg| {
let mut addr = addr.clone();
addr.send(msg)
}))
}
/// Wait for an actor to finish, and if the actor has finished, the function returns immediately.
///等待actor 完成叉谜,如果actor 已經完成旗吁,函數立即返回踩萎。
pub async fn wait_for_stop(self) {
if let Some(rx_exit) = self.rx_exit {
rx_exit.await.ok();
} else {
futures::future::pending::<()>().await;
}
}
}
首先我們先看第一個
pub async fn call<T: Message>(&mut self, msg: T) -> Result<T::Result>
where
A: Handler<T>,
代碼中我們又看到使用了Message,Handler的類型很钓。
繼續(xù)翻找香府;
/// Represents a message that can be handled by the actor.
///表示可以由 actor 處理的消息董栽。
pub trait Message: 'static + Send {
/// The return value type of the message
/// This type can be set to () if the message does not return a value, or if it is a notification message
type Result: 'static + Send;
}
/// Describes how to handle messages of a specific type.描述如何處理特定類型的消息。
/// Implementing Handler is a general way to handle incoming messages.實現Handler是處理傳入消息的一種常用方法企孩。
/// The type T is a message which can be handled by the actor.類型T是可以由actor處理的消息锭碳。
#[async_trait::async_trait]
pub trait Handler<T: Message>: Actor {
/// Method is called for every message received by this Actor.
async fn handle(&mut self, ctx: &Context<Self>, msg: T) -> T::Result;
}
然后我們繼續(xù)看下一個。
pub fn caller<T: Message>(&self) -> Caller<T>
where
A: Handler<T>,
有一個新的類型 Caller
use crate::{Message, Result};
use std::future::Future;
use std::pin::Pin;
pub(crate) type CallerFn<T> = Box<
dyn Fn(T) -> Pin<Box<dyn Future<Output = Result<<T as Message>::Result>> + Send + 'static>>
+ 'static,
>;
pub(crate) type SenderFn<T> = Box<dyn Fn(T) -> Result<()> + 'static + Send>;
/// Caller of a specific message type
pub struct Caller<T: Message>(pub(crate) CallerFn<T>);
impl<T: Message> Caller<T> {
pub async fn call(&mut self, msg: T) -> Result<T::Result> {
self.0(msg).await
}
}
/// Sender of a specific message type
pub struct Sender<T: Message>(pub(crate) SenderFn<T>);
impl<T: Message<Result = ()>> Sender<T> {
pub fn send(&mut self, msg: T) -> Result<()> {
self.0(msg)
}
}
其中我們三個文件都看到了一個引用
use crate:: Result;
我們在lib.rs 中看到定義
/// Alias of anyhow::Result
pub type Result<T> = anyhow::Result<T>;
/// Alias of anyhow::Error
pub type Error = anyhow::Error;
看到actor的Result 和Error都是使用 anyhow勿璃。
從代碼上我們知道擒抛,actor 的 addr 具有的屬性和函數。