본문 바로가기
python

bert를 활용한 문장 분류(bert classification), BertForSequenceClassification

by 와우지니 2023. 4. 25.
반응형

  • transformer 패키지에 BertForSequenceClassification를 활용한 분류기 코드 입니다.  샘플데이터는 nsmc 데이터를 사용하였습니다.
  • txt 확장자를 가지며, 아래 그림과 같이 tab으로 분류 되고, 문장에 대한 컬럼이름이 "document"  라벨에 대한 컬럼이름이 "label" 이면,  본 코드 활용이 가능합니다.


  • [요약] 제가 작성한 함수에 대한 설명과 각 함수의 위계 입니다.
level 1 level 2 level 3 내용
run preprocess   - 데이터 불러오기 -> 스페셜 토큰 추가 -> 분절화(토큰화)->패딩->마스팅 과정을 거치는 함수 입니다.
- 토큰에 대한 아이디, 마스킹, 라벨 값을 리턴합니다.
    load_data - data 폴더 안에 tab으로 분류된 txt파일을 판다스를 활용하여 불러옵니다.
- document 와 rabel 컬럼이 존재해야 합니다.
- 문장과 문장에 대한 라벨을 리스트 타입으로 각각 반환합니다.
    add_special_token - 문장의 시작에 "[CLS]" , 끝에 "[SEP]" 토큰을 추가하였습니다.
- 스페셜토큰이 추가된 문장을 리스트 타입으로 반환합니다.
    tokenization - BertTokenizer에서 제공하는'bert-base-multilingual-cased' 모델을 활용하여 토큰화를 진행하였습니다.
- 토큰에 대한 아이디 값을 리스트로 리턴합니다.
    padding - 패딩의 최대값은 "max_len" 아규먼트를 통해 수정할 수 있습니다. 디폴트는 128입니다.
- 아이디 값에 패딩이 추가된 문장을 리스트로 리턴합니다.
    attention_mask - 토큰(스페셜 토큰 포함)이 있는 부분은 1 없는 부분을 0으로 하는 마스킹 합니다.
- 마스킹 값을 리스트로 리턴합니다.
  train_test_data_split   - 토큰에 대한 아이디값, 마스킹 값, 라벨을 train, test 셋으로 나누줍니다.
- 토큰에 대한 아이디값, 마스킹 값, 라벨 각각에 대한 train, test 값을 리스트 타입으로 리턴합니다. 
  build_dataloader(for train)   - 데이터를 pytorch 데이터로더로 변환합니다.
- train 데이터를 데이터로더로 리턴합니다.
  build_dataloader(for test)   - 위 함수 반복하여 사용하였습니다.
- test 데이터를 데이터로더로 리턴합니다.
  train   - 옵티마이저, 러닝레이트 스케줄러, 등을 포함하고, 설정된 에폭과 베치 사이즈에 따라 훈련을 시행하고 정확도 및 로스를 에폭마다 프린트 합니다.
- result 폴더를 만들고 폴더 안에 모델을 pt 파일로 에폭마다 저장합니다.
- 파일이름은 "epoch_{번호}_evalAcc_{정확도*100}.pth" 와같이 저장됩니다.
    build_model - BertForSequenceClassification 을 활용하여 모델을 쌓습니다.
- GPU로 이동한 모델과 설정된 장치를 리턴합니다.
    test - test 데이터 로더와 모델을 받아 accuracy를 측정합니다.
- accuracy를 리턴합니다.

  • 함수별로 코드를 보여 드립니다. 제일 아래로 가면 전체가 연결되 코드를 보실 수 있습니다.

패키지 불러오기

import os
import random
import time
import datetime
import torch
import argparse

import pandas as pd
import numpy as np

from transformers import BertTokenizer, BertForSequenceClassification, AdamW, get_linear_schedule_with_warmup, BertConfig
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

 

데이터 불러오기

    - data 폴더 안에 tab으로 분류된 txt파일을 판다스를 활용하여 불러옵니다.
    - document 와 rabel 컬럼이 존재해야 합니다.
    - 문장과 문장에 대한 라벨을 리스트 타입으로 각각 반환합니다.

def load_data(args):
    temp = pd.read_csv(args.raw_data, sep="\t")
    temp = temp
    document = temp.document.tolist()
    labels = temp.label.tolist()
    return document, labels

 

스페셜 토큰 붙이기

    - 문장의 시작에 "[CLS]" , 끝에 "[SEP]" 토큰을 추가하였습니다.
    - 스페셜토큰이 추가된 문장을 리스트 타입으로 반환합니다.

def add_special_token(document):
    added = ["[CLS]" + str(sentence) + "[SEP]" for sentence in document]
    return added

 

