python - 這個(gè)錯(cuò)誤要怎么改
問題描述
源代碼
#!/bin/env python3# coding:utf-8'''ljk 20161116(update 20170217)This script should be put in crontab in every web server.Execute every n minutes.Collect nginx access log, process it and insert the result into mysql.'''import osimport reimport subprocessimport timeimport warningsimport pymysqlfrom sys import argv, exitfrom socket import gethostnamefrom urllib.parse import unquotefrom zlib import crc32from multiprocessing import Pool##### 自定義部分 ###### 定義日志格式,利用非貪婪匹配和分組匹配,需要嚴(yán)格參照日志定義中的分隔符和引號(hào)log_pattern = r’^(?P<remote_addr>.*?) - [(?P<time_local>.*?)] '(?P<request>.*?)'’ r’ (?P<status>.*?) (?P<body_bytes_sent>.*?) (?P<request_time>.*?)’ r’ '(?P<http_referer>.*?)' '(?P<http_user_agent>.*?)' - (?P<http_x_forwarded_for>.*)$’# request的正則,其實(shí)是由 'request_method request_uri server_protocol'三部分組成request_uri_pattern = r’^(?P<request_method>(GET|POST|HEAD|DELETE)?) (?P<request_uri>.*?) (?P<server_protocol>HTTP.*)$’# 日志目錄log_dir = ’/nginx_log/’# 要處理的站點(diǎn)(可隨需要想list中添加)todo = [’www’, ’news’, ’m.api’,]# MySQL相關(guān)設(shè)置mysql_host = ’xxxx’mysql_user = ’xxxx’mysql_passwd = ’xxxx’mysql_port = ’xxxx’mysql_database = ’xxxx’# 表結(jié)構(gòu)creat_table = 'CREATE TABLE IF NOT EXISTS {} (id bigint unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY,server char(11) NOT NULL DEFAULT ’’,uri_abs varchar(200) NOT NULL DEFAULT ’’ COMMENT ’對(duì)$uri做uridecode,然后做抽象化處理’,uri_abs_crc32 bigint unsigned NOT NULL DEFAULT ’0’ COMMENT ’對(duì)上面uri_abs字段計(jì)算crc32’,args_abs varchar(200) NOT NULL DEFAULT ’’ COMMENT ’對(duì)$args做uridecode,然后做抽象化處理’,args_abs_crc32 bigint unsigned NOT NULL DEFAULT ’0’ COMMENT ’對(duì)上面args字段計(jì)算crc32’,time_local timestamp NOT NULL DEFAULT ’0000-00-00 00:00:00’,response_code smallint NOT NULL DEFAULT ’0’,bytes_sent int NOT NULL DEFAULT ’0’ COMMENT ’發(fā)送給客戶端的響應(yīng)大小’,request_time float(6,3) NOT NULL DEFAULT ’0.000’,user_ip varchar(40) NOT NULL DEFAULT ’’,cdn_ip varchar(15) NOT NULL DEFAULT ’’ COMMENT ’CDN最后節(jié)點(diǎn)的ip:空字串表示沒經(jīng)過(guò)CDN; - 表示沒經(jīng)過(guò)CDN和F5’,request_method varchar(7) NOT NULL DEFAULT ’’,uri varchar(255) NOT NULL DEFAULT ’’ COMMENT ’$uri,已做uridecode’,args varchar(255) NOT NULL DEFAULT ’’ COMMENT ’$args,已做uridecode’,referer varchar(255) NOT NULL DEFAULT ’’ COMMENT ’’,KEY time_local (time_local),KEY uri_abs_crc32 (uri_abs_crc32),KEY args_abs_crc32 (args_abs_crc32) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 row_format=compressed'##### 自定義部分結(jié)束 ###### 主機(jī)名global serverserver = gethostname()# 今天零點(diǎn)global today_starttoday_start = time.strftime(’%Y-%m-%d’, time.localtime()) + ’ 00:00:00’# 將pymysql對(duì)于操作中的警告信息轉(zhuǎn)為可捕捉的異常warnings.filterwarnings(’error’, category=pymysql.err.Warning)def my_connect(): '''鏈接數(shù)據(jù)庫(kù)''' global connection, con_cur try:connection = pymysql.connect(host=mysql_host, user=mysql_user, password=mysql_passwd, charset=’utf8mb4’, port=mysql_port, autocommit=True, database=mysql_database) except pymysql.err.MySQLError as err:print(’Error: ’ + str(err))exit(20) con_cur = connection.cursor()def create_table(t_name): '''創(chuàng)建各站點(diǎn)對(duì)應(yīng)的表''' my_connect() try:con_cur.execute(creat_table.format(t_name)) except pymysql.err.Warning:passdef process_line(line_str): ''' 處理每一行記錄 line_str: 該行數(shù)據(jù)的字符串形式 ''' processed = log_pattern_obj.search(line_str) if not processed:’’’如果正則根本就無(wú)法匹配該行記錄時(shí)’’’print('Can’t process this line: {}'.format(line_str))return server, ’’, 0, ’’, 0, ’’, ’’, ’’, ’’, ’’, ’’ else:# remote_addr (客戶若不經(jīng)過(guò)代理,則可認(rèn)為用戶的真實(shí)ip)remote_addr = processed.group(’remote_addr’)# time_localtime_local = processed.group(’time_local’)# 轉(zhuǎn)換時(shí)間為mysql date類型ori_time = time.strptime(time_local.split()[0], ’%d/%b/%Y:%H:%M:%S’)new_time = time.strftime(’%Y-%m-%d %H:%M:%S’, ori_time)# 處理uri和argsrequest = processed.group(’request’)request_further = request_uri_pattern_obj.search(request)if request_further: request_method = request_further.group(’request_method’) request_uri = request_further.group(’request_uri’) uri_args = request_uri.split(’?’, 1) # 對(duì)uri和args進(jìn)行urldecode uri = unquote(uri_args[0]) args = ’’ if len(uri_args) == 1 else unquote(uri_args[1]) # 對(duì)uri和args進(jìn)行抽象化 uri_abs = text_abstract(uri, ’uri’) args_abs = text_abstract(args, ’args’) # 對(duì)庫(kù)里的uri_abs和args_abs字段進(jìn)行crc32校驗(yàn) uri_abs_crc32 = crc32(uri_abs.encode()) args_abs_crc32 = 0 if args_abs == ’’ else crc32(args_abs.encode())else: print(’$request abnormal: {}’.format(line_str)) request_method = ’’ uri = request uri_abs = ’’ uri_abs_crc32 = 0 args = ’’ args_abs = ’’ args_abs_crc32 = 0# 狀態(tài)碼,字節(jié)數(shù),響應(yīng)時(shí)間response_code = processed.group(’status’)bytes_sent = processed.group(’body_bytes_sent’)request_time = processed.group(’request_time’)# user_ip,cdn最后節(jié)點(diǎn)ip,以及是否經(jīng)過(guò)F5http_x_forwarded_for = processed.group(’http_x_forwarded_for’)ips = http_x_forwarded_for.split()# user_ip:用戶真實(shí)ip# cdn_ip: CDN最后節(jié)點(diǎn)的ip,’’表示沒經(jīng)過(guò)CDN;’-’表示沒經(jīng)過(guò)CDN和F5if http_x_forwarded_for == ’-’: ’’’沒經(jīng)過(guò)CDN和F5’’’ user_ip = remote_addr cdn_ip = ’-’elif ips[0] == remote_addr: ’’’沒經(jīng)過(guò)CDN,經(jīng)過(guò)F5’’’ user_ip = remote_addr cdn_ip = ’’else: ’’’經(jīng)過(guò)CDN和F5’’’ user_ip = ips[0].rstrip(’,’) cdn_ip = ips[-1]return (server, uri_abs, uri_abs_crc32, args_abs, args_abs_crc32, new_time, response_code, bytes_sent,request_time, user_ip, cdn_ip, request_method, uri, args)def text_abstract(text, what): '''進(jìn)一步處理uri和args,將其做抽象化,方便對(duì)其進(jìn)行歸類 如uri: /article/10.html 抽象為 /article/?.html 如args: s=你好&type=0 抽象為 s=?&type=? 規(guī)則:待處理部分由[a-zA-Z-_]組成的,則保留,其他情況值轉(zhuǎn)為’?’ ''' tmp_abs = ’’ if what == ’uri’:uri_list = [tmp for tmp in text.split(’/’) if tmp != ’’]if len(uri_list) == 0: ’’’uri為'/'的情況’’’ tmp_abs = ’/’else: for i in range(len(uri_list)):if not re.match(r’[a-zA-Z-_]+?(..*)?$’, uri_list[i]): ’’’uri不符合規(guī)則時(shí),進(jìn)行轉(zhuǎn)換’’’ if ’.’ in uri_list[i]:if not re.match(r’[a-zA-Z-_]+$’, uri_list[i].split(’.’)[0]): uri_list[i] = ’?.’ + uri_list[i].split(’.’)[1] else:uri_list[i] = ’?’ for v in uri_list:tmp_abs += ’/{}’.format(v) if text.endswith(’/’):’’’如果原uri后面有'/',要保留’’’tmp_abs += ’/’ elif what == ’args’: if text == ’’:tmp_abs = ’’ else:try: tmp_dict = OrderedDict((tmp.split(’=’) for tmp in text.split(’&’))) for k, v in tmp_dict.items():if not re.match(r’[a-zA-Z-_]+$’, v): ’’’除了value值為全字母的情況,都進(jìn)行轉(zhuǎn)換’’’ tmp_dict[k] = ’?’ for k, v in tmp_dict.items():if tmp_abs == ’’: tmp_abs += ’{}={}’.format(k, v)else: tmp_abs += ’&{}={}’.format(k, v)except ValueError: ’’’參數(shù)中沒有= 或者 即沒&也沒= 會(huì)拋出ValueError’’’ tmp_abs = ’?’ return tmp_absdef insert_data(line_data, cursor, results, limit, t_name, l_name): ''' 記錄處理之后的數(shù)據(jù),累積limit條執(zhí)行一次插入 line_data:每行處理之前的字符串?dāng)?shù)據(jù); limit:每limit行執(zhí)行一次數(shù)據(jù)插入; t_name:對(duì)應(yīng)的表名; l_name:日志文件名 ''' line_result = process_line(line_data) results.append(line_result) # print(’len(result):{}’.format(len(result))) #debug if len(results) == limit:insert_correct(cursor, results, t_name, l_name)results.clear()print(’{} {} 處理至 {}’.format(time.strftime(’%H:%M:%S’, time.localtime()), l_name, line_result[5]))def insert_correct(cursor, results, t_name, l_name): '''在插入數(shù)據(jù)過(guò)程中處理異常''' insert_sql = ’insert into {} (server,uri_abs,uri_abs_crc32,args_abs,args_abs_crc32,time_local,response_code,’ ’bytes_sent,request_time,user_ip,cdn_ip,request_method,uri,args) ’ ’values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)’.format(t_name) try:cursor.executemany(insert_sql, results) except pymysql.err.Warning as err:print(’n{} Warning: {}’.format(l_name, err)) except pymysql.err.MySQLError as err:print(’n{} Error: {}’.format(l_name, err))print(’插入數(shù)據(jù)時(shí)出錯(cuò)...n’)connection.close()exit(10)def get_prev_num(t_name, l_name): '''取得今天已入庫(kù)的行數(shù) t_name:表名 l_name:日志文件名''' try:con_cur.execute(’select min(id) from {0} where time_local=(’’select min(time_local) from {0} where time_local>='{1}')’.format(t_name, today_start))min_id = con_cur.fetchone()[0]if min_id is not None: # 假如有今天的數(shù)據(jù) con_cur.execute(’select max(id) from {}’.format(t_name)) max_id = con_cur.fetchone()[0] con_cur.execute(’select count(*) from {} where id>={} and id<={} and server='{}'’.format(t_name, min_id, max_id, server)) prev_num = con_cur.fetchone()[0]else: prev_num = 0return prev_num except pymysql.err.MySQLError as err:print(’Error: {}’.format(err))print(’Error:未取得已入庫(kù)的行數(shù),本次跳過(guò){}n’.format(l_name))returndef del_old_data(t_name, l_name, n=3): '''刪除n天前的數(shù)據(jù),n默認(rèn)為3''' # n天前的日期間 three_days_ago = time.strftime(’%Y-%m-%d %H:%M:%S’, time.localtime(time.time() - 3600 * 24 * n)) try:con_cur.execute(’select max(id) from {0} where time_local=(’’select max(time_local) from {0} where time_local!='0000-00-00 00:00:00' and time_local<='{1}')’.format( t_name, three_days_ago))max_id = con_cur.fetchone()[0]if max_id is not None: con_cur.execute(’delete from {} where id<={}’.format(t_name, max_id)) except pymysql.err.MySQLError as err:print(’n{} Error: {}’.format(l_name, err))print(’未能刪除表{}天前的數(shù)據(jù)...n’.format(n))def main_loop(log_name): '''主邏輯 log_name:日志文件名''' table_name = log_name.split(’.access’)[0].replace(’.’, ’_’) # 將域名例如m.api轉(zhuǎn)換成m_api,因?yàn)楸砻胁荒馨?’ results = [] # 創(chuàng)建表 create_table(table_name) # 當(dāng)前日志文件總行數(shù) num = int(subprocess.run(’wc -l {}’.format(log_dir + log_name), shell=True, stdout=subprocess.PIPE, universal_newlines=True).stdout.split()[0]) print(’num: {}’.format(num)) # debug # 上一次處理到的行數(shù) prev_num = get_prev_num(table_name, log_name) if prev_num is not None:# 根據(jù)當(dāng)前行數(shù)和上次處理之后記錄的行數(shù)對(duì)比,來(lái)決定本次要處理的行數(shù)范圍i = 0with open(log_name) as fp: for line in fp:i += 1if i <= prev_num: continueelif prev_num < i <= num: insert_data(line, con_cur, results, 1000, table_name, log_name)else: break# 插入不足1000行的resultsif len(results) > 0: insert_correct(con_cur, results, table_name, log_name) del_old_data(table_name, log_name)if __name__ == '__main__': # 檢測(cè)如果當(dāng)前已經(jīng)有該腳本在運(yùn)行,則退出 if_run = subprocess.run(’ps -ef|grep {}|grep -v grep|grep -v '/bin/sh'|wc -l’.format(argv[0]), shell=True, stdout=subprocess.PIPE).stdout if if_run.decode().strip(’n’) == ’1’:os.chdir(log_dir)logs_list = os.listdir(log_dir)logs_list = [i for i in logs_list if ’access’ in i and os.path.isfile(i) and i.split(’.access’)[0] in todo]if len(logs_list) > 0: # 并行 with Pool(len(logs_list)) as p:p.map(main_loop, logs_list)
報(bào)錯(cuò)如下
multiprocessing.pool.RemoteTraceback:'''Traceback (most recent call last): File '/usr/lib/python3.5/multiprocessing/pool.py', line 119, in worker result = (True, func(*args, **kwds)) File '/usr/lib/python3.5/multiprocessing/pool.py', line 44, in mapstar return list(map(*args)) File 'log.py', line 287, in main_loop create_table(table_name) File 'log.py', line 85, in create_table my_connect() File 'log.py', line 76, in my_connect charset=’utf8mb4’, port=mysql_port, autocommit=True, database=mysql_database) File '/usr/local/lib/python3.5/dist-packages/pymysql/__init__.py', line 90, in Connect return Connection(*args, **kwargs) File '/usr/local/lib/python3.5/dist-packages/pymysql/connections.py', line 706, in __init__ self.connect() File '/usr/local/lib/python3.5/dist-packages/pymysql/connections.py', line 922, in connect self.host_info = 'socket %s:%d' % (self.host, self.port)TypeError: %d format: a number is required, not str'''The above exception was the direct cause of the following exception:Traceback (most recent call last): File 'log.py', line 324, in <module> p.map(main_loop, logs_list) File '/usr/lib/python3.5/multiprocessing/pool.py', line 260, in map return self._map_async(func, iterable, mapstar, chunksize).get() File '/usr/lib/python3.5/multiprocessing/pool.py', line 608, in get raise self._valueTypeError: %d format: a number is required, not str
py3.5.2
這個(gè)哪里錯(cuò)了
問題解答
回答1:port不是int類型,mysql_port的需要填一個(gè)int而不是str類型的
相關(guān)文章:
1. html - eclipse 標(biāo)簽錯(cuò)誤2. javascript - js里首尾相接輪播的原理是什么?3. 求大神幫我看看是哪里寫錯(cuò)了 感謝細(xì)心解答4. php自學(xué)從哪里開始?5. javascript - 如果所有請(qǐng)求都放到actions 里面,那拿到的數(shù)據(jù)應(yīng)該 放在哪里,state 還是vue實(shí)例里面的data?6. javascript - 數(shù)組原聲方法中的一段代碼7. javascript - 怎么實(shí)現(xiàn)移動(dòng)端頁(yè)面滑動(dòng)切換,從1可以滑到2 但是不能從2滑回1 這樣的效果呢?8. 數(shù)據(jù)庫(kù) - MySQL 單表500W+數(shù)據(jù),查詢超時(shí),如何優(yōu)化呢?9. phpstady在win10上運(yùn)行10. python - 管道符和ssh傳文件
