Java의 동기화 및 Locking 기법 들 (Synchronized, ReentrantLock, Semaphore, Atomic Package, varHandle)

알고자 하는 것 : Synchronized keyword, ReentrantLock, Semaphore, Atomic, varHandle

 

Synchronized

https://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html

synchronized의 경우에는 다양한 기법으로 동시성 제어에 쉽게 사용될 수 있다.

Locks In Synchronized Methods와 Synchronized Statements 으로 나누어서 생각 할 수 있다.

 

Locks In Synchronized Methods

public class SynchronizedCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }

    public synchronized void decrement() {
        c--;
    }

    public int value() {
        return c;
    }
}

 

상기와 같이 Methods 자체에 Syncronized를 설정 함으로써 해당 Method가 한번에 하나의 Thread에 의해서 불리워 짐을 만족 시킨다.

 여기서 한가지 주의 해야 할 것이 있는데 Methods Level의 Synchronized는 한 Class 내에 정의된 모든 Methods에 동일하게 적용된다.

즉 increment() 메서드를 누군가 사용하고 있다면 decrement() 메서드 역시 다른 Thread에서 접근 할 수 없다. 하나의 동시 객체로 인식하게 된다.

그리고 생성자 Method는 Syncronized를 사용할 수 없다.

 

Synchronized Statements

public void addName(String name) {
    synchronized(this) {
        lastName = name;
        nameCount++;
    }
    nameList.add(name);
}

특정 Block 영역 범위로 Synchronized를 제한하게 된다. 상기와 같이 정의 하게 되면

Syncronized 되는 영역은 다음 범위로 좁혀지게 된다.

    synchronized(this) {
        lastName = name;
        nameCount++;
    }

즉 nameList.add(name) 이 부분은 동시 접근 되게 된다.

상기에서 synchronized(this)에서 this는 일반적으로 사용되는 Statement인데 뜻은 이 클래스를 사른 Thread가 Locking 하고 있지 않다면이라는 의미가 된다. 그래서 꼭 this를 쓸 필요는 없다. 임의의 객체를 사용해서 synchronized 처리 해도 상관은 없다.

그래서 그 상관없는 임의의 객체를 사용해서 Locking처리에 유연성을 주는 Class가 Reentrant Class이다.

 

Reentrant 

https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/locks/ReentrantLock.html

 class X {
   private final ReentrantLock lock = new ReentrantLock();
   // ...

   public void m() {
     lock.lock();  // block until condition holds
     try {
       // ... method body
     } finally {
       lock.unlock()
     }
   }
 }

Syncronized라는 특수 keyword를 사용하는 대신 ReentrantLock이라고 하는 객체를 사용해서 특정 Statements에 대한 Locking을 관리하는 것을 볼 수 있다.

일정 범위에 대한 Locking을 처리한다고 하는 것을 볼때는 Syncronized와 다름이 없지만, ReentrantLock을 사용할때는 프로그램적인 개발 선택 범위를 넓혀 주는 강점이 있다.

예를 들자면 Syncronized를 사용할 경우 나의 Thread가 현재 멈추어 있는지? 기아 상태에 빠진건지? 내 Thread에서 해당 Locking을 들고 있는 건지? 등등의 정보를 얻을 수 없다. Syncronized 구문을 마주했을 때 누군가 해당 객체의 Locking을 Release 하기까지 무한히 기다리는 수 밖에 없다.

그래서 Reentrant는 다양한 기능을 제공한다.

대표적인 메서드 몇개만 소개 하겠다.

  • lock() : Lock을 얻어온다.
  • unlock() : Lock을 해제 한다.
  • tryLock() : 일정시간을 기다리던가 아니면 지금 즉시 상태를 확인하고 true, false를 return 한다. true인경우 Lock을 얻는다.
  • isHeldByCurrentThread() : 현재 Thread가 Lock을 얻었는가?
  • hasQueuedThreads() : 해당 객체의 Lock을 얻기위해 기다리는 Threads가 있는가?
  • getOwner() :  지금 Lock을 갖고 있는 Thread를 Return 한다.

여기에 한가지 더 강력한 기능을 제공하는 Reentrant Class가 있는데 바로 ReentrantReadWriteLock Class이다.

DB를 예로 들자면 다수개의 Read와 매우적은 수의 Write가 발생한다고 했을 때 위에서 이야기 한 것처럼 일대일 방식의 Locking은 불합리 하다. 이를 해결하기 위한 Class이다.

https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/locks/ReentrantReadWriteLock.html

예를 들자면 Database가 있다고 생각해 보자. 대부분의 경우 Read만 발생 할 것이다. 반면에 Write는 상대적으로 적게 발생한다. Read 되는 경우에는 여러 Threads가 동시에 들어와서 처리해도 문제가 없다. Write 되는 경우에는 해당 객체에 대한 접근을 제한하고 Lock을 잡은 Thread만 해당 업데이트 역할을 할 수 있게 만들어야 한다. 이러한 요구조건을 만족하기 위한 Class라고 생각하면 쉽게 이해가 될 것이다.

핵심을 말하자면 

  • Read Lock : 동시에 여러 Thrads가 접근가능함
  • Write Lock : 오직 하나의 Thread에서만 접근 가능함

이다. 하기 링크를 참고 하자. 

https://www.codejava.net/java-core/concurrency/java-readwritelock-and-reentrantreadwritelock-example

그런데 Syncronized나 Reentrant나 Threads Pool의 접근에 대해서는 개발 접근이 쉽지 않다. 즉 하나의 Resource를 동시에 하나의 Thread가 접근하는 것은 문제가 없어 보이는데, 하나의 Type으로 갖고 있는 여러개의 Resource를 여러개의 Thread가 동시 접근 제어 가능하게 하는 것은 어떻게 해야할까? 예를 들면 http 서버를 만드는데 Threads Pool을 관리하면서 들어오는 Request를 해당 Pool의 idle Thread에게 배당하려면 어떻게 해야할까? 그래서 필요한게 Semaphore 이다.

Semaphore

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html

class Pool {
   private static final int MAX_AVAILABLE = 100;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
   }

   // Not a particularly efficient data structure; just for demo

   protected Object[] items = ... whatever kinds of items being managed
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
   }

   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }

 }

해당 Class의 주요 Method는 다음과 같다

  • Semaphore​(int permits) : permits 갯수 만큼 세마포어의 Pool이 사용된다고 표시
  • acquire(int permits) : permits 갯수 만큼 세마포어의 Pool이 사용된다고 표시한다. 없으면 하나
  • availablePermits() : 사용가능한 Permits 갯수
  • drainPermits() : 현 모든 Permits을 취소하고 Availables 상태로 변화 시킨다
  • reducePermits​(int reduction) : permit 갯수를 줄인다
  • tryAcquire​(int permits, long timeout, TimeUnit unit) : 일정 시간 또는 일정 permit 갯수를 기다렸다가 실패 하면 false 처리한다 . true되면 acquire(int permits)과 같은 효과를 낸다.
  • release​(int permits) :사용된Permit을 반환한다.

https://www.baeldung.com/java-semaphore

다양한 곳에서 많이 사용하지만 Subscriber/Publisher Pattern을 만들때 대표적으로 사용한다고 한다.

예를 들자면

package sample;

import java.util.concurrent.Semaphore;
import java.util.logging.Logger;

public class SemaphoreTest {
    static public Logger logger = Logger.getLogger(SemaphoreTest.class.getSimpleName());
    static public String data = "none";

    public static void main(String[] args) throws Exception {
        Semaphore publishers = new Semaphore(0);
        Semaphore subscribers = new Semaphore(1);
        
        Thread publisherThread = new Thread(() -> {
            while (true) {
                logger.info("[Publish Thread Start] " + publishers.availablePermits() + " / " + subscribers.availablePermits() + " / " + data);
                try {
                    subscribers.acquire();
                    logger.info("[Publish Thread] " + "push Publish Data for one second");
                    Thread.sleep(1000);
                    data = "publisher";
                    publishers.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                logger.info("[Publish Thread End] " + publishers.availablePermits() + " / " + subscribers.availablePermits() + " / " + data);
            }

        });

        Thread subscriberThread = new Thread(() -> {
            while (true) {
                logger.info("[Subscriber Thread Start] " + publishers.availablePermits() + " / " + subscribers.availablePermits() + " / " + data);
                try {
                    publishers.acquire();
                    logger.info("[Subscriber Thread] " + "use Publish Data for one second");
                    Thread.sleep(1000);
                    data = "subscriber";
                    subscribers.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                logger.info("[Subscriber Thread End] " + publishers.availablePermits() + " / " + subscribers.availablePermits() + " / " + data);
            }

        });

        publisherThread.start();
        subscriberThread.start();

    }
}

이 코드의 경우는 아래와 같은 순서로 작동이 된다.

Subscriber가 Publisher Semaphore를 얻을 수 없는 상태 (0이 시작값이라) 가 된다. 이후 Publisher가 Subscriber Semaphore를 얻고 Publish할 Data를 write하고 Publisher Semaphore를 Release함으로써 Subscriber가 작동 할 수 있게 한다. 이후 다시 Subscriber를 Release 함으로써 Publisher가 작동 가능하도록 하는 뭐랄까... 먹고 먹히는? 그런 관계의 Thread를 만들 수 있다. 

Semaphore의 경우 다음 특이사항을 명심해야 한다.

  • Thread와 다르게 Owner가 없다 어느 Thread 든 Semaphore를 얻고 릴리즈 할 수 있다.
  • 하나의 Thread에서 여러차례 Semaphore Acquire를 할 수있다. 그래서 Semaphore가 1인 상태에서 연속으로 Acquire를 2번 부르면, 다른 Thread에서 Release 해주기전에는 영원히 멈춘다.
  • A Thread가 Acquire하고 리소스 처리도중 B Thread가 Release하게 되면 A thread의 Acquire는 사라진다.

 

Condition

2개 이상의 Thread가 있고, 각 Thread는 다른 Thread의 상태를 참고한다고 할때 Thread를 Condition에 따라서 작동 하게 하는 Class가 Condition Class이다.

package sample;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;

public class InterThread {
    static Logger logger = Logger.getLogger(InterThread.class.getSimpleName());

    static class Holder {
        String username = null;
        String password = null;

        public String getUsername() {
            return username;
        }

        public synchronized void setUsername(String username) {
            this.username = username;
        }

        public String getPassword() {
            return password;
        }

        public synchronized void setPassword(String password) {
            this.password = password;
        }
    }

    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        Holder holer = new Holder();

        Thread waitThread = new Thread(()-> {
            lock.lock();
            logger.info("[Wait Thread Locking]");

            try {
                while (holer.username == null || holer.password == null) {
                    logger.info("[Wait Thread await]");
                    condition.await();
//                    Thread.sleep(0);
                }
                logger.info("[Wait Thread Progress]");
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }

            logger.info("[Wait Thread End]");
        });

        Thread conditionThread = new Thread(new Runnable() {
            @Override
            public void run() {
                logger.info("[Condition Thread Start]");

                lock.lock();
                try {
                    Thread.sleep(1000);
                    holer.setPassword("def");
                    holer.setUsername("abc");
                    logger.info("[Condition Thread Send Signal]");
                    condition.signal();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                    logger.info("[Condition Thread End]");

                }
            }
        });

        waitThread.start();
        conditionThread.start();
    }
}

 

아래 코드는 reentrantLock을 이용해서 Condition Class를 가져오는 방법을 보여주고 있다.

        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

어떤 Condition에 따라서 Lock을 해제할지는 다음 holder Class의 값으로 결정하고자 한다.

    static class Holder {
        String username = null;
        String password = null;

        public String getUsername() {
            return username;
        }

        public synchronized void setUsername(String username) {
            this.username = username;
        }

        public String getPassword() {
            return password;
        }

        public synchronized void setPassword(String password) {
            this.password = password;
        }
    }

즉 username과 password가 모두 있을 때 작동 하는 Thread를 만들고자 한다.

        Thread waitThread = new Thread(()-> {
            lock.lock();
            logger.info("[Wait Thread Locking]");

            try {
                while (holer.username == null || holer.password == null) {
                    logger.info("[Wait Thread await]");
                    condition.await();
                }
                logger.info("[Wait Thread Progress]");
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }

            logger.info("[Wait Thread End]");
        });

 

위 코드를 보면 holder의 username과 password 중 하나라도 널이면 condition은 await상태에 머무른다.

이때 await는 lock.lock()을 lock.unlock()과 동일한 상태로 만들고 codition signal이 올 때까지 기다리는 상태가 된다.

        Thread conditionThread = new Thread(new Runnable() {
            @Override
            public void run() {
                logger.info("[Condition Thread Start]");

                lock.lock();
                try {
                    Thread.sleep(1000);
                    holer.setPassword("def");
                    holer.setUsername("abc");
                    logger.info("[Condition Thread Send Signal]");
                    condition.signal();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                    logger.info("[Condition Thread End]");

                }
            }
        });

ConditionThread에서는 username과 password를 넣고 signal을 호출하면서 await하고 있는 Thread를 깨우는 역할을 한다.

그러나 근본적으로 어떤 Resource를 Locking한다고 하면 다음과 같은 내재된 문제는 해결되지 않는다.

  • 여러 스레드가 하나의 Resource를 Locking하면 스레드가 늘어날 수록 속도가 느려진다.
  • Dead Lock이 발생할 경우 해결이 어렵고, 내재된 발생 가능성도 높기 때문에 개발이 복잡해 진다.
  • High Priority(예, UI), Lower Priority(예, Background)간에 Race가 일어 났을 때 경우에 따라서 High Priority Job이 Lower Priority 잡으로 인해(Resource가 Lock되어 있음으로) 서비스 속도가 전체적으로 느려 보일 수 있다.

가장 좋은 방법은 공유되는 Resource가 Locking 되지 않게 하는 것이다.

즉 HW Operation이 1회 이상 일어 나게 하지 않는 것이 해답이다.

예를 들어 a++ 라는 코드는 

  • a를 읽어 온다
  • a에 1을 더한다
  • a에 더해진 결과 값을 set 한다.

라는 3회의 Operations가 진행 되기 때문에 Locking이 필요하게 된다.

그래서 우리는 1회의 Operation만으로 해결 되는 것을 보고 Atomic 하다고 하는데 Atomic한 대상을 확인해 보자

Atomic한 대상

  • Primitive Type (int, boolean, char, byte, short 등) 읽기/쓰기 , 단 long, double은 제외
    long, double의 경우 내부적으로 소수점을 중심으로 왼쪽 오른쪽을 나누어서 2개로 내부 value 관리함
  • 모든 Reference : 객체의 Memory 위치 수정
  • Volatile Primitive Types (long, double 포함) 및 Reference 읽기 쓰기
    이경우는 Main Memory (CPU Chach 영역을 거치지 않음)를 직접 수정하는 방식
  • java.util.concurrent.atomic Package 이하 Class 사용
    주의 해야할 것은 AtomicX Package를 사용하더라도 Method를 2회 연속 Call 하면 Atomic이 깨진다.

아래 코드는 int와 atomicX를 이용한 개발을 비교한 코드이다.

int는 Syncronized를 이용한 Locks In Synchronized Methods를 사용하였고 한쪽은 AtomicInteger를 사용하였다.

둘다 0이 나오면 기대 결과 값이 된다.

package sample;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicX {

    public static void main(String[] args) {
        int count = 0;

        Counter counter = new Counter(count);
        CounterX counterX = new CounterX(new AtomicInteger(count));

        DecrementingThread intDeCounter = new DecrementingThread(counter);
        IncrementingThread intInCounter = new IncrementingThread(counter);
        intDeCounter.start();
        intInCounter.start();


        System.out.println("Counter = " + counter.getCount());

        DecrementingThread atomicDeCounter = new DecrementingThread(counterX);
        IncrementingThread atomicInCounter = new IncrementingThread(counterX);
        atomicDeCounter.start();
        atomicInCounter.start();

        System.out.println("CounterX = " + counterX.getCount());

    }

    public static class DecrementingThread extends Thread {
        CountIf count;

        public DecrementingThread(CountIf count) {
            this.count = count;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                this.count.decrement();
            }
        }
    }

    public static class IncrementingThread extends Thread {
        CountIf count;

        public IncrementingThread(CountIf count) {
            this.count = count;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                this.count.increment();
            }
        }
    }

    private static interface CountIf {
        public void increment();
        public void decrement();
        public int getCount();
    }

    private static class Counter implements CountIf{

        private int count = 0;

        public Counter(int count) {
            this.count = count;
        }

        public synchronized void increment() {
            this.count++;
        }

        public synchronized void decrement() {
            this.count--;
        }

        public int getCount(){
            return count;
        }

    }

    private static class CounterX implements CountIf{

        private AtomicInteger count;

        public CounterX(AtomicInteger count) {
            this.count = count;
        }

        public void increment() {
            count.incrementAndGet();
        }

        public void decrement() {
            count.decrementAndGet();
        }

        public int getCount(){
            return count.get();
        }

    }


}

결과값은 아래와 같다.

Counter = 0
CounterX = 0

속도상으로는 Atomic X Package가 더 좋다고 한다.

마지막으로 Stack Structure를 일반적인 Syncronized 방식으로 개발했을 때와 Atomic Reference로 개발했을 때의 속도 차이를 테스트한 코드이다.

돌려보면 Atomic Reference로 개발 한 것이 약 4배에서 8배 가량 빠른다.

(본 코드는 https://www.udemy.com/course/java-multithreading-concurrency-performance-optimization/를 참조했다. MIT 라이센스)

package sample;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class Stack {

    public static void StandardProcess() throws Exception{
        StandardStack<Integer> stack = new StandardStack<>();
        Random random = new Random();

        for (int i = 0; i < 100000; i++) {
            stack.push(random.nextInt());
        }

        List<Thread> threads = new ArrayList<>();

        int pushingThreads = 2;
        int popingThreads = 2;

        for (int i = 0; i < pushingThreads; i++) {
            Thread thread = new Thread(() -> {
                while (true && !Thread.interrupted()) {
                    stack.push(random.nextInt());
                }
            });
            thread.setDaemon(true);
            threads.add(thread);
        }

        for (int i = 0; i < popingThreads; i++) {
            Thread thread = new Thread(() -> {
                while (true && !Thread.interrupted()) {
                    stack.pop();
                }
            });
            thread.setDaemon(true);
            threads.add(thread);
        }

        for (Thread thread : threads) {
            thread.start();
        }

        Thread.sleep(1000 * 10);

        for (Thread thread : threads) {
            thread.interrupt();
        }
        System.out.println(String.format("Standard %d operations were performed in 10 seconds", stack.getOperationCounter()));
    }


    public static void AtomicProcess() throws Exception{
        AtomicStack<Integer> stack = new AtomicStack<>();
        Random random = new Random();

        for (int i = 0; i < 100000; i++) {
            stack.push(random.nextInt());
        }

        List<Thread> threads = new ArrayList<>();

        int pushingThreads = 2;
        int popingThreads = 2;

        for (int i = 0; i < pushingThreads; i++) {
            Thread thread = new Thread(() -> {
                while (true && !Thread.interrupted()) {
                    stack.push(random.nextInt());
                }
            });
            thread.setDaemon(true);
            threads.add(thread);
        }

        for (int i = 0; i < popingThreads; i++) {
            Thread thread = new Thread(() -> {
                while (true && !Thread.interrupted()) {
                    stack.pop();
                }
            });
            thread.setDaemon(true);
            threads.add(thread);
        }

        for (Thread thread : threads) {
            thread.start();
        }

        Thread.sleep(1000 * 10);

        for (Thread thread : threads) {
            thread.interrupt();
        }
        System.out.println(String.format("Atomic %d operations were performed in 10 seconds", stack.getOperationCounter()));
    }


    public static void main(String[] args) throws Exception{
        Stack.StandardProcess();
        Stack.AtomicProcess();
    }


    public static class AtomicStack<T> {
        AtomicReference<StackNode<T>> head = new AtomicReference<>();
        private AtomicInteger operationCounter = new AtomicInteger(0);

        public void push(T value) {
            StackNode<T> newHead = new StackNode<>(value);

            while (true) {
                StackNode<T> currentHeadNode = head.get();
                newHead.next = currentHeadNode;

                if (head.compareAndSet(currentHeadNode, newHead)) { //현재의 head가 변하지 않았다면, newHead Node를 Head에 set한다.
                    break;
                } else {
                    LockSupport.parkNanos(1);
                }
            }
            operationCounter.incrementAndGet();
        }

//        public T pop(){ //2억건
//            StackNode<T> currentHead;// = head.get();
//            StackNode<T> nextHead;
//
//            while (true) {
//                currentHead = head.get();
//
//                if (currentHead == null) {
//                    operationCounter.incrementAndGet();
//                    return null;
//                }
//
//                nextHead = currentHead.next;
//
//                T value = currentHead.value;
//
//                if (head.compareAndSet(currentHead, nextHead)) {
//                    operationCounter.incrementAndGet();
//
//                    return value;
////                    break;
//                } else {
//                    LockSupport.parkNanos(1);
////                    currentHead = head.get();
//                }
//            }
//
//////            if (currentHead == null) {
////            operationCounter.incrementAndGet();
////            return null;
//////            }
//        }

        public T pop() { //8억건
            StackNode<T> currentHeadNode = head.get();
            StackNode<T> newHeadNode;

            while (currentHeadNode != null) {
                newHeadNode = currentHeadNode.next;
                if (head.compareAndSet(currentHeadNode, newHeadNode)) {
                    break;
                } else {
                    LockSupport.parkNanos(1);
                    currentHeadNode = head.get();
                }
            }
            operationCounter.incrementAndGet();
            return currentHeadNode != null ? currentHeadNode.value : null;
        }



        public int getOperationCounter() {
            return operationCounter.get();
        }
    }

    public static class StandardStack<T> {
        private StackNode<T> head;
        private int operationCounter = 0;

        public synchronized void push(T value) {
            StackNode<T> newHead = new StackNode<>(value);
            newHead.next = head;
            head = newHead;
            operationCounter++;
        }


        public synchronized T pop(){
            if (head == null) {
                operationCounter++;
                return null;
            }

            T value = head.value;
            head = head.next;
            operationCounter++;
            return value;
        }

        public int getOperationCounter() {
            return operationCounter;
        }
    }

    private static class StackNode<T>{
        public T value;
        public StackNode<T> next;

        public StackNode(T value) {
            this.value = value;
        }
    }
}

 

그렇다면 어떻게 Atomic X Package는 Syncronized보다 빠른 Class를 제공 가능한 것인가?

이 비밀은 Java 9.0에서부터 정식 제공하고 있는 

https://docs.oracle.com/javase/9/docs/api/java/lang/invoke/VarHandle.html

로 해당 기능을 제공 하고 있다.

 

varHandle

여기 저기 찾아 보긴 했는데 생각보다 설명이 자세히 된 곳이 많지는 않아서...

혹시 하기 내용 중 틀린 내용이라고 생각되는 부분이 있으면 알려주세요.

전체적인 varHandle의 구조는 아래와 같이 작동 하는 것으로 보인다.

즉 volatile Memory를 varHandle을 직접 접근함으로써 Lock Free 상태를 만들어 주는데, 개발차원에서 직접적인 volatile을 일일이 처리하는것은 위험성이 높은 관계로 varHandle이 Proxy 역할을 하면서 Thread에서의 동시 접근을 처리하는 다양한 Methods를 제공해주는 방식이다.

varHandle 내부 소스를 보면 대부분 Reflection API를 이용해서 Memory 접근을 처리하고 있다. 그래서 더 깊히 파지는 않았다.

varHandle을 사용하는 방식은 반듯이 다음 Guide 를 따라야 한다.

  • 동시접근 대상은 Volatile로 정의
  • varHandle은 static final 처리 및 이름은 모두 대문자로 정의
  • Method Handles을 통한 대상 Varialbe lookup
    public volatile int publicTestVariable = 1;
    
    public static final VarHandle PUBLIC_TEST_VARIABLE;
    static {
        try {
            PUBLIC_TEST_VARIABLE = MethodHandles.lookup()
                        .in(VarHandlesUnitTest.class)
                        .findVarHandle(VarHandlesUnitTest.class, "publicTestVariable", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

주의 할 것은 public과 private는 MethodsHandles를 사용하는 방법이 조금 다르다.

package sample;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.lang.reflect.AnnotatedTypeVariable;
import java.util.concurrent.atomic.AtomicReference;

public class VarHandlesUnitTest<V> {
    public volatile int publicTestVariable = 1;
    private volatile int privateTestVariable = 1;
    public int variableToSet = 1;
    public int variableToCompareAndSet = 1;
    public int variableToGetAndAdd = 0;
    public byte variableToBitwiseOr = 0;

    public static final VarHandle PUBLIC_TEST_VARIABLE;
    static {
        try {
            PUBLIC_TEST_VARIABLE = MethodHandles.lookup()
                        .in(VarHandlesUnitTest.class)
                        .findVarHandle(VarHandlesUnitTest.class, "publicTestVariable", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static final VarHandle PRIVATE_TEST_VARIABLE;
    static {
        try {
            PRIVATE_TEST_VARIABLE = MethodHandles.privateLookupIn(VarHandlesUnitTest.class, MethodHandles.lookup())
                    .findVarHandle(VarHandlesUnitTest.class, "privateTestVariable", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static void main(String[] args) {
        VarHandlesUnitTest varHandlesUnitTest = new VarHandlesUnitTest();
        varHandlesUnitTest.publicTestVariable = 2;

        VarHandlesUnitTest varHandlesUnitTest2 = new VarHandlesUnitTest();
        varHandlesUnitTest2.privateTestVariable = 3;

        System.out.println(VarHandlesUnitTest.PUBLIC_TEST_VARIABLE.get(varHandlesUnitTest)); // read Access
        System.out.println(VarHandlesUnitTest.PRIVATE_TEST_VARIABLE.get(varHandlesUnitTest2)); // read Access

        VarHandlesUnitTest.PUBLIC_TEST_VARIABLE.set(varHandlesUnitTest, 4); //write Access
        VarHandlesUnitTest.PRIVATE_TEST_VARIABLE.set(varHandlesUnitTest2, 6); //write Access

        System.out.println(VarHandlesUnitTest.PUBLIC_TEST_VARIABLE.get(varHandlesUnitTest)); // read Access
        System.out.println(VarHandlesUnitTest.PRIVATE_TEST_VARIABLE.get(varHandlesUnitTest2)); // read Access

        VarHandlesUnitTest.PUBLIC_TEST_VARIABLE.compareAndSet(varHandlesUnitTest, 4, 8); //write Access
        VarHandlesUnitTest.PRIVATE_TEST_VARIABLE.compareAndSet(varHandlesUnitTest2, 6,12); //write Access

        System.out.println(VarHandlesUnitTest.PUBLIC_TEST_VARIABLE.get(varHandlesUnitTest)); // read Access
        System.out.println(VarHandlesUnitTest.PRIVATE_TEST_VARIABLE.get(varHandlesUnitTest2)); // read Access
    }

}

상기 코드는 

https://www.baeldung.com/java-variable-handles

을 참고 하였다.

그럼 varHandle을 이용해서 다음과 같이 3가지 Thread를 비교 테스트 해보았다.

테스트 기준은 Increment Thread와 decrement Thread를 1하나 차이만 남기고 9999999번 더하고 9999998번 빼는 Condition Race를 만들어서 얼마의 속도 차이가 나는지 하는 테스트이다.

속도도 그렇고 정합성 테스트도 겸사 겸사 할겸 만들어 봤다.

잘 만든건진 모르겠지만 테스트 결과는 아래와 같다. 밀리세컨드 기준이다.

495 [ Counter ] = 1
253 [ CounterX ] = 1
197 [ Var Handle Counter CounterX] = 1

Syncronized를 사용한게 Counter

Atomic을 사용한게 CounterX

varHandle을 사용한게 VarHandleCounter이다.

간단한 차이만을 보자면 varHandle을 사용한 것이 가장 빠르게 처리 되었다.

Atomic보다 더빠르게 구현하려고 하면 VarHandle을 직접 구현해 보는 것도 좋을 것 같다.

전체 테스트 코드는 아래와 같다.

package sample;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.text.CollationElementIterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class AtomicX {

    public static void main(String[] args) throws Exception{
        int count = 0;

        Counter counter = new Counter(count);
        CounterX counterX = new CounterX(new AtomicInteger(count));
        VarHandleCounter counterVar = new VarHandleCounter(count);

        DecrementingThread intDeCounter = new DecrementingThread(counter);
        IncrementingThread intInCounter = new IncrementingThread(counter);
        long tmp = System.currentTimeMillis();

        intDeCounter.start();
        intInCounter.start();
        intDeCounter.join();
        intInCounter.join();

        System.out.println(System.currentTimeMillis() - tmp + " [ Counter ] = " + counter.getCount());

        DecrementingThread atomicDeCounter = new DecrementingThread(counterX);
        IncrementingThread atomicInCounter = new IncrementingThread(counterX);
        tmp = System.currentTimeMillis();

        atomicDeCounter.start();
        atomicInCounter.start();
        atomicDeCounter.join();
        atomicInCounter.join();

        System.out.println(System.currentTimeMillis() - tmp + " [ CounterX ] = " + counterX.getCount());

        DecrementingThread varHandleDeCounter = new DecrementingThread(counterVar);
        IncrementingThread varHandleCounter = new IncrementingThread(counterVar);

        tmp = System.currentTimeMillis();

        varHandleDeCounter.start();
        varHandleCounter.start();
        varHandleDeCounter.join();
        varHandleCounter.join();

        System.out.println(System.currentTimeMillis() - tmp + "  [ Var Handle Counter CounterX] = " + counterVar.getCount());

    }

    public static class DecrementingThread extends Thread {
        CountIf count;

        public DecrementingThread(CountIf count) {
            this.count = count;
        }

        @Override
        public void run() {
            for (int i = 0; i < 9999999 - 1; i++) {
//                System.out.println(Thread.currentThread().getName() + "] Decrement " );
                this.count.decrement();
            }
        }
    }

    public static class IncrementingThread extends Thread {
        CountIf count;

        public IncrementingThread(CountIf count) {
            this.count = count;
        }

        @Override
        public void run() {
            for (int i = 0; i < 9999999; i++) {
//                System.out.println(Thread.currentThread().getName() + "] Increment " );
                this.count.increment();
            }
        }
    }

    private static interface CountIf {
        public void increment();
        public void decrement();
        public int getCount();
    }
    private static class VarHandleCounter implements CountIf{
        private volatile int privateTestVariable = 0;

        public static final VarHandle PRIVATE_TEST_VARIABLE;
        static {
            try {
                PRIVATE_TEST_VARIABLE = MethodHandles.privateLookupIn(VarHandleCounter.class, MethodHandles.lookup())
                        .findVarHandle(VarHandleCounter.class, "privateTestVariable", int.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        public VarHandleCounter(int count){
            this.privateTestVariable = count;
        }

        @Override
        public void increment() {
            while(true){
                int tempValue = (int)PRIVATE_TEST_VARIABLE.get(this);

                if(PRIVATE_TEST_VARIABLE.compareAndSet(this, tempValue, tempValue + 1)){
                    break;
                } else {
                    LockSupport.parkNanos(1);
                    tempValue = (int)PRIVATE_TEST_VARIABLE.get(this);
                }
            }
        }

        @Override
        public void decrement() {
            while(true){
                int tempValue = (int)PRIVATE_TEST_VARIABLE.get(this);

                if(PRIVATE_TEST_VARIABLE.compareAndSet(this, tempValue, tempValue - 1)){
                    break;
                } else {
                    LockSupport.parkNanos(1);
                    tempValue = (int)PRIVATE_TEST_VARIABLE.get(this);
                }
            }
        }

        @Override
        public int getCount() {
            return (int)PRIVATE_TEST_VARIABLE.get(this);
        }
    }

    private static class Counter implements CountIf{

        private int count = 0;

        public Counter(int count) {
            this.count = count;
        }

        public synchronized void increment() {
            this.count++;
        }

        public synchronized void decrement() {
            this.count--;
        }

        public int getCount(){
            return count;
        }

    }

    private static class CounterX implements CountIf{

        private AtomicInteger count;

        public CounterX(AtomicInteger count) {
            this.count = count;
        }

        public void increment() {
            count.incrementAndGet();
        }

        public void decrement() {
            count.decrementAndGet();
        }

        public int getCount(){
            return count.get();
        }
    }


}

마지막으로 varHandle을 직접 Control 하기 위해서는 Memory ordering에 대한 이해가 필요하다.

간단하게만 말하자면 다음과 같다.

상기 그림을 보면 Compile 이후에 개발자가 작성하였던 int x= 5와 int y = 10이 사라진것을 볼수 있다.

이는 Compiler가 코드 최적화를 위해서 Memory의 Ordering를 자의적으로 변경이 가능하기 때문이다.

이때문에 발생하는 Memory Ordering이 개발자의 예상에 빗나가는 경우 매우 해결하기 어려운 문제가 발생한다.

이를 해결하기 위해서 Syncronized를 사용하거나 volatile을 섞어서 사용하게 되면 ordering이 만족 되게 되는데

개발시 이를 검토하면서 개발하긴 매우 어려운 문제이다.

이를 해결하기위해서 varHandle은 다음과 같은 Methods를 제공하고 있다.

  • acquireFence​() : Ensures that loads before the fence will not be reordered with loads and stores after the fence.
  • fullFence​() : Ensures that loads and stores before the fence will not be reordered with loads and stores after the fence.
  • loadLoadFence​() : Ensures that loads before the fence will not be reordered with loads after the fence.
  • releaseFence​(): Ensures that loads and stores before the fence will not be reordered with stores after the fence.

상기 영어를 번역하지 않은 이유는 각 Methds가 정상 동작하는지 여부를 test case 만들어서 확인해 보고 싶었는데.

어떻게 해야할지 몰라서 이해를 한 후에 다시 업데이트 하고자한다.

728x90
반응형

'JAVA' 카테고리의 다른 글

Spring Boot MongoDB 설정하기  (0) 2021.06.22
Spring Boot Profile 설정  (0) 2021.06.16
javaFX java version 11이상에서 실행하기  (2) 2021.05.28
Thread Demon & Join  (0) 2021.05.25
Thread 기본 코드  (0) 2021.05.24