Documentation
¶
Overview ¶
Copyright 2025 Specter Ops, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
SPDX-License-Identifier: Apache-2.0
Copyright 2025 Specter Ops, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
SPDX-License-Identifier: Apache-2.0
Copyright 2025 Specter Ops, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
SPDX-License-Identifier: Apache-2.0
Index ¶
- Constants
- func ConvertGenericEdge(entity ein.GenericEdge, converted *ConvertedData) error
- func ConvertGenericNode(entity ein.GenericNode, converted *ConvertedData) error
- func CreateIngestDecoder(reader io.ReadSeeker, key string, targetDepth int) (*json.Decoder, error)
- func DecodeGenericData[T any](batch *IngestContext, decoder *json.Decoder, sourceKind graph.Kind, ...) error
- func IngestAzureData(batch *IngestContext, converted ConvertedAzureData) error
- func IngestBasicData(batch *IngestContext, converted ConvertedData) error
- func IngestDNRelationships(batch *IngestContext, relationships []ein.IngestibleRelationship) error
- func IngestGenericData(batch *IngestContext, sourceKind graph.Kind, converted ConvertedData) error
- func IngestGroupData(batch *IngestContext, converted ConvertedGroupData) error
- func IngestNode(ic *IngestContext, baseKind graph.Kind, nextNode ein.IngestibleNode) error
- func IngestNodes(ingestCtx *IngestContext, baseKind graph.Kind, nodes []ein.IngestibleNode) error
- func IngestRelationships(ingestCtx *IngestContext, sourceKind graph.Kind, ...) error
- func IngestSessions(batch *IngestContext, sessions []ein.IngestibleSession) error
- func IngestWrapper(batch *IngestContext, reader io.ReadSeeker, meta ingest.OriginalMetadata, ...) error
- func InitializeIngestMetrics(registerer prometheus.Registerer) error
- func MergeNodeKinds(sourceKind graph.Kind, additionalKinds ...graph.Kind) []graph.Kind
- func PublishIngestThroughput(nodesProcessed, relsProcessed, nodesWritten, relsWritten int64, ...)
- func ReadFileForIngest(batch *IngestContext, reader io.ReadSeeker, options ReadOptions) error
- func SeekToKey(decoder *json.Decoder, key string, targetDepth int) error
- type AzureBase
- type BatchUpdater
- type ChangeManager
- type ConversionFunc
- type ConversionFuncWithTime
- type ConvertedAzureData
- type ConvertedData
- type ConvertedGroupData
- type ConvertedSessionData
- type GraphifyData
- type GraphifyService
- func (s *GraphifyService) NewIngestContext(ctx context.Context, ingestTime time.Time, useChangelog bool) *IngestContext
- func (s *GraphifyService) ProcessIngestFile(ic *IngestContext, task model.IngestTask) ([]IngestFileData, error)
- func (s *GraphifyService) ProcessTasks(updateJob UpdateJobFunc)
- func (s *GraphifyService) RegisterSourceKind(ctx context.Context) func(kind graph.Kind) error
- type IngestContext
- type IngestFileData
- type IngestOption
- type IngestStats
- type IngestUserDataError
- type ReadOptions
- type UpdateJobFunc
Constants ¶
const ( SerialError = "error deserializing %s: %v" ExtractError = "failed to extract owner id/type from directory object: %v" PrincipalTypeServicePrincipal = "ServicePrincipal" PrincipalTypeUser = "User" )
const ( IngestCountThreshold = 500 ReconcileProperty = "reconcile" )
Variables ¶
This section is empty.
Functions ¶
func ConvertGenericEdge ¶
func ConvertGenericEdge(entity ein.GenericEdge, converted *ConvertedData) error
func ConvertGenericNode ¶
func ConvertGenericNode(entity ein.GenericNode, converted *ConvertedData) error
func CreateIngestDecoder ¶
CreateIngestDecoder returns a JSON decoder that is positioned at the start of the array under the specified top-level key (e.g., "nodes", "edges", "data"). The returned decoder is ready to stream-decode each element of the array sequentially.
func DecodeGenericData ¶
func DecodeGenericData[T any](batch *IngestContext, decoder *json.Decoder, sourceKind graph.Kind, conversionFunc ConversionFunc[T]) error
func IngestAzureData ¶
func IngestAzureData(batch *IngestContext, converted ConvertedAzureData) error
func IngestBasicData ¶
func IngestBasicData(batch *IngestContext, converted ConvertedData) error
func IngestDNRelationships ¶
func IngestDNRelationships(batch *IngestContext, relationships []ein.IngestibleRelationship) error
func IngestGenericData ¶
func IngestGenericData(batch *IngestContext, sourceKind graph.Kind, converted ConvertedData) error
IngestGenericData writes generic graph data into the database using the provided batch. It attempts to ingest all nodes and relationships from the ConvertedData object.
Because generic entities do not have a predefined base kind (unlike AZ or AD), this function passes graph.EmptyKind to the node and relationship ingestion functions. This indicates that no base kind should be applied uniformly to all ingested entities, and instead the kind(s) defined directly on each node or edge (if any) are used as-is.
func IngestGroupData ¶
func IngestGroupData(batch *IngestContext, converted ConvertedGroupData) error
func IngestNode ¶
func IngestNode(ic *IngestContext, baseKind graph.Kind, nextNode ein.IngestibleNode) error
func IngestNodes ¶
func IngestNodes(ingestCtx *IngestContext, baseKind graph.Kind, nodes []ein.IngestibleNode) error
func IngestRelationships ¶
func IngestRelationships(ingestCtx *IngestContext, sourceKind graph.Kind, relationships []ein.IngestibleRelationship) error
IngestRelationships resolves and writes a batch of ingestible relationships to the graph.
This function first calls resolveRelationships to resolve node identifiers based on name and kind.
Each resolved relationship update is applied to the graph via batch.UpdateRelationshipBy. Errors encountered during resolution or update are collected and returned as a single combined error.
func IngestSessions ¶
func IngestSessions(batch *IngestContext, sessions []ein.IngestibleSession) error
func IngestWrapper ¶
func IngestWrapper(batch *IngestContext, reader io.ReadSeeker, meta ingest.OriginalMetadata, readOpts ReadOptions) error
IngestWrapper dispatches the ingest process based on the metadata's type.
func InitializeIngestMetrics ¶
func InitializeIngestMetrics(registerer prometheus.Registerer) error
InitializeIngestMetrics registers the ingestion throughput gauge with the Prometheus registry
func MergeNodeKinds ¶
MergeNodeKinds combines a source kind with any additional kinds, then removes any occurrences of graph.EmptyKind from the result. Ensures a clean, usable kind list for downstream logic.
func PublishIngestThroughput ¶
func PublishIngestThroughput(nodesProcessed, relsProcessed, nodesWritten, relsWritten int64, duration time.Duration)
PublishIngestThroughput publishes ingestion throughput metrics to Prometheus
func ReadFileForIngest ¶
func ReadFileForIngest(batch *IngestContext, reader io.ReadSeeker, options ReadOptions) error
ReadFileForIngest orchestrates the ingestion of a file into the graph database, performing any necessary metadata validation and schema enforcement before delegating to the core ingest logic.
If the file type is ZIP, additional validation is performed using JSON Schema, and the full stream is consumed to enable downstream readers to function correctly. Zip files are validated here and not at file upload time because it would be expensive to decompress the entire zip into memory. Files that fail this validation step will not be processed further.
Returns an error if metadata validation or ingestion fails.
func SeekToKey ¶
SeekToKey positions the JSON decoder at the value of the given key, which must appear at the specified object depth. If the key is "nodes", "edges", or "data", the decoder advances past the key and the opening '[' token, positioning itself at the first array element. For other keys (e.g., "metadata"), the decoder stops at the delimiter token itself (e.g., '{') so that callers can handle decoding.
Types ¶
type AzureBase ¶
type AzureBase struct {
Kind enums.Kind `json:"kind"`
Data json.RawMessage `json:"data"`
}
type BatchUpdater ¶
type BatchUpdater interface {
UpdateNodeBy(update graph.NodeUpdate) error
UpdateRelationshipBy(update graph.RelationshipUpdate) error
Nodes() graph.NodeQuery
Relationships() graph.RelationshipQuery
}
BatchUpdater represents the ingestion-facing API for a dawgs BatchOperation
func NewCountingBatchUpdater ¶
func NewCountingBatchUpdater(inner BatchUpdater, stats *IngestStats) BatchUpdater
NewCountingBatchUpdater creates a BatchUpdater wrapper that tracks operation counts
type ChangeManager ¶
type ChangeManager interface {
ResolveChange(change changelog.Change) (bool, error)
Submit(ctx context.Context, change changelog.Change) bool
FlushStats()
ClearCache(ctx context.Context)
}
ChangeManager represents the ingestion-facing API for the changelog daemon.
It provides three responsibilities:
- Deduplication: ResolveChange determines whether a proposed change is new or modified and therefore requires persistence, or whether it has already been seen.
- Submission: Submit enqueues a change for asynchronous processing by the changelog loop.
- Metrics: FlushStats logs and resets internal cache hit/miss statistics, allowing callers to observe deduplication efficiency over time.
To generate mocks for this interface for unit testing seams in the application please use:
mockgen -source=ingest.go -destination=mocks/ingest.go -package=mocks
type ConversionFunc ¶
type ConversionFunc[T any] func(decoded T, converted *ConvertedData) error
ConversionFunc is a function that transforms a decoded JSON object (of type T) into its corresponding internal ingest representation, appending it to the provided ConvertedData.
T represents a specific ingest type (e.g., User, Computer, Group, etc.).
type ConversionFuncWithTime ¶
type ConversionFuncWithTime[T any] func(decoded T, converted *ConvertedData, ingestTime time.Time)
type ConvertedAzureData ¶
type ConvertedAzureData struct {
NodeProps []ein.IngestibleNode
RelProps []ein.IngestibleRelationship
OnPremNodes []ein.IngestibleNode
}
func (*ConvertedAzureData) Clear ¶
func (s *ConvertedAzureData) Clear()
type ConvertedData ¶
type ConvertedData struct {
NodeProps []ein.IngestibleNode
RelProps []ein.IngestibleRelationship
}
func (*ConvertedData) Clear ¶
func (s *ConvertedData) Clear()
type ConvertedGroupData ¶
type ConvertedGroupData struct {
NodeProps []ein.IngestibleNode
RelProps []ein.IngestibleRelationship
DistinguishedNameProps []ein.IngestibleRelationship
}
func (*ConvertedGroupData) Clear ¶
func (s *ConvertedGroupData) Clear()
type ConvertedSessionData ¶
type ConvertedSessionData struct {
SessionProps []ein.IngestibleSession
}
func CreateConvertedSessionData ¶
func CreateConvertedSessionData(count int) ConvertedSessionData
func (*ConvertedSessionData) Clear ¶
func (s *ConvertedSessionData) Clear()
type GraphifyData ¶
type GraphifyData interface {
appcfg.ParameterService
// Task handlers
GetAllIngestTasks(ctx context.Context) (model.IngestTasks, error)
DeleteIngestTask(ctx context.Context, ingestTask model.IngestTask) error
GetFlagByKey(context.Context, string) (appcfg.FeatureFlag, error)
RegisterSourceKind(context.Context) func(sourceKind graph.Kind) error
}
The GraphifyData interface is designed to manage the lifecycle of ingestion tasks
type GraphifyService ¶
type GraphifyService struct {
// contains filtered or unexported fields
}
func NewGraphifyService ¶
func NewGraphifyService(ctx context.Context, db GraphifyData, graphDb graph.Database, cfg config.Configuration, schema upload.IngestSchema, changeManager ChangeManager) GraphifyService
func (*GraphifyService) NewIngestContext ¶
func (s *GraphifyService) NewIngestContext(ctx context.Context, ingestTime time.Time, useChangelog bool) *IngestContext
func (*GraphifyService) ProcessIngestFile ¶
func (s *GraphifyService) ProcessIngestFile(ic *IngestContext, task model.IngestTask) ([]IngestFileData, error)
ProcessIngestFile reads the files at the path supplied by the ingest task and returns a slice of IngestFileData for them, along with an error.
func (*GraphifyService) ProcessTasks ¶
func (s *GraphifyService) ProcessTasks(updateJob UpdateJobFunc)
func (*GraphifyService) RegisterSourceKind ¶
RegisterSourceKind - returns a function that will register a source kind and then refresh the in-memory DAWGS kind map
type IngestContext ¶
type IngestContext struct {
Ctx context.Context
// Batch is the buffering/flushing mechanism that writes entities to the graph database
Batch BatchUpdater
// IngestTime is a single timestamp assigned to the lastseen property of every entity ingested per ingest run
IngestTime time.Time
// Manager is the caching layer that deduplicates ingest payloads across ingest runs
Manager ChangeManager
// Stats tracks the number of nodes and relationships processed during ingestion
Stats *IngestStats
// RetainIngestedFiles determines if the service should clean up working files after ingest
RetainIngestedFiles bool
}
IngestContext is a container for dependencies needed by ingest
func NewIngestContext ¶
func NewIngestContext(ctx context.Context, opts ...IngestOption) *IngestContext
func (*IngestContext) BindBatchUpdater ¶
func (s *IngestContext) BindBatchUpdater(batch BatchUpdater)
func (*IngestContext) HasChangelog ¶
func (s *IngestContext) HasChangelog() bool
type IngestFileData ¶
type IngestOption ¶
type IngestOption func(*IngestContext)
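IngestOption is a functional option applied to an IngestContext; options are passed to NewIngestContext. The With* helpers below construct them.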
func WithBatchUpdater ¶
func WithBatchUpdater(batchUpdater BatchUpdater) IngestOption
func WithChangeManager ¶
func WithChangeManager(manager ChangeManager) IngestOption
func WithIngestRetentionConfig ¶
func WithIngestRetentionConfig(shouldRetainIngestedFiles bool) IngestOption
func WithIngestTime ¶
func WithIngestTime(ingestTime time.Time) IngestOption
type IngestStats ¶
type IngestStats struct {
// Total entities processed (including deduplicated ones).
// Consider this the number of elements present in the raw ingest payload.
NodesProcessed atomic.Int64
RelationshipsProcessed atomic.Int64
// Entities actually written to database (subset of processed)
NodesWritten atomic.Int64
RelationshipsWritten atomic.Int64
}
IngestStats tracks the number of nodes and relationships processed during ingestion
func (*IngestStats) GetCounts ¶
func (s *IngestStats) GetCounts() (nodesProcessed, relsProcessed, nodesWritten, relsWritten int64)
func (*IngestStats) Reset ¶
func (s *IngestStats) Reset()
type IngestUserDataError ¶
type IngestUserDataError struct {
Msg string
}
IngestUserDataError is used to return an error related to the data a user is ingesting, as opposed to an error in the internal Go logic.
func (IngestUserDataError) Error ¶
func (e IngestUserDataError) Error() string
type ReadOptions ¶
type ReadOptions struct {
FileType model.FileType // JSON or ZIP
IngestSchema upload.IngestSchema
RegisterSourceKind registrationFn
}
type UpdateJobFunc ¶
type UpdateJobFunc func(jobId int64, fileData []IngestFileData)
UpdateJobFunc is passed to the graphify service to let it tell us about the tasks as they are processed
The datapipe doesn't know or care about tasks, and the graphify service doesn't know or care about jobs. Instead, this func is provided as an abstraction for graphify.