0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

揭秘为什么Cloud TPU编写自定义估算器模型?

Tensorflowers 来源:未知 作者:王淳 2018-09-20 09:46 次阅读

文 / Google Cloud Platform 技术主管 Lak Lakshmanan (@lak_gcp)

张量处理单元 (TPU) 可加速处理 Google 内各种机器学习工作负载,并可供 Google Cloud 客户使用。您可以在Cloud TPU 参考模型存储区找到启用 TPU 的顶尖图像模型版本,例如 ResNet 和 AmoebaNet;您还可以使用强大的Tensor2Tensor库,在 TPU 上执行文本摘要和问答任务。这些教程会为您分步介绍如何使用当下很多最热门的 Cloud TPU 参考模型。

注:存储区链接

https://github.com/tensorflow/tpu

教程链接

https://cloud.google.com/tpu/docs/tutorials

但如果您拥有自定义 TensorFlow 模型,又该如何做呢?在本文中,我会逐步介绍编写自定义估算器以便在 Cloud TPU 上运行的全过程。在此过程中,我会指出需要注意的地方和建议采用的最佳实践。您可以在 GitHub 上找到此解决方案的完整代码;本文仅列出相关代码片段。

注:解决方案的完整代码链接

https://github.com/GoogleCloudPlatform/training-data-analyst/tree/master/courses/machine_learning/deepdive/08_image/flowersmodeltpu

自定义 TensorFlow 估算器包含以模型函数传递的基类估算器:

1def train_and_evaluate(output_dir, nsteps):

2estimator = tf.estimator.Estimator(

3model_fn = model_fn,

4model_dir = output_dir)

模型函数会接收特征、标签和模式,并返回 EstimatorSpec。例如,图像分类问题的模型函数可能包含

1def model_fn(features, labels, mode):

2# write the model to compute predictions, loss, etc. from the model

3

4return tf.estimator.EstimatorSpec(

5mode=mode,

6predictions={"probabilities": probabilities,

7 "classid": class_int, "class": class_str},

8loss=loss,

9train_op=train_op,

10eval_metric_ops=evalmetrics,

11export_outputs={'classes': tf.estimator.export.PredictOutput(

12{"probabilities": probabilities, "classid": class_int,

13 "class": class_str})}

14)

TensorFlow 中的 tf.contrib.tpu 包提供了包装器类,可助您以适当方式编写代码,以便在 CPUGPU 和 Cloud TPU 上运行代码。下面我们就来看看如何以这种不受加速器限制的方式编写自定义估计器。

1.将输入数据转换为 TF 记录

Cloud TPU 的速度非常快,一不小心,您的训练就会变成以读写(“馈入” 和 “馈出”)数据和存储检查点为主。让 TPU 等待输入/输出会造成浪费,所以我们会做几件事以充分利用 TPU 用于计算的时间。

首先是避免在估算器的输入函数中进行数据解析和整理,而是预先将数据转换为 TF 记录。与单个图像文件相比,批量处理 TF 记录更为简单,因为记录本身包含标签,如此可以减少系统必须读取的小型文件数量。我使用 Apache Beam 进行这种转换。您可以在官方 TPU 存储区找到读取 JPEG 和编写 TF 记录的脚本。您可以在 Cloud Dataflow 上大规模地执行 Apache Beam 程序,但如果您的数据源目前不在 Google Cloud 上,则只能在大型 VM 上本地执行此程序(请务必用 pip 安装 apache-beam)。

注:JPEG 和编写 TF 记录链接

https://github.com/tensorflow/tpu/blob/master/tools/datasets/jpeg_to_tf_record.py

TF 记录是字典。对于图像分类,上述管道编写的两个条目很重要,分别是:“image/class/label”(采用 int64)和 “image/encoded”(由 JPEG 文件内容组成)。

2.编写输入函数以读取 TF 记录

与任何估算器一样,您需要编写输入函数,以读取这些 TF 记录。使用 Dataset API 可极大地简化此任务,但还需注意几个问题。在讲解过程中,我会指出这些问题。

