
Predicting European Championship Matches with Machine Learning

Because of the pandemic in 2020, the European Championship was postponed to this year: 24 strong teams will play 51 matches over 31 days. As one of the world's top three sporting events alongside the Olympic Games and the World Cup, the quadrennial European Championship draws the attention of football fans around the globe.
To mark the 60th anniversary of the European Championship, UEFA is staging this edition as a pan-European tournament with no single host nation, spread across 13 cities in 12 countries: Copenhagen (Denmark), Brussels (Belgium), Budapest (Hungary), Amsterdam (Netherlands), Dublin (Ireland), Bucharest (Romania), Glasgow (Scotland), Bilbao (Spain), Baku (Azerbaijan), Munich (Germany), Rome (Italy), Saint Petersburg (Russia), and London (England). Both semi-finals and the final will be played at Wembley Stadium in London.
The 24 participating teams are divided into six groups. The top two in each group, plus the four best third-placed teams, advance to the round of 16, followed by the knockout rounds until the champion is decided.

The group draw is as follows:
Group A: Turkey, Italy, Wales, Switzerland.
Group B: Denmark, Finland, Belgium, Russia.
Group C: Netherlands, Ukraine, Austria, North Macedonia.
Group D: England, Croatia, Czech Republic, Scotland.
Group E: Spain, Sweden, Poland, Slovakia.
Group F: Germany, France, Portugal, Hungary.

As an AI engineer and a football fan, I used the ModelArts AI development platform to analyze the strength of each participating team, and applied data mining, machine learning, and other AI techniques to analyze and predict the tournament results.

Runtime Environment
After entering the notebook, confirm in the upper-right corner of this page that the selected kernel is PySpark-2.3.2 and that the selected flavor is CPU: 2 vCPUs, 4 GB.
Running all cells with this flavor takes about 17 minutes.

Notebook Overview
Overall, this notebook is organized into three parts: data processing, model building, and match-result prediction.

1 Data Processing
The dataset for this case covers all international matches from 1872 to May 28, 2021, including the FIFA World Cup, World Cup qualifiers, the European Championship, friendlies, and other competitions, for a total of 42,105 matches.

# Download the dataset
Resolving proxy-notebook.modelarts-dev-proxy.com (proxy-notebook.modelarts-dev-proxy.com)... 192.168.0.172
Connecting to proxy-notebook.modelarts-dev-proxy.com (proxy-notebook.modelarts-dev-proxy.com)|192.168.0.172|:8083... connected.
Proxy request sent, awaiting response... 200 OK
Length: 3184817 (3.0M) [text/csv]
Saving to: ‘raw_data.csv’
raw_data.csv        100%[===================>]   3.04M  --.-KB/s    in 0.02s   
2021-09-24 15:37:44 (153 MB/s) - ‘raw_data.csv’ saved [3184817/3184817]
import numpy as np
import pandas as pd

# Read the dataset
df = pd.read_csv("raw_data.csv")
df['date'] = pd.to_datetime(df['date'])
# Show the first 5 rows
df.head(5)
The dataset contains 9 features:
    date: match date
    home_team: name of the home team
    away_team: name of the away team
    home_score: home team goals (excluding penalty shoot-outs)
    away_score: away team goals (excluding penalty shoot-outs)
    tournament: competition type (FIFA World Cup, World Cup qualifiers, European Championship, friendlies, etc.)
    city: city where the match was played
    country: country or region where the match was played
    neutral: whether the match was played at a neutral venue
1.1 Label Construction
Predicting the outcome of a football match can be treated as a binary classification problem (draws are counted as losses).
The prediction label is the match result (win_result): take the difference between home_score and away_score; if the difference is greater than 0, win_result is a win (1), otherwise a loss (0).
df['diff'] = df['home_score']-df['away_score']
df['win_result'] = df['diff'].apply(lambda x: 1 if x>0 else 0)
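Since draws are folded into the loss class, it can be worth checking how balanced the resulting label is; a quick sketch using the columns just created:

# Rough class-balance check for the binary win_result label
print(df['win_result'].value_counts(normalize=True))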

1.2 Feature Derivation
This section derives additional features in five ways:
    The home team's record over its last 5, 3, and 1 matches, including wins, losses, and mean goal difference (home score minus away score).
    The home team's record against the current away team over their last 5, 3, and 1 meetings, including wins, losses, and mean goal difference.
    The home team's record over the past 15, 7, 3, 2, and 1 years, including total matches, wins, and losses.
    The home and away teams' records over all of their past matches.
    The month and season in which the match takes place.

# Remove friendly matches, leaving 24,908 samples
df = df[df['tournament']!='Friendly']

1.2.1 The home team's record over its last 5, 3, and 1 matches
The home team may have played fewer than 5, 3, or even 1 matches in the historical data.
The 12 derived features are:
    num_5: number of matches among the last 5 (may be fewer than 5)
    win_num_5: wins in the last 5 matches
    lose_num_5: losses in the last 5 matches
    diff_num_5: mean goal difference over the last 5 matches
    num_3: number of matches among the last 3 (may be fewer than 3)
    win_num_3: wins in the last 3 matches
    lose_num_3: losses in the last 3 matches
    diff_num_3: mean goal difference over the last 3 matches
    num_1: number of matches among the last 1 (may be 0)
    win_num_1: wins in the last 1 match
    lose_num_1: losses in the last 1 match
    diff_num_1: goal difference in the last 1 match
def process_home_team_latest_info(df):
    row_num = df.shape[0]
    for i in range(row_num):
        home_team = df.loc[i,'home_team']
        for suffix in ['5','3','1']:
            j = i-1
            flag = False
            while i-j<=int(suffix) and j>=0 and df.loc[j,'home_team'] == home_team:
                j-=1
                flag = True
            j = j+1
            if flag and i-j<=int(suffix) and j>=0:
                df.loc[i,'num_'+suffix]=i-j
                df.loc[i,'diff_num_'+suffix]=df.loc[j:i-1,'diff'].mean()
                df.loc[i,'win_num_'+suffix]=df.loc[j:i-1,'win_result'].sum()
                df.loc[i,'lose_num_'+suffix]=i-j-df.loc[j:i-1,'win_result'].sum()

# Sort by home team and date, then compute the wins, losses and mean goal difference over the last 5, 3 and 1 matches
df = df.sort_values(['home_team','date']).reset_index()
process_home_team_latest_info(df)
df.head(5)
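To spot-check the rolling features, one way is to look at a single team's rows after the derivation; a quick sketch (the team name is only an example):

# Inspect the derived rolling features for one team (illustrative team name)
cols = ['date','home_team','away_team','win_result','num_5','win_num_5','lose_num_5','diff_num_5']
df[df['home_team']=='Italy'][cols].tail(10)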

1.2.2 The home team's record against the current away team over their last 5, 3, and 1 meetings
The home team and the current away team may have met fewer than 5, 3, or even 1 times in the historical data.
The 12 derived features are:
    num_team_5: number of meetings among the last 5 (may be fewer than 5)
    win_num_team_5: wins in the last 5 meetings
    lose_num_team_5: losses in the last 5 meetings
    diff_num_team_5: mean goal difference over the last 5 meetings
    num_team_3: number of meetings among the last 3 (may be fewer than 3)
    win_num_team_3: wins in the last 3 meetings
    lose_num_team_3: losses in the last 3 meetings
    diff_num_team_3: mean goal difference over the last 3 meetings
    num_team_1: number of meetings among the last 1 (may be 0)
    win_num_team_1: wins in the last 1 meeting
    lose_num_team_1: losses in the last 1 meeting
    diff_num_team_1: goal difference in the last 1 meeting
def process_home_away_team_latest_info(df):
    row_num = df.shape[0]
    for i in range(row_num):
        home_team = df.loc[i,'home_team']
        away_team = df.loc[i,'away_team']
        for suffix in ['5','3','1']:
            j = i-1
            flag = False
            while i-j<=int(suffix) and j>=0 and df.loc[j,'home_team'] == home_team and df.loc[j,'away_team'] == away_team:
                j-=1
                flag = True
            j = j+1
            if flag and i-j<=int(suffix) and j>=0:
                df.loc[i,'num_team_'+suffix]=i-j
                df.loc[i,'diff_num_team_'+suffix]=df.loc[j:i-1,'diff'].mean()
                df.loc[i,'win_num_team_'+suffix]=df.loc[j:i-1,'win_result'].sum()
    for suffix in ['5','3','1']:
        df['lose_num_team_'+suffix] = df['num_team_'+suffix]-df['win_num_team_'+suffix]

# df = df.drop(columns=['level_0'])
# Sort by home team, away team and date, then compute the head-to-head features over the last 5, 3 and 1 meetings
df = df.sort_values(['home_team','away_team','date']).reset_index()
process_home_away_team_latest_info(df)
df.head(5)

1.2.3 The home team's record over the past 15, 7, 3, 2, and 1 years
Get the home team's number of matches, wins, and losses over the past 15, 7, 3, 2, and 1 years.
def process_home_year_latest_info(df):
    row_num = df.shape[0]
    for i in range(row_num):
        home_team = df.loc[i,'home_team']
        home_team_year = df.loc[i,'date'].year
        for suffix in ['15','7','3','2','1']:
            j = i-1
            flag = False
            while j>=0 and df.loc[j,'home_team'] == home_team and home_team_year-df.loc[j,'date'].year<=int(suffix):
                j-=1
                flag = True
            j = j+1
            if flag and j>=0:
                df.loc[i,'num_year_'+suffix]=i-j
                df.loc[i,'diff_num_year_'+suffix]=df.loc[j:i-1,'diff'].mean()
                df.loc[i,'win_num_year_'+suffix]=df.loc[j:i-1,'win_result'].sum()
    for suffix in ['15','7','3','2','1']:
        df['lose_num_year_'+suffix]= df['num_year_'+suffix] - df['win_num_year_'+suffix]

df = df.drop(columns=['level_0'])
df = df.sort_values(['home_team','date']).reset_index()
process_home_year_latest_info(df)
df.head(5)

1.2.4 Overall historical record of the home and away teams
Get the home team's record over all of its past home matches and the away team's record over all of its past away matches: number of matches, wins, losses, and win rate.

    home_num: matches the current home team has played at home
    home_win_num: home wins of the current home team
    home_lose_num: home losses of the current home team
    home_win_rate: home win rate of the current home team
    away_num: matches the current away team has played away
    away_win_num: away wins of the current away team
    away_lose_num: away losses of the current away team
    away_win_rate: away win rate of the current away team
def process_home_away(df_home_team,df_away_team):
    row_num = df_home_team.shape[0]
    for i in range(row_num):
        home_team = df_home_team.loc[i,'home_team']
        away_team = df_home_team.loc[i,'away_team']
        # Record of the home team over all of its previous home matches
        j = i-1
        flag = False
        while j>=0 and df_home_team.loc[j,'home_team'] == home_team:
            j-=1
            flag = True
        j = j+1
        if flag and j>=0:
            df_home_team.loc[i,'home_num']=i-j
            df_home_team.loc[i,'home_win_num']=df_home_team.loc[j:i-1,'win_result'].sum()
            df_home_team.loc[i,'home_lose_num']=i-j-df_home_team.loc[j:i-1,'win_result'].sum()
            df_home_team.loc[i,'home_win_rate']=df_home_team.loc[i,'home_win_num']/df_home_team.loc[i,'home_num']
        # Record of the away team over all of its previous away matches
        away_index = df_away_team[df_away_team['index']==df_home_team.loc[i,'index']].index[0]
        away_index_j = away_index - 1
        flag = False
        while away_index_j>=0 and df_away_team.loc[away_index_j,'away_team'] == away_team:
            away_index_j-=1
            flag = True
        away_index_j = away_index_j+1
        if flag and away_index_j>=0:
            df_home_team.loc[i,'away_num']=away_index-away_index_j
            df_home_team.loc[i,'away_win_num']=df_away_team.loc[away_index_j:away_index-1,'win_result'].sum()
            df_home_team.loc[i,'away_lose_num']= df_home_team.loc[i,'away_num'] - df_home_team.loc[i,'away_win_num']
            df_home_team.loc[i,'away_win_rate']=df_home_team.loc[i,'away_win_num']/df_home_team.loc[i,'away_num']

df = df.drop(columns=['level_0'])
process_away_team = df.sort_values(['away_team','date']).reset_index()
# In the away-sorted frame, flip win_result so that 1 means the home side did not win (an away win or a draw)
process_away_team['win_result']= process_away_team['win_result'].apply(lambda x: 0 if x==1 else 1)
df = df.sort_values(['home_team','date']).reset_index()

process_home_away(df,process_away_team)
df.head(5)

1.2.5 Month and season of the match
df = df.drop(columns=['level_0'])
df = df.sort_values(['date']).reset_index()
df['month']= df['date'].dt.month
df['season'] = df['month'].apply(lambda x: int(x/3))
# Save the processed data
df.to_csv('footballdata.csv')
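The season feature is simply integer division of the month by 3, which buckets months 1-2 into 0, 3-5 into 1, 6-8 into 2, 9-11 into 3, and 12 into 4; a quick check of that mapping:

# Verify the month -> season bucketing used above
for m in range(1, 13):
    print(m, int(m/3))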

2 Modeling
Predicting football results can be framed as a binary classification problem, and this section uses machine learning classifiers to predict match outcomes. AUC and F1 are used as the classification metrics; for details on these metrics see: https://blog.csdn.net/HappyRocking/article/details/80082304
The modeling workflow consists of: 1. reading the data, 2. data processing, 3. splitting the data, 4. training the model, 5. evaluating the model.
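As a rough sketch of how AUC and F1 can be computed in PySpark once a model has produced predictions (assuming a predictions DataFrame that contains the label_index, rawPrediction and prediction_index columns used by the classifier wrapper later in this notebook):

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# predictions is assumed to come from pipeline_model.transform(valid_df)
auc = BinaryClassificationEvaluator(labelCol="label_index", rawPredictionCol="rawPrediction",
                                    metricName="areaUnderROC").evaluate(predictions)
f1 = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction_index",
                                       metricName="f1").evaluate(predictions)
print("AUC = %.4f, F1 = %.4f" % (auc, f1))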
Read the data
from pyspark.sql import SparkSession

class MLSReadData:
    def __init__(self,
                 input_file_path,
                 format="csv",
                 has_header=True,
                 delimiter=","):
        """
        read dataset
        :param input_file_path:
        :param format:
        :param has_header:
        :param delimiter:
        """
        self.input_file_path = input_file_path
        self.format = format
        self.has_header = has_header
        self.delimiter = delimiter
        self._outputs = {}

    def run(self):
        spark = SparkSession.builder.getOrCreate()
        input_df = spark.read \
            .format(self.format) \
            .option("header", self.has_header) \
            .option("delimiter", self.delimiter) \
            .option("inferSchema", True) \
            .load(self.input_file_path.strip())
        column_names = input_df.columns
        for column in column_names:
            input_df = input_df.withColumnRenamed(column, column.strip())
        self._outputs = {
            "output_port_1": input_df
        }

    def get_outputs(self):
        return self._outputs

params = {
    "input_file_path": "./footballdata.csv",  #@param {"label":"input_file_path","type":"string","required":"true","helpTip":""}
    "format": "csv",  #@param {"label":"format","type":"string","required":"false","helpTip":""}
    "has_header": True,  #@param {"label":"has_header","type":"boolean","required":"false","helpTip":""}
    "delimiter": ","  #@param {"label":"delimiter","type":"string","required":"false","helpTip":""}
}
read_data = MLSReadData(**params)
read_data.run()  #@output {"label":"dataframe","name":"read_data.get_outputs()['output_port_1']","type":"DataFrame"}
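To confirm the CSV was read with the expected columns and types, one can peek at the resulting Spark DataFrame; a quick sketch:

# Inspect the loaded Spark DataFrame
football_df = read_data.get_outputs()['output_port_1']
football_df.printSchema()
football_df.show(5)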

Data type conversion
from pyspark.sql.types import StringType, IntegerType, LongType, FloatType, DoubleType, BooleanType, DateType, \
    TimestampType
from pyspark.sql.functions import col

class MLSModifyDataType:
    """
    modify datatype of dataframe
    """
    def __init__(self, inputs, column_type_map_str):
        """
        init
        :param inputs:
            dic of upstream node output, should have key: dataframe
        :param column_type_map_str: the format like: "column_a:string,column_b:integer",
            column type can be: string,integer,long,float,double,bool,date,timestamp
        """
        self.inputs = inputs
        self.column_type_map_str = column_type_map_str
        self.dataframe = None
        self.column_type_map = {}
        self._outputs = {}

    def _check_and_solve_input_param_when_output(self):
        # check param inputs
        if not isinstance(self.inputs, dict):
            raise Exception("parameter \"inputs\" should be dict and has key \"dataframe\"")
        if "dataframe" not in self.inputs:
            raise Exception("parameter \"inputs\" should have key: \"dataframe\"")
        self.dataframe = self.inputs["dataframe"]
        # check and solve column_type_map_str
        if self.column_type_map_str is None or not isinstance(self.column_type_map_str, str) \
                or not self.column_type_map_str.strip():
            raise Exception("should input parameter \"column_type_map\", and the type should string")
        pairs = self.column_type_map_str.strip().split(",")
        for pair in pairs:
            array = pair.strip().split(":")
            if len(array) != 2:
                raise Exception("parameter \"column_type_map_str\" should obey the format,"
                                "like \"column_a:string,column_b:integer\"")
            self.column_type_map[array[0].strip()] = array[1].strip()

    def _execute_self_node_output(self):
        data_type_map = {
            "string": StringType(), "integer": IntegerType(), "long": LongType(),
            "float": FloatType(), "double": DoubleType(), "bool": BooleanType(),
            "date": DateType(), "timestamp": TimestampType()
        }
        result_dataframe = self.dataframe
        for (column_name, data_type) in self.column_type_map.items():
            result_dataframe = result_dataframe.withColumn(column_name,
                                                           col(column_name).cast(data_type_map[data_type]))
        self._outputs = {
            "output_port_1": result_dataframe
        }

    def run(self):
        self._check_and_solve_input_param_when_output()
        self._execute_self_node_output()

    def get_outputs(self):
        return self._outputs

inputs = {
    "dataframe": read_data.get_outputs()['output_port_1']  #@input {"label":"dataframe","type":"DataFrame"}
}
params = {
    "inputs": inputs,
    "column_type_map_str": "neutral:string,month:string,season:string"  #@param {"label":"column_type_map_str","type":"string","required":"true","helpTip":""}
}
modify_data_type = MLSModifyDataType(**params)
modify_data_type.run()  #@output {"label":"dataframe","name":"modify_data_type.get_outputs()['output_port_1']","type":"DataFrame"}
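To confirm the casts took effect, a quick look at the relevant part of the schema (sketch):

# neutral, month and season should now be string columns
modify_data_type.get_outputs()['output_port_1'].select("neutral", "month", "season").printSchema()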

Missing-value imputation: fill missing values with 0
from pyspark.sql.functions import when
import pyspark.sql.functions as F

class MLSMissingValueImpute:
    """
    Impute missing value
    """
    def __init__(self, inputs):
        self.dataframe = inputs["dataframe"]
        self._outputs = {}

    def run(self):
        # Find the columns that contain null values
        missing_columns = []
        for col in self.dataframe.columns:
            if self.dataframe.filter(self.dataframe[col].isNull()).count() > 0:
                missing_columns.append(col)
                # print(col, "\t", "with null values: ", count)
        # Replace nulls in those columns with 0
        for col in missing_columns:
            self.dataframe = self.dataframe.withColumn(
                col, when(self.dataframe[col].isNull() == True, F.lit(0)).otherwise(self.dataframe[col]))
        self._outputs = {"output_port_1": self.dataframe}

    def get_outputs(self):
        return self._outputs

inputs = {
    "dataframe": modify_data_type.get_outputs()['output_port_1']  #@input {"type":"DataFrame", "label": "dataframe"}
}
params = {
    "inputs": inputs
}

missing_value_impute=MLSMissingValueImpute(**params)
missing_value_impute.run() #@output {"label":"dataframe","name":"missing_value_impute.get_outputs()['output_port_1']","type":"DataFrame"}
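A simple way to verify the imputation is to count the remaining nulls per column; a quick sketch:

# Count remaining nulls in each column after imputation (should print nothing)
imputed_df = missing_value_impute.get_outputs()['output_port_1']
for c in imputed_df.columns:
    n_null = imputed_df.filter(imputed_df[c].isNull()).count()
    if n_null > 0:
        print(c, n_null)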

Filter the dataset rows, keeping matches dated 2015-01-01 through 2019-12-31 as the training set
from pyspark.sql.dataframe import DataFrame

class MLSDatasetFilter:
    """
    dataset filter
    """
    def __init__(self, inputs, column_name, condition_map_str):
        self.inputs = inputs
        self.dataframe = None
        self.column_name = column_name
        self.condition_map_str = condition_map_str
        self.condition_map = {}
        self._outputs = {}

    def _check_and_solve_param(self):
        # check param inputs
        if not isinstance(self.inputs, dict):
            raise Exception("parameter \"inputs\" should be dict and has key \"dataframe\"")
        if "dataframe" not in self.inputs:
            raise Exception("parameter \"inputs\" should have key: \"dataframe\"")
        self.dataframe = self.inputs["dataframe"]
        # check param type
        if not isinstance(self.dataframe, DataFrame):
            raise Exception("parameter \"dataframe\" should be DataFrame of pyspark")
        if not isinstance(self.column_name, str):
            raise Exception("parameter \"column_name\" should be str")
        if not isinstance(self.condition_map_str, str):
            raise Exception("parameter \"condition_map_str\" should be str")
        # solve param condition_map_str
        pairs = self.condition_map_str.strip().split(";")
        for pair in pairs:
            array = pair.strip().split(":")
            if len(array) != 1 and len(array) != 2:
                raise Exception("parameter \"condition_map_str\" should have fixed format, please read the annotation.")
            if len(array) == 2:
                self.condition_map[array[0].strip()] = array[1].strip()
            elif len(array) == 1:
                self.condition_map[array[0].strip()] = ""

    def _execute(self):
        res_dataframe = self.dataframe
        for (operator, value) in self.condition_map.items():
            condition_expr = self.column_name.strip() + " " + operator.strip()
            formated_operator = operator.strip().upper()
            if formated_operator == 'BETWEEN' or formated_operator == 'NOT BETWEEN':
                value_array = value.split(',')
                if len(value_array) != 2:
                    raise Exception("if use expr 'between' or 'not between', the range value string should be"
                                    "separated by comma, and the result should be array with length 2")
                condition_expr = condition_expr + " '" + value_array[0].strip() + "' AND '" + value_array[1].strip() \
                                 + "'"
            elif formated_operator == 'IS NULL' or formated_operator == 'IS NOT NULL':
                condition_expr = condition_expr
            else:
                condition_expr = condition_expr + " '" + value.strip() + "'"
            res_dataframe = res_dataframe.filter(condition_expr)
        self._outputs = {
            "output_port_1": res_dataframe
        }

    def run(self):
        self._check_and_solve_param()
        self._execute()

    def get_outputs(self):
        return self._outputs

inputs = {
    "dataframe": missing_value_impute.get_outputs()['output_port_1']  #@input {"label":"dataframe","type":"DataFrame"}
}
params = {
    "inputs": inputs,
    "column_name": "date",  #@param {"label":"column_name","type":"string","required":"true","helpTip":""}
    "condition_map_str": "BETWEEN:2015-01-01,2019-12-31"  #@param {"label":"condition_map_str","type":"string","required":"true","helpTip":""}
}
dataset_filter_train_data = MLSDatasetFilter(**params)
dataset_filter_train_data.run()  #@output {"label":"dataframe","name":"dataset_filter_train_data.get_outputs()['output_port_1']","type":"DataFrame"}

Filter the dataset rows, keeping matches dated 2020-01-01 through 2021-05-31 as the validation set
inputs = {
    "dataframe": missing_value_impute.get_outputs()['output_port_1']  #@input {"label":"dataframe","type":"DataFrame"}
}
params = {
    "inputs": inputs,
    "column_name": "date",  #@param {"label":"column_name","type":"string","required":"true","helpTip":""}
    "condition_map_str": "BETWEEN:2020-01-01,2021-05-31"  #@param {"label":"condition_map_str","type":"string","required":"true","helpTip":""}
}
dataset_filter_valid_data = MLSDatasetFilter(**params)
dataset_filter_valid_data.run()  #@output {"label":"dataframe","name":"dataset_filter_valid_data.get_outputs()['output_port_1']","type":"DataFrame"}
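As a sanity check on the time-based split, the rows falling into each window can be counted; a quick sketch:

# Row counts of the train/validation splits
train_df = dataset_filter_train_data.get_outputs()['output_port_1']
valid_df = dataset_filter_valid_data.get_outputs()['output_port_1']
print("train rows:", train_df.count(), "valid rows:", valid_df.count())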

Select the feature columns of the training set as the model input
class MLSSelectColumns:
    """
    select columns
    """
    def __init__(self, inputs, selected_cols_str):
        """
        select specified columns of dataframe
        :param inputs:
            dic of upstream node output, should have key: dataframe
        :param selected_cols_str: columns' string, separated by comma
        """
        self.inputs = inputs
        self.selected_cols_str = selected_cols_str
        self.dataframe = None
        self.selected_cols = []
        self._outputs = {}

    def _check_and_solve_input_param_when_output(self):
        # check param inputs
        if not isinstance(self.inputs, dict):
            raise Exception("parameter \"inputs\" should be dict and has key \"dataframe\"")
        if "dataframe" not in self.inputs:
            raise Exception("parameter \"inputs\" should have key: \"dataframe\"")
        self.dataframe = self.inputs["dataframe"]
        # check selected_cols_str
        if self.selected_cols_str is None or not isinstance(self.selected_cols_str, str) \
                or not self.selected_cols_str.strip():
            raise Exception("should input parameter \"selected_cols_str\"")
        self.selected_cols = [column.strip() for column in self.selected_cols_str.strip().split(",")]
        column_set = set()
        for column in self.dataframe.columns:
            column_set.add(column)
        for select_col in self.selected_cols:
            if select_col not in column_set:
                raise Exception("column %s doesn't exist in dataframe columns" % select_col)

    def run(self):
        self._check_and_solve_input_param_when_output()
        result_df = self.dataframe.select(self.selected_cols)
        self._outputs = {"output_port_1": result_df}

    def get_outputs(self):
        return self._outputs

inputs = {
    "dataframe": dataset_filter_train_data.get_outputs()['output_port_1']  #@input {"label":"dataframe","type":"DataFrame"}
}
params = {
    "inputs": inputs,
    "selected_cols_str": "month,  season,  home_team,  away_team,  tournament, neutral,  win_result,num_5,diff_num_5,win_num_5,lose_num_5,num_3,diff_num_3,win_num_3,lose_num_3,num_1,diff_num_1,win_num_1,lose_num_1,\
    num_team_5,diff_num_team_5,win_num_team_5,lose_num_team_5,num_team_3,diff_num_team_3,win_num_team_3,lose_num_team_3,num_team_1,diff_num_team_1,win_num_team_1,lose_num_team_1,\
    num_year_15,diff_num_year_15,win_num_year_15,lose_num_year_15,num_year_7,diff_num_year_7,win_num_year_7,lose_num_year_7,num_year_3,diff_num_year_3,win_num_year_3,lose_num_year_3,\
    num_year_2,diff_num_year_2,win_num_year_2,lose_num_year_2,num_year_1,diff_num_year_1,win_num_year_1,lose_num_year_1,\
    away_num,away_win_num,away_lose_num,away_win_rate,home_num,home_win_num,home_lose_num,home_win_rate"
}
select_columns_train_data = MLSSelectColumns(**params)
select_columns_train_data.run()  #@output {"label":"dataframe","name":"select_columns_train_data.get_outputs()['output_port_1']","type":"DataFrame"}


Select the feature columns of the validation set as the prediction input
inputs = {
    "dataframe": dataset_filter_valid_data.get_outputs()['output_port_1']  #@input {"label":"dataframe","type":"DataFrame"}
}
params = {
    "inputs": inputs,
    "selected_cols_str": "month,  season,  home_team,  away_team,  tournament, neutral,  win_result,num_5,diff_num_5,win_num_5,lose_num_5,num_3,diff_num_3,win_num_3,lose_num_3,num_1,diff_num_1,win_num_1,lose_num_1,\
    num_team_5,diff_num_team_5,win_num_team_5,lose_num_team_5,num_team_3,diff_num_team_3,win_num_team_3,lose_num_team_3,num_team_1,diff_num_team_1,win_num_team_1,lose_num_team_1,\
    num_year_15,diff_num_year_15,win_num_year_15,lose_num_year_15,num_year_7,diff_num_year_7,win_num_year_7,lose_num_year_7,num_year_3,diff_num_year_3,win_num_year_3,lose_num_year_3,\
    num_year_2,diff_num_year_2,win_num_year_2,lose_num_year_2,num_year_1,diff_num_year_1,win_num_year_1,lose_num_year_1,\
    away_num,away_win_num,away_lose_num,away_win_rate,home_num,home_win_num,home_lose_num,home_win_rate"
}
select_columns_valid_data = MLSSelectColumns(**params)
select_columns_valid_data.run()  #@output {"label":"dataframe","name":"select_columns_valid_data.get_outputs()['output_port_1']","type":"DataFrame"}

Train the model, using a logistic regression classifier as an example
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, \
    VectorAssembler, IndexToString, StandardScaler
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import NumericType

class MLSLogisticRegressionClassifier:
    """
    logistic regression classifier
    """
    def __init__(self,
                 inputs,
                 b_output_action=True,
                 b_use_default_encoder=True,
                 input_features_str=None,
                 outer_pipeline_stages=None,
                 label_col=None,
                 classifier_label_index_col="label_index",
                 classifier_feature_vector_col="model_features",
                 prediction_col="prediction",
                 prediction_index_col="prediction_index",
                 max_iter=100,
                 reg_param=0.0,
                 elastic_net_param=0.0,
                 tol=1e-6,
                 fit_intercept=True,
                 standardization=True,
                 aggregation_depth=2,
                 family="auto",
                 lower_bounds_on_coefficients=None,
                 upper_bounds_on_coefficients=None,
                 lower_bounds_on_intercepts=None,
                 upper_bounds_on_intercepts=None):
        """
        A logistic regression classifier
        :param inputs:
            dic of upstream node output, should have key: dataframe
        :param b_output_action:
            If true, the output of this class is a pipeline model;
            If it is false, only the logistic regression classifier output pipeline stage is available.
            In this case, users can edit the code of the workflow node for custom execution.
            (default: True)
        :param b_use_default_encoder:
            If true, use StringIndexer and OneHotEncoderEstimator for string features;
            use StandardScaler for numerical features; then train a logistic regression classifier
            and obtain a pipeline model.
            (default: True)
        :param dataframe:
            Used when b_output_action=true.
        :param input_features_str:
            Input features, separated by commas.
        :param outer_pipeline_stages:
            When users edit the code of a workflow node, the stages will be collected in the upper node.
        :param label_col:
            The target column of the dataframe.
        :param classifier_label_index_col:
            The label column value of the lr classifier
            (default: "label_index")
        :param classifier_feature_vector_col:
            The feature column of the lr classifier.
            (default: "model_features")
        :param prediction_col:
            Model prediction column name.
            (default: "prediction")
        :param prediction_index_col:
            Model prediction index column name.
            (default: "prediction_index")
        :param max_iter:
            The maximum number of iterations
            (default: 100)
        :param reg_param:
            The regularizer parameter.
            (default: 0.0)
        :param elastic_net_param:
            ElasticNet mixed parameters, the range is [0, 1]. For alpha = 0, the penalty is L2 penalty.
            For alpha = 1, this is the L1 penalty.
            (default: 0.0)
        :param tol:
            The convergence tolerance for the iterative algorithms.
            (default: 1e-6)
        :param fit_intercept:
            Whether to fit an intercept term.
            (default: True)
        :param standardization:
            Whether to standardize the training features before fitting the model.
            (default: True)
        :param aggregation_depth:
            Suggested depth for treeAggregate.
            (default: 2)
        :param family:
            The name of family which is a description of the label distribution to be used in the model,
            Supported "auto", "binomial", "multinomial".
            (default: "auto")
        :param lower_bounds_on_coefficients:
            The lower bounds on coefficients if fitting under bound constrained optimization.
            (default: None)
        :param upper_bounds_on_coefficients:
            The upper bounds on coefficients if fitting under bound constrained optimization.
            (default: None)
        :param lower_bounds_on_intercepts:
            The lower bounds on intercepts if fitting under bound constrained optimization.
            (default: None)
        :param upper_bounds_on_intercepts:
            The upper bounds on intercepts if fitting under bound constrained optimization.
            (default: None)
        """
        self.inputs = inputs
        self.b_output_action = b_output_action
        self.b_use_default_encoder = b_use_default_encoder
        self.input_features_str = input_features_str
        self.outer_pipeline_stages = outer_pipeline_stages
        self.label_col = label_col
        self.classifier_label_index_col = classifier_label_index_col
        self.classifier_feature_vector_col = classifier_feature_vector_col
        self.prediction_col = prediction_col
        self.prediction_index_col = prediction_index_col
        self.max_iter = max_iter
        self.reg_param = reg_param
        self.elastic_net_param = elastic_net_param
        self.tol = tol
        self.fit_intercept = fit_intercept
        self.standardization = standardization
        self.aggregation_depth = aggregation_depth
        self.family = family
        self.lower_bounds_on_coefficients = lower_bounds_on_coefficients
        self.upper_bounds_on_coefficients = upper_bounds_on_coefficients
        self.lower_bounds_on_intercepts = lower_bounds_on_intercepts
        self.upper_bounds_on_intercepts = upper_bounds_on_intercepts
        self.dataframe = None
        self._input_feature_cols = []
        self._df_column_type_map = {}
        self.labels = []
        self._outputs = {}

    def _check_and_solve_input_param_when_output(self):
        # check param inputs
        if not isinstance(self.inputs, dict):
            raise Exception("parameter \"inputs\" should be dict and has key \"dataframe\"")
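According to its docstring, this wrapper ultimately builds a Spark ML pipeline model when b_use_default_encoder is True: string features are indexed and one-hot encoded, numeric features are assembled and standardized, and a LogisticRegression is fit. As a minimal, hedged sketch of that kind of pipeline (not the wrapper's exact internals; the column lists below are assumptions based on the features selected earlier, using the train/validation DataFrames built above):

# Illustrative sketch only: a simplified stand-in for the full MLS wrapper above
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, StandardScaler

train_df = select_columns_train_data.get_outputs()['output_port_1']
valid_df = select_columns_valid_data.get_outputs()['output_port_1']

label_col = "win_result"
string_cols = ["month", "season", "home_team", "away_team", "tournament", "neutral"]
numeric_cols = [c for c in train_df.columns if c not in string_cols + [label_col]]

# Index and one-hot encode the string features
indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep") for c in string_cols]
encoder = OneHotEncoderEstimator(inputCols=[c + "_idx" for c in string_cols],
                                 outputCols=[c + "_vec" for c in string_cols],
                                 handleInvalid="keep")
# Assemble all features into one vector and standardize it
assembler = VectorAssembler(inputCols=[c + "_vec" for c in string_cols] + numeric_cols,
                            outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="model_features")
label_indexer = StringIndexer(inputCol=label_col, outputCol="label_index")

lr = LogisticRegression(featuresCol="model_features", labelCol="label_index", maxIter=100)
pipeline = Pipeline(stages=indexers + [encoder, assembler, scaler, label_indexer, lr])

model = pipeline.fit(train_df)
predictions = model.transform(valid_df)
predictions.select("home_team", "away_team", "win_result", "prediction").show(10)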

