compute_pipes

package
v0.0.0-...-5d3869a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: Apache-2.0 Imports: 61 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrKillSwitch     = errors.New("ErrKillSwitch")
	ComputePipesStart = time.Now()
)

Load multipart files to JetStore, file to load are provided by channel fileNameCh

View Source
var DomainKeyDelimit string = os.Getenv("JETS_DOMAIN_KEY_SEPARATOR")
View Source
var ErrEOFTooEarly error = errors.New("error: cannot determine encoding, got EOF")
View Source
var ErrFileZipArchive error = errors.New("the file is a ZIP archive")
View Source
var ErrUnknownEncoding error = errors.New("encoding unknown, unable to detected the encoding")
View Source
var ErrUnknownEncodingOrWrongDelimit error = errors.New("unable to detected the file encoding or the specified delimiter is not the delimiter used in the file")
View Source
var HashingAlgo string = strings.ToLower(os.Getenv("JETS_DOMAIN_KEY_HASH_ALGO"))
View Source
var HashingSeed uuid.UUID

Functions

func AdjustFillers

func AdjustFillers(rawHeaders *[]string)

func ApplyAllConditionalTransformationSpec

func ApplyAllConditionalTransformationSpec(pipeConfig []PipeSpec, env map[string]any) error

Function to apply all conditional transformation spec in the pipeConfig

func ApplyConditionalEnvVars

func ApplyConditionalEnvVars(envVars []ConditionalEnvVariable, env map[string]any) error

func AssertMetadataSource

func AssertMetadataSource(re JetRuleEngine, config *JetrulesSpec, env map[string]any) error

Assert rule config to meta graph from the pipeline configuration

func AssertRuleConfiguration

func AssertRuleConfiguration(re JetRuleEngine, config *JetrulesSpec, env map[string]any) (err error)

Assert rule config to meta graph from the pipeline configuration

func AssertSourcePeriodInfo

func AssertSourcePeriodInfo(re JetRuleEngine, config *JetrulesSpec, env map[string]any) (err error)

Assert source period info (date, period, type) to rdf graph

func BuildEvalOperator

func BuildEvalOperator(op string) (evalOperator, error)

build the runtime evaluator for the column transformation

func CastToRdfType

func CastToRdfType(input any, rdfType string) (any, error)

Utility function for casting to specified rdf type

func ClearJetrulesCaches

func ClearJetrulesCaches()

Function to clear local caches, needed when the workspace has been updated and the lambda must be forced to reload the workspace metadata from the jetstore db. Note: This must be called before starting goroutines as it is not thread safe.

func CmpRecord

func CmpRecord(lhs any, rhs any) int

*TODO: Migrate to this cmp function to satisfy slices.SortFunc: SortFunc sorts the slice x in ascending order as determined by the cmp function. This sort is not guaranteed to be stable. cmp(a, b) should return a negative number when a < b, a positive number when a > b and zero when a == b or a and b are incomparable in the sense of a strict weak ordering. SortFunc requires that cmp is a strict weak ordering. See https://en.wikipedia.org/wiki/Weak_ordering#Strict_weak_orderings. The function should return 0 for incomparable items.

func ConvertToSchemaV2

func ConvertToSchemaV2(v any, se *FieldInfo) (any, error)

func ConvertWithSchemaV1

func ConvertWithSchemaV1(irow int, col arrow.Array, trimStrings bool, fieldInfo *FieldInfo, castToRdfTxtFnc CastToRdfTxtFnc) (any, error)

return value is either nil or a string representing the input v

func CreateOutputTable

func CreateOutputTable(dbpool *pgxpool.Pool, tableName pgx.Identifier, tableSpec *TableSpec) error

Create the Output Table

func DetectCrAsEol

func DetectCrAsEol(fileHd ReaderAtSeeker, compression string) (bool, error)

func DetectCsvDelimitor

func DetectCsvDelimitor(fileHd ReaderAtSeeker, compression string) (jcsv.Chartype, error)

func DetectEncoding

func DetectEncoding(data []byte, delimit rune) (string, error)

func DetectFileEncoding

func DetectFileEncoding(fileHd ReaderAtSeeker, delimit rune) (encoding string, err error)

func DoesQualifyAsDate

func DoesQualifyAsDate(value string) bool

Qualify as a date:

  • 6 < len < 30
  • contains digits, letters, space, comma, dash, slash, colon, apostrophe

Example of longest date to expect: 23 November 2025 13:10 AM

func DownloadS3Object

func DownloadS3Object(externalBucket string, s3Key *FileKeyInfo, localDir string, minSize int64) (string, int64, error)

func EvalHash

func EvalHash(key any, partitions uint64) *uint64

func ExtractPartitionLabelFromS3Key

func ExtractPartitionLabelFromS3Key(s3Key string) (string, error)

func ExtractRdfNodeInfoJson

func ExtractRdfNodeInfoJson(e any) (value, rdfType string, err error)

Function to extract value and type from json struct

func GetAdditionalInputColumns

func GetAdditionalInputColumns(cpConfig *ComputePipesConfig) []string

Function to get the columns to add to the input file(s); these columns are added to the input_row channel. They are taken from the channel config with name input_row.

func GetDomainProperties

func GetDomainProperties(className string, directPropertitesOnly bool) ([]string, error)

Get the domain properties for className. If directPropertiesOnly is true, return only the direct properties of the class, not the inherited ones. When directPropertiesOnly is false, the return slice will have jets:key and rdf:type at position 0 and 1 resp.

func GetFileKeys

func GetFileKeys(ctx context.Context, dbpool *pgxpool.Pool, sessionId string, nodeId int) ([][]*FileKeyInfo, error)

Get the file_key(s) from compute_pipes_shard_registry assigned to nodeId -- these are the input multipart files. This is used during the sharding mode. Returned [0][]*FileKeyInfo are the main input files, [i][]*FileKeyInfo, i=1:n are the merge channel files.

func GetMaxConcurrency

func GetMaxConcurrency(nbrNodes, defaultMaxConcurrency int) int

func GetPartitionSize4LookbackPeriod

func GetPartitionSize4LookbackPeriod(bucket, fileKey, lookbackPeriod string, env map[string]any, partitionSizes map[string]int64) error

Get Partition key and size. This is used in start reducing.

func GetRawHeadersCsv

func GetRawHeadersCsv(fileHd *os.File, fileName, fileFormat, compression string, sepFlag jcsv.Chartype,
	encoding string, eolByte byte, multiColumns, noQuotes bool) ([]string, error)

Get the raw headers from fileHd, put them in *ic. Use *sepFlag as the csv delimiter.

func GetRawHeadersParquet

func GetRawHeadersParquet(fileHd *os.File, fileName string) ([]string, error)

func GetRawHeadersXlsx

func GetRawHeadersXlsx(fileName string, fileFormatDataJson string) ([]string, error)

func GetRdfNodeValue

func GetRdfNodeValue(r RdfNode) any

func GetRuleEngineConfig

func GetRuleEngineConfig(mainRuleFile, property string) (string, error)

Function to get domain classes info from the local workspace

func GetS3FileKeys

func GetS3FileKeys(processName, sessionId, mainInputStepId, jetsPartitionLabel string,
	inputChannelConfig *InputChannelConfig, envSettings map[string]any) ([][]*FileKeyInfo, error)

Get the file_key(s) from s3 for the given process/session/step/partition. This is used during the reducing mode.

func GetS3Objects4LookbackPeriod

func GetS3Objects4LookbackPeriod(bucket, fileKey, lookbackPeriod string, env map[string]any) ([]*awsi.S3Object, error)

func GetWorkspaceControl

func GetWorkspaceControl() (*rete.WorkspaceControl, error)

func GetWorkspaceDataProperties

func GetWorkspaceDataProperties() (map[string]*rete.DataPropertyNode, error)

Function to get the domain properties info from the local workspace

func GetWorkspaceDomainClasses

func GetWorkspaceDomainClasses() (map[string]*rete.ClassNode, error)

Function to get domain classes info from the local workspace

func GetWorkspaceDomainTables

func GetWorkspaceDomainTables() (map[string]*rete.TableNode, error)

Function to get domain tables info from the local workspace

func Hash

func Hash(key []byte, partitions uint64) uint64

func LastIndexByte

func LastIndexByte(s []byte, c byte) int

func MakeJetsPartitionLabel

func MakeJetsPartitionLabel(jetsPartitionKey any) string

func MergeParquetPartitions

func MergeParquetPartitions(nrowsInRec int64, columns []string, fout io.Writer, FileNamesCh <-chan FileName, gotError func(error))

func MergeTransformationSpec

func MergeTransformationSpec(host, override *TransformationSpec) error

func ParseDate

func ParseDate(date string) (*time.Time, error)

func ParseDateDateFormat

func ParseDateDateFormat(dateFormats []string, value string) (tm time.Time, fmt string)

ParseDateDateFormat returns the first match of [value] among the [dateFormats]

func ParseDateStrict

func ParseDateStrict(date string) (*time.Time, error)

func ParseDatetime

func ParseDatetime(datetime string) (*time.Time, error)

func ParseInputFormatDataXlsx

func ParseInputFormatDataXlsx(inputDataFormatJson *string) (map[string]any, error)

func PrepareOutoutTable

func PrepareOutoutTable(dbpool *pgxpool.Pool, tableIdentifier pgx.Identifier, tableSpec *TableSpec) error

func SplitTableName

func SplitTableName(tableName string) (pgx.Identifier, error)

func ToBool

func ToBool(b any) bool

func ToDouble

func ToDouble(d any) (float64, error)

func WorkspaceHome

func WorkspaceHome() string

func WorkspacePrefix

func WorkspacePrefix() string

func WrapReaderWithDecoder

func WrapReaderWithDecoder(r io.Reader, encoding string) (utfReader io.Reader, err error)

func WrapReaderWithDecompressor

func WrapReaderWithDecompressor(r io.Reader, compression string) io.Reader

func WrapWriterWithEncoder

func WrapWriterWithEncoder(w io.Writer, encoding string) (utfWriter io.Writer, err error)

func WriteCpipesArgsToS3

func WriteCpipesArgsToS3(cpa []ComputePipesNodeArgs, s3Location string) error

Write the compute pipes arguments as json to s3 at location specified by s3Location This is currently not used, needed for Distributed Map

func WriteParquetPartitionV3

func WriteParquetPartitionV3(schemaInfo *ParquetSchemaInfo, nrowsInRec int64, fout io.Writer, inCh <-chan []any, gotError func(error))

Types

type AggregateTransformationPipe

type AggregateTransformationPipe struct {
	// contains filtered or unexported fields
}

func (*AggregateTransformationPipe) Apply

func (ctx *AggregateTransformationPipe) Apply(input *[]interface{}) error

Implementing interface PipeTransformationEvaluator

func (*AggregateTransformationPipe) Done

func (ctx *AggregateTransformationPipe) Done() error

func (*AggregateTransformationPipe) Finally

func (ctx *AggregateTransformationPipe) Finally()

type AnalyzeSpec

type AnalyzeSpec struct {
	SchemaProvider                  string               `json:"schema_provider,omitempty"`
	ScrubChars                      string               `json:"scrub_chars,omitempty"`
	DistinctValuesWhenLessThanCount int                  `json:"distinct_values_when_less_than_count,omitzero"`
	PadShortRowsWithNulls           bool                 `json:"pad_short_rows_with_nulls,omitzero"`
	ColumnNameToken                 *ColumnNameTokenNode `json:"column_name_token,omitempty"`
	EntityHints                     []*EntityHint        `json:"entity_hints,omitempty"`
	RegexTokens                     []RegexNode          `json:"regex_tokens,omitempty"`
	LookupTokens                    []LookupTokenNode    `json:"lookup_tokens,omitempty"`
	KeywordTokens                   []KeywordTokenNode   `json:"keyword_tokens,omitempty"`
	FunctionTokens                  []FunctionTokenNode  `json:"function_tokens,omitempty"`
}

AnalyzeSpec configuration SchemaProvider is used for external configuration, such as date format ScrubChars is the list of characters to scrub from the input values. DistinctValuesWhenLessThanCount is the threshold to list distinct values. PadShortRowsWithNulls indicates to pad short rows with nulls to match row length. ColumnNameToken is used to classify columns based on their name. EntityHints provide hints for entity recognition. RegexTokens specify regex patterns to identify classification tokens. LookupTokens specify lookup tables to identify classification tokens. KeywordTokens specify keywords to identify classification tokens. FunctionTokens specify functions to identify classification tokens.

type AnalyzeState

type AnalyzeState struct {
	ColumnName     string
	ColumnPos      int
	DistinctValues map[string]*DistinctCount
	NullCount      int
	LenWelford     *WelfordAlgo
	CharToScrub    map[rune]bool
	RegexMatch     map[string]*RegexCount
	LookupState    []*LookupTokensState
	KeywordMatch   map[string]*KeywordCount
	ParseDate      *ParseDateMatchFunction
	ParseDouble    *ParseDoubleMatchFunction
	ParseText      *ParseTextMatchFunction
	TotalRowCount  int
	BlankMarkers   *BlankFieldMarkers
	Spec           *TransformationSpec
}

Analyze data TransformationSpec implementing PipeTransformationEvaluator interface

func (*AnalyzeState) NewToken

func (state *AnalyzeState) NewToken(value string) error

func (*AnalyzeState) NewValue

func (state *AnalyzeState) NewValue(value any) error

type AnalyzeTransformationPipe

type AnalyzeTransformationPipe struct {
	// contains filtered or unexported fields
}

firstInputRow is the first row from the input channel. A reference to it is kept for use in the Done function so as to carry over the select fields in the columnEvaluators. Note: columnEvaluators is applied only on the firstInputRow and it is used only to select columns having the same value for every input row or to put constant values coming from the env

Base columns available on the output (only columns specified in outputCh are actually sent out):

"column_name",
"column_pos",
"input_data_type",
"entity_hint",
"distinct_count",
"distinct_count_pct",
"distinct_values",
"null_count",
"null_count_pct",
"total_count",
"avr_length",
"length_var"

Other base columns available when using parse function (parse_date, parse_double, parse_text)

"min_date",
"max_date",
"min_double",
"max_double",
"large_double_pct",
"min_length",
"max_length",
"min_value",
"max_value",
"large_value_pct",
"minmax_type"

Note: min_value/max_value are determined based on this priority rule:

  1. min_date/max_date if more than 50% of values are valid dates;
  2. min_double/max_double if more than 75% of values are valid double (note this includes int as well);
  3. otherwise it's the text min/max length.

Note: distinct_values will contain a comma-separated list of distinct values if distinct_count < spec.distinct_values_when_less_than_count. There is a hardcoded check that caps distinct_values_when_less_than_count to 20.

column_name: the name of the column using the original column name if available, otherwise it is the column name from the input channel. entity_hint: is determined based on the hints provided in spec.analyze_config.entity_hints

Other columns are added based on regex_tokens, lookup_tokens, keyword_tokens, and parse functions The value of the domain counts are expressed in percentage of the non null count:

ratio = <domain count>/(totalCount - nullCount) * 100.0

Note that if totalCount - nullCount == 0, then ratio = -1.

inputDataType contains the data type for each column according to the parquet schema. inputDataType is a map of column name -> input data type Range of value for input data type: string (default if not parquet), bool, int32, int64, float32, float64, date, uint32, uint64

func (*AnalyzeTransformationPipe) Apply

func (ctx *AnalyzeTransformationPipe) Apply(input *[]any) error

Implementing interface PipeTransformationEvaluator

func (*AnalyzeTransformationPipe) Done

func (ctx *AnalyzeTransformationPipe) Done() error

func (*AnalyzeTransformationPipe) Finally

func (ctx *AnalyzeTransformationPipe) Finally()

type AnonymizationAction

type AnonymizationAction struct {
	// contains filtered or unexported fields
}

type AnonymizeSpec

type AnonymizeSpec struct {
	Mode                        string               `json:"mode,omitempty"`
	LookupName                  string               `json:"lookup_name,omitempty"`
	AnonymizeType               string               `json:"anonymize_type,omitempty"`
	KeyPrefix                   string               `json:"key_prefix,omitempty"`
	DeidFunctions               map[string]string    `json:"deid_functions,omitempty"`
	DeidLookups                 map[string]string    `json:"deid_lookups,omitempty"`
	InputDateLayout             string               `json:"input_date_layout,omitempty"`
	DateFormatsColumn           string               `json:"date_formats_column,omitempty"`
	OutputDateLayout            string               `json:"output_date_layout,omitempty"`
	KeyDateLayout               string               `json:"key_date_layout,omitempty"`
	DefaultInvalidDate          string               `json:"default_invalid_date,omitempty"`
	SchemaProvider              string               `json:"schema_provider,omitempty"`
	AdjustFieldWidthOnFW        bool                 `json:"adjust_field_width_on_fixed_width_file,omitzero"`
	OmitPrefixOnFW              bool                 `json:"omit_prefix_on_fixed_width_file,omitzero"`
	AnonymizedColumnsOutputFile *ColumnFileSpec      `json:"anonymized_columns_output_file,omitzero"`
	KeysOutputChannel           *OutputChannelConfig `json:"keys_output_channel"`
}

Mode: Specify mode of action: de-identification, anonymization (default) - de-identification: mask the data (not reversible); - anonymization: replace the data with hashed value (reversible using crosswalk file). LookupName is name of lookup table containing the file metadata from analyze operator. AnonymizeType is column name in lookup table that specifies how to anonymize (value: date, text). KeyPrefix is column name of lookup table to use as prefix of the anonymized value or key mapping for de-identification lookup table. DeidFunctions is map of KeyPrefix value to function name for de-identification. DeidLookups is map of KeyPrefix value to lookup table name for substitution values for de-identification. InputDateLayout is the format for parsing the input date (incoming data) when specified. DateFormatsColumn is column name of the lookup table having the list of date format (optional). OutputDateLayout is the format to use for anonymized date, will be set at 1st of the month of the original date (anonymization) or to XXX when de-identification. KeyDateLayout is the format to use in the key mapping file (crosswalk file) for anonymization. DefaultInvalidDate is a placeholder to use as the anonymized date when the input date (the date to anonymize) is not valid. If unspecified, the input value is used unchanged as the output value. DefaultInvalidDate must be a valid date in the format YYYY/MM/DD or MM/DD/YYYY so it can be parsed using JetStore default date parser. The date will be formatted according to KeyDateLayout. OutputDateLayout defaults to InputDateLayout. KeyDateLayout defaults to OutputDateLayout. SchemaProvider is used to: - get the DateLayout / KeyDateLayout if not specified here. - get CapDobYears / SetDodToJan1 for date anonymization. If date format is not specified, the default format for both OutputDateFormat and KeyDateFormat is "2006/01/02", ie. yyyy/MM/dd and the rdf.ParseDate() is used to parse the input date.

type AnonymizeTransformationPipe

type AnonymizeTransformationPipe struct {
	// contains filtered or unexported fields
}

func (*AnonymizeTransformationPipe) Apply