以下是我的输入函数:

1def make_input_fn(pattern, mode, num_cores=8, transpose_input=False):

2def _set_shapes(batch_size, images, labels):

3"""Statically set the batch_size dimension."""

4if transpose_input:

5images.set_shape(images.get_shape().merge_with(

6tf.TensorShape([None, None, None, batch_size])))

7labels.set_shape(labels.get_shape().merge_with(

6tf.TensorShape([batch_size])))

9else:

10images.set_shape(images.get_shape().merge_with(

11tf.TensorShape([batch_size, None, None, None])))

12labels.set_shape(labels.get_shape().merge_with(

13tf.TensorShape([batch_size])))

14return images, labels

15

16def _input_fn(params):

17batch_size = params['batch_size']

18is_training = (mode == tf.estimator.ModeKeys.TRAIN)

19

20# read the dataset

21dataset = tf.data.Dataset.list_files(pattern, shuffle=is_training)

22if is_training:

23dataset = dataset.repeat()

24def fetch_dataset(filename):

25buffer_size = 8 * 1024 * 1024 # 8 MiB per file

26dataset = tf.data.TFRecordDataset(filename, buffer_size=buffer_size)

27return dataset

28dataset = dataset.apply(

29tf.contrib.data.parallel_interleave(

30fetch_dataset, cycle_length=64, sloppy=True))

31dataset = dataset.shuffle(1024)

32

33# augment and batch

34dataset = dataset.apply(

35tf.contrib.data.map_and_batch(

36read_and_preprocess, batch_size=batch_size,

37num_parallel_batches=num_cores, drop_remainder=True

38))

39

40 if transpose_input:

41 dataset = dataset.map(

42 lambda images, labels: (tf.transpose(images, [1, 2, 3, 0]), labels),

43 num_parallel_calls=num_cores)

44

45# assign static shape

46dataset = dataset.map(

47functools.partial(_set_shapes, batch_size)

48)

49

50# prefetch data while training

51dataset = dataset.prefetch(tf.contrib.data.AUTOTUNE)

52return dataset

53

54return _input_fn

请注意,输入函数采用 params 参数。实际上,这将是传递至训练程序的命令行参数,如此一来,我们便可提取有关数据集的详情,例如训练次数和评估图像。

batch_size 很特别,因为 TPU 有多个核心,而 batch_size 由 TPU 估算器设置,且为有效批次大小。您必须完全返回 batch_size 记录,而不能返回部分填充的批次。由于您会无限期地循环使用训练数据,所以在训练期间不会出现此问题。但这意味着最简单的做法是将评估数据集向下舍入为核心数的倍数。如果核心数为 8,而评估数据集中有 1026 张图像,您只能使用前 1024 张图像进行评估。剩余的 2 张图像则会舍弃。(我们也有方法在 Cloud TPU 中处理最后剩下的部分批次,我就不在此赘述。)

与任何分布式训练一样,您应确保每个工作器看到不同的数据子集,这是由所有文件的并行交错及缓冲区本身内部的记录重排进行处理。

图像分类问题的一个常见需求是通过添加随机裁剪及翻转等方法来增强原始数据。我通过 read_and_preprocess 函数做到这一点。请注意,我将此函数应用于各个 TF 记录并创建了 8 个并行批次,同时舍弃剩余的任何记录(再次提醒,这在训练期间不会造成任何影响,因为您会无限期重复进行训练)。

接下来是转置数据。事实证明,在 TPU 中转置数据以保持批次大小可以极大地提高性能。因此,我们可以根据需要采取此做法。如果我们在 GPU 或 CPU 上运行程序,则 transpose_input 标记会变为 false。

TPU 需要静态大小的张量。尽管我们已确保维持这种情况(通过舍弃剩余的批次),但仍需为核心 TensorFlow 编写 Dataset API,这是更常见的做法。因此,我们调用一个函数,将数据集中的 batch_size 从 None 改为 batch_size。