단어 분절화 및 토큰별 아이디 맵핑

    - BertTokenizer에서 제공하는'bert-base-multilingual-cased' 모델을 활용하여 토큰화를 진행하였습니다.
    - 토큰에 대한 아이디 값을 리스트로 리턴합니다.

def tokenization(document):
    tokenizer = BertTokenizer.from_pretrained(
            'bert-base-multilingual-cased', 
            do_lower_case=False,
            )
    tokenized = [tokenizer.tokenize(sentence) for sentence in document]
    ids = [tokenizer.convert_tokens_to_ids(sentence) for sentence in tokenized]
    return ids

 

패딩

    - 패딩의 최대값은 "max_len" 아규먼트를 통해 수정할 수 있습니다. 디폴트는 128입니다.
    - 아이디 값에 패딩이 추가된 문장을 리스트로 리턴합니다.

def padding(ids, args):
    ids = pad_sequences(ids, maxlen=args.max_len, dtype="long", truncating='post', padding='post')
    return ids

 

어텐션 마스킹

    - 토큰(스페셜 토큰 포함)이 있는 부분은 1 없는 부분을 0으로 하는 마스킹 합니다.
    - 마스킹 값을 리스트로 리턴합니다.

# 학습 속도를 높이기 위한 어텐션 마스크 표시
def attention_mask(ids):
    masks = []
    for id in ids:
        mask = [float(i>0) for i in id]
        masks.append(mask)
    return masks

 

전처리 종합

    - 데이터 불러오기 -> 스페셜 토큰 추가 -> 분절화(토큰화)->패딩->마스팅 과정을 거치는 함수 입니다.
    - 토큰에 대한 아이디, 마스킹, 라벨 값을 리턴합니다.

def preprocess(args):
    document, labels = load_data(args)
    document = add_special_token(document)
    ids = tokenization(document)
    ids = padding(ids, args)
    masks = attention_mask(ids)
    del document
    return ids, masks, labels

 

train, test 데이터 분리

    - 토큰에 대한 아이디값, 마스킹 값, 라벨을 train, test 셋으로 나누줍니다.
    - 토큰에 대한 아이디값, 마스킹 값, 라벨 각각에 대한 train, test 값을 리스트 타입으로 리턴합니다. 

def train_test_data_split(ids, masks, labels):
    train_ids, test_ids, train_labels, test_labels = train_test_split(ids, labels, random_state=42, test_size=0.1)
    train_masks, test_masks, _, _ = train_test_split(masks, ids, random_state=42, test_size=0.1)
    return train_ids, train_masks, train_labels, test_ids, test_masks, test_labels

 

pytorch 데이터 로더 생성

    - 데이터를 pytorch 데이터로더로 변환합니다.

    - 데이터로더를 리턴합니다.

def build_dataloader(ids, masks, label, args):
    dataloader = TensorDataset(torch.tensor(ids), torch.tensor(masks), torch.tensor(label))
    dataloader = DataLoader(dataloader, sampler=RandomSampler(dataloader), batch_size=args.batch_size)
    return dataloader

 

모델 구축 (본 코드는 GPU가 있어야 수행가능합니다.)

    - BertForSequenceClassification 을 활용하여 모델을 쌓습니다.
    - GPU로 이동한 모델과 설정된 장치를 리턴합니다.

def build_model(args):
    model = BertForSequenceClassification.from_pretrained("bert-base-multilingual-cased", num_labels=args.num_labels)
    device = torch.device("cuda")
    print(f"{torch.cuda.get_device_name(0)} available")
    model = model.cuda()

    return model, device

 

test 수행 

    - test 데이터 로더와 모델을 받아 accuracy를 측정합니다.
    - accuracy를 리턴합니다.

def test(test_dataloader, model, device):
    # 테스트 모드 전환
    model.eval()
    
    # 정확도 초기화
    total_accuracy = 0
    
    for batch in test_dataloader:
    	# 배치를 GPU로 이동
        batch = tuple(index.to(device) for index in batch)
        ids, masks, labels = batch
        
        # 테스트의 경우 그레디언트 연산을 수행할 필요가 없음
        with torch.no_grad():
            outputs = model(ids, token_type_ids=None, attention_mask=masks)
            
        pred = [torch.argmax(logit).cpu().detach().item() for logit in outputs.logits]
        true = [label for label in labels.cpu().numpy()]
        accuracy = accuracy_score(true, pred)
        total_accuracy += accuracy
    avg_accuracy = total_accuracy/len(test_dataloader)
    print(f"test AVG accuracy : {avg_accuracy: .2f}")
    return avg_accuracy