func (ctx *AnonymizeTransformationPipe) Apply(input *[]any) error

Implementing interface PipeTransformationEvaluator

func (*AnonymizeTransformationPipe) Done

func (ctx *AnonymizeTransformationPipe) Done() error

Anonymization complete, now send out the keys mapping to keys_output_channel if in mode "anonymization"

func (*AnonymizeTransformationPipe) Finally

func (ctx *AnonymizeTransformationPipe) Finally()

type ArrayBuilder

type ArrayBuilder interface {
	Reserve(n int)
	Append(v any)
	AppendEmptyValue()
	AppendNull()
	NewArray() arrow.Array
	Release()
}

func NewBinaryBuilder

func NewBinaryBuilder(mem memory.Allocator) ArrayBuilder

func NewBooleanBuilder

func NewBooleanBuilder(mem memory.Allocator) ArrayBuilder

func NewDateBuilder

func NewDateBuilder(mem memory.Allocator) ArrayBuilder

func NewDecimal128Builder

func NewDecimal128Builder(mem memory.Allocator, precision, scale int32) ArrayBuilder

func NewDecimal256Builder

func NewDecimal256Builder(mem memory.Allocator, precision, scale int32) ArrayBuilder

func NewFloat32Builder

func NewFloat32Builder(mem memory.Allocator) ArrayBuilder

func NewFloat64Builder

func NewFloat64Builder(mem memory.Allocator) ArrayBuilder

func NewInt32Builder

func NewInt32Builder(mem memory.Allocator) ArrayBuilder

func NewInt64Builder

func NewInt64Builder(mem memory.Allocator) ArrayBuilder

func NewStringBuilder

func NewStringBuilder(mem memory.Allocator) ArrayBuilder

func NewTimestampBuilder

func NewTimestampBuilder(mem memory.Allocator) ArrayBuilder

func NewUint32Builder

func NewUint32Builder(mem memory.Allocator) ArrayBuilder

func NewUint64Builder

func NewUint64Builder(mem memory.Allocator) ArrayBuilder

type ArrayRecord

type ArrayRecord struct {
	Record arrow.Record
	// contains filtered or unexported fields
}

func NewArrayRecord

func NewArrayRecord(schema *arrow.Schema, builders []ArrayBuilder) *ArrayRecord

func (*ArrayRecord) Release

func (r *ArrayRecord) Release()

type BadRowsChannel

type BadRowsChannel struct {
	OutputCh chan []byte
	// contains filtered or unexported fields
}

func NewBadRowChannel

func NewBadRowChannel(s3DeviceManager *S3DeviceManager, s3BasePath string,
	doneCh chan struct{}, errCh chan error) *BadRowsChannel

func (*BadRowsChannel) Done

func (ctx *BadRowsChannel) Done()

func (*BadRowsChannel) Write

func (ctx *BadRowsChannel) Write(nodeId int)

type BadRowsSpec

type BadRowsSpec struct {
	BadRowsStepId string `json:"bad_rows_step_id,omitempty"`
}

Defines the identification and handling of bad rows. Currently only used for input_row channel. BadRowsStepId: step id in stage location to output bad rows. The input row is considered a bad row when any of WhenCriteria applies; then the row is sent to the bad row channel and removed from the input rows.

type BinaryBuilder

type BinaryBuilder struct {
	// contains filtered or unexported fields
}

func (*BinaryBuilder) Append

func (b *BinaryBuilder) Append(v any)

func (*BinaryBuilder) AppendEmptyValue

func (b *BinaryBuilder) AppendEmptyValue()

func (*BinaryBuilder) AppendNull

func (b *BinaryBuilder) AppendNull()

func (*BinaryBuilder) NewArray

func (b *BinaryBuilder) NewArray() arrow.Array

func (*BinaryBuilder) Release

func (b *BinaryBuilder) Release()

func (*BinaryBuilder) Reserve

func (b *BinaryBuilder) Reserve(n int)

type BlankFieldMarkers

type BlankFieldMarkers struct {
	CaseSensitive bool
	Markers       []string
}

BlankFieldMarkers is the runtime version of BlankFieldMarkersSpec when CaseSensitive is true, the Markers are in upper case. Note: This type is re-used by the Anonymize operator as well.

type BlankFieldMarkersSpec

type BlankFieldMarkersSpec struct {
	CaseSensitive bool     `json:"case_sensitive,omitzero"`
	Markers       []string `json:"markers,omitempty"`
}

type BooleanBuilder

type BooleanBuilder struct {
	// contains filtered or unexported fields
}

func (*BooleanBuilder) Append

func (b *BooleanBuilder) Append(v any)

func (*BooleanBuilder) AppendEmptyValue

func (b *BooleanBuilder) AppendEmptyValue()

func (*BooleanBuilder) AppendNull

func (b *BooleanBuilder) AppendNull()

func (*BooleanBuilder) NewArray

func (b *BooleanBuilder) NewArray() arrow.Array

func (*BooleanBuilder) Release

func (b *BooleanBuilder) Release()

func (*BooleanBuilder) Reserve

func (b *BooleanBuilder) Reserve(n int)

type BuilderContext

type BuilderContext struct {
	// contains filtered or unexported fields
}

func (*BuilderContext) BuildCaseExprTCEvaluator

func (ctx *BuilderContext) BuildCaseExprTCEvaluator(source *InputChannel, outCh *OutputChannel,
	spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)

func (*BuilderContext) BuildComputeGraph

func (ctx *BuilderContext) BuildComputeGraph() error

func (*BuilderContext) BuildCountTCEvaluator

func (ctx *BuilderContext) BuildCountTCEvaluator(source *InputChannel, outCh *OutputChannel,
	spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)

func (*BuilderContext) BuildDistinctCountTCEvaluator

func (ctx *BuilderContext) BuildDistinctCountTCEvaluator(source *InputChannel, outCh *OutputChannel,
	spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)

func (*BuilderContext) BuildExprNodeEvaluator

func (ctx *BuilderContext) BuildExprNodeEvaluator(sourceName string, columns map[string]int, spec *ExpressionNode) (evalExpression, error)

Delegate to ExprBuilderContext

func (*BuilderContext) BuildHashTCEvaluator

func (ctx *BuilderContext) BuildHashTCEvaluator(source *InputChannel, outCh *OutputChannel,
	spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)

The Hash operator example (dw_rawfilename is string):

	Case hash function:
 =========================================
		{
			"name": "jets_partition",
			"type": "hash",
			"hash_expr": {
				"expr": "dw_rawfilename",
				"composite_expr": ["partion", "dw_rawfilename"],
				"nbr_jets_partitions": 3,
				"alternate_composite_expr": ["name", "gender", "format_date(dob)"],
			}

jets_partition will be of type uint64

	Case compute domain key:
 =========================================
		{
			"name": "Claim:domain_key",
			"type": "hash",
			"hash_expr": {
				"domain_key": "Claim",
				"compute_domain_key": true
			}

Claim:domain_key will be of type string

func (*BuilderContext) BuildLookupTCEvaluator

func (ctx *BuilderContext) BuildLookupTCEvaluator(source *InputChannel, outCh *OutputChannel,
	spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)

func (*BuilderContext) BuildMapReduceTCEvaluator

func (ctx *BuilderContext) BuildMapReduceTCEvaluator(source *InputChannel, outCh *OutputChannel,
	spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)

func (*BuilderContext) BuildMapTCEvaluator

func (ctx *BuilderContext) BuildMapTCEvaluator(source *InputChannel, outCh *OutputChannel,
	spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)

func (*BuilderContext) BuildMinTCEvaluator

func (ctx *BuilderContext) BuildMinTCEvaluator(source *InputChannel, outCh *OutputChannel,
	spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)

func (*BuilderContext) BuildPipeTransformationEvaluator

func (ctx *BuilderContext) BuildPipeTransformationEvaluator(source *InputChannel, jetsPartitionKey any,
	partitionResultCh chan ComputePipesResult, spec *TransformationSpec) (PipeTransformationEvaluator, error)

Build the PipeTransformationEvaluator: one of map_record, aggregate, or partition_writer The partitionResultCh argument is used only by partition_writer to return the number of rows written and the error that might occur

func (*BuilderContext) BuildSumTCEvaluator

func (ctx *BuilderContext) BuildSumTCEvaluator(source *InputChannel, outCh *OutputChannel,
	spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)

func (*BuilderContext) BuildTransformationColumnEvaluator

func (ctx *BuilderContext) BuildTransformationColumnEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)

build the runtime evaluator for the column transformation

func (*BuilderContext) FileKey

func (ctx *BuilderContext) FileKey() string

func (*BuilderContext) MakeMergeTransformationPipe

func (ctx *BuilderContext) MakeMergeTransformationPipe(
	mainSource *InputChannel,
	mergeSources []*InputChannel,
	outCh *OutputChannel,
	spec *TransformationSpec) (PipeTransformationEvaluator, error)

func (*BuilderContext) NewAggregateTransformationPipe

func (ctx *BuilderContext) NewAggregateTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*AggregateTransformationPipe, error)

func (*BuilderContext) NewAnalyzeState

func (ctx *BuilderContext) NewAnalyzeState(columnName string, columnPos int, inputColumns *map[string]int,
	sp SchemaProvider, blankMarkers *BlankFieldMarkers, spec *TransformationSpec) (*AnalyzeState, error)

func (*BuilderContext) NewAnalyzeTransformationPipe

func (ctx *BuilderContext) NewAnalyzeTransformationPipe(source *InputChannel, outputCh *OutputChannel,
	spec *TransformationSpec) (*AnalyzeTransformationPipe, error)

func (*BuilderContext) NewAnonymizeTransformationPipe

func (ctx *BuilderContext) NewAnonymizeTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*AnonymizeTransformationPipe, error)

func (*BuilderContext) NewClusteringPoolManager

func (ctx *BuilderContext) NewClusteringPoolManager(config *ClusteringSpec,
	source *InputChannel, outputCh *OutputChannel, correlationOutputCh *OutputChannel,
	clusteringResultCh chan ClusteringResult) (poolMgr *ClusteringPoolManager, err error)

Create the ClusteringPoolManager, it will be set to the receiving BuilderContext

func (*BuilderContext) NewClusteringTransformationPipe

func (ctx *BuilderContext) NewClusteringTransformationPipe(source *InputChannel, outputCh *OutputChannel,
	spec *TransformationSpec) (*ClusteringTransformationPipe, error)

func (*BuilderContext) NewDistinctTransformationPipe

func (ctx *BuilderContext) NewDistinctTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*DistinctTransformationPipe, error)

func (*BuilderContext) NewFilterTransformationPipe

func (ctx *BuilderContext) NewFilterTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*FilterTransformationPipe, error)

func (*BuilderContext) NewGroupByTransformationPipe

func (ctx *BuilderContext) NewGroupByTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*GroupByTransformationPipe, error)

Builder function for GroupByTransformationPipe

func (*BuilderContext) NewHashEvaluator

func (ctx *BuilderContext) NewHashEvaluator(source *InputChannel,
	spec *HashExpression) (*HashEvaluator, error)

Build the HashEvaluator, see BuildHashTCEvaluator

func (*BuilderContext) NewHighFreqTransformationPipe

func (ctx *BuilderContext) NewHighFreqTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*HighFreqTransformationPipe, error)

func (*BuilderContext) NewJetrulesTransformationPipe

func (ctx *BuilderContext) NewJetrulesTransformationPipe(source *InputChannel, _ *OutputChannel, spec *TransformationSpec) (
	*JetrulesTransformationPipe, error)

func (*BuilderContext) NewJrPoolManager

func (ctx *BuilderContext) NewJrPoolManager(
	config *JetrulesSpec, source *InputChannel, rdfType2Columns map[string][]string, ruleEngine JetRuleEngine,
	outputChannels []*JetrulesOutputChan, jetrulesWorkerResultCh chan JetrulesWorkerResult) (jrpm *JrPoolManager, err error)

Create the JrPoolManager, it will be set to the receiving BuilderContext

func (*BuilderContext) NewMapRecordTransformationPipe

func (ctx *BuilderContext) NewMapRecordTransformationPipe(source *InputChannel, outputCh *OutputChannel,
	spec *TransformationSpec) (*MapRecordTransformationPipe, error)

func (*BuilderContext) NewMergeTransformationPipe

func (ctx *BuilderContext) NewMergeTransformationPipe(
	source *InputChannel,
	outCh *OutputChannel,
	spec *TransformationSpec) (PipeTransformationEvaluator, error)

Builder function for MergeTransformationPipe

func (*BuilderContext) NewPartitionWriterTransformationPipe

func (ctx *BuilderContext) NewPartitionWriterTransformationPipe(source *InputChannel, jetsPartitionKey any,
	outputCh *OutputChannel, copy2DeviceResultCh chan ComputePipesResult, spec *TransformationSpec) (*PartitionWriterTransformationPipe, error)

Create a new jets_partition writer, the partition is identified by the jetsPartition

func (*BuilderContext) NewShufflingTransformationPipe

func (ctx *BuilderContext) NewShufflingTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*ShufflingTransformationPipe, error)

func (*BuilderContext) NewSortTransformationPipe

func (ctx *BuilderContext) NewSortTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*SortTransformationPipe, error)

func (*BuilderContext) ReportMetrics

func (ctx *BuilderContext) ReportMetrics()

func (*BuilderContext) StartFanOutPipe

func (ctx *BuilderContext) StartFanOutPipe(spec *PipeSpec, source *InputChannel, writePartitionsResultCh chan ComputePipesResult)

func (*BuilderContext) StartSplitterPipe

func (ctx *BuilderContext) StartSplitterPipe(spec *PipeSpec, source *InputChannel, writePartitionsResultCh chan ComputePipesResult)

type CaseExpression

type CaseExpression struct {
	When ExpressionNode    `json:"when"`
	Then []*ExpressionNode `json:"then"`
}

type CastToRdfFnc

type CastToRdfFnc = func(v any) (any, error)

This file contains function to cast input data into rdf type based on domain classes

func BuildCastToRdfFunctions

func BuildCastToRdfFunctions(domainClass string, properties []string) ([]CastToRdfFnc, error)

type CastToRdfTxtFnc

type CastToRdfTxtFnc = func(v string) (any, error)

func BuildCastToRdfTxtFunctions

func BuildCastToRdfTxtFunctions(domainClass string, properties []string) ([]CastToRdfTxtFnc, error)

type Channel

type Channel struct {
	Name          string
	Channel       chan []any
	Columns       *map[string]int
	DomainKeySpec *DomainKeysSpec
	Config        *ChannelSpec
}

type ChannelRegistry

type ChannelRegistry struct {
	InputRowChannel      *InputChannel
	ComputeChannels      map[string]*Channel
	OutputTableChannels  []string
	ClosedChannels       map[string]bool
	ClosedChMutex        sync.Mutex
	DistributionChannels map[string]*[]string
}

ChannelRegistry keeps track of all input and output channels. InputRowChannel, called input_row, corresponds to the main input file. InputMergeChannels correspond to any merge input files. ComputeChannels correspond to all other channels created for intermediate steps in the compute graph. OutputTableChannels correspond to the output tables that need to be written. ClosedChannels keeps track of which channels have been closed.

func (*ChannelRegistry) AddDistributionChannel

func (r *ChannelRegistry) AddDistributionChannel(input string) string

func (*ChannelRegistry) CloseChannel

func (r *ChannelRegistry) CloseChannel(name string)

func (*ChannelRegistry) GetInputChannel

func (r *ChannelRegistry) GetInputChannel(name string, hasGroupedRows bool) (*InputChannel, error)

func (*ChannelRegistry) GetOutputChannel

func (r *ChannelRegistry) GetOutputChannel(name string) (*OutputChannel, error)

type ChannelResults

type ChannelResults struct {
	LoadFromS3FilesResultCh chan LoadFromS3FilesResult
	Copy2DbResultCh         chan chan ComputePipesResult
	WritePartitionsResultCh chan chan ComputePipesResult
	S3PutObjectResultCh     chan ComputePipesResult
	JetrulesWorkerResultCh  chan chan JetrulesWorkerResult
	ClusteringResultCh      chan chan ClusteringResult
}

ChannelResults holds the channels reporting back results. LoadFromS3FilesResultCh: results from loading files (row count). Copy2DbResultCh: results of records written to JetStore DB (row count). WritePartitionsResultCh: report on rows output to s3 (row count). S3PutObjectResultCh: reports on nbr of files put to s3 (file count). JetrulesWorkerResultCh: reports on nbr of rete sessions and errors. ClusteringResultCh: reports on nbr of clusters identified and errors.

type ChannelSpec

type ChannelSpec struct {
	Name                 string          `json:"name"`
	Columns              []string        `json:"columns"`
	ClassName            string          `json:"class_name,omitempty"`
	DirectPropertiesOnly bool            `json:"direct_properties_only,omitzero"`
	HasDynamicColumns    bool            `json:"has_dynamic_columns,omitzero"`
	SameColumnsAsInput   bool            `json:"same_columns_as_input,omitzero"`
	DomainKeys           map[string]any  `json:"domain_keys,omitempty"`
	DomainKeysInfo       *DomainKeysSpec `json:"domain_keys_spec,omitzero"`
	// contains filtered or unexported fields
}

ChannelSpec specifies the columns of a channel. The columns can be obtained from a domain class from the local workspace using class_name. In that case, the columns that are specified in the slice are added to the columns of the domain class. When direct_properties_only is true, only take the data properties of the class, not including the properties of the parent classes. DomainKeys provide the ability to configure the domain keys in the cpipes config document. DomainKeysSpec is the parsed version of DomainKeys or the spec from the domain_keys_registry table. DomainKeysSpec is derived from DomainKeys when provided. columnsMap is added in StartComputePipes.

func GetChannelSpec

func GetChannelSpec(channels []ChannelSpec, name string) *ChannelSpec

type ChannelState

type ChannelState struct {
	// contains filtered or unexported fields
}

type ClusterCorrelation

type ClusterCorrelation struct {
	// contains filtered or unexported fields
}

func NewClusterCorrelation

func NewClusterCorrelation(c1, c2 string, minObservationsCount int) *ClusterCorrelation

func (*ClusterCorrelation) AddObservation

func (cc *ClusterCorrelation) AddObservation(distinctValues, nbrObservations int)

func (*ClusterCorrelation) CumulatedCounts

func (cc *ClusterCorrelation) CumulatedCounts() (int, int)

returns cumulated counts. Note: a minimum number of observations for column1 is required, otherwise the function returns -1, -1

type ClusterInfo

type ClusterInfo struct {
	// contains filtered or unexported fields
}

func MakeClusters

func MakeClusters(columnsCorrelation []*ColumnCorrelation,
	columnClassificationMap map[string]string, config *ClusteringSpec) []*ClusterInfo

Function that build the clusters from the raw column correlation

func NewClusterInfo

func NewClusterInfo(classificationMap map[string]string, config *ClusteringSpec) *ClusterInfo

