## ## New-TaskPool.ps1 ## Gaurhoth ## gaurhoth@live.com ## version 0.5 ## Accepts pipeline input and feeds it ## to a pool of runspaces. Each runspace ## Executes $scriptblock against the input ## and returns results back up the pipeline param ( [scriptblock]$scriptblock, [scriptblock]$failblock, [int]$pools=10, [int]$maxruntime=120 ) begin { set-PSDebug -strict if (!$scriptblock) { throw '$Scriptblock is required' } # If there is no $failblock scriptblock specified, # the input object will be swallowed without return # should the $scriptblock get stalled for more than # $maxruntime seconds or $scriptblock fail to return # a result. ## ## Example $failblock if not already defined ## if (!$failblock) { ## $failblock = { ## $input | % { ## $item = 1 | select-Object Computer,Status,Version ## if ($obj.gettype().Name -eq "SearchResult") { ## $item.Computer = $_.properties.cn[0] ## } else { ## $item.Computer = $_ ## } ## $item.Status = "TimeOut" ## write-Output $item ## } ## } ## } function StartJob { param ($obj,$job) ## We'll be using this block of code often so it's now a function. $script:rscfg[$job] = [management.automation.runspaces.runspaceconfiguration]::Create() $rs[$job] = [management.automation.runspaces.runspacefactory]::CreateRunspace($script:rscfg[$job]) $script:rs[$job].Open() $script:pipe[$job] = $script:rs[$job].CreatePipeline($ScriptBlock) [void]$script:pipe[$job].Input.Write($obj,$true) $script:pipe[$job].Input.Close() $script:pipe[$job].InvokeAsync() $script:pipetimer[$job] = [datetime]::now $script:cache[$job]=$obj } # Set of variables for creating and maintaining the Runspaces # Predefine enough array elements to handle all the threads. $script:rscfg = @(); (0..($pools-1)) | % { $rscfg += 0 } $script:rs = @(); (0..($pools-1)) | % { $rs += 0 } $script:pipe = @(); (0..($pools-1)) | % { $pipe += 0 } $script:pipetimer = @(); (0..($pools-1)) | % { $pipetimer += 0 } $script:cache = @(); (0..($pools-1)) | % { $cache += 0 } $smarp = [system.Management.Automation.Runspaces.PipelineState] $complete = 0 $count = 0 #counts all items in the pipeline } process { $obj = $_ $count++ while (1) { (0..($pools-1)) | % { $job=$_ $pipestate = $pipe[$job].pipelinestateinfo.state if ($pipestate -eq $null) { StartJob $obj $job break } elseif ($pipestate -eq $smarp::Completed) { write-output $pipe[$job].output.readtoend() StartJob $obj $job $complete++ break } elseif ($pipestate -eq $smarp::Running) { if ( [datetime]::now -gt $pipetimer[$job].addseconds($maxruntime) ) { if ($failblock) { $cache[$job] | &$failblock } StartJob $obj $job $complete++ break } } else { if ($failblock) { $cache[$job] | &$failblock } StartJob $obj $job break } } } } end { # we should have no more than ($pools-1) still running. # we'll keep track using an array $jobsdone. When all elements # of $jobsdone have been set to 1 (not 0), we'll know all # remaining inputs have been completed. # If total number of items processed ($count) is less than number of $pools # go ahead and set $jobsdone(#) to 1 so we don't try to dispose of a runspace # that never existed. $jobsdone = @(); (0..($pools-1)) | % { if ($pipe[$_]) { $jobsdone += 0 } else { $jobsdone += 1 } } while ($jobsdone -contains 0) { # Loop through all runspaces and check their status (0..($pools-1)) | % { $job=$_ if ($jobsdone[$job] -eq 0) { $pipestate = $pipe[$job].pipelinestateinfo.state if ($pipestate -eq $smarp::Completed) { write-Output $pipe[$job].output.readtoend() $jobsdone[$job] = 1 $pipe[$job].Dispose() $rs[$job].Close() $pipe[$job] = $null $rs[$job] = $null $rscfg[$job] = $null $complete++ } elseif ($pipestate -eq $smarp::Running) { if ( [datetime]::now -gt $pipetimer[$job].addseconds($maxruntime) ) { if ($failblock) { $cache[$job] | &$failblock } $jobsdone[$job] = 1 $pipe[$job].StopAsync() $rs[$job].CloseAsync() $pipe[$job] = $null $rs[$job] = $null $rscfg[$job] = $null $complete++ } } else { $jobsdone[$job] = 1 if ($pipe[$job]) { $pipe[$job].Dispose() $rs[$job].Close() $pipe[$job] = $null $rs[$job] = $null $rscfg[$job] = $null } } } } } }