BigData

Apache Spark로 집계하기 클러스터 컴퓨팅으로 집계 해결 (1)

IT오이시이 2017. 6. 14. 14:07
728x90


Aggregating with Apache Spark(1)

Solving aggregation with cluster computing


http://www.itworld.com/article/3184109/analytics/aggregating-with-apache-spark.html


집합체 수학이되는 정의 A와 "결과 총 전체 것을 의미하지 않고 조립을 또는 그룹의 구성 요소 또는 부품을 추가하거나 함께 모든 구성 요소를 바꾸어 도착 집단 금액 합계 질량." 데이터 집계에는 로그 집계, 공간 집계 및 네트워크 집계가 포함되지만 집계에는 항상 합산 또는 수집과 관련이 있습니다. 이 기사에서는 번개 빠른 클러스터 컴퓨팅에 널리 사용되는 최상위 Apache 프로젝트 인 Apache Spark에서 집계 기법을 살펴 보겠습니다.

집계는 이론상 추상적이지만, 우리는 현실 세계에서 항상 사용합니다. 동쪽, 서쪽, 남쪽 및 북쪽의 4 사분면으로 나뉘는 정원 (JavaWorld 정원이라고 부름)을 상상해보십시오. 각 사분면에는 Alba에서 Mini-flora, Polyantha에 이르기까지 다양한 품종의 아름다운 장미가 조화를 이루고 있습니다. 정원의 각 사분면에 3 명이 배정되고 각 사분면의 4 분면에있는 장미를 모두 골라야합니다. 총 12 명이 정원에서 장미를 따기입니다.

우리의 임무는 고른 모든 꽃의 총수를 찾는 것입니다. 우리는 그 숫자를 12로 나눠서 각 사람이 뽑은 평균 꽃 수를 결정합니다.

다운로드
이 기사에서 설명하는 예제 애플리케이션의 소스 코드는 "Apache Spark로 집계"를 참조하십시오. Ravishankar Nair가 JavaWorld를 위해 작성한 것입니다.

Streams API를 사용한 집계

처음에는 문제 설명이 매우 간단 해 보입니다. 첫 번째 부분을 고려해 봅시다 : 전체 정원에서 골라낸 총 장미 수를 찾습니다. Java 8부터는 Stream일련의 요소를 표현하고 해당 요소에 대해 다양한 연산 연산을 수행 할 수 있습니다 . 이 경우 배열의 요소를 처리하여 다음과 같이 다른 함수를 사용할 수있는 함수로 줄일 수 있습니다.

Listing 1. 스트림이있는 집계


import java.util.stream.IntStream;
public class  NormalAggregate
{
public static void main(String[] args)
{
/* First store number of flowers picked by each person in an
array called flowers */
int[] flowers = new int[]{11,12,13,24,25, 26, 35,36,37, 24,15,16};
int noofpersons=12;
   int sum = IntStream.of(flowers).reduce( 0,(a, b) -> a + b);
   System.out.println("The no of flowers aggregated: " + sum);
System.out.println("The average flowers picked per person: " + (float)sum/(float)noofpersons);
  System.out.println("Another way to find aggregate :" + IntStream.of(flowers).sum());
}
}

시내 및 파이프

Unix 에서 파이프로 작업 한 적이 있다면 Listing 1의 코드가 친숙 해 보일 것이다. 다음은 8 월 한 달 동안 Apache 로그를 필터링하는 파이프 사용 방법입니다.

Listing 1. 파이프로 필터링하기


ls -al | grep Aug

그 이름에서 알 수 있듯이, pipe ( |)는 유닉스 파이프 라인의 한 구성 요소이며, 하나의 명령에서 출력을 가져 와서 다음 명령으로 보내 게합니다. Java의 문제로 돌아가서, 먼저 각 사람이 선택한 꽃의 수를 배열이라는 배열에 저장합니다 flowers. 그런 다음 람다 함수를 사용하여 누적 값 이 0 인 요소를 추가합니다 . 내부적으로 매개 변수는 배열의 초기 요소에서 시작하여 마지막 요소에 도달 할 때까지 연속하여 추가됩니다.

Listing 2. Stream을 사용하여 집계 계산하기


int accumulator = 0;
for( int i = 0; i < flowers.length; i++)
accumulator += flowers[i];

여태까지는 그런대로 잘됐다. 이제로드를 분산 할 때 어떤 일이 발생하는지 살펴 보겠습니다.

더 나은 접근법 : 자바 쓰레드를 이용한 병렬 집계

멀티 코어 CPU는 이제 우리가 집계를 해결하는 데있어 다중 스레드 접근 방식을 취할 수있을 정도로 일반적입니다. 예를 들어 Intel i7 4 코어 프로세서가 있다고 가정하면 사용 가능한 각 프로세서의 상한 및 하한으로 배열을 나누는 것으로 시작합니다. 그런 다음 집계를 계산합니다. Listing 3은 솔루션의 가장 중요한 부분을 보여준다. 나머지는 이 기사 의 소스 코드 에서 찾을 수 있습니다 .

Listing 3. 멀티 코어 프로세싱을 사용한 멀티 스레드 계산


public static int parallelAggregate(int[] arr, int threads)
    {
        int size = (int) Math.ceil(arr.length * 1.0 / threads);
        ParallelAggregation[] individualTotals = new ParallelAggregation[threads];
        for (int i = 0; i < threads; i++) {
            individualTotals[i] = new ParallelAggregation(arr, i * size, (i + 1) * size);
            individualTotals[i].start();
        }
        try {
            for (ParallelAggregation sum : indivdualTotals) {
                sum.join();
            }
        } catch (InterruptedException e) { }
        int total = 0;
        for (ParallelAggregation sum : individualTotals) {
            total += sum.getPerThreadAggregate();
        }
        return total;
    }

Listing 3의 함수는 배열의 크기를 스레드 수 사이에서 거의 균등하게 나눔으로써 시작된다. 그런 다음 각각의 합계를 취하여 병렬 스레드를 실행할 수 있습니다. 마지막으로 모든 합계를 더하여 집계를 얻습니다.

위의 메소드를 호출하는 방법은 다음과 같습니다.


public static int parallelAggregate(int[] arr){
return parallelAggregate(arr, Runtime.getRuntime().availableProcessors());

그림 1은 독립형 시스템에서이 테스트를 실행하는 데 사용 된 프로세서 수의 스크린 샷입니다.

jw sparkaggregate fig1Ravishankar Nair

그림 1. 사용 가능한 프로세서 수

Listing 3의 함수는 사용 가능한 모든 프로세서를 사용하며 각 프로세서는 자체 스레드를 실행한다. 그림 2는 인텔 i7 프로세서의 세부 사항을 보여줍니다.

jw sparkaggregate fig2Ravishankar Nair

그림 2. 인텔 프로세서 세부 정보 스크린 샷

그림 2는 i7 Intel 프로세서에서 최대 4 개의 스레드를 사용할 수 있음을 보여 주므로 프로그램에서 배열의 요소 수를 4 개의 동일한 크기로 나누고 집계를 계산합니다. 비교를 위해 스레드 레스 및 다중 스레드 집계를 사용하여 집계를 실행합니다. 재미있게하기 위해 5 억 명의 꽃 피커가 있다고 가정 해 봅시다.

Listing 4. 스레드 및 다중 스레드 집계


public static void main(String[] args)
{
 java.util.Random rand = new java.util.Random();
   int[] flowers = new int[500000000];
 for (int i = 0; i < flowers.length; i++) {
        flowers[i] = rand.nextInt(50) + 1; // simulate 1..50
    }
long start = System.currentTimeMillis();
ParallelAggregation.aggregate(flowers);
System.out.println("Single: " + (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
ParallelAggregation.parallelAggregate(flowers);
System.out.println("Parallel: " + (System.currentTimeMillis() - start));     }

그림 3은 intel i7-5500 CPU 기반 시스템의 샘플 출력을 보여줍니다.

jw sparkaggregate fig3Ravishankar Nair

그림 3. 스레드가없는 다중 스레드 집계의 출력

스레드가없는 방법과 스레드 방식을 비교하는 방법

Listing 4의 프로그램이 작동하고 멀티 스레드 및 스레드리스 솔루션 모두에 대해 정확한 결과를 산출한다. 그러나 이러한 솔루션은 큰 데이터를 위해 어떻게 확장 될 것입니까? 수조 번에 걸친 꽃 따기를 모으기를 원한다고 가정 해 봅시다. 그림 4는 멀티 스레드 프로그램의 결과를 보여줍니다.

jw sparkaggregate fig4Ravishankar Nair

그림 4. 1 조개의 꽃 피커로 출력

프로그램에 해당 크기의 배열에 대한 처리 능력이 충분하지 않습니다. 다음으로 프로그램을 다시 실행합니다. 이번에는 꽃 따는 사람의 수를 5 억 명으로 늘리지 만 실을 4에서 1,000으로 늘립니다.

jw sparkaggregate fig5Ravishankar Nair

그림 5. 1000 개의 스레드가있는 동일한 프로그램의 출력

프로그램은 이번에 실행되지만 출력은 4 개의 프로세서에서 4 개의 스레드를 실행할 때와 같이 유망하지는 않습니다. 멀티 코어에서 일반적인로드 균형 조정 권장 사항은 n + 1 스레드이며 n 은 사용 가능한 CPU 코어 수입니다. 이렇게하면 한 스레드가 디스크 I / O를 기다리는 동안 n 개의 스레드가 CPU와 함께 작동 할 수 있습니다. 스레드 수가 적 으면 CPU 리소스를 완전히 활용하지 못합니다 (어느 시점에서 항상 기다릴 I / O가 있기 때문에). 스레드가 많을수록 CPU를 위해 싸우게됩니다.

쓰레드는 유료로 제공되며 전용 CPU 코어를 사용하여 코드를 실행하면 비용이 발생합니다. 단일 코어 CPU에서 단일 프로세스 (스레드가없는) 솔루션은 대개 더 빠릅니다. 스레드만으로는 처리 속도가 자동으로 증가하지는 않지만 더 많은 작업이 필요합니다. 알다시피, 스레드는 큰 데이터를 위해 반드시 확장되지는 않습니다. 이러한 문제를 감안할 때 이상적인 솔루션을 찾는 것이 끝나지 않았습니다.

MapReduce : 확장 성이 목표 일 때

우리의 문제 진술을 해결하고 1 조개 이상의 꽃 피커 규모로 집계하려면 근본적인 기계의 처리 능력에 의존하지 않고 확장 할 수있는 솔루션이 필요합니다. 우리는 정확성, 일관성, 내결함성, 실패 안전 (점진적 저하) 및 효율적인 자원 활용을 원합니다.

MapReduce 2는 YARN 을 사용하여이 과제에 대한 좋은 프레임 워크입니다. 아래 스크립트는 1에서 50 사이의 5 억 개의 무작위 수의 꽃을 생성합니다.

Listing 5. MapReduce 및 YARN을 사용하는 집계


 for  i in {1 .. 500000000 }; do echo $[($RANDOM % 50 +1)];
 done > test.dat

이 예제를 직접 실행하려면 소스 다운로드 에서 완전한 "test.dat"스크립트를 찾으십시오 . HDFS에 디렉토리를 만들고 테스트 데이터 파일을 안에 넣은 다음 지침을 따르십시오.

다음은 MapReduce 집계의 매퍼입니다.

Listing 6. 집계 매퍼


public void map(Object key,Text value,Context context)
    throws IOException, InterruptedException
    {
      StringTokenizer tokenizer = new StringTokenizer(value.toString(), " \t\n\r\f,.:;?![]'");
      while (tokenizer.hasMoreTokens())
      {
        // make the words lowercase so words like "an" and "An" are counted as one word
        String s = tokenizer.nextToken().toLowerCase().trim();
        IntWritable val = new IntWritable(Integer.parseInt(s));
        word.set("aggregate");
        context.write(word, val);
      }
    }

mapListing 6 의 함수는 입력 데이터를 한 줄씩 읽는다. 그런 다음 호출 된 키를 만듭니다 aggregateaggregate튜플 과 같이 키 와 함께 각 행의 모든 ​​숫자를 내 보냅니다 . 우리는 Listing 7의 감속기에 대해 동일한 키를 사용 했으므로 두 키의 출력 map과 reduce집계를 위해 단일 노드로 지시한다.

Listing 7. 집계 축소 기


public void reduce(Text key,
       Iterable<IntWritable> values,Context context)
    throws IOException, InterruptedException
    {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      total.set(sum);
      // this writes the word and the count, like this:
      // ("aggregate", 2)
      context.write(key, total);
    }

감속기는 매퍼에서 방출 된 키와 값 목록을 가져 와서 모든 값을 집계합니다. 우리는 상수 키를 사용했기 때문에 동일한 노드에서 축소 작업이 발생합니다. 운전자 프로그램에서 감속기 클래스를 결합기로 사용하면이 MapReduce 프로그램을 대형 클러스터에서 매우 효율적으로 실행할 수 있습니다.

YARN과 함께 MapReduce를 사용하면 2 노드 클러스터에서 1 조 행의 데이터를 효율적으로 실행할 수 있습니다. 꽃 피커를 더 추가해야하는 경우 응용 프로그램 노드를 늘려 확장 할 수 있습니다. MapReduce의 고유 한 설계로 인해 투기 실행이 가능하므로 원본 데이터의 복사본이있는 보조 노드에서 파일링되거나 느린 작업을 실행하여 오류 안전성을 보장합니다. 그림 6은 성공적인 단일 노드 실행을위한 출력을 보여줍니다.

jw sparkaggregate fig6Ravishankar Nair

그림 6. MapReduce 출력


728x90
반응형