학습(train) 수행

    - 옵티마이저, 러닝레이트 스케줄러, 등을 포함하고, 설정된 에폭과 베치 사이즈에 따라 훈련을 시행하고 정확도 및 로스를 에폭마다 프린트 합니다.
    - result 폴더를 만들고 폴더 안에 모델을 pt 파일로 에폭마다 저장합니다.

    - 파일이름은 "epoch_{번호}_evalAcc_{정확도*100}.pth" 와같이 저장됩니다.

def train(train_dataloader, test_dataloader, args):
    model, device = build_model(args)
    
    # 옵티마이저 정의 
    optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8)
    
    # learning rate decay
    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=len(train_dataloader)*args.epochs)
    
    # 시드 고정
    random.seed(args.seed_val)
    np.random.seed(args.seed_val)
    torch.manual_seed(args.seed_val)
    torch.cuda.manual_seed_all(args.seed_val)
    

    
    # 그레디언트 초기화
    model.zero_grad()
    for epoch in range(0, args.epochs):
        # 훈련모드 
        model.train()
        
        # 로스와 정확도 초기화
        total_loss, total_accuracy = 0, 0
        print("-"*30)
        for step, batch in enumerate(train_dataloader):
            if step % 500 == 0 :
                print(f"Epoch : {epoch+1} in {args.epochs} / Step : {step}")
			
            # 배치 선정
            batch = tuple(index.to(device) for index in batch)
            ids, masks, labels, = batch
			
            # forward
            outputs = model(ids, token_type_ids=None, attention_mask=masks, labels=labels)
            
            # loss 도출
            loss = outputs.loss
            total_loss += loss.item()
			
            # 정확도 도출
            pred = [torch.argmax(logit).cpu().detach().item() for logit in outputs.logits]
            true = [label for label in labels.cpu().numpy()]
            accuracy = accuracy_score(true, pred)
            total_accuracy += accuracy

			# 그레디언트 연산
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            
            # 파라미터 업데이트
            optimizer.step()
            
            # 러닝레이트 최적화
            scheduler.step()
            
            # 그레디언트 초기화
            model.zero_grad()
            
        # epoch 당 loss 와 정확도 계산
        avg_loss = total_loss / len(train_dataloader)
        avg_accuracy = total_accuracy/len(train_dataloader)
        print(f" {epoch+1} Epoch Average train loss :  {avg_loss}")
        print(f" {epoch+1} Epoch Average train accuracy :  {avg_accuracy}")

		# test 수행
        acc = test(test_dataloader, model, device)
        
        # 모델 저장
        os.makedirs("results", exist_ok=True)
        f = os.path.join("results", f'epoch_{epoch+1}_evalAcc_{acc*100:.0f}.pth')
        torch.save(model.state_dict(), f)
        print('Saved checkpoint:', f)

실행함수

def run(args):
    ids, masks, labels = preprocess(args)
    train_ids, train_masks, train_labels, test_ids, test_masks, test_labels = train_test_data_split(ids, masks, labels)
    train_dataloader = build_dataloader(train_ids, train_masks, train_labels, args)
    test_dataloader = build_dataloader(test_ids, test_masks, test_labels, args)
    train(train_dataloader, test_dataloader, args)

Argument parser 부분

if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument("-raw_data", default="./data/ratings_train.txt")
    parser.add_argument("-max_len", default=128, type=int)
    parser.add_argument("-batch_size", default=32, type=int)
    parser.add_argument("-num_labels", default=2, type=int)
    parser.add_argument("-epochs", default=4, type=int)
    parser.add_argument("-seed_val", default=42, type=int)

    args = parser.parse_args()
    run(args)

 


전체 코드

import os
import random
import time
import datetime
import torch
import argparse

import pandas as pd
import numpy as np

from transformers import BertTokenizer, BertForSequenceClassification, AdamW, get_linear_schedule_with_warmup, BertConfig
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from keras.preprocessing.sequence import pad_sequences

def load_data(args):
    temp = pd.read_csv(args.raw_data, sep="\t")
    temp = temp
    document = temp.document.tolist()
    labels = temp.label.tolist()
    return document, labels

def add_special_token(document):
    added = ["[CLS]" + str(sentence) + "[SEP]" for sentence in document]
    return added

def tokenization(document, mode="huggingface"):
    if mode == "huggingface":
        tokenizer = BertTokenizer.from_pretrained(
                'bert-base-multilingual-cased', 
                do_lower_case=False,
                )
        tokenized = [tokenizer.tokenize(sentence) for sentence in document]
        ids = [tokenizer.convert_tokens_to_ids(sentence) for sentence in tokenized]
        return ids

def padding(ids, args):
    ids = pad_sequences(ids, maxlen=args.max_len, dtype="long", truncating='post', padding='post')
    return ids

