Introduction

While linear workflows are powerful, real-world data processing often requires more sophisticated patterns. The llmr package provides advanced workflow capabilities, including conditional branching, parallel processing, and dynamic routing.

This vignette explores these advanced patterns, showing how to build intelligent workflows that adapt their behavior based on data characteristics and processing requirements.

What You’ll Learn

By the end of this guide, you’ll understand how to:

  • Use when() for conditional branching
  • Implement preprocessors for data preparation
  • Create parallel processing workflows
  • Build dynamic routing based on data characteristics
  • Combine multiple advanced patterns
  • Design fault-tolerant workflows

Prerequisites

library(llmr)
#> 
#> Attaching package: 'llmr'
#> The following object is masked from 'package:stats':
#> 
#>     step
library(ellmer)

Understanding when(): Conditional Branching

The when() function enables conditional branching in workflows. It takes a condition function and one or more named branches; the condition function is applied to the input, and the branch (or branches) whose names it returns are executed.
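
In its most compact form, a conditional workflow is just a condition function plus named steps. The example below is a minimal sketch with inline helper functions, shown only to illustrate the call shape before the fuller example that follows:

# Sketch of the general call shape: the condition function receives the
# workflow input and returns one or more branch names; the matching named
# step(s) run, and the result is a list keyed by the branch(es) that ran.
router <- when(
  function(x) if (x > 0) "positive" else "non_positive",
  positive = step(function(x) x * 2, name = "double_positive"),
  non_positive = step(abs, name = "make_positive")
)
# execute(router, 3)   # would run the "positive" branch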

Basic Conditional Workflow

# Define condition function
classify_number <- function(x) {
  if (x > 100) {
    "large"
  } else if (x > 10) {
    "medium"
  } else {
    "small"
  }
}

# Define branch-specific processing
process_large <- function(x) {
  cat("Processing large number:", x, "\n")
  x / 10  # Scale down large numbers
}

process_medium <- function(x) {
  cat("Processing medium number:", x, "\n")
  x * 2   # Double medium numbers
}

process_small <- function(x) {
  cat("Processing small number:", x, "\n")
  x + 100 # Boost small numbers
}

# Create conditional workflow
number_processor <- when(
  classify_number,
  large = step(process_large, name = "scale_down"),
  medium = step(process_medium, name = "double"),
  small = step(process_small, name = "boost")
)

# View the workflow structure
print(number_processor)
#> Workflow conditional branch with 3 branches:
#> Branches: large, medium, small

# Test with different values
test_values <- c(5, 50, 500)
for (val in test_values) {
  cat("\n--- Processing:", val, "---\n")
  result <- execute(number_processor, val)
  print(result)
}
#> 
#> --- Processing: 5 ---
#> Processing small number: 5 
#> $small
#> [1] 105
#> 
#> 
#> --- Processing: 50 ---
#> Processing medium number: 50 
#> $medium
#> [1] 100
#> 
#> 
#> --- Processing: 500 ---
#> Processing large number: 500 
#> $large
#> [1] 50

Preprocessing with Conditional Branching

Combine preprocessing steps with conditional logic:

# Preprocessor function
validate_and_prepare <- function(x) {
  cat("Validating input:", x, "\n")
  if (!is.numeric(x)) {
    stop("Input must be numeric")
  }
  abs(x)  # Ensure positive values
}

# Create workflow with preprocessing
robust_processor <- step(validate_and_prepare, name = "validate") %->%
  when(
    classify_number,
    large = step(process_large, name = "scale_down"),
    medium = step(process_medium, name = "double"),
    small = step(process_small, name = "boost")
  )

print(robust_processor)
#> Workflow: <unnamed> 
#> Nodes: 5 | Edges: 4 
#> 
#> ┌─ ⚙️ validate
#> │
#> ├─ 🔀 when(large, medium, small)
#> │   ├─ large: ⚙️ scale_down
#> │   ├─ medium: ⚙️ double
#> │   └─ small: ⚙️ boost

# Test with various inputs
test_inputs <- c(-25, 15, 150)
for (input in test_inputs) {
  cat("\n--- Processing:", input, "---\n")
  result <- execute(robust_processor, input)
  print(result)
}
#> 
#> --- Processing: -25 ---
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: validate 
#> Validating input: -25 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: condition_1 
#> > 2025-07-04 11:49:23 [WORKFLOW] Condition node 'condition_1' selected branches: medium 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing branch 'medium' -> node 'double_1' 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: double 
#> Processing medium number: 25 
#> $medium
#> [1] 50
#> 
#> 
#> --- Processing: 15 ---
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: validate 
#> Validating input: 15 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: condition_1 
#> > 2025-07-04 11:49:23 [WORKFLOW] Condition node 'condition_1' selected branches: medium 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing branch 'medium' -> node 'double_1' 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: double 
#> Processing medium number: 15 
#> $medium
#> [1] 30
#> 
#> 
#> --- Processing: 150 ---
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: validate 
#> Validating input: 150 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: condition_1 
#> > 2025-07-04 11:49:23 [WORKFLOW] Condition node 'condition_1' selected branches: large 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing branch 'large' -> node 'scale_down_1' 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: scale_down 
#> Processing large number: 150 
#> $large
#> [1] 15

Parallel Processing with when()

Use when() to execute multiple branches in parallel by having the condition function return several branch names. Every selected branch receives the same input, and a downstream step receives the results as a named list keyed by branch:

# Define analysis functions
sentiment_analysis <- function(text) {
  cat("Analyzing sentiment...\n")
  # Simulate sentiment analysis
  sentiments <- c("positive", "negative", "neutral")
  sample(sentiments, 1)
}

word_count <- function(text) {
  cat("Counting words...\n")
  length(strsplit(text, "\\s+")[[1]])
}

extract_keywords <- function(text) {
  cat("Extracting keywords...\n")
  words <- strsplit(tolower(text), "\\s+")[[1]]
  # Return top 3 unique words
  unique_words <- unique(words)
  sample(unique_words, min(3, length(unique_words)))
}

readability_score <- function(text) {
  cat("Calculating readability...\n")
  # Simple readability metric
  words <- strsplit(text, "\\s+")[[1]]
  avg_word_length <- mean(nchar(words))
  round(avg_word_length, 2)
}

# Function to combine parallel results
combine_analysis <- function(results) {
  cat("Combining analysis results...\n")
  list(
    sentiment = results$sentiment,
    word_count = results$word_count,
    keywords = results$keywords,
    readability = results$readability,
    summary = paste(
      "Text analysis:", results$word_count, "words,",
      results$sentiment, "sentiment,",
      "readability score:", results$readability
    )
  )
}

# Create parallel analysis workflow
text_analyzer <- when(
  function(text) c("sentiment", "word_count", "keywords", "readability"),
  sentiment = step(sentiment_analysis, name = "sentiment"),
  word_count = step(word_count, name = "count_words"),
  keywords = step(extract_keywords, name = "extract_keys"),
  readability = step(readability_score, name = "readability")
) %->%
  step(combine_analysis, name = "combine")

print(text_analyzer)
#> Workflow: <unnamed> 
#> Nodes: 6 | Edges: 5 
#> 
#> ┌─ 🔀 when(sentiment, word_count, keywords, readability)
#> │   ├─ sentiment: ⚙️ sentiment
#> │   ├─ word_count: ⚙️ count_words
#> │   ├─ keywords: ⚙️ extract_keys
#> │   └─ readability: ⚙️ readability
#> │
#> └─ ⚙️ combine

# Test parallel processing
sample_text <- "The advanced workflow capabilities in llmr enable sophisticated data processing patterns."
result <- execute(text_analyzer, sample_text)
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: condition_1 
#> > 2025-07-04 11:49:23 [WORKFLOW] Condition node 'condition_1' selected branches: sentiment, word_count, keywords, readability 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing branch 'sentiment' -> node 'sentiment_1' 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: sentiment 
#> Analyzing sentiment...
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing branch 'word_count' -> node 'count_words_1' 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: count_words 
#> Counting words...
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing branch 'keywords' -> node 'extract_keys_1' 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: extract_keys 
#> Extracting keywords...
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing branch 'readability' -> node 'readability_1' 
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: readability 
#> Calculating readability...
#> > 2025-07-04 11:49:23 [WORKFLOW] Executing node: combine 
#> Combining analysis results...
print(result)
#> $sentiment
#> [1] "positive"
#> 
#> $word_count
#> [1] 11
#> 
#> $keywords
#> [1] "enable" "in"     "llmr"  
#> 
#> $readability
#> [1] 7.18
#> 
#> $summary
#> [1] "Text analysis: 11 words, positive sentiment, readability score: 7.18"

Dynamic Routing Based on Data Characteristics

Create workflows that route data based on its characteristics:

# Data type classifier
classify_data_type <- function(data) {
  if (is.character(data)) {
    if (nchar(data) > 100) {
      "long_text"
    } else {
      "short_text"
    }
  } else if (is.numeric(data)) {
    if (length(data) > 1) {
      "numeric_vector"
    } else {
      "single_number"
    }
  } else {
    "other"
  }
}

# Specialized processors for different data types
process_long_text <- function(text) {
  cat("Processing long text (", nchar(text), "characters)\n")
  # Summarize long text
  words <- strsplit(text, "\\s+")[[1]]
  paste("Summary:", length(words), "words starting with:", 
    paste(head(words, 3), collapse = " ")
  )
}

process_short_text <- function(text) {
  cat("Processing short text:", text, "\n")
  # Simple transformation for short text
  toupper(text)
}

process_numeric_vector <- function(vec) {
  cat("Processing numeric vector of length", length(vec), "\n")
  list(
    mean = mean(vec),
    sd = sd(vec),
    range = range(vec)
  )
}

process_single_number <- function(num) {
  cat("Processing single number:", num, "\n")
  list(
    value = num,
    squared = num^2,
    sqrt = sqrt(abs(num))
  )
}

process_other <- function(data) {
  cat("Processing other data type:", class(data), "\n")
  paste("Unsupported data type:", class(data)[1])
}

# Create dynamic routing workflow
data_router <- when(
  classify_data_type,
  long_text = step(process_long_text, name = "long_text_processor"),
  short_text = step(process_short_text, name = "short_text_processor"),
  numeric_vector = step(process_numeric_vector, name = "vector_processor"),
  single_number = step(process_single_number, name = "number_processor"),
  other = step(process_other, name = "fallback_processor")
)

print(data_router)
#> Workflow conditional branch with 5 branches:
#> Branches: long_text, short_text, numeric_vector, single_number, other

# Test with different data types
test_data <- list(
  "Hello World",
  "This is a much longer text that contains many words and should be classified as long text for processing purposes.",
  42,
  c(1, 2, 3, 4, 5),
  factor(c("A", "B", "C"))
)

for (i in seq_along(test_data)) {
  cat("\n--- Test", i, "---\n")
  result <- execute(data_router, test_data[[i]])
  print(result)
}
#> 
#> --- Test 1 ---
#> Processing short text: Hello World 
#> $short_text
#> [1] "HELLO WORLD"
#> 
#> 
#> --- Test 2 ---
#> Processing long text ( 114 characters)
#> $long_text
#> [1] "Summary: 20 words starting with: This is a"
#> 
#> 
#> --- Test 3 ---
#> Processing single number: 42 
#> $single_number
#> $single_number$value
#> [1] 42
#> 
#> $single_number$squared
#> [1] 1764
#> 
#> $single_number$sqrt
#> [1] 6.480741
#> 
#> 
#> 
#> --- Test 4 ---
#> Processing numeric vector of length 5 
#> $numeric_vector
#> $numeric_vector$mean
#> [1] 3
#> 
#> $numeric_vector$sd
#> [1] 1.581139
#> 
#> $numeric_vector$range
#> [1] 1 5
#> 
#> 
#> 
#> --- Test 5 ---
#> Processing other data type: factor 
#> $other
#> [1] "Unsupported data type: factor"

Agent-Based Conditional Workflows

Combine AI agents with conditional logic for intelligent processing:

# Create specialized agents with ellmer chat objects and system prompts
technical_writer <- new_agent("technical_writer", ellmer::chat_anthropic(
  system_prompt = "You are a technical writer. Analyze text for technical accuracy and clarity. Provide structured feedback."
))
#> Using model = "claude-sonnet-4-20250514".

creative_writer <- new_agent("creative_writer", ellmer::chat_anthropic(
  system_prompt = "You are a creative writing expert. Analyze text for creativity, style, and emotional impact."
))
#> Using model = "claude-sonnet-4-20250514".

data_analyst <- new_agent("data_analyst", ellmer::chat_anthropic(
  system_prompt = "You are a data analyst. Extract quantitative insights and patterns from text."
))
#> Using model = "claude-sonnet-4-20250514".

# Content classifier
classify_content <- function(text) {
  # Simple heuristic classification
  technical_keywords <- c("algorithm", "function", "data", "analysis", "method", "system")
  creative_keywords <- c("story", "character", "emotion", "beautiful", "imagine", "dream")
  
  text_lower <- tolower(text)
  technical_score <- sum(sapply(technical_keywords, function(kw) grepl(kw, text_lower)))
  creative_score <- sum(sapply(creative_keywords, function(kw) grepl(kw, text_lower)))
  
  if (technical_score > creative_score) {
    "technical"
  } else if (creative_score > technical_score) {
    "creative"
  } else {
    "general"
  }
}

# Create intelligent content processor
content_processor <- when(
  classify_content,
  technical = step(technical_writer, name = "technical_analysis"),
  creative = step(creative_writer, name = "creative_analysis"),
  general = step(data_analyst, name = "general_analysis")
)

print(content_processor)
#> Workflow conditional branch with 3 branches:
#> Branches: technical, creative, general

# Test with different content types
technical_text <- "The algorithm processes data using advanced statistical methods to optimize system performance."
creative_text <- "The beautiful story unfolds as the character embarks on an emotional journey through imagination."
general_text <- "The quarterly report shows steady growth across all business segments."

test_texts <- list(
  technical = technical_text,
  creative = creative_text,
  general = general_text
)

for (type in names(test_texts)) {
  cat("\n--- Analyzing", type, "content ---\n")
  result <- execute(content_processor, test_texts[[type]])
  cat("Analysis result:", substr(result, 1, 100), "...\n")
}
#> 
#> --- Analyzing technical content ---
#> Analysis result: ## Technical Accuracy Analysis
#> 
#> **Strengths:**
#> - Uses appropriate technical terminology
#> - Conveys a  ...
#> 
#> --- Analyzing creative content ---
#> Analysis result: This sentence reads more like a summary or outline than a piece of creative writing. Here's my analy ...
#> 
#> --- Analyzing general content ---
#> Analysis result: I notice this statement lacks the specific quantitative data needed for meaningful analysis. To extr ...
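
A branch does not have to be an agent. When some content types do not warrant an LLM call, a plain function step can sit alongside the agent branches. A sketch reusing word_count() from the parallel-processing example (content_processor_mixed and general_summary are illustrative names):

# Route general content to a cheap local step instead of an LLM agent
content_processor_mixed <- when(
  classify_content,
  technical = step(technical_writer, name = "technical_analysis"),
  creative = step(creative_writer, name = "creative_analysis"),
  general = step(
    function(text) paste("General text with", word_count(text), "words"),
    name = "general_summary"
  )
)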

Complex Multi-Stage Workflows

Combine multiple advanced patterns for sophisticated processing:

# Multi-stage document processor
document_preprocessor <- function(doc) {
  cat("Preprocessing document...\n")
  # Clean and prepare document
  cleaned <- trimws(doc)
  list(
    content = cleaned,
    length = nchar(cleaned),
    word_count = length(strsplit(cleaned, "\\s+")[[1]])
  )
}

# Document classifier based on length and content
classify_document <- function(doc_info) {
  if (doc_info$word_count > 100) {
    "long_document"
  } else if (doc_info$word_count > 20) {
    "medium_document"
  } else {
    "short_document"
  }
}

# Specialized processors for different document lengths
process_long_document <- function(doc_info) {
  cat("Processing long document...\n")
  # For long documents, do parallel analysis
  execute(
    when(
      function(x) c("summary", "keywords", "sentiment"),
      summary = step(function(x) paste("Summary of", x$word_count, "words"), name = "summarize"),
      keywords = step(function(x) c("key1", "key2", "key3"), name = "extract_keywords"),
      sentiment = step(function(x) "neutral", name = "analyze_sentiment")
    ),
    doc_info
  )
}

process_medium_document <- function(doc_info) {
  cat("Processing medium document...\n")
  list(
    type = "medium",
    analysis = paste("Medium document with", doc_info$word_count, "words"),
    recommendation = "Consider expanding or condensing"
  )
}

process_short_document <- function(doc_info) {
  cat("Processing short document...\n")
  list(
    type = "short",
    analysis = paste("Short document with", doc_info$word_count, "words"),
    recommendation = "Consider expanding content"
  )
}

# Final formatter
format_results <- function(analysis) {
  cat("Formatting final results...\n")
  list(
    timestamp = Sys.time(),
    analysis = analysis,
    status = "complete"
  )
}

# Create complex multi-stage workflow
document_pipeline <- step(document_preprocessor, name = "preprocess") %->%
  when(
    classify_document,
    long_document = step(process_long_document, name = "process_long"),
    medium_document = step(process_medium_document, name = "process_medium"),
    short_document = step(process_short_document, name = "process_short")
  ) %->%
  step(format_results, name = "format")

print(document_pipeline)
#> Workflow: <unnamed> 
#> Nodes: 6 | Edges: 5 
#> 
#> ┌─ ⚙️ preprocess
#> │
#> ├─ 🔀 when(long_document, medium_document, short_document)
#> │   ├─ long_document: ⚙️ process_long
#> │   ├─ medium_document: ⚙️ process_medium
#> │   └─ short_document: ⚙️ process_short
#> │
#> └─ ⚙️ format

# Test with documents of different lengths
test_documents <- list(
  short = "Brief note.",
  medium = "This is a medium-length document that contains several sentences and 
    provides a reasonable amount of content for analysis purposes.",
  long = paste(
    rep("This is a very long document with many repeated sentences to 
      simulate a lengthy piece of content.", 
      10
    ), 
    collapse = " "
  )
)

for (doc_type in names(test_documents)) {
  cat("\n--- Processing", doc_type, "document ---\n")
  result <- execute(document_pipeline, test_documents[[doc_type]])
  cat("Processing complete. Result type:", class(result), "\n")
}
#> 
#> --- Processing short document ---
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing node: preprocess 
#> Preprocessing document...
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing node: condition_1 
#> > 2025-07-04 11:49:49 [WORKFLOW] Condition node 'condition_1' selected branches: short_document 
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing branch 'short_document' -> node 'process_short_1' 
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing node: process_short 
#> Processing short document...
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing node: format 
#> Formatting final results...
#> Processing complete. Result type: list 
#> 
#> --- Processing medium document ---
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing node: preprocess 
#> Preprocessing document...
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing node: condition_1 
#> > 2025-07-04 11:49:49 [WORKFLOW] Condition node 'condition_1' selected branches: short_document 
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing branch 'short_document' -> node 'process_short_1' 
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing node: process_short 
#> Processing short document...
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing node: format 
#> Formatting final results...
#> Processing complete. Result type: list 
#> 
#> --- Processing long document ---
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing node: preprocess 
#> Preprocessing document...
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing node: condition_1 
#> > 2025-07-04 11:49:49 [WORKFLOW] Condition node 'condition_1' selected branches: long_document 
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing branch 'long_document' -> node 'process_long_1' 
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing node: process_long 
#> Processing long document...
#> > 2025-07-04 11:49:49 [WORKFLOW] Executing node: format 
#> Formatting final results...
#> Processing complete. Result type: list
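
Note that the "medium" test document falls just under the 20-word threshold used by classify_document(), so it is routed to the short-document branch (as the log above shows). To inspect the nested structure produced for a long document, standard base-R tools such as str() work on the returned list (nothing llmr-specific here):

# Inspect the nested result for the long document
long_result <- execute(document_pipeline, test_documents$long)
str(long_result, max.level = 3)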

Error Handling and Fault Tolerance

Build robust workflows with error handling:

# Safe processor with error handling
safe_processor <- function(data) {
  tryCatch({
    if (is.character(data) && nchar(data) == 0) {
      stop("Empty string not allowed")
    }
    if (is.numeric(data) && data < 0) {
      stop("Negative numbers not supported")
    }
    # Normal processing
    paste("Processed:", data)
  }, error = function(e) {
    cat("Error occurred:", e$message, "\n")
    paste("Error:", e$message)
  })
}

# Fallback processor
fallback_processor <- function(data) {
  cat("Using fallback processing...\n")
  paste("Fallback result for:", as.character(data))
}

# Workflow with error handling
robust_workflow <- step(safe_processor, name = "safe_process") %->%
  when(
    function(result) {
      if (grepl("^Error:", result)) {
        "error"
      } else {
        "success"
      }
    },
    error = step(fallback_processor, name = "fallback"),
    success = step(function(x) paste("Success:", x), name = "finalize")
  )

print(robust_workflow)
#> Workflow: <unnamed> 
#> Nodes: 4 | Edges: 3 
#> 
#> ┌─ ⚙️ safe_process
#> │
#> ├─ 🔀 when(error, success)
#> │   ├─ error: ⚙️ fallback
#> │   └─ success: ⚙️ finalize

# Test error handling
test_inputs <- list("valid input", "", -5, 42)
for (input in test_inputs) {
  cat("\n--- Testing input:", as.character(input), "---\n")
  result <- execute(robust_workflow, input)
  print(result)
}
#> 
#> --- Testing input: valid input ---
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing node: safe_process 
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing node: condition_1 
#> > 2025-07-04 11:49:50 [WORKFLOW] Condition node 'condition_1' selected branches: success 
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing branch 'success' -> node 'finalize_1' 
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing node: finalize 
#> $success
#> [1] "Success: Processed: valid input"
#> 
#> 
#> --- Testing input:  ---
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing node: safe_process 
#> Error occurred: Empty string not allowed 
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing node: condition_1 
#> > 2025-07-04 11:49:50 [WORKFLOW] Condition node 'condition_1' selected branches: error 
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing branch 'error' -> node 'fallback_1' 
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing node: fallback 
#> Using fallback processing...
#> $error
#> [1] "Fallback result for: Error: Empty string not allowed"
#> 
#> 
#> --- Testing input: -5 ---
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing node: safe_process 
#> Error occurred: Negative numbers not supported 
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing node: condition_1 
#> > 2025-07-04 11:49:50 [WORKFLOW] Condition node 'condition_1' selected branches: error 
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing branch 'error' -> node 'fallback_1' 
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing node: fallback 
#> Using fallback processing...
#> $error
#> [1] "Fallback result for: Error: Negative numbers not supported"
#> 
#> 
#> --- Testing input: 42 ---
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing node: safe_process 
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing node: condition_1 
#> > 2025-07-04 11:49:50 [WORKFLOW] Condition node 'condition_1' selected branches: success 
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing branch 'success' -> node 'finalize_1' 
#> > 2025-07-04 11:49:50 [WORKFLOW] Executing node: finalize 
#> $success
#> [1] "Success: Processed: 42"

Best Practices for Advanced Workflows

The snippets below are illustrative; processor functions such as batch_processor or validate_input stand in for your own step functions.

1. Design Clear Condition Functions

# Good: Clear, single-purpose condition
classify_by_size <- function(data) {
  if (length(data) > 1000) "large"
  else if (length(data) > 100) "medium"
  else "small"
}

# Good: Descriptive branch names
size_router <- when(
  classify_by_size,
  large = step(batch_processor, name = "batch_process"),
  medium = step(standard_processor, name = "standard_process"),
  small = step(quick_processor, name = "quick_process")
)

2. Handle Edge Cases

# Always include fallback branches
robust_classifier <- function(data) {
  if (is.null(data)) return("null")
  if (length(data) == 0) return("empty")
  if (is.character(data)) return("text")
  if (is.numeric(data)) return("number")
  "unknown"  # Fallback for unexpected types
}
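
Wiring such a classifier into when() ensures every input lands somewhere, including unexpected types. A sketch with illustrative handler functions:

# Every classifier outcome, including "unknown", gets a branch
robust_router <- when(
  robust_classifier,
  null = step(function(x) "No data supplied", name = "handle_null"),
  empty = step(function(x) "Empty input", name = "handle_empty"),
  text = step(toupper, name = "process_text"),
  number = step(function(x) x * 2, name = "process_number"),
  unknown = step(function(x) paste("Unhandled type:", class(x)[1]), name = "handle_unknown")
)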

3. Use Meaningful Names

# Good: Descriptive workflow and step names
data_pipeline <- step(validate_input, name = "validate") %->%
  when(
    classify_data_complexity,
    simple = step(fast_processor, name = "fast_track"),
    complex = step(thorough_processor, name = "deep_analysis")
  ) %->%
  step(format_output, name = "finalize")

Summary

Advanced workflows in llmr provide powerful capabilities for building sophisticated data processing systems:

  • when() enables conditional branching based on data characteristics
  • Parallel processing executes multiple branches simultaneously
  • Dynamic routing adapts workflow behavior to input data
  • Multi-stage patterns combine preprocessing, analysis, and post-processing
  • Error handling creates robust, fault-tolerant workflows
  • Agent integration adds intelligent decision-making capabilities

These patterns can be combined into adaptive processing systems that handle complex, real-world scenarios reliably.

What’s Next?

With these advanced patterns, you can build:

  • Intelligent document processing systems
  • Adaptive data analysis pipelines
  • Multi-modal content processors
  • Fault-tolerant production workflows
  • AI-powered decision trees

The combination of conditional logic, parallel processing, and AI agents makes llmr workflows well suited to complex, real-world data processing challenges.