1. Home
  2. Docs
  3. Functions
  4. Developing functions

Developing functions

Functions are typically small pieces of code written in Java or Python. You have two options when writing functions:

  • Basic functions
  • Enhanced functions

Basic functions

With basic functions, you have access to the core Java or Python libraries to build custom logic.

In Python, you create a script that implements the process(input) method that returns the value to be published to the output topic. Here is an example of a basic Python function:

def process(input):
    return "{}!".format(input)

This function takes the string content of the message , appends an exclamation mark, then returns the result. The result will be published to the output topic configured for the message.

In Java you create a basic function using a class that implements the apply (String input) method of the Function class. The custom logic should return a value that will be published to the output topic. Here is an example of a basic Java function that appends an exclamation mark to a string:

public class JavaNativeExclamationFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        return String.format("%s!", input);
    }
}

Enhanced functions

With enhanced functions, the function has access to a context object. The context object provides various information and functionality to the function. Here is what the context object provides:

  • The name and ID of the function
  • The message ID of the message being processed
  • The key, event time, properties and partition key of the message being processed
  • The name of the input topic
  • The name of the output topic
  • The tenant and namespace of the function
  • The ID of the function worker instance running the function
  • The version of the function
  • The logger object used by the function, which can be used to create log messages
  • The  configuration values supplied with the function
  • Functions for storing and retrieving state in state storage.
  • A function to publish new messages onto arbitrary topics.
  • A function to acknowledge the message being processed (if auto-ack is disabled).

You can find the Java interface for the context object here.

In Java you create an enhanced function using a class that implements the process(String input, Context context) method of the Function class. The custom logic can return a value that will be published to the output topic.

Here is an example of an enhanced Java function that gets information about the function from the Context and logs that to the log topic. It then records a metric value for that function.

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

impo
rt java.util.stream.Collectors;

public class ContextFunction implements Function<String, Void> {
    public Void process(String input, Context context) {
        Logger LOG = context.getLogger();
        String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", "));
        String functionName = context.getFunctionName();

        String logMessage = String.format("A message with a value of \"%s\" has arrived on one of the following topics: %s\n",
                input,
                inputTopics);

        LOG.info(logMessage);

        String metricName = String.format("function-%s-messages-received", functionName);
        context.recordMetric(metricName, 1);

        return null;
    }
}

In Python, to create an enhanced Function, create a class that take Function as a parameter. Here is an example of a Python function that takes a forbidden word that is passed at a configuration parameter when registering the function and filters out messages that contain that word:

from pulsar import Function

class WordFilter(Function):
    def process(self, context, input):
        forbidden_word = context.user_config()["forbidden-word"]

        # Don't publish the message if it contains the user-supplied
        # forbidden word
        if forbidden_word in input:
            pass
        # Otherwise publish the message
        else:
            return input

Example Functions

Examples of Java functions can be found here and here.

Examples of Python functions can be found here.

Was this article helpful to you? Yes No

How can we help?