Online Flow processing

Can I use Workflow to build a persistent flow of processing, i.e. online learning?

For example:

[Module1]---->[Module3]---->[Module4]---->out
[Module2]-----------^

Every input is a Tensor (torch or TF).
Module3 has to be able to distinguish between the inputs from M1 and M2.

How?
I'm asking because the docs say that functions are fire-and-forget, i.e. not persistent.

Sorry, I think I probably need more details about what you are asking here.

So, does the module represent a workflow step here? For example, we'll have a pipeline:

  M1 -> M3 -> M4. 
  M2 - /

For this, M3 can distinguish them by the position of its inputs:

M4.step(M3.step(M1.step(), M2.step()))

where M3 might be:

@workflow.step
def m3(m1_input, m2_input): ...
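
For a fuller picture, here's a minimal self-contained sketch of that diamond pipeline (my example, not from the thread), assuming the legacy @workflow.step API and torch tensors:

import torch
from ray import workflow

workflow.init()  # legacy workflow API

@workflow.step
def m1():
    return torch.zeros(4)

@workflow.step
def m2():
    return torch.ones(4)

@workflow.step
def m3(m1_input, m2_input):
    # M3 tells the two upstream tensors apart by argument position
    return m1_input + m2_input

@workflow.step
def m4(x):
    return x.sum()

# M1 and M2 feed M3, whose output feeds M4 -> out
out = m4.step(m3.step(m1.step(), m2.step())).run()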

Or do you mean Module3 will be a workflow job, and Module1 and Module2 are the places that trigger Module3 to update its internal state?

First of all, nothing is shared across runs. And you can get a step's output via named steps, which were added recently.

For example,

@workflow.step(name="A")
def f(): ...

f.step().run("id")

workflow.get_output("id", name="A")

We don’t support stateful steps directly right now.
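
One workaround sketch (mine, not an official pattern): since steps are stateless, you can thread state through step outputs explicitly, so each step receives the previous step's state as an argument:

@workflow.step
def init_state():
    return 0

@workflow.step
def update(state, x):
    # "state" is just the previous step's output
    return state + x

# state flows 0 -> 1 -> 3 through the chain
final = update.step(update.step(init_state.step(), 1), 2).run()
assert final == 3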


Sorry for the late reply …
I want something like Erlang, where you send & receive messages between Modules; the difference is that I need control over which module is called when, i.e. in what order.

So, sort of an NN-layered network, but every Module can send a message/tensor to any other Module.

Pseudocode of how I imagine a Module:

class Module:

   def recv(self, folder, msg): self.drop(folder, msg)

   def send(self, module, folder, msg): module.recv(folder, msg)

   def consume(self, folder): return self.inbox[folder].pop()


   def drop(self, folder, msg):
       self.inbox[folder].append(msg)

   #-- the rest is a bit iffy ....
   def link(self, mods): self.mods = mods

   def forward(self, ....):
       for m in self.mods: m.run()

   def run(self):
        ..............
       self.forward(...)

So how do I do the processing loop?

Also, I want this to work in single- and multi-process environments … so probably Module is an Actor class.

If you don’t want the fault tolerance semantics, you probably should just use a Ray actor.

Otherwise, you can check out virtual actors (Virtual Actors — Ray v2.0.0.dev0).
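
For reference, here is a minimal sketch of the Module idea as a plain Ray actor (no fault tolerance; class and method names are mine, using only the core ray.remote API):

import ray
from collections import defaultdict, deque

ray.init()

@ray.remote
class ModuleActor:
    def __init__(self):
        self.inbox = defaultdict(deque)  # folder -> message queue

    def recv(self, folder, msg):
        self.inbox[folder].append(msg)

    def consume(self, folder):
        if self.inbox[folder]:
            return self.inbox[folder].popleft()

m = ModuleActor.remote()
m.recv.remote("grad", 42)
print(ray.get(m.consume.remote("grad")))  # -> 42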


Thanks… this worries me:

Steps

Steps are functions annotated with the @workflow.step decorator. Steps are retried on failure, but once a step finishes successfully it will never be run again.

A module needs to stay persistent, be able to send/recv messages, and periodically run().

Also, it seems Workflow only passes data from one step to the next, i.e. it can't pass data across arbitrary steps!

Virtual actors are for this purpose. A virtual actor is built on top of workflows. You can think of a virtual actor as an actor, but one that doesn't have a process behind it.

You can call another virtual actor inside one virtual actor to pass messages.


How does that work? I'm still experimenting and thinking about the features I need, and this seems like my rule 4:

  1. The basic structure is a Module that can send & receive messages/tensors
  2. The order of calling the modules is externally imposed
  3. I want to be able to extend the system with Ray to support multiprocessing
  4. A Module has to be able to encapsulate other Modules, so that it acts as a single unit
     and hides the internal message passing (the wrapper module has its own inbox, and maybe an outbox, so it can hide the modules inside)

Here is my current blueprint … no Ray yet … still trying to fish out the functionality I need:


from collections import defaultdict, deque


class Module(object):

	def __init__(self):
		self.inbox = defaultdict(deque)  # folder -> queue of messages
		self.out_map = {}                # key -> {'module': ..., 'folder': ...}
		self.mods = [] #execute run() on those

	def recv(self, folder, msg): self.drop(folder, msg)
	def send(self, mod, folder, msg): mod.recv(folder, msg)
	#send msg out VIA an output map
	def sendVomap(self, key, msg):
		omap = self.out_map[key]
		self.send(omap['module'], omap['folder'], msg)

	def add2omap(self, key, mod, folder): self.out_map[key] = {'module': mod, 'folder': folder}
	def has_omap_key(self, key):
		if key not in self.out_map:
			self.warn(f'missing OMap key: {key}')
			return False
		return True

	def consume(self, folder):
		if folder in self.inbox and self.inbox[folder]:
			return self.inbox[folder].pop()

	def drop(self, folder, msg):
		self.inbox[folder].append(msg)  # deque has append(), not push()

	def warn(self, msg): print('WARN: ', msg)

	def link(self, mods): self.mods.extend(mods)

	def run(self): raise NotImplementedError



class Flow:

	def __init__(self, roots):
		if isinstance(roots, list): self.roots = roots
		else: self.roots = [roots]

	def run(self, mods=None):
		if mods is None: mods = self.roots
		for m in mods:
			m.run()
			if len(m.mods) > 0: self.run(m.mods)
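
As a sanity check, here is a hypothetical usage sketch of the blueprint above (module names are made up), wiring one module to a sink via the output map and driving it with Flow:

class Doubler(Module):
	def run(self):
		msg = self.consume('in')
		if msg is not None and self.has_omap_key('out'):
			self.sendVomap('out', msg * 2)

class Sink(Module):
	def run(self):
		msg = self.consume('in')
		if msg is not None: print('sink got', msg)

src, sink = Doubler(), Sink()
src.add2omap('out', sink, 'in')  # route src's 'out' to sink's 'in' folder
src.link([sink])                 # Flow will run sink after src
src.drop('in', 21)
Flow(src).run()                  # prints: sink got 42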

What I mean is that:

@workflow.virtual_actor
class A:
    def __init__(self, b):
        self._b = b  # id of the downstream virtual actor
    def pass_msg(self, msg):
        # look the downstream actor up by id and invoke its method
        workflow.get_actor(self._b).method.run(msg)

@workflow.virtual_actor
class B:
    def __init__(self): pass
    def method(self, msg): pass


B.get_or_create("B")
a = A.get_or_create("A", "B")  # A is constructed with B's actor id
a.pass_msg.run("msg")

Btw, if you don’t need fault tolerance semantics, you can use Ray actors directly.


Wow, thanks… I don't need FT now, but maybe in the future… hmm… is there some drawback of Workflow vs. bare actors?

What if I have to pass messages across several steps, in any direction?
Maybe a normal Ray remote call?

If you use Workflow, it's going to checkpoint, which takes time and resources. You can get more details here: API Comparisons — Ray 3.0.0.dev0

I think it’s the same. For example, in B.method, pass it on to another actor.
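
For example (my sketch, extending the snippet above), B could hold the id of a third virtual actor and forward inside its method:

@workflow.virtual_actor
class B:
    def __init__(self, next_id):
        self._next = next_id  # id of the virtual actor to forward to

    def method(self, msg):
        # same pattern as A -> B, just one hop further
        workflow.get_actor(self._next).method.run(msg)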