最后的优化操作至关重要。我们需要预取数据。换句话说,当 TPU 处理一批记录时,我们会通过 I/O 线程寻找并提取下一批次。如此一来,我们便可最大限度地利用 TPU(或 GPU),而且这对 CPU 没有任何影响。

3.处理 TF 记录

(上述)输入函数会设置处理输入内容的方式,但实际的解析操作还需由名为 read_and_preprocess() 的方法执行。此方法如下所示:

1def read_and_preprocess(example_data):

2parsed = tf.parse_single_example(example_data, {

3'image/encoded': tf.FixedLenFeature((), tf.string, ''),

4'image/class/label': tf.FixedLenFeature([], tf.int64, 1),

5})

6image_bytes = tf.reshape(parsed['image/encoded'], shape=[])

7label = tf.cast(

8tf.reshape(parsed['image/class/label'], shape=[]), dtype=tf.int32) - 1

9

10# end up with pixel values that are in the -1, 1 range

11image = tf.image.decode_jpeg(image_bytes, channels=NUM_CHANNELS)

12image = tf.image.convert_image_dtype(image, dtype=tf.float32) # 0-1

13image = tf.expand_dims(image, 0) # resize_bilinear needs batches

14

15image = tf.image.resize_bilinear(

16image, [HEIGHT + 10, WIDTH + 10], align_corners=False)

17image = tf.squeeze(image) # remove batch dimension

18image = tf.random_crop(image, [HEIGHT, WIDTH, NUM_CHANNELS])

19image = tf.image.random_flip_left_right(image)

20image = tf.image.random_brightness(image, max_delta=63.0 / 255.0)

21image = tf.image.random_contrast(image, lower=0.2, upper=1.8)

22

23

24#pixel values are in range [0,1], convert to [-1,1]

25image = tf.subtract(image, 0.5)

26image = tf.multiply(image, 2.0)

27return image, label

这里有两个重要的注意事项。第一是使用 parse_single_example,因为我们是从 map() 调用此函数,所以会针对单个 TF 记录调用。我们从记录中提取相关信息(经过编码的图像和标签),然后将其用于构建必要的张量。第二个注意事项是,这些数据必须为数值。比如,我无法传回标签字符串,因为 TPU 只能处理数值型数据。我们需要在预处理管道中计算标签索引,标签此时只会是整数。

4.提供输入函数

训练模型之后,您需要部署此模型,并通过 TF Serving 提供。以下代码与您使用任何其他估算器时要用到的代码相同:

1def serving_input_fn():

2# Note: only handles one image at a time

3feature_placeholders = {'image_bytes':

4tf.placeholder(tf.string, shape=())}

5image, _ = read_and_preprocess(

6tf.squeeze(feature_placeholders['image_bytes']))

7features = {

8'image': tf.expand_dims(image, 0)

9}

10return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)

TPU 已针对批次推理进行优化;如果您的用例需要在线预测,目前最好是通过 CPU 或 GPU 提供(根据批次大小和模型复杂程度而定)。编写输入函数时,我假定自己只传送一张图像,所以实际上是指通过 CPU/GPU 提供。

5.模型函数

模型函数需要创建并返回 TPUEstimatorSpec。实现方式如下所示:

1def image_classifier(features, labels, mode, params):

2image = features

3if isinstance(features, dict):

4image = features['image']

5

6ylogits, nclasses = cnn_model(image, mode, params)

7

8probabilities = tf.nn.softmax(ylogits)

9class_int = tf.cast(tf.argmax(probabilities, 1), tf.int32)

10class_str = tf.gather(LIST_OF_LABELS, class_int)

11

12if mode == tf.estimator.ModeKeys.TRAIN or mode == tf.estimator.ModeKeys.EVAL:

13loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(

14logits=ylogits, labels=tf.one_hot(labels, nclasses)))

15

16def metric_fn(class_int, labels):

17return {'accuracy': tf.metrics.accuracy(class_int, labels)}

18evalmetrics = (metric_fn, [class_int, labels])

19

