NLP進階,使用TextRNN和TextRNN_ATT實現文本分類
TextRNN
TextRNN僅僅是將Word Embedding后,輸入到雙向LSTM中,然后對最后一位的輸出輸入到全連接層中,在對其進行softmax分類即可,模型如下圖:
代碼:
class RNN(nn.Module): def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers=2, bidirectional=True, dropout=0.2, pad_idx=0): super().__init__() self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx) self.rnn = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers,batch_first=True, bidirectional=bidirectional) self.fc = nn.Linear(hidden_dim * 2, output_dim) # 這里hidden_dim乘以2是因為是雙向,需要拼接兩個方向,跟n_layers的層數無關。 self.dropout = nn.Dropout(dropout) def forward(self, text): # text.shape=[seq_len, batch_size] embedded = self.dropout(self.embedding(text)) # output: [batch,seq,2*hidden if bidirection else hidden] # hidden/cell: [bidirec * n_layers, batch, hidden] output, (hidden, cell) = self.rnn(embedded) # concat the final forward (hidden[-2,:,:]) and backward (hidden[-1,:,:]) hidden layers hidden = self.dropout(torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)) # hidden = [batch size, hid dim * num directions], return self.fc(hidden.squeeze(0)) # 在接一個全連接層,最終輸出[batch size, output_dim]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
TextRNN_ATT
在TextRNN的基礎上加入注意力機制,代碼:
class RNN_ATTs(nn.Module): def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers=2, bidirectional=True, dropout=0.2, pad_idx=0, hidden_size2=64): super().__init__() self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx) self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers, bidirectional=bidirectional, batch_first=True, dropout=dropout) self.tanh1 = nn.Tanh() # self.u = nn.Parameter(torch.Tensor(config.hidden_size * 2, config.hidden_size * 2)) self.w = nn.Parameter(torch.zeros(hidden_dim * 2)) self.tanh2 = nn.Tanh() self.fc1 = nn.Linear(hidden_dim * 2, hidden_size2) self.fc = nn.Linear(hidden_size2, output_dim) def forward(self, x): emb = self.embedding(x) # [batch_size, seq_len, embeding]=[128, 32, 300] H, _ = self.lstm(emb) # [batch_size, seq_len, hidden_size * num_direction]=[128, 32, 256] M = self.tanh1(H) # [128, 32, 256] # M = torch.tanh(torch.matmul(H, self.u)) alpha = F.softmax(torch.matmul(M, self.w), dim=1).unsqueeze(-1) # [128, 32, 1] out = H * alpha # [128, 32, 256] out = torch.sum(out, 1) # [128, 256] out = F.relu(out) out = self.fc1(out) out = self.fc(out) # [128, 64] return out
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
數據集
數據集采用cnews數據集,包含三個文件,分別是cnews.train.txt,cnews.val.txt,cnews,test.txt。類別:體育, 娛樂, 家居, 房產, 教育, 時尚, 時政, 游戲, 科技, 財經,共10個類別。網盤地址:
鏈接:https://pan.baidu.com/s/1awlBYclO_mxntEgL_tUF0g
提取碼:rtnv
構建詞向量
第一步,讀取預料,做分詞。
思路:
1、創建默認方式的分詞對象seg。
2、打開文件,按照行讀取文章。
3、去掉收尾的空格,將label和文章分割開。
4、將分詞后的文章放到src_data,label放入labels里。
5、返回結果。
我對代碼做了注解,如下:
def read_corpus(file_path): """讀取語料 :param file_path: :param type: :return: """ src_data = [] labels = [] seg = pkuseg.pkuseg() #使用默認分詞方式。 with codecs.open(file_path,'r',encoding='utf-8') as fout: for line in tqdm(fout.readlines(),desc='reading corpus'): if line is not None: # line.strip()的意思是去掉每句話句首句尾的空格 # .split(‘\t’)的意思是根據'\t'把label和文章內容分開,label和內容是通過‘\t’隔開的。 # \t表示空四個字符,也稱縮進,相當于按一下Tab鍵 pair = line.strip().split('\t') if len(pair) != 2: print(pair) continue src_data.append(seg.cut(pair[1]))# 對文章內容分詞。 labels.append(pair[0]) return (src_data, labels) #返回文章內容的分詞結果和labels
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
經過這個步驟得到了labels和分詞后的文章。如下代碼:
src_sents, labels = read_corpus('cnews/cnews.train.txt')
1
對labels做映射:
labels = {label: idx for idx, label in enumerate(labels)}
1
得到labels對應的idx的字典,idx的值是最后一次插入label的值。
第二步 構建詞向量
這一步主要用到vocab.py的from_corpus方法
思路:
1、創建vocab_entry對象。
2、對分詞后的文章統計詞頻,生成一個詞和詞頻構成的字典。
3、從字典中取出Top size - 2個元素。
4、獲取元素的詞。
5、執行add方法將詞放入vocab_entry,生成詞和id,id就是詞對應的向量值。
代碼如下:
@staticmethod def from_corpus(corpus, size, min_feq=3): """從給定語料中創建VocabEntry""" vocab_entry = VocabEntry() # chain函數來自于itertools庫,itertools庫提供了非常有用的基于迭代對象的函數,而chain函數則是可以串聯多個迭代對象來形成一個更大的迭代對象 # *的作用:返回單個迭代器。 # word_freq是個字典,key=詞,value=詞頻 word_freq = Counter(chain(*corpus)) # Counter 是實現的 dict 的一個子類,可以用來方便地計數,統計詞頻 valid_words = word_freq.most_common(size - 2) # most_common()函數用來實現Top n 功能,在這里選出Top size-2個詞 valid_words = [word for word, value in valid_words if value >= min_feq] # 把符合要求的詞找出來放到list里面。 print('number of word types: {}, number of word types w/ frequency >= {}: {}' .format(len(word_freq), min_feq, len(valid_words))) for word in valid_words: # 將詞放進VocabEntry里面。 vocab_entry.add(word) return vocab_entry
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
創建完成后將詞向量保存到json文件中
vocab = Vocab.build(src_sents, labels, 50000, 3) print('generated vocabulary, source %d words' % (len(vocab.vocab))) vocab.save('./vocab.json')
1
2
3
4
訓練
訓練使用Train_RNN.py,先看分析main方法的參數。
參數
parse = argparse.ArgumentParser() parse.add_argument("--train_data_dir", default='./cnews/cnews.train.txt', type=str, required=False) parse.add_argument("--dev_data_dir", default='./cnews/cnews.val.txt', type=str, required=False) parse.add_argument("--test_data_dir", default='./cnews/cnews.test.txt', type=str, required=False) parse.add_argument("--output_file", default='deep_model.log', type=str, required=False) parse.add_argument("--batch_size", default=4, type=int) parse.add_argument("--do_train", default=True, action="store_true", help="Whether to run training.") parse.add_argument("--do_test", default=True, action="store_true", help="Whether to run training.") parse.add_argument("--learnning_rate", default=5e-4, type=float) parse.add_argument("--num_epoch", default=50, type=int) parse.add_argument("--max_vocab_size", default=50000, type=int) parse.add_argument("--min_freq", default=2, type=int) parse.add_argument("--hidden_size", default=256, type=int) parse.add_argument("--embed_size", default=300, type=int) parse.add_argument("--dropout_rate", default=0.2, type=float) parse.add_argument("--warmup_steps", default=0, type=int, help="Linear warmup over warmup_steps.") parse.add_argument("--GRAD_CLIP", default=1, type=float) parse.add_argument("--vocab_path", default='vocab.json', type=str)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
參數說明:
train_data_dir:訓練集路徑。
dev_data_dir:驗證集路徑
test_data_dir:測試集路徑
output_file:輸出的log路徑
batch_size:batchsize的大小。
do_train:是否訓練,默認True、
do_test:是否測試,默認True
learnning_rate:學習率
num_epoch:epoch的數量
max_vocab_size:詞向量的個數
min_freq:詞頻,過濾低于這個數值的詞
hidden_size:隱藏層的個數
embed_size:Embedding的長度。
dropout_rate:dropout的值。
warmup_steps:設置預熱的值。
vocab_path:詞向量保存的路徑
構建詞向量
vocab = build_vocab(args) label_map = vocab.labels print(label_map)
1
2
3
build_vocab的方法:
def build_vocab(args): if not os.path.exists(args.vocab_path): src_sents, labels = read_corpus(args.train_data_dir) labels = {label: idx for idx, label in enumerate(labels)} vocab = Vocab.build(src_sents, labels, args.max_vocab_size, args.min_freq) vocab.save(args.vocab_path) else: vocab = Vocab.load(args.vocab_path) return vocab
1
2
3
4
5
6
7
8
9
創建模型
創建CNN模型,將模型放到GPU上,調用train方法,訓練。
rnn_model = RNN_ATTs(len(vocab.vocab), args.embed_size, args.hidden_size, len(label_map), n_layers=1, bidirectional=True, dropout=args.dropout_rate) rnn_model.to(device) train(args, rnn_model, train_data, dev_data, vocab, dtype='RNN')
1
2
3
4
對train方法做了一些注解,如下:
def train(args, model, train_data, dev_data, vocab, dtype='CNN'): LOG_FILE = args.output_file #記錄訓練log with open(LOG_FILE, "a") as fout: fout.write('\n') fout.write('==========' * 6) fout.write('start trainning: {}'.format(dtype)) fout.write('\n') time_start = time.time() if not os.path.exists(os.path.join('./runs', dtype)): os.makedirs(os.path.join('./runs', dtype)) tb_writer = SummaryWriter(os.path.join('./runs', dtype)) # 計算總的迭代次數 t_total = args.num_epoch * (math.ceil(len(train_data) / args.batch_size)) #optimizer = bnb.optim.Adam8bit(model.parameters(), lr=0.001, betas=(0.9, 0.995)) # add bnb optimizer optimizer = AdamW(model.parameters(), lr=args.learnning_rate, eps=1e-8)#設置優化器 scheduler = get_linear_schedule_with_warmup(optimizer=optimizer, num_warmup_steps=args.warmup_steps, num_training_steps=t_total) #設置預熱。 criterion = nn.CrossEntropyLoss()# 設置loss為交叉熵 global_step = 0 total_loss = 0. logg_loss = 0. val_acces = [] train_epoch = trange(args.num_epoch, desc='train_epoch') for epoch in train_epoch:#訓練epoch model.train() for src_sents, labels in batch_iter(train_data, args.batch_size, shuffle=True): src_sents = vocab.vocab.to_input_tensor(src_sents, args.device) global_step += 1 optimizer.zero_grad() logits = model(src_sents) y_labels = torch.tensor(labels, device=args.device) example_losses = criterion(logits, y_labels) example_losses.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), args.GRAD_CLIP) optimizer.step() scheduler.step() total_loss += example_losses.item() if global_step % 100 == 0: loss_scalar = (total_loss - logg_loss) / 100 logg_loss = total_loss with open(LOG_FILE, "a") as fout: fout.write("epoch: {}, iter: {}, loss: {},learn_rate: {}\n".format(epoch, global_step, loss_scalar, scheduler.get_lr()[0])) print("epoch: {}, iter: {}, loss: {}, learning_rate: {}".format(epoch, global_step, loss_scalar, scheduler.get_lr()[0])) tb_writer.add_scalar("lr", scheduler.get_lr()[0], global_step) tb_writer.add_scalar("loss", loss_scalar, global_step) print("Epoch", epoch, "Training loss", total_loss / global_step) eval_loss, eval_result = evaluate(args, criterion, model, dev_data, vocab) # 評估模型 with open(LOG_FILE, "a") as fout: fout.write("EVALUATE: epoch: {}, loss: {},eval_result: {}\n".format(epoch, eval_loss, eval_result)) eval_acc = eval_result['acc'] if len(val_acces) == 0 or eval_acc > max(val_acces): # 如果比之前的acc要da,就保存模型 print("best model on epoch: {}, eval_acc: {}".format(epoch, eval_acc)) torch.save(model.state_dict(), "classifa-best-{}.th".format(dtype)) val_acces.append(eval_acc) time_end = time.time() print("run model of {},taking total {} m".format(dtype, (time_end - time_start) / 60)) with open(LOG_FILE, "a") as fout: fout.write("run model of {},taking total {} m\n".format(dtype, (time_end - time_start) / 60))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
重點注釋了一下batch_iter方法,如下:
def batch_iter(data, batch_size, shuffle=False): """ batch數據 :param data: list of tuple :param batch_size: :param shuffle: :return: """ batch_num = math.ceil(len(data) / batch_size)# 計算迭代的次數 index_array = list(range(len(data))) #按照data的長度,映射list if shuffle:#是否打亂順序 random.shuffle(index_array) for i in range(batch_num): indices = index_array[i*batch_size:(i+1)*batch_size]# 選出batchsize個index examples = [data[idx] for idx in indices]# 通過index找到對應的data examples = sorted(examples,key=lambda x: len(x[1]),reverse=True)#按照label排序 src_sents = [e[0] for e in examples] #把data中的文章放到src_sents labels = [label_map[e[1]] for e in examples] #將標題映射label_map對應的value yield src_sents, labels
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
下面一個重要的方法是vocab.vocab.to_input_tensor,核心思路:
1、將數據通過 self.words2indices方法轉為詞對應的數值。
2、找出一個batch中最長的數據,剩下的數據后面補0,形成統一的長度。
3、將第二步得到的結果放入torch.tensor
代碼如下:
def to_input_tensor(self, sents: List[List[str]], device: torch.device): """ 將原始句子list轉為tensor,同時將句子PAD成max_len :param sents: list of list
1
2
3
4
5
6
7
8
9
10
11
12
開始訓練:
驗證
將do_train改為False,do_test改為True就可以開啟驗證模型,TextRNN能達到0.96的成績。
parse.add_argument("--do_train", default=False, action="store_true", help="Whether to run training.")
1
完整代碼鏈接:
https://download.csdn.net/download/hhhhhhhhhhwwwwwwwwww/40816205
機器學習 自然語言處理基礎
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。