BigData

GraphRAG(그래프 기반 검색-증강 생성) 파이프라인을 구축하는 방법

IT오이시이 2025. 3. 11. 08:35
728x90

GraphRAG(그래프 기반 검색-증강 생성) 파이프라인을 구축하는 방법

 

아래는 GraphRAG(그래프 기반 검색-증강 생성) 파이프라인 구축 방법을 구체적으로 실행 가능하도록 정리한 내용입니다. 각 단계별로 필요한 코드를 포함하여 설명합니다.


1. 환경 설정

먼저, Python 환경을 설정하고 필요한 라이브러리를 설치합니다.

  1. Python(3.7 이상)과 pip 설치.
  2. 가상 환경 생성:
  3. python -m venv myenv source myenv/bin/activate # Windows: myenv\Scripts\activate
  4. 필요한 라이브러리 설치:
  5. pip install llama-index graspologic numpy scipy neo4j-graphrag

 

가상 환경 생성 및 활성화

python -m venv myenv
source myenv/bin/activate  # Windows: myenv\Scripts\activate

라이브러리 설치

pip install llama-index graspologic numpy scipy neo4j-graphrag

2. 데이터 준비

 

  1. 문서 분할: 큰 문서를 작은 텍스트 조각으로 나눕니다.
    • 예: 문서를 문단 단위로 나누거나, 일정한 길이로 슬라이싱.
  2. 엔티티 및 관계 추출:
    • LLM(예: GPT)을 사용하여 텍스트에서 엔티티(노드)와 관계(엣지)를 추출합니다.
      • 예:
        • 엔티티: NASA
        • 관계: launched (NASA와 우주선을 연결).

문서 분할

긴 문서를 문단 단위 또는 일정 길이로 나눕니다.

def split_document(document, max_length=200):
    # 텍스트를 일정 길이로 나눕니다.
    return [document[i:i+max_length] for i in range(0, len(document), max_length)]

엔티티 및 관계 추출

LLM(예: GPT)을 사용하여 텍스트에서 엔티티(노드)와 관계(엣지)를 추출합니다.

def extract_entities_and_relations(text, llm):
    # 엔티티와 관계를 추출
    entities = llm.extract_entities(text)
    relations = llm.extract_relations(text)
    return entities, relations

 


3. 컴포넌트 생성

 

파이프라인에서 사용할 커스텀 컴포넌트를 작성합니다.

  • 이 코드는 두 숫자를 더하는 간단한 컴포넌트를 정의한 것입니다.

 

ComponentAdd 컴포넌트 구현

from neo4j_graphrag.experimental.pipeline import Component, DataModel

# 데이터 모델 정의
class IntResultModel(DataModel):
    result: int

# 두 숫자를 더하는 컴포넌트
class ComponentAdd(Component):
    async def run(self, number1: int, number2: int = 1) -> IntResultModel:
        return IntResultModel(result=number1 + number2)

 

IntResultModel()

  • DataModel을 상속받아 결과 값을 저장하는 구조를 만듭니다.
  • result는 정수형 데이터로, 연산 결과를 저장하는 역할을 합니다.

ComponentAdd()

  • Component를 상속받아, 비동기 작업을 처리하는 컴포넌트를 정의합니다.
  • run 메서드는 비동기 함수(async)로 구현되어 비동기 환경에서 호출됩니다.

테스트 코드

import asyncio

# ComponentAdd 객체 생성
component = ComponentAdd()

# 비동기 함수 실행
async def test_component():
    # 3 + 7 = 10
    result_model = await component.run(3, 7)
    print(result_model.result)  # 출력: 10

    # 5 + 기본값 1 = 6
    result_model = await component.run(5)
    print(result_model.result)  # 출력: 6

# 이벤트 루프 실행
asyncio.run(test_component())

 


4. 파이프라인 구성

여러 컴포넌트를 연결하여 데이터를 처리하는 파이프라인을 만듭니다.

  • 예제 코드: 이 코드는 ComponentAdd 컴포넌트를 두 번 사용하여 출력값을 다음 입력값으로 연결합니다.

파이프라인 생성 및 컴포넌트 연결

from neo4j_graphrag.experimental.pipeline import Pipeline

# 파이프라인 생성
pipe = Pipeline()

# 컴포넌트 추가
pipe.add_component(ComponentAdd(), "a")
pipe.add_component(ComponentAdd(), "b")

# 컴포넌트 연결 (a -> b)
pipe.connect("a", "b", input_config={"number2": "a.result"})

# 파이프라인 실행
import asyncio
asyncio.run(pipe.run({
    "a": {"number1": 10},
    "b": {"number1": 4}
}))
  •  
from neo4j_graphrag.experimental.pipeline import Pipeline
from neo4j_graphrag.experimental.pipeline import Component, DataModel

# 데이터 모델 정의
class IntResultModel(DataModel):
    result: int

# 컴포넌트 정의
class ComponentAdd(Component):
    async def run(self, number1: int, number2: int = 1) -> IntResultModel:
        return IntResultModel(result=number1 + number2)

# 파이프라인 생성
pipe = Pipeline()

# 컴포넌트 추가
pipe.add_component(ComponentAdd(), "a")
pipe.add_component(ComponentAdd(), "b")

# 컴포넌트 연결  : 컴포넌트 "a"와 "b"를 연결합니다.
pipe.connect("a", "b", input_config={"number2": "a.result"})

# 파이프라인 실행
import asyncio
asyncio.run(pipe.run({
    "a": {"number1": 10},
    "b": {"number1": 4}
}))
  • pipe = Pipeline()
    • Pipeline 객체를 생성하여 컴포넌트를 추가하고 연결할 기반이 되는 파이프라인을 만듭니다.
  • pipe.add_component(ComponentAdd(), "a")
  • pipe.add_component(ComponentAdd(), "b")
    • 파이프라인에 두 개의 ComponentAdd 인스턴스를 추가합니다.
    • 각 컴포넌트는 "a"와 "b"라는 이름으로 식별됩니다.
    • ComponentAdd는 number1과 number2의 합을 계산하는 비동기 컴포넌트입니다(이전 코드에서 정의된 클래스).

 

"a" 컴포넌트:

      number1=10, number2=1(기본값).
      결과: result=11.

"b" 컴포넌트:

      number1=4, number2=11 ("a"의 결과).
      최종 결과: result=15.

 

 


5. 그래프 시각화 및 디버깅

파이프라인 구조를 시각화:

pipe.draw("pipeline.png")

이벤트 콜백을 추가하여 진행 상황을 모니터링할 수도 있습니다.

파이프라인 시각화

pipe.draw("pipeline.png")  # 파이프라인을 이미지 파일로 저장

진행 상황 모니터링

파이프라인 실행 시 이벤트 콜백 추가:

def event_callback(event):
    print(f"Event: {event}")

pipe.add_event_listener(event_callback)

6. 파이프라인 실행

 

준비된 데이터를 사용해 파이프라인 실행:

graphrag index --root ./ragtest

설정 파일(settings.yaml)에서 모델 종류, 청크 크기 등을 조정합니다.

 

준비된 데이터를 사용해 실행

graphrag CLI 명령어를 사용할 수도 있습니다:

graphrag index --root ./ragtest

위 코드는 Python 환경에서 GraphRAG 파이프라인을 실제로 작동시킬 수 있도록 설계되었습니다.

 

 

7. 질의 및 요약 생성

질의 모드:

Global Search: 그래프 전체 커뮤니티를 대상으로 요약.

Local Search: 특정 노드나 관계에 초점을 맞춘 요약.

결과를 확인하고 필요한 정보를 추출합니다.


활용 사례

PDF 문서 처리 및 요약.

질문 응답 시스템 구축.

뉴스 데이터 분석 등 다양한 분야에 적용 가능합니다.

728x90
반응형