Страницы

четверг, 11 мая 2017 г.

Rx in Unity with favor to synchronous code

Let's look at what rx is. Someone says it's about thinking of asynchronous operations as data flows and of rx operators as flow combining operations - join, merge, concat, transform (select/selectMany), etc. I prefer thinking that way. And yes, rx is somewhat about functional programming, it can be combined with LINQ.


Why do you think, I specified in the title "the synchronous way"? I specified it, because below I'll give simple tools and ideas to make using rx easier. Rx as any approach in programming has it's uses. And before giving the tools I want to say, there is no need to write everything in asynchrounous way (it's possible and it's common passion of rx newcomers, at least among my colegees). Writing in synchronous way has it's benefits - simple stacktraces, easier debugging, most of the people nowadays are good at writing synchronous code and know not much about asynchronous (if they don't write it every day, of course). I don't want to say, that rx is bad. All has it's uses. Rx is good at things with network, file, database slow operations, events, timing, coroutines, maybe tweening...). Rx is a tool that gives us clean to read, generic code, that you can compound together.

I don't repeat the basics. For it you may read Groking Rx (or in russian) and surfing documentation (there are some good links and books too) or many others.

I just want to fill small gap of rx usecases when things come to using rx in our old-style and simple synchrounous code.

First of all, you must understand BehaviourSubject. It allways has the value, so can be thought as a regular variable for which you can subscribe to it's updates. It also has interface to set value - method OnNext(newValue).

Now, moving to usecases.

Rx usecases

Get, set, subscribe for internal use

0. First things first: when you need to get, set and subscribe to changes and all inside class (nothing exposed to client code), you can simply use a BehaviourSubject:


public readonly BehaviourSubject<int> countSubject = new BehaviourSubject<int>(0); // default value is mandatory, this is for sync code too after all

Print(countSubject.Value);
countSubject.Subscribe(Print);
countSubject.OnNext(1);
countSubject.OnNext(2);

Output:
0 // Print
0 // received at subscribe
1
2

Note: you receive current value of BehaviourSubject immediately on subscribe. To not get current value and wait for next one:

countSubject.Skip(1).Subscribe(Print)

Get, set, subscribe exposed

1. Need to get, set and subscribe to changes. FIeld is public, so we must deny calling methods which end sequence, so any subsequent call will crash. Which is not we want. We just create a wrapper which has methods only to get, set and subscribe:


public readonly RxProperty<int> Count = new RxProperty<int>(0);

Count.Subscribe(Print);
Count.Value = 1;
Count.Value = 2;

Output:
0
1
2

As you may noticed it's a replacement for C#'s event pattern.

Get, subscribe exposed

2. Same as (1), but value changing is allowed only from enclosing class


public class EnclosingClass
{
  public readonly ReadOnlyRxProperty<int> Count;
  private readonly BehaviourSubject<int> count;

  public EnclosingClass()
  {
    count = new BehaviourSubject<int>(0);
    Count = new ReadOnlyRxProperty<int>(count);
  }

  public void UpdateCount()
  {
    count.OnNext(1);
    count.OnNext(2);
  }

}

EnclosingClass o = new EnclosingClass();
o.Count.Subscribe(Print);
o.UpdateCount();

Output:
0
1
2

Long running operation

3. Say, we need to perform some calculations. In synchronous code we write a method when need something like that. Nothing changes in asynchrounous code:


public IObservable<float> ObserveEquationSolution(int iterationsCount)
{
  return Observable.FromCoroutine(() => SolveRoutine(iterationsCount));
}

private IEnumetator<float> SolveRoutine(int iterationsCount) // we can use any other mechanism to perform asynchrounos operation, coroutine here is just an example
{
  yield return new WaitForSeconds(1f);
  yield return 2f;

}

// By the way, we can use Observable.Start(...) to write operations, that must be executed in background thread

Observables in rx are cold, so need to Subscribe to run calculation:

ObserveEquationSolution(100)
  .Subscribe(result => Print(result))

Output:

2

Exposed get while value change is processed with some long running operation

4. We need to make value, which change is processed and only then, change is published. Will simply reuse (3) and (2) together:


public class EnclosingClass
{
  public readonly ReadOnlyRxProperty<int> Count;
  private readonly BehaviourSubject<int> count;

  public EnclosingClass()
  {
    count = new BehaviourSubject<int>(0);
    Count = new ReadOnlyRxProperty<int>(count);
  }

  public IObservable<float> ObserveEquationSolution(int iterationsCount)
  {
    return Observable.FromCoroutine(() => SolveRoutine(iterationsCount)).Do(result => count.OnNext(result));
  }

  private IEnumetator<float> SolveRoutine(int iterationsCount) 
  {
    yield return new WaitForSeconds(1f);
    yield return 2f * iterationsCount;
  }
}

EnclosingClass o = new EnclosingClass();
o.Count.Subscribe(val => Print("printing from property subscribe " + val));
o.ObserveEquationSolution(200).Subscribe(val => Print("printing from method subscribe " + val));

Output:
printing from property subscribe 0
printing from property subscribe 400
printing from method subscribe 400

Note: Notice, that ObserveEquationSolution returns Observable, that emits only on that calculation end. One may think, that:


Count.Skip(1).Subscribe(PrintResult);
ObserveEquationSolution(200).Subscribe();

May be enough. But what do you receive in PrintResult it that case:


ObserveEquationSolution(300).Subscribe();
// later, some time passed:
Count.Skip(1).Subscribe(PrintResult);
ObserveEquationSolution(200).Subscribe();

What do you receive? 400 or 600? The result depends on calculation time of ObserveEquationSolution() which is bad. So, when you need result, you just return it from method. Simply, as usual. Not int, but IObservable<int> because it's asynchronous calculation after all :)

By the way, IObservable can be used in coroutines too:


IEnumerator Some()
{
  // do some
  yield return ObserveEquationSolution(200).ToYieldInstruction(); // ToYieldInstruction will make Subscribe called automatically
  // do some
}

While o.Count.Subscribe() is good to react on events. Allways making both can be useful to refactor code afterwards: when event is not needed anymore, it's simply removed.
When you no more need calculations being performed async, you can make ObserveEquationSolution() be simple method:


public float ObserveEquationSolution(int iterationsCount)
{
  float result = 2f * iterationsCount;
  count.OnNext(result);
  return result;
}

while keeping Count subscribers untouched.

Комментариев нет:

Отправить комментарий