Title: | A Light-Weight Data Pipelining Tool |
---|---|
Description: | Provides a simple interface for developing complex data pipelines that can be executed in a single call. 'sewage' makes it easy to test, debug, and share data pipelines through its interface and visualizations. |
Authors: | Matthew Whalen [aut, cre, cph] |
Maintainer: | Matthew Whalen <[email protected]> |
License: | MIT + file LICENSE |
Version: | 0.2.5.9000 |
Built: | 2025-02-15 04:10:25 UTC |
Source: | https://github.com/mwhalen18/sewage |
add_node() places a new node in the specified pipeline. Nodes are executed sequentially, in the order they were added, when the pipeline is executed using run().
add_node(pipeline, component, name, input, ...)
pipeline |
an initialized sewage pipeline |
component |
a function to be executed. Must be a valid function specification or an exported sewage object, such as Joiner() or Splitter() |
name |
a name for the component. Downstream nodes use this name as their 'input' parameter |
input |
the node to use as input into 'component'. Inputs should be either (1) the name of an existing node in the pipeline, or (2) the name(s) of any argument(s) to the first node of the pipeline. These names can be whatever you want, but should match the arguments you pass to run() |
... |
additional arguments to be passed to the 'component' argument |
a sewage_pipeline object
my_func = function(df) {
  head(df, 15)
}
pipeline = Pipeline()
pipeline = pipeline |>
  add_node(name = 'processor', component = my_func, input = 'file')
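The ... argument can be illustrated with a short sketch. It assumes, as documented above, that extra named arguments given to add_node() are forwarded to the component when run() executes it; take_rows() and its rows argument are hypothetical names used only for this example.

```r
library(sewage)

# Hypothetical component with an extra argument beyond the data input
take_rows = function(df, rows) {
  head(df, rows)
}

# rows = 5 is not an add_node() argument, so it is captured by `...`
# and (per the documentation) passed on to take_rows()
pipeline = Pipeline() |>
  add_node(component = take_rows, name = "Take5", input = "file", rows = 5)

result = run(pipeline, file = mtcars)
first_five = pull_output(result, "Take5")
```

Keeping data inputs in input and tuning parameters in ... lets the same component be reused across nodes with different settings.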
This function draws a DAG of the existing pipeline flow.
For additional information see igraph::spec_viz
draw(pipeline, ...)
## S3 method for class 'sewage_pipeline'
draw(pipeline, ...)
pipeline |
an instantiated Pipeline() object |
... |
reserved for future use |
an htmlwidget object
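A minimal sketch of calling draw() on a small two-node pipeline. The node names and components are illustrative; the only behavior relied on is the documented return value, an htmlwidget, which is displayed when printed in an interactive session.

```r
library(sewage)

# Two chained nodes: Head feeds Summary
pipeline = Pipeline() |>
  add_node(component = head, name = "Head", input = "file") |>
  add_node(component = summary, name = "Summary", input = "Head")

# Build the DAG visualization; the pipeline does not need to be run first
widget = draw(pipeline)
```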
The Joiner takes in multiple objects and joins them into a single node according to a defined method.
Joiner(method)
method |
function to join incoming objects together |
a sewage_joiner object
Additional arguments to be passed to method should be supplied through the ... of add_node().
pipeline = Pipeline() |>
  add_node(Joiner(method = rbind), name = "Joiner", input = c("file1", "file2"))
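Executing the pipeline above requires one run() argument per joined input, named to match the entries of input. A sketch, assuming the Joiner calls rbind on the two inputs:

```r
library(sewage)

pipeline = Pipeline() |>
  add_node(Joiner(method = rbind), name = "Joiner", input = c("file1", "file2"))

# file1 and file2 match the names given in `input` above
result = run(pipeline, file1 = mtcars, file2 = mtcars)

# rbind stacks the two 32-row data frames into a single 64-row data frame
joined = pull_output(result, "Joiner")
```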
Initialize a sewage Pipeline
Pipeline()
A sewage pipeline object
Print a sewage pipeline
Prints all nodes in the pipeline along with their inputs. Once the pipeline has been executed, print also shows the outputs available through pull_output().
## S3 method for class 'sewage_pipeline'
print(x, ...)
x |
a Pipeline() object |
... |
not used |
formatted sewage pipeline output
pipeline = Pipeline() |>
  add_node(component = head, name = "Head", input = "file")
print(pipeline)
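print() is also useful after execution, when it additionally lists the outputs available through pull_output(). A minimal sketch reusing the Head node from the example above; the exact printed layout is not shown here because it depends on the package version.

```r
library(sewage)

pipeline = Pipeline() |>
  add_node(component = head, name = "Head", input = "file")

# Before execution: nodes and their inputs only
print(pipeline)

# After execution: the printout also lists the available outputs
result = run(pipeline, file = iris)
print(result)
```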
Extract output components from a pipeline
pull_output(x, component, ...)
## S3 method for class 'sewage_pipeline'
pull_output(x, component, ...)
x |
an executed pipeline object |
component |
a character string specifying which output component to pull |
... |
reserved for future use |
output from a terminating node of an executed sewage pipeline
pipeline = Pipeline() |>
  add_node(component = head, name = "Head", input = 'file')
result = run(pipeline, file = iris)
pull_output(result, "Head")
run() is the entry point for executing a pipeline object.
run(pipeline, start = NULL, halt = NULL, ...)
pipeline |
an initialized pipeline object |
start |
node at which to start execution. If NULL then execution will start at the first node |
halt |
halt execution at the specified node; the remainder of the pipeline is not executed. Because nodes are executed sequentially in the order they were added to the pipeline, in a branching pipeline any nodes on a different branch that were added before the halting node will still be executed. |
... |
parameter(s) to pass to the starting node(s) of the pipeline. Each argument name should match the 'input' given to add_node() for the starting node. If the pipeline has multiple inputs, or execution starts at a later node via start, supply one argument per starting input, named to match that input (for example, Func1 = iris when starting at a node whose input is Func1). |
an executed sewage_pipeline object
func1 = function(x) {
  x
}
pipeline = Pipeline() |>
  add_node(component = func1, name = "Func1", input = "file") |>
  add_node(component = func1, name = "Func2", input = "Func1") |>
  add_node(component = func1, name = "Func3", input = "Func2")
run(pipeline, file = mtcars)
run(pipeline, start = "Func2", Func1 = iris)
run(pipeline, halt = "Func2", file = mtcars)
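The effect of start can be checked directly. Because every node in this sketch returns its input unchanged, the output of the final node should be identical to whatever object enters the pipeline, whether it is supplied as file to Func1 or as Func1 when starting at Func2. identity_step is simply a renamed copy of func1 from the example above.

```r
library(sewage)

# Pass-through component: returns its input unchanged
identity_step = function(x) {
  x
}

pipeline = Pipeline() |>
  add_node(component = identity_step, name = "Func1", input = "file") |>
  add_node(component = identity_step, name = "Func2", input = "Func1") |>
  add_node(component = identity_step, name = "Func3", input = "Func2")

# Full run: `file` is the input of the first node, Func1
full = run(pipeline, file = mtcars)

# Partial run: start at Func2, supplying its input under the name Func1
partial = run(pipeline, start = "Func2", Func1 = iris)

full_out = pull_output(full, "Func3")
partial_out = pull_output(partial, "Func3")
```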
Splitter takes in exactly one input node and propagates the input to n output nodes.
Splitter(edges = 2)
edges |
number of outputs. Must be greater than 1 |
After a Splitter object is executed, the pipeline will contain n outputs, named {name}.output_{i}.
a sewage_splitter object
The outputs of a Splitter object are accessed through the naming convention {name}.output_{i}, where name is the specified name of the Splitter object. This lets you pass split outputs to downstream nodes or pull them from the pipeline results.
pipeline = Pipeline()
pipeline = pipeline |>
  add_node(name = 'Splitter', component = Splitter(), input = 'file')
result = run(pipeline, file = mtcars)
pull_output(result, 'Splitter.output_1')
pull_output(result, 'Splitter.output_2')
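Split outputs are typically routed to different downstream nodes and, optionally, recombined. The sketch below sends Splitter.output_1 and Splitter.output_2 to two hypothetical components (top_rows and bottom_rows) and joins the two branches with a Joiner; it assumes the Joiner passes both branch outputs to rbind.

```r
library(sewage)

# Hypothetical branch components
top_rows = function(df) head(df, 5)
bottom_rows = function(df) tail(df, 5)

pipeline = Pipeline() |>
  add_node(component = Splitter(edges = 2), name = "Splitter", input = "file") |>
  # Each branch reads one split output via the {name}.output_{i} convention
  add_node(component = top_rows, name = "Top", input = "Splitter.output_1") |>
  add_node(component = bottom_rows, name = "Bottom", input = "Splitter.output_2") |>
  # Recombine the two branches into a single node
  add_node(component = Joiner(method = rbind), name = "Joiner", input = c("Top", "Bottom"))

result = run(pipeline, file = mtcars)
combined = pull_output(result, "Joiner")
```

Because draw() renders the same node graph, this branch-and-merge shape can be inspected visually before running it.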