func (*ClusterInfo) AddMember

func (cc *ClusterInfo) AddMember(column string)

func (*ClusterInfo) String

func (cc *ClusterInfo) String() string

type ClusterShardingInfo

type ClusterShardingInfo struct {
	TotalFileSize     int64 `json:"total_file_size"`
	MaxNbrPartitions  int   `json:"max_nbr_partitions"`
	NbrPartitions     int   `json:"nbr_partitions"`
	MultiStepSharding int   `json:"multi_step_sharding"`
}

Contains info about the cluster sharding. This info is determined during the sharding phase in ShardFileKeys and passed to the StartReducing actions via StartComputePipesArgs.

type ClusterShardingSpec

type ClusterShardingSpec struct {
	AppliesToFormat             string  `json:"applies_to_format,omitempty"`
	WhenTotalSizeGe             int     `json:"when_total_size_ge_mb,omitzero"`
	MaxNbrPartitions            int     `json:"max_nbr_partitions,omitzero"`
	MultiStepShardingThresholds int     `json:"multi_step_sharding_thresholds,omitzero"`
	ShardSizeMb                 float64 `json:"shard_size_mb,omitzero"`
	ShardMaxSizeMb              float64 `json:"shard_max_size_mb,omitzero"`
	ShardSizeBy                 float64 `json:"shard_size_by,omitzero"`     // for testing only
	ShardMaxSizeBy              float64 `json:"shard_max_size_by,omitzero"` // for testing only
	S3WorkerPoolSize            int     `json:"s3_worker_pool_size,omitzero"`
	MaxConcurrency              int     `json:"max_concurrency,omitzero"`
}

Cluster sizing configuration. Allows to dynamically determine the NbrNodes based on total size of input files. AppliesToFormat allows to specify the file format to which this spec applies, e.g., parquet, csv, etc. When empty, applies to all formats. WhenTotalSizeGe: Specify the condition for this to be applied in MB (read as "when total size greater than or equal to"). When using ecs tasks, MaxConcurrency applies to ECS cluster, otherwise MaxConcurrency is the number of concurrent lambda functions executing. Note that S3WorkerPoolSize is used for reducing01, all other reducing steps use the S3WorkerPoolSize set at the ClusterSpec level. ShardSizeMb/ShardMaxSizeMb must be specified to determine the nbr of nodes and to allocate files to shards. When [MaxNbrPartitions] is not specified, the value at the ClusterSpec level is taken.

type ClusterSpec

type ClusterSpec struct {
	MaxNbrPartitions            int                   `json:"max_nbr_partitons,omitzero"`
	MultiStepShardingThresholds int                   `json:"multi_step_sharding_thresholds,omitzero"`
	DefaultShardSizeMb          float64               `json:"default_shard_size_mb,omitzero"`
	DefaultShardMaxSizeMb       float64               `json:"default_shard_max_size_mb,omitzero"`
	DefaultShardSizeBy          float64               `json:"default_shard_size_by,omitzero"`     // for testing only
	DefaultShardMaxSizeBy       float64               `json:"default_shard_max_size_by,omitzero"` // for testing only
	ShardOffset                 int                   `json:"shard_offset,omitzero"`
	DefaultMaxConcurrency       int                   `json:"default_max_concurrency,omitzero"`
	S3WorkerPoolSize            int                   `json:"s3_worker_pool_size,omitzero"`
	ClusterShardingTiers        []ClusterShardingSpec `json:"cluster_sharding_tiers,omitempty"`
	IsDebugMode                 bool                  `json:"is_debug_mode,omitzero"`
	KillSwitchMin               int                   `json:"kill_switch_min,omitzero"`
	ShardingInfo                *ClusterShardingInfo  `json:"sharding_info,omitzero"`
}

Cluster configuration. [DefaultMaxConcurrency] is to override the env var MAX_CONCURRENCY. [nbrPartitions] is specified at ClusterShardingSpec level otherwise at the ClusterSpec level. [nbrPartitions] is determined by the nbr of sharding nodes, capped by MaxNbrPartitions. [DefaultShardSizeMb] is the default value (in MB) when not specified at ClusterShardingSpec level. [DefaultShardMaxSizeMb] is the default value (in MB) when not specified at ClusterShardingSpec level. [DefaultShardSizeBy] is the default value (in bytes) when not specified at ClusterShardingSpec level. [DefaultShardMaxSizeBy] is the default value (in bytes) when not specified at ClusterShardingSpec level. NOTE: [ShardSizeMb] / [ShardMaxSizeMb] must be specified for the sharding to take place. [MultiStepShardingThresholds] is the number of partitions to trigger the use of multi step sharding. When [MultiStepShardingThresholds] > 0 then [nbrPartitions] is sqrt(nbr of sharding nodes). [ShardingInfo] is calculated based on input files. [ShardingInfo] is used by the hash operator. Do not set [ShardingInfo] at configuration time, it will be ignored and replaced with the calculated values. Note: Make sure that ClusterShardingSpec is in decreasing order of WhenTotalSizeGe.

func (*ClusterSpec) NbrPartitions

func (cs *ClusterSpec) NbrPartitions(mode string) int

type ClusteringDistributor

type ClusteringDistributor struct {
	// contains filtered or unexported fields
}

type ClusteringPoolManager

type ClusteringPoolManager struct {
	WorkersTaskCh chan []any

	WaitForDone *sync.WaitGroup
	// contains filtered or unexported fields
}

ClusteringDistributors distribute rows by unique value of column1 to an associated ClusteringPoolWorkers. ClusteringPoolWorkers calculate the columns correlation in parallel. poolWg is a wait group of the workers. The WorkersTaskCh is a channel between the clustering operator and the pool manager (to limit the nbr of rows). The distributionTaskCh is used by the pool manager to distribute the input rows to all the workers. The distributionResultCh is used to collect the correlation results from the workers by the pool manager. The correlationOutputCh is to send the correlation results to s3, this is done when is_debug is true.

type ClusteringResult

type ClusteringResult struct {
	Err error
}

type ClusteringSpec

type ClusteringSpec struct {
	MaxInputCount                int                     `json:"max_input_count,omitzero"`
	MinColumn1NonNilCount        int                     `json:"min_column1_non_null_count,omitzero"`
	MinColumn2NonNilCount        int                     `json:"min_column2_non_null_count,omitzero"`
	TargetColumnsLookup          TargetColumnsLookupSpec `json:"target_columns_lookup"`
	ClusterDataSubclassification []string                `json:"cluster_data_subclassification,omitempty"`
	SoloDataSubclassification    []string                `json:"solo_data_subclassification,omitempty"`
	TransitiveDataClassification []string                `json:"transitive_data_classification,omitempty"`
	IsDebug                      bool                    `json:"is_debug,omitzero"`
	CorrelationOutputChannel     *OutputChannelConfig    `json:"correlation_output_channel,omitzero"`
}

If is_debug is true, correlation results are forwarded to s3 otherwise the correlation_output_channel is only used to specify the intermediate channels between the pool manager and the workers. MinColumn1NonNilCount is min nbr of column1 distinct values observed MinColumn2NonNilCount is min nbr of non nil values of column2 for a worker to report the correlation. ClusterDataSubclassification contains data_classification values, when found in a cluster all columns member of the cluster get that value as data_subclassification.

type ClusteringTransformationPipe

type ClusteringTransformationPipe struct {
	// contains filtered or unexported fields
}

func (*ClusteringTransformationPipe) Apply

func (ctx *ClusteringTransformationPipe) Apply(input *[]any) error

Implementing interface PipeTransformationEvaluator. On each call to Apply, the input corresponds to a row for which we calculate the column correlation.

func (*ClusteringTransformationPipe) Done

func (ctx *ClusteringTransformationPipe) Done() error

func (*ClusteringTransformationPipe) Finally

func (ctx *ClusteringTransformationPipe) Finally()

type ClusteringWorker

type ClusteringWorker struct {
	// contains filtered or unexported fields
}

func NewClusteringWorker

func NewClusteringWorker(config *ClusteringSpec, source *InputChannel,
	column1 *string, columns2 []string, outputChannel *OutputChannel,
	done chan struct{}, errCh chan error) *ClusteringWorker

source and outputChannel are provided for their spec, the data is sent and received via inputCh and outputCh

func (*ClusteringWorker) DoWork

func (ctx *ClusteringWorker) DoWork(inputCh <-chan []any, outputCh chan<- []any, resultCh chan ClusteringResult)

type ColumnCorrelation

type ColumnCorrelation struct {
	// contains filtered or unexported fields
}

func NewColumnCorrelation

func NewColumnCorrelation(c1, c2 string, dc1, dc2, oc int) *ColumnCorrelation

type ColumnFileSpec

type ColumnFileSpec struct {
	// OutputLocation: custom file key.
	// Bucket is bucket or empty for jetstore one.
	// Delimiter: rune delimiter to use for output file.
	Bucket         string `json:"bucket,omitempty"`
	OutputLocation string `json:"output_location,omitempty"`
	SchemaProvider string `json:"schema_provider,omitempty"`
	Delimiter      rune   `json:"delimiter,omitzero"`
}

type ColumnNameLookupNode

type ColumnNameLookupNode struct {
	Name                string   `json:"name"`
	ColumnNames         []string `json:"column_names,omitempty"`
	ColumnNameFragments []string `json:"column_name_fragments,omitempty"`
}

ColumnNameLookupNode specifies the column name to classification token Name: classification token name ColumnNames: list of column names that map to the classification token

type ColumnNameTokenNode

type ColumnNameTokenNode struct {
	Name   string                  `json:"name"`
	Lookup []*ColumnNameLookupNode `json:"lookup,omitempty"`
}

ColumnNameTokenNode specifies the classification by column name match Name: correspond to the name of the output column where the classification token is stored Lookup: list of ColumnNameLookupNode to match the column names to the classification token.

type CompiledPartFileComponent

type CompiledPartFileComponent struct {
	ColumnName string
	Regex      *regexp.Regexp
}

type ComputePipesArgs

type ComputePipesArgs struct {
	ComputePipesNodeArgs
	ComputePipesCommonArgs
}

Full arguments to cp_node for sharding and reducing

type ComputePipesCommonArgs

type ComputePipesCommonArgs struct {
	CpipesMode            string                     `json:"cpipes_mode,omitempty"`
	Client                string                     `json:"client,omitempty"`
	Org                   string                     `json:"org,omitempty"`
	ObjectType            string                     `json:"object_type,omitempty"`
	FileKey               string                     `json:"file_key,omitempty"`
	SessionId             string                     `json:"session_id,omitempty"`
	MainInputStepId       string                     `json:"read_step_id,omitempty"`
	MergeFiles            bool                       `json:"merge_files"`
	InputSessionId        string                     `json:"input_session_id,omitempty"`
	SourcePeriodKey       int                        `json:"source_period_key"`
	ProcessName           string                     `json:"process_name,omitempty"`
	SourcesConfig         SourcesConfigSpec          `json:"sources_config"`
	DomainKeysSpecByClass map[string]*DomainKeysSpec `json:"domain_keys_by_class,omitempty"`
	PipelineConfigKey     int                        `json:"pipeline_config_key"`
	UserEmail             string                     `json:"user_email,omitempty"`
}

Common arguments factored out and put into the the ComputePipesConfig component MergeFile property is to indicate that the pipe is at the stage of merging the part files into a single output file. This will use a single node since merge_files has a single partition to read from (the step_id prior to merge_files writes a single partition). Note: Client and Org are the pipeline execution client and org and may be different than the client/vendor of the actual data (case using stand-in client/org name). In that situation the actual client/vendor of the data is specified at run time via the SchemaProviders on table input_registry.

type ComputePipesConfig

type ComputePipesConfig struct {
	CommonRuntimeArgs      *ComputePipesCommonArgs `json:"common_runtime_args,omitzero"`
	MetricsConfig          *MetricsSpec            `json:"metrics_config,omitzero"`
	ClusterConfig          *ClusterSpec            `json:"cluster_config,omitzero"`
	OutputTables           []*TableSpec            `json:"output_tables,omitempty"`
	OutputFiles            []OutputFileSpec        `json:"output_files,omitempty"`
	LookupTables           []*LookupSpec           `json:"lookup_tables,omitempty"`
	Channels               []ChannelSpec           `json:"channels,omitempty"`
	Context                []ContextSpec           `json:"context,omitempty"`
	SchemaProviders        []*SchemaProviderSpec   `json:"schema_providers,omitempty"`
	PipesConfig            []PipeSpec              `json:"pipes_config,omitempty"`
	ReducingPipesConfig    [][]PipeSpec            `json:"reducing_pipes_config,omitempty"`
	ConditionalPipesConfig []ConditionalPipeSpec   `json:"conditional_pipes_config,omitempty"`
}

This file contains the Compute Pipes configuration model

func UnmarshalComputePipesConfig

func UnmarshalComputePipesConfig(computePipesJson *string) (*ComputePipesConfig, error)

func (*ComputePipesConfig) GetComputePipes

func (cp *ComputePipesConfig) GetComputePipes(stepId int, env map[string]any) ([]PipeSpec, int, error)

This function is called once per compute pipes step (sharding or reducing) so we construct the ExprNodeEvaluator as needed.

func (*ComputePipesConfig) GetStepName

func (cp *ComputePipesConfig) GetStepName(stepId int) string

func (*ComputePipesConfig) MainInputChannel

func (cp *ComputePipesConfig) MainInputChannel() *InputChannelConfig

func (*ComputePipesConfig) NbrComputePipes

func (cp *ComputePipesConfig) NbrComputePipes() int

type ComputePipesContext

type ComputePipesContext struct {
	ComputePipesArgs
	CpConfig              *ComputePipesConfig
	FileKeyComponents     map[string]any
	PartFileKeyComponents []CompiledPartFileComponent
	AddionalInputHeaders  []string
	EnvSettings           map[string]any
	SamplingCount         int
	JetStoreTempFolder    string
	InputFileKeys         [][]*FileKeyInfo
	JetRules              JetRulesProxy
	ChResults             *ChannelResults
	KillSwitch            chan struct{}
	Done                  chan struct{}
	ErrCh                 chan error
	FileNamesCh           []chan FileName
	DownloadS3ResultCh    chan DownloadS3Result // avoid to modify ChannelResult for now...
	S3DeviceMgr           *S3DeviceManager
	SchemaManager         *SchemaManager
}

func (*ComputePipesContext) DoneAll

func (cpCtx *ComputePipesContext) DoneAll(err error)

func (*ComputePipesContext) DownloadS3Files

func (cpCtx *ComputePipesContext) DownloadS3Files(inFolderPath []string, externalBucket string, fileKeys [][]*FileKeyInfo) error

func (*ComputePipesContext) InsertPipelineExecutionStatus

func (cpCtx *ComputePipesContext) InsertPipelineExecutionStatus(dbpool *pgxpool.Pool) (int, error)

Register the CPIPES execution status details to pipeline_execution_details

func (*ComputePipesContext) LoadFiles

func (cpCtx *ComputePipesContext) LoadFiles(ctx context.Context, dbpool *pgxpool.Pool) (err error)

func (*ComputePipesContext) NewMergeFileReader

func (cpCtx *ComputePipesContext) NewMergeFileReader(mergePos int, inputFormat string, delimiter rune, outputSp SchemaProvider, headers []string,
	writeHeaders bool, compression string) (io.Reader, error)

outputSp is needed to determine whether we quote all or none of the fields. It also provides the writeHeaders value.

func (*ComputePipesContext) NewS3DeviceManager

func (cpCtx *ComputePipesContext) NewS3DeviceManager() error

Create the S3DeviceManager

func (*ComputePipesContext) ProcessFilesAndReportStatus

func (cpCtx *ComputePipesContext) ProcessFilesAndReportStatus(ctx context.Context, dbpool *pgxpool.Pool) error

func (*ComputePipesContext) ReadCsvFile

func (cpCtx *ComputePipesContext) ReadCsvFile(
	filePath *FileName, fileReader ReaderAtSeeker, castToRdfTxtTypeFncs []CastToRdfTxtFnc,
	reorderColumnsOnRead []int,
	computePipesInputCh chan<- []any, badRowChannel *BadRowsChannel) (int64, int64, error)

func (*ComputePipesContext) ReadFixedWidthFile

func (cpCtx *ComputePipesContext) ReadFixedWidthFile(
	filePath *FileName, fileReader ReaderAtSeeker,
	fwEncodingInfo *FixedWidthEncodingInfo, castToRdfTxtTypeFncs []CastToRdfTxtFnc,
	reorderColumnsOnRead []int,
	computePipesInputCh chan<- []any, badRowChannel *BadRowsChannel) (int64, int64, error)

func (*ComputePipesContext) ReadParquetFileV2

func (cpCtx *ComputePipesContext) ReadParquetFileV2(filePath *FileName,
	fileReader parquet.ReaderAtSeeker, readBatchSize int64,
	castToRdfTxtTypeFncs []CastToRdfTxtFnc, inputSchemaCh chan<- ParquetSchemaInfo,
	reorderColumnsOnRead []int, computePipesInputCh chan<- []any) (int64, error)

func (*ComputePipesContext) ReadXlsxFile

func (cpCtx *ComputePipesContext) ReadXlsxFile(filePath *FileName, xlsxSheetInfo map[string]any,
	castToRdfTxtTypeFncs []CastToRdfTxtFnc, reorderColumnsOnRead []int,
	computePipesInputCh chan<- []any, badRowChannel *BadRowsChannel) (int64, int64, error)

ReadXlsxFile reads an xlsx file and sends the records to computePipesInputCh EnforceRowMinLength and EnforceRowMaxLength does not apply to xlsx files, values past the last expected field are ignored. As a result badRowChannel is not used.

func (*ComputePipesContext) StartComputePipes

func (cpCtx *ComputePipesContext) StartComputePipes(dbpool *pgxpool.Pool,
	inputSchemaCh <-chan ParquetSchemaInfo, computePipesInputCh <-chan []any,
	computePipesMergeChs []chan []any)

Function to write transformed row to database

func (*ComputePipesContext) StartMergeFiles

func (cpCtx *ComputePipesContext) StartMergeFiles(dbpool *pgxpool.Pool) (cpErr error)

Function to merge the partfiles into a single file by streaming the content to s3 using a channel. This is run in the main thread, so no need to have a result channel back to the caller.

func (*ComputePipesContext) UpdatePipelineExecutionStatus

func (cpCtx *ComputePipesContext) UpdatePipelineExecutionStatus(
	dbpool *pgxpool.Pool, key int,
	inputRowCount, badRowCount, totalFilesSizeMb, inputFilesCount, reteSessionCount, outputRowCount int,
	cpipesStepId, status, errMessage string) error

type ComputePipesNodeArgs

type ComputePipesNodeArgs struct {
	NodeId             int    `json:"id"`
	JetsPartitionLabel string `json:"jp,omitempty"`
	PipelineExecKey    int    `json:"pe"`
}

Arguments to start cp_node for sharding and reducing. Minimal set of arguments to reduce the size of the json used to call the lambda functions.

