amazon swf - AWS Simple Workflow Service - run long running processes reliably -
situation:
i want use aws swf coordinate long running manual activities. when activity being scheduled in aws transfer db show on ui tasks pending. tasks can take weeks complete, have huge timeouts in swf.
problem:
in case application fails populate db (hangs or dies without reporting error), task not seen person , retry can happen after weeks, when activity times out (which unacceptable).
question:
so want have ability "start" task (say having timeout of 30 seconds) , when application sure activity started set timeout weeks. possible elegantly using swf?
(i've read through doc , several examples , still don't understand envisioned way of running manual tasks)
unfortunately swf service doesn't support "start activity task" api call. workaround used use activity short timeout insert record db. upon manual task completion signal workflow it. separate timer needed deal manual task timeout. logic can encapsulated in separate class reuse.
added benefit of using signal manual tasks have more 1 state. example workflow can signaled when task claimed , later released back. each state can have different timeout.
[edit: added strawman manualactivityclient example]
public class manualactivityclient { private final map<string, settable<void>> outstandingmanualactivities = new hashmap<>(); private startmanualactivityclient startactivityclient; private workflowclock clock; public promise<void> invoke(string id, string activityargs, long timeout) { promise<void> started = startactivityclient.start(id, activityargs); settable<void> completionpromise = new settable<>(); outstandingmanualactivities.put(id, completionpromise); // tryfinally used define cancellation scope automatic timer cancellation. new tryfinally() { @override protected void dotry() throws throwable { // wrap timer invocation in task(true) give daemon flag. daemon tasks automatically // cancelled when other tasks in same scope (defined dotry) done. new task(true) { @override protected void doexecute() throws throwable { promise<void> manualactivitytimeout = clock.createtimer(timeout); new task(manualactivitytimeout) { @override protected void doexecute() throws throwable { throw new timeoutexception("manual activity " + id + " timed out"); } }; } }; // task used "wait" manual task completion. without timer // cancelled. new task(completionpromise) { @override protected void doexecute() throws throwable { // intentionally empty } }; } @override protected void dofinally() throws throwable { } }; return completionpromise; } public void signalmanualactivitycompletion(string id) { // set completionpromise ready state outstandingmanualactivities.get(id).set(null); } }
and class can used as:
@workflow(...) public class manualactivityworkflow { private manualactivityclient manualactivityclient; @execute(...) public void execute() { // ... promise<void> activity1 = manualactivityclient.invoke("activity1", "someargs1", 300); promise<void> activity2 = manualactivityclient.invoke("activity2", "someargs2", 300); // ... } @signal(...) public void signalmanualactivitycompletion(string id) { manualactivityclient.signalmanualactivitycompletion(id); }
}
Comments
Post a Comment