在线客服

RxJS与Redux结合使用(一):打造自己的redux-observable

adminadmin 报建百科 2024-04-25 128 14
RxJS与Redux结合使用(一):打造自己的redux-observable

背景

Redux 的核心理念是单向数据流,只能通过 dispatch(action) 的方式修改状态,使用react-redux可以在组件和redux之间形成下面这么一个数据流闭环:

view ->  action -> reducer -> state -> view

然而,在实际业务中往往有大量异步场景,最直接的做法是在React组件中发起异步请求,在拿到数据后调用dispatch(action)去数据层修改数据。不过这样的做法使得视图层和数据层耦合在一起,会造成后期维护的困难。

Redux作者建议用中间件来处理异步流,因为在中间件中我们可以灵活地控制 dispatch的时机,这对于处理异步场景非常有效。较为常见的做法主要有两种:

  1. 更改action的类型,如redux-thunk,用函数替换了action;
  2. 在middleware中接收到action的时候做出一些对应处理,如redux-saga。

而我们今天要讲的rxjs与redux的结合,采用了第二种方式来处理异步流程。

中间件干预处理

市面上已经存在这么个中间件了:redux-observable。而我们今天要做的就是带领大家,一步一步慢慢实现自己的一个redux-observable。

这个中间件的原理我可以简化为下面的代码:

export default store => next => action => {
    const result = next(action);
    if (action.type === 'ping') {
        store.dispatch({ type: 'pong' })
    }
    return result;
}

原理实在简单,在next(action)之后去根据action做判断,做一些异步逻辑,再发起dispatch修改数据即可,而redux-observable也只是在这个基础之上加入RxJs的一些特性。

处理异步逻辑的思路

如果你比较熟悉redux的话,就会知道,redux的中间件就是一个洋葱模型,上面我们也说了,我们会在中间件的后面根据你的action再去重新dispatch一些action,而Rxjs最核心的思想就是将数据都流化。所以你可以理解为action在中间件的末端流入一个管道,最后从管道又流出一些action,这些action最终会再次被store dispatch。

至于在这个管道中进行了什么样的变化、操作,那就是Rxjs的管辖范围,通过Rxjs的强大的操作符,我们可以非常优雅地实现异步逻辑。

所以,需要有个流来承载所有的action,这样你就可以通过这个action$来进行fetch:

action$.pipe(
    switchMap(
        () => fromPromise(fetch('/api/whatever')).pipe(
            map(res => action)
        )
    ),
    catchError(() => {})
)

这样就将异步逻辑嵌入到流当中。

创建Action流

我们的核心思想是action in, action out,所以最终流出的action是要重新被store.dispatch消费的,所以action$是一个Observable对象。

同时,在dispatch的时候,action经过中间件,action也是一个observer。

因此,action$既是观察者又是可观察对象,是一个Subject对象:

替换中间件的简单写法,变成:

import%20{%20Subject%20}%20from%20'rxjs/Subject';

export%20default%20(store)%20=>%20{

%20%20const%20action$%20=%20new%20Subject();
%20%20
%20%20action$.subscribe(store.dispatch);
%20%20
%20%20return%20next%20=>%20(action)%20=>%20{
%20%20%20%20const%20result%20=%20next(action);
%20%20%20%20action$.next(action);
%20%20%20%20return%20result;
%20%20};
};

在上面代码中我们在middleware中去放入action,然后通过订阅,store会触发dispatch。

但是,如果我们就这么写的话,这是个死循环,因为任何action在进入到action去。

聪明的你应该想到了,我们对于进入到action$的action还没有进行任何过滤,而这个过滤过程也正是我们需要的处理异步逻辑的地方。

下面我们要把这个步骤加上。

流的转化器Epic

为了达到action的一个转化处理,我们将这个过程抽离出来,这个中间处理的逻辑称为Epic,epic的形式大概可以写为:

const%20epic%20=%20(action$)%20=>%20{
%20%20%20%20return%20action$.pipe(
%20%20%20%20%20%20%20%20//%20因为所有的action都会过来
%20%20%20%20%20%20%20%20//%20所以我们只需要处理我们想要的aciton
%20%20%20%20%20%20%20%20filter(action%20=>%20action.type%20===%20'GET_USER'),
%20%20%20%20%20%20%20%20switchMap(
%20%20%20%20%20%20%20%20%20%20%20%20//%20将fetch也转化为流
%20%20%20%20%20%20%20%20%20%20%20%20()%20=>%20fromPromise(fetch('/api/user/get',%20{
%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20method:%20'POST',
%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20body:%20{
%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20id:%201
%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20},
%20%20%20%20%20%20%20%20%20%20%20%20})).pipe(
%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20map(res%20=>%20({%20type:%20'GET_USER_SUCCESS',%20payload:%20res%20})),
%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20catchError(error%20=>%20({%20type:%20'GET_USER_FAILED',%20payload:%20error%20}))
%20%20%20%20%20%20%20%20%20%20%20%20)
%20%20%20%20%20%20%20%20)
%20%20%20%20)
}

epic本质是一个函数,在这个函数中,我们在action$的基础上,加入了管道控制,产生了另外一个流,而这个流就是最终我们要的,对action进行了控制的action流,上面的fetch只是一个例子,在这个管道中,你可以处理任意的异步逻辑。

而我们要做的就是将这个Epic,整合进刚才的中间中。

做法也很简单,我们只需要将订阅从action$换到新的流上就可以了:

import%20{%20Subject%20}%20from%20'rxjs/Subject';

export%20default%20(store)%20=>%20{

%20%20const%20action$%20=%20new%20Subject();
%20%20const%20newAction$%20=%20epic(action$);
%20%20
%20%20newAction$.subscribe(store.dispatch);
%20%20
%20%20return%20next%20=>%20(action)%20=>%20{
%20%20%20%20const%20result%20=%20next(action);
%20%20%20%20action$.next(action);
%20%20%20%20return%20result;
%20%20};
};

这样,action$在接收到新的action的时候,会流经epic定义的管道,然后才出发dispatch

多个Epic合并

到此,我们的中间件已经有初步处理异步逻辑的能力,但是,在现实中,我们的异步逻辑不可能只有一个,所以epic是会有很多的,而store去订阅的流只能是一个,所以这么多的epic产生的流要合并成一个流。

合并流的操作,强大的RxJs自然是有安排的,相信你想到了操作符merge,我们可以提供一个combineEpics的函数:

export%20const%20combineEpics%20=%20(...epics)%20=>%20{
%20%20const%20merger%20=%20(...args)%20=>%20merge(
%20%20%20%20...epics.map((epic)%20=>%20{
%20%20%20%20%20%20const%20output$%20=%20epic(...args);
%20%20%20%20%20%20return%20output$;
%20%20%20%20})
%20%20);
%20%20return%20merger;
};

上面的代码不难理解,combineEpics整合了所有传入的epic,然后返回一个merger,这个merger是利用merge操作符,将所有的epic产生的流合并成一个流。

代码形式为:

const pingEpic = action$ => action$.pipe(
  filter(action => action.type === 'ping'),
  map(() => ({ type: 'pong' })),
);

const getUserEpic = action$ => action$.pipe(
  filter(action => action.type === 'GET_USER'),
  map(() => ({ type: 'GET_USER_SUCCESS', payload: { user: { name: 'kang' } } })),
);

const rootEpic = combineEpics(pingEpic, getUserEpic);

export default (store) => {
  const action$ = new Subject();
  const newAction$ = rootEpic(action$);

  newAction$.subscribe(store.dispatch);
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};

state获取

在epic中我们不可避免地要借助state里面的数据进行不同的处理,所以我们是需要获取到state的,所以你可以在中间件中的epci执行函数中添加一个参数,将state获取函数暴露出去:

export default (store) => {
  ...
  const newAction$ = rootEpic(action$, store.getState);
  ...
};

这样epic里就可以用getState()获取state:

const pingEpic = (action$, getState) => action$.pipe(
  filter(action => action.type === 'ping'),
  map(() => ({ type: 'pong', payload: getState() })),
);

进一步优化:将state也流化

上面的做法是直接去获取state,这样的做法是主动获取,不符合函数响应式编程模式。函数响应式中,state的改变状态,应该是要能被观察的。

当state也能被响应观察,我们就可以做更多的功能,例如:当state的某些数据在发生变化的时候,我们要去进行实时保存。

在传统模式的做法中,你可以在中间件中这样写:

export default store => next => action => {
    const oldState = store.getState();
    const result = next(action);
    const newState = store.getState();
    // 类似这样的写法
    if (newState.xxx !== oldState.xxx) {
        fetch('/api/save', {
            method: 'POST',
            body: {
            
            }
        }).then(() => {}).catch(() => {})
    }
    return result;
}

这个处理逻辑要独立为一个中间件,而如果你将state也流化,你可以直接使用epic这样处理:

const saveEpic = (action$, state$) => state$.pipe(
const autoSaveEpic = (action$, state$) =>
  return action$.pipe(
    filter(action => action.type === 'AUTO_SAVE_ENABLE'), // 自动保存的开启
    exhaustMap(() => state$.pipe(
        pluck('xxx'), // 获取state.xxx
        distinctUntilChanged(), // 前后值不同时才将其发出。
        concatMap((value) => {
            // fetch to save
        }),
        // 自动保存的关闭
        takeUntil(action$.pipe(
            filter(action => action.type === 'AUTO_SAVE_DISABLE')
        ))
    ))
  )
)

如果仔细阅读这段代码,可以发现这样的方式可以使用非常优雅的方式控制这个自动保存,可以和action$结合使用,快速开关自动保存,可以利用RxJs的特性解决保存的异步执行延迟问题。

如果你只是单存想要获取最新state,可以使用withLatestFrom操作符:

const countEpic = (action$, state$) => action$.pipe(
  filter(action => action.type === 'count'),
  withLatestFrom(state$),
  switchMap(([action, state]) => {
    return of({ type: 'whatever' });
  })
);

在中间件加入state流:

export default (store) => {
  const action$ = new Subject();
  const state$ = new Subject();
  const source$ = rootEpic(action$, state$);

  source$.subscribe(store.dispatch);
  return next => (action) => {
    const result = next(action);
    state$.next(store.getState());
    action$.next(action);
    return result;
  };
};

注意state最新值不是默认state。

Action的顺序问题

如果你有耐心看到这里,那么说明你对于redux结合RxJs的使用已经理解得差不多了,但是这里还是有个问题,就是action的生效顺序,我们可以直接看个例子说明,假设有下面这样两个epic:

const epic1 = action$ => action$.pipe(
  filter(action => action.type === 'one'),
  mergeMap(() => of({ type: 'two' }, { type: 'three' })),
);

const epic2 = action$ => action$.pipe(
  filter(action => action.type === 'two'),
  mergeMap(() => of({ type: 'four' })),
);

store.dispatch({ type: 'one' }) 的时候,action的顺序为:

'one' -> 'two' -> 'four' -> 'three'

可见,action的执行顺序并不是如我们预期的那样,在two触发后就发出了four,这是因为RxJs默认的调度器是同步的,用一段简单的代码,上面的效果类似于:

class Print {
  constructor(name, nexts = []) {
    this.name = name;
    this.nexts = nexts;
  }
  print() {
    console.log(this.name);
    this.nexts.forEach((p) => {
      p.print();
    });
  }
}
const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, four, three

换成上面的代码的话你就不陌生了吧,也会对于输出的结果表示肯定,但是我们需要的效果是

'one' -> 'two' -> 'three' -> 'four'

这如何做到?

明显,需要将调度器换成其他的,RxJs有这么几种调度器:null(同步)、asap、queue、async、animationFrame。最后一种是动画场景的调度器,直接剔除,默认是第一种,那么就剩下asap、queue、async。在这个场景下,这三种调度器都是可行的,但是queue在大量的数据的时候对于性能是有利的,所以这里可以使用它。不过,记住,这三种调度器是有区别的,大家有兴趣的自己去google一下,只提示:asap是Micro Task、async是Macro Task、queue在延迟为0的时候接近于同步,在延迟不为0的时候与async一样。

中间件中:

  const action$ = new Subject().pipe(
    observeOn(queue)
  );

这样得到的结果就是:

'one' -> 'two' -> 'three' -> 'four'

如果用简单的代码,相当于发生了这样的变化:

class Print {
  constructor(name, nexts = []) {
    this.name = name;
    this.nexts = nexts;
  }
  print() {
    console.log(this.name);
    this.nexts.forEach((p) => {
      setTimeout(() => p.print(), 0);
    });
  }
}
const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, three, four

总结

本文就讲到这里,这次介绍了如何自己实现一个redux-observable,下次会讲redux-observable在实战中的一些应用,例如怎么类似dva那样进行模块化开发、如何统一处理loading、error等。

本网站是一个以CSS、JavaScript、Vue、HTML为核心的前端开发技术网站。我们致力于为广大前端开发者提供专业、全面、实用的前端开发知识和技术支持。 在本网站中,您可以学习到最新的前端开发技术,了解前端开发的最新趋势和最佳实践。我们提供丰富的教程和案例,让您可以快速掌握前端开发的核心技术和流程。 本网站还提供一系列实用的工具和插件,帮助您更加高效地进行前端开发工作。我们提供的工具和插件都经过精心设计和优化,可以帮助您节省时间和精力,提升开发效率。 除此之外,本网站还拥有一个活跃的社区,您可以在社区中与其他前端开发者交流技术、分享经验、解决问题。我们相信,社区的力量可以帮助您更好地成长和进步。 在本网站中,您可以找到您需要的一切前端开发资源,让您成为一名更加优秀的前端开发者。欢迎您加入我们的大家庭,一起探索前端开发的无限可能!
代办报建

本公司承接江浙沪报建代办施工许可证。
联系人:张经理,18321657689(微信同号)。

喜欢0发布评论

14条评论

  • 游客 发表于 5个月前

    楼上的说的很多!http://eslc.cqyiyou.net/test/329885417.html

  • 游客 发表于 5个月前

    楼主的文笔不错!http://j5mb7.nzrtg.com

  • 游客 发表于 5个月前

    今天的心情很不错啊https://sdceda.com/shi/105854784/

  • 游客 发表于 5个月前

    这么经典的话只有楼主能想到!http://test.cqyiyou.net/test/

  • 全盛棋牌069 发表于 4个月前

    楼主的帖子越来越有深度了!http://vcp.yushanshe.com

  • 游客 发表于 4个月前

    宇宙第一贴诞生了!http://www.guangcexing.net/voddetail/AjJGbfpTv.html

  • 游客 发表于 3个月前

    很有品味!http://www.guangcexing.net/voddetail/PTbJFCGF.html

  • 游客 发表于 3个月前

    楼上的别说的那么悲观好吧!http://www.guangcexing.net/voddetail/bnTHvpCjuYJx.html

发表评论

  • 昵称(必填)
  • 邮箱
  • 网址