func ReadCpipesArgsFromS3

func ReadCpipesArgsFromS3(s3Location string) ([]ComputePipesNodeArgs, error)

This is currently not used, needed for Distributed Map (for testing purposes)

func (*ComputePipesNodeArgs) CoordinateComputePipes

func (args *ComputePipesNodeArgs) CoordinateComputePipes(ctx context.Context, dbpool *pgxpool.Pool, jrProxy JetRulesProxy) error

type ComputePipesResult

type ComputePipesResult struct {
	// Table name can be jets_partition name
	// PartCount is nbr of file part in jets_partition
	TableName    string
	CopyRowCount int64
	PartsCount   int64
	Err          error
}

type ComputePipesRun

type ComputePipesRun struct {
	CpipesCommands       any                   `json:"cpipesCommands"`
	CpipesCommandsS3Key  string                `json:"cpipesCommandsS3Key,omitempty"`
	CpipesMaxConcurrency int                   `json:"cpipesMaxConcurrency"`
	StartReducing        StartComputePipesArgs `json:"startReducing"`
	IsLastReducing       bool                  `json:"isLastReducing"`
	NoMoreTask           bool                  `json:"noMoreTask"`
	UseECSReducingTask   bool                  `json:"useECSReducingTask"`
	ReportsCommand       []string              `json:"reportsCommand"`
	SuccessUpdate        map[string]any        `json:"successUpdate"`
	ErrorUpdate          map[string]any        `json:"errorUpdate"`
}

Returned by the cp_starter for a cpipes run. CpipesCommandsS3Key is for Distributed Map, currently not used. CpipesMaxConcurrency is passed to step function. IsLastReducing is true when StartReducing is the last step. NoMoreTask is true when StartReducing turned out to have nothing to do and we're past the last step, the step function will go directly to ReportsCommand. UseECSReducingTask is true when to use fargate task rather than lambda functions. ReportsCommand contains the argument for RunReport SuccessUpdate / ErrorUpdate are the arguments for status update.

type ConditionalEnvVarEvaluator

type ConditionalEnvVarEvaluator struct {
	// contains filtered or unexported fields
}

func (*ConditionalEnvVarEvaluator) Update

func (evaluator *ConditionalEnvVarEvaluator) Update(env map[string]any) error

type ConditionalEnvVariable

type ConditionalEnvVariable struct {
	CaseExpr []CaseExpression  `json:"case_expr,omitempty"` // case operator
	ElseExpr []*ExpressionNode `json:"else_expr,omitempty"` // case operator
}

type ConditionalPipeSpec

type ConditionalPipeSpec struct {
	StepName        string                   `json:"step_name,omitempty"`
	UseEcsTasks     bool                     `json:"use_ecs_tasks,omitzero"`
	UseEcsTasksWhen *ExpressionNode          `json:"use_ecs_tasks_when,omitzero"`
	PipesConfig     []PipeSpec               `json:"pipes_config"`
	When            *ExpressionNode          `json:"when,omitzero"`
	AddlEnv         []ConditionalEnvVariable `json:"addl_env,omitempty"`
}

ConditionalPipe: Each step are executed conditionally. When the key "when" is nil, the step is always executed. Available expr variables as main schema provider env var (see above): multi_step_sharding as int, when > 0, nbr of shards is nbr_partition**2 total_file_size in bytes nbr_partitions as int (used for hashing purpose) use_ecs_tasks is true to use ecs fargate task use_ecs_tasks_when is an expression as the when property.

type ConditionalTransformationSpec

type ConditionalTransformationSpec struct {
	When ExpressionNode     `json:"when"`
	Then TransformationSpec `json:"then"`
}

This type is to provide conditional TransformationSpec such that the host TransformationSpec has fields overriden based on the condition. When is the condition to evaluate, if true then apply the Then spec. Note: when Then.Type is not empty, replace the host TransformationSpec altogether.

type ContextSpec

type ContextSpec struct {
	// Type range: file_key_component, partfile_key_component
	Type string `json:"type,omitempty"`
	Key  string `json:"key,omitempty"`
	Expr string `json:"expr,omitempty"`
}

type CpipesStartup

type CpipesStartup struct {
	CpConfig                      ComputePipesConfig         `json:"compute_pipes_config"`
	ProcessName                   string                     `json:"process_name,omitempty"`
	InputColumns                  []string                   `json:"input_columns,omitempty"`
	InputColumnsOriginal          []string                   `json:"input_columns_original,omitempty"`
	MainInputSchemaProviderConfig *SchemaProviderSpec        `json:"main_input_schema_provider_config,omitzero"`
	MainInputDomainKeysSpec       *DomainKeysSpec            `json:"main_input_domain_keys_spec,omitzero"`
	MainInputDomainClass          string                     `json:"main_input_domain_class,omitempty"`
	DomainKeysSpecByClass         map[string]*DomainKeysSpec `json:"domain_keys_spec_by_class,omitzero"`
	EnvSettings                   map[string]any             `json:"env_settings,omitzero"`
	PipelineConfigKey             int                        `json:"pipeline_config_key,omitempty"`
	InputSessionId                string                     `json:"input_session_id,omitempty"`
	SourcePeriodKey               int                        `json:"source_period_key,omitempty"`
	OperatorEmail                 string                     `json:"operator_email,omitempty"`
}

Collect and prepare cpipes configuration for both sharding and reducing steps. InputColumns correspond to the domain columns in the main input file, which can be a subset of the columns in the main_input schema provider based on the source_config table. InputColumns can be empty if it needs to be read from the input file. InputColumnsOriginal is the original input columns before uniquifying them. It is empty if InputColumns is empty or already unique. MainInputDomainKeysSpec contains the domain keys spec based on the source_config table, which can be overridden by the value from the main schema provider. MainInputDomainClass applies when input_registry.input_type = 'domain_table'

func (*CpipesStartup) EvalUseEcsTask

func (cpipesStartup *CpipesStartup) EvalUseEcsTask(stepId int) (bool, error)

func (*CpipesStartup) GetComputePipesPartitions

func (cpipesStartup *CpipesStartup) GetComputePipesPartitions(dbpool *pgxpool.Pool, processName, sessionId string,
	inputChannelConfig *InputChannelConfig) ([]JetsPartitionInfo, error)

GetComputePipesPartitions get the jets_partition partition id and size. This is use during reducing steps.

func (*CpipesStartup) ValidatePipeSpecConfig

func (args *CpipesStartup) ValidatePipeSpecConfig(cpConfig *ComputePipesConfig, pipeConfig []PipeSpec) error

Function to validate the PipeSpec output channel config. Apply a default snappy compression if compression is not specified and channel Type 'stage'. This function also syncs the input and output channels with the associated schema provider.

type CsvSourceS3

type CsvSourceS3 struct {
	// contains filtered or unexported fields
}

func NewCsvSourceS3

func NewCsvSourceS3(spec *CsvSourceSpec, env map[string]any) (*CsvSourceS3, error)

func (*CsvSourceS3) ReadFileToMetaGraph

func (ctx *CsvSourceS3) ReadFileToMetaGraph(re JetRuleEngine, config *JetrulesSpec) error

*TODO Refactor this ReadFileToMetaGraph func

type CsvSourceSpec

type CsvSourceSpec struct {
	// This is used for lookup tables and loading metadata in jetrules.
	// This is a single file source, the first file found is taken.
	// Type range: cpipes, csv_file (future)
	// Default values are taken from current pipeline
	// Format: csv, headerless_csv
	// Compression: none, snappy
	// MakeEmptyWhenNoFile: Do not make an error when no files
	// are found, make empty source. Default: generate an error when no files
	// are found in s3.
	Type                string `json:"type"`
	Format              string `json:"format,omitempty"`
	Compression         string `json:"compression,omitempty"`
	Delimiter           rune   `json:"delimiter,omitzero"`       // default ','
	ProcessName         string `json:"process_name,omitempty"`   // for cpipes
	ReadStepId          string `json:"read_step_id,omitempty"`   // for cpipes
	JetsPartitionLabel  string `json:"jets_partition,omitempty"` // for cpipes
	SessionId           string `json:"session_id,omitempty"`     // for cpipes
	ClassName           string `json:"class_name,omitempty"`     // used by jetrules_config
	MakeEmptyWhenNoFile bool   `json:"make_empty_source_when_no_files_found,omitzero"`
}

type DataSchemaSpec

type DataSchemaSpec struct {
	Columns string `json:"column"`
	RdfType string `json:"rdf_type,omitempty"`
}

type DateBuilder

type DateBuilder struct {
	// contains filtered or unexported fields
}

func (*DateBuilder) Append

func (b *DateBuilder) Append(v any)

func (*DateBuilder) AppendEmptyValue

func (b *DateBuilder) AppendEmptyValue()

func (*DateBuilder) AppendNull

func (b *DateBuilder) AppendNull()

func (*DateBuilder) NewArray

func (b *DateBuilder) NewArray() arrow.Array

func (*DateBuilder) Release

func (b *DateBuilder) Release()

func (*DateBuilder) Reserve

func (b *DateBuilder) Reserve(n int)

type Decimal128Builder

type Decimal128Builder struct {
	Precision int32
	Scale     int32
	// contains filtered or unexported fields
}

func (*Decimal128Builder) Append

func (b *Decimal128Builder) Append(v any)

func (*Decimal128Builder) AppendEmptyValue

func (b *Decimal128Builder) AppendEmptyValue()

func (*Decimal128Builder) AppendNull

func (b *Decimal128Builder) AppendNull()

func (*Decimal128Builder) NewArray

func (b *Decimal128Builder) NewArray() arrow.Array

func (*Decimal128Builder) Release

func (b *Decimal128Builder) Release()

func (*Decimal128Builder) Reserve

func (b *Decimal128Builder) Reserve(n int)

type Decimal256Builder

type Decimal256Builder struct {
	Precision int32
	Scale     int32
	// contains filtered or unexported fields
}

func (*Decimal256Builder) Append

func (b *Decimal256Builder) Append(v any)

func (*Decimal256Builder) AppendEmptyValue

func (b *Decimal256Builder) AppendEmptyValue()

func (*Decimal256Builder) AppendNull

func (b *Decimal256Builder) AppendNull()

func (*Decimal256Builder) NewArray

func (b *Decimal256Builder) NewArray() arrow.Array

func (*Decimal256Builder) Release

func (b *Decimal256Builder) Release()

func (*Decimal256Builder) Reserve

func (b *Decimal256Builder) Reserve(n int)

type DefaultPF

type DefaultPF struct {
	// contains filtered or unexported fields
}

DefaultPF is when there is no preprocessing function, simply add the value to the byte buffer

func (*DefaultPF) ApplyPF

func (pf *DefaultPF) ApplyPF(buf *bytes.Buffer, input *[]any) error

func (*DefaultPF) String

func (pf *DefaultPF) String() string

type DefaultSchemaProvider

type DefaultSchemaProvider struct {
	// contains filtered or unexported fields
}

columnNames is the list of file headers for fixed_width fwColumnPrefix is for fixed_width with multiple record type, prefix for making table columns (dkInfo)

func (*DefaultSchemaProvider) AdjustColumnWidth

func (sp *DefaultSchemaProvider) AdjustColumnWidth(width map[string]int) error

func (*DefaultSchemaProvider) BadRowsConfig

func (sp *DefaultSchemaProvider) BadRowsConfig() *BadRowsSpec

func (*DefaultSchemaProvider) BlankFieldMarkers

func (sp *DefaultSchemaProvider) BlankFieldMarkers() *BlankFieldMarkers

func (*DefaultSchemaProvider) Bucket

func (sp *DefaultSchemaProvider) Bucket() string

func (*DefaultSchemaProvider) CapDobYears

func (sp *DefaultSchemaProvider) CapDobYears() int

func (*DefaultSchemaProvider) ColumnNames

func (sp *DefaultSchemaProvider) ColumnNames() []string

func (*DefaultSchemaProvider) Columns

func (sp *DefaultSchemaProvider) Columns() []SchemaColumnSpec

func (*DefaultSchemaProvider) Compression

func (sp *DefaultSchemaProvider) Compression() string

func (*DefaultSchemaProvider) Delimiter

func (sp *DefaultSchemaProvider) Delimiter() rune

func (*DefaultSchemaProvider) DetectEncoding

func (sp *DefaultSchemaProvider) DetectEncoding() bool

func (*DefaultSchemaProvider) DiscardFileHeaders

func (sp *DefaultSchemaProvider) DiscardFileHeaders() bool

func (*DefaultSchemaProvider) DomainClass

func (sp *DefaultSchemaProvider) DomainClass() string

func (*DefaultSchemaProvider) DomainKeys

func (sp *DefaultSchemaProvider) DomainKeys() map[string]any

func (*DefaultSchemaProvider) DropExcedentHeaders

func (sp *DefaultSchemaProvider) DropExcedentHeaders() bool

func (*DefaultSchemaProvider) Encoding

func (sp *DefaultSchemaProvider) Encoding() string

func (*DefaultSchemaProvider) EnforceRowMaxLength

func (sp *DefaultSchemaProvider) EnforceRowMaxLength() bool

func (*DefaultSchemaProvider) EnforceRowMinLength

func (sp *DefaultSchemaProvider) EnforceRowMinLength() bool

func (*DefaultSchemaProvider) Env

func (sp *DefaultSchemaProvider) Env() map[string]any

func (*DefaultSchemaProvider) FixedWidthEncodingInfo

func (sp *DefaultSchemaProvider) FixedWidthEncodingInfo() *FixedWidthEncodingInfo

func (*DefaultSchemaProvider) FixedWidthFileHeaders

func (sp *DefaultSchemaProvider) FixedWidthFileHeaders() ([]string, string)

func (*DefaultSchemaProvider) Format

func (sp *DefaultSchemaProvider) Format() string

func (*DefaultSchemaProvider) Initialize

func (sp *DefaultSchemaProvider) Initialize(_ *pgxpool.Pool, spec *SchemaProviderSpec,
	_ map[string]any, isDebugMode bool) error

func (*DefaultSchemaProvider) InputFormatDataJson

func (sp *DefaultSchemaProvider) InputFormatDataJson() string

func (*DefaultSchemaProvider) IsPartFiles

func (sp *DefaultSchemaProvider) IsPartFiles() bool

func (*DefaultSchemaProvider) Key

func (sp *DefaultSchemaProvider) Key() string

func (*DefaultSchemaProvider) NbrRowsInRecord

func (sp *DefaultSchemaProvider) NbrRowsInRecord() int64

func (*DefaultSchemaProvider) NoQuotes

func (sp *DefaultSchemaProvider) NoQuotes() bool

func (*DefaultSchemaProvider) OutputEncoding

func (sp *DefaultSchemaProvider) OutputEncoding() string

func (*DefaultSchemaProvider) ParquetSchema

func (sp *DefaultSchemaProvider) ParquetSchema() *ParquetSchemaInfo

func (*DefaultSchemaProvider) QuoteAllRecords

func (sp *DefaultSchemaProvider) QuoteAllRecords() bool

func (*DefaultSchemaProvider) ReadBatchSize

func (sp *DefaultSchemaProvider) ReadBatchSize() int64

func (*DefaultSchemaProvider) ReadDateLayout

func (sp *DefaultSchemaProvider) ReadDateLayout() string

func (*DefaultSchemaProvider) ReorderColumnsOnRead

func (sp *DefaultSchemaProvider) ReorderColumnsOnRead() []int

func (*DefaultSchemaProvider) SchemaName

func (sp *DefaultSchemaProvider) SchemaName() string

func (*DefaultSchemaProvider) SetDodToJan1

func (sp *DefaultSchemaProvider) SetDodToJan1() bool

func (*DefaultSchemaProvider) SetParquetSchema

func (sp *DefaultSchemaProvider) SetParquetSchema(schema *ParquetSchemaInfo)

func (*DefaultSchemaProvider) TrimColumns

func (sp *DefaultSchemaProvider) TrimColumns() bool

func (*DefaultSchemaProvider) UseLazyQuotes

func (sp *DefaultSchemaProvider) UseLazyQuotes() bool

func (*DefaultSchemaProvider) UseLazyQuotesSpecial

func (sp *DefaultSchemaProvider) UseLazyQuotesSpecial() bool

func (*DefaultSchemaProvider) VariableFieldsPerRecord

func (sp *DefaultSchemaProvider) VariableFieldsPerRecord() bool

func (*DefaultSchemaProvider) WriteDateLayout

func (sp *DefaultSchemaProvider) WriteDateLayout() string

type DistinctCount

type DistinctCount struct {
	Value string
	Count int
}

type DistinctSpec

type DistinctSpec struct {
	DistinctOn []string `json:"distinct_on,omitempty"`
}

type DistinctTransformationPipe

type DistinctTransformationPipe struct {
	// contains filtered or unexported fields
}

func (*DistinctTransformationPipe) Apply

func (ctx *DistinctTransformationPipe) Apply(input *[]interface{}) error

Implementing interface PipeTransformationEvaluator

func (*DistinctTransformationPipe) Done

func (ctx *DistinctTransformationPipe) Done() error

func (*DistinctTransformationPipe) Finally

func (ctx *DistinctTransformationPipe) Finally()

type DomainKeyInfo

type DomainKeyInfo struct {
	KeyExpr    []string `json:"key_expr,omitempty"`
	ObjectType string   `json:"object_type,omitempty"`
}

DomainKeyInfo associates a domain hashed key, made as a composite domain key, with an optional pre-processing function on each of the columns making the key. KeyExpr is the original function(column) expression. Columns: list of input column names making the domain key. PreprocessFnc: list of pre-processing functions for the input columns (one per column). ObjectType: Object type associated with the Domain Key

type DomainKeysSpec

type DomainKeysSpec struct {
	HashingOverride string                    `json:"hashing_override,omitempty"`
	DomainKeys      map[string]*DomainKeyInfo `json:"domain_keys_info,omitempty"`
}

DomainKeysSpec contains the overall information, with overriding hashing method. The hashing method is applicable to all object types. DomainKeys is a map keyed by the object type.

func ParseDomainKeyInfo

func ParseDomainKeyInfo(mainObjectType string, domainKeys any) (*DomainKeysSpec, error)

Parse domain key configuration info from [domainKeys], supporting 3 use cases: in json format:

	"key"
 ["key1", "key2"]

{"ObjectType1": "key", "ObjectType2": ["key1", "key2"]}

type DownloadS3Result

type DownloadS3Result struct {
	InputFilesCount int
	TotalFilesSize  int64
	Err             error
}

type EntityHint

type EntityHint struct {
	Entity             string   `json:"entity"`
	NameFragments      []string `json:"column_name_fragments,omitempty"`
	ExclusionFragments []string `json:"exclusion_fragments,omitempty"`
}

type ExprBuilderContext

type ExprBuilderContext map[string]any

main builder, builds expression evaluator

func (ExprBuilderContext) BuildExprNodeEvaluator

