Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update LoggingBuffer.java #8868

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 55 additions & 108 deletions core/src/main/java/org/pentaho/di/core/logging/LoggingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,19 @@

package org.pentaho.di.core.logging;

import com.google.common.annotations.VisibleForTesting;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.util.Utils;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.pentaho.di.core.Const;
import org.pentaho.di.core.util.Utils;

import com.google.common.annotations.VisibleForTesting;

/**
* This class keeps the last N lines in a buffer
*
Expand All @@ -45,8 +43,7 @@
public class LoggingBuffer {
private String name;

private List<BufferLine> buffer;
private ReadWriteLock lock = new ReentrantReadWriteLock();
private LinkedBlockingDeque<BufferLine> buffer;

private int bufferSize;

Expand All @@ -58,28 +55,24 @@ public class LoggingBuffer {

public LoggingBuffer( int bufferSize ) {
this.bufferSize = bufferSize;
// The buffer overflow protection allows it to be overflowed for 1 item within a single thread.
// Considering a possible high contention, let's set it's max overflow size to be 10%.
// Anyway, even an overflow goes higher than 10%, it wouldn't cost us too much.
buffer = new ArrayList<>( (int) ( bufferSize * 1.1 ) );
buffer = new LinkedBlockingDeque<BufferLine>();
layout = new KettleLogLayout( true );
eventListeners = new CopyOnWriteArrayList<>();
eventListeners = new CopyOnWriteArrayList<KettleLoggingEventListener>();
}

/**
* @return the number (sequence, 1..N) of the last log line. If no records are present in the buffer, 0 is returned.
*/
public int getLastBufferLineNr() {
lock.readLock().lock();
try {
if ( buffer.size() > 0 ) {
return buffer.get( buffer.size() - 1 ).getNr();
} else {
return 0;
}
} finally {
lock.readLock().unlock();
}
public int getLastBufferLineNr() {
//Based on example from https://www.baeldung.com/java-stream-last-element

// Stream<BufferLine> stream = buffer.stream();
// BufferLine line = stream.reduce((first, second) -> second)
// .orElse(null);
// return line != null ? line.getNr() : 0;

BufferLine line = buffer.peekLast();
return line != null ? line.getNr() : 0;
}

/**
Expand All @@ -89,21 +82,28 @@ public int getLastBufferLineNr() {
* @param to
* @return
*/
public List<KettleLoggingEvent> getLogBufferFromTo( List<String> channelId, boolean includeGeneral, int from,
public List<KettleLoggingEvent> getLogBufferFromTo( List<String> channelId, boolean includeGeneral, int from,
int to ) {
lock.readLock().lock();
try {
Stream<BufferLine> bufferStream = buffer.stream().filter( line -> line.getNr() > from && line.getNr() <= to );
//OPTION 1
//This is to FIX the Halting Bug when async concurrent access to the object. NEW Object
Stream<BufferLine> bufferStream=new ConcurrentLinkedQueue<BufferLine>(buffer).stream();

//OPTION 2
//Stream<BufferLine> bufferStream=buffer.stream();

//Improvement, doing all the EVALUATIONS in one iteration, to avoid iterate multiple times

if ( !Utils.isEmpty( channelId ) ) {
bufferStream = bufferStream.filter( line -> {
String logChannelId = getLogChId( line );
return includeGeneral ? isGeneral( logChannelId ) || channelId.contains( logChannelId ) : channelId.contains( logChannelId );
} );
String logChannelId = getLogChId( line );
boolean condition1 = includeGeneral ? channelId.contains( logChannelId ) || isGeneral( logChannelId ) : channelId.contains( logChannelId );
boolean condition2 = line.getNr() > from && line.getNr() <= to;
return condition1 && condition2;
} );
} else {
bufferStream = bufferStream.filter( line -> line.getNr() > from && line.getNr() <= to );
}
return bufferStream.map( BufferLine::getEvent ).collect( Collectors.toList() );
} finally {
lock.readLock().unlock();
}
return bufferStream.map( BufferLine::getEvent ).collect( Collectors.toList());
}

