Documentation
¶
Index ¶
- Variables
- func AdjustFillers(rawHeaders *[]string)
- func ApplyAllConditionalTransformationSpec(pipeConfig []PipeSpec, env map[string]any) error
- func ApplyConditionalEnvVars(envVars []ConditionalEnvVariable, env map[string]any) error
- func AssertMetadataSource(re JetRuleEngine, config *JetrulesSpec, env map[string]any) error
- func AssertRuleConfiguration(re JetRuleEngine, config *JetrulesSpec, env map[string]any) (err error)
- func AssertSourcePeriodInfo(re JetRuleEngine, config *JetrulesSpec, env map[string]any) (err error)
- func BuildEvalOperator(op string) (evalOperator, error)
- func CastToRdfType(input any, rdfType string) (any, error)
- func ClearJetrulesCaches()
- func CmpRecord(lhs any, rhs any) int
- func ConvertToSchemaV2(v any, se *FieldInfo) (any, error)
- func ConvertWithSchemaV1(irow int, col arrow.Array, trimStrings bool, fieldInfo *FieldInfo, ...) (any, error)
- func CreateOutputTable(dbpool *pgxpool.Pool, tableName pgx.Identifier, tableSpec *TableSpec) error
- func DetectCrAsEol(fileHd ReaderAtSeeker, compression string) (bool, error)
- func DetectCsvDelimitor(fileHd ReaderAtSeeker, compression string) (jcsv.Chartype, error)
- func DetectEncoding(data []byte, delimit rune) (string, error)
- func DetectFileEncoding(fileHd ReaderAtSeeker, delimit rune) (encoding string, err error)
- func DoesQualifyAsDate(value string) bool
- func DownloadS3Object(externalBucket string, s3Key *FileKeyInfo, localDir string, minSize int64) (string, int64, error)
- func EvalHash(key any, partitions uint64) *uint64
- func ExtractPartitionLabelFromS3Key(s3Key string) (string, error)
- func ExtractRdfNodeInfoJson(e any) (value, rdfType string, err error)
- func GetAdditionalInputColumns(cpConfig *ComputePipesConfig) []string
- func GetDomainProperties(className string, directPropertitesOnly bool) ([]string, error)
- func GetFileKeys(ctx context.Context, dbpool *pgxpool.Pool, sessionId string, nodeId int) ([][]*FileKeyInfo, error)
- func GetMaxConcurrency(nbrNodes, defaultMaxConcurrency int) int
- func GetPartitionSize4LookbackPeriod(bucket, fileKey, lookbackPeriod string, env map[string]any, ...) error
- func GetRawHeadersCsv(fileHd *os.File, fileName, fileFormat, compression string, ...) ([]string, error)
- func GetRawHeadersParquet(fileHd *os.File, fileName string) ([]string, error)
- func GetRawHeadersXlsx(fileName string, fileFormatDataJson string) ([]string, error)
- func GetRdfNodeValue(r RdfNode) any
- func GetRuleEngineConfig(mainRuleFile, property string) (string, error)
- func GetS3FileKeys(processName, sessionId, mainInputStepId, jetsPartitionLabel string, ...) ([][]*FileKeyInfo, error)
- func GetS3Objects4LookbackPeriod(bucket, fileKey, lookbackPeriod string, env map[string]any) ([]*awsi.S3Object, error)
- func GetWorkspaceControl() (*rete.WorkspaceControl, error)
- func GetWorkspaceDataProperties() (map[string]*rete.DataPropertyNode, error)
- func GetWorkspaceDomainClasses() (map[string]*rete.ClassNode, error)
- func GetWorkspaceDomainTables() (map[string]*rete.TableNode, error)
- func Hash(key []byte, partitions uint64) uint64
- func LastIndexByte(s []byte, c byte) int
- func MakeJetsPartitionLabel(jetsPartitionKey any) string
- func MergeParquetPartitions(nrowsInRec int64, columns []string, fout io.Writer, ...)
- func MergeTransformationSpec(host, override *TransformationSpec) error
- func ParseDate(date string) (*time.Time, error)
- func ParseDateDateFormat(dateFormats []string, value string) (tm time.Time, fmt string)
- func ParseDateStrict(date string) (*time.Time, error)
- func ParseDatetime(datetime string) (*time.Time, error)
- func ParseInputFormatDataXlsx(inputDataFormatJson *string) (map[string]any, error)
- func PrepareOutoutTable(dbpool *pgxpool.Pool, tableIdentifier pgx.Identifier, tableSpec *TableSpec) error
- func SplitTableName(tableName string) (pgx.Identifier, error)
- func ToBool(b any) bool
- func ToDouble(d any) (float64, error)
- func WorkspaceHome() string
- func WorkspacePrefix() string
- func WrapReaderWithDecoder(r io.Reader, encoding string) (utfReader io.Reader, err error)
- func WrapReaderWithDecompressor(r io.Reader, compression string) io.Reader
- func WrapWriterWithEncoder(w io.Writer, encoding string) (utfWriter io.Writer, err error)
- func WriteCpipesArgsToS3(cpa []ComputePipesNodeArgs, s3Location string) error
- func WriteParquetPartitionV3(schemaInfo *ParquetSchemaInfo, nrowsInRec int64, fout io.Writer, ...)
- type AggregateTransformationPipe
- type AnalyzeSpec
- type AnalyzeState
- type AnalyzeTransformationPipe
- type AnonymizationAction
- type AnonymizeSpec
- type AnonymizeTransformationPipe
- type ArrayBuilder
- func NewBinaryBuilder(mem memory.Allocator) ArrayBuilder
- func NewBooleanBuilder(mem memory.Allocator) ArrayBuilder
- func NewDateBuilder(mem memory.Allocator) ArrayBuilder
- func NewDecimal128Builder(mem memory.Allocator, precision, scale int32) ArrayBuilder
- func NewDecimal256Builder(mem memory.Allocator, precision, scale int32) ArrayBuilder
- func NewFloat32Builder(mem memory.Allocator) ArrayBuilder
- func NewFloat64Builder(mem memory.Allocator) ArrayBuilder
- func NewInt32Builder(mem memory.Allocator) ArrayBuilder
- func NewInt64Builder(mem memory.Allocator) ArrayBuilder
- func NewStringBuilder(mem memory.Allocator) ArrayBuilder
- func NewTimestampBuilder(mem memory.Allocator) ArrayBuilder
- func NewUint32Builder(mem memory.Allocator) ArrayBuilder
- func NewUint64Builder(mem memory.Allocator) ArrayBuilder
- type ArrayRecord
- type BadRowsChannel
- type BadRowsSpec
- type BinaryBuilder
- type BlankFieldMarkers
- type BlankFieldMarkersSpec
- type BooleanBuilder
- type BuilderContext
- func (ctx *BuilderContext) BuildCaseExprTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
- func (ctx *BuilderContext) BuildComputeGraph() error
- func (ctx *BuilderContext) BuildCountTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
- func (ctx *BuilderContext) BuildDistinctCountTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
- func (ctx *BuilderContext) BuildExprNodeEvaluator(sourceName string, columns map[string]int, spec *ExpressionNode) (evalExpression, error)
- func (ctx *BuilderContext) BuildHashTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
- func (ctx *BuilderContext) BuildLookupTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
- func (ctx *BuilderContext) BuildMapReduceTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
- func (ctx *BuilderContext) BuildMapTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
- func (ctx *BuilderContext) BuildMinTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
- func (ctx *BuilderContext) BuildPipeTransformationEvaluator(source *InputChannel, jetsPartitionKey any, ...) (PipeTransformationEvaluator, error)
- func (ctx *BuilderContext) BuildSumTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
- func (ctx *BuilderContext) BuildTransformationColumnEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
- func (ctx *BuilderContext) FileKey() string
- func (ctx *BuilderContext) MakeMergeTransformationPipe(mainSource *InputChannel, mergeSources []*InputChannel, outCh *OutputChannel, ...) (PipeTransformationEvaluator, error)
- func (ctx *BuilderContext) NewAggregateTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*AggregateTransformationPipe, error)
- func (ctx *BuilderContext) NewAnalyzeState(columnName string, columnPos int, inputColumns *map[string]int, ...) (*AnalyzeState, error)
- func (ctx *BuilderContext) NewAnalyzeTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*AnalyzeTransformationPipe, error)
- func (ctx *BuilderContext) NewAnonymizeTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*AnonymizeTransformationPipe, error)
- func (ctx *BuilderContext) NewClusteringPoolManager(config *ClusteringSpec, source *InputChannel, outputCh *OutputChannel, ...) (poolMgr *ClusteringPoolManager, err error)
- func (ctx *BuilderContext) NewClusteringTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*ClusteringTransformationPipe, error)
- func (ctx *BuilderContext) NewDistinctTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*DistinctTransformationPipe, error)
- func (ctx *BuilderContext) NewFilterTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*FilterTransformationPipe, error)
- func (ctx *BuilderContext) NewGroupByTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*GroupByTransformationPipe, error)
- func (ctx *BuilderContext) NewHashEvaluator(source *InputChannel, spec *HashExpression) (*HashEvaluator, error)
- func (ctx *BuilderContext) NewHighFreqTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*HighFreqTransformationPipe, error)
- func (ctx *BuilderContext) NewJetrulesTransformationPipe(source *InputChannel, _ *OutputChannel, spec *TransformationSpec) (*JetrulesTransformationPipe, error)
- func (ctx *BuilderContext) NewJrPoolManager(config *JetrulesSpec, source *InputChannel, ...) (jrpm *JrPoolManager, err error)
- func (ctx *BuilderContext) NewMapRecordTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*MapRecordTransformationPipe, error)
- func (ctx *BuilderContext) NewMergeTransformationPipe(source *InputChannel, outCh *OutputChannel, spec *TransformationSpec) (PipeTransformationEvaluator, error)
- func (ctx *BuilderContext) NewPartitionWriterTransformationPipe(source *InputChannel, jetsPartitionKey any, outputCh *OutputChannel, ...) (*PartitionWriterTransformationPipe, error)
- func (ctx *BuilderContext) NewShufflingTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*ShufflingTransformationPipe, error)
- func (ctx *BuilderContext) NewSortTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*SortTransformationPipe, error)
- func (ctx *BuilderContext) ReportMetrics()
- func (ctx *BuilderContext) StartFanOutPipe(spec *PipeSpec, source *InputChannel, ...)
- func (ctx *BuilderContext) StartSplitterPipe(spec *PipeSpec, source *InputChannel, ...)
- type CaseExpression
- type CastToRdfFnc
- type CastToRdfTxtFnc
- type Channel
- type ChannelRegistry
- func (r *ChannelRegistry) AddDistributionChannel(input string) string
- func (r *ChannelRegistry) CloseChannel(name string)
- func (r *ChannelRegistry) GetInputChannel(name string, hasGroupedRows bool) (*InputChannel, error)
- func (r *ChannelRegistry) GetOutputChannel(name string) (*OutputChannel, error)
- type ChannelResults
- type ChannelSpec
- type ChannelState
- type ClusterCorrelation
- type ClusterInfo
- type ClusterShardingInfo
- type ClusterShardingSpec
- type ClusterSpec
- type ClusteringDistributor
- type ClusteringPoolManager
- type ClusteringResult
- type ClusteringSpec
- type ClusteringTransformationPipe
- type ClusteringWorker
- type ColumnCorrelation
- type ColumnFileSpec
- type ColumnNameLookupNode
- type ColumnNameTokenNode
- type CompiledPartFileComponent
- type ComputePipesArgs
- type ComputePipesCommonArgs
- type ComputePipesConfig
- type ComputePipesContext
- func (cpCtx *ComputePipesContext) DoneAll(err error)
- func (cpCtx *ComputePipesContext) DownloadS3Files(inFolderPath []string, externalBucket string, fileKeys [][]*FileKeyInfo) error
- func (cpCtx *ComputePipesContext) InsertPipelineExecutionStatus(dbpool *pgxpool.Pool) (int, error)
- func (cpCtx *ComputePipesContext) LoadFiles(ctx context.Context, dbpool *pgxpool.Pool) (err error)
- func (cpCtx *ComputePipesContext) NewMergeFileReader(mergePos int, inputFormat string, delimiter rune, outputSp SchemaProvider, ...) (io.Reader, error)
- func (cpCtx *ComputePipesContext) NewS3DeviceManager() error
- func (cpCtx *ComputePipesContext) ProcessFilesAndReportStatus(ctx context.Context, dbpool *pgxpool.Pool) error
- func (cpCtx *ComputePipesContext) ReadCsvFile(filePath *FileName, fileReader ReaderAtSeeker, ...) (int64, int64, error)
- func (cpCtx *ComputePipesContext) ReadFixedWidthFile(filePath *FileName, fileReader ReaderAtSeeker, ...) (int64, int64, error)
- func (cpCtx *ComputePipesContext) ReadParquetFileV2(filePath *FileName, fileReader parquet.ReaderAtSeeker, readBatchSize int64, ...) (int64, error)
- func (cpCtx *ComputePipesContext) ReadXlsxFile(filePath *FileName, xlsxSheetInfo map[string]any, ...) (int64, int64, error)
- func (cpCtx *ComputePipesContext) StartComputePipes(dbpool *pgxpool.Pool, inputSchemaCh <-chan ParquetSchemaInfo, ...)
- func (cpCtx *ComputePipesContext) StartMergeFiles(dbpool *pgxpool.Pool) (cpErr error)
- func (cpCtx *ComputePipesContext) UpdatePipelineExecutionStatus(dbpool *pgxpool.Pool, key int, ...) error
- type ComputePipesNodeArgs
- type ComputePipesResult
- type ComputePipesRun
- type ConditionalEnvVarEvaluator
- type ConditionalEnvVariable
- type ConditionalPipeSpec
- type ConditionalTransformationSpec
- type ContextSpec
- type CpipesStartup
- func (cpipesStartup *CpipesStartup) EvalUseEcsTask(stepId int) (bool, error)
- func (cpipesStartup *CpipesStartup) GetComputePipesPartitions(dbpool *pgxpool.Pool, processName, sessionId string, ...) ([]JetsPartitionInfo, error)
- func (args *CpipesStartup) ValidatePipeSpecConfig(cpConfig *ComputePipesConfig, pipeConfig []PipeSpec) error
- type CsvSourceS3
- type CsvSourceSpec
- type DataSchemaSpec
- type DateBuilder
- type Decimal128Builder
- type Decimal256Builder
- type DefaultPF
- type DefaultSchemaProvider
- func (sp *DefaultSchemaProvider) AdjustColumnWidth(width map[string]int) error
- func (sp *DefaultSchemaProvider) BadRowsConfig() *BadRowsSpec
- func (sp *DefaultSchemaProvider) BlankFieldMarkers() *BlankFieldMarkers
- func (sp *DefaultSchemaProvider) Bucket() string
- func (sp *DefaultSchemaProvider) CapDobYears() int
- func (sp *DefaultSchemaProvider) ColumnNames() []string
- func (sp *DefaultSchemaProvider) Columns() []SchemaColumnSpec
- func (sp *DefaultSchemaProvider) Compression() string
- func (sp *DefaultSchemaProvider) Delimiter() rune
- func (sp *DefaultSchemaProvider) DetectEncoding() bool
- func (sp *DefaultSchemaProvider) DiscardFileHeaders() bool
- func (sp *DefaultSchemaProvider) DomainClass() string
- func (sp *DefaultSchemaProvider) DomainKeys() map[string]any
- func (sp *DefaultSchemaProvider) DropExcedentHeaders() bool
- func (sp *DefaultSchemaProvider) Encoding() string
- func (sp *DefaultSchemaProvider) EnforceRowMaxLength() bool
- func (sp *DefaultSchemaProvider) EnforceRowMinLength() bool
- func (sp *DefaultSchemaProvider) Env() map[string]any
- func (sp *DefaultSchemaProvider) FixedWidthEncodingInfo() *FixedWidthEncodingInfo
- func (sp *DefaultSchemaProvider) FixedWidthFileHeaders() ([]string, string)
- func (sp *DefaultSchemaProvider) Format() string
- func (sp *DefaultSchemaProvider) Initialize(_ *pgxpool.Pool, spec *SchemaProviderSpec, _ map[string]any, isDebugMode bool) error
- func (sp *DefaultSchemaProvider) InputFormatDataJson() string
- func (sp *DefaultSchemaProvider) IsPartFiles() bool
- func (sp *DefaultSchemaProvider) Key() string
- func (sp *DefaultSchemaProvider) NbrRowsInRecord() int64
- func (sp *DefaultSchemaProvider) NoQuotes() bool
- func (sp *DefaultSchemaProvider) OutputEncoding() string
- func (sp *DefaultSchemaProvider) ParquetSchema() *ParquetSchemaInfo
- func (sp *DefaultSchemaProvider) QuoteAllRecords() bool
- func (sp *DefaultSchemaProvider) ReadBatchSize() int64
- func (sp *DefaultSchemaProvider) ReadDateLayout() string
- func (sp *DefaultSchemaProvider) ReorderColumnsOnRead() []int
- func (sp *DefaultSchemaProvider) SchemaName() string
- func (sp *DefaultSchemaProvider) SetDodToJan1() bool
- func (sp *DefaultSchemaProvider) SetParquetSchema(schema *ParquetSchemaInfo)
- func (sp *DefaultSchemaProvider) TrimColumns() bool
- func (sp *DefaultSchemaProvider) UseLazyQuotes() bool
- func (sp *DefaultSchemaProvider) UseLazyQuotesSpecial() bool
- func (sp *DefaultSchemaProvider) VariableFieldsPerRecord() bool
- func (sp *DefaultSchemaProvider) WriteDateLayout() string
- type DistinctCount
- type DistinctSpec
- type DistinctTransformationPipe
- type DomainKeyInfo
- type DomainKeysSpec
- type DownloadS3Result
- type EntityHint
- type ExprBuilderContext
- type ExpressionNode
- type FetchFileInfoResult
- type FieldInfo
- type FileConfig
- type FileKeyInfo
- type FileName
- type FilterColumnSpec
- type FilterSpec
- type FilterTransformationPipe
- type FixedWidthColumn
- type FixedWidthEncodingInfo
- type Float32Builder
- type Float64Builder
- type FormatDatePF
- type FunctionTokenNode
- type GroupBySpec
- type GroupByTransformationPipe
- type HashEvaluator
- type HashExpression
- type HashingAlgoEnum
- type HighFreqSpec
- type HighFreqTransformationPipe
- type Input2PipeSet
- type InputChannel
- type InputChannelConfig
- type InputMappingExpr
- type InputRowColumns
- type InputSourceSpec
- type Int32Builder
- type Int64Builder
- type JetRdfSession
- type JetResourceManager
- type JetResources
- type JetReteSession
- type JetRuleEngine
- type JetRulesFactory
- type JetRulesProxy
- type JetrulesOutputChan
- type JetrulesSpec
- type JetrulesTransformationPipe
- type JetrulesWorkerResult
- type JetsPartitionInfo
- type JrPoolManager
- type JrPoolWorker
- type KeywordCount
- type KeywordTokenNode
- type LoadFromS3FilesResult
- type LookupColumnSpec
- type LookupCount
- type LookupSpec
- type LookupTable
- type LookupTableManager
- type LookupTableS3
- type LookupTableSql
- type LookupTokenNode
- type LookupTokensState
- type MapExpression
- type MapRecordSpec
- type MapRecordTransformationPipe
- type MergeCurrentValue
- type MergeFileReader
- type MergeFileSpec
- type MergeSpec
- type MergeTransformationPipe
- type Metric
- type MetricsSpec
- type MinMaxValue
- type MultiTokensNode
- type OutputChannel
- type OutputChannelConfig
- type OutputFileSpec
- type ParquetSchemaInfo
- type ParseDateFTSpec
- type ParseDateMatchFunction
- type ParseDateSpec
- type ParseDoubleMatchFunction
- type ParseTextMatchFunction
- type PartitionWriterSpec
- type PartitionWriterTransformationPipe
- type PathSubstitution
- type PipeSet
- type PipeSpec
- type PipeTransformationEvaluator
- type PreprocessingFunction
- type RdfNode
- type ReaderAtSeeker
- type RegexCount
- type RegexNode
- type RemoveMiPF
- type ReportCmdSpec
- type RuleEngineConfig
- type S3CopyFileSpec
- type S3DeviceManager
- type S3DeviceWorker
- type S3DeviceWriter
- type S3Object
- type SaveResultsContext
- type SchemaColumnSpec
- type SchemaManager
- type SchemaProvider
- type SchemaProviderSpec
- type ShardFileKeyResult
- type ShufflingSpec
- type ShufflingTransformationPipe
- type SortSpec
- type SortTransformationPipe
- type SourcesConfigSpec
- type SplitterSpec
- type StartComputePipesArgs
- type StringBuilder
- type TableColumnSpec
- type TableSpec
- type TargetColumnsLookupSpec
- type TimestampBuilder
- type TransformationColumnEvaluator
- type TransformationColumnSpec
- type TransformationSpec
- type TripleIterator
- type Uint32Builder
- type Uint64Builder
- type WelfordAlgo
- type WriteTableSource
Constants ¶
This section is empty.
Variables ¶
var ( ErrKillSwitch = errors.New("ErrKillSwitch") ComputePipesStart = time.Now() )
Load multipart files to JetStore; the files to load are provided by channel fileNameCh
var DomainKeyDelimit string = os.Getenv("JETS_DOMAIN_KEY_SEPARATOR")
var ErrEOFTooEarly error = errors.New("error: cannot determine encoding, got EOF")
var ErrFileZipArchive error = errors.New("the file is a ZIP archive")
var ErrUnknownEncoding error = errors.New("encoding unknown, unable to detected the encoding")
var ErrUnknownEncodingOrWrongDelimit error = errors.New("unable to detected the file encoding or the specified delimiter is not the delimiter used in the file")
var HashingSeed uuid.UUID
Functions ¶
func AdjustFillers ¶
func AdjustFillers(rawHeaders *[]string)
func ApplyAllConditionalTransformationSpec ¶
Function to apply all conditional transformation spec in the pipeConfig
func ApplyConditionalEnvVars ¶
func ApplyConditionalEnvVars(envVars []ConditionalEnvVariable, env map[string]any) error
func AssertMetadataSource ¶
func AssertMetadataSource(re JetRuleEngine, config *JetrulesSpec, env map[string]any) error
Assert rule config to meta graph from the pipeline configuration
func AssertRuleConfiguration ¶
func AssertRuleConfiguration(re JetRuleEngine, config *JetrulesSpec, env map[string]any) (err error)
Assert rule config to meta graph from the pipeline configuration
func AssertSourcePeriodInfo ¶
func AssertSourcePeriodInfo(re JetRuleEngine, config *JetrulesSpec, env map[string]any) (err error)
Assert source period info (date, period, type) to rdf graph
func BuildEvalOperator ¶
build the runtime evaluator for the column transformation
func CastToRdfType ¶
Utility function for casting to specified rdf type
func ClearJetrulesCaches ¶
func ClearJetrulesCaches()
Function to clear local caches, needed for when the workspace has been updated and we need to force the lambda to reload the workspace metadata from the jetstore db. Note: This must be called before starting goroutines as it is not thread safe.
func CmpRecord ¶
*TODO Migrate to this cmp function to satisfy sort.SortFunc: SortFunc sorts the slice x in ascending order as determined by the cmp function. This sort is not guaranteed to be stable. cmp(a, b) should return a negative number when a < b, a positive number when a > b and zero when a == b or a and b are incomparable in the sense of a strict weak ordering. SortFunc requires that cmp is a strict weak ordering. See https://en.wikipedia.org/wiki/Weak_ordering#Strict_weak_orderings. The function should return 0 for incomparable items.
func ConvertWithSchemaV1 ¶
func ConvertWithSchemaV1(irow int, col arrow.Array, trimStrings bool, fieldInfo *FieldInfo, castToRdfTxtFnc CastToRdfTxtFnc) (any, error)
return value is either nil or a string representing the input v
func CreateOutputTable ¶
Create the Output Table
func DetectCrAsEol ¶
func DetectCrAsEol(fileHd ReaderAtSeeker, compression string) (bool, error)
func DetectCsvDelimitor ¶
func DetectCsvDelimitor(fileHd ReaderAtSeeker, compression string) (jcsv.Chartype, error)
func DetectFileEncoding ¶
func DetectFileEncoding(fileHd ReaderAtSeeker, delimit rune) (encoding string, err error)
func DoesQualifyAsDate ¶
Qualify as a date:
- 6 < len < 30
- contains digits, letters, space, comma, dash, slash, column, apostrophe
Example of longest date to expect: 23 November 2025 13:10 AM
func DownloadS3Object ¶
func ExtractRdfNodeInfoJson ¶
Function to extract value and type from json struct
func GetAdditionalInputColumns ¶
func GetAdditionalInputColumns(cpConfig *ComputePipesConfig) []string
Function to get the columns to add to the input file(s); these columns are added to the input_row channel. They are taken from the channel config with name input_row.
func GetDomainProperties ¶
Get the domain properties for className. If directPropertiesOnly is true, return only the direct properties of the class, not the inherited ones. When directPropertiesOnly is false, the return slice will have jets:key and rdf:type at position 0 and 1 resp.
func GetFileKeys ¶
func GetFileKeys(ctx context.Context, dbpool *pgxpool.Pool, sessionId string, nodeId int) ([][]*FileKeyInfo, error)
Get the file_key(s) from compute_pipes_shard_registry assigned to nodeId -- these are the input multipart files. This is used during the sharding mode. Returned [0][]*FileKeyInfo are the main input files, [i][]*FileKeyInfo, i=1:n are the merge channel files.
func GetMaxConcurrency ¶
func GetPartitionSize4LookbackPeriod ¶
func GetPartitionSize4LookbackPeriod(bucket, fileKey, lookbackPeriod string, env map[string]any, partitionSizes map[string]int64) error
Get Partition key and size. This is used in start reducing.
func GetRawHeadersCsv ¶
func GetRawHeadersCsv(fileHd *os.File, fileName, fileFormat, compression string, sepFlag jcsv.Chartype, encoding string, eolByte byte, multiColumns, noQuotes bool) ([]string, error)
Get the raw headers from fileHd, put them in *ic. Use *sepFlag as the csv delimiter.
func GetRawHeadersParquet ¶
func GetRawHeadersXlsx ¶
func GetRdfNodeValue ¶
func GetRuleEngineConfig ¶
Function to get domain classes info from the local workspace
func GetS3FileKeys ¶
func GetS3FileKeys(processName, sessionId, mainInputStepId, jetsPartitionLabel string, inputChannelConfig *InputChannelConfig, envSettings map[string]any) ([][]*FileKeyInfo, error)
Get the file_key(s) from s3 for the given process/session/step/partition. This is used during the reducing mode.
func GetWorkspaceControl ¶
func GetWorkspaceControl() (*rete.WorkspaceControl, error)
func GetWorkspaceDataProperties ¶
func GetWorkspaceDataProperties() (map[string]*rete.DataPropertyNode, error)
Function to get the domain properties info from the local workspace
func GetWorkspaceDomainClasses ¶
Function to get domain classes info from the local workspace
func GetWorkspaceDomainTables ¶
Function to get domain tables info from the local workspace
func LastIndexByte ¶
func MakeJetsPartitionLabel ¶
func MergeParquetPartitions ¶
func MergeTransformationSpec ¶
func MergeTransformationSpec(host, override *TransformationSpec) error
func ParseDateDateFormat ¶
ParseDateDateFormat returns the first match of [value] among the [dateFormats]
func PrepareOutoutTable ¶
func SplitTableName ¶
func SplitTableName(tableName string) (pgx.Identifier, error)
func WorkspaceHome ¶
func WorkspaceHome() string
func WorkspacePrefix ¶
func WorkspacePrefix() string
func WrapReaderWithDecoder ¶
func WrapWriterWithEncoder ¶
func WriteCpipesArgsToS3 ¶
func WriteCpipesArgsToS3(cpa []ComputePipesNodeArgs, s3Location string) error
Write the compute pipes arguments as json to s3 at location specified by s3Location This is currently not used, needed for Distributed Map
func WriteParquetPartitionV3 ¶
Types ¶
type AggregateTransformationPipe ¶
type AggregateTransformationPipe struct {
// contains filtered or unexported fields
}
func (*AggregateTransformationPipe) Apply ¶
func (ctx *AggregateTransformationPipe) Apply(input *[]interface{}) error
Implementing interface PipeTransformationEvaluator
func (*AggregateTransformationPipe) Done ¶
func (ctx *AggregateTransformationPipe) Done() error
func (*AggregateTransformationPipe) Finally ¶
func (ctx *AggregateTransformationPipe) Finally()
type AnalyzeSpec ¶
type AnalyzeSpec struct {
SchemaProvider string `json:"schema_provider,omitempty"`
ScrubChars string `json:"scrub_chars,omitempty"`
DistinctValuesWhenLessThanCount int `json:"distinct_values_when_less_than_count,omitzero"`
PadShortRowsWithNulls bool `json:"pad_short_rows_with_nulls,omitzero"`
ColumnNameToken *ColumnNameTokenNode `json:"column_name_token,omitempty"`
EntityHints []*EntityHint `json:"entity_hints,omitempty"`
RegexTokens []RegexNode `json:"regex_tokens,omitempty"`
LookupTokens []LookupTokenNode `json:"lookup_tokens,omitempty"`
KeywordTokens []KeywordTokenNode `json:"keyword_tokens,omitempty"`
FunctionTokens []FunctionTokenNode `json:"function_tokens,omitempty"`
}
AnalyzeSpec configuration SchemaProvider is used for external configuration, such as date format ScrubChars is the list of characters to scrub from the input values. DistinctValuesWhenLessThanCount is the threshold to list distinct values. PadShortRowsWithNulls indicates to pad short rows with nulls to match row length. ColumnNameToken is used to classify columns based on their name. EntityHints provide hints for entity recognition. RegexTokens specify regex patterns to identify classification tokens. LookupTokens specify lookup tables to identify classification tokens. KeywordTokens specify keywords to identify classification tokens. FunctionTokens specify functions to identify classification tokens.
type AnalyzeState ¶
type AnalyzeState struct {
ColumnName string
ColumnPos int
DistinctValues map[string]*DistinctCount
NullCount int
LenWelford *WelfordAlgo
CharToScrub map[rune]bool
RegexMatch map[string]*RegexCount
LookupState []*LookupTokensState
KeywordMatch map[string]*KeywordCount
ParseDate *ParseDateMatchFunction
ParseDouble *ParseDoubleMatchFunction
ParseText *ParseTextMatchFunction
TotalRowCount int
BlankMarkers *BlankFieldMarkers
Spec *TransformationSpec
}
Analyze data TransformationSpec implementing PipeTransformationEvaluator interface
func (*AnalyzeState) NewToken ¶
func (state *AnalyzeState) NewToken(value string) error
func (*AnalyzeState) NewValue ¶
func (state *AnalyzeState) NewValue(value any) error
type AnalyzeTransformationPipe ¶
type AnalyzeTransformationPipe struct {
// contains filtered or unexported fields
}
firstInputRow is the first row from the input channel. A reference to it is kept for use in the Done function so as to carry over the select fields in the columnEvaluators. Note: columnEvaluators is applied only on the firstInputRow and it is used only to select columns having the same value for every input row or to put constant values coming from the env
Base columns available on the output (only columns specified in outputCh are actually send out):
"column_name", "column_pos", "input_data_type", "entity_hint", "distinct_count", "distinct_count_pct", "distinct_values", "null_count", "null_count_pct", "total_count", "avr_length", "length_var"
Other base columns available when using parse function (parse_date, parse_double, parse_text)
"min_date", "max_date", "min_double", "max_double", "large_double_pct", "min_length", "max_length", "min_value", "max_value", "large_value_pct", "minmax_type"
Note: for min_value/max_value are determined based on this priority rule:
- min_date/max_date if more than 50% of values are valid dates;
- min_double/max_double if more than 75% of values are valid double (note this includes int as well);
- otherwise it's the text min/max length.
Note: distinct_values will contain a comma-separated list of distinct values if distinct_count < spec.distinct_values_when_less_than_count. There is a hardcoded check that caps distinct_values_when_less_than_count at 20.
column_name: the name of the column using the original column name if available, otherwise it is the column name from the input channel. entity_hint: is determined based on the hints provided in spec.analyze_config.entity_hints
Other columns are added based on regex_tokens, lookup_tokens, keyword_tokens, and parse functions The value of the domain counts are expressed in percentage of the non null count:
ratio = <domain count>/(totalCount - nullCount) * 100.0
Note that if totalCount - nullCount == 0, then ratio = -1.
inputDataType contains the data type for each column according to the parquet schema. inputDataType is a map of column name -> input data type Range of value for input data type: string (default if not parquet), bool, int32, int64, float32, float64, date, uint32, uint64
func (*AnalyzeTransformationPipe) Apply ¶
func (ctx *AnalyzeTransformationPipe) Apply(input *[]any) error
Implementing interface PipeTransformationEvaluator
func (*AnalyzeTransformationPipe) Done ¶
func (ctx *AnalyzeTransformationPipe) Done() error
func (*AnalyzeTransformationPipe) Finally ¶
func (ctx *AnalyzeTransformationPipe) Finally()
type AnonymizationAction ¶
type AnonymizationAction struct {
// contains filtered or unexported fields
}
type AnonymizeSpec ¶
type AnonymizeSpec struct {
Mode string `json:"mode,omitempty"`
LookupName string `json:"lookup_name,omitempty"`
AnonymizeType string `json:"anonymize_type,omitempty"`
KeyPrefix string `json:"key_prefix,omitempty"`
DeidFunctions map[string]string `json:"deid_functions,omitempty"`
DeidLookups map[string]string `json:"deid_lookups,omitempty"`
InputDateLayout string `json:"input_date_layout,omitempty"`
DateFormatsColumn string `json:"date_formats_column,omitempty"`
OutputDateLayout string `json:"output_date_layout,omitempty"`
KeyDateLayout string `json:"key_date_layout,omitempty"`
DefaultInvalidDate string `json:"default_invalid_date,omitempty"`
SchemaProvider string `json:"schema_provider,omitempty"`
AdjustFieldWidthOnFW bool `json:"adjust_field_width_on_fixed_width_file,omitzero"`
OmitPrefixOnFW bool `json:"omit_prefix_on_fixed_width_file,omitzero"`
AnonymizedColumnsOutputFile *ColumnFileSpec `json:"anonymized_columns_output_file,omitzero"`
KeysOutputChannel *OutputChannelConfig `json:"keys_output_channel"`
}
Mode: Specify mode of action: de-identification, anonymization (default) - de-identification: mask the data (not reversible); - anonymization: replace the data with hashed value (reversible using crosswalk file). LookupName is name of lookup table containing the file metadata from analyze operator. AnonymizeType is column name in lookup table that specify how to anonymize (value: date, text). KeyPrefix is column name of lookup table to use as prefix of the anonymized value or key mapping for de-identification lookup table. DeidFunctions is map of KeyPrefix value to function name for de-identification. DeidLookups is map of KeyPrefix value to lookup table name for substitution values for de-identification. InputDateLayout is the format for parsing the input date (incoming data) when specified. DateFormatsColumn is column name of the lookup table having the list of date format (optional). OutputDateLayout is the format to use for anonymized date, will be set at 1st of the month of the original date (anonymization) or to XXX when de-identification. KeyDateLayout is the format to use in the key mapping file (crosswalk file) for anonymization. DefaultInvalidDate is a placeholder to use as the anonymized date when the input date (the date to anonymize) is not valid. If unspecified, the input value is used unchanged as the output value. DefaultInvalidDate must be a valid date in the format YYYY/MM/DD or MM/DD/YYYY so it can be parsed using JetStore default date parser. The date will be formatted according to KeyDateLayout. OutputDateLayout defaults to InputDateLayout. KeyDateLayout defaults to OutputDateLayout. SchemaProvider is used to: - get the DateLayout / KeyDateLayout if not specified here. - get CapDobYears / SetDodToJan1 for date anonymization. If date format is not specified, the default format for both OutputDateFormat and KeyDateFormat is "2006/01/02", ie. yyyy/MM/dd and the rdf.ParseDate() is used to parse the input date.
type AnonymizeTransformationPipe ¶
type AnonymizeTransformationPipe struct {
// contains filtered or unexported fields
}
func (*AnonymizeTransformationPipe) Apply ¶
func (ctx *AnonymizeTransformationPipe) Apply(input *[]any) error
Implementing interface PipeTransformationEvaluator
func (*AnonymizeTransformationPipe) Done ¶
func (ctx *AnonymizeTransformationPipe) Done() error
Anonymization complete, now send out the keys mapping to keys_output_channel if in mode "anonymization"
func (*AnonymizeTransformationPipe) Finally ¶
func (ctx *AnonymizeTransformationPipe) Finally()
type ArrayBuilder ¶
type ArrayBuilder interface {
Reserve(n int)
Append(v any)
AppendEmptyValue()
AppendNull()
NewArray() arrow.Array
Release()
}
func NewBinaryBuilder ¶
func NewBinaryBuilder(mem memory.Allocator) ArrayBuilder
func NewBooleanBuilder ¶
func NewBooleanBuilder(mem memory.Allocator) ArrayBuilder
func NewDateBuilder ¶
func NewDateBuilder(mem memory.Allocator) ArrayBuilder
func NewDecimal128Builder ¶
func NewDecimal128Builder(mem memory.Allocator, precision, scale int32) ArrayBuilder
func NewDecimal256Builder ¶
func NewDecimal256Builder(mem memory.Allocator, precision, scale int32) ArrayBuilder
func NewFloat32Builder ¶
func NewFloat32Builder(mem memory.Allocator) ArrayBuilder
func NewFloat64Builder ¶
func NewFloat64Builder(mem memory.Allocator) ArrayBuilder
func NewInt32Builder ¶
func NewInt32Builder(mem memory.Allocator) ArrayBuilder
func NewInt64Builder ¶
func NewInt64Builder(mem memory.Allocator) ArrayBuilder
func NewStringBuilder ¶
func NewStringBuilder(mem memory.Allocator) ArrayBuilder
func NewTimestampBuilder ¶
func NewTimestampBuilder(mem memory.Allocator) ArrayBuilder
func NewUint32Builder ¶
func NewUint32Builder(mem memory.Allocator) ArrayBuilder
func NewUint64Builder ¶
func NewUint64Builder(mem memory.Allocator) ArrayBuilder
type ArrayRecord ¶
func NewArrayRecord ¶
func NewArrayRecord(schema *arrow.Schema, builders []ArrayBuilder) *ArrayRecord
func (*ArrayRecord) Release ¶
func (r *ArrayRecord) Release()
type BadRowsChannel ¶
type BadRowsChannel struct {
OutputCh chan []byte
// contains filtered or unexported fields
}
func NewBadRowChannel ¶
func NewBadRowChannel(s3DeviceManager *S3DeviceManager, s3BasePath string, doneCh chan struct{}, errCh chan error) *BadRowsChannel
func (*BadRowsChannel) Done ¶
func (ctx *BadRowsChannel) Done()
func (*BadRowsChannel) Write ¶
func (ctx *BadRowsChannel) Write(nodeId int)
type BadRowsSpec ¶
type BadRowsSpec struct {
BadRowsStepId string `json:"bad_rows_step_id,omitempty"`
}
Defines the identification and handling of bad rows. Currently only used for input_row channel. BadRowsStepId: step id in stage location to output bad rows. The input row is considered a bad row when any of WhenCriteria applies; the row is then sent to the bad row channel and removed from the input rows.
type BinaryBuilder ¶
type BinaryBuilder struct {
// contains filtered or unexported fields
}
func (*BinaryBuilder) Append ¶
func (b *BinaryBuilder) Append(v any)
func (*BinaryBuilder) AppendEmptyValue ¶
func (b *BinaryBuilder) AppendEmptyValue()
func (*BinaryBuilder) AppendNull ¶
func (b *BinaryBuilder) AppendNull()
func (*BinaryBuilder) NewArray ¶
func (b *BinaryBuilder) NewArray() arrow.Array
func (*BinaryBuilder) Release ¶
func (b *BinaryBuilder) Release()
func (*BinaryBuilder) Reserve ¶
func (b *BinaryBuilder) Reserve(n int)
type BlankFieldMarkers ¶
BlankFieldMarkers is the runtime version of BlankFieldMarkersSpec when CaseSensitive is true, the Markers are in upper case. Note: This type is re-used by the Anonymize operator as well.
type BlankFieldMarkersSpec ¶
type BooleanBuilder ¶
type BooleanBuilder struct {
// contains filtered or unexported fields
}
func (*BooleanBuilder) Append ¶
func (b *BooleanBuilder) Append(v any)
func (*BooleanBuilder) AppendEmptyValue ¶
func (b *BooleanBuilder) AppendEmptyValue()
func (*BooleanBuilder) AppendNull ¶
func (b *BooleanBuilder) AppendNull()
func (*BooleanBuilder) NewArray ¶
func (b *BooleanBuilder) NewArray() arrow.Array
func (*BooleanBuilder) Release ¶
func (b *BooleanBuilder) Release()
func (*BooleanBuilder) Reserve ¶
func (b *BooleanBuilder) Reserve(n int)
type BuilderContext ¶
type BuilderContext struct {
// contains filtered or unexported fields
}
func (*BuilderContext) BuildCaseExprTCEvaluator ¶
func (ctx *BuilderContext) BuildCaseExprTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
func (*BuilderContext) BuildComputeGraph ¶
func (ctx *BuilderContext) BuildComputeGraph() error
func (*BuilderContext) BuildCountTCEvaluator ¶
func (ctx *BuilderContext) BuildCountTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
func (*BuilderContext) BuildDistinctCountTCEvaluator ¶
func (ctx *BuilderContext) BuildDistinctCountTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
func (*BuilderContext) BuildExprNodeEvaluator ¶
func (ctx *BuilderContext) BuildExprNodeEvaluator(sourceName string, columns map[string]int, spec *ExpressionNode) (evalExpression, error)
Delegate to ExprBuilderContext
func (*BuilderContext) BuildHashTCEvaluator ¶
func (ctx *BuilderContext) BuildHashTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
The Hash operator example (dw_rawfilename is string):
Case hash function:
=========================================
{
"name": "jets_partition",
"type": "hash",
"hash_expr": {
"expr": "dw_rawfilename",
"composite_expr": ["partion", "dw_rawfilename"],
"nbr_jets_partitions": 3,
"alternate_composite_expr": ["name", "gender", "format_date(dob)"],
}
jets_partition will be of type uint64
Case compute domain key:
=========================================
{
"name": "Claim:domain_key",
"type": "hash",
"hash_expr": {
"domain_key": "Claim",
"compute_domain_key": true
}
Claim:domain_key will be of type string
func (*BuilderContext) BuildLookupTCEvaluator ¶
func (ctx *BuilderContext) BuildLookupTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
func (*BuilderContext) BuildMapReduceTCEvaluator ¶
func (ctx *BuilderContext) BuildMapReduceTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
func (*BuilderContext) BuildMapTCEvaluator ¶
func (ctx *BuilderContext) BuildMapTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
func (*BuilderContext) BuildMinTCEvaluator ¶
func (ctx *BuilderContext) BuildMinTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
func (*BuilderContext) BuildPipeTransformationEvaluator ¶
func (ctx *BuilderContext) BuildPipeTransformationEvaluator(source *InputChannel, jetsPartitionKey any, partitionResultCh chan ComputePipesResult, spec *TransformationSpec) (PipeTransformationEvaluator, error)
Build the PipeTransformationEvaluator: one of map_record, aggregate, or partition_writer The partitionResultCh argument is used only by partition_writer to return the number of rows written and the error that might occur
func (*BuilderContext) BuildSumTCEvaluator ¶
func (ctx *BuilderContext) BuildSumTCEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
func (*BuilderContext) BuildTransformationColumnEvaluator ¶
func (ctx *BuilderContext) BuildTransformationColumnEvaluator(source *InputChannel, outCh *OutputChannel, spec *TransformationColumnSpec) (TransformationColumnEvaluator, error)
build the runtime evaluator for the column transformation
func (*BuilderContext) FileKey ¶
func (ctx *BuilderContext) FileKey() string
func (*BuilderContext) MakeMergeTransformationPipe ¶
func (ctx *BuilderContext) MakeMergeTransformationPipe( mainSource *InputChannel, mergeSources []*InputChannel, outCh *OutputChannel, spec *TransformationSpec) (PipeTransformationEvaluator, error)
func (*BuilderContext) NewAggregateTransformationPipe ¶
func (ctx *BuilderContext) NewAggregateTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*AggregateTransformationPipe, error)
func (*BuilderContext) NewAnalyzeState ¶
func (ctx *BuilderContext) NewAnalyzeState(columnName string, columnPos int, inputColumns *map[string]int, sp SchemaProvider, blankMarkers *BlankFieldMarkers, spec *TransformationSpec) (*AnalyzeState, error)
func (*BuilderContext) NewAnalyzeTransformationPipe ¶
func (ctx *BuilderContext) NewAnalyzeTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*AnalyzeTransformationPipe, error)
func (*BuilderContext) NewAnonymizeTransformationPipe ¶
func (ctx *BuilderContext) NewAnonymizeTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*AnonymizeTransformationPipe, error)
func (*BuilderContext) NewClusteringPoolManager ¶
func (ctx *BuilderContext) NewClusteringPoolManager(config *ClusteringSpec, source *InputChannel, outputCh *OutputChannel, correlationOutputCh *OutputChannel, clusteringResultCh chan ClusteringResult) (poolMgr *ClusteringPoolManager, err error)
Create the ClusteringPoolManager, it will be set to the receiving BuilderContext
func (*BuilderContext) NewClusteringTransformationPipe ¶
func (ctx *BuilderContext) NewClusteringTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*ClusteringTransformationPipe, error)
func (*BuilderContext) NewDistinctTransformationPipe ¶
func (ctx *BuilderContext) NewDistinctTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*DistinctTransformationPipe, error)
func (*BuilderContext) NewFilterTransformationPipe ¶
func (ctx *BuilderContext) NewFilterTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*FilterTransformationPipe, error)
func (*BuilderContext) NewGroupByTransformationPipe ¶
func (ctx *BuilderContext) NewGroupByTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*GroupByTransformationPipe, error)
Builder function for GroupByTransformationPipe
func (*BuilderContext) NewHashEvaluator ¶
func (ctx *BuilderContext) NewHashEvaluator(source *InputChannel, spec *HashExpression) (*HashEvaluator, error)
Build the HashEvaluator, see BuildHashTCEvaluator
func (*BuilderContext) NewHighFreqTransformationPipe ¶
func (ctx *BuilderContext) NewHighFreqTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*HighFreqTransformationPipe, error)
func (*BuilderContext) NewJetrulesTransformationPipe ¶
func (ctx *BuilderContext) NewJetrulesTransformationPipe(source *InputChannel, _ *OutputChannel, spec *TransformationSpec) ( *JetrulesTransformationPipe, error)
func (*BuilderContext) NewJrPoolManager ¶
func (ctx *BuilderContext) NewJrPoolManager( config *JetrulesSpec, source *InputChannel, rdfType2Columns map[string][]string, ruleEngine JetRuleEngine, outputChannels []*JetrulesOutputChan, jetrulesWorkerResultCh chan JetrulesWorkerResult) (jrpm *JrPoolManager, err error)
Create the JrPoolManager, it will be set to the receiving BuilderContext
func (*BuilderContext) NewMapRecordTransformationPipe ¶
func (ctx *BuilderContext) NewMapRecordTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*MapRecordTransformationPipe, error)
func (*BuilderContext) NewMergeTransformationPipe ¶
func (ctx *BuilderContext) NewMergeTransformationPipe( source *InputChannel, outCh *OutputChannel, spec *TransformationSpec) (PipeTransformationEvaluator, error)
Builder function for MergeTransformationPipe
func (*BuilderContext) NewPartitionWriterTransformationPipe ¶
func (ctx *BuilderContext) NewPartitionWriterTransformationPipe(source *InputChannel, jetsPartitionKey any, outputCh *OutputChannel, copy2DeviceResultCh chan ComputePipesResult, spec *TransformationSpec) (*PartitionWriterTransformationPipe, error)
Create a new jets_partition writer, the partition is identified by the jetsPartition
func (*BuilderContext) NewShufflingTransformationPipe ¶
func (ctx *BuilderContext) NewShufflingTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*ShufflingTransformationPipe, error)
func (*BuilderContext) NewSortTransformationPipe ¶
func (ctx *BuilderContext) NewSortTransformationPipe(source *InputChannel, outputCh *OutputChannel, spec *TransformationSpec) (*SortTransformationPipe, error)
func (*BuilderContext) ReportMetrics ¶
func (ctx *BuilderContext) ReportMetrics()
func (*BuilderContext) StartFanOutPipe ¶
func (ctx *BuilderContext) StartFanOutPipe(spec *PipeSpec, source *InputChannel, writePartitionsResultCh chan ComputePipesResult)
func (*BuilderContext) StartSplitterPipe ¶
func (ctx *BuilderContext) StartSplitterPipe(spec *PipeSpec, source *InputChannel, writePartitionsResultCh chan ComputePipesResult)
type CaseExpression ¶
type CaseExpression struct {
When ExpressionNode `json:"when"`
Then []*ExpressionNode `json:"then"`
}
type CastToRdfFnc ¶
This file contains function to cast input data into rdf type based on domain classes
func BuildCastToRdfFunctions ¶
func BuildCastToRdfFunctions(domainClass string, properties []string) ([]CastToRdfFnc, error)
type CastToRdfTxtFnc ¶
func BuildCastToRdfTxtFunctions ¶
func BuildCastToRdfTxtFunctions(domainClass string, properties []string) ([]CastToRdfTxtFnc, error)
type Channel ¶
type Channel struct {
Name string
Channel chan []any
Columns *map[string]int
DomainKeySpec *DomainKeysSpec
Config *ChannelSpec
}
type ChannelRegistry ¶
type ChannelRegistry struct {
InputRowChannel *InputChannel
ComputeChannels map[string]*Channel
OutputTableChannels []string
ClosedChannels map[string]bool
ClosedChMutex sync.Mutex
DistributionChannels map[string]*[]string
}
ChannelRegistry keeps track of all input and output channels. inputRowChannel, called input_row, corresponds to the main input file. InputMergeChannels correspond to any merge input files. ComputeChannels correspond to all other channels created for intermediate steps in the compute graph. OutputTableChannels correspond to the output tables that need to be written. ClosedChannels keeps track of which channels have been closed.
func (*ChannelRegistry) AddDistributionChannel ¶
func (r *ChannelRegistry) AddDistributionChannel(input string) string
func (*ChannelRegistry) CloseChannel ¶
func (r *ChannelRegistry) CloseChannel(name string)
func (*ChannelRegistry) GetInputChannel ¶
func (r *ChannelRegistry) GetInputChannel(name string, hasGroupedRows bool) (*InputChannel, error)
func (*ChannelRegistry) GetOutputChannel ¶
func (r *ChannelRegistry) GetOutputChannel(name string) (*OutputChannel, error)
type ChannelResults ¶
type ChannelResults struct {
LoadFromS3FilesResultCh chan LoadFromS3FilesResult
Copy2DbResultCh chan chan ComputePipesResult
WritePartitionsResultCh chan chan ComputePipesResult
S3PutObjectResultCh chan ComputePipesResult
JetrulesWorkerResultCh chan chan JetrulesWorkerResult
ClusteringResultCh chan chan ClusteringResult
}
ChannelResults holds the channel reporting back results. LoadFromS3FilesResultCh: results from loading files (row count) Copy2DbResultCh: results of records written to JetStore DB (row count) WritePartitionsResultCh: report on rows output to s3 (row count) S3PutObjectResultCh: reports on nbr of files put to s3 (file count) JetrulesWorkerResultCh: reports on nbr of rete session and errors ClusteringResultCh: reports on nbr of clusters identified and errors
type ChannelSpec ¶
type ChannelSpec struct {
Name string `json:"name"`
Columns []string `json:"columns"`
ClassName string `json:"class_name,omitempty"`
DirectPropertiesOnly bool `json:"direct_properties_only,omitzero"`
HasDynamicColumns bool `json:"has_dynamic_columns,omitzero"`
SameColumnsAsInput bool `json:"same_columns_as_input,omitzero"`
DomainKeys map[string]any `json:"domain_keys,omitempty"`
DomainKeysInfo *DomainKeysSpec `json:"domain_keys_spec,omitzero"`
// contains filtered or unexported fields
}
ChannelSpec specifies the columns of a channel The columns can be obtained from a domain class from the local workspace using class_name. In that case, the columns that are specified in the slice, are added to the columns of the domain class. When direct_properties_only is true, only take the data properties of the class, not including the properties of the parent classes. DomainKeys provide the ability to configure the domain keys in the cpipes config document. DomainKeysSpec is parsed version of DomainKeys or the spec from the domain_keys_registry table. DomainKeysSpec is derived from DomainKeys when provided. columnsMap is added in StartComputePipes
func GetChannelSpec ¶
func GetChannelSpec(channels []ChannelSpec, name string) *ChannelSpec
type ChannelState ¶
type ChannelState struct {
// contains filtered or unexported fields
}
type ClusterCorrelation ¶
type ClusterCorrelation struct {
// contains filtered or unexported fields
}
func NewClusterCorrelation ¶
func NewClusterCorrelation(c1, c2 string, minObservationsCount int) *ClusterCorrelation
func (*ClusterCorrelation) AddObservation ¶
func (cc *ClusterCorrelation) AddObservation(distinctValues, nbrObservations int)
func (*ClusterCorrelation) CumulatedCounts ¶
func (cc *ClusterCorrelation) CumulatedCounts() (int, int)
returns cumulated counts. Note: a minimum number of observations for column1 is required, otherwise the function returns -1, -1
type ClusterInfo ¶
type ClusterInfo struct {
// contains filtered or unexported fields
}
func MakeClusters ¶
func MakeClusters(columnsCorrelation []*ColumnCorrelation, columnClassificationMap map[string]string, config *ClusteringSpec) []*ClusterInfo
Function that build the clusters from the raw column correlation
func NewClusterInfo ¶
func NewClusterInfo(classificationMap map[string]string, config *ClusteringSpec) *ClusterInfo
func (*ClusterInfo) AddMember ¶
func (cc *ClusterInfo) AddMember(column string)
func (*ClusterInfo) String ¶
func (cc *ClusterInfo) String() string
type ClusterShardingInfo ¶
type ClusterShardingInfo struct {
TotalFileSize int64 `json:"total_file_size"`
MaxNbrPartitions int `json:"max_nbr_partitions"`
NbrPartitions int `json:"nbr_partitions"`
MultiStepSharding int `json:"multi_step_sharding"`
}
Contains info about the clustersharding. This info is determined during the sharding phase in ShardFileKeys and passed to the StartReducing actions via StartComputePipesArgs
type ClusterShardingSpec ¶
type ClusterShardingSpec struct {
AppliesToFormat string `json:"applies_to_format,omitempty"`
WhenTotalSizeGe int `json:"when_total_size_ge_mb,omitzero"`
MaxNbrPartitions int `json:"max_nbr_partitions,omitzero"`
MultiStepShardingThresholds int `json:"multi_step_sharding_thresholds,omitzero"`
ShardSizeMb float64 `json:"shard_size_mb,omitzero"`
ShardMaxSizeMb float64 `json:"shard_max_size_mb,omitzero"`
ShardSizeBy float64 `json:"shard_size_by,omitzero"` // for testing only
ShardMaxSizeBy float64 `json:"shard_max_size_by,omitzero"` // for testing only
S3WorkerPoolSize int `json:"s3_worker_pool_size,omitzero"`
MaxConcurrency int `json:"max_concurrency,omitzero"`
}
Cluster sizing configuration. Allows to dynamically determine the NbrNodes based on total size of input files. AppliesToFormat allows to specify the file format to which this spec applies, e.g., parquet, csv, etc. When empty, applies to all formats. WhenTotalSizeGe: Specify the condition for this to be applied in MB (read as "when total size greater than or equal to"). When using ecs tasks, MaxConcurrency applies to ECS cluster, otherwise MaxConcurrency is the number of concurrent lambda functions executing. Note that S3WorkerPoolSize is used for reducing01, all other reducing steps use the S3WorkerPoolSize set at the ClusterSpec level. ShardSizeMb/ShardMaxSizeMb must be specified to determine the nbr of nodes and to allocate files to shards. When [MaxNbrPartitions] is not specified, the value at the ClusterSpec level is taken.
type ClusterSpec ¶
type ClusterSpec struct {
MaxNbrPartitions int `json:"max_nbr_partitons,omitzero"`
MultiStepShardingThresholds int `json:"multi_step_sharding_thresholds,omitzero"`
DefaultShardSizeMb float64 `json:"default_shard_size_mb,omitzero"`
DefaultShardMaxSizeMb float64 `json:"default_shard_max_size_mb,omitzero"`
DefaultShardSizeBy float64 `json:"default_shard_size_by,omitzero"` // for testing only
DefaultShardMaxSizeBy float64 `json:"default_shard_max_size_by,omitzero"` // for testing only
ShardOffset int `json:"shard_offset,omitzero"`
DefaultMaxConcurrency int `json:"default_max_concurrency,omitzero"`
S3WorkerPoolSize int `json:"s3_worker_pool_size,omitzero"`
ClusterShardingTiers []ClusterShardingSpec `json:"cluster_sharding_tiers,omitempty"`
IsDebugMode bool `json:"is_debug_mode,omitzero"`
KillSwitchMin int `json:"kill_switch_min,omitzero"`
ShardingInfo *ClusterShardingInfo `json:"sharding_info,omitzero"`
}
Cluster configuration. [DefaultMaxConcurrency] is to override the env var MAX_CONCURRENCY. [nbrPartitions] is specified at ClusterShardingSpec level otherwise at the ClusterSpec level. [nbrPartitions] is determined by the nbr of sharding nodes, capped by MaxNbrPartitions. [DefaultShardSizeMb] is the default value (in MB) when not specified at ClusterShardingSpec level. [DefaultShardMaxSizeMb] is the default value (in MB) when not specified at ClusterShardingSpec level. [DefaultShardSizeBy] is the default value (in bytes) when not specified at ClusterShardingSpec level. [DefaultShardMaxSizeBy] is the default value (in bytes) when not specified at ClusterShardingSpec level. NOTE: [ShardSizeMb] / [ShardMaxSizeMb] must be specified for the sharding to take place. [MultiStepShardingThresholds] is the number of partitions to trigger the use of multi step sharding. When [MultiStepShardingThresholds] > 0 then [nbrPartitions] is sqrt(nbr of sharding nodes). [ShardingInfo] is calculated based on input files. [ShardingInfo] is used by the hash operator. Do not set [ShardingInfo] at configuration time, it will be ignored and replaced with the calculated values. Note: Make sure that ClusterShardingSpec is in decreasing order of WhenTotalSizeGe.
func (*ClusterSpec) NbrPartitions ¶
func (cs *ClusterSpec) NbrPartitions(mode string) int
type ClusteringDistributor ¶
type ClusteringDistributor struct {
// contains filtered or unexported fields
}
type ClusteringPoolManager ¶
type ClusteringPoolManager struct {
WorkersTaskCh chan []any
WaitForDone *sync.WaitGroup
// contains filtered or unexported fields
}
ClusteringDistributors distribute rows by unique value of column1 to an associated ClusteringPoolWorkers. ClusteringPoolWorkers calculate the columns correlation in parallel. poolWg is a wait group of the workers. The WorkersTaskCh is a channel between the clustering operator and the pool manager (to limit the nbr of rows). The distributionTaskCh is used by the pool manager to distribute the input rows to all the workers. The distributionResultCh is used to collect the correlation results from the workers by the pool manager. The correlationOutputCh is to send the correlation results to s3, this is done when is_debug is true
type ClusteringResult ¶
type ClusteringResult struct {
Err error
}
type ClusteringSpec ¶
type ClusteringSpec struct {
MaxInputCount int `json:"max_input_count,omitzero"`
MinColumn1NonNilCount int `json:"min_column1_non_null_count,omitzero"`
MinColumn2NonNilCount int `json:"min_column2_non_null_count,omitzero"`
TargetColumnsLookup TargetColumnsLookupSpec `json:"target_columns_lookup"`
ClusterDataSubclassification []string `json:"cluster_data_subclassification,omitempty"`
SoloDataSubclassification []string `json:"solo_data_subclassification,omitempty"`
TransitiveDataClassification []string `json:"transitive_data_classification,omitempty"`
IsDebug bool `json:"is_debug,omitzero"`
CorrelationOutputChannel *OutputChannelConfig `json:"correlation_output_channel,omitzero"`
}
If is_debug is true, correlation results are forwarded to s3 otherwise the correlation_output_channel is only used to specify the intermediate channels between the pool manager and the workers. MinColumn1NonNilCount is min nbr of column1 distinct values observed MinColumn2NonNilCount is min nbr of non nil values of column2 for a worker to report the correlation. ClusterDataSubclassification contains data_classification values, when found in a cluster all columns member of the cluster get that value as data_subclassification.
type ClusteringTransformationPipe ¶
type ClusteringTransformationPipe struct {
// contains filtered or unexported fields
}
func (*ClusteringTransformationPipe) Apply ¶
func (ctx *ClusteringTransformationPipe) Apply(input *[]any) error
Implementing interface PipeTransformationEvaluator Each call to Apply, the input correspond to a row for which we calculate the column correlation.
func (*ClusteringTransformationPipe) Done ¶
func (ctx *ClusteringTransformationPipe) Done() error
func (*ClusteringTransformationPipe) Finally ¶
func (ctx *ClusteringTransformationPipe) Finally()
type ClusteringWorker ¶
type ClusteringWorker struct {
// contains filtered or unexported fields
}
func NewClusteringWorker ¶
func NewClusteringWorker(config *ClusteringSpec, source *InputChannel, column1 *string, columns2 []string, outputChannel *OutputChannel, done chan struct{}, errCh chan error) *ClusteringWorker
source and outputChannel are provided for their spec, the data is sent and received via inputCh and outputCh
func (*ClusteringWorker) DoWork ¶
func (ctx *ClusteringWorker) DoWork(inputCh <-chan []any, outputCh chan<- []any, resultCh chan ClusteringResult)
type ColumnCorrelation ¶
type ColumnCorrelation struct {
// contains filtered or unexported fields
}
func NewColumnCorrelation ¶
func NewColumnCorrelation(c1, c2 string, dc1, dc2, oc int) *ColumnCorrelation
type ColumnFileSpec ¶
type ColumnFileSpec struct {
// OutputLocation: custom file key.
// Bucket is bucket or empty for jetstore one.
// Delimiter: rune delimiter to use for output file.
Bucket string `json:"bucket,omitempty"`
OutputLocation string `json:"output_location,omitempty"`
SchemaProvider string `json:"schema_provider,omitempty"`
Delimiter rune `json:"delimiter,omitzero"`
}
type ColumnNameLookupNode ¶
type ColumnNameLookupNode struct {
Name string `json:"name"`
ColumnNames []string `json:"column_names,omitempty"`
ColumnNameFragments []string `json:"column_name_fragments,omitempty"`
}
ColumnNameLookupNode specifies the column name to classification token Name: classification token name ColumnNames: list of column names that map to the classification token
type ColumnNameTokenNode ¶
type ColumnNameTokenNode struct {
Name string `json:"name"`
Lookup []*ColumnNameLookupNode `json:"lookup,omitempty"`
}
ColumnNameTokenNode specifies the classification by column name match Name: correspond to the name of the output column where the classification token is stored Lookup: list of ColumnNameLookupNode to match the column names to the classification token.
type ComputePipesArgs ¶
type ComputePipesArgs struct {
ComputePipesNodeArgs
ComputePipesCommonArgs
}
Full arguments to cp_node for sharding and reducing
type ComputePipesCommonArgs ¶
type ComputePipesCommonArgs struct {
CpipesMode string `json:"cpipes_mode,omitempty"`
Client string `json:"client,omitempty"`
Org string `json:"org,omitempty"`
ObjectType string `json:"object_type,omitempty"`
FileKey string `json:"file_key,omitempty"`
SessionId string `json:"session_id,omitempty"`
MainInputStepId string `json:"read_step_id,omitempty"`
MergeFiles bool `json:"merge_files"`
InputSessionId string `json:"input_session_id,omitempty"`
SourcePeriodKey int `json:"source_period_key"`
ProcessName string `json:"process_name,omitempty"`
SourcesConfig SourcesConfigSpec `json:"sources_config"`
DomainKeysSpecByClass map[string]*DomainKeysSpec `json:"domain_keys_by_class,omitempty"`
PipelineConfigKey int `json:"pipeline_config_key"`
UserEmail string `json:"user_email,omitempty"`
}
Common arguments factored out and put into the ComputePipesConfig component. MergeFiles property is to indicate that the pipe is at the stage of merging the part files into a single output file. This will use a single node since merge_files has a single partition to read from (the step_id prior to merge_files writes a single partition). Note: Client and Org are the pipeline execution client and org and may be different than the client/vendor of the actual data (case using stand-in client/org name). In that situation the actual client/vendor of the data is specified at run time via the SchemaProviders on table input_registry.
type ComputePipesConfig ¶
type ComputePipesConfig struct {
CommonRuntimeArgs *ComputePipesCommonArgs `json:"common_runtime_args,omitzero"`
MetricsConfig *MetricsSpec `json:"metrics_config,omitzero"`
ClusterConfig *ClusterSpec `json:"cluster_config,omitzero"`
OutputTables []*TableSpec `json:"output_tables,omitempty"`
OutputFiles []OutputFileSpec `json:"output_files,omitempty"`
LookupTables []*LookupSpec `json:"lookup_tables,omitempty"`
Channels []ChannelSpec `json:"channels,omitempty"`
Context []ContextSpec `json:"context,omitempty"`
SchemaProviders []*SchemaProviderSpec `json:"schema_providers,omitempty"`
PipesConfig []PipeSpec `json:"pipes_config,omitempty"`
ReducingPipesConfig [][]PipeSpec `json:"reducing_pipes_config,omitempty"`
ConditionalPipesConfig []ConditionalPipeSpec `json:"conditional_pipes_config,omitempty"`
}
This file contains the Compute Pipes configuration model
func UnmarshalComputePipesConfig ¶
func UnmarshalComputePipesConfig(computePipesJson *string) (*ComputePipesConfig, error)
func (*ComputePipesConfig) GetComputePipes ¶
func (cp *ComputePipesConfig) GetComputePipes(stepId int, env map[string]any) ([]PipeSpec, int, error)
This function is called once per compute pipes step (sharding or reducing) so we construct the ExprNodeEvaluator as needed.
func (*ComputePipesConfig) GetStepName ¶
func (cp *ComputePipesConfig) GetStepName(stepId int) string
func (*ComputePipesConfig) MainInputChannel ¶
func (cp *ComputePipesConfig) MainInputChannel() *InputChannelConfig
func (*ComputePipesConfig) NbrComputePipes ¶
func (cp *ComputePipesConfig) NbrComputePipes() int
type ComputePipesContext ¶
type ComputePipesContext struct {
ComputePipesArgs
CpConfig *ComputePipesConfig
FileKeyComponents map[string]any
PartFileKeyComponents []CompiledPartFileComponent
AddionalInputHeaders []string
EnvSettings map[string]any
SamplingCount int
JetStoreTempFolder string
InputFileKeys [][]*FileKeyInfo
JetRules JetRulesProxy
ChResults *ChannelResults
KillSwitch chan struct{}
Done chan struct{}
ErrCh chan error
FileNamesCh []chan FileName
DownloadS3ResultCh chan DownloadS3Result // avoid to modify ChannelResult for now...
S3DeviceMgr *S3DeviceManager
SchemaManager *SchemaManager
}
func (*ComputePipesContext) DoneAll ¶
func (cpCtx *ComputePipesContext) DoneAll(err error)
func (*ComputePipesContext) DownloadS3Files ¶
func (cpCtx *ComputePipesContext) DownloadS3Files(inFolderPath []string, externalBucket string, fileKeys [][]*FileKeyInfo) error
func (*ComputePipesContext) InsertPipelineExecutionStatus ¶
func (cpCtx *ComputePipesContext) InsertPipelineExecutionStatus(dbpool *pgxpool.Pool) (int, error)
Register the CPIPES execution status details to pipeline_execution_details
func (*ComputePipesContext) NewMergeFileReader ¶
func (cpCtx *ComputePipesContext) NewMergeFileReader(mergePos int, inputFormat string, delimiter rune, outputSp SchemaProvider, headers []string, writeHeaders bool, compression string) (io.Reader, error)
outputSp is needed to determine if we quote all or no fields. It also provides the writeHeaders value.
func (*ComputePipesContext) NewS3DeviceManager ¶
func (cpCtx *ComputePipesContext) NewS3DeviceManager() error
Create the S3DeviceManager
func (*ComputePipesContext) ProcessFilesAndReportStatus ¶
func (*ComputePipesContext) ReadCsvFile ¶
func (cpCtx *ComputePipesContext) ReadCsvFile( filePath *FileName, fileReader ReaderAtSeeker, castToRdfTxtTypeFncs []CastToRdfTxtFnc, reorderColumnsOnRead []int, computePipesInputCh chan<- []any, badRowChannel *BadRowsChannel) (int64, int64, error)
func (*ComputePipesContext) ReadFixedWidthFile ¶
func (cpCtx *ComputePipesContext) ReadFixedWidthFile( filePath *FileName, fileReader ReaderAtSeeker, fwEncodingInfo *FixedWidthEncodingInfo, castToRdfTxtTypeFncs []CastToRdfTxtFnc, reorderColumnsOnRead []int, computePipesInputCh chan<- []any, badRowChannel *BadRowsChannel) (int64, int64, error)
func (*ComputePipesContext) ReadParquetFileV2 ¶
func (cpCtx *ComputePipesContext) ReadParquetFileV2(filePath *FileName, fileReader parquet.ReaderAtSeeker, readBatchSize int64, castToRdfTxtTypeFncs []CastToRdfTxtFnc, inputSchemaCh chan<- ParquetSchemaInfo, reorderColumnsOnRead []int, computePipesInputCh chan<- []any) (int64, error)
func (*ComputePipesContext) ReadXlsxFile ¶
func (cpCtx *ComputePipesContext) ReadXlsxFile(filePath *FileName, xlsxSheetInfo map[string]any, castToRdfTxtTypeFncs []CastToRdfTxtFnc, reorderColumnsOnRead []int, computePipesInputCh chan<- []any, badRowChannel *BadRowsChannel) (int64, int64, error)
ReadXlsxFile reads an xlsx file and sends the records to computePipesInputCh. EnforceRowMinLength and EnforceRowMaxLength do not apply to xlsx files; values past the last expected field are ignored. As a result badRowChannel is not used.
func (*ComputePipesContext) StartComputePipes ¶
func (cpCtx *ComputePipesContext) StartComputePipes(dbpool *pgxpool.Pool, inputSchemaCh <-chan ParquetSchemaInfo, computePipesInputCh <-chan []any, computePipesMergeChs []chan []any)
Function to write transformed row to database
func (*ComputePipesContext) StartMergeFiles ¶
func (cpCtx *ComputePipesContext) StartMergeFiles(dbpool *pgxpool.Pool) (cpErr error)
Function to merge the partfiles into a single file by streaming the content to s3 using a channel. This is run in the main thread, so no need to have a result channel back to the caller.
func (*ComputePipesContext) UpdatePipelineExecutionStatus ¶
type ComputePipesNodeArgs ¶
type ComputePipesNodeArgs struct {
NodeId int `json:"id"`
JetsPartitionLabel string `json:"jp,omitempty"`
PipelineExecKey int `json:"pe"`
}
Arguments to start cp_node for sharding and reducing; a minimal set of arguments to reduce the size of the json used to call the lambda functions.
func ReadCpipesArgsFromS3 ¶
func ReadCpipesArgsFromS3(s3Location string) ([]ComputePipesNodeArgs, error)
This is currently not used, needed for Distributed Map (for testing purpose)
func (*ComputePipesNodeArgs) CoordinateComputePipes ¶
func (args *ComputePipesNodeArgs) CoordinateComputePipes(ctx context.Context, dbpool *pgxpool.Pool, jrProxy JetRulesProxy) error
type ComputePipesResult ¶
type ComputePipesRun ¶
type ComputePipesRun struct {
CpipesCommands any `json:"cpipesCommands"`
CpipesCommandsS3Key string `json:"cpipesCommandsS3Key,omitempty"`
CpipesMaxConcurrency int `json:"cpipesMaxConcurrency"`
StartReducing StartComputePipesArgs `json:"startReducing"`
IsLastReducing bool `json:"isLastReducing"`
NoMoreTask bool `json:"noMoreTask"`
UseECSReducingTask bool `json:"useECSReducingTask"`
ReportsCommand []string `json:"reportsCommand"`
SuccessUpdate map[string]any `json:"successUpdate"`
ErrorUpdate map[string]any `json:"errorUpdate"`
}
Returned by the cp_starter for a cpipes run. CpipesCommandsS3Key is for Distributed Map, currently not used. CpipesMaxConcurrency is passed to the step function. IsLastReducing is true when StartReducing is the last step. NoMoreTask is true when StartReducing turned out to have nothing to do and we're past the last step; the step function will go directly to ReportsCommand. UseECSReducingTask is true to use a Fargate task rather than lambda functions. ReportsCommand contains the arguments for RunReport. SuccessUpdate / ErrorUpdate are the arguments for status update.
type ConditionalEnvVarEvaluator ¶
type ConditionalEnvVarEvaluator struct {
// contains filtered or unexported fields
}
type ConditionalEnvVariable ¶
type ConditionalEnvVariable struct {
CaseExpr []CaseExpression `json:"case_expr,omitempty"` // case operator
ElseExpr []*ExpressionNode `json:"else_expr,omitempty"` // case operator
}
type ConditionalPipeSpec ¶
type ConditionalPipeSpec struct {
StepName string `json:"step_name,omitempty"`
UseEcsTasks bool `json:"use_ecs_tasks,omitzero"`
UseEcsTasksWhen *ExpressionNode `json:"use_ecs_tasks_when,omitzero"`
PipesConfig []PipeSpec `json:"pipes_config"`
When *ExpressionNode `json:"when,omitzero"`
AddlEnv []ConditionalEnvVariable `json:"addl_env,omitempty"`
}
ConditionalPipe: Each step is executed conditionally. When the key "when" is nil, the step is always executed. Available expr variables as main schema provider env var (see above): multi_step_sharding as int, when > 0, nbr of shards is nbr_partition**2; total_file_size in bytes; nbr_partitions as int (used for hashing purposes); use_ecs_tasks is true to use an ecs fargate task; use_ecs_tasks_when is an expression like the when property.
type ConditionalTransformationSpec ¶
type ConditionalTransformationSpec struct {
When ExpressionNode `json:"when"`
Then TransformationSpec `json:"then"`
}
This type is to provide a conditional TransformationSpec such that the host TransformationSpec has fields overridden based on the condition. When is the condition to evaluate; if true then apply the Then spec. Note: when Then.Type is not empty, replace the host TransformationSpec altogether.
type ContextSpec ¶
type CpipesStartup ¶
type CpipesStartup struct {
CpConfig ComputePipesConfig `json:"compute_pipes_config"`
ProcessName string `json:"process_name,omitempty"`
InputColumns []string `json:"input_columns,omitempty"`
InputColumnsOriginal []string `json:"input_columns_original,omitempty"`
MainInputSchemaProviderConfig *SchemaProviderSpec `json:"main_input_schema_provider_config,omitzero"`
MainInputDomainKeysSpec *DomainKeysSpec `json:"main_input_domain_keys_spec,omitzero"`
MainInputDomainClass string `json:"main_input_domain_class,omitempty"`
DomainKeysSpecByClass map[string]*DomainKeysSpec `json:"domain_keys_spec_by_class,omitzero"`
EnvSettings map[string]any `json:"env_settings,omitzero"`
PipelineConfigKey int `json:"pipeline_config_key,omitempty"`
InputSessionId string `json:"input_session_id,omitempty"`
SourcePeriodKey int `json:"source_period_key,omitempty"`
OperatorEmail string `json:"operator_email,omitempty"`
}
Collect and prepare cpipes configuration for both sharding and reducing steps. InputColumns correspond to the domain columns in the main input file, which can be a subset of the columns in the main_input schema provider based on the source_config table. InputColumns can be empty if it needs to be read from the input file. InputColumnsOriginal is the original input columns before making them unique. It is empty if InputColumns is empty or already unique. MainInputDomainKeysSpec contains the domain keys spec based on the source_config table, which can be overridden by values from the main schema provider. MainInputDomainClass applies when input_registry.input_type = 'domain_table'
func (*CpipesStartup) EvalUseEcsTask ¶
func (cpipesStartup *CpipesStartup) EvalUseEcsTask(stepId int) (bool, error)
func (*CpipesStartup) GetComputePipesPartitions ¶
func (cpipesStartup *CpipesStartup) GetComputePipesPartitions(dbpool *pgxpool.Pool, processName, sessionId string, inputChannelConfig *InputChannelConfig) ([]JetsPartitionInfo, error)
GetComputePipesPartitions gets the jets_partition partition id and size. This is used during reducing steps.
func (*CpipesStartup) ValidatePipeSpecConfig ¶
func (args *CpipesStartup) ValidatePipeSpecConfig(cpConfig *ComputePipesConfig, pipeConfig []PipeSpec) error
Function to validate the PipeSpec output channel config. Apply a default snappy compression if compression is not specified and channel Type is 'stage'. This function also syncs the input and output channels with the associated schema provider.
type CsvSourceS3 ¶
type CsvSourceS3 struct {
// contains filtered or unexported fields
}
func NewCsvSourceS3 ¶
func NewCsvSourceS3(spec *CsvSourceSpec, env map[string]any) (*CsvSourceS3, error)
func (*CsvSourceS3) ReadFileToMetaGraph ¶
func (ctx *CsvSourceS3) ReadFileToMetaGraph(re JetRuleEngine, config *JetrulesSpec) error
*TODO Refactor this ReadFileToMetaGraph func
type CsvSourceSpec ¶
type CsvSourceSpec struct {
// This is used for lookup tables and loading metadata in jetrules.
// This is a single file source, the first file found is taken.
// Type range: cpipes, csv_file (future)
// Default values are taken from current pipeline
// Format: csv, headerless_csv
// Compression: none, snappy
// MakeEmptyWhenNoFile: Do not make an error when no files
// are found, make empty source. Default: generate an error when no files
// are found in s3.
Type string `json:"type"`
Format string `json:"format,omitempty"`
Compression string `json:"compression,omitempty"`
Delimiter rune `json:"delimiter,omitzero"` // default ','
ProcessName string `json:"process_name,omitempty"` // for cpipes
ReadStepId string `json:"read_step_id,omitempty"` // for cpipes
JetsPartitionLabel string `json:"jets_partition,omitempty"` // for cpipes
SessionId string `json:"session_id,omitempty"` // for cpipes
ClassName string `json:"class_name,omitempty"` // used by jetrules_config
MakeEmptyWhenNoFile bool `json:"make_empty_source_when_no_files_found,omitzero"`
}
type DataSchemaSpec ¶
type DateBuilder ¶
type DateBuilder struct {
// contains filtered or unexported fields
}
func (*DateBuilder) Append ¶
func (b *DateBuilder) Append(v any)
func (*DateBuilder) AppendEmptyValue ¶
func (b *DateBuilder) AppendEmptyValue()
func (*DateBuilder) AppendNull ¶
func (b *DateBuilder) AppendNull()
func (*DateBuilder) NewArray ¶
func (b *DateBuilder) NewArray() arrow.Array
func (*DateBuilder) Release ¶
func (b *DateBuilder) Release()
func (*DateBuilder) Reserve ¶
func (b *DateBuilder) Reserve(n int)
type Decimal128Builder ¶
type Decimal128Builder struct {
Precision int32
Scale int32
// contains filtered or unexported fields
}
func (*Decimal128Builder) Append ¶
func (b *Decimal128Builder) Append(v any)
func (*Decimal128Builder) AppendEmptyValue ¶
func (b *Decimal128Builder) AppendEmptyValue()
func (*Decimal128Builder) AppendNull ¶
func (b *Decimal128Builder) AppendNull()
func (*Decimal128Builder) NewArray ¶
func (b *Decimal128Builder) NewArray() arrow.Array
func (*Decimal128Builder) Release ¶
func (b *Decimal128Builder) Release()
func (*Decimal128Builder) Reserve ¶
func (b *Decimal128Builder) Reserve(n int)
type Decimal256Builder ¶
type Decimal256Builder struct {
Precision int32
Scale int32
// contains filtered or unexported fields
}
func (*Decimal256Builder) Append ¶
func (b *Decimal256Builder) Append(v any)
func (*Decimal256Builder) AppendEmptyValue ¶
func (b *Decimal256Builder) AppendEmptyValue()
func (*Decimal256Builder) AppendNull ¶
func (b *Decimal256Builder) AppendNull()
func (*Decimal256Builder) NewArray ¶
func (b *Decimal256Builder) NewArray() arrow.Array
func (*Decimal256Builder) Release ¶
func (b *Decimal256Builder) Release()
func (*Decimal256Builder) Reserve ¶
func (b *Decimal256Builder) Reserve(n int)
type DefaultPF ¶
type DefaultPF struct {
// contains filtered or unexported fields
}
DefaultPF is when there is no preprocessing function, simply add the value to the byte buffer
type DefaultSchemaProvider ¶
type DefaultSchemaProvider struct {
// contains filtered or unexported fields
}
columnNames is the list of file headers for fixed_width; fwColumnPrefix is for fixed_width with multiple record types, a prefix for making table columns (dkInfo).
func (*DefaultSchemaProvider) AdjustColumnWidth ¶
func (sp *DefaultSchemaProvider) AdjustColumnWidth(width map[string]int) error
func (*DefaultSchemaProvider) BadRowsConfig ¶
func (sp *DefaultSchemaProvider) BadRowsConfig() *BadRowsSpec
func (*DefaultSchemaProvider) BlankFieldMarkers ¶
func (sp *DefaultSchemaProvider) BlankFieldMarkers() *BlankFieldMarkers
func (*DefaultSchemaProvider) Bucket ¶
func (sp *DefaultSchemaProvider) Bucket() string
func (*DefaultSchemaProvider) CapDobYears ¶
func (sp *DefaultSchemaProvider) CapDobYears() int
func (*DefaultSchemaProvider) ColumnNames ¶
func (sp *DefaultSchemaProvider) ColumnNames() []string
func (*DefaultSchemaProvider) Columns ¶
func (sp *DefaultSchemaProvider) Columns() []SchemaColumnSpec
func (*DefaultSchemaProvider) Compression ¶
func (sp *DefaultSchemaProvider) Compression() string
func (*DefaultSchemaProvider) Delimiter ¶
func (sp *DefaultSchemaProvider) Delimiter() rune
func (*DefaultSchemaProvider) DetectEncoding ¶
func (sp *DefaultSchemaProvider) DetectEncoding() bool
func (*DefaultSchemaProvider) DiscardFileHeaders ¶
func (sp *DefaultSchemaProvider) DiscardFileHeaders() bool
func (*DefaultSchemaProvider) DomainClass ¶
func (sp *DefaultSchemaProvider) DomainClass() string
func (*DefaultSchemaProvider) DomainKeys ¶
func (sp *DefaultSchemaProvider) DomainKeys() map[string]any
func (*DefaultSchemaProvider) DropExcedentHeaders ¶
func (sp *DefaultSchemaProvider) DropExcedentHeaders() bool
func (*DefaultSchemaProvider) Encoding ¶
func (sp *DefaultSchemaProvider) Encoding() string
func (*DefaultSchemaProvider) EnforceRowMaxLength ¶
func (sp *DefaultSchemaProvider) EnforceRowMaxLength() bool
func (*DefaultSchemaProvider) EnforceRowMinLength ¶
func (sp *DefaultSchemaProvider) EnforceRowMinLength() bool
func (*DefaultSchemaProvider) Env ¶
func (sp *DefaultSchemaProvider) Env() map[string]any
func (*DefaultSchemaProvider) FixedWidthEncodingInfo ¶
func (sp *DefaultSchemaProvider) FixedWidthEncodingInfo() *FixedWidthEncodingInfo
func (*DefaultSchemaProvider) FixedWidthFileHeaders ¶
func (sp *DefaultSchemaProvider) FixedWidthFileHeaders() ([]string, string)
func (*DefaultSchemaProvider) Format ¶
func (sp *DefaultSchemaProvider) Format() string
func (*DefaultSchemaProvider) Initialize ¶
func (sp *DefaultSchemaProvider) Initialize(_ *pgxpool.Pool, spec *SchemaProviderSpec, _ map[string]any, isDebugMode bool) error
func (*DefaultSchemaProvider) InputFormatDataJson ¶
func (sp *DefaultSchemaProvider) InputFormatDataJson() string
func (*DefaultSchemaProvider) IsPartFiles ¶
func (sp *DefaultSchemaProvider) IsPartFiles() bool
func (*DefaultSchemaProvider) Key ¶
func (sp *DefaultSchemaProvider) Key() string
func (*DefaultSchemaProvider) NbrRowsInRecord ¶
func (sp *DefaultSchemaProvider) NbrRowsInRecord() int64
func (*DefaultSchemaProvider) NoQuotes ¶
func (sp *DefaultSchemaProvider) NoQuotes() bool
func (*DefaultSchemaProvider) OutputEncoding ¶
func (sp *DefaultSchemaProvider) OutputEncoding() string
func (*DefaultSchemaProvider) ParquetSchema ¶
func (sp *DefaultSchemaProvider) ParquetSchema() *ParquetSchemaInfo
func (*DefaultSchemaProvider) QuoteAllRecords ¶
func (sp *DefaultSchemaProvider) QuoteAllRecords() bool
func (*DefaultSchemaProvider) ReadBatchSize ¶
func (sp *DefaultSchemaProvider) ReadBatchSize() int64
func (*DefaultSchemaProvider) ReadDateLayout ¶
func (sp *DefaultSchemaProvider) ReadDateLayout() string
func (*DefaultSchemaProvider) ReorderColumnsOnRead ¶
func (sp *DefaultSchemaProvider) ReorderColumnsOnRead() []int
func (*DefaultSchemaProvider) SchemaName ¶
func (sp *DefaultSchemaProvider) SchemaName() string
func (*DefaultSchemaProvider) SetDodToJan1 ¶
func (sp *DefaultSchemaProvider) SetDodToJan1() bool
func (*DefaultSchemaProvider) SetParquetSchema ¶
func (sp *DefaultSchemaProvider) SetParquetSchema(schema *ParquetSchemaInfo)
func (*DefaultSchemaProvider) TrimColumns ¶
func (sp *DefaultSchemaProvider) TrimColumns() bool
func (*DefaultSchemaProvider) UseLazyQuotes ¶
func (sp *DefaultSchemaProvider) UseLazyQuotes() bool
func (*DefaultSchemaProvider) UseLazyQuotesSpecial ¶
func (sp *DefaultSchemaProvider) UseLazyQuotesSpecial() bool
func (*DefaultSchemaProvider) VariableFieldsPerRecord ¶
func (sp *DefaultSchemaProvider) VariableFieldsPerRecord() bool
func (*DefaultSchemaProvider) WriteDateLayout ¶
func (sp *DefaultSchemaProvider) WriteDateLayout() string
type DistinctCount ¶
type DistinctSpec ¶
type DistinctSpec struct {
DistinctOn []string `json:"distinct_on,omitempty"`
}
type DistinctTransformationPipe ¶
type DistinctTransformationPipe struct {
// contains filtered or unexported fields
}
func (*DistinctTransformationPipe) Apply ¶
func (ctx *DistinctTransformationPipe) Apply(input *[]interface{}) error
Implementing interface PipeTransformationEvaluator
func (*DistinctTransformationPipe) Done ¶
func (ctx *DistinctTransformationPipe) Done() error
func (*DistinctTransformationPipe) Finally ¶
func (ctx *DistinctTransformationPipe) Finally()
type DomainKeyInfo ¶
type DomainKeyInfo struct {
KeyExpr []string `json:"key_expr,omitempty"`
ObjectType string `json:"object_type,omitempty"`
}
DomainKeyInfo associates a domain hashed key made as a composite domain key with an optional pre-processing function on each of the columns making the key. KeyExpr is the original function(column) expression. Columns: list of input column names making the domain key. PreprocessFnc: list of pre-processing functions for the input columns (one per column). ObjectType: Object type associated with the Domain Key.
type DomainKeysSpec ¶
type DomainKeysSpec struct {
HashingOverride string `json:"hashing_override,omitempty"`
DomainKeys map[string]*DomainKeyInfo `json:"domain_keys_info,omitempty"`
}
DomainKeysSpec contains the overall information, with overriding hashing method. The hashing method is applicable to all object types. DomainKeys is a map keyed by the object type.
func ParseDomainKeyInfo ¶
func ParseDomainKeyInfo(mainObjectType string, domainKeys any) (*DomainKeysSpec, error)
Parse domain key configuration info from [domainKeys], supporting 3 use cases: in json format:
"key" ["key1", "key2"]
{"ObjectType1": "key", "ObjectType2": ["key1", "key2"]}
type DownloadS3Result ¶
type EntityHint ¶
type ExprBuilderContext ¶
main builder, builds expression evaluator
func (ExprBuilderContext) BuildExprNodeEvaluator ¶
func (ctx ExprBuilderContext) BuildExprNodeEvaluator(sourceName string, columns map[string]int, spec *ExpressionNode) (evalExpression, error)
Note that columns can be nil when evalExpression has map[string]any as argument.
type ExpressionNode ¶
type ExpressionNode struct {
// Type is for leaf nodes: select, value
// Name is for CaseExpression.Then and TransformationColumnSpec.ElseExpr
// to indicate which column to set the calculated value
Name string `json:"name,omitempty"` // TransformationColumnSpec case operator
Type string `json:"type,omitempty"`
Expr string `json:"expr,omitempty"`
ExprList []string `json:"expr_list,omitempty"`
AsRdfType string `json:"as_rdf_type,omitempty"`
Arg *ExpressionNode `json:"arg,omitzero"`
Lhs *ExpressionNode `json:"lhs,omitzero"`
Op string `json:"op,omitempty"`
Rhs *ExpressionNode `json:"rhs,omitzero"`
}
type FetchFileInfoResult ¶
type FetchFileInfoResult struct {
Headers []string
SepFlag jcsv.Chartype
Encoding string
EolByte byte
MultiColumns bool
}
func FetchHeadersAndDelimiterFromFile ¶
func FetchHeadersAndDelimiterFromFile(externalBucket, fileKey, fileFormat, compression, encoding string, delimitor rune, multiColumnsInput, noQuotes, fetchHeaders, fetchDelimitor, fetchEncoding, detectCrAsEol bool, fileFormatDataJson string) (*FetchFileInfoResult, error)
Main function
type FileConfig ¶
type FileConfig struct {
BadRowsConfig *BadRowsSpec `json:"bad_rows_config,omitzero"`
BlankFieldMarkers *BlankFieldMarkersSpec `json:"blank_field_markers,omitzero"`
Bucket string `json:"bucket,omitempty"`
Compression string `json:"compression,omitempty"`
Delimiter rune `json:"delimiter,omitzero"`
DetectCrAsEol bool `json:"detect_cr_as_eol,omitzero"`
DetectEncoding bool `json:"detect_encoding,omitzero"`
DiscardFileHeaders bool `json:"discard_file_headers,omitzero"`
DomainClass string `json:"domain_class,omitempty"`
DomainKeys map[string]any `json:"domain_keys,omitempty"`
DropExcedentHeaders bool `json:"drop_excedent_headers,omitzero"`
Encoding string `json:"encoding,omitempty"`
EnforceRowMaxLength bool `json:"enforce_row_max_length,omitzero"`
EnforceRowMinLength bool `json:"enforce_row_min_length,omitzero"`
EolByte byte `json:"eol_byte,omitzero"`
FileKey string `json:"file_key,omitempty"`
LookbackPeriods string `json:"lookback_periods,omitzero"`
FileName string `json:"file_name,omitempty"` // Type output
FixedWidthColumnsCsv string `json:"fixed_width_columns_csv,omitempty"`
Format string `json:"format,omitempty"`
InputFormatDataJson string `json:"input_format_data_json,omitempty"`
IsPartFiles bool `json:"is_part_files,omitzero"`
KeyPrefix string `json:"key_prefix,omitempty"`
MainInputRowCount int64 `json:"main_input_row_count,omitzero"`
MultiColumnsInput bool `json:"multi_columns_input,omitzero"`
NbrRowsInRecord int64 `json:"nbr_rows_in_record,omitzero"` // Format: parquet
NoQuotes bool `json:"no_quotes,omitzero"`
ParquetSchema *ParquetSchemaInfo `json:"parquet_schema,omitzero"`
PutHeadersOnFirstPartition bool `json:"put_headers_on_first_partition,omitzero"`
QuoteAllRecords bool `json:"quote_all_records,omitzero"`
ReadBatchSize int64 `json:"read_batch_size,omitzero"` // Format: parquet
ReadDateLayout string `json:"read_date_layout,omitempty"`
ReorderColumnsOnRead []int `json:"reorder_columns_on_read,omitempty"`
TrimColumns bool `json:"trim_columns,omitzero"`
UseLazyQuotes bool `json:"use_lazy_quotes,omitzero"`
UseLazyQuotesSpecial bool `json:"use_lazy_quotes_special,omitzero"`
VariableFieldsPerRecord bool `json:"variable_fields_per_record,omitzero"`
WriteDateLayout string `json:"write_date_layout,omitempty"`
OutputEncoding string `json:"output_encoding,omitempty"`
OutputEncodingSameAsInput bool `json:"output_encoding_same_as_input,omitempty"`
}
Configuration type for factoring out all file settings. This is used by more specific types such as: SchemaProviderSpec, InputChannelConfig, OutputChannelConfig, OutputFileSpec
type FileKeyInfo ¶
type FileKeyInfo struct {
// contains filtered or unexported fields
}
type FileName ¶
type FileName struct {
LocalFileName string
InFileKeyInfo FileKeyInfo
}
type FilterColumnSpec ¶
type FilterColumnSpec struct {
LookupName string `json:"lookup_name,omitempty"`
ColumnName string `json:"column_name,omitempty"`
LookupColumn string `json:"lookup_column,omitempty"`
RetainOnValues []string `json:"retain_on_values,omitempty"`
}
FilterColumnSpec specifies how to filter the input rows before shuffling. LookupName is the name of the lookup table containing the column metadata, produced by the analyze operator. ColumnName is the name of the column of the lookup table containing the column name to use on the output rows. LookupColumn is the name of the column in the lookup table containing the column name of the metadata table to filter on. RetainOnValues is the list of values in the lookup table for LookupColumn to retain; only rows with those values are retained.
type FilterSpec ¶
type FilterSpec struct {
RowLengthStrict bool `json:"row_length_strict,omitzero"`
When *ExpressionNode `json:"when,omitzero"`
MaxOutputCount int `json:"max_output_records,omitzero"`
}
Filter rows based on:
- when criteria, if provided,
- max output count, if provided.
RowLengthStrict: when true, will enforce that input row length matches the schema length, otherwise they are filtered.
type FilterTransformationPipe ¶
type FilterTransformationPipe struct {
// contains filtered or unexported fields
}
Filter operator. Filter input records based on filter criteria. Note: Automatically filter the bad rows (ie rows w/o the expected number of columns)
func (*FilterTransformationPipe) Apply ¶
func (ctx *FilterTransformationPipe) Apply(input *[]any) error
Implementing interface PipeTransformationEvaluator
func (*FilterTransformationPipe) Done ¶
func (ctx *FilterTransformationPipe) Done() error
func (*FilterTransformationPipe) Finally ¶
func (ctx *FilterTransformationPipe) Finally()
type FixedWidthColumn ¶
Struct to hold column names and positions for fixed-width file encoding. ColumnsMap key is record type or empty string if single record type (RecordTypeColumn = nil in that case). In ColumnsMap elements, ColumnName is <record type>.<record column name> to make it unique across record types. However RecordTypeColumn.ColumnName is <record column name> without prefix. Note that all record types MUST have RecordTypeColumn.ColumnName with same start and end position. Any record having an unrecognized record type (ie not found in ColumnsMap) is ignored.
func (*FixedWidthColumn) String ¶
func (c *FixedWidthColumn) String() string
type FixedWidthEncodingInfo ¶
type FixedWidthEncodingInfo struct {
RecordTypeColumn *FixedWidthColumn
ColumnsMap map[string]*[]*FixedWidthColumn
ColumnsOffsetMap map[string]int
RecordTypeList []string
}
func (*FixedWidthEncodingInfo) String ¶
func (fw *FixedWidthEncodingInfo) String() string
type Float32Builder ¶
type Float32Builder struct {
// contains filtered or unexported fields
}
func (*Float32Builder) Append ¶
func (b *Float32Builder) Append(v any)
func (*Float32Builder) AppendEmptyValue ¶
func (b *Float32Builder) AppendEmptyValue()
func (*Float32Builder) AppendNull ¶
func (b *Float32Builder) AppendNull()
func (*Float32Builder) NewArray ¶
func (b *Float32Builder) NewArray() arrow.Array
func (*Float32Builder) Release ¶
func (b *Float32Builder) Release()
func (*Float32Builder) Reserve ¶
func (b *Float32Builder) Reserve(n int)
type Float64Builder ¶
type Float64Builder struct {
// contains filtered or unexported fields
}
func (*Float64Builder) Append ¶
func (b *Float64Builder) Append(v any)
func (*Float64Builder) AppendEmptyValue ¶
func (b *Float64Builder) AppendEmptyValue()
func (*Float64Builder) AppendNull ¶
func (b *Float64Builder) AppendNull()
func (*Float64Builder) NewArray ¶
func (b *Float64Builder) NewArray() arrow.Array
func (*Float64Builder) Release ¶
func (b *Float64Builder) Release()
func (*Float64Builder) Reserve ¶
func (b *Float64Builder) Reserve(n int)
type FormatDatePF ¶
type FormatDatePF struct {
// contains filtered or unexported fields
}
FormatDatePF writes a date field using the YYYYMMDD format. This assumes the date in the input is a valid date as string. Returns no error if the date is empty or not valid.
func (*FormatDatePF) ApplyPF ¶
func (pf *FormatDatePF) ApplyPF(buf *bytes.Buffer, input *[]any) error
func (*FormatDatePF) String ¶
func (pf *FormatDatePF) String() string
type FunctionTokenNode ¶
type FunctionTokenNode struct {
Type string `json:"type"`
ParseDateConfig *ParseDateSpec `json:"parse_date_config,omitzero"`
LargeDouble *float64 `json:"large_double,omitzero"`
}
Type: parse_date, parse_double, parse_text MinMaxDateFormat: Date parser, Type: parse_date ParseDateArguments: for Type: parse_date Large_Double: for Type: parse_double
type GroupBySpec ¶
type GroupBySpec struct {
GroupByName []string `json:"group_by_name,omitempty"`
GroupByPos []int `json:"group_by_pos,omitempty"`
GroupByCount int `json:"group_by_count,omitzero"`
DomainKey string `json:"domain_key,omitempty"`
IsDebug bool `json:"is_debug,omitzero"`
}
Specify either domain_key, group_by_name, group_by_pos or group_by_count. group_by_count has priority over the other modes of grouping. group_by_name wins when both group_by_name and group_by_pos are specified. domain_key uses the domain key info to compute the composite key. At least one must be specified.
type GroupByTransformationPipe ¶
type GroupByTransformationPipe struct {
// contains filtered or unexported fields
}
Group By operator. Group the input records into bundles, where each record of the bundle is a rule session.
func (*GroupByTransformationPipe) Apply ¶
func (ctx *GroupByTransformationPipe) Apply(input *[]any) error
Implementing interface PipeTransformationEvaluator
func (*GroupByTransformationPipe) Done ¶
func (ctx *GroupByTransformationPipe) Done() error
func (*GroupByTransformationPipe) Finally ¶
func (ctx *GroupByTransformationPipe) Finally()
type HashEvaluator ¶
type HashEvaluator struct {
// contains filtered or unexported fields
}
HashEvaluator is a type to compute a hash key based on an input record. The hashing algo can be sha1, md5 or none. This is used to compute domain key ONLY. For regular hash-based partitioning, the hashing algo is always FNV-1a 64bit, and the hash key is always uint64.
func (*HashEvaluator) ComputeDomainKey ¶
func (ctx *HashEvaluator) ComputeDomainKey(input []any) (any, error)
func (*HashEvaluator) ComputeHash ¶
func (ctx *HashEvaluator) ComputeHash(input []any) (any, error)
func (*HashEvaluator) String ¶
func (ctx *HashEvaluator) String() string
type HashExpression ¶
type HashExpression struct {
Expr string `json:"expr,omitempty"`
CompositeExpr []string `json:"composite_expr,omitempty"`
DomainKey string `json:"domain_key,omitempty"`
NbrJetsPartitionsAny any `json:"nbr_jets_partitions,omitzero"`
MultiStepShardingMode string `json:"multi_step_sharding_mode,omitempty"`
AlternateCompositeExpr []string `json:"alternate_composite_expr,omitempty"`
NoPartitions bool `json:"no_partitions,omitzero"`
ComputeDomainKey bool `json:"compute_domain_key,omitzero"`
}
Hash using values from columns. Case single column, use Expr. Case multi column, use CompositeExpr. Expr takes precedence if both are populated. DomainKey is specified as an object_type. DomainKeysJson provides the mapping between domain keys and columns. AlternateCompositeExpr is used when Expr or CompositeExpr returns nil or empty. MultiStepShardingMode values: 'limited_range', 'full_range' or empty. NoPartitions indicates not to assign the hash to a partition (no modulo operation). The ComputeDomainKey flag indicates to compute the domain key rather than a simple hash. This considers the hashing algo used and the delimiter between the key components.
func MakeHashExpressionFromGroupByConfig ¶
func MakeHashExpressionFromGroupByConfig(column map[string]int, config *GroupBySpec) (*HashExpression, error)
func (*HashExpression) NbrJetsPartitions ¶
func (h *HashExpression) NbrJetsPartitions() uint64
func (*HashExpression) String ¶
func (h *HashExpression) String() string
type HashingAlgoEnum ¶
type HashingAlgoEnum int
Hashing Algo supported
const ( HashingAlgo_None HashingAlgoEnum = iota HashingAlgo_SHA1 HashingAlgo_MD5 )
func (HashingAlgoEnum) String ¶
func (e HashingAlgoEnum) String() string
type HighFreqSpec ¶
type HighFreqSpec struct {
Name string `json:"name"`
KeyRe string `json:"key_re,omitempty"`
TopPercentile int `json:"top_pct,omitzero"`
TopRank int `json:"top_rank,omitzero"`
// contains filtered or unexported fields
}
top_pct corresponds to the top percentile of the data, i.e., retain the distinct values that correspond to
totalCount * top_pct / 100
where totalCount is the total count of values for the column. If top_pct is 100 then all distinct values are retained. top_rank corresponds to the percentage of the distinct values to retain. That is:
nbrDistinctValues * top_rank / 100
where nbrDistinctValues is the number of distinct values for the column. Note that the distinct values are ordered by descending frequency of occurrence.
type HighFreqTransformationPipe ¶
type HighFreqTransformationPipe struct {
// contains filtered or unexported fields
}
firstInputRow is the first row from the input channel. A reference to it is kept for use in the Done function so as to carry over the select fields in the columnEvaluators. Note: columnEvaluators is applied only on the firstInputRow and it is used only to select columns having the same value for every input row or to put constant values coming from the env
func (*HighFreqTransformationPipe) Apply ¶
func (ctx *HighFreqTransformationPipe) Apply(input *[]interface{}) error
Implementing interface PipeTransformationEvaluator
func (*HighFreqTransformationPipe) Done ¶
func (ctx *HighFreqTransformationPipe) Done() error
func (*HighFreqTransformationPipe) Finally ¶
func (ctx *HighFreqTransformationPipe) Finally()
type Input2PipeSet ¶
type InputChannel ¶
type InputChannel struct {
Name string
Channel <-chan []any
Columns *map[string]int
DomainKeySpec *DomainKeysSpec
Config *ChannelSpec
HasGroupedRows bool
}
type InputChannelConfig ¶
type InputChannelConfig struct {
// Type range: memory (default), input, stage
// Format: csv, headerless_csv, etc.
// ReadBatchSize: nbr of rows to read per record (format: parquet)
// Compression: none, snappy (parquet: always snappy)
// DetectEncoding: Detect file encoding (limited) for text file format
// DetectCrAsEol: Detect if \r is used as eol (format: csv,headerless_csv)
// DiscardFileHeaders: when true, discard the headers from the input file (typically for csv format).
// EolByte: Byte to use as eol (format: csv,headerless_csv)
// MultiColumnsInput: Indicate that input file must have multiple columns,
// this is used to detect if the wrong delimiter is used (csv,headerless_csv)
// Note: SchemaProvider, Compression, Format for Type input are provided via
// ComputePipesCommonArgs.SourcesConfig (ie input_registry table).
// BadRowsConfig: Specify how to handle bad rows.
// HasGroupedRows indicates that the channel contains grouped rows,
// most likely from the group_by or merge operator.
// Note: The input_row channel (main input) will be cast to the
// rdf type specified by the domain class of the main input source.
FileConfig
Type string `json:"type"`
Name string `json:"name"`
SchemaProvider string `json:"schema_provider,omitempty"`
ReadSessionId string `json:"read_session_id,omitempty"`
ReadStepId string `json:"read_step_id,omitempty"`
ReadPartitionId string `json:"read_partition_id,omitempty"`
SamplingRate int `json:"sampling_rate,omitzero"`
SamplingMaxCount int `json:"sampling_max_count,omitzero"`
HasGroupedRows bool `json:"has_grouped_rows,omitzero"`
MergeChannels []InputChannelConfig `json:"merge_channels,omitempty"`
// contains filtered or unexported fields
}
type InputMappingExpr ¶
type InputMappingExpr struct {
InputColumn sql.NullString
DataProperty string
CleansingFunctionName sql.NullString
Argument sql.NullString
DefaultValue sql.NullString
ErrorMessage sql.NullString
}
func GetInputMapping ¶
func GetInputMapping(dbpool *pgxpool.Pool, tableName string) ([]InputMappingExpr, error)
read mapping definitions
type InputRowColumns ¶
type InputRowColumns struct {
OriginalHeaders []string `json:"original_headers"`
MainInput []string `json:"main_input"`
}
InputRowColumns: used to save the input_row channel column names in table cpipes_execution_status, column input_row_columns_json. OriginalHeaders are the original headers read from the main input file
type InputSourceSpec ¶
type InputSourceSpec struct {
OriginalInputColumns []string `json:"original_input_columns,omitempty"`
InputColumns []string `json:"input_columns,omitempty"`
DomainClass string `json:"domain_class,omitempty"`
DomainKeys *DomainKeysSpec `json:"domain_keys_spec,omitempty"`
}
InputColumns correspond to columns in the input files; this applies to reducing as well as sharding steps. InputColumns is the uniquified version of OriginalInputColumns. OriginalInputColumns are the original input file columns. For the case of the sharding step, it includes columns from the part files key. DomainKeys is taken from:
- source_config table or main schema provider for source_type = 'file'
- domain_keys_registry table or schema_provider / input_source_spec for source_type = 'domain_table'
DomainClass is taken from:
- domain_keys_registry table or schema_provider / input_source_spec for source_type = 'domain_table'
Note: for source_type = 'file', DomainClass does not apply, the file needs to be mapped first.
type Int32Builder ¶
type Int32Builder struct {
// contains filtered or unexported fields
}
func (*Int32Builder) Append ¶
func (b *Int32Builder) Append(v any)
func (*Int32Builder) AppendEmptyValue ¶
func (b *Int32Builder) AppendEmptyValue()
func (*Int32Builder) AppendNull ¶
func (b *Int32Builder) AppendNull()
func (*Int32Builder) NewArray ¶
func (b *Int32Builder) NewArray() arrow.Array
func (*Int32Builder) Release ¶
func (b *Int32Builder) Release()
func (*Int32Builder) Reserve ¶
func (b *Int32Builder) Reserve(n int)
type Int64Builder ¶
type Int64Builder struct {
// contains filtered or unexported fields
}
func (*Int64Builder) Append ¶
func (b *Int64Builder) Append(v any)
func (*Int64Builder) AppendEmptyValue ¶
func (b *Int64Builder) AppendEmptyValue()
func (*Int64Builder) AppendNull ¶
func (b *Int64Builder) AppendNull()
func (*Int64Builder) NewArray ¶
func (b *Int64Builder) NewArray() arrow.Array
func (*Int64Builder) Release ¶
func (b *Int64Builder) Release()
func (*Int64Builder) Reserve ¶
func (b *Int64Builder) Reserve(n int)
type JetRdfSession ¶
type JetRdfSession interface {
GetResourceManager() JetResourceManager
JetResources() *JetResources
Insert(s, p RdfNode, o RdfNode) error
Erase(s, p RdfNode, o RdfNode) (bool, error)
Retract(s, p RdfNode, o RdfNode) (bool, error)
Contains(s, p RdfNode, o RdfNode) bool
ContainsSP(s, p RdfNode) bool
GetObject(s, p RdfNode) RdfNode
FindSPO(s, p, o RdfNode) TripleIterator
FindSP(s, p RdfNode) TripleIterator
FindS(s RdfNode) TripleIterator
Find() TripleIterator
NewReteSession(ruleset string) (JetReteSession, error)
Release() error
}
type JetResourceManager ¶
type JetResourceManager interface {
RdfNull() RdfNode
CreateBNode(key int) RdfNode
NewDateLiteral(data string) RdfNode
NewDateDetails(year, month, day int) RdfNode
NewDatetimeLiteral(data string) RdfNode
NewDatetimeDetails(year, month, day, hour, min, sec int) RdfNode
NewDoubleLiteral(x float64) RdfNode
NewIntLiteral(data int) RdfNode
NewUIntLiteral(data uint) RdfNode
NewResource(name string) RdfNode
NewTextLiteral(data string) RdfNode
}
type JetResources ¶
type JetResources struct {
Jets__client RdfNode
Jets__completed RdfNode
Jets__currentSourcePeriod RdfNode
Jets__currentSourcePeriodDate RdfNode
Jets__entity_property RdfNode
Jets__exception RdfNode
Jets__from RdfNode
Jets__input_record RdfNode
Jets__istate RdfNode
Jets__key RdfNode
Jets__length RdfNode
Jets__lookup_multi_rows RdfNode
Jets__lookup_row RdfNode
Jets__loop RdfNode
Jets__max_vertex_visits RdfNode
Jets__operator RdfNode
Jets__org RdfNode
Jets__range_value RdfNode
Jets__replace_chars RdfNode
Jets__replace_with RdfNode
Jets__source_period_sequence RdfNode
Jets__sourcePeriodType RdfNode
Jets__state RdfNode
Jets__value_property RdfNode
Rdf__type RdfNode
}
func NewJetResources ¶
func NewJetResources(je JetResourceManager) *JetResources
func (*JetResources) Initialize ¶
func (jr *JetResources) Initialize(je JetResourceManager)
type JetReteSession ¶
type JetRuleEngine ¶
type JetRuleEngine interface {
MainRuleFile() string
JetResources() *JetResources
GetMetaGraphTriples() []string
GetMetaResourceManager() JetResourceManager
Insert(s, p RdfNode, o RdfNode) error
NewRdfSession() (JetRdfSession, error)
Release() error
}
func GetJetRuleEngine ¶
func GetJetRuleEngine(reFactory JetRulesFactory, dbpool *pgxpool.Pool, processName string, isDebug bool) ( ruleEngine JetRuleEngine, err error)
Function to get the JetRuleEngine for a rule process
type JetRulesFactory ¶
type JetRulesProxy ¶
type JetRulesProxy interface {
GetDefaultFactory() JetRulesFactory
GetGoFactory() JetRulesFactory
GetNativeFactory() JetRulesFactory
}
type JetrulesOutputChan ¶
type JetrulesOutputChan struct {
ClassName string
ColumnEvaluators []TransformationColumnEvaluator
OutputCh *OutputChannel
}
type JetrulesSpec ¶
type JetrulesSpec struct {
ProcessName string `json:"process_name,omitempty"`
UseJetRulesNative bool `json:"use_jet_rules_native,omitzero"`
UseJetRulesGo bool `json:"use_jet_rules_go,omitzero"`
InputRdfType string `json:"input_rdf_type,omitempty"`
MaxInputCount int `json:"max_input_count,omitzero"`
PoolSize int `json:"pool_size,omitzero"`
MaxReteSessionsSaved int `json:"max_rete_sessions_saved,omitzero"`
MaxLooping int `json:"max_looping,omitzero"`
CurrentSourcePeriod int `json:"current_source_period,omitzero"`
CurrentSourcePeriodDate string `json:"current_source_period_date,omitempty"`
CurrentSourcePeriodType string `json:"current_source_period_type,omitempty"`
RuleConfig []map[string]any `json:"rule_config,omitempty"`
MetadataInputSources []CsvSourceSpec `json:"metadata_input_sources,omitempty"`
IsDebug bool `json:"is_debug,omitzero"`
OutputChannels []OutputChannelConfig `json:"output_channels,omitempty"`
}
JetrulesSpec configuration ProcessName is the jetrules process name to use. UseJetRulesNative when true use the jetrules native engine. UseJetRulesGo when true use the jetrules go engine. Note: only one of UseJetRulesNative or UseJetRulesGo can be true. When both are false, the default is to use the jetrules native engine when available. InputRdfType is the rdf type (class name) of the input records. MaxInputCount is the max nbr of input records to process. PoolSize is the nbr of worker pool size. MaxReteSessionsSaved is the max nbr of rete sessions to save in err table. CurrentSourcePeriod is the source period key to use for this process. CurrentSourcePeriodDate is the source period date (aka file period date) to use for this process. CurrentSourcePeriodType is the source period type (day, week or month) to use for this process. RuleConfig provides additional configuration for jetrules processing. MetadataInputSources provide the list of csv sources to load as metadata input sources for jetrules processing. IsDebug when true enables debug mode for jetrules processing. MaxLooping overrides the value in the jetrules metastore.
type JetrulesTransformationPipe ¶
type JetrulesTransformationPipe struct {
// contains filtered or unexported fields
}
func (*JetrulesTransformationPipe) Apply ¶
func (ctx *JetrulesTransformationPipe) Apply(input *[]interface{}) error
Implementing interface PipeTransformationEvaluator. On each call to Apply, the input corresponds to an rdf session on which to apply the jetrules; see jetrules_pool_worker.go for the worker implementation
func (*JetrulesTransformationPipe) Done ¶
func (ctx *JetrulesTransformationPipe) Done() error
func (*JetrulesTransformationPipe) Finally ¶
func (ctx *JetrulesTransformationPipe) Finally()
type JetrulesWorkerResult ¶
type JetsPartitionInfo ¶
type JrPoolManager ¶
type JrPoolManager struct {
WorkersTaskCh chan []interface{}
WaitForDone *sync.WaitGroup
// contains filtered or unexported fields
}
JrPoolManager manages a pool of workers to execute rules in parallel. jrPoolWg is a wait group of the workers. The WorkersTaskCh is closed in the jetrules operator
type JrPoolWorker ¶
type JrPoolWorker struct {
// contains filtered or unexported fields
}
func NewJrPoolWorker ¶
func NewJrPoolWorker(config *JetrulesSpec, source *InputChannel, rdfType2Columns map[string][]string, re JetRuleEngine, outputChannels []*JetrulesOutputChan, done chan struct{}, errCh chan error) *JrPoolWorker
func (*JrPoolWorker) DoWork ¶
func (ctx *JrPoolWorker) DoWork(mgr *JrPoolManager, resultCh chan JetrulesWorkerResult)
type KeywordCount ¶
func NewKeywordCount ¶
func NewKeywordCount(name string, keywords []string) *KeywordCount
type KeywordTokenNode ¶
type LoadFromS3FilesResult ¶
type LookupColumnSpec ¶
type LookupCount ¶
func NewLookupCount ¶
func NewLookupCount(name string) *LookupCount
type LookupSpec ¶
type LookupSpec struct {
// type range: sql_lookup, s3_csv_lookup
Key string `json:"key"`
Type string `json:"type"`
Query string `json:"query,omitempty"` // for sql_lookup
CsvSource *CsvSourceSpec `json:"csv_source,omitzero"` //for s3_csv_lookup
Columns []TableColumnSpec `json:"columns,omitempty"`
LookupKey []string `json:"lookup_key,omitempty"`
LookupValues []string `json:"lookup_values,omitempty"`
}
func SelectActiveLookupTable ¶
func SelectActiveLookupTable(lookupConfig []*LookupSpec, pipeConfig []PipeSpec) ([]*LookupSpec, error)
Function to prune the lookupConfig and return only the lookups used in the pipeConfig. Returns an error if pipeConfig has a reference to a lookup not in lookupConfig
type LookupTable ¶
type LookupTable interface {
// Returns the lookup row associated with key
Lookup(key *string) (*[]any, error)
// Returns the row's value associated with the lookup column
LookupValue(row *[]any, columnName string) (any, error)
// Returns the mapping between column name to pos in the returned row
ColumnMap() map[string]int
// Return true if the table is empty, ColumnMap is empty as well
IsEmptyTable() bool
// Return the number of rows in the lookup table
Size() int64
}
func NewLookupTableS3 ¶
func NewLookupTableS3(_ *pgxpool.Pool, spec *LookupSpec, env map[string]any, isVerbose bool) (LookupTable, error)
func NewLookupTableSql ¶
func NewLookupTableSql(dbpool *pgxpool.Pool, spec *LookupSpec, env map[string]any, isVerbose bool) (LookupTable, error)
type LookupTableManager ¶
type LookupTableManager struct {
LookupTableMap map[string]LookupTable
// contains filtered or unexported fields
}
func NewLookupTableManager ¶
func NewLookupTableManager(spec []*LookupSpec, envSettings map[string]any, isVerbose bool) *LookupTableManager
func (*LookupTableManager) PrepareLookupTables ¶
func (mgr *LookupTableManager) PrepareLookupTables(dbpool *pgxpool.Pool) error
type LookupTableS3 ¶
type LookupTableS3 struct {
// contains filtered or unexported fields
}
data is the mapping of the lookup key -> values; columnsMap is the mapping of the return column name -> position in the returned row (values)
func (*LookupTableS3) ColumnMap ¶
func (tbl *LookupTableS3) ColumnMap() map[string]int
func (*LookupTableS3) IsEmptyTable ¶
func (tbl *LookupTableS3) IsEmptyTable() bool
Return true only if there was no files found on s3
func (*LookupTableS3) LookupValue ¶
func (tbl *LookupTableS3) LookupValue(row *[]any, columnName string) (any, error)
type LookupTableSql ¶
type LookupTableSql struct {
// contains filtered or unexported fields
}
data is the mapping of the lookup key -> values; columnsMap is the mapping of the return column name -> position in the returned row (values)
func (*LookupTableSql) ColumnMap ¶
func (tbl *LookupTableSql) ColumnMap() map[string]int
func (*LookupTableSql) IsEmptyTable ¶
func (tbl *LookupTableSql) IsEmptyTable() bool
Not applicable to sql lookup, only to s3 lookup
func (*LookupTableSql) LookupValue ¶
func (tbl *LookupTableSql) LookupValue(row *[]any, columnName string) (any, error)
func (*LookupTableSql) Size ¶
func (tbl *LookupTableSql) Size() int64
Return size of the lookup table
type LookupTokenNode ¶
type LookupTokenNode struct {
Name string `json:"lookup_name"`
KeyRe string `json:"key_re,omitempty"`
Tokens []string `json:"tokens,omitempty"`
MultiTokensMatch []MultiTokensNode `json:"multi_tokens_match,omitempty"`
}
Matching values using a lookup table. Typically, values in the lookup table do not have spaces. MultiTokensMatch: Matching composite values, separated by space(s)
type LookupTokensState ¶
type LookupTokensState struct {
LookupTbl LookupTable
KeyRe *regexp.Regexp
LookupMatch map[string]*LookupCount
MultiTokensMatch []MultiTokensNode
}
func NewLookupTokensState ¶
func NewLookupTokensState(lookupTbl LookupTable, lookupNode *LookupTokenNode) (*LookupTokensState, error)
func (*LookupTokensState) LookupValue ¶
func (state *LookupTokensState) LookupValue(value *string) ([]string, error)
func (*LookupTokensState) NewValue ¶
func (state *LookupTokensState) NewValue(value *string) error
type MapExpression ¶
type MapRecordSpec ¶
type MapRecordTransformationPipe ¶
type MapRecordTransformationPipe struct {
// contains filtered or unexported fields
}
func (*MapRecordTransformationPipe) Apply ¶
func (ctx *MapRecordTransformationPipe) Apply(input *[]any) error
Implementing interface PipeTransformationEvaluator
func (*MapRecordTransformationPipe) Done ¶
func (ctx *MapRecordTransformationPipe) Done() error
func (*MapRecordTransformationPipe) Finally ¶
func (ctx *MapRecordTransformationPipe) Finally()
type MergeCurrentValue ¶
type MergeCurrentValue struct {
// contains filtered or unexported fields
}
type MergeFileReader ¶
type MergeFileReader struct {
// contains filtered or unexported fields
}
MergeFileReader provides a reader that conforms to the io.Reader interface; it reads content from part files and makes it available to the s3 manager via the Read interface
type MergeFileSpec ¶
type MergeFileSpec struct {
FirstPartitionHasHeaders bool `json:"first_partition_has_headers,omitempty"` // splitter column
}
MergeFileSpec configuration FirstPartitionHasHeaders: when true, the first partition has the headers (considered for csv to determine whether to use s3 multipart copy).
type MergeSpec ¶
type MergeSpec struct {
IsDebug bool `json:"is_debug,omitzero"`
MainGroupBy GroupBySpec `json:"main_group_by"`
MergeGroupBy []*GroupBySpec `json:"merge_group_by,omitempty"`
}
type MergeTransformationPipe ¶
type MergeTransformationPipe struct {
// contains filtered or unexported fields
}
Merge operator. Merge the input records from multiple input channels into one output channel.
func (*MergeTransformationPipe) Apply ¶
func (ctx *MergeTransformationPipe) Apply(input *[]any) error
Implementing interface PipeTransformationEvaluator
func (*MergeTransformationPipe) Done ¶
func (ctx *MergeTransformationPipe) Done() error
func (*MergeTransformationPipe) Finally ¶
func (ctx *MergeTransformationPipe) Finally()
type MetricsSpec ¶
type MinMaxValue ¶
type MinMaxValue struct {
MinValue string
MaxValue string
MinMaxType string
HitRatio float64
NbrSamples int
}
MinValue and MaxValue are expressed as string but represent one of:
- min/max date when MinMaxType == "date"
- min/max double when MinMaxType == "double"
- min/max length when MinMaxType == "text"
type MultiTokensNode ¶
type MultiTokensNode struct {
Name string `json:"name"`
NbrTokens int `json:"nbr_tokens"`
Tokens []string `json:"tokens"`
// contains filtered or unexported fields
}
To identify a value made of multiple tokens, for example a full name. The value is split on spaces and punctuation characters and single-character words are removed. Name: the name of the feature (ie output column name) Tokens: each split value must match at least one token
type OutputChannel ¶
type OutputChannel struct {
Name string
Channel chan<- []any
Columns *map[string]int
Config *ChannelSpec
}
type OutputChannelConfig ¶
type OutputChannelConfig struct {
// Type range: memory (default), stage, output, sql
// Format: csv, headerless_csv, etc.
// NbrRowsInRecord: nbr of rows in record (format: parquet)
// Compression: none, snappy (default).
// UseInputParquetSchema to use the same schema as the input file.
// UseOriginalHeaders to use the headers from the input file (csv only).
// Must have save_parquet_schema = true in the cpipes first input_channel.
// OutputLocation: jetstore_s3_input, jetstore_s3_output (default), or custom location.
// When OutputLocation is jetstore_s3_input it will also write to the input bucket.
// When OutputLocation uses a custom location, it replaces KeyPrefix and FileName.
// OutputLocation must end with "/" if we want to use the default file name
// (i.e. OutputLocation does not include the file name).
// Note: refactoring using FileConfig.FileKey is synonym to OutputLocation
// KeyPrefix is optional, default to $PATH_FILE_KEY.
// Use $CURRENT_PARTITION_LABEL in KeyPrefix and FileName to substitute with
// current partition label.
// Other available env substitution:
// $FILE_KEY main input file key.
// $SESSIONID current session id.
// $PROCESS_NAME current process name.
// $PATH_FILE_KEY file key path portion.
// $NAME_FILE_KEY file key file name portion (empty when in part files mode).
// $SHARD_ID current node id.
// $JETS_PARTITION_LABEL current node partition label.
FileConfig
Type string `json:"type"`
Name string `json:"name,omitempty"`
UseOriginalHeaders bool `json:"use_original_headers,omitzero"` // Type output
UseInputParquetSchema bool `json:"use_input_parquet_schema,omitzero"` // Type stage,output
SchemaProvider string `json:"schema_provider,omitempty"` // Type stage,output, alt to Format
WriteStepId string `json:"write_step_id,omitempty"` // Type stage
OutputTableKey string `json:"output_table_key,omitempty"` // Type sql
FileKey2 string `json:"output_location,omitempty"` // Type output
SpecName string `json:"channel_spec_name,omitempty"`
}
func (OutputChannelConfig) OutputLocation ¶
func (r OutputChannelConfig) OutputLocation() string
Note: refactoring using FileConfig.FileKey is synonym to OutputLocation
func (*OutputChannelConfig) SetOutputLocation ¶
func (r *OutputChannelConfig) SetOutputLocation(s string)
type OutputFileSpec ¶
type OutputFileSpec struct {
	// OutputLocation: jetstore_s3_input, jetstore_s3_stage, jetstore_s3_output (default),
	// or custom file key (the last option is deprecated, use FileKey).
	// When OutputLocation has a custom file key, it replaces Name and KeyPrefix.
// Note: refactoring using FileConfig.FileKey is synonym to OutputLocation
// Note: refactoring using FileConfig.FileName is synonym to Name
// KeyPrefix is optional, default to input file key path in OutputLocation.
// Name is file name (required or via OutputLocation).
// Headers overrides the headers from the input_channel's spec or
// from the schema_provider.
// Schema provider indicates if put the header line or not.
// The input channel's schema provider indicates what delimiter
// to use on the header line.
FileConfig
Key string `json:"key"`
FileName2 string `json:"name,omitempty"`
FileKey2 string `json:"output_location,omitempty"`
SchemaProvider string `json:"schema_provider,omitempty"`
UseOriginalHeaders bool `json:"use_original_headers,omitzero"`
Headers []string `json:"headers,omitempty"`
}
func GetOutputFileConfig ¶
func GetOutputFileConfig(cpConfig *ComputePipesConfig, outputFileKey string) *OutputFileSpec
func (OutputFileSpec) Name ¶
func (r OutputFileSpec) Name() string
Note: refactoring using FileConfig.FileName is synonym to Name
func (OutputFileSpec) OutputLocation ¶
func (r OutputFileSpec) OutputLocation() string
Note: refactoring using FileConfig.FileKey is synonym to OutputLocation
func (*OutputFileSpec) SetName ¶
func (r *OutputFileSpec) SetName(s string)
func (*OutputFileSpec) SetOutputLocation ¶
func (r *OutputFileSpec) SetOutputLocation(s string)
type ParquetSchemaInfo ¶
type ParquetSchemaInfo struct {
Fields []*FieldInfo `json:"fields,omitempty"`
// contains filtered or unexported fields
}
func BuildParquetSchemaInfo ¶
func BuildParquetSchemaInfo(columns []string) *ParquetSchemaInfo
func NewEmptyParquetSchemaInfo ¶
func NewEmptyParquetSchemaInfo() *ParquetSchemaInfo
func NewParquetSchemaInfo ¶
func NewParquetSchemaInfo(schema *arrow.Schema) *ParquetSchemaInfo
func (*ParquetSchemaInfo) ArrowSchema ¶
func (psi *ParquetSchemaInfo) ArrowSchema() *arrow.Schema
func (*ParquetSchemaInfo) Columns ¶
func (psi *ParquetSchemaInfo) Columns() []string
func (*ParquetSchemaInfo) CreateBuilders ¶
func (psi *ParquetSchemaInfo) CreateBuilders(pool *memory.GoAllocator) ([]ArrayBuilder, error)
type ParseDateFTSpec ¶
type ParseDateFTSpec struct {
Token string `json:"token"`
YearLessThan int `json:"year_less_than,omitzero"`
YearGreaterThan int `json:"year_greater_than,omitzero"`
}
The date format is using a reference date of Mon Jan 2 15:04:05 MST 2006 (see https://pkg.go.dev/time#Layout) Will use DateFormat when provided, otherwise take the format from the Schema Provider when avail. Will default to DefaultDateFormat or will use the jetstore date parser if no default provided or if UseJetstoreParser is true. year_less_than and year_greater_than are additional conditions on the match result.
func (ParseDateFTSpec) CheckYearRange ¶
func (pd ParseDateFTSpec) CheckYearRange(tm time.Time) bool
Match implements the match function, returns true when match.
type ParseDateMatchFunction ¶
type ParseDateMatchFunction struct {
// contains filtered or unexported fields
}
ParseDateMatchFunction is a match function to validate dates. ParseDateMatchFunction implements the FunctionCount interface
func NewParseDateMatchFunction ¶
func NewParseDateMatchFunction(fspec *FunctionTokenNode, sp SchemaProvider) (*ParseDateMatchFunction, error)
func (*ParseDateMatchFunction) Done ¶
func (p *ParseDateMatchFunction) Done(ctx *AnalyzeTransformationPipe, outputRow []any) error
func (*ParseDateMatchFunction) GetMinMaxValues ¶
func (p *ParseDateMatchFunction) GetMinMaxValues() *MinMaxValue
func (*ParseDateMatchFunction) NewValue ¶
func (p *ParseDateMatchFunction) NewValue(value string)
ParseDateMatchFunction implements FunctionCount interface
type ParseDateSpec ¶
type ParseDateSpec struct {
DateFormatToken string `json:"date_format_token,omitempty"`
OtherDateFormatToken string `json:"other_date_format_token,omitempty"`
DateSamplingMaxCount int `json:"sampling_max_count,omitzero"`
DateFormats []string `json:"date_formats,omitempty"`
OtherDateFormats []string `json:"other_date_formats,omitempty"`
MinMaxDateFormat string `json:"minmax_date_format,omitempty"`
ParseDateArguments []ParseDateFTSpec `json:"parse_date_args,omitempty"`
UseJetstoreParser bool `json:"use_jetstore_date_parser,omitzero"`
}
DateFormatToken: output column name for listing up to 3 formats used in file. OtherDateFormatToken: output column name to put the count of other format used in file. DateSamplingMaxCount: nbr of samples to use for determining the date format. DateFormats: list of date formats to use for parsing the date. OtherDateFormats: list of other date formats to use for parsing the date when DateFormatToken does not match (which are undesirable formats). MinMaxDateFormat: format used in output report for min/max dates. ParseDateArguments: list of parse date function token spec. UseJetstoreParser: when true it will use only the jetstore date parser. Identify top date format matches, up to 3. The first match must account for 75% of total date matches. Identify other date matches, each must match 98% of total date matches.
type ParseDoubleMatchFunction ¶
type ParseDoubleMatchFunction struct {
// contains filtered or unexported fields
}
func NewParseDoubleMatchFunction ¶
func NewParseDoubleMatchFunction(fspec *FunctionTokenNode) (*ParseDoubleMatchFunction, error)
func (*ParseDoubleMatchFunction) Done ¶
func (p *ParseDoubleMatchFunction) Done(ctx *AnalyzeTransformationPipe, outputRow []any) error
func (*ParseDoubleMatchFunction) GetMinMaxValues ¶
func (p *ParseDoubleMatchFunction) GetMinMaxValues() *MinMaxValue
func (*ParseDoubleMatchFunction) NewValue ¶
func (p *ParseDoubleMatchFunction) NewValue(value string)
ParseDoubleMatchFunction implements FunctionCount interface
type ParseTextMatchFunction ¶
type ParseTextMatchFunction struct {
// contains filtered or unexported fields
}
func NewParseTextMatchFunction ¶
func NewParseTextMatchFunction(fspec *FunctionTokenNode) (*ParseTextMatchFunction, error)
func (*ParseTextMatchFunction) Done ¶
func (p *ParseTextMatchFunction) Done(ctx *AnalyzeTransformationPipe, outputRow []any) error
func (*ParseTextMatchFunction) GetMinMaxValues ¶
func (p *ParseTextMatchFunction) GetMinMaxValues() *MinMaxValue
func (*ParseTextMatchFunction) NewValue ¶
func (p *ParseTextMatchFunction) NewValue(value string)
ParseTextMatchFunction implements FunctionCount interface
type PartitionWriterSpec ¶
type PartitionWriterSpec struct {
DeviceWriterType string `json:"device_writer_type,omitempty"`
JetsPartitionKey *string `json:"jets_partition_key,omitzero"`
PartitionSize int `json:"partition_size,omitzero"`
SamplingRate int `json:"sampling_rate,omitzero"`
SamplingMaxCount int `json:"sampling_max_count,omitzero"`
StreamDataOut bool `json:"stream_data_out,omitzero"`
}
DeviceWriterType range: csv_writer, parquet_writer, fixed_width_writer JetsPartitionKey used by partition_writer as the default value for jet_partition use $JETS_PARTITION_LABEL for current node input partition When StreamDataOut is true, data is streamed to s3 rather than written locally and then copied to s3. Useful for large files that would exceed local storage capacity.
type PartitionWriterTransformationPipe ¶
type PartitionWriterTransformationPipe struct {
// contains filtered or unexported fields
}
func (*PartitionWriterTransformationPipe) Apply ¶
func (ctx *PartitionWriterTransformationPipe) Apply(input *[]any) error
Implementing interface PipeTransformationEvaluator. Delegating to applyInternal to support bundled input for the group by transformation
func (*PartitionWriterTransformationPipe) Done ¶
func (ctx *PartitionWriterTransformationPipe) Done() error
Done writing the splitter partition
- Close the current ctx.currentDeviceCh to flush the data, update totalRowCount
- Write to db the nodeId of this partition: session_id, shard, jets_partition
- Send the total row count to ctx.copy2DeviceResultCh
Not called if the process has error upstream (see pipe_executor_splitter.go)
func (*PartitionWriterTransformationPipe) Finally ¶
func (ctx *PartitionWriterTransformationPipe) Finally()
Always called, whether or not an error occurred upstream.
type PathSubstitution ¶
type PipeSpec ¶
type PipeSpec struct {
// Type range: fan_out, splitter, merge_files
Type string `json:"type"`
InputChannel InputChannelConfig `json:"input_channel"`
SplitterConfig *SplitterSpec `json:"splitter_config,omitzero"`
MergeFileConfig *MergeFileSpec `json:"merge_file_config,omitzero"`
Apply []TransformationSpec `json:"apply"`
OutputFile *string `json:"output_file,omitzero"` // for merge_files
}
type PreprocessingFunction ¶
type PreprocessingFunction interface {
ApplyPF(buf *bytes.Buffer, input *[]any) error
String() string
}
type RdfNode ¶
type RdfNode interface {
Hdle() any
IsNil() bool
Value() any
Equals(other RdfNode) bool
String() string
}
func NewRdfNode ¶
func NewRdfNode(inValue any, re JetResourceManager) (RdfNode, error)
func ParseRdfNodeValue ¶
func ParseRdfNodeValue(re JetResourceManager, value, rdfType string) (node RdfNode, err error)
type RegexCount ¶
func NewRegexCount ¶
func NewRegexCount(re *regexp.Regexp, useScrubbedValue bool) *RegexCount
type RemoveMiPF ¶
type RemoveMiPF struct {
// contains filtered or unexported fields
}
RemoveMiPF removes the last 2 characters when the second-to-last character is a space, e.g. "michel f" becomes "michel".
func (*RemoveMiPF) String ¶
func (pf *RemoveMiPF) String() string
type ReportCmdSpec ¶
type ReportCmdSpec struct {
Type string `json:"type"`
S3CopyFileConfig *S3CopyFileSpec `json:"s3_copy_file_config,omitzero"`
When *ExpressionNode `json:"when,omitzero"`
}
Commands for the run_report step. Type range: s3_copy_file. When is an optional expression to determine if the command is to be executed. S3CopyFileConfig provides the configuration for the s3_copy_file command.
type RuleEngineConfig ¶
type S3CopyFileSpec ¶
type S3CopyFileSpec struct {
SourceBucket string `json:"src_bucket,omitempty"`
SourceKey string `json:"src_key,omitempty"`
DestinationBucket string `json:"dest_bucket,omitempty"`
DestinationKey string `json:"dest_key,omitempty"`
WorkerPoolSize int `json:"worker_pool_size,omitzero"`
}
ReportCommand to copy a file from s3 to s3. The default WorkerPoolSize is calculated based on the number of tasks.
type S3DeviceManager ¶
type S3DeviceManager struct {
WorkersTaskCh chan S3Object
ClientsWg *sync.WaitGroup
JetStoreTempFolder string
// contains filtered or unexported fields
}
S3DeviceManager manages a pool of workers to put files to s3. ClientsWg is a wait group of the partition writers created during the BuildComputeGraph function. The WorkersTaskCh is closed in process_file.go.
type S3DeviceWorker ¶
type S3DeviceWorker struct {
// contains filtered or unexported fields
}
func NewS3DeviceWorker ¶
func NewS3DeviceWorker(s3Uploader *manager.Uploader, done chan struct{}, errCh chan error) *S3DeviceWorker
func (*S3DeviceWorker) DoWork ¶
func (ctx *S3DeviceWorker) DoWork(mgr *S3DeviceManager, resultCh chan ComputePipesResult)
type S3DeviceWriter ¶
type S3DeviceWriter struct {
// contains filtered or unexported fields
}
func (*S3DeviceWriter) WriteCsvPartition ¶
func (ctx *S3DeviceWriter) WriteCsvPartition(fout io.Writer)
func (*S3DeviceWriter) WriteFixedWidthPartition ¶
func (ctx *S3DeviceWriter) WriteFixedWidthPartition(fout io.Writer)
func (*S3DeviceWriter) WriteParquetPartitionV2 ¶
func (ctx *S3DeviceWriter) WriteParquetPartitionV2(fout io.Writer)
func (*S3DeviceWriter) WritePartition ¶
func (ctx *S3DeviceWriter) WritePartition(writer func(w io.Writer))
WritePartition is the main write function that coordinates between writing the partition to a local temp file or streaming the data directly to s3.
type SaveResultsContext ¶
type SaveResultsContext struct {
JetsPartition string
NodeId int
SessionId string
// contains filtered or unexported fields
}
func NewSaveResultsContext ¶
func NewSaveResultsContext(dbpool *pgxpool.Pool) *SaveResultsContext
func (*SaveResultsContext) Save ¶
func (ctx *SaveResultsContext) Save(category string, result *ComputePipesResult)
type SchemaColumnSpec ¶
type SchemaManager ¶
type SchemaManager struct {
// contains filtered or unexported fields
}
func NewSchemaManager ¶
func NewSchemaManager(spec []*SchemaProviderSpec, envSettings map[string]any, isDebugMode bool) *SchemaManager
func (*SchemaManager) GetSchemaProvider ¶
func (sm *SchemaManager) GetSchemaProvider(key string) SchemaProvider
func (*SchemaManager) PrepareSchemaProviders ¶
func (sm *SchemaManager) PrepareSchemaProviders(dbpool *pgxpool.Pool) error
type SchemaProvider ¶
type SchemaProvider interface {
Initialize(dbpool *pgxpool.Pool, spec *SchemaProviderSpec,
envSettings map[string]any, isDebugMode bool) error
Key() string
Env() map[string]any
AdjustColumnWidth(width map[string]int) error
BadRowsConfig() *BadRowsSpec
BlankFieldMarkers() *BlankFieldMarkers
Bucket() string
ColumnNames() []string
Columns() []SchemaColumnSpec
Compression() string
Delimiter() rune
DetectEncoding() bool
DiscardFileHeaders() bool
DomainClass() string
DomainKeys() map[string]any
DropExcedentHeaders() bool
Encoding() string
EnforceRowMaxLength() bool
EnforceRowMinLength() bool
FixedWidthEncodingInfo() *FixedWidthEncodingInfo
FixedWidthFileHeaders() ([]string, string)
Format() string
InputFormatDataJson() string
IsPartFiles() bool
NbrRowsInRecord() int64
NoQuotes() bool
OutputEncoding() string
ParquetSchema() *ParquetSchemaInfo
QuoteAllRecords() bool
ReadBatchSize() int64
ReadDateLayout() string
ReorderColumnsOnRead() []int
SchemaName() string
SetParquetSchema(schema *ParquetSchemaInfo)
TrimColumns() bool
UseLazyQuotes() bool
UseLazyQuotesSpecial() bool
VariableFieldsPerRecord() bool
WriteDateLayout() string
CapDobYears() int
SetDodToJan1() bool
}
func NewDefaultSchemaProvider ¶
func NewDefaultSchemaProvider() SchemaProvider
type SchemaProviderSpec ¶
type SchemaProviderSpec struct {
// Type range: default
// Key is schema provider key for reference by compute pipes steps
// Format: csv, headerless_csv, fixed_width, parquet, parquet_select,
// xlsx, headerless_xlsx
// Compression: none, snappy (parquet is always snappy).
// DetectEncoding: Detect file encoding (limited) for text file format.
// DetectCrAsEol: Detect if \r is used as eol (format: csv,headerless_csv).
// DiscardFileHeaders: when true, discard the headers from the input file (typically for csv format),
// this will force to use Headers or Columns from the configuration, or from the schema provider if Headers and Columns are not provided.
// EolByte: Byte to use as eol (format: csv,headerless_csv).
// MultiColumnsInput: Indicate that input file must have multiple columns,
// this is used to detect if the wrong delimiter is used (csv,headerless_csv).
// ReadBatchSize: nbr of rows to read per record (format: parquet).
// NbrRowsInRecord: nbr of rows in record (format: parquet).
// InputFormatDataJson: json config based on Format (typically used for xlsx).
// example: {"currentSheet": "Daily entry for Approvals"} (for xlsx).
// EnforceRowMinLength: when true, all columns must be in input record, otherwise missing columns are null.
// EnforceRowMaxLength: when true, no extra characters must exist past last field (applies to text format).
// Note EnforceRowMinLength and EnforceRowMaxLength apply to text format only (csv, headerless_csv, fixed_width).
// UseOriginSourceConfig: when true, use the source config from file_key components (client, org, object_type).
// Note origin session_id is from cpipes env $ORIGIN_SESSIONID
// BadRowsConfig: Specify how to handle bad rows when not specified on InputChannelConfig.
// SourceType range: main_input, merged_input, historical_input (from input_source table)
// Columns: may be omitted if fixed_width_columns_csv is provided or the format is csv
// Headers: alt to Columns, typically for csv format
// BlankFieldMarkers: specify markers for blank fields (any format, typically for csv format)
// GetPartitionsSize: when true, get the size of the partitions from s3
// CapDobYears: number of years to cap dob (date of birth) to today - for Anonymization.
// SetDodToJan1: set dod (date of death) to January 1st of the date year - for Anonymization.
// UseLazyQuotes, UseLazyQuotesSpecial, VariableFieldsPerRecord: see csv.NewReader.
// QuoteAllRecords will quote all records for csv writer.
// NoQuotes will not quote any records for the csv writer (even if the record contains '"').
// Bucket and FileKey are location and source object (fileKey may be directory if IsPartFiles is true)
// KmsKey is kms key to use when writing output data. May be empty.
// RequestID is used for logging and tracking purpose.
// Contains properties to register FileKey with input_registry table:
// Client, Vendor, ObjectType, FileDate (does not apply to Jets_Loader).
// NotificationTemplatesOverrides have the following keys to override the templates defined
// in the deployment environment var: CPIPES_START_NOTIFICATION_JSON,
// CPIPES_COMPLETED_NOTIFICATION_JSON, and CPIPES_FAILED_NOTIFICATION_JSON.
//*TODO domain_keys_json
//*TODO code_values_mapping_json
FileConfig
Key string `json:"key"`
Type string `json:"type"`
FileSize int64 `json:"file_size,omitzero"`
KmsKey string `json:"kms_key_arn,omitempty"`
Client string `json:"client,omitempty"`
Vendor string `json:"vendor,omitempty"`
ObjectType string `json:"object_type,omitempty"`
RequestID string `json:"request_id,omitempty"`
UseOriginSourceConfig bool `json:"use_origin_source_config,omitempty"`
FileDate string `json:"file_date,omitempty"`
SourceType string `json:"source_type,omitempty"`
SchemaName string `json:"schema_name,omitempty"`
Columns []SchemaColumnSpec `json:"columns,omitempty"`
Headers []string `json:"headers,omitempty"`
CapDobYears int `json:"cap_dob_years,omitzero"`
SetDodToJan1 bool `json:"set_dod_to_jan1,omitzero"`
Env map[string]any `json:"env,omitempty"`
ReportCmds []ReportCmdSpec `json:"report_cmds,omitempty"`
NotificationTemplatesOverrides map[string]string `json:"notification_templates_overrides,omitempty"`
NotificationRoutingOverridesJson string `json:"notification_routing_overrides_json,omitempty"`
}
func GetSchemaProviderConfigByKey ¶
func GetSchemaProviderConfigByKey(schemaProviders []*SchemaProviderSpec, key string) *SchemaProviderSpec
func GetSchemaProviderConfigBySourceType ¶
func GetSchemaProviderConfigBySourceType(schemaProviders []*SchemaProviderSpec, sourceType string) *SchemaProviderSpec
type ShardFileKeyResult ¶
type ShardFileKeyResult struct {
// contains filtered or unexported fields
}
func ShardFileKeys ¶
func ShardFileKeys(exeCtx context.Context, dbpool *pgxpool.Pool, baseFileKey string, sessionId string, inputChannelConfig InputChannelConfig, clusterConfig *ClusterSpec, schemaProviderConfig *SchemaProviderSpec) (result ShardFileKeyResult, cpErr error)
ShardFileKeys: assign file keys to nodes for sharding mode according to inputChannelConfig and clusterConfig.
type ShufflingSpec ¶
type ShufflingSpec struct {
MaxInputSampleSize int `json:"max_input_sample_size,omitzero"`
OutputSampleSize int `json:"output_sample_size,omitzero"`
PadShortRowsWithNulls bool `json:"pad_short_rows_with_nulls,omitzero"`
FilterColumns *FilterColumnSpec `json:"filter_columns,omitzero"`
}
type ShufflingTransformationPipe ¶
type ShufflingTransformationPipe struct {
// contains filtered or unexported fields
}
Note: No columnEvaluators are used by this operator. inputColPos holds the positions of the retained input columns, if any filtering is done.
func (*ShufflingTransformationPipe) Apply ¶
func (ctx *ShufflingTransformationPipe) Apply(input *[]any) error
Implementing interface PipeTransformationEvaluator
func (*ShufflingTransformationPipe) Done ¶
func (ctx *ShufflingTransformationPipe) Done() error
Analysis complete, now send out the results to ctx.outputCh.
func (*ShufflingTransformationPipe) Finally ¶
func (ctx *ShufflingTransformationPipe) Finally()
type SortSpec ¶
type SortSpec struct {
DomainKey string `json:"domain_key,omitempty"`
SortByColumn []string `json:"sort_by,omitempty"`
IsDebug bool `json:"is_debug,omitzero"`
}
Sort using a composite key: sort_by lists the column names making up the composite key; domain_key uses the domain key info to compute the composite key.
type SortTransformationPipe ¶
type SortTransformationPipe struct {
// contains filtered or unexported fields
}
Sort operator. Sorts the input records according to a composite key.
func (*SortTransformationPipe) Apply ¶
func (ctx *SortTransformationPipe) Apply(input *[]any) error
Implementing interface PipeTransformationEvaluator
func (*SortTransformationPipe) Done ¶
func (ctx *SortTransformationPipe) Done() error
func (*SortTransformationPipe) Finally ¶
func (ctx *SortTransformationPipe) Finally()
type SourcesConfigSpec ¶
type SourcesConfigSpec struct {
MainInput *InputSourceSpec `json:"main_input"`
MergedInput []*InputSourceSpec `json:"merged_inputs"`
InjectedInput []*InputSourceSpec `json:"injected_inputs"`
}
SourcesConfigSpec contains carry-over configuration from the source_config table. It has provision for multiple input sources to be merged via domain keys.
type SplitterSpec ¶
type SplitterSpec struct {
// Type range: standard (default), ext_count
// standard: split on Column / DefaultSplitterValue / ShardOn, create partition for each value
// ext_count: split on Column / DefaultSplitterValue / ShardOn + N, N = 0..ExtPartitionsCount-1
// where each partition has up to PartitionRowCount rows
Type string `json:"type,omitempty"`
Column string `json:"column,omitempty"` // splitter column
DefaultSplitterValue string `json:"default_splitter_value,omitempty"` // splitter default value
ShardOn *HashExpression `json:"shard_on,omitzero"` // splitter hash on the fly
PartitionRowCount int `json:"partition_row_count,omitzero"` // nbr of row for each ext partition
}
type StartComputePipesArgs ¶
type StartComputePipesArgs struct {
PipelineExecKey int `json:"pipeline_execution_key"`
FileKey string `json:"file_key,omitempty"`
SessionId string `json:"session_id,omitempty"`
StepId *int `json:"step_id,omitzero"`
ClusterInfo *ClusterShardingInfo `json:"cluster_sharding_info,omitzero"`
MainInputRowCount int64 `json:"main_input_row_count,omitzero"`
}
Argument to start_cp (start_sharding_cp, start_reducing_cp) for starting the cp cluster. MainInputRowCount is available in start_reducing_cp as the total number of records processed at step id 'reducing00'.
func (*StartComputePipesArgs) StartReducingComputePipes ¶
func (args *StartComputePipesArgs) StartReducingComputePipes(ctx context.Context, dbpool *pgxpool.Pool) (ComputePipesRun, error)
func (*StartComputePipesArgs) StartShardingComputePipes ¶
func (args *StartComputePipesArgs) StartShardingComputePipes(ctx context.Context, dbpool *pgxpool.Pool) (ComputePipesRun, *SchemaProviderSpec, error)
type StringBuilder ¶
type StringBuilder struct {
// contains filtered or unexported fields
}
func (*StringBuilder) Append ¶
func (b *StringBuilder) Append(v any)
func (*StringBuilder) AppendEmptyValue ¶
func (b *StringBuilder) AppendEmptyValue()
func (*StringBuilder) AppendNull ¶
func (b *StringBuilder) AppendNull()
func (*StringBuilder) NewArray ¶
func (b *StringBuilder) NewArray() arrow.Array
func (*StringBuilder) Release ¶
func (b *StringBuilder) Release()
func (*StringBuilder) Reserve ¶
func (b *StringBuilder) Reserve(n int)
type TableColumnSpec ¶
type TableSpec ¶
type TableSpec struct {
Key string `json:"key"`
Name string `json:"name"`
CheckSchemaChanged bool `json:"check_schema_changed,omitzero"`
Columns []TableColumnSpec `json:"columns,omitempty"`
ChannelSpecName string `json:"channel_spec_name,omitempty"`
}
ChannelSpecName specifies the channel spec. Columns provides column metadata info.
type TargetColumnsLookupSpec ¶
type TargetColumnsLookupSpec struct {
LookupName string `json:"lookup_name"`
DataClassificationColumn string `json:"data_classification_column,omitempty"`
Column1ClassificationValues []string `json:"column1_classification_values,omitempty"`
Column2ClassificationValues []string `json:"column2_classification_values,omitempty"`
}
type TimestampBuilder ¶
type TimestampBuilder struct {
// contains filtered or unexported fields
}
func (*TimestampBuilder) Append ¶
func (b *TimestampBuilder) Append(v any)
func (*TimestampBuilder) AppendEmptyValue ¶
func (b *TimestampBuilder) AppendEmptyValue()
func (*TimestampBuilder) AppendNull ¶
func (b *TimestampBuilder) AppendNull()
func (*TimestampBuilder) NewArray ¶
func (b *TimestampBuilder) NewArray() arrow.Array
func (*TimestampBuilder) Release ¶
func (b *TimestampBuilder) Release()
func (*TimestampBuilder) Reserve ¶
func (b *TimestampBuilder) Reserve(n int)
type TransformationColumnEvaluator ¶
type TransformationColumnEvaluator interface {
InitializeCurrentValue(currentValue *[]any)
Update(currentValue *[]any, input *[]any) error
Done(currentValue *[]any) error
}
Initialize and Done are intended for aggregate transformations column evaluators
type TransformationColumnSpec ¶
type TransformationColumnSpec struct {
// Type range: select, multi_select, value, eval, map, hash
// count, distinct_count, sum, min, case,
// map_reduce, lookup
Name string `json:"name"`
Type string `json:"type"`
Expr *string `json:"expr,omitempty"`
ExprArray []string `json:"expr_array,omitempty"`
MapExpr *MapExpression `json:"map_expr,omitzero"`
EvalExpr *ExpressionNode `json:"eval_expr,omitzero"`
HashExpr *HashExpression `json:"hash_expr,omitzero"`
Where *ExpressionNode `json:"where,omitzero"`
CaseExpr []CaseExpression `json:"case_expr,omitempty"` // case operator
ElseExpr []*ExpressionNode `json:"else_expr,omitempty"` // case operator
MapOn *string `json:"map_on,omitzero"`
AlternateMapOn []string `json:"alternate_map_on,omitempty"`
ApplyMap []TransformationColumnSpec `json:"apply_map,omitempty"`
ApplyReduce []TransformationColumnSpec `json:"apply_reduce,omitempty"`
LookupName *string `json:"lookup_name,omitzero"`
LookupKey []LookupColumnSpec `json:"key,omitempty"`
LookupValues []LookupColumnSpec `json:"values,omitempty"`
}
type TransformationSpec ¶
type TransformationSpec struct {
// Type range: map_record, aggregate, analyze, high_freq, partition_writer,
// anonymize, distinct, shuffling, group_by, filter, sort, merge, jetrules, clustering
// Format takes precedence over SchemaProvider's Format (from OutputChannelConfig)
Type string `json:"type"`
NewRecord bool `json:"new_record,omitzero"`
Columns []TransformationColumnSpec `json:"columns,omitempty"`
MapRecordConfig *MapRecordSpec `json:"map_record_config,omitzero"`
AnalyzeConfig *AnalyzeSpec `json:"analyze_config,omitzero"`
HighFreqColumns []*HighFreqSpec `json:"high_freq_columns,omitempty"` // Type high_freq
PartitionWriterConfig *PartitionWriterSpec `json:"partition_writer_config,omitzero"`
AnonymizeConfig *AnonymizeSpec `json:"anonymize_config,omitzero"`
DistinctConfig *DistinctSpec `json:"distinct_config,omitzero"`
ShufflingConfig *ShufflingSpec `json:"shuffling_config,omitzero"`
GroupByConfig *GroupBySpec `json:"group_by_config,omitzero"`
FilterConfig *FilterSpec `json:"filter_config,omitzero"`
SortConfig *SortSpec `json:"sort_config,omitzero"`
JetrulesConfig *JetrulesSpec `json:"jetrules_config,omitzero"`
ClusteringConfig *ClusteringSpec `json:"clustering_config,omitzero"`
MergeConfig *MergeSpec `json:"merge_config,omitzero"`
OutputChannel OutputChannelConfig `json:"output_channel"`
ConditionalConfig []*ConditionalTransformationSpec `json:"conditional_config,omitzero"`
}
type TripleIterator ¶
type Uint32Builder ¶
type Uint32Builder struct {
// contains filtered or unexported fields
}
func (*Uint32Builder) Append ¶
func (b *Uint32Builder) Append(v any)
func (*Uint32Builder) AppendEmptyValue ¶
func (b *Uint32Builder) AppendEmptyValue()
func (*Uint32Builder) AppendNull ¶
func (b *Uint32Builder) AppendNull()
func (*Uint32Builder) NewArray ¶
func (b *Uint32Builder) NewArray() arrow.Array
func (*Uint32Builder) Release ¶
func (b *Uint32Builder) Release()
func (*Uint32Builder) Reserve ¶
func (b *Uint32Builder) Reserve(n int)
type Uint64Builder ¶
type Uint64Builder struct {
// contains filtered or unexported fields
}
func (*Uint64Builder) Append ¶
func (b *Uint64Builder) Append(v any)
func (*Uint64Builder) AppendEmptyValue ¶
func (b *Uint64Builder) AppendEmptyValue()
func (*Uint64Builder) AppendNull ¶
func (b *Uint64Builder) AppendNull()
func (*Uint64Builder) NewArray ¶
func (b *Uint64Builder) NewArray() arrow.Array
func (*Uint64Builder) Release ¶
func (b *Uint64Builder) Release()
func (*Uint64Builder) Reserve ¶
func (b *Uint64Builder) Reserve(n int)
type WelfordAlgo ¶
func NewWelfordAlgo ¶
func NewWelfordAlgo() *WelfordAlgo
func (*WelfordAlgo) Finalize ¶
func (w *WelfordAlgo) Finalize() (mean, variance float64)
func (*WelfordAlgo) Update ¶
func (w *WelfordAlgo) Update(value float64)
type WriteTableSource ¶
type WriteTableSource struct {
// contains filtered or unexported fields
}
func NewWriteTableSource ¶
func NewWriteTableSource(source <-chan []any, tableIdentifier pgx.Identifier, columns []string) *WriteTableSource
func (*WriteTableSource) Err ¶
func (wt *WriteTableSource) Err() error
func (*WriteTableSource) Next ¶
func (wt *WriteTableSource) Next() bool
pgx.CopyFromSource interface
func (*WriteTableSource) Values ¶
func (wt *WriteTableSource) Values() ([]any, error)
func (*WriteTableSource) WriteTable ¶
func (wt *WriteTableSource) WriteTable(dbpool *pgxpool.Pool, done chan struct{}, copy2DbResultCh chan<- ComputePipesResult)
Methods for writing output entity records to postgres
Source Files
¶
- actions_common_model.go
- actions_coordinate_cp.go
- actions_env_var_evaluator.go
- actions_get_columns_from_file.go
- actions_load_files.go
- actions_load_main_input.go
- actions_load_merge_input.go
- actions_process_file.go
- actions_read_csv_file.go
- actions_read_fixedwidth_file.go
- actions_read_xlsx_file.go
- actions_s3_utils.go
- actions_shard_file_keys.go
- actions_start_common.go
- actions_start_reducing_cp.go
- actions_start_sharding_cp.go
- bad_rows_channel.go
- clustering_cluster_info.go
- clustering_correlation.go
- clustering_pool_manager.go
- clustering_pool_worker.go
- column_evaluators.go
- column_evaluators_aggregate.go
- column_evaluators_case_expr.go
- column_evaluators_hash.go
- column_evaluators_lookup.go
- column_evaluators_map_reduce.go
- column_evaluators_mapping.go
- compute_pipes.go
- compute_pipes_results.go
- datetime_parser.go
- eval_expression.go
- eval_operators.go
- eval_operators_other.go
- jetrules_interface.go
- jetrules_pool_manager.go
- jetrules_pool_worker.go
- jetrules_rdf.go
- jetrules_utils.go
- lookup_table_manager.go
- lookup_table_s3.go
- lookup_table_sql.go
- parquet_merge_file.go
- parquet_read_file.go
- parquet_schema_info.go
- parquet_utils.go
- parquet_write_file.go
- pipe_executor_fan_out.go
- pipe_executor_merge_files.go
- pipe_executor_splitter.go
- pipe_transformation_aggregate.go
- pipe_transformation_analyze.go
- pipe_transformation_anonymize.go
- pipe_transformation_clustering.go
- pipe_transformation_distinct.go
- pipe_transformation_filter.go
- pipe_transformation_group_by.go
- pipe_transformation_high_freq.go
- pipe_transformation_jetrules.go
- pipe_transformation_map_record.go
- pipe_transformation_merge.go
- pipe_transformation_partition_writer.go
- pipe_transformation_shuffling.go
- pipe_transformation_sort.go
- pipes_ast.go
- pipes_domain_key.go
- pipes_model.go
- pipes_preprocessing_fnc.go
- pipes_runtime_model.go
- runtime_metrics.go
- s3_device_manager.go
- s3_device_worker.go
- s3_device_writter.go
- s3_utils.go
- schema_providers.go
- utils_analyze.go
- utils_analyze_parse_date.go
- utils_analyze_parse_double.go
- utils_analyze_parse_text.go
- utils_csv.go
- utils_csv_source.go
- utils_fixed_width.go
- utils_xlsx.go
- write_table.go