当前位置: 首页 > news >正文

杭州上城区网站建设烟台网站开发

杭州上城区网站建设,烟台网站开发,网站开发 ie兼容,dw做的网站怎么放到服务器上MapReduce论文阅读 1. 编程模型 Map 函数#xff08;kv - kv#xff09; Map 函数将输入的键值对处理为一系列中间值#xff08;键值对#xff09;#xff0c;并将所有的中间结果传递给 Reduce 处理。 map(String key, String value):// key: document name// val…MapReduce论文阅读 1. 编程模型 Map 函数kv - kv Map 函数将输入的键值对处理为一系列中间值键值对并将所有的中间结果传递给 Reduce 处理。 map(String key, String value):// key: document name// value: document contentsfor each word w in value:EmitIntermediate(w, 1);Reduce 函数kv - kv Reduce 函数将相同键的值合并每个 Reduce 函数输出 0 或 1 个结果。 reduce(String key, Iterator values):// key: a word// values: a list of countsint result 0;for each v in values:result ParseInt(v);Emit(AsString(result));类型定义 map (k1, v1) - list(k2, v2) reduce (k2, list(v2)) - list(v)例子统计文本中每个单词的出现次数词频统计 输入数据 假设有以下三段文本数据 文本1“apple banana apple”文本2“banana orange apple”文本3“orange banana banana” Map 阶段 Map 函数接受输入键值对 (k1, v1)其中 k1 是文本的标识例如文本编号或文件名v1 是文本内容。Map 函数将文本内容拆分为单词并为每个单词输出中间键值对 (k2, v2)其中 k2 是单词v2 是计数值 1。 Map 输入 (文本1, apple banana apple)(文本2, banana orange apple)(文本3, orange banana banana) Map 输出 对于输入 (文本1, apple banana apple)Map 函数输出 (apple, 1) (banana, 1) (apple, 1)对于输入 (文本2, banana orange apple)Map 函数输出 (banana, 1) (orange, 1) (apple, 1)对于输入 (文本3, orange banana banana)Map 函数输出 (orange, 1) (banana, 1) (banana, 1)Shuffle and Sort整理和排序 将所有 Map 阶段的输出根据键 k2单词进行分组形成 (k2, list(v2)) 的形式 (apple, [1, 1, 1])(banana, [1, 1, 1, 1])(orange, [1, 1]) Reduce 阶段 Reduce 函数接受 (k2, list(v2))将 list(v2) 中的值累加得到每个单词的总出现次数。 对于 (apple, [1, 1, 1])Reduce 函数计算 total 1 1 1 3 输出(apple, 3)对于 (banana, [1, 1, 1, 1])Reduce 函数计算 total 1 1 1 1 4 输出(banana, 4)对于 (orange, [1, 1])Reduce 函数计算 total 1 1 2 输出(orange, 2)最终输出 (apple, 3)(banana, 4)(orange, 2) 总结流程 Map 阶段将输入的文本内容拆分为单词对每个单词生成键值对 (k2, v2)其中 k2 是单词v2 是计数 1。Shuffle and Sort将所有的键值对按照键 k2 进行分组收集所有相同键的值到一个列表 list(v2) 中。Reduce 阶段对每个键 k2将对应的 list(v2) 中的值进行累加得到该单词的总出现次数。 通过这个例子可以清楚地看到 MapReduce 的工作过程 Map 阶段处理输入数据映射 (k1, v1) 到一系列的 (k2, v2)。Reduce 阶段聚合中间结果将 (k2, list(v2)) 转化为最终结果 (k2, v)。 这个过程非常适合处理大规模的数据集因为 Map 和 Reduce 阶段都可以并行执行充分利用分布式计算资源。 示例 本节中我们给出了一些简单的示例这些示例都是可以通过 MapReduce 计算实现的有趣程序。 分布式“grep”如果一行文本匹配给定的模式那么 Map 函数会输出该行。Reduce 作为一个恒等函数仅将提供的中间数据复制到输出。 URL 访问频率计数Map 函数处理网页请求日志输出 URL, 1。Reduce 函数对相同 URL 的值求和输出 URL, 总数 键值对。 反转 Web 链接拓扑图Map 函数在名为 source 的页面中对每个指向的 target URL 输出一个 target, source 键值对。Reduce 函数将所有相同 target 的 source 合并为一个列表输出 target, list(source) 键值对。 每个主机的词向量统计词向量是对一系列文档中最重要的词的汇总其形式为 词, 词频 的键值对列表。Map 函数为每篇输入文档输出一个 主机名, 词向量 键值对主机名 由文档的 URL 解析而来。Reduce 函数接收给定主机上所有文章的词向量将这些词向量相加丢弃低频词最终输出 主机名, 词向量 键值对。 倒排索引Map 函数处理每篇文档输出一系列 词, 文档 ID 的键值对。Reduce 函数接受相同词的所有键值对按 文档 ID 排序输出 词, list(文档 ID) 键值对。所有输出的键值对集合组成了一个简单的倒排索引。如果需要跟踪词的位置可以通过增量计算实现。 分布式排序Map 函数提取每条记录中的键输出一个 键, 记录 的键值对。Reduce 函数不修改中间变量直接输出所有的键值对。排序计算依赖于 MapReduce 框架中的分区机制和排序特性以确保数据按照键的顺序输出。 2. 实现 执行概述 输入数据会自动被分割为 M M M个分片split这样map函数调用可以在多个机器上分布式执行每个输入的分片可以在不同机器上并行处理。中间键值对的键空间会通过被分区函数(例如 h a s h ( k e y ) m o d R hash(key) mod R hash(key)modR)分割为 R R R个分区这样reduce函数也可以分布式执行。其中分区的数量 R R R和分区函数由用户指定。 图1展示了在我们的实现中MapReduce操作的完整工作流。当用户程序调用MapReduce函数时会发生如下的操作下列序号与图1中序号对应 程序启动 顶部的 User Program 启动后通过 fork 操作创建一个 Master 节点和多个 Worker 节点Master 负责任务的协调和分配 输入阶段 左侧显示输入文件被分成多个 splitssplit 0 到 split 4这些 splits 代表了需要处理的原始数据的分片 Map 阶段 Master 将 map 任务分配给 worker 节点Worker 节点读取相应的输入分片图中标注为(3) read每个 map worker 处理数据并将结果写入本地磁盘标注为(4) local write生成的中间文件存储在 local disks 上 中间阶段 中间结果以文件形式存储在各个节点的本地磁盘上这些文件等待被 reduce 阶段的 worker 处理 Reduce 阶段 Master 为 reduce worker 分配任务Reduce worker 通过远程读取标注为(5) remote read获取它们需要处理的中间文件数据最后将处理结果写入最终的输出文件标注为(6) write 输出阶段 最终生成多个输出文件output file 0, output file 1这些文件包含了整个MapReduce作业的最终结果 整个流程展示了 MapReduce 的核心特点 并行处理多个 worker 同时工作数据本地性map 阶段的数据尽量在本地处理容错性master 可以重新分配失败的任务可扩展性worker 节点可以根据需要增加 这种架构设计使得 MapReduce 能够高效地处理大规模数据成为分布式计算的重要范式。 假设我们要统计一个大型文本库中每个单词出现的次数 数据分片(Split)阶段 假设我们有 100GB 的文本文件系统将其分成 M2000 个分片每个约 50MB此时会启动一个 master 和多个 worker 进程 任务分配阶段 Master 会将 2000 个 map 任务分配给空闲的 workers同时分配好 R假设50个 reduce 任务 Map 处理阶段 # Map 函数示例 def map(document):for word in document.split():emit(word, 1) # 为每个单词发出(word, 1)的键值对中间结果处理 每个 map worker 将产生的键值对如{“hello”: 1, “world”: 1}缓存在内存按照分区函数如 hash(word) mod 50将数据分成 50 个区域将位置信息报告给 master Reduce 获取与排序 每个 reduce worker 负责特定的单词集合例如reduce-0 可能负责 hash 值为 0 的所有单词从所有 map worker 收集数据后进行排序 # 排序后的数据可能如下 {hello: [1,1,1,1], hi: [1,1]}Reduce 处理 # Reduce 函数示例 def reduce(word, counts):sum 0for count in counts:sum countemit(word, sum)完成阶段 最终得到 50 个输出文件每个文件包含一部分单词的统计结果这些文件可以直接用于后续处理如 作为新的 MapReduce 任务的输入被其他分布式程序使用 关键特点 容错性如果某个 worker 失败master 会将任务重新分配给其他 worker 本地性优化master 会尽量将 map 任务分配给拥有对应输入数据的机器减少网络传输 扩展性 M 和 R 的值可以远大于机器数量动态负载均衡适应不同规模的计算集群 在我们的MapReduce实现的实际情况中对 M M M和 R R R的上限进行了限制。如前文所述master必须做出 O ( M R ) O(MR) O(MR)个调度决策并在内存中保存 O ( M × R ) O(M \times R) O(M×R)个状态。内存占用的常数因子比较小 O ( M × R ) O(M \times R) O(M×R)条状态由大约每个map/reduce任务仅一字节的数据组成。 此外 R R R还经常受用户限制因为每个reduce任务会生成一个单独的输出文件。在实际情况下我们更倾向于自定义参数 M M M这样可以使每个单独的任务的输入数据大概在16MB到64MB这样可以使前面提到的局部性优化最有效同时我们使 R R R是期望使用的worker机器的较小的倍数。我们经常在 2 , 000 2,000 2,000台机器上选择 M 200 , 000 M200,000 M200,000、 R 5 , 000 R5,000 R5,000的参数执行MapReduce计算。 容错 1. Worker 故障处理 Master 通过定期 ping 来监控 worker 状态Worker 无响应时会被标记为故障故障 worker 的任务处理 已完成的 map 任务重置为等待中状态需要重新执行因为输出在本地磁盘已完成的 reduce 任务不需重新执行因为输出在全局文件系统正在执行的任务重置为等待中状态 Map 任务重新执行时会通知所有 reduce worker系统可以处理大规模故障如文中提到的 80 台机器同时故障 2. Master 故障处理 采用检查点机制定期保存状态Master 故障时从最近检查点恢复当前实现Master 故障则终止计算由客户端决定是否重试 3. 故障时的语义保证 确定性函数情况 分布式执行结果与顺序执行相同通过原子提交机制实现 Map/Reduce 任务输出先写入临时文件Map 完成时向 master 报告临时文件名Reduce 完成时原子性重命名为永久文件 非确定性函数情况 提供较弱的语义保证同一 reduce 任务的输出等同于顺序执行不同 reduce 任务可能基于不同执行结果 改进 1. 分区函数Partitioning Function 允许用户自定义分区方式默认使用 hash(key) mod R可根据需求自定义如按 URL 主机名分区 2. 有序性保证 同一分区内的中间键值对按键升序处理便于生成有序输出支持按键的高效随机访问 3. 合并函数Combiner Function 在 map 端预先合并相同 key 的数据减少网络传输量适用于满足交换律和结合律的操作如词频统计 4. 输入输出类型 支持多种输入格式如文本行、键值对序列可扩展的 reader/writer 接口支持自定义数据源如数据库、内存数据结构 5. 附属输出 支持生成额外的输出文件通过临时文件和原子重命名确保一致性不支持跨文件的两阶段提交 6. 容错机制 可跳过导致确定性崩溃的记录使用信号处理器捕获错误master 跟踪并管理问题记录 7. 本地执行支持 提供本地顺序执行模式便于调试和测试支持常规调试工具如 gdb 8. 状态信息 master 提供 HTTP 状态页面显示进度、资源使用等信息支持任务执行状态监控 9. 计数器功能 支持自定义计数器统计事件自动维护系统级计数器用于监控和验证处理进度处理重复计数问题 1. 基本用法示例 - 文档处理统计 class DocumentCounters:def __init__(self):self.total_docs GetCounter(total_documents)self.english_docs GetCounter(english_documents)self.chinese_docs GetCounter(chinese_documents)self.invalid_docs GetCounter(invalid_documents)self.large_docs GetCounter(large_docs_over_1mb)def map(doc_id, content):# 增加总文档计数counters.total_docs.Increment()# 检查文档大小if len(content) 1024 * 1024: # 1MBcounters.large_docs.Increment()try:lang detect_language(content)if lang en:counters.english_docs.Increment()elif lang zh:counters.chinese_docs.Increment()process_document(content)except InvalidDocumentError:counters.invalid_docs.Increment()return# 任务完成后可以获取统计信息: # total_documents: 1000000 # english_documents: 600000 # chinese_documents: 350000 # invalid_documents: 50000 # large_docs_over_1mb: 50002. 质量控制示例 - 数据处理验证 class QualityCounters:def __init__(self):self.input_records GetCounter(input_records)self.output_records GetCounter(output_records)self.malformed_records GetCounter(malformed_records)self.null_fields GetCounter(null_required_fields)self.out_of_range GetCounter(value_out_of_range)def map(key, record):counters.input_records.Increment()# 验证记录格式if not is_valid_format(record):counters.malformed_records.Increment()return# 检查必填字段if has_null_required_fields(record):counters.null_fields.Increment()return# 检查数值范围if not is_value_in_range(record.value):counters.out_of_range.Increment()return# 处理有效记录process_record(record)counters.output_records.Increment()# 可以通过比较计数器验证数据质量: # input_records output_records malformed_records null_fields out_of_range3. 性能监控示例 - 处理时间统计 class PerformanceCounters:def __init__(self):self.processed_bytes GetCounter(processed_bytes)self.cache_hits GetCounter(cache_hits)self.cache_misses GetCounter(cache_misses)self.slow_operations GetCounter(slow_operations_over_100ms)def map(key, data):start_time time.time()# 统计处理的数据量counters.processed_bytes.Increment(len(data))# 缓存使用统计if cache.has(key):counters.cache_hits.Increment()else:counters.cache_misses.Increment()# 处理数据result process_data(data)# 统计慢操作if (time.time() - start_time) * 1000 100: # 100mscounters.slow_operations.Increment()# 可以分析性能指标: # cache_hit_rate cache_hits / (cache_hits cache_misses) # avg_processing_speed processed_bytes / total_time # slow_operation_percentage slow_operations / total_operations4. 业务指标统计示例 - 电商数据分析 class BusinessCounters:def __init__(self):self.total_orders GetCounter(total_orders)self.total_revenue GetCounter(total_revenue)self.cancelled_orders GetCounter(cancelled_orders)self.high_value_orders GetCounter(orders_over_1000)self.new_customers GetCounter(new_customers)def map(order_id, order):counters.total_orders.Increment()counters.total_revenue.Increment(order.amount)if order.amount 1000:counters.high_value_orders.Increment()if order.status CANCELLED:counters.cancelled_orders.Increment()if order.is_new_customer:counters.new_customers.Increment()# 可以计算关键业务指标: # average_order_value total_revenue / total_orders # cancellation_rate cancelled_orders / total_orders # high_value_percentage high_value_orders / total_orders3. 总结 目前我们使用MapReduce做的最重要的工作之一是完全重写了一个索引系统该系统被用作生成用于Google web搜索服务的数据结构。该索引系统将大量被我们爬虫系统检索到的文档作为GFS文件存储作为输入。这些文档的原始内容的数据大小超过20TB。索引进程会运行一系列5~10个MapReduce操作。使用MapReduce而不是旧版索引系统中ad-hoc分布式传递方案提供了很多好处 索引代码更简单、短、便于理解因为处理容错、分布式和并行的代码被隐藏在了MapReduce库中。例如计算中的有一个阶段的代码量从3800行C代码所见到了700行使用MapReduce的代码。 MapReduce库的性能足够好这让我们可以将概念上不相关的计算分离开而不是将它们混合在一起这样可以避免传递过多额外的数据。这使改变索引程序变得非常简单。例如在我们旧的索引系统中一处修改会花费几个月的时间而新的系统仅需要几天就能实现。 索引系统变得更容易操作。大部分因机器故障、缓慢的机器、网络不稳定等引起的问题都被MapReduce库自动处理了不需要引入额外的操作。此外向索引集群添加新机器以获得更好的性能变得更加简单。 实验环境 操作系统macosIDEGOLAND语言GO 1.23lab1链接 Lab1 说明 实验内容 MapReduce 作业的抽象视图 – 词频统计示例 Input1 - Map - a,1 b,1 Input2 - Map - b,1 Input3 - Map - a,1 c,1| | || | - Reduce - c,1| ----- Reduce - b,2--------- Reduce - a,2运行流程 输入已被分割成 M 个片段。MapReduce 为每个输入分片调用 Map() 函数生成键值对列表这些数据称为“中间数据”。每个 Map() 调用是一个“任务”。当 Map 任务完成后MapReduce 收集每个键的所有中间值并将每个键及其对应的值传递给 Reduce 调用。最终输出是一组来自 Reduce() 的键值对。 词频统计代码 Map(d)将 d 切分成单词对于每个单词 wemit(w, 1)Reduce(k, v[])emit(len(v[]))MapReduce 特点 扩展性多台计算机N台可以获得 N 倍的吞吐量Map() 和 Reduce() 函数可并行运行。自动化容错处理自动处理 Map 和 Reduce 函数代码的分发、任务跟踪、中间数据的传输、负载均衡、崩溃恢复等。设计限制不允许有交互或状态数据流只支持 Map/Reduce 模式不支持实时或流式处理。 实验要求 核心任务 实现一个分布式的 MapReduce 系统包含两个主要程序 coordinator协调者worker工作者 系统架构 只有一个 coordinator 进程。允许多个 worker 进程并行执行worker 通过 RPC 与 coordinator 通信。所有进程运行在同一台机器上。 Worker 工作流程 向 coordinator 请求任务。从一个或多个文件读取任务输入。执行任务并将输出写入一个或多个文件。重复请求新任务直到任务完成。 具体要求 Map 阶段输出要求 将中间键分成 nReduce 个桶每个 mapper 创建 nReduce 个中间文件供 reduce 任务使用。 输出文件要求 第 X 个 reduce 任务的输出放在 mr-out-X 文件中每个 Reduce 函数的输出一行使用 Go 的 “%v %v” 格式化输出 key 和 value。 容错处理 coordinator 需要检测 worker 是否在 10 秒内完成任务如果超时则将任务重新分配给其他 worker。 任务完成判断 coordinator 需实现 Done() 方法当 MapReduce 工作完全结束时返回 true通知 worker 进程退出。 可修改的文件 mr/worker.gomr/coordinator.gomr/rpc.go 实验提示Hints 入门建议 修改 mr/worker.go 的 Worker() 函数使其发送 RPC 请求给 coordinator 请求任务。修改 coordinator 使其返回尚未开始的 map 任务文件名worker 可使用该文件名调用应用程序的 Map 函数。 插件 Map 和 Reduce 函数通过 Go plugin 包从 .so 文件加载。修改 mr/ 目录下内容时需重新构建 MapReduce 插件go build -buildmodeplugin ../mrapps/wc.go文件系统 本实验要求 workers 共享文件系统同一机器上运行时较简单不同机器上则需 GFS 等全局文件系统支持。 命名约定 中间文件命名格式mr-X-Y其中 X 是 Map 任务编号Y 是 reduce 任务编号。 JSON 存储建议 // 写入 key/value 对 enc : json.NewEncoder(file) for _, kv : ... {err : enc.Encode(kv) }// 读取 key/value 对 dec : json.NewDecoder(file) for {var kv KeyValueif err : dec.Decode(kv); err ! nil {break}kva append(kva, kv) }任务分配 worker 可以使用 ihash(key) 函数在 worker.go 中来选择 reduce 任务。可参考 mrsequential.go 实现中的代码读取 Map 输入文件、中间 key/value 对排序、将 Reduce 输出存储到文件。 并发与锁定 coordinator 是 RPC 服务器需要锁定共享数据。可使用 Go 的竞态检测器测试并发go run -race等待机制 workers 有时需要等待如等待最后一个 map 完成后才能开始 reduce可让 workers 定期询问 coordinator并使用 time.Sleep()。 错误处理 coordinator 需要处理 worker 崩溃或卡住的情况建议等待 10 秒后假设 worker 已死亡重新分配任务。 测试建议 使用 mrapps/crash.go 测试崩溃恢复。使用临时文件并原子重命名处理崩溃的文件写入tempFile, _ : ioutil.TempFile(, temp-) os.Rename(tempFile.Name(), final-output-file)RPC 相关 Go RPC 只发送首字母大写的结构体字段reply 结构体需包含默认值。调用 RPC call() 函数时reply 结构体应包含所有默认值reply : SomeType{} call(..., reply)规则说明 Map 阶段规则 Map 阶段将中间键分成 nReduce 个桶用于 reduce 任务每个 mapper 为 reduce 任务创建 nReduce 个中间文件。 输出文件规则 worker 将第 X 个 reduce 任务的输出放在 mr-out-X 文件中。 输出格式规则 mr-out-X 文件中的每行输出使用 Go 的 “%v %v” 格式传入 key 和 value。 文件修改规则 可修改 mr/worker.go、mr/coordinator.go 和 mr/rpc.go。测试时需确保代码在原始版本下正常运行。 中间输出规则 worker 将 Map 的中间输出放在当前目录的文件中供后续 Reduce 任务读取。 任务完成状态规则 main/mrcoordinator.go 中 Done() 方法在任务完成时返回 true协调器结束任务。 进程退出规则 worker 如果无法联系到 coordinator可假设任务已完成并退出。coordinator 也可发送“请退出”的伪任务。 测试要求 必须通过以下测试 Word Count 测试Indexer 测试Map 并行性测试Reduce 并行性测试Job Count 测试Early Exit 测试Crash 测试 所有测试包含在 test-mr.sh 脚本中最终输出需与顺序执行结果一致。 具体实现 问题 使用goland中的模板进行build的时候出错 如果直接从main函数进行build不会报错如果手动从模板进行会报错lab题目理解出问题 总结 参考 MapReduce论文翻译
http://www.dnsts.com.cn/news/195190.html

相关文章:

  • 怎么在百度上制作自己的网站最好的 受欢迎的 免费的
  • 奇趣统计网站谁做的心理医院网站优化服务商
  • 没有网站也可以做推广吗做别墅装修的公司
  • 学校部门网站建设总结做普通网站多少钱
  • 网站设计用什么做公司做网站让拍照备案
  • 石家庄市交建高速公路建设管理有限公司网站北京又不让出京了
  • 杭州余杭网站建设数字营销公司
  • 去国外做非法网站吗婚纱影楼网站免费源码
  • 网站开发人员薪酬wordpress格式化sql串
  • 个人网站注销原因中文在线 在线
  • 网站建设初级教程网站构建代码模板
  • 网站建设基础流程深圳市网络seo推广价格
  • 一个网站是怎么建立的仿牌网站 域名注册
  • 企业为什么做企业网站和推广西安室内设计公司排名
  • 网站环境配置外发加工网会员
  • 网站建设毕业设计的分类号页面效果设计
  • 如何查询一个网站的空间服务商东莞建设网站制作
  • 做网站设计制作的公司装饰公司取名
  • 宣传型网站建设想开个网站怎样开
  • 网站建设模块化实现郑州专业网站推广公司
  • 网站搜索出来有图片ppt设计器在哪里
  • 网站js修改头像代码做平面设计素材的哪个网站好
  • 淮安做网站 卓越凯欣建设银行龙卡信用卡在境外网站支付
  • 东莞食品公司东莞网站建设seo内容优化
  • 建设银行潍坊支行网站深圳做网站价格
  • 个人网站的搭建福建住建设厅官方网站
  • 淘宝优惠券 如果做网站成都网站建设 天空在线
  • html5国内网站平顶山市住房和城乡建设局网站
  • 深圳市工程建设造价网站小程序定制开发多少钱一个
  • 网站开发后台做些什么怎么建设自己的网站