func (ctx ExprBuilderContext) BuildExprNodeEvaluator(sourceName string, columns map[string]int, spec *ExpressionNode) (evalExpression, error)

Note that columns can be nil when evalExpression is taking map[string]any as argument.

type ExpressionNode

type ExpressionNode struct {
	// Type is for leaf nodes: select, value
	// Name is for CaseExpression.Then and TransformationColumnSpec.ElseExpr
	// to indicate which column to set the calculated value
	Name      string          `json:"name,omitempty"` // TransformationColumnSpec case operator
	Type      string          `json:"type,omitempty"`
	Expr      string          `json:"expr,omitempty"`
	ExprList  []string        `json:"expr_list,omitempty"`
	AsRdfType string          `json:"as_rdf_type,omitempty"`
	Arg       *ExpressionNode `json:"arg,omitzero"`
	Lhs       *ExpressionNode `json:"lhs,omitzero"`
	Op        string          `json:"op,omitempty"`
	Rhs       *ExpressionNode `json:"rhs,omitzero"`
}

type FetchFileInfoResult

type FetchFileInfoResult struct {
	Headers      []string
	SepFlag      jcsv.Chartype
	Encoding     string
	EolByte      byte
	MultiColumns bool
}

func FetchHeadersAndDelimiterFromFile

func FetchHeadersAndDelimiterFromFile(externalBucket, fileKey, fileFormat, compression, encoding string, delimitor rune,
	multiColumnsInput, noQuotes, fetchHeaders, fetchDelimitor, fetchEncoding, detectCrAsEol bool, fileFormatDataJson string) (*FetchFileInfoResult, error)

Main function

type FieldInfo

type FieldInfo struct {
	Name             string `json:"name"`
	Type             string `json:"type"`
	Nullable         bool   `json:"nullable,omitzero"`
	DecimalPrecision int32  `json:"precision,omitzero"`
	DecimalScale     int32  `json:"scale,omitzero"`
}

type FileConfig

type FileConfig struct {
	BadRowsConfig              *BadRowsSpec           `json:"bad_rows_config,omitzero"`
	BlankFieldMarkers          *BlankFieldMarkersSpec `json:"blank_field_markers,omitzero"`
	Bucket                     string                 `json:"bucket,omitempty"`
	Compression                string                 `json:"compression,omitempty"`
	Delimiter                  rune                   `json:"delimiter,omitzero"`
	DetectCrAsEol              bool                   `json:"detect_cr_as_eol,omitzero"`
	DetectEncoding             bool                   `json:"detect_encoding,omitzero"`
	DiscardFileHeaders         bool                   `json:"discard_file_headers,omitzero"`
	DomainClass                string                 `json:"domain_class,omitempty"`
	DomainKeys                 map[string]any         `json:"domain_keys,omitempty"`
	DropExcedentHeaders        bool                   `json:"drop_excedent_headers,omitzero"`
	Encoding                   string                 `json:"encoding,omitempty"`
	EnforceRowMaxLength        bool                   `json:"enforce_row_max_length,omitzero"`
	EnforceRowMinLength        bool                   `json:"enforce_row_min_length,omitzero"`
	EolByte                    byte                   `json:"eol_byte,omitzero"`
	FileKey                    string                 `json:"file_key,omitempty"`
	LookbackPeriods            string                 `json:"lookback_periods,omitzero"`
	FileName                   string                 `json:"file_name,omitempty"` // Type output
	FixedWidthColumnsCsv       string                 `json:"fixed_width_columns_csv,omitempty"`
	Format                     string                 `json:"format,omitempty"`
	InputFormatDataJson        string                 `json:"input_format_data_json,omitempty"`
	IsPartFiles                bool                   `json:"is_part_files,omitzero"`
	KeyPrefix                  string                 `json:"key_prefix,omitempty"`
	MainInputRowCount          int64                  `json:"main_input_row_count,omitzero"`
	MultiColumnsInput          bool                   `json:"multi_columns_input,omitzero"`
	NbrRowsInRecord            int64                  `json:"nbr_rows_in_record,omitzero"` // Format: parquet
	NoQuotes                   bool                   `json:"no_quotes,omitzero"`
	ParquetSchema              *ParquetSchemaInfo     `json:"parquet_schema,omitzero"`
	PutHeadersOnFirstPartition bool                   `json:"put_headers_on_first_partition,omitzero"`
	QuoteAllRecords            bool                   `json:"quote_all_records,omitzero"`
	ReadBatchSize              int64                  `json:"read_batch_size,omitzero"` // Format: parquet
	ReadDateLayout             string                 `json:"read_date_layout,omitempty"`
	ReorderColumnsOnRead       []int                  `json:"reorder_columns_on_read,omitempty"`
	TrimColumns                bool                   `json:"trim_columns,omitzero"`
	UseLazyQuotes              bool                   `json:"use_lazy_quotes,omitzero"`
	UseLazyQuotesSpecial       bool                   `json:"use_lazy_quotes_special,omitzero"`
	VariableFieldsPerRecord    bool                   `json:"variable_fields_per_record,omitzero"`
	WriteDateLayout            string                 `json:"write_date_layout,omitempty"`
	OutputEncoding             string                 `json:"output_encoding,omitempty"`
	OutputEncodingSameAsInput  bool                   `json:"output_encoding_same_as_input,omitempty"`
}

Configuration type for factoring out all file settings. This is used by more specific types such as: SchemaProviderSpec, InputChannelConfig, OutputChannelConfig, OutputFileSpec

type FileKeyInfo

type FileKeyInfo struct {
	// contains filtered or unexported fields
}

type FileName

type FileName struct {
	LocalFileName string
	InFileKeyInfo FileKeyInfo
}

type FilterColumnSpec

type FilterColumnSpec struct {
	LookupName     string   `json:"lookup_name,omitempty"`
	ColumnName     string   `json:"column_name,omitempty"`
	LookupColumn   string   `json:"lookup_column,omitempty"`
	RetainOnValues []string `json:"retain_on_values,omitempty"`
}

FilterColumnSpec specifies how to filter the input rows before shuffling. LookupName is the name of the lookup table containing the column metadata, produced by the analyze operator. ColumnName is the name of the column of the lookup table containing the column name to use on the output rows. LookupColumn is the name of the column in the lookup table containing the column name of the metadata table to filter on. RetainOnValues is the list of values in the lookup table for LookupColumn to retain; only rows with those values are retained.

type FilterSpec

type FilterSpec struct {
	RowLengthStrict bool            `json:"row_length_strict,omitzero"`
	When            *ExpressionNode `json:"when,omitzero"`
	MaxOutputCount  int             `json:"max_output_records,omitzero"`
}

Filter row base on:

  • when criteria, if provided,
  • max output count, if provided.

RowLengthStrict: when true, will enforce that input row length matches the schema length, otherwise they are filtered.

type FilterTransformationPipe

type FilterTransformationPipe struct {
	// contains filtered or unexported fields
}

Filter operator. Filter input records based on filter criteria. Note: Automatically filter the bad rows (ie rows w/o the expected number of columns)

func (*FilterTransformationPipe) Apply

func (ctx *FilterTransformationPipe) Apply(input *[]any) error

Implementing interface PipeTransformationEvaluator

func (*FilterTransformationPipe) Done

func (ctx *FilterTransformationPipe) Done() error

func (*FilterTransformationPipe) Finally

func (ctx *FilterTransformationPipe) Finally()

type FixedWidthColumn

type FixedWidthColumn struct {
	Start      int
	End        int
	ColumnName string
}

Struct to hold column names and positions for fixed-width file encoding. ColumnsMap key is the record type, or empty string if there is a single record type (RecordTypeColumn = nil in that case). In ColumnsMap elements, ColumnName is <record type>.<record column name> to make it unique across record types. However RecordTypeColumn.ColumnName is <record column name> without prefix. Note that all record types MUST have RecordTypeColumn.ColumnName with the same start and end position. Any record having an unrecognized record type (ie not found in ColumnsMap) is ignored.

func (*FixedWidthColumn) String

func (c *FixedWidthColumn) String() string

type FixedWidthEncodingInfo

type FixedWidthEncodingInfo struct {
	RecordTypeColumn *FixedWidthColumn
	ColumnsMap       map[string]*[]*FixedWidthColumn
	ColumnsOffsetMap map[string]int
	RecordTypeList   []string
}

func (*FixedWidthEncodingInfo) String

func (fw *FixedWidthEncodingInfo) String() string

type Float32Builder

type Float32Builder struct {
	// contains filtered or unexported fields
}

func (*Float32Builder) Append

func (b *Float32Builder) Append(v any)

func (*Float32Builder) AppendEmptyValue

func (b *Float32Builder) AppendEmptyValue()

func (*Float32Builder) AppendNull

func (b *Float32Builder) AppendNull()

func (*Float32Builder) NewArray

func (b *Float32Builder) NewArray() arrow.Array

func (*Float32Builder) Release

func (b *Float32Builder) Release()

func (*Float32Builder) Reserve

func (b *Float32Builder) Reserve(n int)

type Float64Builder

type Float64Builder struct {
	// contains filtered or unexported fields
}

func (*Float64Builder) Append

func (b *Float64Builder) Append(v any)

func (*Float64Builder) AppendEmptyValue

func (b *Float64Builder) AppendEmptyValue()

func (*Float64Builder) AppendNull

func (b *Float64Builder) AppendNull()

func (*Float64Builder) NewArray

func (b *Float64Builder) NewArray() arrow.Array

func (*Float64Builder) Release

func (b *Float64Builder) Release()

func (*Float64Builder) Reserve

func (b *Float64Builder) Reserve(n int)

type FormatDatePF

type FormatDatePF struct {
	// contains filtered or unexported fields
}

FormatDatePF writes a date field using the YYYYMMDD format. This assumes the date in the input is a valid date as a string. Returns no error if the date is empty or not valid

func (*FormatDatePF) ApplyPF

func (pf *FormatDatePF) ApplyPF(buf *bytes.Buffer, input *[]any) error

func (*FormatDatePF) String

func (pf *FormatDatePF) String() string

type FunctionTokenNode

type FunctionTokenNode struct {
	Type            string         `json:"type"`
	ParseDateConfig *ParseDateSpec `json:"parse_date_config,omitzero"`
	LargeDouble     *float64       `json:"large_double,omitzero"`
}

Type: parse_date, parse_double, parse_text MinMaxDateFormat: Date parser, Type: parse_date ParseDateArguments: for Type: parse_date Large_Double: for Type: parse_double

type GroupBySpec

type GroupBySpec struct {
	GroupByName  []string `json:"group_by_name,omitempty"`
	GroupByPos   []int    `json:"group_by_pos,omitempty"`
	GroupByCount int      `json:"group_by_count,omitzero"`
	DomainKey    string   `json:"domain_key,omitempty"`
	IsDebug      bool     `json:"is_debug,omitzero"`
}

Specify either domain_key, group_by_name, group_by_pos or group_by_count. group_by_count has priority over the other modes of grouping. group_by_name wins when both group_by_name and group_by_pos are specified. domain_key uses the domain key info to compute the composite key. At least one must be specified.

type GroupByTransformationPipe

type GroupByTransformationPipe struct {
	// contains filtered or unexported fields
}

Group By operator. Group the input records into bundles, where each record of the bundle is a rule session.

func (*GroupByTransformationPipe) Apply

func (ctx *GroupByTransformationPipe) Apply(input *[]any) error

Implementing interface PipeTransformationEvaluator

func (*GroupByTransformationPipe) Done

func (ctx *GroupByTransformationPipe) Done() error

func (*GroupByTransformationPipe) Finally

func (ctx *GroupByTransformationPipe) Finally()

type HashEvaluator

type HashEvaluator struct {
	// contains filtered or unexported fields
}

HashEvaluator is a type to compute a hash key based on an input record. The hashing algo can be sha1, md5 or none. This is used to compute domain key ONLY. For regular hash-based partitioning, the hashing algo is always FNV-1a 64bit, and the hash key is always uint64.

func (*HashEvaluator) ComputeDomainKey

func (ctx *HashEvaluator) ComputeDomainKey(input []any) (any, error)

func (*HashEvaluator) ComputeHash

func (ctx *HashEvaluator) ComputeHash(input []any) (any, error)

func (*HashEvaluator) String

func (ctx *HashEvaluator) String() string

type HashExpression

type HashExpression struct {
	Expr                   string   `json:"expr,omitempty"`
	CompositeExpr          []string `json:"composite_expr,omitempty"`
	DomainKey              string   `json:"domain_key,omitempty"`
	NbrJetsPartitionsAny   any      `json:"nbr_jets_partitions,omitzero"`
	MultiStepShardingMode  string   `json:"multi_step_sharding_mode,omitempty"`
	AlternateCompositeExpr []string `json:"alternate_composite_expr,omitempty"`
	NoPartitions           bool     `json:"no_partitions,omitzero"`
	ComputeDomainKey       bool     `json:"compute_domain_key,omitzero"`
}

Hash using values from columns. Case single column, use Expr. Case multi column, use CompositeExpr. Expr takes precedence if both are populated. DomainKey is specified as an object_type. DomainKeysJson provides the mapping between domain keys and columns. AlternateCompositeExpr is used when Expr or CompositeExpr returns nil or empty. MultiStepShardingMode values: 'limited_range', 'full_range' or empty. NoPartitions indicates not to assign the hash to a partition (no modulo operation). The ComputeDomainKey flag indicates to compute the domain key rather than a simple hash. This considers the hashing algo used and the delimiter between the key components.

func MakeHashExpressionFromGroupByConfig

func MakeHashExpressionFromGroupByConfig(column map[string]int, config *GroupBySpec) (*HashExpression, error)

func (*HashExpression) NbrJetsPartitions

func (h *HashExpression) NbrJetsPartitions() uint64

func (*HashExpression) String

func (h *HashExpression) String() string

type HashingAlgoEnum

type HashingAlgoEnum int

Hashing Algo supported

const (
	HashingAlgo_None HashingAlgoEnum = iota
	HashingAlgo_SHA1
	HashingAlgo_MD5
)

func (HashingAlgoEnum) String

func (e HashingAlgoEnum) String() string

type HighFreqSpec

type HighFreqSpec struct {
	Name          string `json:"name"`
	KeyRe         string `json:"key_re,omitempty"`
	TopPercentile int    `json:"top_pct,omitzero"`
	TopRank       int    `json:"top_rank,omitzero"`
	// contains filtered or unexported fields
}

top_pct corresponds to the top percentile of the data, ie, retain the distinct values that correspond to

totalCount * top_pct / 100

where totalCount is the total count of values for the column. If top_pct is not specified, then all distinct values are retained. top_rank corresponds to the percentage of the distinct values to retain. That is:

nbrDistinctValues * top_rank / 100

where nbrDistinctValues is the number of distinct values for the column. Note that the distinct values are by descending frequency of occurrence.

type HighFreqTransformationPipe

type HighFreqTransformationPipe struct {
	// contains filtered or unexported fields
}

firstInputRow is the first row from the input channel. A reference to it is kept for use in the Done function so as to carry over the select fields in the columnEvaluators. Note: columnEvaluators is applied only on the firstInputRow and it is used only to select columns having the same value for every input row or to put constant values coming from the env

func (*HighFreqTransformationPipe) Apply

func (ctx *HighFreqTransformationPipe) Apply(input *[]interface{}) error

Implementing interface PipeTransformationEvaluator

func (*HighFreqTransformationPipe) Done

func (ctx *HighFreqTransformationPipe) Done() error

func (*HighFreqTransformationPipe) Finally

func (ctx *HighFreqTransformationPipe) Finally()

type Input2PipeSet

type Input2PipeSet map[string]*PipeSet

type InputChannel

type InputChannel struct {
	Name           string
	Channel        <-chan []any
	Columns        *map[string]int
	DomainKeySpec  *DomainKeysSpec
	Config         *ChannelSpec
	HasGroupedRows bool
}

type InputChannelConfig

type InputChannelConfig struct {
	// Type range: memory (default), input, stage
	// Format: csv, headerless_csv, etc.
	// ReadBatchSize: nbr of rows to read per record (format: parquet)
	// Compression: none, snappy (parquet: always snappy)
	// DetectEncoding: Detect file encoding (limited) for text file format
	// DetectCrAsEol: Detect if \r is used as eol (format: csv,headerless_csv)
	// DiscardFileHeaders: when true, discard the headers from the input file (typically for csv format).
	// EolByte: Byte to use as eol (format: csv,headerless_csv)
	// MultiColumnsInput: Indicate that input file must have multiple columns,
	// this is used to detect if the wrong delimiter is used (csv,headerless_csv)
	// Note: SchemaProvider, Compression, Format for Type input are provided via
	// ComputePipesCommonArgs.SourcesConfig (ie input_registry table).
	// BadRowsConfig: Specify how to handle bad rows.
	// HasGroupedRows indicates that the channel contains grouped rows,
	// most likely from the group_by or merge operator.
	// Note: The input_row channel (main input) will be cast to the
	// rdf type specified by the domain class of the main input source.
	FileConfig
	Type             string               `json:"type"`
	Name             string               `json:"name"`
	SchemaProvider   string               `json:"schema_provider,omitempty"`
	ReadSessionId    string               `json:"read_session_id,omitempty"`
	ReadStepId       string               `json:"read_step_id,omitempty"`
	ReadPartitionId  string               `json:"read_partition_id,omitempty"`
	SamplingRate     int                  `json:"sampling_rate,omitzero"`
	SamplingMaxCount int                  `json:"sampling_max_count,omitzero"`
	HasGroupedRows   bool                 `json:"has_grouped_rows,omitzero"`
	MergeChannels    []InputChannelConfig `json:"merge_channels,omitempty"`
	// contains filtered or unexported fields
}

type InputMappingExpr

type InputMappingExpr struct {
	InputColumn           sql.NullString
	DataProperty          string
	CleansingFunctionName sql.NullString
	Argument              sql.NullString
	DefaultValue          sql.NullString
	ErrorMessage          sql.NullString
}

func GetInputMapping

func GetInputMapping(dbpool *pgxpool.Pool, tableName string) ([]InputMappingExpr, error)

read mapping definitions

type InputRowColumns

type InputRowColumns struct {
	OriginalHeaders []string `json:"original_headers"`
	MainInput       []string `json:"main_input"`
}

InputRowColumns: used to save the input_row channel column name in table cpipes_execution_status, column input_row_columns_json OriginalHeaders are the original headers read from the main input file

type InputSourceSpec

type InputSourceSpec struct {
	OriginalInputColumns []string        `json:"original_input_columns,omitempty"`
	InputColumns         []string        `json:"input_columns,omitempty"`
	DomainClass          string          `json:"domain_class,omitempty"`
	DomainKeys           *DomainKeysSpec `json:"domain_keys_spec,omitempty"`
}

InputColumns correspond to columns in the input files; this applies to reducing as well as sharding steps. InputColumns is the uniquefied version of OriginalInputColumns. OriginalInputColumns is the original input file columns. For the case of the sharding step, it includes columns from the part files key. DomainKeys is taken from:

  • source_config table or main schema provider for source_type = 'file'
  • domain_keys_registry table or schema_provider / input_source_spec for source_type = 'domain_table'

DomainClass is taken from:

  • domain_keys_registry table or schema_provider / input_source_spec for source_type = 'domain_table'

Note: for source_type = 'file', DomainClass does not apply, the file needs to be mapped first.

type Int32Builder

type Int32Builder struct {
	// contains filtered or unexported fields
}

func (*Int32Builder) Append

func (b *Int32Builder) Append(v any)

func (*Int32Builder) AppendEmptyValue

func (b *Int32Builder) AppendEmptyValue()

func (*Int32Builder) AppendNull

func (b *Int32Builder) AppendNull()

func (*Int32Builder) NewArray

func (b *Int32Builder) NewArray() arrow.Array

func (*Int32Builder) Release

func (b *Int32Builder) Release()

func (*Int32Builder) Reserve

func (b *Int32Builder) Reserve(n int)

type Int64Builder

type Int64Builder struct {
	// contains filtered or unexported fields
}

func (*Int64Builder) Append

func (b *Int64Builder) Append(v any)

func (*Int64Builder) AppendEmptyValue

func (b *Int64Builder) AppendEmptyValue()

func (*Int64Builder) AppendNull

func (b *Int64Builder) AppendNull()

func (*Int64Builder) NewArray

func (b *Int64Builder) NewArray() arrow.Array

func (*Int64Builder) Release

func (b *Int64Builder) Release()

func (*Int64Builder) Reserve

func (b *Int64Builder) Reserve(n int)

type JetRdfSession

type JetRdfSession interface {
	GetResourceManager() JetResourceManager
	JetResources() *JetResources

	Insert(s, p RdfNode, o RdfNode) error
	Erase(s, p RdfNode, o RdfNode) (bool, error)
	Retract(s, p RdfNode, o RdfNode) (bool, error)
	Contains(s, p RdfNode, o RdfNode) bool
	ContainsSP(s, p RdfNode) bool
	GetObject(s, p RdfNode) RdfNode

	FindSPO(s, p, o RdfNode) TripleIterator
	FindSP(s, p RdfNode) TripleIterator
	FindS(s RdfNode) TripleIterator
	Find() TripleIterator

	NewReteSession(ruleset string) (JetReteSession, error)
	Release() error
}

type JetResourceManager

type JetResourceManager interface {
	RdfNull() RdfNode
	CreateBNode(key int) RdfNode
	NewDateLiteral(data string) RdfNode
	NewDateDetails(year, month, day int) RdfNode
	NewDatetimeLiteral(data string) RdfNode
	NewDatetimeDetails(year, month, day, hour, min, sec int) RdfNode
	NewDoubleLiteral(x float64) RdfNode
	NewIntLiteral(data int) RdfNode
	NewUIntLiteral(data uint) RdfNode
	NewResource(name string) RdfNode
	NewTextLiteral(data string) RdfNode
}

type JetResources

type JetResources struct {
	Jets__client                  RdfNode
	Jets__completed               RdfNode
	Jets__currentSourcePeriod     RdfNode
	Jets__currentSourcePeriodDate RdfNode
	Jets__entity_property         RdfNode
	Jets__exception               RdfNode
	Jets__from                    RdfNode
	Jets__input_record            RdfNode
	Jets__istate                  RdfNode
	Jets__key                     RdfNode
	Jets__length                  RdfNode
	Jets__lookup_multi_rows       RdfNode
	Jets__lookup_row              RdfNode
	Jets__loop                    RdfNode
	Jets__max_vertex_visits       RdfNode
	Jets__operator                RdfNode
	Jets__org                     RdfNode
	Jets__range_value             RdfNode
	Jets__replace_chars           RdfNode
	Jets__replace_with            RdfNode
	Jets__source_period_sequence  RdfNode
	Jets__sourcePeriodType        RdfNode
	Jets__state                   RdfNode
	Jets__value_property          RdfNode
	Rdf__type                     RdfNode
}

func NewJetResources

func NewJetResources(je JetResourceManager) *JetResources

func (*JetResources) Initialize

func (jr *JetResources) Initialize(je JetResourceManager)

type JetReteSession

type JetReteSession interface {
	ExecuteRules() error
	Release() error
}

type JetRuleEngine

type JetRuleEngine interface {
	MainRuleFile() string
	JetResources() *JetResources
	GetMetaGraphTriples() []string

	GetMetaResourceManager() JetResourceManager
	Insert(s, p RdfNode, o RdfNode) error

	NewRdfSession() (JetRdfSession, error)
	Release() error
}

func GetJetRuleEngine

func GetJetRuleEngine(reFactory JetRulesFactory, dbpool *pgxpool.Pool, processName string, isDebug bool) (
	ruleEngine JetRuleEngine, err error)

Function to get the JetRuleEngine for a rule process

type JetRulesFactory

type JetRulesFactory interface {
	JetRulesName() string
	NewJetRuleEngine(dbpool *pgxpool.Pool, processName string, isDebug bool) (JetRuleEngine, error)
	ClearCache() bool
}

type JetRulesProxy

type JetRulesProxy interface {
	GetDefaultFactory() JetRulesFactory
	GetGoFactory() JetRulesFactory
	GetNativeFactory() JetRulesFactory
}

type JetrulesOutputChan

type JetrulesOutputChan struct {
	ClassName        string
	ColumnEvaluators []TransformationColumnEvaluator
	OutputCh         *OutputChannel
}

type JetrulesSpec

type JetrulesSpec struct {
	ProcessName             string                `json:"process_name,omitempty"`
	UseJetRulesNative       bool                  `json:"use_jet_rules_native,omitzero"`
	UseJetRulesGo           bool                  `json:"use_jet_rules_go,omitzero"`
	InputRdfType            string                `json:"input_rdf_type,omitempty"`
	MaxInputCount           int                   `json:"max_input_count,omitzero"`
	PoolSize                int                   `json:"pool_size,omitzero"`
	MaxReteSessionsSaved    int                   `json:"max_rete_sessions_saved,omitzero"`
	MaxLooping              int                   `json:"max_looping,omitzero"`
	CurrentSourcePeriod     int                   `json:"current_source_period,omitzero"`
	CurrentSourcePeriodDate string                `json:"current_source_period_date,omitempty"`
	CurrentSourcePeriodType string                `json:"current_source_period_type,omitempty"`
	RuleConfig              []map[string]any      `json:"rule_config,omitempty"`
	MetadataInputSources    []CsvSourceSpec       `json:"metadata_input_sources,omitempty"`
	IsDebug                 bool                  `json:"is_debug,omitzero"`
	OutputChannels          []OutputChannelConfig `json:"output_channels,omitempty"`
}

JetrulesSpec configuration ProcessName is the jetrules process name to use. UseJetRulesNative when true use the jetrules native engine. UseJetRulesGo when true use the jetrules go engine. Note: only one of UseJetRulesNative or UseJetRulesGo can be true. When both are false, the default is to use the jetrules native engine when available. InputRdfType is the rdf type (class name) of the input records. MaxInputCount is the max nbr of input records to process. PoolSize is the nbr of worker pool size. MaxReteSessionsSaved is the max nbr of rete sessions to save in err table. CurrentSourcePeriod is the source period key to use for this process. CurrentSourcePeriodDate is the source period date (aka file period date) to use for this process. CurrentSourcePeriodType is the source period type (day, week, or month) to use for this process. RuleConfig provides additional configuration for jetrules processing. MetadataInputSources provide the list of csv sources to load as metadata input sources for jetrules processing. IsDebug when true enables debug mode for jetrules processing. MaxLooping overrides the value in the jetrules metastore.

type JetrulesTransformationPipe

type JetrulesTransformationPipe struct {
	// contains filtered or unexported fields
}

func (*JetrulesTransformationPipe) Apply

func (ctx *JetrulesTransformationPipe) Apply(input *[]interface{}) error

Implementing interface PipeTransformationEvaluator. On each call to Apply, the input corresponds to an rdf session to which the jetrules are applied; see jetrules_pool_worker.go for the worker implementation.

func (*JetrulesTransformationPipe) Done

func (ctx *JetrulesTransformationPipe) Done() error

func (*JetrulesTransformationPipe) Finally

func (ctx *JetrulesTransformationPipe) Finally()

type JetrulesWorkerResult

type JetrulesWorkerResult struct {
	ReteSessionCount int64
	ErrorsCount      int64
	Err              error
}

type JetsPartitionInfo

type JetsPartitionInfo struct {
	PartitionLabel string
	PartitionSize  int64
}

type JrPoolManager

type JrPoolManager struct {
	WorkersTaskCh chan []interface{}

	WaitForDone *sync.WaitGroup
	// contains filtered or unexported fields
}

JrPoolManager manage a pool of workers to execute rules in parallel jrPoolWg is a wait group of the workers. The WorkersTaskCh is closed in jetrules operator

type JrPoolWorker

type JrPoolWorker struct {
	// contains filtered or unexported fields
}

func NewJrPoolWorker

func NewJrPoolWorker(config *JetrulesSpec, source *InputChannel, rdfType2Columns map[string][]string,
	re JetRuleEngine, outputChannels []*JetrulesOutputChan,
	done chan struct{}, errCh chan error) *JrPoolWorker

func (*JrPoolWorker) DoWork

func (ctx *JrPoolWorker) DoWork(mgr *JrPoolManager, resultCh chan JetrulesWorkerResult)

type KeywordCount

type KeywordCount struct {
	Name     string
	Keywords []string
	Count    int
}

func NewKeywordCount

func NewKeywordCount(name string, keywords []string) *KeywordCount

type KeywordTokenNode

type KeywordTokenNode struct {
	Name     string   `json:"name"`
	Keywords []string `json:"keywords,omitempty"`
}

type LoadFromS3FilesResult

type LoadFromS3FilesResult struct {
	LoadRowCount int64
	BadRowCount  int64
	Err          error
}

type LookupColumnSpec

type LookupColumnSpec struct {
	// Type range: select, value
	Name string  `json:"name,omitempty"`
	Type string  `json:"type,omitempty"`
	Expr *string `json:"expr,omitzero"`
}

type LookupCount

type LookupCount struct {
	Name  string
	Count int
}

func NewLookupCount

func NewLookupCount(name string) *LookupCount

type LookupSpec

type LookupSpec struct {
	// type range: sql_lookup, s3_csv_lookup
	Key          string            `json:"key"`
	Type         string            `json:"type"`
	Query        string            `json:"query,omitempty"`     // for sql_lookup
	CsvSource    *CsvSourceSpec    `json:"csv_source,omitzero"` //for s3_csv_lookup
	Columns      []TableColumnSpec `json:"columns,omitempty"`
	LookupKey    []string          `json:"lookup_key,omitempty"`
	LookupValues []string          `json:"lookup_values,omitempty"`
}

func SelectActiveLookupTable

func SelectActiveLookupTable(lookupConfig []*LookupSpec, pipeConfig []PipeSpec) ([]*LookupSpec, error)

Function to prune the lookupConfig and return only the lookup used in the pipeConfig Returns an error if pipeConfig has reference to a lookup not in lookupConfig

type LookupTable

type LookupTable interface {
	// Returns the lookup row associated with key
	Lookup(key *string) (*[]any, error)
	// Returns the row's value associated with the lookup column
	LookupValue(row *[]any, columnName string) (any, error)
	// Returns the mapping between column name to pos in the returned row
	ColumnMap() map[string]int
	// Return true if the table is empty, ColumnMap is empty as well
	IsEmptyTable() bool
	// Return the number of rows in the lookup table
	Size() int64
}

func NewLookupTableS3

func NewLookupTableS3(_ *pgxpool.Pool, spec *LookupSpec, env map[string]any, isVerbose bool) (LookupTable, error)

func NewLookupTableSql

func NewLookupTableSql(dbpool *pgxpool.Pool, spec *LookupSpec, env map[string]any, isVerbose bool) (LookupTable, error)

type LookupTableManager

type LookupTableManager struct {
	LookupTableMap map[string]LookupTable
	// contains filtered or unexported fields
}

func NewLookupTableManager

func NewLookupTableManager(spec []*LookupSpec, envSettings map[string]any, isVerbose bool) *LookupTableManager

func (*LookupTableManager) PrepareLookupTables

func (mgr *LookupTableManager) PrepareLookupTables(dbpool *pgxpool.Pool) error

type LookupTableS3

type LookupTableS3 struct {
	// contains filtered or unexported fields
}

data is the mapping of the lookup key -> values. columnsMap is the mapping of the return column name -> position in the returned row (values).

func (*LookupTableS3) ColumnMap

func (tbl *LookupTableS3) ColumnMap() map[string]int

func (*LookupTableS3) IsEmptyTable

func (tbl *LookupTableS3) IsEmptyTable() bool

Return true only if no files were found on s3

func (*LookupTableS3) Lookup

func (tbl *LookupTableS3) Lookup(key *string) (*[]any, error)

func (*LookupTableS3) LookupValue

func (tbl *LookupTableS3) LookupValue(row *[]any, columnName string) (any, error)

func (*LookupTableS3) Size

func (tbl *LookupTableS3) Size() int64

Return size of the lookup table

type LookupTableSql

type LookupTableSql struct {
	// contains filtered or unexported fields
}

data is the mapping of the lookup key -> values. columnsMap is the mapping of the return column name -> position in the returned row (values).

func (*LookupTableSql) ColumnMap

func (tbl *LookupTableSql) ColumnMap() map[string]int

func (*LookupTableSql) IsEmptyTable

func (tbl *LookupTableSql) IsEmptyTable() bool

Not applicable to sql lookup, only to s3 lookup

func (*LookupTableSql) Lookup

func (tbl *LookupTableSql) Lookup(key *string) (*[]any, error)

func (*LookupTableSql) LookupValue

func (tbl *LookupTableSql) LookupValue(row *[]any, columnName string) (any, error)

func (*LookupTableSql) Size

func (tbl *LookupTableSql) Size() int64

Return size of the lookup table

type LookupTokenNode

type LookupTokenNode struct {
	Name             string            `json:"lookup_name"`
	KeyRe            string            `json:"key_re,omitempty"`
	Tokens           []string          `json:"tokens,omitempty"`
	MultiTokensMatch []MultiTokensNode `json:"multi_tokens_match,omitempty"`
}

Matching values using a lookup table. Typically, values in the lookup table do not have spaces. MultiTokensMatch: Matching composite values, separated by space(s)

type LookupTokensState

type LookupTokensState struct {
	LookupTbl        LookupTable
	KeyRe            *regexp.Regexp
	LookupMatch      map[string]*LookupCount
	MultiTokensMatch []MultiTokensNode
}

func NewLookupTokensState

func NewLookupTokensState(lookupTbl LookupTable, lookupNode *LookupTokenNode) (*LookupTokensState, error)

func (*LookupTokensState) LookupValue

func (state *LookupTokensState) LookupValue(value *string) ([]string, error)

func (*LookupTokensState) NewValue

func (state *LookupTokensState) NewValue(value *string) error

type MapExpression

type MapExpression struct {
	CleansingFunction string `json:"cleansing_function,omitempty"`
	Argument          string `json:"argument,omitempty"`
	Default           string `json:"default,omitempty"`
	ErrMsg            string `json:"err_msg,omitempty"`
	RdfType           string `json:"rdf_type,omitempty"`
}

type MapRecordSpec

type MapRecordSpec struct {
	FileMappingTableName string `json:"file_mapping_table_name"`
	IsDebug              bool   `json:"is_debug,omitzero"`
}

type MapRecordTransformationPipe

type MapRecordTransformationPipe struct {
	// contains filtered or unexported fields
}

func (*MapRecordTransformationPipe) Apply

func (ctx *MapRecordTransformationPipe) Apply(input *[]any) error

Implementing interface PipeTransformationEvaluator

func (*MapRecordTransformationPipe) Done

func (ctx *MapRecordTransformationPipe) Done() error

func (*MapRecordTransformationPipe) Finally

func (ctx *MapRecordTransformationPipe) Finally()

type MergeCurrentValue

type MergeCurrentValue struct {
	// contains filtered or unexported fields
}

type MergeFileReader

type MergeFileReader struct {
	// contains filtered or unexported fields
}

MergeFileReader provides a reader that conforms to io.Reader interface that reads content from partfiles and makes it available to the s3 manager via the Read interface

func (*MergeFileReader) Read

func (r *MergeFileReader) Read(buf []byte) (int, error)

type MergeFileSpec

type MergeFileSpec struct {
	FirstPartitionHasHeaders bool `json:"first_partition_has_headers,omitempty"` // splitter column
}

MergeFileSpec configuration FirstPartitionHasHeaders: when true, the first partition has the headers (considered for csv to determine whether to use s3 multipart copy).

type MergeSpec

type MergeSpec struct {
	IsDebug      bool           `json:"is_debug,omitzero"`
	MainGroupBy  GroupBySpec    `json:"main_group_by"`
	MergeGroupBy []*GroupBySpec `json:"merge_group_by,omitempty"`
}

type MergeTransformationPipe

type MergeTransformationPipe struct {
	// contains filtered or unexported fields
}

Merge operator. Merge the input records from multiple input channels into one output channel.

func (*MergeTransformationPipe) Apply

func (ctx *MergeTransformationPipe) Apply(input *[]any) error

Implementing interface PipeTransformationEvaluator

func (*MergeTransformationPipe) Done

func (ctx *MergeTransformationPipe) Done() error

func (*MergeTransformationPipe) Finally

func (ctx *MergeTransformationPipe) Finally()

type Metric

type Metric struct {
	// Type range: runtime
	// Name values: alloc_mb, total_alloc_mb, sys_mb, nbr_gc
	// note: suffix _mb for units in MiB
	Type string `json:"type"`
	Name string `json:"name"`
}

type MetricsSpec

type MetricsSpec struct {
	ReportInterval int      `json:"report_interval_sec"`
	RuntimeMetrics []Metric `json:"runtime_metrics"`
}

type MinMaxValue

type MinMaxValue struct {
	MinValue   string
	MaxValue   string
	MinMaxType string
	HitRatio   float64
	NbrSamples int
}

MinValue and MaxValue are expressed as string but represent one of:

  • min/max date when MinMaxType == "date"
  • min/max double when MinMaxType == "double"
  • min/max length when MinMaxType == "text"

type MultiTokensNode

type MultiTokensNode struct {
	Name      string   `json:"name"`
	NbrTokens int      `json:"nbr_tokens"`
	Tokens    []string `json:"tokens"`
	// contains filtered or unexported fields
}

To identify a value made of multiple tokens, for example, a full name. The value is split on spaces and punctuation characters, and single-character words are removed. Name: the name of the feature (i.e. output column name) Tokens: each split value must match at least one token

type OutputChannel

type OutputChannel struct {
	Name    string
	Channel chan<- []any
	Columns *map[string]int
	Config  *ChannelSpec
}

type OutputChannelConfig

