输入和输出

输入和输出
输入
  在TaskFlow中,有多种方法给任务和流提供输入。
  任务从任务的参数获取输入,通过返回值提供输出。理想的情况是将数据从一个任务传递到下一个任务。但是实际情况是,并不是所有的任务参数都被所有的任务所需要,也不是所有任务的返回值都被其它任务所需要。换句话说就是,对于某些任务,有的参数根本没用到;而有些任务的返回值并没有被其它任务所使用。
  如果一个值被一个流中的一个或者多个任务所需要,但是却没有任务提供此值,那么我们称为此值为流的输入。流的输入必须在流运行前存入存储中。流的属性requires保存了流的所有输入名。
  流中所有任务的输出称为流的输出,流的属性provides中保存了流的所有输出名。

from taskflow import task
from taskflow.patterns import linear_flow


class MyTask(task.Task):
    def execute(self, **kwargs):
        return 1, 2

flow = linear_flow.Flow('test').add(
    MyTask(requires='a', provides=('b', 'c')),
    MyTask(requires='b', provides='d')
)

print('flow.requires: %r' % flow.requires)

print('flow.provides: %r' % flow.provides)

  程序输出:

flow.requires: frozenset(['a'])
flow.provides: frozenset(['c', 'b', 'd'])

  对于第一个任务,其requires为'a',而所有其它任务都没有提供'a',所以'a'称为流的输入,它保存在流的属性requires中。
  Retry与任务Task对于输入和输出的处理是一模一样的。
  如果流运行时无法在存储中找到流的输入,那么就会抛出MissingDependencies异常。下面是一个例子:

from taskflow import task
from taskflow.patterns import linear_flow
from taskflow import engines


class CatTalk(task.Task):
def execute(self, meow):
    print meow
    return "cat"

class DogTalk(task.Task):
def execute(self, woof):
    print woof
    return "dog"

flo = linear_flow.Flow("cat-dog")
flo.add(CatTalk(), DogTalk(provides="dog"))
engines.run(flo)

  程序输出:

Traceback (most recent call last):
  ...
taskflow.exceptions.MissingDependencies: 'linear_flow.Flow: cat-dog(len=2)' requires ['meow', 'woof'] but no other entity produces said requirements
MissingDependencies: 'execute' method on '__main__.CatTalk==1.0' requires ['meow'] but no other entity produces said requirements
    MissingDependencies: 'execute' method on '__main__.DogTalk==1.0' requires ['woof'] but no other entity produces said requirements

  显然上面的程序抛出异常MissingDependencies,提示开发者需要'meow'和'woof',但是却没有找到。
  推荐使用engine的帮助方法(run()或者load())来提供流的输入,下面是一个使用run()提供流输入的例子

from taskflow import task
from taskflow.patterns import linear_flow
from taskflow import engines


class CatTalk(task.Task):
def execute(self, meow):
    print meow
    return "cat"

class DogTalk(task.Task):
def execute(self, woof):
    print woof
    return "dog"

flo = linear_flow.Flow("cat-dog")
flo.add(CatTalk(), DogTalk(provides="dog"))
result = engines.run(flo, store={'meow': 'meow', 'woof': 'woof'})
print(result)

  程序输出:

meow
woof
{'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}

  上面的程序通过run()函数中的参数store提供了流输入。
  也直接与存储层进行交互,将值添加到存储层。不同的是,使用这种方法就不能使用engines的帮助函数run(),而是必须使用engines实例的方法run()。下面的例子通过方法inject()将{"woof": "bark"}添加到存储层,并调用engines实例的方法run()。

from taskflow import task
from taskflow.patterns import linear_flow
from taskflow import engines


class CatTalk(task.Task):
def execute(self, meow):
    print meow
    return "cat"

class DogTalk(task.Task):
def execute(self, woof):
    print woof
    return "dog"

flo = linear_flow.Flow("cat-dog")
flo.add(CatTalk(), DogTalk(provides="dog"))
eng = engines.load(flo, store={'meow': 'meow'})
eng.storage.inject({'woof': 'woof'})
eng.run()

  程序输出:

meow
woof
输出

  从第一个例子可以看出:流中所有任务的输出称为流的输出,流的属性provides中保存了流的所有输出名。可以通过方法fetch_all()获取所有的输出值,也可以通过fetch()获取单个输出值。下面是一个通过fetch_all()和fetch()获取输出值的例子:

from taskflow import task
from taskflow.patterns import linear_flow
from taskflow import engines


class CatTalk(task.Task):
def execute(self, meow):
    print meow
    return "cat"

class DogTalk(task.Task):
def execute(self, woof):
    print woof
    return "dog"

flo = linear_flow.Flow("cat-dog")
flo.add(CatTalk(), DogTalk(provides="dog"))
eng = engines.load(flo, store={'meow': 'meow'})
eng.storage.inject({'woof': 'woof'})
print('eng.storage.fetch_all() before run(): %r' % eng.storage.fetch_all())
eng.run()

print('eng.storage.fetch_all() after: %r' % eng.storage.fetch_all())
print("eng.storage.fetch('dog'): %r" % eng.storage.fetch('dog'))

  程序输出:

eng.storage.fetch_all() before run(): {'meow': 'meow', 'woof': 'woof'}
meow
woof
eng.storage.fetch_all() after: {'meow': 'meow', 'woof': 'woof', 'dog': 'dog'}
eng.storage.fetch('dog'): 'dog'

  从输出可以看出,执行eng.run()前,存储层包含了所有输入;执行eng.run()后,存储层多了流的输出'dog'。


参考资料:
Inputs and outputs