20if mode == tf.estimator.ModeKeys.TRAIN:

21# this is needed for batch normalization, but has no effect otherwise

22update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)

23optimizer = tf.train.AdamOptimizer(learning_rate=params['learning_rate'])

24if params['use_tpu']:

25optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer) # TPU change 1

26with tf.control_dependencies(update_ops):

27train_op = optimizer.minimize(loss, tf.train.get_global_step())

28else:

29train_op = None

30else:

31loss = None

32train_op = None

33evalmetrics = None

34

35return tf.contrib.tpu.TPUEstimatorSpec( # TPU change 2

36mode=mode,

37predictions={"probabilities": probabilities,

38"classid": class_int, "class": class_str},

39loss=loss,

40train_op=train_op,

41eval_metrics=evalmetrics,

42export_outputs={'classes': tf.estimator.export.PredictOutput(

43{"probabilities": probabilities, "classid": class_int,

44"class": class_str})}

45)

所传入的特征可能会是图像(我的训练和评估输入函数)或字典(我的提供输入函数)。我进行检查并从特征中检索图像。

然后,我在图像上调用实际模型数学函数。这应该是使用 tf.layers 的常见 TensorFlow 代码。请浏览完整的源代码以了解其形式。

由于这是分类问题,所以我使用 softmax 并根据各个类别的 logit 计算整数标签和字符串标签,之后使用了 argmax 和 gather。我还计算了交叉熵损失,这与任何其他估算器类似。

其中一个区别在于,一般的估算器需要将评估指标用作字典,而 TPUEstimator 需要能够在控制 CPU 或 TPU 上调用的函数。因此,指定 eval 指标的方式稍有不同。

如果您使用 TPU,则所用的优化器必须包装在 CrossShardOptimizer 中。这样可以在不同核心中分配优化任务。

训练操作就是将此交叉碎片优化损失最小化。请使用 optimizer.minimize(),而非 layers.optimize_loss()。

将上述所有操作整合在一起后,系统会返回 TPU 估算器规范。

6.训练和评估循环

您可能很熟悉估算器的 train_and_evaluate 循环。可惜此循环(尚)无法与 TPU 有效配合使用。幸运的是,创建您自己的循环并不太难,这让您可以更好地控制检查频率和内容(回想这样的情景,您想尽可能减少过于频繁的检查导致的环境切换和 I/O 开销)。

1def train_and_evaluate(output_dir, hparams):

2STEPS_PER_EVAL = 1000

3max_steps = hparams['train_steps']

4eval_batch_size = min(1024, hparams['num_eval_images'])

5eval_batch_size = eval_batch_size - eval_batch_size % 8 # divisible by num_cores

6tf.logging.info('train_batch_size=%d eval_batch_size=%d max_steps=%d',

7hparams['train_batch_size'],

8eval_batch_size,

9max_steps)

10

11# TPU change 3

12if hparams['use_tpu']:

13tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(

14hparams['tpu'],

15zone=hparams['tpu_zone'],

16project=hparams['project'])

17config = tf.contrib.tpu.RunConfig(

18cluster=tpu_cluster_resolver,

19model_dir=output_dir,

20save_checkpoints_steps=STEPS_PER_EVAL,

21tpu_config=tf.contrib.tpu.TPUConfig(

22iterations_per_loop=STEPS_PER_EVAL,

23per_host_input_for_training=True))

24else:

25config = tf.contrib.tpu.RunConfig()

26

27estimator = tf.contrib.tpu.TPUEstimator( # TPU change 4

28model_fn=image_classifier,

29config=config,

30params=hparams,

31model_dir=output_dir,

32train_batch_size=hparams['train_batch_size'],

33eval_batch_size=eval_batch_size,

34use_tpu=hparams['use_tpu']

35)

首先,提取一些命令行参数,并用其指定最大步数以及训练和评估的批次大小。

接下来是寻找 TPU。如果您已在 Google 计算引擎上自行创建了 Cloud TPU,可能已为其命名。假设此名称(“tpu”)作为命令行参数传入。如果您使用 Cloud ML Engine,系统会自动推断 TPU 名称和区域等内容。请务必仅在已设置 use_tpu 标记的情况下执行此操作。如果用户是在 CPU 或 GPU 上运行程序,则只需创建空白 RunConfig。

接下来,使用模型函数、配置、参数和批次大小创建 TPUEstimator。创建估算器后,我们便可进入真实训练和评估循环:

1# load last checkpoint and start from there

2current_step = load_global_step_from_checkpoint_dir(output_dir)

3steps_per_epoch = hparams['num_train_images'] // hparams['train_batch_size']

4tf.logging.info('Training for %d steps (%.2f epochs in total). Current'

5' step %d.',

6max_steps,

7max_steps / steps_per_epoch,

8current_step)

9

10start_timestamp = time.time() # This time will include compilation time

11

12while current_step < hparams['train_steps']:

13# Train for up to steps_per_eval number of steps.

14# At the end of training, a checkpoint will be written to --model_dir.

15next_checkpoint = min(current_step + STEPS_PER_EVAL, max_steps)

16estimator.train(input_fn=train_input_fn, max_steps=next_checkpoint)

17current_step = next_checkpoint

18tf.logging.info('Finished training up to step %d. Elapsed seconds %d.',

19next_checkpoint, int(time.time() - start_timestamp))

20

21# Evaluate the model on the most recent model in --model_dir.

22# Since evaluation happens in batches of --eval_batch_size, some images

23# may be excluded modulo the batch size. As long as the batch size is

24# consistent, the evaluated images are also consistent.

25tf.logging.info('Starting to evaluate at step %d', next_checkpoint)

26eval_results = estimator.evaluate(

27input_fn=eval_input_fn,

28steps=hparams['num_eval_images'] // eval_batch_size)

29tf.logging.info('Eval results at step %d: %s', next_checkpoint, eval_results)

30

31elapsed_time = int(time.time() - start_timestamp)

32tf.logging.info('Finished training up to step %d. Elapsed seconds %d.',

33max_steps, elapsed_time)

TensorFlow 估算器的运作方式是从先前已有的检查点执行暖启动。我们可以加载输出目录中提供的检查点,以进行复制。然后,我们会一次性逐步完成训练数据 train_batch_size 步骤,直至达到所指定的最大步数。

在本文的例子中,我对完整评估数据集中的每个检查点都进行了评估,但显然,您可以减少此训练的计算量。

7.导出模型以供使用

最后,在完成训练后,我导出已保存的模型。您可以使用 TF Serving 或 Cloud ML Engine 来部署已保存的模型,以进行预测。

1# export similar to Cloud ML Engine / TF Serving convention

2tf.logging.info('Starting to export model.')

3estimator.export_savedmodel(

4export_dir_base=os.path.join(output_dir, 'export/exporter'),

5serving_input_receiver_fn=serving_input_fn)

此时,我们便有了一个可以在 Cloud TPU 上训练的自定义估算器模型。采用这种方式编写模型(例如使用 use_tpu 标记并提供转置为可选项),同样的代码也支持各种不同的硬件,包括 CPU 和 GPU 在内。因此,我们的估算器模型实际可用于全部的三类硬件。

后续步骤:

  • 从 GitHub 下载本文随附的代码,然后进行试用

  • 运行代码实验室,了解如何在 TPU 上运行 ResNet训练您自己的数据(无需编写任何代码)

注:代码链接

https://github.com/GoogleCloudPlatform/training-data-analyst/tree/master/courses/machine_learning/deepdive/08_image/flowersmodeltpu

在 TPU 上运行 ResNet 链接

https://codelabs.developers.google.com/codelabs/tpu-resnet/#0

在 Coursera 上参加使用 TensorFlow 进行机器学习专业课程;此课程会逐步介绍 TensorFlow 概念,以及如何在 Google Cloud 上大规模地训练、调整和部署 ML 模型。


声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 存储区域
    +关注

    关注

    0

    文章

    8

    浏览量

    7128
  • TPU
    TPU
    +关注

    关注

    0

    文章

    131

    浏览量

    20512
  • Cloud
    +关注

    关注

    0

    文章

    61

    浏览量

    5254

原文标题:如何为 Cloud TPU 编写自定义估算器模型

文章出处:【微信号:tensorflowers,微信公众号:Tensorflowers】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    Android端自定义铃声 MobPush对安卓端自定义铃声的教程

    如何为APP推送设置独特的通知铃声呢?本次带来的是MobPush对安卓端自定义铃声的教程,快来看看吧~
    的头像 发表于 10-21 15:34 806次阅读
    Android端<b class='flag-5'>自定义</b>铃声 MobPush对安卓端<b class='flag-5'>自定义</b>铃声的教程

    基于YOLOv8实现自定义姿态评估模型训练

    Hello大家好,今天给大家分享一下如何基于YOLOv8姿态评估模型,实现在自定义数据集上,完成自定义姿态评估模型的训练与推理。
    的头像 发表于 12-25 11:29 1221次阅读
    基于YOLOv8实现<b class='flag-5'>自定义</b>姿态评估<b class='flag-5'>模型</b>训练

    nios 自定义ip问题求助

    sopc builder中添加自定义ip,编写自定义ip核的时候, avalon接口信号:clk、rst
    发表于 11-26 11:11

    STATCOM自定义建模及动稳态调压分析

    STATCOM自定义建模及动稳态调压分析:建立了statcom基于功率注入法的稳态模型和基于受控电流源的动态模型,并利用电力系统分析软件提供的用户自定义功能实现了这些
    发表于 03-18 16:09 18次下载

    1602自定义字符

    1602液晶能够显示自定义字符,能够根据读者的具体情况显示自定义字符。
    发表于 01-20 15:43 1次下载

    DOS下自定义时间重启

    DOS环境下,C语言编写自定义时间重启。
    发表于 03-16 09:40 6次下载

    RTWconfigurationguide基于模型设计—自定义

    基于模型设计—自定义目标系统配置指南,RTW自动代码生成相关资料。
    发表于 05-17 16:41 3次下载

    AVR编写1602自定义字符

    AVR编写自定义字符不错的练习题目
    发表于 03-20 14:21 1次下载

    自定义视图组件教程案例

    自定义组件 1.自定义组件-particles(粒子效果) 2.自定义组件- pulse(脉冲button效果) 3.自定义组件-progress(progress效果) 4.
    发表于 04-08 10:48 14次下载

    ArkUI如何自定义弹窗(eTS)

    自定义弹窗其实也是比较简单的,通过CustomDialogController类就可以显示自定义弹窗。
    的头像 发表于 08-31 08:24 1422次阅读

    教程 3:构建自定义配置文件

    教程 3:构建自定义配置文件
    发表于 03-15 19:39 0次下载
    教程 3:构建<b class='flag-5'>自定义</b>配置文件

    labview自定义控件

    labview自定义精美控件
    发表于 05-15 16:46 9次下载

    自定义算子开发

    一个完整的自定义算子应用过程包括注册算子、算子实现、含自定义算子模型转换和运行含自定义op模型四个阶段。在大多数情况下,您的
    的头像 发表于 04-07 16:11 1888次阅读
    <b class='flag-5'>自定义</b>算子开发

    labview超快自定义控件制作和普通自定义控件制作

    labview超快自定义控件制作和普通自定义控件制作
    发表于 08-21 10:32 5次下载

    谷歌发布多模态Gemini大模型及新一代TPU系统Cloud TPU v5p

    谷歌亦发布新一代TPU 系统——Cloud TPU v5p,以帮助训练尖端的 AI 模型。目
    的头像 发表于 12-12 10:50 789次阅读
    谷歌发布多模态Gemini大<b class='flag-5'>模型</b>及新一代<b class='flag-5'>TPU</b>系统<b class='flag-5'>Cloud</b> <b class='flag-5'>TPU</b> v5p