/**
Expand Down Expand Up @@ -154,14 +154,9 @@ public void close() {

public void doAppend( KettleLoggingEvent event ) {
if ( event.getMessage() instanceof LogMessage ) {
lock.writeLock().lock();
try {
buffer.add( new BufferLine( event ) );
while ( bufferSize > 0 && buffer.size() > bufferSize ) {
buffer.remove( 0 );
}
} finally {
lock.writeLock().unlock();
buffer.add( new BufferLine( event ) );
while ( bufferSize > 0 && buffer.size() > bufferSize ) {
buffer.poll();
}
}
}
Expand All @@ -187,12 +182,7 @@ public boolean requiresLayout() {
}

public void clear() {
lock.writeLock().lock();
try {
buffer.clear();
} finally {
lock.writeLock().unlock();
}
buffer.clear();
}

/**
Expand Down Expand Up @@ -222,25 +212,15 @@ public int getNrLines() {
* @param id the id of the logging channel to remove
*/
public void removeChannelFromBuffer( String id ) {
lock.writeLock().lock();
try {
buffer.removeIf( line -> id.equals( getLogChId( line ) ) );
} finally {
lock.writeLock().unlock();
}
buffer.removeIf( line -> id.equals( getLogChId( line ) ) );
}

public int size() {
return buffer.size();
}

public void removeGeneralMessages() {
lock.writeLock().lock();
try {
buffer.removeIf( line -> isGeneral( getLogChId( line ) ) );
} finally {
lock.writeLock().unlock();
}
buffer.removeIf( line -> isGeneral( getLogChId( line ) ) );
}

/**
Expand All @@ -262,17 +242,12 @@ public Iterator<BufferLine> getBufferIterator() {
@Deprecated
public String dump() {
StringBuilder buf = new StringBuilder( 50000 );
lock.readLock().lock();
try {
buffer.forEach( line -> {
LogMessage message = (LogMessage) line.getEvent().getMessage();
buf.append( message.getLogChannelId() ).append( "\t" )
.append( message.getSubject() ).append( "\n" );
} );
return buf.toString();
} finally {
lock.readLock().unlock();
}
buffer.forEach( line -> {
LogMessage message = (LogMessage) line.getEvent().getMessage();
buf.append( message.getLogChannelId() ).append( "\t" )
.append( message.getSubject() ).append( "\n" );
} );
return buf.toString();
}

/**
Expand All @@ -282,12 +257,7 @@ public String dump() {
*/
@Deprecated
public void removeBufferLines( List<BufferLine> linesToRemove ) {
lock.writeLock().lock();
try {
buffer.removeAll( linesToRemove );
} finally {
lock.writeLock().unlock();
}
buffer.removeAll( linesToRemove );
}

/**
Expand All @@ -297,35 +267,12 @@ public void removeBufferLines( List<BufferLine> linesToRemove ) {
*/
@Deprecated
public List<BufferLine> getBufferLinesBefore( long minTimeBoundary ) {
lock.readLock().lock();
try {
return buffer.stream().filter( line -> line.getEvent().timeStamp < minTimeBoundary )
.collect( Collectors.toList() );
} finally {
lock.readLock().unlock();
}
return buffer.stream().filter( line -> line.getEvent().timeStamp < minTimeBoundary )
.collect( Collectors.toList() );
}

public void removeBufferLinesBefore( long minTimeBoundary ) {
// Using HashSet even though BufferLine does not implement hashcode and equals,
// we just need to remove the exact objects we have found and put in the set.
Set<BufferLine> linesToRemove = new HashSet<>();
lock.writeLock().lock();
try {
for ( BufferLine bufferLine : buffer ) {
if ( bufferLine.getEvent().timeStamp < minTimeBoundary ) {
linesToRemove.add( bufferLine );
} else {
break;
}
}
// removeAll should run fast against a HashSet,
// since ArrayList.batchRemove check for each element of a collection given if it is in the ArrayList.
// Thus, removeAll should run in a linear time.
buffer.removeAll( linesToRemove );
} finally {
lock.writeLock().unlock();
}
buffer.removeIf( line -> line.getEvent().timeStamp < minTimeBoundary );
}

public void addLogggingEvent( KettleLoggingEvent loggingEvent ) {
Expand All @@ -346,7 +293,7 @@ private boolean isGeneral( String logChannelId ) {
return loggingObject != null && LoggingObjectType.GENERAL.equals( loggingObject.getObjectType() );
}

private static String getLogChId( BufferLine bufferLine ) {
private String getLogChId( BufferLine bufferLine ) {
return ( (LogMessage) bufferLine.getEvent().getMessage() ).getLogChannelId();
}
}