Embedded Pipeline

The embedded pipeline class provides a way to instantiate and run a sprokit pipeline within a program. Its main use case is a program that calculates or obtains the input itself and needs the output for local processing. A prime example is a GUI that runs a pipeline and displays the results.

The pipeline to be run is passed to the build_pipeline() method as a stream. The stream can be from a file or from a string built at run time. If the pipeline is coming from a file, it is useful to supply the directory portion (up to but not including the last ‘/’) to the call so the error messages can provide the real file name and any relativepath modifiers can supply an accurate prefix. If the pipeline comes from a string stream, do not supply this parameter.
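As a minimal sketch of the "directory portion" rule described above, the helper below returns everything before the last '/' of a pipeline file path. The name pipeline_directory is hypothetical and not part of kwiver; it only illustrates which part of the path is meant.

```cpp
#include <string>

// Hypothetical helper (not part of kwiver): returns the directory portion of
// a pipeline file path, i.e. everything up to but not including the last '/'.
// Returns an empty string when the path contains no '/'.
inline std::string pipeline_directory( std::string const& path )
{
  auto const pos = path.rfind( '/' );
  if ( pos == std::string::npos )
  {
    return std::string();
  }
  return path.substr( 0, pos );
}
```

The returned string is what would be supplied alongside the file stream when building the pipeline. For a string stream there is no file, so nothing is supplied.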

After the pipeline has been built, it is started with the start() call. Data are supplied to the pipeline via the send() method and retrieved with the receive() method. If input data is supplied faster than the pipeline can accept it, the send() method will block until there is room for the new element. Similarly, the receive() method will block if there is no output available from the pipeline.

When all the input has been supplied to the pipeline, the send_end_of_input() method is used to signal this condition. This causes the pipeline to terminate after all the supplied data has been processed. The receive() method returns an end-of-data indication after the last pipeline output has been returned.
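The shutdown protocol above suggests a simple consumer loop: keep calling receive() until the returned item reports end of data. The sketch below writes that loop as a template so it does not depend on kwiver headers. The fake_data_set and fake_pipeline types are stand-ins invented for illustration and are not the kwiver classes; only the method names receive(), send_end_of_input() and is_end_of_data() are taken from this document.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <queue>

// Calls handle(*item) for every output until the end-of-data marker arrives.
// Works with any pipeline-like type whose receive() returns a pointer-like
// object that offers is_end_of_data(). Returns the number of items handled.
template < typename Pipeline, typename Handler >
std::size_t drain_outputs( Pipeline& ep, Handler handle )
{
  std::size_t count = 0;
  for (;;)
  {
    auto item = ep.receive();
    if ( item->is_end_of_data() )
    {
      break;
    }
    handle( *item );
    ++count;
  }
  return count;
}

// Stand-in data set for illustration only (NOT the kwiver class).
struct fake_data_set
{
  fake_data_set( int v, bool e ) : value( v ), end( e ) {}
  bool is_end_of_data() const { return end; }

  int  value;
  bool end;
};

// Stand-in single-threaded pass-through pipeline (NOT the kwiver class):
// every input becomes an output, followed by one end marker.
class fake_pipeline
{
public:
  void send( int v )
  {
    m_out.push( std::make_shared< fake_data_set >( v, false ) );
  }

  void send_end_of_input()
  {
    m_out.push( std::make_shared< fake_data_set >( 0, true ) );
  }

  std::shared_ptr< fake_data_set > receive()
  {
    assert( !m_out.empty() ); // a real receive() would block here instead
    auto item = m_out.front();
    m_out.pop();
    return item;
  }

private:
  std::queue< std::shared_ptr< fake_data_set > > m_out;
};
```

With the stand-in, sending 1, 2 and 3, then calling send_end_of_input(), lets drain_outputs() handle three items and then stop at the marker.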

Example

Let’s look at an example to see the details of using an embedded pipeline. The following code implements a simple pipeline that just passes data from the input to the output. The following paragraphs will describe sections of this example in detail.

The following section builds a pipeline at runtime using a set of macros to simplify constructing the pipeline definition. The full list of these macros and associated documentation can be found in the documentation for the include file.

 1#include <sprokit/pipeline_util/literal_pipeline.h>
 2
 3// SPROKIT macros can be used to create pipeline description
 4std::stringstream pipeline_desc;
 5pipeline_desc << SPROKIT_PROCESS( "input_adapter",  "ia" )
 6              << SPROKIT_PROCESS( "output_adapter", "oa" )
 7
 8              << SPROKIT_CONNECT( "ia", "counter",  "oa", "out_num" )
 9              << SPROKIT_CONNECT( "ia", "port2",    "oa", "port3" )
10              << SPROKIT_CONNECT( "ia", "port3",    "oa", "port2" );

Line 1 includes the file that defines a set of macros that can be used to programmatically create a pipeline definition.

Line 4 defines a string stream that will contain the constructed pipeline definition.

Line 5 defines a process of type “input_adapter” that will be referenced as “ia” in the pipeline definition.

Line 6 defines a process of type “output_adapter” that will be referenced as “oa” in the pipeline definition.

Line 8 connects port “counter” on process “ia” (the input adapter) to port “out_num” on process “oa” (the output adapter).

Lines 9 and 10 make additional connections between the input and output adapter.

The following section creates and starts the pipeline.

11// create embedded pipeline
12kwiver::embedded_pipeline ep;
13ep.build_pipeline( pipeline_desc );
14
15// Start pipeline
16ep.start();

Line 12 creates the embedded pipeline object.

Line 13 builds the pipeline based on the supplied input stream. Errors may be detected while building the pipeline.

Line 16 starts the pipeline running. Control returns after the pipeline is started to allow this thread to optionally supply inputs and/or consume outputs while the pipeline runs asynchronously.

The following code illustrates how data items are supplied to the embedded pipeline. In this sample code, ten sets of data are sent to the pipeline with the result being read back immediately. This may be impractical for more complicated pipelines, because some operations require more than one input before an output can be computed. It is also not very efficient, since sending many inputs will allow the pipeline to keep working while we wait on an output.

17for ( int i = 0; i < 10; ++i)
18{
19  // Create dataset for input
20  auto ds = kwiver::adapter::adapter_data_set::create();
21
22  // Add value to be pushed to the named port
23  ds->add_value( "counter", i );
24
25  // Data values need to be supplied to all connected ports
26  // (based on the previous pipeline definition)
27  ds->add_value( "port2", i );
28  ds->add_value( "port3", i );
29  ep.send( ds ); // push into pipeline
30
31  // Get output from pipeline
32  auto rds = ep.receive();
33
34  // get value from the output adapter
35  int val = rds->get_port_data<int>( "out_num" );
36} // end for

Line 20 creates a new adapter data set object. An adapter_data_set contains all inputs to the pipeline. They are collected in this object so they can be presented to the pipeline at the same time.

Lines 23 - 28 add individual data values to the pipeline input object (adapter_data_set). The string specified in the call must match the port name that was used to connect to the input_adapter. The value specified will be supplied to that port.

Line 29 sends the set of input data to the input adapter process. An error will be thrown if there is a port connected to that process which does not have an associated data element. An error will also be thrown if there is an element with a name that is not connected to the input process.
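The two error conditions for send() amount to a two-way comparison between the connected port names and the names placed in the data set. The following sketch expresses that comparison as a stand-alone check. The function port_mismatches and its message texts are hypothetical and not part of kwiver; the library performs its own validation and reports errors in its own way.

```cpp
#include <set>
#include <string>
#include <vector>

// Hypothetical pre-flight check (not part of kwiver): compares the port names
// connected to the input adapter with the names placed in a data set and
// returns one message per mismatch. An empty result means the names agree.
inline std::vector< std::string > port_mismatches(
  std::set< std::string > const& connected,
  std::set< std::string > const& supplied )
{
  std::vector< std::string > problems;

  // Every connected port needs a value.
  for ( auto const& port : connected )
  {
    if ( supplied.count( port ) == 0 )
    {
      problems.push_back( "missing value for connected port: " + port );
    }
  }

  // Every supplied value needs a connected port.
  for ( auto const& name : supplied )
  {
    if ( connected.count( name ) == 0 )
    {
      problems.push_back( "value supplied for unconnected port: " + name );
    }
  }

  return problems;
}
```

For the example pipeline, the connected set would be counter, port2 and port3, so a data set that holds only counter and port2 produces one "missing value" message.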

Line 32 retrieves a set of output values from the pipeline. There will be a value for each port that is connected to the output_adapter process.

Line 35 extracts the integer value from pipeline output. The value from port “out_num” of the “output_adapter” process is returned.

Pipeline Inputs and Outputs

In order to adapt a pipeline to running in an embedded manner, the inputs that are supplied by the program are passed to the input_adapter process and the outputs from the pipeline are passed to the output_adapter. The pipeline definition must specify the connections from/to these processes.

Sets of input data elements are passed to the pipeline using an adapter_data_set object. This class defines a named set of data items where the name corresponds to the port name, as specified in the pipeline definition. The type of the data element must be compatible with what is expected on the port by the receiving process. The output_adapter returns the named data elements in the same way.

class adapter_data_set

Adapter datum to or from sprokit external adapter process.

This class represents a set of data that is either used as input to a sprokit pipeline or contains output from one. Each element in the set consists of a port name and the datum for that port.

When creating an adapter_data_set for input to the pipeline, use the add_value() method to add a data value for the named port. The names of the ports to the input process are specified in the pipeline configuration file.

When an adapter_data_set is returned from the output of a pipeline, it contains one element from each connection to the output process. Each element in the set is labeled with the port name as specified in the pipeline configuration file.

Public Types

enum data_set_type

Type of data set.

These are used to specify the payload in this data set. A data set usually contains data for the ports; the last data set in a stream is instead marked with end_of_input.

Usually, sending an end_of_input element is not needed. Call the embedded_pipeline::send_end_of_input() method to signal end of input and terminate the pipeline processing. In any event, no data can be sent to an adapter after the end_of_input element has been sent.

Values:

enumerator data
enumerator end_of_input

Public Functions

data_set_type type() const

Get data set type.

This method returns the data set type. Valid types are defined in the data_set_type enum.

Returns:

data set type enum.

bool is_end_of_data() const

Test if this object has end of data marker.

This method is a convenient way to check if the type is the end marker.

Returns:

true if this is end of data element.

void add_datum(sprokit::process::port_t const &port, sprokit::datum_t const &datum)

Add datum to this data set.

This method adds the specified port name and the datum to be placed on that port to the data_set. If there is already a datum in the set for the specified port, the data is overwritten with the new value.

Parameters:
  • port – Name of the port where data is sent.

  • datum – Sprokit datum object to be pushed to port.

template<typename T>
void add_value(::sprokit::process::port_t const &port, T const &val)

Add typed value to data set.

This method adds the specified value to the adapter data set. The value is copied into the data set. Any value already stored for the port is overwritten.

Parameters:
  • port – Name of the port where data is sent.

  • val – Value to be wrapped in datum for port.

bool empty() const

Query if data set is empty.

This method tests if the data set is empty.

Returns:

true if the data set is empty (contains no values), otherwise false.

datum_map_t::iterator begin()

Get begin iterator for items in this data set.

An iterator can be used to inspect the elements of the data set.

Returns:

Begin iterator.

datum_map_t::iterator end()

Get ending iterator for items in this data set.

An iterator can be used to inspect the elements of the data set.

Returns:

End iterator.

datum_map_t::const_iterator find(sprokit::process::port_t const &port) const

Find entry for specific port name.

This method returns an iterator pointing at the entry for the specified port. The datum can be accessed through it->second. If the specified port name is not in the set, the returned iterator is set to end().

Parameters:

port – Name of port to locate.

Returns:

Iterator pointing at desired entry or end() iterator if element not found.

template<typename T>
T value(::sprokit::process::port_t const &port)

Get data value for specific port.

This method returns the data value for the specified port.

Parameters:

port – Name of port

Throws:
  • std::runtime_error – if the specified port name is not in this set.

  • sprokit::bad_datum_cast_exception – if the requested data type does not match the actual type of the data from the port.

Returns:

Data value corresponding to the port.

template<typename T>
T value_or(::sprokit::process::port_t const &port, T const &value_if_missing = {})

Get data value for specific port, or default value if not found.

This method returns the data value for the specified port, or value_if_missing if the port is not in the set.

Parameters:

port – Name of port

Throws:

sprokit::bad_datum_cast_exception – if the requested data type does not match the actual type of the data from the port.

Returns:

Data value corresponding to the port, or value_if_missing if no such element is found.

template<typename T>
inline T get_port_data(::sprokit::process::port_t const &port)

Get data value for specific port.

This method returns the data value for the specified port.

Deprecated:

This method exists for historic reasons. Use value instead.

Parameters:

port – Name of port

Throws:
  • std::runtime_error – if the specified port name is not in this set.

  • sprokit::bad_datum_cast_exception – if the requested data type does not match the actual type of the data from the port.

Returns:

Data value corresponding to the port.

size_t size() const

Return the number of elements in the adapter_data_set.

This method returns the number of elements stored in the adapter_data_set. Similar to std::map::size()

Returns:

Number of elements in the adapter_data_set.

Public Static Functions

static adapter_data_set_t create(data_set_type type = data_set_type::data)

Create a new data set object.

This factory method returns a newly allocated object managed by smart pointer. A factory method is used to enforce shared pointer memory management for these objects. Allocating one of these objects on the stack will not work.

Parameters:

type – Data set type (data or input end marker)

Returns:

New data set object managed by smart pointer.
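To make the lookup rules documented for this class concrete (add_value() overwrites, value() throws for a missing port or a type mismatch, value_or() falls back to a default), here is a small self-contained mimic. The class named_value_set is a stand-in written for illustration and is not the kwiver implementation; in particular, std::bad_cast stands in for sprokit::bad_datum_cast_exception.

```cpp
#include <cstddef>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <typeindex>
#include <typeinfo>

// Stand-in for illustration only; this is NOT kwiver's adapter_data_set.
class named_value_set
{
public:
  // Store a copy of val under the port name, replacing any earlier value.
  template < typename T >
  void add_value( std::string const& port, T const& val )
  {
    m_items.erase( port );
    m_items.emplace( port,
                     entry{ std::type_index( typeid( T ) ),
                            std::make_shared< T >( val ) } );
  }

  // Return the value for the port. Throws std::runtime_error if the port is
  // absent and std::bad_cast if T is not the stored type.
  template < typename T >
  T value( std::string const& port ) const
  {
    auto const it = m_items.find( port );
    if ( it == m_items.end() )
    {
      throw std::runtime_error( "port not in data set: " + port );
    }
    return cast< T >( it->second );
  }

  // Like value(), but returns value_if_missing when the port is absent.
  template < typename T >
  T value_or( std::string const& port, T const& value_if_missing = {} ) const
  {
    auto const it = m_items.find( port );
    if ( it == m_items.end() )
    {
      return value_if_missing;
    }
    return cast< T >( it->second );
  }

  bool empty() const { return m_items.empty(); }
  std::size_t size() const { return m_items.size(); }

private:
  struct entry
  {
    std::type_index         type; // type the value was stored as
    std::shared_ptr< void > data; // type-erased copy of the value
  };

  template < typename T >
  static T cast( entry const& e )
  {
    if ( e.type != std::type_index( typeid( T ) ) )
    {
      throw std::bad_cast();
    }
    return *static_cast< T const* >( e.data.get() );
  }

  std::map< std::string, entry > m_items;
};
```

Adding "counter" twice leaves a single element holding the second value, which matches the overwrite behavior described for add_value() and add_datum().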

Polling the interface queues

The above example code uses send() and receive() in a loop to supply data to the pipeline and retrieve the output. While this is a direct approach, it will not work if there is any latency in the pipeline (that is, if the pipeline will only produce any output after some number of inputs are supplied). Both the send() and receive() methods will block if they cannot complete, but it is possible to check whether these calls will block or succeed. When ready to call receive(), the empty() method can be called to see if there is an adapter_data_set available. In the same manner, the full() method can be called to see if there is space to send an adapter_data_set before calling send().
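The check-before-call pattern described above can be wrapped in two small helpers. This is a sketch under stated assumptions: the templates rely only on the method names full(), empty(), send() and receive() mentioned in this section, and fake_latent_pipeline is a stand-in invented here (not a kwiver class) that needs two inputs before it produces its first output and reports full() when its output queue is at capacity.

```cpp
#include <cstddef>
#include <deque>

// Sends item only if the pipeline reports room. Returns true when sent.
template < typename Pipeline, typename Item >
bool try_send( Pipeline& ep, Item const& item )
{
  if ( ep.full() )
  {
    return false; // send() would block
  }
  ep.send( item );
  return true;
}

// Receives one output only if one is waiting. Returns true when out was set.
template < typename Pipeline, typename Item >
bool try_receive( Pipeline& ep, Item& out )
{
  if ( ep.empty() )
  {
    return false; // receive() would block
  }
  out = ep.receive();
  return true;
}

// Stand-in for illustration only (NOT a kwiver class): emits the sum of each
// pair of consecutive inputs, so the first output needs two inputs.
class fake_latent_pipeline
{
public:
  explicit fake_latent_pipeline( std::size_t capacity )
    : m_capacity( capacity ), m_prev( 0 ), m_have_prev( false ) {}

  bool full() const { return m_out.size() >= m_capacity; }
  bool empty() const { return m_out.empty(); }

  void send( int v )
  {
    if ( m_have_prev )
    {
      m_out.push_back( m_prev + v );
    }
    m_prev = v;
    m_have_prev = true;
  }

  int receive()
  {
    int const v = m_out.front();
    m_out.pop_front();
    return v;
  }

private:
  std::size_t       m_capacity;
  int               m_prev;
  bool              m_have_prev;
  std::deque< int > m_out;
};
```

With a loop built from these helpers, the controlling thread can keep feeding inputs while outputs are not yet available, which is the case the simple send-then-receive loop cannot handle.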

How to Specify A Pipeline

Pipelines are provided to the embedded_pipeline object as a stream. The most common types of streams used are file streams and string streams. To use a file stream, the controlling program needs to open the file and pass the stream to the embedded pipeline object. Alternatively, the pipeline can be specified as a string stream. The easiest way to build the pipeline definition is to use the macros supplied in

#include <sprokit/pipeline_util/literal_pipeline.h>

SPROKIT_PROCESS(type, name)

Define a process in the pipeline.

This macro defines a process that is to be part of the pipeline. If this process needs any config parameters, they must follow the process definition.

Parameters:
  • type – Process type name

  • name – Process instance name in this pipeline.

SPROKIT_CONFIG(key, value)

Specify a configuration item.

Parameters:
  • key – Configuration item name or key.

  • value – Value for this configuration item.

SPROKIT_CONNECT(up_name, up_port, down_name, down_port)

Define a connection between two ports.

This macro generates a single connection between two processes.

Parameters:
  • up_name – Upstream process name.

  • up_port – Upstream port name.

  • down_name – Downstream process name.

  • down_port – Downstream port name.

SPROKIT_CONFIG_BLOCK(name)

Start a named configuration block.

This macro inserts a “config <name>” line in the config starting a named config block. Note that this is different than a nested config block.

Parameters:
  • name – Name of the config block.

There are additional macros available for more detailed control over the pipeline definition. Refer to the full documentation for the details.

If needed, the scheduler type can be specified in the pipeline definition as follows:

std::stringstream pipeline_desc;
pipeline_desc  << SPROKIT_CONFIG_BLOCK( "_scheduler" )
               << SPROKIT_CONFIG( "type", scheduler_type );

Advanced Topics

Overriding Input and/or Output Adapters

There are some cases where the pipeline will directly source its data rather than get it from the controlling program. Reading data directly from a file is one example. Similarly, there are pipelines that sink the output data directly rather than passing it back to the controlling program. In both of these cases, the checks for mandatory input and output adapter processes need to be bypassed to allow the pipeline to run. This is done by deriving a class and overriding the connect_input_adapter() and/or connect_output_adapter() method to just return true. The following is an example of overriding the input adapter requirement.

class no_src_embedded_pipeline
  : public kwiver::embedded_pipeline
{
public:
  no_src_embedded_pipeline() { }
  virtual ~no_src_embedded_pipeline() { }

protected:
  bool connect_input_adapter() override { return true; }
};

Modifying the Pipeline Configuration

There may be a situation where some part of the pipeline configuration must be added or modified at runtime. The update_config() method can be overridden in a derived class to provide the ability to inspect and make modifications to the pipeline config prior to building the pipeline.

Embedded Pipeline Extensions

Embedded pipeline extensions (EPX) can be dynamically loaded based on the pipeline configuration. One use case for EPX is to check resource availability before starting the pipeline, for example to ensure that there are enough GPUs for the pipeline to start.

The EPX are a property of the pipeline configuration and can be specified as follows:

config _pipeline
  block embedded_pipeline_extension
    type = foo # specify the name of extension to load
    block foo # optional configuration for extension
      param = value  # optional parameters
    endblock
endblock

The embedded_pipeline_extension config block is only used for embedded pipelines and has no effect on pipelines run with the command line kwiver pipeline runner.

The list of available extensions can be found by entering the following command:

plugin_explorer --fact embedded_pipeline_extension

Usually EPX are application specific so it is unlikely you will find an existing one that is useful.

To implement your own extension, derive a class from kwiver::embedded_pipeline_extension and implement the virtual methods.
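The shape of such a derived class can be sketched without the kwiver headers. Everything in the sketch namespace below is a stand-in that only mirrors the hook names documented in this section; it is not the kwiver base class. The gpu_check_extension class, its constructor arguments, and the choice to report failure by throwing are assumptions made for illustration. A real extension would obtain its settings through configure() and query the hardware itself.

```cpp
#include <stdexcept>
#include <string>

namespace sketch {

// Stand-ins mirroring the documented hook names; NOT the kwiver classes.
struct context {};

class embedded_pipeline_extension
{
public:
  virtual ~embedded_pipeline_extension() = default;
  virtual void pre_setup( context& ) {}
  virtual void post_setup( context& ) {}
  virtual void end_of_output( context& ) {}
};

// Hypothetical extension that refuses to proceed when too few GPUs are free.
class gpu_check_extension : public embedded_pipeline_extension
{
public:
  gpu_check_extension( int required, int available )
    : m_required( required ), m_available( available ) {}

  // Runs before pipeline setup; throws if the resource check fails.
  void pre_setup( context& ) override
  {
    if ( m_available < m_required )
    {
      throw std::runtime_error(
              "need " + std::to_string( m_required ) +
              " GPUs, found " + std::to_string( m_available ) );
    }
  }

private:
  int m_required;
  int m_available;
};

} // namespace sketch
```

Only pre_setup() is overridden here; the other hooks keep their empty default behavior, which matches the inline default implementations listed in the reference that follows.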

class embedded_pipeline_extension

Base class for embedded pipeline extension.

Subclassed by kwiver::epx_test

Public Functions

inline virtual void pre_setup (VITAL_UNUSED context &ctxt)

pipeline pre-setup hook

This method is called before the pipeline is set up. A context object is supplied to this hook so it can query its running environment.

Parameters:

ctxt – The calling context.

inline virtual void post_setup (VITAL_UNUSED context &ctxt)

pipeline post-setup hook

This method is called after the pipeline is set up. A context object is supplied to this hook so it can query its running environment.

Parameters:

ctxt – The calling context.

inline virtual void end_of_output (VITAL_UNUSED context &ctxt)

End of data received from pipeline.

This method is called when the end of data marker is received from the pipeline output adapter via a receive() call. If the pipeline has a sink process and does not contain an output_adapter, then this method will never be called.

Parameters:

ctxt – The calling context

virtual void configure (VITAL_UNUSED kwiver::vital::config_block_sptr const conf)

Configure provider.

This method sends the epx config sub-block to the implementation. The derived class would use the contents of this config block to modify its behaviour. This is how the epx gets its configuration and only needs to be overridden if the epx is expecting config items.

Parameters:

conf – Configuration block.

virtual kwiver::vital::config_block_sptr get_configuration() const

Get default configuration block.

This method returns the default configuration block for this pipeline extension and should contain all configuration items that are needed by this implementation. The config block returned is used during introspection to provide documentation on what config parameters are needed and what they mean. The config block should contain any default values for the config items.

Returns:

Pointer to config block.

class context

Subclassed by kwiver::real_context