##
## New-TaskPool.ps1
## Gaurhoth *new email address
## version 0.7
## 0.5 - First Public Release
## 0.6 - Converted several $script:... variables into a single
##       object.
##     - Removed StartJob function and broke it out because it
##       caused a scoping issue when using New-TaskPool as a function.
## 0.7 - Replaced StartJob since my change to using an object for
##       managing the pipelines instead of individual variables
##       solved my scoping issue.
##     - Checking to see if runspace is open and reusing instead of
##       recreating.
##
## Usage:
##   <objects> | New-TaskPool -scriptblock { ... $input ... } [-failblock { ... }]
##                            [-pools <int>] [-maxruntime <seconds>]
##
##   -scriptblock  Script run once per pipeline input object, each in one of
##                 the pooled runspaces. It is handed to the runspace as text,
##                 and the input object is written to the pipeline's input
##                 (read it with $input).
##   -failblock    Optional. Receives (as pipeline input) the input object of
##                 any job that exceeded -maxruntime or whose pipeline ended
##                 in a state other than Completed.
##   -pools        Number of concurrent runspaces (default 10, minimum 1).
##   -maxruntime   Seconds a job may run before it is abandoned (default 120).
##
##   Output: whatever the jobs wrote to their output streams, emitted as each
##   job is harvested (order is not guaranteed), plus anything -failblock emits.
Function New-TaskPool {
    param (
        [scriptblock]$scriptblock,
        [scriptblock]$failblock,
        [int]$pools=10,
        [int]$maxruntime=120
    )

    begin {
        set-PSDebug -strict
        if (!$scriptblock) { throw '$Scriptblock is required' }
        # 0..($pools-1) would become a descending range (and negative array
        # indices) for $pools < 1, so reject that up front.
        if ($pools -lt 1) { throw '$pools must be at least 1' }

        # One slot per pool entry. Each slot holds everything needed to manage
        # one runspace and the pipeline currently running inside it.
        $pipemgr = @(
            foreach ($i in 1..$pools) {
                1 | Select-Object rs,pipe,pipetimer,input,error,output
            }
        )

        $pipelineState = [System.Management.Automation.Runspaces.PipelineState]
        $runspaceState = [System.Management.Automation.Runspaces.RunspaceState]

        # Delay between polls while every slot is busy, so waiting does not
        # spin a CPU core.
        $pollms = 50

        # Start $obj as a new job in $slot, (re)creating the runspace if the
        # slot does not currently hold an open one.
        function Startjob {
            param ($obj, $slot)
            if ((-not $slot.rs) -or ($slot.rs.RunspaceStateInfo.State -ne $runspaceState::Opened)) {
                # Parameterless CreateRunspace() is available on both Windows
                # PowerShell and PowerShell 6+, where RunspaceConfiguration
                # no longer exists.
                $slot.rs = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace()
                $slot.rs.Open()
            }
            $slot.pipe = $slot.rs.CreatePipeline($scriptblock)
            [void]$slot.pipe.Input.Write($obj, $true)
            $slot.pipe.Input.Close()
            $slot.pipe.InvokeAsync()
            $slot.pipetimer = [datetime]::Now
            $slot.input = $obj
        }

        # Drain a completed pipeline: keep output/error on the slot and emit
        # the output to the caller.
        function ReadJobOutput {
            param ($slot)
            $slot.output = $slot.pipe.Output.ReadToEnd()
            $slot.error  = $slot.pipe.Error.ReadToEnd()
            Write-Output $slot.output
        }

        # Hand the slot's input object to the caller's fail handler, if any.
        function InvokeFailBlock {
            param ($slot)
            if ($failblock) { $slot.input | & $failblock }
        }

        # True when the slot's job has been running longer than $maxruntime.
        function IsExpired {
            param ($slot)
            return ([datetime]::Now -gt $slot.pipetimer.AddSeconds($maxruntime))
        }

        # Abandon a job that is still running. Stop/close asynchronously so a
        # stuck script cannot block us, and drop the runspace so the slot gets
        # a fresh one on its next use.
        function AbortJob {
            param ($slot)
            $slot.pipe.StopAsync()
            $slot.rs.CloseAsync()
            $slot.pipe = $null
            $slot.rs = $null
        }

        # Tear down a slot whose pipeline is no longer running.
        function ReleaseJob {
            param ($slot)
            $slot.pipe.Dispose()
            if ($slot.rs) { $slot.rs.Close() }
            $slot.pipe = $null
            $slot.rs = $null
        }
    }

    process {
        $obj = $_
        $started = $false

        # Keep scanning the slots until one can take this input object.
        while (-not $started) {
            foreach ($slot in $pipemgr) {
                if (-not $slot.pipe) {
                    # Slot never used (or previously aborted).
                    Startjob $obj $slot
                    $started = $true
                } else {
                    $state = $slot.pipe.PipelineStateInfo.State
                    if ($state -eq $pipelineState::Completed) {
                        ReadJobOutput $slot
                        $slot.pipe.Dispose()
                        Startjob $obj $slot
                        $started = $true
                    } elseif ($state -eq $pipelineState::Running) {
                        if (IsExpired $slot) {
                            InvokeFailBlock $slot
                            # The old pipeline must be stopped first: a
                            # runspace cannot start a second pipeline while
                            # one is still executing.
                            AbortJob $slot
                            Startjob $obj $slot
                            $started = $true
                        }
                    } else {
                        # Failed, Stopped, etc.
                        InvokeFailBlock $slot
                        $slot.pipe.Dispose()
                        Startjob $obj $slot
                        $started = $true
                    }
                }
                if ($started) { break }
            }
            if (-not $started) { Start-Sleep -Milliseconds $pollms }
        }
    }

    end {
        # Only slots that actually hold a pipeline need draining; if fewer
        # items than $pools were processed, the remaining slots never had a
        # runspace to dispose of.
        $pending = @($pipemgr | Where-Object { $_.pipe })

        while ($pending.Count -gt 0) {
            $stillRunning = @()
            foreach ($slot in $pending) {
                $state = $slot.pipe.PipelineStateInfo.State
                if ($state -eq $pipelineState::Completed) {
                    ReadJobOutput $slot
                    ReleaseJob $slot
                } elseif ($state -eq $pipelineState::Running) {
                    if (IsExpired $slot) {
                        InvokeFailBlock $slot
                        AbortJob $slot
                    } else {
                        $stillRunning += $slot
                    }
                } else {
                    # Failed, Stopped, etc. Report it the same way the
                    # process block does, then clean up.
                    InvokeFailBlock $slot
                    ReleaseJob $slot
                }
            }
            $pending = $stillRunning
            if ($pending.Count -gt 0) { Start-Sleep -Milliseconds $pollms }
        }
    }
}