[Neuroimaging] iterating a workflow over inputs

Ian Malone ibmalone at gmail.com
Wed Jan 11 13:22:13 EST 2017

Thanks. I guess the workflow interface version you mention would be
similar, using Select() or something to split the inputs for
sub-workflow assignment? I wanted arbitrary numbers of processes and
ended up doing something along those lines:

for dtiN in range(in_count):
    split_unmerged_images = pe.Node(niu.Select(index=dtiN),
    workflow.connect(out_unmerger, 'dwis_out', split_unmerged_images, 'inlist')
    split_unmerged_bvals = pe.Node(niu.Select(index=dtiN),
    workflow.connect(out_unmerger, 'bvals_out', split_unmerged_bvals, 'inlist')
    split_unmerged_bvecs = pe.Node(niu.Select(index=dtiN),
    workflow.connect(out_unmerger, 'bvecs_out', split_unmerged_bvecs, 'inlist')
    split_unmerged_orig_file = pe.Node(niu.Select(index=dtiN),
    workflow.connect(out_unmerger, 'orig_file',
                     split_unmerged_orig_file, 'inlist')

    workflow.connect(split_unmerged_images, "out", tensor_fit[dtiN],
    workflow.connect(split_unmerged_bvals, "out", tensor_fit[dtiN],
    workflow.connect(split_unmerged_bvecs, "out", tensor_fit[dtiN],
    workflow.connect(split_unmerged_orig_file, "out",
                     tensor_fit[dtiN], 'input_node.in_orig_filename')
    tensor_fit[dtiN].inputs.input_node.in_t1_file = in_t1
    workflow.connect(r, 'output_node.t1_mask', tensor_fit[dtiN],
    workflow.connect(r, 'output_node.mask', tensor_fit[dtiN],
    workflow.connect(r, 'output_node.b0_to_t1', tensor_fit[dtiN],
    workflow.connect(tensor_fit[dtiN], 'renamer.fa', ds,
                     'unmerge.@fa{0}'.format(dtiN))
    workflow.connect(tensor_fit[dtiN], 'renamer.fa_res', ds,
                     'unmerge.@fa_res{0}'.format(dtiN))
    workflow.connect(tensor_fit[dtiN], 'renamer.b0', ds,
                     'unmerge.@b0{0}'.format(dtiN))
    workflow.connect(tensor_fit[dtiN], 'renamer.b0_res', ds,
                     'unmerge.@b0_res{0}'.format(dtiN))
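
To spell out what the Select nodes in that loop do: each one picks
element dtiN out of a merged output list, so every tensor_fit instance
sees a single image, bval, and bvec. A rough plain-Python sketch of the
same bookkeeping follows. It is not nipype code; select_per_dataset is
a hypothetical helper and the file names are invented for illustration.

```python
# Plain-Python stand-in for the Select(index=dtiN) nodes; no nipype needed.
# Field names mirror the loop above; the file names are made up.

def select_per_dataset(merged_outputs, in_count):
    """Return one dict per dataset, taking element dtiN from every merged list."""
    per_dataset = []
    for dtiN in range(in_count):
        per_dataset.append(
            {field: values[dtiN] for field, values in merged_outputs.items()}
        )
    return per_dataset


merged = {
    'dwis_out': ['dwi0.nii.gz', 'dwi1.nii.gz'],
    'bvals_out': ['dwi0.bval', 'dwi1.bval'],
    'bvecs_out': ['dwi0.bvec', 'dwi1.bvec'],
}
split = select_per_dataset(merged, in_count=2)
print(split[1]['bvals_out'])
```

In the real workflow the lists only exist at run time, which is why the
split has to be done by nodes rather than by ordinary indexing.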

It's a DTI workflow. Fortunately, the number of nodes is determined by
the number of datasets supplied to the program (they have to be merged
for an earlier processing step), so it's easy to loop over. I'm not
sure whether this would currently work if the number of processes
needed were determined in an earlier node (I suppose something similar
to this, but inside the function node example, could work in that case).
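
Because the dataset count is known before the graph is built, the
repeated DataSink connections can be generated rather than typed out.
A minimal sketch, assuming the sink fields follow DataSink's
container.@name form with the dataset index appended; datasink_pairs is
a hypothetical helper, not part of nipype, and the connect call is left
as a comment so the snippet runs without nipype installed.

```python
# Hypothetical helper, not part of nipype: build (source, sink) field pairs
# for dataset number dtiN, with the index appended to each sink name.

def datasink_pairs(dtiN, outputs=('fa', 'fa_res', 'b0', 'b0_res'),
                   container='unmerge'):
    return [('renamer.{0}'.format(name),
             '{0}.@{1}{2}'.format(container, name, dtiN))
            for name in outputs]


in_count = 2  # number of datasets, known before the workflow is built
for dtiN in range(in_count):
    for source_field, sink_field in datasink_pairs(dtiN):
        # In the real loop this would be:
        # workflow.connect(tensor_fit[dtiN], source_field, ds, sink_field)
        print(dtiN, source_field, '->', sink_field)
```

This only helps when the count is available at construction time; it
does not address the case where an earlier node decides the count.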

Either way, handling for this in the next API would be very good news!

Best wishes,

On 11 January 2017 at 15:47, Satrajit Ghosh <satra at mit.edu> wrote:
> hi ian,
> in the current API, there are a few ways to do this, but all involve
> wrapping the subworkflow in something.
> option 1: create a function node
> option 2: create a workflow interface
> in both cases, some code will have to take the node/interface inputs and map
> them to the inputs of the subworkflow, take the outputs and map them to
> the outputs of the node/interface. however, unless your cluster allows job
> submission from arbitrary nodes, you may need to preallocate resources.
> a function node example:
> def run_subwf(input1, input2, plugin='MultiProc',
>               plugin_args={'n_procs': 2}):
>      import os
>      from myscripts import create_workflow_func
>      wf = create_workflow_func()
>      wf.inputs.inputnode.input1 = input1
>      wf.inputs.inputnode.input2 = input2
>      wf.base_dir = os.getcwd()
>      egraph = wf.run(plugin=plugin, plugin_args=plugin_args)
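>      # pick the executed node named 'outputnode' and read its results
>      outputnode = [node for node in egraph.nodes()
>                    if 'outputnode' in node.name][0]
>      outputs = outputnode.result.outputs
>      return outputs.out1, outputs.out2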
> subnode = Node(Function(input_names=['input1', 'input2', ...],
> output_names=['out1', 'out2'], func=run_subwf), name='subwf')
> one could probably optimize a few things automatically given a workflow.
> in the next generation API, this will be doable without creating these
> special nodes/interfaces.
> cheers,
> satra
> On Tue, Jan 10, 2017 at 11:47 AM, Ian Malone <ibmalone at gmail.com> wrote:
>> On 9 January 2017 at 18:59, Ian Malone <ibmalone at gmail.com> wrote:
>> > Hi,
>> >
>> > I've got a relatively complex workflow that I'd like to use as a
>> > sub-workflow of another one, however it needs to be iterated over some
>> > of the inputs. I suppose I could replace the appropriate pe.Node()s in
>> > it with MapNode()s, but there are a fair number of them, and quite a
>> > few connections. (I also think that this would prevent it being used
>> > on single instance inputs without first packing them into a list,
>> > though I could be wrong.)
>> >
>> > Is this at all possible, or should I bite the bullet and start
>> > MapNode-ing the sub-workflow?
>> This has turned out to be doubly interesting as I forgot my
>> sub-workflow already had its own sub-workflow, which is already used
>> elsewhere with a single set of inputs. I suppose I can use
>> interfaces.utility.Split() to extract the single output again in that
>> case, but the bigger workflow (which I'd also like to use elsewhere)
>> has quite a few outputs, and connecting a split to each one seems a
>> bit unwieldy. Any good solutions to this?
>> --
>> imalone
>> _______________________________________________
>> Neuroimaging mailing list
>> Neuroimaging at python.org
>> https://mail.python.org/mailman/listinfo/neuroimaging