def attention_mask(ids):
    masks = []
    for id in ids:
        mask = [float(i>0) for i in id]
        masks.append(mask)
    return masks

def preprocess(args):
    document, labels = load_data(args)
    document = add_special_token(document)
    ids = tokenization(document)
    ids = padding(ids, args)
    masks = attention_mask(ids)
    del document
    return ids, masks, labels

def train_test_data_split(ids, masks, labels):
    train_ids, test_ids, train_labels, test_labels = train_test_split(ids, labels, random_state=42, test_size=0.1)
    train_masks, test_masks, _, _ = train_test_split(masks, ids, random_state=42, test_size=0.1)
    return train_ids, train_masks, train_labels, test_ids, test_masks, test_labels

def build_dataloader(ids, masks, label, args):
    dataloader = TensorDataset(torch.tensor(ids), torch.tensor(masks), torch.tensor(label))
    dataloader = DataLoader(dataloader, sampler=RandomSampler(dataloader), batch_size=args.batch_size)
    return dataloader
        
def build_model(args):
    model = BertForSequenceClassification.from_pretrained("bert-base-multilingual-cased", num_labels=args.num_labels)
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print(f"{torch.cuda.get_device_name(0)} available")
        model = model.cuda()
    else:
        device = torch.device("cpu")
        print("no GPU available")
        model = model
    return model, device

def test(test_dataloader, model, device):
    model.eval()
    total_accuracy = 0
    for batch in test_dataloader:
        batch = tuple(index.to(device) for index in batch)
        ids, masks, labels = batch
        with torch.no_grad():
            outputs = model(ids, token_type_ids=None, attention_mask=masks)
        pred = [torch.argmax(logit).cpu().detach().item() for logit in outputs.logits]
        true = [label for label in labels.cpu().numpy()]
        accuracy = accuracy_score(true, pred)
        total_accuracy += accuracy
    avg_accuracy = total_accuracy/len(test_dataloader)
    print(f"test AVG accuracy : {avg_accuracy: .2f}")
    return avg_accuracy

def train(train_dataloader, test_dataloader, args):
    model, device = build_model(args)
    optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8)
    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=len(train_dataloader)*args.epochs)
    random.seed(args.seed_val)
    np.random.seed(args.seed_val)
    torch.manual_seed(args.seed_val)
    torch.cuda.manual_seed_all(args.seed_val)
    model.zero_grad()
    
    for epoch in range(0, args.epochs):
        model.train()
        total_loss, total_accuracy = 0, 0
        print("-"*30)
        for step, batch in enumerate(train_dataloader):
            if step % 500 == 0 :
                print(f"Epoch : {epoch+1} in {args.epochs} / Step : {step}")

            batch = tuple(index.to(device) for index in batch)
            ids, masks, labels, = batch

            outputs = model(ids, token_type_ids=None, attention_mask=masks, labels=labels)
            loss = outputs.loss
            total_loss += loss.item()

            pred = [torch.argmax(logit).cpu().detach().item() for logit in outputs.logits]
            true = [label for label in labels.cpu().numpy()]
            accuracy = accuracy_score(true, pred)
            total_accuracy += accuracy

            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            scheduler.step()
            model.zero_grad()
        avg_loss = total_loss / len(train_dataloader)
        avg_accuracy = total_accuracy/len(train_dataloader)
        print(f" {epoch+1} Epoch Average train loss :  {avg_loss}")
        print(f" {epoch+1} Epoch Average train accuracy :  {avg_accuracy}")

        acc = test(test_dataloader, model, device)
        os.makedirs("results", exist_ok=True)
        f = os.path.join("results", f'epoch_{epoch+1}_evalAcc_{acc*100:.0f}.pth')
        torch.save(model.state_dict(), f)
        print('Saved checkpoint:', f)

def run(args):
    ids, masks, labels = preprocess(args)
    train_ids, train_masks, train_labels, test_ids, test_masks, test_labels = train_test_data_split(ids, masks, labels)
    train_dataloader = build_dataloader(train_ids, train_masks, train_labels, args)
    test_dataloader = build_dataloader(test_ids, test_masks, test_labels, args)
    train(train_dataloader, test_dataloader, args)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument("-raw_data", default="./data/ratings_train.txt")
    parser.add_argument("-max_len", default=128, type=int)
    parser.add_argument("-batch_size", default=32, type=int)
    parser.add_argument("-num_labels", default=2, type=int)
    parser.add_argument("-epochs", default=4, type=int)
    parser.add_argument("-seed_val", default=42, type=int)

    args = parser.parse_args()
    run(args)

 

반응형

댓글