Sorry for the late reply …
I want something like Erlang, where Modules send and receive messages, except that I need control over which module is called when, i.e. in what order.
So it is a sort of layered NN, but every Module can send a message/tensor to any other Module.
Pseudocode of a Module, as I imagine it:
    class Module:
        def recv(self, folder, msg): self.drop(folder, msg)
        def send(self, module, folder, msg): module.recv(folder, msg)
        def consume(self, folder): return self.inbox[folder].pop()
        def drop(self, folder, msg):
            self.inbox[folder].append(msg)
        #-- the rest is a bit iffy ....
        def link(self, mods): self.mods = mods
        def forward(self, ....):
            for m in self.mods: m.run()
        def run(self):
            ..............
            self.forward(...)
So how do I do the processing loop?
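One possible shape for such a loop, as a rough sketch only: assume the call order is simply a list handed in from outside, and a small driver walks that list once per tick, calling run() on each module. The names `run_schedule`, `Doubler` and `Collector` are made up for illustration and the folder name "in" is arbitrary.

```python
from collections import defaultdict, deque


class Module:
    """Minimal mailbox-style module; run() is called by an external driver."""

    def __init__(self, name):
        self.name = name
        self.inbox = defaultdict(deque)  # folder -> FIFO queue of messages

    def recv(self, folder, msg):
        self.inbox[folder].append(msg)

    def send(self, module, folder, msg):
        module.recv(folder, msg)

    def consume(self, folder):
        queue = self.inbox[folder]
        return queue.popleft() if queue else None

    def run(self):
        raise NotImplementedError


class Doubler(Module):
    """Hypothetical module: doubles each message and forwards it."""

    def __init__(self, name, target):
        super().__init__(name)
        self.target = target

    def run(self):
        msg = self.consume("in")
        if msg is not None:
            self.send(self.target, "in", msg * 2)


class Collector(Module):
    """Hypothetical module: stores whatever it receives."""

    def __init__(self, name):
        super().__init__(name)
        self.seen = []

    def run(self):
        msg = self.consume("in")
        if msg is not None:
            self.seen.append(msg)


def run_schedule(order, ticks=1):
    """Call run() on each module in the externally supplied order."""
    for _ in range(ticks):
        for module in order:
            module.run()


sink = Collector("sink")
doubler = Doubler("doubler", target=sink)
doubler.recv("in", 21)
run_schedule([doubler, sink])
print(sink.seen)  # [42]
```

Because the order is just data, the caller can reorder or repeat modules without touching the modules themselves.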
I also want this to work in both single-process and multiprocess environments … so Module should probably be an Actor class.
Steps are functions annotated with the @workflow.step decorator. Steps are retried on failure, but once a step finishes successfully it will never be run again.
A module needs to stay persistent, be able to send/recv msgs, and periodically run().
Also, it seems Workflow only passes data from one step to the next, i.e. it can't pass data across steps!
Virtual actors are for this purpose. Virtual actors are built on top of workflow. You can think of a virtual actor as an actor that doesn't have a process behind it.
You can call another virtual actor inside one virtual actor to pass messages.
How does that work? I'm still experimenting and thinking about the features I need, and this seems like my rule 4:
1. The basic structure is a Module that can send and receive messages/tensors.
2. The order in which the modules are called is externally imposed.
3. I want to be able to extend the system with Ray to support multiprocessing.
4. A Module has to be able to encapsulate other Modules, so that it acts as a single unit and hides the internal message passing (the wrapper module has its own inbox and maybe an outbox, so it can hide the modules inside).
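To make the encapsulation requirement concrete, here is a rough sketch in plain Python (no Ray). It assumes each module parks its result in an `outbox` deque, and that a wrapper feeds its inner modules as a simple chain. `Composite`, `AddOne`, `outbox` and the folder name "in" are illustrative names only.

```python
from collections import defaultdict, deque


class Module:
    def __init__(self):
        self.inbox = defaultdict(deque)  # folder -> FIFO queue of messages
        self.outbox = deque()            # assumption: results are parked here

    def recv(self, folder, msg):
        self.inbox[folder].append(msg)

    def consume(self, folder):
        queue = self.inbox[folder]
        return queue.popleft() if queue else None

    def run(self):
        raise NotImplementedError


class AddOne(Module):
    """Hypothetical leaf module: adds 1 to each incoming message."""

    def run(self):
        msg = self.consume("in")
        if msg is not None:
            self.outbox.append(msg + 1)


class Composite(Module):
    """Wraps a chain of inner modules; callers only see its inbox/outbox."""

    def __init__(self, inner):
        super().__init__()
        self.inner = list(inner)

    def run(self):
        msg = self.consume("in")
        if msg is None:
            return
        # Internal message passing stays hidden inside this loop.
        for mod in self.inner:
            mod.recv("in", msg)
            mod.run()
            msg = mod.outbox.popleft()
        self.outbox.append(msg)


unit = Composite([AddOne(), AddOne(), AddOne()])
unit.recv("in", 1)
unit.run()
print(unit.outbox[0])  # 4
```

From the outside, `unit` behaves like any other Module, so it can itself be placed inside another `Composite`.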
Here is my current blueprint … no Ray yet … still trying to fish out the functionality I need:
    from collections import deque

    class Module(object):
        def __init__(self):
            self.inbox = {}    # folder -> deque of pending messages
            self.out_map = {}  # key -> {'module': ..., 'folder': ...}
            self.mods = []     # execute run() on those

        def recv(self, folder, msg): self.drop(folder, msg)
        def send(self, mod, folder, msg): mod.recv(folder, msg)

        # send msg out VIA an output map
        def sendVomap(self, key, msg):
            if not self.has_omap_key(key): return
            omap = self.out_map[key]
            self.send(omap['module'], omap['folder'], msg)

        def add2omap(self, key, mod, folder): self.out_map[key] = {'module': mod, 'folder': folder}

        def has_omap_key(self, key):
            if key not in self.out_map:
                self.warn(f'missing OMap key: {key}')
                return False
            return True

        def consume(self, folder):
            # oldest message first; returns None if the folder is missing or empty
            if self.inbox.get(folder):
                return self.inbox[folder].popleft()

        def drop(self, folder, msg):
            if folder not in self.inbox: self.inbox[folder] = deque()
            self.inbox[folder].append(msg)  # enqueue at the right end

        def warn(self, msg): print('WARN: ', msg)
        def link(self, mods): self.mods.extend(mods)
        def run(self): raise NotImplementedError
    class Flow:
        def __init__(self, roots):
            if isinstance(roots, list): self.roots = roots
            else: self.roots = [roots]

        def run(self, mods=None):
            # depth-first: run each module, then the modules linked under it
            if mods is None: mods = self.roots
            for m in mods:
                m.run()
                if len(m.mods) > 0: self.run(m.mods)
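To see what call order this Flow actually imposes, here is a stripped-down sketch. The Module below keeps only `link()` and `mods`, and its `run()` just records its own name in a shared list (the `name` and `log` arguments are additions for illustration). The Flow logic is the same recursive walk as above.

```python
class Module:
    def __init__(self, name, log):
        self.name = name
        self.log = log   # shared list recording the call order
        self.mods = []

    def link(self, mods):
        self.mods.extend(mods)

    def run(self):
        self.log.append(self.name)


class Flow:
    def __init__(self, roots):
        self.roots = roots if isinstance(roots, list) else [roots]

    def run(self, mods=None):
        if mods is None:
            mods = self.roots
        for m in mods:
            m.run()
            if m.mods:
                self.run(m.mods)


log = []
a, b, c, d = (Module(n, log) for n in "abcd")
a.link([b, c])
b.link([d])
Flow(a).run()
print(log)  # ['a', 'b', 'd', 'c']
```

So the walk is depth-first: a module runs, then everything linked under it, before its siblings. If two modules link to each other, this recursion never terminates, so the links need to form a tree (or the walk needs a visited set).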