type OutputChannelConfig struct {
	// Type range: memory (default), stage, output, sql
	// Format: csv, headerless_csv, etc.
	// NbrRowsInRecord: nbr of rows in record (format: parquet)
	// Compression: none, snappy (default).
	// UseInputParquetSchema to use the same schema as the input file.
	// UseOriginalHeaders to use the headers from the input file (csv only).
	// Must have save_parquet_schema = true in the cpipes first input_channel.
	// OutputLocation: jetstore_s3_input, jetstore_s3_output (default), or custom location.
	// When OutputLocation is jetstore_s3_input it will also write to the input bucket.
	// When OutputLocation uses a custom location, it replaces KeyPrefix and FileName.
	// OutputLocation must end with "/" if we want to use the default file name
	// (i.e. OutputLocation does not include the file name).
	// Note: refactoring using FileConfig.FileKey is synonym to OutputLocation
	// KeyPrefix is optional, default to $PATH_FILE_KEY.
	// Use $CURRENT_PARTITION_LABEL in KeyPrefix and FileName to substitute with
	// current partition label.
	// Other available env substitution:
	// $FILE_KEY main input file key.
	// $SESSIONID current session id.
	// $PROCESS_NAME current process name.
	// $PATH_FILE_KEY file key path portion.
	// $NAME_FILE_KEY file key file name portion (empty when in part files mode).
	// $SHARD_ID current node id.
	// $JETS_PARTITION_LABEL current node partition label.
	FileConfig
	Type                  string `json:"type"`
	Name                  string `json:"name,omitempty"`
	UseOriginalHeaders    bool   `json:"use_original_headers,omitzero"`     // Type output
	UseInputParquetSchema bool   `json:"use_input_parquet_schema,omitzero"` // Type stage,output
	SchemaProvider        string `json:"schema_provider,omitempty"`         // Type stage,output, alt to Format
	WriteStepId           string `json:"write_step_id,omitempty"`           // Type stage
	OutputTableKey        string `json:"output_table_key,omitempty"`        // Type sql
	FileKey2              string `json:"output_location,omitempty"`         // Type output
	SpecName              string `json:"channel_spec_name,omitempty"`
}

func (OutputChannelConfig) OutputLocation

func (r OutputChannelConfig) OutputLocation() string

Note: refactoring using FileConfig.FileKey is synonym to OutputLocation

func (*OutputChannelConfig) SetOutputLocation

func (r *OutputChannelConfig) SetOutputLocation(s string)

type OutputFileSpec

type OutputFileSpec struct {
	// OutputLocation: jetstore_s3_input, jetstore_s3_stage, jetstore_s3_output (default),
	// or custom file key (the last option is deprecated, use FileKey).
	// When OutputLocation has a custom file key, it replaces Name and KeyPrefix.
	// Note: refactoring using FileConfig.FileKey is synonym to OutputLocation
	// Note: refactoring using FileConfig.FileName is synonym to Name
	// KeyPrefix is optional, default to input file key path in OutputLocation.
	// Name is file name (required or via OutputLocation).
	// Headers overrides the headers from the input_channel's spec or
	// from the schema_provider.
	// Schema provider indicates if put the header line or not.
	// The input channel's schema provider indicates what delimiter
	// to use on the header line.
	FileConfig
	Key                string   `json:"key"`
	FileName2          string   `json:"name,omitempty"`
	FileKey2           string   `json:"output_location,omitempty"`
	SchemaProvider     string   `json:"schema_provider,omitempty"`
	UseOriginalHeaders bool     `json:"use_original_headers,omitzero"`
	Headers            []string `json:"headers,omitempty"`
}

func GetOutputFileConfig

func GetOutputFileConfig(cpConfig *ComputePipesConfig, outputFileKey string) *OutputFileSpec

func (OutputFileSpec) Name

func (r OutputFileSpec) Name() string

Note: refactoring using FileConfig.FileName is synonym to Name

func (OutputFileSpec) OutputLocation

func (r OutputFileSpec) OutputLocation() string

Note: refactoring using FileConfig.FileKey is synonym to OutputLocation

func (*OutputFileSpec) SetName

func (r *OutputFileSpec) SetName(s string)

func (*OutputFileSpec) SetOutputLocation

func (r *OutputFileSpec) SetOutputLocation(s string)

type ParquetSchemaInfo

type ParquetSchemaInfo struct {
	Fields []*FieldInfo `json:"fields,omitempty"`
	// contains filtered or unexported fields
}

func BuildParquetSchemaInfo

func BuildParquetSchemaInfo(columns []string) *ParquetSchemaInfo

func NewEmptyParquetSchemaInfo

func NewEmptyParquetSchemaInfo() *ParquetSchemaInfo

func NewParquetSchemaInfo

func NewParquetSchemaInfo(schema *arrow.Schema) *ParquetSchemaInfo

func (*ParquetSchemaInfo) ArrowSchema

func (psi *ParquetSchemaInfo) ArrowSchema() *arrow.Schema

func (*ParquetSchemaInfo) Columns

func (psi *ParquetSchemaInfo) Columns() []string

func (*ParquetSchemaInfo) CreateBuilders

func (psi *ParquetSchemaInfo) CreateBuilders(pool *memory.GoAllocator) ([]ArrayBuilder, error)

type ParseDateFTSpec

type ParseDateFTSpec struct {
	Token           string `json:"token"`
	YearLessThan    int    `json:"year_less_than,omitzero"`
	YearGreaterThan int    `json:"year_greater_than,omitzero"`
}

