Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SYNCHRONIZE dead loop #43

Open
LinJianping opened this issue Apr 12, 2017 · 1 comment
Open

SYNCHRONIZE dead loop #43

LinJianping opened this issue Apr 12, 2017 · 1 comment

Comments

@LinJianping
Copy link

LinJianping commented Apr 12, 2017

I have written a simple test as follow, but it seems the demo would fall into dead loop. Can anyone tell me why? Thanks.

 from workflow.engine import GenericWorkflowEngine
 from workflow.patterns import SYNCHRONIZE
 my_engine = GenericWorkflowEngine()
 def fangxingzaobo(obj,eng):
     print 'fangxingzaobo'
 def shixingzaobo(obj,eng):
     print 'shixingzaobo'
 class ECG_data(object):
     def __init__(self,data):
         self.data = data
 my_workflow_definition = [
              SYNCHRONIZE(shixingzaobo,fangxingzaobo)
              ]
 ecg_data = ECG_data(1)
 my_engine.callbacks.replace(my_workflow_definition)
 my_engine.process([ecg_data])
@poinot
Copy link

poinot commented Aug 16, 2017

Fixed:

  1. a lambda in a closure is evaluated at run time. the side effect in workflow/pattern/controlflow.py SYNCRONIZE function is that you only have the last function of the list for all Queue entries. Fixed by forcing a keyword arg with default value in the lambda so that the closure actually as the right function
  2. the join requires a .task_done to complete the threaded call.

patch below (by the way line numbers are for v2.0.1) in workflow/patterns/controlflow.py:

--- ./workflow/patterns/controlflow.py	2017-08-16 15:08:58.917214062 +0200
+++ ../workflow-2.0.1/workflow/patterns/controlflow.py	2017-08-04 19:05:24.000000000 +0200
@@ -383,13 +383,14 @@
             t = MySpecialThread(queue)
             t.setDaemon(True)
             t.start()
+
         for func in args[0:-1]:
             if isinstance(func, list) or isinstance(func, tuple):
                 new_eng = eng.duplicate()
                 new_eng.setWorkflow(func)
                 queue.put(lambda: new_eng.process([obj]))
             else:
-                queue.put(lambda func=func : func(obj, eng))
+                queue.put(lambda: func(obj, eng))
 
         # wait on the queue until everything has been processed
         queue.join_with_timeout(timeout)
@@ -476,6 +477,7 @@
         finally:
             self.all_tasks_done.release()
 
+
 class MySpecialThread(threading.Thread):
 
     def __init__(self, itemq, *args, **kwargs):
@@ -485,4 +487,3 @@
     def run(self):
         call = self.itemq.get()
         call()
-        self.itemq.task_done()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants