Java Streams

Understanding Collectors for Data Processing

May 17, 2020

This blog gives you an in-depth understanding of a very interesting concept: Collectors. Collectors are very useful when you are processing a lot of data in memory in Java.

Collectors provide useful, easy-to-use reduction operations, such as accumulating elements into collections, summarizing elements according to various criteria, and many more.

In this blog, we will discuss Collectors for data processing in depth.

We will cover the following types:

  • List Collectors
  • Set Collectors
  • To any Collection Collectors
  • Map Collectors
  • Downstream collectors and Cascading collectors

When we build a stream pipeline, the last thing we do, after all the intermediate operations have executed, is apply a terminal operation. For example, we can apply a Consumer to each element:

.forEach(System.out::println);

Or we can reduce the processed data to a single final result using reduce:

.reduce(..);

Or we can use the collect terminal operation, which collects the processed data into a container:

.collect(..);
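To see the three kinds of terminal step side by side, here is a minimal sketch on a small list of integers. The class name TerminalOps, its method names, and the sample numbers are illustrative only:

```java
import java.util.List;
import java.util.stream.Collectors;

class TerminalOps {

    // forEach: apply a Consumer to every element; nothing is returned
    static void printAll(List<Integer> numbers) {
        numbers.stream().forEach(System.out::println);
    }

    // reduce: fold all elements into one final value (here, their sum)
    static int total(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    // collect: accumulate the processed elements into a container
    static List<Integer> doubled(List<Integer> numbers) {
        return numbers.stream()
                .map(n -> n * 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4);
        printAll(numbers);                    // prints 1, 2, 3, 4 on separate lines
        System.out.println(total(numbers));   // prints 10
        System.out.println(doubled(numbers)); // prints [2, 4, 6, 8]
    }
}
```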

List & Set Collectors

Collectors is a utility class that provides reduction operations that accumulate, summarize, and aggregate elements into collections. It offers many ready-made collectors that collect data into various structures or containers. All the methods of the Collectors class are static. Collectors make the programmer's task easier, because there is no need to hand-write the equivalent reduce call.

For example, suppose we want to collect the processed data into a list. The Collectors class has a toList() method that returns such a Collector, and we pass this Collector to the collect() method to gather the stream data into a list.

List<String> list = stream.collect(Collectors.toList()); // assuming stream is a Stream<String>

Each static method of the Collectors utility class returns an object that implements the Collector interface; the implementation itself lives inside the Collectors class.

Collector<String, ?, List<String>> listCollector = Collectors.toList();

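When it is passed to the collect call on a stream, the Collector performs the whole reduction. Its three type parameters describe the input, the intermediate container, and the result:

public interface Collector<T, A, R> {

// T: type of the stream elements being collected
// A: mutable accumulation type (the container, e.g. an ArrayList)
// R: type of the final result

}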
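Because a Collector is just an object, it can be stored in a variable and reused. The sketch below (class, field, and method names are hypothetical) spells out the type parameters for toList() and passes the same collector to two different streams; each collect call obtains a fresh container from the collector's supplier, so the reuse is safe:

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CollectorAsValue {

    // T = String (element type), ? = accumulation type hidden by the JDK, R = List<String>
    static final Collector<String, ?, List<String>> TO_LIST = Collectors.toList();

    static List<String> upperCased(Stream<String> words) {
        // The same Collector object can be passed to collect() on any Stream<String>;
        // every collect call asks its supplier for a fresh, empty container
        return words.map(String::toUpperCase).collect(TO_LIST);
    }

    public static void main(String[] args) {
        System.out.println(upperCased(Stream.of("a", "b"))); // prints [A, B]
        System.out.println(upperCased(Stream.of("x")));      // prints [X]
    }
}
```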

Moving on, let us see how to use the Collectors API with an example.

Here is the Employee.java file:

import java.sql.Date; // java.sql.Date, because the spliterator parses dates with Date.valueOf("yyyy-mm-dd")

public class Employee {

    private int id;
    private String name;
    private char gender;
    private Date dob;
    private String city;
    private String designation;
    private Date joiningDate;
    private Double salary;

    public Employee(int id, String name, char gender, Date dob, String city, String designation, Date joiningDate, double salary) {
        this.id = id;
        this.name = name;
        this.gender = gender;
        this.dob = dob;
        this.city = city;
        this.designation = designation;
        this.joiningDate = joiningDate;
        this.salary = salary;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public char getGender() {
        return gender;
    }

    public void setGender(char gender) {
        this.gender = gender;
    }

    public Date getDob() {
        return dob;
    }

    public void setDob(Date dob) {
        this.dob = dob;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getDesignation() {
        return designation;
    }

    public void setDesignation(String designation) {
        this.designation = designation;
    }

    public Date getJoiningDate() {
        return joiningDate;
    }

    public void setJoiningDate(Date joiningDate) {
        this.joiningDate = joiningDate;
    }

    public double getSalary() {
        return salary;
    }

    @Override
    public String toString() {
        return "Employee [ id=" + id + ", name=" + name + ", gender=" + gender + ", dob=" + dob + ", city=" + city
                + ", designation=" + designation + ", joiningDate=" + joiningDate + ", salary=" + salary + "]";
    }
}

Also, consider the corresponding data file, EmployeeData, in which each line holds the comma-separated fields of one employee:


27827,Richard,M,1988-06-10,Boston,Developer,2017-12-12,60000.00
27828,John,M,1978-07-11,Boston,Architect,2015-01-01,150000.00
27829,David,M,1982-09-12,Newyork City,Manager,2015-01-19,145000.00
27830,Meenal,F,1991-01-13,Austin,Developer,2018-07-05,65000.00
27831,Ginni,F,1993-06-14,Delhi,Developer,2017-12-12,72000.00
27832,Tom,M,1988-03-15,Bangalore,Developer,2019-01-10,69000.00
27833,Michael,M,1984-04-16,London,Lead,2018-09-21,87000.00
27834,Alexa,F,1995-10-29,Newyork City,Developer,2019-10-11,65000.00
27835,Peter,M,1993-02-09,London,Developer,2019-10-11,63000.00
37836,Parvati,F,1991-12-14,Jaipur,Lead,2017-06-10,89000.00

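Finally, here is the CollectorsInAction class. It reads the file, turns it into a stream of Employee objects, and collects the employee names into a list: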

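import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CollectorsInAction {

    public static void main(String[] args) {
        Path path = Paths.get("/Users/mohitsinghal/newWorkspace/Functional-Programming/src/com/basicsstrong/functional/section11/EmployeeData");
        try (Stream<String> lines = Files.lines(path)) {
            // Split every line into its comma-separated fields
            Stream<String> words = lines.flatMap(line -> Arrays.stream(line.split(",")));
            Spliterator<String> wordSpliterator = words.spliterator();
            // Turn every group of eight fields into one Employee
            Spliterator<Employee> employeeSpliterator = new EmployeeSpliterator(wordSpliterator);
            List<Employee> employeeList = StreamSupport.stream(employeeSpliterator, false)
                    .collect(Collectors.toList());

            List<String> employeeNames = employeeList.stream()
                    .map(employee -> employee.getName())
                    .collect(Collectors.toList());

            employeeNames.forEach(System.out::println);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}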

Output:

Richard
John
David
Meenal
Ginni
Tom
Michael
Alexa
Peter
Parvati

To get the set of distinct designations, we can use the code below:

Set<String> designations = employeeList.stream()
.map(employee -> employee.getDesignation())
.collect(Collectors.toSet());
designations.forEach(System.out::println);

Output:

Architect
Lead
Developer
Manager
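The practical difference between toList() and toSet() shows up with duplicates. In this minimal sketch (the class name ListVersusSet is made up, and the input is a hand-picked list of designations rather than the file), the list keeps every element in encounter order, while the set keeps each designation once and makes no ordering promise:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class ListVersusSet {

    static List<String> asList(List<String> designations) {
        return designations.stream().collect(Collectors.toList());
    }

    static Set<String> asSet(List<String> designations) {
        return designations.stream().collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // A few designations from the sample data, with repeats
        List<String> designations = List.of("Developer", "Architect", "Manager", "Developer", "Lead", "Lead");
        System.out.println(asList(designations).size()); // prints 6: duplicates kept, in encounter order
        System.out.println(asSet(designations).size());  // prints 4: duplicates dropped
    }
}
```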

The EmployeeSpliterator used above wraps the spliterator of individual fields. Each call to tryAdvance reads the next eight fields and turns them into one Employee:

import java.sql.Date;
import java.util.Spliterator;
import java.util.function.Consumer;

public class EmployeeSpliterator implements Spliterator<Employee> {

    private final Spliterator<String> wordSpliterator;
    private int id;
    private String name;
    private char gender;
    private Date dob;
    private String city;
    private String designation;
    private Date joiningDate;
    private Double salary;

    public EmployeeSpliterator(Spliterator<String> wordSpliterator) {
        this.wordSpliterator = wordSpliterator;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Employee> action) {
        // Consume the eight fields of one record; stop as soon as a field is missing
        if (this.wordSpliterator.tryAdvance(word -> this.id = Integer.valueOf(word))
                && this.wordSpliterator.tryAdvance(word -> this.name = word)
                && this.wordSpliterator.tryAdvance(word -> this.gender = word.charAt(0))
                && this.wordSpliterator.tryAdvance(word -> this.dob = Date.valueOf(word))
                && this.wordSpliterator.tryAdvance(word -> this.city = word)
                && this.wordSpliterator.tryAdvance(word -> this.designation = word)
                && this.wordSpliterator.tryAdvance(word -> this.joiningDate = Date.valueOf(word))
                && this.wordSpliterator.tryAdvance(word -> this.salary = Double.valueOf(word))) {
            action.accept(new Employee(this.id, this.name, this.gender, this.dob, this.city,
                    this.designation, this.joiningDate, this.salary));
            return true;
        }
        return false;
    }

    @Override
    public Spliterator<Employee> trySplit() {
        return null; // no splitting: this spliterator is meant for sequential streams
    }

    @Override
    public long estimateSize() {
        return wordSpliterator.estimateSize() / 8; // eight fields per employee
    }

    @Override
    public int characteristics() {
        return wordSpliterator.characteristics();
    }
}

To summarize, in this section we read data from a file, used a custom spliterator, and discussed the toList and toSet collectors.

To any Collection Collector

So far, we have learned about lists and sets. For other collections, such as a queue, a linked list, or a tree-based set, we can use the toCollection method.

Using the same data set, suppose we want the employees sorted by their ids. The toSet() method can't help here, because in the current JDK it returns a HashSet, which does not keep its data in sorted order.

What we want is a TreeSet, which keeps its data sorted. Let's see how we can get one. We generate the stream and then call collect, and in this collect call we use the toCollection collector. toCollection lets us collect the data into any collection we want: we supply the collection in which we want the data. Here we supply a TreeSet, which keeps its elements in sorted order.
The code below achieves that:

TreeSet<Employee> employeesSorted = employeeList.stream()
.collect(Collectors.toCollection(TreeSet::new));

employeesSorted.forEach(System.out::println);
System.out.println("-----X-----Collection(TreeSet)-----X-----");

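For the TreeSet to sort the employees, the Employee class must define how two employees compare; otherwise the TreeSet throws a ClassCastException when the elements are added. So let us make Employee implement the Comparable<Employee> interface (public class Employee implements Comparable<Employee>) and override its compareTo method: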

@Override
public int compareTo(Employee o) {
    // Orders employees by ascending id
    return Integer.compare(this.id, o.getId());
}
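Here is a self-contained sketch of the same idea. It uses a hypothetical, trimmed-down Staff class instead of the full Employee class, so that it runs without the data file; natural ordering by id is what lets the TreeSet sort the elements:

```java
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;

class SortedById {

    // Hypothetical stand-in for Employee: only an id and a name
    static class Staff implements Comparable<Staff> {
        final int id;
        final String name;

        Staff(int id, String name) {
            this.id = id;
            this.name = name;
        }

        // Natural ordering by id; the TreeSet uses it to place each element
        @Override
        public int compareTo(Staff o) {
            return Integer.compare(this.id, o.id);
        }

        @Override
        public String toString() {
            return id + "=" + name;
        }
    }

    static TreeSet<Staff> sortById(List<Staff> staff) {
        // toCollection takes a Supplier that creates the target collection
        return staff.stream().collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        List<Staff> staff = List.of(new Staff(27829, "David"), new Staff(27827, "Richard"), new Staff(27828, "John"));
        System.out.println(sortById(staff)); // prints [27827=Richard, 27828=John, 27829=David]
    }
}
```

One design consequence worth knowing: a TreeSet treats two elements whose compareTo returns 0 as duplicates, so with this ordering two employees sharing an id would collapse into one entry.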

We can also use toCollection to convert one collection into an entirely different kind of collection. For example, suppose we start with an ArrayList and want a LinkedList of all its elements, because the data will undergo many insertions and deletions, which a LinkedList handles efficiently at its ends or through an iterator. We can simply create a stream on the ArrayList, use the toCollection collector to get a LinkedList of the stream's data, and then apply those operations to the LinkedList. This makes the collector useful in many places.
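A minimal sketch of that ArrayList-to-LinkedList conversion, assuming a plain list of names (the class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

class ToLinkedList {

    static LinkedList<String> toLinkedList(List<String> source) {
        // Stream the source list and let toCollection build a LinkedList
        return source.stream().collect(Collectors.toCollection(LinkedList::new));
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>(List.of("Richard", "John", "David"));
        LinkedList<String> linked = toLinkedList(names);
        linked.addFirst("Meenal");  // cheap insertion at the head
        linked.removeLast();        // cheap removal at the tail
        System.out.println(linked); // prints [Meenal, Richard, John]
        System.out.println(names);  // prints [Richard, John, David]: the source is untouched
    }
}
```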

Map Collector

Now, let us discuss the collector that collects the data into a map. Suppose we want the employee IDs as keys and the corresponding employee names as values. We can use the toMap collector here:

Map<Integer, String> getNameById = employeeList.stream()
     .collect(
                   Collectors.toMap(e -> e.getId(), e -> e.getName())
     );
System.out.println(getNameById);

Output:

{27827=Richard, 27828=John, 27829=David, ...}
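toMap needs every key to be unique: with the two-argument form, a duplicate key makes the collect call throw an IllegalStateException. The four-argument overload takes a merge function for clashing values and a supplier for the map. In this sketch (the class name, method name, and first-letter key are all hypothetical), two names share the initial M and the merge function keeps the first one:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

class ToMapDemo {

    static Map<Character, String> firstNameByInitial(List<String> names) {
        return names.stream().collect(Collectors.toMap(
                name -> name.charAt(0),   // key: the first letter of the name
                Function.identity(),      // value: the name itself
                (first, second) -> first, // merge function: keep the name seen first
                TreeMap::new));           // map factory: a TreeMap, so keys print in sorted order
    }

    public static void main(String[] args) {
        List<String> names = List.of("Richard", "John", "David", "Meenal", "Michael");
        System.out.println(firstNameByInitial(names)); // prints {D=David, J=John, M=Meenal, R=Richard}
    }
}
```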

Moving on to the next use case: we want to partition the elements according to some criterion. Suppose we want separate collections of all the male employees and all the female employees in the company.

The first thought that comes to mind is the filter operation. But filter only keeps the elements for which a given condition is true. If we want only the male employees, or only the female employees, we can certainly use filter; but getting both collections in one go is a different problem.
The following code solves it:

Map<Boolean, List<Employee>> partitionedData = employeeList.stream()
     .collect(
                   Collectors.partitioningBy(e -> e.getGender() == 'M')
     );
System.out.println(partitionedData);

The output partitions the data into two groups, which are actually two lists:
one with the elements that satisfy the condition, stored under the key true, and the other with the elements that don't, stored under the key false. The returned map therefore has two entries:
  • for the first entry, the key is true and the value is a list of all the male employees;
  • for the second entry, the key is false and the value is a list of all the female employees.
That is why it is named partitioningBy(): it partitions the data.

To get the male and female employees as separate lists, we can use:

List<Employee> maleEmployees = partitionedData.get(true);
List<Employee> femaleEmployees = partitionedData.get(false);
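The same collector works with any predicate. In this small sketch (class and method names are illustrative), integers are split into evens and odds; note that the returned map always contains both the true and the false key, even when one side is empty:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartitionDemo {

    static Map<Boolean, List<Integer>> evensAndOdds(List<Integer> numbers) {
        // One pass over the data; every element lands under true or false
        return numbers.stream().collect(Collectors.partitioningBy(n -> n % 2 == 0));
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> parts = evensAndOdds(List.of(1, 2, 3, 4, 5));
        System.out.println(parts.get(true));  // prints [2, 4]
        System.out.println(parts.get(false)); // prints [1, 3, 5]
        // Both keys are present even when one side is empty
        System.out.println(evensAndOdds(List.of(2, 4)).get(false)); // prints []
    }
}
```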

Taking another use case, suppose we want the employees grouped by designation: all the managers in one group, all the architects in another, all the leads in another, and all the developers in yet another.

Map<String, List<Employee>> getByDesignation = employeeList.stream()
     .collect(
                   Collectors.groupingBy(e -> e.getDesignation())
     );
System.out.println(getByDesignation);
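groupingBy generalizes partitioningBy from two keys to as many keys as the classifier produces. This sketch (illustrative names, grouping a few of the sample names by their length instead of by designation) shows that elements sharing a key end up in the same list, in encounter order:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingDemo {

    static Map<Integer, List<String>> byLength(List<String> words) {
        // The classifier's result becomes the key; elements with equal keys share a list
        return words.stream().collect(Collectors.groupingBy(String::length));
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> groups = byLength(List.of("Tom", "John", "David", "Alexa", "Peter"));
        System.out.println(groups.get(5)); // prints [David, Alexa, Peter]
        System.out.println(groups.get(3)); // prints [Tom]
    }
}
```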

In this section, we talked about collecting data into collections and into maps, including the partitioningBy and groupingBy collectors.

So far, we have discussed collectors that return a collection or a map. But we don't always need such a heavy structure to hold the data.
Suppose we need a single string containing the names of all the employees, separated by commas. The returned value is then just a String instead of a collection or a map. For this, we have the joining collector.
Let's create a stream on the list of employees, map each employee to its name, and then call collect, passing it the collector that helps in this case: Collectors.joining.

String employeeNamesString = employeeList.stream()
		.map(e -> e.getName())
		.collect(
			Collectors.joining(", ") 
		);
System.out.println(employeeNamesString);

Output:

Richard, John, David, Meenal, Ginni, Tom, Michael, Alexa, Peter, Parvati
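Collectors.joining also has an overload that takes a prefix and a suffix in addition to the delimiter. A minimal sketch (class and method names are illustrative):

```java
import java.util.List;
import java.util.stream.Collectors;

class JoiningDemo {

    static String joined(List<String> names) {
        return names.stream().collect(Collectors.joining(", "));
    }

    static String bracketed(List<String> names) {
        // joining(delimiter, prefix, suffix): prefix and suffix wrap the whole result
        return names.stream().collect(Collectors.joining(", ", "[", "]"));
    }

    public static void main(String[] args) {
        List<String> names = List.of("Richard", "John", "David");
        System.out.println(joined(names));        // prints Richard, John, David
        System.out.println(bracketed(names));     // prints [Richard, John, David]
        System.out.println(bracketed(List.of())); // prints []
    }
}
```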

DOWNSTREAM COLLECTORS

We have discussed many important collectors and seen how and when to use them, but that is not all there is to collectors. So far, the collectors we used were analogous to terminal operations, but we can also cascade, or nest, collectors.
Some Collectors methods themselves take a collector as an argument. When we use a collector inside another collector, we call the inner one a downstream collector. Let's practice this collector nesting in the next section.

CASCADING COLLECTORS

In this section of the blog, we will discuss cascading Collectors.

We have created a class, DownstreamCollectors, that reads the same EmployeeData file and generates a stream of Employee objects from it, collecting them into a list. In the first use case, we count the number of employees belonging to each designation.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class DownstreamCollectors {
    public static void main(String[] args) {
        Path path = Paths.get("/Users/mohitsinghal/newWorkspace/Functional-Programming/src/com/basicsstrong/functional/section11/EmployeeData");
        try (Stream<String> lines = Files.lines(path)) {
            Spliterator<String> wordSpliterator = lines.flatMap(line -> Arrays.stream(line.split(",")))
                    .spliterator();
            Spliterator<Employee> employeeSpliterator = new EmployeeSpliterator(wordSpliterator);
            List<Employee> employeeList = StreamSupport.stream(employeeSpliterator, false)
                    .collect(Collectors.toList());

            // groupingBy picks the key; the downstream counting() reduces each group to a Long
            Map<String, Long> countByDesignation = employeeList.stream()
                    .collect(
                            Collectors.groupingBy(e -> e.getDesignation(),
                                    Collectors.counting())
                    );
            System.out.println(countByDesignation);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Output:
{Architect=1, Lead=2, Developer=6, Manager=1}
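The program above needs the data file. The following in-memory sketch (class and method names are hypothetical, and the input is just the designation column of the sample data, in file order) shows the same groupingBy plus counting() combination:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class CountingDemo {

    static Map<String, Long> countByDesignation(List<String> designations) {
        // Each designation is its own key; counting() turns each group into a Long
        return designations.stream().collect(
                Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> designations = List.of("Developer", "Architect", "Manager", "Developer", "Developer",
                "Developer", "Lead", "Developer", "Developer", "Lead");
        Map<String, Long> counts = countByDesignation(designations);
        System.out.println(counts.get("Developer")); // prints 6
        System.out.println(counts.get("Lead"));      // prints 2
    }
}
```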

Let us take another use case: finding the total fund (the sum of salaries) distributed to each designation.

Map<String, Double> fundDistribution = employeeList.stream()
        .collect(
                Collectors.groupingBy(
                        e -> e.getDesignation(),
                        Collectors.summingDouble(e -> e.getSalary())
                )
        );

System.out.println(fundDistribution);

Output:
{Architect=150000.0, Lead=176000.0, Developer=394000.0, Manager=145000.0}
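groupingBy makes no guarantee about the type or iteration order of the map it returns, so the order of the entries above should not be relied on. It also has a three-argument overload that takes a map factory, so a TreeMap can keep the designations sorted. This sketch uses a hypothetical minimal Pay class holding only a designation and a salary, filled with the sample salaries in file order, taking John's salary as 150000.00 so that it agrees with the output above:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class FundDistribution {

    // Hypothetical minimal record of one employee's designation and salary
    static final class Pay {
        private final String designation;
        private final double salary;

        Pay(String designation, double salary) {
            this.designation = designation;
            this.salary = salary;
        }

        String getDesignation() { return designation; }
        double getSalary() { return salary; }
    }

    static Map<String, Double> totalByDesignation(List<Pay> pays) {
        return pays.stream().collect(Collectors.groupingBy(
                Pay::getDesignation,                         // classifier: the map key
                TreeMap::new,                                // map factory: keys kept in sorted order
                Collectors.summingDouble(Pay::getSalary)));  // downstream: sum of each group
    }

    public static void main(String[] args) {
        List<Pay> pays = List.of(
                new Pay("Developer", 60000), new Pay("Architect", 150000), new Pay("Manager", 145000),
                new Pay("Developer", 65000), new Pay("Developer", 72000), new Pay("Developer", 69000),
                new Pay("Lead", 87000), new Pay("Developer", 65000), new Pay("Developer", 63000),
                new Pay("Lead", 89000));
        System.out.println(totalByDesignation(pays));
        // prints {Architect=150000.0, Developer=394000.0, Lead=176000.0, Manager=145000.0}
    }
}
```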

Suppose we want to know the employee with the highest salary in each group.

Map<String, Optional<Employee>> maxSalaryEmployees = employeeList.stream()
        .collect(
                Collectors.groupingBy(
                        e -> e.getDesignation(),
                        Collectors.maxBy(Comparator.comparing(e -> e.getSalary()))
                )
        );
System.out.println(maxSalaryEmployees);

The code above returns the complete Employee object, wrapped in an Optional. If we only want the maximum salary in each designation, we can first map each employee to its salary with the mapping downstream collector and then apply maxBy to those salaries:

Map<String, Optional<Double>> maxSalaries = employeeList.stream()
        .collect(
                Collectors.groupingBy(
                        e -> e.getDesignation(),
                        Collectors.mapping(e -> e.getSalary(),                  // Employee -> Double
                                Collectors.maxBy(Comparator.naturalOrder()))    // largest salary in the group
                )
        );
System.out.println(maxSalaries);

Output:
{Architect=Optional[150000.0], Lead=Optional[89000.0], Developer=Optional[72000.0], Manager=Optional[145000.0]}
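Here is the mapping plus maxBy combination as a self-contained sketch, with a hypothetical minimal Pay class and a few of the sample salaries. Because groupingBy only creates a group when at least one element falls into it, every Optional in the result is present:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

class MaxSalaryDemo {

    // Hypothetical minimal record of one employee's designation and salary
    static final class Pay {
        private final String designation;
        private final double salary;

        Pay(String designation, double salary) {
            this.designation = designation;
            this.salary = salary;
        }

        String getDesignation() { return designation; }
        double getSalary() { return salary; }
    }

    static Map<String, Optional<Double>> maxSalaryByDesignation(List<Pay> pays) {
        return pays.stream().collect(Collectors.groupingBy(
                Pay::getDesignation,
                Collectors.mapping(Pay::getSalary,                       // Pay -> Double
                        Collectors.maxBy(Comparator.naturalOrder()))));  // keep the largest per group
    }

    public static void main(String[] args) {
        List<Pay> pays = List.of(
                new Pay("Developer", 60000), new Pay("Developer", 72000),
                new Pay("Lead", 87000), new Pay("Lead", 89000));
        Map<String, Optional<Double>> max = maxSalaryByDesignation(pays);
        System.out.println(max.get("Developer")); // prints Optional[72000.0]
        System.out.println(max.get("Lead"));      // prints Optional[89000.0]
    }
}
```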

So, in this way we can use collectors inside collectors or cascade collectors as per our requirements.

SUMMARY

It's time to summarize:

We hope that after reading this blog you understand how Collectors can make your life easier when you are processing a lot of data in memory in Java. In this blog, we discussed Collectors for data processing in depth. Collectors can be thought of as analogous to terminal operations. First, we saw List collectors, then Set collectors, then the to-any-Collection collector, and after that Map collectors. We then saw another important topic: using collectors inside other collectors, that is, downstream and cascading collectors.

In the coming sections, we will focus on two main aspects:

INTRODUCTION TO OWN COLLECTORS

  • Internal working of the collectors
  • Creating our own (custom) collectors

INTERNAL WORKING

Let's start with the collect call.

collect takes a Collector as its argument.

How is this collector built?

Collector is an interface with five abstract methods. All of them except characteristics() return a function, so they are higher-order in the sense that they hand back the functions that perform the reduction:

  • supplier() returns a Supplier.
  • accumulator() returns a BiConsumer.
  • combiner() returns a BinaryOperator.
  • finisher() returns a Function.
  • characteristics(), which does not return a function, returns a Set of the Collector's characteristics.

We usually don't have to write an implementation class for the Collector interface ourselves, because the Collectors utility class already provides one.

The Collectors class has a static nested class, CollectorImpl, which implements the Collector interface and its five methods. Along with this nested class, the Collectors utility class, as we know, contains many static methods that collect data into different containers or objects. The result can be a Collection, a Map, or even a String.

static class CollectorImpl<T, A, R> implements Collector<T, A, R>

The static factory methods return objects of this CollectorImpl class, since it is the implementation of the Collector interface. The constructor of CollectorImpl takes the lambdas for the supplier, accumulator, combiner, and finisher, plus the set of characteristics. In this way, the collector object carrying all these lambdas is created, and it becomes the argument to the collect call.

return new CollectorImpl<>(supplier, accumulator, combiner, finisher, characteristics);

The collect method is then responsible for invoking these lambdas in a particular order to build the final object. The supplier supplies the container. For example, to collect the data into an ArrayList, the supplier supplies an empty ArrayList, which is what the toList collector does. When get() is invoked on this Supplier, the lambda executes and we get the container: an empty ArrayList.

Next, we need to add the stream elements to this ArrayList one by one. For that we have the accumulator, which returns a BiConsumer that takes two arguments: the first is the container (in this case, the ArrayList) and the second is a stream element. When invoked, the BiConsumer adds the stream element to the container, and its accept method is called once for each element of the stream.

// A simplified model of what stream.collect(collector) does for a sequential stream
static <T, A, R> R collect(Stream<T> stream, Collector<T, A, R> collector) {
    A container = collector.supplier().get();               // create the empty container
    BiConsumer<A, T> accumulator = collector.accumulator();
    stream.forEach(u -> accumulator.accept(container, u));  // fold in every element
    return collector.finisher().apply(container);           // final transformation
}

Next comes the combiner, which returns a BinaryOperator. It merges partial results built by different threads into the final result, so it matters when the stream is processed in parallel.

The last step executes the finisher to get the final result. The finisher is a Function: we pass the container object to it, and it returns the final object, generally performing the final transformation from the intermediate accumulation type to the result type. In this process, we did not talk about the collector characteristics. The characteristics help decide how, or whether, some of these steps should be executed. For example, IDENTITY_FINISH tells the stream that the finisher is the identity function, so the container can be returned without calling it. Now, let's have a look at the source code.
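To make the order of the calls concrete, here is a simplified model. The helper collectInTwoHalves is hypothetical: it splits a list into two halves, accumulates each half into its own container, merges them with the combiner, and applies the finisher. Real parallel streams split the work through Spliterators on a fork/join pool, and the JDK may skip the finisher when IDENTITY_FINISH is set, so treat this only as an illustration of the protocol:

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class ManualCollect {

    // Hypothetical helper: drives a Collector by hand over two halves of a list,
    // roughly as a parallel stream with two workers might
    static <T, A, R> R collectInTwoHalves(List<T> data, Collector<? super T, A, R> collector) {
        BiConsumer<A, ? super T> accumulator = collector.accumulator();
        int middle = data.size() / 2;

        A left = collector.supplier().get();               // 1. supplier: one fresh container per half
        for (T element : data.subList(0, middle)) {
            accumulator.accept(left, element);             // 2. accumulator: fold each element in
        }
        A right = collector.supplier().get();
        for (T element : data.subList(middle, data.size())) {
            accumulator.accept(right, element);
        }

        A merged = collector.combiner().apply(left, right); // 3. combiner: merge the partial results
        return collector.finisher().apply(merged);          // 4. finisher: final transformation
    }

    public static void main(String[] args) {
        List<String> names = List.of("Richard", "John", "David", "Meenal");
        System.out.println(collectInTwoHalves(names, Collectors.toList()));      // prints [Richard, John, David, Meenal]
        System.out.println(collectInTwoHalves(names, Collectors.joining(", "))); // prints Richard, John, David, Meenal
    }
}
```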

OWN COLLECTOR

In this section, we will build our own collector. Here is the Collector interface with its abstract methods:

// simplified: the JDK interface also declares the nested Characteristics enum and static of(...) factory methods
public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    Function<A, R> finisher();
    BinaryOperator<A> combiner();
    Set<Characteristics> characteristics();
}

Let us implement our own toList collector with the static factory method Collector.of:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;

public class CustomCollector {
    public static void main(String[] args) {
        List<Integer> numbers = List.of(2, 6, 8, 9, 0, 1, 52, 5, 61, 8, 9, 96, 0, 18, 23);

        Collector<Integer, List<Integer>, List<Integer>> toList = Collector.of(
                ArrayList::new,                 // supplier: a new, empty container
                (list, e) -> list.add(e),       // accumulator: add one element
                (list1, list2) -> {             // combiner: merge two partial lists
                    list1.addAll(list2);
                    return list1;
                },
                Collector.Characteristics.IDENTITY_FINISH);

        List<Integer> evens = numbers.stream()
                .filter(e -> e % 2 == 0)
                .collect(toList);

        evens.forEach(System.out::println);
    }
}

It prints all the even numbers in the list.
Output:
2
6
8
0
52
8
96
0
18
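Collector.of is a convenience; the same collector can also be written as a class that implements the five methods of the Collector interface directly. A sketch, with the hypothetical class name MyToListCollector:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

// Hypothetical class name: a hand-written equivalent of the toList collector above
class MyToListCollector<T> implements Collector<T, List<T>, List<T>> {

    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;                // a fresh, empty container
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;                     // add one stream element to the container
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        return (left, right) -> {             // merge two partial results
            left.addAll(right);
            return left;
        };
    }

    @Override
    public Function<List<T>, List<T>> finisher() {
        return Function.identity();           // the container already is the result
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH));
    }

    public static void main(String[] args) {
        List<Integer> evens = List.of(2, 6, 8, 9, 0, 1).stream()
                .filter(e -> e % 2 == 0)
                .collect(new MyToListCollector<>());
        System.out.println(evens); // prints [2, 6, 8, 0]
    }
}
```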

Suppose we want a collector that returns the elements in sorted order. We can build a toSortedListCollector by adding a finisher that sorts the accumulated list. Because the result is sorted at the end anyway, the encounter order does not matter, so we mark the collector UNORDERED:

Collector<Integer, List<Integer>, List<Integer>> toSortedListCollector = Collector.of(
        ArrayList::new,                 // supplier
        (list, e) -> list.add(e),       // accumulator
        (list1, list2) -> {             // combiner
            list1.addAll(list2);
            return list1;
        },
        (list) -> {                     // finisher: sort the accumulated elements once (needs java.util.Collections)
            Collections.sort(list);
            return list;
        },
        Collector.Characteristics.UNORDERED);
List<Integer> sortedList = numbers.stream()
        .collect(toSortedListCollector);
sortedList.forEach(System.out::println);

Output:
0
0
1
2
5
6
8
8
9
9
18
23
52
61
96
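Because the finisher runs once, on the fully combined list, this kind of collector also works on a parallel stream: each worker fills its own ArrayList, the combiner merges them, and the finisher sorts the result. A minimal check (class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;

class SortedListParallel {

    // Same shape as toSortedListCollector: accumulate into ArrayLists, sort once at the end
    static Collector<Integer, List<Integer>, List<Integer>> toSortedList() {
        return Collector.of(
                ArrayList::new,
                List::add,
                (left, right) -> { left.addAll(right); return left; },
                list -> { Collections.sort(list); return list; },
                Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(2, 6, 8, 9, 0, 1, 52, 5, 61, 8, 9, 96, 0, 18, 23);
        List<Integer> sequential = numbers.stream().collect(toSortedList());
        // In a parallel stream, partial lists are merged by the combiner before the finisher sorts them
        List<Integer> parallel = numbers.parallelStream().collect(toSortedList());
        System.out.println(sequential.equals(parallel)); // prints true
    }
}
```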

To summarize, we have learned how collectors work internally and how to create our own collectors.

We hope this detailed blog on Collectors has given you a thorough understanding of the feature.