The date format is using a reference date of Mon Jan 2 15:04:05 MST 2006 (see https://pkg.go.dev/time#Layout) Will use DateFormat when provided, otherwise take the format from the Schema Provider when available. Will default to DefaultDateFormat or will use the jetstore date parser if no default provided or if UseJetstoreParser is true. year_less_than and year_greater_than are additional conditions on the match result.

func (ParseDateFTSpec) CheckYearRange

func (pd ParseDateFTSpec) CheckYearRange(tm time.Time) bool

Match implements the match function, returns true when match.

type ParseDateMatchFunction

type ParseDateMatchFunction struct {
	// contains filtered or unexported fields
}

ParseDateMatchFunction is a match function to validate dates. ParseDateMatchFunction implements FunctionCount interface

func NewParseDateMatchFunction

func NewParseDateMatchFunction(fspec *FunctionTokenNode, sp SchemaProvider) (*ParseDateMatchFunction, error)

func (*ParseDateMatchFunction) Done

func (p *ParseDateMatchFunction) Done(ctx *AnalyzeTransformationPipe, outputRow []any) error

func (*ParseDateMatchFunction) GetMinMaxValues

func (p *ParseDateMatchFunction) GetMinMaxValues() *MinMaxValue

func (*ParseDateMatchFunction) NewValue

func (p *ParseDateMatchFunction) NewValue(value string)

ParseDateMatchFunction implements FunctionCount interface

type ParseDateSpec

type ParseDateSpec struct {
	DateFormatToken      string            `json:"date_format_token,omitempty"`
	OtherDateFormatToken string            `json:"other_date_format_token,omitempty"`
	DateSamplingMaxCount int               `json:"sampling_max_count,omitzero"`
	DateFormats          []string          `json:"date_formats,omitempty"`
	OtherDateFormats     []string          `json:"other_date_formats,omitempty"`
	MinMaxDateFormat     string            `json:"minmax_date_format,omitempty"`
	ParseDateArguments   []ParseDateFTSpec `json:"parse_date_args,omitempty"`
	UseJetstoreParser    bool              `json:"use_jetstore_date_parser,omitzero"`
}

DateFormatToken: output column name for listing up to 3 formats used in file. OtherDateFormatToken: output column name to put the count of other format used in file. DateSamplingMaxCount: nbr of samples to use for determining the date format. DateFormats: list of date formats to use for parsing the date. OtherDateFormats: list of other date formats to use for parsing the date when DateFormatToken does not match (which are undesirable formats). MinMaxDateFormat: format used in output report for min/max dates. ParseDateArguments: list of parse date function token spec. UseJetstoreParser: when true it will use only the jetstore date parser. Identify top date format matches, up to 3. The first match must account for 75% of total date matches. Identify other date matches, each must match 98% of total date matches.

type ParseDoubleMatchFunction

type ParseDoubleMatchFunction struct {
	// contains filtered or unexported fields
}

func NewParseDoubleMatchFunction

func NewParseDoubleMatchFunction(fspec *FunctionTokenNode) (*ParseDoubleMatchFunction, error)

func (*ParseDoubleMatchFunction) Done

func (p *ParseDoubleMatchFunction) Done(ctx *AnalyzeTransformationPipe, outputRow []any) error

func (*ParseDoubleMatchFunction) GetMinMaxValues

func (p *ParseDoubleMatchFunction) GetMinMaxValues() *MinMaxValue

func (*ParseDoubleMatchFunction) NewValue

func (p *ParseDoubleMatchFunction) NewValue(value string)

ParseDoubleMatchFunction implements FunctionCount interface

type ParseTextMatchFunction

type ParseTextMatchFunction struct {
	// contains filtered or unexported fields
}

func NewParseTextMatchFunction

func NewParseTextMatchFunction(fspec *FunctionTokenNode) (*ParseTextMatchFunction, error)

func (*ParseTextMatchFunction) Done

func (p *ParseTextMatchFunction) Done(ctx *AnalyzeTransformationPipe, outputRow []any) error

func (*ParseTextMatchFunction) GetMinMaxValues

func (p *ParseTextMatchFunction) GetMinMaxValues() *MinMaxValue

func (*ParseTextMatchFunction) NewValue

func (p *ParseTextMatchFunction) NewValue(value string)

ParseTextMatchFunction implements FunctionCount interface

type PartitionWriterSpec

type PartitionWriterSpec struct {
	DeviceWriterType string  `json:"device_writer_type,omitempty"`
	JetsPartitionKey *string `json:"jets_partition_key,omitzero"`
	PartitionSize    int     `json:"partition_size,omitzero"`
	SamplingRate     int     `json:"sampling_rate,omitzero"`
	SamplingMaxCount int     `json:"sampling_max_count,omitzero"`
	StreamDataOut    bool    `json:"stream_data_out,omitzero"`
}

DeviceWriterType range: csv_writer, parquet_writer, fixed_width_writer JetsPartitionKey used by partition_writer as the default value for jet_partition use $JETS_PARTITION_LABEL for current node input partition When StreamDataOut is true, data is streamed to s3 rather than written locally and then copied to s3. Useful for large files that would exceed local storage capacity.

type PartitionWriterTransformationPipe

type PartitionWriterTransformationPipe struct {
	// contains filtered or unexported fields
}

func (*PartitionWriterTransformationPipe) Apply

func (ctx *PartitionWriterTransformationPipe) Apply(input *[]any) error

Implementing interface PipeTransformationEvaluator. Delegating to applyInternal to support bundled input for group by transformation

func (*PartitionWriterTransformationPipe) Done

Done writing the splitter partition

  • Close the current ctx.currentDeviceCh to flush the data, update totalRowCount
  • Write to db the nodeId of this partition: session_id, shard, jets_partition
  • Send the total row count to ctx.copy2DeviceResultCh

Not called if the process has error upstream (see pipe_executor_splitter.go)

func (*PartitionWriterTransformationPipe) Finally

func (ctx *PartitionWriterTransformationPipe) Finally()

Always called, whether or not there was an error upstream

type PathSubstitution

type PathSubstitution struct {
	Replace string `json:"replace"`
	With    string `json:"with"`
}

type PipeSet

type PipeSet map[*PipeSpec]bool

type PipeSpec

type PipeSpec struct {
	// Type range: fan_out, splitter, merge_files
	Type            string               `json:"type"`
	InputChannel    InputChannelConfig   `json:"input_channel"`
	SplitterConfig  *SplitterSpec        `json:"splitter_config,omitzero"`
	MergeFileConfig *MergeFileSpec       `json:"merge_file_config,omitzero"`
	Apply           []TransformationSpec `json:"apply"`
	OutputFile      *string              `json:"output_file,omitzero"` // for merge_files
}

type PipeTransformationEvaluator

type PipeTransformationEvaluator interface {
	Apply(input *[]any) error
	Done() error
	Finally()
}

type PreprocessingFunction

type PreprocessingFunction interface {
	ApplyPF(buf *bytes.Buffer, input *[]any) error
	String() string
}

func ParsePreprocessingExpressions

func ParsePreprocessingExpressions(inputExprs []string, toUpper bool, columns *map[string]int) ([]PreprocessingFunction, error)

type RdfNode

type RdfNode interface {
	Hdle() any
	IsNil() bool
	Value() any
	Equals(other RdfNode) bool
	String() string
}

func NewRdfNode

func NewRdfNode(inValue any, re JetResourceManager) (RdfNode, error)

func ParseRdfNodeValue

func ParseRdfNodeValue(re JetResourceManager, value, rdfType string) (node RdfNode, err error)

type ReaderAtSeeker

type ReaderAtSeeker interface {
	io.Reader
	io.ReaderAt
	io.Seeker
}

type RegexCount

type RegexCount struct {
	Rexpr            *regexp.Regexp
	UseScrubbedValue bool
	Count            int
}

func NewRegexCount

func NewRegexCount(re *regexp.Regexp, useScrubbedValue bool) *RegexCount

type RegexNode

type RegexNode struct {
	Name             string `json:"name"`
	Rexpr            string `json:"re,omitempty"`
	UseScrubbedValue bool   `json:"use_scrubbed_value,omitzero"`
}

type RemoveMiPF

type RemoveMiPF struct {
	// contains filtered or unexported fields
}

RemoveMiPF remove last 2 char if last-1 is a space, e.g. "michel f" becomes "michel"

func (*RemoveMiPF) ApplyPF

func (pf *RemoveMiPF) ApplyPF(buf *bytes.Buffer, input *[]any) error

func (*RemoveMiPF) String

func (pf *RemoveMiPF) String() string

type ReportCmdSpec

type ReportCmdSpec struct {
	Type             string          `json:"type"`
	S3CopyFileConfig *S3CopyFileSpec `json:"s3_copy_file_config,omitzero"`
	When             *ExpressionNode `json:"when,omitzero"`
}

Commands for the run_report step Type range: s3_copy_file When is an optional expression to determine if the command is to be executed. S3CopyFileConfig provides the configuration for s3_copy_file command.

type RuleEngineConfig

type RuleEngineConfig struct {
	MainRuleFile   string            `json:"main_rule_file_name,omitempty"`
	JetStoreConfig map[string]string `json:"jetstore_config,omitempty"`
}

type S3CopyFileSpec

type S3CopyFileSpec struct {
	SourceBucket      string `json:"src_bucket,omitempty"`
	SourceKey         string `json:"src_key,omitempty"`
	DestinationBucket string `json:"dest_bucket,omitempty"`
	DestinationKey    string `json:"dest_key,omitempty"`
	WorkerPoolSize    int    `json:"worker_pool_size,omitzero"`
}

ReportCommand to copy file from s3 to s3 Default WorkerPoolSize is calculated based on number of tasks

type S3DeviceManager

type S3DeviceManager struct {
	WorkersTaskCh      chan S3Object
	ClientsWg          *sync.WaitGroup
	JetStoreTempFolder string
	// contains filtered or unexported fields
}

S3DeviceManager manages a pool of workers to put files to s3. ClientsWg is a wait group of the partition writers created during the BuildComputeGraph function. The WorkersTaskCh is closed in process_file.go

type S3DeviceWorker

type S3DeviceWorker struct {
	// contains filtered or unexported fields
}

func NewS3DeviceWorker

func NewS3DeviceWorker(s3Uploader *manager.Uploader, done chan struct{}, errCh chan error) *S3DeviceWorker

func (*S3DeviceWorker) DoWork

func (ctx *S3DeviceWorker) DoWork(mgr *S3DeviceManager, resultCh chan ComputePipesResult)

type S3DeviceWriter

type S3DeviceWriter struct {
	// contains filtered or unexported fields
}

func (*S3DeviceWriter) WriteCsvPartition

func (ctx *S3DeviceWriter) WriteCsvPartition(fout io.Writer)

func (*S3DeviceWriter) WriteFixedWidthPartition

func (ctx *S3DeviceWriter) WriteFixedWidthPartition(fout io.Writer)

func (*S3DeviceWriter) WriteParquetPartitionV2

func (ctx *S3DeviceWriter) WriteParquetPartitionV2(fout io.Writer)

func (*S3DeviceWriter) WritePartition

func (ctx *S3DeviceWriter) WritePartition(writer func(w io.Writer))

WritePartition is main write function that coordinates between writing the partition to a temp file locally or stream the data directly to s3.

type S3Object

type S3Object struct {
	ExternalBucket string
	FileKey        string
	LocalFilePath  string
}

S3Object is the worker's task payload to put a file to s3

type SaveResultsContext

type SaveResultsContext struct {
	JetsPartition string
	NodeId        int
	SessionId     string
	// contains filtered or unexported fields
}

func NewSaveResultsContext

func NewSaveResultsContext(dbpool *pgxpool.Pool) *SaveResultsContext

func (*SaveResultsContext) Save

func (ctx *SaveResultsContext) Save(category string, result *ComputePipesResult)

type SchemaColumnSpec

type SchemaColumnSpec struct {
	Name      string `json:"name,omitempty"`
	Length    int    `json:"length,omitzero"`    // for fixed_width
	Precision *int   `json:"precision,omitzero"` // for fixed_width
}

type SchemaManager

type SchemaManager struct {
	// contains filtered or unexported fields
}

func NewSchemaManager

func NewSchemaManager(spec []*SchemaProviderSpec,
	envSettings map[string]any, isDebugMode bool) *SchemaManager

func (*SchemaManager) GetSchemaProvider

func (sm *SchemaManager) GetSchemaProvider(key string) SchemaProvider

func (*SchemaManager) PrepareSchemaProviders

func (sm *SchemaManager) PrepareSchemaProviders(dbpool *pgxpool.Pool) error

type SchemaProvider

type SchemaProvider interface {
	Initialize(dbpool *pgxpool.Pool, spec *SchemaProviderSpec,
		envSettings map[string]any, isDebugMode bool) error
	Key() string
	Env() map[string]any
	AdjustColumnWidth(width map[string]int) error
	BadRowsConfig() *BadRowsSpec
	BlankFieldMarkers() *BlankFieldMarkers
	Bucket() string
	ColumnNames() []string
	Columns() []SchemaColumnSpec
	Compression() string
	Delimiter() rune
	DetectEncoding() bool
	DiscardFileHeaders() bool
	DomainClass() string
	DomainKeys() map[string]any
	DropExcedentHeaders() bool
	Encoding() string
	EnforceRowMaxLength() bool
	EnforceRowMinLength() bool
	FixedWidthEncodingInfo() *FixedWidthEncodingInfo
	FixedWidthFileHeaders() ([]string, string)
	Format() string
	InputFormatDataJson() string
	IsPartFiles() bool
	NbrRowsInRecord() int64
	NoQuotes() bool
	OutputEncoding() string
	ParquetSchema() *ParquetSchemaInfo
	QuoteAllRecords() bool
	ReadBatchSize() int64
	ReadDateLayout() string
	ReorderColumnsOnRead() []int
	SchemaName() string
	SetParquetSchema(schema *ParquetSchemaInfo)
	TrimColumns() bool
	UseLazyQuotes() bool
	UseLazyQuotesSpecial() bool
	VariableFieldsPerRecord() bool
	WriteDateLayout() string
	CapDobYears() int
	SetDodToJan1() bool
}

func NewDefaultSchemaProvider

func NewDefaultSchemaProvider() SchemaProvider

type SchemaProviderSpec

type SchemaProviderSpec struct {
	// Type range: default
	// Key is schema provider key for reference by compute pipes steps
	// Format: csv, headerless_csv, fixed_width, parquet, parquet_select,
	//              xlsx, headerless_xlsx
	// Compression: none, snappy (parquet is always snappy).
	// DetectEncoding: Detect file encoding (limited) for text file format.
	// DetectCrAsEol: Detect if \r is used as eol (format: csv,headerless_csv).
	// DiscardFileHeaders: when true, discard the headers from the input file (typically for csv format),
	// this will force to use Headers or Columns from the configuration, or from the schema provider if Headers and Columns are not provided.
	// EolByte: Byte to use as eol (format: csv,headerless_csv).
	// MultiColumnsInput: Indicate that input file must have multiple columns,
	// this is used to detect if the wrong delimiter is used (csv,headerless_csv).
	// ReadBatchSize: nbr of rows to read per record (format: parquet).
	// NbrRowsInRecord: nbr of rows in record (format: parquet).
	// InputFormatDataJson: json config based on Format (typically used for xlsx).
	// example: {"currentSheet": "Daily entry for Approvals"} (for xlsx).
	// EnforceRowMinLength: when true, all columns must be in input record, otherwise missing columns are null.
	// EnforceRowMaxLength: when true, no extra characters must exist past last field (applies to text format).
	// Note EnforceRowMinLength and EnforceRowMaxLength apply to text format only (csv, headerless_csv, fixed_width).
	// UseOriginSourceConfig: when true, use the source config from file_key components (client, org, object_type).
	// Note origin session_id is from cpipes env $ORIGIN_SESSIONID
	// BadRowsConfig: Specify how to handle bad rows when not specified on InputChannelConfig.
	// SourceType range: main_input, merged_input, historical_input (from input_source table)
	// Columns: may be omitted if fixed_width_columns_csv is provided or it is a csv format
	// Headers: alt to Columns, typically for csv format
	// BlankFieldMarkers: specify markers for blank fields (any format, typically for csv format)
	// GetPartitionsSize: when true, get the size of the partitions from s3
	// CapDobYears: number of years to cap dob (date of birth) to today - for Anonymization.
	// SetDodToJan1: set dod (date of death) to January 1st of the date year - for Anonymization.
	// UseLazyQuotes, UseLazyQuotesSpecial, VariableFieldsPerRecord: see csv.NewReader.
	// QuoteAllRecords will quote all records for csv writer.
	// NoQuotes will no quote any records for csv writer (even if the record contains '"').
	// Bucket and FileKey are location and source object (fileKey may be directory if IsPartFiles is true)
	// KmsKey is kms key to use when writing output data. May be empty.
	// RequestID is used for logging and tracking purpose.
	// Contains properties to register FileKey with input_registry table:
	// Client, Vendor, ObjectType, FileDate (does not apply to Jets_Loader).
	// NotificationTemplatesOverrides have the following keys to override the templates defined
	// in the deployment environment var: CPIPES_START_NOTIFICATION_JSON,
	// CPIPES_COMPLETED_NOTIFICATION_JSON, and CPIPES_FAILED_NOTIFICATION_JSON.
	//*TODO domain_keys_json
	//*TODO code_values_mapping_json
	FileConfig
	Key                              string             `json:"key"`
	Type                             string             `json:"type"`
	FileSize                         int64              `json:"file_size,omitzero"`
	KmsKey                           string             `json:"kms_key_arn,omitempty"`
	Client                           string             `json:"client,omitempty"`
	Vendor                           string             `json:"vendor,omitempty"`
	ObjectType                       string             `json:"object_type,omitempty"`
	RequestID                        string             `json:"request_id,omitempty"`
	UseOriginSourceConfig            bool               `json:"use_origin_source_config,omitempty"`
	FileDate                         string             `json:"file_date,omitempty"`
	SourceType                       string             `json:"source_type,omitempty"`
	SchemaName                       string             `json:"schema_name,omitempty"`
	Columns                          []SchemaColumnSpec `json:"columns,omitempty"`
	Headers                          []string           `json:"headers,omitempty"`
	CapDobYears                      int                `json:"cap_dob_years,omitzero"`
	SetDodToJan1                     bool               `json:"set_dod_to_jan1,omitzero"`
	Env                              map[string]any     `json:"env,omitempty"`
	ReportCmds                       []ReportCmdSpec    `json:"report_cmds,omitempty"`
	NotificationTemplatesOverrides   map[string]string  `json:"notification_templates_overrides,omitempty"`
	NotificationRoutingOverridesJson string             `json:"notification_routing_overrides_json,omitempty"`
}

func GetSchemaProviderConfigByKey

func GetSchemaProviderConfigByKey(schemaProviders []*SchemaProviderSpec, key string) *SchemaProviderSpec

func GetSchemaProviderConfigBySourceType

func GetSchemaProviderConfigBySourceType(schemaProviders []*SchemaProviderSpec, sourceType string) *SchemaProviderSpec

type ShardFileKeyResult

type ShardFileKeyResult struct {
	// contains filtered or unexported fields
}

func ShardFileKeys

func ShardFileKeys(exeCtx context.Context, dbpool *pgxpool.Pool, baseFileKey string, sessionId string,
	inputChannelConfig InputChannelConfig, clusterConfig *ClusterSpec,
	schemaProviderConfig *SchemaProviderSpec) (result ShardFileKeyResult, cpErr error)

ShardFileKeys: assign file keys to nodes for sharding mode according to inputChannelConfig and clusterConfig.

type ShufflingSpec

type ShufflingSpec struct {
	MaxInputSampleSize    int               `json:"max_input_sample_size,omitzero"`
	OutputSampleSize      int               `json:"output_sample_size,omitzero"`
	PadShortRowsWithNulls bool              `json:"pad_short_rows_with_nulls,omitzero"`
	FilterColumns         *FilterColumnSpec `json:"filter_columns,omitzero"`
}

type ShufflingTransformationPipe

type ShufflingTransformationPipe struct {
	// contains filtered or unexported fields
}

Note: No columnEvaluators are used by this operator. inputColPos is the position of the retained input columns, if any filtering is done.

func (*ShufflingTransformationPipe) Apply

func (ctx *ShufflingTransformationPipe) Apply(input *[]any) error

Implementing interface PipeTransformationEvaluator

func (*ShufflingTransformationPipe) Done

func (ctx *ShufflingTransformationPipe) Done() error

Analysis complete, now send out the results to ctx.outputCh.

func (*ShufflingTransformationPipe) Finally

func (ctx *ShufflingTransformationPipe) Finally()

type SortSpec

type SortSpec struct {
	DomainKey    string   `json:"domain_key,omitempty"`
	SortByColumn []string `json:"sort_by,omitempty"`
	IsDebug      bool     `json:"is_debug,omitzero"`
}

Sort using a composite key. sort_by: the column names making up the composite key. domain_key: use the domain key info to compute the composite key.

type SortTransformationPipe

type SortTransformationPipe struct {
	// contains filtered or unexported fields
}

Sort operator. Sorts the input records according to a composite key.

func (*SortTransformationPipe) Apply

func (ctx *SortTransformationPipe) Apply(input *[]any) error

Implementing interface PipeTransformationEvaluator

func (*SortTransformationPipe) Done

func (ctx *SortTransformationPipe) Done() error

func (*SortTransformationPipe) Finally

func (ctx *SortTransformationPipe) Finally()

type SourcesConfigSpec

type SourcesConfigSpec struct {
	MainInput     *InputSourceSpec   `json:"main_input"`
	MergedInput   []*InputSourceSpec `json:"merged_inputs"`
	InjectedInput []*InputSourceSpec `json:"injected_inputs"`
}

SourcesConfigSpec contains carry-over configuration from table source_config. It has provision for multiple input sources to be merged via domain keys.

type SplitterSpec

type SplitterSpec struct {
	// Type range: standard (default), ext_count
	// standard: split on Column / DefaultSplitterValue / ShardOn, create partition for each value
	// ext_count: split on Column / DefaultSplitterValue / ShardOn + N, N = 0..ExtPartitionsCount-1
	//            where each partition has up to PartitionRowCount rows
	Type                 string          `json:"type,omitempty"`
	Column               string          `json:"column,omitempty"`                 // splitter column
	DefaultSplitterValue string          `json:"default_splitter_value,omitempty"` // splitter default value
	ShardOn              *HashExpression `json:"shard_on,omitzero"`                // splitter hash on the fly
	PartitionRowCount    int             `json:"partition_row_count,omitzero"`     // nbr of row for each ext partition
}

type StartComputePipesArgs

type StartComputePipesArgs struct {
	PipelineExecKey   int                  `json:"pipeline_execution_key"`
	FileKey           string               `json:"file_key,omitempty"`
	SessionId         string               `json:"session_id,omitempty"`
	StepId            *int                 `json:"step_id,omitzero"`
	ClusterInfo       *ClusterShardingInfo `json:"cluster_sharding_info,omitzero"`
	MainInputRowCount int64                `json:"main_input_row_count,omitzero"`
}

Argument to start_cp (start_sharding_cp, start_reducing_cp) for starting the cp cluster. MainInputRowCount is available in start_reducing_cp as the total number of records processed at step id 'reducing00'.

func (*StartComputePipesArgs) StartReducingComputePipes

func (args *StartComputePipesArgs) StartReducingComputePipes(ctx context.Context, dbpool *pgxpool.Pool) (ComputePipesRun, error)

func (*StartComputePipesArgs) StartShardingComputePipes

func (args *StartComputePipesArgs) StartShardingComputePipes(ctx context.Context,
	dbpool *pgxpool.Pool) (ComputePipesRun, *SchemaProviderSpec, error)

type StringBuilder

type StringBuilder struct {
	// contains filtered or unexported fields
}

func (*StringBuilder) Append

func (b *StringBuilder) Append(v any)

func (*StringBuilder) AppendEmptyValue

func (b *StringBuilder) AppendEmptyValue()

func (*StringBuilder) AppendNull

func (b *StringBuilder) AppendNull()

func (*StringBuilder) NewArray

func (b *StringBuilder) NewArray() arrow.Array

func (*StringBuilder) Release

func (b *StringBuilder) Release()

func (*StringBuilder) Reserve

func (b *StringBuilder) Reserve(n int)

type TableColumnSpec

type TableColumnSpec struct {
	Name    string `json:"name"`
	RdfType string `json:"rdf_type,omitempty"`
	IsArray bool   `json:"as_array,omitzero"`
}

type TableSpec

type TableSpec struct {
	Key                string            `json:"key"`
	Name               string            `json:"name"`
	CheckSchemaChanged bool              `json:"check_schema_changed,omitzero"`
	Columns            []TableColumnSpec `json:"columns,omitempty"`
	ChannelSpecName    string            `json:"channel_spec_name,omitempty"`
}

ChannelSpecName specifies the channel spec. Columns provides metadata info.

func SelectActiveOutputTable

func SelectActiveOutputTable(tableConfig []*TableSpec, pipeConfig []PipeSpec) ([]*TableSpec, error)

Function to prune the output tables and return only the tables used in pipeConfig. Returns an error if pipeConfig makes reference to a non-existent table.

type TargetColumnsLookupSpec

type TargetColumnsLookupSpec struct {
	LookupName                  string   `json:"lookup_name"`
	DataClassificationColumn    string   `json:"data_classification_column,omitempty"`
	Column1ClassificationValues []string `json:"column1_classification_values,omitempty"`
	Column2ClassificationValues []string `json:"column2_classification_values,omitempty"`
}

type TimestampBuilder

type TimestampBuilder struct {
	// contains filtered or unexported fields
}

func (*TimestampBuilder) Append

func (b *TimestampBuilder) Append(v any)

func (*TimestampBuilder) AppendEmptyValue

func (b *TimestampBuilder) AppendEmptyValue()

func (*TimestampBuilder) AppendNull

func (b *TimestampBuilder) AppendNull()

func (*TimestampBuilder) NewArray

func (b *TimestampBuilder) NewArray() arrow.Array

func (*TimestampBuilder) Release

func (b *TimestampBuilder) Release()

func (*TimestampBuilder) Reserve

func (b *TimestampBuilder) Reserve(n int)

type TransformationColumnEvaluator

type TransformationColumnEvaluator interface {
	InitializeCurrentValue(currentValue *[]any)
	Update(currentValue *[]any, input *[]any) error
	Done(currentValue *[]any) error
}

Initialize and Done are intended for aggregate transformations column evaluators

type TransformationColumnSpec

type TransformationColumnSpec struct {
	// Type range: select, multi_select, value, eval, map, hash
	// count, distinct_count, sum, min, case,
	// map_reduce, lookup
	Name           string                     `json:"name"`
	Type           string                     `json:"type"`
	Expr           *string                    `json:"expr,omitempty"`
	ExprArray      []string                   `json:"expr_array,omitempty"`
	MapExpr        *MapExpression             `json:"map_expr,omitzero"`
	EvalExpr       *ExpressionNode            `json:"eval_expr,omitzero"`
	HashExpr       *HashExpression            `json:"hash_expr,omitzero"`
	Where          *ExpressionNode            `json:"where,omitzero"`
	CaseExpr       []CaseExpression           `json:"case_expr,omitempty"` // case operator
	ElseExpr       []*ExpressionNode          `json:"else_expr,omitempty"` // case operator
	MapOn          *string                    `json:"map_on,omitzero"`
	AlternateMapOn []string                   `json:"alternate_map_on,omitempty"`
	ApplyMap       []TransformationColumnSpec `json:"apply_map,omitempty"`
	ApplyReduce    []TransformationColumnSpec `json:"apply_reduce,omitempty"`
	LookupName     *string                    `json:"lookup_name,omitzero"`
	LookupKey      []LookupColumnSpec         `json:"key,omitempty"`
	LookupValues   []LookupColumnSpec         `json:"values,omitempty"`
}

type TransformationSpec

type TransformationSpec struct {
	// Type range: map_record, aggregate, analyze, high_freq, partition_writer,
	// anonymize, distinct, shuffling, group_by, filter, sort, merge, jetrules, clustering
	// Format takes precedence over SchemaProvider's Format (from OutputChannelConfig)
	Type                  string                           `json:"type"`
	NewRecord             bool                             `json:"new_record,omitzero"`
	Columns               []TransformationColumnSpec       `json:"columns,omitempty"`
	MapRecordConfig       *MapRecordSpec                   `json:"map_record_config,omitzero"`
	AnalyzeConfig         *AnalyzeSpec                     `json:"analyze_config,omitzero"`
	HighFreqColumns       []*HighFreqSpec                  `json:"high_freq_columns,omitempty"` // Type high_freq
	PartitionWriterConfig *PartitionWriterSpec             `json:"partition_writer_config,omitzero"`
	AnonymizeConfig       *AnonymizeSpec                   `json:"anonymize_config,omitzero"`
	DistinctConfig        *DistinctSpec                    `json:"distinct_config,omitzero"`
	ShufflingConfig       *ShufflingSpec                   `json:"shuffling_config,omitzero"`
	GroupByConfig         *GroupBySpec                     `json:"group_by_config,omitzero"`
	FilterConfig          *FilterSpec                      `json:"filter_config,omitzero"`
	SortConfig            *SortSpec                        `json:"sort_config,omitzero"`
	JetrulesConfig        *JetrulesSpec                    `json:"jetrules_config,omitzero"`
	ClusteringConfig      *ClusteringSpec                  `json:"clustering_config,omitzero"`
	MergeConfig           *MergeSpec                       `json:"merge_config,omitzero"`
	OutputChannel         OutputChannelConfig              `json:"output_channel"`
	ConditionalConfig     []*ConditionalTransformationSpec `json:"conditional_config,omitzero"`
}

type TripleIterator

type TripleIterator interface {
	IsEnd() bool
	Next() bool
	GetSubject() RdfNode
	GetPredicate() RdfNode
	GetObject() RdfNode
	Release() error
}

type Uint32Builder

type Uint32Builder struct {
	// contains filtered or unexported fields
}

func (*Uint32Builder) Append

func (b *Uint32Builder) Append(v any)

func (*Uint32Builder) AppendEmptyValue

func (b *Uint32Builder) AppendEmptyValue()

func (*Uint32Builder) AppendNull

func (b *Uint32Builder) AppendNull()

func (*Uint32Builder) NewArray

func (b *Uint32Builder) NewArray() arrow.Array

func (*Uint32Builder) Release

func (b *Uint32Builder) Release()

func (*Uint32Builder) Reserve

func (b *Uint32Builder) Reserve(n int)

type Uint64Builder

type Uint64Builder struct {
	// contains filtered or unexported fields
}

func (*Uint64Builder) Append

func (b *Uint64Builder) Append(v any)

func (*Uint64Builder) AppendEmptyValue

func (b *Uint64Builder) AppendEmptyValue()

func (*Uint64Builder) AppendNull

func (b *Uint64Builder) AppendNull()

func (*Uint64Builder) NewArray

func (b *Uint64Builder) NewArray() arrow.Array

func (*Uint64Builder) Release

func (b *Uint64Builder) Release()

func (*Uint64Builder) Reserve

func (b *Uint64Builder) Reserve(n int)

type WelfordAlgo

type WelfordAlgo struct {
	Mean  float64
	M2    float64
	Count int
}

func NewWelfordAlgo

func NewWelfordAlgo() *WelfordAlgo

func (*WelfordAlgo) Finalize

func (w *WelfordAlgo) Finalize() (mean, variance float64)

func (*WelfordAlgo) Update

func (w *WelfordAlgo) Update(value float64)

type WriteTableSource

type WriteTableSource struct {
	// contains filtered or unexported fields
}

func NewWriteTableSource

func NewWriteTableSource(source <-chan []any, tableIdentifier pgx.Identifier, columns []string) *WriteTableSource

func (*WriteTableSource) Err

func (wt *WriteTableSource) Err() error

func (*WriteTableSource) Next

func (wt *WriteTableSource) Next() bool

pgx.CopyFromSource interface

func (*WriteTableSource) Values

func (wt *WriteTableSource) Values() ([]any, error)

func (*WriteTableSource) WriteTable

func (wt *WriteTableSource) WriteTable(dbpool *pgxpool.Pool, done chan struct{}, copy2DbResultCh chan<- ComputePipesResult)

Methods for writing output entity records to postgres

Source Files

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL